提交 592f2109 编写于 作者: weixin_48148422's avatar weixin_48148422

update java connector

上级 051af216
......@@ -135,7 +135,7 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_closeConnectionIm
* Signature: (Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;JI)J
*/
JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_subscribeImp
(JNIEnv *, jobject, jstring, jstring, jstring, jstring, jstring, jlong, jint);
(JNIEnv *, jobject, jlong, jboolean, jstring, jstring, jint);
/*
* Class: com_taosdata_jdbc_TSDBJNIConnector
......@@ -143,7 +143,7 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_subscribeImp
* Signature: (J)Lcom/taosdata/jdbc/TSDBResultSetRowData;
*/
JNIEXPORT jobject JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_consumeImp
(JNIEnv *, jobject, jlong);
(JNIEnv *, jobject, jlong, jint);
/*
* Class: com_taosdata_jdbc_TSDBJNIConnector
......@@ -151,7 +151,7 @@ JNIEXPORT jobject JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_consumeImp
* Signature: (J)V
*/
JNIEXPORT void JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_unsubscribeImp
(JNIEnv *, jobject, jlong);
(JNIEnv *, jobject, jlong, jboolean);
/*
* Class: com_taosdata_jdbc_TSDBJNIConnector
......
......@@ -20,6 +20,7 @@
#include "tscJoinProcess.h"
#include "tsclient.h"
#include "tscUtil.h"
#include "ttime.h"
int __init = 0;
......@@ -514,92 +515,42 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_closeConnectionIm
}
}
JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_subscribeImp(JNIEnv *env, jobject jobj, jstring jhost,
jstring juser, jstring jpass, jstring jdb,
jstring jtable, jlong jtime,
jint jperiod) {
TAOS_SUB *tsub;
jlong sub = 0;
char * host = NULL;
char * user = NULL;
char * pass = NULL;
char * db = NULL;
char * table = NULL;
int64_t time = 0;
int period = 0;
JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_subscribeImp(JNIEnv *env, jobject jobj, jlong con,
jboolean restart, jstring jtopic, jstring jsql, jint jinterval) {
jlong sub = 0;
TAOS *taos = (TAOS *)con;
char *topic = NULL;
char *sql = NULL;
jniGetGlobalMethod(env);
jniTrace("jobj:%p, in TSDBJNIConnector_subscribeImp", jobj);
if (jhost != NULL) {
host = (char *)(*env)->GetStringUTFChars(env, jhost, NULL);
}
if (juser != NULL) {
user = (char *)(*env)->GetStringUTFChars(env, juser, NULL);
}
if (jpass != NULL) {
pass = (char *)(*env)->GetStringUTFChars(env, jpass, NULL);
}
if (jdb != NULL) {
db = (char *)(*env)->GetStringUTFChars(env, jdb, NULL);
}
if (jtable != NULL) {
table = (char *)(*env)->GetStringUTFChars(env, jtable, NULL);
if (jtopic != NULL) {
topic = (char *)(*env)->GetStringUTFChars(env, jtopic, NULL);
}
time = (int64_t)jtime;
period = (int)jperiod;
if (user == NULL) {
jniTrace("jobj:%p, user is null, use tsDefaultUser", jobj);
user = tsDefaultUser;
}
if (pass == NULL) {
jniTrace("jobj:%p, pass is null, use tsDefaultPass", jobj);
pass = tsDefaultPass;
if (jsql != NULL) {
sql = (char *)(*env)->GetStringUTFChars(env, jsql, NULL);
}
jniTrace("jobj:%p, host:%s, user:%s, pass:%s, db:%s, table:%s, time:%d, period:%d", jobj, host, user, pass, db, table,
time, period);
tsub = taos_subscribe(host, user, pass, db, table, time, period);
TAOS_SUB *tsub = taos_subscribe(taos, (int)restart, topic, sql, NULL, NULL, jinterval);
sub = (jlong)tsub;
if (sub == 0) {
jniTrace("jobj:%p, failed to subscribe to db:%s, table:%s", jobj, db, table);
jniTrace("jobj:%p, failed to subscribe: topic:%s", jobj, jtopic);
} else {
jniTrace("jobj:%p, successfully subscribe to db:%s, table:%s, sub:%ld, tsub:%p", jobj, db, table, sub, tsub);
jniTrace("jobj:%p, successfully subscribe: topic: %s", jobj, jtopic);
}
if (host != NULL) (*env)->ReleaseStringUTFChars(env, jhost, host);
if (user != NULL && user != tsDefaultUser) (*env)->ReleaseStringUTFChars(env, juser, user);
if (pass != NULL && pass != tsDefaultPass) (*env)->ReleaseStringUTFChars(env, jpass, pass);
if (db != NULL) (*env)->ReleaseStringUTFChars(env, jdb, db);
if (table != NULL) (*env)->ReleaseStringUTFChars(env, jtable, table);
if (topic != NULL) (*env)->ReleaseStringUTFChars(env, jtopic, topic);
if (sql != NULL) (*env)->ReleaseStringUTFChars(env, jsql, sql);
return sub;
}
JNIEXPORT jobject JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_consumeImp(JNIEnv *env, jobject jobj, jlong sub) {
jniTrace("jobj:%p, in TSDBJNIConnector_consumeImp, sub:%ld", jobj, sub);
TAOS_SUB * tsub = (TAOS_SUB *)sub;
TAOS_ROW row = taos_consume(tsub);
TAOS_FIELD *fields = taos_fetch_subfields(tsub);
int num_fields = taos_subfields_count(tsub);
jniGetGlobalMethod(env);
jniTrace("jobj:%p, check fields:%p, num_fields=%d", jobj, fields, num_fields);
static jobject convert_one_row(JNIEnv *env, TAOS_ROW row, TAOS_FIELD* fields, int num_fields) {
jobject rowobj = (*env)->NewObject(env, g_rowdataClass, g_rowdataConstructor, num_fields);
jniTrace("created a rowdata object, rowobj:%p", rowobj);
if (row == NULL) {
jniTrace("jobj:%p, tsub:%p, fields size is %d, fetch row to the end", jobj, tsub, num_fields);
return NULL;
}
char tmp[TSDB_MAX_BYTES_PER_ROW] = {0};
for (int i = 0; i < num_fields; i++) {
if (row[i] == NULL) {
continue;
......@@ -634,6 +585,7 @@ JNIEXPORT jobject JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_consumeImp(JNI
}
break;
case TSDB_DATA_TYPE_BINARY: {
char tmp[TSDB_MAX_BYTES_PER_ROW] = {0};
strncpy(tmp, row[i], (size_t) fields[i].bytes); // handle the case that terminated does not exist
(*env)->CallVoidMethod(env, rowobj, g_rowdataSetStringFp, i, (*env)->NewStringUTF(env, tmp));
......@@ -642,7 +594,7 @@ JNIEXPORT jobject JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_consumeImp(JNI
}
case TSDB_DATA_TYPE_NCHAR:
(*env)->CallVoidMethod(env, rowobj, g_rowdataSetByteArrayFp, i,
jniFromNCharToByteArray(env, (char*)row[i], fields[i].bytes));
jniFromNCharToByteArray(env, (char*)row[i], fields[i].bytes));
break;
case TSDB_DATA_TYPE_TIMESTAMP:
(*env)->CallVoidMethod(env, rowobj, g_rowdataSetTimestampFp, i, (jlong) * ((int64_t *)row[i]));
......@@ -651,13 +603,56 @@ JNIEXPORT jobject JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_consumeImp(JNI
break;
}
}
jniTrace("jobj:%p, rowdata retrieved, rowobj:%p", jobj, rowobj);
return rowobj;
}
JNIEXPORT void JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_unsubscribeImp(JNIEnv *env, jobject jobj, jlong sub) {
JNIEXPORT jobject JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_consumeImp(JNIEnv *env, jobject jobj, jlong sub, jint timeout) {
jniTrace("jobj:%p, in TSDBJNIConnector_consumeImp, sub:%ld", jobj, sub);
jniGetGlobalMethod(env);
TAOS_SUB *tsub = (TAOS_SUB *)sub;
jobject rows = (*env)->NewObject(env, g_arrayListClass, g_arrayListConstructFp);
int64_t start = taosGetTimestampMs();
int count = 0;
while (true) {
TAOS_RES * res = taos_consume(tsub);
if (res == NULL) {
jniError("jobj:%p, tsub:%p, taos_consume returns NULL", jobj, tsub);
return NULL;
}
TAOS_FIELD *fields = taos_fetch_fields(res);
int num_fields = taos_num_fields(res);
while (true) {
TAOS_ROW row = taos_fetch_row(res);
if (row == NULL) {
break;
}
jobject rowobj = convert_one_row(env, row, fields, num_fields);
(*env)->CallBooleanMethod(env, rows, g_arrayListAddFp, rowobj);
count++;
}
if (count > 0) {
break;
}
if (timeout == -1) {
continue;
}
if (((int)(taosGetTimestampMs() - start)) >= timeout) {
jniTrace("jobj:%p, sub:%ld, timeout", jobj, sub);
break;
}
}
return rows;
}
JNIEXPORT void JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_unsubscribeImp(JNIEnv *env, jobject jobj, jlong sub, jboolean keepProgress) {
TAOS_SUB *tsub = (TAOS_SUB *)sub;
taos_unsubscribe(tsub);
taos_unsubscribe(tsub, keepProgress);
}
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_validateCreateTableSqlImp(JNIEnv *env, jobject jobj,
......
......@@ -35,6 +35,7 @@ typedef struct SSubscriptionProgress {
typedef struct SSub {
char topic[32];
int64_t lastSyncTime;
int64_t lastConsumeTime;
void * signature;
TAOS * taos;
void * pTimer;
......@@ -128,6 +129,7 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char*
pSub->pSql = pSql;
pSub->signature = pSub;
strncpy(pSub->topic, topic, sizeof(pSub->topic));
pSub->topic[sizeof(pSub->topic) - 1] = 0;
return pSub;
failed:
......@@ -294,7 +296,7 @@ void tscSaveSubscriptionProgress(void* sub) {
fclose(fp);
}
TAOS_SUB *taos_subscribe(const char* topic, int restart, TAOS *taos, const char *sql, TAOS_SUBSCRIBE_CALLBACK fp, void *param, int interval) {
TAOS_SUB *taos_subscribe(TAOS *taos, int restart, const char* topic, const char *sql, TAOS_SUBSCRIBE_CALLBACK fp, void *param, int interval) {
STscObj* pObj = (STscObj*)taos;
if (pObj == NULL || pObj->signature != pObj) {
globalCode = TSDB_CODE_DISCONNECTED;
......@@ -319,11 +321,11 @@ TAOS_SUB *taos_subscribe(const char* topic, int restart, TAOS *taos, const char
return NULL;
}
pSub->interval = interval;
if (fp != NULL) {
pSub->fp = fp;
pSub->interval = interval;
pSub->param = param;
taosTmrReset(tscProcessSubscriptionTimer, 0, pSub, tscTmr, &pSub->pTimer);
taosTmrReset(tscProcessSubscriptionTimer, interval, pSub, tscTmr, &pSub->pTimer);
}
return pSub;
......@@ -338,7 +340,14 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) {
SSqlObj* pSql = pSub->pSql;
SSqlRes *pRes = &pSql->res;
if (taosGetTimestampMs() - pSub->lastSyncTime > 10 * 1000) {
if (pSub->pTimer == NULL) {
int duration = (int)(taosGetTimestampMs() - pSub->lastConsumeTime);
if (duration < pSub->interval) {
taosMsleep(pSub->interval - (int32_t)duration);
}
}
if (taosGetTimestampMs() - pSub->lastSyncTime > 10 * 60 * 1000) {
char* sqlstr = pSql->sqlstr;
pSql->sqlstr = NULL;
taos_free_result_imp(pSql, 0);
......@@ -356,11 +365,14 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) {
pSql->cmd.type = type;
}
tscDoQuery(pSql);
if (pRes->code != TSDB_CODE_SUCCESS) {
tscRemoveFromSqlList(pSql);
return NULL;
}
pSub->lastConsumeTime = taosGetTimestampMs();
return pSql;
}
......
......@@ -117,7 +117,7 @@ DLL_EXPORT void taos_fetch_rows_a(TAOS_RES *res, void (*fp)(void *param, TAOS_RE
DLL_EXPORT void taos_fetch_row_a(TAOS_RES *res, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row), void *param);
typedef void (*TAOS_SUBSCRIBE_CALLBACK)(TAOS_SUB* tsub, TAOS_RES *res, void* param, int code);
DLL_EXPORT TAOS_SUB *taos_subscribe(const char* topic, int restart, TAOS *taos, const char *sql, TAOS_SUBSCRIBE_CALLBACK fp, void *param, int interval);
DLL_EXPORT TAOS_SUB *taos_subscribe(TAOS* taos, int restart, const char* topic, const char *sql, TAOS_SUBSCRIBE_CALLBACK fp, void *param, int interval);
DLL_EXPORT TAOS_RES *taos_consume(TAOS_SUB *tsub);
DLL_EXPORT void taos_unsubscribe(TAOS_SUB *tsub, int keepProgress);
......
......@@ -75,9 +75,9 @@ int main(int argc, char *argv[]) {
}
if (async) {
tsub = taos_subscribe(topic, restart, taos, sql, subscribe_callback, NULL, 1000);
tsub = taos_subscribe(taos, restart, topic, sql, subscribe_callback, NULL, 1000);
} else {
tsub = taos_subscribe(topic, restart, taos, sql, NULL, NULL, 0);
tsub = taos_subscribe(taos, restart, topic, sql, NULL, NULL, 0);
}
if (tsub == NULL) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册