提交 60416be4 编写于 作者: S Shengliang Guan

TD-1310

上级 b87ad419
...@@ -122,11 +122,14 @@ ...@@ -122,11 +122,14 @@
# number of replications, for cluster only # number of replications, for cluster only
# replica 1 # replica 1
# mqtt uri # mqtt hostname
# mqttBrokerAddress mqtt://username:password@hostname:1883/taos/ # mqttHostName test.mosquitto.org
# mqtt client name # mqtt port
# mqttBrokerClientId taos_mqtt # mqttPort 1883
# mqtt topic
# mqttTopic /weather/loop
# the compressed rpc message, option: # the compressed rpc message, option:
# -1 (no compression) # -1 (no compression)
......
...@@ -104,8 +104,12 @@ extern int32_t tsTelegrafUseFieldNum; ...@@ -104,8 +104,12 @@ extern int32_t tsTelegrafUseFieldNum;
// mqtt // mqtt
extern int32_t tsEnableMqttModule; extern int32_t tsEnableMqttModule;
extern char tsMqttBrokerAddress[]; extern char tsMqttHostName[];
extern char tsMqttBrokerClientId[]; extern char tsMqttPort[];
extern char tsMqttUser[];
extern char tsMqttPass[];
extern char tsMqttClientId[];
extern char tsMqttTopic[];
// monitor // monitor
extern int32_t tsEnableMonitorModule; extern int32_t tsEnableMonitorModule;
......
...@@ -137,8 +137,12 @@ int32_t tsTelegrafUseFieldNum = 0; ...@@ -137,8 +137,12 @@ int32_t tsTelegrafUseFieldNum = 0;
// mqtt // mqtt
int32_t tsEnableMqttModule = 0; // not finished yet, not started it by default int32_t tsEnableMqttModule = 0; // not finished yet, not started it by default
char tsMqttBrokerAddress[128] = {0}; char tsMqttHostName[TSDB_MQTT_HOSTNAME_LEN] = "test.mosquitto.org";
char tsMqttBrokerClientId[128] = {0}; char tsMqttPort[TSDB_MQTT_PORT_LEN] = "1883";
char tsMqttUser[TSDB_MQTT_USER_LEN] = {0};
char tsMqttPass[TSDB_MQTT_PASS_LEN] = {0};
char tsMqttClientId[TSDB_MQTT_CLIENT_ID_LEN] = "TDengineMqttSubscriber";
char tsMqttTopic[TSDB_MQTT_TOPIC_LEN] = "/weather/loop";
// monitor // monitor
int32_t tsEnableMonitorModule = 1; int32_t tsEnableMonitorModule = 1;
...@@ -767,26 +771,36 @@ static void doInitGlobalConfig(void) { ...@@ -767,26 +771,36 @@ static void doInitGlobalConfig(void) {
cfg.unitType = TAOS_CFG_UTYPE_NONE; cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg); taosInitConfigOption(cfg);
cfg.option = "mqttBrokerAddress"; cfg.option = "mqttHostName";
cfg.ptr = tsMqttBrokerAddress; cfg.ptr = tsMqttHostName;
cfg.valType = TAOS_CFG_VTYPE_STRING; cfg.valType = TAOS_CFG_VTYPE_STRING;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLIENT | TSDB_CFG_CTYPE_B_NOT_PRINT; cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLIENT | TSDB_CFG_CTYPE_B_NOT_PRINT;
cfg.minValue = 0; cfg.minValue = 0;
cfg.maxValue = 0; cfg.maxValue = 0;
cfg.ptrLength = 126; cfg.ptrLength = TSDB_MQTT_HOSTNAME_LEN;
cfg.unitType = TAOS_CFG_UTYPE_NONE; cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg); taosInitConfigOption(cfg);
cfg.option = "mqttBrokerClientId"; cfg.option = "mqttPort";
cfg.ptr = tsMqttBrokerClientId; cfg.ptr = tsMqttPort;
cfg.valType = TAOS_CFG_VTYPE_STRING; cfg.valType = TAOS_CFG_VTYPE_STRING;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLIENT | TSDB_CFG_CTYPE_B_NOT_PRINT; cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLIENT | TSDB_CFG_CTYPE_B_NOT_PRINT;
cfg.minValue = 0; cfg.minValue = 0;
cfg.maxValue = 0; cfg.maxValue = 0;
cfg.ptrLength = 126; cfg.ptrLength = TSDB_MQTT_PORT_LEN;
cfg.unitType = TAOS_CFG_UTYPE_NONE; cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg); taosInitConfigOption(cfg);
cfg.option = "mqttTopic";
cfg.ptr = tsMqttTopic;
cfg.valType = TAOS_CFG_VTYPE_STRING;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLIENT | TSDB_CFG_CTYPE_B_NOT_PRINT;
cfg.minValue = 0;
cfg.maxValue = 0;
cfg.ptrLength = TSDB_MQTT_TOPIC_LEN;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
cfg.option = "compressMsgSize"; cfg.option = "compressMsgSize";
cfg.ptr = &tsCompressMsgSize; cfg.ptr = &tsCompressMsgSize;
cfg.valType = TAOS_CFG_VTYPE_INT32; cfg.valType = TAOS_CFG_VTYPE_INT32;
......
...@@ -272,6 +272,13 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size); ...@@ -272,6 +272,13 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size);
#define TSDB_SHOW_SQL_LEN 64 #define TSDB_SHOW_SQL_LEN 64
#define TSDB_SLOW_QUERY_SQL_LEN 512 #define TSDB_SLOW_QUERY_SQL_LEN 512
#define TSDB_MQTT_HOSTNAME_LEN 64
#define TSDB_MQTT_PORT_LEN 8
#define TSDB_MQTT_USER_LEN 24
#define TSDB_MQTT_PASS_LEN 24
#define TSDB_MQTT_TOPIC_LEN 64
#define TSDB_MQTT_CLIENT_ID_LEN 32
#define TSDB_METER_STATE_OFFLINE 0 #define TSDB_METER_STATE_OFFLINE 0
#define TSDB_METER_STATE_ONLLINE 1 #define TSDB_METER_STATE_ONLLINE 1
......
...@@ -37,12 +37,6 @@ extern "C" { ...@@ -37,12 +37,6 @@ extern "C" {
* \ref mqttReconnectClient is called, this instance will be passed. * \ref mqttReconnectClient is called, this instance will be passed.
*/ */
struct reconnect_state_t { struct reconnect_state_t {
char* hostname;
char* port;
char* topic;
char* client_id;
char* user_name;
char* password;
uint8_t* sendbuf; uint8_t* sendbuf;
size_t sendbufsz; size_t sendbufsz;
uint8_t* recvbuf; uint8_t* recvbuf;
......
...@@ -30,123 +30,58 @@ ...@@ -30,123 +30,58 @@
#include "tsocket.h" #include "tsocket.h"
#include "ttimer.h" #include "ttimer.h"
#include "mqttSystem.h" #include "mqttSystem.h"
struct mqtt_client mqttClient = {0};
pthread_t clientDaemonThread = {0};
void* mqttConnect=NULL;
struct reconnect_state_t recntStatus = {0};
char* topicPath=NULL;
int mttIsRuning = 1;
int32_t mqttInitSystem() { #define MQTT_SEND_BUF_SIZE 102400
int rc = 0; #define MQTT_RECV_BUF_SIZE 102400
#if 0
uint8_t sendbuf[2048];
uint8_t recvbuf[1024];
recntStatus.sendbuf = sendbuf;
recntStatus.sendbufsz = sizeof(sendbuf);
recntStatus.recvbuf = recvbuf;
recntStatus.recvbufsz = sizeof(recvbuf);
char* url = tsMqttBrokerAddress;
recntStatus.user_name = strstr(url, "@") != NULL ? strbetween(url, "//", ":") : NULL;
char * passStr = strstr(url, recntStatus.user_name);
if (passStr != NULL) {
recntStatus.password = strstr(url, "@") != NULL ? strbetween(passStr, ":", "@") : NULL;
}
if (strlen(url) == 0) { struct mqtt_client tsMqttClient = {0};
mqttDebug("mqtt module not init, url is null"); struct reconnect_state_t tsMqttStatus = {0};
return rc; static pthread_t tsMqttClientDaemonThread = {0};
} static void* tsMqttConnect = NULL;
static bool mqttIsRuning = false;
if (strstr(url, "@") != NULL) { void mqttPublishCallback(void** unused, struct mqtt_response_publish* published);
recntStatus.hostname = strbetween(url, "@", ":"); void mqttCleanupRes(int status, int sockfd, pthread_t* client_daemon);
} else if (strstr(strstr(url, "://") + 3, ":") != NULL) { void* mqttClientRefresher(void* client);
recntStatus.hostname = strbetween(url, "//", ":");
} else { int32_t mqttInitSystem() { return 0; }
recntStatus.hostname = strbetween(url, "//", "/");
}
char* _begin_hostname = strstr(url, recntStatus.hostname); int32_t mqttStartSystem() {
if (_begin_hostname != NULL && strstr(_begin_hostname, ":") != NULL) { tsMqttStatus.sendbufsz = MQTT_SEND_BUF_SIZE;
recntStatus.port = strbetween(_begin_hostname, ":", "/"); tsMqttStatus.recvbufsz = MQTT_RECV_BUF_SIZE;
} else { tsMqttStatus.sendbuf = malloc(MQTT_SEND_BUF_SIZE);
recntStatus.port = strbetween("'1883'", "'", "'"); tsMqttStatus.recvbuf = malloc(MQTT_RECV_BUF_SIZE);
} mqttIsRuning = true;
char* portStr = recntStatus.hostname; mqtt_init_reconnect(&tsMqttClient, mqttReconnectClient, &tsMqttStatus, mqttPublishCallback);
if (_begin_hostname != NULL) { if (pthread_create(&tsMqttClientDaemonThread, NULL, mqttClientRefresher, &tsMqttClient)) {
char* colonStr = strstr(_begin_hostname, ":"); mqttError("mqtt client failed to start daemon.");
if (colonStr != NULL) { mqttCleanupRes(EXIT_FAILURE, -1, NULL);
portStr = recntStatus.port; return -1;
}
} }
char* topicStr = strstr(url, portStr); mqttInfo("mqtt client listening for %s messages", tsMqttTopic);
if (topicStr != NULL) { return 0;
topicPath = strbetween(topicStr, "/", "/");
char* _topic = "+/+/+/";
int _tpsize = strlen(topicPath) + strlen(_topic) + 1;
recntStatus.topic = calloc(1, _tpsize);
sprintf(recntStatus.topic, "/%s/%s", topicPath, _topic);
recntStatus.client_id = strlen(tsMqttBrokerClientId) < 3 ? tsMqttBrokerClientId : "taos_mqtt";
mqttConnect = NULL;
} else {
topicPath = NULL;
}
#endif
return rc;
} }
int32_t mqttStartSystem() { void mqttStopSystem() {
int rc = 0; if (mqttIsRuning) {
#if 0 mqttIsRuning = false;
if (recntStatus.user_name != NULL && recntStatus.password != NULL) { tsMqttClient.error = MQTT_ERROR_SOCKET_ERROR;
mqttInfo("connecting to mqtt://%s:%s@%s:%s/%s/", recntStatus.user_name, recntStatus.password,
recntStatus.hostname, recntStatus.port, topicPath);
} else if (recntStatus.user_name != NULL && recntStatus.password == NULL) {
mqttInfo("connecting to mqtt://%s@%s:%s/%s/", recntStatus.user_name, recntStatus.hostname, recntStatus.port,
topicPath);
}
mqtt_init_reconnect(&mqttClient, mqttReconnectClient, &recntStatus, mqtt_PublishCallback); taosMsleep(300);
if (pthread_create(&clientDaemonThread, NULL, mqttClientRefresher, &mqttClient)) { mqttCleanupRes(EXIT_SUCCESS, tsMqttClient.socketfd, &tsMqttClientDaemonThread);
mqttError("Failed to start client daemon.");
mqttCleanup(EXIT_FAILURE, -1, NULL);
rc = -1;
} else {
mqttInfo("listening for '%s' messages.", recntStatus.topic);
}
#endif
return rc;
}
void mqttStopSystem() { mqttInfo("mqtt is stopped");
#if 0 }
mqttClient.error = MQTT_ERROR_SOCKET_ERROR;
mttIsRuning = 0;
usleep(300000U);
mqttCleanup(EXIT_SUCCESS, mqttClient.socketfd, &clientDaemonThread);
mqttInfo("mqtt is stoped");
#endif
} }
void mqttCleanUpSystem() { void mqttCleanUpSystem() {
#if 0 mqttStopSystem();
mqttInfo("starting to cleanup mqtt");
free(recntStatus.user_name);
free(recntStatus.password);
free(recntStatus.hostname);
free(recntStatus.port);
free(recntStatus.topic);
free(topicPath);
mqttInfo("mqtt is cleaned up"); mqttInfo("mqtt is cleaned up");
#endif
} }
void mqtt_PublishCallback(void** unused, struct mqtt_response_publish* published) { void mqttPublishCallback(void** unused, struct mqtt_response_publish* published) {
/* note that published->topic_name is NOT null-terminated (here we'll change it to a c-string) */ /* note that published->topic_name is NOT null-terminated (here we'll change it to a c-string) */
char* topic_name = (char*)malloc(published->topic_name_size + 1); char* topic_name = (char*)malloc(published->topic_name_size + 1);
memcpy(topic_name, published->topic_name, published->topic_name_size); memcpy(topic_name, published->topic_name, published->topic_name_size);
...@@ -155,28 +90,29 @@ void mqtt_PublishCallback(void** unused, struct mqtt_response_publish* published ...@@ -155,28 +90,29 @@ void mqtt_PublishCallback(void** unused, struct mqtt_response_publish* published
char _token[128] = {0}; char _token[128] = {0};
char _dbname[128] = {0}; char _dbname[128] = {0};
char _tablename[128] = {0}; char _tablename[128] = {0};
if (mqttConnect == NULL) { if (tsMqttConnect == NULL) {
mqttInfo("connect database"); mqttInfo("connect database");
taos_connect_a(NULL, "_root", tsInternalPass, "", 0, mqttInitConnCb, &mqttClient, &mqttConnect); taos_connect_a(NULL, "_root", tsInternalPass, "", 0, mqttInitConnCb, &tsMqttClient, &tsMqttConnect);
} }
if (topic_name[1]=='/' && strncmp((char*)&topic_name[1], topicPath, strlen(topicPath)) == 0) { if (topic_name[1] == '/' && strncmp((char*)&topic_name[1], tsMqttTopic, strlen(tsMqttTopic)) == 0) {
char* p_p_cmd_part[5] = {0}; char* p_p_cmd_part[5] = {0};
char copystr[1024] = {0}; char copystr[1024] = {0};
strncpy(copystr, topic_name, MIN(1024, published->topic_name_size)); strncpy(copystr, topic_name, MIN(1024, published->topic_name_size));
char part_index = split(copystr, "/", p_p_cmd_part, 10); char part_index = split(copystr, "/", p_p_cmd_part, 10);
if (part_index < 4) { if (part_index < 4) {
mqttError("The topic %s is't format '/path/token/dbname/table name/'. for expmle: '/taos/token/db/t'", topic_name); mqttError("The topic %s is't format '/path/token/dbname/table name/'. for expmle: '/taos/token/db/t'",
topic_name);
} else { } else {
strncpy(_token, p_p_cmd_part[1], 127); strncpy(_token, p_p_cmd_part[1], 127);
strncpy(_dbname, p_p_cmd_part[2], 127); strncpy(_dbname, p_p_cmd_part[2], 127);
strncpy(_tablename, p_p_cmd_part[3], 127); strncpy(_tablename, p_p_cmd_part[3], 127);
mqttInfo("part count=%d,access token:%s,database name:%s, table name:%s", part_index, _token, _dbname, mqttInfo("part count=%d,access token:%s,database name:%s, table name:%s", part_index, _token, _dbname,
_tablename); _tablename);
if (mqttConnect != NULL) { if (tsMqttConnect != NULL) {
char* _sql = converJsonToSql((char*)published->application_message, _dbname, _tablename); char* _sql = converJsonToSql((char*)published->application_message, _dbname, _tablename);
mqttInfo("query:%s", _sql); mqttInfo("query:%s", _sql);
taos_query_a(mqttConnect, _sql, mqttQueryInsertCallback, &mqttClient); taos_query_a(tsMqttConnect, _sql, mqttQueryInsertCallback, &tsMqttClient);
mqttInfo("free sql:%s", _sql); mqttInfo("free sql:%s", _sql);
free(_sql); free(_sql);
} }
...@@ -186,27 +122,31 @@ void mqtt_PublishCallback(void** unused, struct mqtt_response_publish* published ...@@ -186,27 +122,31 @@ void mqtt_PublishCallback(void** unused, struct mqtt_response_publish* published
} }
void* mqttClientRefresher(void* client) { void* mqttClientRefresher(void* client) {
while (mttIsRuning) { while (mqttIsRuning) {
mqtt_sync((struct mqtt_client*)client); mqtt_sync((struct mqtt_client*)client);
taosMsleep(100); taosMsleep(100);
} }
mqttDebug("quit refresher");
mqttDebug("mqtt client quit refresher");
return NULL; return NULL;
} }
void mqttCleanup(int status, int sockfd, pthread_t* client_daemon) { void mqttCleanupRes(int status, int sockfd, pthread_t* client_daemon) {
#if 0
mqttInfo("clean up mqtt module"); mqttInfo("clean up mqtt module");
if (sockfd != -1) close(sockfd); if (sockfd != -1) {
if (client_daemon != NULL) pthread_cancel(*client_daemon); close(sockfd);
#endif }
if (client_daemon != NULL) {
pthread_cancel(*client_daemon);
}
} }
void mqttInitConnCb(void* param, TAOS_RES* result, int32_t code) { void mqttInitConnCb(void* param, TAOS_RES* result, int32_t code) {
if (code < 0) { if (code < 0) {
mqttError("mqtt:%d, connect to database failed, reason:%s", code, tstrerror(code)); mqttError("mqtt:%d, connect to database failed, reason:%s", code, tstrerror(code));
taos_close(mqttConnect); taos_close(tsMqttConnect);
mqttConnect = NULL; tsMqttConnect = NULL;
return; return;
} }
mqttDebug("mqtt:%d, connect to database success, reason:%s", code, tstrerror(code)); mqttDebug("mqtt:%d, connect to database success, reason:%s", code, tstrerror(code));
...@@ -222,36 +162,28 @@ void mqttQueryInsertCallback(void* param, TAOS_RES* result, int32_t code) { ...@@ -222,36 +162,28 @@ void mqttQueryInsertCallback(void* param, TAOS_RES* result, int32_t code) {
} }
} }
void mqttReconnectClient(struct mqtt_client* client, void** reconnect_state_vptr) { void mqttReconnectClient(struct mqtt_client* client, void** unused) {
mqttInfo("reconnect client"); mqttInfo("mqtt client tries to connect to the server");
struct reconnect_state_t* reconnect_state = *((struct reconnect_state_t**)reconnect_state_vptr);
/* Close the clients socket if this isn't the initial reconnect call */
if (client->error != MQTT_ERROR_INITIAL_RECONNECT) { if (client->error != MQTT_ERROR_INITIAL_RECONNECT) {
close(client->socketfd); close(client->socketfd);
} }
/* Perform error handling here. */
if (client->error != MQTT_ERROR_INITIAL_RECONNECT) { if (client->error != MQTT_ERROR_INITIAL_RECONNECT) {
mqttError("mqttReconnectClient: called while client was in error state \"%s\"", mqtt_error_str(client->error)); mqttError("mqtt client was in error state %s", mqtt_error_str(client->error));
} }
/* Open a new socket. */ int sockfd = open_nb_socket("test.mosquitto.org", "1883");
int sockfd = open_nb_socket(reconnect_state->hostname, reconnect_state->port); if (sockfd < 0) {
if (sockfd == -1) { mqttError("mqtt client failed to open socket %s:%s", tsMqttHostName, tsMqttPort);
mqttError("failed to open socket: "); mqttCleanupRes(EXIT_FAILURE, sockfd, NULL);
mqttCleanup(EXIT_FAILURE, sockfd, NULL);
} }
/* Reinitialize the client. */ // mqtt_reinit(client, sockfd, tsMqttStatus.sendbuf, tsMqttStatus.sendbufsz, tsMqttStatus.recvbuf, tsMqttStatus.recvbufsz);
mqtt_reinit(client, sockfd, reconnect_state->sendbuf, reconnect_state->sendbufsz, reconnect_state->recvbuf, // mqtt_connect(client, tsMqttClientId, NULL, NULL, 0, tsMqttUser, tsMqttPass, MQTT_CONNECT_CLEAN_SESSION, 400);
reconnect_state->recvbufsz); // mqtt_subscribe(client, tsMqttTopic, 0);
/* Ensure we have a clean session */
uint8_t connect_flags = MQTT_CONNECT_CLEAN_SESSION;
/* Send connection request to the broker. */
mqtt_connect(client, reconnect_state->client_id, NULL, NULL, 0, reconnect_state->user_name, reconnect_state->password,connect_flags, 400);
/* Subscribe to the topic. */ mqtt_reinit(client, sockfd, tsMqttStatus.sendbuf, tsMqttStatus.sendbufsz, tsMqttStatus.recvbuf, tsMqttStatus.recvbufsz);
mqtt_subscribe(client, reconnect_state->topic, 0); mqtt_connect(client, tsMqttClientId, NULL, NULL, 0, NULL, NULL, MQTT_CONNECT_CLEAN_SESSION, 400);
mqtt_subscribe(client, "datetime", 0);
} }
\ No newline at end of file
...@@ -7,6 +7,5 @@ AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR}/src SRC) ...@@ -7,6 +7,5 @@ AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR}/src SRC)
IF (TD_LINUX) IF (TD_LINUX)
ADD_LIBRARY(twal ${SRC}) ADD_LIBRARY(twal ${SRC})
TARGET_LINK_LIBRARIES(twal tutil common) TARGET_LINK_LIBRARIES(twal tutil common)
ADD_SUBDIRECTORY(test) ADD_SUBDIRECTORY(test)
ENDIF () ENDIF ()
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c walLevel -v 2
system sh/cfg.sh -n dnode1 -c numOfMnodes -v 1
system sh/cfg.sh -n dnode1 -c mnodeEqualVnodeNum -v 4
system sh/cfg.sh -n dnode1 -c maxTablesPerVnode -v 100000
system sh/cfg.sh -n dnode1 -c http -v 1
system sh/cfg.sh -n dnode1 -c http -v 1
system sh/cfg.sh -n dnode1 -c mqttBrokerAddress -v mqtt://test.mosquitto.org:1883/#
system sh/cfg.sh -n dnode1 -c mqttBrokerClientId -v taosmqtt
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册