提交 4b10393c 编写于 作者: H Haojun Liao

fix(stream): fix memory leak and do some internal refactor.

上级 cbd24bb2
......@@ -189,7 +189,7 @@ int32_t streamInit();
void streamCleanUp();
SStreamQueue* streamQueueOpen(int64_t cap);
void streamQueueClose(SStreamQueue* queue);
void streamQueueClose(SStreamQueue* pQueue, int32_t taskId);
static FORCE_INLINE void streamQueueProcessSuccess(SStreamQueue* queue) {
ASSERT(atomic_load_8(&queue->status) == STREAM_QUEUE__PROCESSING);
......
......@@ -35,18 +35,17 @@ FAIL:
return NULL;
}
void streamQueueClose(SStreamQueue* queue) {
while (1) {
void* qItem = streamQueueNextItem(queue);
if (qItem) {
streamFreeQitem(qItem);
} else {
break;
}
void streamQueueClose(SStreamQueue* pQueue, int32_t taskId) {
qDebug("s-task:0x%x free the queue:%p, items in queue:%d", taskId, pQueue->queue, taosQueueItemSize(pQueue->queue));
void* qItem = NULL;
while ((qItem = streamQueueNextItem(pQueue)) != NULL) {
streamFreeQitem(qItem);
}
taosFreeQall(queue->qall);
taosCloseQueue(queue->queue);
taosMemoryFree(queue);
taosFreeQall(pQueue->qall);
taosCloseQueue(pQueue->queue);
taosMemoryFree(pQueue);
}
#if 0
......
......@@ -220,11 +220,11 @@ void tFreeStreamTask(SStreamTask* pTask) {
int32_t status = atomic_load_8((int8_t*)&(pTask->status.taskStatus));
if (pTask->inputQueue) {
streamQueueClose(pTask->inputQueue);
streamQueueClose(pTask->inputQueue, pTask->id.taskId);
}
if (pTask->outputInfo.queue) {
streamQueueClose(pTask->outputInfo.queue);
streamQueueClose(pTask->outputInfo.queue, pTask->id.taskId);
}
if (pTask->exec.qmsg) {
......@@ -255,6 +255,11 @@ void tFreeStreamTask(SStreamTask* pTask) {
streamStateClose(pTask->pState, status == TASK_STATUS__DROPPING);
}
if (pTask->msgInfo.pData != NULL) {
destroyStreamDataBlock(pTask->msgInfo.pData);
pTask->msgInfo.pData = NULL;
}
if (pTask->id.idStr != NULL) {
taosMemoryFree((void*)pTask->id.idStr);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册