walMain.c 15.9 KB
Newer Older
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
17 18 19

#define TAOS_RANDOM_FILE_FAIL_TEST

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
20 21
#include "os.h"
#include "tlog.h"
22
#include "tchecksum.h"
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
23
#include "tutil.h"
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
24
#include "ttimer.h"
25
#include "taoserror.h"
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
26
#include "twal.h"
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
27
#include "tqueue.h"
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
28 29

#define walPrefix "wal"

/*
 * Logging helpers gated on wDebugFlag.
 *
 * Each macro expands to exactly one statement (do { ... } while (0)), so it
 * composes safely with un-braced if/else, e.g.
 *   if (cond) wError("..."); else other();
 * A bare { ... } block followed by ';' would end the if before the else.
 * FATAL/ERROR/WARN/INFO log at level 255; DEBUG/TRACE pass wDebugFlag through.
 */
#define wFatal(...) do { if (wDebugFlag & DEBUG_FATAL) { taosPrintLog("WAL FATAL ", 255, __VA_ARGS__); } } while (0)
#define wError(...) do { if (wDebugFlag & DEBUG_ERROR) { taosPrintLog("WAL ERROR ", 255, __VA_ARGS__); } } while (0)
#define wWarn(...)  do { if (wDebugFlag & DEBUG_WARN)  { taosPrintLog("WAL WARN ", 255, __VA_ARGS__); } } while (0)
#define wInfo(...)  do { if (wDebugFlag & DEBUG_INFO)  { taosPrintLog("WAL ", 255, __VA_ARGS__); } } while (0)
#define wDebug(...) do { if (wDebugFlag & DEBUG_DEBUG) { taosPrintLog("WAL ", wDebugFlag, __VA_ARGS__); } } while (0)
#define wTrace(...) do { if (wDebugFlag & DEBUG_TRACE) { taosPrintLog("WAL ", wDebugFlag, __VA_ARGS__); } } while (0)
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
37 38

typedef struct {
J
Jeff Tao 已提交
39
  uint64_t version;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
40
  int      fd;
J
Jeff Tao 已提交
41
  int      keep;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
42
  int      level;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
43 44 45
  int32_t  fsyncPeriod;
  void    *timer;
  void    *signature;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
46
  int      max;  // maximum number of wal files
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
47
  uint32_t id;   // increase continuously
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
48 49
  int      num;  // number of wal files
  char     path[TSDB_FILENAME_LEN];
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
50
  char     name[TSDB_FILENAME_LEN+16];
51
  pthread_mutex_t mutex;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
52 53
} SWal;

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
54 55 56
// Module-wide state shared by every WAL handle.
static void           *walTmrCtrl = NULL;                  // timer controller created by walModuleInitFunc
static int             tsWalNum = 0;                       // live handles; updated with atomic add/sub
static pthread_once_t  walModuleInit = PTHREAD_ONCE_INIT;  // runs walModuleInitFunc once per module lifetime
static uint32_t        walSignature = 0xFAFBFDFE;          // stamped into every SWalHead by walWrite

// File-local helpers, defined further below.
static int  walHandleExistingFiles(const char *path);
static int  walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp);
static int  walRemoveWalFiles(const char *path);
static int  walGetMaxOldFileId(char *odir);
static void walProcessFsyncTimer(void *param, void *tmrId);
static void walRelease(SWal *pWal);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
64 65 66

static void walModuleInitFunc() {
  walTmrCtrl = taosTmrInit(1000, 100, 300000, "WAL");
S
TD-1652  
Shengliang Guan 已提交
67
  if (walTmrCtrl == NULL)
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
68 69 70
    walModuleInit = PTHREAD_ONCE_INIT;
  else
    wDebug("WAL module is initialized");
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
71
}
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
72

S
TD-1388  
Shengliang Guan 已提交
73 74 75 76 77 78 79
/* Returns true when the handle wants periodic, timer-driven fsync. */
static inline bool walNeedFsyncTimer(SWal *pWal) {
  return pWal->level == TAOS_WAL_FSYNC && pWal->fsyncPeriod > 0;
}

J
Jeff Tao 已提交
80
void *walOpen(const char *path, const SWalCfg *pCfg) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
81
  SWal *pWal = calloc(sizeof(SWal), 1);
82 83 84 85
  if (pWal == NULL) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return NULL;
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
86

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
87 88 89 90 91 92 93
  pthread_once(&walModuleInit, walModuleInitFunc);
  if (walTmrCtrl == NULL) {
    free(pWal);
    terrno = TAOS_SYSTEM_ERROR(errno);
    return NULL;
  }

S
TD-1652  
Shengliang Guan 已提交
94
  atomic_add_fetch_32(&tsWalNum, 1);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
95
  pWal->fd = -1;
J
Jeff Tao 已提交
96
  pWal->max = pCfg->wals;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
97 98
  pWal->id = 0;
  pWal->num = 0;
H
hjxilinx 已提交
99
  pWal->level = pCfg->walLevel;
J
Jeff Tao 已提交
100
  pWal->keep = pCfg->keep;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
101 102
  pWal->fsyncPeriod = pCfg->fsyncPeriod;
  pWal->signature = pWal;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
103
  tstrncpy(pWal->path, path, sizeof(pWal->path));
104
  pthread_mutex_init(&pWal->mutex, NULL);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
105

S
TD-1388  
Shengliang Guan 已提交
106
  if (walNeedFsyncTimer(pWal)) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
107 108 109 110 111 112 113 114
    pWal->timer = taosTmrStart(walProcessFsyncTimer, pWal->fsyncPeriod, pWal, walTmrCtrl);
    if (pWal->timer == NULL) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      walRelease(pWal);
      return NULL;
    }
  }

S
Shengliang Guan 已提交
115
  if (taosMkDir(path, 0755) != 0) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
116 117
    terrno = TAOS_SYSTEM_ERROR(errno);
    wError("wal:%s, failed to create directory(%s)", path, strerror(errno));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
118
    walRelease(pWal);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
119
    pWal = NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
120
  }
S
TD-1652  
Shengliang Guan 已提交
121

J
Jeff Tao 已提交
122 123
  if (pCfg->keep == 1) return pWal;

S
TD-1652  
Shengliang Guan 已提交
124
  if (walHandleExistingFiles(path) == 0) walRenew(pWal);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
125

S
TD-1652  
Shengliang Guan 已提交
126
  if (pWal && pWal->fd < 0) {
127
    terrno = TAOS_SYSTEM_ERROR(errno);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
128
    wError("wal:%s, failed to open(%s)", path, strerror(errno));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
129
    walRelease(pWal);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
130
    pWal = NULL;
S
TD-1652  
Shengliang Guan 已提交
131
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
132

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
133
  if (pWal) wDebug("wal:%s, it is open, level:%d fsyncPeriod:%d", path, pWal->level, pWal->fsyncPeriod);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
134 135 136
  return pWal;
}

S
TD-1388  
Shengliang Guan 已提交
137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156
/*
 * Change walLevel and fsyncPeriod on an open handle.
 *
 * If the new settings need periodic fsync (walNeedFsyncTimer), the timer is
 * re-armed with taosTmrReset; otherwise it is stopped and cleared.
 * Returns TSDB_CODE_WAL_APP_ERROR for a NULL handle, else TSDB_CODE_SUCCESS
 * (including when nothing changes).
 */
int walAlter(twalh wal, const SWalCfg *pCfg) {
  SWal *pWal = wal;
  if (pWal == NULL) return TSDB_CODE_WAL_APP_ERROR;

  bool unchanged = (pWal->level == pCfg->walLevel) && (pWal->fsyncPeriod == pCfg->fsyncPeriod);
  if (unchanged) {
    wDebug("wal:%s, old walLevel:%d fsync:%d, new walLevel:%d fsync:%d not change", pWal->name, pWal->level,
           pWal->fsyncPeriod, pCfg->walLevel, pCfg->fsyncPeriod);
    return TSDB_CODE_SUCCESS;
  }

  wInfo("wal:%s, change old walLevel:%d fsync:%d, new walLevel:%d fsync:%d", pWal->name, pWal->level, pWal->fsyncPeriod,
        pCfg->walLevel, pCfg->fsyncPeriod);

  pthread_mutex_lock(&pWal->mutex);

  pWal->level = pCfg->walLevel;
  pWal->fsyncPeriod = pCfg->fsyncPeriod;

  if (!walNeedFsyncTimer(pWal)) {
    wInfo("wal:%s, stop fsync timer, walLevel:%d fsyncPeriod:%d", pWal->name, pWal->level, pWal->fsyncPeriod);
    taosTmrStop(pWal->timer);
    pWal->timer = NULL;
  } else {
    wInfo("wal:%s, reset fsync timer, walLevel:%d fsyncPeriod:%d", pWal->name, pWal->level, pWal->fsyncPeriod);
    taosTmrReset(walProcessFsyncTimer, pWal->fsyncPeriod, pWal, &pWal->timer, walTmrCtrl);
  }

  pthread_mutex_unlock(&pWal->mutex);

  return TSDB_CODE_SUCCESS;
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
168
void walClose(void *handle) {
S
slguan 已提交
169
  if (handle == NULL) return;
S
TD-1652  
Shengliang Guan 已提交
170 171

  SWal *pWal = handle;
S
Shengliang Guan 已提交
172
  taosClose(pWal->fd);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
173
  if (pWal->timer) taosTmrStopA(&pWal->timer);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
174

J
Jeff Tao 已提交
175 176
  if (pWal->keep == 0) {
    // remove all files in the directory
S
TD-1652  
Shengliang Guan 已提交
177 178 179
    for (int i = 0; i < pWal->num; ++i) {
      snprintf(pWal->name, sizeof(pWal->name), "%s/%s%d", pWal->path, walPrefix, pWal->id - i);
      if (remove(pWal->name) < 0) {
J
Jeff Tao 已提交
180 181
        wError("wal:%s, failed to remove", pWal->name);
      } else {
S
Shengliang Guan 已提交
182
        wDebug("wal:%s, it is removed", pWal->name);
J
Jeff Tao 已提交
183
      }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
184
    }
J
Jeff Tao 已提交
185
  } else {
S
Shengliang Guan 已提交
186
    wDebug("wal:%s, it is closed and kept", pWal->name);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
187
  }
188

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
189
  walRelease(pWal);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
190 191
}

J
Jeff Tao 已提交
192
int walRenew(void *handle) {
193
  if (handle == NULL) return 0;
J
Jeff Tao 已提交
194
  SWal *pWal = handle;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
195 196

  terrno = 0;
197

198 199
  pthread_mutex_lock(&pWal->mutex);

S
TD-1652  
Shengliang Guan 已提交
200
  if (pWal->fd >= 0) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
201 202
    close(pWal->fd);
    pWal->id++;
S
Shengliang Guan 已提交
203
    wDebug("wal:%s, it is closed", pWal->name);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
204 205
  }

206 207
  pWal->num++;

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
208
  snprintf(pWal->name, sizeof(pWal->name), "%s/%s%d", pWal->path, walPrefix, pWal->id);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
209
  pWal->fd = open(pWal->name, O_WRONLY | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO);
210

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
211
  if (pWal->fd < 0) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
212 213
    wError("wal:%s, failed to open(%s)", pWal->name, strerror(errno));
    terrno = TAOS_SYSTEM_ERROR(errno);
214
  } else {
S
Shengliang Guan 已提交
215
    wDebug("wal:%s, it is created", pWal->name);
216 217 218

    if (pWal->num > pWal->max) {
      // remove the oldest wal file
sangshuduo's avatar
sangshuduo 已提交
219
      char name[TSDB_FILENAME_LEN * 3];
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
220
      snprintf(name, sizeof(name), "%s/%s%d", pWal->path, walPrefix, pWal->id - pWal->max);
S
TD-1652  
Shengliang Guan 已提交
221
      if (remove(name) < 0) {
222 223
        wError("wal:%s, failed to remove(%s)", name, strerror(errno));
      } else {
S
Shengliang Guan 已提交
224
        wDebug("wal:%s, it is removed", name);
225
      }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
226

227
      pWal->num--;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
228
    }
S
TD-1652  
Shengliang Guan 已提交
229 230
  }

231 232
  pthread_mutex_unlock(&pWal->mutex);

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
233
  return terrno;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
234 235 236
}

/*
 * Append one record (an SWalHead plus pHead->len bytes of body) to the current file.
 *
 * The call is a no-op returning 0 when the level is TAOS_WAL_NOLOG, or when
 * pHead->version is not newer than the last version written. Before writing,
 * the head is stamped with walSignature and a checksum. pWal->version advances
 * only after a complete write.
 * Returns -1 for a NULL handle, else 0 or the terrno error code.
 */
int walWrite(void *handle, SWalHead *pHead) {
  SWal *pWal = handle;
  if (pWal == NULL) return -1;

  terrno = 0;

  bool skip = (pWal->level == TAOS_WAL_NOLOG) || (pHead->version <= pWal->version);
  if (skip) return 0;

  pHead->signature = walSignature;
  taosCalcChecksumAppend(0, (uint8_t *)pHead, sizeof(SWalHead));

  int contLen = pHead->len + sizeof(SWalHead);
  if (taosTWrite(pWal->fd, pHead, contLen) == contLen) {
    pWal->version = pHead->version;
  } else {
    wError("wal:%s, failed to write(%s)", pWal->name, strerror(errno));
    terrno = TAOS_SYSTEM_ERROR(errno);
  }

  return terrno;
}

void walFsync(void *handle) {
J
Jeff Tao 已提交
261
  SWal *pWal = handle;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
262
  if (pWal == NULL || pWal->level != TAOS_WAL_FSYNC || pWal->fd < 0) return;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
263

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
264
  if (pWal->fsyncPeriod == 0) {
265 266 267 268
    if (fsync(pWal->fd) < 0) {
      wError("wal:%s, fsync failed(%s)", pWal->name, strerror(errno));
    }
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
269 270
}

J
Jeff Tao 已提交
271
int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *, int)) {
J
Jeff Tao 已提交
272
  SWal    *pWal = handle;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
273 274
  struct   dirent *ent;
  int      count = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
275
  uint32_t maxId = 0, minId = -1, index =0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
276

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
277
  terrno = 0;
S
TD-1652  
Shengliang Guan 已提交
278 279 280
  int  plen = strlen(walPrefix);
  char opath[TSDB_FILENAME_LEN + 5];

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
281
  int slen = snprintf(opath, sizeof(opath), "%s", pWal->path);
S
TD-1652  
Shengliang Guan 已提交
282
  if (pWal->keep == 0) strcpy(opath + slen, "/old");
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
283 284

  DIR *dir = opendir(opath);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
285 286 287 288 289 290
  if (dir == NULL && errno == ENOENT) return 0;
  if (dir == NULL) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return terrno;
  }

S
TD-1652  
Shengliang Guan 已提交
291 292
  while ((ent = readdir(dir)) != NULL) {
    if (strncmp(ent->d_name, walPrefix, plen) == 0) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
293
      index = atol(ent->d_name + plen);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
294 295 296 297 298 299
      if (index > maxId) maxId = index;
      if (index < minId) minId = index;
      count++;
    }
  }

300 301
  closedir(dir);

302
  if (count == 0) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
303 304
    if (pWal->keep) terrno = walRenew(pWal);
    return terrno;
305
  }
J
Jeff Tao 已提交
306

S
TD-1652  
Shengliang Guan 已提交
307
  if (count != (maxId - minId + 1)) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
308
    wError("wal:%s, messed up, count:%d max:%d min:%d", opath, count, maxId, minId);
S
Shengliang Guan 已提交
309
    terrno = TSDB_CODE_WAL_APP_ERROR;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
310
  } else {
S
Shengliang Guan 已提交
311
    wDebug("wal:%s, %d files will be restored", opath, count);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
312

S
TD-1652  
Shengliang Guan 已提交
313
    for (index = minId; index <= maxId; ++index) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
314
      snprintf(pWal->name, sizeof(pWal->name), "%s/%s%d", opath, walPrefix, index);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
315
      terrno = walRestoreWalFile(pWal, pVnode, writeFp);
H
Hongze Cheng 已提交
316
      if (terrno < 0) continue;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
317 318 319
    }
  }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
320
  if (terrno == 0) {
J
Jeff Tao 已提交
321
    if (pWal->keep == 0) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
322 323
      terrno = walRemoveWalFiles(opath);
      if (terrno == 0) {
J
Jeff Tao 已提交
324 325
        if (remove(opath) < 0) {
          wError("wal:%s, failed to remove directory(%s)", opath, strerror(errno));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
326
          terrno = TAOS_SYSTEM_ERROR(errno);
J
Jeff Tao 已提交
327 328
        }
      }
S
TD-1652  
Shengliang Guan 已提交
329
    } else {
J
Jeff Tao 已提交
330 331 332
      // open the existing WAL file in append mode
      pWal->num = count;
      pWal->id = maxId;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
333
      snprintf(pWal->name, sizeof(pWal->name), "%s/%s%d", opath, walPrefix, maxId);
J
Jeff Tao 已提交
334 335 336
      pWal->fd = open(pWal->name, O_WRONLY | O_CREAT | O_APPEND, S_IRWXU | S_IRWXG | S_IRWXO);
      if (pWal->fd < 0) {
        wError("wal:%s, failed to open file(%s)", pWal->name, strerror(errno));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
337
        terrno = TAOS_SYSTEM_ERROR(errno);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
338 339 340 341
      }
    }
  }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
342
  return terrno;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
343 344
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
345
int walGetWalFile(void *handle, char *name, uint32_t *index) {
S
TD-1652  
Shengliang Guan 已提交
346
  SWal *  pWal = handle;
347
  int     code = 1;
S
TD-1652  
Shengliang Guan 已提交
348
  int32_t first = 0;
349 350 351 352 353 354 355 356 357 358 359

  name[0] = 0;
  if (pWal == NULL || pWal->num == 0) return 0;

  pthread_mutex_lock(&(pWal->mutex));

  first = pWal->id + 1 - pWal->num;
  if (*index == 0) *index = first;  // set to first one

  if (*index < first && *index > pWal->id) {
    code = -1;  // index out of range
S
TD-1652  
Shengliang Guan 已提交
360
  } else {
361
    sprintf(name, "wal/%s%d", walPrefix, *index);
S
TD-1652  
Shengliang Guan 已提交
362
    code = (*index == pWal->id) ? 0 : 1;
363 364 365 366 367
  }

  pthread_mutex_unlock(&(pWal->mutex));

  return code;
S
TD-1652  
Shengliang Guan 已提交
368
}
369

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
370 371 372 373 374 375 376 377 378
/*
 * Free a handle and drop it from the live-handle count.
 *
 * When the last handle goes away, the shared timer controller is cleaned up
 * and the once-control is reset so that a later walOpen can initialize the
 * module again.
 * NOTE(review): clearing `signature` happens right before free(). A timer
 * callback that still holds this pointer would read freed memory, so the
 * signature check in walProcessFsyncTimer does not guard against that. Stop
 * the timer before calling this.
 */
static void walRelease(SWal *pWal) {
  pthread_mutex_destroy(&pWal->mutex);
  pWal->signature = NULL;
  free(pWal);

  if (atomic_sub_fetch_32(&tsWalNum, 1) != 0) return;

  if (walTmrCtrl) taosTmrCleanUp(walTmrCtrl);
  walTmrCtrl = NULL;
  walModuleInit = PTHREAD_ONCE_INIT;
  wDebug("WAL module is cleaned up");
}

J
Jeff Tao 已提交
383 384
/*
 * Replay every record of the file named in pWal->name through writeFp.
 *
 * Each record is an SWalHead followed by pHead->len bytes of body.
 * - A short read of either part truncates the file back to the end of the
 *   last complete record (`offset`) and stops without setting an error.
 * - A head whose checksum fails stops with TSDB_CODE_WAL_FILE_CORRUPTED; the
 *   file is not truncated.
 * In keep mode pWal->version follows the last replayed record. writeFp's
 * return value is not checked.
 *
 * Returns 0 or the error code, which is also stored in terrno.
 */
static int walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp) {
  char *name = pWal->name;
  int   size = 1024 * 1024;  // default 1M buffer size

  terrno = 0;
  char *buffer = malloc(size);
  if (buffer == NULL) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return terrno;
  }

  SWalHead *pHead = (SWalHead *)buffer;

  int fd = open(name, O_RDWR);
  if (fd < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    wError("wal:%s, failed to open for restore(%s)", name, strerror(errno));
    free(buffer);
    return terrno;
  }

  wDebug("wal:%s, start to restore", name);

  size_t offset = 0;  // end of the last complete record
  while (1) {
    int ret = taosTRead(fd, pHead, sizeof(SWalHead));
    if (ret == 0) break;

    if (ret < 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      wError("wal:%s, failed to read wal head part since %s", name, strerror(errno));
      break;
    }

    if ((size_t)ret < sizeof(SWalHead)) {
      wError("wal:%s, failed to read head, ret:%d, skip the rest of file", name, ret);
      taosFtruncate(fd, offset);
      fsync(fd);
      break;
    }

    if (!taosCheckChecksumWhole((uint8_t *)pHead, sizeof(SWalHead))) {
      wWarn("wal:%s, cksum is messed up, skip the rest of file", name);
      terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
      break;
    }

    if (pHead->len > size - sizeof(SWalHead)) {
      int   newSize = sizeof(SWalHead) + pHead->len;
      char *newBuffer = realloc(buffer, newSize);
      if (newBuffer == NULL) {
        // realloc() keeps the old block allocated on failure. `buffer` still
        // owns it and is freed after the loop (previously it was leaked).
        terrno = TAOS_SYSTEM_ERROR(errno);
        break;
      }

      buffer = newBuffer;
      size = newSize;
      pHead = (SWalHead *)buffer;
    }

    ret = taosTRead(fd, pHead->cont, pHead->len);
    if (ret < 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      wError("wal:%s failed to read wal body part since %s", name, strerror(errno));
      break;
    }

    if (ret < pHead->len) {
      wError("wal:%s, failed to read body, len:%d ret:%d, skip the rest of file", name, pHead->len, ret);
      taosFtruncate(fd, offset);
      fsync(fd);
      break;
    }

    offset = offset + sizeof(SWalHead) + pHead->len;

    if (pWal->keep) pWal->version = pHead->version;
    (*writeFp)(pVnode, pHead, TAOS_QTYPE_WAL);
  }

  close(fd);
  free(buffer);

  return terrno;
}

J
Jeff Tao 已提交
468
/* qsort comparator: orders WAL file names (char * elements) by the numeric id after walPrefix. */
static int walCompareWalNames(const void *pLeft, const void *pRight) {
  int  plen = strlen(walPrefix);
  long left = atol(*(char *const *)pLeft + plen);
  long right = atol(*(char *const *)pRight + plen);
  return (left > right) - (left < right);
}

/* Frees a name array produced by walListWalFiles; NULL/0 is accepted. */
static void walFreeWalNames(char **names, int count) {
  for (int i = 0; i < count; ++i) free(names[i]);
  free(names);
}

/*
 * Collect the names of all entries in `path` that start with walPrefix,
 * sorted by numeric id.
 * On success returns 0 and transfers ownership of *pNames (and each string in
 * it) to the caller. On failure sets terrno, frees everything and returns the
 * error code.
 */
static int walListWalFiles(const char *path, char ***pNames, int *pCount) {
  int    plen = strlen(walPrefix);
  char **names = NULL;
  int    count = 0;
  int    capacity = 0;
  int    code = 0;

  *pNames = NULL;
  *pCount = 0;

  DIR *dir = opendir(path);
  if (dir == NULL) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    wError("wal:%s, failed to open directory(%s)", path, strerror(errno));
    return terrno;
  }

  struct dirent *ent;
  while ((ent = readdir(dir)) != NULL) {
    if (strncmp(ent->d_name, walPrefix, plen) != 0) continue;

    if (count == capacity) {
      int    newCapacity = (capacity == 0) ? 8 : capacity * 2;
      char **grown = realloc(names, newCapacity * sizeof(*names));
      if (grown == NULL) {
        code = TAOS_SYSTEM_ERROR(errno);
        break;
      }
      names = grown;
      capacity = newCapacity;
    }

    size_t len = strlen(ent->d_name) + 1;
    char  *copy = malloc(len);
    if (copy == NULL) {
      code = TAOS_SYSTEM_ERROR(errno);
      break;
    }
    memcpy(copy, ent->d_name, len);
    names[count++] = copy;
  }

  closedir(dir);

  if (code != 0) {
    walFreeWalNames(names, count);
    terrno = code;
    return code;
  }

  if (count > 1) qsort(names, count, sizeof(*names), walCompareWalNames);

  *pNames = names;
  *pCount = count;
  return 0;
}

/*
 * Move every wal* file in `path` into `path`/old so that walRestore can replay it.
 *
 * The moved files are renumbered after the highest id already in old/.
 * walRestore replays old/ in ascending id order, so the files are moved in
 * ascending order of their current id. readdir() order is unspecified, and
 * moving in that order could replay files out of sequence.
 *
 * Returns 0 on success, otherwise the terrno error code. If old/ exists but
 * cannot be scanned, nothing is moved, so archived files cannot be overwritten
 * by reused ids.
 */
static int walHandleExistingFiles(const char *path) {
  char   oname[TSDB_FILENAME_LEN * 3];
  char   nname[TSDB_FILENAME_LEN * 3];
  char   opath[TSDB_FILENAME_LEN + 5];  // room for path + "/old" without truncation
  char **names = NULL;
  int    total = 0;
  int    moved = 0;

  terrno = 0;
  snprintf(opath, sizeof(opath), "%s/old", path);

  int midx = walGetMaxOldFileId(opath);
  if (midx < 0) return terrno;

  if (walListWalFiles(path, &names, &total) != 0) return terrno;

  for (int i = 0; i < total; ++i) {
    midx++;
    snprintf(oname, sizeof(oname), "%s/%s", path, names[i]);
    snprintf(nname, sizeof(nname), "%s/%s%d", opath, walPrefix, midx);
    if (taosMkDir(opath, 0755) != 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      wError("wal:%s, failed to create directory:%s(%s)", oname, opath, strerror(errno));
      break;
    }

    if (rename(oname, nname) < 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      wError("wal:%s, failed to move to new:%s", oname, nname);
      break;
    }

    moved++;
  }

  wDebug("wal:%s, %d files are moved for restoration", path, moved);

  walFreeWalNames(names, total);
  return terrno;
}

J
Jeff Tao 已提交
509
static int walRemoveWalFiles(const char *path) {
S
TD-1652  
Shengliang Guan 已提交
510 511 512
  int  plen = strlen(walPrefix);
  char name[TSDB_FILENAME_LEN * 3];

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
513
  terrno = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
514 515

  struct dirent *ent;
S
TD-1652  
Shengliang Guan 已提交
516
  DIR *dir = opendir(path);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
517 518 519 520
  if (dir == NULL && errno == ENOENT) return 0;
  if (dir == NULL) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return terrno;
S
TD-1652  
Shengliang Guan 已提交
521
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
522

S
TD-1652  
Shengliang Guan 已提交
523 524
  while ((ent = readdir(dir)) != NULL) {
    if (strncmp(ent->d_name, walPrefix, plen) == 0) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
525
      snprintf(name, sizeof(name), "%s/%s", path, ent->d_name);
S
TD-1652  
Shengliang Guan 已提交
526
      if (remove(name) < 0) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
527
        wError("wal:%s, failed to remove(%s)", name, strerror(errno));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
528
        terrno = TAOS_SYSTEM_ERROR(errno);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
529 530
      }
    }
S
TD-1652  
Shengliang Guan 已提交
531
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
532 533 534

  closedir(dir);

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
535
  return terrno;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
536 537
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
538 539 540 541 542 543 544 545 546
static void walProcessFsyncTimer(void *param, void *tmrId) {
  SWal *pWal = param;

  if (pWal->signature != pWal) return;
  if (pWal->fd < 0) return;

  if (fsync(pWal->fd) < 0) {
    wError("wal:%s, fsync failed(%s)", pWal->name, strerror(errno));
  }
S
TD-1388  
Shengliang Guan 已提交
547 548 549 550 551 552 553 554

  if (walNeedFsyncTimer(pWal)) {
    pWal->timer = taosTmrStart(walProcessFsyncTimer, pWal->fsyncPeriod, pWal, walTmrCtrl);
  } else {
    wInfo("wal:%s, stop fsync timer for walLevel:%d fsyncPeriod:%d", pWal->name, pWal->level, pWal->fsyncPeriod);
    taosTmrStop(pWal->timer);
    pWal->timer = NULL;
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
555
}
S
TD-1696  
Shengliang Guan 已提交
556 557 558 559 560 561

/* Returns the version of the last record written/replayed, or 0 for a NULL handle. */
int64_t walGetVersion(twalh param) {
  SWal *pWal = param;
  return (pWal == NULL) ? 0 : pWal->version;
}

/*
 * Highest numeric id among the wal* entries in `odir`.
 *
 * Returns 0 when `odir` is not accessible (for example, it does not exist
 * yet) or holds no such entries. Returns -1, with terrno set, when the
 * directory exists but cannot be opened.
 */
static int walGetMaxOldFileId(char *odir) {
  int maxId = 0;

  if (access(odir, F_OK) != 0) return maxId;

  DIR *dir = opendir(odir);
  if (dir == NULL) {
    wError("failed to open directory %s since %s", odir, strerror(errno));
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  int            prefixLen = strlen(walPrefix);
  struct dirent *entry;
  while ((entry = readdir(dir)) != NULL) {
    if (strncmp(entry->d_name, walPrefix, prefixLen) != 0) continue;

    int id = atol(entry->d_name + prefixLen);
    if (id > maxId) maxId = id;
  }

  closedir(dir);
  return maxId;
}