process_stage.c

#include <stdlib.h>
#include <stdio.h>
#include <math.h>
#include <mpi.h>
#include "computing_func.h"
#include "comunication_func.h"
#include "Main_datatypes.h"
#include "process_stage.h"
#include "../malleability/distribution_methods/block_distribution.h"

double init_emulation_comm_time(group_data group, configuration *config_file, iter_stage_t *stage, MPI_Comm comm);

double init_matrix_pt(group_data group, configuration *config_file, iter_stage_t *stage, MPI_Comm comm, int compute);
double init_pi_pt(group_data group, configuration *config_file, iter_stage_t *stage, MPI_Comm comm, int compute);
void init_comm_ptop_pt(group_data group, configuration *config_file, iter_stage_t *stage, MPI_Comm comm);
double init_comm_bcast_pt(group_data group, configuration *config_file, iter_stage_t *stage, MPI_Comm comm);
double init_comm_allgatherv_pt(group_data group, configuration *config_file, iter_stage_t *stage, MPI_Comm comm);
double init_comm_reduce_pt(group_data group, configuration *config_file, iter_stage_t *stage, MPI_Comm comm);

/*
 * Computes the time per operation, or the total number of bytes to send,
 * for each iteration stage so that the iterations can later be executed
 * correctly.
 *
 * Only the ROOT process performs the computation and then sends the
 * result to the remaining processes.
 *
 * If the "compute" flag is set, the operations are performed to recompute
 * the times from scratch. If it is false, only the required memory is
 * allocated and the values obtained in previous calls are reused. All
 * processes must pass the same value for the flag.
 *
 * TODO Split the work among the processes.
 * TODO Does not account for changes across heterogeneous machines.
 */
double init_stage(configuration *config_file, int stage_i, group_data group, MPI_Comm comm, int compute) {
  double result = 0;
  int qty = 20000;

  iter_stage_t *stage = &(config_file->stages[stage_i]);
  stage->operations = qty;

  switch(stage->pt) {
    //Computation
    case COMP_MATRIX:
      result = init_matrix_pt(group, config_file, stage, comm, compute);
      break;
    case COMP_PI:
      result = init_pi_pt(group, config_file, stage, comm, compute);
      break;

    //Communication
    case COMP_POINT:
      init_comm_ptop_pt(group, config_file, stage, comm);
      break;
    case COMP_BCAST:
      result = init_comm_bcast_pt(group, config_file, stage, comm);
      break;
    case COMP_ALLGATHER:
      result = init_comm_allgatherv_pt(group, config_file, stage, comm);
      break;
    case COMP_REDUCE:
    case COMP_ALLREDUCE:
      result = init_comm_reduce_pt(group, config_file, stage, comm);
      break;
  }
  return result;
}
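
/*
 * Illustrative sketch (not part of the original file): how a caller might
 * use the "compute" flag described above. On the first run every stage is
 * calibrated from scratch (compute=1); after a reconfiguration only the
 * memory is re-allocated and the previously obtained times are reused
 * (compute=0). "num_stages" and "first_run" are hypothetical parameters;
 * the real stage count lives in the configuration structure.
 */
static void example_init_all_stages(configuration *config_file, int num_stages,
                                    group_data group, MPI_Comm comm, int first_run) {
  int s;
  for(s = 0; s < num_stages; s++) {
    // Every process must pass the same value for the compute flag.
    init_stage(config_file, s, group, comm, first_run ? 1 : 0);
  }
}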

/*
 * Processes one stage of the iteration, determining the type of
 * operation to perform and calling the function that carries it out.
 */
double process_stage(configuration config_file, iter_stage_t stage, group_data group, MPI_Comm comm) {
  int i;
  double result = 0;

  switch(stage.pt) {
    //Computation
    case COMP_PI:
      for(i=0; i < stage.operations; i++) {
        result += computePiSerial(config_file.granularity);
      }
      break;
    case COMP_MATRIX:
      for(i=0; i < stage.operations; i++) {
        result += computeMatrix(stage.double_array, config_file.granularity);
      }
      break;
    //Communication
    case COMP_POINT:
      point_to_point(group.myId, group.numP, ROOT, comm, stage.array, stage.real_bytes);
      break;
    case COMP_BCAST:
      if(stage.bytes != 0) {
        MPI_Bcast(stage.array, stage.real_bytes, MPI_CHAR, ROOT, comm);
      } else {
        for(i=0; i < stage.operations; i++) {
          point_to_point_inter(group.myId, group.numP, comm, stage.array, stage.real_bytes);
        }
      }
      break;
    case COMP_ALLGATHER:
      MPI_Allgatherv(stage.array, stage.my_bytes, MPI_CHAR, stage.full_array, stage.counts.counts, stage.counts.displs, MPI_CHAR, comm);
      break;
    case COMP_REDUCE:
      MPI_Reduce(stage.array, stage.full_array, stage.real_bytes, MPI_CHAR, MPI_MAX, ROOT, comm);
      break;
    case COMP_ALLREDUCE:
      MPI_Allreduce(stage.array, stage.full_array, stage.real_bytes, MPI_CHAR, MPI_MAX, comm);
      break;
  }
  return result;
}
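
/*
 * Illustrative sketch (not part of the original file): executing the
 * calibrated stages of one iteration. "num_stages" is a hypothetical
 * parameter; the accumulated result is only meaningful for the compute
 * stages.
 */
static double example_run_iteration(configuration *config_file, int num_stages,
                                    group_data group, MPI_Comm comm) {
  int s;
  double result = 0;
  for(s = 0; s < num_stages; s++) {
    result += process_stage(*config_file, config_file->stages[s], group, comm);
  }
  return result;
}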


// Several latency tests are performed by sending a single CHAR value
// to the odd-ranked processes from the even-ranked process immediately
// before them. Afterwards, the odd-ranked processes send the value back
// to the even-ranked process.
//
// Returns the latency of the system.
double latency(int myId, int numP, MPI_Comm comm) {
  int i, loop_count = 100;
  double start_time, stop_time, time;
  char aux;

  aux = '0';

  MPI_Barrier(comm);
  start_time = MPI_Wtime();
  if(myId == ROOT) {
    for(i=0; i<loop_count; i++){
      MPI_Send(&aux, 0, MPI_CHAR, numP-1, 99, comm);
    }
    MPI_Recv(&aux, 0, MPI_CHAR, numP-1, 99, comm, MPI_STATUS_IGNORE);
  } else if(myId+1 == numP) {
    for(i=0; i<loop_count; i++){
      MPI_Recv(&aux, 0, MPI_CHAR, ROOT, 99, comm, MPI_STATUS_IGNORE);
    }
    MPI_Send(&aux, 0, MPI_CHAR, ROOT, 99, comm);
  }
  MPI_Barrier(comm);
  stop_time = MPI_Wtime();
  time = (stop_time - start_time) / loop_count;

  MPI_Bcast(&time, 1, MPI_DOUBLE, ROOT, comm);
  return time;
}


// Several bandwidth tests are performed by sending N data items
// to the odd-ranked processes from the even-ranked process immediately
// before them. Afterwards, the odd-ranked processes send the N data
// items back to the even-ranked process.
//
// Returns the measured bandwidth of the system.
double bandwidth(int myId, int numP, MPI_Comm comm, double latency, int n) {
  int i, loop_count = 100;
  double start_time, stop_time, bw, time;
  char *aux;
  size_t n_bytes;

  n_bytes = n * sizeof(char);
  aux = malloc(n_bytes);
  time = 0;

  MPI_Barrier(comm);
  start_time = MPI_Wtime();
  if(myId == ROOT) {
    for(i=0; i<loop_count; i++){
      MPI_Send(aux, n, MPI_CHAR, numP-1, 99, comm);
    }
    MPI_Recv(aux, 0, MPI_CHAR, numP-1, 99, comm, MPI_STATUS_IGNORE);
  } else if(myId+1 == numP) {
    for(i=0; i<loop_count; i++){
      MPI_Recv(aux, n, MPI_CHAR, ROOT, 99, comm, MPI_STATUS_IGNORE);
    }
    MPI_Send(aux, 0, MPI_CHAR, ROOT, 99, comm);
  }
  MPI_Barrier(comm);
  stop_time = MPI_Wtime();
  time = (stop_time - start_time) / loop_count;
  bw = n_bytes / (time - latency);

  MPI_Bcast(&bw, 1, MPI_DOUBLE, ROOT, comm);
  free(aux);
  return bw;
}
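
/*
 * Illustrative sketch (not part of the original file): combining latency()
 * and bandwidth() to characterise the network between the first and last
 * process. "example_bytes" is a hypothetical message size chosen for the
 * sketch.
 */
static void example_measure_network(int myId, int numP, MPI_Comm comm) {
  int example_bytes = 1024 * 1024;                              // 1 MiB test message
  double lat = latency(myId, numP, comm);                       // seconds per zero-byte message
  double bw  = bandwidth(myId, numP, comm, lat, example_bytes); // bytes per second
  if(myId == ROOT) {
    printf("Estimated latency: %e s, bandwidth: %e B/s\n", lat, bw);
  }
}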

/*
 * ========================================================================================
 * ========================================================================================
 * =================================INIT STAGE FUNCTIONS===================================
 * ========================================================================================
 * ========================================================================================
*/

double init_emulation_comm_time(group_data group, configuration *config_file, iter_stage_t *stage, MPI_Comm comm) {
  double start_time, time = 0;

  stage->array = malloc(config_file->granularity * sizeof(char));

  if(config_file->t_op_comms != 0) {
    stage->t_op = config_file->t_op_comms;
    return time;
  }

  MPI_Barrier(comm);
  start_time = MPI_Wtime();
  process_stage(*config_file, *stage, group, comm);
  MPI_Barrier(comm);
  stage->t_op = ceil((MPI_Wtime() - start_time) / stage->operations); //Time for one operation
  MPI_Bcast(&(stage->t_op), 1, MPI_DOUBLE, ROOT, comm);
  return time;
}

double init_matrix_pt(group_data group, configuration *config_file, iter_stage_t *stage, MPI_Comm comm, int compute) {
  double result, t_stage, start_time;

  result = 0;
  t_stage = stage->t_stage * config_file->groups[group.grp].factor;
  initMatrix(&(stage->double_array), config_file->granularity);

  if(compute) {
    start_time = MPI_Wtime();
    if(group.myId == ROOT) {
      result += process_stage(*config_file, *stage, group, comm);
      stage->t_op = (MPI_Wtime() - start_time) / stage->operations; //Time for one operation
    }
    MPI_Bcast(&(stage->t_op), 1, MPI_DOUBLE, ROOT, comm);
  }
  stage->operations = ceil(t_stage / stage->t_op);

  return result;
}

double init_pi_pt(group_data group, configuration *config_file, iter_stage_t *stage, MPI_Comm comm, int compute) {
  double result, t_stage, start_time;

  result = 0;
  t_stage = stage->t_stage * config_file->groups[group.grp].factor;
  if(compute) {
    start_time = MPI_Wtime();
    if(group.myId == ROOT) {
      result += process_stage(*config_file, *stage, group, comm);
      stage->t_op = (MPI_Wtime() - start_time) / stage->operations; //Time for one operation
    }
    MPI_Bcast(&(stage->t_op), 1, MPI_DOUBLE, ROOT, comm);
  }
  stage->operations = ceil(t_stage / stage->t_op);

  return result;
}
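
/*
 * Illustrative sketch (not part of the original file): the operation count
 * chosen by init_matrix_pt()/init_pi_pt() above follows directly from the
 * target stage time and the measured time per operation. For example, a
 * 0.5 s stage with t_op = 0.002 s results in ceil(0.5 / 0.002) = 250
 * operations.
 */
static int example_operations_for(double t_stage, double t_op) {
  return (int) ceil(t_stage / t_op);   // same formula used by the two functions above
}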

void init_comm_ptop_pt(group_data group, configuration *config_file, iter_stage_t *stage, MPI_Comm comm) {
  int aux_bytes = stage->bytes;

  if(stage->array != NULL)
    free(stage->array);
  if(aux_bytes == 0) {
    //aux_bytes = (stage->t_stage - config_file->latency_m) * config_file->bw_m;
    init_emulation_comm_time(group, config_file, stage, comm);
  }
  stage->real_bytes = aux_bytes;
  stage->array = malloc(stage->real_bytes * sizeof(char));
}

double init_comm_bcast_pt(group_data group, configuration *config_file, iter_stage_t *stage, MPI_Comm comm) {
  double time = 0;
  if(stage->array != NULL)
    free(stage->array);

  if(stage->bytes != 0) {
    stage->real_bytes = stage->bytes;
    stage->array = malloc(stage->real_bytes * sizeof(char));
  } else { // Prepare to emulate Collective as PtoP
    time = init_emulation_comm_time(group, config_file, stage, comm);
  }
  return time;
}


double init_comm_allgatherv_pt(group_data group, configuration *config_file, iter_stage_t *stage, MPI_Comm comm) {
  double time=0;
  struct Dist_data dist_data;

  if(stage->array != NULL)
    free(stage->array);
  if(stage->counts.counts != NULL)
    freeCounts(&(stage->counts));
  if(stage->full_array != NULL)
    free(stage->full_array);

  stage->real_bytes = stage->bytes;
  if(stage->bytes != 0) {
    prepare_comm_allgatherv(group.numP, stage->real_bytes, &(stage->counts));

    get_block_dist(stage->real_bytes, group.myId, group.numP, &dist_data);
    stage->my_bytes = dist_data.tamBl;

    stage->array = malloc(stage->my_bytes * sizeof(char));
    stage->full_array = malloc(stage->real_bytes * sizeof(char));
  } else {
    time = init_emulation_comm_time(group, config_file, stage, comm);
  }

  return time;
}
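
/*
 * Illustrative sketch (not part of the original file): the block
 * distribution used above splits the total byte count across the ranks,
 * so each process contributes dist_data.tamBl bytes to the Allgatherv and
 * the gathered full_array holds the complete total in rank order.
 * "total_bytes" is a hypothetical parameter of the sketch.
 */
static void example_print_block_share(int total_bytes, int myId, int numP) {
  struct Dist_data dist_data;
  get_block_dist(total_bytes, myId, numP, &dist_data);
  printf("Rank %d of %d contributes %d of %d bytes\n",
         myId, numP, (int) dist_data.tamBl, total_bytes);
}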

double init_comm_reduce_pt(group_data group, configuration *config_file, iter_stage_t *stage, MPI_Comm comm) {
  double time = 0;
  if(stage->array != NULL)
    free(stage->array);
  if(stage->full_array != NULL)
    free(stage->full_array);

  stage->real_bytes = stage->bytes;
  if(stage->bytes != 0) {
    stage->array = malloc(stage->real_bytes * sizeof(char));
    //The full array for the reduce needs the same size
    stage->full_array = malloc(stage->real_bytes * sizeof(char));
  } else {
    init_emulation_comm_time(group, config_file, stage, comm);
  }

  return time;
}