CommDist.c 28.3 KB
Newer Older
iker_martin's avatar
iker_martin committed
1
2
3
4
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
#include <string.h>
5
#include "distribution_methods/block_distribution.h"
6
#include "CommDist.h"
7
#include "MAM_Configuration.h"
8
#include "malleabilityDataStructures.h"
iker_martin's avatar
iker_martin committed
9

10
void prepare_redistribution(int qty, MPI_Datatype datatype, int numP, int numO, int is_children_group, void **recv, struct Counts *s_counts, struct Counts *r_counts);
11
void check_requests(struct Counts s_counts, struct Counts r_counts, MPI_Request **requests, size_t *request_qty);
12

13
void sync_point2point(void *send, void *recv, MPI_Datatype datatype, struct Counts s_counts, struct Counts r_counts, MPI_Comm comm);
14
void sync_rma(void *send, void *recv, MPI_Datatype datatype, struct Counts r_counts, int tamBl, MPI_Comm comm);
15
16
void sync_rma_lock(void *recv, MPI_Datatype datatype, struct Counts r_counts, MPI_Win win);
void sync_rma_lockall(void *recv, MPI_Datatype datatype, struct Counts r_counts, MPI_Win win);
iker_martin's avatar
iker_martin committed
17

18
void async_point2point(void *send, void *recv, MPI_Datatype datatype, struct Counts s_counts, struct Counts r_counts, MPI_Comm comm, MPI_Request *requests);
19
void async_rma(void *send, void *recv, MPI_Datatype datatype, struct Counts r_counts, int tamBl, MPI_Comm comm, MPI_Request *requests, MPI_Win *win);
20
21
void async_rma_lock(void *recv, MPI_Datatype datatype, struct Counts r_counts, MPI_Win win, MPI_Request *requests);
void async_rma_lockall(void *recv, MPI_Datatype datatype, struct Counts r_counts, MPI_Win win, MPI_Request *requests);
22

23
24
25
26
27
28
29
30
/*
 * Allocates a zero-initialised char array for the calling process.
 * The "qty" elements are split among the "numP" processes that call this
 * function; the size of the local array is the block size that
 * get_block_dist() reports for rank "myId".
 *
 * - array (OUT): Receives the allocated array. It may be NULL when the local
 *                block is empty, because calloc() is allowed to return NULL
 *                for a zero-sized request.
 * - qty   (IN):  Total amount of elements to distribute.
 * - myId  (IN):  Rank of the calling process.
 * - numP  (IN):  Amount of processes sharing the "qty" elements.
 *
 * The program is terminated with exit(1) if a non-empty block can not be allocated.
 */
void malloc_comm_array(char **array, int qty, int myId, int numP) {
    struct Dist_data dist_data;

    get_block_dist(qty, myId, numP, &dist_data);
    *array = calloc(dist_data.tamBl, sizeof(char));

    // A NULL result is only an error when memory was actually requested
    if(*array == NULL && dist_data.tamBl > 0) {
      printf("Memory Error (Malloc Arrays(%d))\n", dist_data.tamBl);
      exit(1);
    }
}
46
47
48

//================================================================================
//================================================================================
//========================SYNCHRONOUS FUNCTIONS===================================
//================================================================================
//================================================================================

53
/*
54
55
56
57
58
59
60
61
62
63
64
65
66
67
 * Performs a communication to redistribute an array in a block distribution.
 * In the redistribution is differenciated parent group from the children and the values each group indicates can be
 * different.
 *
 * - send (IN):  Array with the data to send. This data can not be null for parents.
 * - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to receive data.
 *               If the process receives data and is NULL, the behaviour is undefined.
 * - qty  (IN):  Sum of elements shared by all processes that will send data.
 * - numP (IN):  Size of the local group. If it is a children group, this parameter must correspond to using
 *               "MPI_Comm_size(comm)". For the parents is not always the size obtained from "comm".
 * - numO (IN):  Amount of processes in the remote group. For the parents is the target quantity of processes after the 
 *               resize, while for the children is the amount of parents.
 * - is_children_group (IN): Indicates wether this MPI rank is a children(TRUE) or a parent(FALSE).
 * - comm (IN):  Communicator to use to perform the redistribution.
68
69
 *
 */
70
void sync_communication(void *send, void **recv, int qty, MPI_Datatype datatype, int numP, int numO, int is_children_group, MPI_Comm comm) {
71
    struct Counts s_counts, r_counts;
iker_martin's avatar
iker_martin committed
72
73
    struct Dist_data dist_data;

74
    /* PREPARE COMMUNICATION */
75
    prepare_redistribution(qty, datatype, numP, numO, is_children_group, recv, &s_counts, &r_counts);
76

77
    /* PERFORM COMMUNICATION */
78
    switch(mall_conf->red_method) {
79
80
81
82
83
      case MALL_RED_RMA_LOCKALL:
      case MALL_RED_RMA_LOCK:
        if(is_children_group) {
	  dist_data.tamBl = 0;
	} else {
84
          get_block_dist(qty, mall->myId, numO, &dist_data);
85
	}
86
        sync_rma(send, *recv, datatype, r_counts, dist_data.tamBl, comm);
87
88
89
	break;

      case MALL_RED_POINT:
90
        sync_point2point(send, *recv, datatype, s_counts, r_counts, comm);
91
92
93
	break;
      case MALL_RED_BASELINE:
      default:
94
        MPI_Alltoallv(send, s_counts.counts, s_counts.displs, datatype, *recv, r_counts.counts, r_counts.displs, datatype, comm);
95
96
	break;
    }
iker_martin's avatar
iker_martin committed
97

98
99
    freeCounts(&s_counts);
    freeCounts(&r_counts);
iker_martin's avatar
iker_martin committed
100
101
}

102
/*
103
104
105
106
107
108
109
110
111
112
113
 * Performs a series of blocking point2point communications to redistribute an array in a block distribution. 
 * It should be called after calculating how data should be redistributed.
 *
 * - send (IN):  Array with the data to send. This value can not be NULL for parents.
 * - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to 
 *               receive data. If the process receives data and is NULL, the behaviour is undefined.
 * - s_counts (IN): Struct which describes how many elements will send this process to each children and 
 *               the displacements.
 * - r_counts (IN): Structure which describes how many elements will receive this process from each parent 
 *               and the displacements.
 * - comm (IN):  Communicator to use to perform the redistribution.
114
115
 *
 */
116
void sync_point2point(void *send, void *recv, MPI_Datatype datatype, struct Counts s_counts, struct Counts r_counts, MPI_Comm comm) {
117
118
    int i, j, init, end, total_sends, datasize;
    size_t offset, offset2;
119
    MPI_Request *sends;
iker_martin's avatar
iker_martin committed
120

121
    MPI_Type_size(datatype, &datasize);
122
123
    init = s_counts.idI;
    end = s_counts.idE;
124
125
126
127
    if(mall_conf->spawn_method == MALL_SPAWN_MERGE && (s_counts.idI == mall->myId || s_counts.idE == mall->myId + 1)) {
      offset = s_counts.displs[mall->myId] + datasize;
      offset2 = r_counts.displs[mall->myId] + datasize;
      memcpy(send+offset, recv+offset2, s_counts.counts[mall->myId]);
128
      
129
      if(s_counts.idI == mall->myId) init = s_counts.idI+1;
130
131
      else end = s_counts.idE-1;
    }
iker_martin's avatar
iker_martin committed
132

133
134
135
136
137
138
139
    total_sends = end - init;
    j = 0;
    if(total_sends > 0) {
      sends = (MPI_Request *) malloc(total_sends * sizeof(MPI_Request));
    }
    for(i=init; i<end; i++) {
      sends[j] = MPI_REQUEST_NULL;
140
141
      offset = s_counts.displs[i] * datasize;
      MPI_Isend(send+offset, s_counts.counts[i], datatype, i, 99, comm, &(sends[j]));
142
143
      j++;
    }
iker_martin's avatar
iker_martin committed
144

145
146
    init = r_counts.idI;
    end = r_counts.idE;
147
148
149
    if(mall_conf->spawn_method == MALL_SPAWN_MERGE) {
      if(r_counts.idI == mall->myId) init = r_counts.idI+1;
      else if(r_counts.idE == mall->myId + 1) end = r_counts.idE-1;
150
    }
iker_martin's avatar
iker_martin committed
151

152
    for(i=init; i<end; i++) {
153
154
      offset = r_counts.displs[i] * datasize;
      MPI_Recv(recv+offset, r_counts.counts[i], datatype, i, 99, comm, MPI_STATUS_IGNORE);
155
156
157
158
159
    }

    if(total_sends > 0) {
      MPI_Waitall(total_sends, sends, MPI_STATUSES_IGNORE);
    }
iker_martin's avatar
iker_martin committed
160
161
162
}

/*
163
164
165
166
167
168
169
170
171
172
173
 * Performs synchronous MPI-RMA operations to redistribute an array in a block distribution. Is should be called after calculating
 * how data should be redistributed
 *
 * - send (IN):  Array with the data to send. This value can be NULL for children.
 * - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to receive data.
 *               If the process receives data and is NULL, the behaviour is undefined.
 * - r_counts (IN): Structure which describes how many elements will receive this process from each parent and the
 *               displacements.
 * - tamBl (IN): How many elements are stored in the parameter "send".
 * - comm (IN):  Communicator to use to perform the redistribution. Must be an intracommunicator as MPI-RMA requirements.
 *
174
175
176
177
178
179
 * FIXME: In libfabric one of these macros defines the maximum amount of BYTES that can be communicated in a SINGLE MPI_Get
 * A window can have more bytes than the amount shown in those macros, therefore, if you want to read more than that amount
 * you need to perform multiples Gets.
 * prov/psm3/psm3/psm_config.h:179:#define MQ_SHM_THRESH_RNDV 16000
 * prov/psm3/psm3/ptl_am/am_config.h:62:#define PSMI_MQ_RV_THRESH_CMA      16000
 * prov/psm3/psm3/ptl_am/am_config.h:65:#define PSMI_MQ_RV_THRESH_NO_KASSIST 16000
iker_martin's avatar
iker_martin committed
180
 */
181
void sync_rma(void *send, void *recv, MPI_Datatype datatype, struct Counts r_counts, int tamBl, MPI_Comm comm) {
182
  int datasize;
183
  MPI_Win win;
184
185
  MPI_Type_size(datatype, &datasize);
  MPI_Win_create(send, (MPI_Aint)tamBl * datasize, datasize, MPI_INFO_NULL, comm, &win);
186

187
188
189
  #if USE_MAL_DEBUG >= 3
    DEBUG_FUNC("Created Window for synchronous RMA communication", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(comm);
  #endif
190
  switch(mall_conf->red_method) {
191
    case MALL_RED_RMA_LOCKALL:
192
      sync_rma_lockall(recv, datatype, r_counts, win);
193
194
      break;
    case MALL_RED_RMA_LOCK:
195
      sync_rma_lock(recv, datatype, r_counts, win);
196
197
      break;
  }
198
199
200
  #if USE_MAL_DEBUG >= 3
    DEBUG_FUNC("Completed synchronous RMA communication", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(comm);
  #endif
201
  MPI_Win_free(&win);
iker_martin's avatar
iker_martin committed
202
203
}

204
205


206
/*
207
208
209
210
211
212
213
 * Performs a passive MPI-RMA data redistribution for a single array using the passive epochs Lock/Unlock.
 * - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to receive data.
 *               If the process receives data and is NULL, the behaviour is undefined.
 * - r_counts (IN): Structure which describes how many elements will receive this process from each parent and the
 *               displacements.
 * - win (IN):   Window to use to perform the redistribution.
 *
214
 */
215
216
217
218
219
220
void sync_rma_lock(void *recv, MPI_Datatype datatype, struct Counts r_counts, MPI_Win win) {
  int i, target_displs, datasize;
  size_t offset;

  MPI_Type_size(datatype, &datasize);
  target_displs = r_counts.first_target_displs; //TODO Check that datasize is not needed
221
222

  for(i=r_counts.idI; i<r_counts.idE; i++) {
223
    offset = r_counts.displs[i] * datasize;
224
    MPI_Win_lock(MPI_LOCK_SHARED, i, MPI_MODE_NOCHECK, win);
225
    MPI_Get(recv+offset, r_counts.counts[i], datatype, i, target_displs, r_counts.counts[i], datatype, win);
226
227
228
    MPI_Win_unlock(i, win);
    target_displs=0;
  }
229
230
}

231
232
233
234
235
236
237
238
239
/*
 * Performs a passive MPI-RMA data redistribution for a single array using the passive epochs Lockall/Unlockall.
 * - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to receive data.
 *               If the process receives data and is NULL, the behaviour is undefined.
 * - r_counts (IN): Structure which describes how many elements will receive this process from each parent and the
 *               displacements.
 * - win (IN):   Window to use to perform the redistribution.
 *
 */
240
241
242
243
244
245
void sync_rma_lockall(void *recv, MPI_Datatype datatype, struct Counts r_counts, MPI_Win win) {
  int i, target_displs, datasize;
  size_t offset;

  MPI_Type_size(datatype, &datasize);
  target_displs = r_counts.first_target_displs; //TODO Check that datasize is not needed
246
247
248

  MPI_Win_lock_all(MPI_MODE_NOCHECK, win);
  for(i=r_counts.idI; i<r_counts.idE; i++) {
249
250
    offset = r_counts.displs[i] * datasize;
    MPI_Get(recv+offset, r_counts.counts[i], datatype, i, target_displs, r_counts.counts[i], datatype, win);
251
252
253
254
255
    target_displs=0;
  }
  MPI_Win_unlock_all(win);
}

256
257
//================================================================================
//================================================================================
//========================ASYNCHRONOUS FUNCTIONS==================================
//================================================================================
//================================================================================

/*
263
264
265
 * Performs a communication to redistribute an array in a block distribution with non-blocking MPI functions.
 * In the redistribution is differenciated parent group from the children and the values each group indicates can be
 * different.
266
 *
267
268
269
270
271
272
273
274
275
276
277
278
279
280
 * - send (IN):  Array with the data to send. This data can not be null for parents.
 * - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to receive data.
 *               If the process receives data and is NULL, the behaviour is undefined.
 * - qty  (IN):  Sum of elements shared by all processes that will send data.
 * - numP (IN):  Size of the local group. If it is a children group, this parameter must correspond to using
 *               "MPI_Comm_size(comm)". For the parents is not always the size obtained from "comm".
 * - numO (IN):  Amount of processes in the remote group. For the parents is the target quantity of processes after the 
 *               resize, while for the children is the amount of parents.
 * - is_children_group (IN): Indicates wether this MPI rank is a children(TRUE) or a parent(FALSE).
 * - comm (IN):  Communicator to use to perform the redistribution.
 * - requests (OUT): Pointer to array of requests to be used to determine if the communication has ended. If the pointer 
 *               is null or not enough space has been reserved the pointer is allocated/reallocated.
 * - request_qty (OUT): Quantity of requests to be used. If a process sends and receives data, this value will be 
 *               modified to the expected value.
281
282
 *
 */
283
void async_communication_start(void *send, void **recv, int qty, MPI_Datatype datatype, int numP, int numO, int is_children_group, MPI_Comm comm, MPI_Request **requests, size_t *request_qty, MPI_Win *win) {
284
    struct Counts s_counts, r_counts;
285
    struct Dist_data dist_data;
286
287

    /* PREPARE COMMUNICATION */
288
    prepare_redistribution(qty, datatype, numP, numO, is_children_group, recv, &s_counts, &r_counts); 
289
    check_requests(s_counts, r_counts, requests, request_qty);
290
291

    /* PERFORM COMMUNICATION */
292
    switch(mall_conf->red_method) {
293
294
295
296
297
298

      case MALL_RED_RMA_LOCKALL:
      case MALL_RED_RMA_LOCK:
        if(is_children_group) {
	  dist_data.tamBl = 0;
	} else {
299
          get_block_dist(qty, mall->myId, numO, &dist_data);
300
	}
301
        async_rma(send, *recv, datatype, r_counts, dist_data.tamBl, comm, *requests, win);
302
303
	break;
      case MALL_RED_POINT:
304
        async_point2point(send, *recv, datatype, s_counts, r_counts, comm, *requests);
305
306
307
	break;
      case MALL_RED_BASELINE:
      default:
308
        MPI_Ialltoallv(send, s_counts.counts, s_counts.displs, datatype, *recv, r_counts.counts, r_counts.displs, datatype, comm, &((*requests)[0]));
309
310
	break;
    }
311

312
313
    freeCounts(&s_counts);
    freeCounts(&r_counts);
314
315
316
}

/*
317
 * Checks if a set of requests have been completed (1) or not (0).
318
 *
319
320
321
 * - is_children_group (IN): Indicates wether this MPI rank is a children(TRUE) or a parent(FALSE).
 * - requests (IN): Pointer to array of requests to be used to determine if the communication has ended.
 * - request_qty (IN): Quantity of requests in "requests".
322
 *
323
 * returns: An integer indicating if the operation has been completed(TRUE) or not(FALSE).
324
 */
325
326
int async_communication_check(int is_children_group, MPI_Request *requests, size_t request_qty) {
  int completed, req_completed, test_err;
327
328
329
330
  size_t i;
  completed = 1;
  test_err = MPI_SUCCESS;

331
  if (is_children_group) return 1; //FIXME Deberia devolver un num negativo
332

333
334
335
  for(i=0; i<request_qty; i++) {
    test_err = MPI_Test(&(requests[i]), &req_completed, MPI_STATUS_IGNORE);
    completed = completed && req_completed;
336
  }
337
  //test_err = MPI_Testall(request_qty, requests, &completed, MPI_STATUSES_IGNORE); //FIXME Some kind of bug with Mpich.
338

339
  if (test_err != MPI_SUCCESS && test_err != MPI_ERR_PENDING) {
340
    printf("P%d aborting -- Test Async\n", mall->myId);
341
342
    MPI_Abort(MPI_COMM_WORLD, test_err);
  }
343

344
  return completed;
345
346
}

347

348
/*
349
350
 * Waits until the completion of a set of requests. If the Ibarrier strategy
 * is being used, the corresponding ibarrier is posted.
351
 *
352
353
354
 * - comm (IN): Communicator to use to confirm finalizations of redistribution
 * - requests (IN): Pointer to array of requests to be used to determine if the communication has ended.
 * - request_qty (IN): Quantity of requests in "requests".
355
 */
356
void async_communication_wait(MPI_Request *requests, size_t request_qty) {
357
  MPI_Waitall(request_qty, requests, MPI_STATUSES_IGNORE); 
358
  #if USE_MAL_DEBUG >= 3
359
    DEBUG_FUNC("Processes Waitall completed", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
360
  #endif
361
362
}

363
/*
364
365
 * Frees Requests/Windows associated to a particular redistribution.
 * Should be called for each output result of calling "async_communication_start".
366
 *
367
368
369
 * - requests (IN): Pointer to array of requests to be used to determine if the communication has ended.
 * - request_qty (IN): Quantity of requests in "requests".
 * - win (IN): Window to free.
370
 */
371
void async_communication_end(MPI_Request *requests, size_t request_qty, MPI_Win *win) {
372
373
374

  //Para la desconexión de ambos grupos de procesos es necesario indicar a MPI que esta comm
  //ha terminado, aunque solo se pueda llegar a este punto cuando ha terminado
375
  if(MAM_Contains_strat(MAM_RED_STRATEGIES, MAM_STRAT_RED_WAIT_TARGETS, NULL)) { MPI_Waitall(request_qty, requests, MPI_STATUSES_IGNORE); }
376

377
  if(mall_conf->red_method == MALL_RED_RMA_LOCKALL || mall_conf->red_method == MALL_RED_RMA_LOCK) { MPI_Win_free(win); }
378
379
}

380
/*
381
382
383
384
385
386
387
388
389
390
391
392
 * Performs a series of non-blocking point2point communications to redistribute an array in a block distribution. 
 * It should be called after calculating how data should be redistributed.
 *
 * - send (IN):  Array with the data to send. This value can not be NULL for parents.
 * - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to 
 *               receive data. If the process receives data and is NULL, the behaviour is undefined.
 * - s_counts (IN): Struct which describes how many elements will send this process to each children and 
 *               the displacements.
 * - r_counts (IN): Structure which describes how many elements will receive this process from each parent 
 *               and the displacements.
 * - comm (IN):  Communicator to use to perform the redistribution.
 * - requests (OUT): Pointer to array of requests to be used to determine if the communication has ended.
393
 *
394
 */
395
396
397
398
void async_point2point(void *send, void *recv, MPI_Datatype datatype, struct Counts s_counts, struct Counts r_counts, MPI_Comm comm, MPI_Request *requests) {
    int i, j = 0, datasize;
    size_t offset;
    MPI_Type_size(datatype, &datasize);
399

400
    for(i=s_counts.idI; i<s_counts.idE; i++) {
401
402
      offset = s_counts.displs[i] * datasize;
      MPI_Isend(send+offset, s_counts.counts[i], datatype, i, 99, comm, &(requests[j]));
403
404
      j++;
    }
405

406
    for(i=r_counts.idI; i<r_counts.idE; i++) {
407
408
      offset = r_counts.displs[i] * datasize;
      MPI_Irecv(recv+offset, r_counts.counts[i], datatype, i, 99, comm, &(requests[j]));
409
410
      j++;
    }
iker_martin's avatar
iker_martin committed
411
412
}

413
/*
414
415
416
417
418
419
420
421
422
423
424
425
 * Performs asynchronous MPI-RMA operations to redistribute an array in a block distribution. Is should be called after calculating
 * how data should be redistributed.
 *
 * - send (IN):  Array with the data to send. This value can be NULL for children.
 * - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to receive data.
 *               If the process receives data and is NULL, the behaviour is undefined.
 * - r_counts (IN): Structure which describes how many elements will receive this process from each parent and the
 *               displacements.
 * - tamBl (IN): How many elements are stored in the parameter "send".
 * - comm (IN):  Communicator to use to perform the redistribution. Must be an intracommunicator as MPI-RMA requirements.
 * - window (OUT): Pointer to a window object used for the RMA operations.
 * - requests (OUT): Pointer to array of requests to be used to determine if the communication has ended.
426
427
 *
 */
428
void async_rma(void *send, void *recv, MPI_Datatype datatype, struct Counts r_counts, int tamBl, MPI_Comm comm, MPI_Request *requests, MPI_Win *win) {
429
  int datasize;
430

431
432
  MPI_Type_size(datatype, &datasize);
  MPI_Win_create(send, (MPI_Aint)tamBl * datasize, datasize, MPI_INFO_NULL, comm, win);
433
  switch(mall_conf->red_method) {
434
    case MALL_RED_RMA_LOCKALL:
435
      async_rma_lockall(recv, datatype, r_counts, *win, requests);
436
437
      break;
    case MALL_RED_RMA_LOCK:
438
      async_rma_lock(recv, datatype, r_counts, *win, requests);
439
440
441
      break;
  }
}
442

443
444
445
446
447
448
449
450
451
452
/*
 * Performs an asynchronous and passive MPI-RMA data redistribution for a single array using the passive epochs Lock/Unlock.
 * - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to receive data.
 *               If the process receives data and is NULL, the behaviour is undefined.
 * - r_counts (IN): Structure which describes how many elements will receive this process from each parent and the
 *               displacements.
 * - win (IN):   Window to use to perform the redistribution.
 * - requests (OUT): Pointer to array of requests to be used to determine if the communication has ended.
 *
 */
453
454
455
456
457
458
void async_rma_lock(void *recv, MPI_Datatype datatype, struct Counts r_counts, MPI_Win win, MPI_Request *requests) {
  int i, target_displs, j = 0, datasize;
  size_t offset;

  MPI_Type_size(datatype, &datasize);
  target_displs = r_counts.first_target_displs; //TODO Check that datasize is not needed
459
460

  for(i=r_counts.idI; i<r_counts.idE; i++) {
461
    offset = r_counts.displs[i] * datasize;
462
    MPI_Win_lock(MPI_LOCK_SHARED, i, MPI_MODE_NOCHECK, win);
463
    MPI_Rget(recv+offset, r_counts.counts[i], datatype, i, target_displs, r_counts.counts[i], datatype, win, &(requests[j]));
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
    MPI_Win_unlock(i, win);
    target_displs=0;
    j++;
  }
}
/*
 * Performs an asynchronous and passive MPI-RMA data redistribution for a single array using the passive epochs Lockall/Unlockall.
 * - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to receive data.
 *               If the process receives data and is NULL, the behaviour is undefined.
 * - r_counts (IN): Structure which describes how many elements will receive this process from each parent and the
 *               displacements.
 * - win (IN):   Window to use to perform the redistribution.
 * - requests (OUT): Pointer to array of requests to be used to determine if the communication has ended.
 *
 */
479
480
481
482
483
484
void async_rma_lockall(void *recv, MPI_Datatype datatype, struct Counts r_counts, MPI_Win win, MPI_Request *requests) {
  int i, target_displs, j = 0, datasize;
  size_t offset;

  MPI_Type_size(datatype, &datasize);
  target_displs = r_counts.first_target_displs; //TODO Check that datasize is not needed
485
486
487

  MPI_Win_lock_all(MPI_MODE_NOCHECK, win);
  for(i=r_counts.idI; i<r_counts.idE; i++) {
488
489
    offset = r_counts.displs[i] * datasize;
    MPI_Rget(recv+offset, r_counts.counts[i], datatype, i, target_displs, r_counts.counts[i], datatype, win, &(requests[j]));
490
491
492
493
    target_displs=0;
    j++;
  }
  MPI_Win_unlock_all(win);
494
495
}

iker_martin's avatar
iker_martin committed
496
497
498
499
500
501
502
503
504
/*
 * ========================================================================================
 * ========================================================================================
 * ================================DISTRIBUTION FUNCTIONS==================================
 * ========================================================================================
 * ========================================================================================
*/

/*
505
506
507
508
509
510
511
512
513
514
515
516
517
 * Performs a communication to redistribute an array in a block distribution. For each process calculates
 * how many elements sends/receives to other processes for the new group.
 *
 * - qty  (IN):  Sum of elements shared by all processes that will send data.
 * - numP (IN):  Size of the local group. If it is a children group, this parameter must correspond to using
 *               "MPI_Comm_size(comm)". For the parents is not always the size obtained from "comm".
 * - numO (IN):  Amount of processes in the remote group. For the parents is the target quantity of processes after the 
 *               resize, while for the children is the amount of parents.
 * - is_children_group (IN): Indicates wether this MPI rank is a children(TRUE) or a parent(FALSE).
 * - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to receive data.
 *               process receives data and is NULL, the behaviour is undefined.
 * - s_counts (OUT): Struct where is indicated how many elements sends this process to processes in the new group.
 * - r_counts (OUT): Struct where is indicated how many elements receives this process from other processes in the previous group.
iker_martin's avatar
iker_martin committed
518
519
 *
 */
520
void prepare_redistribution(int qty, MPI_Datatype datatype, int numP, int numO, int is_children_group, void **recv, struct Counts *s_counts, struct Counts *r_counts) {
521
522
  int array_size = numO;
  int offset_ids = 0;
523
  int datasize;
524
525
  struct Dist_data dist_data;

526
527
528
  if(mall_conf->spawn_method == MALL_SPAWN_BASELINE) {
    offset_ids =  MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_INTERCOMM, NULL) ? 
	    0 : numP;
529
530
531
  } else {
    array_size = numP > numO ? numP : numO;
  }
532

533
534
  mallocCounts(s_counts, array_size+offset_ids);
  mallocCounts(r_counts, array_size+offset_ids);
535
  MPI_Type_size(datatype, &datasize); //FIXME Right now derived datatypes are not ensured to work
536
537

  if(is_children_group) {
538
    offset_ids = 0;
539
    prepare_comm_alltoall(mall->myId, numP, numO, qty, offset_ids, r_counts);
540
541
    
    // Obtener distribución para este hijo
542
    get_block_dist(qty, mall->myId, numP, &dist_data);
543
    *recv = malloc(dist_data.tamBl * datasize);
544
545
546
547
548

    #if USE_MAL_DEBUG >= 4
      get_block_dist(qty, mall->myId, numP, &dist_data);
      print_counts(dist_data, r_counts->counts, r_counts->displs, numO+offset_ids, 0, "Targets Recv");
    #endif
549
  } else {
550
551
552
553
554
555
556
557
558
559
560
561
562
    #if USE_MAL_DEBUG >= 4
      get_block_dist(qty, mall->myId, numP, &dist_data);
    #endif

    prepare_comm_alltoall(mall->myId, numP, numO, qty, offset_ids, s_counts);
    if(mall_conf->spawn_method == MALL_SPAWN_MERGE && mall->myId < numO) {
      prepare_comm_alltoall(mall->myId, numO, numP, qty, offset_ids, r_counts);
      // Obtener distribución para este hijo y reservar vector de recibo
      get_block_dist(qty, mall->myId, numO, &dist_data);
      *recv = malloc(dist_data.tamBl * datasize);
      #if USE_MAL_DEBUG >= 4
        print_counts(dist_data, r_counts->counts, r_counts->displs, array_size, 0, "Sources&Targets Recv");
      #endif
iker_martin's avatar
iker_martin committed
563
    }
564
565
566
    #if USE_MAL_DEBUG >= 4
      print_counts(dist_data, s_counts->counts, s_counts->displs, numO+offset_ids, 0, "Sources Send");
    #endif
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
  }
}

/*
 * Ensures that the array of request of a process has an amount of elements equal to the amount of communication
 * functions the process will perform. In case the array is not initialized or does not have enough space it is
 * allocated/reallocated to the minimum amount of space needed.
 *
 * - s_counts (IN): Struct where is indicated how many elements sends this process to processes in the new group.
 * - r_counts (IN): Struct where is indicated how many elements receives this process from other processes in the previous group.
 * - requests (IN/OUT): Pointer to array of requests to be used to determine if the communication has ended. If the pointer 
 *               is null or not enough space has been reserved the pointer is allocated/reallocated.
 * - request_qty (IN/OUT): Quantity of requests to be used. If the value is smaller than the amount of communication
 *               functions to perform, it is modified to the minimum value.
 */
582
void check_requests(struct Counts s_counts, struct Counts r_counts, MPI_Request **requests, size_t *request_qty) {
583
584
585
  size_t i, sum;
  MPI_Request *aux;

586
  switch(mall_conf->red_method) {
587
588
589
590
591
592
593
594
595
    case MALL_RED_BASELINE:
      sum = 1;
      break;
    case MALL_RED_POINT:
    default:
      sum = (size_t) s_counts.idE - s_counts.idI;
      sum += (size_t) r_counts.idE - r_counts.idI;
      break;
  }
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615

  if (*requests != NULL && sum <= *request_qty) return; // Expected amount of requests

  if (*requests == NULL) {
    *requests = (MPI_Request *) malloc(sum * sizeof(MPI_Request));
  } else { // Array exists, but is too small
    aux = (MPI_Request *) realloc(*requests, sum * sizeof(MPI_Request));
    *requests = aux;
  }

  if (*requests == NULL) {
    fprintf(stderr, "Fatal error - It was not possible to allocate/reallocate memory for the MPI_Requests before the redistribution\n");
    MPI_Abort(MPI_COMM_WORLD, 1);
  }

  for(i=0; i < sum; i++) {
    (*requests)[i] = MPI_REQUEST_NULL;
  }
  *request_qty = sum;
}