CommDist.c 16.6 KB
Newer Older
iker_martin's avatar
iker_martin committed
1
2
3
4
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
#include <string.h>
5
#include "distribution_methods/block_distribution.h"
6
#include "CommDist.h"
iker_martin's avatar
iker_martin committed
7

8
9
void prepare_redistribution(int qty, int myId, int numP, int numO, int is_children_group, int is_intercomm, char **recv, struct Counts *s_counts, struct Counts *r_counts);

10
11
void send_async_arrays(struct Dist_data dist_data, char *array, int numP_child, struct Counts counts, MPI_Request *comm_req);
void recv_async_arrays(struct Dist_data dist_data, char *array, int numP_parents, struct Counts counts, MPI_Request *comm_req);
iker_martin's avatar
iker_martin committed
12

13
14
void send_async_point_arrays(struct Dist_data dist_data, char *array, int numP_child, struct Counts counts, MPI_Request *comm_req);
void recv_async_point_arrays(struct Dist_data dist_data, char *array, int numP_parents, struct Counts counts, MPI_Request *comm_req);
15

iker_martin's avatar
iker_martin committed
16
void getIds_intercomm(struct Dist_data dist_data, int numP_other, int **idS);
17
18
19
20
21
22
23
24
/*
 * Reserva memoria para un vector de hasta "qty" elementos.
 * Los "qty" elementos se disitribuyen entre los "numP" procesos
 * que llaman a esta funcion.
 */
void malloc_comm_array(char **array, int qty, int myId, int numP) {
    struct Dist_data dist_data;

25
    get_block_dist(qty, myId, numP, &dist_data);
26
    if( (*array = calloc(dist_data.tamBl, sizeof(char))) == NULL) {
27
28
29
      printf("Memory Error (Malloc Arrays(%d))\n", dist_data.tamBl); 
      exit(1); 
    }
30

31
/*
32
33
34
35
        int i;
	for(i=0; i<dist_data.tamBl; i++) {
	  (*array)[i] = '!' + i + dist_data.ini;
	}
36
37
38
	
        printf("P%d Tam %d String: %s\n", myId, dist_data.tamBl, *array);
*/
39
}
40
41
42

//================================================================================
//================================================================================
43
//========================SYNCHRONOUS FUNCTIONS===================================
44
45
46
//================================================================================
//================================================================================

47
/*
48
49
50
 * Performs a communication to redistribute an array in a block distribution.
 * In the redistribution is differenciated parent group from the children and the values each group indicates can be
 * different.
51
 *
52
53
54
55
56
57
58
59
60
61
62
 * - send (IN):  Array with the data to send. This value can not be NULL.
 * - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to receive data.
 *               process receives data and is NULL, the behaviour is undefined.
 * - qty  (IN):  Sum of elements shared by all processes that will send data.
 * - myId (IN):  Rank of the MPI process in the local communicator. For the parents is not the rank obtained from "comm".
 * - numP (IN):  Size of the local group. If it is a children group, this parameter must correspond to using
 *               "MPI_Comm_size(comm)". For the parents is not always the size obtained from "comm".
 * - numO (IN):  Amount of processes in the remote group. For the parents is the target quantity of processes after the 
 *               resize, while for the children is the amount of parents.
 * - is_children_group (IN): Indicates wether this MPI rank is a children(TRUE) or a parent(FALSE).
 * - comm (IN):  Communicator to use to perform the redistribution.
63
 *
64
 * returns: An integer indicating if the operation has been completed(TRUE) or not(FALSE). //FIXME In this case is always true...
65
 */
66
67
68
int sync_communication(char *send, char **recv, int qty, int myId, int numP, int numO, int is_children_group, MPI_Comm comm) {
    int is_intercomm;
    struct Counts s_counts, r_counts;
iker_martin's avatar
iker_martin committed
69

70
71
72
    /* PREPARE COMMUNICATION */
    MPI_Comm_test_inter(comm, &is_intercomm);
    prepare_redistribution(qty, myId, numP, numO, is_children_group, is_intercomm, recv, &s_counts, &r_counts);
73

74
    /* PERFORM COMMUNICATION */
75
    MPI_Alltoallv(send, s_counts.counts, s_counts.displs, MPI_CHAR, *recv, r_counts.counts, r_counts.displs, MPI_CHAR, comm);
76

77
78
    freeCounts(&s_counts);
    freeCounts(&r_counts);
79
    return 1; //FIXME In this case is always true...
80
}
81
82
83

//================================================================================
//================================================================================
84
//========================ASYNCHRONOUS FUNCTIONS==================================
85
86
87
88
89
90
91
92
93
94
95
96
//================================================================================
//================================================================================

/*
 * Realiza un envio asincrono del vector array desde este grupo de procesos al grupo
 * enlazado por el intercomunicador intercomm.
 *
 * El objeto MPI_Request se devuelve con el manejador para comprobar si la comunicacion
 * ha terminado.
 *
 * El vector array no se modifica en esta funcion.
 */
97
98
int send_async(char *array, int qty, int myId, int numP, MPI_Comm intercomm, int numP_child, MPI_Request **comm_req, int parents_wait) {
    int i;
99
100
101
102
    int *idS = NULL;
    struct Counts counts;
    struct Dist_data dist_data;

103
    get_block_dist(qty, myId, numP, &dist_data); // Distribucion de este proceso en su grupo
104
105
106
    dist_data.intercomm = intercomm;

    // Create arrays which contains info about how many elements will be send to each created process
107
    mallocCounts(&counts, numP_child);
108
109
110

    getIds_intercomm(dist_data, numP_child, &idS); // Obtener rango de Id hijos a los que este proceso manda datos

111
    // MAL_USE_THREAD sigue el camino sincrono
112
    if(parents_wait == MAL_USE_NORMAL) {
113
      //*comm_req = (MPI_Request *) malloc(sizeof(MPI_Request));
114
      *comm_req[0] = MPI_REQUEST_NULL;
115
      send_async_arrays(dist_data, array, numP_child, counts, &(*comm_req[0])); 
116

117
    } else if (parents_wait == MAL_USE_IBARRIER){
118
119
120
      //*comm_req = (MPI_Request *) malloc(2 * sizeof(MPI_Request));
      *comm_req[0] = MPI_REQUEST_NULL;
      *comm_req[1] = MPI_REQUEST_NULL;
121
      send_async_arrays(dist_data, array, numP_child, counts, &((*comm_req)[1])); 
122
      MPI_Ibarrier(intercomm, &((*comm_req)[0]) );
123
    } else if (parents_wait == MAL_USE_POINT){
124
      //*comm_req = (MPI_Request *) malloc(numP_child * sizeof(MPI_Request));
125
126
127
      for(i=0; i<numP_child; i++){
        (*comm_req)[i] = MPI_REQUEST_NULL;
      }
128
      send_async_point_arrays(dist_data, array, numP_child, counts, *comm_req); 
129
    } else if (parents_wait == MAL_USE_THREAD) { //TODO 
130
    }
131
132
133
134
135
136
137
138
139
140
141
142
143

    freeCounts(&counts);
    free(idS);

    return 1;
}

/*
 * Realiza una recepcion asincrona del vector array a este grupo de procesos desde el grupo
 * enlazado por el intercomunicador intercomm.
 *
 * El vector array se reserva dentro de la funcion y se devuelve en el mismo argumento.
 * Tiene que ser liberado posteriormente por el usuario.
144
145
146
 *
 * El argumento "parents_wait" sirve para indicar si se usará la versión en la los padres 
 * espera a que terminen de enviar, o en la que esperan a que los hijos acaben de recibir.
147
 */
148
void recv_async(char **array, int qty, int myId, int numP, MPI_Comm intercomm, int numP_parents, int parents_wait) {
149
    int *idS = NULL;
150
    int wait_err, i;
151
152
    struct Counts counts;
    struct Dist_data dist_data;
153
    MPI_Request *comm_req, aux;
154
155

    // Obtener distribución para este hijo
156
    get_block_dist(qty, myId, numP, &dist_data);
157
    *array = malloc( dist_data.tamBl * sizeof(char));
158
159
160
    dist_data.intercomm = intercomm;

    /* PREPARAR DATOS DE RECEPCION SOBRE VECTOR*/
161
    mallocCounts(&counts, numP_parents);
162
163
164

    getIds_intercomm(dist_data, numP_parents, &idS); // Obtener el rango de Ids de padres del que este proceso recibira datos

165
    // MAL_USE_THREAD sigue el camino sincrono
166
    if(parents_wait == MAL_USE_POINT) {
167
      comm_req = (MPI_Request *) malloc(numP_parents * sizeof(MPI_Request));
168
169
170
      for(i=0; i<numP_parents; i++){
        comm_req[i] = MPI_REQUEST_NULL;
      }
171
      recv_async_point_arrays(dist_data, *array, numP_parents, counts, comm_req);
172
173
      wait_err = MPI_Waitall(numP_parents, comm_req, MPI_STATUSES_IGNORE);

174
    } else if (parents_wait == MAL_USE_NORMAL || parents_wait == MAL_USE_IBARRIER) {
175
176
      comm_req = (MPI_Request *) malloc(sizeof(MPI_Request));
      *comm_req = MPI_REQUEST_NULL;
177
      recv_async_arrays(dist_data, *array, numP_parents, counts, comm_req);
178
      wait_err = MPI_Wait(comm_req, MPI_STATUS_IGNORE);
179
    } else if (parents_wait == MAL_USE_THREAD) { //TODO
180
    }
181
182
183
184
185

    if(wait_err != MPI_SUCCESS) {
      MPI_Abort(MPI_COMM_WORLD, wait_err);
    }

186
    if(parents_wait == MAL_USE_IBARRIER) { //MAL USE IBARRIER END
187
188
189
      MPI_Ibarrier(intercomm, &aux);
      MPI_Wait(&aux, MPI_STATUS_IGNORE); //Es necesario comprobar que la comunicación ha terminado para desconectar los grupos de procesos
    }
190

191
    //printf("S%d Tam %d String: %s END\n", myId, dist_data.tamBl, *array);
192
193
    freeCounts(&counts);
    free(idS);
194
    free(comm_req);
195
196
197
198
199
200
}

/*
 * Envia a los hijos un vector que es redistribuido a los procesos
 * hijos. Antes de realizar la comunicacion, cada proceso padre calcula sobre que procesos
 * del otro grupo se transmiten elementos.
201
202
 *
 * El envio se realiza a partir de una comunicación colectiva.
203
 */
204
void send_async_arrays(struct Dist_data dist_data, char *array, int numP_child, struct Counts counts, MPI_Request *comm_req) {
205

206
    prepare_comm_alltoall(dist_data.myId, dist_data.numP, numP_child, dist_data.qty, &counts);
207
208
209
210
211

    /* COMUNICACION DE DATOS */
    MPI_Ialltoallv(array, counts.counts, counts.displs, MPI_CHAR, NULL, counts.zero_arr, counts.zero_arr, MPI_CHAR, dist_data.intercomm, comm_req);
}

212
213
214
215
216
217
218
/*
 * Envia a los hijos un vector que es redistribuido a los procesos
 * hijos. Antes de realizar la comunicacion, cada proceso padre calcula sobre que procesos
 * del otro grupo se transmiten elementos.
 *
 * El envio se realiza a partir de varias comunicaciones punto a punto.
 */
219
void send_async_point_arrays(struct Dist_data dist_data, char *array, int numP_child, struct Counts counts, MPI_Request *comm_req) {
220
221
    int i;
    // PREPARAR ENVIO DEL VECTOR
222
223
    prepare_comm_alltoall(dist_data.myId, dist_data.numP, numP_child, dist_data.qty, &counts);

224
225
226
227
    for(i=0; i<numP_child; i++) { //TODO Esta propuesta ya no usa el IdI y Ide
      if(counts.counts[0] != 0) {
        MPI_Isend(array+counts.displs[i], counts.counts[i], MPI_CHAR, i, 99, dist_data.intercomm, &(comm_req[i]));
      }
228
    }
229
    //print_counts(dist_data, counts.counts, counts.displs, numP_child, "Padres");
230
231
}

232
233
234
235
/*
 * Recibe de los padres un vector que es redistribuido a los procesos
 * de este grupo. Antes de realizar la comunicacion cada hijo calcula sobre que procesos
 * del otro grupo se transmiten elementos.
236
237
 *
 * La recepcion se realiza a partir de una comunicacion colectiva.
238
 */
239
void recv_async_arrays(struct Dist_data dist_data, char *array, int numP_parents, struct Counts counts, MPI_Request *comm_req) {
240
241
242
    char *aux = malloc(1);

    // Ajustar los valores de recepcion
243
    prepare_comm_alltoall(dist_data.myId, dist_data.numP, numP_parents, dist_data.qty, &counts);
244
    //print_counts(dist_data, counts.counts, counts.displs, numP_parents, "Hijos");
245
246
247
248

    /* COMUNICACION DE DATOS */
    MPI_Ialltoallv(aux, counts.zero_arr, counts.zero_arr, MPI_CHAR, array, counts.counts, counts.displs, MPI_CHAR, dist_data.intercomm, comm_req);
    free(aux);
iker_martin's avatar
iker_martin committed
249
250
}

251
252
253
254
255
256
257
/*
 * Recibe de los padres un vector que es redistribuido a los procesos
 * de este grupo. Antes de realizar la comunicacion cada hijo calcula sobre que procesos
 * del otro grupo se transmiten elementos.
 *
 * La recepcion se realiza a partir de varias comunicaciones punto a punto.
 */
258
void recv_async_point_arrays(struct Dist_data dist_data, char *array, int numP_parents, struct Counts counts, MPI_Request *comm_req) {
259
260
261
    int i;

    // Ajustar los valores de recepcion
262
    prepare_comm_alltoall(dist_data.myId, dist_data.numP, numP_parents, dist_data.qty, &counts);
263

264
265
266
267
    for(i=0; i<numP_parents; i++) { //TODO Esta propuesta ya no usa el IdI y Ide
      if(counts.counts[0] != 0) {
        MPI_Irecv(array+counts.displs[i], counts.counts[i], MPI_CHAR, i, 99, dist_data.intercomm, &(comm_req[i])); //FIXME BUffer recv
      }
268
269
270
271
    }
    //print_counts(dist_data, counts.counts, counts.displs, numP_parents, "Hijos");
}

iker_martin's avatar
iker_martin committed
272
273
274
275
276
277
278
279
/*
 * ========================================================================================
 * ========================================================================================
 * ================================DISTRIBUTION FUNCTIONS==================================
 * ========================================================================================
 * ========================================================================================
*/

280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
/*
 * Prepares a communication to redistribute an array in a block distribution. For each process calculates
 * how many elements it sends/receives to/from other processes of the new group.
 *
 * - qty  (IN):  Sum of elements shared by all processes that will send data.
 * - myId (IN):  Rank of the MPI process in the local communicator. For the parents is not the rank obtained from "comm".
 * - numP (IN):  Size of the local group. If it is a children group, this parameter must correspond to using
 *               "MPI_Comm_size(comm)". For the parents is not always the size obtained from "comm".
 * - numO (IN):  Amount of processes in the remote group. For the parents is the target quantity of processes after the 
 *               resize, while for the children is the amount of parents.
 * - is_children_group (IN): Indicates wether this MPI rank is a children(TRUE) or a parent(FALSE).
 * - is_intercomm (IN): Indicates wether the used communicator is a intercomunicator(TRUE) or intracommunicator(FALSE).
 * - recv (OUT): Array where data will be written. Allocated here when this process will receive data
 *               (children, or parents that survive an intracomm resize); otherwise left untouched.
 * - s_counts (OUT): Struct where is indicated how many elements sends this process to processes in the new group.
 * - r_counts (OUT): Struct where is indicated how many elements receives this process from other processes in the previous group.
 */
void prepare_redistribution(int qty, int myId, int numP, int numO, int is_children_group, int is_intercomm, char **recv, struct Counts *s_counts, struct Counts *r_counts) {
  struct Dist_data dist_data;

  if(is_children_group) {
    // Children only receive: send counts are all zero (sized by the remote group).
    mallocCounts(s_counts, numO);
    prepare_comm_alltoall(myId, numP, numO, qty, r_counts);
    // Obtain the block distribution for this child and allocate its block.
    get_block_dist(qty, myId, numP, &dist_data);
    *recv = malloc(dist_data.tamBl * sizeof(char));
//get_block_dist(qty, myId, numP, &dist_data);
//print_counts(dist_data, r_counts->counts, r_counts->displs, numO, 1, "Children C");
  } else {
//get_block_dist(qty, myId, numP, &dist_data);
    // Parents always send towards the numO processes of the resized group.
    prepare_comm_alltoall(myId, numP, numO, qty, s_counts);

    if(is_intercomm) {
      // With an intercommunicator parents receive nothing: zeroed counts.
      mallocCounts(r_counts, numO);
    } else {
      // Intracommunicator resize: parents whose rank survives (myId < numO)
      // also act as receivers of their new block.
      if(myId < numO) {
        prepare_comm_alltoall(myId, numO, numP, qty, r_counts);
        // Obtain the block distribution this process will have after the resize.
        get_block_dist(qty, myId, numO, &dist_data);
        *recv = malloc(dist_data.tamBl * sizeof(char));
      } else {
        // Ranks being removed receive nothing: zeroed counts.
        mallocCounts(r_counts, numP);
      }	
//print_counts(dist_data, r_counts->counts, r_counts->displs, numP, 1, "Children P ");
    }
//print_counts(dist_data, s_counts->counts, s_counts->displs, numO, 1, "Parents ");
  }
}



iker_martin's avatar
iker_martin committed
333
334
335
336
337
338
339
340
341
342
343
/*
 * For a process of one group, obtains the range of ranks of the other
 * group it has to send data to or receive data from.
 *
 * Returns through *idS (malloc'd here, caller frees) the first id and the
 * last id (excluded) to communicate with: (*idS)[0] = first, (*idS)[1] = last.
 */
void getIds_intercomm(struct Dist_data dist_data, int numP_other, int **idS) {
    int idI, idE;
    // Base block size and remainder of the other group's distribution:
    // the first remOther processes own tamOther+1 elements, the rest tamOther.
    int tamOther = dist_data.qty / numP_other;
    int remOther = dist_data.qty % numP_other;
    // Global element index where the other group switches from
    // blocks of size tamOther+1 to blocks of size tamOther.
    int middle = (tamOther + 1) * remOther;

    // Compute idI taking into account whether our first element (ini) falls
    // in a remote process of size tamOther or tamOther+1.
    if(middle > dist_data.ini) { // First subgroup (tamOther+1)
      idI = dist_data.ini / (tamOther + 1);
    } else { // Second subgroup (tamOther)
      idI = ((dist_data.ini - middle) / tamOther) + remOther;
    }

    // Compute idE (exclusive) the same way for our last element (fin);
    // bump by one when fin is not exactly on a block boundary.
    if(middle >= dist_data.fin) { // First subgroup (tamOther +1)
      idE = dist_data.fin / (tamOther + 1);
      idE = (dist_data.fin % (tamOther + 1) > 0 && idE+1 <= numP_other) ? idE+1 : idE;
    } else { // Second subgroup (tamOther)
      idE = ((dist_data.fin - middle) / tamOther) + remOther;
      idE = ((dist_data.fin - middle) % tamOther > 0 && idE+1 <= numP_other) ? idE+1 : idE;
    }

    // NOTE(review): result is heap-allocated; callers (send_async/recv_async)
    // free it after use.
    *idS = malloc(2 * sizeof(int));
    (*idS)[0] = idI;
    (*idS)[1] = idE;
}