"Codes/MaM/MAM_Types.h" did not exist on "60dedf0d41c23b84d0e686f8197a6898aca932a6"
CommDist.c 33.4 KB
Newer Older
iker_martin's avatar
iker_martin committed
1
2
3
4
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
#include <string.h>
5
#include "distribution_methods/block_distribution.h"
6
#include "CommDist.h"
7
#include "malleabilityDataStructures.h"
iker_martin's avatar
iker_martin committed
8

9
//void prepare_redistribution(int qty, int myId, int numP, int numO, int is_children_group, int is_intercomm, char **recv, struct Counts *s_counts, struct Counts *r_counts);
10
void prepare_redistribution(int qty, MPI_Datatype datatype, int myId, int numP, int numO, int is_children_group, int is_intercomm, int is_sync, void **recv, struct Counts *s_counts, struct Counts *r_counts); //FIXME Choose name for is_sync
11
void check_requests(struct Counts s_counts, struct Counts r_counts, int red_method, int red_strategies, MPI_Request **requests, size_t *request_qty);
12

13
14
15
16
void sync_point2point(void *send, void *recv, MPI_Datatype datatype, int is_intercomm, int myId, struct Counts s_counts, struct Counts r_counts, MPI_Comm comm);
void sync_rma(void *send, void *recv, MPI_Datatype datatype, struct Counts r_counts, int tamBl, MPI_Comm comm, int red_method);
void sync_rma_lock(void *recv, MPI_Datatype datatype, struct Counts r_counts, MPI_Win win);
void sync_rma_lockall(void *recv, MPI_Datatype datatype, struct Counts r_counts, MPI_Win win);
iker_martin's avatar
iker_martin committed
17

18
19
20
21
void async_point2point(void *send, void *recv, MPI_Datatype datatype, struct Counts s_counts, struct Counts r_counts, MPI_Comm comm, MPI_Request *requests);
void async_rma(void *send, void *recv, MPI_Datatype datatype, struct Counts r_counts, int tamBl, MPI_Comm comm, int red_method, MPI_Request *requests, MPI_Win *win);
void async_rma_lock(void *recv, MPI_Datatype datatype, struct Counts r_counts, MPI_Win win, MPI_Request *requests);
void async_rma_lockall(void *recv, MPI_Datatype datatype, struct Counts r_counts, MPI_Win win, MPI_Request *requests);
22

23
24
25
26
27
28
29
30
/*
 * Reserva memoria para un vector de hasta "qty" elementos.
 * Los "qty" elementos se disitribuyen entre los "numP" procesos
 * que llaman a esta funcion.
 */
void malloc_comm_array(char **array, int qty, int myId, int numP) {
    struct Dist_data dist_data;

31
    get_block_dist(qty, myId, numP, &dist_data);
32
    if( (*array = calloc(dist_data.tamBl, sizeof(char))) == NULL) {
33
34
35
      printf("Memory Error (Malloc Arrays(%d))\n", dist_data.tamBl); 
      exit(1); 
    }
36

37
/*
38
39
40
41
        int i;
	for(i=0; i<dist_data.tamBl; i++) {
	  (*array)[i] = '!' + i + dist_data.ini;
	}
42
43
44
	
        printf("P%d Tam %d String: %s\n", myId, dist_data.tamBl, *array);
*/
45
}
46
47
48

//================================================================================
//================================================================================
49
//========================SYNCHRONOUS FUNCTIONS===================================
50
51
52
//================================================================================
//================================================================================

53
/*
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
 * Performs a communication to redistribute an array in a block distribution.
 * In the redistribution is differenciated parent group from the children and the values each group indicates can be
 * different.
 *
 * - send (IN):  Array with the data to send. This data can not be null for parents.
 * - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to receive data.
 *               If the process receives data and is NULL, the behaviour is undefined.
 * - qty  (IN):  Sum of elements shared by all processes that will send data.
 * - myId (IN):  Rank of the MPI process in the local communicator. For the parents is not the rank obtained from "comm".
 * - numP (IN):  Size of the local group. If it is a children group, this parameter must correspond to using
 *               "MPI_Comm_size(comm)". For the parents is not always the size obtained from "comm".
 * - numO (IN):  Amount of processes in the remote group. For the parents is the target quantity of processes after the 
 *               resize, while for the children is the amount of parents.
 * - is_children_group (IN): Indicates wether this MPI rank is a children(TRUE) or a parent(FALSE).
 * - comm (IN):  Communicator to use to perform the redistribution.
69
 *
70
 * returns: An integer indicating if the operation has been completed(TRUE) or not(FALSE). //FIXME In this case is always true...
71
 */
72
int sync_communication(void *send, void **recv, int qty, MPI_Datatype datatype, int myId, int numP, int numO, int is_children_group, int red_method, MPI_Comm comm) {
73
74
    int is_intercomm, aux_comm_used = 0;
    struct Counts s_counts, r_counts;
iker_martin's avatar
iker_martin committed
75
    struct Dist_data dist_data;
76
    MPI_Comm aux_comm = MPI_COMM_NULL;
iker_martin's avatar
iker_martin committed
77

78
79
    /* PREPARE COMMUNICATION */
    MPI_Comm_test_inter(comm, &is_intercomm);
80
//    prepare_redistribution(qty, datatype, myId, numP, numO, is_children_group, is_intercomm, recv, &s_counts, &r_counts); //FIXME Needs the datatype?
81
// TODO START REFACTOR POR DEFECTO USA SIEMPRE INTRACOMM
82
    prepare_redistribution(qty, datatype, myId, numP, numO, is_children_group, is_intercomm, 1, recv, &s_counts, &r_counts); //FIXME MAGICAL VALUE
83
84
85
86
87
88
89
90
91

    if(is_intercomm) {
      MPI_Intercomm_merge(comm, is_children_group, &aux_comm);
      aux_comm_used = 1;
    } else {
      aux_comm = comm;
    }
// FIXME END REFACTOR

iker_martin's avatar
iker_martin committed
92

93
94
    /* PERFORM COMMUNICATION */
    switch(red_method) {
iker_martin's avatar
iker_martin committed
95

96
97
98
99
100
101
102
      case MALL_RED_RMA_LOCKALL:
      case MALL_RED_RMA_LOCK:
        if(is_children_group) {
	  dist_data.tamBl = 0;
	} else {
          get_block_dist(qty, myId, numO, &dist_data);
	}
103
        sync_rma(send, *recv, datatype, r_counts, dist_data.tamBl, aux_comm, red_method);
104
105
106
	break;

      case MALL_RED_POINT:
107
        sync_point2point(send, *recv, datatype, is_intercomm, myId, s_counts, r_counts, aux_comm);
108
109
110
	break;
      case MALL_RED_BASELINE:
      default:
111
        MPI_Alltoallv(send, s_counts.counts, s_counts.displs, datatype, *recv, r_counts.counts, r_counts.displs, datatype, aux_comm);
112
113
	break;
    }
iker_martin's avatar
iker_martin committed
114

115
116
117
118
119
120
    if(aux_comm_used) {
      MPI_Comm_free(&aux_comm);
    } 
    freeCounts(&s_counts);
    freeCounts(&r_counts);
    return 1; //FIXME In this case is always true...
iker_martin's avatar
iker_martin committed
121
122
}

123
/*
124
125
126
127
128
129
130
131
132
133
134
135
136
137
 * Performs a series of blocking point2point communications to redistribute an array in a block distribution. 
 * It should be called after calculating how data should be redistributed.
 *
 * - send (IN):  Array with the data to send. This value can not be NULL for parents.
 * - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to 
 *               receive data. If the process receives data and is NULL, the behaviour is undefined.
 * - is_intercomm (IN): Indicates wether the communicator is an intercommunicator (TRUE) or an
 *               intracommunicator (FALSE).
 * - myId (IN):  Rank of the MPI process in the local communicator. For the parents is not the rank obtained from "comm".
 * - s_counts (IN): Struct which describes how many elements will send this process to each children and 
 *               the displacements.
 * - r_counts (IN): Structure which describes how many elements will receive this process from each parent 
 *               and the displacements.
 * - comm (IN):  Communicator to use to perform the redistribution.
138
139
 *
 */
140
141
142
void sync_point2point(void *send, void *recv, MPI_Datatype datatype, int is_intercomm, int myId, struct Counts s_counts, struct Counts r_counts, MPI_Comm comm) {
    int i, j, init, end, total_sends, datasize;
    size_t offset, offset2;
143
    MPI_Request *sends;
iker_martin's avatar
iker_martin committed
144

145
    MPI_Type_size(datatype, &datasize);
146
147
148
    init = s_counts.idI;
    end = s_counts.idE;
    if(!is_intercomm && (s_counts.idI == myId || s_counts.idE == myId + 1)) {
149
150
151
152
      offset = s_counts.displs[myId] + datasize;
      offset2 = r_counts.displs[myId] + datasize;
      memcpy(send+offset, recv+offset2, s_counts.counts[myId]);
      
153
154
155
      if(s_counts.idI == myId) init = s_counts.idI+1;
      else end = s_counts.idE-1;
    }
iker_martin's avatar
iker_martin committed
156

157
158
159
160
161
162
163
    total_sends = end - init;
    j = 0;
    if(total_sends > 0) {
      sends = (MPI_Request *) malloc(total_sends * sizeof(MPI_Request));
    }
    for(i=init; i<end; i++) {
      sends[j] = MPI_REQUEST_NULL;
164
165
      offset = s_counts.displs[i] * datasize;
      MPI_Isend(send+offset, s_counts.counts[i], datatype, i, 99, comm, &(sends[j]));
166
167
      j++;
    }
iker_martin's avatar
iker_martin committed
168

169
170
171
172
173
174
    init = r_counts.idI;
    end = r_counts.idE;
    if(!is_intercomm) {
      if(r_counts.idI == myId) init = r_counts.idI+1;
      else if(r_counts.idE == myId + 1) end = r_counts.idE-1;
    }
iker_martin's avatar
iker_martin committed
175

176
    for(i=init; i<end; i++) {
177
178
      offset = r_counts.displs[i] * datasize;
      MPI_Recv(recv+offset, r_counts.counts[i], datatype, i, 99, comm, MPI_STATUS_IGNORE);
179
180
181
182
183
    }

    if(total_sends > 0) {
      MPI_Waitall(total_sends, sends, MPI_STATUSES_IGNORE);
    }
iker_martin's avatar
iker_martin committed
184
185
186
}

/*
187
188
189
190
191
192
193
194
195
196
197
198
 * Performs synchronous MPI-RMA operations to redistribute an array in a block distribution. Is should be called after calculating
 * how data should be redistributed
 *
 * - send (IN):  Array with the data to send. This value can be NULL for children.
 * - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to receive data.
 *               If the process receives data and is NULL, the behaviour is undefined.
 * - r_counts (IN): Structure which describes how many elements will receive this process from each parent and the
 *               displacements.
 * - tamBl (IN): How many elements are stored in the parameter "send".
 * - comm (IN):  Communicator to use to perform the redistribution. Must be an intracommunicator as MPI-RMA requirements.
 * - red_method (IN): Type of data redistribution to use. In this case indicates the RMA operation(Lock or LockAll).
 *
199
200
201
202
203
204
 * FIXME: In libfabric one of these macros defines the maximum amount of BYTES that can be communicated in a SINGLE MPI_Get
 * A window can have more bytes than the amount shown in those macros, therefore, if you want to read more than that amount
 * you need to perform multiples Gets.
 * prov/psm3/psm3/psm_config.h:179:#define MQ_SHM_THRESH_RNDV 16000
 * prov/psm3/psm3/ptl_am/am_config.h:62:#define PSMI_MQ_RV_THRESH_CMA      16000
 * prov/psm3/psm3/ptl_am/am_config.h:65:#define PSMI_MQ_RV_THRESH_NO_KASSIST 16000
iker_martin's avatar
iker_martin committed
205
 */
206
207
void sync_rma(void *send, void *recv, MPI_Datatype datatype, struct Counts r_counts, int tamBl, MPI_Comm comm, int red_method) {
  int datasize;
208
  MPI_Win win;
209
210
  MPI_Type_size(datatype, &datasize);
  MPI_Win_create(send, (MPI_Aint)tamBl * datasize, datasize, MPI_INFO_NULL, comm, &win);
211

212
213
214
  #if USE_MAL_DEBUG >= 3
    DEBUG_FUNC("Created Window for synchronous RMA communication", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(comm);
  #endif
215
216
  switch(red_method) {
    case MALL_RED_RMA_LOCKALL:
217
      sync_rma_lockall(recv, datatype, r_counts, win);
218
219
      break;
    case MALL_RED_RMA_LOCK:
220
      sync_rma_lock(recv, datatype, r_counts, win);
221
222
      break;
  }
223
224
225
  #if USE_MAL_DEBUG >= 3
    DEBUG_FUNC("Completed synchronous RMA communication", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(comm);
  #endif
226
  MPI_Win_free(&win);
iker_martin's avatar
iker_martin committed
227
228
}

229
230


231
/*
232
233
234
235
236
237
238
 * Performs a passive MPI-RMA data redistribution for a single array using the passive epochs Lock/Unlock.
 * - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to receive data.
 *               If the process receives data and is NULL, the behaviour is undefined.
 * - r_counts (IN): Structure which describes how many elements will receive this process from each parent and the
 *               displacements.
 * - win (IN):   Window to use to perform the redistribution.
 *
239
 */
240
241
242
243
244
245
void sync_rma_lock(void *recv, MPI_Datatype datatype, struct Counts r_counts, MPI_Win win) {
  int i, target_displs, datasize;
  size_t offset;

  MPI_Type_size(datatype, &datasize);
  target_displs = r_counts.first_target_displs; //TODO Check that datasize is not needed
246
247

  for(i=r_counts.idI; i<r_counts.idE; i++) {
248
    offset = r_counts.displs[i] * datasize;
249
    MPI_Win_lock(MPI_LOCK_SHARED, i, MPI_MODE_NOCHECK, win);
250
    MPI_Get(recv+offset, r_counts.counts[i], datatype, i, target_displs, r_counts.counts[i], datatype, win);
251
252
253
    MPI_Win_unlock(i, win);
    target_displs=0;
  }
254
255
}

256
257
258
259
260
261
262
263
264
/*
 * Performs a passive MPI-RMA data redistribution for a single array using the passive epochs Lockall/Unlockall.
 * - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to receive data.
 *               If the process receives data and is NULL, the behaviour is undefined.
 * - r_counts (IN): Structure which describes how many elements will receive this process from each parent and the
 *               displacements.
 * - win (IN):   Window to use to perform the redistribution.
 *
 */
265
266
267
268
269
270
void sync_rma_lockall(void *recv, MPI_Datatype datatype, struct Counts r_counts, MPI_Win win) {
  int i, target_displs, datasize;
  size_t offset;

  MPI_Type_size(datatype, &datasize);
  target_displs = r_counts.first_target_displs; //TODO Check that datasize is not needed
271
272
273

  MPI_Win_lock_all(MPI_MODE_NOCHECK, win);
  for(i=r_counts.idI; i<r_counts.idE; i++) {
274
275
    offset = r_counts.displs[i] * datasize;
    MPI_Get(recv+offset, r_counts.counts[i], datatype, i, target_displs, r_counts.counts[i], datatype, win);
276
277
278
279
280
    target_displs=0;
  }
  MPI_Win_unlock_all(win);
}

281
282
//================================================================================
//================================================================================
283
//========================ASYNCHRONOUS FUNCTIONS==================================
284
285
286
287
//================================================================================
//================================================================================

/*
288
289
290
 * Performs a communication to redistribute an array in a block distribution with non-blocking MPI functions.
 * In the redistribution is differenciated parent group from the children and the values each group indicates can be
 * different.
291
 *
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
 * - send (IN):  Array with the data to send. This data can not be null for parents.
 * - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to receive data.
 *               If the process receives data and is NULL, the behaviour is undefined.
 * - qty  (IN):  Sum of elements shared by all processes that will send data.
 * - myId (IN):  Rank of the MPI process in the local communicator. For the parents is not the rank obtained from "comm".
 * - numP (IN):  Size of the local group. If it is a children group, this parameter must correspond to using
 *               "MPI_Comm_size(comm)". For the parents is not always the size obtained from "comm".
 * - numO (IN):  Amount of processes in the remote group. For the parents is the target quantity of processes after the 
 *               resize, while for the children is the amount of parents.
 * - is_children_group (IN): Indicates wether this MPI rank is a children(TRUE) or a parent(FALSE).
 * - comm (IN):  Communicator to use to perform the redistribution.
 * - requests (OUT): Pointer to array of requests to be used to determine if the communication has ended. If the pointer 
 *               is null or not enough space has been reserved the pointer is allocated/reallocated.
 * - request_qty (OUT): Quantity of requests to be used. If a process sends and receives data, this value will be 
 *               modified to the expected value.
307
 *
308
 * returns: An integer indicating if the operation has been completed(TRUE) or not(FALSE). //FIXME In this case is always false...
309
 */
310
int async_communication_start(void *send, void **recv, int qty, MPI_Datatype datatype, int myId, int numP, int numO, int is_children_group, int red_method, int red_strategies, MPI_Comm comm, MPI_Request **requests, size_t *request_qty, MPI_Win *win) {
311
312
    int is_intercomm, aux_comm_used = 0;
    struct Counts s_counts, r_counts;
313
    struct Dist_data dist_data;
314
315
316
317
    MPI_Comm aux_comm = MPI_COMM_NULL;

    /* PREPARE COMMUNICATION */
    MPI_Comm_test_inter(comm, &is_intercomm);
318
// TODO START REFACTOR POR DEFECTO USA SIEMPRE INTRACOMM
319
320
    //prepare_redistribution(qty, datatype, myId, numP, numO, is_children_group, is_intercomm, recv, &s_counts, &r_counts);
    prepare_redistribution(qty, datatype, myId, numP, numO, is_children_group, is_intercomm, 1, recv, &s_counts, &r_counts); // TODO MAGICAL VALUE
321
322
323
324
325
326
327
    if(is_intercomm) {
      MPI_Intercomm_merge(comm, is_children_group, &aux_comm);
      aux_comm_used = 1;
    } else {
      aux_comm = comm;
    }
// FIXME END REFACTOR
328
    check_requests(s_counts, r_counts, red_method, red_strategies, requests, request_qty);
329
330
331
332
333
334
335
336
337
338
339

    /* PERFORM COMMUNICATION */
    switch(red_method) {

      case MALL_RED_RMA_LOCKALL:
      case MALL_RED_RMA_LOCK:
        if(is_children_group) {
	  dist_data.tamBl = 0;
	} else {
          get_block_dist(qty, myId, numO, &dist_data);
	}
340
        async_rma(send, *recv, datatype, r_counts, dist_data.tamBl, aux_comm, red_method, *requests, win);
341
342
	break;
      case MALL_RED_POINT:
343
        async_point2point(send, *recv, datatype, s_counts, r_counts, aux_comm, *requests);
344
345
346
	break;
      case MALL_RED_BASELINE:
      default:
347
        MPI_Ialltoallv(send, s_counts.counts, s_counts.displs, datatype, *recv, r_counts.counts, r_counts.displs, datatype, aux_comm, &((*requests)[0]));
348
349
	break;
    }
350

351
352
353
354
    /* POST REQUESTS CHECKS */
    if(malleability_red_contains_strat(red_strategies, MALL_RED_IBARRIER, NULL)) {
      if(!is_children_group && (is_intercomm || myId >= numO)) {
        MPI_Ibarrier(comm, &((*requests)[*request_qty-1]) ); //FIXME Not easy to read...
355
      }
356
    }
357

358
359
360
    if(aux_comm_used) {
      MPI_Comm_free(&aux_comm);
    } 
361

362
363
364
    freeCounts(&s_counts);
    freeCounts(&r_counts);
    return 0; //FIXME In this case is always false...
365
366
367
}

/*
368
 * Checks if a set of requests have been completed (1) or not (0).
369
 *
370
371
372
373
374
 * - myId (IN):  Rank of the MPI process in the local communicator. For the parents is not the rank obtained from "comm".
 * - is_children_group (IN): Indicates wether this MPI rank is a children(TRUE) or a parent(FALSE).
 * - red_strategies (IN):
 * - requests (IN): Pointer to array of requests to be used to determine if the communication has ended.
 * - request_qty (IN): Quantity of requests in "requests".
375
 *
376
 * returns: An integer indicating if the operation has been completed(TRUE) or not(FALSE).
377
 */
378
379
380
381
382
383
384
int async_communication_check(int myId, int is_children_group, int red_strategies, MPI_Comm comm, MPI_Request *requests, size_t request_qty) {
  int completed, req_completed, all_req_null, test_err, aux_condition;
  size_t i;
  completed = 1;
  all_req_null = 1;
  test_err = MPI_SUCCESS;

385
  if (is_children_group) return 1; //FIXME Deberia devolver un num negativo
386
387
388
389
390
391
392
393
394
395
396
397

  if(malleability_red_contains_strat(red_strategies, MALL_RED_IBARRIER, NULL)) {

    // The Ibarrier should only be posted at this point if the process
    // has other requests which has not confirmed as completed yet,
    // but are confirmed now.
    if (requests[request_qty-1] == MPI_REQUEST_NULL) {
      for(i=0; i<request_qty; i++) {
	aux_condition = requests[i] == MPI_REQUEST_NULL;
	all_req_null  = all_req_null && aux_condition;
        test_err = MPI_Test(&(requests[i]), &req_completed, MPI_STATUS_IGNORE);
        completed = completed && req_completed;
398
      }
399
      if(completed && !all_req_null) { MPI_Ibarrier(comm, &(requests[request_qty-1])); }
400
    }
401
    test_err = MPI_Test(&(requests[request_qty-1]), &completed, MPI_STATUS_IGNORE);
402

403
404
405
406
  } else {
    for(i=0; i<request_qty; i++) {
      test_err = MPI_Test(&(requests[i]), &req_completed, MPI_STATUS_IGNORE);
      completed = completed && req_completed;
407
    }
408
409
//  test_err = MPI_Testall(request_qty, requests, &completed, MPI_STATUSES_IGNORE); //FIXME Some kind of bug with Mpich.
  }
410

411
412
413
414
  if (test_err != MPI_SUCCESS && test_err != MPI_ERR_PENDING) {
    printf("P%d aborting -- Test Async\n", myId);
    MPI_Abort(MPI_COMM_WORLD, test_err);
  }
415

416
  return completed;
417
418
}

419

420
/*
421
422
 * Waits until the completion of a set of requests. If the Ibarrier strategy
 * is being used, the corresponding ibarrier is posted.
423
 *
424
425
426
 * - comm (IN): Communicator to use to confirm finalizations of redistribution
 * - requests (IN): Pointer to array of requests to be used to determine if the communication has ended.
 * - request_qty (IN): Quantity of requests in "requests".
427
 * - post_ibarrier (IN): Whether an Ibarrier should be posted by this process or not.
428
 */
429
void async_communication_wait(MPI_Comm comm, MPI_Request *requests, size_t request_qty, int post_ibarrier) {
430
  MPI_Waitall(request_qty, requests, MPI_STATUSES_IGNORE); 
431
  #if USE_MAL_DEBUG >= 3
432
    DEBUG_FUNC("Processes Waitall completed", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
433
  #endif
434
  if(post_ibarrier) {
435
    MPI_Ibarrier(comm, &(requests[request_qty-1]) );
436
    MPI_Wait(&(requests[request_qty-1]), MPI_STATUS_IGNORE);
437
  }
438
439
}

440
/*
441
442
 * Frees Requests/Windows associated to a particular redistribution.
 * Should be called for each output result of calling "async_communication_start".
443
 *
444
445
446
447
448
 * - red_method (IN):
 * - red_strategies (IN):
 * - requests (IN): Pointer to array of requests to be used to determine if the communication has ended.
 * - request_qty (IN): Quantity of requests in "requests".
 * - win (IN): Window to free.
449
 */
450
451
452
453
454
455
456
void async_communication_end(int red_method, int red_strategies, MPI_Request *requests, size_t request_qty, MPI_Win *win) {

  //Para la desconexión de ambos grupos de procesos es necesario indicar a MPI que esta comm
  //ha terminado, aunque solo se pueda llegar a este punto cuando ha terminado
  if(malleability_red_contains_strat(red_strategies, MALL_RED_IBARRIER, NULL)) { MPI_Waitall(request_qty, requests, MPI_STATUSES_IGNORE); }

  if(red_method == MALL_RED_RMA_LOCKALL || red_method == MALL_RED_RMA_LOCK) { MPI_Win_free(win); }
457
458
}

459
/*
460
461
462
463
464
465
466
467
468
469
470
471
 * Performs a series of non-blocking point2point communications to redistribute an array in a block distribution. 
 * It should be called after calculating how data should be redistributed.
 *
 * - send (IN):  Array with the data to send. This value can not be NULL for parents.
 * - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to 
 *               receive data. If the process receives data and is NULL, the behaviour is undefined.
 * - s_counts (IN): Struct which describes how many elements will send this process to each children and 
 *               the displacements.
 * - r_counts (IN): Structure which describes how many elements will receive this process from each parent 
 *               and the displacements.
 * - comm (IN):  Communicator to use to perform the redistribution.
 * - requests (OUT): Pointer to array of requests to be used to determine if the communication has ended.
472
 *
473
 */
474
475
476
477
void async_point2point(void *send, void *recv, MPI_Datatype datatype, struct Counts s_counts, struct Counts r_counts, MPI_Comm comm, MPI_Request *requests) {
    int i, j = 0, datasize;
    size_t offset;
    MPI_Type_size(datatype, &datasize);
478

479
    for(i=s_counts.idI; i<s_counts.idE; i++) {
480
481
      offset = s_counts.displs[i] * datasize;
      MPI_Isend(send+offset, s_counts.counts[i], datatype, i, 99, comm, &(requests[j]));
482
483
      j++;
    }
484

485
    for(i=r_counts.idI; i<r_counts.idE; i++) {
486
487
      offset = r_counts.displs[i] * datasize;
      MPI_Irecv(recv+offset, r_counts.counts[i], datatype, i, 99, comm, &(requests[j]));
488
489
      j++;
    }
iker_martin's avatar
iker_martin committed
490
491
}

492
/*
493
494
495
496
497
498
499
500
501
502
503
504
505
 * Performs asynchronous MPI-RMA operations to redistribute an array in a block distribution. Is should be called after calculating
 * how data should be redistributed.
 *
 * - send (IN):  Array with the data to send. This value can be NULL for children.
 * - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to receive data.
 *               If the process receives data and is NULL, the behaviour is undefined.
 * - r_counts (IN): Structure which describes how many elements will receive this process from each parent and the
 *               displacements.
 * - tamBl (IN): How many elements are stored in the parameter "send".
 * - comm (IN):  Communicator to use to perform the redistribution. Must be an intracommunicator as MPI-RMA requirements.
 * - red_method (IN): Type of data redistribution to use. In this case indicates the RMA operation(Lock or LockAll).
 * - window (OUT): Pointer to a window object used for the RMA operations.
 * - requests (OUT): Pointer to array of requests to be used to determine if the communication has ended.
506
507
 *
 */
508
509
void async_rma(void *send, void *recv, MPI_Datatype datatype, struct Counts r_counts, int tamBl, MPI_Comm comm, int red_method, MPI_Request *requests, MPI_Win *win) {
  int datasize;
510

511
512
  MPI_Type_size(datatype, &datasize);
  MPI_Win_create(send, (MPI_Aint)tamBl * datasize, datasize, MPI_INFO_NULL, comm, win);
513
514
  switch(red_method) {
    case MALL_RED_RMA_LOCKALL:
515
      async_rma_lockall(recv, datatype, r_counts, *win, requests);
516
517
      break;
    case MALL_RED_RMA_LOCK:
518
      async_rma_lock(recv, datatype, r_counts, *win, requests);
519
520
521
      break;
  }
}
522

523
524
525
526
527
528
529
530
531
532
/*
 * Performs an asynchronous and passive MPI-RMA data redistribution for a single array using the passive epochs Lock/Unlock.
 * - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to receive data.
 *               If the process receives data and is NULL, the behaviour is undefined.
 * - r_counts (IN): Structure which describes how many elements will receive this process from each parent and the
 *               displacements.
 * - win (IN):   Window to use to perform the redistribution.
 * - requests (OUT): Pointer to array of requests to be used to determine if the communication has ended.
 *
 */
533
534
535
536
537
538
void async_rma_lock(void *recv, MPI_Datatype datatype, struct Counts r_counts, MPI_Win win, MPI_Request *requests) {
  int i, target_displs, j = 0, datasize;
  size_t offset;

  MPI_Type_size(datatype, &datasize);
  target_displs = r_counts.first_target_displs; //TODO Check that datasize is not needed
539
540

  for(i=r_counts.idI; i<r_counts.idE; i++) {
541
    offset = r_counts.displs[i] * datasize;
542
    MPI_Win_lock(MPI_LOCK_SHARED, i, MPI_MODE_NOCHECK, win);
543
    MPI_Rget(recv+offset, r_counts.counts[i], datatype, i, target_displs, r_counts.counts[i], datatype, win, &(requests[j]));
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
    MPI_Win_unlock(i, win);
    target_displs=0;
    j++;
  }
}
/*
 * Performs an asynchronous and passive MPI-RMA data redistribution for a single array using the passive epochs Lockall/Unlockall.
 * - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to receive data.
 *               If the process receives data and is NULL, the behaviour is undefined.
 * - r_counts (IN): Structure which describes how many elements will receive this process from each parent and the
 *               displacements.
 * - win (IN):   Window to use to perform the redistribution.
 * - requests (OUT): Pointer to array of requests to be used to determine if the communication has ended.
 *
 */
559
560
561
562
563
564
void async_rma_lockall(void *recv, MPI_Datatype datatype, struct Counts r_counts, MPI_Win win, MPI_Request *requests) {
  int i, target_displs, j = 0, datasize;
  size_t offset;

  MPI_Type_size(datatype, &datasize);
  target_displs = r_counts.first_target_displs; //TODO Check that datasize is not needed
565
566
567

  MPI_Win_lock_all(MPI_MODE_NOCHECK, win);
  for(i=r_counts.idI; i<r_counts.idE; i++) {
568
569
    offset = r_counts.displs[i] * datasize;
    MPI_Rget(recv+offset, r_counts.counts[i], datatype, i, target_displs, r_counts.counts[i], datatype, win, &(requests[j]));
570
571
572
573
    target_displs=0;
    j++;
  }
  MPI_Win_unlock_all(win);
574
575
}

iker_martin's avatar
iker_martin committed
576
577
578
579
580
581
582
583
584
/*
 * ========================================================================================
 * ========================================================================================
 * ================================DISTRIBUTION FUNCTIONS==================================
 * ========================================================================================
 * ========================================================================================
*/

/*
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
 * Performs a communication to redistribute an array in a block distribution. For each process calculates
 * how many elements sends/receives to other processes for the new group.
 *
 * - qty  (IN):  Sum of elements shared by all processes that will send data.
 * - myId (IN):  Rank of the MPI process in the local communicator. For the parents is not the rank obtained from "comm".
 * - numP (IN):  Size of the local group. If it is a children group, this parameter must correspond to using
 *               "MPI_Comm_size(comm)". For the parents is not always the size obtained from "comm".
 * - numO (IN):  Amount of processes in the remote group. For the parents is the target quantity of processes after the 
 *               resize, while for the children is the amount of parents.
 * - is_children_group (IN): Indicates wether this MPI rank is a children(TRUE) or a parent(FALSE).
 * - is_intercomm (IN): Indicates wether the used communicator is a intercomunicator(TRUE) or intracommunicator(FALSE).
 * - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to receive data.
 *               process receives data and is NULL, the behaviour is undefined.
 * - s_counts (OUT): Struct where is indicated how many elements sends this process to processes in the new group.
 * - r_counts (OUT): Struct where is indicated how many elements receives this process from other processes in the previous group.
iker_martin's avatar
iker_martin committed
600
601
 *
 */
602
//FIXME Ensure name for is_sync variable
603
void prepare_redistribution(int qty, MPI_Datatype datatype, int myId, int numP, int numO, int is_children_group, int is_intercomm, int is_sync, void **recv, struct Counts *s_counts, struct Counts *r_counts) {
604
605
  int array_size = numO;
  int offset_ids = 0;
606
  int datasize;
607
608
609
  struct Dist_data dist_data;

  if(is_intercomm) {
610
    offset_ids = is_sync ? numP : 0; //FIXME Modify only if active?
611
612
613
614
615
  } else {
    array_size = numP > numO ? numP : numO;
  }
  mallocCounts(s_counts, array_size+offset_ids);
  mallocCounts(r_counts, array_size+offset_ids);
616
  MPI_Type_size(datatype, &datasize); //FIXME Right now derived datatypes are not ensured to work
617
618

  if(is_children_group) {
619
    offset_ids = 0;
620
621
622
623
    prepare_comm_alltoall(myId, numP, numO, qty, offset_ids, r_counts);
    
    // Obtener distribución para este hijo
    get_block_dist(qty, myId, numP, &dist_data);
624
    *recv = malloc(dist_data.tamBl * datasize);
625
//get_block_dist(qty, myId, numP, &dist_data);
626
//print_counts(dist_data, r_counts->counts, r_counts->displs, numO+offset_ids, 0, "Targets Recv");
627
628
629
630
631
632
633
634
  } else {
//get_block_dist(qty, myId, numP, &dist_data);

    prepare_comm_alltoall(myId, numP, numO, qty, offset_ids, s_counts);
    if(!is_intercomm && myId < numO) {
        prepare_comm_alltoall(myId, numO, numP, qty, offset_ids, r_counts);
        // Obtener distribución para este hijo y reservar vector de recibo
        get_block_dist(qty, myId, numO, &dist_data);
635
636
        *recv = malloc(dist_data.tamBl * datasize);
//print_counts(dist_data, r_counts->counts, r_counts->displs, array_size, 0, "Sources&Targets Recv");
iker_martin's avatar
iker_martin committed
637
    }
638
//print_counts(dist_data, s_counts->counts, s_counts->displs, numO+offset_ids, 0, "Sources Send");
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
  }
}

/*
 * Ensures that the array of request of a process has an amount of elements equal to the amount of communication
 * functions the process will perform. In case the array is not initialized or does not have enough space it is
 * allocated/reallocated to the minimum amount of space needed.
 *
 * - s_counts (IN): Struct where is indicated how many elements sends this process to processes in the new group.
 * - r_counts (IN): Struct where is indicated how many elements receives this process from other processes in the previous group.
 * - requests (IN/OUT): Pointer to array of requests to be used to determine if the communication has ended. If the pointer 
 *               is null or not enough space has been reserved the pointer is allocated/reallocated.
 * - request_qty (IN/OUT): Quantity of requests to be used. If the value is smaller than the amount of communication
 *               functions to perform, it is modified to the minimum value.
 */
654
void check_requests(struct Counts s_counts, struct Counts r_counts, int red_method, int red_strategies, MPI_Request **requests, size_t *request_qty) {
655
656
657
  size_t i, sum;
  MPI_Request *aux;

658
659
660
661
662
663
664
665
666
667
  switch(red_method) {
    case MALL_RED_BASELINE:
      sum = 1;
      break;
    case MALL_RED_POINT:
    default:
      sum = (size_t) s_counts.idE - s_counts.idI;
      sum += (size_t) r_counts.idE - r_counts.idI;
      break;
  }
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
  if(malleability_red_contains_strat(red_strategies, MALL_RED_IBARRIER, NULL)) {
    sum++;
  }

  if (*requests != NULL && sum <= *request_qty) return; // Expected amount of requests

  if (*requests == NULL) {
    *requests = (MPI_Request *) malloc(sum * sizeof(MPI_Request));
  } else { // Array exists, but is too small
    aux = (MPI_Request *) realloc(*requests, sum * sizeof(MPI_Request));
    *requests = aux;
  }

  if (*requests == NULL) {
    fprintf(stderr, "Fatal error - It was not possible to allocate/reallocate memory for the MPI_Requests before the redistribution\n");
    MPI_Abort(MPI_COMM_WORLD, 1);
  }

  for(i=0; i < sum; i++) {
    (*requests)[i] = MPI_REQUEST_NULL;
  }
  *request_qty = sum;
}
iker_martin's avatar
iker_martin committed
691

692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
/*
 * Función para obtener si entre las estrategias elegidas, se utiliza
 * la estrategia pasada como segundo argumento.
 *
 * Devuelve en "result" 1(Verdadero) si utiliza la estrategia, 0(Falso) en caso
 * contrario.
 */
int malleability_red_contains_strat(int comm_strategies, int strategy, int *result) {
  int value = comm_strategies % strategy ? 0 : 1;
  if(result != NULL) *result = value;
  return value;
}


/*
 * Función para anyadir una estrategia a un conjunto.
 *
 * Devuelve en "result" 1(Verdadero) si se ha anyadido, 0(Falso) en caso
 * contrario.
 */
int malleability_red_add_strat(int *comm_strategies, int strategy) {
  if(malleability_red_contains_strat(*comm_strategies, strategy, NULL)) return 1;
  *comm_strategies = *comm_strategies * strategy;
  return 1;
iker_martin's avatar
iker_martin committed
716
}