rpcTcp.c 14.6 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
slguan 已提交
16
#include "os.h"
H
hzcheng 已提交
17 18
#include "tsocket.h"
#include "tutil.h"
S
slguan 已提交
19
#include "rpcLog.h"
20
#include "rpcHead.h"
J
Jeff Tao 已提交
21
#include "rpcTcp.h"
H
hzcheng 已提交
22

23 24 25
// Fallback for C libraries whose <sys/epoll.h> does not define EPOLLWAKEUP (bit 29).
#if !defined(EPOLLWAKEUP)
#define EPOLLWAKEUP (1u << 29)
#endif
H
hzcheng 已提交
26

J
Jeff Tao 已提交
27 28 29 30 31 32 33 34 35
// One TCP connection polled by a worker thread. Every live object is linked
// into its owning SThreadObj's doubly-linked list (pHead/prev/next).
typedef struct SFdObj {
  void              *signature;   // equals the object's own address while valid; cleared by taosFreeFdObj()
  int                fd;          // TCP socket FD
  void              *thandle;     // handle from upper layer, like TAOS
  uint32_t           ip;          // remote address of the connection
  uint16_t           port;        // remote port of the connection
  struct SThreadObj *pThreadObj;  // worker thread whose epoll set contains fd
  struct SFdObj     *prev;
  struct SFdObj     *next;
} SFdObj;

J
Jeff Tao 已提交
38
typedef struct SThreadObj {
H
hzcheng 已提交
39 40
  pthread_t       thread;
  SFdObj *        pHead;
J
Jeff Tao 已提交
41
  pthread_mutex_t mutex;
J
jtao1735 已提交
42
  uint32_t        ip;
43
  bool            stop;
H
hzcheng 已提交
44 45 46 47
  int             pollFd;
  int             numOfFds;
  int             threadId;
  char            label[12];
48 49
  void           *shandle;  // handle passed by upper layer during server initialization
  void           *(*processData)(SRecvInfo *pPacket);
H
hzcheng 已提交
50 51 52
} SThreadObj;

typedef struct {
53
  int         fd;
J
jtao1735 已提交
54
  uint32_t    ip;
L
lihui 已提交
55
  uint16_t    port;
H
hzcheng 已提交
56 57 58 59 60 61 62
  char        label[12];
  int         numOfThreads;
  void *      shandle;
  SThreadObj *pThreadObj;
  pthread_t   thread;
} SServerObj;

J
Jeff Tao 已提交
63 64 65 66
// File-local helpers used before their definitions.
static void   *taosProcessTcpData(void *param);
static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, int fd);
static void    taosFreeFdObj(SFdObj *pFdObj);
static void    taosReportBrokenLink(SFdObj *pFdObj);
static void   *taosAcceptTcpConnection(void *arg);
H
hzcheng 已提交
68

J
jtao1735 已提交
69
void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle) {
J
Jeff Tao 已提交
70 71
  SServerObj *pServerObj;
  SThreadObj *pThreadObj;
H
hzcheng 已提交
72

J
Jeff Tao 已提交
73
  pServerObj = (SServerObj *)calloc(sizeof(SServerObj), 1);
J
jtao1735 已提交
74
  pServerObj->ip = ip;
75 76 77 78
  pServerObj->port = port;
  strcpy(pServerObj->label, label);
  pServerObj->numOfThreads = numOfThreads;

J
Jeff Tao 已提交
79
  pServerObj->pThreadObj = (SThreadObj *)calloc(sizeof(SThreadObj), numOfThreads);
80 81
  if (pServerObj->pThreadObj == NULL) {
    tError("TCP:%s no enough memory", label);
J
Jeff Tao 已提交
82
    free(pServerObj);
83
    return NULL;
H
hzcheng 已提交
84
  }
J
Jeff Tao 已提交
85

J
Jeff Tao 已提交
86
  int code = 0;
87
  pThreadObj = pServerObj->pThreadObj;
J
Jeff Tao 已提交
88
  for (int i = 0; i < numOfThreads; ++i) {
89 90 91
    pThreadObj->processData = fp;
    strcpy(pThreadObj->label, label);
    pThreadObj->shandle = shandle;
H
hzcheng 已提交
92

J
Jeff Tao 已提交
93
    code = pthread_mutex_init(&(pThreadObj->mutex), NULL);
J
Jeff Tao 已提交
94 95 96
    if (code < 0) {
      tError("%s failed to init TCP process data mutex(%s)", label, strerror(errno));
      break;;
97
    }
H
hzcheng 已提交
98

99 100 101
    pThreadObj->pollFd = epoll_create(10);  // size does not matter
    if (pThreadObj->pollFd < 0) {
      tError("%s failed to create TCP epoll", label);
J
Jeff Tao 已提交
102 103
      code = -1;
      break;
104
    }
H
hzcheng 已提交
105

J
Jeff Tao 已提交
106 107 108
    pthread_attr_t thattr;
    pthread_attr_init(&thattr);
    pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
J
Jeff Tao 已提交
109
    code = pthread_create(&(pThreadObj->thread), &thattr, taosProcessTcpData, (void *)(pThreadObj));
J
Jeff Tao 已提交
110 111 112 113
    pthread_attr_destroy(&thattr);
    if (code != 0) {
      tError("%s failed to create TCP process data thread(%s)", label, strerror(errno));
      break;
114
    }
H
hzcheng 已提交
115

116 117
    pThreadObj->threadId = i;
    pThreadObj++;
H
hzcheng 已提交
118 119
  }

J
Jeff Tao 已提交
120 121 122 123 124 125 126 127 128
  if (code == 0) { 
    pthread_attr_t thattr;
    pthread_attr_init(&thattr);
    pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
    code = pthread_create(&(pServerObj->thread), &thattr, (void *)taosAcceptTcpConnection, (void *)(pServerObj));
    pthread_attr_destroy(&thattr);
    if (code != 0) {
      tError("%s failed to create TCP accept thread(%s)", label, strerror(errno));
    }
H
hzcheng 已提交
129 130
  }

J
Jeff Tao 已提交
131 132 133 134 135
  if (code != 0) {
    free(pServerObj->pThreadObj);
    free(pServerObj);
    pServerObj = NULL;
  } else {
J
jtao1735 已提交
136
    tTrace("%s TCP server is initialized, ip:0x%x port:%hu numOfThreads:%d", label, ip, port, numOfThreads);
J
Jeff Tao 已提交
137
  }
H
hzcheng 已提交
138

139
  return (void *)pServerObj;
H
hzcheng 已提交
140 141
}

142 143 144 145 146 147 148 149
/*
 * Stop a worker thread and release what it owns.
 *
 * Sets pThreadObj->stop, then wakes the thread by adding an already-readable
 * eventfd to its epoll set. If that cannot be arranged, it falls back to
 * pthread_cancel(). After joining the thread, it frees every connection still
 * on the thread's list and closes the epoll fd. The mutex is not destroyed
 * here.
 */
static void taosStopTcpThread(SThreadObj *pThreadObj) {
  pThreadObj->stop = true;

  // signal the thread to stop, try graceful method first,
  // and use pthread_cancel when failed
  struct epoll_event event = { .events = EPOLLIN };
  int wakeFd = eventfd(1, 0);  // initial counter 1: readable at once, so epoll_wait() returns
  if (wakeFd == -1) {
    tError("%s, failed to create eventfd, will call pthread_cancel instead, which may result in data corruption: %s", pThreadObj->label, strerror(errno));
    pthread_cancel(pThreadObj->thread);
  } else if (epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_ADD, wakeFd, &event) < 0) {
    tError("%s, failed to call epoll_ctl, will call pthread_cancel instead, which may result in data corruption: %s", pThreadObj->label, strerror(errno));
    pthread_cancel(pThreadObj->thread);
  }

  pthread_join(pThreadObj->thread, NULL);

  // Free the remaining connections while pollFd is still open. taosFreeFdObj()
  // issues EPOLL_CTL_DEL on pollFd, and once it is closed that descriptor
  // number may already belong to something else.
  while (pThreadObj->pHead) {
    SFdObj *pFdObj = pThreadObj->pHead;
    pThreadObj->pHead = pFdObj->next;
    taosFreeFdObj(pFdObj);
  }

  close(pThreadObj->pollFd);
  pThreadObj->pollFd = -1;
  if (wakeFd != -1) {
    close(wakeFd);
  }
}


H
hzcheng 已提交
171
void taosCleanUpTcpServer(void *handle) {
172
  SServerObj *pServerObj = handle;
J
Jeff Tao 已提交
173
  SThreadObj *pThreadObj;
H
hzcheng 已提交
174 175 176

  if (pServerObj == NULL) return;

177
  shutdown(pServerObj->fd, SHUT_RD);
H
hzcheng 已提交
178 179
  pthread_join(pServerObj->thread, NULL);

J
Jeff Tao 已提交
180
  for (int i = 0; i < pServerObj->numOfThreads; ++i) {
H
hzcheng 已提交
181
    pThreadObj = pServerObj->pThreadObj + i;
182
    taosStopTcpThread(pThreadObj);
J
Jeff Tao 已提交
183
    pthread_mutex_destroy(&(pThreadObj->mutex));
H
hzcheng 已提交
184 185 186 187
  }

  tTrace("TCP:%s, TCP server is cleaned up", pServerObj->label);

J
Jeff Tao 已提交
188
  tfree(pServerObj->pThreadObj);
H
hzcheng 已提交
189 190 191
  tfree(pServerObj);
}

192
static void* taosAcceptTcpConnection(void *arg) {
J
Jeff Tao 已提交
193 194 195 196 197 198 199 200
  int                connFd = -1;
  struct sockaddr_in caddr;
  int                threadId = 0;
  SThreadObj        *pThreadObj;
  SServerObj        *pServerObj;

  pServerObj = (SServerObj *)arg;

201 202
  pServerObj->fd = taosOpenTcpServerSocket(pServerObj->ip, pServerObj->port);
  if (pServerObj->fd < 0) return NULL; 
J
Jeff Tao 已提交
203

204
  tTrace("%s TCP server is ready, ip:0x%x:%hu", pServerObj->label, pServerObj->ip, pServerObj->port);
J
Jeff Tao 已提交
205 206 207

  while (1) {
    socklen_t addrlen = sizeof(caddr);
208 209 210 211 212 213
    connFd = accept(pServerObj->fd, (struct sockaddr *)&caddr, &addrlen);
    if (connFd == -1) {
      if (errno == EINVAL) {
        tTrace("%s TCP server socket was shutdown, exiting...", pServerObj->label);
        break;
      }
214
      tError("%s TCP accept failure(%s)", pServerObj->label, strerror(errno));
J
Jeff Tao 已提交
215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230
      continue;
    }

    taosKeepTcpAlive(connFd);

    // pick up the thread to handle this connection
    pThreadObj = pServerObj->pThreadObj + threadId;

    SFdObj *pFdObj = taosMallocFdObj(pThreadObj, connFd);
    if (pFdObj) {
      pFdObj->ip = caddr.sin_addr.s_addr;
      pFdObj->port = caddr.sin_port;
      tTrace("%s new connection from %s:%hu, FD:%p, numOfFds:%d", pServerObj->label, 
              inet_ntoa(caddr.sin_addr), pFdObj->port, pFdObj, pThreadObj->numOfFds);
    } else {
      close(connFd);
231 232
      tError("%s failed to malloc FdObj(%s) for connection from:%s:%hu", pServerObj->label, strerror(errno),
             inet_ntoa(caddr.sin_addr), caddr.sin_port);
J
Jeff Tao 已提交
233 234 235 236 237 238
    }  

    // pick up next thread for next connection
    threadId++;
    threadId = threadId % pServerObj->numOfThreads;
  }
239 240 241

  close(pServerObj->fd);
  return NULL;
J
Jeff Tao 已提交
242 243
}

J
jtao1735 已提交
244
void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int num, void *fp, void *shandle) {
J
Jeff Tao 已提交
245 246 247 248 249 250
  SThreadObj    *pThreadObj;
  pthread_attr_t thattr;

  pThreadObj = (SThreadObj *)malloc(sizeof(SThreadObj));
  memset(pThreadObj, 0, sizeof(SThreadObj));
  strcpy(pThreadObj->label, label);
J
jtao1735 已提交
251
  pThreadObj->ip = ip;
J
Jeff Tao 已提交
252 253 254 255
  pThreadObj->shandle = shandle;

  if (pthread_mutex_init(&(pThreadObj->mutex), NULL) < 0) {
    tError("%s failed to init TCP client mutex(%s)", label, strerror(errno));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
256
    free(pThreadObj);
J
Jeff Tao 已提交
257 258 259 260 261 262
    return NULL;
  }

  pThreadObj->pollFd = epoll_create(10);  // size does not matter
  if (pThreadObj->pollFd < 0) {
    tError("%s failed to create TCP client epoll", label);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
263
    free(pThreadObj);
J
Jeff Tao 已提交
264 265 266 267 268 269 270 271 272 273
    return NULL;
  }

  pThreadObj->processData = fp;

  pthread_attr_init(&thattr);
  pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
  int code = pthread_create(&(pThreadObj->thread), &thattr, taosProcessTcpData, (void *)(pThreadObj));
  pthread_attr_destroy(&thattr);
  if (code != 0) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
274 275
    close(pThreadObj->pollFd);
    free(pThreadObj);
J
Jeff Tao 已提交
276 277 278 279 280 281 282 283 284 285 286 287 288
    tError("%s failed to create TCP read data thread(%s)", label, strerror(errno));
    return NULL;
  }

  tTrace("%s TCP client is initialized, ip:%s:%hu", label, ip, port);

  return pThreadObj;
}

void taosCleanUpTcpClient(void *chandle) {
  SThreadObj *pThreadObj = chandle;
  if (pThreadObj == NULL) return;

289
  taosStopTcpThread(pThreadObj);
J
Jeff Tao 已提交
290 291 292 293 294
  tTrace (":%s, all connections are cleaned up", pThreadObj->label);

  tfree(pThreadObj);
}

J
jtao1735 已提交
295
void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uint16_t port) {
J
Jeff Tao 已提交
296 297
  SThreadObj *    pThreadObj = shandle;

J
jtao1735 已提交
298
  int fd = taosOpenTcpClientSocket(ip, port, pThreadObj->ip);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
299
  if (fd < 0) return NULL;
J
Jeff Tao 已提交
300 301 302 303 304 305

  SFdObj *pFdObj = taosMallocFdObj(pThreadObj, fd);
  
  if (pFdObj) {
    pFdObj->thandle = thandle;
    pFdObj->port = port;
J
jtao1735 已提交
306 307
    pFdObj->ip = ip;
    tTrace("%s %p, TCP connection to 0x%x:%hu is created, FD:%p numOfFds:%d", 
J
Jeff Tao 已提交
308 309 310 311 312 313 314 315 316 317
            pThreadObj->label, thandle, ip, port, pFdObj, pThreadObj->numOfFds);
  } else {
    close(fd);
    tError("%s failed to malloc client FdObj(%s)", pThreadObj->label, strerror(errno));
  }

  return pFdObj;
}

void taosCloseTcpConnection(void *chandle) {
318
  SFdObj *pFdObj = chandle;
319 320
  if (pFdObj == NULL) return;

J
Jeff Tao 已提交
321
  taosFreeFdObj(pFdObj);
322 323
}

J
Jeff Tao 已提交
324
int taosSendTcpData(uint32_t ip, uint16_t port, void *data, int len, void *chandle) {
325
  SFdObj *pFdObj = chandle;
326 327 328 329 330 331

  if (chandle == NULL) return -1;

  return (int)send(pFdObj->fd, data, (size_t)len, 0);
}

332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347
/*
 * Handle a connection that failed (error, hang-up or bad read).
 *
 * With no upper-layer context attached (thandle == NULL), the connection is
 * freed right away. Otherwise the upper layer is sent an SRecvInfo with no
 * message, so it can clean up the associated context.
 */
static void taosReportBrokenLink(SFdObj *pFdObj) {
  SThreadObj *pThreadObj = pFdObj->pThreadObj;

  if (pFdObj->thandle == NULL) {
    taosFreeFdObj(pFdObj);
    return;
  }

  // notify the upper layer, so it will clean the associated context
  SRecvInfo recvInfo;
  recvInfo.connType = RPC_CONN_TCP;
  recvInfo.thandle  = pFdObj->thandle;
  recvInfo.shandle  = pThreadObj->shandle;
  recvInfo.chandle  = NULL;
  recvInfo.msg      = NULL;
  recvInfo.msgLen   = 0;
  recvInfo.ip       = 0;
  recvInfo.port     = 0;
  (*(pThreadObj->processData))(&recvInfo);
}

J
Jeff Tao 已提交
353 354 355 356 357
#define maxEvents 10

static void *taosProcessTcpData(void *param) {
  SThreadObj        *pThreadObj = param;
  SFdObj            *pFdObj;
H
hzcheng 已提交
358
  struct epoll_event events[maxEvents];
359 360
  SRecvInfo          recvInfo;
  SRpcHead           rpcHead;
361
 
H
hzcheng 已提交
362
  while (1) {
J
Jeff Tao 已提交
363
    int fdNum = epoll_wait(pThreadObj->pollFd, events, maxEvents, -1);
364 365 366 367
    if (pThreadObj->stop) {
      tTrace("%s, tcp thread get stop event, exiting...", pThreadObj->label);
      break;
    }
H
hzcheng 已提交
368 369
    if (fdNum < 0) continue;

J
Jeff Tao 已提交
370
    for (int i = 0; i < fdNum; ++i) {
H
hzcheng 已提交
371 372 373
      pFdObj = events[i].data.ptr;

      if (events[i].events & EPOLLERR) {
374 375
        tTrace("%s %p, error happened on FD", pThreadObj->label, pFdObj->thandle);
        taosReportBrokenLink(pFdObj);
H
hzcheng 已提交
376 377 378 379
        continue;
      }

      if (events[i].events & EPOLLHUP) {
380 381
        tTrace("%s %p, FD hang up", pThreadObj->label, pFdObj->thandle);
        taosReportBrokenLink(pFdObj);
H
hzcheng 已提交
382 383 384
        continue;
      }

385
      int32_t headLen = taosReadMsg(pFdObj->fd, &rpcHead, sizeof(SRpcHead));
386
      if (headLen != sizeof(SRpcHead)) {
387
        tTrace("%s %p, read error, headLen:%d", pThreadObj->label, pFdObj->thandle, headLen);
388
        taosReportBrokenLink(pFdObj);
H
hzcheng 已提交
389 390 391
        continue;
      }

392 393 394
      int32_t msgLen = (int32_t)htonl((uint32_t)rpcHead.msgLen);
      char   *buffer = malloc(msgLen + tsRpcOverhead);
      if ( NULL == buffer) {
395 396
        tError("%s %p, TCP malloc(size:%d) fail", pThreadObj->label, pFdObj->thandle, msgLen);
        taosReportBrokenLink(pFdObj);
397 398
        continue;
      }
H
hzcheng 已提交
399

400 401 402
      char   *msg = buffer + tsRpcOverhead;
      int32_t leftLen = msgLen - headLen;
      int32_t retLen = taosReadMsg(pFdObj->fd, msg + headLen, leftLen);
H
hzcheng 已提交
403 404

      if (leftLen != retLen) {
405 406 407
        tError("%s %p, read error, leftLen:%d retLen:%d", 
                pThreadObj->label, pFdObj->thandle, leftLen, retLen);
        taosReportBrokenLink(pFdObj);
S
slguan 已提交
408
        tfree(buffer);
H
hzcheng 已提交
409 410 411
        continue;
      }

J
jtao1735 已提交
412
      // tTrace("%s TCP data is received, ip:0x%x:%u len:%d", pThreadObj->label, pFdObj->ip, pFdObj->port, msgLen);
413 414 415 416

      memcpy(msg, &rpcHead, sizeof(SRpcHead));
      recvInfo.msg = msg;
      recvInfo.msgLen = msgLen;
417 418 419 420 421 422
      recvInfo.ip = pFdObj->ip;
      recvInfo.port = pFdObj->port;
      recvInfo.shandle = pThreadObj->shandle;
      recvInfo.thandle = pFdObj->thandle;;
      recvInfo.chandle = pFdObj;
      recvInfo.connType = RPC_CONN_TCP;
H
hzcheng 已提交
423

424
      pFdObj->thandle = (*(pThreadObj->processData))(&recvInfo);
425
      if (pFdObj->thandle == NULL) taosFreeFdObj(pFdObj);
H
hzcheng 已提交
426 427
    }
  }
J
Jeff Tao 已提交
428 429

  return NULL;
H
hzcheng 已提交
430 431
}

J
Jeff Tao 已提交
432
/*
 * Allocate an SFdObj for socket fd, add fd to pThreadObj's epoll set and link
 * the object into the thread's connection list.
 *
 * Returns the new object, or NULL if allocation or epoll registration fails.
 * On failure fd is not closed; the caller owns it.
 */
static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, int fd) {
  SFdObj *pFdObj = (SFdObj *)calloc(1, sizeof(SFdObj));
  if (pFdObj == NULL) return NULL;

  pFdObj->fd = fd;
  pFdObj->pThreadObj = pThreadObj;
  pFdObj->signature = pFdObj;

  struct epoll_event event;
  memset(&event, 0, sizeof(event));
  event.events = EPOLLIN | EPOLLPRI | EPOLLWAKEUP;
  event.data.ptr = pFdObj;

  // Register with epoll and link into the list while holding the mutex. Once fd
  // is in the epoll set, the worker thread may see an event for it and call
  // taosFreeFdObj(), which takes this mutex. Holding it here guarantees the
  // object is fully linked before that unlink can run; otherwise the unlink
  // would reset pHead and free an object we are still inserting.
  pthread_mutex_lock(&(pThreadObj->mutex));

  if (epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_ADD, fd, &event) < 0) {
    pthread_mutex_unlock(&(pThreadObj->mutex));
    tfree(pFdObj);
    return NULL;
  }

  pFdObj->next = pThreadObj->pHead;
  if (pThreadObj->pHead) (pThreadObj->pHead)->prev = pFdObj;
  pThreadObj->pHead = pFdObj;
  pThreadObj->numOfFds++;

  pthread_mutex_unlock(&(pThreadObj->mutex));

  return pFdObj;
}

J
Jeff Tao 已提交
460
/*
 * Close a connection and release its SFdObj.
 *
 * Removes the fd from the owning thread's epoll set, closes the socket,
 * unlinks the object from the thread's list and frees it. NULL, or an object
 * whose signature no longer matches its own address, is ignored.
 */
static void taosFreeFdObj(SFdObj *pFdObj) {
  if (pFdObj == NULL) return;
  if (pFdObj->signature != pFdObj) return;

  SThreadObj *pThreadObj = pFdObj->pThreadObj;
  pthread_mutex_lock(&pThreadObj->mutex);

  // Re-check under the lock: two callers may both pass the unlocked check
  // above. Only the first one to take the lock proceeds.
  if (pFdObj->signature == NULL) {
    pthread_mutex_unlock(&pThreadObj->mutex);
    return;
  }

  pFdObj->signature = NULL;
  epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_DEL, pFdObj->fd, NULL);
  taosCloseSocket(pFdObj->fd);

  pThreadObj->numOfFds--;
  if (pThreadObj->numOfFds < 0) {
    tError("%s %p, TCP thread:%d, number of FDs is negative!!!", 
            pThreadObj->label, pFdObj->thandle, pThreadObj->threadId);
  }

  // remove from the FdObject list
  if (pFdObj->prev) {
    (pFdObj->prev)->next = pFdObj->next;
  } else {
    pThreadObj->pHead = pFdObj->next;
  }

  if (pFdObj->next) {
    (pFdObj->next)->prev = pFdObj->prev;
  }

  int numOfFds = pThreadObj->numOfFds;  // snapshot while locked, for the log below
  pthread_mutex_unlock(&pThreadObj->mutex);

  tTrace("%s %p, FD:%p is cleaned, numOfFds:%d", 
          pThreadObj->label, pFdObj->thandle, pFdObj, numOfFds);

  tfree(pFdObj);
}