Commit e4969dc2 authored by Vladislav Rykov's avatar Vladislav Rykov
Browse files

all communications work

parent c26be7a7
[submodule "lib/gateway_protocol"] [submodule "lib/gateway_protocol"]
path = lib/gateway_protocol path = lib/gateway_protocol
url = http://lorca.act.uji.es/gitlab/vrykov/thso.gateway_protocol.git url = http://lorca.act.uji.es/gitlab/vrykov/thso.gateway_protocol.git
[submodule "lib/base64"]
path = lib/base64
url = https://github.com/zhicheng/base64.git
...@@ -18,13 +18,14 @@ MAIN = gateway ...@@ -18,13 +18,14 @@ MAIN = gateway
$(LDIR)/gateway_protocol/gateway_protocol.o: $(LDIR)/gateway_protocol/gateway_protocol.o:
$(CC) -c $(LIBS)/gateway_protocol.c -o $(OBJ)/gateway_protocol.o -I$(LIBS) $(CC) -c $(LIBS)/gateway_protocol.c -o $(OBJ)/gateway_protocol.o -I$(LIBS)
$(CC) $(SRC) $(OBJ)/gateway_protocol.o -o $(MAIN) -I$(LIBS) $(LIBD) $(INCLUDES) $(CC) -c $(LDIR)/base64/base64.c -o $(OBJ)/base64.o -Ilib/base64
$(CC) $(SRC) $(OBJ)/gateway_protocol.o $(OBJ)/base64.o -o $(MAIN) -I$(LIBS) -Ilib/base64 $(LIBD) $(INCLUDES)
all: $(MAIN) all: $(MAIN)
@echo Compiling gateway project @echo Compiling gateway project
$(MAIN): $(OBJ)/gateway_protocol.o $(MAIN): $(OBJ)/gateway_protocol.o $(OBJ)/base64.o
$(CC) $(CFLAGS) $(INCLUDES) -o $(MAIN) $(OBJS) $(LFLAGS) $(LIBS) $(CC) $(CFLAGS) $(INCLUDES) -o $(MAIN) $(OBJS) $(LFLAGS) $(LIBS)
.c.o: .c.o:
......
No preview for this file type
Subproject commit 81060e3338120b43d759ee8adfe24619370c5f36
...@@ -5,14 +5,13 @@ ...@@ -5,14 +5,13 @@
#include<arpa/inet.h> //inet_addr #include<arpa/inet.h> //inet_addr
#include<unistd.h> #include<unistd.h>
#include<stdint.h> #include<stdint.h>
#include<pthread.h> #include<pthread.h>
#include<gateway_protocol.h> #include<gateway_protocol.h>
#include<sys/time.h> #include<sys/time.h>
#include<libpq-fe.h> #include<libpq-fe.h>
#include"base64.h"
#include<math.h>
#include<signal.h>
#define TIMEDATE_LENGTH 32 #define TIMEDATE_LENGTH 32
...@@ -52,6 +51,24 @@ typedef struct { ...@@ -52,6 +51,24 @@ typedef struct {
void * connection_handler (void *args); void * connection_handler (void *args);
uint8_t gateway_protocol_data_send_payload_decode(sensor_data_t *sensor_data, const uint8_t *payload, const uint8_t payload_length); uint8_t gateway_protocol_data_send_payload_decode(sensor_data_t *sensor_data, const uint8_t *payload, const uint8_t payload_length);
void filter_nans(char *com);
void packet_encode(
uint8_t dev_id,
gateway_protocol_packet_type_t p_type,
uint8_t payload_length,
uint8_t *payload,
uint8_t *packet_length,
uint8_t *packet);
uint8_t packet_decode(
uint8_t *dev_id,
gateway_protocol_packet_type_t *ptype,
uint8_t *payload_length,
uint8_t *payload,
uint8_t packet_length,
uint8_t *packet);
void ctrc_handler (int sig);
static volatile uint8_t working = 1;
int main (int argc, char **argv) { int main (int argc, char **argv) {
int server_desc, client_desc; int server_desc, client_desc;
...@@ -59,9 +76,12 @@ int main (int argc, char **argv) { ...@@ -59,9 +76,12 @@ int main (int argc, char **argv) {
socklen_t client_socklen; socklen_t client_socklen;
uint8_t buf[1024]; uint8_t buf[1024];
uint8_t buf_len = 0; uint8_t buf_len = 0;
uint8_t payload[128]; uint8_t payload[256];
uint8_t payload_length = 0; uint8_t payload_length = 0;
int sock_len; int sock_len;
PGresult *res;
signal(SIGINT, ctrc_handler);
PGconn *conn = PQconnectdb("user=root dbname=gateway"); PGconn *conn = PQconnectdb("user=root dbname=gateway");
if (PQstatus(conn) == CONNECTION_BAD) { if (PQstatus(conn) == CONNECTION_BAD) {
...@@ -83,7 +103,10 @@ int main (int argc, char **argv) { ...@@ -83,7 +103,10 @@ int main (int argc, char **argv) {
return EXIT_FAILURE; return EXIT_FAILURE;
} }
while (1) { uint8_t dev_id = 0xFF;
gateway_protocol_packet_type_t packet_type;
while (working) {
buf_len = 0; buf_len = 0;
printf("listenninig...\n"); printf("listenninig...\n");
if ((buf_len = recvfrom(server_desc, (char *)buf, 1024, MSG_WAITALL, (struct sockaddr *)&client, &sock_len)) < 0) { if ((buf_len = recvfrom(server_desc, (char *)buf, 1024, MSG_WAITALL, (struct sockaddr *)&client, &sock_len)) < 0) {
...@@ -92,14 +115,11 @@ int main (int argc, char **argv) { ...@@ -92,14 +115,11 @@ int main (int argc, char **argv) {
printf("packet received!\n"); printf("packet received!\n");
for (uint8_t i = 0; i < buf_len; i++) { for(uint8_t i = 0; i < buf_len; i++) {
printf("%02X :", buf[i]); printf("%02X : ", buf[i]);
} }
printf("\n"); printf("\n");
uint8_t dev_id = 0xFF;
gateway_protocol_packet_type_t packet_type;
if (gateway_protocol_packet_decode( if (gateway_protocol_packet_decode(
&dev_id, &dev_id,
&packet_type, &packet_type,
...@@ -109,23 +129,42 @@ int main (int argc, char **argv) { ...@@ -109,23 +129,42 @@ int main (int argc, char **argv) {
if (packet_type == GATEWAY_PROTOCOL_PACKET_TYPE_TIME_REQ) { if (packet_type == GATEWAY_PROTOCOL_PACKET_TYPE_TIME_REQ) {
printf("TIME REQ received\n"); printf("TIME REQ received\n");
struct timeval tv; struct timeval tv;
buf_len = 0;
buf[0] = dev_id;
buf_len++;
buf[1] = GATEWAY_PROTOCOL_PACKET_TYPE_TIME_SEND;
buf_len++;
gettimeofday(&tv, NULL); gettimeofday(&tv, NULL);
memcpy(&buf[buf_len], &tv.tv_sec, sizeof(uint32_t));
buf_len += sizeof(uint32_t); packet_encode (
dev_id,
GATEWAY_PROTOCOL_PACKET_TYPE_TIME_SEND,
sizeof(tv.tv_sec), (uint8_t *)&tv.tv_sec,
&buf_len, buf
);
for(uint8_t i = 0; i < buf_len; i++) {
printf("%02X : ", buf[i]);
}
printf("\n");
} else if (packet_type == GATEWAY_PROTOCOL_PACKET_TYPE_DATA_SEND) { } else if (packet_type == GATEWAY_PROTOCOL_PACKET_TYPE_DATA_SEND) {
sensor_data_t sensor_data; sensor_data_t sensor_data;
printf("DATA SEND received\n"); printf("DATA SEND received\n");
if (gateway_protocol_data_send_payload_decode(&sensor_data, payload, payload_length)) { if (gateway_protocol_data_send_payload_decode(&sensor_data, payload, payload_length)) {
PGresult *res; snprintf(buf, sizeof(buf), "INSERT INTO esp32 ("
snprintf(buf, sizeof(buf), "INSERT INTO esp32 VALUES(" "utc, timedate, "
"dht22_t_esp, dht22_h_esp, "
"sht85_t_esp, sht85_h_esp, "
"hih8121_t_esp, hih8121_h_esp, "
"tmp36_0_esp, tmp36_1_esp, tmp36_2_esp, "
"hih4030_esp, "
"hh10d_esp, "
"dht22_t_mkr, dht22_h_mkr, "
"sht85_t_mkr, sht85_h_mkr, "
"hih8121_t_mkr, hih8121_h_mkr, "
"hh10d_mkr, "
"dht22_t_wis, dht22_h_wis, "
"sht85_t_wis, sht85_h_wis, "
"hih8121_t_wis, hih8121_h_wis, "
"tmp102_wis, "
"hh10d_wis) "
"VALUES("
"%lu, '%s', " "%lu, '%s', "
"%.2f, %.2f, " // esp "%.2f, %.2f, " // esp
"%.2f, %.2f, " "%.2f, %.2f, "
...@@ -159,46 +198,113 @@ int main (int argc, char **argv) { ...@@ -159,46 +198,113 @@ int main (int argc, char **argv) {
sensor_data.tmp102_wis, sensor_data.tmp102_wis,
sensor_data.hh10d_wis sensor_data.hh10d_wis
); );
//printf("%s\n", buf); filter_nans(buf);
printf("%s\n", buf);
res = PQexec(conn, buf); res = PQexec(conn, buf);
if (PQresultStatus(res) == PGRES_COMMAND_OK) { if (PQresultStatus(res) == PGRES_COMMAND_OK) {
PQclear(res); PQclear(res);
fprintf(stderr, "%s\n", PQerrorMessage(conn)); //fprintf(stderr, "%s\n", PQerrorMessage(conn));
sprintf(buf, "SELECT * FROM pend_msgs WHERE dev_id = %d", dev_id); sprintf(buf, "SELECT * FROM pend_msgs WHERE dev_id = %d", dev_id);
res = PQexec(conn, buf); res = PQexec(conn, buf);
if (PQresultStatus(res) == PGRES_TUPLES_OK && PQntuples(res)) { if (PQresultStatus(res) == PGRES_TUPLES_OK && PQntuples(res)) {
buf[2] = GATEWAY_PROTOCOL_STAT_ACK_PEND; buf[3] = GATEWAY_PROTOCOL_STAT_ACK_PEND;
printf("ACK_PEND prepared\n"); printf("ACK_PEND prepared\n");
} else { } else {
buf[2] = GATEWAY_PROTOCOL_STAT_ACK; buf[3] = GATEWAY_PROTOCOL_STAT_ACK;
printf("ACK prepared\n"); printf("ACK prepared\n");
} }
buf[0] = dev_id; buf[0] = dev_id;
buf[1] = GATEWAY_PROTOCOL_PACKET_TYPE_STAT; buf[1] = GATEWAY_PROTOCOL_PACKET_TYPE_STAT;
buf_len = 3; buf[2] = 1;
buf_len = 4;
} else {
fprintf(stderr, "database error : %s\n", PQerrorMessage(conn));
} }
PQclear(res); PQclear(res);
} else { } else {
gateway_protocol_stat_t s_type = GATEWAY_PROTOCOL_STAT_NACK;
packet_encode (
dev_id,
GATEWAY_PROTOCOL_PACKET_TYPE_STAT,
1, (uint8_t *)&s_type,
&buf_len, buf);
/*
buf[0] = dev_id; buf[0] = dev_id;
buf[1] = GATEWAY_PROTOCOL_PACKET_TYPE_STAT; buf[1] = GATEWAY_PROTOCOL_PACKET_TYPE_STAT;
buf[2] = GATEWAY_PROTOCOL_STAT_NACK; buf[2] = 1;
buf_len = 3; buf[3] = GATEWAY_PROTOCOL_STAT_NACK;
buf_len = 4;
printf("payload decode error\n"); */
fprintf(stderr, "payload decode error\n");
} }
} else if (packet_type == GATEWAY_PROTOCOL_PACKET_TYPE_PEND_REQ) { } else if (packet_type == GATEWAY_PROTOCOL_PACKET_TYPE_PEND_REQ) {
sprintf(buf, "SELECT * FROM pend_msgs WHERE dev_id = %d", dev_id);
res = PQexec(conn, buf);
if (PQresultStatus(res) == PGRES_TUPLES_OK && PQntuples(res)) {
char msg_cont[150];
strncpy(msg_cont, PQgetvalue(res, 0, 1), sizeof(msg_cont));
printf("PEND_SEND prepared : %s\n", PQgetvalue(res, 0, 1));
base64_decode(PQgetvalue(res, 0, 1), strlen(PQgetvalue(res, 0, 1)), payload);
payload_length = BASE64_DECODE_OUT_SIZE(strlen(PQgetvalue(res, 0, 1)));
PQclear(res);
printf("prepared to send %d bytes\n", payload_length);
packet_encode(
dev_id,
GATEWAY_PROTOCOL_PACKET_TYPE_PEND_SEND,
payload_length, payload,
&buf_len, buf);
printf("packet %d bytes\n", buf_len);
for(uint8_t i = 0; i < buf_len; i++) {
printf("%02X : ", buf[i]);
}
printf("\n");
if (sendto(server_desc, (char *) buf, buf_len, 0, (struct sockaddr *)&client, sock_len) < 0) {
perror("sendto error");
}
if ((buf_len = recvfrom(server_desc, (char *)buf, 1024, MSG_WAITALL, (struct sockaddr *)&client, &sock_len)) < 0) {
perror("socket receive error");
}
if (packet_decode(
&dev_id,
&packet_type,
&payload_length, payload,
buf_len, buf))
{
if (packet_type == GATEWAY_PROTOCOL_PACKET_TYPE_STAT &&
payload_length == 1 &&
payload[0] == GATEWAY_PROTOCOL_STAT_ACK)
{
sprintf(buf, "DELETE FROM pend_msgs WHERE dev_id = %d AND msg = '%s'", dev_id, msg_cont);
printf("%s", buf);
res = PQexec(conn, buf);
if (PQresultStatus(res) != PGRES_COMMAND_OK) {
fprintf(stderr, "error db deleting : %s", PQerrorMessage(conn));
}
PQclear(res);
printf("ACK received\n");
}
}
continue;
}
} else { } else {
printf("packet type error\n"); fprintf(stderr, "packet type error : %02X\n", packet_type);
} }
if (sendto(server_desc, (char *) buf, buf_len, 0, (struct sockaddr *)&client, sock_len) < 0) { if (sendto(server_desc, (char *) buf, buf_len, 0, (struct sockaddr *)&client, sock_len) < 0) {
perror("sendto error"); perror("sendto error");
} }
} else { } else {
perror("packet decode error"); fprintf(stderr, "packet decode error (type : %02X, pck_len = %d, pay_len = %d\n",
packet_type, buf_len, payload_length);
} }
} }
...@@ -208,6 +314,10 @@ int main (int argc, char **argv) { ...@@ -208,6 +314,10 @@ int main (int argc, char **argv) {
return EXIT_SUCCESS; return EXIT_SUCCESS;
} }
void ctrc_handler (int sig) {
working = 0;
}
uint8_t gateway_protocol_data_send_payload_decode(sensor_data_t *sensor_data, const uint8_t *payload, const uint8_t payload_length) { uint8_t gateway_protocol_data_send_payload_decode(sensor_data_t *sensor_data, const uint8_t *payload, const uint8_t payload_length) {
uint8_t p_len = 0; uint8_t p_len = 0;
...@@ -299,9 +409,77 @@ uint8_t gateway_protocol_data_send_payload_decode(sensor_data_t *sensor_data, co ...@@ -299,9 +409,77 @@ uint8_t gateway_protocol_data_send_payload_decode(sensor_data_t *sensor_data, co
memcpy(&sensor_data->hh10d_wis, &payload[p_len], sizeof(sensor_data->hh10d_wis)); memcpy(&sensor_data->hh10d_wis, &payload[p_len], sizeof(sensor_data->hh10d_wis));
p_len += sizeof(sensor_data->hh10d_wis); p_len += sizeof(sensor_data->hh10d_wis);
//printf("p_len = %d, payload_length = %d\n", p_len, payload_length);
return (p_len == payload_length); return (p_len == payload_length);
} }
void filter_nans(char *com) {
char *pchr;
const char nanstr[] = "'NaN'";
while((pchr = strstr(com, "nan"))) {
memmove(&pchr[5], &pchr[3], strlen(pchr)+1);
memcpy(pchr, nanstr, sizeof(nanstr)-1);
}
while(pchr = strchr(com, '-')) {
memmove(pchr, &pchr[1], strlen(pchr));
}
}
void packet_encode(
uint8_t dev_id,
gateway_protocol_packet_type_t p_type,
uint8_t payload_length,
uint8_t *payload,
uint8_t *packet_length,
uint8_t *packet)
{
*packet_length = 0;
packet[*packet_length] = dev_id;
(*packet_length)++;
packet[*packet_length] = p_type;
(*packet_length)++;
packet[*packet_length] = payload_length;
(*packet_length)++;
memcpy(&packet[*packet_length], payload, payload_length);
*packet_length += payload_length;
}
uint8_t packet_decode(
uint8_t *dev_id,
gateway_protocol_packet_type_t *ptype,
uint8_t *payload_length,
uint8_t *payload,
uint8_t packet_length,
uint8_t *packet)
{
uint8_t p_len = 0;
*dev_id = packet[p_len];
p_len++;
*ptype = (gateway_protocol_packet_type_t) packet[p_len];
p_len++;
*payload_length = packet[p_len];
p_len++;
memcpy(payload, &packet[p_len], *payload_length);
p_len += *payload_length;
return p_len == packet_length;
}
/* connection handler for multithreading version */ /* connection handler for multithreading version */
#ifdef MULTITHREADING_VER #ifdef MULTITHREADING_VER
void *connection_handler(void *args) { void *connection_handler(void *args) {
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment