提交 23252749 编写于 作者: L liuyao

Merge branch 'enh/triggerCheckPoint2' of https://github.com/taosdata/TDengine...

Merge branch 'enh/triggerCheckPoint2' of https://github.com/taosdata/TDengine into enh/triggerCheckPoint2
......@@ -745,9 +745,9 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
......
......@@ -133,6 +133,7 @@ void mndCleanupStream(SMnode *pMnode) {
taosArrayDestroy(execNodeList.pTaskList);
taosHashCleanup(execNodeList.pTaskMap);
taosThreadMutexDestroy(&execNodeList.lock);
mDebug("mnd stream cleanup");
}
SSdbRaw *mndStreamActionEncode(SStreamObj *pStream) {
......@@ -2279,12 +2280,12 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
doExtractTasksFromStream(pMnode);
}
for (int32_t i = 0; i < req.numOfTasks; ++i) {
STaskStatusEntry *p = taosArrayGet(req.pTaskStatus, i);
int64_t k[2] = {p->streamId, p->taskId};
int32_t index = *(int32_t *)taosHashGet(execNodeList.pTaskMap, &k, sizeof(k));
for(int32_t i = 0; i < req.numOfTasks; ++i) {
STaskStatusEntry* p = taosArrayGet(req.pTaskStatus, i);
int64_t k[2] = {p->streamId, p->taskId};
int32_t index = *(int32_t*) taosHashGet(execNodeList.pTaskMap, &k, sizeof(k));
STaskStatusEntry *pStatusEntry = taosArrayGet(execNodeList.pTaskList, index);
STaskStatusEntry* pStatusEntry = taosArrayGet(execNodeList.pTaskList, index);
pStatusEntry->status = p->status;
if (p->status != TASK_STATUS__NORMAL) {
mDebug("received s-task:0x%x no in ready stat:%s", p->taskId, streamGetTaskStatusStr(p->status));
......
......@@ -1890,6 +1890,9 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
taosWLockLatch(&pMeta->lock);
streamSetStatusNormal(pTask);
streamMetaSaveTask(pMeta, pTask);
if (streamMetaCommit(pMeta) < 0) {
// persist to disk
}
taosWUnLockLatch(&pMeta->lock);
}
streamTaskStop(pTask);
......@@ -1905,25 +1908,27 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
// all tasks are closed, now let's restart the stream meta
if (pMeta->closedTask == numOfCount) {
tqDebug("vgId:%d all tasks are updated, commit the update nodeInfo", vgId);
if (streamMetaCommit(pMeta) < 0) {
// if (streamMetaCommit(pMeta) < 0) {
// persist to disk
}
// }
restartTasks = true;
pMeta->closedTask = 0; // reset value
} else {
tqDebug("vgId:%d closed tasks:%d, not closed:%d", vgId, pMeta->closedTask, (numOfCount - pMeta->closedTask));
}
taosWUnLockLatch(&pMeta->lock);
_end:
tDecoderClear(&decoder);
tmsgSendRsp(&rsp);
// tmsgSendRsp(&rsp);
if (restartTasks) {
tqDebug("vgId:%d all tasks are stopped, restart them", vgId);
taosWLockLatch(&pMeta->lock);
terrno = 0;
int32_t code = streamMetaReopen(pTq->pStreamMeta, 0);
int32_t code = streamMetaReopen(pMeta, 0);
if (code != 0) {
tqError("vgId:%d failed to reopen stream meta", vgId);
taosWUnLockLatch(&pMeta->lock);
......
......@@ -523,6 +523,12 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg
case TDMT_VND_DROP_INDEX:
vnodeProcessDropIndexReq(pVnode, ver, pReq, len, pRsp);
break;
case TDMT_VND_STREAM_CHECK_POINT_SOURCE:
tqProcessStreamCheckPointSourceReq(pVnode->pTq, pMsg);
break;
case TDMT_VND_STREAM_TASK_UPDATE:
tqProcessTaskUpdateReq(pVnode->pTq, pMsg);
break;
case TDMT_VND_COMPACT:
vnodeProcessCompactVnodeReq(pVnode, ver, pReq, len, pRsp);
goto _exit;
......@@ -678,10 +684,6 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo)
return tqProcessTaskScanHistoryFinishReq(pVnode->pTq, pMsg);
case TDMT_VND_STREAM_SCAN_HISTORY_FINISH_RSP:
return tqProcessTaskScanHistoryFinishRsp(pVnode->pTq, pMsg);
case TDMT_VND_STREAM_CHECK_POINT_SOURCE:
return tqProcessStreamCheckPointSourceReq(pVnode->pTq, pMsg);
case TDMT_VND_STREAM_TASK_UPDATE:
return tqProcessTaskUpdateReq(pVnode->pTq, pMsg);
case TDMT_STREAM_TASK_CHECKPOINT_READY:
return tqProcessStreamTaskCheckpointReadyMsg(pVnode->pTq, pMsg);
default:
......
......@@ -790,14 +790,13 @@ int32_t chkpGetAllDbCfHandle(SStreamMeta* pMeta, rocksdb_column_family_handle_t*
for (int i = 0; i < sizeof(ginitDict) / sizeof(ginitDict[0]); i++) {
if (wrapper->pHandle[i]) {
rocksdb_column_family_handle_t* p = wrapper->pHandle[i];
size_t len = 0;
char* name = rocksdb_column_family_handle_get_name(p, &len);
// char buf[64] = {0};
// memcpy(buf, name, len);
// qError("column name: name: %s, len: %d", buf, (int)len);
// taosMemoryFree(name);
taosArrayPush(pHandle, &p);
// size_t len = 0;
// char* name = rocksdb_column_family_handle_get_name(p, &len);
// char buf[64] = {0};
// memcpy(buf, name, len);
// qError("column name: name: %s, len: %d", buf, (int)len);
// taosMemoryFree(name);
}
}
taosThreadRwlockUnlock(&wrapper->rwLock);
......
......@@ -815,6 +815,8 @@ int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamTaskId,
SStreamChkptReadyInfo info = {.taskId = pInfo->taskId, .epset = pInfo->epSet};
initRpcMsg(&info.msg, TDMT_STREAM_TASK_CHECKPOINT_READY, buf, tlen + sizeof(SMsgHead));
info.msg.info.noResp = 1; // refactor later.
qDebug("s-task:%s (level:%d) prepare checkpoint ready msg to upstream s-task:0x%" PRIx64 ":0x%x (vgId:%d) idx:%d",
pTask->id.idStr, pTask->info.taskLevel, req.streamId, req.upstreamTaskId, req.downstreamNodeId, index);
......
......@@ -56,6 +56,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
if (tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db, 0) < 0) {
goto _err;
}
if (tdbTbOpen("task.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pTaskDb, 0) < 0) {
goto _err;
}
......@@ -64,6 +65,10 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
goto _err;
}
if (streamMetaBegin(pMeta) < 0) {
goto _err;
}
_hash_fn_t fp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR);
pMeta->pTasks = taosHashInit(64, fp, true, HASH_NO_LOCK);
if (pMeta->pTasks == NULL) {
......@@ -77,10 +82,6 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
goto _err;
}
if (streamMetaBegin(pMeta) < 0) {
goto _err;
}
pMeta->walScanCounter = 0;
pMeta->vgId = vgId;
pMeta->ahandle = ahandle;
......@@ -618,22 +619,27 @@ void metaHbToMnode(void* param, void* tmrId) {
SEpSet epset = {0};
hbMsg.numOfTasks = numOfTasks;
hbMsg.vgId = pMeta->vgId;
hbMsg.pTaskStatus = taosArrayInit(numOfTasks, sizeof(STaskStatusEntry));
for (int32_t i = 0; i < numOfTasks; ++i) {
SStreamId* pId = taosArrayGet(pMeta->pTaskList, i);
SStreamId* pId = taosArrayGet(pMeta->pTaskList, i);
int64_t keys[2] = {pId->streamId, pId->taskId};
SStreamTask** pTask = taosHashGet(pMeta->pTasks, keys, sizeof(keys));
int64_t keys[2] = {pId->streamId, pId->taskId};
SStreamTask** pTask = taosHashGet(pMeta->pTasks, keys, sizeof(keys));
STaskStatusEntry entry = {.streamId = pId->streamId, .taskId = pId->taskId, .status = (*pTask)->status.taskStatus};
if ((*pTask)->info.fillHistory == 1) {
continue;
}
STaskStatusEntry entry = {.streamId = pId->streamId, .taskId = pId->taskId, .status = (*pTask)->status.taskStatus};
taosArrayPush(hbMsg.pTaskStatus, &entry);
if (i == 0) {
epsetAssign(&epset, &(*pTask)->info.mnodeEpset);
}
}
hbMsg.numOfTasks = taosArrayGetSize(hbMsg.pTaskStatus);
taosRUnLockLatch(&pMeta->lock);
int32_t code = 0;
......
......@@ -348,6 +348,7 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
pTask->id.idStr = createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId);
pTask->refCnt = 1;
pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
pTask->status.timerActive = 0;
pTask->inputQueue = streamQueueOpen(512 << 10);
pTask->outputInfo.queue = streamQueueOpen(512 << 10);
......@@ -533,7 +534,7 @@ int32_t doUpdateTaskEpset(SStreamTask* pTask, int32_t nodeId, SEpSet* pEpSet) {
if (pTask->info.nodeId == nodeId) { // execution task should be moved away
epsetAssign(&pTask->info.epSet, pEpSet);
EPSET_TO_STR(pEpSet, buf)
qDebug("s-task:0x%x (vgId:%d) epset is updated %s", pTask->id.taskId, nodeId, buf);
qDebug("s-task:0x%x (vgId:%d) self node epset is updated %s", pTask->id.taskId, nodeId, buf);
}
// check for the dispath info and the upstream task info
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册