Commit 641b96cf authored by Vladislav Rykov's avatar Vladislav Rykov
Browse files

updated for new server func

parent b9038bfc
No preview for this file type
DROP DATABASE IF EXISTS gateway;
CREATE DATABASE gateway;
\c gateway
ALTER DATABASE gateway OWNER TO pi;
CREATE TABLE esp32 (
utc numeric(10,0),
timedate character varying(100),
dht22_t_esp real,
dht22_h_esp real,
sht85_t_esp real,
sht85_h_esp real,
hih8121_t_esp real,
hih8121_h_esp real,
tmp36_0_esp real,
tmp36_1_esp real,
tmp36_2_esp real,
hih4030_esp real,
hh10d_esp real,
dht22_t_wis real,
dht22_h_wis real,
sht85_t_wis real,
sht85_h_wis real,
hih8121_t_wis real,
hih8121_h_wis real,
tmp102_wis real,
hh10d_wis real,
dht22_t_mkr real,
dht22_h_mkr real,
sht85_t_mkr real,
sht85_h_mkr real,
hih8121_t_mkr real,
hih8121_h_mkr real,
hh10d_mkr real
);
ALTER TABLE esp32 OWNER TO pi;
--
-- Name: pend_msgs; Type: TABLE; Schema: public; Owner: root
--
CREATE TABLE pend_msgs (
app_key VARCHAR(30 NOT NULL,
dev_id numeric(3,0) NOT NULL,
msg VARCHAR(150),
ack BOOLEAN NOT NULL DEFAULT FALSE
);
ALTER TABLE pend_msgs OWNER TO pi;
......@@ -12,43 +12,21 @@
#include"base64.h"
#include<math.h>
#include<signal.h>
#include<time.h>
#include<errno.h>
#define TIMEDATE_LENGTH 32
#define PEND_SEND_RETRIES_MAX 5
#define GATEWAY_PROTOCOL_APP_KEY_SIZE 8
#define DEVICE_DATA_MAX_LENGTH 256
typedef struct {
uint32_t utc;
char timedate[TIMEDATE_LENGTH];
float dht22_t_esp;
float dht22_h_esp;
float sht85_t_esp;
float sht85_h_esp;
float hih8121_t_esp;
float hih8121_h_esp;
float tmp36_0_esp;
float tmp36_1_esp;
float tmp36_2_esp;
float hih4030_esp;
float hh10d_esp;
float dht22_t_mkr;
float dht22_h_mkr;
float sht85_t_mkr;
float sht85_h_mkr;
float hih8121_t_mkr;
float hih8121_h_mkr;
float hh10d_mkr;
float dht22_t_wis;
float dht22_h_wis;
float sht85_t_wis;
float sht85_h_wis;
float hih8121_t_wis;
float hih8121_h_wis;
float tmp102_wis;
float hh10d_wis;
uint8_t data[DEVICE_DATA_MAX_LENGTH];
uint8_t data_length;
} sensor_data_t;
typedef struct {
......@@ -61,13 +39,12 @@ typedef struct {
int sock_len;
} gcom_ch_t; // gateway communication channel
/* for multithreading impl */
void * connection_handler (void *args);
int send_gcom_ch(gcom_ch_t *gch, uint8_t *pck, uint8_t pck_size);
int recv_gcom_ch(gcom_ch_t *gch, uint8_t *pck, uint8_t *pck_length, uint16_t pck_size);
void * connection_handler (void *args);
uint8_t gateway_protocol_data_send_payload_decode(sensor_data_t *sensor_data, const uint8_t *payload, const uint8_t payload_length);
void prepare_di_query(gcom_ch_t *gch, sensor_data_t *sensor_data, char *q, uint16_t q_size); // data insert
void filter_query(char *com);
void packet_encode(
const uint8_t *app_key,
const uint8_t dev_id,
......@@ -84,6 +61,10 @@ uint8_t packet_decode(
uint8_t *payload,
const uint8_t packet_length,
const uint8_t *packet);
void gateway_protocol_data_send_payload_decode(
sensor_data_t *sensor_data,
const uint8_t *payload,
const uint8_t payload_length);
void gateway_protocol_mk_stat(
gcom_ch_t *gch,
......@@ -108,9 +89,9 @@ int main (int argc, char **argv) {
signal(SIGINT, ctrc_handler);
PGconn *conn = PQconnectdb("user=pi dbname=gateway");
PGconn *conn = PQconnectdb("user=vlad dbname=iotserver password=dev");
if (PQstatus(conn) == CONNECTION_BAD) {
printf("connection to db error: %s\n", PQerrorMessage(conn));
fprintf(stderr,"connection to db error: %s\n", PQerrorMessage(conn));
return EXIT_FAILURE;
}
......@@ -120,7 +101,7 @@ int main (int argc, char **argv) {
}
gch.server.sin_family = AF_INET;
gch.server.sin_port = htons(9043);
gch.server.sin_port = htons(54345);
gch.server.sin_addr.s_addr = INADDR_ANY;
if (bind(gch.server_desc, (struct sockaddr *) &gch.server, sizeof(gch.server)) < 0) {
......@@ -135,60 +116,69 @@ int main (int argc, char **argv) {
printf("listenninig...\n");
gch.sock_len = sizeof(gch.client);
recv_gcom_ch(&gch, buf, &buf_len, 1024);
if (packet_decode(
gch.app_key,
&gch.dev_id,
&packet_type,
&payload_length, payload,
buf_len, buf))
if ( recv_gcom_ch(&gch, buf, &buf_len, 1024)
&
packet_decode(
gch.app_key,
&gch.dev_id,
&packet_type,
&payload_length, payload,
buf_len, buf) )
{
if (packet_type == GATEWAY_PROTOCOL_PACKET_TYPE_TIME_REQ) {
printf("TIME REQ received\n");
send_utc(&gch);
} else if (packet_type == GATEWAY_PROTOCOL_PACKET_TYPE_DATA_SEND) {
sensor_data_t sensor_data;
time_t t;
printf("DATA SEND received\n");
if (gateway_protocol_data_send_payload_decode(&sensor_data, payload, payload_length)) {
prepare_di_query(&gch, &sensor_data, buf, sizeof(buf));
filter_query(buf);
printf("%s\n", buf);
gateway_protocol_data_send_payload_decode(&sensor_data, payload, payload_length);
if (sensor_data.utc == 0) {
struct timeval tv;
gettimeofday(&tv, NULL);
t = tv.tv_sec;
} else {
t = sensor_data.utc;
}
strftime(sensor_data.timedate, TIMEDATE_LENGTH, "%d/%m/%Y %H:%M:%S", localtime(&t));
snprintf(buf, sizeof(buf),
"INSERT INTO dev_%s_%d VALUES (%d, '%s', $1)", (char *)gch.app_key, gch.dev_id, t, sensor_data.timedate);
const char *params[1];
int paramlen[1];
int paramfor[1];
params[0] = sensor_data.data;
paramlen[0] = sensor_data.data_length;
paramfor[0] = 1; // binary
res = PQexecParams(conn, buf, 1, NULL, params, paramlen, paramfor, 0);
if (PQresultStatus(res) == PGRES_COMMAND_OK) {
PQclear(res);
sprintf(buf, "SELECT * FROM pend_msgs WHERE app_key='%s' and dev_id = %d and ack = False", (char *)gch.app_key, gch.dev_id);
res = PQexec(conn, buf);
if (PQresultStatus(res) == PGRES_COMMAND_OK) {
PQclear(res);
sprintf(buf, "SELECT * FROM pend_msgs WHERE dev_id = %d", gch.dev_id);
res = PQexec(conn, buf);
if (PQresultStatus(res) == PGRES_TUPLES_OK && PQntuples(res)) {
gateway_protocol_mk_stat(
&gch,
GATEWAY_PROTOCOL_STAT_ACK_PEND,
buf, &buf_len);
printf("ACK_PEND prepared\n");
} else {
gateway_protocol_mk_stat(
&gch,
GATEWAY_PROTOCOL_STAT_ACK,
buf, &buf_len);
printf("ACK prepared\n");
}
send_gcom_ch(&gch, buf, buf_len);
if (PQresultStatus(res) == PGRES_TUPLES_OK && PQntuples(res)) {
gateway_protocol_mk_stat(
&gch,
GATEWAY_PROTOCOL_STAT_ACK_PEND,
buf, &buf_len);
printf("ACK_PEND prepared\n");
} else {
fprintf(stderr, "database error : %s\n", PQerrorMessage(conn));
gateway_protocol_mk_stat(
&gch,
GATEWAY_PROTOCOL_STAT_ACK,
buf, &buf_len);
printf("ACK prepared\n");
}
PQclear(res);
} else {
gateway_protocol_mk_stat(
&gch,
GATEWAY_PROTOCOL_STAT_NACK,
buf, &buf_len);
send_gcom_ch(&gch, buf, buf_len);
fprintf(stderr, "payload decode error\n");
} else {
fprintf(stderr, "database error : %s\n", PQerrorMessage(conn));
}
PQclear(res);
} else if (packet_type == GATEWAY_PROTOCOL_PACKET_TYPE_PEND_REQ) {
sprintf(buf, "SELECT * FROM pend_msgs WHERE app_key = '%s' AND dev_id = %d AND ack = False",
(char *)gch.app_key, gch.dev_id);
......@@ -206,6 +196,9 @@ int main (int argc, char **argv) {
// send the msg until ack is received
uint8_t received_ack = 0;
uint8_t pend_send_retries = PEND_SEND_RETRIES_MAX;
struct timeval tv;
tv.tv_sec = 0;
tv.tv_usec = 300000; // 300ms ack recv timeout
do {
packet_encode(
gch.app_key,
......@@ -213,48 +206,73 @@ int main (int argc, char **argv) {
GATEWAY_PROTOCOL_PACKET_TYPE_PEND_SEND,
payload_length, payload,
&buf_len, buf);
send_gcom_ch(&gch, buf, buf_len);
recv_gcom_ch(&gch, buf, &buf_len, 1024);
uint8_t recv_app_key[GATEWAY_PROTOCOL_APP_KEY_SIZE +1];
uint8_t recv_dev_id = 0xFF;
if (packet_decode(
recv_app_key,
&recv_dev_id,
&packet_type,
&buf_len, buf,
buf_len, buf))
{
if (!memcmp(recv_app_key, gch.app_key, GATEWAY_PROTOCOL_APP_KEY_SIZE) &&
recv_dev_id == gch.dev_id &&
packet_type == GATEWAY_PROTOCOL_PACKET_TYPE_STAT &&
buf_len == 1 &&
buf[0] == GATEWAY_PROTOCOL_STAT_ACK)
send_gcom_ch(&gch, buf, buf_len);
// set timeout
if (setsockopt(gch.server_desc, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)) < 0) {
perror("setsockopt error");
}
recv_gcom_ch(&gch, buf, &buf_len, 1024);
if (buf_len > 9) { /* min packet size. timeout -> -1 */
uint8_t recv_app_key[GATEWAY_PROTOCOL_APP_KEY_SIZE +1];
uint8_t recv_dev_id = 0xFF;
if (packet_decode(
recv_app_key,
&recv_dev_id,
&packet_type,
&buf_len, buf,
buf_len, buf))
{
//sprintf(buf, "DELETE FROM pend_msgs WHERE app_key = '%s' AND dev_id = %d AND msg = '%s'", (char *)gch.app_key, gch.dev_id, msg_cont);
sprintf(buf, "UPDATE pend_msgs SET ack = True WHERE app_key = '%s' AND dev_id = %d AND msg = '%s'", (char *)gch.app_key, gch.dev_id, msg_cont);
printf("%s", buf);
res = PQexec(conn, buf);
if (PQresultStatus(res) != PGRES_COMMAND_OK) {
fprintf(stderr, "error db deleting : %s", PQerrorMessage(conn));
if (!memcmp(recv_app_key, gch.app_key, GATEWAY_PROTOCOL_APP_KEY_SIZE) &&
recv_dev_id == gch.dev_id &&
packet_type == GATEWAY_PROTOCOL_PACKET_TYPE_STAT &&
buf_len == 1 &&
buf[0] == GATEWAY_PROTOCOL_STAT_ACK)
{
sprintf(buf, "UPDATE pend_msgs SET ack = True WHERE app_key = '%s' AND dev_id = %d AND msg = '%s'", (char *)gch.app_key, gch.dev_id, msg_cont);
res = PQexec(conn, buf);
if (PQresultStatus(res) != PGRES_COMMAND_OK) {
fprintf(stderr, "error db deleting : %s", PQerrorMessage(conn));
}
PQclear(res);
received_ack = 1;
printf("ACK received\n");
} else {
printf("error: packet_type = %02X, not STAT\n");
}
PQclear(res);
received_ack = 1;
printf("ACK received\n");
}
}
} while (!received_ack && pend_send_retries--);
// cancel timeout
tv.tv_usec = 0;
if (setsockopt(gch.server_desc, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)) < 0) {
perror("setsockopt error");
}
} else {
gateway_protocol_mk_stat(
&gch,
GATEWAY_PROTOCOL_STAT_NACK,
buf, &buf_len);
send_gcom_ch(&gch, buf, buf_len);
printf("nothing for app %s dev %d\n", (char *)gch.app_key, gch.dev_id);
}
} else {
gateway_protocol_mk_stat(
&gch,
GATEWAY_PROTOCOL_STAT_NACK,
buf, &buf_len);
send_gcom_ch(&gch, buf, buf_len);
fprintf(stderr, "packet type error : %02X\n", packet_type);
}
} else {
fprintf(stderr, "packet decode error (type : %02X, pck_len = %d, pay_len = %d\n",
packet_type, buf_len, payload_length);
fprintf(stderr, "payload decode error\n");
}
}
close(gch.server_desc);
......@@ -267,7 +285,7 @@ void ctrc_handler (int sig) {
working = 0;
}
uint8_t gateway_protocol_data_send_payload_decode(
void gateway_protocol_data_send_payload_decode(
sensor_data_t *sensor_data,
const uint8_t *payload,
const uint8_t payload_length)
......@@ -277,164 +295,10 @@ uint8_t gateway_protocol_data_send_payload_decode(
memcpy(&sensor_data->utc, &payload[p_len], sizeof(sensor_data->utc));
p_len += sizeof(sensor_data->utc);
memcpy(&sensor_data->timedate, &payload[p_len], sizeof(sensor_data->timedate));
//p_len += sizeof(sensor_data->timedate);
p_len += TIMEDATE_LENGTH;
memcpy(&sensor_data->dht22_t_esp, &payload[p_len], sizeof(sensor_data->dht22_t_esp));
p_len += sizeof(sensor_data->dht22_t_esp);
memcpy(&sensor_data->dht22_h_esp, &payload[p_len], sizeof(sensor_data->dht22_h_esp));
p_len += sizeof(sensor_data->dht22_h_esp);
memcpy(&sensor_data->sht85_t_esp, &payload[p_len], sizeof(sensor_data->sht85_t_esp));
p_len += sizeof(sensor_data->sht85_t_esp);
memcpy(&sensor_data->sht85_h_esp, &payload[p_len], sizeof(sensor_data->sht85_h_esp));
p_len += sizeof(sensor_data->sht85_h_esp);
memcpy(&sensor_data->hih8121_t_esp, &payload[p_len], sizeof(sensor_data->hih8121_t_esp));
p_len += sizeof(sensor_data->hih8121_t_esp);
memcpy(&sensor_data->hih8121_h_esp, &payload[p_len], sizeof(sensor_data->hih8121_h_esp));
p_len += sizeof(sensor_data->hih8121_h_esp);
memcpy(&sensor_data->tmp36_0_esp, &payload[p_len], sizeof(sensor_data->tmp36_0_esp));
p_len += sizeof(sensor_data->tmp36_0_esp);
memcpy(&sensor_data->tmp36_1_esp, &payload[p_len], sizeof(sensor_data->tmp36_1_esp));
p_len += sizeof(sensor_data->tmp36_1_esp);
memcpy(&sensor_data->tmp36_2_esp, &payload[p_len], sizeof(sensor_data->tmp36_2_esp));
p_len += sizeof(sensor_data->tmp36_2_esp);
memcpy(&sensor_data->hih4030_esp, &payload[p_len], sizeof(sensor_data->hih4030_esp));
p_len += sizeof(sensor_data->hih4030_esp);
memcpy(&sensor_data->hh10d_esp, &payload[p_len], sizeof(sensor_data->hh10d_esp));
p_len += sizeof(sensor_data->hh10d_esp);
memcpy(&sensor_data->dht22_t_mkr, &payload[p_len], sizeof(sensor_data->dht22_t_mkr));
p_len += sizeof(sensor_data->dht22_t_mkr);
memcpy(&sensor_data->dht22_h_mkr, &payload[p_len], sizeof(sensor_data->dht22_h_mkr));
p_len += sizeof(sensor_data->dht22_h_mkr);
memcpy(&sensor_data->sht85_t_mkr, &payload[p_len], sizeof(sensor_data->sht85_t_mkr));
p_len += sizeof(sensor_data->sht85_t_mkr);
memcpy(&sensor_data->sht85_h_mkr, &payload[p_len], sizeof(sensor_data->sht85_h_mkr));
p_len += sizeof(sensor_data->sht85_h_esp);
memcpy(&sensor_data->hih8121_t_mkr, &payload[p_len], sizeof(sensor_data->hih8121_t_mkr));
p_len += sizeof(sensor_data->hih8121_t_mkr);
memcpy(&sensor_data->hih8121_h_mkr, &payload[p_len], sizeof(sensor_data->hih8121_h_mkr));
p_len += sizeof(sensor_data->hih8121_h_mkr);
memcpy(&sensor_data->hh10d_mkr, &payload[p_len], sizeof(sensor_data->hh10d_mkr));
p_len += sizeof(sensor_data->hh10d_mkr);
memcpy(&sensor_data->dht22_t_wis, &payload[p_len], sizeof(sensor_data->dht22_t_wis));
p_len += sizeof(sensor_data->dht22_t_wis);
memcpy(&sensor_data->dht22_h_wis, &payload[p_len], sizeof(sensor_data->dht22_h_wis));
p_len += sizeof(sensor_data->dht22_h_wis);
memcpy(&sensor_data->sht85_t_wis, &payload[p_len], sizeof(sensor_data->sht85_t_wis));
p_len += sizeof(sensor_data->sht85_t_wis);
memcpy(&sensor_data->sht85_h_wis, &payload[p_len], sizeof(sensor_data->sht85_h_wis));
p_len += sizeof(sensor_data->sht85_h_wis);
memcpy(&sensor_data->hih8121_t_wis, &payload[p_len], sizeof(sensor_data->hih8121_t_wis));
p_len += sizeof(sensor_data->hih8121_t_wis);
memcpy(&sensor_data->hih8121_h_wis, &payload[p_len], sizeof(sensor_data->hih8121_h_wis));
p_len += sizeof(sensor_data->hih8121_h_wis);
memcpy(&sensor_data->tmp102_wis, &payload[p_len], sizeof(sensor_data->tmp102_wis));
p_len += sizeof(sensor_data->tmp102_wis);
memcpy(&sensor_data->hh10d_wis, &payload[p_len], sizeof(sensor_data->hh10d_wis));
p_len += sizeof(sensor_data->hh10d_wis);
printf("p_len = %d, payload_length = %d\n", p_len, payload_length);
return (p_len == payload_length);
memcpy(sensor_data->data, &payload[p_len], payload_length - p_len);
sensor_data->data_length = payload_length - p_len;
}
void prepare_di_query(gcom_ch_t *gch, sensor_data_t *sensor_data, char *q, uint16_t q_size) {
snprintf(q, q_size,
"INSERT INTO dev_%s_%d VALUES ("
"%lu, '%s', "
"'{ "
"\"dht22_t_esp\" : %.2f, "
"\"dht22_h_esp\" : %.2f, "
"\"sht85_t_esp\" : %.2f, "
"\"sht85_h_esp\" : %.2f, "
"\"hih8121_t_esp\" : %.2f, "
"\"hih8121_h_esp\" : %.2f, "
"\"tmp36_0_esp\" : %.2f, "
"\"tmp36_1_esp\" : %.2f, "
"\"tmp36_2_esp\" : %.2f, "
"\"hih4030_esp\" : %.2f, "
"\"hh10d_esp\" : %.2f, "
"\"dht22_t_mkr\" : %.2f, "
"\"dht22_h_mkr\" : %.2f, "
"\"sht85_t_mkr\" : %.2f, "
"\"sht85_h_mkr\" : %.2f, "
"\"hih8121_t_mkr\" : %.2f, "
"\"hih8121_h_mkr\" : %.2f, "
"\"hh10d_mkr\" : %.2f, "
"\"dht22_t_wis\" : %.2f, "
"\"dht22_h_wis\" : %.2f, "
"\"sht85_t_wis\" : %.2f, "
"\"sht85_h_wis\" : %.2f, "
"\"hih8121_t_wis\" : %.2f, "
"\"hih8121_h_wis\" : %.2f, "
"\"tmp102_wis\" : %.2f, "
"\"hh10d_wis\" : %.2f"
"}'"
")",
(char *)gch->app_key, gch->dev_id,
sensor_data->utc, sensor_data->timedate,
sensor_data->dht22_t_esp, sensor_data->dht22_h_esp,
sensor_data->sht85_t_esp, sensor_data->sht85_h_esp,
sensor_data->hih8121_t_esp, sensor_data->hih8121_h_esp,
sensor_data->tmp36_0_esp, sensor_data->tmp36_1_esp, sensor_data->tmp36_2_esp,
sensor_data->hih4030_esp,
sensor_data->hh10d_esp,
sensor_data->dht22_t_mkr, sensor_data->dht22_h_mkr,
sensor_data->sht85_t_mkr, sensor_data->sht85_h_mkr,
sensor_data->hih8121_t_mkr, sensor_data->hih8121_h_mkr,
sensor_data->hh10d_mkr,
sensor_data->dht22_t_wis, sensor_data->dht22_h_wis,
sensor_data->sht85_t_wis, sensor_data->sht85_h_wis,
sensor_data->hih8121_t_wis, sensor_data->hih8121_h_wis,
sensor_data->tmp102_wis,
sensor_data->hh10d_wis
);
}
void filter_query(char *q) {
char *pchr;
const char nanstr[] = "\"NaN\"";
while((pchr = strstr(q, "nan"))) {
memmove(&pchr[5], &pchr[3], strlen(pchr)+1);
memcpy(pchr, nanstr, sizeof(nanstr)-1);
}
while(pchr = strchr(q, '-')) {
memmove(pchr, &pchr[1], strlen(pchr));
}
}
void packet_encode(
const uint8_t *app_key,
const uint8_t dev_id,
......@@ -490,6 +354,8 @@ uint8_t packet_decode(
memcpy(payload, &packet[p_len], *payload_length);
p_len += *payload_length;
printf("payload_length = %d , calc = %d, recv = %d\n", *payload_length, p_len, packet_length);
return p_len == packet_length;
}
......@@ -520,7 +386,7 @@ void send_utc(gcom_ch_t *gch) {
gch->app_key,
gch->dev_id,
GATEWAY_PROTOCOL_PACKET_TYPE_TIME_SEND,
sizeof(tv.tv_sec), (uint8_t *)&tv.tv_sec,
sizeof(uint32_t), (uint8_t *)&tv.tv_sec,
&buf_len, buf
);
......@@ -528,12 +394,7 @@ void send_utc(gcom_ch_t *gch) {
}
int send_gcom_ch(gcom_ch_t *gch, uint8_t *pck, uint8_t pck_size) {
int ret;
for(uint8_t i = 0; i < pck_size; i++) {
printf("%02X : ", pck[i]);
}
printf("\n");
int ret, i;
if (sendto(gch->server_desc, (char *)pck, pck_size, 0, (struct sockaddr *)&gch->client, gch->sock_len) < 0) {
perror("sendto error");
......@@ -542,14 +403,11 @@ int send_gcom_ch(gcom_ch_t *gch, uint8_t *pck, uint8_t pck_size) {
}
int recv_gcom_ch(gcom_ch_t *gch, uint8_t *pck, uint8_t *pck_length, uint16_t pck_size) {
int i;
if ((*pck_length = recvfrom(gch->server_desc, (char *)pck, pck_size, MSG_WAITALL, (struct sockaddr *)&gch->client, &gch->sock_len)) < 0) {
perror("socket receive error");
}
for(uint8_t i = 0; i < *pck_length; i++) {
printf("%02X : ", pck[i]);
}
printf("\n");
}
/* connection handler for multithreading version */
#ifdef MULTITHREADING_VER
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment