Main.c 11.6 KB
Newer Older
1
2
3
4
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
#include "../IOcodes/read_ini.h"
iker_martin's avatar
iker_martin committed
5
6
#include "../malleability/ProcessDist.h"
#include "../malleability/CommDist.h"
7
8
9

#define ROOT 0

iker_martin's avatar
iker_martin committed
10
11
12
int work();
void Sons_init();

13
int checkpoint(int iter, int state, MPI_Request **comm_req);
iker_martin's avatar
iker_martin committed
14
void TC(int numS);
15
16
int start_redistribution(int numS, MPI_Request **comm_req);
int check_redistribution(int iter, MPI_Request **comm_req);
iker_martin's avatar
iker_martin committed
17

18
void iterate(double *matrix, int n, int async_comm);
19
20
21
void computeMatrix(double *matrix, int n);
void initMatrix(double **matrix, int n);

iker_martin's avatar
iker_martin committed
22
23
24
25
typedef struct {
  int myId;
  int numP;
  int grp;
26
  int iter_start;
iker_martin's avatar
iker_martin committed
27
28
29
30


  MPI_Comm children, parents;
  char **argv;
31
  char *sync_array, *async_array;
iker_martin's avatar
iker_martin committed
32
33
34
35
36
} group_data;

configuration *config_file;
group_data *group;

37
// Variables sobre resultados
38
int *iters_time, *iters_type, iter_index;
39
40


41
int main(int argc, char *argv[]) {
42
    int numP, myId, i;
43

44
    MPI_Init(&argc, &argv);
45
    MPI_Comm_size(MPI_COMM_WORLD, &numP);
iker_martin's avatar
iker_martin committed
46
47
48
    MPI_Comm_rank(MPI_COMM_WORLD, &myId);

    group = malloc(1 * sizeof(group_data));
49
50
51
52
53
    group->myId        = myId;
    group->numP        = numP;
    group->grp         = 0;
    group->iter_start  = 0;
    group->argv        = argv;
iker_martin's avatar
iker_martin committed
54
55

    MPI_Comm_get_parent(&(group->parents));
56
    if(group->parents != MPI_COMM_NULL ) { // Si son procesos hijos deben comunicarse con las padres
iker_martin's avatar
iker_martin committed
57
      Sons_init();
58

59
    } else { // Si son el primer grupo de procesos, recogen la configuracion inicial
iker_martin's avatar
iker_martin committed
60
61
62
      config_file = read_ini_file(argv[1]);
      if(config_file->sdr > 0) {
        malloc_comm_array(&(group->sync_array), config_file->sdr , group->myId, group->numP);
63
      }
64
65
66
      if(config_file->adr > 0) {
        malloc_comm_array(&(group->async_array), config_file->adr , group->myId, group->numP);
      }
67
    }
iker_martin's avatar
iker_martin committed
68

69
70
71
    iters_time = malloc(config_file->iters[group->grp] * 3 * sizeof(int));
    iters_type = malloc(config_file->iters[group->grp] * 3 * sizeof(int));
    iter_index = 0;
72
73

    //if(myId== ROOT) print_config(config_file, group->grp);
74
    work();
75

76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
    /*
    if(myId == ROOT) {
      print_config_group(config_file, group->grp);
      printf("Titer: ");
      for(i=0; i<iter_index; i++) {
        printf("%d ", iters_time[i]);
      }

      printf("\nTop: ");
      for(i=0; i<iter_index; i++) {
        printf("%d ", iters_type[i]);
      }
      printf("\n");
      free(iters_time);
      free(iters_type);
91
    }*/
92
93
94
95
96
97
98
99
100
    
/*
  int len;
  char *name = malloc(MPI_MAX_PROCESSOR_NAME * sizeof(char));
  char *version = malloc(MPI_MAX_LIBRARY_VERSION_STRING * sizeof(char));
  MPI_Get_processor_name(name, &len);
  MPI_Get_library_version(version, &len);
  printf("P%d Nuevo GRUPO %d de %d procs en nodo %s con %s\n", myId, group->grp, numP, name, version);
*/
101

102
103
104
105
106
107
    if(config_file->sdr > 0) {
      free(group->sync_array);
    }
    if(config_file->adr > 0) {
      free(group->async_array);
    }
108
    free(group);
109
110
111
    free_config(config_file);
    free(iters_time);
    free(iters_type);
112
    
113
114
115
116
117
    MPI_Finalize();
    return 0;
}

/*
118
119
120
121
122
123
124
125
126
 * Función de trabajo principal.
 *
 * Incializa los datos para realizar el computo y a continuacion
 * pasa a realizar "maxiter" iteraciones de computo.
 *
 * Terminadas las iteraciones realiza el redimensionado de procesos.
 * Si el redimensionado se realiza de forma asincrona se 
 * siguen realizando iteraciones de computo hasta que termine la 
 * comunicacion asincrona y realizar entonces la sincrona.
127
 */
iker_martin's avatar
iker_martin committed
128
int work() {
129
  int iter, maxiter, state;
130
  double *matrix;
131
  MPI_Request *async_comm;
132

iker_martin's avatar
iker_martin committed
133
134
  maxiter = config_file->iters[group->grp];
  initMatrix(&matrix, config_file->matrix_tam);
135
  state = MAL_COMM_UNINITIALIZED;
iker_martin's avatar
iker_martin committed
136

137
  for(iter=group->iter_start; iter < maxiter; iter++) {
138
    iterate(matrix, config_file->matrix_tam, state);
139
  }
iker_martin's avatar
iker_martin committed
140

141
  state = checkpoint(iter, state, &async_comm);
142
  
143
144
  iter = 0;
  while(state == MAL_ASYNC_PENDING) {
145
    iterate(matrix, config_file->matrix_tam, state);
146
    iter++;
147
    state = checkpoint(iter, state, &async_comm);
148
  }
149
  
150
151
152
  return 0;
}

153
154
155
156
157
158
159
160
/*
 * Se realiza el redimensionado de procesos por parte de los padres.
 *
 * Se crean los nuevos procesos con la distribucion fisica elegida y
 * a continuacion se transmite la informacion a los mismos.
 *
 * Si hay datos asincronos a transmitir, primero se comienza a
 * transmitir estos y se termina la funcion. Se tiene que comprobar con
161
 * llamando a la función de nuevo que se han terminado de enviar
162
163
164
165
166
167
 *
 * Si hay ademas datos sincronos a enviar, no se envian aun.
 *
 * Si solo hay datos sincronos se envian tras la creacion de los procesos
 * y finalmente se desconectan los dos grupos de procesos.
 */
168
169
170
171
172
173
174
175
176
177
178
179
180
int checkpoint(int iter, int state, MPI_Request **comm_req) {
  
  if(state == MAL_COMM_UNINITIALIZED) {
    // Comprobar si se tiene que realizar un redimensionado
    if(config_file->iters[group->grp] > iter || config_file->resizes == group->grp + 1) {return MAL_COMM_UNINITIALIZED;}

    int numS = config_file->procs[group->grp +1];
    TC(numS);
    state = start_redistribution(numS, comm_req);

  } else if(MAL_ASYNC_PENDING) {
    state = check_redistribution(iter, comm_req);
  }
iker_martin's avatar
iker_martin committed
181

182
183
  return state;
}
iker_martin's avatar
iker_martin committed
184

185
186
187
188
189
190
191
/*
 * Se encarga de realizar la creacion de los procesos hijos.
 */
void TC(int numS){
  // Inicialización de la comunicación con SLURM
  int dist = config_file->phy_dist[group->grp +1];
  init_slurm_comm(group->argv, group->myId, numS, ROOT, dist, COMM_SPAWN_SERIAL);
iker_martin's avatar
iker_martin committed
192

193
194
195
196
197
198
199
  // Esperar a que la comunicación y creación de procesos
  // haya finalizado
  int test = -1;
  while(test != MPI_SUCCESS) {
    test = check_slurm_comm(group->myId, ROOT, MPI_COMM_WORLD, &(group->children));
  }
}
iker_martin's avatar
iker_martin committed
200

201
int start_redistribution(int numS, MPI_Request **comm_req) {
iker_martin's avatar
iker_martin committed
202
203
204
205
206
207
208
  int rootBcast = MPI_PROC_NULL;
  if(group->myId == ROOT) rootBcast = MPI_ROOT;

  // Enviar a los hijos que grupo de procesos son
  MPI_Bcast(&(group->grp), 1, MPI_INT, rootBcast, group->children);
  send_config_file(config_file, rootBcast, group->children);

209
  if(config_file->adr > 0) {
210
211
212
    send_async(group->async_array, config_file->adr, group->myId, group->numP, ROOT, group->children, numS, comm_req, config_file->aib);
    return MAL_ASYNC_PENDING;
  } 
iker_martin's avatar
iker_martin committed
213
214
215
216
217
218
  if(config_file->sdr > 0) {
    send_sync(group->sync_array, config_file->sdr, group->myId, group->numP, ROOT, group->children, numS);
  }
  // Desconectar intercomunicador con los hijos
  MPI_Comm_disconnect(&(group->children));

219
  return MAL_COMM_COMPLETED;
iker_martin's avatar
iker_martin committed
220
221
}

222
223
224
225
226
227
int check_redistribution(int iter, MPI_Request **comm_req) {
  int completed, all_completed, test_err, iter_send;
  int numS = config_file->procs[group->grp +1];
  int rootBcast = MPI_PROC_NULL;
  MPI_Request *req_completed;
  if(group->myId == ROOT) rootBcast = MPI_ROOT;
iker_martin's avatar
iker_martin committed
228

229
230
  if(config_file->aib == MAL_USE_NORMAL) {
    req_completed = &(*comm_req)[0];
231
  } else { // MAL_USE_IBARRIER
232
    req_completed = &(*comm_req)[1];
iker_martin's avatar
iker_martin committed
233
  }
234
 
235
  
236
237
  test_err = MPI_Test(req_completed, &completed, MPI_STATUS_IGNORE);
  if (test_err != MPI_SUCCESS && test_err != MPI_ERR_PENDING) {
238
    printf("P%d aborting -- Test Async\n", group->myId);
239
240
241
242
    MPI_Abort(MPI_COMM_WORLD, test_err);
  }

  MPI_Allreduce(&completed, &all_completed, 1, MPI_INT, MPI_MIN, MPI_COMM_WORLD);
243
  if(!all_completed) return MAL_ASYNC_PENDING; // Continue only if asynchronous send has ended 
244
245
246
247
248

  //MPI_Wait(req_completed, MPI_STATUS_IGNORE);
  if(config_file->aib == MAL_USE_IBARRIER) {
    MPI_Wait(&(*comm_req)[0], MPI_STATUS_IGNORE); // Indicar como completado el envio asincrono
  }
249
  
250
251
  iter_send = iter;
  MPI_Bcast(&iter_send, 1, MPI_INT, rootBcast, group->children);
252
  if(config_file->sdr > 0) { // Realizar envio sincrono
253
254
255
256
257
    send_sync(group->sync_array, config_file->sdr, group->myId, group->numP, ROOT, group->children, numS);
  }

  // Desconectar intercomunicador con los hijos
  MPI_Comm_disconnect(&(group->children));
258
  free(*comm_req);
259
  return MAL_COMM_COMPLETED;
iker_martin's avatar
iker_martin committed
260
261
}

262
263
264
265
266
267
/*
 * Inicializacion de los datos de los hijos.
 * En la misma se reciben datos de los padres: La configuracion
 * de la ejecucion a realizar; y los datos a recibir de los padres
 * ya sea de forma sincrona, asincrona o ambas.
 */
iker_martin's avatar
iker_martin committed
268
269
270
271
272
273
274
275
276
void Sons_init() {

  // Enviar a los hijos que grupo de procesos son
  MPI_Bcast(&(group->grp), 1, MPI_INT, ROOT, group->parents);
  group->grp++;

  config_file = recv_config_file(ROOT, group->parents);
  int numP_parents = config_file->procs[group->grp -1];

277
278
279
280
  if(config_file->adr > 0) { // Recibir datos asincronos
    recv_async(&(group->async_array), config_file->adr, group->myId, group->numP, ROOT, group->parents, numP_parents, config_file->aib);
    MPI_Bcast(&(group->iter_start), 1, MPI_INT, ROOT, group->parents);
  }
281
  if(config_file->sdr > 0) { // Recibir datos sincronos
iker_martin's avatar
iker_martin committed
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
    recv_sync(&(group->sync_array), config_file->sdr, group->myId, group->numP, ROOT, group->parents, numP_parents);
  }

  // Desconectar intercomunicador con los hijos
  MPI_Comm_disconnect(&(group->parents));
}


/////////////////////////////////////////
/////////////////////////////////////////
//COMPUTE FUNCTIONS
/////////////////////////////////////////
/////////////////////////////////////////


297
298
/*
 * Simula la ejecucción de una iteración de computo en la aplicación
299
 * que dura al menos un tiempo de "time" segundos.
300
 */
301
void iterate(double *matrix, int n, int async_comm) {
302
  double start_time, actual_time;
iker_martin's avatar
iker_martin committed
303
  double time = config_file->general_time * config_file->factors[group->grp];
304
  int i, operations = 0;
305
306

  start_time = actual_time = MPI_Wtime();
307
308
309
  
  if(async_comm == MAL_ASYNC_PENDING && iter_index > 0) { // Se esta realizando una redistribucion de datos asincrona
MPI_Barrier(MPI_COMM_WORLD); if(group->myId) printf("TEST 0\n"); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
310
    operations = iters_type[iter_index - 1];
311
MPI_Barrier(MPI_COMM_WORLD); if(group->myId) printf("TEST 1\n"); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
312
    for (i=0; i<operations; i++) {
313
//MPI_Barrier(MPI_COMM_WORLD); if(group->myId) printf("TEST 2\n"); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
314
      computeMatrix(matrix, n);
315
//MPI_Barrier(MPI_COMM_WORLD); if(group->myId) printf("TEST 3\n"); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
316
      actual_time = MPI_Wtime(); // Guardar tiempos
317
//MPI_Barrier(MPI_COMM_WORLD); if(group->myId) printf("TEST 4\n"); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
318
    }
319
MPI_Barrier(MPI_COMM_WORLD); if(group->myId) printf("TEST 5\n"); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
320
    operations = 0;
321
MPI_Barrier(MPI_COMM_WORLD); if(group->myId) printf("TEST 6\n"); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
322

323
324
  } else { // No hay redistribucion de datos actualmente	  
	  
325
326
327
328
329
    while (actual_time - start_time < time) {
      computeMatrix(matrix, n);
      operations++;
      actual_time = MPI_Wtime(); // Guardar tiempos
    }
330
  }
331

332
333
334
  iters_time[iter_index] = actual_time - start_time;
  iters_type[iter_index] = operations;
  iter_index = iter_index + 1;
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
}

/*
 * Realiza una multiplicación de matrices de tamaño n
 */
void computeMatrix(double *matrix, int n) {
  int row, col, i, aux;

  for(row=0; i<n; row++) {
    /* COMPUTE */
    for(col=0; col<n; col++) {
      aux=0;
      for(i=0; i<n; i++) {
        aux += matrix[row*n + i] * matrix[i*n + col];
      }
    }
  }
}

/*
 * Init matrix
 */
void initMatrix(double **matrix, int n) {
  int i, j;

  // Init matrix
  if(matrix != NULL) {
    *matrix = malloc(n * n * sizeof(double));
    if(*matrix == NULL) { MPI_Abort(MPI_COMM_WORLD, -1);}
    for(i=0; i < n; i++) {
      for(j=0; j < n; j++) {
        (*matrix)[i*n + j] = i+j;
      }
    }
  }
}