CommDist.c 32.1 KB
Newer Older
iker_martin's avatar
iker_martin committed
1
2
3
4
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
#include <string.h>
5
#include "distribution_methods/block_distribution.h"
6
#include "CommDist.h"
7
#include "MAM_Configuration.h"
8
#include "malleabilityDataStructures.h"
iker_martin's avatar
iker_martin committed
9

10
//void prepare_redistribution(int qty, int myId, int numP, int numO, int is_children_group, int is_intercomm, char **recv, struct Counts *s_counts, struct Counts *r_counts);
11
void prepare_redistribution(int qty, MPI_Datatype datatype, int myId, int numP, int numO, int is_children_group, int is_intercomm, int is_sync, void **recv, struct Counts *s_counts, struct Counts *r_counts); //FIXME Choose name for is_sync
12
void check_requests(struct Counts s_counts, struct Counts r_counts, MPI_Request **requests, size_t *request_qty);
13

14
void sync_point2point(void *send, void *recv, MPI_Datatype datatype, int is_intercomm, int myId, struct Counts s_counts, struct Counts r_counts, MPI_Comm comm);
15
void sync_rma(void *send, void *recv, MPI_Datatype datatype, struct Counts r_counts, int tamBl, MPI_Comm comm);
16
17
void sync_rma_lock(void *recv, MPI_Datatype datatype, struct Counts r_counts, MPI_Win win);
void sync_rma_lockall(void *recv, MPI_Datatype datatype, struct Counts r_counts, MPI_Win win);
iker_martin's avatar
iker_martin committed
18

19
void async_point2point(void *send, void *recv, MPI_Datatype datatype, struct Counts s_counts, struct Counts r_counts, MPI_Comm comm, MPI_Request *requests);
20
void async_rma(void *send, void *recv, MPI_Datatype datatype, struct Counts r_counts, int tamBl, MPI_Comm comm, MPI_Request *requests, MPI_Win *win);
21
22
void async_rma_lock(void *recv, MPI_Datatype datatype, struct Counts r_counts, MPI_Win win, MPI_Request *requests);
void async_rma_lockall(void *recv, MPI_Datatype datatype, struct Counts r_counts, MPI_Win win, MPI_Request *requests);
23

24
25
26
27
28
29
30
31
/*
 * Allocates the local block of a char array of "qty" elements that is distributed
 * in blocks among the "numP" processes calling this function. The block is
 * zero-initialised (calloc).
 *
 * - array (OUT): Receives the allocated block (dist_data.tamBl chars for rank "myId").
 * - qty   (IN):  Total amount of elements shared by all processes.
 * - myId  (IN):  Rank of the calling process inside the distribution.
 * - numP  (IN):  Amount of processes among which "qty" is distributed.
 *
 * Terminates the program with exit(1) if a non-empty block cannot be allocated.
 */
void malloc_comm_array(char **array, int qty, int myId, int numP) {
    struct Dist_data dist_data;

    get_block_dist(qty, myId, numP, &dist_data);
    *array = calloc(dist_data.tamBl, sizeof(char));

    // calloc(0, ...) may legitimately return NULL when this rank owns no elements,
    // so only a failed non-empty allocation is an error.
    if(*array == NULL && dist_data.tamBl > 0) {
      printf("Memory Error (Malloc Arrays(%d))\n", dist_data.tamBl); 
      exit(1); 
    }
}


//================================================================================
//================================================================================
50
//========================SYNCHRONOUS FUNCTIONS===================================
51
52
53
//================================================================================
//================================================================================

54
/*
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
 * Performs a communication to redistribute an array in a block distribution.
 * In the redistribution is differenciated parent group from the children and the values each group indicates can be
 * different.
 *
 * - send (IN):  Array with the data to send. This data can not be null for parents.
 * - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to receive data.
 *               If the process receives data and is NULL, the behaviour is undefined.
 * - qty  (IN):  Sum of elements shared by all processes that will send data.
 * - myId (IN):  Rank of the MPI process in the local communicator. For the parents is not the rank obtained from "comm".
 * - numP (IN):  Size of the local group. If it is a children group, this parameter must correspond to using
 *               "MPI_Comm_size(comm)". For the parents is not always the size obtained from "comm".
 * - numO (IN):  Amount of processes in the remote group. For the parents is the target quantity of processes after the 
 *               resize, while for the children is the amount of parents.
 * - is_children_group (IN): Indicates wether this MPI rank is a children(TRUE) or a parent(FALSE).
 * - comm (IN):  Communicator to use to perform the redistribution.
70
 *
71
 * returns: An integer indicating if the operation has been completed(TRUE) or not(FALSE). //FIXME In this case is always true...
72
 */
73
int sync_communication(void *send, void **recv, int qty, MPI_Datatype datatype, int myId, int numP, int numO, int is_children_group, MPI_Comm comm) {
74
75
    int is_intercomm, aux_comm_used = 0;
    struct Counts s_counts, r_counts;
iker_martin's avatar
iker_martin committed
76
    struct Dist_data dist_data;
77
    MPI_Comm aux_comm = MPI_COMM_NULL;
iker_martin's avatar
iker_martin committed
78

79
80
    /* PREPARE COMMUNICATION */
    MPI_Comm_test_inter(comm, &is_intercomm);
81
//    prepare_redistribution(qty, datatype, myId, numP, numO, is_children_group, is_intercomm, recv, &s_counts, &r_counts); //FIXME Needs the datatype?
82
// TODO START REFACTOR POR DEFECTO USA SIEMPRE INTRACOMM
83
    prepare_redistribution(qty, datatype, myId, numP, numO, is_children_group, is_intercomm, 1, recv, &s_counts, &r_counts); //FIXME MAGICAL VALUE
84
85
86
87
88
89
90
91
92

    if(is_intercomm) {
      MPI_Intercomm_merge(comm, is_children_group, &aux_comm);
      aux_comm_used = 1;
    } else {
      aux_comm = comm;
    }
// FIXME END REFACTOR

93
    /* PERFORM COMMUNICATION */
94
    switch(mall_conf->red_method) {
iker_martin's avatar
iker_martin committed
95

96
97
98
99
100
101
102
      case MALL_RED_RMA_LOCKALL:
      case MALL_RED_RMA_LOCK:
        if(is_children_group) {
	  dist_data.tamBl = 0;
	} else {
          get_block_dist(qty, myId, numO, &dist_data);
	}
103
        sync_rma(send, *recv, datatype, r_counts, dist_data.tamBl, aux_comm);
104
105
106
	break;

      case MALL_RED_POINT:
107
        sync_point2point(send, *recv, datatype, is_intercomm, myId, s_counts, r_counts, aux_comm);
108
109
110
	break;
      case MALL_RED_BASELINE:
      default:
111
        MPI_Alltoallv(send, s_counts.counts, s_counts.displs, datatype, *recv, r_counts.counts, r_counts.displs, datatype, aux_comm);
112
113
	break;
    }
iker_martin's avatar
iker_martin committed
114

115
116
117
118
119
120
    if(aux_comm_used) {
      MPI_Comm_free(&aux_comm);
    } 
    freeCounts(&s_counts);
    freeCounts(&r_counts);
    return 1; //FIXME In this case is always true...
iker_martin's avatar
iker_martin committed
121
122
}

123
/*
124
125
126
127
128
129
130
131
132
133
134
135
136
137
 * Performs a series of blocking point2point communications to redistribute an array in a block distribution. 
 * It should be called after calculating how data should be redistributed.
 *
 * - send (IN):  Array with the data to send. This value can not be NULL for parents.
 * - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to 
 *               receive data. If the process receives data and is NULL, the behaviour is undefined.
 * - is_intercomm (IN): Indicates wether the communicator is an intercommunicator (TRUE) or an
 *               intracommunicator (FALSE).
 * - myId (IN):  Rank of the MPI process in the local communicator. For the parents is not the rank obtained from "comm".
 * - s_counts (IN): Struct which describes how many elements will send this process to each children and 
 *               the displacements.
 * - r_counts (IN): Structure which describes how many elements will receive this process from each parent 
 *               and the displacements.
 * - comm (IN):  Communicator to use to perform the redistribution.
138
139
 *
 */
140
141
142
void sync_point2point(void *send, void *recv, MPI_Datatype datatype, int is_intercomm, int myId, struct Counts s_counts, struct Counts r_counts, MPI_Comm comm) {
    int i, j, init, end, total_sends, datasize;
    size_t offset, offset2;
143
    MPI_Request *sends;
iker_martin's avatar
iker_martin committed
144

145
    MPI_Type_size(datatype, &datasize);
146
147
148
    init = s_counts.idI;
    end = s_counts.idE;
    if(!is_intercomm && (s_counts.idI == myId || s_counts.idE == myId + 1)) {
149
150
151
152
      offset = s_counts.displs[myId] + datasize;
      offset2 = r_counts.displs[myId] + datasize;
      memcpy(send+offset, recv+offset2, s_counts.counts[myId]);
      
153
154
155
      if(s_counts.idI == myId) init = s_counts.idI+1;
      else end = s_counts.idE-1;
    }
iker_martin's avatar
iker_martin committed
156

157
158
159
160
161
162
163
    total_sends = end - init;
    j = 0;
    if(total_sends > 0) {
      sends = (MPI_Request *) malloc(total_sends * sizeof(MPI_Request));
    }
    for(i=init; i<end; i++) {
      sends[j] = MPI_REQUEST_NULL;
164
165
      offset = s_counts.displs[i] * datasize;
      MPI_Isend(send+offset, s_counts.counts[i], datatype, i, 99, comm, &(sends[j]));
166
167
      j++;
    }
iker_martin's avatar
iker_martin committed
168

169
170
171
172
173
174
    init = r_counts.idI;
    end = r_counts.idE;
    if(!is_intercomm) {
      if(r_counts.idI == myId) init = r_counts.idI+1;
      else if(r_counts.idE == myId + 1) end = r_counts.idE-1;
    }
iker_martin's avatar
iker_martin committed
175

176
    for(i=init; i<end; i++) {
177
178
      offset = r_counts.displs[i] * datasize;
      MPI_Recv(recv+offset, r_counts.counts[i], datatype, i, 99, comm, MPI_STATUS_IGNORE);
179
180
181
182
183
    }

    if(total_sends > 0) {
      MPI_Waitall(total_sends, sends, MPI_STATUSES_IGNORE);
    }
iker_martin's avatar
iker_martin committed
184
185
186
}

/*
187
188
189
190
191
192
193
194
195
196
197
 * Performs synchronous MPI-RMA operations to redistribute an array in a block distribution. Is should be called after calculating
 * how data should be redistributed
 *
 * - send (IN):  Array with the data to send. This value can be NULL for children.
 * - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to receive data.
 *               If the process receives data and is NULL, the behaviour is undefined.
 * - r_counts (IN): Structure which describes how many elements will receive this process from each parent and the
 *               displacements.
 * - tamBl (IN): How many elements are stored in the parameter "send".
 * - comm (IN):  Communicator to use to perform the redistribution. Must be an intracommunicator as MPI-RMA requirements.
 *
198
199
200
201
202
203
 * FIXME: In libfabric one of these macros defines the maximum amount of BYTES that can be communicated in a SINGLE MPI_Get
 * A window can have more bytes than the amount shown in those macros, therefore, if you want to read more than that amount
 * you need to perform multiples Gets.
 * prov/psm3/psm3/psm_config.h:179:#define MQ_SHM_THRESH_RNDV 16000
 * prov/psm3/psm3/ptl_am/am_config.h:62:#define PSMI_MQ_RV_THRESH_CMA      16000
 * prov/psm3/psm3/ptl_am/am_config.h:65:#define PSMI_MQ_RV_THRESH_NO_KASSIST 16000
iker_martin's avatar
iker_martin committed
204
 */
205
void sync_rma(void *send, void *recv, MPI_Datatype datatype, struct Counts r_counts, int tamBl, MPI_Comm comm) {
206
  int datasize;
207
  MPI_Win win;
208
209
  MPI_Type_size(datatype, &datasize);
  MPI_Win_create(send, (MPI_Aint)tamBl * datasize, datasize, MPI_INFO_NULL, comm, &win);
210

211
212
213
  #if USE_MAL_DEBUG >= 3
    DEBUG_FUNC("Created Window for synchronous RMA communication", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(comm);
  #endif
214
  switch(mall_conf->red_method) {
215
    case MALL_RED_RMA_LOCKALL:
216
      sync_rma_lockall(recv, datatype, r_counts, win);
217
218
      break;
    case MALL_RED_RMA_LOCK:
219
      sync_rma_lock(recv, datatype, r_counts, win);
220
221
      break;
  }
222
223
224
  #if USE_MAL_DEBUG >= 3
    DEBUG_FUNC("Completed synchronous RMA communication", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(comm);
  #endif
225
  MPI_Win_free(&win);
iker_martin's avatar
iker_martin committed
226
227
}

228
229


230
/*
231
232
233
234
235
236
237
 * Performs a passive MPI-RMA data redistribution for a single array using the passive epochs Lock/Unlock.
 * - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to receive data.
 *               If the process receives data and is NULL, the behaviour is undefined.
 * - r_counts (IN): Structure which describes how many elements will receive this process from each parent and the
 *               displacements.
 * - win (IN):   Window to use to perform the redistribution.
 *
238
 */
239
240
241
242
243
244
void sync_rma_lock(void *recv, MPI_Datatype datatype, struct Counts r_counts, MPI_Win win) {
  int i, target_displs, datasize;
  size_t offset;

  MPI_Type_size(datatype, &datasize);
  target_displs = r_counts.first_target_displs; //TODO Check that datasize is not needed
245
246

  for(i=r_counts.idI; i<r_counts.idE; i++) {
247
    offset = r_counts.displs[i] * datasize;
248
    MPI_Win_lock(MPI_LOCK_SHARED, i, MPI_MODE_NOCHECK, win);
249
    MPI_Get(recv+offset, r_counts.counts[i], datatype, i, target_displs, r_counts.counts[i], datatype, win);
250
251
252
    MPI_Win_unlock(i, win);
    target_displs=0;
  }
253
254
}

255
256
257
258
259
260
261
262
263
/*
 * Performs a passive MPI-RMA data redistribution for a single array using the passive epochs Lockall/Unlockall.
 * - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to receive data.
 *               If the process receives data and is NULL, the behaviour is undefined.
 * - r_counts (IN): Structure which describes how many elements will receive this process from each parent and the
 *               displacements.
 * - win (IN):   Window to use to perform the redistribution.
 *
 */
264
265
266
267
268
269
void sync_rma_lockall(void *recv, MPI_Datatype datatype, struct Counts r_counts, MPI_Win win) {
  int i, target_displs, datasize;
  size_t offset;

  MPI_Type_size(datatype, &datasize);
  target_displs = r_counts.first_target_displs; //TODO Check that datasize is not needed
270
271
272

  MPI_Win_lock_all(MPI_MODE_NOCHECK, win);
  for(i=r_counts.idI; i<r_counts.idE; i++) {
273
274
    offset = r_counts.displs[i] * datasize;
    MPI_Get(recv+offset, r_counts.counts[i], datatype, i, target_displs, r_counts.counts[i], datatype, win);
275
276
277
278
279
    target_displs=0;
  }
  MPI_Win_unlock_all(win);
}

280
281
//================================================================================
//================================================================================
282
//========================ASYNCHRONOUS FUNCTIONS==================================
283
284
285
286
//================================================================================
//================================================================================

/*
287
288
289
 * Performs a communication to redistribute an array in a block distribution with non-blocking MPI functions.
 * In the redistribution is differenciated parent group from the children and the values each group indicates can be
 * different.
290
 *
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
 * - send (IN):  Array with the data to send. This data can not be null for parents.
 * - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to receive data.
 *               If the process receives data and is NULL, the behaviour is undefined.
 * - qty  (IN):  Sum of elements shared by all processes that will send data.
 * - myId (IN):  Rank of the MPI process in the local communicator. For the parents is not the rank obtained from "comm".
 * - numP (IN):  Size of the local group. If it is a children group, this parameter must correspond to using
 *               "MPI_Comm_size(comm)". For the parents is not always the size obtained from "comm".
 * - numO (IN):  Amount of processes in the remote group. For the parents is the target quantity of processes after the 
 *               resize, while for the children is the amount of parents.
 * - is_children_group (IN): Indicates wether this MPI rank is a children(TRUE) or a parent(FALSE).
 * - comm (IN):  Communicator to use to perform the redistribution.
 * - requests (OUT): Pointer to array of requests to be used to determine if the communication has ended. If the pointer 
 *               is null or not enough space has been reserved the pointer is allocated/reallocated.
 * - request_qty (OUT): Quantity of requests to be used. If a process sends and receives data, this value will be 
 *               modified to the expected value.
306
 *
307
 * returns: An integer indicating if the operation has been completed(TRUE) or not(FALSE). //FIXME In this case is always false...
308
 */
309
int async_communication_start(void *send, void **recv, int qty, MPI_Datatype datatype, int myId, int numP, int numO, int is_children_group, MPI_Comm comm, MPI_Request **requests, size_t *request_qty, MPI_Win *win) {
310
311
    int is_intercomm, aux_comm_used = 0;
    struct Counts s_counts, r_counts;
312
    struct Dist_data dist_data;
313
314
315
316
    MPI_Comm aux_comm = MPI_COMM_NULL;

    /* PREPARE COMMUNICATION */
    MPI_Comm_test_inter(comm, &is_intercomm);
317
// TODO START REFACTOR POR DEFECTO USA SIEMPRE INTRACOMM
318
319
    //prepare_redistribution(qty, datatype, myId, numP, numO, is_children_group, is_intercomm, recv, &s_counts, &r_counts);
    prepare_redistribution(qty, datatype, myId, numP, numO, is_children_group, is_intercomm, 1, recv, &s_counts, &r_counts); // TODO MAGICAL VALUE
320
321
322
323
324
325
326
    if(is_intercomm) {
      MPI_Intercomm_merge(comm, is_children_group, &aux_comm);
      aux_comm_used = 1;
    } else {
      aux_comm = comm;
    }
// FIXME END REFACTOR
327
    check_requests(s_counts, r_counts, requests, request_qty);
328
329

    /* PERFORM COMMUNICATION */
330
    switch(mall_conf->red_method) {
331
332
333
334
335
336
337
338

      case MALL_RED_RMA_LOCKALL:
      case MALL_RED_RMA_LOCK:
        if(is_children_group) {
	  dist_data.tamBl = 0;
	} else {
          get_block_dist(qty, myId, numO, &dist_data);
	}
339
        async_rma(send, *recv, datatype, r_counts, dist_data.tamBl, aux_comm, *requests, win);
340
341
	break;
      case MALL_RED_POINT:
342
        async_point2point(send, *recv, datatype, s_counts, r_counts, aux_comm, *requests);
343
344
345
	break;
      case MALL_RED_BASELINE:
      default:
346
        MPI_Ialltoallv(send, s_counts.counts, s_counts.displs, datatype, *recv, r_counts.counts, r_counts.displs, datatype, aux_comm, &((*requests)[0]));
347
348
	break;
    }
349

350
    /* POST REQUESTS CHECKS */
351
    if(MAM_Contains_strat(MAM_RED_STRATEGIES, MAM_STRAT_RED_WAIT_TARGETS, NULL)) {
352
353
      if(!is_children_group && (is_intercomm || myId >= numO)) {
        MPI_Ibarrier(comm, &((*requests)[*request_qty-1]) ); //FIXME Not easy to read...
354
      }
355
    }
356

357
358
359
    if(aux_comm_used) {
      MPI_Comm_free(&aux_comm);
    } 
360

361
362
363
    freeCounts(&s_counts);
    freeCounts(&r_counts);
    return 0; //FIXME In this case is always false...
364
365
366
}

/*
367
 * Checks if a set of requests have been completed (1) or not (0).
368
 *
369
370
371
372
 * - myId (IN):  Rank of the MPI process in the local communicator. For the parents is not the rank obtained from "comm".
 * - is_children_group (IN): Indicates wether this MPI rank is a children(TRUE) or a parent(FALSE).
 * - requests (IN): Pointer to array of requests to be used to determine if the communication has ended.
 * - request_qty (IN): Quantity of requests in "requests".
373
 *
374
 * returns: An integer indicating if the operation has been completed(TRUE) or not(FALSE).
375
 */
376
int async_communication_check(int myId, int is_children_group, MPI_Comm comm, MPI_Request *requests, size_t request_qty) {
377
378
379
380
381
382
  int completed, req_completed, all_req_null, test_err, aux_condition;
  size_t i;
  completed = 1;
  all_req_null = 1;
  test_err = MPI_SUCCESS;

383
  if (is_children_group) return 1; //FIXME Deberia devolver un num negativo
384

385
  if(MAM_Contains_strat(MAM_RED_STRATEGIES, MAM_STRAT_RED_WAIT_TARGETS, NULL)) {
386
387
388
389
390
391
392
393
394
395

    // The Ibarrier should only be posted at this point if the process
    // has other requests which has not confirmed as completed yet,
    // but are confirmed now.
    if (requests[request_qty-1] == MPI_REQUEST_NULL) {
      for(i=0; i<request_qty; i++) {
	aux_condition = requests[i] == MPI_REQUEST_NULL;
	all_req_null  = all_req_null && aux_condition;
        test_err = MPI_Test(&(requests[i]), &req_completed, MPI_STATUS_IGNORE);
        completed = completed && req_completed;
396
      }
397
      if(completed && !all_req_null) { MPI_Ibarrier(comm, &(requests[request_qty-1])); }
398
    }
399
    test_err = MPI_Test(&(requests[request_qty-1]), &completed, MPI_STATUS_IGNORE);
400

401
402
403
404
  } else {
    for(i=0; i<request_qty; i++) {
      test_err = MPI_Test(&(requests[i]), &req_completed, MPI_STATUS_IGNORE);
      completed = completed && req_completed;
405
    }
406
407
//  test_err = MPI_Testall(request_qty, requests, &completed, MPI_STATUSES_IGNORE); //FIXME Some kind of bug with Mpich.
  }
408

409
410
411
412
  if (test_err != MPI_SUCCESS && test_err != MPI_ERR_PENDING) {
    printf("P%d aborting -- Test Async\n", myId);
    MPI_Abort(MPI_COMM_WORLD, test_err);
  }
413

414
  return completed;
415
416
}

417

418
/*
419
420
 * Waits until the completion of a set of requests. If the Ibarrier strategy
 * is being used, the corresponding ibarrier is posted.
421
 *
422
423
424
 * - comm (IN): Communicator to use to confirm finalizations of redistribution
 * - requests (IN): Pointer to array of requests to be used to determine if the communication has ended.
 * - request_qty (IN): Quantity of requests in "requests".
425
 * - post_ibarrier (IN): Whether an Ibarrier should be posted by this process or not.
426
 */
427
void async_communication_wait(MPI_Comm comm, MPI_Request *requests, size_t request_qty, int post_ibarrier) {
428
  MPI_Waitall(request_qty, requests, MPI_STATUSES_IGNORE); 
429
  #if USE_MAL_DEBUG >= 3
430
    DEBUG_FUNC("Processes Waitall completed", mall->myId, mall->numP); fflush(stdout); MPI_Barrier(MPI_COMM_WORLD);
431
  #endif
432
  if(post_ibarrier) {
433
    MPI_Ibarrier(comm, &(requests[request_qty-1]) );
434
    MPI_Wait(&(requests[request_qty-1]), MPI_STATUS_IGNORE);
435
  }
436
437
}

438
/*
439
440
 * Frees Requests/Windows associated to a particular redistribution.
 * Should be called for each output result of calling "async_communication_start".
441
 *
442
443
444
 * - requests (IN): Pointer to array of requests to be used to determine if the communication has ended.
 * - request_qty (IN): Quantity of requests in "requests".
 * - win (IN): Window to free.
445
 */
446
void async_communication_end(MPI_Request *requests, size_t request_qty, MPI_Win *win) {
447
448
449

  //Para la desconexión de ambos grupos de procesos es necesario indicar a MPI que esta comm
  //ha terminado, aunque solo se pueda llegar a este punto cuando ha terminado
450
  if(MAM_Contains_strat(MAM_RED_STRATEGIES, MAM_STRAT_RED_WAIT_TARGETS, NULL)) { MPI_Waitall(request_qty, requests, MPI_STATUSES_IGNORE); }
451

452
  if(mall_conf->red_method == MALL_RED_RMA_LOCKALL || mall_conf->red_method == MALL_RED_RMA_LOCK) { MPI_Win_free(win); }
453
454
}

455
/*
456
457
458
459
460
461
462
463
464
465
466
467
 * Performs a series of non-blocking point2point communications to redistribute an array in a block distribution. 
 * It should be called after calculating how data should be redistributed.
 *
 * - send (IN):  Array with the data to send. This value can not be NULL for parents.
 * - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to 
 *               receive data. If the process receives data and is NULL, the behaviour is undefined.
 * - s_counts (IN): Struct which describes how many elements will send this process to each children and 
 *               the displacements.
 * - r_counts (IN): Structure which describes how many elements will receive this process from each parent 
 *               and the displacements.
 * - comm (IN):  Communicator to use to perform the redistribution.
 * - requests (OUT): Pointer to array of requests to be used to determine if the communication has ended.
468
 *
469
 */
470
471
472
473
void async_point2point(void *send, void *recv, MPI_Datatype datatype, struct Counts s_counts, struct Counts r_counts, MPI_Comm comm, MPI_Request *requests) {
    int i, j = 0, datasize;
    size_t offset;
    MPI_Type_size(datatype, &datasize);
474

475
    for(i=s_counts.idI; i<s_counts.idE; i++) {
476
477
      offset = s_counts.displs[i] * datasize;
      MPI_Isend(send+offset, s_counts.counts[i], datatype, i, 99, comm, &(requests[j]));
478
479
      j++;
    }
480

481
    for(i=r_counts.idI; i<r_counts.idE; i++) {
482
483
      offset = r_counts.displs[i] * datasize;
      MPI_Irecv(recv+offset, r_counts.counts[i], datatype, i, 99, comm, &(requests[j]));
484
485
      j++;
    }
iker_martin's avatar
iker_martin committed
486
487
}

488
/*
489
490
491
492
493
494
495
496
497
498
499
500
 * Performs asynchronous MPI-RMA operations to redistribute an array in a block distribution. Is should be called after calculating
 * how data should be redistributed.
 *
 * - send (IN):  Array with the data to send. This value can be NULL for children.
 * - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to receive data.
 *               If the process receives data and is NULL, the behaviour is undefined.
 * - r_counts (IN): Structure which describes how many elements will receive this process from each parent and the
 *               displacements.
 * - tamBl (IN): How many elements are stored in the parameter "send".
 * - comm (IN):  Communicator to use to perform the redistribution. Must be an intracommunicator as MPI-RMA requirements.
 * - window (OUT): Pointer to a window object used for the RMA operations.
 * - requests (OUT): Pointer to array of requests to be used to determine if the communication has ended.
501
502
 *
 */
503
void async_rma(void *send, void *recv, MPI_Datatype datatype, struct Counts r_counts, int tamBl, MPI_Comm comm, MPI_Request *requests, MPI_Win *win) {
504
  int datasize;
505

506
507
  MPI_Type_size(datatype, &datasize);
  MPI_Win_create(send, (MPI_Aint)tamBl * datasize, datasize, MPI_INFO_NULL, comm, win);
508
  switch(mall_conf->red_method) {
509
    case MALL_RED_RMA_LOCKALL:
510
      async_rma_lockall(recv, datatype, r_counts, *win, requests);
511
512
      break;
    case MALL_RED_RMA_LOCK:
513
      async_rma_lock(recv, datatype, r_counts, *win, requests);
514
515
516
      break;
  }
}
517

518
519
520
521
522
523
524
525
526
527
/*
 * Performs an asynchronous and passive MPI-RMA data redistribution for a single array using the passive epochs Lock/Unlock.
 * - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to receive data.
 *               If the process receives data and is NULL, the behaviour is undefined.
 * - r_counts (IN): Structure which describes how many elements will receive this process from each parent and the
 *               displacements.
 * - win (IN):   Window to use to perform the redistribution.
 * - requests (OUT): Pointer to array of requests to be used to determine if the communication has ended.
 *
 */
528
529
530
531
532
533
void async_rma_lock(void *recv, MPI_Datatype datatype, struct Counts r_counts, MPI_Win win, MPI_Request *requests) {
  int i, target_displs, j = 0, datasize;
  size_t offset;

  MPI_Type_size(datatype, &datasize);
  target_displs = r_counts.first_target_displs; //TODO Check that datasize is not needed
534
535

  for(i=r_counts.idI; i<r_counts.idE; i++) {
536
    offset = r_counts.displs[i] * datasize;
537
    MPI_Win_lock(MPI_LOCK_SHARED, i, MPI_MODE_NOCHECK, win);
538
    MPI_Rget(recv+offset, r_counts.counts[i], datatype, i, target_displs, r_counts.counts[i], datatype, win, &(requests[j]));
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
    MPI_Win_unlock(i, win);
    target_displs=0;
    j++;
  }
}
/*
 * Performs an asynchronous and passive MPI-RMA data redistribution for a single array using the passive epochs Lockall/Unlockall.
 * - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to receive data.
 *               If the process receives data and is NULL, the behaviour is undefined.
 * - r_counts (IN): Structure which describes how many elements will receive this process from each parent and the
 *               displacements.
 * - win (IN):   Window to use to perform the redistribution.
 * - requests (OUT): Pointer to array of requests to be used to determine if the communication has ended.
 *
 */
554
555
556
557
558
559
void async_rma_lockall(void *recv, MPI_Datatype datatype, struct Counts r_counts, MPI_Win win, MPI_Request *requests) {
  int i, target_displs, j = 0, datasize;
  size_t offset;

  MPI_Type_size(datatype, &datasize);
  target_displs = r_counts.first_target_displs; //TODO Check that datasize is not needed
560
561
562

  MPI_Win_lock_all(MPI_MODE_NOCHECK, win);
  for(i=r_counts.idI; i<r_counts.idE; i++) {
563
564
    offset = r_counts.displs[i] * datasize;
    MPI_Rget(recv+offset, r_counts.counts[i], datatype, i, target_displs, r_counts.counts[i], datatype, win, &(requests[j]));
565
566
567
568
    target_displs=0;
    j++;
  }
  MPI_Win_unlock_all(win);
569
570
}

iker_martin's avatar
iker_martin committed
571
572
573
574
575
576
577
578
579
/*
 * ========================================================================================
 * ========================================================================================
 * ================================DISTRIBUTION FUNCTIONS==================================
 * ========================================================================================
 * ========================================================================================
*/

/*
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
 * Performs a communication to redistribute an array in a block distribution. For each process calculates
 * how many elements sends/receives to other processes for the new group.
 *
 * - qty  (IN):  Sum of elements shared by all processes that will send data.
 * - myId (IN):  Rank of the MPI process in the local communicator. For the parents is not the rank obtained from "comm".
 * - numP (IN):  Size of the local group. If it is a children group, this parameter must correspond to using
 *               "MPI_Comm_size(comm)". For the parents is not always the size obtained from "comm".
 * - numO (IN):  Amount of processes in the remote group. For the parents is the target quantity of processes after the 
 *               resize, while for the children is the amount of parents.
 * - is_children_group (IN): Indicates wether this MPI rank is a children(TRUE) or a parent(FALSE).
 * - is_intercomm (IN): Indicates wether the used communicator is a intercomunicator(TRUE) or intracommunicator(FALSE).
 * - recv (OUT): Array where data will be written. A NULL value is allowed if the process is not going to receive data.
 *               process receives data and is NULL, the behaviour is undefined.
 * - s_counts (OUT): Struct where is indicated how many elements sends this process to processes in the new group.
 * - r_counts (OUT): Struct where is indicated how many elements receives this process from other processes in the previous group.
iker_martin's avatar
iker_martin committed
595
596
 *
 */
597
//FIXME Ensure name for is_sync variable
598
void prepare_redistribution(int qty, MPI_Datatype datatype, int myId, int numP, int numO, int is_children_group, int is_intercomm, int is_sync, void **recv, struct Counts *s_counts, struct Counts *r_counts) {
599
600
  int array_size = numO;
  int offset_ids = 0;
601
  int datasize;
602
603
604
  struct Dist_data dist_data;

  if(is_intercomm) {
605
    offset_ids = is_sync ? numP : 0; //FIXME Modify only if active?
606
607
608
609
610
  } else {
    array_size = numP > numO ? numP : numO;
  }
  mallocCounts(s_counts, array_size+offset_ids);
  mallocCounts(r_counts, array_size+offset_ids);
611
  MPI_Type_size(datatype, &datasize); //FIXME Right now derived datatypes are not ensured to work
612
613

  if(is_children_group) {
614
    offset_ids = 0;
615
616
617
618
    prepare_comm_alltoall(myId, numP, numO, qty, offset_ids, r_counts);
    
    // Obtener distribución para este hijo
    get_block_dist(qty, myId, numP, &dist_data);
619
    *recv = malloc(dist_data.tamBl * datasize);
620
//get_block_dist(qty, myId, numP, &dist_data);
621
//print_counts(dist_data, r_counts->counts, r_counts->displs, numO+offset_ids, 0, "Targets Recv");
622
623
624
625
626
627
628
629
  } else {
//get_block_dist(qty, myId, numP, &dist_data);

    prepare_comm_alltoall(myId, numP, numO, qty, offset_ids, s_counts);
    if(!is_intercomm && myId < numO) {
        prepare_comm_alltoall(myId, numO, numP, qty, offset_ids, r_counts);
        // Obtener distribución para este hijo y reservar vector de recibo
        get_block_dist(qty, myId, numO, &dist_data);
630
631
        *recv = malloc(dist_data.tamBl * datasize);
//print_counts(dist_data, r_counts->counts, r_counts->displs, array_size, 0, "Sources&Targets Recv");
iker_martin's avatar
iker_martin committed
632
    }
633
//print_counts(dist_data, s_counts->counts, s_counts->displs, numO+offset_ids, 0, "Sources Send");
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
  }
}

/*
 * Ensures that the array of request of a process has an amount of elements equal to the amount of communication
 * functions the process will perform. In case the array is not initialized or does not have enough space it is
 * allocated/reallocated to the minimum amount of space needed.
 *
 * - s_counts (IN): Struct where is indicated how many elements sends this process to processes in the new group.
 * - r_counts (IN): Struct where is indicated how many elements receives this process from other processes in the previous group.
 * - requests (IN/OUT): Pointer to array of requests to be used to determine if the communication has ended. If the pointer 
 *               is null or not enough space has been reserved the pointer is allocated/reallocated.
 * - request_qty (IN/OUT): Quantity of requests to be used. If the value is smaller than the amount of communication
 *               functions to perform, it is modified to the minimum value.
 */
649
void check_requests(struct Counts s_counts, struct Counts r_counts, MPI_Request **requests, size_t *request_qty) {
650
651
652
  size_t i, sum;
  MPI_Request *aux;

653
  switch(mall_conf->red_method) {
654
655
656
657
658
659
660
661
662
    case MALL_RED_BASELINE:
      sum = 1;
      break;
    case MALL_RED_POINT:
    default:
      sum = (size_t) s_counts.idE - s_counts.idI;
      sum += (size_t) r_counts.idE - r_counts.idI;
      break;
  }
663
  if(MAM_Contains_strat(MAM_RED_STRATEGIES, MAM_STRAT_RED_WAIT_TARGETS, NULL)) {
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
    sum++;
  }

  if (*requests != NULL && sum <= *request_qty) return; // Expected amount of requests

  if (*requests == NULL) {
    *requests = (MPI_Request *) malloc(sum * sizeof(MPI_Request));
  } else { // Array exists, but is too small
    aux = (MPI_Request *) realloc(*requests, sum * sizeof(MPI_Request));
    *requests = aux;
  }

  if (*requests == NULL) {
    fprintf(stderr, "Fatal error - It was not possible to allocate/reallocate memory for the MPI_Requests before the redistribution\n");
    MPI_Abort(MPI_COMM_WORLD, 1);
  }

  for(i=0; i < sum; i++) {
    (*requests)[i] = MPI_REQUEST_NULL;
  }
  *request_qty = sum;
}