diff --git a/packaging/cfg/taos.cfg b/packaging/cfg/taos.cfg index a91dde8796bdca67f9abea027d1f31fb636ba86e..7be6e729622ed3201725aa42d38fe7896052b37c 100644 --- a/packaging/cfg/taos.cfg +++ b/packaging/cfg/taos.cfg @@ -122,11 +122,14 @@ # number of replications, for cluster only # replica 1 -# mqtt uri -# mqttBrokerAddress mqtt://username:password@hostname:1883/taos/ +# mqtt hostname +# mqttHostName test.mosquitto.org -# mqtt client name -# mqttBrokerClientId taos_mqtt +# mqtt port +# mqttPort 1883 + +# mqtt topic +# mqttTopic /weather/loop # the compressed rpc message, option: # -1 (no compression) diff --git a/src/common/inc/tglobal.h b/src/common/inc/tglobal.h index ef0713c4152832aca87f2b768ddcf3789b581a71..fedafe5b022bdf54fb8439a10c7d6355485fef93 100644 --- a/src/common/inc/tglobal.h +++ b/src/common/inc/tglobal.h @@ -104,8 +104,12 @@ extern int32_t tsTelegrafUseFieldNum; // mqtt extern int32_t tsEnableMqttModule; -extern char tsMqttBrokerAddress[]; -extern char tsMqttBrokerClientId[]; +extern char tsMqttHostName[]; +extern char tsMqttPort[]; +extern char tsMqttUser[]; +extern char tsMqttPass[]; +extern char tsMqttClientId[]; +extern char tsMqttTopic[]; // monitor extern int32_t tsEnableMonitorModule; diff --git a/src/common/src/tglobal.c b/src/common/src/tglobal.c index 795585e5c9220dd209b85731b6278f1acf279aa5..9549ccf6075cb375b9e50c107b17140a2f8d75dc 100644 --- a/src/common/src/tglobal.c +++ b/src/common/src/tglobal.c @@ -137,8 +137,12 @@ int32_t tsTelegrafUseFieldNum = 0; // mqtt int32_t tsEnableMqttModule = 0; // not finished yet, not started it by default -char tsMqttBrokerAddress[128] = {0}; -char tsMqttBrokerClientId[128] = {0}; +char tsMqttHostName[TSDB_MQTT_HOSTNAME_LEN] = "test.mosquitto.org"; +char tsMqttPort[TSDB_MQTT_PORT_LEN] = "1883"; +char tsMqttUser[TSDB_MQTT_USER_LEN] = {0}; +char tsMqttPass[TSDB_MQTT_PASS_LEN] = {0}; +char tsMqttClientId[TSDB_MQTT_CLIENT_ID_LEN] = "TDengineMqttSubscriber"; +char tsMqttTopic[TSDB_MQTT_TOPIC_LEN] = "/weather/loop"; // monitor int32_t tsEnableMonitorModule = 1; @@ -767,26 +771,36 @@ static void doInitGlobalConfig(void) { cfg.unitType = TAOS_CFG_UTYPE_NONE; taosInitConfigOption(cfg); - cfg.option = "mqttBrokerAddress"; - cfg.ptr = tsMqttBrokerAddress; + cfg.option = "mqttHostName"; + cfg.ptr = tsMqttHostName; cfg.valType = TAOS_CFG_VTYPE_STRING; cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLIENT | TSDB_CFG_CTYPE_B_NOT_PRINT; cfg.minValue = 0; cfg.maxValue = 0; - cfg.ptrLength = 126; + cfg.ptrLength = TSDB_MQTT_HOSTNAME_LEN; cfg.unitType = TAOS_CFG_UTYPE_NONE; taosInitConfigOption(cfg); - cfg.option = "mqttBrokerClientId"; - cfg.ptr = tsMqttBrokerClientId; + cfg.option = "mqttPort"; + cfg.ptr = tsMqttPort; cfg.valType = TAOS_CFG_VTYPE_STRING; cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLIENT | TSDB_CFG_CTYPE_B_NOT_PRINT; cfg.minValue = 0; cfg.maxValue = 0; - cfg.ptrLength = 126; + cfg.ptrLength = TSDB_MQTT_PORT_LEN; cfg.unitType = TAOS_CFG_UTYPE_NONE; taosInitConfigOption(cfg); - + + cfg.option = "mqttTopic"; + cfg.ptr = tsMqttTopic; + cfg.valType = TAOS_CFG_VTYPE_STRING; + cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLIENT | TSDB_CFG_CTYPE_B_NOT_PRINT; + cfg.minValue = 0; + cfg.maxValue = 0; + cfg.ptrLength = TSDB_MQTT_TOPIC_LEN; + cfg.unitType = TAOS_CFG_UTYPE_NONE; + taosInitConfigOption(cfg); + cfg.option = "compressMsgSize"; cfg.ptr = &tsCompressMsgSize; cfg.valType = TAOS_CFG_VTYPE_INT32; diff --git a/src/inc/taosdef.h b/src/inc/taosdef.h index cd25ddcc5596b98e7ca93c15a9dcaf4d6b3a9608..1a40f3b56dcc82a89b66c0053d692bcbce57cd64 100644 --- a/src/inc/taosdef.h +++ b/src/inc/taosdef.h @@ -272,6 +272,13 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size); #define TSDB_SHOW_SQL_LEN 64 #define TSDB_SLOW_QUERY_SQL_LEN 512 +#define TSDB_MQTT_HOSTNAME_LEN 64 +#define TSDB_MQTT_PORT_LEN 8 +#define TSDB_MQTT_USER_LEN 24 +#define TSDB_MQTT_PASS_LEN 24 +#define TSDB_MQTT_TOPIC_LEN 64 +#define TSDB_MQTT_CLIENT_ID_LEN 32 + #define TSDB_METER_STATE_OFFLINE 0 #define TSDB_METER_STATE_ONLLINE 1 diff --git a/src/plugins/mqtt/inc/mqttInit.h b/src/plugins/mqtt/inc/mqttInit.h index 5dbd62789bd3d5c70e25863b9c24c028333f8b2f..af8c5069adb5ccd1d521f52b9cef06c8f64f0c08 100644 --- a/src/plugins/mqtt/inc/mqttInit.h +++ b/src/plugins/mqtt/inc/mqttInit.h @@ -37,12 +37,6 @@ extern "C" { * \ref mqttReconnectClient is called, this instance will be passed. */ struct reconnect_state_t { - char* hostname; - char* port; - char* topic; - char* client_id; - char* user_name; - char* password; uint8_t* sendbuf; size_t sendbufsz; uint8_t* recvbuf; diff --git a/src/plugins/mqtt/src/mqttSystem.c b/src/plugins/mqtt/src/mqttSystem.c index 69810e27858040391e80fd39fea6af1ea8d4d96c..84dc5eea2a6eca1c88c9108580d272c80146598e 100644 --- a/src/plugins/mqtt/src/mqttSystem.c +++ b/src/plugins/mqtt/src/mqttSystem.c @@ -30,123 +30,58 @@ #include "tsocket.h" #include "ttimer.h" #include "mqttSystem.h" -struct mqtt_client mqttClient = {0}; -pthread_t clientDaemonThread = {0}; -void* mqttConnect=NULL; -struct reconnect_state_t recntStatus = {0}; -char* topicPath=NULL; -int mttIsRuning = 1; -int32_t mqttInitSystem() { - int rc = 0; -#if 0 - uint8_t sendbuf[2048]; - uint8_t recvbuf[1024]; - recntStatus.sendbuf = sendbuf; - recntStatus.sendbufsz = sizeof(sendbuf); - recntStatus.recvbuf = recvbuf; - recntStatus.recvbufsz = sizeof(recvbuf); - char* url = tsMqttBrokerAddress; - recntStatus.user_name = strstr(url, "@") != NULL ? strbetween(url, "//", ":") : NULL; - - char * passStr = strstr(url, recntStatus.user_name); - if (passStr != NULL) { - recntStatus.password = strstr(url, "@") != NULL ? strbetween(passStr, ":", "@") : NULL; - } +#define MQTT_SEND_BUF_SIZE 102400 +#define MQTT_RECV_BUF_SIZE 102400 - if (strlen(url) == 0) { - mqttDebug("mqtt module not init, url is null"); - return rc; - } +struct mqtt_client tsMqttClient = {0}; +struct reconnect_state_t tsMqttStatus = {0}; +static pthread_t tsMqttClientDaemonThread = {0}; +static void* tsMqttConnect = NULL; +static bool mqttIsRuning = false; - if (strstr(url, "@") != NULL) { - recntStatus.hostname = strbetween(url, "@", ":"); - } else if (strstr(strstr(url, "://") + 3, ":") != NULL) { - recntStatus.hostname = strbetween(url, "//", ":"); +void mqttPublishCallback(void** unused, struct mqtt_response_publish* published); +void mqttCleanupRes(int status, int sockfd, pthread_t* client_daemon); +void* mqttClientRefresher(void* client); - } else { - recntStatus.hostname = strbetween(url, "//", "/"); - } +int32_t mqttInitSystem() { return 0; } - char* _begin_hostname = strstr(url, recntStatus.hostname); - if (_begin_hostname != NULL && strstr(_begin_hostname, ":") != NULL) { - recntStatus.port = strbetween(_begin_hostname, ":", "/"); - } else { - recntStatus.port = strbetween("'1883'", "'", "'"); - } +int32_t mqttStartSystem() { + tsMqttStatus.sendbufsz = MQTT_SEND_BUF_SIZE; + tsMqttStatus.recvbufsz = MQTT_RECV_BUF_SIZE; + tsMqttStatus.sendbuf = malloc(MQTT_SEND_BUF_SIZE); + tsMqttStatus.recvbuf = malloc(MQTT_RECV_BUF_SIZE); + mqttIsRuning = true; - char* portStr = recntStatus.hostname; - if (_begin_hostname != NULL) { - char* colonStr = strstr(_begin_hostname, ":"); - if (colonStr != NULL) { - portStr = recntStatus.port; - } + mqtt_init_reconnect(&tsMqttClient, mqttReconnectClient, &tsMqttStatus, mqttPublishCallback); + if (pthread_create(&tsMqttClientDaemonThread, NULL, mqttClientRefresher, &tsMqttClient)) { + mqttError("mqtt client failed to start daemon."); + mqttCleanupRes(EXIT_FAILURE, -1, NULL); + return -1; } - char* topicStr = strstr(url, portStr); - if (topicStr != NULL) { - topicPath = strbetween(topicStr, "/", "/"); - char* _topic = "+/+/+/"; - int _tpsize = strlen(topicPath) + strlen(_topic) + 1; - recntStatus.topic = calloc(1, _tpsize); - sprintf(recntStatus.topic, "/%s/%s", topicPath, _topic); - recntStatus.client_id = strlen(tsMqttBrokerClientId) < 3 ? tsMqttBrokerClientId : "taos_mqtt"; - mqttConnect = NULL; - } else { - topicPath = NULL; - } - -#endif - return rc; + mqttInfo("mqtt client listening for %s messages", tsMqttTopic); + return 0; } -int32_t mqttStartSystem() { - int rc = 0; -#if 0 - if (recntStatus.user_name != NULL && recntStatus.password != NULL) { - mqttInfo("connecting to mqtt://%s:%s@%s:%s/%s/", recntStatus.user_name, recntStatus.password, - recntStatus.hostname, recntStatus.port, topicPath); - } else if (recntStatus.user_name != NULL && recntStatus.password == NULL) { - mqttInfo("connecting to mqtt://%s@%s:%s/%s/", recntStatus.user_name, recntStatus.hostname, recntStatus.port, - topicPath); - } +void mqttStopSystem() { + if (mqttIsRuning) { + mqttIsRuning = false; + tsMqttClient.error = MQTT_ERROR_SOCKET_ERROR; - mqtt_init_reconnect(&mqttClient, mqttReconnectClient, &recntStatus, mqtt_PublishCallback); - if (pthread_create(&clientDaemonThread, NULL, mqttClientRefresher, &mqttClient)) { - mqttError("Failed to start client daemon."); - mqttCleanup(EXIT_FAILURE, -1, NULL); - rc = -1; - } else { - mqttInfo("listening for '%s' messages.", recntStatus.topic); - } -#endif - return rc; -} + taosMsleep(300); + mqttCleanupRes(EXIT_SUCCESS, tsMqttClient.socketfd, &tsMqttClientDaemonThread); -void mqttStopSystem() { -#if 0 - mqttClient.error = MQTT_ERROR_SOCKET_ERROR; - mttIsRuning = 0; - usleep(300000U); - mqttCleanup(EXIT_SUCCESS, mqttClient.socketfd, &clientDaemonThread); - mqttInfo("mqtt is stoped"); -#endif + mqttInfo("mqtt is stopped"); + } } void mqttCleanUpSystem() { -#if 0 - mqttInfo("starting to cleanup mqtt"); - free(recntStatus.user_name); - free(recntStatus.password); - free(recntStatus.hostname); - free(recntStatus.port); - free(recntStatus.topic); - free(topicPath); + mqttStopSystem(); mqttInfo("mqtt is cleaned up"); -#endif } -void mqtt_PublishCallback(void** unused, struct mqtt_response_publish* published) { +void mqttPublishCallback(void** unused, struct mqtt_response_publish* published) { /* note that published->topic_name is NOT null-terminated (here we'll change it to a c-string) */ char* topic_name = (char*)malloc(published->topic_name_size + 1); memcpy(topic_name, published->topic_name, published->topic_name_size); @@ -155,28 +90,29 @@ void mqtt_PublishCallback(void** unused, struct mqtt_response_publish* published char _token[128] = {0}; char _dbname[128] = {0}; char _tablename[128] = {0}; - if (mqttConnect == NULL) { + if (tsMqttConnect == NULL) { mqttInfo("connect database"); - taos_connect_a(NULL, "_root", tsInternalPass, "", 0, mqttInitConnCb, &mqttClient, &mqttConnect); + taos_connect_a(NULL, "_root", tsInternalPass, "", 0, mqttInitConnCb, &tsMqttClient, &tsMqttConnect); } - if (topic_name[1]=='/' && strncmp((char*)&topic_name[1], topicPath, strlen(topicPath)) == 0) { + if (topic_name[1] == '/' && strncmp((char*)&topic_name[1], tsMqttTopic, strlen(tsMqttTopic)) == 0) { char* p_p_cmd_part[5] = {0}; char copystr[1024] = {0}; strncpy(copystr, topic_name, MIN(1024, published->topic_name_size)); char part_index = split(copystr, "/", p_p_cmd_part, 10); if (part_index < 4) { - mqttError("The topic %s is't format '/path/token/dbname/table name/'. for expmle: '/taos/token/db/t'", topic_name); + mqttError("The topic %s is't format '/path/token/dbname/table name/'. for expmle: '/taos/token/db/t'", + topic_name); } else { strncpy(_token, p_p_cmd_part[1], 127); strncpy(_dbname, p_p_cmd_part[2], 127); strncpy(_tablename, p_p_cmd_part[3], 127); mqttInfo("part count=%d,access token:%s,database name:%s, table name:%s", part_index, _token, _dbname, - _tablename); + _tablename); - if (mqttConnect != NULL) { + if (tsMqttConnect != NULL) { char* _sql = converJsonToSql((char*)published->application_message, _dbname, _tablename); mqttInfo("query:%s", _sql); - taos_query_a(mqttConnect, _sql, mqttQueryInsertCallback, &mqttClient); + taos_query_a(tsMqttConnect, _sql, mqttQueryInsertCallback, &tsMqttClient); mqttInfo("free sql:%s", _sql); free(_sql); } @@ -186,27 +122,31 @@ void mqtt_PublishCallback(void** unused, struct mqtt_response_publish* published } void* mqttClientRefresher(void* client) { - while (mttIsRuning) { + while (mqttIsRuning) { mqtt_sync((struct mqtt_client*)client); taosMsleep(100); } - mqttDebug("quit refresher"); + + mqttDebug("mqtt client quit refresher"); return NULL; } -void mqttCleanup(int status, int sockfd, pthread_t* client_daemon) { -#if 0 +void mqttCleanupRes(int status, int sockfd, pthread_t* client_daemon) { mqttInfo("clean up mqtt module"); - if (sockfd != -1) close(sockfd); - if (client_daemon != NULL) pthread_cancel(*client_daemon); -#endif + if (sockfd != -1) { + close(sockfd); + } + + if (client_daemon != NULL) { + pthread_cancel(*client_daemon); + } } void mqttInitConnCb(void* param, TAOS_RES* result, int32_t code) { if (code < 0) { mqttError("mqtt:%d, connect to database failed, reason:%s", code, tstrerror(code)); - taos_close(mqttConnect); - mqttConnect = NULL; + taos_close(tsMqttConnect); + tsMqttConnect = NULL; return; } mqttDebug("mqtt:%d, connect to database success, reason:%s", code, tstrerror(code)); @@ -222,36 +162,28 @@ void mqttQueryInsertCallback(void* param, TAOS_RES* result, int32_t code) { } } -void mqttReconnectClient(struct mqtt_client* client, void** reconnect_state_vptr) { - mqttInfo("reconnect client"); - struct reconnect_state_t* reconnect_state = *((struct reconnect_state_t**)reconnect_state_vptr); +void mqttReconnectClient(struct mqtt_client* client, void** unused) { + mqttInfo("mqtt client tries to connect to the server"); - /* Close the clients socket if this isn't the initial reconnect call */ if (client->error != MQTT_ERROR_INITIAL_RECONNECT) { close(client->socketfd); } - /* Perform error handling here. */ if (client->error != MQTT_ERROR_INITIAL_RECONNECT) { - mqttError("mqttReconnectClient: called while client was in error state \"%s\"", mqtt_error_str(client->error)); + mqttError("mqtt client was in error state %s", mqtt_error_str(client->error)); } - /* Open a new socket. */ - int sockfd = open_nb_socket(reconnect_state->hostname, reconnect_state->port); - if (sockfd == -1) { - mqttError("failed to open socket: "); - mqttCleanup(EXIT_FAILURE, sockfd, NULL); + int sockfd = open_nb_socket("test.mosquitto.org", "1883"); + if (sockfd < 0) { + mqttError("mqtt client failed to open socket %s:%s", tsMqttHostName, tsMqttPort); + mqttCleanupRes(EXIT_FAILURE, sockfd, NULL); } - /* Reinitialize the client. */ - mqtt_reinit(client, sockfd, reconnect_state->sendbuf, reconnect_state->sendbufsz, reconnect_state->recvbuf, - reconnect_state->recvbufsz); - - /* Ensure we have a clean session */ - uint8_t connect_flags = MQTT_CONNECT_CLEAN_SESSION; - /* Send connection request to the broker. */ - mqtt_connect(client, reconnect_state->client_id, NULL, NULL, 0, reconnect_state->user_name, reconnect_state->password,connect_flags, 400); + // mqtt_reinit(client, sockfd, tsMqttStatus.sendbuf, tsMqttStatus.sendbufsz, tsMqttStatus.recvbuf, tsMqttStatus.recvbufsz); + // mqtt_connect(client, tsMqttClientId, NULL, NULL, 0, tsMqttUser, tsMqttPass, MQTT_CONNECT_CLEAN_SESSION, 400); + // mqtt_subscribe(client, tsMqttTopic, 0); - /* Subscribe to the topic. */ - mqtt_subscribe(client, reconnect_state->topic, 0); + mqtt_reinit(client, sockfd, tsMqttStatus.sendbuf, tsMqttStatus.sendbufsz, tsMqttStatus.recvbuf, tsMqttStatus.recvbufsz); + mqtt_connect(client, tsMqttClientId, NULL, NULL, 0, NULL, NULL, MQTT_CONNECT_CLEAN_SESSION, 400); + mqtt_subscribe(client, "datetime", 0); } \ No newline at end of file diff --git a/src/wal/CMakeLists.txt b/src/wal/CMakeLists.txt index e7d5ae15108175a20aa2deaae797cb6777d1984f..359e09287a9409a7acffe27d934884cce0510bb0 100644 --- a/src/wal/CMakeLists.txt +++ b/src/wal/CMakeLists.txt @@ -7,6 +7,5 @@ AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR}/src SRC) IF (TD_LINUX) ADD_LIBRARY(twal ${SRC}) TARGET_LINK_LIBRARIES(twal tutil common) - ADD_SUBDIRECTORY(test) ENDIF () diff --git a/tests/script/general/connection/mqtt.sim b/tests/script/general/connection/mqtt.sim new file mode 100644 index 0000000000000000000000000000000000000000..6533e414aa4bea8dfe3c73721b6d706adb31ce55 --- /dev/null +++ b/tests/script/general/connection/mqtt.sim @@ -0,0 +1,11 @@ +system sh/stop_dnodes.sh + +system sh/deploy.sh -n dnode1 -i 1 +system sh/cfg.sh -n dnode1 -c walLevel -v 2 +system sh/cfg.sh -n dnode1 -c numOfMnodes -v 1 +system sh/cfg.sh -n dnode1 -c mnodeEqualVnodeNum -v 4 +system sh/cfg.sh -n dnode1 -c maxTablesPerVnode -v 100000 +system sh/cfg.sh -n dnode1 -c http -v 1 +system sh/cfg.sh -n dnode1 -c http -v 1 +system sh/cfg.sh -n dnode1 -c mqttBrokerAddress -v mqtt://test.mosquitto.org:1883/# +system sh/cfg.sh -n dnode1 -c mqttBrokerClientId -v taosmqtt \ No newline at end of file