PortService.c 6.74 KB
Newer Older
Iker Martín Álvarez's avatar
Iker Martín Álvarez committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include "../MAM_Constants.h"
#include "../MAM_DataStructures.h"
#include "PortService.h"

#define MAM_SERVICE_CONSTANT_NAME 22  // Length of the fixed prefix, e.g. "mam_service_jid0010_gr"
#define MAM_SERVICE_VARIABLE_NAME 4   // Length of the variable suffix: 3-digit group id + '\0'
// Total buffer size of a service name. Parenthesized so that the sum is kept
// intact when the macro is used inside a larger expression (e.g. SIZE * sizeof(char)).
#define MAM_SERVICE_NAME_SIZE (MAM_SERVICE_CONSTANT_NAME + MAM_SERVICE_VARIABLE_NAME)
// Example of mam service name --> "mam_service_jid0010_gr001\0"
//                                  constant part        |variable part
//
void init_ports(Spawn_ports *spawn_port) {
  /* Local side: no port has been opened and no name buffers exist yet. */
  spawn_port->port_name = spawn_port->service_name = NULL;
  spawn_port->opened_port = 0;

  /* Remote side: nothing has been discovered yet. */
  spawn_port->remote_service = spawn_port->remote_port = NULL;
}

/*
 * Opens an MPI port for inter-process communication and optionally publishes it as a service.
 * Allows MaM to find other spawned groups which are not connected.
 *
 * Parameters:
 *   spawn_port:   Structure holding the local port name, the service name and the opened flag.
 *   open_port:    Non-zero if this process must really open an MPI port; zero if it only
 *                 needs a placeholder port name (presumably non-root ranks -- confirm in callers).
 *   open_service: Group number used to publish the port as a service. If it is
 *                 MAM_SERVICE_UNNEEDED, no service name is generated nor published.
 *
 * Functionality:
 *   - Does nothing if spawn_port->port_name is already set.
 *   - When open_port is non-zero, allocates the port name, opens the port and, if requested,
 *     publishes it under the name "mam_service_jid%04d_gr%03d".
 *     - If MAM_USE_SLURM is enabled, the job id is read from SLURM_JOB_ID (modulo 1000);
 *       otherwise it is 0.
 *   - When open_port is zero, stores an empty string (1 byte) in port_name so it is not NULL,
 *     and clears opened_port so that close_port never tries to close the placeholder.
 *
 * Notes:
 *   - If a memory allocation fails, a message is written to stderr and the function returns
 *     early; port_name (or service_name) is then left as NULL.
 *   - Return values of the MPI calls are not checked.
 */
void open_port(Spawn_ports *spawn_port, int open_port, int open_service)
{
  int job_id = 0;

  if (spawn_port->port_name != NULL) {
    return;
  }

  if (!open_port) {
    /* A flag left at 1 by a previous open/close cycle must not survive:
     * the placeholder below is not a real MPI port. */
    spawn_port->opened_port = 0;
    spawn_port->port_name = malloc(1);
    if (spawn_port->port_name == NULL) {
      fprintf(stderr, "MaM: open_port could not allocate the placeholder port name\n");
      return;
    }
    spawn_port->port_name[0] = '\0';
    return;
  }

  spawn_port->port_name = malloc(MPI_MAX_PORT_NAME * sizeof(char));
  if (spawn_port->port_name == NULL) {
    fprintf(stderr, "MaM: open_port could not allocate the port name\n");
    return;
  }
  MPI_Open_port(MPI_INFO_NULL, spawn_port->port_name);
  spawn_port->opened_port = 1;

  if (open_service == MAM_SERVICE_UNNEEDED) {
    return;
  }

  spawn_port->service_name = malloc((MAM_SERVICE_NAME_SIZE) * sizeof(char));
  if (spawn_port->service_name == NULL) {
    /* The port stays open, but it cannot be published without a name buffer. */
    fprintf(stderr, "MaM: open_port could not allocate the service name\n");
    return;
  }
#if MAM_USE_SLURM
  char *tmp = getenv("SLURM_JOB_ID");
  if (tmp != NULL) { job_id = atoi(tmp) % 1000; }
#endif
  snprintf(spawn_port->service_name, MAM_SERVICE_NAME_SIZE, "mam_service_jid%04d_gr%03d", job_id, open_service);
  MPI_Publish_name(spawn_port->service_name, MPI_INFO_NULL, spawn_port->port_name);
}

/*
 * Function: close_port
 * --------------------
 * Closes an open MPI local port and cleans up associated resources.
 *
 * Parameters:
 *   spawn_port: Structure holding the local port name, the service name and the opened flag.
 *
 * Functionality:
 *   - Does nothing if spawn_port->port_name is NULL.
 *   - If a service name exists, it is unpublished, freed and set to NULL.
 *   - MPI_Close_port is only called when opened_port is set, i.e. when port_name is a real
 *     MPI port and not a placeholder. The flag is cleared afterwards so the structure can be
 *     reused without carrying a stale "opened" state.
 *   - Frees the memory allocated for the port name and sets the pointer to NULL.
 *
 * Notes:
 *   - This function assumes that MPI resources were successfully allocated and opened in the
 *     corresponding `open_port` function.
 *   - No explicit error handling is present (e.g., checking the return value of MPI functions).
 */
void close_port(Spawn_ports *spawn_port) {
  if(spawn_port->port_name == NULL) {
    return;
  }

  if(spawn_port->service_name != NULL) {
    MPI_Unpublish_name(spawn_port->service_name, MPI_INFO_NULL, spawn_port->port_name);
    free(spawn_port->service_name);
    spawn_port->service_name = NULL;
  }

  if(spawn_port->opened_port) {
    MPI_Close_port(spawn_port->port_name);
    spawn_port->opened_port = 0; // The port no longer exists; do not leave the flag stale
  }

  free(spawn_port->port_name);
  spawn_port->port_name = NULL;
}

/*
 * Function: discover_remote_port
 * ------------------------------
 * Discovers the MPI port associated with a remote service using its service name.
 * If the port cannot be found, it retries a set number of times before aborting the MPI execution.
 * This function must at least be called by the root process which will call MPI_Comm_connect, although
 * it could be called by all processes without any issues.
 *
 * Parameters:
 *   id_group:   Group ID used to build the service name. If it is MAM_SERVICE_UNNEEDED, no lookup
 *               is performed; the function only makes sure remote_port is allocated.
 *   spawn_port: Structure whose remote_service and remote_port buffers are allocated on first use
 *               and filled with the service name and the discovered port name.
 *
 * Notes:
 *   - This function assumes that the service name follows a fixed pattern (`mam_service_jid%04d_gr%03d`).
 *   - On subsequent calls only the group ID part of the already generated service name is rewritten.
 *   - If MAM_USE_SLURM is enabled, the job ID is read from SLURM_JOB_ID (modulo 1000); otherwise it is 0.
 *   - remote_port is preset to "NULL" before each lookup; a lookup is considered failed while the
 *     buffer still starts with "NULL" (assumes MPI_Lookup_name leaves the buffer untouched on
 *     failure and returns instead of aborting -- confirm against the configured MPI error handler).
 *   - After the first lookup, at most 5 retries (one second apart) are made. The result of every
 *     lookup is honoured; MPI_Abort is only called once all retries have failed.
 *   - MPI_Abort is also called if a buffer cannot be allocated.
 *   - Return values of the MPI functions are not checked.
 */
void discover_remote_port(int id_group, Spawn_ports *spawn_port) {
  enum { MAX_LOOKUP_RETRIES = 5 };
  int error_tries = 0, job_id = 0;

  if(spawn_port->remote_port == NULL) {
    spawn_port->remote_port = malloc(MPI_MAX_PORT_NAME * sizeof(char));
    if(spawn_port->remote_port == NULL) {
      MPI_Abort(MPI_COMM_WORLD, -1);
      return;
    }
    if(id_group == MAM_SERVICE_UNNEEDED) { spawn_port->remote_port[0] = '\0'; }
  }
  if(id_group == MAM_SERVICE_UNNEEDED) { return; }

  if(spawn_port->remote_service == NULL) { // First discover
    spawn_port->remote_service = malloc((MAM_SERVICE_NAME_SIZE) * sizeof(char));
    if(spawn_port->remote_service == NULL) {
      MPI_Abort(MPI_COMM_WORLD, -1);
      return;
    }
#if MAM_USE_SLURM
    char *tmp = getenv("SLURM_JOB_ID");
    if(tmp != NULL) { job_id = atoi(tmp)%1000; }
#endif
    snprintf(spawn_port->remote_service, MAM_SERVICE_NAME_SIZE, "mam_service_jid%04d_gr%03d", job_id, id_group);
  } else { // For subsequent lookups, only update the variable part (group ID) of the service name.
    snprintf(spawn_port->remote_service + MAM_SERVICE_CONSTANT_NAME, MAM_SERVICE_VARIABLE_NAME, "%03d", id_group);
  }

  // Sentinel value: if it is still there after the lookup, the service was not found.
  snprintf(spawn_port->remote_port, 5, "NULL");

  MPI_Lookup_name(spawn_port->remote_service, MPI_INFO_NULL, spawn_port->remote_port);
  while(strncmp(spawn_port->remote_port, "NULL", 4) == 0) {
    // Check the limit BEFORE retrying, so a retry that succeeds is never followed by an abort.
    if(++error_tries > MAX_LOOKUP_RETRIES) {
      MPI_Abort(MPI_COMM_WORLD, -1);
      return;
    }
    sleep(1);
    MPI_Lookup_name(spawn_port->remote_service, MPI_INFO_NULL, spawn_port->remote_port);
  }
}


void free_ports(Spawn_ports *spawn_port) {
  /* Release the local port (and its published service) first. */
  close_port(spawn_port);

  /* free(NULL) is a no-op, so the remote buffers can be released unconditionally. */
  free(spawn_port->remote_port);
  spawn_port->remote_port = NULL;

  free(spawn_port->remote_service);
  spawn_port->remote_service = NULL;
}