#include #include #include #include #include #include #include #include #include "ProcessDist.h" int commSlurm = MAL_NOT_STARTED; struct Slurm_data *slurm_data; pthread_t slurm_thread; MPI_Comm *returned_comm; struct Slurm_data { char *cmd; // Executable name int qty_procs; MPI_Info info; int type_creation; }; typedef struct { char *argv; int numP_childs, myId, root, type_dist; MPI_Comm comm; }Creation_data; //--------------PRIVATE SPAWN TYPE DECLARATIONS---------------// void* thread_work(void* creation_data_arg); //--------------PRIVATE DECLARATIONS---------------// void processes_dist(char *argv, int numP_childs, int type_dist); int create_processes(int myId, int root, MPI_Comm *child, MPI_Comm comm); void node_dist(slurm_job_info_t job_record, int type, int total_procs, int **qty, int *used_nodes); void fill_str_hostfile(slurm_job_info_t job_record, int *qty, int used_nodes, char **hostfile_str); int write_str_node(char **hostfile_str, int len_og, int qty, char *node_name); //@deprecated functions int create_hostfile(char *jobId, char **file_name); int write_hostfile_node(int ptr, int qty, char *node_name); void fill_hostfile(slurm_job_info_t job_record, int ptr, int *qty, int used_nodes); //--------------PUBLIC FUNCTIONS---------------// /* * Se solicita la creacion de un nuevo grupo de "numP" procesos con una distribucion * fisica "type_dist". * * Se puede solicitar en primer plano, encargandose por tanto el proceso que llama a esta funcion, * o en segundo plano, donde un hilo se encarga de configurar esta creacion. * * Si se pide en primer plano, al terminarla es posible llamar a "check_slurm_comm()" para crear * los procesos. * * Si se pide en segundo plano, llamar a "check_slurm_comm()" comprobara si la configuracion para * crearlos esta lista, y si es asi, los crea. * * Devuelve el estado de el procedimiento. Si no devuelve "COMM_FINISHED", es necesario llamar a * "check_slurm_comm()". */ int init_slurm_comm(char *argv, int myId, int numP, int root, int type_dist, int type_creation, MPI_Comm comm, MPI_Comm *child) { slurm_data = malloc(sizeof(struct Slurm_data)); slurm_data->type_creation = type_creation; if(type_creation == COMM_SPAWN_SERIAL) { if(myId == root) { processes_dist(argv, numP, type_dist); } else { slurm_data->cmd = malloc(1 * sizeof(char)); slurm_data->info = MPI_INFO_NULL; } create_processes(myId, root, child, comm); if(myId == root && slurm_data->info != MPI_INFO_NULL) { MPI_Info_free(&(slurm_data->info)); } free(slurm_data->cmd); free(slurm_data); commSlurm = MAL_SPAWN_COMPLETED; } else if(type_creation == COMM_SPAWN_PTHREAD) { commSlurm = MAL_SPAWN_PENDING; Creation_data *creation_data = (Creation_data *) malloc(sizeof(Creation_data)); creation_data->argv = argv; creation_data->numP_childs = numP; creation_data->myId = myId; creation_data->root = root; creation_data->type_dist = type_dist; creation_data->comm = comm; if(pthread_create(&slurm_thread, NULL, thread_work, (void *)creation_data)) { printf("Error al crear el hilo de contacto con SLURM\n"); MPI_Abort(MPI_COMM_WORLD, -1); return -1; } } return commSlurm; } /* * Comprueba si una configuracion para crear un nuevo grupo de procesos esta lista, * y en caso de que lo este, se devuelve el communicador a estos nuevos procesos. */ int check_slurm_comm(int myId, int root, int numP, MPI_Comm *child) { // TODO Borrar numP si no se usa int state=-10; if(slurm_data->type_creation == COMM_SPAWN_PTHREAD) { MPI_Allreduce(&commSlurm, &state, 1, MPI_INT, MPI_MIN, MPI_COMM_WORLD); if(state != MAL_SPAWN_COMPLETED) return state; // Continue only if asynchronous process creation has ended } else { return commSlurm; } if(pthread_join(slurm_thread, NULL)) { printf("Error al esperar al hilo\n"); MPI_Abort(MPI_COMM_WORLD, -1); return -10; } *child = *returned_comm; if(myId == root && slurm_data->info != MPI_INFO_NULL) { MPI_Info_free(&(slurm_data->info)); } free(slurm_data->cmd); free(slurm_data); return commSlurm; } //--------------PRIVATE SPAWN TYPE FUNCTIONS---------------// /* * Funcion llamada por un hilo para que este se encarge * de configurar la creacion de un nuevo grupo de procesos. * * Una vez esta lista la configuracion y es posible crear los procesos * se avisa al hilo maestro. */ void* thread_work(void* creation_data_arg) { Creation_data *creation_data = (Creation_data*) creation_data_arg; returned_comm = (MPI_Comm *) malloc(sizeof(MPI_Comm)); if(creation_data->myId == creation_data->root) { processes_dist(creation_data->argv, creation_data->numP_childs, creation_data->type_dist); } else { slurm_data->cmd = malloc(1 * sizeof(char)); slurm_data->info = MPI_INFO_NULL; } create_processes(creation_data->myId, creation_data->root, returned_comm, creation_data->comm); commSlurm = MAL_SPAWN_COMPLETED; free(creation_data); pthread_exit(NULL); } //--------------PRIVATE SPAWN CREATION FUNCTIONS---------------// /* * Configura la creacion de un nuevo grupo de procesos, reservando la memoria * para una llamada a MPI_Comm_spawn, obteniendo una distribucion fisica * para los procesos y creando un fichero hostfile. */ void processes_dist(char *argv, int numP_childs, int type) { int jobId; char *tmp; job_info_msg_t *j_info; slurm_job_info_t last_record; int used_nodes=0; int *procs_array; char *hostfile; // Get Slurm job info tmp = getenv("SLURM_JOB_ID"); jobId = atoi(tmp); slurm_load_job(&j_info, jobId, 1); last_record = j_info->job_array[j_info->record_count - 1]; //COPY PROGRAM NAME slurm_data->cmd = malloc(strlen(argv) * sizeof(char)); strcpy(slurm_data->cmd, argv); // GET NEW DISTRIBUTION node_dist(last_record, type, numP_childs, &procs_array, &used_nodes); slurm_data->qty_procs = numP_childs; /* // CREATE/UPDATE HOSTFILE int ptr; ptr = create_hostfile(tmp, &hostfile); MPI_Info_create(&(slurm_data->info)); MPI_Info_set(slurm_data->info, "hostfile", hostfile); free(hostfile); // SET NEW DISTRIBUTION fill_hostfile(last_record, ptr, procs_array, used_nodes); close(ptr); */ // TEST fill_str_hostfile(last_record, procs_array, used_nodes, &hostfile); MPI_Info_create(&(slurm_data->info)); MPI_Info_set(slurm_data->info, "hosts", hostfile); free(hostfile); // Free JOB INFO slurm_free_job_info_msg(j_info); } /* * Crea un grupo de procesos segun la configuracion indicada por la funcion * "processes_dist()". */ int create_processes(int myId, int root, MPI_Comm *child, MPI_Comm comm) { int spawn_err = MPI_Comm_spawn(slurm_data->cmd, MPI_ARGV_NULL, slurm_data->qty_procs, slurm_data->info, root, comm, child, MPI_ERRCODES_IGNORE); if(spawn_err != MPI_SUCCESS) { printf("Error creating new set of %d procs.\n", slurm_data->qty_procs); } return spawn_err; } /* * Obtiene la distribucion fisica del grupo de procesos a crear, devolviendo * cuantos nodos se van a utilizar y la cantidad de procesos que alojara cada * nodo. * * Se permiten dos tipos de distribuciones fisicas segun el valor de "type": * * COMM_PHY_NODES (1): Orientada a equilibrar el numero de procesos entre * todos los nodos disponibles. * COMM_PHY_CPU (2): Orientada a completar la capacidad de un nodo antes de * ocupar otro nodo. */ void node_dist(slurm_job_info_t job_record, int type, int total_procs, int **qty, int *used_nodes) { int i, asigCores; int tamBl, remainder; int *procs; procs = calloc(job_record.num_nodes, sizeof(int)); // Numero de procesos por nodo /* GET NEW DISTRIBUTION */ if(type == 1) { // DIST NODES *used_nodes = job_record.num_nodes; tamBl = total_procs / job_record.num_nodes; remainder = total_procs % job_record.num_nodes; for(i=0; i job_record.num_nodes) *used_nodes = job_record.num_nodes; //FIXME Si ocurre esto no es un error? } *qty = calloc(*used_nodes, sizeof(int)); // Numero de procesos por nodo for(i=0; i< *used_nodes; i++) { (*qty)[i] = procs[i]; } free(procs); } /* * Crea y devuelve una cadena para ser utilizada por la llave "hosts" * al crear procesos e indicar donde tienen que ser creados. */ void fill_str_hostfile(slurm_job_info_t job_record, int *qty, int used_nodes, char **hostfile_str) { int i=0, len=0; char *host; hostlist_t hostlist; hostlist = slurm_hostlist_create(job_record.nodes); while ( (host = slurm_hostlist_shift(hostlist)) && i < used_nodes) { len = write_str_node(hostfile_str, len, qty[i], host); i++; free(host); } slurm_hostlist_destroy(hostlist); } /* * AƱade en una cadena "qty" entradas de "node_name". * Realiza la reserva de memoria y la realoja si es necesario. */ int write_str_node(char **hostfile_str, int len_og, int qty, char *node_name) { int err, len_node, len, i; char *ocurrence; len_node = strlen(node_name); len = qty * (len_node + 1); if(len_og == 0) { // Memoria no reservada *hostfile_str = (char *) malloc(len * sizeof(char) - (1 * sizeof(char))); } else { // Cadena ya tiene datos *hostfile_str = (char *) realloc(*hostfile_str, (len_og + len) * sizeof(char) - (1 * sizeof(char))); } if(hostfile_str == NULL) return -1; // No ha sido posible alojar la memoria ocurrence = (char *) malloc((len_node+1) * sizeof(char)); if(ocurrence == NULL) return -1; // No ha sido posible alojar la memoria err = sprintf(ocurrence, ",%s", node_name); if(err < 0) return -2; // No ha sido posible escribir sobre la variable auxiliar i=0; if(len_og == 0) { // Si se inicializa, la primera es una copia i++; strcpy(*hostfile_str, node_name); } for(; i