rpcTcp.c 14.5 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
slguan 已提交
16
#include "os.h"
H
hzcheng 已提交
17 18
#include "tsocket.h"
#include "tutil.h"
S
slguan 已提交
19
#include "rpcLog.h"
20
#include "rpcHead.h"
J
Jeff Tao 已提交
21
#include "rpcTcp.h"
H
hzcheng 已提交
22

23 24 25
/* Older system headers may lack EPOLLWAKEUP; supply the Linux value (bit 29)
 * so the epoll registration below still compiles. */
#if !defined(EPOLLWAKEUP)
#define EPOLLWAKEUP (1u << 29)
#endif
H
hzcheng 已提交
26

J
Jeff Tao 已提交
27 28 29 30 31 32 33 34 35
/* Per-connection state: one entry for every TCP socket served by a worker thread. */
typedef struct SFdObj {
  void              *signature;   // the object's own address while valid, NULL once released
  int                fd;          // TCP socket FD
  void              *thandle;     // handle from upper layer, like TAOS
  uint32_t           ip;          // peer IP
  uint16_t           port;        // peer port
  struct SThreadObj *pThreadObj;  // worker whose epoll set watches fd
  struct SFdObj     *prev;        // neighbours in the worker's doubly linked FD list
  struct SFdObj     *next;
} SFdObj;

J
Jeff Tao 已提交
38
typedef struct SThreadObj {
H
hzcheng 已提交
39 40
  pthread_t       thread;
  SFdObj *        pHead;
J
Jeff Tao 已提交
41
  pthread_mutex_t mutex;
J
jtao1735 已提交
42
  uint32_t        ip;
43
  bool            stop;
H
hzcheng 已提交
44 45 46 47
  int             pollFd;
  int             numOfFds;
  int             threadId;
  char            label[12];
48 49
  void           *shandle;  // handle passed by upper layer during server initialization
  void           *(*processData)(SRecvInfo *pPacket);
H
hzcheng 已提交
50 51 52
} SThreadObj;

typedef struct {
53
  int         fd;
J
jtao1735 已提交
54
  uint32_t    ip;
L
lihui 已提交
55
  uint16_t    port;
H
hzcheng 已提交
56 57 58 59 60 61 62
  char        label[12];
  int         numOfThreads;
  void *      shandle;
  SThreadObj *pThreadObj;
  pthread_t   thread;
} SServerObj;

J
Jeff Tao 已提交
63 64 65 66
// Internal helpers defined later in this file.
static void   *taosAcceptTcpConnection(void *arg);
static void   *taosProcessTcpData(void *param);
static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, int fd);
static void    taosFreeFdObj(SFdObj *pFdObj);
static void    taosReportBrokenLink(SFdObj *pFdObj);
H
hzcheng 已提交
68

J
jtao1735 已提交
69
void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle) {
J
Jeff Tao 已提交
70 71
  SServerObj *pServerObj;
  SThreadObj *pThreadObj;
H
hzcheng 已提交
72

J
Jeff Tao 已提交
73
  pServerObj = (SServerObj *)calloc(sizeof(SServerObj), 1);
J
jtao1735 已提交
74
  pServerObj->ip = ip;
75 76 77 78
  pServerObj->port = port;
  strcpy(pServerObj->label, label);
  pServerObj->numOfThreads = numOfThreads;

J
Jeff Tao 已提交
79
  pServerObj->pThreadObj = (SThreadObj *)calloc(sizeof(SThreadObj), numOfThreads);
80 81
  if (pServerObj->pThreadObj == NULL) {
    tError("TCP:%s no enough memory", label);
J
Jeff Tao 已提交
82
    free(pServerObj);
83
    return NULL;
H
hzcheng 已提交
84
  }
J
Jeff Tao 已提交
85

J
Jeff Tao 已提交
86
  int code = 0;
87
  pThreadObj = pServerObj->pThreadObj;
J
Jeff Tao 已提交
88
  for (int i = 0; i < numOfThreads; ++i) {
89 90 91
    pThreadObj->processData = fp;
    strcpy(pThreadObj->label, label);
    pThreadObj->shandle = shandle;
H
hzcheng 已提交
92

J
Jeff Tao 已提交
93
    code = pthread_mutex_init(&(pThreadObj->mutex), NULL);
J
Jeff Tao 已提交
94 95 96
    if (code < 0) {
      tError("%s failed to init TCP process data mutex(%s)", label, strerror(errno));
      break;;
97
    }
H
hzcheng 已提交
98

99 100 101
    pThreadObj->pollFd = epoll_create(10);  // size does not matter
    if (pThreadObj->pollFd < 0) {
      tError("%s failed to create TCP epoll", label);
J
Jeff Tao 已提交
102 103
      code = -1;
      break;
104
    }
H
hzcheng 已提交
105

J
Jeff Tao 已提交
106 107 108
    pthread_attr_t thattr;
    pthread_attr_init(&thattr);
    pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
J
Jeff Tao 已提交
109
    code = pthread_create(&(pThreadObj->thread), &thattr, taosProcessTcpData, (void *)(pThreadObj));
J
Jeff Tao 已提交
110 111 112 113
    pthread_attr_destroy(&thattr);
    if (code != 0) {
      tError("%s failed to create TCP process data thread(%s)", label, strerror(errno));
      break;
114
    }
H
hzcheng 已提交
115

116 117
    pThreadObj->threadId = i;
    pThreadObj++;
H
hzcheng 已提交
118 119
  }

J
Jeff Tao 已提交
120 121 122 123 124 125 126 127 128
  if (code == 0) { 
    pthread_attr_t thattr;
    pthread_attr_init(&thattr);
    pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
    code = pthread_create(&(pServerObj->thread), &thattr, (void *)taosAcceptTcpConnection, (void *)(pServerObj));
    pthread_attr_destroy(&thattr);
    if (code != 0) {
      tError("%s failed to create TCP accept thread(%s)", label, strerror(errno));
    }
H
hzcheng 已提交
129 130
  }

J
Jeff Tao 已提交
131 132 133 134 135
  if (code != 0) {
    free(pServerObj->pThreadObj);
    free(pServerObj);
    pServerObj = NULL;
  } else {
J
jtao1735 已提交
136
    tTrace("%s TCP server is initialized, ip:0x%x port:%hu numOfThreads:%d", label, ip, port, numOfThreads);
J
Jeff Tao 已提交
137
  }
H
hzcheng 已提交
138

139
  return (void *)pServerObj;
H
hzcheng 已提交
140 141
}

142 143 144 145 146 147 148 149
/*
 * Stop a worker thread and release what it owns: all SFdObj still in its list
 * (their sockets are closed) and its epoll FD. The mutex is left to the caller.
 *
 * The worker is woken by adding an already-readable eventfd (initial counter 1)
 * to its epoll set; it then observes stop == true and exits. pthread_cancel is
 * used only as a fallback when that cannot be arranged.
 */
static void taosStopTcpThread(SThreadObj* pThreadObj) {
  pThreadObj->stop = true;

  // signal the thread to stop, try graceful method first,
  // and use pthread_cancel when failed
  struct epoll_event event = { .events = EPOLLIN };
  int fd = eventfd(1, 0);  // eventfd() returns an int FD; eventfd_t is the counter type
  if (fd == -1) {
    tError("%s, failed to create eventfd, will call pthread_cancel instead, which may result in data corruption: %s", pThreadObj->label, strerror(errno));
    pthread_cancel(pThreadObj->thread);
  } else if (epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_ADD, fd, &event) < 0) {
    tError("%s, failed to call epoll_ctl, will call pthread_cancel instead, which may result in data corruption: %s", pThreadObj->label, strerror(errno));
    pthread_cancel(pThreadObj->thread);
  }

  pthread_join(pThreadObj->thread, NULL);

  // Release remaining connections while the epoll FD is still open, so the
  // EPOLL_CTL_DEL done by taosFreeFdObj targets a valid epoll instance.
  while (pThreadObj->pHead) {
    SFdObj *pFdObj = pThreadObj->pHead;
    pThreadObj->pHead = pFdObj->next;
    taosFreeFdObj(pFdObj);
  }

  close(pThreadObj->pollFd);
  pThreadObj->pollFd = -1;
  if (fd != -1) {
    close(fd);
  }
}


H
hzcheng 已提交
171
void taosCleanUpTcpServer(void *handle) {
172
  SServerObj *pServerObj = handle;
J
Jeff Tao 已提交
173
  SThreadObj *pThreadObj;
H
hzcheng 已提交
174 175 176

  if (pServerObj == NULL) return;

177
  shutdown(pServerObj->fd, SHUT_RD);
H
hzcheng 已提交
178 179
  pthread_join(pServerObj->thread, NULL);

J
Jeff Tao 已提交
180
  for (int i = 0; i < pServerObj->numOfThreads; ++i) {
H
hzcheng 已提交
181
    pThreadObj = pServerObj->pThreadObj + i;
182
    taosStopTcpThread(pThreadObj);
J
Jeff Tao 已提交
183
    pthread_mutex_destroy(&(pThreadObj->mutex));
H
hzcheng 已提交
184 185 186 187
  }

  tTrace("TCP:%s, TCP server is cleaned up", pServerObj->label);

J
Jeff Tao 已提交
188
  tfree(pServerObj->pThreadObj);
H
hzcheng 已提交
189 190 191
  tfree(pServerObj);
}

192
static void* taosAcceptTcpConnection(void *arg) {
J
Jeff Tao 已提交
193 194 195 196 197 198 199 200
  int                connFd = -1;
  struct sockaddr_in caddr;
  int                threadId = 0;
  SThreadObj        *pThreadObj;
  SServerObj        *pServerObj;

  pServerObj = (SServerObj *)arg;

201 202
  pServerObj->fd = taosOpenTcpServerSocket(pServerObj->ip, pServerObj->port);
  if (pServerObj->fd < 0) return NULL; 
J
Jeff Tao 已提交
203

204
  tTrace("%s TCP server is ready, ip:0x%x:%hu", pServerObj->label, pServerObj->ip, pServerObj->port);
J
Jeff Tao 已提交
205 206 207

  while (1) {
    socklen_t addrlen = sizeof(caddr);
208 209 210 211 212 213
    connFd = accept(pServerObj->fd, (struct sockaddr *)&caddr, &addrlen);
    if (connFd == -1) {
      if (errno == EINVAL) {
        tTrace("%s TCP server socket was shutdown, exiting...", pServerObj->label);
        break;
      }
J
Jeff Tao 已提交
214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238
      tError("%s TCP accept failure(%s)", pServerObj->label, errno, strerror(errno));
      continue;
    }

    tTrace("%s TCP connection from ip:%s:%hu", pServerObj->label, inet_ntoa(caddr.sin_addr), caddr.sin_port);
    taosKeepTcpAlive(connFd);

    // pick up the thread to handle this connection
    pThreadObj = pServerObj->pThreadObj + threadId;

    SFdObj *pFdObj = taosMallocFdObj(pThreadObj, connFd);
    if (pFdObj) {
      pFdObj->ip = caddr.sin_addr.s_addr;
      pFdObj->port = caddr.sin_port;
      tTrace("%s new connection from %s:%hu, FD:%p, numOfFds:%d", pServerObj->label, 
              inet_ntoa(caddr.sin_addr), pFdObj->port, pFdObj, pThreadObj->numOfFds);
    } else {
      close(connFd);
      tError("%s failed to malloc FdObj(%s)", pServerObj->label, strerror(errno));
    }  

    // pick up next thread for next connection
    threadId++;
    threadId = threadId % pServerObj->numOfThreads;
  }
239 240 241

  close(pServerObj->fd);
  return NULL;
J
Jeff Tao 已提交
242 243
}

J
jtao1735 已提交
244
void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int num, void *fp, void *shandle) {
J
Jeff Tao 已提交
245 246 247 248 249 250
  SThreadObj    *pThreadObj;
  pthread_attr_t thattr;

  pThreadObj = (SThreadObj *)malloc(sizeof(SThreadObj));
  memset(pThreadObj, 0, sizeof(SThreadObj));
  strcpy(pThreadObj->label, label);
J
jtao1735 已提交
251
  pThreadObj->ip = ip;
J
Jeff Tao 已提交
252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284
  pThreadObj->shandle = shandle;

  if (pthread_mutex_init(&(pThreadObj->mutex), NULL) < 0) {
    tError("%s failed to init TCP client mutex(%s)", label, strerror(errno));
    return NULL;
  }

  pThreadObj->pollFd = epoll_create(10);  // size does not matter
  if (pThreadObj->pollFd < 0) {
    tError("%s failed to create TCP client epoll", label);
    return NULL;
  }

  pThreadObj->processData = fp;

  pthread_attr_init(&thattr);
  pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
  int code = pthread_create(&(pThreadObj->thread), &thattr, taosProcessTcpData, (void *)(pThreadObj));
  pthread_attr_destroy(&thattr);
  if (code != 0) {
    tError("%s failed to create TCP read data thread(%s)", label, strerror(errno));
    return NULL;
  }

  tTrace("%s TCP client is initialized, ip:%s:%hu", label, ip, port);

  return pThreadObj;
}

void taosCleanUpTcpClient(void *chandle) {
  SThreadObj *pThreadObj = chandle;
  if (pThreadObj == NULL) return;

285
  taosStopTcpThread(pThreadObj);
J
Jeff Tao 已提交
286 287 288 289 290
  tTrace (":%s, all connections are cleaned up", pThreadObj->label);

  tfree(pThreadObj);
}

J
jtao1735 已提交
291
void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uint16_t port) {
J
Jeff Tao 已提交
292 293
  SThreadObj *    pThreadObj = shandle;

J
jtao1735 已提交
294
  int fd = taosOpenTcpClientSocket(ip, port, pThreadObj->ip);
J
Jeff Tao 已提交
295 296 297 298 299 300 301
  if (fd <= 0) return NULL;

  SFdObj *pFdObj = taosMallocFdObj(pThreadObj, fd);
  
  if (pFdObj) {
    pFdObj->thandle = thandle;
    pFdObj->port = port;
J
jtao1735 已提交
302 303
    pFdObj->ip = ip;
    tTrace("%s %p, TCP connection to 0x%x:%hu is created, FD:%p numOfFds:%d", 
J
Jeff Tao 已提交
304 305 306 307 308 309 310 311 312 313
            pThreadObj->label, thandle, ip, port, pFdObj, pThreadObj->numOfFds);
  } else {
    close(fd);
    tError("%s failed to malloc client FdObj(%s)", pThreadObj->label, strerror(errno));
  }

  return pFdObj;
}

void taosCloseTcpConnection(void *chandle) {
314
  SFdObj *pFdObj = chandle;
315 316
  if (pFdObj == NULL) return;

J
Jeff Tao 已提交
317
  taosFreeFdObj(pFdObj);
318 319
}

J
Jeff Tao 已提交
320
int taosSendTcpData(uint32_t ip, uint16_t port, void *data, int len, void *chandle) {
321
  SFdObj *pFdObj = chandle;
322 323 324 325 326 327

  if (chandle == NULL) return -1;

  return (int)send(pFdObj->fd, data, (size_t)len, 0);
}

328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346
/*
 * Handle a connection that can no longer be used (socket error, hang-up,
 * short read, or allocation failure while reading).
 *
 * With an upper-layer handle attached, the upper layer is notified with an
 * empty packet (msg == NULL, msgLen == 0, chandle == NULL) so it can clean up
 * its context; releasing the SFdObj stays with the upper layer, as before
 * (NOTE(review): presumably via taosCloseTcpConnection — confirm).
 *
 * Without a handle (e.g. a server-side connection that broke before its first
 * complete message), nobody else references the SFdObj, so it is released
 * here. Otherwise the dead socket would stay in the level-triggered epoll set,
 * making the worker re-report the same failure on every epoll_wait and
 * leaking the FD.
 */
static void taosReportBrokenLink(SFdObj *pFdObj) {
  SThreadObj *pThreadObj = pFdObj->pThreadObj;

  if (pFdObj->thandle == NULL) {
    taosFreeFdObj(pFdObj);
    return;
  }

  // notify the upper layer, so it will clean the associated context
  SRecvInfo recvInfo;
  recvInfo.msg = NULL;
  recvInfo.msgLen = 0;
  recvInfo.ip = 0;
  recvInfo.port = 0;
  recvInfo.shandle = pThreadObj->shandle;
  recvInfo.thandle = pFdObj->thandle;
  recvInfo.chandle = NULL;
  recvInfo.connType = RPC_CONN_TCP;
  (*(pThreadObj->processData))(&recvInfo);
}

J
Jeff Tao 已提交
347 348 349 350 351
#define maxEvents 10

static void *taosProcessTcpData(void *param) {
  SThreadObj        *pThreadObj = param;
  SFdObj            *pFdObj;
H
hzcheng 已提交
352
  struct epoll_event events[maxEvents];
353 354
  SRecvInfo          recvInfo;
  SRpcHead           rpcHead;
H
hzcheng 已提交
355 356

  while (1) {
J
Jeff Tao 已提交
357
    int fdNum = epoll_wait(pThreadObj->pollFd, events, maxEvents, -1);
358 359 360 361
    if (pThreadObj->stop) {
      tTrace("%s, tcp thread get stop event, exiting...", pThreadObj->label);
      break;
    }
H
hzcheng 已提交
362 363
    if (fdNum < 0) continue;

J
Jeff Tao 已提交
364
    for (int i = 0; i < fdNum; ++i) {
H
hzcheng 已提交
365 366 367
      pFdObj = events[i].data.ptr;

      if (events[i].events & EPOLLERR) {
368 369
        tTrace("%s %p, error happened on FD", pThreadObj->label, pFdObj->thandle);
        taosReportBrokenLink(pFdObj);
H
hzcheng 已提交
370 371 372 373
        continue;
      }

      if (events[i].events & EPOLLHUP) {
374 375
        tTrace("%s %p, FD hang up", pThreadObj->label, pFdObj->thandle);
        taosReportBrokenLink(pFdObj);
H
hzcheng 已提交
376 377 378
        continue;
      }

379
      int32_t headLen = taosReadMsg(pFdObj->fd, &rpcHead, sizeof(SRpcHead));
380
      if (headLen != sizeof(SRpcHead)) {
381 382
        tError("%s %p, read error, headLen:%d", pThreadObj->label, pFdObj->thandle, headLen);
        taosReportBrokenLink(pFdObj);
H
hzcheng 已提交
383 384 385
        continue;
      }

386 387 388
      int32_t msgLen = (int32_t)htonl((uint32_t)rpcHead.msgLen);
      char   *buffer = malloc(msgLen + tsRpcOverhead);
      if ( NULL == buffer) {
389 390
        tError("%s %p, TCP malloc(size:%d) fail", pThreadObj->label, pFdObj->thandle, msgLen);
        taosReportBrokenLink(pFdObj);
391 392
        continue;
      }
H
hzcheng 已提交
393

394 395 396
      char   *msg = buffer + tsRpcOverhead;
      int32_t leftLen = msgLen - headLen;
      int32_t retLen = taosReadMsg(pFdObj->fd, msg + headLen, leftLen);
H
hzcheng 已提交
397 398

      if (leftLen != retLen) {
399 400 401
        tError("%s %p, read error, leftLen:%d retLen:%d", 
                pThreadObj->label, pFdObj->thandle, leftLen, retLen);
        taosReportBrokenLink(pFdObj);
S
slguan 已提交
402
        tfree(buffer);
H
hzcheng 已提交
403 404 405
        continue;
      }

J
jtao1735 已提交
406
      // tTrace("%s TCP data is received, ip:0x%x:%u len:%d", pThreadObj->label, pFdObj->ip, pFdObj->port, msgLen);
407 408 409 410

      memcpy(msg, &rpcHead, sizeof(SRpcHead));
      recvInfo.msg = msg;
      recvInfo.msgLen = msgLen;
411 412 413 414 415 416
      recvInfo.ip = pFdObj->ip;
      recvInfo.port = pFdObj->port;
      recvInfo.shandle = pThreadObj->shandle;
      recvInfo.thandle = pFdObj->thandle;;
      recvInfo.chandle = pFdObj;
      recvInfo.connType = RPC_CONN_TCP;
H
hzcheng 已提交
417

418
      pFdObj->thandle = (*(pThreadObj->processData))(&recvInfo);
J
Jeff Tao 已提交
419
      if (pFdObj->thandle == NULL) taosFreeFdObj(pFdObj);
H
hzcheng 已提交
420 421
    }
  }
J
Jeff Tao 已提交
422 423

  return NULL;
H
hzcheng 已提交
424 425
}

J
Jeff Tao 已提交
426
/*
 * Allocate an SFdObj for socket fd, add the socket to pThreadObj's epoll set
 * and push the object onto the head of pThreadObj's FD list.
 * Returns NULL if allocation or epoll registration fails; fd is then left
 * open for the caller to close, and errno describes the failure.
 */
static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, int fd) {
  SFdObj *pFdObj = calloc(1, sizeof(SFdObj));
  if (pFdObj == NULL) return NULL;

  pFdObj->fd = fd;
  pFdObj->pThreadObj = pThreadObj;
  pFdObj->signature = pFdObj;

  struct epoll_event event = {0};
  event.events = EPOLLIN | EPOLLPRI | EPOLLWAKEUP;
  event.data.ptr = pFdObj;

  // Register and link under the mutex: once fd is in the epoll set the worker
  // may read from it and call taosFreeFdObj on this object. That function
  // takes the same mutex, so it will find the object already linked instead of
  // unlinking a half-initialized node (which used to reset pHead to NULL).
  pthread_mutex_lock(&(pThreadObj->mutex));
  if (epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_ADD, fd, &event) < 0) {
    int savedErrno = errno;  // callers log strerror(errno)
    pthread_mutex_unlock(&(pThreadObj->mutex));
    tfree(pFdObj);
    errno = savedErrno;
    return NULL;
  }

  pFdObj->next = pThreadObj->pHead;
  if (pThreadObj->pHead) (pThreadObj->pHead)->prev = pFdObj;
  pThreadObj->pHead = pFdObj;
  pThreadObj->numOfFds++;
  pthread_mutex_unlock(&(pThreadObj->mutex));

  return pFdObj;
}

J
Jeff Tao 已提交
454
/*
 * Release an SFdObj: remove its socket from the worker's epoll set, close the
 * socket, unlink the object from the worker's FD list and free it.
 * NULL and objects whose signature does not match their address are ignored.
 * The signature is re-checked and cleared under the mutex, so a concurrent
 * caller that passed the first check returns without doing anything.
 */
static void taosFreeFdObj(SFdObj *pFdObj) {
  if (pFdObj == NULL || pFdObj->signature != pFdObj) return;

  SThreadObj *pThreadObj = pFdObj->pThreadObj;
  pthread_mutex_lock(&pThreadObj->mutex);

  if (!pFdObj->signature) {
    pthread_mutex_unlock(&pThreadObj->mutex);
    return;
  }
  pFdObj->signature = NULL;

  epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_DEL, pFdObj->fd, NULL);
  close(pFdObj->fd);

  if (--pThreadObj->numOfFds < 0) {
    tError("%s %p, TCP thread:%d, number of FDs is negative!!!", 
            pThreadObj->label, pFdObj->thandle, pThreadObj->threadId);
  }

  // unlink from the doubly linked FD list
  SFdObj *pPrev = pFdObj->prev;
  SFdObj *pNext = pFdObj->next;
  if (pPrev != NULL) {
    pPrev->next = pNext;
  } else {
    pThreadObj->pHead = pNext;
  }
  if (pNext != NULL) {
    pNext->prev = pPrev;
  }

  pthread_mutex_unlock(&pThreadObj->mutex);

  tTrace("%s %p, FD:%p is cleaned, numOfFds:%d", 
          pThreadObj->label, pFdObj->thandle, pFdObj, pThreadObj->numOfFds);

  tfree(pFdObj);
}