#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
#include <string.h>
#include "distribution_methods/block_distribution.h"
#include "CommDist.h"
#include "malleabilityDataStructures.h"

//void prepare_redistribution(int qty, int myId, int numP, int numO, int is_children_group, int is_intercomm, char **recv, struct Counts *s_counts, struct Counts *r_counts);
void prepare_redistribution(int qty, MPI_Datatype datatype, int myId, int numP, int numO, int is_children_group, int is_intercomm, int is_sync, void **recv, struct Counts *s_counts, struct Counts *r_counts); //FIXME Choose name for is_sync
void check_requests(struct Counts s_counts, struct Counts r_counts, int red_strategies, MPI_Request **requests, size_t *request_qty);

void sync_point2point(void *send, void *recv, MPI_Datatype datatype, int is_intercomm, int myId, struct Counts s_counts, struct Counts r_counts, MPI_Comm comm);
void sync_rma(void *send, void *recv, MPI_Datatype datatype, struct Counts r_counts, int tamBl, MPI_Comm comm, int red_method);
void sync_rma_lock(void *recv, MPI_Datatype datatype, struct Counts r_counts, MPI_Win win);
void sync_rma_lockall(void *recv, MPI_Datatype datatype, struct Counts r_counts, MPI_Win win);

void async_point2point(void *send, void *recv, MPI_Datatype datatype, struct Counts s_counts, struct Counts r_counts, MPI_Comm comm, MPI_Request *requests);
void async_rma(void *send, void *recv, MPI_Datatype datatype, struct Counts r_counts, int tamBl, MPI_Comm comm, int red_method, MPI_Request *requests, MPI_Win *win);
void async_rma_lock(void *recv, MPI_Datatype datatype, struct Counts r_counts, MPI_Win win, MPI_Request *requests);
void async_rma_lockall(void *recv, MPI_Datatype datatype, struct Counts r_counts, MPI_Win win, MPI_Request *requests);

/*
 * Allocates memory for a vector of up to "qty" elements.
 * The "qty" elements are distributed in blocks among the "numP" processes
 * that call this function.
 */
void malloc_comm_array(char **array, int qty, int myId, int numP) {
    struct Dist_data dist_data;

    get_block_dist(qty, myId, numP, &dist_data);
    if( (*array = calloc(dist_data.tamBl, sizeof(char))) == NULL) {
      printf("Memory Error (Malloc Arrays(%d))\n", dist_data.tamBl);
      exit(1);
    }

/*
        int i;
        for(i=0; i<dist_data.tamBl; i++) {
          (*array)[i] = '!' + i + dist_data.ini;
        }

        printf("P%d Tam %d String: %s\n", myId, dist_data.tamBl, *array);
*/
}
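/*
 * Usage sketch (illustrative, not part of the original code; "buffer" and "qty" are hypothetical
 * names): each process reserves its zero-initialised block of a "qty"-element global array.
 *
 *   char *buffer = NULL;
 *   int myId, numP, qty = 100000;
 *   MPI_Comm_rank(MPI_COMM_WORLD, &myId);
 *   MPI_Comm_size(MPI_COMM_WORLD, &numP);
 *   malloc_comm_array(&buffer, qty, myId, numP);   // buffer now holds dist_data.tamBl chars
 *   // ... use the local block ...
 *   free(buffer);
 */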

//================================================================================
//================================================================================
//========================SYNCHRONOUS FUNCTIONS===================================
//================================================================================
//================================================================================

/*
 * Performs a communication to redistribute an array in a block distribution.
 * The redistribution distinguishes the parent group from the children group, and the values
 * each group passes can differ.
 *
 * - send (IN):  Array with the data to send. This data cannot be NULL for parents.
 * - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to receive data.
 *               If the process receives data and recv is NULL, the behaviour is undefined.
 * - qty  (IN):  Sum of elements shared by all processes that will send data.
 * - myId (IN):  Rank of the MPI process in the local communicator. For the parents it is not necessarily the rank obtained from "comm".
 * - numP (IN):  Size of the local group. If this is a children group, the parameter must match
 *               "MPI_Comm_size(comm)". For the parents it is not always the size obtained from "comm".
 * - numO (IN):  Amount of processes in the remote group. For the parents it is the target quantity of processes after the
 *               resize, while for the children it is the amount of parents.
 * - is_children_group (IN): Indicates whether this MPI rank is a child (TRUE) or a parent (FALSE).
 * - comm (IN):  Communicator used to perform the redistribution.
 *
 * returns: An integer indicating whether the operation has been completed (TRUE) or not (FALSE). //FIXME In this case it is always true...
 */
int sync_communication(void *send, void **recv, int qty, MPI_Datatype datatype, int myId, int numP, int numO, int is_children_group, int red_method, MPI_Comm comm) {
    int is_intercomm, aux_comm_used = 0;
    struct Counts s_counts, r_counts;
    struct Dist_data dist_data;
    MPI_Comm aux_comm = MPI_COMM_NULL;

    /* PREPARE COMMUNICATION */
    MPI_Comm_test_inter(comm, &is_intercomm);
//    prepare_redistribution(qty, datatype, myId, numP, numO, is_children_group, is_intercomm, recv, &s_counts, &r_counts); //FIXME Needs the datatype?
// TODO START REFACTOR: by default this always uses an intracommunicator
    prepare_redistribution(qty, datatype, myId, numP, numO, is_children_group, is_intercomm, 1, recv, &s_counts, &r_counts); //FIXME MAGICAL VALUE

    if(is_intercomm) {
      MPI_Intercomm_merge(comm, is_children_group, &aux_comm);
      aux_comm_used = 1;
    } else {
      aux_comm = comm;
    }
// FIXME END REFACTOR

    /* PERFORM COMMUNICATION */
    switch(red_method) {

      case MALL_RED_RMA_LOCKALL:
      case MALL_RED_RMA_LOCK:
        if(is_children_group) {
          dist_data.tamBl = 0;
        } else {
          get_block_dist(qty, myId, numO, &dist_data);
        }
        sync_rma(send, *recv, datatype, r_counts, dist_data.tamBl, aux_comm, red_method);
        break;

      case MALL_RED_POINT:
        sync_point2point(send, *recv, datatype, is_intercomm, myId, s_counts, r_counts, aux_comm);
        break;
      case MALL_RED_BASELINE:
      default:
        MPI_Alltoallv(send, s_counts.counts, s_counts.displs, datatype, *recv, r_counts.counts, r_counts.displs, datatype, aux_comm);
        break;
    }

    if(aux_comm_used) {
      MPI_Comm_free(&aux_comm);
    }
    freeCounts(&s_counts);
    freeCounts(&r_counts);
    return 1; //FIXME In this case it is always true...
}
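/*
 * Usage sketch (an assumption for illustration, not taken from the callers of this file): a
 * parent-group process redistributes its block of a char array towards numO children through a
 * communicator "comm" obtained elsewhere. "send_buf", "recv_buf" and "qty" are hypothetical names.
 *
 *   char *send_buf = ...;   // local block of the qty global elements
 *   void *recv_buf = NULL;  // allocated inside sync_communication when this process receives data
 *   sync_communication(send_buf, &recv_buf, qty, MPI_CHAR, myId, numP, numO,
 *                      0, MALL_RED_BASELINE, comm);   // 0 -> is_children_group = FALSE
 *
 * A child would instead pass send=NULL, is_children_group=1, numP equal to the size of its own
 * group and numO equal to the number of parents.
 */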

/*
 * Performs a series of blocking point-to-point communications to redistribute an array in a block distribution.
 * It should be called after calculating how the data should be redistributed.
 *
 * - send (IN):  Array with the data to send. This value cannot be NULL for parents.
 * - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to
 *               receive data. If the process receives data and recv is NULL, the behaviour is undefined.
 * - is_intercomm (IN): Indicates whether the communicator is an intercommunicator (TRUE) or an
 *               intracommunicator (FALSE).
 * - myId (IN):  Rank of the MPI process in the local communicator. For the parents it is not necessarily the rank obtained from "comm".
 * - s_counts (IN): Struct which describes how many elements this process sends to each child and
 *               the displacements.
 * - r_counts (IN): Struct which describes how many elements this process receives from each parent
 *               and the displacements.
 * - comm (IN):  Communicator used to perform the redistribution.
 *
 */
void sync_point2point(void *send, void *recv, MPI_Datatype datatype, int is_intercomm, int myId, struct Counts s_counts, struct Counts r_counts, MPI_Comm comm) {
    int i, j, init, end, total_sends, datasize;
    size_t offset, offset2;
    MPI_Request *sends;

    MPI_Type_size(datatype, &datasize);
    init = s_counts.idI;
    end = s_counts.idE;
    if(!is_intercomm && (s_counts.idI == myId || s_counts.idE == myId + 1)) {
      // Local copy of the block this process keeps for itself
      offset = s_counts.displs[myId] * datasize;
      offset2 = r_counts.displs[myId] * datasize;
      memcpy(recv+offset2, send+offset, s_counts.counts[myId] * datasize);

      if(s_counts.idI == myId) init = s_counts.idI+1;
      else end = s_counts.idE-1;
    }

    total_sends = end - init;
    j = 0;
    if(total_sends > 0) {
      sends = (MPI_Request *) malloc(total_sends * sizeof(MPI_Request));
    }
    for(i=init; i<end; i++) {
      sends[j] = MPI_REQUEST_NULL;
      offset = s_counts.displs[i] * datasize;
      MPI_Isend(send+offset, s_counts.counts[i], datatype, i, 99, comm, &(sends[j]));
      j++;
    }

    init = r_counts.idI;
    end = r_counts.idE;
    if(!is_intercomm) {
      if(r_counts.idI == myId) init = r_counts.idI+1;
      else if(r_counts.idE == myId + 1) end = r_counts.idE-1;
    }

    for(i=init; i<end; i++) {
      offset = r_counts.displs[i] * datasize;
      MPI_Recv(recv+offset, r_counts.counts[i], datatype, i, 99, comm, MPI_STATUS_IGNORE);
    }

    if(total_sends > 0) {
      MPI_Waitall(total_sends, sends, MPI_STATUSES_IGNORE);
      free(sends);
    }
}

/*
 * Performs synchronous MPI-RMA operations to redistribute an array in a block distribution. It should be called after
 * calculating how the data should be redistributed.
 *
 * - send (IN):  Array with the data to send. This value can be NULL for children.
 * - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to receive data.
 *               If the process receives data and recv is NULL, the behaviour is undefined.
 * - r_counts (IN): Struct which describes how many elements this process receives from each parent and the
 *               displacements.
 * - tamBl (IN): How many elements are stored in the parameter "send".
 * - comm (IN):  Communicator used to perform the redistribution. Must be an intracommunicator, as MPI-RMA requires.
 * - red_method (IN): Type of data redistribution to use. In this case it indicates the RMA operation (Lock or LockAll).
 *
 * FIXME: In libfabric one of these macros defines the maximum amount of BYTES that can be communicated in a SINGLE MPI_Get.
 * A window can hold more bytes than the amount shown in those macros; therefore, if you want to read more than that amount
 * you need to perform multiple Gets.
 * prov/psm3/psm3/psm_config.h:179:#define MQ_SHM_THRESH_RNDV 16000
 * prov/psm3/psm3/ptl_am/am_config.h:62:#define PSMI_MQ_RV_THRESH_CMA      16000
 * prov/psm3/psm3/ptl_am/am_config.h:65:#define PSMI_MQ_RV_THRESH_NO_KASSIST 16000
 */
void sync_rma(void *send, void *recv, MPI_Datatype datatype, struct Counts r_counts, int tamBl, MPI_Comm comm, int red_method) {
  int datasize;
  MPI_Win win;
  MPI_Type_size(datatype, &datasize);
  MPI_Win_create(send, (MPI_Aint)tamBl * datasize, datasize, MPI_INFO_NULL, comm, &win);

  #if USE_MAL_DEBUG >= 3
    DEBUG_FUNC("Created Window for synchronous RMA communication", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(comm);
  #endif
  switch(red_method) {
    case MALL_RED_RMA_LOCKALL:
      sync_rma_lockall(recv, datatype, r_counts, win);
      break;
    case MALL_RED_RMA_LOCK:
      sync_rma_lock(recv, datatype, r_counts, win);
      break;
  }
  #if USE_MAL_DEBUG >= 3
    DEBUG_FUNC("Completed synchronous RMA communication", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(comm);
  #endif
  MPI_Win_free(&win);
}
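/*
 * Sketch related to the FIXME above (an assumption, not code used by this file): if the provider
 * caps the bytes of a single MPI_Get, the transfer from one target i can be split into fixed-size
 * chunks inside the same passive epoch. CHUNK_ELEMS and the surrounding names are hypothetical.
 *
 *   int pending = r_counts.counts[i], done = 0;
 *   MPI_Win_lock(MPI_LOCK_SHARED, i, MPI_MODE_NOCHECK, win);
 *   while(pending > 0) {
 *     int now = pending < CHUNK_ELEMS ? pending : CHUNK_ELEMS;
 *     MPI_Get((char*)recv + (r_counts.displs[i] + done) * datasize, now, datatype,
 *             i, target_displs + done, now, datatype, win);
 *     done += now; pending -= now;
 *   }
 *   MPI_Win_unlock(i, win);
 */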

/*
 * Performs a passive MPI-RMA data redistribution for a single array using the passive epochs Lock/Unlock.
 * - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to receive data.
 *               If the process receives data and recv is NULL, the behaviour is undefined.
 * - r_counts (IN): Struct which describes how many elements this process receives from each parent and the
 *               displacements.
 * - win (IN):   Window used to perform the redistribution.
 *
 */
void sync_rma_lock(void *recv, MPI_Datatype datatype, struct Counts r_counts, MPI_Win win) {
  int i, target_displs, datasize;
  size_t offset;

  MPI_Type_size(datatype, &datasize);
  target_displs = r_counts.first_target_displs; //TODO Check that datasize is not needed

  for(i=r_counts.idI; i<r_counts.idE; i++) {
    offset = r_counts.displs[i] * datasize;
    MPI_Win_lock(MPI_LOCK_SHARED, i, MPI_MODE_NOCHECK, win);
    MPI_Get(recv+offset, r_counts.counts[i], datatype, i, target_displs, r_counts.counts[i], datatype, win);
    MPI_Win_unlock(i, win);
    target_displs=0;
  }
}

/*
 * Performs a passive MPI-RMA data redistribution for a single array using the passive epochs Lockall/Unlockall.
 * - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to receive data.
 *               If the process receives data and recv is NULL, the behaviour is undefined.
 * - r_counts (IN): Struct which describes how many elements this process receives from each parent and the
 *               displacements.
 * - win (IN):   Window used to perform the redistribution.
 *
 */
void sync_rma_lockall(void *recv, MPI_Datatype datatype, struct Counts r_counts, MPI_Win win) {
  int i, target_displs, datasize;
  size_t offset;

  MPI_Type_size(datatype, &datasize);
  target_displs = r_counts.first_target_displs; //TODO Check that datasize is not needed

  MPI_Win_lock_all(MPI_MODE_NOCHECK, win);
  for(i=r_counts.idI; i<r_counts.idE; i++) {
    offset = r_counts.displs[i] * datasize;
    MPI_Get(recv+offset, r_counts.counts[i], datatype, i, target_displs, r_counts.counts[i], datatype, win);
    target_displs=0;
  }
  MPI_Win_unlock_all(win);
}

//================================================================================
//================================================================================
//========================ASYNCHRONOUS FUNCTIONS==================================
//================================================================================
//================================================================================

/*
 * Performs a communication to redistribute an array in a block distribution with non-blocking MPI functions.
 * The redistribution distinguishes the parent group from the children group, and the values
 * each group passes can differ.
 *
 * - send (IN):  Array with the data to send. This data cannot be NULL for parents.
 * - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to receive data.
 *               If the process receives data and recv is NULL, the behaviour is undefined.
 * - qty  (IN):  Sum of elements shared by all processes that will send data.
 * - myId (IN):  Rank of the MPI process in the local communicator. For the parents it is not necessarily the rank obtained from "comm".
 * - numP (IN):  Size of the local group. If this is a children group, the parameter must match
 *               "MPI_Comm_size(comm)". For the parents it is not always the size obtained from "comm".
 * - numO (IN):  Amount of processes in the remote group. For the parents it is the target quantity of processes after the
 *               resize, while for the children it is the amount of parents.
 * - is_children_group (IN): Indicates whether this MPI rank is a child (TRUE) or a parent (FALSE).
 * - comm (IN):  Communicator used to perform the redistribution.
 * - requests (OUT): Pointer to array of requests to be used to determine if the communication has ended. If the pointer
 *               is NULL or not enough space has been reserved, the pointer is allocated/reallocated.
 * - request_qty (OUT): Quantity of requests to be used. If a process sends and receives data, this value will be
 *               modified to the expected value.
 *
 * returns: An integer indicating whether the operation has been completed (TRUE) or not (FALSE). //FIXME In this case it is always false...
 */
int async_communication_start(void *send, void **recv, int qty, MPI_Datatype datatype, int myId, int numP, int numO, int is_children_group, int red_method, int red_strategies, MPI_Comm comm, MPI_Request **requests, size_t *request_qty, MPI_Win *win) {
    int is_intercomm, aux_comm_used = 0;
    struct Counts s_counts, r_counts;
    struct Dist_data dist_data;
    MPI_Comm aux_comm = MPI_COMM_NULL;

    /* PREPARE COMMUNICATION */
    MPI_Comm_test_inter(comm, &is_intercomm);
// TODO START REFACTOR: by default this always uses an intracommunicator
    //prepare_redistribution(qty, datatype, myId, numP, numO, is_children_group, is_intercomm, recv, &s_counts, &r_counts);
    prepare_redistribution(qty, datatype, myId, numP, numO, is_children_group, is_intercomm, 1, recv, &s_counts, &r_counts); // TODO MAGICAL VALUE

    if(is_intercomm) {
      MPI_Intercomm_merge(comm, is_children_group, &aux_comm);
      aux_comm_used = 1;
    } else {
      aux_comm = comm;
    }
// FIXME END REFACTOR

    check_requests(s_counts, r_counts, red_strategies, requests, request_qty);

    /* PERFORM COMMUNICATION */
    switch(red_method) {

      case MALL_RED_RMA_LOCKALL:
      case MALL_RED_RMA_LOCK:
        if(is_children_group) {
          dist_data.tamBl = 0;
        } else {
          get_block_dist(qty, myId, numO, &dist_data);
        }
        async_rma(send, *recv, datatype, r_counts, dist_data.tamBl, aux_comm, red_method, *requests, win);
        break;
      case MALL_RED_POINT:
        async_point2point(send, *recv, datatype, s_counts, r_counts, aux_comm, *requests);
        break;
      case MALL_RED_BASELINE:
      default:
        MPI_Ialltoallv(send, s_counts.counts, s_counts.displs, datatype, *recv, r_counts.counts, r_counts.displs, datatype, aux_comm, &((*requests)[0]));
        break;
    }

    /* POST REQUESTS CHECKS */
    if(malleability_red_contains_strat(red_strategies, MALL_RED_IBARRIER, NULL)) {
      if(!is_children_group && (is_intercomm || myId >= numO)) {
        MPI_Ibarrier(comm, &((*requests)[*request_qty-1]) ); //FIXME Not easy to read...
      }
    }

    if(aux_comm_used) {
      MPI_Comm_free(&aux_comm);
    }

    freeCounts(&s_counts);
    freeCounts(&r_counts);
    return 0; //FIXME In this case it is always false...
}

/*
 * Checks whether a set of requests has been completed (1) or not (0).
 *
 * - myId (IN):  Rank of the MPI process in the local communicator. For the parents it is not necessarily the rank obtained from "comm".
 * - is_children_group (IN): Indicates whether this MPI rank is a child (TRUE) or a parent (FALSE).
 * - red_strategies (IN):
 * - requests (IN): Pointer to array of requests to be used to determine if the communication has ended.
 * - request_qty (IN): Quantity of requests in "requests".
 *
 * returns: An integer indicating whether the operation has been completed (TRUE) or not (FALSE).
 */
int async_communication_check(int myId, int is_children_group, int red_strategies, MPI_Comm comm, MPI_Request *requests, size_t request_qty) {
  int completed, req_completed, all_req_null, test_err, aux_condition;
  size_t i;
  completed = 1;
  all_req_null = 1;
  test_err = MPI_SUCCESS;

  if (is_children_group) return 1; //FIXME Should return a negative number

  if(malleability_red_contains_strat(red_strategies, MALL_RED_IBARRIER, NULL)) {

    // The Ibarrier should only be posted at this point if the process
    // has other requests which it has not yet confirmed as completed,
    // but which are confirmed now.
    if (requests[request_qty-1] == MPI_REQUEST_NULL) {
      for(i=0; i<request_qty; i++) {
        aux_condition = requests[i] == MPI_REQUEST_NULL;
        all_req_null  = all_req_null && aux_condition;
        test_err = MPI_Test(&(requests[i]), &req_completed, MPI_STATUS_IGNORE);
        completed = completed && req_completed;
      }
      if(completed && !all_req_null) { MPI_Ibarrier(comm, &(requests[request_qty-1])); }
    }
    test_err = MPI_Test(&(requests[request_qty-1]), &completed, MPI_STATUS_IGNORE);

  } else {
    for(i=0; i<request_qty; i++) {
      test_err = MPI_Test(&(requests[i]), &req_completed, MPI_STATUS_IGNORE);
      completed = completed && req_completed;
    }
//  test_err = MPI_Testall(request_qty, requests, &completed, MPI_STATUSES_IGNORE); //FIXME Some kind of bug with Mpich.
  }

  if (test_err != MPI_SUCCESS && test_err != MPI_ERR_PENDING) {
    printf("P%d aborting -- Test Async\n", myId);
    MPI_Abort(MPI_COMM_WORLD, test_err);
  }

  return completed;
}

/*
 * Waits until the completion of a set of requests. If the Ibarrier strategy
 * is being used, the corresponding Ibarrier is posted.
 *
 * - comm (IN): Communicator used to confirm the completion of the redistribution.
 * - requests (IN): Pointer to array of requests to be used to determine if the communication has ended.
 * - request_qty (IN): Quantity of requests in "requests".
 * - post_ibarrier (IN): Whether an Ibarrier should be posted by this process or not.
 */
void async_communication_wait(MPI_Comm comm, MPI_Request *requests, size_t request_qty, int post_ibarrier) {
  MPI_Waitall(request_qty, requests, MPI_STATUSES_IGNORE);
  #if USE_MAL_DEBUG >= 3
    DEBUG_FUNC("Processes Waitall completed", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
  #endif
  if(post_ibarrier) {
    MPI_Ibarrier(comm, &(requests[request_qty-1]) );
    MPI_Wait(&(requests[request_qty-1]), MPI_STATUS_IGNORE);
  }
}

/*
 * Frees the requests/windows associated with a particular redistribution.
 * Should be called for each output of a call to "async_communication_start".
 *
 * - red_method (IN):
 * - red_strategies (IN):
 * - requests (IN): Pointer to array of requests to be used to determine if the communication has ended.
 * - request_qty (IN): Quantity of requests in "requests".
 * - win (IN): Window to free.
 */
void async_communication_end(int red_method, int red_strategies, MPI_Request *requests, size_t request_qty, MPI_Win *win) {

  //For the disconnection of both process groups it is necessary to tell MPI that this communication
  //has finished, even though this point can only be reached once it has already finished
  if(malleability_red_contains_strat(red_strategies, MALL_RED_IBARRIER, NULL)) { MPI_Waitall(request_qty, requests, MPI_STATUSES_IGNORE); }

  if(red_method == MALL_RED_RMA_LOCKALL || red_method == MALL_RED_RMA_LOCK) { MPI_Win_free(win); }
}
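/*
 * Lifecycle sketch (illustrative, derived only from the comments above; all variable names are
 * hypothetical): the asynchronous redistribution is started once, polled until completion, and
 * then waited on and freed.
 *
 *   MPI_Request *reqs = NULL; size_t req_qty = 0; MPI_Win win;
 *   async_communication_start(send_buf, &recv_buf, qty, MPI_CHAR, myId, numP, numO,
 *                             is_children_group, red_method, red_strategies, comm,
 *                             &reqs, &req_qty, &win);
 *   while(!async_communication_check(myId, is_children_group, red_strategies, comm, reqs, req_qty)) {
 *     // ... overlap useful computation with the redistribution ...
 *   }
 *   async_communication_wait(comm, reqs, req_qty, post_ibarrier);
 *   async_communication_end(red_method, red_strategies, reqs, req_qty, &win);
 *   free(reqs);
 */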

/*
 * Performs a series of non-blocking point-to-point communications to redistribute an array in a block distribution.
 * It should be called after calculating how the data should be redistributed.
 *
 * - send (IN):  Array with the data to send. This value cannot be NULL for parents.
 * - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to
 *               receive data. If the process receives data and recv is NULL, the behaviour is undefined.
 * - s_counts (IN): Struct which describes how many elements this process sends to each child and
 *               the displacements.
 * - r_counts (IN): Struct which describes how many elements this process receives from each parent
 *               and the displacements.
 * - comm (IN):  Communicator used to perform the redistribution.
 * - requests (OUT): Pointer to array of requests to be used to determine if the communication has ended.
 *
 */
void async_point2point(void *send, void *recv, MPI_Datatype datatype, struct Counts s_counts, struct Counts r_counts, MPI_Comm comm, MPI_Request *requests) {
    int i, j = 0, datasize;
    size_t offset;
    MPI_Type_size(datatype, &datasize);

    for(i=s_counts.idI; i<s_counts.idE; i++) {
      offset = s_counts.displs[i] * datasize;
      MPI_Isend(send+offset, s_counts.counts[i], datatype, i, 99, comm, &(requests[j]));
      j++;
    }

    for(i=r_counts.idI; i<r_counts.idE; i++) {
      offset = r_counts.displs[i] * datasize;
      MPI_Irecv(recv+offset, r_counts.counts[i], datatype, i, 99, comm, &(requests[j]));
      j++;
    }
}

/*
 * Performs asynchronous MPI-RMA operations to redistribute an array in a block distribution. It should be called after
 * calculating how the data should be redistributed.
 *
 * - send (IN):  Array with the data to send. This value can be NULL for children.
 * - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to receive data.
 *               If the process receives data and recv is NULL, the behaviour is undefined.
 * - r_counts (IN): Struct which describes how many elements this process receives from each parent and the
 *               displacements.
 * - tamBl (IN): How many elements are stored in the parameter "send".
 * - comm (IN):  Communicator used to perform the redistribution. Must be an intracommunicator, as MPI-RMA requires.
 * - red_method (IN): Type of data redistribution to use. In this case it indicates the RMA operation (Lock or LockAll).
 * - win (OUT):  Pointer to a window object used for the RMA operations.
 * - requests (OUT): Pointer to array of requests to be used to determine if the communication has ended.
 *
 */
void async_rma(void *send, void *recv, MPI_Datatype datatype, struct Counts r_counts, int tamBl, MPI_Comm comm, int red_method, MPI_Request *requests, MPI_Win *win) {
  int datasize;

  MPI_Type_size(datatype, &datasize);
  MPI_Win_create(send, (MPI_Aint)tamBl * datasize, datasize, MPI_INFO_NULL, comm, win);
  switch(red_method) {
    case MALL_RED_RMA_LOCKALL:
      async_rma_lockall(recv, datatype, r_counts, *win, requests);
      break;
    case MALL_RED_RMA_LOCK:
      async_rma_lock(recv, datatype, r_counts, *win, requests);
      break;
  }
}

/*
 * Performs an asynchronous and passive MPI-RMA data redistribution for a single array using the passive epochs Lock/Unlock.
 * - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to receive data.
 *               If the process receives data and recv is NULL, the behaviour is undefined.
 * - r_counts (IN): Struct which describes how many elements this process receives from each parent and the
 *               displacements.
 * - win (IN):   Window used to perform the redistribution.
 * - requests (OUT): Pointer to array of requests to be used to determine if the communication has ended.
 *
 */
void async_rma_lock(void *recv, MPI_Datatype datatype, struct Counts r_counts, MPI_Win win, MPI_Request *requests) {
  int i, target_displs, j = 0, datasize;
  size_t offset;

  MPI_Type_size(datatype, &datasize);
  target_displs = r_counts.first_target_displs; //TODO Check that datasize is not needed

  for(i=r_counts.idI; i<r_counts.idE; i++) {
    offset = r_counts.displs[i] * datasize;
    MPI_Win_lock(MPI_LOCK_SHARED, i, MPI_MODE_NOCHECK, win);
    MPI_Rget(recv+offset, r_counts.counts[i], datatype, i, target_displs, r_counts.counts[i], datatype, win, &(requests[j]));
    MPI_Win_unlock(i, win);
    target_displs=0;
    j++;
  }
}
/*
 * Performs an asynchronous and passive MPI-RMA data redistribution for a single array using the passive epochs Lockall/Unlockall.
 * - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to receive data.
 *               If the process receives data and recv is NULL, the behaviour is undefined.
 * - r_counts (IN): Struct which describes how many elements this process receives from each parent and the
 *               displacements.
 * - win (IN):   Window used to perform the redistribution.
 * - requests (OUT): Pointer to array of requests to be used to determine if the communication has ended.
 *
 */
void async_rma_lockall(void *recv, MPI_Datatype datatype, struct Counts r_counts, MPI_Win win, MPI_Request *requests) {
  int i, target_displs, j = 0, datasize;
  size_t offset;

  MPI_Type_size(datatype, &datasize);
  target_displs = r_counts.first_target_displs; //TODO Check that datasize is not needed

  MPI_Win_lock_all(MPI_MODE_NOCHECK, win);
  for(i=r_counts.idI; i<r_counts.idE; i++) {
    offset = r_counts.displs[i] * datasize;
    MPI_Rget(recv+offset, r_counts.counts[i], datatype, i, target_displs, r_counts.counts[i], datatype, win, &(requests[j]));
    target_displs=0;
    j++;
  }
  MPI_Win_unlock_all(win);
}

/*
 * ========================================================================================
 * ========================================================================================
 * ================================DISTRIBUTION FUNCTIONS==================================
 * ========================================================================================
 * ========================================================================================
*/

/*
 * Prepares a redistribution of an array in a block distribution. For each process it calculates
 * how many elements it sends to / receives from the processes of the other group.
 *
 * - qty  (IN):  Sum of elements shared by all processes that will send data.
 * - myId (IN):  Rank of the MPI process in the local communicator. For the parents it is not necessarily the rank obtained from "comm".
 * - numP (IN):  Size of the local group. If this is a children group, the parameter must match
 *               "MPI_Comm_size(comm)". For the parents it is not always the size obtained from "comm".
 * - numO (IN):  Amount of processes in the remote group. For the parents it is the target quantity of processes after the
 *               resize, while for the children it is the amount of parents.
 * - is_children_group (IN): Indicates whether this MPI rank is a child (TRUE) or a parent (FALSE).
 * - is_intercomm (IN): Indicates whether the communicator used is an intercommunicator (TRUE) or an intracommunicator (FALSE).
 * - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to receive data.
 *               If the process receives data and recv is NULL, the behaviour is undefined.
 * - s_counts (OUT): Struct indicating how many elements this process sends to processes in the new group.
 * - r_counts (OUT): Struct indicating how many elements this process receives from other processes in the previous group.
 *
 */
//FIXME Ensure name for is_sync variable
void prepare_redistribution(int qty, MPI_Datatype datatype, int myId, int numP, int numO, int is_children_group, int is_intercomm, int is_sync, void **recv, struct Counts *s_counts, struct Counts *r_counts) {
  int array_size = numO;
  int offset_ids = 0;
  int datasize;
  struct Dist_data dist_data;

  if(is_intercomm) {
    offset_ids = is_sync ? numP : 0; //FIXME Modify only if active?
  } else {
    array_size = numP > numO ? numP : numO;
  }
  mallocCounts(s_counts, array_size+offset_ids);
  mallocCounts(r_counts, array_size+offset_ids);
  MPI_Type_size(datatype, &datasize); //FIXME Right now derived datatypes are not ensured to work

  if(is_children_group) {
    offset_ids = 0;
    prepare_comm_alltoall(myId, numP, numO, qty, offset_ids, r_counts);

    // Obtain the distribution for this child
    get_block_dist(qty, myId, numP, &dist_data);
    *recv = malloc(dist_data.tamBl * datasize);
//get_block_dist(qty, myId, numP, &dist_data);
//print_counts(dist_data, r_counts->counts, r_counts->displs, numO+offset_ids, 0, "Targets Recv");
  } else {
//get_block_dist(qty, myId, numP, &dist_data);

    prepare_comm_alltoall(myId, numP, numO, qty, offset_ids, s_counts);
    if(!is_intercomm && myId < numO) {
        prepare_comm_alltoall(myId, numO, numP, qty, offset_ids, r_counts);
        // Obtain the distribution for this child and allocate the receive vector
        get_block_dist(qty, myId, numO, &dist_data);
        *recv = malloc(dist_data.tamBl * datasize);
//print_counts(dist_data, r_counts->counts, r_counts->displs, array_size, 0, "Sources&Targets Recv");
    }
//print_counts(dist_data, s_counts->counts, s_counts->displs, numO+offset_ids, 0, "Sources Send");
  }
}
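/*
 * Worked illustration (assuming the standard contiguous block distribution computed by
 * get_block_dist): with qty=100 and a shrink from numP=4 parents to numO=2 children, each parent
 * owns 25 elements and each child will own 50. Parents 0 and 1 send their 25 elements each to
 * child 0, while parents 2 and 3 send theirs to child 1; r_counts of child 0 therefore lists 25
 * elements received from each of parents 0 and 1, and s_counts of parent 2 lists 25 elements
 * sent to child 1.
 */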

/*
 * Ensures that the array of request of a process has an amount of elements equal to the amount of communication
 * functions the process will perform. In case the array is not initialized or does not have enough space it is
 * allocated/reallocated to the minimum amount of space needed.
 *
 * - s_counts (IN): Struct indicating how many elements this process sends to processes in the new group.
 * - r_counts (IN): Struct indicating how many elements this process receives from other processes in the previous group.
 * - requests (IN/OUT): Pointer to array of requests to be used to determine if the communication has ended. If the pointer 
 *               is null or not enough space has been reserved the pointer is allocated/reallocated.
 * - request_qty (IN/OUT): Quantity of requests to be used. If the value is smaller than the amount of communication
 *               functions to perform, it is modified to the minimum value.
 */
void check_requests(struct Counts s_counts, struct Counts r_counts, int red_strategies, MPI_Request **requests, size_t *request_qty) {
  size_t i, sum;
  MPI_Request *aux;

  sum = (size_t) s_counts.idE - s_counts.idI;
  sum += (size_t) r_counts.idE - r_counts.idI;
  if(malleability_red_contains_strat(red_strategies, MALL_RED_IBARRIER, NULL)) {
    sum++;
  }

  if (*requests != NULL && sum <= *request_qty) return; // Expected amount of requests

  if (*requests == NULL) {
    *requests = (MPI_Request *) malloc(sum * sizeof(MPI_Request));
  } else { // Array exists, but is too small
    aux = (MPI_Request *) realloc(*requests, sum * sizeof(MPI_Request));
    *requests = aux;
  }

  if (*requests == NULL) {
    fprintf(stderr, "Fatal error - It was not possible to allocate/reallocate memory for the MPI_Requests before the redistribution\n");
    MPI_Abort(MPI_COMM_WORLD, 1);
  }

  for(i=0; i < sum; i++) {
    (*requests)[i] = MPI_REQUEST_NULL;
  }
  *request_qty = sum;
}
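/*
 * Sizing illustration (hypothetical numbers): a process whose s_counts covers 2 targets
 * (idE - idI = 2) and whose r_counts covers 3 sources needs 5 requests, plus 1 extra slot for
 * the final Ibarrier when the MALL_RED_IBARRIER strategy is active. On the first call
 * (*requests == NULL) the function therefore allocates 6 slots, initialises every entry to
 * MPI_REQUEST_NULL and stores *request_qty = 6.
 */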

/*
 * Function to query whether the strategy passed as the second argument is used
 * among the chosen strategies.
 *
 * Returns in "result" 1 (TRUE) if the strategy is used, 0 (FALSE)
 * otherwise.
 */
int malleability_red_contains_strat(int comm_strategies, int strategy, int *result) {
  int value = comm_strategies % strategy ? 0 : 1;
  if(result != NULL) *result = value;
  return value;
}


/*
 * Function to add a strategy to a set of strategies.
 *
 * Returns 1 (TRUE) if it has been added (or was already present), 0 (FALSE)
 * otherwise.
 */
int malleability_red_add_strat(int *comm_strategies, int strategy) {
  if(malleability_red_contains_strat(*comm_strategies, strategy, NULL)) return 1;
  *comm_strategies = *comm_strategies * strategy;
  return 1;
}
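
/*
 * Encoding sketch (assumption: the strategy identifiers are distinct prime numbers, which the
 * divisibility test in malleability_red_contains_strat relies on; "OTHER_STRAT" is hypothetical):
 *
 *   int strategies = 1;
 *   malleability_red_add_strat(&strategies, MALL_RED_IBARRIER);            // strategies *= MALL_RED_IBARRIER
 *   malleability_red_contains_strat(strategies, MALL_RED_IBARRIER, NULL);  // -> 1
 *   malleability_red_contains_strat(strategies, OTHER_STRAT, NULL);        // -> 0 (does not divide)
 */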