#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
#include "../IOcodes/read_ini.h"
#include "../IOcodes/results.h"
#include "../malleability/ProcessDist.h"
#include "../malleability/CommDist.h"

#define ROOT 0

int work();
void Sons_init();

int checkpoint(int iter, int state, MPI_Request **comm_req);
void TC(int numS);
int start_redistribution(int numS, MPI_Request **comm_req);
int check_redistribution(int iter, MPI_Request **comm_req);

void iterate(double *matrix, int n, int async_comm);
void computeMatrix(double *matrix, int n);
void initMatrix(double **matrix, int n);

void print_general_info(int myId, int grp, int numP);

typedef struct {
  int myId;
  int numP;
  int grp;
  int iter_start;

  MPI_Comm children, parents;
  char **argv;
  char *sync_array, *async_array;
} group_data;

configuration *config_file;
group_data *group;
results_data *results;

int main(int argc, char *argv[]) {
    int numP, myId;

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &numP);
    MPI_Comm_rank(MPI_COMM_WORLD, &myId);

    group = malloc(1 * sizeof(group_data));
    group->myId        = myId;
    group->numP        = numP;
    group->grp         = 0;
    group->iter_start  = 0;
    group->argv        = argv;

    MPI_Comm_get_parent(&(group->parents));
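    // Spawned (child) processes obtain a non-null parent intercommunicator here;
    // the initial group gets MPI_COMM_NULL instead.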
    if(group->parents != MPI_COMM_NULL) { // Child processes must communicate with their parents
      Sons_init();

    } else { // The first group of processes reads the initial configuration
      config_file = read_ini_file(argv[1]);
      init_results_data(&results, config_file->resizes - 1, config_file->iters[group->grp]);
      if(config_file->sdr > 0) {
        malloc_comm_array(&(group->sync_array), config_file->sdr , group->myId, group->numP);
      }
      if(config_file->adr > 0) {
        malloc_comm_array(&(group->async_array), config_file->adr , group->myId, group->numP);
      }
    }

    //if(myId== ROOT) print_config(config_file, group->grp);
    work();

    if(group->myId == ROOT) { // Print results
      print_config_group(config_file, group->grp);
      print_iter_results(results, config_file->iters[group->grp] -1);
    }

    if(config_file->sdr > 0) {
      free(group->sync_array);
    }
    if(config_file->adr > 0) {
      free(group->async_array);
    }
    free(group);
    free_config(config_file);
    free_results_data(&results); //FIXME Causes an error - bad access into some vector?
    
    MPI_Finalize();
    return 0;
}

/*
 * Main work function.
 *
 * Initializes the data needed for the computation and then
 * performs "maxiter" compute iterations.
 *
 * Once the iterations finish, the process resizing takes place.
 * If the resizing is performed asynchronously, compute iterations
 * keep running until the asynchronous communication completes,
 * and the synchronous communication is carried out after that.
 */
int work() {
  int iter, maxiter, state;
  double *matrix;
  MPI_Request *async_comm;

  maxiter = config_file->iters[group->grp];
  initMatrix(&matrix, config_file->matrix_tam);
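  // Malleability states (presumably defined in the ../malleability headers):
  //   MAL_COMM_UNINITIALIZED -> no resize started yet
  //   MAL_ASYNC_PENDING      -> an asynchronous redistribution is in flight
  //   MAL_COMM_COMPLETED     -> redistribution finished and groups disconnected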
  state = MAL_COMM_UNINITIALIZED;

  for(iter=group->iter_start; iter < maxiter; iter++) {
    iterate(matrix, config_file->matrix_tam, state);
  }
  state = checkpoint(iter, state, &async_comm);
  
  iter = 0;
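  // Keep computing while the asynchronous redistribution is pending; each
  // extra iteration is followed by another completion check.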
  while(state == MAL_ASYNC_PENDING) {
    iterate(matrix, config_file->matrix_tam, state);
    iter++;
    state = checkpoint(iter, state, &async_comm);
  }
  
  return 0;
}

/*
 * The process resizing is carried out by the parents.
 *
 * The new processes are created with the chosen physical distribution,
 * and the information is then transmitted to them.
 *
 * If there is asynchronous data to transmit, its transfer is started
 * first and the function returns; the function must be called again to
 * check that the transfer has finished.
 *
 * If there is also synchronous data to send, it is not sent yet.
 *
 * If there is only synchronous data, it is sent after the process
 * creation, and finally the two process groups disconnect.
 */
int checkpoint(int iter, int state, MPI_Request **comm_req) {
  
  if(state == MAL_COMM_UNINITIALIZED) {
    // Check whether a resize has to be performed
    if(config_file->iters[group->grp] > iter || config_file->resizes == group->grp + 1) {return MAL_COMM_UNINITIALIZED;}

    int numS = config_file->procs[group->grp +1];

    results->spawn_start = MPI_Wtime();
    TC(numS);
    results->spawn_time[group->grp + 1] = MPI_Wtime() - results->spawn_start;

    state = start_redistribution(numS, comm_req);

  } else if(state == MAL_ASYNC_PENDING) {
    state = check_redistribution(iter, comm_req);
  }

  return state;
}

/*
 * Handles the creation of the child processes.
 */
void TC(int numS){
  // Initialize the communication with SLURM
  int dist = config_file->phy_dist[group->grp +1];
  init_slurm_comm(group->argv, group->myId, numS, ROOT, dist, COMM_SPAWN_SERIAL);

  // Wait until the communication and the process creation
  // have finished
  int test = -1;
  while(test != MPI_SUCCESS) {
    test = check_slurm_comm(group->myId, ROOT, MPI_COMM_WORLD, &(group->children));
  }
}

int start_redistribution(int numS, MPI_Request **comm_req) {
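  // In a broadcast over an intercommunicator the sending group passes MPI_ROOT
  // at its root rank and MPI_PROC_NULL at every other rank.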
  int rootBcast = MPI_PROC_NULL;
  if(group->myId == ROOT) rootBcast = MPI_ROOT;

  // Tell the children which group of processes they are
  MPI_Bcast(&(group->grp), 1, MPI_INT, rootBcast, group->children);
  send_config_file(config_file, rootBcast, group->children);

  if(config_file->adr > 0) {
    results->async_start = MPI_Wtime();
    send_async(group->async_array, config_file->adr, group->myId, group->numP, ROOT, group->children, numS, comm_req, config_file->aib);
    return MAL_ASYNC_PENDING;
  } 
  if(config_file->sdr > 0) {
    results->sync_start = MPI_Wtime();
    send_sync(group->sync_array, config_file->sdr, group->myId, group->numP, ROOT, group->children, numS);
  }
  
  send_results(results, rootBcast, group->children);
  // Disconnect the intercommunicator with the children
  MPI_Comm_disconnect(&(group->children));

  return MAL_COMM_COMPLETED;
}

int check_redistribution(int iter, MPI_Request **comm_req) {
  int completed, all_completed, test_err, iter_send;
  int numS = config_file->procs[group->grp +1];
  int rootBcast = MPI_PROC_NULL;
  MPI_Request *req_completed;
  if(group->myId == ROOT) rootBcast = MPI_ROOT;

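  // Pick the request whose completion marks the end of the transfer: with
  // MAL_USE_IBARRIER, comm_req presumably holds the send request in [0] and
  // the barrier request in [1].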
  if(config_file->aib == MAL_USE_NORMAL) {
    req_completed = &(*comm_req)[0];
  } else { // MAL_USE_IBARRIER
    req_completed = &(*comm_req)[1];
  }
 
  test_err = MPI_Test(req_completed, &completed, MPI_STATUS_IGNORE);
  if (test_err != MPI_SUCCESS && test_err != MPI_ERR_PENDING) {
    printf("P%d aborting -- Test Async\n", group->myId);
    MPI_Abort(MPI_COMM_WORLD, test_err);
  }

  MPI_Allreduce(&completed, &all_completed, 1, MPI_INT, MPI_MIN, MPI_COMM_WORLD);
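  // MIN over all ranks acts as a logical AND: all_completed is 1 only when
  // every rank's local transfers have finished.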
  if(!all_completed) return MAL_ASYNC_PENDING; // Continue only if asynchronous send has ended 

  //MPI_Wait(req_completed, MPI_STATUS_IGNORE);
  if(config_file->aib == MAL_USE_IBARRIER) {
    MPI_Wait(&(*comm_req)[0], MPI_STATUS_IGNORE); // Mark the asynchronous send as completed
  }
  
  iter_send = iter;
  MPI_Bcast(&iter_send, 1, MPI_INT, rootBcast, group->children);
  if(config_file->sdr > 0) { // Perform the synchronous send
    results->sync_start = MPI_Wtime();
    send_sync(group->sync_array, config_file->sdr, group->myId, group->numP, ROOT, group->children, numS);
  }
  send_results(results, rootBcast, group->children);

  // Disconnect the intercommunicator with the children
  MPI_Comm_disconnect(&(group->children));
  free(*comm_req);
  return MAL_COMM_COMPLETED;
}

/*
 * Initialization of the children's data.
 * The children receive data from their parents here: the configuration
 * of the execution to perform, and the data to receive from the parents,
 * whether synchronously, asynchronously, or both.
 */
void Sons_init() {

  // Receive from the parents which group of processes this one is
  MPI_Bcast(&(group->grp), 1, MPI_INT, ROOT, group->parents);
  group->grp++;

  config_file = recv_config_file(ROOT, group->parents);
  int numP_parents = config_file->procs[group->grp -1];
  init_results_data(&results, config_file->resizes - 1, config_file->iters[group->grp]);

  if(config_file->adr > 0) { // Receive asynchronous data
    recv_async(&(group->async_array), config_file->adr, group->myId, group->numP, ROOT, group->parents, numP_parents, config_file->aib);
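    // The parents broadcast how many extra iterations they ran while the
    // asynchronous transfer was in flight, so the children resume from there.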
    MPI_Bcast(&(group->iter_start), 1, MPI_INT, ROOT, group->parents);
    results->async_time[group->grp] = MPI_Wtime();
  }
  if(config_file->sdr > 0) { // Receive synchronous data
    recv_sync(&(group->sync_array), config_file->sdr, group->myId, group->numP, ROOT, group->parents, numP_parents);
    results->sync_time[group->grp] = MPI_Wtime();
  }
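  // recv_results presumably delivers the parents' start timestamps
  // (sync_start/async_start); the elapsed redistribution times are computed
  // against them, overwriting the raw timestamps stored above.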
  recv_results(results, ROOT, group->parents);
  results->sync_time[group->grp]  = MPI_Wtime() - results->sync_start;
  results->async_time[group->grp] = MPI_Wtime() - results->async_start;

  // Disconnect the intercommunicator with the parents
  MPI_Comm_disconnect(&(group->parents));
}


/////////////////////////////////////////
/////////////////////////////////////////
//COMPUTE FUNCTIONS
/////////////////////////////////////////
/////////////////////////////////////////


/*
 * Simulates the execution of one compute iteration of the application,
 * lasting at least "time" seconds.
 */
void iterate(double *matrix, int n, int async_comm) {
  double start_time, actual_time;
  double time = config_file->general_time * config_file->factors[group->grp];
  int i, operations = 0;

  start_time = actual_time = MPI_Wtime();
  if(async_comm == MAL_ASYNC_PENDING) { // An asynchronous data redistribution is in progress
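    // Instead of running for a fixed wall time, replay the operation count
    // recorded for a regular iteration, so the pending communication can be
    // polled again after a bounded amount of work.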
    operations = results->iters_type[config_file->iters[group->grp] - 1];
    for (i=0; i<operations; i++) {
      computeMatrix(matrix, n);
      actual_time = MPI_Wtime(); // Record the time
    }
    operations = 0;

  } else { // No data redistribution in progress
    while (actual_time - start_time < time) {
      computeMatrix(matrix, n);
      operations++;
      actual_time = MPI_Wtime(); // Record the time
    }
  }

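  // Record this iteration's wall time and operation count; the count is what
  // gets replayed while an asynchronous redistribution is pending.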
  results->iters_time[results->iter_index] = actual_time - start_time;
  results->iters_type[results->iter_index] = operations;
  results->iter_index = results->iter_index + 1;
}

/*
 * Performs a matrix multiplication of size n. The result is discarded;
 * the kernel only serves to consume CPU time.
 */
void computeMatrix(double *matrix, int n) {
  int row, col, i;
  double aux;

  for(row=0; row<n; row++) {
    /* COMPUTE */
    for(col=0; col<n; col++) {
      aux=0;
      for(i=0; i<n; i++) {
        aux += matrix[row*n + i] * matrix[i*n + col];
      }
    }
  }
}

/*
 * Init matrix
 */
void initMatrix(double **matrix, int n) {
  int i, j;

  // Init matrix
  if(matrix != NULL) {
    *matrix = malloc(n * n * sizeof(double));
    if(*matrix == NULL) { MPI_Abort(MPI_COMM_WORLD, -1);}
    for(i=0; i < n; i++) {
      for(j=0; j < n; j++) {
        (*matrix)[i*n + j] = i+j;
      }
    }
  }
}

//======================================================||
//======================================================||
//================ AUXILIARY FUNCTIONS =================||
//======================================================||
//======================================================||

void print_general_info(int myId, int grp, int numP) {
  int len;
  char *name = malloc(MPI_MAX_PROCESSOR_NAME * sizeof(char));
  char *version = malloc(MPI_MAX_LIBRARY_VERSION_STRING * sizeof(char));
  MPI_Get_processor_name(name, &len);
  MPI_Get_library_version(version, &len);
  printf("P%d Nuevo GRUPO %d de %d procs en nodo %s con %s\n", myId, grp, numP, name, version);

  free(name);
  free(version);
}