Main.c 10.9 KB
Newer Older
1
2
3
4
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
#include "../IOcodes/read_ini.h"
iker_martin's avatar
iker_martin committed
5
6
#include "../malleability/ProcessDist.h"
#include "../malleability/CommDist.h"
7
8
9

#define ROOT 0

iker_martin's avatar
iker_martin committed
10
11
12
int work();
void Sons_init();

13
int checkpoint(int iter, int state, MPI_Request **comm_req);
iker_martin's avatar
iker_martin committed
14
void TC(int numS);
15
16
int start_redistribution(int numS, MPI_Request **comm_req);
int check_redistribution(int iter, MPI_Request **comm_req);
iker_martin's avatar
iker_martin committed
17

18
void iterate(double *matrix, int n, int async_comm);
19
20
21
void computeMatrix(double *matrix, int n);
void initMatrix(double **matrix, int n);

iker_martin's avatar
iker_martin committed
22
23
24
25
typedef struct {
  int myId;
  int numP;
  int grp;
26
  int iter_start;
iker_martin's avatar
iker_martin committed
27
28
29
30


  MPI_Comm children, parents;
  char **argv;
31
  char *sync_array, *async_array;
iker_martin's avatar
iker_martin committed
32
33
34
35
36
} group_data;

configuration *config_file;
group_data *group;

37
38
39
40
// Variables sobre resultados
//int *iters_time, *iters_type, iter_index;


41
int main(int argc, char *argv[]) {
42
43
    int numP, myId, i;
    int thread_level;
44

45
    MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &thread_level);
46
    MPI_Comm_size(MPI_COMM_WORLD, &numP);
iker_martin's avatar
iker_martin committed
47
48
49
    MPI_Comm_rank(MPI_COMM_WORLD, &myId);

    group = malloc(1 * sizeof(group_data));
50
51
52
53
54
    group->myId        = myId;
    group->numP        = numP;
    group->grp         = 0;
    group->iter_start  = 0;
    group->argv        = argv;
iker_martin's avatar
iker_martin committed
55
56

    MPI_Comm_get_parent(&(group->parents));
57
    if(group->parents != MPI_COMM_NULL ) { // Si son procesos hijos deben comunicarse con las padres
iker_martin's avatar
iker_martin committed
58
      Sons_init();
59
    } else { // Si son el primer grupo de procesos, recogen la configuracion inicial
iker_martin's avatar
iker_martin committed
60
61
62
      config_file = read_ini_file(argv[1]);
      if(config_file->sdr > 0) {
        malloc_comm_array(&(group->sync_array), config_file->sdr , group->myId, group->numP);
63
      }
64
65
66
      if(config_file->adr > 0) {
        malloc_comm_array(&(group->async_array), config_file->adr , group->myId, group->numP);
      }
67
    }
iker_martin's avatar
iker_martin committed
68

69
70
71
72
73
    //iters_time = malloc(config_file->iters[group->grp] * 3 * sizeof(int));
    //iters_type = malloc(config_file->iters[group->grp] * 3 * sizeof(int));
    //iter_index = 0;

    //if(myId== ROOT) print_config(config_file, group->grp);
iker_martin's avatar
iker_martin committed
74
    int res = work();
75

76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
    /*
    if(myId == ROOT) {
      print_config_group(config_file, group->grp);
      printf("Titer: ");
      for(i=0; i<iter_index; i++) {
        printf("%d ", iters_time[i]);
      }

      printf("\nTop: ");
      for(i=0; i<iter_index; i++) {
        printf("%d ", iters_type[i]);
      }
      printf("\n");
      free(iters_time);
      free(iters_type);
91
    }
92
93
94
95
96
97
98
99
100
101
    */
    
/*
  int len;
  char *name = malloc(MPI_MAX_PROCESSOR_NAME * sizeof(char));
  char *version = malloc(MPI_MAX_LIBRARY_VERSION_STRING * sizeof(char));
  MPI_Get_processor_name(name, &len);
  MPI_Get_library_version(version, &len);
  printf("P%d Nuevo GRUPO %d de %d procs en nodo %s con %s\n", myId, group->grp, numP, name, version);
*/
102

103
104
105
106
    free_config(config_file);
    free(group->sync_array);
    free(group->async_array);
    free(group);
107
    
108
109
110
111
112
113

    MPI_Finalize();
    return 0;
}

/*
114
115
116
117
118
119
120
121
122
 * Función de trabajo principal.
 *
 * Incializa los datos para realizar el computo y a continuacion
 * pasa a realizar "maxiter" iteraciones de computo.
 *
 * Terminadas las iteraciones realiza el redimensionado de procesos.
 * Si el redimensionado se realiza de forma asincrona se 
 * siguen realizando iteraciones de computo hasta que termine la 
 * comunicacion asincrona y realizar entonces la sincrona.
123
 */
iker_martin's avatar
iker_martin committed
124
int work() {
125
  int iter, maxiter, state;
126
  double *matrix;
127
  MPI_Request *async_comm;
128

iker_martin's avatar
iker_martin committed
129
130
  maxiter = config_file->iters[group->grp];
  initMatrix(&matrix, config_file->matrix_tam);
131
  state = MAL_COMM_UNINITIALIZED;
iker_martin's avatar
iker_martin committed
132

133
  for(iter=group->iter_start; iter < maxiter; iter++) {
134
    iterate(matrix, config_file->matrix_tam, state);
135
  }
iker_martin's avatar
iker_martin committed
136

137
  state = checkpoint(iter, state, &async_comm);
iker_martin's avatar
iker_martin committed
138

139
140
  iter = 0;
  while(state == MAL_ASYNC_PENDING) {
141
    iterate(matrix, config_file->matrix_tam, state);
142
    iter++;
143

144
    state = checkpoint(iter, state, &async_comm);
145
146
  }

147
148
149
  return 0;
}

150
151
152
153
154
155
156
157
/*
 * Se realiza el redimensionado de procesos por parte de los padres.
 *
 * Se crean los nuevos procesos con la distribucion fisica elegida y
 * a continuacion se transmite la informacion a los mismos.
 *
 * Si hay datos asincronos a transmitir, primero se comienza a
 * transmitir estos y se termina la funcion. Se tiene que comprobar con
158
 * llamando a la función de nuevo que se han terminado de enviar
159
160
161
162
163
164
 *
 * Si hay ademas datos sincronos a enviar, no se envian aun.
 *
 * Si solo hay datos sincronos se envian tras la creacion de los procesos
 * y finalmente se desconectan los dos grupos de procesos.
 */
165
166
167
168
169
170
171
172
173
174
175
176
177
int checkpoint(int iter, int state, MPI_Request **comm_req) {
  
  if(state == MAL_COMM_UNINITIALIZED) {
    // Comprobar si se tiene que realizar un redimensionado
    if(config_file->iters[group->grp] > iter || config_file->resizes == group->grp + 1) {return MAL_COMM_UNINITIALIZED;}

    int numS = config_file->procs[group->grp +1];
    TC(numS);
    state = start_redistribution(numS, comm_req);

  } else if(MAL_ASYNC_PENDING) {
    state = check_redistribution(iter, comm_req);
  }
iker_martin's avatar
iker_martin committed
178

179
180
  return state;
}
iker_martin's avatar
iker_martin committed
181

182
183
184
185
186
187
188
/*
 * Se encarga de realizar la creacion de los procesos hijos.
 */
void TC(int numS){
  // Inicialización de la comunicación con SLURM
  int dist = config_file->phy_dist[group->grp +1];
  init_slurm_comm(group->argv, group->myId, numS, ROOT, dist, COMM_SPAWN_SERIAL);
iker_martin's avatar
iker_martin committed
189

190
191
192
193
194
195
196
  // Esperar a que la comunicación y creación de procesos
  // haya finalizado
  int test = -1;
  while(test != MPI_SUCCESS) {
    test = check_slurm_comm(group->myId, ROOT, MPI_COMM_WORLD, &(group->children));
  }
}
iker_martin's avatar
iker_martin committed
197

198
int start_redistribution(int numS, MPI_Request **comm_req) {
iker_martin's avatar
iker_martin committed
199
200
201
202
203
204
205
  int rootBcast = MPI_PROC_NULL;
  if(group->myId == ROOT) rootBcast = MPI_ROOT;

  // Enviar a los hijos que grupo de procesos son
  MPI_Bcast(&(group->grp), 1, MPI_INT, rootBcast, group->children);
  send_config_file(config_file, rootBcast, group->children);

206
  if(config_file->adr > 0) {
207
208
209
    send_async(group->async_array, config_file->adr, group->myId, group->numP, ROOT, group->children, numS, comm_req, config_file->aib);
    return MAL_ASYNC_PENDING;
  } 
iker_martin's avatar
iker_martin committed
210
211
212
213
214
215
  if(config_file->sdr > 0) {
    send_sync(group->sync_array, config_file->sdr, group->myId, group->numP, ROOT, group->children, numS);
  }
  // Desconectar intercomunicador con los hijos
  MPI_Comm_disconnect(&(group->children));

216
  return MAL_COMM_COMPLETED;
iker_martin's avatar
iker_martin committed
217
218
}

219
220
221
222
223
224
int check_redistribution(int iter, MPI_Request **comm_req) {
  int completed, all_completed, test_err, iter_send;
  int numS = config_file->procs[group->grp +1];
  int rootBcast = MPI_PROC_NULL;
  MPI_Request *req_completed;
  if(group->myId == ROOT) rootBcast = MPI_ROOT;
iker_martin's avatar
iker_martin committed
225

226
227
228
229
  if(config_file->aib == MAL_USE_NORMAL) {
    req_completed = &(*comm_req)[0];
  } else {
    req_completed = &(*comm_req)[1];
iker_martin's avatar
iker_martin committed
230
  }
231
 
232
233
  test_err = MPI_Test(req_completed, &completed, MPI_STATUS_IGNORE);
  if (test_err != MPI_SUCCESS && test_err != MPI_ERR_PENDING) {
234
    printf("P%d aborting -- Test Async\n", group->myId);
235
236
237
238
    MPI_Abort(MPI_COMM_WORLD, test_err);
  }

  MPI_Allreduce(&completed, &all_completed, 1, MPI_INT, MPI_MIN, MPI_COMM_WORLD);
239
240
  if(!all_completed) return MAL_ASYNC_PENDING; // Continue only if asynchronous send has ended 
  
241
  iter_send = iter;
242
  //MPI_Barrier(MPI_COMM_WORLD); if(group->myId == ROOT){ printf("TEST 2.A0.5\n"); fflush(stdout);} //FIXME BORRAR
243
  MPI_Bcast(&iter_send, 1, MPI_INT, rootBcast, group->children);
244
245
  //MPI_Barrier(MPI_COMM_WORLD); if(group->myId == ROOT){ printf("TEST 2.A0.5a\n"); fflush(stdout);} //FIXME BORRAR
  if(config_file->sdr > 0) { // Realizar envio sincrono
246
247
248
249
250
251
    send_sync(group->sync_array, config_file->sdr, group->myId, group->numP, ROOT, group->children, numS);
  }

  if(config_file->aib == MAL_USE_IBARRIER) {
    MPI_Wait(&(*comm_req)[0], MPI_STATUS_IGNORE); // Indicar como completado el envio asincrono
  }
252
    free(*comm_req);
253
254
255
256

  // Desconectar intercomunicador con los hijos
  MPI_Comm_disconnect(&(group->children));
  return MAL_COMM_COMPLETED;
iker_martin's avatar
iker_martin committed
257
258
}

259
260
261
262
263
264
/*
 * Inicializacion de los datos de los hijos.
 * En la misma se reciben datos de los padres: La configuracion
 * de la ejecucion a realizar; y los datos a recibir de los padres
 * ya sea de forma sincrona, asincrona o ambas.
 */
iker_martin's avatar
iker_martin committed
265
266
267
268
269
270
271
272
273
void Sons_init() {

  // Enviar a los hijos que grupo de procesos son
  MPI_Bcast(&(group->grp), 1, MPI_INT, ROOT, group->parents);
  group->grp++;

  config_file = recv_config_file(ROOT, group->parents);
  int numP_parents = config_file->procs[group->grp -1];

274
275
276
277
  if(config_file->adr > 0) { // Recibir datos asincronos
    recv_async(&(group->async_array), config_file->adr, group->myId, group->numP, ROOT, group->parents, numP_parents, config_file->aib);
    MPI_Bcast(&(group->iter_start), 1, MPI_INT, ROOT, group->parents);
  }
278
  if(config_file->sdr > 0) { // Recibir datos sincronos
iker_martin's avatar
iker_martin committed
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
    recv_sync(&(group->sync_array), config_file->sdr, group->myId, group->numP, ROOT, group->parents, numP_parents);
  }

  // Desconectar intercomunicador con los hijos
  MPI_Comm_disconnect(&(group->parents));
}


/////////////////////////////////////////
/////////////////////////////////////////
//COMPUTE FUNCTIONS
/////////////////////////////////////////
/////////////////////////////////////////


294
295
/*
 * Simula la ejecucción de una iteración de computo en la aplicación
296
 * que dura al menos un tiempo de "time" segundos.
297
 */
298
void iterate(double *matrix, int n, int async_comm) {
299
  double start_time, actual_time;
iker_martin's avatar
iker_martin committed
300
  double time = config_file->general_time * config_file->factors[group->grp];
301
  int i, operations = 0;
302
303

  start_time = actual_time = MPI_Wtime();
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
  /*
  if(async_comm == MAL_ASYNC_PENDING) { // Se esta realizando una redistribucion de datos asincrona
    operations = iters_type[iter_index - 1];
    for (i=0; i<operations; i++) {
      computeMatrix(matrix, n);
      actual_time = MPI_Wtime(); // Guardar tiempos
    }
    operations = 0;

  } else { // No hay redistribucion de datos actualmente
	  */
    while (actual_time - start_time < time) {
      computeMatrix(matrix, n);
      operations++;
      actual_time = MPI_Wtime(); // Guardar tiempos
    }
  //}

  //iters_time[iter_index] = actual_time - start_time;
  //iters_type[iter_index] = operations;
  //iters_type++;
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
}

/*
 * Realiza una multiplicación de matrices de tamaño n
 */
void computeMatrix(double *matrix, int n) {
  int row, col, i, aux;

  for(row=0; i<n; row++) {
    /* COMPUTE */
    for(col=0; col<n; col++) {
      aux=0;
      for(i=0; i<n; i++) {
        aux += matrix[row*n + i] * matrix[i*n + col];
      }
    }
  }
}

/*
 * Init matrix
 */
void initMatrix(double **matrix, int n) {
  int i, j;

  // Init matrix
  if(matrix != NULL) {
    *matrix = malloc(n * n * sizeof(double));
    if(*matrix == NULL) { MPI_Abort(MPI_COMM_WORLD, -1);}
    for(i=0; i < n; i++) {
      for(j=0; j < n; j++) {
        (*matrix)[i*n + j] = i+j;
      }
    }
  }
}