CommDist.c 15.4 KB
Newer Older
iker_martin's avatar
iker_martin committed
1
2
3
4
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
#include <string.h>
5
#include "distribution_methods/block_distribution.h"
6
#include "CommDist.h"
iker_martin's avatar
iker_martin committed
7

8
9
void send_sync_arrays(struct Dist_data dist_data, char *array, int numP_child, struct Counts counts);
void recv_sync_arrays(struct Dist_data dist_data, char *array, int numP_parents, struct Counts counts);
10

11
12
void send_async_arrays(struct Dist_data dist_data, char *array, int numP_child, struct Counts counts, MPI_Request *comm_req);
void recv_async_arrays(struct Dist_data dist_data, char *array, int numP_parents, struct Counts counts, MPI_Request *comm_req);
iker_martin's avatar
iker_martin committed
13

14
15
void send_async_point_arrays(struct Dist_data dist_data, char *array, int numP_child, struct Counts counts, MPI_Request *comm_req);
void recv_async_point_arrays(struct Dist_data dist_data, char *array, int numP_parents, struct Counts counts, MPI_Request *comm_req);
16

iker_martin's avatar
iker_martin committed
17
void getIds_intercomm(struct Dist_data dist_data, int numP_other, int **idS);
18
19
20
21
22
23
24
25
/*
 * Reserva memoria para un vector de hasta "qty" elementos.
 * Los "qty" elementos se disitribuyen entre los "numP" procesos
 * que llaman a esta funcion.
 */
void malloc_comm_array(char **array, int qty, int myId, int numP) {
    struct Dist_data dist_data;

26
    get_block_dist(qty, myId, numP, &dist_data);
27
    if( (*array = malloc( (size_t) dist_data.tamBl * sizeof(char))) == NULL) {
28
29
30
      printf("Memory Error (Malloc Arrays(%d))\n", dist_data.tamBl); 
      exit(1); 
    }
31

32
/*
33
34
35
36
        int i;
	for(i=0; i<dist_data.tamBl; i++) {
	  (*array)[i] = '!' + i + dist_data.ini;
	}
37
38
39
	
        printf("P%d Tam %d String: %s\n", myId, dist_data.tamBl, *array);
*/
40
}
41
42
43

//================================================================================
//================================================================================
44
//========================SYNCHRONOUS FUNCTIONS===================================
45
46
47
//================================================================================
//================================================================================

48
49
50
51
52
53
/*
 * Realiza un envio síncrono del vector array desde este grupo de procesos al grupo
 * enlazado por el intercomunicador intercomm.
 *
 * El vector array no se modifica en esta funcion.
 */
54
int send_sync(char *array, int qty, int myId, int numP, MPI_Comm intercomm, int numP_child) {
iker_martin's avatar
iker_martin committed
55
56
57
58
    int *idS = NULL;
    struct Counts counts;
    struct Dist_data dist_data;

59
    get_block_dist(qty, myId, numP, &dist_data); // Distribucion de este proceso en su grupo
iker_martin's avatar
iker_martin committed
60
61
62
    dist_data.intercomm = intercomm;

    // Create arrays which contains info about how many elements will be send to each created process
63
    mallocCounts(&counts, (size_t)numP_child);
iker_martin's avatar
iker_martin committed
64
65
66

    getIds_intercomm(dist_data, numP_child, &idS); // Obtener rango de Id hijos a los que este proceso manda datos

67
    send_sync_arrays(dist_data, array, numP_child, counts);
iker_martin's avatar
iker_martin committed
68
69
70
71
72
73
74
75

    freeCounts(&counts);
    free(idS);

    return 1;
}


76
77
78
79
80
81
82
/*
 * Realiza una recepcion síncrona del vector array a este grupo de procesos desde el grupo
 * enlazado por el intercomunicador intercomm.
 *
 * El vector array se reserva dentro de la funcion y se devuelve en el mismo argumento.
 * Tiene que ser liberado posteriormente por el usuario.
 */
83
void recv_sync(char **array, int qty, int myId, int numP, MPI_Comm intercomm, int numP_parents) {
iker_martin's avatar
iker_martin committed
84
85
86
87
88
    int *idS = NULL;
    struct Counts counts;
    struct Dist_data dist_data;

    // Obtener distribución para este hijo
89
    get_block_dist(qty, myId, numP, &dist_data);
90
    *array = malloc( (size_t)dist_data.tamBl * sizeof(char));
91
    //(*array)[dist_data.tamBl] = '\0';
iker_martin's avatar
iker_martin committed
92
93
94
    dist_data.intercomm = intercomm;

    /* PREPARAR DATOS DE RECEPCION SOBRE VECTOR*/
95
    mallocCounts(&counts, (size_t)numP_parents);
iker_martin's avatar
iker_martin committed
96
97
98

    getIds_intercomm(dist_data, numP_parents, &idS); // Obtener el rango de Ids de padres del que este proceso recibira datos

99
    recv_sync_arrays(dist_data, *array, numP_parents, counts);
100
    //printf("S%d Tam %d String: %s END\n", myId, dist_data.tamBl, *array);
iker_martin's avatar
iker_martin committed
101
102
103
104
105
106

    freeCounts(&counts);
    free(idS);
}

/*
107
108
109
 * Envia a los hijos un vector que es redistribuido a los procesos
 * hijos. Antes de realizar la comunicacion, cada proceso padre calcula sobre que procesos
 * del otro grupo se transmiten elementos.
iker_martin's avatar
iker_martin committed
110
 */
111
void send_sync_arrays(struct Dist_data dist_data, char *array, int numP_child, struct Counts counts) {
112

113
    prepare_comm_alltoall(dist_data.myId, dist_data.numP, numP_child, dist_data.qty, &counts);
iker_martin's avatar
iker_martin committed
114
    /* COMUNICACION DE DATOS */
115
    MPI_Alltoallv(array, counts.counts, counts.displs, MPI_CHAR, NULL, counts.zero_arr, counts.zero_arr, MPI_CHAR, dist_data.intercomm);
iker_martin's avatar
iker_martin committed
116
117
}

118
119
120
121
122
/*
 * Recibe de los padres un vector que es redistribuido a los procesos
 * de este grupo. Antes de realizar la comunicacion cada hijo calcula sobre que procesos
 * del otro grupo se transmiten elementos.
 */
123
void recv_sync_arrays(struct Dist_data dist_data, char *array, int numP_parents, struct Counts counts) {
124
	
125
    char aux;
iker_martin's avatar
iker_martin committed
126

127
    prepare_comm_alltoall(dist_data.myId, dist_data.numP, numP_parents, dist_data.qty, &counts);
iker_martin's avatar
iker_martin committed
128
    // Ajustar los valores de recepcion
129
    /*
iker_martin's avatar
iker_martin committed
130
    if(idI == 0) {
131
      set_counts(0, numP_parents, dist_data, counts.counts);
iker_martin's avatar
iker_martin committed
132
133
134
      idI++;
    }
    for(i=idI; i<idE; i++) {
135
136
      set_counts(i, numP_parents, dist_data, counts.counts);
      counts.displs[i] = counts.displs[i-1] + counts.counts[i-1];
137
    }*/
138
    //print_counts(dist_data, counts.counts, counts.displs, numP_parents, "Hijos");
iker_martin's avatar
iker_martin committed
139
140

    /* COMUNICACION DE DATOS */
141
    MPI_Alltoallv(&aux, counts.zero_arr, counts.zero_arr, MPI_CHAR, array, counts.counts, counts.displs, MPI_CHAR, dist_data.intercomm);
142
143
144
145
146
}


//================================================================================
//================================================================================
147
//========================ASYNCHRONOUS FUNCTIONS==================================
148
149
150
151
152
153
154
155
156
157
158
159
//================================================================================
//================================================================================

/*
 * Realiza un envio asincrono del vector array desde este grupo de procesos al grupo
 * enlazado por el intercomunicador intercomm.
 *
 * El objeto MPI_Request se devuelve con el manejador para comprobar si la comunicacion
 * ha terminado.
 *
 * El vector array no se modifica en esta funcion.
 */
160
161
int send_async(char *array, int qty, int myId, int numP, MPI_Comm intercomm, int numP_child, MPI_Request **comm_req, int parents_wait) {
    int i;
162
163
164
165
    int *idS = NULL;
    struct Counts counts;
    struct Dist_data dist_data;

166
    get_block_dist(qty, myId, numP, &dist_data); // Distribucion de este proceso en su grupo
167
168
169
    dist_data.intercomm = intercomm;

    // Create arrays which contains info about how many elements will be send to each created process
170
    mallocCounts(&counts, (size_t)numP_child);
171
172
173

    getIds_intercomm(dist_data, numP_child, &idS); // Obtener rango de Id hijos a los que este proceso manda datos

174
    // MAL_USE_THREAD sigue el camino sincrono
175
    if(parents_wait == MAL_USE_NORMAL) {
176
      //*comm_req = (MPI_Request *) malloc(sizeof(MPI_Request));
177
      *comm_req[0] = MPI_REQUEST_NULL;
178
      send_async_arrays(dist_data, array, numP_child, counts, &(*comm_req[0])); 
179

180
    } else if (parents_wait == MAL_USE_IBARRIER){
181
182
183
      //*comm_req = (MPI_Request *) malloc(2 * sizeof(MPI_Request));
      *comm_req[0] = MPI_REQUEST_NULL;
      *comm_req[1] = MPI_REQUEST_NULL;
184
      send_async_arrays(dist_data, array, numP_child, counts, &((*comm_req)[1])); 
185
      MPI_Ibarrier(intercomm, &((*comm_req)[0]) );
186
    } else if (parents_wait == MAL_USE_POINT){
187
      //*comm_req = (MPI_Request *) malloc(numP_child * sizeof(MPI_Request));
188
189
190
      for(i=0; i<numP_child; i++){
        (*comm_req)[i] = MPI_REQUEST_NULL;
      }
191
      send_async_point_arrays(dist_data, array, numP_child, counts, *comm_req); 
192
    } else if (parents_wait == MAL_USE_THREAD) { //TODO 
193
    }
194
195
196
197
198
199
200
201
202
203
204
205
206

    freeCounts(&counts);
    free(idS);

    return 1;
}

/*
 * Realiza una recepcion asincrona del vector array a este grupo de procesos desde el grupo
 * enlazado por el intercomunicador intercomm.
 *
 * El vector array se reserva dentro de la funcion y se devuelve en el mismo argumento.
 * Tiene que ser liberado posteriormente por el usuario.
207
208
209
 *
 * El argumento "parents_wait" sirve para indicar si se usará la versión en la los padres 
 * espera a que terminen de enviar, o en la que esperan a que los hijos acaben de recibir.
210
 */
211
void recv_async(char **array, int qty, int myId, int numP, MPI_Comm intercomm, int numP_parents, int parents_wait) {
212
    int *idS = NULL;
213
    int wait_err, i;
214
215
    struct Counts counts;
    struct Dist_data dist_data;
216
    MPI_Request *comm_req, aux;
217
218

    // Obtener distribución para este hijo
219
    get_block_dist(qty, myId, numP, &dist_data);
220
    *array = malloc( (size_t)dist_data.tamBl * sizeof(char));
221
222
223
    dist_data.intercomm = intercomm;

    /* PREPARAR DATOS DE RECEPCION SOBRE VECTOR*/
224
    mallocCounts(&counts, (size_t)numP_parents);
225
226
227

    getIds_intercomm(dist_data, numP_parents, &idS); // Obtener el rango de Ids de padres del que este proceso recibira datos

228
    // MAL_USE_THREAD sigue el camino sincrono
229
    if(parents_wait == MAL_USE_POINT) {
230
      comm_req = (MPI_Request *) malloc((size_t)numP_parents * sizeof(MPI_Request));
231
232
233
      for(i=0; i<numP_parents; i++){
        comm_req[i] = MPI_REQUEST_NULL;
      }
234
      recv_async_point_arrays(dist_data, *array, numP_parents, counts, comm_req);
235
236
      wait_err = MPI_Waitall(numP_parents, comm_req, MPI_STATUSES_IGNORE);

237
    } else if (parents_wait == MAL_USE_NORMAL || parents_wait == MAL_USE_IBARRIER) {
238
239
      comm_req = (MPI_Request *) malloc(sizeof(MPI_Request));
      *comm_req = MPI_REQUEST_NULL;
240
      recv_async_arrays(dist_data, *array, numP_parents, counts, comm_req);
241
      wait_err = MPI_Wait(comm_req, MPI_STATUS_IGNORE);
242
    } else if (parents_wait == MAL_USE_THREAD) { //TODO
243
    }
244
245
246
247
248

    if(wait_err != MPI_SUCCESS) {
      MPI_Abort(MPI_COMM_WORLD, wait_err);
    }

249
    if(parents_wait == MAL_USE_IBARRIER) { //MAL USE IBARRIER END
250
251
252
      MPI_Ibarrier(intercomm, &aux);
      MPI_Wait(&aux, MPI_STATUS_IGNORE); //Es necesario comprobar que la comunicación ha terminado para desconectar los grupos de procesos
    }
253

254
    //printf("S%d Tam %d String: %s END\n", myId, dist_data.tamBl, *array);
255
256
    freeCounts(&counts);
    free(idS);
257
    free(comm_req);
258
259
260
261
262
263
}

/*
 * Envia a los hijos un vector que es redistribuido a los procesos
 * hijos. Antes de realizar la comunicacion, cada proceso padre calcula sobre que procesos
 * del otro grupo se transmiten elementos.
264
265
 *
 * El envio se realiza a partir de una comunicación colectiva.
266
 */
267
void send_async_arrays(struct Dist_data dist_data, char *array, int numP_child, struct Counts counts, MPI_Request *comm_req) {
268

269
    prepare_comm_alltoall(dist_data.myId, dist_data.numP, numP_child, dist_data.qty, &counts);
270
271
272
273
274

    /* COMUNICACION DE DATOS */
    MPI_Ialltoallv(array, counts.counts, counts.displs, MPI_CHAR, NULL, counts.zero_arr, counts.zero_arr, MPI_CHAR, dist_data.intercomm, comm_req);
}

275
276
277
278
279
280
281
/*
 * Envia a los hijos un vector que es redistribuido a los procesos
 * hijos. Antes de realizar la comunicacion, cada proceso padre calcula sobre que procesos
 * del otro grupo se transmiten elementos.
 *
 * El envio se realiza a partir de varias comunicaciones punto a punto.
 */
282
void send_async_point_arrays(struct Dist_data dist_data, char *array, int numP_child, struct Counts counts, MPI_Request *comm_req) {
283
284
    int i;
    // PREPARAR ENVIO DEL VECTOR
285
286
    prepare_comm_alltoall(dist_data.myId, dist_data.numP, numP_child, dist_data.qty, &counts);

287
288
289
290
    for(i=0; i<numP_child; i++) { //TODO Esta propuesta ya no usa el IdI y Ide
      if(counts.counts[0] != 0) {
        MPI_Isend(array+counts.displs[i], counts.counts[i], MPI_CHAR, i, 99, dist_data.intercomm, &(comm_req[i]));
      }
291
    }
292
    //print_counts(dist_data, counts.counts, counts.displs, numP_child, "Padres");
293
294
}

295
296
297
298
/*
 * Recibe de los padres un vector que es redistribuido a los procesos
 * de este grupo. Antes de realizar la comunicacion cada hijo calcula sobre que procesos
 * del otro grupo se transmiten elementos.
299
300
 *
 * La recepcion se realiza a partir de una comunicacion colectiva.
301
 */
302
void recv_async_arrays(struct Dist_data dist_data, char *array, int numP_parents, struct Counts counts, MPI_Request *comm_req) {
303
304
305
    char *aux = malloc(1);

    // Ajustar los valores de recepcion
306
    prepare_comm_alltoall(dist_data.myId, dist_data.numP, numP_parents, dist_data.qty, &counts);
307
    //print_counts(dist_data, counts.counts, counts.displs, numP_parents, "Hijos");
308
309
310
311

    /* COMUNICACION DE DATOS */
    MPI_Ialltoallv(aux, counts.zero_arr, counts.zero_arr, MPI_CHAR, array, counts.counts, counts.displs, MPI_CHAR, dist_data.intercomm, comm_req);
    free(aux);
iker_martin's avatar
iker_martin committed
312
313
}

314
315
316
317
318
319
320
/*
 * Recibe de los padres un vector que es redistribuido a los procesos
 * de este grupo. Antes de realizar la comunicacion cada hijo calcula sobre que procesos
 * del otro grupo se transmiten elementos.
 *
 * La recepcion se realiza a partir de varias comunicaciones punto a punto.
 */
321
void recv_async_point_arrays(struct Dist_data dist_data, char *array, int numP_parents, struct Counts counts, MPI_Request *comm_req) {
322
323
324
    int i;

    // Ajustar los valores de recepcion
325
    prepare_comm_alltoall(dist_data.myId, dist_data.numP, numP_parents, dist_data.qty, &counts);
326

327
328
329
330
    for(i=0; i<numP_parents; i++) { //TODO Esta propuesta ya no usa el IdI y Ide
      if(counts.counts[0] != 0) {
        MPI_Irecv(array+counts.displs[i], counts.counts[i], MPI_CHAR, i, 99, dist_data.intercomm, &(comm_req[i])); //FIXME BUffer recv
      }
331
332
333
334
    }
    //print_counts(dist_data, counts.counts, counts.displs, numP_parents, "Hijos");
}

iker_martin's avatar
iker_martin committed
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
/*
 * ========================================================================================
 * ========================================================================================
 * ================================DISTRIBUTION FUNCTIONS==================================
 * ========================================================================================
 * ========================================================================================
*/

/*
 * Obtiene para un proceso de un grupo a que rango procesos de 
 * otro grupo tiene que enviar o recibir datos.
 *
 * Devuelve el primer identificador y el último (Excluido) con el que
 * comunicarse.
 */
void getIds_intercomm(struct Dist_data dist_data, int numP_other, int **idS) {
    int idI, idE;
    int tamOther = dist_data.qty / numP_other;
    int remOther = dist_data.qty % numP_other;
354
355
356
    // Indica el punto de corte del grupo de procesos externo que 
    // divide entre los procesos que tienen 
    // un tamaño tamOther + 1 y un tamaño tamOther
iker_martin's avatar
iker_martin committed
357
358
    int middle = (tamOther + 1) * remOther;

359
360
361
    // Calcular idI teniendo en cuenta si se comunica con un
    // proceso con tamano tamOther o tamOther+1
    if(middle > dist_data.ini) { // First subgroup (tamOther+1)
iker_martin's avatar
iker_martin committed
362
      idI = dist_data.ini / (tamOther + 1);
363
    } else { // Second subgroup (tamOther)
iker_martin's avatar
iker_martin committed
364
365
366
      idI = ((dist_data.ini - middle) / tamOther) + remOther;
    }

367
368
369
    // Calcular idR teniendo en cuenta si se comunica con un
    // proceso con tamano tamOther o tamOther+1
    if(middle >= dist_data.fin) { // First subgroup (tamOther +1)
iker_martin's avatar
iker_martin committed
370
371
      idE = dist_data.fin / (tamOther + 1);
      idE = (dist_data.fin % (tamOther + 1) > 0 && idE+1 <= numP_other) ? idE+1 : idE;
372
    } else { // Second subgroup (tamOther)
iker_martin's avatar
iker_martin committed
373
374
375
376
      idE = ((dist_data.fin - middle) / tamOther) + remOther;
      idE = ((dist_data.fin - middle) % tamOther > 0 && idE+1 <= numP_other) ? idE+1 : idE;
    }

377
    *idS = malloc(2 * sizeof(int));
iker_martin's avatar
iker_martin committed
378
379
380
    (*idS)[0] = idI;
    (*idS)[1] = idE;
}