Distributed_CommDist.c 28.5 KB
Newer Older
iker_martin's avatar
iker_martin committed
1
2
3
4
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
#include <string.h>
5
6
7
8
9
#include "block_distribution.h"
#include "Distributed_CommDist.h"
#include "../MAM_Constants.h"
#include "../MAM_Configuration.h"
#include "../MAM_DataStructures.h"
iker_martin's avatar
iker_martin committed
10

11
void prepare_redistribution(int qty, MPI_Datatype datatype, int numP, int numO, int is_children_group, void **recv, struct Counts *s_counts, struct Counts *r_counts);
12
void check_requests(struct Counts s_counts, struct Counts r_counts, MPI_Request **requests, size_t *request_qty);
13

14
void sync_point2point(void *send, void *recv, MPI_Datatype datatype, struct Counts s_counts, struct Counts r_counts, MPI_Comm comm);
15
void sync_rma(void *send, void *recv, MPI_Datatype datatype, struct Counts r_counts, int tamBl, MPI_Comm comm);
16
17
void sync_rma_lock(void *recv, MPI_Datatype datatype, struct Counts r_counts, MPI_Win win);
void sync_rma_lockall(void *recv, MPI_Datatype datatype, struct Counts r_counts, MPI_Win win);
iker_martin's avatar
iker_martin committed
18

19
void async_point2point(void *send, void *recv, MPI_Datatype datatype, struct Counts s_counts, struct Counts r_counts, MPI_Comm comm, MPI_Request *requests);
20
void async_rma(void *send, void *recv, MPI_Datatype datatype, struct Counts r_counts, int tamBl, MPI_Comm comm, MPI_Request *requests, MPI_Win *win);
21
22
void async_rma_lock(void *recv, MPI_Datatype datatype, struct Counts r_counts, MPI_Win win, MPI_Request *requests);
void async_rma_lockall(void *recv, MPI_Datatype datatype, struct Counts r_counts, MPI_Win win, MPI_Request *requests);
23

24
25
26
27
28
29
30
31
/*
 * Allocates a zero-initialised char buffer sized for this process' share of
 * a vector of up to "qty" elements. The "qty" elements are distributed among
 * the "numP" processes that call this function; the local share is the
 * "tamBl" value computed by get_block_dist().
 *
 * - array (OUT): Receives the address of the newly allocated buffer.
 * - qty   (IN):  Total amount of elements of the distributed vector.
 * - myId  (IN):  Identifier of the calling process, forwarded to get_block_dist().
 * - numP  (IN):  Amount of processes sharing the vector.
 *
 * If a non-empty block cannot be allocated, an error is printed and the
 * program terminates with exit(1).
 */
void malloc_comm_array(char **array, int qty, int myId, int numP) {
    struct Dist_data dist_data;

    get_block_dist(qty, myId, numP, &dist_data);
    *array = calloc(dist_data.tamBl, sizeof(char));

    // calloc() is allowed to return NULL for a zero-sized request, so a NULL
    // result is only an allocation failure when the local block is not empty.
    if(*array == NULL && dist_data.tamBl != 0) {
      printf("Memory Error (Malloc Arrays(%d))\n", dist_data.tamBl);
      exit(1);
    }
}
47
48
49

//================================================================================
//================================================================================
//========================SYNCHRONOUS FUNCTIONS===================================
//================================================================================
//================================================================================

54
/*
55
56
57
58
59
60
61
62
63
64
65
66
67
68
 * Performs a communication to redistribute an array in a block distribution.
 * In the redistribution is differenciated parent group from the children and the values each group indicates can be
 * different.
 *
 * - send (IN):  Array with the data to send. This data can not be null for parents.
 * - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to receive data.
 *               If the process receives data and is NULL, the behaviour is undefined.
 * - qty  (IN):  Sum of elements shared by all processes that will send data.
 * - numP (IN):  Size of the local group. If it is a children group, this parameter must correspond to using
 *               "MPI_Comm_size(comm)". For the parents is not always the size obtained from "comm".
 * - numO (IN):  Amount of processes in the remote group. For the parents is the target quantity of processes after the 
 *               resize, while for the children is the amount of parents.
 * - is_children_group (IN): Indicates wether this MPI rank is a children(TRUE) or a parent(FALSE).
 * - comm (IN):  Communicator to use to perform the redistribution.
69
70
 *
 */
71
void sync_communication(void *send, void **recv, int qty, MPI_Datatype datatype, int numP, int numO, int is_children_group, MPI_Comm comm) {
72
    struct Counts s_counts, r_counts;
iker_martin's avatar
iker_martin committed
73
74
    struct Dist_data dist_data;

75
    /* PREPARE COMMUNICATION */
76
    prepare_redistribution(qty, datatype, numP, numO, is_children_group, recv, &s_counts, &r_counts);
77

78
    /* PERFORM COMMUNICATION */
79
    switch(mall_conf->red_method) {
80
81
82
83
84
      case MALL_RED_RMA_LOCKALL:
      case MALL_RED_RMA_LOCK:
        if(is_children_group) {
	  dist_data.tamBl = 0;
	} else {
85
          get_block_dist(qty, mall->myId, numO, &dist_data);
86
	}
87
        sync_rma(send, *recv, datatype, r_counts, dist_data.tamBl, comm);
88
89
90
	break;

      case MALL_RED_POINT:
91
        sync_point2point(send, *recv, datatype, s_counts, r_counts, comm);
92
93
94
	break;
      case MALL_RED_BASELINE:
      default:
95
        MPI_Alltoallv(send, s_counts.counts, s_counts.displs, datatype, *recv, r_counts.counts, r_counts.displs, datatype, comm);
96
97
	break;
    }
iker_martin's avatar
iker_martin committed
98

99
100
    freeCounts(&s_counts);
    freeCounts(&r_counts);
iker_martin's avatar
iker_martin committed
101
102
}

103
/*
104
105
106
107
108
109
110
111
112
113
114
 * Performs a series of blocking point2point communications to redistribute an array in a block distribution. 
 * It should be called after calculating how data should be redistributed.
 *
 * - send (IN):  Array with the data to send. This value can not be NULL for parents.
 * - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to 
 *               receive data. If the process receives data and is NULL, the behaviour is undefined.
 * - s_counts (IN): Struct which describes how many elements will send this process to each children and 
 *               the displacements.
 * - r_counts (IN): Structure which describes how many elements will receive this process from each parent 
 *               and the displacements.
 * - comm (IN):  Communicator to use to perform the redistribution.
115
116
 *
 */
117
void sync_point2point(void *send, void *recv, MPI_Datatype datatype, struct Counts s_counts, struct Counts r_counts, MPI_Comm comm) {
118
119
    int i, j, init, end, total_sends, datasize;
    size_t offset, offset2;
120
    MPI_Request *sends;
iker_martin's avatar
iker_martin committed
121

122
    MPI_Type_size(datatype, &datasize);
123
124
    init = s_counts.idI;
    end = s_counts.idE;
125
    if(mall_conf->spawn_method == MALL_SPAWN_MERGE && (s_counts.idI == mall->myId || s_counts.idE == mall->myId + 1)) {
126
127
128
      offset = s_counts.displs[mall->myId] * datasize;
      offset2 = r_counts.displs[mall->myId] * datasize;
      memcpy(recv+offset2, send+offset, s_counts.counts[mall->myId]);
129
      
130
      if(s_counts.idI == mall->myId) init = s_counts.idI+1;
131
132
      else end = s_counts.idE-1;
    }
iker_martin's avatar
iker_martin committed
133

134
135
136
137
138
139
140
    total_sends = end - init;
    j = 0;
    if(total_sends > 0) {
      sends = (MPI_Request *) malloc(total_sends * sizeof(MPI_Request));
    }
    for(i=init; i<end; i++) {
      sends[j] = MPI_REQUEST_NULL;
141
142
      offset = s_counts.displs[i] * datasize;
      MPI_Isend(send+offset, s_counts.counts[i], datatype, i, 99, comm, &(sends[j]));
143
144
      j++;
    }
iker_martin's avatar
iker_martin committed
145

146
147
    init = r_counts.idI;
    end = r_counts.idE;
148
149
150
    if(mall_conf->spawn_method == MALL_SPAWN_MERGE) {
      if(r_counts.idI == mall->myId) init = r_counts.idI+1;
      else if(r_counts.idE == mall->myId + 1) end = r_counts.idE-1;
151
    }
iker_martin's avatar
iker_martin committed
152

153
    for(i=init; i<end; i++) {
154
155
      offset = r_counts.displs[i] * datasize;
      MPI_Recv(recv+offset, r_counts.counts[i], datatype, i, 99, comm, MPI_STATUS_IGNORE);
156
157
158
159
    }

    if(total_sends > 0) {
      MPI_Waitall(total_sends, sends, MPI_STATUSES_IGNORE);
160
      free(sends);
161
    }
iker_martin's avatar
iker_martin committed
162
163
164
}

/*
165
166
167
168
169
170
171
172
173
174
175
 * Performs synchronous MPI-RMA operations to redistribute an array in a block distribution. Is should be called after calculating
 * how data should be redistributed
 *
 * - send (IN):  Array with the data to send. This value can be NULL for children.
 * - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to receive data.
 *               If the process receives data and is NULL, the behaviour is undefined.
 * - r_counts (IN): Structure which describes how many elements will receive this process from each parent and the
 *               displacements.
 * - tamBl (IN): How many elements are stored in the parameter "send".
 * - comm (IN):  Communicator to use to perform the redistribution. Must be an intracommunicator as MPI-RMA requirements.
 *
176
177
178
179
180
181
 * FIXME: In libfabric one of these macros defines the maximum amount of BYTES that can be communicated in a SINGLE MPI_Get
 * A window can have more bytes than the amount shown in those macros, therefore, if you want to read more than that amount
 * you need to perform multiples Gets.
 * prov/psm3/psm3/psm_config.h:179:#define MQ_SHM_THRESH_RNDV 16000
 * prov/psm3/psm3/ptl_am/am_config.h:62:#define PSMI_MQ_RV_THRESH_CMA      16000
 * prov/psm3/psm3/ptl_am/am_config.h:65:#define PSMI_MQ_RV_THRESH_NO_KASSIST 16000
iker_martin's avatar
iker_martin committed
182
 */
183
void sync_rma(void *send, void *recv, MPI_Datatype datatype, struct Counts r_counts, int tamBl, MPI_Comm comm) {
184
  int datasize;
185
  MPI_Win win;
186
187
  MPI_Type_size(datatype, &datasize);
  MPI_Win_create(send, (MPI_Aint)tamBl * datasize, datasize, MPI_INFO_NULL, comm, &win);
188

189
190
191
  #if USE_MAL_DEBUG >= 3
    DEBUG_FUNC("Created Window for synchronous RMA communication", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(comm);
  #endif
192
  switch(mall_conf->red_method) {
193
    case MALL_RED_RMA_LOCKALL:
194
      sync_rma_lockall(recv, datatype, r_counts, win);
195
196
      break;
    case MALL_RED_RMA_LOCK:
197
      sync_rma_lock(recv, datatype, r_counts, win);
198
199
      break;
  }
200
201
202
  #if USE_MAL_DEBUG >= 3
    DEBUG_FUNC("Completed synchronous RMA communication", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(comm);
  #endif
203
  MPI_Win_free(&win);
iker_martin's avatar
iker_martin committed
204
205
}

206
207


208
/*
209
210
211
212
213
214
215
 * Performs a passive MPI-RMA data redistribution for a single array using the passive epochs Lock/Unlock.
 * - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to receive data.
 *               If the process receives data and is NULL, the behaviour is undefined.
 * - r_counts (IN): Structure which describes how many elements will receive this process from each parent and the
 *               displacements.
 * - win (IN):   Window to use to perform the redistribution.
 *
216
 */
217
218
219
220
221
222
void sync_rma_lock(void *recv, MPI_Datatype datatype, struct Counts r_counts, MPI_Win win) {
  int i, target_displs, datasize;
  size_t offset;

  MPI_Type_size(datatype, &datasize);
  target_displs = r_counts.first_target_displs; //TODO Check that datasize is not needed
223
224

  for(i=r_counts.idI; i<r_counts.idE; i++) {
225
    offset = r_counts.displs[i] * datasize;
226
    MPI_Win_lock(MPI_LOCK_SHARED, i, MPI_MODE_NOCHECK, win);
227
    MPI_Get(recv+offset, r_counts.counts[i], datatype, i, target_displs, r_counts.counts[i], datatype, win);
228
229
230
    MPI_Win_unlock(i, win);
    target_displs=0;
  }
231
232
}

233
234
235
236
237
238
239
240
241
/*
 * Performs a passive MPI-RMA data redistribution for a single array using the passive epochs Lockall/Unlockall.
 * - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to receive data.
 *               If the process receives data and is NULL, the behaviour is undefined.
 * - r_counts (IN): Structure which describes how many elements will receive this process from each parent and the
 *               displacements.
 * - win (IN):   Window to use to perform the redistribution.
 *
 */
242
243
244
245
246
247
void sync_rma_lockall(void *recv, MPI_Datatype datatype, struct Counts r_counts, MPI_Win win) {
  int i, target_displs, datasize;
  size_t offset;

  MPI_Type_size(datatype, &datasize);
  target_displs = r_counts.first_target_displs; //TODO Check that datasize is not needed
248
249
250

  MPI_Win_lock_all(MPI_MODE_NOCHECK, win);
  for(i=r_counts.idI; i<r_counts.idE; i++) {
251
252
    offset = r_counts.displs[i] * datasize;
    MPI_Get(recv+offset, r_counts.counts[i], datatype, i, target_displs, r_counts.counts[i], datatype, win);
253
254
255
256
257
    target_displs=0;
  }
  MPI_Win_unlock_all(win);
}

258
259
//================================================================================
//================================================================================
//========================ASYNCHRONOUS FUNCTIONS==================================
//================================================================================
//================================================================================

/*
265
266
267
 * Performs a communication to redistribute an array in a block distribution with non-blocking MPI functions.
 * In the redistribution is differenciated parent group from the children and the values each group indicates can be
 * different.
268
 *
269
270
271
272
273
274
275
276
277
278
279
280
281
282
 * - send (IN):  Array with the data to send. This data can not be null for parents.
 * - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to receive data.
 *               If the process receives data and is NULL, the behaviour is undefined.
 * - qty  (IN):  Sum of elements shared by all processes that will send data.
 * - numP (IN):  Size of the local group. If it is a children group, this parameter must correspond to using
 *               "MPI_Comm_size(comm)". For the parents is not always the size obtained from "comm".
 * - numO (IN):  Amount of processes in the remote group. For the parents is the target quantity of processes after the 
 *               resize, while for the children is the amount of parents.
 * - is_children_group (IN): Indicates wether this MPI rank is a children(TRUE) or a parent(FALSE).
 * - comm (IN):  Communicator to use to perform the redistribution.
 * - requests (OUT): Pointer to array of requests to be used to determine if the communication has ended. If the pointer 
 *               is null or not enough space has been reserved the pointer is allocated/reallocated.
 * - request_qty (OUT): Quantity of requests to be used. If a process sends and receives data, this value will be 
 *               modified to the expected value.
283
284
 *
 */
285
void async_communication_start(void *send, void **recv, int qty, MPI_Datatype datatype, int numP, int numO, int is_children_group, MPI_Comm comm, MPI_Request **requests, size_t *request_qty, MPI_Win *win) {
286
    struct Counts s_counts, r_counts;
287
    struct Dist_data dist_data;
288
289

    /* PREPARE COMMUNICATION */
290
    prepare_redistribution(qty, datatype, numP, numO, is_children_group, recv, &s_counts, &r_counts); 
291
    check_requests(s_counts, r_counts, requests, request_qty); //FIXME Error related to second reconf if Merge Shrink + P2P -->Invalid requests
292
293

    /* PERFORM COMMUNICATION */
294
    switch(mall_conf->red_method) {
295
296
297
298
299
300

      case MALL_RED_RMA_LOCKALL:
      case MALL_RED_RMA_LOCK:
        if(is_children_group) {
	  dist_data.tamBl = 0;
	} else {
301
          get_block_dist(qty, mall->myId, numO, &dist_data);
302
	}
303
        async_rma(send, *recv, datatype, r_counts, dist_data.tamBl, comm, *requests, win);
304
305
	break;
      case MALL_RED_POINT:
306
        async_point2point(send, *recv, datatype, s_counts, r_counts, comm, *requests);
307
308
309
	break;
      case MALL_RED_BASELINE:
      default:
310
        MPI_Ialltoallv(send, s_counts.counts, s_counts.displs, datatype, *recv, r_counts.counts, r_counts.displs, datatype, comm, &((*requests)[0]));
311
312
	break;
    }
313

314
315
    freeCounts(&s_counts);
    freeCounts(&r_counts);
316
317
318
}

/*
319
 * Checks if a set of requests have been completed (1) or not (0).
320
 *
321
322
323
 * - is_children_group (IN): Indicates wether this MPI rank is a children(TRUE) or a parent(FALSE).
 * - requests (IN): Pointer to array of requests to be used to determine if the communication has ended.
 * - request_qty (IN): Quantity of requests in "requests".
324
 *
325
 * returns: An integer indicating if the operation has been completed(TRUE) or not(FALSE).
326
 */
327
328
int async_communication_check(int is_children_group, MPI_Request *requests, size_t request_qty) {
  int completed, req_completed, test_err;
329
330
331
332
  size_t i;
  completed = 1;
  test_err = MPI_SUCCESS;

333
  if (is_children_group) return 1; //FIXME Deberia devolver un num negativo
334

335
336
337
  for(i=0; i<request_qty; i++) {
    test_err = MPI_Test(&(requests[i]), &req_completed, MPI_STATUS_IGNORE);
    completed = completed && req_completed;
338
  }
339
  //test_err = MPI_Testall(request_qty, requests, &completed, MPI_STATUSES_IGNORE); //FIXME Some kind of bug with Mpich.
340

341
  if (test_err != MPI_SUCCESS && test_err != MPI_ERR_PENDING) {
342
    printf("P%d aborting -- Test Async\n", mall->myId);
343
344
    MPI_Abort(MPI_COMM_WORLD, test_err);
  }
345

346
  return completed;
347
348
}

349

350
/*
351
352
 * Waits until the completion of a set of requests. If the Ibarrier strategy
 * is being used, the corresponding ibarrier is posted.
353
 *
354
355
356
 * - comm (IN): Communicator to use to confirm finalizations of redistribution
 * - requests (IN): Pointer to array of requests to be used to determine if the communication has ended.
 * - request_qty (IN): Quantity of requests in "requests".
357
 */
358
void async_communication_wait(MPI_Request *requests, size_t request_qty) {
359
  MPI_Waitall(request_qty, requests, MPI_STATUSES_IGNORE); 
360
  #if USE_MAL_DEBUG >= 3
361
    DEBUG_FUNC("Processes Waitall completed", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
362
  #endif
363
364
}

365
/*
366
367
 * Frees Requests/Windows associated to a particular redistribution.
 * Should be called for each output result of calling "async_communication_start".
368
 *
369
370
371
 * - requests (IN): Pointer to array of requests to be used to determine if the communication has ended.
 * - request_qty (IN): Quantity of requests in "requests".
 * - win (IN): Window to free.
372
 */
373
void async_communication_end(MPI_Request *requests, size_t request_qty, MPI_Win *win) {
374
375
376

  //Para la desconexión de ambos grupos de procesos es necesario indicar a MPI que esta comm
  //ha terminado, aunque solo se pueda llegar a este punto cuando ha terminado
377
  if(MAM_Contains_strat(MAM_RED_STRATEGIES, MAM_STRAT_RED_WAIT_TARGETS, NULL)) { MPI_Waitall(request_qty, requests, MPI_STATUSES_IGNORE); }
378

379
380
  if((mall_conf->red_method == MALL_RED_RMA_LOCKALL || mall_conf->red_method == MALL_RED_RMA_LOCK) 
		  && *win != MPI_WIN_NULL) { MPI_Win_free(win); }
381
382
}

383
/*
384
385
386
387
388
389
390
391
392
393
394
395
 * Performs a series of non-blocking point2point communications to redistribute an array in a block distribution. 
 * It should be called after calculating how data should be redistributed.
 *
 * - send (IN):  Array with the data to send. This value can not be NULL for parents.
 * - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to 
 *               receive data. If the process receives data and is NULL, the behaviour is undefined.
 * - s_counts (IN): Struct which describes how many elements will send this process to each children and 
 *               the displacements.
 * - r_counts (IN): Structure which describes how many elements will receive this process from each parent 
 *               and the displacements.
 * - comm (IN):  Communicator to use to perform the redistribution.
 * - requests (OUT): Pointer to array of requests to be used to determine if the communication has ended.
396
 *
397
 */
398
399
400
401
void async_point2point(void *send, void *recv, MPI_Datatype datatype, struct Counts s_counts, struct Counts r_counts, MPI_Comm comm, MPI_Request *requests) {
    int i, j = 0, datasize;
    size_t offset;
    MPI_Type_size(datatype, &datasize);
402

403
    for(i=s_counts.idI; i<s_counts.idE; i++) {
404
405
      offset = s_counts.displs[i] * datasize;
      MPI_Isend(send+offset, s_counts.counts[i], datatype, i, 99, comm, &(requests[j]));
406
407
      j++;
    }
408

409
    for(i=r_counts.idI; i<r_counts.idE; i++) {
410
411
      offset = r_counts.displs[i] * datasize;
      MPI_Irecv(recv+offset, r_counts.counts[i], datatype, i, 99, comm, &(requests[j]));
412
413
      j++;
    }
iker_martin's avatar
iker_martin committed
414
415
}

416
/*
417
418
419
420
421
422
423
424
425
426
427
428
 * Performs asynchronous MPI-RMA operations to redistribute an array in a block distribution. Is should be called after calculating
 * how data should be redistributed.
 *
 * - send (IN):  Array with the data to send. This value can be NULL for children.
 * - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to receive data.
 *               If the process receives data and is NULL, the behaviour is undefined.
 * - r_counts (IN): Structure which describes how many elements will receive this process from each parent and the
 *               displacements.
 * - tamBl (IN): How many elements are stored in the parameter "send".
 * - comm (IN):  Communicator to use to perform the redistribution. Must be an intracommunicator as MPI-RMA requirements.
 * - window (OUT): Pointer to a window object used for the RMA operations.
 * - requests (OUT): Pointer to array of requests to be used to determine if the communication has ended.
429
430
 *
 */
431
void async_rma(void *send, void *recv, MPI_Datatype datatype, struct Counts r_counts, int tamBl, MPI_Comm comm, MPI_Request *requests, MPI_Win *win) {
432
  int datasize;
433

434
435
  MPI_Type_size(datatype, &datasize);
  MPI_Win_create(send, (MPI_Aint)tamBl * datasize, datasize, MPI_INFO_NULL, comm, win);
436
  switch(mall_conf->red_method) {
437
    case MALL_RED_RMA_LOCKALL:
438
      async_rma_lockall(recv, datatype, r_counts, *win, requests);
439
440
      break;
    case MALL_RED_RMA_LOCK:
441
      async_rma_lock(recv, datatype, r_counts, *win, requests);
442
443
444
      break;
  }
}
445

446
447
448
449
450
451
452
453
454
455
/*
 * Performs an asynchronous and passive MPI-RMA data redistribution for a single array using the passive epochs Lock/Unlock.
 * - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to receive data.
 *               If the process receives data and is NULL, the behaviour is undefined.
 * - r_counts (IN): Structure which describes how many elements will receive this process from each parent and the
 *               displacements.
 * - win (IN):   Window to use to perform the redistribution.
 * - requests (OUT): Pointer to array of requests to be used to determine if the communication has ended.
 *
 */
456
457
458
459
460
461
void async_rma_lock(void *recv, MPI_Datatype datatype, struct Counts r_counts, MPI_Win win, MPI_Request *requests) {
  int i, target_displs, j = 0, datasize;
  size_t offset;

  MPI_Type_size(datatype, &datasize);
  target_displs = r_counts.first_target_displs; //TODO Check that datasize is not needed
462
463

  for(i=r_counts.idI; i<r_counts.idE; i++) {
464
    offset = r_counts.displs[i] * datasize;
465
    MPI_Win_lock(MPI_LOCK_SHARED, i, MPI_MODE_NOCHECK, win);
466
    MPI_Rget(recv+offset, r_counts.counts[i], datatype, i, target_displs, r_counts.counts[i], datatype, win, &(requests[j]));
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
    MPI_Win_unlock(i, win);
    target_displs=0;
    j++;
  }
}
/*
 * Performs an asynchronous and passive MPI-RMA data redistribution for a single array using the passive epochs Lockall/Unlockall.
 * - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to receive data.
 *               If the process receives data and is NULL, the behaviour is undefined.
 * - r_counts (IN): Structure which describes how many elements will receive this process from each parent and the
 *               displacements.
 * - win (IN):   Window to use to perform the redistribution.
 * - requests (OUT): Pointer to array of requests to be used to determine if the communication has ended.
 *
 */
482
483
484
485
486
487
void async_rma_lockall(void *recv, MPI_Datatype datatype, struct Counts r_counts, MPI_Win win, MPI_Request *requests) {
  int i, target_displs, j = 0, datasize;
  size_t offset;

  MPI_Type_size(datatype, &datasize);
  target_displs = r_counts.first_target_displs; //TODO Check that datasize is not needed
488
489
490

  MPI_Win_lock_all(MPI_MODE_NOCHECK, win);
  for(i=r_counts.idI; i<r_counts.idE; i++) {
491
492
    offset = r_counts.displs[i] * datasize;
    MPI_Rget(recv+offset, r_counts.counts[i], datatype, i, target_displs, r_counts.counts[i], datatype, win, &(requests[j]));
493
494
495
496
    target_displs=0;
    j++;
  }
  MPI_Win_unlock_all(win);
497
498
}

iker_martin's avatar
iker_martin committed
499
500
501
502
503
504
505
506
507
/*
 * ========================================================================================
 * ========================================================================================
 * ================================DISTRIBUTION FUNCTIONS==================================
 * ========================================================================================
 * ========================================================================================
*/

/*
508
509
510
511
512
513
514
515
516
517
518
519
520
 * Performs a communication to redistribute an array in a block distribution. For each process calculates
 * how many elements sends/receives to other processes for the new group.
 *
 * - qty  (IN):  Sum of elements shared by all processes that will send data.
 * - numP (IN):  Size of the local group. If it is a children group, this parameter must correspond to using
 *               "MPI_Comm_size(comm)". For the parents is not always the size obtained from "comm".
 * - numO (IN):  Amount of processes in the remote group. For the parents is the target quantity of processes after the 
 *               resize, while for the children is the amount of parents.
 * - is_children_group (IN): Indicates wether this MPI rank is a children(TRUE) or a parent(FALSE).
 * - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to receive data.
 *               process receives data and is NULL, the behaviour is undefined.
 * - s_counts (OUT): Struct where is indicated how many elements sends this process to processes in the new group.
 * - r_counts (OUT): Struct where is indicated how many elements receives this process from other processes in the previous group.
iker_martin's avatar
iker_martin committed
521
522
 *
 */
523
void prepare_redistribution(int qty, MPI_Datatype datatype, int numP, int numO, int is_children_group, void **recv, struct Counts *s_counts, struct Counts *r_counts) {
524
525
  int array_size = numO;
  int offset_ids = 0;
526
  int datasize;
527
528
  struct Dist_data dist_data;

529
530
531
  if(mall_conf->spawn_method == MALL_SPAWN_BASELINE) {
    offset_ids =  MAM_Contains_strat(MAM_SPAWN_STRATEGIES, MAM_STRAT_SPAWN_INTERCOMM, NULL) ? 
	    0 : numP;
532
533
534
  } else {
    array_size = numP > numO ? numP : numO;
  }
535

536
537
  mallocCounts(s_counts, array_size+offset_ids);
  mallocCounts(r_counts, array_size+offset_ids);
538
  MPI_Type_size(datatype, &datasize); //FIXME Right now derived datatypes are not ensured to work
539
540

  if(is_children_group) {
541
    offset_ids = 0;
542
    prepare_comm_alltoall(mall->myId, numP, numO, qty, offset_ids, r_counts);
543
544
    
    // Obtener distribución para este hijo
545
    get_block_dist(qty, mall->myId, numP, &dist_data);
546
    *recv = malloc(dist_data.tamBl * datasize);
547
548
549
550
551

    #if USE_MAL_DEBUG >= 4
      get_block_dist(qty, mall->myId, numP, &dist_data);
      print_counts(dist_data, r_counts->counts, r_counts->displs, numO+offset_ids, 0, "Targets Recv");
    #endif
552
  } else {
553
554
555
556
557
558
559
560
561
562
563
564
565
    #if USE_MAL_DEBUG >= 4
      get_block_dist(qty, mall->myId, numP, &dist_data);
    #endif

    prepare_comm_alltoall(mall->myId, numP, numO, qty, offset_ids, s_counts);
    if(mall_conf->spawn_method == MALL_SPAWN_MERGE && mall->myId < numO) {
      prepare_comm_alltoall(mall->myId, numO, numP, qty, offset_ids, r_counts);
      // Obtener distribución para este hijo y reservar vector de recibo
      get_block_dist(qty, mall->myId, numO, &dist_data);
      *recv = malloc(dist_data.tamBl * datasize);
      #if USE_MAL_DEBUG >= 4
        print_counts(dist_data, r_counts->counts, r_counts->displs, array_size, 0, "Sources&Targets Recv");
      #endif
iker_martin's avatar
iker_martin committed
566
    }
567
568
569
    #if USE_MAL_DEBUG >= 4
      print_counts(dist_data, s_counts->counts, s_counts->displs, numO+offset_ids, 0, "Sources Send");
    #endif
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
  }
}

/*
 * Ensures that the array of request of a process has an amount of elements equal to the amount of communication
 * functions the process will perform. In case the array is not initialized or does not have enough space it is
 * allocated/reallocated to the minimum amount of space needed.
 *
 * - s_counts (IN): Struct where is indicated how many elements sends this process to processes in the new group.
 * - r_counts (IN): Struct where is indicated how many elements receives this process from other processes in the previous group.
 * - requests (IN/OUT): Pointer to array of requests to be used to determine if the communication has ended. If the pointer 
 *               is null or not enough space has been reserved the pointer is allocated/reallocated.
 * - request_qty (IN/OUT): Quantity of requests to be used. If the value is smaller than the amount of communication
 *               functions to perform, it is modified to the minimum value.
 */
585
void check_requests(struct Counts s_counts, struct Counts r_counts, MPI_Request **requests, size_t *request_qty) {
586
587
588
  size_t i, sum;
  MPI_Request *aux;

589
  switch(mall_conf->red_method) {
590
591
592
593
594
595
596
597
598
    case MALL_RED_BASELINE:
      sum = 1;
      break;
    case MALL_RED_POINT:
    default:
      sum = (size_t) s_counts.idE - s_counts.idI;
      sum += (size_t) r_counts.idE - r_counts.idI;
      break;
  }
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618

  if (*requests != NULL && sum <= *request_qty) return; // Expected amount of requests

  if (*requests == NULL) {
    *requests = (MPI_Request *) malloc(sum * sizeof(MPI_Request));
  } else { // Array exists, but is too small
    aux = (MPI_Request *) realloc(*requests, sum * sizeof(MPI_Request));
    *requests = aux;
  }

  if (*requests == NULL) {
    fprintf(stderr, "Fatal error - It was not possible to allocate/reallocate memory for the MPI_Requests before the redistribution\n");
    MPI_Abort(MPI_COMM_WORLD, 1);
  }

  for(i=0; i < sum; i++) {
    (*requests)[i] = MPI_REQUEST_NULL;
  }
  *request_qty = sum;
}