#include #include #include #include #include #include #include #include #include "ProcessDist.h" int commState = MAL_NOT_STARTED; struct Slurm_data *slurm_data; pthread_t spawn_thread; pthread_mutex_t spawn_mutex; MPI_Comm *returned_comm; double end_time; //FIXME REFACTOR struct Slurm_data { char *cmd; // Executable name char *nodelist; int num_cpus, num_nodes; int qty_procs, result_procs; MPI_Info info; int type_creation; int spawn_is_single; }; typedef struct { char *argv; int numP_childs, myId, root, already_created; int type_dist; int spawn_is_single; int spawn_method; MPI_Comm comm; }Creation_data; //--------------PRIVATE SPAWN TYPE DECLARATIONS---------------// void* thread_work(void* creation_data_arg); //--------------PRIVATE DECLARATIONS---------------// void processes_dist(char *argv, int numP_childs, int already_created, int type_dist); void generic_spawn(int myId, int root, int is_single, MPI_Comm *child, MPI_Comm comm); void single_spawn_connection(int myId, int root, MPI_Comm comm, MPI_Comm *child); int create_processes(int myId, int root, MPI_Comm *child, MPI_Comm comm); void node_dist(int type, int total_procs, int already_created, int **qty, int *used_nodes); void fill_str_hostfile(int *qty, int used_nodes, char **hostfile_str); int write_str_node(char **hostfile_str, int len_og, int qty, char *node_name); //@deprecated functions int create_hostfile(char *jobId, char **file_name); int write_hostfile_node(int ptr, int qty, char *node_name); void fill_hostfile(slurm_job_info_t job_record, int ptr, int *qty, int used_nodes); //--------------PUBLIC FUNCTIONS---------------// /* * Se solicita la creacion de un nuevo grupo de "numP" procesos con una distribucion * fisica "type_dist". * * Se puede solicitar en primer plano, encargandose por tanto el proceso que llama a esta funcion, * o en segundo plano, donde un hilo se encarga de configurar esta creacion. * * Si se pide en primer plano, al terminarla es posible llamar a "check_slurm_comm()" para crear * los procesos. * * Si se pide en segundo plano, llamar a "check_slurm_comm()" comprobara si la configuracion para * crearlos esta lista, y si es asi, los crea. * * Devuelve el estado de el procedimiento. Si no devuelve "COMM_FINISHED", es necesario llamar a * "check_slurm_comm()". */ int init_slurm_comm(char *argv, int num_cpus, int num_nodes, char *nodelist, int myId, int numP, int numC, int root, int type_dist, int type_creation, int spawn_is_single, MPI_Comm comm, MPI_Comm *child) { int spawn_qty, already_created = 0; slurm_data = malloc(sizeof(struct Slurm_data)); spawn_thread = pthread_self(); slurm_data->type_creation = type_creation; slurm_data->spawn_is_single = spawn_is_single; slurm_data->result_procs = numC; slurm_data->num_cpus = num_cpus; slurm_data->num_nodes = num_nodes; slurm_data->nodelist = nodelist; spawn_qty = numC; if(type_creation == COMM_SPAWN_MERGE || type_creation == COMM_SPAWN_MERGE_PTHREAD) { if (numP < slurm_data->result_procs) { spawn_qty = slurm_data->result_procs - numP; already_created = numP; } } pthread_mutex_init(&spawn_mutex,NULL); if(type_creation == COMM_SPAWN_SERIAL || slurm_data->type_creation == COMM_SPAWN_MERGE) { if(myId == root) { processes_dist(argv, spawn_qty, already_created, type_dist); } else { slurm_data->cmd = malloc(1 * sizeof(char)); slurm_data->info = MPI_INFO_NULL; } // WORK generic_spawn(myId, root, slurm_data->spawn_is_single, child, comm); // END WORK if(myId == root && slurm_data->info != MPI_INFO_NULL) { MPI_Info_free(&(slurm_data->info)); } pthread_mutex_destroy(&spawn_mutex); free(slurm_data->cmd); free(slurm_data); } else if(type_creation == COMM_SPAWN_PTHREAD || slurm_data->type_creation == COMM_SPAWN_MERGE_PTHREAD) { commState = MAL_SPAWN_PENDING; if((spawn_is_single && myId == root) || !spawn_is_single || (slurm_data->type_creation == COMM_SPAWN_MERGE_PTHREAD && numP > slurm_data->result_procs)) { Creation_data *creation_data = (Creation_data *) malloc(sizeof(Creation_data)); creation_data->argv = argv; creation_data->numP_childs = spawn_qty; creation_data->already_created = already_created; creation_data->myId = myId; creation_data->root = root; creation_data->type_dist = type_dist; creation_data->comm = comm; if(pthread_create(&spawn_thread, NULL, thread_work, (void *)creation_data)) { printf("Error al crear el hilo de contacto con SLURM\n"); MPI_Abort(MPI_COMM_WORLD, -1); return -1; } } } return commState; } /* * Comprueba si una configuracion para crear un nuevo grupo de procesos esta lista, * y en caso de que lo este, se devuelve el communicador a estos nuevos procesos. */ int check_slurm_comm(int myId, int root, int numP, MPI_Comm *child, MPI_Comm comm, MPI_Comm comm_thread, double *real_time) { if(slurm_data->type_creation == COMM_SPAWN_PTHREAD || slurm_data->type_creation == COMM_SPAWN_MERGE_PTHREAD) { if (slurm_data->type_creation == COMM_SPAWN_MERGE_PTHREAD && numP > slurm_data->result_procs) { //TODO REFACTOR printf("Error Check spawn: Configuracion invalida\nSe intenta usar el método Spawn junto a un Shrink merge\n"); MPI_Abort(MPI_COMM_WORLD, -1); return -10; } if(!slurm_data->spawn_is_single || commState == MAL_SPAWN_SINGLE_PENDING || commState == MAL_SPAWN_COMPLETED) { int state=-10; //printf("[%d][3] Test min\n", myId); fflush(stdout); //pthread_mutex_lock(&spawn_mutex); // TODO Descomentar MPI_Allreduce(&commState, &state, 1, MPI_INT, MPI_MIN, comm); //pthread_mutex_unlock(&spawn_mutex); if(state != MAL_SPAWN_COMPLETED) return state; // Continue only if asynchronous process creation has ended //printf("[%d][5] Test Passed-----------\n", myId); fflush(stdout); if(pthread_join(spawn_thread, NULL)) { printf("Error al esperar al hilo\n"); MPI_Abort(MPI_COMM_WORLD, -1); return -10; } *child = *returned_comm; } else if (slurm_data->spawn_is_single) { //pthread_mutex_lock(&spawn_mutex); // TODO Descomentar MPI_Bcast(&commState, 1, MPI_INT, root, comm); //pthread_mutex_unlock(&spawn_mutex); int threads_not_spawned = pthread_equal(pthread_self(), spawn_thread); // Non-root processes join root to finalize the spawn // They also must join if the application has ended its work if(commState == MAL_SPAWN_SINGLE_START) { commState = MAL_SPAWN_SINGLE_PENDING; if(myId != root && threads_not_spawned) { Creation_data *creation_data = (Creation_data *) malloc(sizeof(Creation_data)); creation_data->argv = NULL; creation_data->numP_childs = -1; creation_data->already_created = -1; creation_data->myId = myId; creation_data->root = root; creation_data->type_dist = -1; creation_data->comm = comm_thread; if(pthread_create(&spawn_thread, NULL, thread_work, (void *)creation_data)) { printf("Error al crear el hilo de apoyo\n"); MPI_Abort(MPI_COMM_WORLD, -1); return -1; } } } // Continue only if asynchronous process creation has ended or application does not have more work if(commState != MAL_SPAWN_COMPLETED) return commState; //printf("[%d][4] Test Passed-----------\n", myId); fflush(stdout); //Asegurar que los hilos han terminado if(pthread_join(spawn_thread, NULL)) { printf("Error al esperar al hilo\n"); MPI_Abort(MPI_COMM_WORLD, -1); return -10; } *child = *returned_comm; } else { printf("Error Check spawn: Configuracion invalida\n"); MPI_Abort(MPI_COMM_WORLD, -1); return -10; } } else { return commState; } //Free memory if(myId == root && slurm_data->info != MPI_INFO_NULL) { MPI_Info_free(&(slurm_data->info)); } free(slurm_data->cmd); free(slurm_data); pthread_mutex_destroy(&spawn_mutex); spawn_thread = pthread_self(); *real_time=end_time; return commState; } /* * Conectar grupo de hijos con grupo de padres * Devuelve un intercomunicador para hablar con los padres * * Solo se utiliza cuando la creación de los procesos ha sido * realizada por un solo proceso padre */ void malleability_establish_connection(int myId, int root, MPI_Comm *intercomm) { char *port_name; MPI_Comm newintercomm; if(myId == root) { port_name = (char *) malloc(MPI_MAX_PORT_NAME * sizeof(char)); MPI_Open_port(MPI_INFO_NULL, port_name); MPI_Send(port_name, MPI_MAX_PORT_NAME, MPI_CHAR, root, 130, *intercomm); } else { port_name = malloc(1); } MPI_Comm_accept(port_name, MPI_INFO_NULL, root, MPI_COMM_WORLD, &newintercomm); if(myId == root) { MPI_Close_port(port_name); } free(port_name); MPI_Comm_free(intercomm); *intercomm = newintercomm; } //--------------PRIVATE THREAD FUNCTIONS---------------// /* * Funcion llamada por un hilo para que este se encarge * de configurar la creacion de un nuevo grupo de procesos. * * Una vez esta lista la configuracion y es posible crear los procesos * se avisa al hilo maestro. */ void* thread_work(void* creation_data_arg) { Creation_data *creation_data = (Creation_data*) creation_data_arg; returned_comm = (MPI_Comm *) malloc(sizeof(MPI_Comm)); if(creation_data->myId == creation_data->root) { processes_dist(creation_data->argv, creation_data->numP_childs, creation_data->already_created, creation_data->type_dist); } else { slurm_data->cmd = malloc(1 * sizeof(char)); slurm_data->info = MPI_INFO_NULL; } generic_spawn(creation_data->myId, creation_data->root, slurm_data->spawn_is_single, returned_comm, creation_data->comm); free(creation_data); pthread_exit(NULL); } //--------------PRIVATE SPAWN CREATION FUNCTIONS---------------// /* * Funcion generica para la creacion de procesos. Obtiene la configuracion * y segun esta, elige como deberian crearse los procesos. * * Cuando termina, modifica la variable global para indicar este cambio */ void generic_spawn(int myId, int root, int spawn_is_single, MPI_Comm *child, MPI_Comm comm) { if(spawn_is_single) { single_spawn_connection(myId, root, comm, child); } else { int rootBcast = MPI_PROC_NULL; if(myId == root) rootBcast = MPI_ROOT; create_processes(myId, root, child, comm); MPI_Bcast(&spawn_is_single, 1, MPI_INT, rootBcast, *child); } pthread_mutex_lock(&spawn_mutex); commState = MAL_SPAWN_COMPLETED; end_time = MPI_Wtime(); pthread_mutex_unlock(&spawn_mutex); } /* * Crea un grupo de procesos segun la configuracion indicada por la funcion * "processes_dist()". */ int create_processes(int myId, int root, MPI_Comm *child, MPI_Comm comm) { int spawn_err = MPI_Comm_spawn(slurm_data->cmd, MPI_ARGV_NULL, slurm_data->qty_procs, slurm_data->info, root, comm, child, MPI_ERRCODES_IGNORE); if(spawn_err != MPI_SUCCESS) { printf("Error creating new set of %d procs.\n", slurm_data->qty_procs); } return spawn_err; } /* * Si la variable "type" es 1, la creación es con la participación de todo el grupo de padres * Si el valor es diferente, la creación es solo con la participación del proceso root */ void single_spawn_connection(int myId, int root, MPI_Comm comm, MPI_Comm *child){ char *port_name; int auxiliar_conf = COMM_SPAWN_SINGLE; MPI_Comm newintercomm; if (myId == root) { create_processes(myId, root, child, MPI_COMM_SELF); MPI_Bcast(&auxiliar_conf, 1, MPI_INT, MPI_ROOT, *child); port_name = (char *) malloc(MPI_MAX_PORT_NAME * sizeof(char)); MPI_Recv(port_name, MPI_MAX_PORT_NAME, MPI_CHAR, root, 130, *child, MPI_STATUS_IGNORE); commState = MAL_SPAWN_SINGLE_START; // Indicate other processes to join root to end spawn procedure } else { port_name = malloc(1); } MPI_Comm_connect(port_name, MPI_INFO_NULL, root, comm, &newintercomm); if(myId == root) MPI_Comm_free(child); free(port_name); *child = newintercomm; } //--------------PRIVATE MERGE TYPE FUNCTIONS---------------// /* * Se encarga de que el grupo de procesos resultante se * encuentren todos en un intra comunicador, uniendo a * padres e hijos en un solo comunicador. * * Se llama antes de la redistribución de datos. * * TODO REFACTOR */ void proc_adapt_expand(int *numP, int numC, MPI_Comm intercomm, MPI_Comm *comm, int is_children_group) { MPI_Comm new_comm = MPI_COMM_NULL; MPI_Intercomm_merge(intercomm, is_children_group, &new_comm); //El que pone 0 va primero //MPI_Comm_free(intercomm); TODO Nueva redistribucion para estos casos y liberar aqui // *intercomm = MPI_COMM_NULL; *numP = numC; if(*comm != MPI_COMM_WORLD && *comm != MPI_COMM_NULL) { MPI_Comm_free(comm); } *comm=new_comm; } /* * Se encarga de que el grupo de procesos resultante se * eliminen aquellos procesos que ya no son necesarios. * Los procesos eliminados se quedaran como zombies. * * Se llama una vez ha terminado la redistribución de datos. */ void proc_adapt_shrink(int numC, MPI_Comm *comm, int myId) { int color = MPI_UNDEFINED; MPI_Comm new_comm = MPI_COMM_NULL; if(myId < numC) { color = 1; } MPI_Comm_split(*comm, color, myId, &new_comm); if(*comm != MPI_COMM_WORLD && *comm != MPI_COMM_NULL) //MPI_Comm_free(comm); FIXME *comm=new_comm; } /* * Configura la creacion de un nuevo grupo de procesos, reservando la memoria * para una llamada a MPI_Comm_spawn, obteniendo una distribucion fisica * para los procesos y creando un fichero hostfile. */ void processes_dist(char *argv, int numP_childs, int already_created, int type) { //int jobId; //char *tmp; //job_info_msg_t *j_info; //slurm_job_info_t last_record; int used_nodes=0; int *procs_array; char *hostfile; // Get Slurm job info //tmp = getenv("SLURM_JOB_ID"); //jobId = atoi(tmp); //slurm_load_job(&j_info, jobId, 1); //last_record = j_info->job_array[j_info->record_count - 1]; //COPY PROGRAM NAME slurm_data->cmd = malloc(strlen(argv) * sizeof(char)); strcpy(slurm_data->cmd, argv); // GET NEW DISTRIBUTION node_dist(type, numP_childs, already_created, &procs_array, &used_nodes); slurm_data->qty_procs = numP_childs; /* // CREATE/UPDATE HOSTFILE int ptr; ptr = create_hostfile(tmp, &hostfile); MPI_Info_create(&(slurm_data->info)); MPI_Info_set(slurm_data->info, "hostfile", hostfile); free(hostfile); // SET NEW DISTRIBUTION fill_hostfile(last_record, ptr, procs_array, used_nodes); close(ptr); */ // CREATE AND SET STRING HOSTFILE fill_str_hostfile(procs_array, used_nodes, &hostfile); MPI_Info_create(&(slurm_data->info)); MPI_Info_set(slurm_data->info, "hosts", hostfile); free(hostfile); free(procs_array); // Free JOB INFO //slurm_free_job_info_msg(j_info); } /* * Obtiene la distribucion fisica del grupo de procesos a crear, devolviendo * cuantos nodos se van a utilizar y la cantidad de procesos que alojara cada * nodo. * * Se permiten dos tipos de distribuciones fisicas segun el valor de "type": * * COMM_PHY_NODES (1): Orientada a equilibrar el numero de procesos entre * todos los nodos disponibles. * COMM_PHY_CPU (2): Orientada a completar la capacidad de un nodo antes de * ocupar otro nodo. */ void node_dist(int type, int total_procs, int already_created, int **qty, int *used_nodes) { int i, asigCores; int tamBl, remainder; int *procs; procs = calloc(slurm_data->num_nodes, sizeof(int)); // Numero de procesos por nodo /* GET NEW DISTRIBUTION */ if(type == 1) { // DIST NODES *used_nodes = slurm_data->num_nodes; tamBl = total_procs / slurm_data->num_nodes; remainder = total_procs % slurm_data->num_nodes; for(i=0; inum_nodes; i++) { procs[i] = tamBl; } } else if (type == 2) { // DIST CPUs tamBl = slurm_data->num_cpus / slurm_data->num_nodes; asigCores = 0; i = *used_nodes = already_created / tamBl; remainder = already_created % tamBl; //First node could already have existing procs if (remainder) { procs[i] = asigCores = tamBl - remainder; i = (i+1) % slurm_data->num_nodes; (*used_nodes)++; } //Assing tamBl to each node while(asigCores+tamBl <= total_procs) { asigCores += tamBl; procs[i] += tamBl; i = (i+1) % slurm_data->num_nodes; (*used_nodes)++; } //Last node could have less procs than tamBl if(asigCores < total_procs) { procs[i] += total_procs - asigCores; (*used_nodes)++; } if(*used_nodes > slurm_data->num_nodes) *used_nodes = slurm_data->num_nodes; //FIXME Si ocurre esto no es un error? } *qty = calloc(*used_nodes, sizeof(int)); // Numero de procesos por nodo for(i=0; i< *used_nodes; i++) { (*qty)[i] = procs[i]; } free(procs); } /* * Crea y devuelve una cadena para ser utilizada por la llave "hosts" * al crear procesos e indicar donde tienen que ser creados. */ void fill_str_hostfile(int *qty, int used_nodes, char **hostfile_str) { int i=0, len=0; char *host; hostlist_t hostlist; hostlist = slurm_hostlist_create(slurm_data->nodelist); while ( (host = slurm_hostlist_shift(hostlist)) && i < used_nodes) { if(qty[i] != 0) { len = write_str_node(hostfile_str, len, qty[i], host); } i++; free(host); } slurm_hostlist_destroy(hostlist); } /* * Añade en una cadena "qty" entradas de "node_name". * Realiza la reserva de memoria y la realoja si es necesario. */ int write_str_node(char **hostfile_str, int len_og, int qty, char *node_name) { int err, len_node, len, i; char *ocurrence; len_node = strlen(node_name); len = qty * (len_node + 1); if(len_og == 0) { // Memoria no reservada *hostfile_str = (char *) malloc(len * sizeof(char) - (1 * sizeof(char))); } else { // Cadena ya tiene datos *hostfile_str = (char *) realloc(*hostfile_str, (len_og + len) * sizeof(char) - (1 * sizeof(char))); } if(hostfile_str == NULL) return -1; // No ha sido posible alojar la memoria ocurrence = (char *) malloc((len_node+1) * sizeof(char)); if(ocurrence == NULL) return -1; // No ha sido posible alojar la memoria err = sprintf(ocurrence, ",%s", node_name); if(err < 0) return -2; // No ha sido posible escribir sobre la variable auxiliar i=0; if(len_og == 0) { // Si se inicializa, la primera es una copia i++; strcpy(*hostfile_str, node_name); } for(; i