gateway.c 20.3 KB
Newer Older
1
2
3
4
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <sys/socket.h>
5
#include <sys/stat.h>
6
7
8
9
10
11
12
13
14
15
16
17
18
#include <arpa/inet.h> //inet_addr
#include <unistd.h>
#include <stdint.h>
#include <pthread.h>
#include <sys/time.h>
#include <libpq-fe.h>
#include <math.h>
#include <signal.h>
#include <time.h>

#include <errno.h>

#include "gateway_protocol.h"
19
#include "gateway_telemetry_protocol.h"
20
21
#include "base64.h"
#include "task_queue.h"
22
#include "json.h"
23
#include "aes.h"
Vladislav Rykov's avatar
Vladislav Rykov committed
24
#include "gw_stat_linked_list.h"
25

26
/* Worker count handed to task_queue_create() in main(). */
#define NTHREAD_MAX			10

/* Capacity of the formatted date/time string kept in sensor_data_t. */
#define TIMEDATE_LENGTH			32
/* Initial value of the PEND_SEND retry counter in process_packet(). */
#define PEND_SEND_RETRIES_MAX		5
/* Not referenced in this file; presumably mirrors the protocol's app key size. */
#define GATEWAY_PROTOCOL_APP_KEY_SIZE	8
/* Capacity of every device packet / payload buffer in this file. */
#define DEVICE_DATA_MAX_LENGTH		256
/* Byte length of the gateway secure key parsed from gateway.conf. */
#define GATEWAY_SECURE_KEY_SIZE		16
/* Byte length of the gateway identifier parsed from gateway.conf. */
#define GATEWAY_ID_SIZE			6
34

Vladislav Rykov's avatar
Vladislav Rykov committed
35

36
typedef struct {
37
	uint8_t 	gw_id[GATEWAY_ID_SIZE];
38
	uint8_t 	gw_secure_key[GATEWAY_SECURE_KEY_SIZE];
39
40
	uint16_t 	gw_port;
	char 		db_type[20];
41
	uint32_t	telemetry_send_period;
42
43
	char 		platform_gw_manager_ip[20];
	uint16_t 	platform_gw_manager_port;
44
45
} gw_conf_t;

46
47
48
49
50
51
52
53
/* Database connection settings filled in from db.conf by process_db_conf(). */
typedef struct {
	char 		addr[15+1];	/* server address; main() passes it as libpq "hostaddr" */
	uint16_t 	port;		/* server port */
	char 		db_name[32];	/* database name */
	char 		user_name[32];	/* login role */
	char 		user_pass[32];	/* login password */
} db_conf_t;

Vladislav Rykov's avatar
Vladislav Rykov committed
54
55
typedef struct {
	uint32_t utc;
Vladislav Rykov's avatar
Vladislav Rykov committed
56
	char timedate[TIMEDATE_LENGTH];
Vladislav Rykov's avatar
Vladislav Rykov committed
57

Vladislav Rykov's avatar
Vladislav Rykov committed
58
59
	uint8_t data[DEVICE_DATA_MAX_LENGTH];
	uint8_t data_length;
Vladislav Rykov's avatar
Vladislav Rykov committed
60
61
} sensor_data_t;

Vladislav Rykov's avatar
Vladislav Rykov committed
62
typedef struct {
Vladislav Rykov's avatar
Vladislav Rykov committed
63
	gateway_protocol_conf_t gwp_conf;
Vladislav Rykov's avatar
Vladislav Rykov committed
64
65
66
67
68
69
70
	int server_desc;
	int client_desc;
	struct sockaddr_in server;
	struct sockaddr_in client;
	int sock_len;
} gcom_ch_t; // gateway communication channel

71
72
73
74
75
76
77
/*
 * One received datagram queued for a worker thread. Allocated in main(),
 * handed to process_packet() through the task queue and freed there.
 */
typedef struct {
	gcom_ch_t gch;	/* private copy of the channel, including the sender address */
	gateway_protocol_packet_type_t packet_type; /* set by gateway_protocol_packet_decode() */
	uint8_t packet[DEVICE_DATA_MAX_LENGTH]; /* raw datagram; reused for the encoded reply */
	uint8_t packet_length; /* number of valid bytes in packet */
} gcom_ch_request_t;

Vladislav Rykov's avatar
Vladislav Rykov committed
78
79
80
81
/* Gateway-wide counters reported to the database by gateway_mngr(). */
typedef struct {
	uint64_t errors_count; /* incremented on receive, send, decode and database errors */
} gw_stat_t;

82
83
static const char * gw_conf_file = "gateway.conf";
static const char * db_conf_file = "db.conf";
84
static void process_gw_conf(json_value* value, gw_conf_t *gw_conf);
85
86
87
88
static void process_db_conf(json_value* value, db_conf_t *db_conf);
static json_value * read_json_conf(const char *file_path);
static int read_gw_conf(const char *gw_conf_file_path, gw_conf_t *gw_conf);
static int read_db_conf(const char *db_conf_file_path, db_conf_t *db_conf);
89

90
void process_packet(void *request);
Vladislav Rykov's avatar
Vladislav Rykov committed
91

92
93
94
uint8_t gateway_auth(const gw_conf_t *gw_conf, const char *db_conf_file_path);
void	*gateway_mngr(void *gw_conf);

Vladislav Rykov's avatar
Vladislav Rykov committed
95
96
97
int send_gcom_ch(gcom_ch_t *gch, uint8_t *pck, uint8_t pck_size);
int recv_gcom_ch(gcom_ch_t *gch, uint8_t *pck, uint8_t *pck_length, uint16_t pck_size);

Vladislav Rykov's avatar
Vladislav Rykov committed
98
99
100
101
void gateway_protocol_data_send_payload_decode(
	sensor_data_t *sensor_data, 
	const uint8_t *payload, 
	const uint8_t payload_length);
Vladislav Rykov's avatar
Vladislav Rykov committed
102

Vladislav Rykov's avatar
Vladislav Rykov committed
103
104
105
106
107
108
109
110
void gateway_protocol_mk_stat(
	gcom_ch_t *gch,
	gateway_protocol_stat_t stat,
	uint8_t *pck,
	uint8_t *pck_len);

void send_utc(gcom_ch_t *pch);

Vladislav Rykov's avatar
Vladislav Rykov committed
111
112
void gateway_protocol_checkup_callback(gateway_protocol_conf_t *gwp_conf);

Vladislav Rykov's avatar
Vladislav Rykov committed
113
114
void ctrc_handler (int sig);
static volatile uint8_t working = 1;
Vladislav Rykov's avatar
Vladislav Rykov committed
115

116
pthread_mutex_t mutex;
Vladislav Rykov's avatar
Vladislav Rykov committed
117
pthread_mutex_t gw_stat_mutex;
118
119
PGconn *conn;

Vladislav Rykov's avatar
Vladislav Rykov committed
120
121
gw_stat_t gw_stat;

Vladislav Rykov's avatar
Vladislav Rykov committed
122
int main (int argc, char **argv) {
123
	gw_conf_t *gw_conf = (gw_conf_t *)malloc(sizeof(gw_conf_t));
124
	db_conf_t *db_conf = (db_conf_t *)malloc(sizeof(db_conf_t));
125
	char *db_conninfo = (char *)malloc(512);
126
	gcom_ch_t gch;
127
	task_queue_t *tq;
128
129
	pthread_t gw_mngr;
	sigset_t sigset;
Vladislav Rykov's avatar
Vladislav Rykov committed
130
131
	
	gw_stat.errors_count = 0;
132
133

	sigemptyset(&sigset);
134
	/* block SIGALRM for gateway manager thread */
135
136
	sigaddset(&sigset, SIGALRM);
	sigprocmask(SIG_BLOCK, &sigset, NULL);
137

Vladislav Rykov's avatar
Vladislav Rykov committed
138
	signal(SIGINT, ctrc_handler);
139
	
140
	if (read_gw_conf(gw_conf_file, gw_conf)) {
141
142
143
		return EXIT_FAILURE;
	}

144
145
146
147
148
149
150
151
	gateway_telemetry_protocol_init(gw_conf->gw_id, gw_conf->gw_secure_key);

	if (!gateway_auth(gw_conf, db_conf_file)) {
		fprintf(stderr, "Gateway authentication failure.");
		return EXIT_FAILURE;
	}

	if (read_db_conf(db_conf_file, db_conf)) {
152
153
154
		return EXIT_FAILURE;
	}
	
155
156
	snprintf(db_conninfo, 512, 
			"hostaddr=%s port=%d dbname=%s user=%s password=%s", 
157
158
159
160
161
162
163
164
			db_conf->addr,
			db_conf->port,
			db_conf->db_name,
			db_conf->user_name,
			db_conf->user_pass);
	
	printf("db_conf : '%s'\n", db_conninfo);

165
	conn = PQconnectdb(db_conninfo);
166
167
168
169
170
171
172
173
174
175
	
	snprintf(db_conninfo, 512, 
			"id=%s secure_key=%s port=%d type=%s telemetry_send_period=%d\n", 
			gw_conf->gw_id,
			gw_conf->gw_secure_key,
			gw_conf->gw_port,
			gw_conf->db_type,
			gw_conf->telemetry_send_period);
	printf("gw_conf : '%s'\n", db_conninfo);
	
176
	free(db_conninfo);
Vladislav Rykov's avatar
Vladislav Rykov committed
177
	if (PQstatus(conn) == CONNECTION_BAD) {
Vladislav Rykov's avatar
Vladislav Rykov committed
178
		fprintf(stderr,"connection to db error: %s\n", PQerrorMessage(conn));
179
		free(gw_conf);
Vladislav Rykov's avatar
Vladislav Rykov committed
180
181
182
		return EXIT_FAILURE;
	}

Vladislav Rykov's avatar
Vladislav Rykov committed
183
	if ((gch.server_desc = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) {
Vladislav Rykov's avatar
Vladislav Rykov committed
184
		perror("socket creation error");
185
		free(gw_conf);
Vladislav Rykov's avatar
Vladislav Rykov committed
186
187
188
		return EXIT_FAILURE;
	}

Vladislav Rykov's avatar
Vladislav Rykov committed
189
	gch.server.sin_family 		= AF_INET;
190
	gch.server.sin_port		= htons(gw_conf->gw_port);
Vladislav Rykov's avatar
Vladislav Rykov committed
191
	gch.server.sin_addr.s_addr 	= INADDR_ANY;
Vladislav Rykov's avatar
Vladislav Rykov committed
192

Vladislav Rykov's avatar
Vladislav Rykov committed
193
	if (bind(gch.server_desc, (struct sockaddr *) &gch.server, sizeof(gch.server)) < 0) {
Vladislav Rykov's avatar
Vladislav Rykov committed
194
		perror("binding error");
195
		free(gw_conf);
Vladislav Rykov's avatar
Vladislav Rykov committed
196
197
198
		return EXIT_FAILURE;
	}

199
200
201
202
203
	if (pthread_create(&gw_mngr, NULL, gateway_mngr, gw_conf)) {
		fprintf(stderr, "Failed to create gateway manager thread.");
		return EXIT_FAILURE;
	}

204
205
	if(!(tq = task_queue_create(NTHREAD_MAX))) {
		perror("task_queue creation error");
206
		free(gw_conf);
207
208
		return EXIT_FAILURE;
	}
209
210

	pthread_mutex_init(&mutex, NULL);
Vladislav Rykov's avatar
Vladislav Rykov committed
211
	pthread_mutex_init(&gw_stat_mutex, NULL);
Vladislav Rykov's avatar
Vladislav Rykov committed
212
213

	gateway_protocol_set_checkup_callback(gateway_protocol_checkup_callback);
Vladislav Rykov's avatar
Vladislav Rykov committed
214
215
216

	gw_stat_linked_list_init();

Vladislav Rykov's avatar
Vladislav Rykov committed
217
	while (working) {
218
219
220
		gcom_ch_request_t *req = (gcom_ch_request_t *)malloc(sizeof(gcom_ch_request_t));
		memset(req, 0x0, sizeof(gcom_ch_request_t));
		memcpy(&req->gch, &gch, sizeof(gcom_ch_t));
221
	
Vladislav Rykov's avatar
Vladislav Rykov committed
222
		printf("listenninig...\n");
223
		req->gch.sock_len = sizeof(req->gch.client);
Vladislav Rykov's avatar
Vladislav Rykov committed
224
		
225
226
		if (recv_gcom_ch(&req->gch, req->packet, &req->packet_length, DEVICE_DATA_MAX_LENGTH)) {
			task_queue_enqueue(tq, process_packet, req);
Vladislav Rykov's avatar
Vladislav Rykov committed
227
		} else {
228
			fprintf(stderr, "packet receive error\n");
Vladislav Rykov's avatar
Vladislav Rykov committed
229
		}
Vladislav Rykov's avatar
Vladislav Rykov committed
230
231
	}

232
	free(gw_conf);
233
	free(db_conf);
234
	pthread_mutex_destroy(&mutex);
Vladislav Rykov's avatar
Vladislav Rykov committed
235
	close(gch.server_desc);
Vladislav Rykov's avatar
Vladislav Rykov committed
236
	PQfinish(conn);
Vladislav Rykov's avatar
Vladislav Rykov committed
237

Vladislav Rykov's avatar
Vladislav Rykov committed
238
239
240
	return EXIT_SUCCESS;
}

Vladislav Rykov's avatar
Vladislav Rykov committed
241
242
243
244
/*
 * SIGINT handler installed by main(): clears `working` so the receive loop
 * stops at its next condition check. The signal number is not used.
 */
void ctrc_handler (int sig) {
	(void) sig;
	working = 0;
}

245
246
247
248
249
250
void process_packet(void *request) {
	gcom_ch_request_t *req = (gcom_ch_request_t *)request;
	uint8_t payload[DEVICE_DATA_MAX_LENGTH];
	uint8_t payload_length;	
	PGresult *res;

Vladislav Rykov's avatar
Vladislav Rykov committed
251
252
	if (gateway_protocol_packet_decode(
		&(req->gch.gwp_conf),
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
		&(req->packet_type),
		&payload_length, payload,
		req->packet_length, req->packet))
	{
		if (req->packet_type == GATEWAY_PROTOCOL_PACKET_TYPE_TIME_REQ) {
			printf("TIME REQ received\n");
			send_utc(&(req->gch));
		} else if (req->packet_type == GATEWAY_PROTOCOL_PACKET_TYPE_DATA_SEND) {
			sensor_data_t sensor_data;
			time_t t;
			// DEVICE_DATA_MAX_LENGTH*2 {hex} + 150
			char db_query[662];

			printf("DATA SEND received\n");
			gateway_protocol_data_send_payload_decode(&sensor_data, payload, payload_length);
			
			if (sensor_data.utc == 0) {
				struct timeval tv;
				gettimeofday(&tv, NULL);
				t = tv.tv_sec;
			} else {
				t = sensor_data.utc;
			}
			
			strftime(sensor_data.timedate, TIMEDATE_LENGTH, "%d/%m/%Y %H:%M:%S", localtime(&t));
			snprintf(db_query, sizeof(db_query), 
279
				"INSERT INTO dev_%s_%d VALUES (%lu, '%s', $1)", 
Vladislav Rykov's avatar
Vladislav Rykov committed
280
				(char *)req->gch.gwp_conf.app_key, req->gch.gwp_conf.dev_id, t, sensor_data.timedate
281
282
283
284
285
			);
			
			const char *params[1];
			int paramslen[1];
			int paramsfor[1];
286
			params[0] = (char *) sensor_data.data;
287
288
289
			paramslen[0] = sensor_data.data_length;
			paramsfor[0] = 1; // format - binary

Vladislav Rykov's avatar
Vladislav Rykov committed
290
291
292
293
			pthread_mutex_lock(&gw_stat_mutex);
			gw_stat_linked_list_add((char *)req->gch.gwp_conf.app_key, req->gch.gwp_conf.dev_id);
			pthread_mutex_unlock(&gw_stat_mutex);

294
			pthread_mutex_lock(&mutex);
295
			res = PQexecParams(conn, db_query, 1, NULL, params, paramslen, paramsfor, 0);
296
297
			pthread_mutex_unlock(&mutex);

298
299
300
301
302
			if (PQresultStatus(res) == PGRES_COMMAND_OK) {
				PQclear(res);

				snprintf(db_query, sizeof(db_query),
					 "SELECT * FROM pend_msgs WHERE app_key='%s' and dev_id = %d and ack = False", 
Vladislav Rykov's avatar
Vladislav Rykov committed
303
					(char *)req->gch.gwp_conf.app_key, req->gch.gwp_conf.dev_id
304
				);
305
306
				
				pthread_mutex_lock(&mutex);
307
				res = PQexec(conn, db_query);
308
309
				pthread_mutex_unlock(&mutex);
				
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
				if (PQresultStatus(res) == PGRES_TUPLES_OK && PQntuples(res)) {
					gateway_protocol_mk_stat(
						&(req->gch), 
						GATEWAY_PROTOCOL_STAT_ACK_PEND,
						req->packet, &(req->packet_length));
					printf("ACK_PEND prepared\n");
				} else {
					gateway_protocol_mk_stat(
						&(req->gch), 
						GATEWAY_PROTOCOL_STAT_ACK,
						req->packet, &(req->packet_length));
					printf("ACK prepared\n");
				}
				
				send_gcom_ch(&(req->gch), req->packet, req->packet_length);
			} else {
				fprintf(stderr, "database error : %s\n", PQerrorMessage(conn));
Vladislav Rykov's avatar
Vladislav Rykov committed
327
				gw_stat.errors_count++;
328
329
330
331
332
333
			}
			PQclear(res);
		} else if (req->packet_type == GATEWAY_PROTOCOL_PACKET_TYPE_PEND_REQ) {
			char db_query[200];
			snprintf(db_query, sizeof(db_query),
				 "SELECT * FROM pend_msgs WHERE app_key = '%s' AND dev_id = %d AND ack = False", 
Vladislav Rykov's avatar
Vladislav Rykov committed
334
				(char *)req->gch.gwp_conf.app_key, req->gch.gwp_conf.dev_id
335
			);
336
			pthread_mutex_lock(&mutex);
337
			res = PQexec(conn, db_query);
338
			pthread_mutex_unlock(&mutex);
339
340
341
342
343
344
345
346
347
348
349
350
351
352
			
			if (PQresultStatus(res) == PGRES_TUPLES_OK && PQntuples(res)) {
				char msg_cont[150];
				strncpy(msg_cont, PQgetvalue(res, 0, 2), sizeof(msg_cont));
				printf("PEND_SEND prepared : %s\n", msg_cont);
				PQclear(res);
			
				base64_decode(msg_cont, strlen(msg_cont)-1, payload);
				payload_length = BASE64_DECODE_OUT_SIZE(strlen(msg_cont));
				printf("prepared to send %d bytes : %s\n", payload_length, payload);
				
				// send the msg until ack is received
				uint8_t received_ack = 0;
				uint8_t pend_send_retries = PEND_SEND_RETRIES_MAX;
Vladislav Rykov's avatar
Vladislav Rykov committed
353
354
				gateway_protocol_packet_encode(
					&(req->gch.gwp_conf),
355
356
357
					GATEWAY_PROTOCOL_PACKET_TYPE_PEND_SEND,
					payload_length, payload,
					&(req->packet_length), req->packet);
358
359
				do {
					send_gcom_ch(&(req->gch), req->packet, req->packet_length);
360
361
362
363
364
365
366
367
368
369
370
					
					// 300 ms
					usleep(300000);

					pthread_mutex_lock(&mutex);
					res = PQexec(conn, db_query);
					pthread_mutex_unlock(&mutex);
					
					if (PQresultStatus(res) == PGRES_TUPLES_OK) {
						if (!PQntuples(res) || strcmp(PQgetvalue(res, 0, 2), msg_cont)) {
							received_ack = 1;
371
372
						}
					}
373
374
					PQclear(res);
					printf("received_ack = %d, retries = %d\n", received_ack, pend_send_retries);
375
376
377
378
379
380
381
382
383
				} while (!received_ack && pend_send_retries--);
			} else {
				gateway_protocol_mk_stat(
					&(req->gch),
					GATEWAY_PROTOCOL_STAT_NACK,
					req->packet, &(req->packet_length));
				
				send_gcom_ch(&(req->gch), req->packet, req->packet_length);
				
Vladislav Rykov's avatar
Vladislav Rykov committed
384
				printf("nothing for app %s dev %d\n", (char *)req->gch.gwp_conf.app_key, req->gch.gwp_conf.dev_id);
385
			}
386
387
388
389
390
391
		} else if (req->packet_type == GATEWAY_PROTOCOL_PACKET_TYPE_STAT) {
			// TODO change to ACK_PEND = 0x01
			if (payload[0] == 0x00) {
				char db_query[200];
				snprintf(db_query, sizeof(db_query),
					 "SELECT * FROM pend_msgs WHERE app_key = '%s' AND dev_id = %d AND ack = False", 
Vladislav Rykov's avatar
Vladislav Rykov committed
392
					(char *)req->gch.gwp_conf.app_key, req->gch.gwp_conf.dev_id
393
394
395
396
397
398
399
				);
				pthread_mutex_lock(&mutex);
				res = PQexec(conn, db_query);
				pthread_mutex_unlock(&mutex);
				if (PQresultStatus(res) == PGRES_TUPLES_OK && PQntuples(res)) {
					snprintf(db_query, sizeof(db_query),
						"UPDATE pend_msgs SET ack = True WHERE app_key = '%s' AND dev_id = %d AND msg = '%s'",
Vladislav Rykov's avatar
Vladislav Rykov committed
400
						(char *)req->gch.gwp_conf.app_key, req->gch.gwp_conf.dev_id, PQgetvalue(res, 0, 2)
401
402
403
404
405
406
407
408
					);
					PQclear(res);
					pthread_mutex_lock(&mutex);
					res = PQexec(conn, db_query);
					pthread_mutex_unlock(&mutex);
					if (PQresultStatus(res) == PGRES_COMMAND_OK) {
						printf("pend_msgs updated\n");
					} else {
Vladislav Rykov's avatar
Vladislav Rykov committed
409
						gw_stat.errors_count++;
410
411
412
413
414
						fprintf(stderr, "database error : %s\n", PQerrorMessage(conn));
					}
				}
				PQclear(res);
			}
415
416
417
418
419
420
421
422
		} else {
			gateway_protocol_mk_stat(
				&(req->gch),
				GATEWAY_PROTOCOL_STAT_NACK,
				req->packet, &(req->packet_length));
			
			send_gcom_ch(&(req->gch), req->packet, req->packet_length);
				
423
			fprintf(stderr, "packet type error : %02X\n", req->packet_type);
Vladislav Rykov's avatar
Vladislav Rykov committed
424
			gw_stat.errors_count++;
425
426
427
		}
	} else {
		fprintf(stderr, "payload decode error\n");
Vladislav Rykov's avatar
Vladislav Rykov committed
428
		gw_stat.errors_count++;
429
	}
430
431
	
	free(req);
432
433
}

434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
/*
 * Authenticates the gateway against the platform gateway manager.
 *
 * Opens a TCP connection to platform_gw_manager_ip:platform_gw_manager_port,
 * sends an empty AUTH telemetry packet, decodes the single reply and writes
 * its payload to db_conf_file_path (the database configuration that
 * read_db_conf() loads afterwards).
 *
 * Returns 1 when the reply decoded and the file was written completely,
 * 0 on any socket, protocol or file error. The socket is closed on every path.
 */
uint8_t gateway_auth(const gw_conf_t *gw_conf, const char *db_conf_file_path) {
	int sockfd;
	struct sockaddr_in platformaddr;
	uint8_t buffer[1024];
	uint16_t buffer_length = 0;
	uint8_t payload_buffer[1024];
	uint16_t payload_buffer_length = 0;
	gateway_telemetry_protocol_packet_type_t pt;
	ssize_t received;
	uint8_t authenticated = 0;
	FILE *fp;

	if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
		return 0;
	}

	memset(&platformaddr, 0x0, sizeof(platformaddr));

	platformaddr.sin_family = AF_INET;
	platformaddr.sin_addr.s_addr = inet_addr(gw_conf->platform_gw_manager_ip);
	platformaddr.sin_port = htons(gw_conf->platform_gw_manager_port);

	if (connect(sockfd, (struct sockaddr *)&platformaddr, sizeof(platformaddr))) {
		goto out;
	}

	/* AUTH carries no payload: zero-length input, encoded in place */
	gateway_telemetry_protocol_encode_packet(buffer, 0, GATEWAY_TELEMETRY_PROTOCOL_AUTH, buffer, &buffer_length);
	if (write(sockfd, buffer, buffer_length) != (ssize_t) buffer_length) {
		goto out;
	}

	/* keep the signed result: storing -1 in a uint16_t would look like 65535 bytes */
	received = read(sockfd, buffer, sizeof(buffer));
	if (received <= 0) {
		goto out;
	}
	buffer_length = (uint16_t) received;

	if (!gateway_telemetry_protocol_decode_packet(payload_buffer, &payload_buffer_length, &pt, buffer, buffer_length)) {
		goto out;
	}

	// write db_conf into file
	fp = fopen(db_conf_file_path, "w");
	if (!fp) {
		goto out;
	}
	authenticated = fwrite(payload_buffer, 1, payload_buffer_length, fp) == payload_buffer_length;
	/* fclose() flushes; a failure here means the file is incomplete */
	if (fclose(fp) != 0) {
		authenticated = 0;
	}

out:
	close(sockfd);
	return authenticated;
}

Vladislav Rykov's avatar
Vladislav Rykov committed
474
475
476
#define GW_MNGR_BUF_LEN		1024
#define GW_MNGR_QBUF_LEN	1136
void * gateway_mngr(void *gw_cnf) {
477
	struct itimerval tval;
Vladislav Rykov's avatar
Vladislav Rykov committed
478
	gw_conf_t *gw_conf = (gw_conf_t *) gw_cnf;
479
480
	sigset_t alarm_msk;
	int sig;
Vladislav Rykov's avatar
Vladislav Rykov committed
481
482
483
484
485
486
	struct timeval tv;
	char buf[GW_MNGR_BUF_LEN];
	char qbuf[GW_MNGR_QBUF_LEN];
	char b64_gwid[12];
	PGresult *res;
	
487
488
489
490

	sigemptyset(&alarm_msk);
	sigaddset(&alarm_msk, SIGALRM);

Vladislav Rykov's avatar
Vladislav Rykov committed
491
	tval.it_value.tv_sec = gw_conf->telemetry_send_period;
492
	tval.it_value.tv_usec = 0;
Vladislav Rykov's avatar
Vladislav Rykov committed
493
	tval.it_interval.tv_sec = gw_conf->telemetry_send_period;
494
495
496
497
498
499
500
	tval.it_interval.tv_usec = 0;

	if (setitimer(ITIMER_REAL, &tval, NULL)) {
		perror("Failed to set itimer");
		return NULL;
	}

Vladislav Rykov's avatar
Vladislav Rykov committed
501
502
	base64_encode(gw_conf->gw_id, GATEWAY_ID_SIZE, b64_gwid);
	
503
504
	while (1) {
		// get utc
Vladislav Rykov's avatar
Vladislav Rykov committed
505
506
		gettimeofday(&tv, NULL);

507
		// create applications and devices serving log
Vladislav Rykov's avatar
Vladislav Rykov committed
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
		pthread_mutex_lock(&gw_stat_mutex);
		gw_stat_linked_list_flush(buf, 0);
		pthread_mutex_unlock(&gw_stat_mutex);

		// flush utc and log into a query	
		snprintf(qbuf, GW_MNGR_QBUF_LEN, "UPDATE gateways SET num_errors = %lld, last_keep_alive = %d, last_report = '%s' WHERE id = '%s'",
				gw_stat.errors_count, (uint32_t) tv.tv_sec, buf, b64_gwid );

		pthread_mutex_lock(&mutex);
		res = PQexec(conn, qbuf);
		pthread_mutex_unlock(&mutex);
	
		if (PQresultStatus(res) != PGRES_COMMAND_OK) {
			fprintf(stderr, "gateway manager db update failed!\n");
		}

		buf[0] = '\0';
		qbuf[0] = '\0';
526
527
528
529
		sigwait(&alarm_msk, &sig);
	}
}

Vladislav Rykov's avatar
Vladislav Rykov committed
530
void gateway_protocol_data_send_payload_decode(
Vladislav Rykov's avatar
Vladislav Rykov committed
531
532
533
534
	sensor_data_t *sensor_data, 
	const uint8_t *payload, 
	const uint8_t payload_length) 
{
Vladislav Rykov's avatar
Vladislav Rykov committed
535
536
537
538
539
	uint8_t p_len = 0;

	memcpy(&sensor_data->utc, &payload[p_len], sizeof(sensor_data->utc));
	p_len += sizeof(sensor_data->utc);

Vladislav Rykov's avatar
Vladislav Rykov committed
540
541
	memcpy(sensor_data->data, &payload[p_len], payload_length - p_len);
	sensor_data->data_length = payload_length - p_len;
Vladislav Rykov's avatar
Vladislav Rykov committed
542
543
}

Vladislav Rykov's avatar
Vladislav Rykov committed
544
545
546
547
548
549
/*
 * Encodes a one-byte STAT packet carrying `stat` for the device described by
 * gch->gwp_conf. The encoded packet is written to pck and its length to
 * *pck_len.
 */
void gateway_protocol_mk_stat(
	gcom_ch_t *gch,
	gateway_protocol_stat_t stat,
	uint8_t *pck,
	uint8_t *pck_len)
{
	/*
	 * Convert by value: taking the first byte of the object's storage only
	 * gives the status code when the type is one byte wide or the host is
	 * little-endian.
	 */
	uint8_t stat_byte = (uint8_t) stat;

	gateway_protocol_packet_encode(
		&(gch->gwp_conf),
		GATEWAY_PROTOCOL_PACKET_TYPE_STAT,
		1, &stat_byte,
		pck_len, pck);
}

Vladislav Rykov's avatar
Vladislav Rykov committed
557
558


Vladislav Rykov's avatar
Vladislav Rykov committed
559
/*
 * Replies to a TIME_REQ: encodes the current time (seconds, 32 bits, host
 * byte order) as a TIME_SEND packet and sends it to the peer stored in gch.
 */
void send_utc(gcom_ch_t *gch) {
	uint8_t buf[50];
	uint8_t buf_len = 0;
	struct timeval tv;
	uint32_t utc;

	gettimeofday(&tv, NULL);
	/*
	 * Narrow by value: tv_sec may be 64 bits wide, and its first four
	 * storage bytes are the low-order ones only on little-endian hosts.
	 */
	utc = (uint32_t) tv.tv_sec;

	gateway_protocol_packet_encode (
		&(gch->gwp_conf),
		GATEWAY_PROTOCOL_PACKET_TYPE_TIME_SEND,
		sizeof(utc), (uint8_t *)&utc,
		&buf_len, buf
	);

	send_gcom_ch(gch, buf, buf_len);
}

Vladislav Rykov's avatar
Vladislav Rykov committed
576
577
578
/*
 * Callback registered with gateway_protocol_set_checkup_callback().
 *
 * Looks up gwp_conf->app_key in the `applications` table and fills in
 * gwp_conf->secure_key (base64-decoded) and gwp_conf->secure (true when the
 * `secure` column starts with 't'). When the query fails or the application
 * is unknown, gwp_conf is left untouched and the error counter is bumped.
 *
 * app_key comes from a packet that has not been authenticated yet, so it is
 * bound as a query parameter rather than spliced into the SQL text.
 */
void gateway_protocol_checkup_callback(gateway_protocol_conf_t *gwp_conf) {
	PGresult *res;
	const char *params[1];

	params[0] = (const char *)gwp_conf->app_key;

	pthread_mutex_lock(&mutex);
	res = PQexecParams(conn,
		"SELECT secure_key, secure FROM applications WHERE app_key = $1",
		1, NULL, params, NULL, NULL, 0);
	pthread_mutex_unlock(&mutex);

	if ((PQresultStatus(res) == PGRES_TUPLES_OK) && PQntuples(res)) {
		char *b64_key = PQgetvalue(res, 0, 0);
		size_t b64_len = strlen(b64_key);

		/* the last character is excluded from decoding, as before; an empty
		 * key must not wrap the length around */
		if (b64_len > 0) {
			base64_decode(b64_key, b64_len - 1, gwp_conf->secure_key);
		}
		gwp_conf->secure = PQgetvalue(res, 0, 1)[0] == 't';
	} else {
		/* not an errno failure: report the database's own message (empty
		 * when the query succeeded but matched no application) */
		fprintf(stderr, "gateway_protocol_checkup_callback error: %s\n", PQresultErrorMessage(res));
		gw_stat.errors_count++;
	}
	PQclear(res);
}

Vladislav Rykov's avatar
Vladislav Rykov committed
597
/*
 * Sends pck_size bytes from pck to the peer address stored in gch->client,
 * using the gateway's UDP socket.
 *
 * Returns the sendto() result: the number of bytes sent, or a negative value
 * on failure (in which case the error counter is incremented and the error
 * is reported with perror()).
 */
int send_gcom_ch(gcom_ch_t *gch, uint8_t *pck, uint8_t pck_size) {
	struct sockaddr *peer = (struct sockaddr *)&gch->client;
	int sent = sendto(gch->server_desc, (char *)pck, pck_size, 0, peer, gch->sock_len);

	if (sent >= 0) {
		return sent;
	}

	gw_stat.errors_count++;
	perror("sendto error");
	return sent;
}

/*
 * Receives one datagram on the gateway's UDP socket.
 *
 * Up to pck_size bytes are stored in pck, but never more than UINT8_MAX,
 * because the length is reported through a uint8_t and a larger datagram
 * would wrap it around (256 bytes would be reported as 0). The sender address
 * is stored in gch->client and its size in gch->sock_len.
 *
 * Returns the recvfrom() result: the number of bytes received (also written
 * to *pck_length), or a negative value on failure, in which case the error
 * counter is incremented and *pck_length is left untouched.
 */
int recv_gcom_ch(gcom_ch_t *gch, uint8_t *pck, uint8_t *pck_length, uint16_t pck_size) {
	/* recvfrom() needs a socklen_t; the channel stores the size as int */
	socklen_t addr_len = (socklen_t) gch->sock_len;
	size_t capacity = pck_size > UINT8_MAX ? UINT8_MAX : pck_size;
	int ret;

	ret = (int) recvfrom(gch->server_desc, (char *)pck, capacity, MSG_WAITALL, (struct sockaddr *)&gch->client, &addr_len);
	if (ret < 0) {
		perror("socket receive error");
		gw_stat.errors_count++;
	} else {
		gch->sock_len = (int) addr_len;
		*pck_length = (uint8_t) ret;
	}

	return ret;
}
620
621
622


/*
 * Copies the gateway settings out of a parsed gateway.conf document.
 *
 * Members are read by position, not by name:
 *   0 gateway id (6 colon-separated hex bytes), 1 secure key (16
 *   colon-separated hex bytes), 2 port, 3 db type, 4 telemetry period,
 *   5 platform manager ip, 6 platform manager port.
 *
 * NOTE(review): the document is still assumed to be an object with at least
 * seven members of the expected types; that is not verified here.
 */
static void process_gw_conf(json_value* value, gw_conf_t *gw_conf) {
	const char *id_str = value->u.object.values[0].value->u.string.ptr;
	const char *key_str = value->u.object.values[1].value->u.string.ptr;

	/* a short or malformed string leaves the remaining bytes zero, not indeterminate */
	memset(gw_conf->gw_id, 0x0, sizeof(gw_conf->gw_id));
	memset(gw_conf->gw_secure_key, 0x0, sizeof(gw_conf->gw_secure_key));

	if (sscanf(id_str, "%hhx:%hhx:%hhx:%hhx:%hhx:%hhx", &gw_conf->gw_id[0], &gw_conf->gw_id[1], &gw_conf->gw_id[2],
							&gw_conf->gw_id[3], &gw_conf->gw_id[4], &gw_conf->gw_id[5]
	) != GATEWAY_ID_SIZE) {
		fprintf(stderr, "gateway id in configuration is malformed\n");
	}

	if (sscanf(key_str, "%hhx:%hhx:%hhx:%hhx:%hhx:%hhx:%hhx:%hhx:%hhx:%hhx:%hhx:%hhx:%hhx:%hhx:%hhx:%hhx", 
			&gw_conf->gw_secure_key[0], &gw_conf->gw_secure_key[1], &gw_conf->gw_secure_key[2], &gw_conf->gw_secure_key[3],
			&gw_conf->gw_secure_key[4], &gw_conf->gw_secure_key[5], &gw_conf->gw_secure_key[6], &gw_conf->gw_secure_key[7],
			&gw_conf->gw_secure_key[8], &gw_conf->gw_secure_key[9], &gw_conf->gw_secure_key[10], &gw_conf->gw_secure_key[11],
			&gw_conf->gw_secure_key[12], &gw_conf->gw_secure_key[13], &gw_conf->gw_secure_key[14], &gw_conf->gw_secure_key[15]
	) != GATEWAY_SECURE_KEY_SIZE) {
		fprintf(stderr, "gateway secure key in configuration is malformed\n");
	}

	gw_conf->gw_port = value->u.object.values[2].value->u.integer;
	/* snprintf always terminates the destination; strncpy does not when the source fills it */
	snprintf(gw_conf->db_type, sizeof(gw_conf->db_type), "%s", value->u.object.values[3].value->u.string.ptr);
	gw_conf->telemetry_send_period = value->u.object.values[4].value->u.integer;
	snprintf(gw_conf->platform_gw_manager_ip, sizeof(gw_conf->platform_gw_manager_ip), "%s", value->u.object.values[5].value->u.string.ptr);
	gw_conf->platform_gw_manager_port = value->u.object.values[6].value->u.integer;
}
642

643
644
/*
 * Copies the database settings out of a parsed db.conf document.
 *
 * Members are read by position, not by name, and all are strings:
 *   0 address, 1 port, 2 database name, 3 user name, 4 password.
 * A port that is not a number in 0..65535 is stored as 0 and reported.
 *
 * NOTE(review): the document is still assumed to be an object with at least
 * five string members; that is not verified here.
 */
static void process_db_conf(json_value* value, db_conf_t *db_conf) {
	const char *port_str = value->u.object.values[1].value->u.string.ptr;
	char *port_end = NULL;
	long port;

	/* snprintf always terminates the destination; strncpy does not when the source fills it */
	snprintf(db_conf->addr, sizeof(db_conf->addr), "%s", value->u.object.values[0].value->u.string.ptr);

	port = strtol(port_str, &port_end, 10);
	if (port_end == port_str || port < 0 || port > 65535) {
		fprintf(stderr, "database port in configuration is invalid\n");
		port = 0;
	}
	db_conf->port = (uint16_t) port;

	snprintf(db_conf->db_name, sizeof(db_conf->db_name), "%s", value->u.object.values[2].value->u.string.ptr);
	snprintf(db_conf->user_name, sizeof(db_conf->user_name), "%s", value->u.object.values[3].value->u.string.ptr);
	snprintf(db_conf->user_pass, sizeof(db_conf->user_pass), "%s", value->u.object.values[4].value->u.string.ptr);
}

651
static json_value * read_json_conf(const char *file_path) {
652
653
654
655
656
657
	struct stat filestatus;
	FILE *fp;
	char *file_contents;
	json_char *json;
	json_value *jvalue;

658
659
660
	if (stat(file_path, &filestatus)) {
		fprintf(stderr, "File %s not found.", file_path);
		return NULL;
661
662
663
	}
	file_contents = (char *)malloc(filestatus.st_size);
	if (!file_contents) {
664
665
		fprintf(stderr, "Memory error allocating %d bytes.", (int) filestatus.st_size);
		return NULL;
666
	}
667
	fp = fopen(file_path, "rt");
668
	if (!fp) {
669
		fprintf(stderr, "Unable to open %s.", file_path);
670
671
		fclose(fp);
		free(file_contents);
672
		return NULL;
673
674
	}
	if (fread(file_contents, filestatus.st_size, 1, fp) != 1) {
675
		fprintf(stderr, "Unable to read %s.", file_path);
676
677
		fclose(fp);
		free(file_contents);
678
		return NULL;
679
680
681
	}
	fclose(fp);
	
682
683
	file_contents[filestatus.st_size] = '\0';
	printf("file content : \n'%s'\n", file_contents);
684
685
686
687
688
689
	
	json = (json_char *)file_contents;
	jvalue = json_parse(json, filestatus.st_size);
	if (!jvalue) {
		perror("Unable to parse json.");
		free(file_contents);
690
		return NULL;
691
692
	}
	
693
694
695
696
697
698
699
700
701
702
703
704
	free(file_contents);
	
	return jvalue;
}

/*
 * Loads the gateway configuration file into gw_conf.
 * Returns 0 on success, 1 when the file cannot be read or parsed.
 */
static int read_gw_conf(const char *gw_conf_file_path, gw_conf_t *gw_conf) {
	json_value *root = read_json_conf(gw_conf_file_path);

	if (root != NULL) {
		process_gw_conf(root, gw_conf);
		json_value_free(root);
		return 0;
	}

	return 1;
}
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725

/*
 * Loads the database configuration file into db_conf.
 * Returns 0 on success, 1 when the file cannot be read or parsed.
 */
static int read_db_conf(const char *db_conf_file_path, db_conf_t *db_conf) {
	json_value *root = read_json_conf(db_conf_file_path);

	if (root != NULL) {
		process_db_conf(root, db_conf);
		json_value_free(root);
		return 0;
	}

	return 1;
}