#include #include #include #include #include #include #include "computing_func.h" #include "../malleability/CommDist.h" #include "../malleability/malleabilityManager.h" #include "../malleability/malleabilityStates.h" #define ROOT 0 int work(); void iterate(double *matrix, int n, int async_comm, int iter); void init_group_struct(char *argv[], int argc, int myId, int numP); void init_application(); void obtain_op_times(); void free_application_data(); void print_general_info(int myId, int grp, int numP); int print_local_results(); int print_final_results(); int create_out_file(char *nombre, int *ptr, int newstdout); typedef struct { int myId; int numP; int grp; int iter_start; int argc; int numS; // Cantidad de procesos hijos MPI_Comm children, parents; char *compute_comm_array; char **argv; char *sync_array, *async_array; } group_data; configuration *config_file; group_data *group; results_data *results; MPI_Comm comm; int run_id = 0; // Utilizado para diferenciar más fácilmente ejecuciones en el análisis int main(int argc, char *argv[]) { int numP, myId, res; int req; int im_child; //FIXME El codigo no es capaz de hacer mas de una redistribucion - Arreglar malleabilityTypes.c int num_cpus, num_nodes; //nodelist_len; //FIXME Eliminar cuando se utilice Slurm char *nodelist = NULL; num_cpus = 20; //FIXME NUMERO MAGICO if (argc >= 5) { nodelist = argv[3]; //nodelist_len = strlen(nodelist); num_nodes = atoi(argv[4]); num_cpus = num_nodes * num_cpus; } MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &req); MPI_Comm_size(MPI_COMM_WORLD, &numP); MPI_Comm_rank(MPI_COMM_WORLD, &myId); comm = MPI_COMM_WORLD; if(req != MPI_THREAD_MULTIPLE) { printf("No se ha obtenido la configuración de hilos necesaria\nSolicitada %d -- Devuelta %d\n", req, MPI_THREAD_MULTIPLE); } init_group_struct(argv, argc, myId, numP); im_child = init_malleability(myId, numP, ROOT, comm, argv[0], nodelist, num_cpus, num_nodes); if(!im_child) { //TODO REFACTOR Simplificar inicio init_application(); set_benchmark_grp(group->grp); set_benchmark_configuration(config_file); set_benchmark_results(results); MPI_Barrier(comm); results->exec_start = MPI_Wtime(); } else { //Init hijos get_malleability_user_comm(&comm); get_benchmark_configuration(&config_file); get_benchmark_results(&results); set_results_post_reconfig(results, group->grp, config_file->sdr, config_file->adr); //TODO Cambio al añadir nueva redistribucion if(config_file->comm_tam) { group->compute_comm_array = malloc(config_file->comm_tam * sizeof(char)); } // TODO Refactor - Que sea una unica funcion // Obtiene las variables que van a utilizar los hijos void *value = NULL; malleability_get_data(&value, 0, 1, 1); group->grp = *((int *)value); free(value); malleability_get_data(&value, 1, 1, 1); run_id = *((int *)value); free(value); malleability_get_data(&value, 2, 1, 1); group->iter_start = *((int *)value); free(value); //FIXME Eliminar cuando se utilice SLURM /* malleability_get_data(&value, 4, 1, 1); num_nodes = *((int *)value); free(value); malleability_get_data(&value, 5, 1, 1); nodelist = (char *)value; //free(value); nodelist_len = strlen(nodelist); */ group->grp = group->grp + 1; } group->grp = group->grp - 1; // TODO REFACTOR??? do { group->grp = group->grp + 1; set_benchmark_grp(group->grp); get_malleability_user_comm(&comm); MPI_Comm_size(comm, &(group->numP)); MPI_Comm_rank(comm, &(group->myId)); if(config_file->resizes != group->grp + 1) { set_malleability_configuration(config_file->cst, config_file->css, config_file->phy_dist[group->grp+1], -1, config_file->aib, -1); set_children_number(config_file->procs[group->grp+1]); // TODO TO BE DEPRECATED if(group->grp == 0) { malleability_add_data(&(group->grp), 1, MAL_INT, 1, 1); malleability_add_data(&run_id, 1, MAL_INT, 1, 1); malleability_add_data(&(group->iter_start), 1, MAL_INT, 1, 1); //FIXME Eliminar cuando se utilice SLURM //malleability_add_data(&num_nodes, 1, MAL_INT, 1, 1); //malleability_add_data(&nodelist, nodelist_len, MAL_CHAR, 1, 1); } } res = work(); if(res == MAL_ZOMBIE) break; print_local_results(); reset_results_index(results); } while((config_file->resizes > group->grp + 1) && (config_file->cst == COMM_SPAWN_MERGE || config_file->cst == COMM_SPAWN_MERGE_PTHREAD)); if(res==1) { // Se he llegado al final de la aplicacion MPI_Barrier(comm); // TODO Posible error al utilizar SHRINK results->exec_time = MPI_Wtime() - results->exec_start; } print_final_results(); // Pasado este punto ya no pueden escribir los procesos if(comm != MPI_COMM_WORLD && comm != MPI_COMM_NULL) { MPI_Comm_free(&comm); } if(group->myId == ROOT && (config_file->cst == COMM_SPAWN_MERGE || config_file->cst == COMM_SPAWN_MERGE_PTHREAD)) { MPI_Abort(MPI_COMM_WORLD, -100); } free_application_data(); MPI_Finalize(); return 0; } /* * Función de trabajo principal. * * Incializa los datos para realizar el computo y a continuacion * pasa a realizar "maxiter" iteraciones de computo. * * Terminadas las iteraciones realiza el redimensionado de procesos. * Si el redimensionado se realiza de forma asincrona se * siguen realizando iteraciones de computo hasta que termine la * comunicacion asincrona y realizar entonces la sincrona. * * Si el grupo de procesos es el ultimo que va a ejecutar, se devuelve * el valor 1 para indicar que no se va a seguir trabajando con nuevos grupos * de procesos. En caso contrario se devuelve 0. */ int work() { int iter, maxiter, state, res; double *matrix = NULL; maxiter = config_file->iters[group->grp]; //initMatrix(&matrix, config_file->matrix_tam); state = MAL_NOT_STARTED; res = 0; for(iter=group->iter_start; iter < maxiter; iter++) { iterate(matrix, config_file->matrix_tam, state, iter); } if(config_file->resizes != group->grp + 1) state = malleability_checkpoint(); iter = 0; while(state == MAL_DIST_PENDING || state == MAL_SPAWN_PENDING || state == MAL_SPAWN_SINGLE_PENDING) { if(iter < config_file->iters[group->grp+1]) { iterate(matrix, config_file->matrix_tam, state, iter); iter++; group->iter_start = iter; } state = malleability_checkpoint(); } if(config_file->resizes - 1 == group->grp) res=1; if(state == MAL_ZOMBIE) res=state; return res; } ///////////////////////////////////////// ///////////////////////////////////////// //COMPUTE FUNCTIONS ///////////////////////////////////////// ///////////////////////////////////////// /* * Simula la ejecucción de una iteración de computo en la aplicación * que dura al menos un tiempo de "time" segundos. */ void iterate(double *matrix, int n, int async_comm, int iter) { double start_time, actual_time; double time = config_file->general_time * config_file->factors[group->grp]; double Top = config_file->Top; int i, operations = 0; double aux = 0; start_time = MPI_Wtime(); operations = time / Top; //FIXME Calcular una sola vez for(i=0; i < operations; i++) { aux += computePiSerial(n); } if(config_file->comm_tam) { MPI_Bcast(group->compute_comm_array, config_file->comm_tam, MPI_CHAR, ROOT, comm); } actual_time = MPI_Wtime(); // Guardar tiempos // TODO Que diferencie entre ambas en el IO if(async_comm == MAL_DIST_PENDING || async_comm == MAL_SPAWN_PENDING || async_comm == MAL_SPAWN_SINGLE_PENDING) { // Se esta realizando una redistribucion de datos asincrona operations=0; } if(results->iter_index == results->iters_size) { // Aumentar tamaño de ambos vectores de resultados realloc_results_iters(results, results->iters_size + 100); } results->iters_time[results->iter_index] = actual_time - start_time; results->iters_type[results->iter_index] = operations; results->iter_index = results->iter_index + 1; } //======================================================|| //======================================================|| //=============INIT/FREE/PRINT FUNCTIONS================|| //======================================================|| //======================================================|| /* * Muestra datos generales sobre los procesos, su grupo, * en que nodo residen y la version de MPI utilizada. */ void print_general_info(int myId, int grp, int numP) { int len; char *name = malloc(MPI_MAX_PROCESSOR_NAME * sizeof(char)); char *version = malloc(MPI_MAX_LIBRARY_VERSION_STRING * sizeof(char)); MPI_Get_processor_name(name, &len); MPI_Get_library_version(version, &len); printf("P%d Nuevo GRUPO %d de %d procs en nodo %s con %s\n", myId, grp, numP, name, version); free(name); free(version); } /* * Pide al proceso raiz imprimir los datos sobre las iteraciones realizadas por el grupo de procesos. */ int print_local_results() { int ptr_local, ptr_out, err; char *file_name; compute_results_iter(results, group->myId, ROOT, comm); if(group->myId == ROOT) { ptr_out = dup(1); file_name = NULL; file_name = malloc(40 * sizeof(char)); if(file_name == NULL) return -1; // No ha sido posible alojar la memoria err = snprintf(file_name, 40, "R%d_G%dNP%dID%d.out", run_id, group->grp, group->numP, group->myId); if(err < 0) return -2; // No ha sido posible obtener el nombre de fichero create_out_file(file_name, &ptr_local, 1); print_config_group(config_file, group->grp); print_iter_results(*results, config_file->iters[group->grp] - 1); free(file_name); fflush(stdout); close(1); dup(ptr_out); } return 0; } /* * Si es el ultimo grupo de procesos, pide al proceso raiz mostrar los datos obtenidos de tiempo de ejecucion, creacion de procesos * y las comunicaciones. */ int print_final_results() { int ptr_global, err; char *file_name; if(group->myId == ROOT) { if(group->grp == config_file->resizes -1) { file_name = NULL; file_name = malloc(20 * sizeof(char)); if(file_name == NULL) return -1; // No ha sido posible alojar la memoria err = snprintf(file_name, 20, "R%d_Global.out", run_id); if(err < 0) return -2; // No ha sido posible obtener el nombre de fichero create_out_file(file_name, &ptr_global, 1); print_config(config_file, group->grp); print_global_results(*results, config_file->resizes); fflush(stdout); free(file_name); } } return 0; } /* * Inicializa la estructura group */ void init_group_struct(char *argv[], int argc, int myId, int numP) { group = malloc(1 * sizeof(group_data)); group->myId = myId; group->numP = numP; group->grp = 0; group->iter_start = 0; group->argc = argc; group->argv = argv; } /* * Inicializa los datos para este grupo de procesos. * * En caso de ser el primer grupo de procesos, lee el fichero de configuracion * e inicializa los vectores de comunicacion. * * En caso de ser otro grupo de procesos entra a la funcion "Sons_init()" donde * se comunican con los padres para inicializar sus datos. */ void init_application() { if(group->argc < 2) { printf("Falta el fichero de configuracion. Uso:\n./programa config.ini id\nEl argumento numerico id es opcional\n"); MPI_Abort(MPI_COMM_WORLD, -1); } if(group->argc > 2) { run_id = atoi(group->argv[2]); } config_file = read_ini_file(group->argv[1]); results = malloc(sizeof(results_data)); init_results_data(results, config_file->resizes, config_file->iters[group->grp]); if(config_file->comm_tam) { group->compute_comm_array = malloc(config_file->comm_tam * sizeof(char)); } if(config_file->sdr) { malloc_comm_array(&(group->sync_array), config_file->sdr , group->myId, group->numP); } if(config_file->adr) { malloc_comm_array(&(group->async_array), config_file->adr , group->myId, group->numP); } obtain_op_times(); } /* * Obtiene cuanto tiempo es necesario para realizar una operacion de PI */ void obtain_op_times() { double result, start_time = MPI_Wtime(); int i, qty = 20000; result = 0; for(i=0; imatrix_tam); } //printf("Creado Top con valor %lf\n", result); //fflush(stdout); config_file->Top = (MPI_Wtime() - start_time) / qty; //Tiempo de una operacion MPI_Bcast(&(config_file->Top), 1, MPI_DOUBLE, ROOT, comm); } /* * Libera toda la memoria asociada con la aplicacion */ void free_application_data() { if(config_file->comm_tam) { free(group->compute_comm_array); } if(config_file->sdr) { free(group->sync_array); } if(config_file->adr) { free(group->async_array); } free_malleability(); free_config(config_file); if(group->grp == 0) { //FIXME Revisar porque cuando es diferente a 0 no funciona free_results_data(results); free(results); } free(group); } /* * Función para crear un fichero con el nombre pasado como argumento. * Si el nombre ya existe, se escribe la informacion a continuacion. * * El proceso que llama a la función pasa a tener como salida estandar * dicho fichero si el valor "newstdout" es verdadero. * */ int create_out_file(char *nombre, int *ptr, int newstdout) { int err; *ptr = open(nombre, O_WRONLY | O_CREAT | O_APPEND, 0644); if(*ptr < 0) return -1; // No ha sido posible crear el fichero if(newstdout) { err = close(1); if(err < 0) return -2; // No es posible modificar la salida estandar err = dup(*ptr); if(err < 0) return -3; // No es posible modificar la salida estandar } return 0; }