/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include <dirent.h>
#include <errno.h>
#include <fcntl.h>
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>

#include "os.h"
#include "tlog.h"
#include "tchecksum.h"
#include "tutil.h"
#include "twal.h"
#include "tqueue.h"

// prefix shared by every wal file name: <path>/wal<id>
#define walPrefix "wal"

// Logging helpers. Each one expands to exactly one statement (do { } while (0))
// so it can be used safely as the unbraced body of an if/else.
#define wError(...) do { if (wDebugFlag & DEBUG_ERROR) {tprintf("ERROR WAL ", wDebugFlag, __VA_ARGS__);} } while (0)
#define wWarn(...)  do { if (wDebugFlag & DEBUG_WARN) {tprintf("WARN WAL ", wDebugFlag, __VA_ARGS__);} } while (0)
#define wTrace(...) do { if (wDebugFlag & DEBUG_TRACE) {tprintf("WAL ", wDebugFlag, __VA_ARGS__);} } while (0)
#define wPrint(...) do { tprintf("WAL ", 255, __VA_ARGS__); } while (0)

typedef struct {
J
Jeff Tao 已提交
38
  uint64_t version;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
39
  int      fd;
J
Jeff Tao 已提交
40
  int      keep;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
41 42
  int      level;
  int      max;  // maximum number of wal files
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
43
  uint32_t id;   // increase continuously
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
44 45 46
  int      num;  // number of wal files
  char     path[TSDB_FILENAME_LEN];
  char     name[TSDB_FILENAME_LEN];
47
  pthread_mutex_t mutex;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
48 49 50 51 52
} SWal;

int wDebugFlag = 135;

static uint32_t walSignature = 0xFAFBFDFE;
J
Jeff Tao 已提交
53
static int walHandleExistingFiles(const char *path);
J
Jeff Tao 已提交
54
static int walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp);
J
Jeff Tao 已提交
55
static int walRemoveWalFiles(const char *path);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
56

J
Jeff Tao 已提交
57
void *walOpen(const char *path, const SWalCfg *pCfg) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
58 59 60 61
  SWal *pWal = calloc(sizeof(SWal), 1);
  if (pWal == NULL) return NULL;

  pWal->fd = -1;
J
Jeff Tao 已提交
62
  pWal->max = pCfg->wals;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
63 64
  pWal->id = 0;
  pWal->num = 0;
J
Jeff Tao 已提交
65
  pWal->level = pCfg->commitLog;
J
Jeff Tao 已提交
66
  pWal->keep = pCfg->keep;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
67
  strcpy(pWal->path, path);
68
  pthread_mutex_init(&pWal->mutex, NULL);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
69 70 71

  if (access(path, F_OK) != 0) mkdir(path, 0755);
  
J
Jeff Tao 已提交
72 73
  if (pCfg->keep == 1) return pWal;

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
74 75 76 77 78
  if (walHandleExistingFiles(path) == 0) 
    walRenew(pWal);

  if (pWal->fd <0) {
    wError("wal:%s, failed to open", path);
79
    pthread_mutex_destroy(&pWal->mutex);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
80 81
    free(pWal);
    pWal = NULL;
82
  } 
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
83 84 85 86 87

  return pWal;
}

void walClose(void *handle) {
S
slguan 已提交
88 89
  if (handle == NULL) return;
  
J
Jeff Tao 已提交
90
  SWal *pWal = handle;  
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
91 92
  close(pWal->fd);

J
Jeff Tao 已提交
93 94 95 96 97 98 99 100 101
  if (pWal->keep == 0) {
    // remove all files in the directory
    for (int i=0; i<pWal->num; ++i) {
      sprintf(pWal->name, "%s/%s%d", pWal->path, walPrefix, pWal->id-i);
      if (remove(pWal->name) <0) {
        wError("wal:%s, failed to remove", pWal->name);
      } else {
        wTrace("wal:%s, it is removed", pWal->name);
      }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
102
    }
J
Jeff Tao 已提交
103 104
  } else {
    wTrace("wal:%s, it is closed and kept", pWal->name);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
105
  }
106 107 108 109

  pthread_mutex_destroy(&pWal->mutex);

  free(pWal);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
110 111
}

int walRenew(void *handle) {
  SWal *pWal = handle;
114
  int   code = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
115
  
116 117
  pthread_mutex_lock(&pWal->mutex);

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
118 119 120 121 122 123
  if (pWal->fd >=0) {
    close(pWal->fd);
    pWal->id++;
    wTrace("wal:%s, it is closed", pWal->name);
  }

124 125
  pWal->num++;

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
126
  sprintf(pWal->name, "%s/%s%d", pWal->path, walPrefix, pWal->id);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
127
  pWal->fd = open(pWal->name, O_WRONLY | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO);
128

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
129 130
  if (pWal->fd < 0) {
    wError("wal:%d, failed to open(%s)", pWal->name, strerror(errno));
131 132 133 134 135 136
    code = -1;
  } else {
    wTrace("wal:%s, it is created", pWal->name);

    if (pWal->num > pWal->max) {
      // remove the oldest wal file
sangshuduo's avatar
sangshuduo 已提交
137
      char name[TSDB_FILENAME_LEN * 3];
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
138
      sprintf(name, "%s/%s%d", pWal->path, walPrefix, pWal->id - pWal->max);
139 140 141 142 143
      if (remove(name) <0) {
        wError("wal:%s, failed to remove(%s)", name, strerror(errno));
      } else {
        wTrace("wal:%s, it is removed", name);
      }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
144

145
      pWal->num--;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
146 147 148
    }
  }  
  
149 150 151
  pthread_mutex_unlock(&pWal->mutex);

  return code;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
152 153 154
}

/*
 * Append one record (header plus pHead->len bytes of content) to the current
 * wal file.
 *
 * Nothing is written when the wal level is TAOS_WAL_NOLOG or when the record
 * version is not newer than the last version recorded in the handle. Before
 * writing, the signature is stamped into the header and the header checksum
 * is appended. On success the handle remembers the record version.
 *
 * Returns 0 on success or when the record is skipped, -1 when the write is
 * short or fails.
 */
int walWrite(void *handle, SWalHead *pHead) {
  SWal *wal = handle;

  // no wal, or the record is already covered by what has been written
  if (wal->level == TAOS_WAL_NOLOG || pHead->version <= wal->version) {
    return 0;
  }

  pHead->signature = walSignature;
  taosCalcChecksumAppend(0, (uint8_t *)pHead, sizeof(SWalHead));

  int total = pHead->len + sizeof(SWalHead);
  if (write(wal->fd, pHead, total) == total) {
    wal->version = pHead->version;
    return 0;
  }

  wError("wal:%s, failed to write(%s)", wal->name, strerror(errno));
  return -1;
}

void walFsync(void *handle) {

J
Jeff Tao 已提交
178
  SWal *pWal = handle;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
179 180 181 182 183

  if (pWal->level == TAOS_WAL_FSYNC) 
    fsync(pWal->fd);
}

int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *, int)) {
J
Jeff Tao 已提交
185
  SWal    *pWal = handle;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
186 187 188
  int      code = 0;
  struct   dirent *ent;
  int      count = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
189
  uint32_t maxId = 0, minId = -1, index =0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
190 191

  int   plen = strlen(walPrefix);
sangshuduo's avatar
sangshuduo 已提交
192
  char  opath[TSDB_FILENAME_LEN+5];
J
Jeff Tao 已提交
193 194 195 196
   
  int slen = sprintf(opath, "%s", pWal->path);
  if ( pWal->keep == 0) 
    strcpy(opath+slen, "/old");
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
197 198 199 200 201 202 203

  // is there old directory?
  if (access(opath, F_OK)) return 0; 

  DIR *dir = opendir(opath);
  while ((ent = readdir(dir))!= NULL) {
    if ( strncmp(ent->d_name, walPrefix, plen) == 0) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
204
      index = atol(ent->d_name + plen);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
205 206 207 208 209 210
      if (index > maxId) maxId = index;
      if (index < minId) minId = index;
      count++;
    }
  }

211 212 213 214
  if (count == 0) {
    if (pWal->keep) code = walRenew(pWal);
    return code;
  }
J
Jeff Tao 已提交
215

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
216
  if ( count != (maxId-minId+1) ) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
217
    wError("wal:%s, messed up, count:%d max:%d min:%d", opath, count, maxId, minId);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
218 219 220 221 222
    code = -1;
  } else {
    wTrace("wal:%s, %d files will be restored", opath, count);

    for (index = minId; index<=maxId; ++index) {
J
Jeff Tao 已提交
223 224
      sprintf(pWal->name, "%s/%s%d", opath, walPrefix, index);
      code = walRestoreWalFile(pWal, pVnode, writeFp);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
225 226 227 228 229
      if (code < 0) break;
    }
  }

  if (code == 0) {
J
Jeff Tao 已提交
230
    if (pWal->keep == 0) {
231
      code = walRemoveWalFiles(opath);
J
Jeff Tao 已提交
232 233 234 235 236 237 238 239 240 241 242 243 244 245
      if (code == 0) {
        if (remove(opath) < 0) {
          wError("wal:%s, failed to remove directory(%s)", opath, strerror(errno));
          code = -1;
        }
      }
    } else { 
      // open the existing WAL file in append mode
      pWal->num = count;
      pWal->id = maxId;
      sprintf(pWal->name, "%s/%s%d", opath, walPrefix, maxId);
      pWal->fd = open(pWal->name, O_WRONLY | O_CREAT | O_APPEND, S_IRWXU | S_IRWXG | S_IRWXO);
      if (pWal->fd < 0) {
        wError("wal:%s, failed to open file(%s)", pWal->name, strerror(errno));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
246 247 248 249 250 251 252 253 254 255
        code = -1;
      }
    }
  }

  closedir(dir);

  return code;
}

int walGetWalFile(void *handle, char *name, uint32_t *index) {
J
Jeff Tao 已提交
257
  SWal   *pWal = handle;
258 259 260 261 262 263 264 265 266 267 268 269 270 271
  int     code = 1;
  int32_t first = 0; 

  name[0] = 0;
  if (pWal == NULL || pWal->num == 0) return 0;

  pthread_mutex_lock(&(pWal->mutex));

  first = pWal->id + 1 - pWal->num;
  if (*index == 0) *index = first;  // set to first one

  if (*index < first && *index > pWal->id) {
    code = -1;  // index out of range
  } else { 
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
272
    sprintf(name, "%s/%s%d", pWal->path, walPrefix, *index);
273 274 275 276 277 278 279 280
    code = (*index == pWal->id) ? 0:1;
  }

  pthread_mutex_unlock(&(pWal->mutex));

  return code;
}  

/*
 * Replay every record of the file pWal->name through writeFp.
 *
 * Records are read one by one: header, header checksum verification, body.
 * Reading stops quietly at end of file, and with a warning at the first
 * truncated or corrupted record; the remainder of the file is then skipped.
 * In keep mode the handle version follows the versions of the replayed
 * records.
 *
 * Returns 0 when the file was processed (even if its tail was skipped),
 * -1 when the file cannot be opened or memory runs out.
 */
static int walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp) {
  int    code = 0;
  char  *name = pWal->name;
  size_t bufSize = 1024000;  // initial size for one record, grown on demand

  char *buffer = malloc(bufSize);
  if (buffer == NULL) return -1;

  SWalHead *pHead = (SWalHead *)buffer;

  int fd = open(name, O_RDONLY);
  if (fd < 0) {
    wError("wal:%s, failed to open for restore(%s)", name, strerror(errno));
    free(buffer);
    return -1;
  }

  wTrace("wal:%s, start to restore", name);

  while (1) {
    ssize_t ret = read(fd, pHead, sizeof(SWalHead));
    if (ret == 0) break;  // end of file

    if (ret != (ssize_t)sizeof(SWalHead)) {
      wWarn("wal:%s, failed to read head, skip, ret:%d(%s)", name, (int)ret, strerror(errno));
      break;
    }

    if (!taosCheckChecksumWhole((uint8_t *)pHead, sizeof(SWalHead))) {
      wWarn("wal:%s, cksum is messed up, skip the rest of file", name);
      break;
    }

    // the body length comes from the file: never read more than the buffer can hold
    int64_t bodyLen = pHead->len;
    if (bodyLen < 0) {
      wWarn("wal:%s, invalid body length:%d, skip the rest of file", name, (int)bodyLen);
      break;
    }

    size_t need = sizeof(SWalHead) + (size_t)bodyLen;
    if (need > bufSize) {
      char *tmp = realloc(buffer, need);
      if (tmp == NULL) {
        wError("wal:%s, out of memory for a record of %d bytes", name, (int)bodyLen);
        code = -1;
        break;
      }
      // realloc keeps the header already read; only the pointers change
      buffer = tmp;
      pHead = (SWalHead *)buffer;
      bufSize = need;
    }

    ret = read(fd, pHead->cont, (size_t)bodyLen);
    if (ret != (ssize_t)bodyLen) {
      wWarn("wal:%s, failed to read body, skip, len:%d ret:%d", name, pHead->len, (int)ret);
      break;
    }

    if (pWal->keep) pWal->version = pHead->version;
    (*writeFp)(pVnode, pHead, TAOS_QTYPE_WAL);
  }

  close(fd);
  free(buffer);

  return code;
}

/*
 * Prepare the wal directory `path` before a new wal file is created.
 *
 * - If <path>/old already exists, an earlier restore did not finish: the wal
 *   files directly under `path` are removed and the "old" directory is kept.
 * - Otherwise every wal file under `path` is moved into <path>/old, which is
 *   created on demand.
 *
 * Returns 0 on success, -1 when the directory cannot be read or a file cannot
 * be moved.
 */
static int walHandleExistingFiles(const char *path) {
  int  code = 0;
  char oname[TSDB_FILENAME_LEN * 3];
  char nname[TSDB_FILENAME_LEN * 3];
  char opath[TSDB_FILENAME_LEN + 5];  // room for "/old", same size as used by walRestore

  int len = snprintf(opath, sizeof(opath), "%s/old", path);
  if (len < 0 || (size_t)len >= sizeof(opath)) {
    wError("wal:%s, path is too long", path);
    return -1;
  }

  if (access(opath, F_OK) == 0) {
    // old directory is there, it means restore process is not finished
    walRemoveWalFiles(path);
    return 0;
  }

  DIR *dir = opendir(path);
  if (dir == NULL) {
    wError("wal:%s, failed to open directory(%s)", path, strerror(errno));
    return -1;
  }

  // move all files to old directory
  int            plen = strlen(walPrefix);
  int            count = 0;
  struct dirent *ent;
  while ((ent = readdir(dir)) != NULL) {
    if (strncmp(ent->d_name, walPrefix, plen) != 0) continue;

    if (access(opath, F_OK) != 0 && mkdir(opath, 0755) != 0 && errno != EEXIST) {
      wError("wal:%s, failed to create directory(%s)", opath, strerror(errno));
      code = -1;
      break;
    }

    int olen = snprintf(oname, sizeof(oname), "%s/%s", path, ent->d_name);
    int nlen = snprintf(nname, sizeof(nname), "%s/old/%s", path, ent->d_name);
    if (olen < 0 || (size_t)olen >= sizeof(oname) || nlen < 0 || (size_t)nlen >= sizeof(nname)) {
      wError("wal:%s, file name is too long", path);
      code = -1;
      break;
    }

    if (rename(oname, nname) < 0) {
      wError("wal:%s, failed to move to new:%s", oname, nname);
      code = -1;
      break;
    }

    count++;
  }

  wTrace("wal:%s, %d files are moved for restoration", path, count);

  closedir(dir);
  return code;
}

static int walRemoveWalFiles(const char *path) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
372
  int    plen = strlen(walPrefix);
sangshuduo's avatar
sangshuduo 已提交
373
  char   name[TSDB_FILENAME_LEN * 3];
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395
  int    code = 0;

  if (access(path, F_OK) != 0) return 0;

  struct dirent *ent;
  DIR   *dir = opendir(path);

  while ((ent = readdir(dir))!= NULL) {
    if ( strncmp(ent->d_name, walPrefix, plen) == 0) {
      sprintf(name, "%s/%s", path, ent->d_name);
      if (remove(name) <0) {
        wError("wal:%s, failed to remove(%s)", name, strerror(errno));
        code = -1; break;
      }
    }
  } 

  closedir(dir);

  return code;
}