未验证 提交 a1d247af 编写于 作者: Shengliang Guan 提交者: GitHub

Merge pull request #4306 from taosdata/feature/wal

Feature/wal
...@@ -13,8 +13,7 @@ ...@@ -13,8 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
//#include <stdint.h> #define _DEFAULT_SOURCE
//#include <stdbool.h>
#include "os.h" #include "os.h"
#include "hash.h" #include "hash.h"
#include "tlog.h" #include "tlog.h"
...@@ -392,7 +391,7 @@ void syncConfirmForward(int64_t rid, uint64_t version, int32_t code) { ...@@ -392,7 +391,7 @@ void syncConfirmForward(int64_t rid, uint64_t version, int32_t code) {
int32_t retLen = taosWriteMsg(pPeer->peerFd, msg, msgLen); int32_t retLen = taosWriteMsg(pPeer->peerFd, msg, msgLen);
if (retLen == msgLen) { if (retLen == msgLen) {
sDebug("%s, forward-rsp is sent, code:%x hver:%" PRIu64, pPeer->id, code, version); sTrace("%s, forward-rsp is sent, code:%x hver:%" PRIu64, pPeer->id, code, version);
} else { } else {
sDebug("%s, failed to send forward ack, restart", pPeer->id); sDebug("%s, failed to send forward ack, restart", pPeer->id);
syncRestartConnection(pPeer); syncRestartConnection(pPeer);
...@@ -821,7 +820,7 @@ static void syncProcessSyncRequest(char *msg, SSyncPeer *pPeer) { ...@@ -821,7 +820,7 @@ static void syncProcessSyncRequest(char *msg, SSyncPeer *pPeer) {
pthread_attr_destroy(&thattr); pthread_attr_destroy(&thattr);
if (ret != 0) { if (ret != 0) {
sError("%s, failed to create sync thread(%s)", pPeer->id, strerror(errno)); sError("%s, failed to create sync thread since %s", pPeer->id, strerror(errno));
syncDecPeerRef(pPeer); syncDecPeerRef(pPeer);
} else { } else {
pPeer->sstatus = TAOS_SYNC_STATUS_START; pPeer->sstatus = TAOS_SYNC_STATUS_START;
...@@ -891,7 +890,7 @@ static void syncProcessFwdResponse(char *cont, SSyncPeer *pPeer) { ...@@ -891,7 +890,7 @@ static void syncProcessFwdResponse(char *cont, SSyncPeer *pPeer) {
SSyncFwds *pSyncFwds = pNode->pSyncFwds; SSyncFwds *pSyncFwds = pNode->pSyncFwds;
SFwdInfo * pFwdInfo; SFwdInfo * pFwdInfo;
sDebug("%s, forward-rsp is received, code:%x hver:%" PRIu64, pPeer->id, pFwdRsp->code, pFwdRsp->version); sTrace("%s, forward-rsp is received, code:%x hver:%" PRIu64, pPeer->id, pFwdRsp->code, pFwdRsp->version);
SFwdInfo *pFirst = pSyncFwds->fwdInfo + pSyncFwds->first; SFwdInfo *pFirst = pSyncFwds->fwdInfo + pSyncFwds->first;
if (pFirst->version <= pFwdRsp->version && pSyncFwds->fwds > 0) { if (pFirst->version <= pFwdRsp->version && pSyncFwds->fwds > 0) {
...@@ -910,7 +909,7 @@ static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) { ...@@ -910,7 +909,7 @@ static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) {
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
SWalHead * pHead = (SWalHead *)cont; SWalHead * pHead = (SWalHead *)cont;
sDebug("%s, forward is received, hver:%" PRIu64 ", len:%d", pPeer->id, pHead->version, pHead->len); sTrace("%s, forward is received, hver:%" PRIu64 ", len:%d", pPeer->id, pHead->version, pHead->len);
if (nodeRole == TAOS_SYNC_ROLE_SLAVE) { if (nodeRole == TAOS_SYNC_ROLE_SLAVE) {
// nodeVersion = pHead->version; // nodeVersion = pHead->version;
...@@ -1106,7 +1105,7 @@ static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) { ...@@ -1106,7 +1105,7 @@ static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) {
SFirstPkt firstPkt; SFirstPkt firstPkt;
if (taosReadMsg(connFd, &firstPkt, sizeof(firstPkt)) != sizeof(firstPkt)) { if (taosReadMsg(connFd, &firstPkt, sizeof(firstPkt)) != sizeof(firstPkt)) {
sError("failed to read peer first pkt from ip:%s(%s)", ipstr, strerror(errno)); sError("failed to read peer first pkt from ip:%s since %s", ipstr, strerror(errno));
taosCloseSocket(connFd); taosCloseSocket(connFd);
return; return;
} }
...@@ -1160,7 +1159,7 @@ static void syncProcessBrokenLink(void *param) { ...@@ -1160,7 +1159,7 @@ static void syncProcessBrokenLink(void *param) {
if (taosAcquireRef(tsSyncRefId, pNode->rid) == NULL) return; if (taosAcquireRef(tsSyncRefId, pNode->rid) == NULL) return;
pthread_mutex_lock(&(pNode->mutex)); pthread_mutex_lock(&(pNode->mutex));
sDebug("%s, TCP link is broken(%s)", pPeer->id, strerror(errno)); sDebug("%s, TCP link is broken since %s", pPeer->id, strerror(errno));
pPeer->peerFd = -1; pPeer->peerFd = -1;
if (syncDecPeerRef(pPeer) != 0) { if (syncDecPeerRef(pPeer) != 0) {
...@@ -1191,7 +1190,7 @@ static void syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle) { ...@@ -1191,7 +1190,7 @@ static void syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle) {
pFwdInfo->time = time; pFwdInfo->time = time;
pSyncFwds->fwds++; pSyncFwds->fwds++;
sDebug("vgId:%d, fwd info is saved, hver:%" PRIu64 " fwds:%d ", pNode->vgId, version, pSyncFwds->fwds); sTrace("vgId:%d, fwd info is saved, hver:%" PRIu64 " fwds:%d ", pNode->vgId, version, pSyncFwds->fwds);
} }
static void syncRemoveConfirmedFwdInfo(SSyncNode *pNode) { static void syncRemoveConfirmedFwdInfo(SSyncNode *pNode) {
...@@ -1228,7 +1227,7 @@ static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code ...@@ -1228,7 +1227,7 @@ static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code
} }
if (confirm && pFwdInfo->confirmed == 0) { if (confirm && pFwdInfo->confirmed == 0) {
sDebug("vgId:%d, forward is confirmed, hver:%" PRIu64 " code:%x", pNode->vgId, pFwdInfo->version, pFwdInfo->code); sTrace("vgId:%d, forward is confirmed, hver:%" PRIu64 " code:%x", pNode->vgId, pFwdInfo->version, pFwdInfo->code);
(*pNode->confirmForward)(pNode->ahandle, pFwdInfo->mhandle, pFwdInfo->code); (*pNode->confirmForward)(pNode->ahandle, pFwdInfo->mhandle, pFwdInfo->code);
pFwdInfo->confirmed = 1; pFwdInfo->confirmed = 1;
} }
...@@ -1243,9 +1242,10 @@ static void syncMonitorNodeRole(void *param, void *tmrId) { ...@@ -1243,9 +1242,10 @@ static void syncMonitorNodeRole(void *param, void *tmrId) {
if (index == pNode->selfIndex) continue; if (index == pNode->selfIndex) continue;
SSyncPeer *pPeer = pNode->peerInfo[index]; SSyncPeer *pPeer = pNode->peerInfo[index];
if (pPeer->role <= TAOS_SYNC_ROLE_UNSYNCED || nodeRole <= TAOS_SYNC_ROLE_UNSYNCED) { if (pPeer->role > TAOS_SYNC_ROLE_UNSYNCED && nodeRole > TAOS_SYNC_ROLE_UNSYNCED) continue;
syncSendPeersStatusMsgToPeer(pPeer, 1, SYNC_STATUS_CHECK_ROLE, syncGenTranId()); if (pPeer->sstatus > TAOS_SYNC_STATUS_INIT || nodeSStatus > TAOS_SYNC_STATUS_INIT) continue;
}
syncSendPeersStatusMsgToPeer(pPeer, 1, SYNC_STATUS_CHECK_ROLE, syncGenTranId());
} }
pNode->pRoleTimer = taosTmrStart(syncMonitorNodeRole, SYNC_ROLE_TIMER, (void *)pNode->rid, tsSyncTmrCtrl); pNode->pRoleTimer = taosTmrStart(syncMonitorNodeRole, SYNC_ROLE_TIMER, (void *)pNode->rid, tsSyncTmrCtrl);
...@@ -1335,7 +1335,7 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle ...@@ -1335,7 +1335,7 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle
int32_t retLen = write(pPeer->peerFd, pSyncHead, fwdLen); int32_t retLen = write(pPeer->peerFd, pSyncHead, fwdLen);
if (retLen == fwdLen) { if (retLen == fwdLen) {
sDebug("%s, forward is sent, hver:%" PRIu64 " contLen:%d", pPeer->id, pWalHead->version, pWalHead->len); sTrace("%s, forward is sent, hver:%" PRIu64 " contLen:%d", pPeer->id, pWalHead->version, pWalHead->len);
} else { } else {
sError("%s, failed to forward, hver:%" PRIu64 " retLen:%d", pPeer->id, pWalHead->version, retLen); sError("%s, failed to forward, hver:%" PRIu64 " retLen:%d", pPeer->id, pWalHead->version, retLen);
syncRestartConnection(pPeer); syncRestartConnection(pPeer);
......
...@@ -13,7 +13,9 @@ ...@@ -13,7 +13,9 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
#include "taoserror.h"
#include "tlog.h" #include "tlog.h"
#include "tutil.h" #include "tutil.h"
#include "ttimer.h" #include "ttimer.h"
...@@ -126,7 +128,7 @@ static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) { ...@@ -126,7 +128,7 @@ static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) {
} }
if (code < 0) { if (code < 0) {
sError("%s, failed to restore %s(%s)", pPeer->id, name, strerror(errno)); sError("%s, failed to restore %s since %s", pPeer->id, name, strerror(errno));
} }
return code; return code;
...@@ -154,7 +156,7 @@ static int32_t syncRestoreWal(SSyncPeer *pPeer) { ...@@ -154,7 +156,7 @@ static int32_t syncRestoreWal(SSyncPeer *pPeer) {
ret = taosReadMsg(pPeer->syncFd, pHead->cont, pHead->len); ret = taosReadMsg(pPeer->syncFd, pHead->cont, pHead->len);
if (ret < 0) break; if (ret < 0) break;
sDebug("%s, restore a record, qtype:wal len:%d hver:%" PRIu64, pPeer->id, pHead->len, pHead->version); sTrace("%s, restore a record, qtype:wal len:%d hver:%" PRIu64, pPeer->id, pHead->len, pHead->version);
if (lastVer == pHead->version) { if (lastVer == pHead->version) {
sError("%s, failed to restore record, same hver:%" PRIu64 ", wal sync failed" PRIu64, pPeer->id, lastVer); sError("%s, failed to restore record, same hver:%" PRIu64 ", wal sync failed" PRIu64, pPeer->id, lastVer);
...@@ -166,7 +168,7 @@ static int32_t syncRestoreWal(SSyncPeer *pPeer) { ...@@ -166,7 +168,7 @@ static int32_t syncRestoreWal(SSyncPeer *pPeer) {
} }
if (code < 0) { if (code < 0) {
sError("%s, failed to restore wal(%s)", pPeer->id, strerror(errno)); sError("%s, failed to restore wal from syncFd:%d since %s", pPeer->id, pPeer->syncFd, strerror(errno));
} }
free(buffer); free(buffer);
...@@ -222,7 +224,7 @@ int32_t syncSaveIntoBuffer(SSyncPeer *pPeer, SWalHead *pHead) { ...@@ -222,7 +224,7 @@ int32_t syncSaveIntoBuffer(SSyncPeer *pPeer, SWalHead *pHead) {
memcpy(pRecv->offset, pHead, len); memcpy(pRecv->offset, pHead, len);
pRecv->offset += len; pRecv->offset += len;
pRecv->forwards++; pRecv->forwards++;
sDebug("%s, fwd is saved into queue, hver:%" PRIu64 " fwds:%d", pPeer->id, pHead->version, pRecv->forwards); sTrace("%s, fwd is saved into queue, hver:%" PRIu64 " fwds:%d", pPeer->id, pHead->version, pRecv->forwards);
} else { } else {
sError("%s, buffer size:%d is too small", pPeer->id, pRecv->bufferSize); sError("%s, buffer size:%d is too small", pPeer->id, pRecv->bufferSize);
pRecv->code = -1; // set error code pRecv->code = -1; // set error code
......
...@@ -13,10 +13,8 @@ ...@@ -13,10 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include <stdint.h> #define _DEFAULT_SOURCE
#include <stdbool.h>
#include <sys/inotify.h> #include <sys/inotify.h>
#include <unistd.h>
#include "os.h" #include "os.h"
#include "tlog.h" #include "tlog.h"
#include "tutil.h" #include "tutil.h"
...@@ -34,7 +32,7 @@ static int32_t syncAddIntoWatchList(SSyncPeer *pPeer, char *name) { ...@@ -34,7 +32,7 @@ static int32_t syncAddIntoWatchList(SSyncPeer *pPeer, char *name) {
pPeer->watchNum = 0; pPeer->watchNum = 0;
pPeer->notifyFd = inotify_init1(IN_NONBLOCK); pPeer->notifyFd = inotify_init1(IN_NONBLOCK);
if (pPeer->notifyFd < 0) { if (pPeer->notifyFd < 0) {
sError("%s, failed to init inotify(%s)", pPeer->id, strerror(errno)); sError("%s, failed to init inotify since %s", pPeer->id, strerror(errno));
return -1; return -1;
} }
...@@ -51,14 +49,14 @@ static int32_t syncAddIntoWatchList(SSyncPeer *pPeer, char *name) { ...@@ -51,14 +49,14 @@ static int32_t syncAddIntoWatchList(SSyncPeer *pPeer, char *name) {
if (*wd >= 0) { if (*wd >= 0) {
if (inotify_rm_watch(pPeer->notifyFd, *wd) < 0) { if (inotify_rm_watch(pPeer->notifyFd, *wd) < 0) {
sError("%s, failed to remove wd:%d(%s)", pPeer->id, *wd, strerror(errno)); sError("%s, failed to remove wd:%d since %s", pPeer->id, *wd, strerror(errno));
return -1; return -1;
} }
} }
*wd = inotify_add_watch(pPeer->notifyFd, name, IN_MODIFY | IN_DELETE); *wd = inotify_add_watch(pPeer->notifyFd, name, IN_MODIFY | IN_DELETE);
if (*wd == -1) { if (*wd == -1) {
sError("%s, failed to add %s(%s)", pPeer->id, name, strerror(errno)); sError("%s, failed to add %s since %s", pPeer->id, name, strerror(errno));
return -1; return -1;
} else { } else {
sDebug("%s, monitor %s, wd:%d watchNum:%d", pPeer->id, name, *wd, pPeer->watchNum); sDebug("%s, monitor %s, wd:%d watchNum:%d", pPeer->id, name, *wd, pPeer->watchNum);
...@@ -75,7 +73,7 @@ static int32_t syncAreFilesModified(SSyncPeer *pPeer) { ...@@ -75,7 +73,7 @@ static int32_t syncAreFilesModified(SSyncPeer *pPeer) {
char buf[2048]; char buf[2048];
int32_t len = read(pPeer->notifyFd, buf, sizeof(buf)); int32_t len = read(pPeer->notifyFd, buf, sizeof(buf));
if (len < 0 && errno != EAGAIN) { if (len < 0 && errno != EAGAIN) {
sError("%s, failed to read notify FD(%s)", pPeer->id, strerror(errno)); sError("%s, failed to read notify FD since %s", pPeer->id, strerror(errno));
return -1; return -1;
} }
...@@ -161,7 +159,7 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) { ...@@ -161,7 +159,7 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
} }
if (code < 0) { if (code < 0) {
sError("%s, failed to retrieve file(%s)", pPeer->id, strerror(errno)); sError("%s, failed to retrieve file since %s", pPeer->id, strerror(errno));
} }
return code; return code;
...@@ -201,7 +199,7 @@ static int32_t syncMonitorLastWal(SSyncPeer *pPeer, char *name) { ...@@ -201,7 +199,7 @@ static int32_t syncMonitorLastWal(SSyncPeer *pPeer, char *name) {
taosClose(pPeer->notifyFd); taosClose(pPeer->notifyFd);
pPeer->notifyFd = inotify_init1(IN_NONBLOCK); pPeer->notifyFd = inotify_init1(IN_NONBLOCK);
if (pPeer->notifyFd < 0) { if (pPeer->notifyFd < 0) {
sError("%s, failed to init inotify(%s)", pPeer->id, strerror(errno)); sError("%s, failed to init inotify since %s", pPeer->id, strerror(errno));
return -1; return -1;
} }
...@@ -216,7 +214,7 @@ static int32_t syncMonitorLastWal(SSyncPeer *pPeer, char *name) { ...@@ -216,7 +214,7 @@ static int32_t syncMonitorLastWal(SSyncPeer *pPeer, char *name) {
*wd = inotify_add_watch(pPeer->notifyFd, name, IN_MODIFY | IN_CLOSE_WRITE); *wd = inotify_add_watch(pPeer->notifyFd, name, IN_MODIFY | IN_CLOSE_WRITE);
if (*wd == -1) { if (*wd == -1) {
sError("%s, failed to watch last wal(%s)", pPeer->id, strerror(errno)); sError("%s, failed to watch last wal since %s", pPeer->id, strerror(errno));
return -1; return -1;
} }
...@@ -227,7 +225,7 @@ static int32_t syncCheckLastWalChanges(SSyncPeer *pPeer, uint32_t *pEvent) { ...@@ -227,7 +225,7 @@ static int32_t syncCheckLastWalChanges(SSyncPeer *pPeer, uint32_t *pEvent) {
char buf[2048]; char buf[2048];
int32_t len = read(pPeer->notifyFd, buf, sizeof(buf)); int32_t len = read(pPeer->notifyFd, buf, sizeof(buf));
if (len < 0 && errno != EAGAIN) { if (len < 0 && errno != EAGAIN) {
sError("%s, failed to read notify FD(%s)", pPeer->id, strerror(errno)); sError("%s, failed to read notify FD since %s", pPeer->id, strerror(errno));
return -1; return -1;
} }
...@@ -268,7 +266,7 @@ static int32_t syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversi ...@@ -268,7 +266,7 @@ static int32_t syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversi
break; break;
} }
sDebug("%s, last wal is forwarded, hver:%" PRIu64, pPeer->id, pHead->version); sTrace("%s, last wal is forwarded, hver:%" PRIu64, pPeer->id, pHead->version);
int32_t ret = taosWriteMsg(pPeer->syncFd, pHead, wsize); int32_t ret = taosWriteMsg(pPeer->syncFd, pHead, wsize);
if (ret != wsize) break; if (ret != wsize) break;
pPeer->sversion = pHead->version; pPeer->sversion = pHead->version;
...@@ -424,7 +422,7 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) { ...@@ -424,7 +422,7 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) {
memset(&walHead, 0, sizeof(walHead)); memset(&walHead, 0, sizeof(walHead));
code = taosWriteMsg(pPeer->syncFd, &walHead, sizeof(walHead)); code = taosWriteMsg(pPeer->syncFd, &walHead, sizeof(walHead));
} else { } else {
sError("%s, failed to send wal(%s)", pPeer->id, strerror(errno)); sError("%s, failed to send wal since %s", pPeer->id, strerror(errno));
} }
return code; return code;
......
...@@ -115,14 +115,14 @@ static void arbProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) { ...@@ -115,14 +115,14 @@ static void arbProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) {
SFirstPkt firstPkt; SFirstPkt firstPkt;
if (taosReadMsg(connFd, &firstPkt, sizeof(firstPkt)) != sizeof(firstPkt)) { if (taosReadMsg(connFd, &firstPkt, sizeof(firstPkt)) != sizeof(firstPkt)) {
sError("failed to read peer first pkt from ip:%s(%s)", ipstr, strerror(errno)); sError("failed to read peer first pkt from ip:%s since %s", ipstr, strerror(errno));
taosCloseSocket(connFd); taosCloseSocket(connFd);
return; return;
} }
SNodeConn *pNode = (SNodeConn *)calloc(sizeof(SNodeConn), 1); SNodeConn *pNode = calloc(sizeof(SNodeConn), 1);
if (pNode == NULL) { if (pNode == NULL) {
sError("failed to allocate memory(%s)", strerror(errno)); sError("failed to allocate memory since %s", strerror(errno));
taosCloseSocket(connFd); taosCloseSocket(connFd);
return; return;
} }
...@@ -146,7 +146,7 @@ static void arbProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) { ...@@ -146,7 +146,7 @@ static void arbProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) {
static void arbProcessBrokenLink(void *param) { static void arbProcessBrokenLink(void *param) {
SNodeConn *pNode = param; SNodeConn *pNode = param;
sDebug("%s, TCP link is broken(%s), close connection", pNode->id, strerror(errno)); sDebug("%s, TCP link is broken since %s, close connection", pNode->id, strerror(errno));
tfree(pNode); tfree(pNode);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册