CommDist.c 28.4 KB
Newer Older
iker_martin's avatar
iker_martin committed
1
2
3
4
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
#include <string.h>
5
#include "distribution_methods/block_distribution.h"
6
#include "CommDist.h"
7
#include "MAM_Configuration.h"
8
#include "malleabilityDataStructures.h"
iker_martin's avatar
iker_martin committed
9

10
void prepare_redistribution(int qty, MPI_Datatype datatype, int numP, int numO, int is_children_group, void **recv, struct Counts *s_counts, struct Counts *r_counts);
11
void check_requests(struct Counts s_counts, struct Counts r_counts, MPI_Request **requests, size_t *request_qty);
12

13
void sync_point2point(void *send, void *recv, MPI_Datatype datatype, struct Counts s_counts, struct Counts r_counts, MPI_Comm comm);
14
void sync_rma(void *send, void *recv, MPI_Datatype datatype, struct Counts r_counts, int tamBl, MPI_Comm comm);
15
16
void sync_rma_lock(void *recv, MPI_Datatype datatype, struct Counts r_counts, MPI_Win win);
void sync_rma_lockall(void *recv, MPI_Datatype datatype, struct Counts r_counts, MPI_Win win);
iker_martin's avatar
iker_martin committed
17

18
void async_point2point(void *send, void *recv, MPI_Datatype datatype, struct Counts s_counts, struct Counts r_counts, MPI_Comm comm, MPI_Request *requests);
19
void async_rma(void *send, void *recv, MPI_Datatype datatype, struct Counts r_counts, int tamBl, MPI_Comm comm, MPI_Request *requests, MPI_Win *win);
20
21
void async_rma_lock(void *recv, MPI_Datatype datatype, struct Counts r_counts, MPI_Win win, MPI_Request *requests);
void async_rma_lockall(void *recv, MPI_Datatype datatype, struct Counts r_counts, MPI_Win win, MPI_Request *requests);
22

23
24
25
26
27
28
29
30
/*
 * Reserva memoria para un vector de hasta "qty" elementos.
 * Los "qty" elementos se disitribuyen entre los "numP" procesos
 * que llaman a esta funcion.
 */
void malloc_comm_array(char **array, int qty, int myId, int numP) {
    struct Dist_data dist_data;

31
    get_block_dist(qty, myId, numP, &dist_data);
32
    if( (*array = calloc(dist_data.tamBl, sizeof(char))) == NULL) {
33
34
35
      printf("Memory Error (Malloc Arrays(%d))\n", dist_data.tamBl); 
      exit(1); 
    }
36

37
/*
38
39
40
41
        int i;
	for(i=0; i<dist_data.tamBl; i++) {
	  (*array)[i] = '!' + i + dist_data.ini;
	}
42
43
44
	
        printf("P%d Tam %d String: %s\n", myId, dist_data.tamBl, *array);
*/
45
}
46
47
48

//================================================================================
//================================================================================
49
//========================SYNCHRONOUS FUNCTIONS===================================
50
51
52
//================================================================================
//================================================================================

53
/*
54
55
56
57
58
59
60
61
62
63
64
65
66
67
 * Performs a communication to redistribute an array in a block distribution.
 * In the redistribution is differenciated parent group from the children and the values each group indicates can be
 * different.
 *
 * - send (IN):  Array with the data to send. This data can not be null for parents.
 * - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to receive data.
 *               If the process receives data and is NULL, the behaviour is undefined.
 * - qty  (IN):  Sum of elements shared by all processes that will send data.
 * - numP (IN):  Size of the local group. If it is a children group, this parameter must correspond to using
 *               "MPI_Comm_size(comm)". For the parents is not always the size obtained from "comm".
 * - numO (IN):  Amount of processes in the remote group. For the parents is the target quantity of processes after the 
 *               resize, while for the children is the amount of parents.
 * - is_children_group (IN): Indicates wether this MPI rank is a children(TRUE) or a parent(FALSE).
 * - comm (IN):  Communicator to use to perform the redistribution.
68
69
 *
 */
70
void sync_communication(void *send, void **recv, int qty, MPI_Datatype datatype, int numP, int numO, int is_children_group, MPI_Comm comm) {
71
    struct Counts s_counts, r_counts;
iker_martin's avatar
iker_martin committed
72
73
    struct Dist_data dist_data;

74
    /* PREPARE COMMUNICATION */
75
    prepare_redistribution(qty, datatype, numP, numO, is_children_group, recv, &s_counts, &r_counts);
76

77
    /* PERFORM COMMUNICATION */
78
    switch(mall_conf->red_method) {
79
80
81
82
83
      case MALL_RED_RMA_LOCKALL:
      case MALL_RED_RMA_LOCK:
        if(is_children_group) {
	  dist_data.tamBl = 0;
	} else {
84
          get_block_dist(qty, mall->myId, numO, &dist_data);
85
	}
86
        sync_rma(send, *recv, datatype, r_counts, dist_data.tamBl, comm);
87
88
89
	break;

      case MALL_RED_POINT:
90
        sync_point2point(send, *recv, datatype, s_counts, r_counts, comm);
91
92
93
	break;
      case MALL_RED_BASELINE:
      default:
94
        MPI_Alltoallv(send, s_counts.counts, s_counts.displs, datatype, *recv, r_counts.counts, r_counts.displs, datatype, comm);
95
96
	break;
    }
iker_martin's avatar
iker_martin committed
97

98
99
    freeCounts(&s_counts);
    freeCounts(&r_counts);
iker_martin's avatar
iker_martin committed
100
101
}

102
/*
103
104
105
106
107
108
109
110
111
112
113
 * Performs a series of blocking point2point communications to redistribute an array in a block distribution. 
 * It should be called after calculating how data should be redistributed.
 *
 * - send (IN):  Array with the data to send. This value can not be NULL for parents.
 * - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to 
 *               receive data. If the process receives data and is NULL, the behaviour is undefined.
 * - s_counts (IN): Struct which describes how many elements will send this process to each children and 
 *               the displacements.
 * - r_counts (IN): Structure which describes how many elements will receive this process from each parent 
 *               and the displacements.
 * - comm (IN):  Communicator to use to perform the redistribution.
114
115
 *
 */
116
void sync_point2point(void *send, void *recv, MPI_Datatype datatype, struct Counts s_counts, struct Counts r_counts, MPI_Comm comm) {
117
118
    int i, j, init, end, total_sends, datasize;
    size_t offset, offset2;
119
    MPI_Request *sends;
iker_martin's avatar
iker_martin committed
120

121
    MPI_Type_size(datatype, &datasize);
122
123
    init = s_counts.idI;
    end = s_counts.idE;
124
    if(mall_conf->spawn_method == MALL_SPAWN_MERGE && (s_counts.idI == mall->myId || s_counts.idE == mall->myId + 1)) {
125
126
127
      offset = s_counts.displs[mall->myId] * datasize;
      offset2 = r_counts.displs[mall->myId] * datasize;
      memcpy(recv+offset2, send+offset, s_counts.counts[mall->myId]);
128
      
129
      if(s_counts.idI == mall->myId) init = s_counts.idI+1;
130
131
      else end = s_counts.idE-1;
    }
iker_martin's avatar
iker_martin committed
132

133
134
135
136
137
138
139
    total_sends = end - init;
    j = 0;
    if(total_sends > 0) {
      sends = (MPI_Request *) malloc(total_sends * sizeof(MPI_Request));
    }
    for(i=init; i<end; i++) {
      sends[j] = MPI_REQUEST_NULL;
140
141
      offset = s_counts.displs[i] * datasize;
      MPI_Isend(send+offset, s_counts.counts[i], datatype, i, 99, comm, &(sends[j]));
142
143
      j++;
    }
iker_martin's avatar
iker_martin committed
144

145
146
    init = r_counts.idI;
    end = r_counts.idE;
147
148
149
    if(mall_conf->spawn_method == MALL_SPAWN_MERGE) {
      if(r_counts.idI == mall->myId) init = r_counts.idI+1;
      else if(r_counts.idE == mall->myId + 1) end = r_counts.idE-1;
150
    }
iker_martin's avatar
iker_martin committed
151

152
    for(i=init; i<end; i++) {
153
154
      offset = r_counts.displs[i] * datasize;
      MPI_Recv(recv+offset, r_counts.counts[i], datatype, i, 99, comm, MPI_STATUS_IGNORE);
155
156
157
158
    }

    if(total_sends > 0) {
      MPI_Waitall(total_sends, sends, MPI_STATUSES_IGNORE);
159
      free(sends);
160
    }
iker_martin's avatar
iker_martin committed
161
162
163
}

/*
164
165
166
167
168
169
170
171
172
173
174
 * Performs synchronous MPI-RMA operations to redistribute an array in a block distribution. Is should be called after calculating
 * how data should be redistributed
 *
 * - send (IN):  Array with the data to send. This value can be NULL for children.
 * - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to receive data.
 *               If the process receives data and is NULL, the behaviour is undefined.
 * - r_counts (IN): Structure which describes how many elements will receive this process from each parent and the
 *               displacements.
 * - tamBl (IN): How many elements are stored in the parameter "send".
 * - comm (IN):  Communicator to use to perform the redistribution. Must be an intracommunicator as MPI-RMA requirements.
 *
175
176
177
178
179
180
 * FIXME: In libfabric one of these macros defines the maximum amount of BYTES that can be communicated in a SINGLE MPI_Get
 * A window can have more bytes than the amount shown in those macros, therefore, if you want to read more than that amount
 * you need to perform multiples Gets.
 * prov/psm3/psm3/psm_config.h:179:#define MQ_SHM_THRESH_RNDV 16000
 * prov/psm3/psm3/ptl_am/am_config.h:62:#define PSMI_MQ_RV_THRESH_CMA      16000
 * prov/psm3/psm3/ptl_am/am_config.h:65:#define PSMI_MQ_RV_THRESH_NO_KASSIST 16000
iker_martin's avatar
iker_martin committed
181
 */
182
void sync_rma(void *send, void *recv, MPI_Datatype datatype, struct Counts r_counts, int tamBl, MPI_Comm comm) {
183
  int datasize;
184
  MPI_Win win;
185
186
  MPI_Type_size(datatype, &datasize);
  MPI_Win_create(send, (MPI_Aint)tamBl * datasize, datasize, MPI_INFO_NULL, comm, &win);
187

188
189
190
  #if USE_MAL_DEBUG >= 3
    DEBUG_FUNC("Created Window for synchronous RMA communication", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(comm);
  #endif
191
  switch(mall_conf->red_method) {
192
    case MALL_RED_RMA_LOCKALL:
193
      sync_rma_lockall(recv, datatype, r_counts, win);
194
195
      break;
    case MALL_RED_RMA_LOCK:
196
      sync_rma_lock(recv, datatype, r_counts, win);
197
198
      break;
  }
199
200
201
  #if USE_MAL_DEBUG >= 3
    DEBUG_FUNC("Completed synchronous RMA communication", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(comm);
  #endif
202
  MPI_Win_free(&win);
iker_martin's avatar
iker_martin committed
203
204
}

205
206


207
/*
208
209
210
211
212
213
214
 * Performs a passive MPI-RMA data redistribution for a single array using the passive epochs Lock/Unlock.
 * - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to receive data.
 *               If the process receives data and is NULL, the behaviour is undefined.
 * - r_counts (IN): Structure which describes how many elements will receive this process from each parent and the
 *               displacements.
 * - win (IN):   Window to use to perform the redistribution.
 *
215
 */
216
217
218
219
220
221
void sync_rma_lock(void *recv, MPI_Datatype datatype, struct Counts r_counts, MPI_Win win) {
  int i, target_displs, datasize;
  size_t offset;

  MPI_Type_size(datatype, &datasize);
  target_displs = r_counts.first_target_displs; //TODO Check that datasize is not needed
222
223

  for(i=r_counts.idI; i<r_counts.idE; i++) {
224
    offset = r_counts.displs[i] * datasize;
225
    MPI_Win_lock(MPI_LOCK_SHARED, i, MPI_MODE_NOCHECK, win);
226
    MPI_Get(recv+offset, r_counts.counts[i], datatype, i, target_displs, r_counts.counts[i], datatype, win);
227
228
229
    MPI_Win_unlock(i, win);
    target_displs=0;
  }
230
231
}

232
233
234
235
236
237
238
239
240
/*
 * Performs a passive MPI-RMA data redistribution for a single array using the passive epochs Lockall/Unlockall.
 * - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to receive data.
 *               If the process receives data and is NULL, the behaviour is undefined.
 * - r_counts (IN): Structure which describes how many elements will receive this process from each parent and the
 *               displacements.
 * - win (IN):   Window to use to perform the redistribution.
 *
 */
241
242
243
244
245
246
void sync_rma_lockall(void *recv, MPI_Datatype datatype, struct Counts r_counts, MPI_Win win) {
  int i, target_displs, datasize;
  size_t offset;

  MPI_Type_size(datatype, &datasize);
  target_displs = r_counts.first_target_displs; //TODO Check that datasize is not needed
247
248
249

  MPI_Win_lock_all(MPI_MODE_NOCHECK, win);
  for(i=r_counts.idI; i<r_counts.idE; i++) {
250
251
    offset = r_counts.displs[i] * datasize;
    MPI_Get(recv+offset, r_counts.counts[i], datatype, i, target_displs, r_counts.counts[i], datatype, win);
252
253
254
255
256
    target_displs=0;
  }
  MPI_Win_unlock_all(win);
}

257
258
//================================================================================
//================================================================================
259
//========================ASYNCHRONOUS FUNCTIONS==================================
260
261
262
263
//================================================================================
//================================================================================

/*
264
265
266
 * Performs a communication to redistribute an array in a block distribution with non-blocking MPI functions.
 * In the redistribution is differenciated parent group from the children and the values each group indicates can be
 * different.
267
 *
268
269
270
271
272
273
274
275
276
277
278
279
280
281
 * - send (IN):  Array with the data to send. This data can not be null for parents.
 * - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to receive data.
 *               If the process receives data and is NULL, the behaviour is undefined.
 * - qty  (IN):  Sum of elements shared by all processes that will send data.
 * - numP (IN):  Size of the local group. If it is a children group, this parameter must correspond to using
 *               "MPI_Comm_size(comm)". For the parents is not always the size obtained from "comm".
 * - numO (IN):  Amount of processes in the remote group. For the parents is the target quantity of processes after the 
 *               resize, while for the children is the amount of parents.
 * - is_children_group (IN): Indicates wether this MPI rank is a children(TRUE) or a parent(FALSE).
 * - comm (IN):  Communicator to use to perform the redistribution.
 * - requests (OUT): Pointer to array of requests to be used to determine if the communication has ended. If the pointer 
 *               is null or not enough space has been reserved the pointer is allocated/reallocated.
 * - request_qty (OUT): Quantity of requests to be used. If a process sends and receives data, this value will be 
 *               modified to the expected value.
282
283
 *
 */
284
void async_communication_start(void *send, void **recv, int qty, MPI_Datatype datatype, int numP, int numO, int is_children_group, MPI_Comm comm, MPI_Request **requests, size_t *request_qty, MPI_Win *win) {
285
    struct Counts s_counts, r_counts;
286
    struct Dist_data dist_data;
287
288

    /* PREPARE COMMUNICATION */
289
    prepare_redistribution(qty, datatype, numP, numO, is_children_group, recv, &s_counts, &r_counts); 
290
    check_requests(s_counts, r_counts, requests, request_qty); //FIXME Error related to second reconf if Merge Shrink + P2P -->Invalid requests
291
292

    /* PERFORM COMMUNICATION */
293
    switch(mall_conf->red_method) {
294
295
296
297
298
299

      case MALL_RED_RMA_LOCKALL:
      case MALL_RED_RMA_LOCK:
        if(is_children_group) {
	  dist_data.tamBl = 0;
	} else {
300
          get_block_dist(qty, mall->myId, numO, &dist_data);
301
	}
302
        async_rma(send, *recv, datatype, r_counts, dist_data.tamBl, comm, *requests, win);
303
304
	break;
      case MALL_RED_POINT:
305
        async_point2point(send, *recv, datatype, s_counts, r_counts, comm, *requests);
306
307
308
	break;
      case MALL_RED_BASELINE:
      default:
309
        MPI_Ialltoallv(send, s_counts.counts, s_counts.displs, datatype, *recv, r_counts.counts, r_counts.displs, datatype, comm, &((*requests)[0]));
310
311
	break;
    }
312

313
314
    freeCounts(&s_counts);
    freeCounts(&r_counts);
315
316
317
}

/*
318
 * Checks if a set of requests have been completed (1) or not (0).
319
 *
320
321
322
 * - is_children_group (IN): Indicates wether this MPI rank is a children(TRUE) or a parent(FALSE).
 * - requests (IN): Pointer to array of requests to be used to determine if the communication has ended.
 * - request_qty (IN): Quantity of requests in "requests".
323
 *
324
 * returns: An integer indicating if the operation has been completed(TRUE) or not(FALSE).
325
 */
326
327
int async_communication_check(int is_children_group, MPI_Request *requests, size_t request_qty) {
  int completed, req_completed, test_err;
328
329
330
331
  size_t i;
  completed = 1;
  test_err = MPI_SUCCESS;

332
  if (is_children_group) return 1; //FIXME Deberia devolver un num negativo
333

334
335
336
  for(i=0; i<request_qty; i++) {
    test_err = MPI_Test(&(requests[i]), &req_completed, MPI_STATUS_IGNORE);
    completed = completed && req_completed;
337
  }
338
  //test_err = MPI_Testall(request_qty, requests, &completed, MPI_STATUSES_IGNORE); //FIXME Some kind of bug with Mpich.
339

340
  if (test_err != MPI_SUCCESS && test_err != MPI_ERR_PENDING) {
341
    printf("P%d aborting -- Test Async\n", mall->myId);
342
343
    MPI_Abort(MPI_COMM_WORLD, test_err);
  }
344

345
  return completed;
346
347
}

348

349
/*
350
351
 * Waits until the completion of a set of requests. If the Ibarrier strategy
 * is being used, the corresponding ibarrier is posted.
352
 *
353
354
355
 * - comm (IN): Communicator to use to confirm finalizations of redistribution
 * - requests (IN): Pointer to array of requests to be used to determine if the communication has ended.
 * - request_qty (IN): Quantity of requests in "requests".
356
 */
357
void async_communication_wait(MPI_Request *requests, size_t request_qty) {
358
  MPI_Waitall(request_qty, requests, MPI_STATUSES_IGNORE); 
359
  #if USE_MAL_DEBUG >= 3
360
    DEBUG_FUNC("Processes Waitall completed", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
361
  #endif
362
363
}

364
/*
365
366
 * Frees Requests/Windows associated to a particular redistribution.
 * Should be called for each output result of calling "async_communication_start".
367
 *
368
369
370
 * - requests (IN): Pointer to array of requests to be used to determine if the communication has ended.
 * - request_qty (IN): Quantity of requests in "requests".
 * - win (IN): Window to free.
371
 */
372
void async_communication_end(MPI_Request *requests, size_t request_qty, MPI_Win *win) {
373
374
375

  //Para la desconexión de ambos grupos de procesos es necesario indicar a MPI que esta comm
  //ha terminado, aunque solo se pueda llegar a este punto cuando ha terminado
376
  if(MAM_Contains_strat(MAM_RED_STRATEGIES, MAM_STRAT_RED_WAIT_TARGETS, NULL)) { MPI_Waitall(request_qty, requests, MPI_STATUSES_IGNORE); }
377

378
379
  if((mall_conf->red_method == MALL_RED_RMA_LOCKALL || mall_conf->red_method == MALL_RED_RMA_LOCK) 
		  && *win != MPI_WIN_NULL) { MPI_Win_free(win); }
380
381
}

382
/*
383
384
385
386
387
388
389
390
391
392
393
394
 * Performs a series of non-blocking point2point communications to redistribute an array in a block distribution. 
 * It should be called after calculating how data should be redistributed.
 *
 * - send (IN):  Array with the data to send. This value can not be NULL for parents.
 * - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to 
 *               receive data. If the process receives data and is NULL, the behaviour is undefined.
 * - s_counts (IN): Struct which describes how many elements will send this process to each children and 
 *               the displacements.
 * - r_counts (IN): Structure which describes how many elements will receive this process from each parent 
 *               and the displacements.
 * - comm (IN):  Communicator to use to perform the redistribution.
 * - requests (OUT): Pointer to array of requests to be used to determine if the communication has ended.
395
 *
396
 */
397
398
399
400
void async_point2point(void *send, void *recv, MPI_Datatype datatype, struct Counts s_counts, struct Counts r_counts, MPI_Comm comm, MPI_Request *requests) {
    int i, j = 0, datasize;
    size_t offset;
    MPI_Type_size(datatype, &datasize);
401

402
    for(i=s_counts.idI; i<s_counts.idE; i++) {
403
404
      offset = s_counts.displs[i] * datasize;
      MPI_Isend(send+offset, s_counts.counts[i], datatype, i, 99, comm, &(requests[j]));
405
406
      j++;
    }
407

408
    for(i=r_counts.idI; i<r_counts.idE; i++) {
409
410
      offset = r_counts.displs[i] * datasize;
      MPI_Irecv(recv+offset, r_counts.counts[i], datatype, i, 99, comm, &(requests[j]));
411
412
      j++;
    }
iker_martin's avatar
iker_martin committed
413
414
}

415
/*
416
417
418
419
420
421
422
423
424
425
426
427
 * Performs asynchronous MPI-RMA operations to redistribute an array in a block distribution. Is should be called after calculating
 * how data should be redistributed.
 *
 * - send (IN):  Array with the data to send. This value can be NULL for children.
 * - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to receive data.
 *               If the process receives data and is NULL, the behaviour is undefined.
 * - r_counts (IN): Structure which describes how many elements will receive this process from each parent and the
 *               displacements.
 * - tamBl (IN): How many elements are stored in the parameter "send".
 * - comm (IN):  Communicator to use to perform the redistribution. Must be an intracommunicator as MPI-RMA requirements.
 * - window (OUT): Pointer to a window object used for the RMA operations.
 * - requests (OUT): Pointer to array of requests to be used to determine if the communication has ended.
428
429
 *
 */
430
void async_rma(void *send, void *recv, MPI_Datatype datatype, struct Counts r_counts, int tamBl, MPI_Comm comm, MPI_Request *requests, MPI_Win *win) {
431
  int datasize;
432

433
434
  MPI_Type_size(datatype, &datasize);
  MPI_Win_create(send, (MPI_Aint)tamBl * datasize, datasize, MPI_INFO_NULL, comm, win);
435
  switch(mall_conf->red_method) {
436
    case MALL_RED_RMA_LOCKALL:
437
      async_rma_lockall(recv, datatype, r_counts, *win, requests);
438
439
      break;
    case MALL_RED_RMA_LOCK:
440
      async_rma_lock(recv, datatype, r_counts, *win, requests);
441
442
443
      break;
  }
}
444

445
446
447
448
449
450
451
452
453
454
/*
 * Performs an asynchronous and passive MPI-RMA data redistribution for a single array using the passive epochs Lock/Unlock.
 * - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to receive data.
 *               If the process receives data and is NULL, the behaviour is undefined.
 * - r_counts (IN): Structure which describes how many elements will receive this process from each parent and the
 *               displacements.
 * - win (IN):   Window to use to perform the redistribution.
 * - requests (OUT): Pointer to array of requests to be used to determine if the communication has ended.
 *
 */
455
456
457
458
459
460
void async_rma_lock(void *recv, MPI_Datatype datatype, struct Counts r_counts, MPI_Win win, MPI_Request *requests) {
  int i, target_displs, j = 0, datasize;
  size_t offset;

  MPI_Type_size(datatype, &datasize);
  target_displs = r_counts.first_target_displs; //TODO Check that datasize is not needed
461
462

  for(i=r_counts.idI; i<r_counts.idE; i++) {
463
    offset = r_counts.displs[i] * datasize;
464
    MPI_Win_lock(MPI_LOCK_SHARED, i, MPI_MODE_NOCHECK, win);
465
    MPI_Rget(recv+offset, r_counts.counts[i], datatype, i, target_displs, r_counts.counts[i], datatype, win, &(requests[j]));
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
    MPI_Win_unlock(i, win);
    target_displs=0;
    j++;
  }
}
/*
 * Performs an asynchronous and passive MPI-RMA data redistribution for a single array using the passive epochs Lockall/Unlockall.
 * - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to receive data.
 *               If the process receives data and is NULL, the behaviour is undefined.
 * - r_counts (IN): Structure which describes how many elements will receive this process from each parent and the
 *               displacements.
 * - win (IN):   Window to use to perform the redistribution.
 * - requests (OUT): Pointer to array of requests to be used to determine if the communication has ended.
 *
 */
481
482
483
484
485
486
void async_rma_lockall(void *recv, MPI_Datatype datatype, struct Counts r_counts, MPI_Win win, MPI_Request *requests) {
  int i, target_displs, j = 0, datasize;
  size_t offset;

  MPI_Type_size(datatype, &datasize);
  target_displs = r_counts.first_target_displs; //TODO Check that datasize is not needed
487
488
489

  MPI_Win_lock_all(MPI_MODE_NOCHECK, win);
  for(i=r_counts.idI; i<r_counts.idE; i++) {
490
491
    offset = r_counts.displs[i] * datasize;
    MPI_Rget(recv+offset, r_counts.counts[i], datatype, i, target_displs, r_counts.counts[i], datatype, win, &(requests[j]));
492
493
494
495
    target_displs=0;
    j++;
  }
  MPI_Win_unlock_all(win);
496
497
}

iker_martin's avatar
iker_martin committed
498
499
500
501
502
503
504
505
506
/*
 * ========================================================================================
 * ========================================================================================
 * ================================DISTRIBUTION FUNCTIONS==================================
 * ========================================================================================
 * ========================================================================================
*/

/*
507
508
509
510
511
512
513
514
515
516
517
518
519
 * Performs a communication to redistribute an array in a block distribution. For each process calculates
 * how many elements sends/receives to other processes for the new group.
 *
 * - qty  (IN):  Sum of elements shared by all processes that will send data.
 * - numP (IN):  Size of the local group. If it is a children group, this parameter must correspond to using
 *               "MPI_Comm_size(comm)". For the parents is not always the size obtained from "comm".
 * - numO (IN):  Amount of processes in the remote group. For the parents is the target quantity of processes after the 
 *               resize, while for the children is the amount of parents.
 * - is_children_group (IN): Indicates wether this MPI rank is a children(TRUE) or a parent(FALSE).
 * - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to receive data.
 *               process receives data and is NULL, the behaviour is undefined.
 * - s_counts (OUT): Struct where is indicated how many elements sends this process to processes in the new group.
 * - r_counts (OUT): Struct where is indicated how many elements receives this process from other processes in the previous group.
iker_martin's avatar
iker_martin committed
520
521
 *
 */
522
void prepare_redistribution(int qty, MPI_Datatype datatype, int numP, int numO, int is_children_group, void **recv, struct Counts *s_counts, struct Counts *r_counts) {
523
524
  int array_size = numO;
  int offset_ids = 0;
525
  int datasize;
526
527
  struct Dist_data dist_data;

528
529
530
  if(mall_conf->spawn_method == MALL_SPAWN_BASELINE) {
    offset_ids =  MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_INTERCOMM, NULL) ? 
	    0 : numP;
531
532
533
  } else {
    array_size = numP > numO ? numP : numO;
  }
534

535
536
  mallocCounts(s_counts, array_size+offset_ids);
  mallocCounts(r_counts, array_size+offset_ids);
537
  MPI_Type_size(datatype, &datasize); //FIXME Right now derived datatypes are not ensured to work
538
539

  if(is_children_group) {
540
    offset_ids = 0;
541
    prepare_comm_alltoall(mall->myId, numP, numO, qty, offset_ids, r_counts);
542
543
    
    // Obtener distribución para este hijo
544
    get_block_dist(qty, mall->myId, numP, &dist_data);
545
    *recv = malloc(dist_data.tamBl * datasize);
546
547
548
549
550

    #if USE_MAL_DEBUG >= 4
      get_block_dist(qty, mall->myId, numP, &dist_data);
      print_counts(dist_data, r_counts->counts, r_counts->displs, numO+offset_ids, 0, "Targets Recv");
    #endif
551
  } else {
552
553
554
555
556
557
558
559
560
561
562
563
564
    #if USE_MAL_DEBUG >= 4
      get_block_dist(qty, mall->myId, numP, &dist_data);
    #endif

    prepare_comm_alltoall(mall->myId, numP, numO, qty, offset_ids, s_counts);
    if(mall_conf->spawn_method == MALL_SPAWN_MERGE && mall->myId < numO) {
      prepare_comm_alltoall(mall->myId, numO, numP, qty, offset_ids, r_counts);
      // Obtener distribución para este hijo y reservar vector de recibo
      get_block_dist(qty, mall->myId, numO, &dist_data);
      *recv = malloc(dist_data.tamBl * datasize);
      #if USE_MAL_DEBUG >= 4
        print_counts(dist_data, r_counts->counts, r_counts->displs, array_size, 0, "Sources&Targets Recv");
      #endif
iker_martin's avatar
iker_martin committed
565
    }
566
567
568
    #if USE_MAL_DEBUG >= 4
      print_counts(dist_data, s_counts->counts, s_counts->displs, numO+offset_ids, 0, "Sources Send");
    #endif
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
  }
}

/*
 * Ensures that the array of request of a process has an amount of elements equal to the amount of communication
 * functions the process will perform. In case the array is not initialized or does not have enough space it is
 * allocated/reallocated to the minimum amount of space needed.
 *
 * - s_counts (IN): Struct where is indicated how many elements sends this process to processes in the new group.
 * - r_counts (IN): Struct where is indicated how many elements receives this process from other processes in the previous group.
 * - requests (IN/OUT): Pointer to array of requests to be used to determine if the communication has ended. If the pointer 
 *               is null or not enough space has been reserved the pointer is allocated/reallocated.
 * - request_qty (IN/OUT): Quantity of requests to be used. If the value is smaller than the amount of communication
 *               functions to perform, it is modified to the minimum value.
 */
584
void check_requests(struct Counts s_counts, struct Counts r_counts, MPI_Request **requests, size_t *request_qty) {
585
586
587
  size_t i, sum;
  MPI_Request *aux;

588
  switch(mall_conf->red_method) {
589
590
591
592
593
594
595
596
597
    case MALL_RED_BASELINE:
      sum = 1;
      break;
    case MALL_RED_POINT:
    default:
      sum = (size_t) s_counts.idE - s_counts.idI;
      sum += (size_t) r_counts.idE - r_counts.idI;
      break;
  }
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617

  if (*requests != NULL && sum <= *request_qty) return; // Expected amount of requests

  if (*requests == NULL) {
    *requests = (MPI_Request *) malloc(sum * sizeof(MPI_Request));
  } else { // Array exists, but is too small
    aux = (MPI_Request *) realloc(*requests, sum * sizeof(MPI_Request));
    *requests = aux;
  }

  if (*requests == NULL) {
    fprintf(stderr, "Fatal error - It was not possible to allocate/reallocate memory for the MPI_Requests before the redistribution\n");
    MPI_Abort(MPI_COMM_WORLD, 1);
  }

  for(i=0; i < sum; i++) {
    (*requests)[i] = MPI_REQUEST_NULL;
  }
  *request_qty = sum;
}