Commit f28738fd authored by Vladislav Rykov's avatar Vladislav Rykov
Browse files

gateway manager implemented

parent c88de8c1
No preview for this file type
#ifndef GW_STAT_LINKED_LIST_H
#define GW_STAT_LINKED_LIST_H
#include <stdint.h>
void gw_stat_linked_list_init(void);
uint8_t gw_stat_linked_list_add(const char *app_key, const uint8_t dev_id);
void gw_stat_linked_list_flush(char *store, uint8_t file);
void gw_stat_linked_list_destroy(void);
#endif // GW_STAT_LINKED_LIST_H
......@@ -21,6 +21,7 @@
#include "task_queue.h"
#include "json.h"
#include "aes.h"
#include "gw_stat_linked_list.h"
#define NTHREAD_MAX 10
......@@ -74,6 +75,10 @@ typedef struct {
uint8_t packet_length;
} gcom_ch_request_t;
typedef struct {
uint64_t errors_count;
} gw_stat_t;
static const char * gw_conf_file = "gateway.conf";
static const char * db_conf_file = "db.conf";
static void process_gw_conf(json_value* value, gw_conf_t *gw_conf);
......@@ -109,8 +114,11 @@ void ctrc_handler (int sig);
static volatile uint8_t working = 1;
pthread_mutex_t mutex;
pthread_mutex_t gw_stat_mutex;
PGconn *conn;
gw_stat_t gw_stat;
int main (int argc, char **argv) {
gw_conf_t *gw_conf = (gw_conf_t *)malloc(sizeof(gw_conf_t));
db_conf_t *db_conf = (db_conf_t *)malloc(sizeof(db_conf_t));
......@@ -119,6 +127,8 @@ int main (int argc, char **argv) {
task_queue_t *tq;
pthread_t gw_mngr;
sigset_t sigset;
gw_stat.errors_count = 0;
sigemptyset(&sigset);
/* block SIGALRM for gateway manager thread */
......@@ -198,9 +208,12 @@ int main (int argc, char **argv) {
}
pthread_mutex_init(&mutex, NULL);
pthread_mutex_init(&gw_stat_mutex, NULL);
gateway_protocol_set_checkup_callback(gateway_protocol_checkup_callback);
gw_stat_linked_list_init();
while (working) {
gcom_ch_request_t *req = (gcom_ch_request_t *)malloc(sizeof(gcom_ch_request_t));
memset(req, 0x0, sizeof(gcom_ch_request_t));
......@@ -274,6 +287,10 @@ void process_packet(void *request) {
paramslen[0] = sensor_data.data_length;
paramsfor[0] = 1; // format - binary
pthread_mutex_lock(&gw_stat_mutex);
gw_stat_linked_list_add((char *)req->gch.gwp_conf.app_key, req->gch.gwp_conf.dev_id);
pthread_mutex_unlock(&gw_stat_mutex);
pthread_mutex_lock(&mutex);
res = PQexecParams(conn, db_query, 1, NULL, params, paramslen, paramsfor, 0);
pthread_mutex_unlock(&mutex);
......@@ -307,6 +324,7 @@ void process_packet(void *request) {
send_gcom_ch(&(req->gch), req->packet, req->packet_length);
} else {
fprintf(stderr, "database error : %s\n", PQerrorMessage(conn));
gw_stat.errors_count++;
}
PQclear(res);
} else if (req->packet_type == GATEWAY_PROTOCOL_PACKET_TYPE_PEND_REQ) {
......@@ -388,6 +406,7 @@ void process_packet(void *request) {
if (PQresultStatus(res) == PGRES_COMMAND_OK) {
printf("pend_msgs updated\n");
} else {
gw_stat.errors_count++;
fprintf(stderr, "database error : %s\n", PQerrorMessage(conn));
}
}
......@@ -402,9 +421,11 @@ void process_packet(void *request) {
send_gcom_ch(&(req->gch), req->packet, req->packet_length);
fprintf(stderr, "packet type error : %02X\n", req->packet_type);
gw_stat.errors_count++;
}
} else {
fprintf(stderr, "payload decode error\n");
gw_stat.errors_count++;
}
free(req);
......@@ -450,18 +471,26 @@ uint8_t gateway_auth(const gw_conf_t *gw_conf, const char *db_conf_file_path) {
return 1;
}
void * gateway_mngr(void *gw_conf) {
#define GW_MNGR_BUF_LEN 1024
#define GW_MNGR_QBUF_LEN 1136
void * gateway_mngr(void *gw_cnf) {
struct itimerval tval;
uint32_t period = ((gw_conf_t *)gw_conf)->telemetry_send_period;
gw_conf_t *gw_conf = (gw_conf_t *) gw_cnf;
sigset_t alarm_msk;
int sig;
struct timeval tv;
char buf[GW_MNGR_BUF_LEN];
char qbuf[GW_MNGR_QBUF_LEN];
char b64_gwid[12];
PGresult *res;
sigemptyset(&alarm_msk);
sigaddset(&alarm_msk, SIGALRM);
tval.it_value.tv_sec = period;
tval.it_value.tv_sec = gw_conf->telemetry_send_period;
tval.it_value.tv_usec = 0;
tval.it_interval.tv_sec = period;
tval.it_interval.tv_sec = gw_conf->telemetry_send_period;
tval.it_interval.tv_usec = 0;
if (setitimer(ITIMER_REAL, &tval, NULL)) {
......@@ -469,11 +498,31 @@ void * gateway_mngr(void *gw_conf) {
return NULL;
}
base64_encode(gw_conf->gw_id, GATEWAY_ID_SIZE, b64_gwid);
while (1) {
// get utc
gettimeofday(&tv, NULL);
// create applications and devices serving log
// flush utc and log into a query
printf("periodic action done!\n");
pthread_mutex_lock(&gw_stat_mutex);
gw_stat_linked_list_flush(buf, 0);
pthread_mutex_unlock(&gw_stat_mutex);
// flush utc and log into a query
snprintf(qbuf, GW_MNGR_QBUF_LEN, "UPDATE gateways SET num_errors = %lld, last_keep_alive = %d, last_report = '%s' WHERE id = '%s'",
gw_stat.errors_count, (uint32_t) tv.tv_sec, buf, b64_gwid );
pthread_mutex_lock(&mutex);
res = PQexec(conn, qbuf);
pthread_mutex_unlock(&mutex);
if (PQresultStatus(res) != PGRES_COMMAND_OK) {
fprintf(stderr, "gateway manager db update failed!\n");
}
buf[0] = '\0';
qbuf[0] = '\0';
sigwait(&alarm_msk, &sig);
}
}
......@@ -540,6 +589,7 @@ void gateway_protocol_checkup_callback(gateway_protocol_conf_t *gwp_conf) {
gwp_conf->secure = PQgetvalue(res, 0, 1)[0] == 't';
} else {
perror("gateway_protocol_checkup_callback error");
gw_stat.errors_count++;
}
PQclear(res);
}
......@@ -548,6 +598,7 @@ int send_gcom_ch(gcom_ch_t *gch, uint8_t *pck, uint8_t pck_size) {
int ret;
if ((ret = sendto(gch->server_desc, (char *)pck, pck_size, 0, (struct sockaddr *)&gch->client, gch->sock_len)) < 0) {
gw_stat.errors_count++;
perror("sendto error");
}
......@@ -558,6 +609,7 @@ int recv_gcom_ch(gcom_ch_t *gch, uint8_t *pck, uint8_t *pck_length, uint16_t pck
int ret;
if ((ret = recvfrom(gch->server_desc, (char *)pck, pck_size, MSG_WAITALL, (struct sockaddr *)&gch->client, &gch->sock_len)) < 0) {
perror("socket receive error");
gw_stat.errors_count++;
} else {
*pck_length = ret;
......
#include "gw_stat_linked_list.h"
#include <string.h>
#include <stdio.h>
#include <stdlib.h>
#define APP_KEY_SIZE 8
typedef struct _gw_stat_linked_list gw_stat_linked_list_t;
typedef struct _gw_stat_linked_list {
char app_key[APP_KEY_SIZE+1];
uint8_t dev_id;
uint64_t num_msgs;
struct _gw_stat_linked_list *next;
} _gw_stat_linked_list;
static gw_stat_linked_list_t *root;
static uint64_t size;
void gw_stat_linked_list_init(void) {
root = NULL;
size = 0;
}
uint8_t gw_stat_linked_list_add(const char *app_key, const uint8_t dev_id) {
gw_stat_linked_list_t *ptr = root;
uint8_t found = 0, ret = 1;
while (ptr && !found) {
if (!memcmp(ptr->app_key, app_key, APP_KEY_SIZE) && ptr->dev_id == dev_id) {
found = 1;
} else {
ptr = ptr->next;
}
}
if (ptr) {
ptr->num_msgs++;
} else {
ptr = (gw_stat_linked_list_t *)malloc(sizeof(gw_stat_linked_list_t));
if (ptr) {
memcpy(ptr->app_key, app_key, APP_KEY_SIZE);
ptr->app_key[APP_KEY_SIZE] = '\0';
ptr->dev_id = dev_id;
ptr->num_msgs = 1;
ptr->next = root;
root = ptr;
size++;
} else {
ret = 0;
}
}
return ret;
}
void gw_stat_linked_list_flush(char *store, uint8_t file) {
gw_stat_linked_list_t *ptr = root, *tmp;
char buf[64];
FILE *fp;
if (ptr && size) {
if (file) { // file output
fp = fopen(store, "w");
while (ptr) {
snprintf(buf, 64, "%s#%d#%lld|", ptr->app_key, ptr->dev_id, ptr->num_msgs);
fwrite(buf, strlen(buf), 1, fp);
tmp = ptr;
ptr = ptr->next;
free(tmp);
}
fclose(fp);
} else { // str output
store[0] = '\0';
while (ptr) {
snprintf(buf, 64, "%s#%d#%lld|", ptr->app_key, ptr->dev_id, ptr->num_msgs);
strcat(store, buf);
tmp = ptr;
ptr = ptr->next;
free(tmp);
}
}
size = 0;
root = NULL;
}
}
void gw_stat_linked_list_destroy(void) {
gw_stat_linked_list_t *ptr = root, *tmp;
while (ptr && size) {
tmp = ptr;
ptr = ptr->next;
free(tmp);
size--;
}
root = NULL;
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment