Commit 2f278169 authored by Vladislav Rykov's avatar Vladislav Rykov
Browse files

main routine implemented, todo: mutithreading safety

parent 184b5d8f
...@@ -93,17 +93,9 @@ pthread_mutex_t mutex; ...@@ -93,17 +93,9 @@ pthread_mutex_t mutex;
PGconn *conn; PGconn *conn;
int main (int argc, char **argv) { int main (int argc, char **argv) {
gcom_ch_request req; gcom_ch_t gch;
task_queue_t *tq; task_queue_t *tq;
uint8_t buf[1024];
uint8_t buf_len = 0;
uint8_t payload[256];
uint8_t payload_length = 0;
PGresult *res;
memset(&gch, 0x0, sizeof(gch));
signal(SIGINT, ctrc_handler); signal(SIGINT, ctrc_handler);
conn = PQconnectdb("user=vlad dbname=iotserver password=dev"); conn = PQconnectdb("user=vlad dbname=iotserver password=dev");
...@@ -126,166 +118,21 @@ int main (int argc, char **argv) { ...@@ -126,166 +118,21 @@ int main (int argc, char **argv) {
return EXIT_FAILURE; return EXIT_FAILURE;
} }
tq = task_queue_create(NTHREAD_MAX); if(!(tq = task_queue_create(NTHREAD_MAX))) {
gateway_protocol_packet_type_t packet_type; perror("task_queue creation error");
return EXIT_FAILURE;
}
while (working) { while (working) {
buf_len = 0; gcom_ch_request_t req;
memset(&req, 0x0, sizeof(gcom_ch_request_t));
memcpy(&(req.gch), &gch, sizeof(gcom_ch_t));
printf("listenninig...\n"); printf("listenninig...\n");
gch.sock_len = sizeof(gch.client); req.gch.sock_len = sizeof(req.gch.client);
if ( recv_gcom_ch(&gch, buf, &buf_len, 1024) if (recv_gcom_ch(&(req->gch), req->packet, &(req->packet_length), DEVICE_DATA_MAX_LENGTH)) {
& task_queue_enqueue(tq, process_packet, &req);
packet_decode(
gch.app_key,
&gch.dev_id,
&packet_type,
&payload_length, payload,
buf_len, buf) )
{
if (packet_type == GATEWAY_PROTOCOL_PACKET_TYPE_TIME_REQ) {
printf("TIME REQ received\n");
send_utc(&gch);
} else if (packet_type == GATEWAY_PROTOCOL_PACKET_TYPE_DATA_SEND) {
sensor_data_t sensor_data;
time_t t;
printf("DATA SEND received\n");
gateway_protocol_data_send_payload_decode(&sensor_data, payload, payload_length);
if (sensor_data.utc == 0) {
struct timeval tv;
gettimeofday(&tv, NULL);
t = tv.tv_sec;
} else {
t = sensor_data.utc;
}
strftime(sensor_data.timedate, TIMEDATE_LENGTH, "%d/%m/%Y %H:%M:%S", localtime(&t));
snprintf(buf, sizeof(buf),
"INSERT INTO dev_%s_%d VALUES (%d, '%s', $1)", (char *)gch.app_key, gch.dev_id, t, sensor_data.timedate);
const char *params[1];
int paramlen[1];
int paramfor[1];
params[0] = sensor_data.data;
paramlen[0] = sensor_data.data_length;
paramfor[0] = 1; // binary
res = PQexecParams(conn, buf, 1, NULL, params, paramlen, paramfor, 0);
if (PQresultStatus(res) == PGRES_COMMAND_OK) {
PQclear(res);
sprintf(buf, "SELECT * FROM pend_msgs WHERE app_key='%s' and dev_id = %d and ack = False", (char *)gch.app_key, gch.dev_id);
res = PQexec(conn, buf);
if (PQresultStatus(res) == PGRES_TUPLES_OK && PQntuples(res)) {
gateway_protocol_mk_stat(
&gch,
GATEWAY_PROTOCOL_STAT_ACK_PEND,
buf, &buf_len);
printf("ACK_PEND prepared\n");
} else {
gateway_protocol_mk_stat(
&gch,
GATEWAY_PROTOCOL_STAT_ACK,
buf, &buf_len);
printf("ACK prepared\n");
}
send_gcom_ch(&gch, buf, buf_len);
} else {
fprintf(stderr, "database error : %s\n", PQerrorMessage(conn));
}
PQclear(res);
} else if (packet_type == GATEWAY_PROTOCOL_PACKET_TYPE_PEND_REQ) {
sprintf(buf, "SELECT * FROM pend_msgs WHERE app_key = '%s' AND dev_id = %d AND ack = False",
(char *)gch.app_key, gch.dev_id);
res = PQexec(conn, buf);
if (PQresultStatus(res) == PGRES_TUPLES_OK && PQntuples(res)) {
char msg_cont[150];
strncpy(msg_cont, PQgetvalue(res, 0, 2), sizeof(msg_cont));
printf("PEND_SEND prepared : %s\n", msg_cont);
PQclear(res);
base64_decode(msg_cont, strlen(msg_cont)-1, payload);
payload_length = BASE64_DECODE_OUT_SIZE(strlen(msg_cont));
printf("prepared to send %d bytes : %s\n", payload_length, payload);
// send the msg until ack is received
uint8_t received_ack = 0;
uint8_t pend_send_retries = PEND_SEND_RETRIES_MAX;
struct timeval tv;
tv.tv_sec = 0;
tv.tv_usec = 300000; // 300ms ack recv timeout
do {
packet_encode(
gch.app_key,
gch.dev_id,
GATEWAY_PROTOCOL_PACKET_TYPE_PEND_SEND,
payload_length, payload,
&buf_len, buf);
send_gcom_ch(&gch, buf, buf_len);
// set timeout
if (setsockopt(gch.server_desc, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)) < 0) {
perror("setsockopt error");
}
recv_gcom_ch(&gch, buf, &buf_len, 1024);
if (buf_len > 9) { /* min packet size. timeout -> -1 */
uint8_t recv_app_key[GATEWAY_PROTOCOL_APP_KEY_SIZE +1];
uint8_t recv_dev_id = 0xFF;
if (packet_decode(
recv_app_key,
&recv_dev_id,
&packet_type,
&buf_len, buf,
buf_len, buf))
{
if (!memcmp(recv_app_key, gch.app_key, GATEWAY_PROTOCOL_APP_KEY_SIZE) &&
recv_dev_id == gch.dev_id &&
packet_type == GATEWAY_PROTOCOL_PACKET_TYPE_STAT &&
buf_len == 1 &&
buf[0] == GATEWAY_PROTOCOL_STAT_ACK)
{
sprintf(buf, "UPDATE pend_msgs SET ack = True WHERE app_key = '%s' AND dev_id = %d AND msg = '%s'", (char *)gch.app_key, gch.dev_id, msg_cont);
res = PQexec(conn, buf);
if (PQresultStatus(res) != PGRES_COMMAND_OK) {
fprintf(stderr, "error db deleting : %s", PQerrorMessage(conn));
}
PQclear(res);
received_ack = 1;
printf("ACK received\n");
} else {
printf("error: packet_type = %02X, not STAT\n");
}
}
}
} while (!received_ack && pend_send_retries--);
// cancel timeout
tv.tv_usec = 0;
if (setsockopt(gch.server_desc, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)) < 0) {
perror("setsockopt error");
}
} else {
gateway_protocol_mk_stat(
&gch,
GATEWAY_PROTOCOL_STAT_NACK,
buf, &buf_len);
send_gcom_ch(&gch, buf, buf_len);
printf("nothing for app %s dev %d\n", (char *)gch.app_key, gch.dev_id);
}
} else {
gateway_protocol_mk_stat(
&gch,
GATEWAY_PROTOCOL_STAT_NACK,
buf, &buf_len);
send_gcom_ch(&gch, buf, buf_len);
fprintf(stderr, "packet type error : %02X\n", packet_type);
}
} else { } else {
fprintf(stderr, "payload decode error\n"); fprintf(stderr, "payload decode error\n");
...@@ -431,8 +278,8 @@ void process_packet(void *request) { ...@@ -431,8 +278,8 @@ void process_packet(void *request) {
if (!memcmp(recv_app_key, gch.app_key, GATEWAY_PROTOCOL_APP_KEY_SIZE) && if (!memcmp(recv_app_key, gch.app_key, GATEWAY_PROTOCOL_APP_KEY_SIZE) &&
recv_dev_id == gch.dev_id && recv_dev_id == gch.dev_id &&
packet_type == GATEWAY_PROTOCOL_PACKET_TYPE_STAT && packet_type == GATEWAY_PROTOCOL_PACKET_TYPE_STAT &&
buf_len == 1 && req->packet_length == 1 &&
buf[0] == GATEWAY_PROTOCOL_STAT_ACK) req->packet[0] == GATEWAY_PROTOCOL_STAT_ACK)
{ {
snprintf(db_query, sizeof(db_query), snprintf(db_query, sizeof(db_query),
"UPDATE pend_msgs SET ack = True WHERE app_key = '%s' AND dev_id = %d AND msg = '%s'", "UPDATE pend_msgs SET ack = True WHERE app_key = '%s' AND dev_id = %d AND msg = '%s'",
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment