Main.c 9.07 KB
Newer Older
1
2
3
4
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
#include "../IOcodes/read_ini.h"
iker_martin's avatar
iker_martin committed
5
6
#include "../malleability/ProcessDist.h"
#include "../malleability/CommDist.h"
7
8
9

#define ROOT 0

iker_martin's avatar
iker_martin committed
10
11
12
int work();
void Sons_init();

13
int checkpoint(int iter, int state, MPI_Request **comm_req);
iker_martin's avatar
iker_martin committed
14
void TC(int numS);
15
16
int start_redistribution(int numS, MPI_Request **comm_req);
int check_redistribution(int iter, MPI_Request **comm_req);
iker_martin's avatar
iker_martin committed
17

18
19
20
21
void iterate(double *matrix, int n);
void computeMatrix(double *matrix, int n);
void initMatrix(double **matrix, int n);

iker_martin's avatar
iker_martin committed
22
23
24
25
typedef struct {
  int myId;
  int numP;
  int grp;
26
  int iter_start;
iker_martin's avatar
iker_martin committed
27
28
29
30


  MPI_Comm children, parents;
  char **argv;
31
  char *sync_array, *async_array;
iker_martin's avatar
iker_martin committed
32
33
34
35
36
} group_data;

configuration *config_file;
group_data *group;

37
38
39
40
41
int main(int argc, char *argv[]) {
    int numP, myId;

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &numP);
iker_martin's avatar
iker_martin committed
42
43
44
    MPI_Comm_rank(MPI_COMM_WORLD, &myId);

    group = malloc(1 * sizeof(group_data));
45
46
47
48
49
    group->myId        = myId;
    group->numP        = numP;
    group->grp         = 0;
    group->iter_start  = 0;
    group->argv        = argv;
iker_martin's avatar
iker_martin committed
50
51

    MPI_Comm_get_parent(&(group->parents));
52
    if(group->parents != MPI_COMM_NULL ) { // Si son procesos hijos deben comunicarse con las padres
iker_martin's avatar
iker_martin committed
53
      Sons_init();
54
    } else { // Si son el primer grupo de procesos, recogen la configuracion inicial
iker_martin's avatar
iker_martin committed
55
56
57
      config_file = read_ini_file(argv[1]);
      if(config_file->sdr > 0) {
        malloc_comm_array(&(group->sync_array), config_file->sdr , group->myId, group->numP);
58
      }
59
60
61
      if(config_file->adr > 0) {
        malloc_comm_array(&(group->async_array), config_file->adr , group->myId, group->numP);
      }
62
    }
iker_martin's avatar
iker_martin committed
63

64
    if(myId== ROOT) print_config(config_file, numP);
iker_martin's avatar
iker_martin committed
65
    int res = work();
66

67
    if(res) { // Ultimo set de procesos muestra resultados
68
69
70
	    //RESULTADOS
    }

71
72
73
74
    free_config(config_file);
    free(group->sync_array);
    free(group->async_array);
    free(group);
75
76
77
78
79
80

    MPI_Finalize();
    return 0;
}

/*
81
82
83
84
85
86
87
88
89
 * Función de trabajo principal.
 *
 * Incializa los datos para realizar el computo y a continuacion
 * pasa a realizar "maxiter" iteraciones de computo.
 *
 * Terminadas las iteraciones realiza el redimensionado de procesos.
 * Si el redimensionado se realiza de forma asincrona se 
 * siguen realizando iteraciones de computo hasta que termine la 
 * comunicacion asincrona y realizar entonces la sincrona.
90
 */
iker_martin's avatar
iker_martin committed
91
int work() {
92
  int iter, maxiter, state;
93
  double *matrix;
94
  MPI_Request *async_comm;
95

iker_martin's avatar
iker_martin committed
96
97
98
  maxiter = config_file->iters[group->grp];
  initMatrix(&matrix, config_file->matrix_tam);

99
  for(iter=group->iter_start; iter < maxiter; iter++) {
iker_martin's avatar
iker_martin committed
100
    iterate(matrix, config_file->matrix_tam);
101
  }
iker_martin's avatar
iker_martin committed
102

103
  state = checkpoint(iter, MAL_COMM_UNINITIALIZED, &async_comm);
iker_martin's avatar
iker_martin committed
104

105
106
  iter = 0;
  while(state == MAL_ASYNC_PENDING) {
107
108
    iterate(matrix, config_file->matrix_tam);
    iter++;
109
    state = checkpoint(iter, state, &async_comm);
110
111
  }

112
113
114
  return 0;
}

115
116
117
118
119
120
121
122
/*
 * Se realiza el redimensionado de procesos por parte de los padres.
 *
 * Se crean los nuevos procesos con la distribucion fisica elegida y
 * a continuacion se transmite la informacion a los mismos.
 *
 * Si hay datos asincronos a transmitir, primero se comienza a
 * transmitir estos y se termina la funcion. Se tiene que comprobar con
123
 * llamando a la función de nuevo que se han terminado de enviar //TODO
124
125
126
127
128
129
 *
 * Si hay ademas datos sincronos a enviar, no se envian aun.
 *
 * Si solo hay datos sincronos se envian tras la creacion de los procesos
 * y finalmente se desconectan los dos grupos de procesos.
 */
130
131
132
133
134
135
136
137
138
139
140
141
142
int checkpoint(int iter, int state, MPI_Request **comm_req) {
  
  if(state == MAL_COMM_UNINITIALIZED) {
    // Comprobar si se tiene que realizar un redimensionado
    if(config_file->iters[group->grp] > iter || config_file->resizes == group->grp + 1) {return MAL_COMM_UNINITIALIZED;}

    int numS = config_file->procs[group->grp +1];
    TC(numS);
    state = start_redistribution(numS, comm_req);

  } else if(MAL_ASYNC_PENDING) {
    state = check_redistribution(iter, comm_req);
  }
iker_martin's avatar
iker_martin committed
143

144
145
  return state;
}
iker_martin's avatar
iker_martin committed
146

147
148
149
150
151
152
153
/*
 * Se encarga de realizar la creacion de los procesos hijos.
 */
void TC(int numS){
  // Inicialización de la comunicación con SLURM
  int dist = config_file->phy_dist[group->grp +1];
  init_slurm_comm(group->argv, group->myId, numS, ROOT, dist, COMM_SPAWN_SERIAL);
iker_martin's avatar
iker_martin committed
154

155
156
157
158
159
160
161
  // Esperar a que la comunicación y creación de procesos
  // haya finalizado
  int test = -1;
  while(test != MPI_SUCCESS) {
    test = check_slurm_comm(group->myId, ROOT, MPI_COMM_WORLD, &(group->children));
  }
}
iker_martin's avatar
iker_martin committed
162

163
int start_redistribution(int numS, MPI_Request **comm_req) {
iker_martin's avatar
iker_martin committed
164
165
166
167
168
169
170
  int rootBcast = MPI_PROC_NULL;
  if(group->myId == ROOT) rootBcast = MPI_ROOT;

  // Enviar a los hijos que grupo de procesos son
  MPI_Bcast(&(group->grp), 1, MPI_INT, rootBcast, group->children);
  send_config_file(config_file, rootBcast, group->children);

171
  if(config_file->adr > 0) {
172
173
174
    send_async(group->async_array, config_file->adr, group->myId, group->numP, ROOT, group->children, numS, comm_req, config_file->aib);
    return MAL_ASYNC_PENDING;
  } 
iker_martin's avatar
iker_martin committed
175
176
177
178
179
180
  if(config_file->sdr > 0) {
    send_sync(group->sync_array, config_file->sdr, group->myId, group->numP, ROOT, group->children, numS);
  }
  // Desconectar intercomunicador con los hijos
  MPI_Comm_disconnect(&(group->children));

181
  return MAL_COMM_COMPLETED;
iker_martin's avatar
iker_martin committed
182
183
}

184
185
186
187
188
189
int check_redistribution(int iter, MPI_Request **comm_req) {
  int completed, all_completed, test_err, iter_send;
  int numS = config_file->procs[group->grp +1];
  int rootBcast = MPI_PROC_NULL;
  MPI_Request *req_completed;
  if(group->myId == ROOT) rootBcast = MPI_ROOT;
iker_martin's avatar
iker_martin committed
190

191
192
193
194
  if(config_file->aib == MAL_USE_NORMAL) {
    req_completed = &(*comm_req)[0];
  } else {
    req_completed = &(*comm_req)[1];
iker_martin's avatar
iker_martin committed
195
  }
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220

  test_err = MPI_Test(req_completed, &completed, MPI_STATUS_IGNORE);
  if (test_err != MPI_SUCCESS && test_err != MPI_ERR_PENDING) {
    printf("P%d aborting\n", group->myId);
    MPI_Abort(MPI_COMM_WORLD, test_err);
  }

  MPI_Allreduce(&completed, &all_completed, 1, MPI_INT, MPI_MIN, MPI_COMM_WORLD);
  if(!all_completed) return MAL_ASYNC_PENDING; // Continue only if asynchronous send has ended  

  iter_send = iter;
  MPI_Bcast(&iter_send, 1, MPI_INT, rootBcast, group->children);
  if(config_file->sdr > 0) {
    send_sync(group->sync_array, config_file->sdr, group->myId, group->numP, ROOT, group->children, numS);
  }

  if(config_file->aib == MAL_USE_IBARRIER) {
    MPI_Wait(&(*comm_req)[0], MPI_STATUS_IGNORE); // Indicar como completado el envio asincrono
  }

  // Desconectar intercomunicador con los hijos
  MPI_Comm_disconnect(&(group->children));

  free(*comm_req);
  return MAL_COMM_COMPLETED;
iker_martin's avatar
iker_martin committed
221
222
}

223
224
225
226
227
228
/*
 * Inicializacion de los datos de los hijos.
 * En la misma se reciben datos de los padres: La configuracion
 * de la ejecucion a realizar; y los datos a recibir de los padres
 * ya sea de forma sincrona, asincrona o ambas.
 */
iker_martin's avatar
iker_martin committed
229
230
231
232
233
234
235
236
237
void Sons_init() {

  // Enviar a los hijos que grupo de procesos son
  MPI_Bcast(&(group->grp), 1, MPI_INT, ROOT, group->parents);
  group->grp++;

  config_file = recv_config_file(ROOT, group->parents);
  int numP_parents = config_file->procs[group->grp -1];

238
239
240
241
  if(config_file->adr > 0) { // Recibir datos asincronos
    recv_async(&(group->async_array), config_file->adr, group->myId, group->numP, ROOT, group->parents, numP_parents, config_file->aib);
    MPI_Bcast(&(group->iter_start), 1, MPI_INT, ROOT, group->parents);
  }
242
  if(config_file->sdr > 0) { // Recibir datos sincronos
iker_martin's avatar
iker_martin committed
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
    recv_sync(&(group->sync_array), config_file->sdr, group->myId, group->numP, ROOT, group->parents, numP_parents);
  }

  // Desconectar intercomunicador con los hijos
  MPI_Comm_disconnect(&(group->parents));
}


/////////////////////////////////////////
/////////////////////////////////////////
//COMPUTE FUNCTIONS
/////////////////////////////////////////
/////////////////////////////////////////


258
259
/*
 * Simula la ejecucción de una iteración de computo en la aplicación
260
 * que dura al menos un tiempo de "time" segundos.
261
262
263
 */
void iterate(double *matrix, int n) {
  double start_time, actual_time;
iker_martin's avatar
iker_martin committed
264
  double time = config_file->general_time * config_file->factors[group->grp];
265
266

  start_time = actual_time = MPI_Wtime();
iker_martin's avatar
iker_martin committed
267
  while (actual_time - start_time < time) {
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
    computeMatrix(matrix, n);
    actual_time = MPI_Wtime();
  }
}

/*
 * Realiza una multiplicación de matrices de tamaño n
 */
void computeMatrix(double *matrix, int n) {
  int row, col, i, aux;

  for(row=0; i<n; row++) {
    /* COMPUTE */
    for(col=0; col<n; col++) {
      aux=0;
      for(i=0; i<n; i++) {
        aux += matrix[row*n + i] * matrix[i*n + col];
      }
    }
  }
}

/*
 * Init matrix
 */
void initMatrix(double **matrix, int n) {
  int i, j;

  // Init matrix
  if(matrix != NULL) {
    *matrix = malloc(n * n * sizeof(double));
    if(*matrix == NULL) { MPI_Abort(MPI_COMM_WORLD, -1);}
    for(i=0; i < n; i++) {
      for(j=0; j < n; j++) {
        (*matrix)[i*n + j] = i+j;
      }
    }
  }
}