#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
#include <string.h>
#include "distribution_methods/block_distribution.h"
#include "CommDist.h"

void send_async_arrays(struct Dist_data dist_data, char *array, int numP_child, struct Counts counts, MPI_Request *comm_req);
void recv_async_arrays(struct Dist_data dist_data, char *array, int numP_parents, struct Counts counts, MPI_Request *comm_req);

void send_async_point_arrays(struct Dist_data dist_data, char *array, int numP_child, struct Counts counts, MPI_Request *comm_req);
void recv_async_point_arrays(struct Dist_data dist_data, char *array, int numP_parents, struct Counts counts, MPI_Request *comm_req);

void getIds_intercomm(struct Dist_data dist_data, int numP_other, int **idS);
/*
 * Allocates memory for an array of up to "qty" elements.
 * The "qty" elements are distributed among the "numP" processes
 * that call this function.
 */
void malloc_comm_array(char **array, int qty, int myId, int numP) {
    struct Dist_data dist_data;

    get_block_dist(qty, myId, numP, &dist_data);
    if( (*array = calloc(dist_data.tamBl, sizeof(char))) == NULL) {
      printf("Memory Error (Malloc Arrays(%d))\n", dist_data.tamBl); 
      exit(1); 
    }

/*
        int i;
        for(i=0; i<dist_data.tamBl; i++) {
          (*array)[i] = '!' + i + dist_data.ini;
        }

        printf("P%d Tam %d String: %s\n", myId, dist_data.tamBl, *array);
*/
}
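
/*
 * Usage sketch (hypothetical caller; "qty" comes from the application and the
 * ranks from MPI_Comm_rank/MPI_Comm_size):
 *
 *   char *data = NULL;
 *   malloc_comm_array(&data, qty, myId, numP); // Each rank gets its ~qty/numP block
 *   ...
 *   free(data);
 */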

//================================================================================
//================================================================================
//========================SYNCHRONOUS FUNCTIONS===================================
//================================================================================
//================================================================================

/*
 * Performs a communication to redistribute an array in a block distribution.
 * The redistribution distinguishes the parent group from the children group,
 * and the values each group passes in can differ.
 *
 * - send (IN):  Array with the data to send. This value can not be NULL.
 * - recv (OUT): Array where data will be written. A NULL value is allowed only if the
 *               process is not going to receive data; if the process receives data
 *               and it is NULL, the behaviour is undefined.
 * - qty  (IN):  Sum of elements shared by all processes that will send data.
 * - myId (IN):  Rank of the MPI process in the local communicator. For the parents it
 *               is not necessarily the rank obtained from "comm".
 * - numP (IN):  Size of the local group. If it is a children group, this parameter must
 *               correspond to "MPI_Comm_size(comm)". For the parents it is not always
 *               the size obtained from "comm".
 * - numO (IN):  Amount of processes in the remote group. For the parents it is the
 *               target quantity of processes after the resize, while for the children
 *               it is the amount of parents.
 * - is_children_group (IN): Indicates whether this MPI rank is a child (TRUE) or a parent (FALSE).
 * - comm (IN):  Communicator to use to perform the redistribution.
 *
 * returns: An integer indicating if the operation has been completed (TRUE) or not (FALSE).
 *          //FIXME In this case it is always true...
 */
int sync_communication(char *send, char **recv, int qty, int myId, int numP, int numO, int is_children_group, MPI_Comm comm) {
    int is_intercomm;
    struct Counts s_counts, r_counts;
    struct Dist_data dist_data;

    if(is_children_group) {
      mallocCounts(&s_counts, numO);
      prepare_comm_alltoall(myId, numP, numO, qty, &r_counts);
      // Get the distribution for this child
      get_block_dist(qty, myId, numP, &dist_data);
      *recv = malloc(dist_data.tamBl * sizeof(char));
    //get_block_dist(qty, myId, numP, &dist_data);
    //print_counts(dist_data, r_counts.counts, r_counts.displs, numO, 1, "Children C");
    } else {
      prepare_comm_alltoall(myId, numP, numO, qty, &s_counts);

      MPI_Comm_test_inter(comm, &is_intercomm);
      if(is_intercomm) {
        mallocCounts(&r_counts, numO);
      } else {
        if(myId < numO) {
          prepare_comm_alltoall(myId, numO, numP, qty, &r_counts);
          // Get the distribution for this child
          get_block_dist(qty, myId, numO, &dist_data);
          *recv = malloc(dist_data.tamBl * sizeof(char));
        } else {
          mallocCounts(&r_counts, numP);
        }
        //get_block_dist(qty, myId, numP, &dist_data);
        //print_counts(dist_data, r_counts.counts, r_counts.displs, numP, 1, "Children P ");
        //print_counts(dist_data, s_counts.counts, s_counts.displs, numO, 1, "Parents ");
      }
    }

    /* DATA COMMUNICATION */
    MPI_Alltoallv(send, s_counts.counts, s_counts.displs, MPI_CHAR, *recv, r_counts.counts, r_counts.displs, MPI_CHAR, comm);

    freeCounts(&s_counts);
    freeCounts(&r_counts);
    return 1;
}
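
/*
 * Usage sketch, assuming "intercomm" was obtained via MPI_Comm_spawn and that
 * "send_buf", "qty", "myId", "numP" and "numO" are provided by the caller
 * (hypothetical names):
 *
 *   char *result = NULL;
 *   if(is_children_group) { // numO is the amount of parents
 *     sync_communication(send_buf, &result, qty, myId, numP, numO, 1, intercomm);
 *   } else {                // numO is the target group size after the resize
 *     sync_communication(send_buf, &result, qty, myId, numP, numO, 0, intercomm);
 *   }
 */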

//================================================================================
//================================================================================
//========================ASYNCHRONOUS FUNCTIONS==================================
//================================================================================
//================================================================================

/*
 * Performs an asynchronous send of the vector "array" from this group of processes
 * to the group linked by the intercommunicator "intercomm".
 *
 * The MPI_Request objects are returned as handles to check whether the
 * communication has finished.
 *
 * The vector "array" is not modified by this function.
 */
int send_async(char *array, int qty, int myId, int numP, MPI_Comm intercomm, int numP_child, MPI_Request **comm_req, int parents_wait) {
    int i;
    int *idS = NULL;
    struct Counts counts;
    struct Dist_data dist_data;

    get_block_dist(qty, myId, numP, &dist_data); // Distribution of this process within its group
    dist_data.intercomm = intercomm;

    // Create arrays containing info about how many elements will be sent to each child process
    mallocCounts(&counts, numP_child);

    getIds_intercomm(dist_data, numP_child, &idS); // Get the range of children Ids this process sends data to

    // MAL_USE_THREAD follows the synchronous path
    if(parents_wait == MAL_USE_NORMAL) {
      //*comm_req = (MPI_Request *) malloc(sizeof(MPI_Request));
      (*comm_req)[0] = MPI_REQUEST_NULL;
      send_async_arrays(dist_data, array, numP_child, counts, &((*comm_req)[0]));

    } else if (parents_wait == MAL_USE_IBARRIER){
      //*comm_req = (MPI_Request *) malloc(2 * sizeof(MPI_Request));
      (*comm_req)[0] = MPI_REQUEST_NULL;
      (*comm_req)[1] = MPI_REQUEST_NULL;
      send_async_arrays(dist_data, array, numP_child, counts, &((*comm_req)[1])); 
      MPI_Ibarrier(intercomm, &((*comm_req)[0]) );
    } else if (parents_wait == MAL_USE_POINT){
      //*comm_req = (MPI_Request *) malloc(numP_child * sizeof(MPI_Request));
      for(i=0; i<numP_child; i++){
        (*comm_req)[i] = MPI_REQUEST_NULL;
      }
      send_async_point_arrays(dist_data, array, numP_child, counts, *comm_req); 
    } else if (parents_wait == MAL_USE_THREAD) { //TODO 
    }

    freeCounts(&counts);
    free(idS);

    return 1;
}
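
/*
 * Usage sketch for the MAL_USE_IBARRIER variant (hypothetical caller; the request
 * array is allocated by the caller, since the mallocs above are commented out):
 *
 *   MPI_Request *reqs = (MPI_Request *) malloc(2 * sizeof(MPI_Request));
 *   send_async(array, qty, myId, numP, intercomm, numP_child, &reqs, MAL_USE_IBARRIER);
 *   ...                                        // Overlap computation with the transfer
 *   MPI_Waitall(2, reqs, MPI_STATUSES_IGNORE); // reqs[0] = Ibarrier, reqs[1] = Ialltoallv
 *   free(reqs);
 */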

/*
 * Performs an asynchronous reception of the vector "array" into this group of
 * processes from the group linked by the intercommunicator "intercomm".
 *
 * The vector "array" is allocated inside this function and returned through the
 * same argument. It has to be freed afterwards by the caller.
 *
 * The argument "parents_wait" indicates whether to use the version in which the
 * parents wait until they finish sending, or the one in which they wait until
 * the children finish receiving.
 */
void recv_async(char **array, int qty, int myId, int numP, MPI_Comm intercomm, int numP_parents, int parents_wait) {
    int *idS = NULL;
    int wait_err = MPI_SUCCESS, i; // Initialised so the MAL_USE_THREAD path does not read an indeterminate value
    struct Counts counts;
    struct Dist_data dist_data;
    MPI_Request *comm_req = NULL, aux; // NULL so free(comm_req) is safe if no request is allocated

    // Get the distribution for this child
    get_block_dist(qty, myId, numP, &dist_data);
    *array = malloc( dist_data.tamBl * sizeof(char));
    dist_data.intercomm = intercomm;

    /* PREPARE THE RECEPTION DATA FOR THE VECTOR */
    mallocCounts(&counts, numP_parents);

    getIds_intercomm(dist_data, numP_parents, &idS); // Get the range of parent Ids this process will receive data from

    // MAL_USE_THREAD follows the synchronous path
    if(parents_wait == MAL_USE_POINT) {
      comm_req = (MPI_Request *) malloc(numP_parents * sizeof(MPI_Request));
      for(i=0; i<numP_parents; i++){
        comm_req[i] = MPI_REQUEST_NULL;
      }
      recv_async_point_arrays(dist_data, *array, numP_parents, counts, comm_req);
      wait_err = MPI_Waitall(numP_parents, comm_req, MPI_STATUSES_IGNORE);

    } else if (parents_wait == MAL_USE_NORMAL || parents_wait == MAL_USE_IBARRIER) {
      comm_req = (MPI_Request *) malloc(sizeof(MPI_Request));
      *comm_req = MPI_REQUEST_NULL;
      recv_async_arrays(dist_data, *array, numP_parents, counts, comm_req);
      wait_err = MPI_Wait(comm_req, MPI_STATUS_IGNORE);
    } else if (parents_wait == MAL_USE_THREAD) { //TODO
    }

    if(wait_err != MPI_SUCCESS) {
      MPI_Abort(MPI_COMM_WORLD, wait_err);
    }

    if(parents_wait == MAL_USE_IBARRIER) { //MAL USE IBARRIER END
      MPI_Ibarrier(intercomm, &aux);
      MPI_Wait(&aux, MPI_STATUS_IGNORE); // It is necessary to check that the communication has finished before disconnecting the process groups
    }

    //printf("S%d Tam %d String: %s END\n", myId, dist_data.tamBl, *array);
    freeCounts(&counts);
    free(idS);
    free(comm_req);
}
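
/*
 * Usage sketch (hypothetical children-side caller; recv_async blocks internally
 * until the data has arrived, so no request handling is needed afterwards):
 *
 *   char *array = NULL;
 *   recv_async(&array, qty, myId, numP, intercomm, numP_parents, MAL_USE_IBARRIER);
 *   ...        // "array" now holds this process' block of the redistributed data
 *   free(array);
 */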

/*
 * Sends to the children a vector that is redistributed among the processes
 * of that group. Before performing the communication, each parent process
 * computes which processes of the other group it transmits elements to.
 *
 * The send is performed through a collective communication.
 */
void send_async_arrays(struct Dist_data dist_data, char *array, int numP_child, struct Counts counts, MPI_Request *comm_req) {

    prepare_comm_alltoall(dist_data.myId, dist_data.numP, numP_child, dist_data.qty, &counts);

    /* DATA COMMUNICATION */
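    // Send-only side of the collective: recvbuf is NULL and all receive counts are zero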
    MPI_Ialltoallv(array, counts.counts, counts.displs, MPI_CHAR, NULL, counts.zero_arr, counts.zero_arr, MPI_CHAR, dist_data.intercomm, comm_req);
}

/*
 * Sends to the children a vector that is redistributed among the processes
 * of that group. Before performing the communication, each parent process
 * computes which processes of the other group it transmits elements to.
 *
 * The send is performed through several point-to-point communications.
 */
void send_async_point_arrays(struct Dist_data dist_data, char *array, int numP_child, struct Counts counts, MPI_Request *comm_req) {
    int i;
    // PREPARE THE SEND OF THE VECTOR
    prepare_comm_alltoall(dist_data.myId, dist_data.numP, numP_child, dist_data.qty, &counts);

    for(i=0; i<numP_child; i++) { //TODO This approach no longer uses IdI and IdE
      if(counts.counts[i] != 0) { // Skip children that receive no elements from this process
        MPI_Isend(array+counts.displs[i], counts.counts[i], MPI_CHAR, i, 99, dist_data.intercomm, &(comm_req[i]));
      }
    }
    //print_counts(dist_data, counts.counts, counts.displs, numP_child, "Padres");
}

/*
 * Receives from the parents a vector that is redistributed among the processes
 * of this group. Before performing the communication, each child computes
 * which processes of the other group it receives elements from.
 *
 * The reception is performed through a collective communication.
 */
void recv_async_arrays(struct Dist_data dist_data, char *array, int numP_parents, struct Counts counts, MPI_Request *comm_req) {
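    // Dummy non-NULL send buffer: all send counts are zero, so it is never read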
    char *aux = malloc(1);

    // Adjust the reception values
    prepare_comm_alltoall(dist_data.myId, dist_data.numP, numP_parents, dist_data.qty, &counts);
    //print_counts(dist_data, counts.counts, counts.displs, numP_parents, "Hijos");

    /* DATA COMMUNICATION */
    MPI_Ialltoallv(aux, counts.zero_arr, counts.zero_arr, MPI_CHAR, array, counts.counts, counts.displs, MPI_CHAR, dist_data.intercomm, comm_req);
    free(aux);
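    //FIXME "aux" is freed before the request completes; strictly, a buffer passed to a nonblocking call should remain valid until it is waited on, even with zero counts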
}

/*
 * Receives from the parents a vector that is redistributed among the processes
 * of this group. Before performing the communication, each child computes
 * which processes of the other group it receives elements from.
 *
 * The reception is performed through several point-to-point communications.
 */
void recv_async_point_arrays(struct Dist_data dist_data, char *array, int numP_parents, struct Counts counts, MPI_Request *comm_req) {
    int i;

    // Adjust the reception values
    prepare_comm_alltoall(dist_data.myId, dist_data.numP, numP_parents, dist_data.qty, &counts);

    for(i=0; i<numP_parents; i++) { //TODO This approach no longer uses IdI and IdE
      if(counts.counts[i] != 0) { // Skip parents that send no elements to this process
        MPI_Irecv(array+counts.displs[i], counts.counts[i], MPI_CHAR, i, 99, dist_data.intercomm, &(comm_req[i])); //FIXME Buffer recv
      }
    }
    //print_counts(dist_data, counts.counts, counts.displs, numP_parents, "Hijos");
}

/*
 * ========================================================================================
 * ========================================================================================
 * ================================DISTRIBUTION FUNCTIONS==================================
 * ========================================================================================
 * ========================================================================================
*/

/*
 * For one process of a group, obtains the range of ranks of the other
 * group it has to send data to or receive data from.
 *
 * Returns the first identifier and the last one (excluded) to
 * communicate with.
 */
void getIds_intercomm(struct Dist_data dist_data, int numP_other, int **idS) {
    int idI, idE;
    int tamOther = dist_data.qty / numP_other;
    int remOther = dist_data.qty % numP_other;
    // Cut point within the external process group that separates the
    // processes holding tamOther + 1 elements from those holding
    // tamOther elements
    int middle = (tamOther + 1) * remOther;

    // Compute idI taking into account whether this process communicates
    // with a process of size tamOther or tamOther+1
    if(middle > dist_data.ini) { // First subgroup (tamOther+1)
      idI = dist_data.ini / (tamOther + 1);
    } else { // Second subgroup (tamOther)
      idI = ((dist_data.ini - middle) / tamOther) + remOther;
    }

    // Compute idE taking into account whether this process communicates
    // with a process of size tamOther or tamOther+1
    if(middle >= dist_data.fin) { // First subgroup (tamOther+1)
      idE = dist_data.fin / (tamOther + 1);
      idE = (dist_data.fin % (tamOther + 1) > 0 && idE+1 <= numP_other) ? idE+1 : idE;
    } else { // Second subgroup (tamOther)
      idE = ((dist_data.fin - middle) / tamOther) + remOther;
      idE = ((dist_data.fin - middle) % tamOther > 0 && idE+1 <= numP_other) ? idE+1 : idE;
    }

    *idS = malloc(2 * sizeof(int));
    (*idS)[0] = idI;
    (*idS)[1] = idE;
}