Commit 9c51a1ed authored by Vladislav Rykov's avatar Vladislav Rykov
Browse files

multithreaded version implemented, not tested

parent 2f278169
No preview for this file type
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
#include "base64.h" #include "base64.h"
#include "task_queue.h" #include "task_queue.h"
#define NTHREADS_MAX 10 #define NTHREAD_MAX 10
#define TIMEDATE_LENGTH 32 #define TIMEDATE_LENGTH 32
#define PEND_SEND_RETRIES_MAX 5 #define PEND_SEND_RETRIES_MAX 5
...@@ -122,6 +122,8 @@ int main (int argc, char **argv) { ...@@ -122,6 +122,8 @@ int main (int argc, char **argv) {
perror("task_queue creation error"); perror("task_queue creation error");
return EXIT_FAILURE; return EXIT_FAILURE;
} }
pthread_mutex_init(&mutex, NULL);
while (working) { while (working) {
gcom_ch_request_t req; gcom_ch_request_t req;
...@@ -131,15 +133,15 @@ int main (int argc, char **argv) { ...@@ -131,15 +133,15 @@ int main (int argc, char **argv) {
printf("listenninig...\n"); printf("listenninig...\n");
req.gch.sock_len = sizeof(req.gch.client); req.gch.sock_len = sizeof(req.gch.client);
if (recv_gcom_ch(&(req->gch), req->packet, &(req->packet_length), DEVICE_DATA_MAX_LENGTH)) { if (recv_gcom_ch(&req.gch, req.packet, &req.packet_length, DEVICE_DATA_MAX_LENGTH)) {
task_queue_enqueue(tq, process_packet, &req); task_queue_enqueue(tq, process_packet, &req);
} else { } else {
fprintf(stderr, "payload decode error\n"); fprintf(stderr, "payload decode error\n");
} }
} }
pthread_mutex_destroy(&mutex);
close(gch.server_desc); close(gch.server_desc);
PQfinish(conn); PQfinish(conn);
...@@ -196,7 +198,10 @@ void process_packet(void *request) { ...@@ -196,7 +198,10 @@ void process_packet(void *request) {
paramslen[0] = sensor_data.data_length; paramslen[0] = sensor_data.data_length;
paramsfor[0] = 1; // format - binary paramsfor[0] = 1; // format - binary
pthread_mutex_lock(&mutex);
res = PQexecParams(conn, db_query, 1, NULL, params, paramslen, paramsfor, 0); res = PQexecParams(conn, db_query, 1, NULL, params, paramslen, paramsfor, 0);
pthread_mutex_unlock(&mutex);
if (PQresultStatus(res) == PGRES_COMMAND_OK) { if (PQresultStatus(res) == PGRES_COMMAND_OK) {
PQclear(res); PQclear(res);
...@@ -204,7 +209,11 @@ void process_packet(void *request) { ...@@ -204,7 +209,11 @@ void process_packet(void *request) {
"SELECT * FROM pend_msgs WHERE app_key='%s' and dev_id = %d and ack = False", "SELECT * FROM pend_msgs WHERE app_key='%s' and dev_id = %d and ack = False",
(char *)req->gch.app_key, req->gch.dev_id (char *)req->gch.app_key, req->gch.dev_id
); );
pthread_mutex_lock(&mutex);
res = PQexec(conn, db_query); res = PQexec(conn, db_query);
pthread_mutex_unlock(&mutex);
if (PQresultStatus(res) == PGRES_TUPLES_OK && PQntuples(res)) { if (PQresultStatus(res) == PGRES_TUPLES_OK && PQntuples(res)) {
gateway_protocol_mk_stat( gateway_protocol_mk_stat(
&(req->gch), &(req->gch),
...@@ -230,7 +239,9 @@ void process_packet(void *request) { ...@@ -230,7 +239,9 @@ void process_packet(void *request) {
"SELECT * FROM pend_msgs WHERE app_key = '%s' AND dev_id = %d AND ack = False", "SELECT * FROM pend_msgs WHERE app_key = '%s' AND dev_id = %d AND ack = False",
(char *)req->gch.app_key, req->gch.dev_id (char *)req->gch.app_key, req->gch.dev_id
); );
pthread_mutex_lock(&mutex);
res = PQexec(conn, db_query); res = PQexec(conn, db_query);
pthread_mutex_unlock(&mutex);
if (PQresultStatus(res) == PGRES_TUPLES_OK && PQntuples(res)) { if (PQresultStatus(res) == PGRES_TUPLES_OK && PQntuples(res)) {
char msg_cont[150]; char msg_cont[150];
...@@ -275,17 +286,19 @@ void process_packet(void *request) { ...@@ -275,17 +286,19 @@ void process_packet(void *request) {
&(req->packet_length), req->packet, &(req->packet_length), req->packet,
req->packet_length, req->packet)) req->packet_length, req->packet))
{ {
if (!memcmp(recv_app_key, gch.app_key, GATEWAY_PROTOCOL_APP_KEY_SIZE) && if (!memcmp(recv_app_key, req->gch.app_key, GATEWAY_PROTOCOL_APP_KEY_SIZE) &&
recv_dev_id == gch.dev_id && recv_dev_id == req->gch.dev_id &&
packet_type == GATEWAY_PROTOCOL_PACKET_TYPE_STAT && req->packet_type == GATEWAY_PROTOCOL_PACKET_TYPE_STAT &&
req->packet_length == 1 && req->packet_length == 1 &&
req->packet[0] == GATEWAY_PROTOCOL_STAT_ACK) req->packet[0] == GATEWAY_PROTOCOL_STAT_ACK)
{ {
snprintf(db_query, sizeof(db_query), snprintf(db_query, sizeof(db_query),
"UPDATE pend_msgs SET ack = True WHERE app_key = '%s' AND dev_id = %d AND msg = '%s'", "UPDATE pend_msgs SET ack = True WHERE app_key = '%s' AND dev_id = %d AND msg = '%s'",
(char *)gch.app_key, gch.dev_id, msg_cont (char *)req->gch.app_key, req->gch.dev_id, msg_cont
); );
res = PQexec(conn, buf); pthread_mutex_lock(&mutex);
res = PQexec(conn, db_query);
pthread_mutex_unlock(&mutex);
if (PQresultStatus(res) != PGRES_COMMAND_OK) { if (PQresultStatus(res) != PGRES_COMMAND_OK) {
fprintf(stderr, "error db deleting : %s", PQerrorMessage(conn)); fprintf(stderr, "error db deleting : %s", PQerrorMessage(conn));
} }
...@@ -321,7 +334,7 @@ void process_packet(void *request) { ...@@ -321,7 +334,7 @@ void process_packet(void *request) {
send_gcom_ch(&(req->gch), req->packet, req->packet_length); send_gcom_ch(&(req->gch), req->packet, req->packet_length);
fprintf(stderr, "packet type error : %02X\n", packet_type); fprintf(stderr, "packet type error : %02X\n", req->packet_type);
} }
} else { } else {
fprintf(stderr, "payload decode error\n"); fprintf(stderr, "payload decode error\n");
...@@ -442,6 +455,7 @@ int send_gcom_ch(gcom_ch_t *gch, uint8_t *pck, uint8_t pck_size) { ...@@ -442,6 +455,7 @@ int send_gcom_ch(gcom_ch_t *gch, uint8_t *pck, uint8_t pck_size) {
if ((ret = sendto(gch->server_desc, (char *)pck, pck_size, 0, (struct sockaddr *)&gch->client, gch->sock_len)) < 0) { if ((ret = sendto(gch->server_desc, (char *)pck, pck_size, 0, (struct sockaddr *)&gch->client, gch->sock_len)) < 0) {
perror("sendto error"); perror("sendto error");
} }
return ret; return ret;
} }
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment