rpcTcp.c 14.4 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
slguan 已提交
16
#include "os.h"
H
hzcheng 已提交
17 18 19 20
#include "tlog.h"
#include "tlog.h"
#include "tsocket.h"
#include "tutil.h"
21
#include "rpcHead.h"
J
Jeff Tao 已提交
22
#include "rpcTcp.h"
H
hzcheng 已提交
23

24 25 26
// Fallback definition for build environments whose headers do not provide EPOLLWAKEUP.
#if !defined(EPOLLWAKEUP)
  #define EPOLLWAKEUP (1u << 29)
#endif
H
hzcheng 已提交
27

J
Jeff Tao 已提交
28 29 30 31 32 33 34 35 36
// State kept for one TCP connection that is registered with a worker thread.
typedef struct SFdObj {
  void              *signature;   // points to the object itself while it is valid; NULL once it is being freed
  int                fd;          // TCP socket FD
  void              *thandle;     // handle from upper layer, like TAOS
  uint32_t           ip;          // peer IPv4 address, filled in by the code that opens/accepts the connection
  uint16_t           port;        // peer port, filled in by the code that opens/accepts the connection
  struct SThreadObj *pThreadObj;  // worker thread this connection is attached to
  struct SFdObj     *prev;        // previous connection in the worker's list, NULL for the head
  struct SFdObj     *next;        // next connection in the worker's list
} SFdObj;

J
Jeff Tao 已提交
39
typedef struct SThreadObj {
H
hzcheng 已提交
40 41
  pthread_t       thread;
  SFdObj *        pHead;
J
Jeff Tao 已提交
42
  pthread_mutex_t mutex;
H
hzcheng 已提交
43
  pthread_cond_t  fdReady;
J
Jeff Tao 已提交
44
  char            ipstr[TSDB_IPv4ADDR_LEN];
H
hzcheng 已提交
45 46 47 48
  int             pollFd;
  int             numOfFds;
  int             threadId;
  char            label[12];
49 50
  void           *shandle;  // handle passed by upper layer during server initialization
  void           *(*processData)(SRecvInfo *pPacket);
H
hzcheng 已提交
51 52 53
} SThreadObj;

typedef struct {
J
Jeff Tao 已提交
54
  char        ip[TSDB_IPv4ADDR_LEN];
L
lihui 已提交
55
  uint16_t    port;
H
hzcheng 已提交
56 57 58 59 60 61 62
  char        label[12];
  int         numOfThreads;
  void *      shandle;
  SThreadObj *pThreadObj;
  pthread_t   thread;
} SServerObj;

J
Jeff Tao 已提交
63 64 65 66 67
// File-local helpers, defined further down in this file.
static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, int fd);
static void    taosFreeFdObj(SFdObj *pFdObj);
static void    taosReportBrokenLink(SFdObj *pFdObj);
// Thread entry points.
static void   *taosProcessTcpData(void *param);
static void    taosAcceptTcpConnection(void *arg);
H
hzcheng 已提交
68

69
void *taosInitTcpServer(char *ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle) {
J
Jeff Tao 已提交
70 71
  SServerObj *pServerObj;
  SThreadObj *pThreadObj;
H
hzcheng 已提交
72

J
Jeff Tao 已提交
73
  pServerObj = (SServerObj *)calloc(sizeof(SServerObj), 1);
74 75 76 77 78
  strcpy(pServerObj->ip, ip);
  pServerObj->port = port;
  strcpy(pServerObj->label, label);
  pServerObj->numOfThreads = numOfThreads;

J
Jeff Tao 已提交
79
  pServerObj->pThreadObj = (SThreadObj *)calloc(sizeof(SThreadObj), numOfThreads);
80 81
  if (pServerObj->pThreadObj == NULL) {
    tError("TCP:%s no enough memory", label);
J
Jeff Tao 已提交
82
    free(pServerObj);
83
    return NULL;
H
hzcheng 已提交
84
  }
J
Jeff Tao 已提交
85

J
Jeff Tao 已提交
86
  int code = 0;
87
  pThreadObj = pServerObj->pThreadObj;
J
Jeff Tao 已提交
88
  for (int i = 0; i < numOfThreads; ++i) {
89 90 91
    pThreadObj->processData = fp;
    strcpy(pThreadObj->label, label);
    pThreadObj->shandle = shandle;
H
hzcheng 已提交
92

J
Jeff Tao 已提交
93
    code = pthread_mutex_init(&(pThreadObj->mutex), NULL);
J
Jeff Tao 已提交
94 95 96
    if (code < 0) {
      tError("%s failed to init TCP process data mutex(%s)", label, strerror(errno));
      break;;
97
    }
H
hzcheng 已提交
98

J
Jeff Tao 已提交
99 100 101 102
    code = pthread_cond_init(&(pThreadObj->fdReady), NULL);
    if (code != 0) {
      tError("%s init TCP condition variable failed(%s)", label, strerror(errno));
      break;
103
    }
H
hzcheng 已提交
104

105 106 107
    pThreadObj->pollFd = epoll_create(10);  // size does not matter
    if (pThreadObj->pollFd < 0) {
      tError("%s failed to create TCP epoll", label);
J
Jeff Tao 已提交
108 109
      code = -1;
      break;
110
    }
H
hzcheng 已提交
111

J
Jeff Tao 已提交
112 113 114
    pthread_attr_t thattr;
    pthread_attr_init(&thattr);
    pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
J
Jeff Tao 已提交
115
    code = pthread_create(&(pThreadObj->thread), &thattr, taosProcessTcpData, (void *)(pThreadObj));
J
Jeff Tao 已提交
116 117 118 119
    pthread_attr_destroy(&thattr);
    if (code != 0) {
      tError("%s failed to create TCP process data thread(%s)", label, strerror(errno));
      break;
120
    }
H
hzcheng 已提交
121

122 123
    pThreadObj->threadId = i;
    pThreadObj++;
H
hzcheng 已提交
124 125
  }

J
Jeff Tao 已提交
126 127 128 129 130 131 132 133 134
  if (code == 0) { 
    pthread_attr_t thattr;
    pthread_attr_init(&thattr);
    pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
    code = pthread_create(&(pServerObj->thread), &thattr, (void *)taosAcceptTcpConnection, (void *)(pServerObj));
    pthread_attr_destroy(&thattr);
    if (code != 0) {
      tError("%s failed to create TCP accept thread(%s)", label, strerror(errno));
    }
H
hzcheng 已提交
135 136
  }

J
Jeff Tao 已提交
137 138 139 140 141 142 143
  if (code != 0) {
    free(pServerObj->pThreadObj);
    free(pServerObj);
    pServerObj = NULL;
  } else {
    tTrace("%s TCP server is initialized, ip:%s port:%hu numOfThreads:%d", label, ip, port, numOfThreads);
  }
H
hzcheng 已提交
144

145
  return (void *)pServerObj;
H
hzcheng 已提交
146 147 148
}

void taosCleanUpTcpServer(void *handle) {
149
  SServerObj *pServerObj = handle;
J
Jeff Tao 已提交
150
  SThreadObj *pThreadObj;
H
hzcheng 已提交
151 152 153 154 155 156

  if (pServerObj == NULL) return;

  pthread_cancel(pServerObj->thread);
  pthread_join(pServerObj->thread, NULL);

J
Jeff Tao 已提交
157
  for (int i = 0; i < pServerObj->numOfThreads; ++i) {
H
hzcheng 已提交
158 159 160
    pThreadObj = pServerObj->pThreadObj + i;

    while (pThreadObj->pHead) {
J
Jeff Tao 已提交
161 162 163
      SFdObj *pFdObj = pThreadObj->pHead;
      pThreadObj->pHead = pFdObj->next;
      taosFreeFdObj(pFdObj);
H
hzcheng 已提交
164 165 166 167 168 169
    }

    close(pThreadObj->pollFd);
    pthread_cancel(pThreadObj->thread);
    pthread_join(pThreadObj->thread, NULL);
    pthread_cond_destroy(&(pThreadObj->fdReady));
J
Jeff Tao 已提交
170
    pthread_mutex_destroy(&(pThreadObj->mutex));
H
hzcheng 已提交
171 172 173 174
  }

  tTrace("TCP:%s, TCP server is cleaned up", pServerObj->label);

J
Jeff Tao 已提交
175
  tfree(pServerObj->pThreadObj);
H
hzcheng 已提交
176 177 178
  tfree(pServerObj);
}

J
Jeff Tao 已提交
179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272
/*
 * Body of the accept thread: opens the listening socket described by the
 * SServerObj passed in arg, then accepts connections forever and hands them to
 * the worker threads in round-robin order.
 */
static void taosAcceptTcpConnection(void *arg) {
  SServerObj *pServerObj = (SServerObj *)arg;
  int         threadId = 0;

  int sockFd = taosOpenTcpServerSocket(pServerObj->ip, pServerObj->port);
  if (sockFd < 0) {
    tError("%s failed to open TCP server socket, ip:%s:%hu", pServerObj->label, pServerObj->ip, pServerObj->port);
    return;
  }

  tTrace("%s TCP server is ready, ip:%s:%hu", pServerObj->label, pServerObj->ip, pServerObj->port);

  while (1) {
    struct sockaddr_in caddr;
    socklen_t          addrlen = sizeof(caddr);

    int connFd = accept(sockFd, (struct sockaddr *)&caddr, &addrlen);
    if (connFd < 0) {
      // the format has exactly two %s; passing errno as an extra argument would be read as a string
      tError("%s TCP accept failure(%s)", pServerObj->label, strerror(errno));
      continue;
    }

    // sin_port is in network byte order; convert it for display only
    uint16_t peerPort = ntohs(caddr.sin_port);

    tTrace("%s TCP connection from ip:%s:%hu", pServerObj->label, inet_ntoa(caddr.sin_addr), peerPort);
    taosKeepTcpAlive(connFd);

    // pick up the thread to handle this connection
    SThreadObj *pThreadObj = pServerObj->pThreadObj + threadId;

    SFdObj *pFdObj = taosMallocFdObj(pThreadObj, connFd);
    if (pFdObj) {
      // NOTE(review): the port is stored exactly as received (network byte order), as before;
      // confirm what the upper layer expects in SRecvInfo.port
      pFdObj->ip = caddr.sin_addr.s_addr;
      pFdObj->port = caddr.sin_port;
      // log from the local copies: the worker thread already owns pFdObj at this point
      tTrace("%s new connection from %s:%hu, FD:%p, numOfFds:%d", pServerObj->label,
              inet_ntoa(caddr.sin_addr), peerPort, pFdObj, pThreadObj->numOfFds);
    } else {
      int savedErrno = errno;  // close() may overwrite errno
      close(connFd);
      tError("%s failed to malloc FdObj(%s)", pServerObj->label, strerror(savedErrno));
    }

    // pick up next thread for next connection
    threadId++;
    threadId = threadId % pServerObj->numOfThreads;
  }
}

/*
 * Create a TCP client: one worker thread with its own epoll instance, running
 * taosProcessTcpData.
 *
 *   ip       IP string stored in the worker and later passed to taosOpenTcpClientSocket
 *   port     only used in the trace message
 *   label    prefix for log messages; truncated to fit the label buffer
 *   num      not used
 *   fp       callback stored as processData
 *   shandle  opaque upper-layer handle
 *
 * Returns the handle to pass to taosOpenTcpClientConnection and
 * taosCleanUpTcpClient, or NULL on failure. Everything acquired before a
 * failure is released again.
 */
void *taosInitTcpClient(char *ip, uint16_t port, char *label, int num, void *fp, void *shandle) {
  pthread_attr_t thattr;

  if (ip == NULL || label == NULL) return NULL;

  SThreadObj *pThreadObj = (SThreadObj *)calloc(1, sizeof(SThreadObj));
  if (pThreadObj == NULL) {
    tError("%s failed to allocate TCP client object", label);
    return NULL;
  }

  // bounded copies; the buffers are zero-filled by calloc, so they stay NUL-terminated
  strncpy(pThreadObj->label, label, sizeof(pThreadObj->label) - 1);
  strncpy(pThreadObj->ipstr, ip, sizeof(pThreadObj->ipstr) - 1);
  pThreadObj->shandle = shandle;
  pThreadObj->processData = fp;

  // pthread functions return the error number instead of setting errno
  int code = pthread_mutex_init(&(pThreadObj->mutex), NULL);
  if (code != 0) {
    tError("%s failed to init TCP client mutex(%s)", label, strerror(code));
    goto freeObj;
  }

  code = pthread_cond_init(&(pThreadObj->fdReady), NULL);
  if (code != 0) {
    tError("%s init TCP condition variable failed(%s)", label, strerror(code));
    goto destroyMutex;
  }

  pThreadObj->pollFd = epoll_create(10);  // size does not matter
  if (pThreadObj->pollFd < 0) {
    tError("%s failed to create TCP client epoll", label);
    goto destroyCond;
  }

  pthread_attr_init(&thattr);
  pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
  code = pthread_create(&(pThreadObj->thread), &thattr, taosProcessTcpData, (void *)(pThreadObj));
  pthread_attr_destroy(&thattr);
  if (code != 0) {
    tError("%s failed to create TCP read data thread(%s)", label, strerror(code));
    goto closePoll;
  }

  tTrace("%s TCP client is initialized, ip:%s:%hu", label, ip, port);

  return pThreadObj;

  // error exits: undo in reverse order of acquisition
closePoll:
  close(pThreadObj->pollFd);
destroyCond:
  pthread_cond_destroy(&(pThreadObj->fdReady));
destroyMutex:
  pthread_mutex_destroy(&(pThreadObj->mutex));
freeObj:
  free(pThreadObj);
  return NULL;
}

void taosCleanUpTcpClient(void *chandle) {
  SThreadObj *pThreadObj = chandle;
  if (pThreadObj == NULL) return;

  while (pThreadObj->pHead) {
J
Jeff Tao 已提交
273 274 275
    SFdObj *pFdObj = pThreadObj->pHead;
    pThreadObj->pHead = pFdObj->next;
    taosFreeFdObj(pFdObj);
J
Jeff Tao 已提交
276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312
  }

  close(pThreadObj->pollFd);

  pthread_cancel(pThreadObj->thread);
  pthread_join(pThreadObj->thread, NULL);

  tTrace (":%s, all connections are cleaned up", pThreadObj->label);

  tfree(pThreadObj);
}

/*
 * Open a TCP connection to ip:port and attach it to the client worker thread.
 *
 *   shandle  handle returned by taosInitTcpClient
 *   thandle  upper-layer handle stored with the connection
 *   ip       destination address string
 *   port     destination port
 *
 * Returns the connection handle (usable with taosSendTcpData and
 * taosCloseTcpConnection), or NULL on failure.
 */
void *taosOpenTcpClientConnection(void *shandle, void *thandle, char *ip, uint16_t port) {
  SThreadObj *pThreadObj = shandle;
  if (pThreadObj == NULL || ip == NULL) return NULL;

  int fd = taosOpenTcpClientSocket(ip, port, pThreadObj->ipstr);
  if (fd <= 0) return NULL;

  // inet_aton returns 0 when ip is not a valid dotted address; record 0 instead of an
  // indeterminate value in that case
  struct in_addr destIp;
  if (inet_aton(ip, &destIp) == 0) {
    destIp.s_addr = 0;
  }

  SFdObj *pFdObj = taosMallocFdObj(pThreadObj, fd);
  if (pFdObj == NULL) {
    int savedErrno = errno;  // close() may overwrite errno
    close(fd);
    tError("%s failed to malloc client FdObj(%s)", pThreadObj->label, strerror(savedErrno));
    return NULL;
  }

  pFdObj->thandle = thandle;
  pFdObj->port = port;
  pFdObj->ip = destIp.s_addr;
  tTrace("%s %p, TCP connection to %s:%hu is created, FD:%p numOfFds:%d",
          pThreadObj->label, thandle, ip, port, pFdObj, pThreadObj->numOfFds);

  return pFdObj;
}

void taosCloseTcpConnection(void *chandle) {
313
  SFdObj *pFdObj = chandle;
314 315
  if (pFdObj == NULL) return;

J
Jeff Tao 已提交
316
  taosFreeFdObj(pFdObj);
317 318
}

J
Jeff Tao 已提交
319
int taosSendTcpData(uint32_t ip, uint16_t port, void *data, int len, void *chandle) {
320
  SFdObj *pFdObj = chandle;
321 322 323 324 325 326

  if (chandle == NULL) return -1;

  return (int)send(pFdObj->fd, data, (size_t)len, 0);
}

327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345
/*
 * Handle a connection that failed or was closed by the peer.
 *
 * When the upper layer has a handle for the connection it is notified through
 * processData with an empty message (msg NULL, msgLen 0). When it has none,
 * nobody else knows about the connection, so it is released here; otherwise
 * its descriptor would stay registered with epoll and keep reporting the same
 * error or hang-up.
 */
static void taosReportBrokenLink(SFdObj *pFdObj) {
  SThreadObj *pThreadObj = pFdObj->pThreadObj;

  if (pFdObj->thandle == NULL) {
    taosFreeFdObj(pFdObj);
    return;
  }

  // notify the upper layer, so it will clean the associated context
  SRecvInfo recvInfo = {0};
  recvInfo.msg = NULL;
  recvInfo.msgLen = 0;
  recvInfo.ip = 0;
  recvInfo.port = 0;
  recvInfo.shandle = pThreadObj->shandle;
  recvInfo.thandle = pFdObj->thandle;
  recvInfo.chandle = NULL;
  recvInfo.connType = RPC_CONN_TCP;
  (*(pThreadObj->processData))(&recvInfo);
}

J
Jeff Tao 已提交
346 347 348 349 350
#define maxEvents 10

static void *taosProcessTcpData(void *param) {
  SThreadObj        *pThreadObj = param;
  SFdObj            *pFdObj;
H
hzcheng 已提交
351
  struct epoll_event events[maxEvents];
352 353
  SRecvInfo          recvInfo;
  SRpcHead           rpcHead;
H
hzcheng 已提交
354 355

  while (1) {
J
Jeff Tao 已提交
356
    pthread_mutex_lock(&pThreadObj->mutex);
H
hzcheng 已提交
357
    if (pThreadObj->numOfFds < 1) {
J
Jeff Tao 已提交
358
      pthread_cond_wait(&pThreadObj->fdReady, &pThreadObj->mutex);
H
hzcheng 已提交
359
    }
J
Jeff Tao 已提交
360
    pthread_mutex_unlock(&pThreadObj->mutex);
H
hzcheng 已提交
361

J
Jeff Tao 已提交
362
    int fdNum = epoll_wait(pThreadObj->pollFd, events, maxEvents, -1);
H
hzcheng 已提交
363 364
    if (fdNum < 0) continue;

J
Jeff Tao 已提交
365
    for (int i = 0; i < fdNum; ++i) {
H
hzcheng 已提交
366 367 368
      pFdObj = events[i].data.ptr;

      if (events[i].events & EPOLLERR) {
369 370
        tTrace("%s %p, error happened on FD", pThreadObj->label, pFdObj->thandle);
        taosReportBrokenLink(pFdObj);
H
hzcheng 已提交
371 372 373 374
        continue;
      }

      if (events[i].events & EPOLLHUP) {
375 376
        tTrace("%s %p, FD hang up", pThreadObj->label, pFdObj->thandle);
        taosReportBrokenLink(pFdObj);
H
hzcheng 已提交
377 378 379
        continue;
      }

380
      int32_t headLen = taosReadMsg(pFdObj->fd, &rpcHead, sizeof(SRpcHead));
381
      if (headLen != sizeof(SRpcHead)) {
382 383
        tError("%s %p, read error, headLen:%d", pThreadObj->label, pFdObj->thandle, headLen);
        taosReportBrokenLink(pFdObj);
H
hzcheng 已提交
384 385 386
        continue;
      }

387 388 389
      int32_t msgLen = (int32_t)htonl((uint32_t)rpcHead.msgLen);
      char   *buffer = malloc(msgLen + tsRpcOverhead);
      if ( NULL == buffer) {
390 391
        tError("%s %p, TCP malloc(size:%d) fail", pThreadObj->label, pFdObj->thandle, msgLen);
        taosReportBrokenLink(pFdObj);
392 393
        continue;
      }
H
hzcheng 已提交
394

395 396 397
      char   *msg = buffer + tsRpcOverhead;
      int32_t leftLen = msgLen - headLen;
      int32_t retLen = taosReadMsg(pFdObj->fd, msg + headLen, leftLen);
H
hzcheng 已提交
398 399

      if (leftLen != retLen) {
400 401 402
        tError("%s %p, read error, leftLen:%d retLen:%d", 
                pThreadObj->label, pFdObj->thandle, leftLen, retLen);
        taosReportBrokenLink(pFdObj);
S
slguan 已提交
403
        tfree(buffer);
H
hzcheng 已提交
404 405 406
        continue;
      }

J
Jeff Tao 已提交
407
      // tTrace("%s TCP data is received, ip:%s:%u len:%d", pThreadObj->label, pFdObj->ipstr, pFdObj->port, msgLen);
408 409 410 411

      memcpy(msg, &rpcHead, sizeof(SRpcHead));
      recvInfo.msg = msg;
      recvInfo.msgLen = msgLen;
412 413 414 415 416 417
      recvInfo.ip = pFdObj->ip;
      recvInfo.port = pFdObj->port;
      recvInfo.shandle = pThreadObj->shandle;
      recvInfo.thandle = pFdObj->thandle;;
      recvInfo.chandle = pFdObj;
      recvInfo.connType = RPC_CONN_TCP;
H
hzcheng 已提交
418

419
      pFdObj->thandle = (*(pThreadObj->processData))(&recvInfo);
J
Jeff Tao 已提交
420
      if (pFdObj->thandle == NULL) taosFreeFdObj(pFdObj);
H
hzcheng 已提交
421 422
    }
  }
J
Jeff Tao 已提交
423 424

  return NULL;
H
hzcheng 已提交
425 426
}

J
Jeff Tao 已提交
427
/*
 * Allocate the bookkeeping object for socket fd, register the socket with the
 * worker's epoll instance and put the object at the head of the worker's
 * connection list.
 *
 * Returns the new object, or NULL when the allocation or the epoll
 * registration fails; errno from the failed registration is preserved. The
 * socket itself is not closed on failure.
 */
static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, int fd) {
  SFdObj *pFdObj = (SFdObj *)calloc(1, sizeof(SFdObj));
  if (pFdObj == NULL) return NULL;

  pFdObj->fd = fd;
  pFdObj->pThreadObj = pThreadObj;
  pFdObj->signature = pFdObj;

  struct epoll_event event = {0};
  event.events = EPOLLIN | EPOLLPRI | EPOLLWAKEUP;
  event.data.ptr = pFdObj;

  // Register and link under the same lock: once the descriptor is in the epoll set the
  // worker may see an event at once, and taosFreeFdObj (which takes this mutex) must not
  // unlink and free the object before it has been linked.
  pthread_mutex_lock(&(pThreadObj->mutex));

  if (epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_ADD, fd, &event) < 0) {
    int savedErrno = errno;
    pthread_mutex_unlock(&(pThreadObj->mutex));
    tfree(pFdObj);
    errno = savedErrno;  // callers report strerror(errno)
    return NULL;
  }

  // notify the data process, add into the FdObj list
  pFdObj->next = pThreadObj->pHead;
  if (pThreadObj->pHead) (pThreadObj->pHead)->prev = pFdObj;
  pThreadObj->pHead = pFdObj;
  pThreadObj->numOfFds++;
  pthread_cond_signal(&pThreadObj->fdReady);
  pthread_mutex_unlock(&(pThreadObj->mutex));

  return pFdObj;
}

J
Jeff Tao 已提交
456
/*
 * Release a connection: remove its socket from the worker's epoll set, close
 * the socket, unlink the object from the worker's list and free it.
 *
 * NULL, or an object whose signature no longer points to itself, is ignored.
 * The signature is checked again under the worker's mutex so that only one of
 * several concurrent callers performs the release.
 */
static void taosFreeFdObj(SFdObj *pFdObj) {

  if (pFdObj == NULL) return;
  if (pFdObj->signature != pFdObj) return;

  SThreadObj *pThreadObj = pFdObj->pThreadObj;
  pthread_mutex_lock(&pThreadObj->mutex);

  if (pFdObj->signature == NULL) {
    pthread_mutex_unlock(&pThreadObj->mutex);
    return;
  }

  pFdObj->signature = NULL;

  // deregister before closing: after close() the descriptor number is invalid for
  // epoll_ctl and may already have been handed out again to another socket
  epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_DEL, pFdObj->fd, NULL);
  close(pFdObj->fd);

  pThreadObj->numOfFds--;
  int numOfFds = pThreadObj->numOfFds;  // snapshot taken under the lock for the trace below

  if (numOfFds < 0)
    tError("%s %p, TCP thread:%d, number of FDs is negative!!!",
            pThreadObj->label, pFdObj->thandle, pThreadObj->threadId);

  // remove from the FdObject list

  if (pFdObj->prev) {
    (pFdObj->prev)->next = pFdObj->next;
  } else {
    pThreadObj->pHead = pFdObj->next;
  }

  if (pFdObj->next) {
    (pFdObj->next)->prev = pFdObj->prev;
  }

  pthread_mutex_unlock(&pThreadObj->mutex);

  tTrace("%s %p, FD:%p is cleaned, numOfFds:%d",
          pThreadObj->label, pFdObj->thandle, pFdObj, numOfFds);

  tfree(pFdObj);
}

H
hzcheng 已提交
499