Commit 184b5d8f authored by Vladislav Rykov's avatar Vladislav Rykov
Browse files

implementing multithreading + optimizations

parent 641b96cf
CC = gcc CC = gcc
INCLUDES = -I/usr/include/postgresql INCLUDES = -I/usr/include/postgresql -I$(LDIR)/gateway_protocol -I$(LDIR)/base64 -I$(LDIR)/task_queue
LFLAGS = -Llib LFLAGS = -Llib
LDIR = lib LDIR = lib
LIBS = $(LDIR)/gateway_protocol/gateway_protocol
LIBD = -pthread -lpq LIBD = -pthread -lpq
SRC = src/gateway.c SRC = src/gateway.c
...@@ -17,16 +15,17 @@ MAIN = gateway ...@@ -17,16 +15,17 @@ MAIN = gateway
.PHONY: depend clean .PHONY: depend clean
$(LDIR)/gateway_protocol/gateway_protocol.o: $(LDIR)/gateway_protocol/gateway_protocol.o:
$(CC) -c $(LIBS)/gateway_protocol.c -o $(OBJ)/gateway_protocol.o -I$(LIBS) $(CC) -c $(LDIR)/gateway_protocol/gateway_protocol.c -o $(OBJ)/gateway_protocol.o -I$(LDIR)/gateway_protocol
$(CC) -c $(LDIR)/base64/base64.c -o $(OBJ)/base64.o -Ilib/base64 $(CC) -c $(LDIR)/base64/base64.c -o $(OBJ)/base64.o -I$(LDIR)/base64
$(CC) $(SRC) $(OBJ)/gateway_protocol.o $(OBJ)/base64.o -o $(MAIN) -I$(LIBS) -Ilib/base64 $(LIBD) $(INCLUDES) $(CC) -c $(LDIR)/task_queue/task_queue.c -o $(OBJ)/task_queue.o -I$(LDIR)/task_queue
$(CC) $(SRC) $(OBJ)/gateway_protocol.o $(OBJ)/base64.o $(OBJ)/task_queue.o -o $(MAIN) $(LIBD) $(INCLUDES)
all: $(MAIN) all: $(MAIN)
@echo Compiling gateway project @echo Compiling gateway project
$(MAIN): $(OBJ)/gateway_protocol.o $(OBJ)/base64.o $(MAIN): $(OBJ)/gateway_protocol.o $(OBJ)/base64.o $(OBJ)/task_queue.o
$(CC) $(CFLAGS) $(INCLUDES) -o $(MAIN) $(OBJS) $(LFLAGS) $(LIBS) $(CC) $(CFLAGS) $(INCLUDES) -o $(MAIN) $(OBJS) $(LFLAGS)
.c.o: .c.o:
$(CC) $(CFLAGS) $(INCLUDES) -c $< -o $@ $(CC) $(CFLAGS) $(INCLUDES) -c $< -o $@
......
No preview for this file type
#include <stdlib.h>
#include <pthread.h>
#include "task_queue.h"
struct queue_job {
task_func_t func;
void *arg;
struct queue_job *next;
};
typedef struct queue_job queue_job_t;
struct task_queue {
queue_job_t *next;
queue_job_t *last;
pthread_mutex_t mutex;
int active_tasks;
int size;
int suspended;
};
static int max_active_tasks;
static queue_job_t * queue_job_create(task_func_t func, void *arg);
static void queue_job_destroy(queue_job_t *qj);
static queue_job_t * queue_job_get_next(task_queue_t *tq);
static void * queue_job_exec(void *arg_tq);
task_queue_t * task_queue_create(const int max_threads) {
task_queue_t *tq = (task_queue_t *)malloc(sizeof(task_queue_t));
if (!tq) {
return NULL;
}
tq->next = NULL;
tq->last = NULL;
tq->active_tasks = 0;
tq->size = 0;
tq->suspended = 0;
pthread_mutex_init(&(tq->mutex), NULL);
max_active_tasks = max_threads > 0 ? max_threads : 1;
return tq;
}
void task_queue_destroy(task_queue_t *tq) {
queue_job_t *qj1, *qj2;
if (!tq) {
return;
}
pthread_mutex_lock(&(tq->mutex));
tq->suspended = 1;
qj1 = tq->next;
while (qj1) {
qj2 = qj1->next;
queue_job_destroy(qj1);
qj1 = qj2;
}
pthread_mutex_destroy(&(tq->mutex));
free(tq);
}
int task_queue_enqueue(task_queue_t *tq, task_func_t task, void *arg) {
queue_job_t *qj;
pthread_t thread;
int i, can_run, need_run, run, ret;
if (!tq || !task) {
return -1;
}
qj = queue_job_create(task, arg);
if (!qj) {
return -1;
}
pthread_mutex_lock(&(tq->mutex));
if (!tq->next) {
tq->next = qj;
tq->last = qj;
} else {
tq->last->next = qj; // assign next
tq->last = qj; // move pointer
}
tq->size++;
if (tq->active_tasks < max_active_tasks & !tq->suspended) {
can_run = max_active_tasks - tq->active_tasks;
need_run = tq->size - tq->active_tasks;
run = need_run < can_run ? need_run : can_run;
for (i = 0; i < run; i++) {
if (pthread_create(&thread, NULL, queue_job_exec, tq) < 0) {
perror("new thread creation error");
} else {
pthread_detach(thread);
tq->active_tasks++;
}
}
}
ret = tq->active_tasks;
pthread_mutex_unlock(&(tq->mutex));
return ret;
}
void task_queue_suspend(task_queue_t *tq) {
if (!tq) {
return;
}
pthread_mutex_lock(&(tq->mutex));
tq->suspended = 1;
pthread_mutex_unlock(&(tq->mutex));
}
void task_queue_unsuspend(task_queue_t *tq) {
pthread_t thread;
int i, can_run, need_run, run;
if (!tq) {
return;
}
pthread_mutex_lock(&(tq->mutex));
tq->suspended = 0;
if (tq->size && tq->active_tasks < max_active_tasks) {
can_run = max_active_tasks - tq->active_tasks;
need_run = tq->size - tq->active_tasks;
run = need_run < can_run ? need_run : can_run;
for (i = 0; i < run; i++) {
if (pthread_create(&thread, NULL, &queue_job_exec, tq) < 0) {
perror("new thread creation error");
} else {
pthread_detach(thread);
tq->active_tasks++;
}
}
}
pthread_mutex_unlock(&(tq->mutex));
}
int task_queue_get_size(task_queue_t *tq) {
int res;
pthread_mutex_lock(&(tq->mutex));
res = tq->size;
pthread_mutex_unlock(&(tq->mutex));
return res;
}
int task_queue_is_empty(task_queue_t *tq) {
return task_queue_get_size(tq) == 0;
}
static queue_job_t * queue_job_create(task_func_t func, void *arg) {
queue_job_t *qj;
if (!func) {
return NULL;
}
qj = (queue_job_t *)malloc(sizeof(queue_job_t));
qj->func = func;
qj->arg = arg;
qj->next = NULL;
return qj;
}
static void queue_job_destroy(queue_job_t *qj) {
if (!qj) {
return;
}
free(qj);
}
static queue_job_t * queue_job_get_next(task_queue_t *tq) {
queue_job_t *qj;
if (!tq) {
return NULL;
}
pthread_mutex_lock(&(tq->mutex));
if (tq->next) {
qj = tq->next;
tq->next = qj->next;
} else {
qj = NULL;
}
pthread_mutex_unlock(&(tq->mutex));
return qj;
}
static void * queue_job_exec(void *arg_tq) {
task_queue_t *tq = (task_queue_t *)arg_tq;
queue_job_t *qj;
pthread_t thread;
int i, can_run, need_run, run;
if (!tq) {
return;
}
qj = queue_job_get_next(tq);
if (qj && qj->func) {
qj->func(qj->arg);
}
queue_job_destroy(qj);
pthread_mutex_lock(&(tq->mutex));
tq->active_tasks--;
tq->size--;
if (tq->size && tq->active_tasks < max_active_tasks && !tq->suspended) {
can_run = max_active_tasks - tq->active_tasks;
need_run = tq->size - tq->active_tasks;
run = need_run < can_run ? need_run : can_run;
for (i = 0; i < run; i++) {
if (pthread_create(&thread, NULL, &queue_job_exec, tq) < 0) {
perror("new thread creation error");
} else {
pthread_detach(thread);
tq->active_tasks++;
}
}
}
pthread_mutex_unlock(&(tq->mutex));
}
#ifndef __TASK_QUEUE_H__
#define __TASK_QUEUE_H__
/* Task queue that create threads for completing tasks
* having an upper threads limit
*/
struct task_queue;
typedef struct task_queue task_queue_t;
typedef void (*task_func_t)(void *arg);
task_queue_t * task_queue_create(const int max_threads);
void task_queue_destroy(task_queue_t *tq);
int task_queue_enqueue(task_queue_t *tq, task_func_t task, void *arg);
void task_queue_suspend(task_queue_t *tq);
void task_queue_unsuspend(task_queue_t *tq);
int task_queue_get_size(task_queue_t *tq);
int task_queue_is_empty(task_queue_t *tq);
#endif // __TASK_QUEUE_H__
#include<stdio.h> #include <stdio.h>
#include<string.h> #include <string.h>
#include<stdlib.h> #include <stdlib.h>
#include<sys/socket.h> #include <sys/socket.h>
#include<arpa/inet.h> //inet_addr #include <arpa/inet.h> //inet_addr
#include<unistd.h> #include <unistd.h>
#include<stdint.h> #include <stdint.h>
#include<pthread.h> #include <pthread.h>
#include"gateway_protocol.h" #include <sys/time.h>
#include<sys/time.h> #include <libpq-fe.h>
#include<libpq-fe.h> #include <math.h>
#include"base64.h" #include <signal.h>
#include<math.h> #include <time.h>
#include<signal.h>
#include<time.h> #include <errno.h>
#include<errno.h> #include "gateway_protocol.h"
#include "base64.h"
#include "task_queue.h"
#define NTHREADS_MAX 10
#define TIMEDATE_LENGTH 32 #define TIMEDATE_LENGTH 32
#define PEND_SEND_RETRIES_MAX 5 #define PEND_SEND_RETRIES_MAX 5
...@@ -39,8 +43,16 @@ typedef struct { ...@@ -39,8 +43,16 @@ typedef struct {
int sock_len; int sock_len;
} gcom_ch_t; // gateway communication channel } gcom_ch_t; // gateway communication channel
typedef struct {
gcom_ch_t gch;
gateway_protocol_packet_type_t packet_type;
uint8_t packet[DEVICE_DATA_MAX_LENGTH];
uint8_t packet_length;
} gcom_ch_request_t;
/* for multithreading impl */ /* for multithreading impl */
void * connection_handler (void *args); void * connection_handler (void *args);
void process_packet(void *request);
int send_gcom_ch(gcom_ch_t *gch, uint8_t *pck, uint8_t pck_size); int send_gcom_ch(gcom_ch_t *gch, uint8_t *pck, uint8_t pck_size);
int recv_gcom_ch(gcom_ch_t *gch, uint8_t *pck, uint8_t *pck_length, uint16_t pck_size); int recv_gcom_ch(gcom_ch_t *gch, uint8_t *pck, uint8_t *pck_length, uint16_t pck_size);
...@@ -77,8 +89,13 @@ void send_utc(gcom_ch_t *pch); ...@@ -77,8 +89,13 @@ void send_utc(gcom_ch_t *pch);
void ctrc_handler (int sig); void ctrc_handler (int sig);
static volatile uint8_t working = 1; static volatile uint8_t working = 1;
pthread_mutex_t mutex;
PGconn *conn;
int main (int argc, char **argv) { int main (int argc, char **argv) {
gcom_ch_t gch; gcom_ch_request req;
task_queue_t *tq;
uint8_t buf[1024]; uint8_t buf[1024];
uint8_t buf_len = 0; uint8_t buf_len = 0;
uint8_t payload[256]; uint8_t payload[256];
...@@ -89,7 +106,7 @@ int main (int argc, char **argv) { ...@@ -89,7 +106,7 @@ int main (int argc, char **argv) {
signal(SIGINT, ctrc_handler); signal(SIGINT, ctrc_handler);
PGconn *conn = PQconnectdb("user=vlad dbname=iotserver password=dev"); conn = PQconnectdb("user=vlad dbname=iotserver password=dev");
if (PQstatus(conn) == CONNECTION_BAD) { if (PQstatus(conn) == CONNECTION_BAD) {
fprintf(stderr,"connection to db error: %s\n", PQerrorMessage(conn)); fprintf(stderr,"connection to db error: %s\n", PQerrorMessage(conn));
return EXIT_FAILURE; return EXIT_FAILURE;
...@@ -109,6 +126,7 @@ int main (int argc, char **argv) { ...@@ -109,6 +126,7 @@ int main (int argc, char **argv) {
return EXIT_FAILURE; return EXIT_FAILURE;
} }
tq = task_queue_create(NTHREAD_MAX);
gateway_protocol_packet_type_t packet_type; gateway_protocol_packet_type_t packet_type;
while (working) { while (working) {
...@@ -285,6 +303,184 @@ void ctrc_handler (int sig) { ...@@ -285,6 +303,184 @@ void ctrc_handler (int sig) {
working = 0; working = 0;
} }
void process_packet(void *request) {
gcom_ch_request_t *req = (gcom_ch_request_t *)request;
uint8_t payload[DEVICE_DATA_MAX_LENGTH];
uint8_t payload_length;
PGresult *res;
if (packet_decode(
req->gch.app_key,
&(req->gch.dev_id),
&(req->packet_type),
&payload_length, payload,
req->packet_length, req->packet))
{
if (req->packet_type == GATEWAY_PROTOCOL_PACKET_TYPE_TIME_REQ) {
printf("TIME REQ received\n");
send_utc(&(req->gch));
} else if (req->packet_type == GATEWAY_PROTOCOL_PACKET_TYPE_DATA_SEND) {
sensor_data_t sensor_data;
time_t t;
// DEVICE_DATA_MAX_LENGTH*2 {hex} + 150
char db_query[662];
printf("DATA SEND received\n");
gateway_protocol_data_send_payload_decode(&sensor_data, payload, payload_length);
if (sensor_data.utc == 0) {
struct timeval tv;
gettimeofday(&tv, NULL);
t = tv.tv_sec;
} else {
t = sensor_data.utc;
}
strftime(sensor_data.timedate, TIMEDATE_LENGTH, "%d/%m/%Y %H:%M:%S", localtime(&t));
snprintf(db_query, sizeof(db_query),
"INSERT INTO dev_%s_%d VALUES (%d, '%s', $1)",
(char *)req->gch.app_key, req->gch.dev_id, t, sensor_data.timedate
);
const char *params[1];
int paramslen[1];
int paramsfor[1];
params[0] = sensor_data.data;
paramslen[0] = sensor_data.data_length;
paramsfor[0] = 1; // format - binary
res = PQexecParams(conn, db_query, 1, NULL, params, paramslen, paramsfor, 0);
if (PQresultStatus(res) == PGRES_COMMAND_OK) {
PQclear(res);
snprintf(db_query, sizeof(db_query),
"SELECT * FROM pend_msgs WHERE app_key='%s' and dev_id = %d and ack = False",
(char *)req->gch.app_key, req->gch.dev_id
);
res = PQexec(conn, db_query);
if (PQresultStatus(res) == PGRES_TUPLES_OK && PQntuples(res)) {
gateway_protocol_mk_stat(
&(req->gch),
GATEWAY_PROTOCOL_STAT_ACK_PEND,
req->packet, &(req->packet_length));
printf("ACK_PEND prepared\n");
} else {
gateway_protocol_mk_stat(
&(req->gch),
GATEWAY_PROTOCOL_STAT_ACK,
req->packet, &(req->packet_length));
printf("ACK prepared\n");
}
send_gcom_ch(&(req->gch), req->packet, req->packet_length);
} else {
fprintf(stderr, "database error : %s\n", PQerrorMessage(conn));
}
PQclear(res);
} else if (req->packet_type == GATEWAY_PROTOCOL_PACKET_TYPE_PEND_REQ) {
char db_query[200];
snprintf(db_query, sizeof(db_query),
"SELECT * FROM pend_msgs WHERE app_key = '%s' AND dev_id = %d AND ack = False",
(char *)req->gch.app_key, req->gch.dev_id
);
res = PQexec(conn, db_query);
if (PQresultStatus(res) == PGRES_TUPLES_OK && PQntuples(res)) {
char msg_cont[150];
strncpy(msg_cont, PQgetvalue(res, 0, 2), sizeof(msg_cont));
printf("PEND_SEND prepared : %s\n", msg_cont);
PQclear(res);
base64_decode(msg_cont, strlen(msg_cont)-1, payload);
payload_length = BASE64_DECODE_OUT_SIZE(strlen(msg_cont));
printf("prepared to send %d bytes : %s\n", payload_length, payload);
// send the msg until ack is received
uint8_t received_ack = 0;
uint8_t pend_send_retries = PEND_SEND_RETRIES_MAX;
struct timeval tv;
tv.tv_sec = 0;
tv.tv_usec = 300000; // 300ms ack recv timeout
do {
packet_encode(
req->gch.app_key,
req->gch.dev_id,
GATEWAY_PROTOCOL_PACKET_TYPE_PEND_SEND,
payload_length, payload,
&(req->packet_length), req->packet);
send_gcom_ch(&(req->gch), req->packet, req->packet_length);
// set timeout
if (setsockopt(req->gch.server_desc, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)) < 0) {
perror("setsockopt error");
}
if (recv_gcom_ch(&(req->gch),
req->packet,
&(req->packet_length),
DEVICE_DATA_MAX_LENGTH) > 9)
{ /* min packet size. timeout -> -1 */
uint8_t recv_app_key[GATEWAY_PROTOCOL_APP_KEY_SIZE +1];
uint8_t recv_dev_id = 0xFF;
if (packet_decode(
recv_app_key,
&recv_dev_id,
&(req->packet_type),
&(req->packet_length), req->packet,
req->packet_length, req->packet))
{
if (!memcmp(recv_app_key, gch.app_key, GATEWAY_PROTOCOL_APP_KEY_SIZE) &&
recv_dev_id == gch.dev_id &&
packet_type == GATEWAY_PROTOCOL_PACKET_TYPE_STAT &&
buf_len == 1 &&
buf[0] == GATEWAY_PROTOCOL_STAT_ACK)
{
snprintf(db_query, sizeof(db_query),
"UPDATE pend_msgs SET ack = True WHERE app_key = '%s' AND dev_id = %d AND msg = '%s'",
(char *)gch.app_key, gch.dev_id, msg_cont
);
res = PQexec(conn, buf);
if (PQresultStatus(res) != PGRES_COMMAND_OK) {
fprintf(stderr, "error db deleting : %s", PQerrorMessage(conn));
}
PQclear(res);
received_ack = 1;
printf("ACK received\n");
} else {
printf("error: packet_type = %02X, not STAT\n");
}
}
}
} while (!received_ack && pend_send_retries--);
// cancel timeout
tv.tv_usec = 0;
if (setsockopt(req->gch.server_desc, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)) < 0) {
perror("setsockopt error");
}
} else {
gateway_protocol_mk_stat(
&(req->gch),
GATEWAY_PROTOCOL_STAT_NACK,
req->packet, &(req->packet_length));
send_gcom_ch(&(req->gch), req->packet, req->packet_length);
printf("nothing for app %s dev %d\n", (char *)req->gch.app_key, req->gch.dev_id);
}
} else {
gateway_protocol_mk_stat(
&(req->gch),
GATEWAY_PROTOCOL_STAT_NACK,
req->packet, &(req->packet_length));
send_gcom_ch(&(req->gch), req->packet, req->packet_length);
fprintf(stderr, "packet type error : %02X\n", packet_type);
}
} else {
fprintf(stderr, "payload decode error\n");
}
}
void gateway_protocol_data_send_payload_decode( void gateway_protocol_data_send_payload_decode(
sensor_data_t *sensor_data, sensor_data_t *sensor_data,
const uint8_t *payload, const uint8_t *payload,
...@@ -394,20 +590,22 @@ void send_utc(gcom_ch_t *gch) { ...@@ -394,20 +590,22 @@ void send_utc(gcom_ch_t *gch) {
} }
int send_gcom_ch(gcom_ch_t *gch, uint8_t *pck, uint8_t pck_size) { int send_gcom_ch(gcom_ch_t *gch, uint8_t *pck, uint8_t pck_size) {
int ret, i; int ret;
if (sendto(gch->server_desc, (char *)pck, pck_size, 0, (struct sockaddr *)&gch->client, gch->sock_len) < 0) { if ((ret = sendto(gch->server_desc, (char *)pck, pck_size, 0, (struct sockaddr *)&gch->client, gch->sock_len)) < 0) {
perror("sendto error"); perror("sendto error");
} }
return ret; return ret;
} }
int recv_gcom_ch(gcom_ch_t *gch, uint8_t *pck, uint8_t *pck_length, uint16_t pck_size) { int recv_gcom_ch(gcom_ch_t *gch, uint8_t *pck, uint8_t *pck_length, uint16_t pck_size) {
int i; int ret;
if ((ret = recvfrom(gch->server_desc, (char *)pck, pck_size, MSG_WAITALL, (struct sockaddr *)&gch->client, &gch->sock_len)) < 0) {
if ((*pck_length = recvfrom(gch->server_desc, (char *)pck, pck_size, MSG_WAITALL, (struct sockaddr *)&gch->client, &gch->sock_len)) < 0) {
perror("socket receive error"); perror("socket receive error");
} else {
*pck_length = ret;
} }
return ret;
} }
/* connection handler for multithreading version */ /* connection handler for multithreading version */
#ifdef MULTITHREADING_VER #ifdef MULTITHREADING_VER
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment