rpcTcp.c 14.5 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
slguan 已提交
16
#include "os.h"
H
hzcheng 已提交
17 18
#include "tsocket.h"
#include "tutil.h"
S
slguan 已提交
19
#include "rpcLog.h"
20
#include "rpcHead.h"
J
Jeff Tao 已提交
21
#include "rpcTcp.h"
H
hzcheng 已提交
22

23 24 25
// Provide EPOLLWAKEUP (bit 29) when the included headers do not define it,
// so the event mask used when registering sockets with epoll still compiles.
#ifndef EPOLLWAKEUP
  #define EPOLLWAKEUP (1u << 29)
#endif
H
hzcheng 已提交
26

J
Jeff Tao 已提交
27 28 29 30 31 32 33 34 35
// One TCP connection handled by a worker thread. Objects are linked into the
// owning thread's doubly linked list (SThreadObj.pHead).
typedef struct SFdObj {
  void              *signature;   // points to the object itself while it is valid; set to NULL when freed
  int                fd;          // TCP socket FD
  void              *thandle;     // handle from upper layer, like TAOS
  uint32_t           ip;          // IP address of the peer
  uint16_t           port;        // port of the peer
  struct SThreadObj *pThreadObj;  // worker thread that polls this connection
  struct SFdObj     *prev;        // previous connection in the thread's list, NULL for the head
  struct SFdObj     *next;        // next connection in the thread's list, NULL for the tail
} SFdObj;

J
Jeff Tao 已提交
38
typedef struct SThreadObj {
H
hzcheng 已提交
39 40
  pthread_t       thread;
  SFdObj *        pHead;
J
Jeff Tao 已提交
41
  pthread_mutex_t mutex;
J
jtao1735 已提交
42
  uint32_t        ip;
43
  bool            stop;
H
hzcheng 已提交
44 45 46 47
  int             pollFd;
  int             numOfFds;
  int             threadId;
  char            label[12];
48 49
  void           *shandle;  // handle passed by upper layer during server initialization
  void           *(*processData)(SRecvInfo *pPacket);
H
hzcheng 已提交
50 51 52
} SThreadObj;

typedef struct {
53
  int         fd;
J
jtao1735 已提交
54
  uint32_t    ip;
L
lihui 已提交
55
  uint16_t    port;
H
hzcheng 已提交
56 57 58 59 60 61 62
  char        label[12];
  int         numOfThreads;
  void *      shandle;
  SThreadObj *pThreadObj;
  pthread_t   thread;
} SServerObj;

J
Jeff Tao 已提交
63 64 65 66
// forward declarations of file-local helpers that are used before their definition
static void   *taosProcessTcpData(void *param);
static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, int fd);
static void    taosFreeFdObj(SFdObj *pFdObj);
static void    taosReportBrokenLink(SFdObj *pFdObj);
static void   *taosAcceptTcpConnection(void *arg);
H
hzcheng 已提交
68

J
jtao1735 已提交
69
void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle) {
J
Jeff Tao 已提交
70 71
  SServerObj *pServerObj;
  SThreadObj *pThreadObj;
H
hzcheng 已提交
72

J
Jeff Tao 已提交
73
  pServerObj = (SServerObj *)calloc(sizeof(SServerObj), 1);
J
jtao1735 已提交
74
  pServerObj->ip = ip;
75 76 77 78
  pServerObj->port = port;
  strcpy(pServerObj->label, label);
  pServerObj->numOfThreads = numOfThreads;

J
Jeff Tao 已提交
79
  pServerObj->pThreadObj = (SThreadObj *)calloc(sizeof(SThreadObj), numOfThreads);
80 81
  if (pServerObj->pThreadObj == NULL) {
    tError("TCP:%s no enough memory", label);
J
Jeff Tao 已提交
82
    free(pServerObj);
83
    return NULL;
H
hzcheng 已提交
84
  }
J
Jeff Tao 已提交
85

J
Jeff Tao 已提交
86
  int code = 0;
87
  pThreadObj = pServerObj->pThreadObj;
J
Jeff Tao 已提交
88
  for (int i = 0; i < numOfThreads; ++i) {
89 90 91
    pThreadObj->processData = fp;
    strcpy(pThreadObj->label, label);
    pThreadObj->shandle = shandle;
H
hzcheng 已提交
92

J
Jeff Tao 已提交
93
    code = pthread_mutex_init(&(pThreadObj->mutex), NULL);
J
Jeff Tao 已提交
94 95 96
    if (code < 0) {
      tError("%s failed to init TCP process data mutex(%s)", label, strerror(errno));
      break;;
97
    }
H
hzcheng 已提交
98

99 100 101
    pThreadObj->pollFd = epoll_create(10);  // size does not matter
    if (pThreadObj->pollFd < 0) {
      tError("%s failed to create TCP epoll", label);
J
Jeff Tao 已提交
102 103
      code = -1;
      break;
104
    }
H
hzcheng 已提交
105

J
Jeff Tao 已提交
106 107 108
    pthread_attr_t thattr;
    pthread_attr_init(&thattr);
    pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
J
Jeff Tao 已提交
109
    code = pthread_create(&(pThreadObj->thread), &thattr, taosProcessTcpData, (void *)(pThreadObj));
J
Jeff Tao 已提交
110 111 112 113
    pthread_attr_destroy(&thattr);
    if (code != 0) {
      tError("%s failed to create TCP process data thread(%s)", label, strerror(errno));
      break;
114
    }
H
hzcheng 已提交
115

116 117
    pThreadObj->threadId = i;
    pThreadObj++;
H
hzcheng 已提交
118 119
  }

J
Jeff Tao 已提交
120 121 122 123 124 125 126 127 128
  if (code == 0) { 
    pthread_attr_t thattr;
    pthread_attr_init(&thattr);
    pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
    code = pthread_create(&(pServerObj->thread), &thattr, (void *)taosAcceptTcpConnection, (void *)(pServerObj));
    pthread_attr_destroy(&thattr);
    if (code != 0) {
      tError("%s failed to create TCP accept thread(%s)", label, strerror(errno));
    }
H
hzcheng 已提交
129 130
  }

J
Jeff Tao 已提交
131 132 133 134 135
  if (code != 0) {
    free(pServerObj->pThreadObj);
    free(pServerObj);
    pServerObj = NULL;
  } else {
J
jtao1735 已提交
136
    tTrace("%s TCP server is initialized, ip:0x%x port:%hu numOfThreads:%d", label, ip, port, numOfThreads);
J
Jeff Tao 已提交
137
  }
H
hzcheng 已提交
138

139
  return (void *)pServerObj;
H
hzcheng 已提交
140 141
}

142 143 144 145 146 147 148 149
/*
 * Stop one worker thread and release everything it owns.
 *
 * The thread is woken up by registering a readable eventfd with its epoll
 * instance after the stop flag has been set; pthread_cancel is only the
 * fallback when that is not possible. After the thread has been joined, all
 * connections still in its list are freed and the epoll FD is closed.
 * The mutex of the thread object is left to the caller.
 */
static void taosStopTcpThread(SThreadObj* pThreadObj) {
  pThreadObj->stop = true;

  // signal the thread to stop, try graceful method first,
  // and use pthread_cancel when failed
  struct epoll_event event = { .events = EPOLLIN };
  int fd = eventfd(1, 0);  // eventfd() returns a file descriptor (int); initial value 1 makes it readable at once
  if (fd == -1) {
    tError("%s, failed to create eventfd, will call pthread_cancel instead, which may result in data corruption: %s", pThreadObj->label, strerror(errno));
    pthread_cancel(pThreadObj->thread);
  } else if (epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_ADD, fd, &event) < 0) {
    tError("%s, failed to call epoll_ctl, will call pthread_cancel instead, which may result in data corruption: %s", pThreadObj->label, strerror(errno));
    pthread_cancel(pThreadObj->thread);
  }

  pthread_join(pThreadObj->thread, NULL);

  // free the connections while pollFd is still open: taosFreeFdObj issues
  // EPOLL_CTL_DEL on it, which must not hit a closed (possibly reused) FD number
  while (pThreadObj->pHead) {
    SFdObj *pFdObj = pThreadObj->pHead;
    pThreadObj->pHead = pFdObj->next;
    taosFreeFdObj(pFdObj);
  }

  close(pThreadObj->pollFd);
  if (fd != -1) {
    close(fd);
  }
}


H
hzcheng 已提交
171
void taosCleanUpTcpServer(void *handle) {
172
  SServerObj *pServerObj = handle;
J
Jeff Tao 已提交
173
  SThreadObj *pThreadObj;
H
hzcheng 已提交
174 175 176

  if (pServerObj == NULL) return;

177
  shutdown(pServerObj->fd, SHUT_RD);
H
hzcheng 已提交
178 179
  pthread_join(pServerObj->thread, NULL);

J
Jeff Tao 已提交
180
  for (int i = 0; i < pServerObj->numOfThreads; ++i) {
H
hzcheng 已提交
181
    pThreadObj = pServerObj->pThreadObj + i;
182
    taosStopTcpThread(pThreadObj);
J
Jeff Tao 已提交
183
    pthread_mutex_destroy(&(pThreadObj->mutex));
H
hzcheng 已提交
184 185 186 187
  }

  tTrace("TCP:%s, TCP server is cleaned up", pServerObj->label);

J
Jeff Tao 已提交
188
  tfree(pServerObj->pThreadObj);
H
hzcheng 已提交
189 190 191
  tfree(pServerObj);
}

192
static void* taosAcceptTcpConnection(void *arg) {
J
Jeff Tao 已提交
193 194 195 196 197 198 199 200
  int                connFd = -1;
  struct sockaddr_in caddr;
  int                threadId = 0;
  SThreadObj        *pThreadObj;
  SServerObj        *pServerObj;

  pServerObj = (SServerObj *)arg;

201 202
  pServerObj->fd = taosOpenTcpServerSocket(pServerObj->ip, pServerObj->port);
  if (pServerObj->fd < 0) return NULL; 
J
Jeff Tao 已提交
203

204
  tTrace("%s TCP server is ready, ip:0x%x:%hu", pServerObj->label, pServerObj->ip, pServerObj->port);
J
Jeff Tao 已提交
205 206 207

  while (1) {
    socklen_t addrlen = sizeof(caddr);
208 209 210 211 212 213
    connFd = accept(pServerObj->fd, (struct sockaddr *)&caddr, &addrlen);
    if (connFd == -1) {
      if (errno == EINVAL) {
        tTrace("%s TCP server socket was shutdown, exiting...", pServerObj->label);
        break;
      }
214
      tError("%s TCP accept failure(%s)", pServerObj->label, strerror(errno));
J
Jeff Tao 已提交
215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230
      continue;
    }

    taosKeepTcpAlive(connFd);

    // pick up the thread to handle this connection
    pThreadObj = pServerObj->pThreadObj + threadId;

    SFdObj *pFdObj = taosMallocFdObj(pThreadObj, connFd);
    if (pFdObj) {
      pFdObj->ip = caddr.sin_addr.s_addr;
      pFdObj->port = caddr.sin_port;
      tTrace("%s new connection from %s:%hu, FD:%p, numOfFds:%d", pServerObj->label, 
              inet_ntoa(caddr.sin_addr), pFdObj->port, pFdObj, pThreadObj->numOfFds);
    } else {
      close(connFd);
231 232
      tError("%s failed to malloc FdObj(%s) for connection from:%s:%hu", pServerObj->label, strerror(errno),
             inet_ntoa(caddr.sin_addr), caddr.sin_port);
J
Jeff Tao 已提交
233 234 235 236 237 238
    }  

    // pick up next thread for next connection
    threadId++;
    threadId = threadId % pServerObj->numOfThreads;
  }
239 240 241

  close(pServerObj->fd);
  return NULL;
J
Jeff Tao 已提交
242 243
}

J
jtao1735 已提交
244
void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int num, void *fp, void *shandle) {
J
Jeff Tao 已提交
245 246 247 248 249 250
  SThreadObj    *pThreadObj;
  pthread_attr_t thattr;

  pThreadObj = (SThreadObj *)malloc(sizeof(SThreadObj));
  memset(pThreadObj, 0, sizeof(SThreadObj));
  strcpy(pThreadObj->label, label);
J
jtao1735 已提交
251
  pThreadObj->ip = ip;
J
Jeff Tao 已提交
252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284
  pThreadObj->shandle = shandle;

  if (pthread_mutex_init(&(pThreadObj->mutex), NULL) < 0) {
    tError("%s failed to init TCP client mutex(%s)", label, strerror(errno));
    return NULL;
  }

  pThreadObj->pollFd = epoll_create(10);  // size does not matter
  if (pThreadObj->pollFd < 0) {
    tError("%s failed to create TCP client epoll", label);
    return NULL;
  }

  pThreadObj->processData = fp;

  pthread_attr_init(&thattr);
  pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
  int code = pthread_create(&(pThreadObj->thread), &thattr, taosProcessTcpData, (void *)(pThreadObj));
  pthread_attr_destroy(&thattr);
  if (code != 0) {
    tError("%s failed to create TCP read data thread(%s)", label, strerror(errno));
    return NULL;
  }

  tTrace("%s TCP client is initialized, ip:%s:%hu", label, ip, port);

  return pThreadObj;
}

void taosCleanUpTcpClient(void *chandle) {
  SThreadObj *pThreadObj = chandle;
  if (pThreadObj == NULL) return;

285
  taosStopTcpThread(pThreadObj);
J
Jeff Tao 已提交
286 287 288 289 290
  tTrace (":%s, all connections are cleaned up", pThreadObj->label);

  tfree(pThreadObj);
}

J
jtao1735 已提交
291
void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uint16_t port) {
J
Jeff Tao 已提交
292 293
  SThreadObj *    pThreadObj = shandle;

J
jtao1735 已提交
294
  int fd = taosOpenTcpClientSocket(ip, port, pThreadObj->ip);
J
Jeff Tao 已提交
295 296 297 298 299 300 301
  if (fd <= 0) return NULL;

  SFdObj *pFdObj = taosMallocFdObj(pThreadObj, fd);
  
  if (pFdObj) {
    pFdObj->thandle = thandle;
    pFdObj->port = port;
J
jtao1735 已提交
302 303
    pFdObj->ip = ip;
    tTrace("%s %p, TCP connection to 0x%x:%hu is created, FD:%p numOfFds:%d", 
J
Jeff Tao 已提交
304 305 306 307 308 309 310 311 312 313
            pThreadObj->label, thandle, ip, port, pFdObj, pThreadObj->numOfFds);
  } else {
    close(fd);
    tError("%s failed to malloc client FdObj(%s)", pThreadObj->label, strerror(errno));
  }

  return pFdObj;
}

void taosCloseTcpConnection(void *chandle) {
314
  SFdObj *pFdObj = chandle;
315 316
  if (pFdObj == NULL) return;

J
Jeff Tao 已提交
317
  taosFreeFdObj(pFdObj);
318 319
}

J
Jeff Tao 已提交
320
int taosSendTcpData(uint32_t ip, uint16_t port, void *data, int len, void *chandle) {
321
  SFdObj *pFdObj = chandle;
322 323 324 325 326 327

  if (chandle == NULL) return -1;

  return (int)send(pFdObj->fd, data, (size_t)len, 0);
}

328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343
/*
 * Handle a connection that failed or was closed by the peer.
 *
 * When an upper-layer handle is attached, the upper layer is notified through
 * processData with an empty message (msg NULL, msgLen 0) and the connection
 * object is not freed here. Without an upper-layer handle the connection
 * object is freed directly.
 */
static void taosReportBrokenLink(SFdObj *pFdObj) {

  SThreadObj *pThreadObj = pFdObj->pThreadObj;

  // notify the upper layer, so it will clean the associated context
  if (pFdObj->thandle) {
    SRecvInfo recvInfo;
    recvInfo.msg = NULL;
    recvInfo.msgLen = 0;
    recvInfo.ip = 0;
    recvInfo.port = 0;
    recvInfo.shandle = pThreadObj->shandle;
    recvInfo.thandle = pFdObj->thandle;
    recvInfo.chandle = NULL;
    recvInfo.connType = RPC_CONN_TCP;
    (*(pThreadObj->processData))(&recvInfo);
  } else {
    taosFreeFdObj(pFdObj);
  }
}

J
Jeff Tao 已提交
349 350 351 352 353
// size of the event array passed to epoll_wait in taosProcessTcpData
#define maxEvents 10

static void *taosProcessTcpData(void *param) {
  SThreadObj        *pThreadObj = param;
  SFdObj            *pFdObj;
H
hzcheng 已提交
354
  struct epoll_event events[maxEvents];
355 356
  SRecvInfo          recvInfo;
  SRpcHead           rpcHead;
357
 
H
hzcheng 已提交
358
  while (1) {
J
Jeff Tao 已提交
359
    int fdNum = epoll_wait(pThreadObj->pollFd, events, maxEvents, -1);
360 361 362 363
    if (pThreadObj->stop) {
      tTrace("%s, tcp thread get stop event, exiting...", pThreadObj->label);
      break;
    }
H
hzcheng 已提交
364 365
    if (fdNum < 0) continue;

J
Jeff Tao 已提交
366
    for (int i = 0; i < fdNum; ++i) {
H
hzcheng 已提交
367 368 369
      pFdObj = events[i].data.ptr;

      if (events[i].events & EPOLLERR) {
370 371
        tTrace("%s %p, error happened on FD", pThreadObj->label, pFdObj->thandle);
        taosReportBrokenLink(pFdObj);
H
hzcheng 已提交
372 373 374 375
        continue;
      }

      if (events[i].events & EPOLLHUP) {
376 377
        tTrace("%s %p, FD hang up", pThreadObj->label, pFdObj->thandle);
        taosReportBrokenLink(pFdObj);
H
hzcheng 已提交
378 379 380
        continue;
      }

381
      int32_t headLen = taosReadMsg(pFdObj->fd, &rpcHead, sizeof(SRpcHead));
382
      if (headLen != sizeof(SRpcHead)) {
383
        tTrace("%s %p, read error, headLen:%d", pThreadObj->label, pFdObj->thandle, headLen);
384
        taosReportBrokenLink(pFdObj);
H
hzcheng 已提交
385 386 387
        continue;
      }

388 389 390
      int32_t msgLen = (int32_t)htonl((uint32_t)rpcHead.msgLen);
      char   *buffer = malloc(msgLen + tsRpcOverhead);
      if ( NULL == buffer) {
391 392
        tError("%s %p, TCP malloc(size:%d) fail", pThreadObj->label, pFdObj->thandle, msgLen);
        taosReportBrokenLink(pFdObj);
393 394
        continue;
      }
H
hzcheng 已提交
395

396 397 398
      char   *msg = buffer + tsRpcOverhead;
      int32_t leftLen = msgLen - headLen;
      int32_t retLen = taosReadMsg(pFdObj->fd, msg + headLen, leftLen);
H
hzcheng 已提交
399 400

      if (leftLen != retLen) {
401 402 403
        tError("%s %p, read error, leftLen:%d retLen:%d", 
                pThreadObj->label, pFdObj->thandle, leftLen, retLen);
        taosReportBrokenLink(pFdObj);
S
slguan 已提交
404
        tfree(buffer);
H
hzcheng 已提交
405 406 407
        continue;
      }

J
jtao1735 已提交
408
      // tTrace("%s TCP data is received, ip:0x%x:%u len:%d", pThreadObj->label, pFdObj->ip, pFdObj->port, msgLen);
409 410 411 412

      memcpy(msg, &rpcHead, sizeof(SRpcHead));
      recvInfo.msg = msg;
      recvInfo.msgLen = msgLen;
413 414 415 416 417 418
      recvInfo.ip = pFdObj->ip;
      recvInfo.port = pFdObj->port;
      recvInfo.shandle = pThreadObj->shandle;
      recvInfo.thandle = pFdObj->thandle;;
      recvInfo.chandle = pFdObj;
      recvInfo.connType = RPC_CONN_TCP;
H
hzcheng 已提交
419

420
      pFdObj->thandle = (*(pThreadObj->processData))(&recvInfo);
421
      if (pFdObj->thandle == NULL) taosFreeFdObj(pFdObj);
H
hzcheng 已提交
422 423
    }
  }
J
Jeff Tao 已提交
424 425

  return NULL;
H
hzcheng 已提交
426 427
}

J
Jeff Tao 已提交
428
/*
 * Create the connection object for socket fd, register the socket with the
 * epoll instance of pThreadObj and put the object at the head of the thread's
 * connection list.
 *
 * Returns the new object, or NULL when the allocation or the epoll
 * registration fails. The socket is not closed on failure.
 */
static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, int fd) {
  struct epoll_event event;

  SFdObj *pFdObj = (SFdObj *)calloc(1, sizeof(SFdObj));
  if (pFdObj == NULL) return NULL;

  pFdObj->fd = fd;
  pFdObj->pThreadObj = pThreadObj;
  pFdObj->signature = pFdObj;  // marks the object as valid, checked by taosFreeFdObj

  event.events = EPOLLIN | EPOLLPRI | EPOLLWAKEUP;
  event.data.ptr = pFdObj;
  if (epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_ADD, fd, &event) < 0) {
    tfree(pFdObj);
    return NULL;
  }

  // notify the data process, add into the FdObj list
  pthread_mutex_lock(&(pThreadObj->mutex));
  pFdObj->next = pThreadObj->pHead;
  if (pThreadObj->pHead) (pThreadObj->pHead)->prev = pFdObj;
  pThreadObj->pHead = pFdObj;
  pThreadObj->numOfFds++;
  pthread_mutex_unlock(&(pThreadObj->mutex));

  return pFdObj;
}

J
Jeff Tao 已提交
456
/*
 * Close a connection and free its object: the socket is removed from the
 * thread's epoll instance and closed, the object is unlinked from the
 * thread's connection list and released.
 *
 * NULL and objects whose signature does not match are ignored. The signature
 * is checked again under the thread's mutex and cleared there, so that only
 * one of several concurrent callers performs the cleanup.
 */
static void taosFreeFdObj(SFdObj *pFdObj) {

  if (pFdObj == NULL) return;
  if (pFdObj->signature != pFdObj) return;

  SThreadObj *pThreadObj = pFdObj->pThreadObj;
  pthread_mutex_lock(&pThreadObj->mutex);

  // another caller got here first and is already cleaning up the object
  if (pFdObj->signature == NULL) {
    pthread_mutex_unlock(&pThreadObj->mutex);
    return;
  }

  pFdObj->signature = NULL;
  epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_DEL, pFdObj->fd, NULL);
  taosCloseSocket(pFdObj->fd);

  pThreadObj->numOfFds--;

  if (pThreadObj->numOfFds < 0) {
    tError("%s %p, TCP thread:%d, number of FDs is negative!!!",
            pThreadObj->label, pFdObj->thandle, pThreadObj->threadId);
  }

  // remove from the FdObject list

  if (pFdObj->prev) {
    (pFdObj->prev)->next = pFdObj->next;
  } else {
    pThreadObj->pHead = pFdObj->next;
  }

  if (pFdObj->next) {
    (pFdObj->next)->prev = pFdObj->prev;
  }

  pthread_mutex_unlock(&pThreadObj->mutex);

  tTrace("%s %p, FD:%p is cleaned, numOfFds:%d",
          pThreadObj->label, pFdObj->thandle, pFdObj, pThreadObj->numOfFds);

  tfree(pFdObj);
}