gateway.c 21 KB
Newer Older
1
2
3
4
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <sys/socket.h>
5
#include <sys/stat.h>
6
7
8
9
10
11
12
13
14
15
16
17
18
#include <arpa/inet.h> //inet_addr
#include <unistd.h>
#include <stdint.h>
#include <pthread.h>
#include <sys/time.h>
#include <libpq-fe.h>
#include <math.h>
#include <signal.h>
#include <time.h>

#include <errno.h>

#include "gateway_protocol.h"
19
#include "gateway_telemetry_protocol.h"
20
21
#include "base64.h"
#include "task_queue.h"
22
#include "json.h"
23
#include "aes.h"
Vladislav Rykov's avatar
Vladislav Rykov committed
24
#include "gw_stat_linked_list.h"
25

Vladislav Rykov's avatar
Vladislav Rykov committed
26

Vladislav Rykov's avatar
Vladislav Rykov committed
27
28
29
/* Size of the buffer that receives the strftime()-formatted timestamp of a reading. */
#define TIMEDATE_LENGTH			32
/* Extra re-sends of a pending message (after the first send) while waiting for its ack. */
#define PEND_SEND_RETRIES_MAX		5
/* NOTE(review): not referenced in this file; presumably the application key length — confirm. */
#define GATEWAY_PROTOCOL_APP_KEY_SIZE	8
/* Capacity, in bytes, of the packet, payload and sensor data buffers. */
#define DEVICE_DATA_MAX_LENGTH		256
/* Length of the gateway secure key in bytes. */
#define GATEWAY_SECURE_KEY_SIZE		16
/* Length of the gateway identifier in bytes. */
#define GATEWAY_ID_SIZE			6
33

Vladislav Rykov's avatar
Vladislav Rykov committed
34

35
36
37
38
39
40
41
42
43
/*
 * Settings loaded from the dynamic configuration file (see process_dynamic_conf).
 * The db_* fields are used in main() to build the libpq connection string.
 */
typedef struct {
	char 		db_addr[15+1];		/* passed as "hostaddr="; presumably a dotted-quad IPv4 string plus NUL */
	uint16_t 	db_port;		/* passed as "port=" */
	char 		db_name[32];		/* passed as "dbname=" */
	char 		db_user_name[32];	/* passed as "user=" */
	char 		db_user_pass[32];	/* passed as "password=" */
	uint32_t	telemetry_send_period;	/* interval timer period of gateway_mngr, in seconds */
} dynamic_conf_t;

44
typedef struct {
45
	uint8_t 	gw_id[GATEWAY_ID_SIZE];
46
	uint8_t 	gw_secure_key[GATEWAY_SECURE_KEY_SIZE];
47
48
	uint16_t 	gw_port;
	char 		db_type[20];
49
50
	char 		platform_gw_manager_ip[20];
	uint16_t 	platform_gw_manager_port;
51
52
	uint8_t 	thread_pool_size;
} static_conf_t;
53

54
typedef struct {
55
56
57
	static_conf_t  static_conf;
	dynamic_conf_t dynamic_conf;
} gw_conf_t;
58

Vladislav Rykov's avatar
Vladislav Rykov committed
59
60
typedef struct {
	uint32_t utc;
Vladislav Rykov's avatar
Vladislav Rykov committed
61
	char timedate[TIMEDATE_LENGTH];
Vladislav Rykov's avatar
Vladislav Rykov committed
62

Vladislav Rykov's avatar
Vladislav Rykov committed
63
64
	uint8_t data[DEVICE_DATA_MAX_LENGTH];
	uint8_t data_length;
Vladislav Rykov's avatar
Vladislav Rykov committed
65
66
} sensor_data_t;

Vladislav Rykov's avatar
Vladislav Rykov committed
67
typedef struct {
Vladislav Rykov's avatar
Vladislav Rykov committed
68
	gateway_protocol_conf_t gwp_conf;
Vladislav Rykov's avatar
Vladislav Rykov committed
69
70
71
72
73
74
75
	int server_desc;
	int client_desc;
	struct sockaddr_in server;
	struct sockaddr_in client;
	int sock_len;
} gcom_ch_t; // gateway communication channel

76
77
78
79
80
81
82
/*
 * One received datagram queued for process_packet(). main() allocates it and
 * process_packet() frees it.
 */
typedef struct {
	gcom_ch_t gch;	
	gateway_protocol_packet_type_t packet_type;	/* set by gateway_protocol_packet_decode */
	uint8_t packet[DEVICE_DATA_MAX_LENGTH];		/* raw datagram; reused as the reply buffer */
	uint8_t packet_length;				/* number of valid bytes in packet */
} gcom_ch_request_t;

Vladislav Rykov's avatar
Vladislav Rykov committed
83
84
85
86
/* Gateway-wide counters reported to the database by gateway_mngr(). */
typedef struct {
	uint64_t errors_count;	/* incremented on receive, send, decode and database errors */
} gw_stat_t;

87
88
89
90
91
92
static const char * static_conf_file  = "conf/static.conf";
static const char * dynamic_conf_file = "conf/dynamic.conf";
static int read_static_conf (const char *static_conf_file_path,  gw_conf_t *gw_conf);
static int read_dynamic_conf(const char *dynamic_conf_file_path, gw_conf_t *gw_conf);
static void process_static_conf (json_value* value, static_conf_t  *static_conf);
static void process_dynamic_conf(json_value* value, dynamic_conf_t *dynamic_conf);
93
static json_value * read_json_conf(const char *file_path);
94

95
void process_packet(void *request);
Vladislav Rykov's avatar
Vladislav Rykov committed
96

97
uint8_t gateway_auth(const gw_conf_t *gw_conf, const char *dynamic_conf_file_path);
98
99
void	*gateway_mngr(void *gw_conf);

Vladislav Rykov's avatar
Vladislav Rykov committed
100
101
102
int send_gcom_ch(gcom_ch_t *gch, uint8_t *pck, uint8_t pck_size);
int recv_gcom_ch(gcom_ch_t *gch, uint8_t *pck, uint8_t *pck_length, uint16_t pck_size);

Vladislav Rykov's avatar
Vladislav Rykov committed
103
104
105
106
void gateway_protocol_data_send_payload_decode(
	sensor_data_t *sensor_data, 
	const uint8_t *payload, 
	const uint8_t payload_length);
Vladislav Rykov's avatar
Vladislav Rykov committed
107

Vladislav Rykov's avatar
Vladislav Rykov committed
108
109
110
111
112
113
114
115
void gateway_protocol_mk_stat(
	gcom_ch_t *gch,
	gateway_protocol_stat_t stat,
	uint8_t *pck,
	uint8_t *pck_len);

void send_utc(gcom_ch_t *pch);

Vladislav Rykov's avatar
Vladislav Rykov committed
116
117
void gateway_protocol_checkup_callback(gateway_protocol_conf_t *gwp_conf);

Vladislav Rykov's avatar
Vladislav Rykov committed
118
119
void ctrc_handler (int sig);
static volatile uint8_t working = 1;
Vladislav Rykov's avatar
Vladislav Rykov committed
120

121
pthread_mutex_t mutex;
Vladislav Rykov's avatar
Vladislav Rykov committed
122
pthread_mutex_t gw_stat_mutex;
123
124
PGconn *conn;

Vladislav Rykov's avatar
Vladislav Rykov committed
125
126
gw_stat_t gw_stat;

Vladislav Rykov's avatar
Vladislav Rykov committed
127
int main (int argc, char **argv) {
128
129
	gw_conf_t *gw_conf = (gw_conf_t *)malloc(sizeof(gw_conf_t));
	char *db_conninfo = (char *)malloc(512);
130
	gcom_ch_t gch;
131
	task_queue_t *tq;
132
133
	pthread_t gw_mngr;
	sigset_t sigset;
Vladislav Rykov's avatar
Vladislav Rykov committed
134
135
	
	gw_stat.errors_count = 0;
136
137

	sigemptyset(&sigset);
138
	/* block SIGALRM for gateway manager thread */
139
140
	sigaddset(&sigset, SIGALRM);
	sigprocmask(SIG_BLOCK, &sigset, NULL);
141

Vladislav Rykov's avatar
Vladislav Rykov committed
142
	signal(SIGINT, ctrc_handler);
143
	
144
	if (read_static_conf(static_conf_file, gw_conf)) {
145
146
147
		return EXIT_FAILURE;
	}

148
	gateway_telemetry_protocol_init(gw_conf->static_conf.gw_id, gw_conf->static_conf.gw_secure_key);
149

150
	if (!gateway_auth(gw_conf, dynamic_conf_file)) {
151
152
153
154
		fprintf(stderr, "Gateway authentication failure.");
		return EXIT_FAILURE;
	}

155
156
	if (read_dynamic_conf(dynamic_conf_file, gw_conf)) {
		fprintf(stderr, "Read dynamic configuration failure.");
157
158
159
		return EXIT_FAILURE;
	}
	
160
161
	snprintf(db_conninfo, 512, 
			"hostaddr=%s port=%d dbname=%s user=%s password=%s", 
162
163
164
165
166
			gw_conf->dynamic_conf.db_addr,
			gw_conf->dynamic_conf.db_port,
			gw_conf->dynamic_conf.db_name,
			gw_conf->dynamic_conf.db_user_name,
			gw_conf->dynamic_conf.db_user_pass);
167
168
169
	
	printf("db_conf : '%s'\n", db_conninfo);

170
	conn = PQconnectdb(db_conninfo);
171
172
	
	snprintf(db_conninfo, 512, 
173
174
175
176
177
178
179
			"id=%s secure_key=%s port=%d type=%s thread_pool_size=%d telemetry_send_period=%d\n", 
			gw_conf->static_conf.gw_id,
			gw_conf->static_conf.gw_secure_key,
			gw_conf->static_conf.gw_port,
			gw_conf->static_conf.db_type,
			gw_conf->static_conf.thread_pool_size,
			gw_conf->dynamic_conf.telemetry_send_period);
180
	printf("gw_conf : '%s'\n", db_conninfo);
181
	free(db_conninfo);
182

Vladislav Rykov's avatar
Vladislav Rykov committed
183
	if (PQstatus(conn) == CONNECTION_BAD) {
Vladislav Rykov's avatar
Vladislav Rykov committed
184
		fprintf(stderr,"connection to db error: %s\n", PQerrorMessage(conn));
185
		free(gw_conf);
Vladislav Rykov's avatar
Vladislav Rykov committed
186
187
188
		return EXIT_FAILURE;
	}

Vladislav Rykov's avatar
Vladislav Rykov committed
189
	if ((gch.server_desc = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) {
Vladislav Rykov's avatar
Vladislav Rykov committed
190
		perror("socket creation error");
191
		free(gw_conf);
Vladislav Rykov's avatar
Vladislav Rykov committed
192
193
194
		return EXIT_FAILURE;
	}

Vladislav Rykov's avatar
Vladislav Rykov committed
195
	gch.server.sin_family 		= AF_INET;
196
	gch.server.sin_port		= htons(gw_conf->static_conf.gw_port);
Vladislav Rykov's avatar
Vladislav Rykov committed
197
	gch.server.sin_addr.s_addr 	= INADDR_ANY;
Vladislav Rykov's avatar
Vladislav Rykov committed
198

Vladislav Rykov's avatar
Vladislav Rykov committed
199
	if (bind(gch.server_desc, (struct sockaddr *) &gch.server, sizeof(gch.server)) < 0) {
Vladislav Rykov's avatar
Vladislav Rykov committed
200
		perror("binding error");
201
		free(gw_conf);
Vladislav Rykov's avatar
Vladislav Rykov committed
202
203
204
		return EXIT_FAILURE;
	}

205
206
207
208
209
	if (pthread_create(&gw_mngr, NULL, gateway_mngr, gw_conf)) {
		fprintf(stderr, "Failed to create gateway manager thread.");
		return EXIT_FAILURE;
	}

210
	if(!(tq = task_queue_create(gw_conf->static_conf.thread_pool_size))) {
211
		perror("task_queue creation error");
212
		free(gw_conf);
213
214
		return EXIT_FAILURE;
	}
215
216

	pthread_mutex_init(&mutex, NULL);
Vladislav Rykov's avatar
Vladislav Rykov committed
217
	pthread_mutex_init(&gw_stat_mutex, NULL);
Vladislav Rykov's avatar
Vladislav Rykov committed
218
219

	gateway_protocol_set_checkup_callback(gateway_protocol_checkup_callback);
Vladislav Rykov's avatar
Vladislav Rykov committed
220
221
222

	gw_stat_linked_list_init();

Vladislav Rykov's avatar
Vladislav Rykov committed
223
	while (working) {
224
225
226
		gcom_ch_request_t *req = (gcom_ch_request_t *)malloc(sizeof(gcom_ch_request_t));
		memset(req, 0x0, sizeof(gcom_ch_request_t));
		memcpy(&req->gch, &gch, sizeof(gcom_ch_t));
227
	
Vladislav Rykov's avatar
Vladislav Rykov committed
228
		printf("listenninig...\n");
229
		req->gch.sock_len = sizeof(req->gch.client);
Vladislav Rykov's avatar
Vladislav Rykov committed
230
		
231
232
		if (recv_gcom_ch(&req->gch, req->packet, &req->packet_length, DEVICE_DATA_MAX_LENGTH)) {
			task_queue_enqueue(tq, process_packet, req);
Vladislav Rykov's avatar
Vladislav Rykov committed
233
		} else {
234
			fprintf(stderr, "packet receive error\n");
Vladislav Rykov's avatar
Vladislav Rykov committed
235
		}
Vladislav Rykov's avatar
Vladislav Rykov committed
236
237
	}

238
	free(gw_conf);
239
	pthread_mutex_destroy(&mutex);
Vladislav Rykov's avatar
Vladislav Rykov committed
240
	close(gch.server_desc);
Vladislav Rykov's avatar
Vladislav Rykov committed
241
	PQfinish(conn);
Vladislav Rykov's avatar
Vladislav Rykov committed
242

Vladislav Rykov's avatar
Vladislav Rykov committed
243
244
245
	return EXIT_SUCCESS;
}

Vladislav Rykov's avatar
Vladislav Rykov committed
246
247
248
249
/*
 * Handler installed for SIGINT in main(): clears the `working` flag that the
 * receive loop tests. The signal number is not used.
 */
void ctrc_handler (int sig) {
	(void)sig;
	working = 0;
}

250
251
252
253
254
255
void process_packet(void *request) {
	gcom_ch_request_t *req = (gcom_ch_request_t *)request;
	uint8_t payload[DEVICE_DATA_MAX_LENGTH];
	uint8_t payload_length;	
	PGresult *res;

Vladislav Rykov's avatar
Vladislav Rykov committed
256
257
	if (gateway_protocol_packet_decode(
		&(req->gch.gwp_conf),
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
		&(req->packet_type),
		&payload_length, payload,
		req->packet_length, req->packet))
	{
		if (req->packet_type == GATEWAY_PROTOCOL_PACKET_TYPE_TIME_REQ) {
			printf("TIME REQ received\n");
			send_utc(&(req->gch));
		} else if (req->packet_type == GATEWAY_PROTOCOL_PACKET_TYPE_DATA_SEND) {
			sensor_data_t sensor_data;
			time_t t;
			// DEVICE_DATA_MAX_LENGTH*2 {hex} + 150
			char db_query[662];

			printf("DATA SEND received\n");
			gateway_protocol_data_send_payload_decode(&sensor_data, payload, payload_length);
			
			if (sensor_data.utc == 0) {
				struct timeval tv;
				gettimeofday(&tv, NULL);
				t = tv.tv_sec;
			} else {
				t = sensor_data.utc;
			}
			
			strftime(sensor_data.timedate, TIMEDATE_LENGTH, "%d/%m/%Y %H:%M:%S", localtime(&t));
			snprintf(db_query, sizeof(db_query), 
284
				"INSERT INTO dev_%s_%d VALUES (%lu, '%s', $1)", 
Vladislav Rykov's avatar
Vladislav Rykov committed
285
				(char *)req->gch.gwp_conf.app_key, req->gch.gwp_conf.dev_id, t, sensor_data.timedate
286
287
288
289
290
			);
			
			const char *params[1];
			int paramslen[1];
			int paramsfor[1];
291
			params[0] = (char *) sensor_data.data;
292
293
294
			paramslen[0] = sensor_data.data_length;
			paramsfor[0] = 1; // format - binary

Vladislav Rykov's avatar
Vladislav Rykov committed
295
296
297
298
			pthread_mutex_lock(&gw_stat_mutex);
			gw_stat_linked_list_add((char *)req->gch.gwp_conf.app_key, req->gch.gwp_conf.dev_id);
			pthread_mutex_unlock(&gw_stat_mutex);

299
			pthread_mutex_lock(&mutex);
300
			res = PQexecParams(conn, db_query, 1, NULL, params, paramslen, paramsfor, 0);
301
302
			pthread_mutex_unlock(&mutex);

303
304
305
306
307
			if (PQresultStatus(res) == PGRES_COMMAND_OK) {
				PQclear(res);

				snprintf(db_query, sizeof(db_query),
					 "SELECT * FROM pend_msgs WHERE app_key='%s' and dev_id = %d and ack = False", 
Vladislav Rykov's avatar
Vladislav Rykov committed
308
					(char *)req->gch.gwp_conf.app_key, req->gch.gwp_conf.dev_id
309
				);
310
311
				
				pthread_mutex_lock(&mutex);
312
				res = PQexec(conn, db_query);
313
314
				pthread_mutex_unlock(&mutex);
				
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
				if (PQresultStatus(res) == PGRES_TUPLES_OK && PQntuples(res)) {
					gateway_protocol_mk_stat(
						&(req->gch), 
						GATEWAY_PROTOCOL_STAT_ACK_PEND,
						req->packet, &(req->packet_length));
					printf("ACK_PEND prepared\n");
				} else {
					gateway_protocol_mk_stat(
						&(req->gch), 
						GATEWAY_PROTOCOL_STAT_ACK,
						req->packet, &(req->packet_length));
					printf("ACK prepared\n");
				}
				
				send_gcom_ch(&(req->gch), req->packet, req->packet_length);
			} else {
				fprintf(stderr, "database error : %s\n", PQerrorMessage(conn));
Vladislav Rykov's avatar
Vladislav Rykov committed
332
				gw_stat.errors_count++;
333
334
335
336
337
338
			}
			PQclear(res);
		} else if (req->packet_type == GATEWAY_PROTOCOL_PACKET_TYPE_PEND_REQ) {
			char db_query[200];
			snprintf(db_query, sizeof(db_query),
				 "SELECT * FROM pend_msgs WHERE app_key = '%s' AND dev_id = %d AND ack = False", 
Vladislav Rykov's avatar
Vladislav Rykov committed
339
				(char *)req->gch.gwp_conf.app_key, req->gch.gwp_conf.dev_id
340
			);
341
			pthread_mutex_lock(&mutex);
342
			res = PQexec(conn, db_query);
343
			pthread_mutex_unlock(&mutex);
344
345
346
347
348
349
350
351
352
353
354
355
356
357
			
			if (PQresultStatus(res) == PGRES_TUPLES_OK && PQntuples(res)) {
				char msg_cont[150];
				strncpy(msg_cont, PQgetvalue(res, 0, 2), sizeof(msg_cont));
				printf("PEND_SEND prepared : %s\n", msg_cont);
				PQclear(res);
			
				base64_decode(msg_cont, strlen(msg_cont)-1, payload);
				payload_length = BASE64_DECODE_OUT_SIZE(strlen(msg_cont));
				printf("prepared to send %d bytes : %s\n", payload_length, payload);
				
				// send the msg until ack is received
				uint8_t received_ack = 0;
				uint8_t pend_send_retries = PEND_SEND_RETRIES_MAX;
Vladislav Rykov's avatar
Vladislav Rykov committed
358
359
				gateway_protocol_packet_encode(
					&(req->gch.gwp_conf),
360
361
362
					GATEWAY_PROTOCOL_PACKET_TYPE_PEND_SEND,
					payload_length, payload,
					&(req->packet_length), req->packet);
363
364
				do {
					send_gcom_ch(&(req->gch), req->packet, req->packet_length);
365
366
367
368
369
370
371
372
373
374
375
					
					// 300 ms
					usleep(300000);

					pthread_mutex_lock(&mutex);
					res = PQexec(conn, db_query);
					pthread_mutex_unlock(&mutex);
					
					if (PQresultStatus(res) == PGRES_TUPLES_OK) {
						if (!PQntuples(res) || strcmp(PQgetvalue(res, 0, 2), msg_cont)) {
							received_ack = 1;
376
377
						}
					}
378
379
					PQclear(res);
					printf("received_ack = %d, retries = %d\n", received_ack, pend_send_retries);
380
381
382
383
384
385
386
387
388
				} while (!received_ack && pend_send_retries--);
			} else {
				gateway_protocol_mk_stat(
					&(req->gch),
					GATEWAY_PROTOCOL_STAT_NACK,
					req->packet, &(req->packet_length));
				
				send_gcom_ch(&(req->gch), req->packet, req->packet_length);
				
Vladislav Rykov's avatar
Vladislav Rykov committed
389
				printf("nothing for app %s dev %d\n", (char *)req->gch.gwp_conf.app_key, req->gch.gwp_conf.dev_id);
390
			}
391
392
393
394
395
396
		} else if (req->packet_type == GATEWAY_PROTOCOL_PACKET_TYPE_STAT) {
			// TODO change to ACK_PEND = 0x01
			if (payload[0] == 0x00) {
				char db_query[200];
				snprintf(db_query, sizeof(db_query),
					 "SELECT * FROM pend_msgs WHERE app_key = '%s' AND dev_id = %d AND ack = False", 
Vladislav Rykov's avatar
Vladislav Rykov committed
397
					(char *)req->gch.gwp_conf.app_key, req->gch.gwp_conf.dev_id
398
399
400
401
402
403
404
				);
				pthread_mutex_lock(&mutex);
				res = PQexec(conn, db_query);
				pthread_mutex_unlock(&mutex);
				if (PQresultStatus(res) == PGRES_TUPLES_OK && PQntuples(res)) {
					snprintf(db_query, sizeof(db_query),
						"UPDATE pend_msgs SET ack = True WHERE app_key = '%s' AND dev_id = %d AND msg = '%s'",
Vladislav Rykov's avatar
Vladislav Rykov committed
405
						(char *)req->gch.gwp_conf.app_key, req->gch.gwp_conf.dev_id, PQgetvalue(res, 0, 2)
406
407
408
409
410
411
412
413
					);
					PQclear(res);
					pthread_mutex_lock(&mutex);
					res = PQexec(conn, db_query);
					pthread_mutex_unlock(&mutex);
					if (PQresultStatus(res) == PGRES_COMMAND_OK) {
						printf("pend_msgs updated\n");
					} else {
Vladislav Rykov's avatar
Vladislav Rykov committed
414
						gw_stat.errors_count++;
415
416
417
418
419
						fprintf(stderr, "database error : %s\n", PQerrorMessage(conn));
					}
				}
				PQclear(res);
			}
420
421
422
423
424
425
426
427
		} else {
			gateway_protocol_mk_stat(
				&(req->gch),
				GATEWAY_PROTOCOL_STAT_NACK,
				req->packet, &(req->packet_length));
			
			send_gcom_ch(&(req->gch), req->packet, req->packet_length);
				
428
			fprintf(stderr, "packet type error : %02X\n", req->packet_type);
Vladislav Rykov's avatar
Vladislav Rykov committed
429
			gw_stat.errors_count++;
430
431
432
		}
	} else {
		fprintf(stderr, "payload decode error\n");
Vladislav Rykov's avatar
Vladislav Rykov committed
433
		gw_stat.errors_count++;
434
	}
435
436
	
	free(req);
437
438
}

439
uint8_t gateway_auth(const gw_conf_t *gw_conf, const char *dynamic_conf_file_path) {
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
	int sockfd;
	struct sockaddr_in platformaddr;
	uint8_t buffer[1024];
	uint16_t buffer_length = 0;
	uint8_t payload_buffer[1024];
	uint16_t payload_buffer_length = 0;
	FILE *fp;

	if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
		return 0;
	}

	memset(&platformaddr, 0x0, sizeof(platformaddr));

	platformaddr.sin_family = AF_INET;
455
456
	platformaddr.sin_addr.s_addr = inet_addr(gw_conf->static_conf.platform_gw_manager_ip);
	platformaddr.sin_port = htons(gw_conf->static_conf.platform_gw_manager_port);
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
	
	if (connect(sockfd, (struct sockaddr *)&platformaddr, sizeof(platformaddr))) {
		return 0;
	}

	gateway_telemetry_protocol_encode_packet(buffer, 0, GATEWAY_TELEMETRY_PROTOCOL_AUTH, buffer, &buffer_length);
	write(sockfd, buffer, buffer_length);
	
	buffer_length = read(sockfd, buffer, sizeof(buffer));
	gateway_telemetry_protocol_packet_type_t pt;
	if (!gateway_telemetry_protocol_decode_packet(payload_buffer, &payload_buffer_length, &pt, buffer, buffer_length)) {
		return 0;
	}

	// write db_conf into file
472
	fp = fopen(dynamic_conf_file_path, "w");
473
474
475
476
477
478
	fwrite(payload_buffer, payload_buffer_length, 1, fp);
	fclose(fp);

	return 1;
}

Vladislav Rykov's avatar
Vladislav Rykov committed
479
480
481
#define GW_MNGR_BUF_LEN		1024
#define GW_MNGR_QBUF_LEN	1136
void * gateway_mngr(void *gw_cnf) {
482
	struct itimerval tval;
Vladislav Rykov's avatar
Vladislav Rykov committed
483
	gw_conf_t *gw_conf = (gw_conf_t *) gw_cnf;
484
485
	sigset_t alarm_msk;
	int sig;
Vladislav Rykov's avatar
Vladislav Rykov committed
486
487
488
489
490
491
	struct timeval tv;
	char buf[GW_MNGR_BUF_LEN];
	char qbuf[GW_MNGR_QBUF_LEN];
	char b64_gwid[12];
	PGresult *res;
	
492
493
494
495

	sigemptyset(&alarm_msk);
	sigaddset(&alarm_msk, SIGALRM);

496
	tval.it_value.tv_sec = gw_conf->dynamic_conf.telemetry_send_period;
497
	tval.it_value.tv_usec = 0;
498
	tval.it_interval.tv_sec = gw_conf->dynamic_conf.telemetry_send_period;
499
500
501
502
503
504
505
	tval.it_interval.tv_usec = 0;

	if (setitimer(ITIMER_REAL, &tval, NULL)) {
		perror("Failed to set itimer");
		return NULL;
	}

506
	base64_encode(gw_conf->static_conf.gw_id, GATEWAY_ID_SIZE, b64_gwid);
Vladislav Rykov's avatar
Vladislav Rykov committed
507
	
508
509
	while (1) {
		// get utc
Vladislav Rykov's avatar
Vladislav Rykov committed
510
511
		gettimeofday(&tv, NULL);

512
		// create applications and devices serving log
Vladislav Rykov's avatar
Vladislav Rykov committed
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
		pthread_mutex_lock(&gw_stat_mutex);
		gw_stat_linked_list_flush(buf, 0);
		pthread_mutex_unlock(&gw_stat_mutex);

		// flush utc and log into a query	
		snprintf(qbuf, GW_MNGR_QBUF_LEN, "UPDATE gateways SET num_errors = %lld, last_keep_alive = %d, last_report = '%s' WHERE id = '%s'",
				gw_stat.errors_count, (uint32_t) tv.tv_sec, buf, b64_gwid );

		pthread_mutex_lock(&mutex);
		res = PQexec(conn, qbuf);
		pthread_mutex_unlock(&mutex);
	
		if (PQresultStatus(res) != PGRES_COMMAND_OK) {
			fprintf(stderr, "gateway manager db update failed!\n");
		}

		buf[0] = '\0';
		qbuf[0] = '\0';
531
532
533
534
		sigwait(&alarm_msk, &sig);
	}
}

Vladislav Rykov's avatar
Vladislav Rykov committed
535
void gateway_protocol_data_send_payload_decode(
Vladislav Rykov's avatar
Vladislav Rykov committed
536
537
538
539
	sensor_data_t *sensor_data, 
	const uint8_t *payload, 
	const uint8_t payload_length) 
{
Vladislav Rykov's avatar
Vladislav Rykov committed
540
541
542
543
544
	uint8_t p_len = 0;

	memcpy(&sensor_data->utc, &payload[p_len], sizeof(sensor_data->utc));
	p_len += sizeof(sensor_data->utc);

Vladislav Rykov's avatar
Vladislav Rykov committed
545
546
	memcpy(sensor_data->data, &payload[p_len], payload_length - p_len);
	sensor_data->data_length = payload_length - p_len;
Vladislav Rykov's avatar
Vladislav Rykov committed
547
548
}

Vladislav Rykov's avatar
Vladislav Rykov committed
549
550
551
552
553
554
/*
 * Encodes a STAT packet whose one-byte payload is `stat`.
 *
 * gch:     channel whose gwp_conf is used for encoding.
 * stat:    status value; it is sent as a single byte.
 * pck:     output buffer for the encoded packet.
 * pck_len: receives the encoded packet length.
 */
void gateway_protocol_mk_stat(
	gcom_ch_t *gch,
	gateway_protocol_stat_t stat,
	uint8_t *pck,
	uint8_t *pck_len)
{
	/* narrow by value: the first byte in memory depends on the type's width and byte order */
	uint8_t stat_byte = (uint8_t)stat;

	gateway_protocol_packet_encode(
		&(gch->gwp_conf),
		GATEWAY_PROTOCOL_PACKET_TYPE_STAT,
		1, &stat_byte,
		pck_len, pck);
}

Vladislav Rykov's avatar
Vladislav Rykov committed
562
563


Vladislav Rykov's avatar
Vladislav Rykov committed
564
/*
 * Answers a TIME_REQ: sends a TIME_SEND packet carrying the current UTC time
 * as a 32-bit value in host byte order to the peer of `gch`.
 */
void send_utc(gcom_ch_t *gch) {
	uint8_t buf[50];
	uint8_t buf_len = 0;
	struct timeval tv;
	uint32_t utc;
				
	gettimeofday(&tv, NULL);
	/* convert by value: tv_sec may be wider than 32 bits */
	utc = (uint32_t)tv.tv_sec;
				
	gateway_protocol_packet_encode (
		&(gch->gwp_conf),
		GATEWAY_PROTOCOL_PACKET_TYPE_TIME_SEND,
		sizeof(utc), (uint8_t *)&utc,
		&buf_len, buf
	);
					
	send_gcom_ch(gch, buf, buf_len);
}

Vladislav Rykov's avatar
Vladislav Rykov committed
581
582
583
/*
 * Checkup callback registered with the gateway protocol in main(): looks up
 * the application identified by gwp_conf->app_key and fills in
 * gwp_conf->secure_key (base64-decoded) and gwp_conf->secure.
 *
 * When the query fails or returns no row, gwp_conf is left unchanged and
 * gw_stat.errors_count is incremented.
 */
void gateway_protocol_checkup_callback(gateway_protocol_conf_t *gwp_conf) {
	const char *values[1];
	PGresult *res;
	
	/* bound as a text parameter: app_key comes from the received packet */
	values[0] = (const char *)gwp_conf->app_key;

	pthread_mutex_lock(&mutex);
	res = PQexecParams(conn,
		"SELECT secure_key, secure FROM applications WHERE app_key = $1",
		1, NULL, values, NULL, NULL, 0);
	pthread_mutex_unlock(&mutex);

	if ((PQresultStatus(res) == PGRES_TUPLES_OK) && PQntuples(res)) {
		char *b64_key = PQgetvalue(res, 0, 0);
		size_t b64_len = strlen(b64_key);

		/*
		 * The last character is excluded from decoding; guard the empty string.
		 * NOTE(review): the decoded size is not checked against secure_key — confirm.
		 */
		base64_decode(b64_key, b64_len ? b64_len - 1 : 0, gwp_conf->secure_key);
		gwp_conf->secure = PQgetvalue(res, 0, 1)[0] == 't';
	} else {
		/* not an errno failure, so report the result's own message */
		fprintf(stderr, "gateway_protocol_checkup_callback error: %s\n", PQresultErrorMessage(res));
		gw_stat.errors_count++;
	}
	PQclear(res);
}

Vladislav Rykov's avatar
Vladislav Rykov committed
602
/*
 * Sends pck_size bytes of pck as one datagram to the peer stored in
 * gch->client, using the socket gch->server_desc.
 *
 * Returns the sendto() result. On failure (negative result) the error counter
 * is incremented and the errno text is printed.
 */
int send_gcom_ch(gcom_ch_t *gch, uint8_t *pck, uint8_t pck_size) {
	struct sockaddr *peer = (struct sockaddr *)&gch->client;
	int sent = sendto(gch->server_desc, (char *)pck, pck_size, 0, peer, gch->sock_len);

	if (sent >= 0) {
		return sent;
	}

	gw_stat.errors_count++;
	perror("sendto error");

	return sent;
}

/*
 * Receives one datagram on gch->server_desc into pck and records the sender
 * in gch->client / gch->sock_len.
 *
 * pck_length: receives the datagram length on success.
 * pck_size:   capacity of pck. At most UINT8_MAX bytes are received, because
 *             the length is reported through a uint8_t.
 *
 * Returns the recvfrom() result: the byte count, or a negative value on
 * error (the error counter is incremented and the errno text printed).
 */
int recv_gcom_ch(gcom_ch_t *gch, uint8_t *pck, uint8_t *pck_length, uint16_t pck_size) {
	/* recvfrom() takes a socklen_t *, which is not the same type as int * */
	socklen_t addr_len = (socklen_t)gch->sock_len;
	size_t capacity = pck_size > UINT8_MAX ? UINT8_MAX : pck_size;
	int ret;

	ret = recvfrom(gch->server_desc, (char *)pck, capacity, MSG_WAITALL, (struct sockaddr *)&gch->client, &addr_len);
	if (ret < 0) {
		perror("socket receive error");
		gw_stat.errors_count++;
	} else {
		gch->sock_len = (int)addr_len;
		*pck_length = (uint8_t)ret;
	}

	return ret;
}
625
626


627
/*
 * Fills st_conf from the parsed static configuration object. Values are read
 * by position:
 *   0 gateway id (colon-separated hex bytes), 1 secure key (colon-separated
 *   hex bytes), 2 gateway port, 3 database type, 4 platform manager IP,
 *   5 platform manager port, 6 thread pool size.
 *
 * NOTE(review): the object length and the value types are still not
 * validated; a malformed file can make this read out of bounds.
 */
static void process_static_conf(json_value* value, static_conf_t *st_conf) {
	int parsed;

	parsed = sscanf(value->u.object.values[0].value->u.string.ptr,
			"%hhx:%hhx:%hhx:%hhx:%hhx:%hhx", &st_conf->gw_id[0], &st_conf->gw_id[1], &st_conf->gw_id[2],
							&st_conf->gw_id[3], &st_conf->gw_id[4], &st_conf->gw_id[5]
	);
	if (parsed != GATEWAY_ID_SIZE) {
		fprintf(stderr, "Malformed gateway id in static configuration.\n");
	}

	parsed = sscanf(value->u.object.values[1].value->u.string.ptr,
			"%hhx:%hhx:%hhx:%hhx:%hhx:%hhx:%hhx:%hhx:%hhx:%hhx:%hhx:%hhx:%hhx:%hhx:%hhx:%hhx", 
			&st_conf->gw_secure_key[0], &st_conf->gw_secure_key[1], &st_conf->gw_secure_key[2], &st_conf->gw_secure_key[3],
			&st_conf->gw_secure_key[4], &st_conf->gw_secure_key[5], &st_conf->gw_secure_key[6], &st_conf->gw_secure_key[7],
			&st_conf->gw_secure_key[8], &st_conf->gw_secure_key[9], &st_conf->gw_secure_key[10], &st_conf->gw_secure_key[11],
			&st_conf->gw_secure_key[12], &st_conf->gw_secure_key[13], &st_conf->gw_secure_key[14], &st_conf->gw_secure_key[15]
	);
	if (parsed != GATEWAY_SECURE_KEY_SIZE) {
		fprintf(stderr, "Malformed gateway secure key in static configuration.\n");
	}

	st_conf->gw_port = value->u.object.values[2].value->u.integer;
	/* snprintf always terminates the destination, unlike strncpy */
	snprintf(st_conf->db_type, sizeof(st_conf->db_type), "%s",
		 value->u.object.values[3].value->u.string.ptr);
	snprintf(st_conf->platform_gw_manager_ip, sizeof(st_conf->platform_gw_manager_ip), "%s",
		 value->u.object.values[4].value->u.string.ptr);
	st_conf->platform_gw_manager_port = value->u.object.values[5].value->u.integer;
	st_conf->thread_pool_size = value->u.object.values[6].value->u.integer;
}
647

648
649
650
651
652
653
654
/*
 * Fills dyn_conf from the parsed dynamic configuration object. Values are
 * read by position:
 *   0 database address, 1 database port (as a string), 2 database name,
 *   3 database user, 4 database password, 5 telemetry send period.
 *
 * A port that is not a number in 0..65535 is stored as 0.
 */
static void process_dynamic_conf(json_value* value, dynamic_conf_t *dyn_conf) {
	const char *port_text = value->u.object.values[1].value->u.string.ptr;
	char *port_end;
	long port;

	/* snprintf always terminates the destination, unlike strncpy */
	snprintf(dyn_conf->db_addr, sizeof(dyn_conf->db_addr), "%s",
		 value->u.object.values[0].value->u.string.ptr);

	errno = 0;
	port = strtol(port_text, &port_end, 10);
	if (errno || port_end == port_text || port < 0 || port > UINT16_MAX) {
		fprintf(stderr, "Invalid database port in dynamic configuration.\n");
		port = 0;
	}
	dyn_conf->db_port = (uint16_t)port;

	snprintf(dyn_conf->db_name, sizeof(dyn_conf->db_name), "%s",
		 value->u.object.values[2].value->u.string.ptr);
	snprintf(dyn_conf->db_user_name, sizeof(dyn_conf->db_user_name), "%s",
		 value->u.object.values[3].value->u.string.ptr);
	snprintf(dyn_conf->db_user_pass, sizeof(dyn_conf->db_user_pass), "%s",
		 value->u.object.values[4].value->u.string.ptr);
	dyn_conf->telemetry_send_period = value->u.object.values[5].value->u.integer;
}

657
static json_value * read_json_conf(const char *file_path) {
658
659
660
661
662
663
	struct stat filestatus;
	FILE *fp;
	char *file_contents;
	json_char *json;
	json_value *jvalue;

664
665
666
	if (stat(file_path, &filestatus)) {
		fprintf(stderr, "File %s not found.", file_path);
		return NULL;
667
668
669
	}
	file_contents = (char *)malloc(filestatus.st_size);
	if (!file_contents) {
670
671
		fprintf(stderr, "Memory error allocating %d bytes.", (int) filestatus.st_size);
		return NULL;
672
	}
673
	fp = fopen(file_path, "rt");
674
	if (!fp) {
675
		fprintf(stderr, "Unable to open %s.", file_path);
676
677
		fclose(fp);
		free(file_contents);
678
		return NULL;
679
680
	}
	if (fread(file_contents, filestatus.st_size, 1, fp) != 1) {
681
		fprintf(stderr, "Unable to read %s.", file_path);
682
683
		fclose(fp);
		free(file_contents);
684
		return NULL;
685
686
687
	}
	fclose(fp);
	
688
689
	file_contents[filestatus.st_size] = '\0';
	printf("file content : \n'%s'\n", file_contents);
690
691
692
693
694
695
	
	json = (json_char *)file_contents;
	jvalue = json_parse(json, filestatus.st_size);
	if (!jvalue) {
		perror("Unable to parse json.");
		free(file_contents);
696
		return NULL;
697
698
	}
	
699
700
701
702
703
	free(file_contents);
	
	return jvalue;
}

704
/*
 * Loads the static configuration file into gw_conf->static_conf.
 *
 * Returns 0 on success, 1 when the file could not be read or parsed.
 */
static int read_static_conf(const char *static_conf_file_path, gw_conf_t *gw_conf) {
	json_value *root = read_json_conf(static_conf_file_path);

	if (root == NULL) {
		return 1;
	}

	process_static_conf(root, &gw_conf->static_conf);
	json_value_free(root);

	return 0;
}
717

718
/*
 * Loads the dynamic configuration file into gw_conf->dynamic_conf.
 *
 * Returns 0 on success, 1 when the file could not be read or parsed.
 */
static int read_dynamic_conf(const char *dynamic_conf_file_path, gw_conf_t *gw_conf) {
	json_value *root = read_json_conf(dynamic_conf_file_path);

	if (root == NULL) {
		return 1;
	}

	process_dynamic_conf(root, &gw_conf->dynamic_conf);
	json_value_free(root);

	return 0;
}