CommDist.c 18 KB
Newer Older
iker_martin's avatar
iker_martin committed
1
2
3
4
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
#include <string.h>
5
#include "distribution_methods/block_distribution.h"
6
#include "CommDist.h"
iker_martin's avatar
iker_martin committed
7

8
9
10
11
12
void send_sync_arrays(struct Dist_data dist_data, char *array, int root, int numP_child, int idI,  int idE, struct Counts counts);
void recv_sync_arrays(struct Dist_data dist_data, char *array, int root, int numP_parents, int idI, int idE, struct Counts counts);

void send_async_arrays(struct Dist_data dist_data, char *array, int root, int numP_child, int idI,  int idE, struct Counts counts, MPI_Request *comm_req);
void recv_async_arrays(struct Dist_data dist_data, char *array, int root, int numP_parents, int idI, int idE, struct Counts counts, MPI_Request *comm_req);
iker_martin's avatar
iker_martin committed
13

14
15
16
void send_async_point_arrays(struct Dist_data dist_data, char *array, int rootBcast, int numP_child, int idI,  int idE, struct Counts counts, MPI_Request *comm_req);
void recv_async_point_arrays(struct Dist_data dist_data, char *array, int root, int numP_parents, int idI, int idE, struct Counts counts, MPI_Request *comm_req);

iker_martin's avatar
iker_martin committed
17
void getIds_intercomm(struct Dist_data dist_data, int numP_other, int **idS);
18
19
20
21
22
23
24
25
/*
 * Reserva memoria para un vector de hasta "qty" elementos.
 * Los "qty" elementos se disitribuyen entre los "numP" procesos
 * que llaman a esta funcion.
 */
void malloc_comm_array(char **array, int qty, int myId, int numP) {
    struct Dist_data dist_data;

26
    get_block_dist(qty, myId, numP, &dist_data);
27
28
29
30
    if( (*array = malloc(dist_data.tamBl * sizeof(char))) == NULL) {
      printf("Memory Error (Malloc Arrays(%d))\n", dist_data.tamBl); 
      exit(1); 
    }
31

32
/*
33
34
35
36
        int i;
	for(i=0; i<dist_data.tamBl; i++) {
	  (*array)[i] = '!' + i + dist_data.ini;
	}
37
38
39
	
        printf("P%d Tam %d String: %s\n", myId, dist_data.tamBl, *array);
*/
40
}
41
42
43

//================================================================================
//================================================================================
44
//========================SYNCHRONOUS FUNCTIONS===================================
45
46
47
//================================================================================
//================================================================================

48
49
50
51
52
53
/*
 * Realiza un envio síncrono del vector array desde este grupo de procesos al grupo
 * enlazado por el intercomunicador intercomm.
 *
 * El vector array no se modifica en esta funcion.
 */
iker_martin's avatar
iker_martin committed
54
55
56
57
58
59
60
61
int send_sync(char *array, int qty, int myId, int numP, int root, MPI_Comm intercomm, int numP_child) {
    int rootBcast = MPI_PROC_NULL;
    int *idS = NULL;
    struct Counts counts;
    struct Dist_data dist_data;

    if(myId == root) rootBcast = MPI_ROOT;

62
    get_block_dist(qty, myId, numP, &dist_data); // Distribucion de este proceso en su grupo
iker_martin's avatar
iker_martin committed
63
64
65
66
67
68
69
    dist_data.intercomm = intercomm;

    // Create arrays which contains info about how many elements will be send to each created process
    mallocCounts(&counts, numP_child);

    getIds_intercomm(dist_data, numP_child, &idS); // Obtener rango de Id hijos a los que este proceso manda datos

70
    send_sync_arrays(dist_data, array, rootBcast, numP_child, idS[0], idS[1], counts);
iker_martin's avatar
iker_martin committed
71
72
73
74
75
76
77
78

    freeCounts(&counts);
    free(idS);

    return 1;
}


79
80
81
82
83
84
85
/*
 * Realiza una recepcion síncrona del vector array a este grupo de procesos desde el grupo
 * enlazado por el intercomunicador intercomm.
 *
 * El vector array se reserva dentro de la funcion y se devuelve en el mismo argumento.
 * Tiene que ser liberado posteriormente por el usuario.
 */
iker_martin's avatar
iker_martin committed
86
87
88
89
90
91
void recv_sync(char **array, int qty, int myId, int numP, int root, MPI_Comm intercomm, int numP_parents) {
    int *idS = NULL;
    struct Counts counts;
    struct Dist_data dist_data;

    // Obtener distribución para este hijo
92
    get_block_dist(qty, myId, numP, &dist_data);
93
    *array = malloc(dist_data.tamBl * sizeof(char));
94
    //(*array)[dist_data.tamBl] = '\0';
iker_martin's avatar
iker_martin committed
95
96
97
98
99
100
101
    dist_data.intercomm = intercomm;

    /* PREPARAR DATOS DE RECEPCION SOBRE VECTOR*/
    mallocCounts(&counts, numP_parents);

    getIds_intercomm(dist_data, numP_parents, &idS); // Obtener el rango de Ids de padres del que este proceso recibira datos

102
    recv_sync_arrays(dist_data, *array, root, numP_parents, idS[0], idS[1], counts);
103
    //printf("S%d Tam %d String: %s END\n", myId, dist_data.tamBl, *array);
iker_martin's avatar
iker_martin committed
104
105
106
107
108
109

    freeCounts(&counts);
    free(idS);
}

/*
110
111
112
 * Envia a los hijos un vector que es redistribuido a los procesos
 * hijos. Antes de realizar la comunicacion, cada proceso padre calcula sobre que procesos
 * del otro grupo se transmiten elementos.
iker_martin's avatar
iker_martin committed
113
 */
114
115
void send_sync_arrays(struct Dist_data dist_data, char *array, int rootBcast, int numP_child, int idI,  int idE, struct Counts counts) {

116
117
    prepare_comm_alltoall(dist_data.myId, dist_data.numP, numP_child, dist_data.qty, &counts);
    /*int i;
iker_martin's avatar
iker_martin committed
118
119
    // PREPARAR ENVIO DEL VECTOR
    if(idI == 0) {
120
      set_counts(0, numP_child, dist_data, counts.counts);
iker_martin's avatar
iker_martin committed
121
122
123
      idI++;
    }
    for(i=idI; i<idE; i++) {
124
125
      set_counts(i, numP_child, dist_data, counts.counts);
      counts.displs[i] = counts.displs[i-1] + counts.counts[i-1];
126
127
    }*/

128
    //print_counts(dist_data, counts.counts, counts.displs, numP_child, "Padres");
iker_martin's avatar
iker_martin committed
129
    /* COMUNICACION DE DATOS */
130
    MPI_Alltoallv(array, counts.counts, counts.displs, MPI_CHAR, NULL, counts.zero_arr, counts.zero_arr, MPI_CHAR, dist_data.intercomm);
iker_martin's avatar
iker_martin committed
131
132
}

133
134
135
136
137
/*
 * Recibe de los padres un vector que es redistribuido a los procesos
 * de este grupo. Antes de realizar la comunicacion cada hijo calcula sobre que procesos
 * del otro grupo se transmiten elementos.
 */
138
139
void recv_sync_arrays(struct Dist_data dist_data, char *array, int root, int numP_parents, int idI, int idE, struct Counts counts) {
	
140
    char aux;
iker_martin's avatar
iker_martin committed
141

142
    prepare_comm_alltoall(dist_data.myId, dist_data.numP, numP_parents, dist_data.qty, &counts);
iker_martin's avatar
iker_martin committed
143
    // Ajustar los valores de recepcion
144
    /*
iker_martin's avatar
iker_martin committed
145
    if(idI == 0) {
146
      set_counts(0, numP_parents, dist_data, counts.counts);
iker_martin's avatar
iker_martin committed
147
148
149
      idI++;
    }
    for(i=idI; i<idE; i++) {
150
151
      set_counts(i, numP_parents, dist_data, counts.counts);
      counts.displs[i] = counts.displs[i-1] + counts.counts[i-1];
152
    }*/
153
    //print_counts(dist_data, counts.counts, counts.displs, numP_parents, "Hijos");
iker_martin's avatar
iker_martin committed
154
155

    /* COMUNICACION DE DATOS */
156
    MPI_Alltoallv(&aux, counts.zero_arr, counts.zero_arr, MPI_CHAR, array, counts.counts, counts.displs, MPI_CHAR, dist_data.intercomm);
157
158
159
160
161
}


//================================================================================
//================================================================================
162
//========================ASYNCHRONOUS FUNCTIONS==================================
163
164
165
166
167
168
169
170
171
172
173
174
//================================================================================
//================================================================================

/*
 * Realiza un envio asincrono del vector array desde este grupo de procesos al grupo
 * enlazado por el intercomunicador intercomm.
 *
 * El objeto MPI_Request se devuelve con el manejador para comprobar si la comunicacion
 * ha terminado.
 *
 * El vector array no se modifica en esta funcion.
 */
175
int send_async(char *array, int qty, int myId, int numP, int root, MPI_Comm intercomm, int numP_child, MPI_Request **comm_req, int parents_wait) {
176
    int i, rootBcast = MPI_PROC_NULL;
177
178
179
180
181
182
    int *idS = NULL;
    struct Counts counts;
    struct Dist_data dist_data;

    if(myId == root) rootBcast = MPI_ROOT;

183
    get_block_dist(qty, myId, numP, &dist_data); // Distribucion de este proceso en su grupo
184
185
186
187
188
189
190
    dist_data.intercomm = intercomm;

    // Create arrays which contains info about how many elements will be send to each created process
    mallocCounts(&counts, numP_child);

    getIds_intercomm(dist_data, numP_child, &idS); // Obtener rango de Id hijos a los que este proceso manda datos

191
    // MAL_USE_THREAD sigue el camino sincrono
192
    if(parents_wait == MAL_USE_NORMAL) {
193
      //*comm_req = (MPI_Request *) malloc(sizeof(MPI_Request));
194
195
196
      *comm_req[0] = MPI_REQUEST_NULL;
      send_async_arrays(dist_data, array, rootBcast, numP_child, idS[0], idS[1], counts, &(*comm_req[0])); 

197
    } else if (parents_wait == MAL_USE_IBARRIER){
198
199
200
      //*comm_req = (MPI_Request *) malloc(2 * sizeof(MPI_Request));
      *comm_req[0] = MPI_REQUEST_NULL;
      *comm_req[1] = MPI_REQUEST_NULL;
201
202
      send_async_arrays(dist_data, array, rootBcast, numP_child, idS[0], idS[1], counts, &((*comm_req)[1])); 
      MPI_Ibarrier(intercomm, &((*comm_req)[0]) );
203
    } else if (parents_wait == MAL_USE_POINT){
204
      //*comm_req = (MPI_Request *) malloc(numP_child * sizeof(MPI_Request));
205
206
207
208
      for(i=0; i<numP_child; i++){
        (*comm_req)[i] = MPI_REQUEST_NULL;
      }
      send_async_point_arrays(dist_data, array, rootBcast, numP_child, idS[0], idS[1], counts, *comm_req); 
209
    } else if (parents_wait == MAL_USE_THREAD) { //TODO 
210
    }
211
212
213
214
215
216
217
218
219
220
221
222
223

    freeCounts(&counts);
    free(idS);

    return 1;
}

/*
 * Realiza una recepcion asincrona del vector array a este grupo de procesos desde el grupo
 * enlazado por el intercomunicador intercomm.
 *
 * El vector array se reserva dentro de la funcion y se devuelve en el mismo argumento.
 * Tiene que ser liberado posteriormente por el usuario.
224
225
226
 *
 * El argumento "parents_wait" sirve para indicar si se usará la versión en la los padres 
 * espera a que terminen de enviar, o en la que esperan a que los hijos acaben de recibir.
227
 */
228
void recv_async(char **array, int qty, int myId, int numP, int root, MPI_Comm intercomm, int numP_parents, int parents_wait) {
229
    int *idS = NULL;
230
    int wait_err, i;
231
232
    struct Counts counts;
    struct Dist_data dist_data;
233
    MPI_Request *comm_req, aux;
234
235

    // Obtener distribución para este hijo
236
    get_block_dist(qty, myId, numP, &dist_data);
237
238
239
240
241
242
243
244
    *array = malloc(dist_data.tamBl * sizeof(char));
    dist_data.intercomm = intercomm;

    /* PREPARAR DATOS DE RECEPCION SOBRE VECTOR*/
    mallocCounts(&counts, numP_parents);

    getIds_intercomm(dist_data, numP_parents, &idS); // Obtener el rango de Ids de padres del que este proceso recibira datos

245
    // MAL_USE_THREAD sigue el camino sincrono
246
247
248
249
250
251
252
253
    if(parents_wait == MAL_USE_POINT) {
      comm_req = (MPI_Request *) malloc(numP_parents * sizeof(MPI_Request));
      for(i=0; i<numP_parents; i++){
        comm_req[i] = MPI_REQUEST_NULL;
      }
      recv_async_point_arrays(dist_data, *array, root, numP_parents, idS[0], idS[1], counts, comm_req);
      wait_err = MPI_Waitall(numP_parents, comm_req, MPI_STATUSES_IGNORE);

254
    } else if (parents_wait == MAL_USE_NORMAL || parents_wait == MAL_USE_IBARRIER) {
255
256
257
258
      comm_req = (MPI_Request *) malloc(sizeof(MPI_Request));
      *comm_req = MPI_REQUEST_NULL;
      recv_async_arrays(dist_data, *array, root, numP_parents, idS[0], idS[1], counts, comm_req);
      wait_err = MPI_Wait(comm_req, MPI_STATUS_IGNORE);
259
    } else if (parents_wait == MAL_USE_THREAD) { //TODO
260
    }
261
262
263
264
265

    if(wait_err != MPI_SUCCESS) {
      MPI_Abort(MPI_COMM_WORLD, wait_err);
    }

266
    if(parents_wait == MAL_USE_IBARRIER) { //MAL USE IBARRIER END
267
268
269
      MPI_Ibarrier(intercomm, &aux);
      MPI_Wait(&aux, MPI_STATUS_IGNORE); //Es necesario comprobar que la comunicación ha terminado para desconectar los grupos de procesos
    }
270

271
    //printf("S%d Tam %d String: %s END\n", myId, dist_data.tamBl, *array);
272
273
    freeCounts(&counts);
    free(idS);
274
    free(comm_req);
275
276
277
278
279
280
}

/*
 * Envia a los hijos un vector que es redistribuido a los procesos
 * hijos. Antes de realizar la comunicacion, cada proceso padre calcula sobre que procesos
 * del otro grupo se transmiten elementos.
281
282
 *
 * El envio se realiza a partir de una comunicación colectiva.
283
284
285
 */
void send_async_arrays(struct Dist_data dist_data, char *array, int rootBcast, int numP_child, int idI,  int idE, struct Counts counts, MPI_Request *comm_req) {

286
287
    prepare_comm_alltoall(dist_data.myId, dist_data.numP, numP_child, dist_data.qty, &counts);
    /*
288
289
290
291
292
293
294
295
296
    // PREPARAR ENVIO DEL VECTOR
    if(idI == 0) {
      set_counts(0, numP_child, dist_data, counts.counts);
      idI++;
    }
    for(i=idI; i<idE; i++) {
      set_counts(i, numP_child, dist_data, counts.counts);
      counts.displs[i] = counts.displs[i-1] + counts.counts[i-1];
    }
297
    */
298
    //print_counts(dist_data, counts.counts, counts.displs, numP_child, "Padres");
299
300
301
302
303

    /* COMUNICACION DE DATOS */
    MPI_Ialltoallv(array, counts.counts, counts.displs, MPI_CHAR, NULL, counts.zero_arr, counts.zero_arr, MPI_CHAR, dist_data.intercomm, comm_req);
}

304
305
306
307
308
309
310
311
312
/*
 * Envia a los hijos un vector que es redistribuido a los procesos
 * hijos. Antes de realizar la comunicacion, cada proceso padre calcula sobre que procesos
 * del otro grupo se transmiten elementos.
 *
 * El envio se realiza a partir de varias comunicaciones punto a punto.
 */
void send_async_point_arrays(struct Dist_data dist_data, char *array, int rootBcast, int numP_child, int idI,  int idE, struct Counts counts, MPI_Request *comm_req) {
    int i;
313
    prepare_comm_alltoall(dist_data.myId, dist_data.numP, numP_child, dist_data.qty, &counts);
314
    // PREPARAR ENVIO DEL VECTOR
315
    /*
316
317
318
319
320
321
322
323
324
    if(idI == 0) {
      set_counts(0, numP_child, dist_data, counts.counts);
      idI++;
      MPI_Isend(array, counts.counts[0], MPI_CHAR, 0, 99, dist_data.intercomm, &(comm_req[0]));
    }	
    for(i=idI; i<idE; i++) {
      set_counts(i, numP_child, dist_data, counts.counts);
      counts.displs[i] = counts.displs[i-1] + counts.counts[i-1];
      MPI_Isend(array+counts.displs[i], counts.counts[i], MPI_CHAR, i, 99, dist_data.intercomm, &(comm_req[i]));
325
326
327
328
329
    }*/
    for(i=0; i<numP_child; i++) { //TODO Esta propuesta ya no usa el IdI y Ide
      if(counts.counts[0] != 0) {
        MPI_Isend(array+counts.displs[i], counts.counts[i], MPI_CHAR, i, 99, dist_data.intercomm, &(comm_req[i]));
      }
330
    }
331
    //print_counts(dist_data, counts.counts, counts.displs, numP_child, "Padres");
332
333
}

334
335
336
337
/*
 * Recibe de los padres un vector que es redistribuido a los procesos
 * de este grupo. Antes de realizar la comunicacion cada hijo calcula sobre que procesos
 * del otro grupo se transmiten elementos.
338
339
 *
 * La recepcion se realiza a partir de una comunicacion colectiva.
340
341
342
343
344
 */
void recv_async_arrays(struct Dist_data dist_data, char *array, int root, int numP_parents, int idI, int idE, struct Counts counts, MPI_Request *comm_req) {
    char *aux = malloc(1);

    // Ajustar los valores de recepcion
345
346
    prepare_comm_alltoall(dist_data.myId, dist_data.numP, numP_parents, dist_data.qty, &counts);
    /*
347
348
349
350
351
352
353
    if(idI == 0) {
      set_counts(0, numP_parents, dist_data, counts.counts);
      idI++;
    }
    for(i=idI; i<idE; i++) {
      set_counts(i, numP_parents, dist_data, counts.counts);
      counts.displs[i] = counts.displs[i-1] + counts.counts[i-1];
354
    }*/
355
    //print_counts(dist_data, counts.counts, counts.displs, numP_parents, "Hijos");
356
357
358
359

    /* COMUNICACION DE DATOS */
    MPI_Ialltoallv(aux, counts.zero_arr, counts.zero_arr, MPI_CHAR, array, counts.counts, counts.displs, MPI_CHAR, dist_data.intercomm, comm_req);
    free(aux);
iker_martin's avatar
iker_martin committed
360
361
}

362
363
364
365
366
367
368
369
370
371
372
/*
 * Recibe de los padres un vector que es redistribuido a los procesos
 * de este grupo. Antes de realizar la comunicacion cada hijo calcula sobre que procesos
 * del otro grupo se transmiten elementos.
 *
 * La recepcion se realiza a partir de varias comunicaciones punto a punto.
 */
void recv_async_point_arrays(struct Dist_data dist_data, char *array, int root, int numP_parents, int idI, int idE, struct Counts counts, MPI_Request *comm_req) {
    int i;

    // Ajustar los valores de recepcion
373
374
    prepare_comm_alltoall(dist_data.myId, dist_data.numP, numP_parents, dist_data.qty, &counts);
    /*
375
376
377
    if(idI == 0) {
      set_counts(0, numP_parents, dist_data, counts.counts);
      idI++;
378
      MPI_Irecv(array, counts.counts[0], MPI_CHAR, 0, 99, dist_data.intercomm, &(comm_req[0])); //FIXME BUffer recv
379
380
381
382
    }
    for(i=idI; i<idE; i++) {
      set_counts(i, numP_parents, dist_data, counts.counts);
      counts.displs[i] = counts.displs[i-1] + counts.counts[i-1];
383
      MPI_Irecv(array+counts.displs[i], counts.counts[i], MPI_CHAR, i, 99, dist_data.intercomm, &(comm_req[i])); //FIXME BUffer recv
384
385
386
387
388
    }*/
    for(i=0; i<numP_parents; i++) { //TODO Esta propuesta ya no usa el IdI y Ide
      if(counts.counts[0] != 0) {
        MPI_Irecv(array+counts.displs[i], counts.counts[i], MPI_CHAR, i, 99, dist_data.intercomm, &(comm_req[i])); //FIXME BUffer recv
      }
389
390
391
392
    }
    //print_counts(dist_data, counts.counts, counts.displs, numP_parents, "Hijos");
}

iker_martin's avatar
iker_martin committed
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
/*
 * ========================================================================================
 * ========================================================================================
 * ================================DISTRIBUTION FUNCTIONS==================================
 * ========================================================================================
 * ========================================================================================
*/

/*
 * Obtiene para un proceso de un grupo a que rango procesos de 
 * otro grupo tiene que enviar o recibir datos.
 *
 * Devuelve el primer identificador y el último (Excluido) con el que
 * comunicarse.
 */
void getIds_intercomm(struct Dist_data dist_data, int numP_other, int **idS) {
    int idI, idE;
    int tamOther = dist_data.qty / numP_other;
    int remOther = dist_data.qty % numP_other;
412
413
414
    // Indica el punto de corte del grupo de procesos externo que 
    // divide entre los procesos que tienen 
    // un tamaño tamOther + 1 y un tamaño tamOther
iker_martin's avatar
iker_martin committed
415
416
    int middle = (tamOther + 1) * remOther;

417
418
419
    // Calcular idI teniendo en cuenta si se comunica con un
    // proceso con tamano tamOther o tamOther+1
    if(middle > dist_data.ini) { // First subgroup (tamOther+1)
iker_martin's avatar
iker_martin committed
420
      idI = dist_data.ini / (tamOther + 1);
421
    } else { // Second subgroup (tamOther)
iker_martin's avatar
iker_martin committed
422
423
424
      idI = ((dist_data.ini - middle) / tamOther) + remOther;
    }

425
426
427
    // Calcular idR teniendo en cuenta si se comunica con un
    // proceso con tamano tamOther o tamOther+1
    if(middle >= dist_data.fin) { // First subgroup (tamOther +1)
iker_martin's avatar
iker_martin committed
428
429
      idE = dist_data.fin / (tamOther + 1);
      idE = (dist_data.fin % (tamOther + 1) > 0 && idE+1 <= numP_other) ? idE+1 : idE;
430
    } else { // Second subgroup (tamOther)
iker_martin's avatar
iker_martin committed
431
432
433
434
      idE = ((dist_data.fin - middle) / tamOther) + remOther;
      idE = ((dist_data.fin - middle) % tamOther > 0 && idE+1 <= numP_other) ? idE+1 : idE;
    }

435
    *idS = malloc(2 * sizeof(int));
iker_martin's avatar
iker_martin committed
436
437
438
    (*idS)[0] = idI;
    (*idS)[1] = idE;
}