rpcTcp.c 15.8 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
slguan 已提交
16
#include "os.h"
H
hzcheng 已提交
17 18
#include "tsocket.h"
#include "tutil.h"
19
#include "taosdef.h"
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
20
#include "taoserror.h" 
S
slguan 已提交
21
#include "rpcLog.h"
22
#include "rpcHead.h"
J
Jeff Tao 已提交
23
#include "rpcTcp.h"
H
hzcheng 已提交
24

25 26 27
// Fallback for build environments whose epoll header does not define
// EPOLLWAKEUP; the value is only supplied when the macro is missing.
// NOTE(review): (1u << 29) is presumably the Linux kernel's value - confirm.
#ifndef EPOLLWAKEUP
  #define EPOLLWAKEUP (1u << 29)
#endif
H
hzcheng 已提交
28

J
Jeff Tao 已提交
29 30
// State kept for one TCP connection. Objects are created by taosMallocFdObj(),
// chained into the owning thread's doubly linked list and released by
// taosFreeFdObj().
typedef struct SFdObj {
  void              *signature;    // equals the object's own address while valid; NULL once freed
  int                fd;           // TCP socket FD
  int                closedByApp;  // 1: already closed by App
  void              *thandle;      // handle from upper layer, like TAOS
  uint32_t           ip;           // peer address recorded when the connection is set up
  uint16_t           port;         // peer port recorded when the connection is set up
  struct SThreadObj *pThreadObj;   // thread object whose epoll set watches this fd
  struct SFdObj     *prev;         // previous connection in the thread's list
  struct SFdObj     *next;         // next connection in the thread's list
} SFdObj;

J
Jeff Tao 已提交
41
typedef struct SThreadObj {
H
hzcheng 已提交
42 43
  pthread_t       thread;
  SFdObj *        pHead;
J
Jeff Tao 已提交
44
  pthread_mutex_t mutex;
J
jtao1735 已提交
45
  uint32_t        ip;
46
  bool            stop;
H
hzcheng 已提交
47 48 49
  int             pollFd;
  int             numOfFds;
  int             threadId;
50
  char            label[TSDB_LABEL_LEN];
51 52
  void           *shandle;  // handle passed by upper layer during server initialization
  void           *(*processData)(SRecvInfo *pPacket);
H
hzcheng 已提交
53 54 55
} SThreadObj;

typedef struct {
56
  int         fd;
J
jtao1735 已提交
57
  uint32_t    ip;
L
lihui 已提交
58
  uint16_t    port;
59
  char        label[TSDB_LABEL_LEN];
H
hzcheng 已提交
60 61 62 63 64 65
  int         numOfThreads;
  void *      shandle;
  SThreadObj *pThreadObj;
  pthread_t   thread;
} SServerObj;

J
Jeff Tao 已提交
66 67 68 69
// Thread entry points.
static void   *taosProcessTcpData(void *param);
static void   *taosAcceptTcpConnection(void *arg);

// Connection object life cycle.
static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, int fd);
static void    taosFreeFdObj(SFdObj *pFdObj);
static void    taosReportBrokenLink(SFdObj *pFdObj);
H
hzcheng 已提交
71

J
jtao1735 已提交
72
void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle) {
J
Jeff Tao 已提交
73 74
  SServerObj *pServerObj;
  SThreadObj *pThreadObj;
H
hzcheng 已提交
75

J
Jeff Tao 已提交
76
  pServerObj = (SServerObj *)calloc(sizeof(SServerObj), 1);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
77 78 79 80 81 82
  if (pServerObj == NULL) {
    tError("TCP:%s no enough memory", label);
    terrno = TAOS_SYSTEM_ERROR(errno); 
    return NULL;
  }

83
  pServerObj->fd = -1;
84
  pServerObj->thread = 0;
J
jtao1735 已提交
85
  pServerObj->ip = ip;
86
  pServerObj->port = port;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
87
  tstrncpy(pServerObj->label, label, sizeof(pServerObj->label));
88 89
  pServerObj->numOfThreads = numOfThreads;

J
Jeff Tao 已提交
90
  pServerObj->pThreadObj = (SThreadObj *)calloc(sizeof(SThreadObj), numOfThreads);
91 92
  if (pServerObj->pThreadObj == NULL) {
    tError("TCP:%s no enough memory", label);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
93
    terrno = TAOS_SYSTEM_ERROR(errno); 
J
Jeff Tao 已提交
94
    free(pServerObj);
95
    return NULL;
H
hzcheng 已提交
96
  }
J
Jeff Tao 已提交
97

J
Jeff Tao 已提交
98
  int code = 0;
99 100 101 102
  pthread_attr_t thattr;
  pthread_attr_init(&thattr);
  pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);

103
  // initialize parameters in case it may encounter error later 
104
  pThreadObj = pServerObj->pThreadObj;
J
Jeff Tao 已提交
105
  for (int i = 0; i < numOfThreads; ++i) {
106 107
    pThreadObj->pollFd = -1;
    pThreadObj->thread = 0;
108
    pThreadObj->processData = fp;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
109
    tstrncpy(pThreadObj->label, label, sizeof(pThreadObj->label));
110
    pThreadObj->shandle = shandle;
111 112
    pThreadObj++;
  }
H
hzcheng 已提交
113

114 115 116
  // initialize mutex, thread, fd which may fail
  pThreadObj = pServerObj->pThreadObj;
  for (int i = 0; i < numOfThreads; ++i) {
J
Jeff Tao 已提交
117
    code = pthread_mutex_init(&(pThreadObj->mutex), NULL);
J
Jeff Tao 已提交
118 119
    if (code < 0) {
      tError("%s failed to init TCP process data mutex(%s)", label, strerror(errno));
120
      break;
121
    }
H
hzcheng 已提交
122

123 124 125
    pThreadObj->pollFd = epoll_create(10);  // size does not matter
    if (pThreadObj->pollFd < 0) {
      tError("%s failed to create TCP epoll", label);
J
Jeff Tao 已提交
126 127
      code = -1;
      break;
128
    }
H
hzcheng 已提交
129

J
Jeff Tao 已提交
130
    code = pthread_create(&(pThreadObj->thread), &thattr, taosProcessTcpData, (void *)(pThreadObj));
J
Jeff Tao 已提交
131 132 133
    if (code != 0) {
      tError("%s failed to create TCP process data thread(%s)", label, strerror(errno));
      break;
134
    }
H
hzcheng 已提交
135

136 137
    pThreadObj->threadId = i;
    pThreadObj++;
H
hzcheng 已提交
138 139
  }

140 141 142
  pServerObj->fd = taosOpenTcpServerSocket(pServerObj->ip, pServerObj->port);
  if (pServerObj->fd < 0) code = -1; 

J
Jeff Tao 已提交
143
  if (code == 0) { 
144
    code = pthread_create(&pServerObj->thread, &thattr, taosAcceptTcpConnection, (void *)pServerObj);
J
Jeff Tao 已提交
145 146 147
    if (code != 0) {
      tError("%s failed to create TCP accept thread(%s)", label, strerror(errno));
    }
H
hzcheng 已提交
148 149
  }

J
Jeff Tao 已提交
150
  if (code != 0) {
151
    terrno = TAOS_SYSTEM_ERROR(errno); 
152
    taosCleanUpTcpServer(pServerObj);
J
Jeff Tao 已提交
153 154
    pServerObj = NULL;
  } else {
J
jtao1735 已提交
155
    tTrace("%s TCP server is initialized, ip:0x%x port:%hu numOfThreads:%d", label, ip, port, numOfThreads);
J
Jeff Tao 已提交
156
  }
H
hzcheng 已提交
157

158
  pthread_attr_destroy(&thattr);
159
  return (void *)pServerObj;
H
hzcheng 已提交
160 161
}

162 163
/*
 * Stop one data thread and release everything it owns.
 *
 * Sets the stop flag, wakes the thread out of epoll_wait() by registering a
 * readable eventfd with its epoll instance (falling back to pthread_cancel()
 * when that is not possible), joins the thread and then frees every connection
 * still in its list. The mutex of the thread object is left to the caller.
 */
static void taosStopTcpThread(SThreadObj* pThreadObj) {
  pThreadObj->stop = true;
  int fd = -1;  // eventfd() returns a plain file descriptor (eventfd_t is the 64-bit counter type)

  if (pThreadObj->thread && pThreadObj->pollFd >=0) {
    // signal the thread to stop, try graceful method first,
    // and use pthread_cancel when failed
    struct epoll_event event = { .events = EPOLLIN };
    fd = eventfd(1, 0);  // initial counter 1, so the descriptor is readable right away
    if (fd == -1) {
      // failed to create eventfd, call pthread_cancel instead, which may result in data corruption:
      tError("%s, failed to create eventfd(%s)", pThreadObj->label, strerror(errno));
      pthread_cancel(pThreadObj->thread);
    } else if (epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_ADD, fd, &event) < 0) {
      // failed to call epoll_ctl, call pthread_cancel instead, which may result in data corruption:
      tError("%s, failed to call epoll_ctl(%s)", pThreadObj->label, strerror(errno));
      pthread_cancel(pThreadObj->thread);
    }
  }

  if (pThreadObj->thread) {
    pthread_join(pThreadObj->thread, NULL);
    pThreadObj->thread = 0;
  }

  // Free the remaining connections while the epoll descriptor is still open:
  // taosFreeFdObj() removes each socket from pollFd, which must not refer to a
  // closed (and possibly reused) descriptor number.
  while (pThreadObj->pHead) {
    SFdObj *pFdObj = pThreadObj->pHead;
    pThreadObj->pHead = pFdObj->next;
    taosFreeFdObj(pFdObj);
  }

  if (pThreadObj->pollFd >=0) {
    close(pThreadObj->pollFd);
    pThreadObj->pollFd = -1;
  }
  if (fd != -1) close(fd);
}


H
hzcheng 已提交
194
void taosCleanUpTcpServer(void *handle) {
195
  SServerObj *pServerObj = handle;
J
Jeff Tao 已提交
196
  SThreadObj *pThreadObj;
H
hzcheng 已提交
197 198

  if (pServerObj == NULL) return;
199 200
  if(pServerObj->fd >=0) shutdown(pServerObj->fd, SHUT_RD);
  if(pServerObj->thread) pthread_join(pServerObj->thread, NULL);
H
hzcheng 已提交
201

J
Jeff Tao 已提交
202
  for (int i = 0; i < pServerObj->numOfThreads; ++i) {
H
hzcheng 已提交
203
    pThreadObj = pServerObj->pThreadObj + i;
204
    taosStopTcpThread(pThreadObj);
J
Jeff Tao 已提交
205
    pthread_mutex_destroy(&(pThreadObj->mutex));
H
hzcheng 已提交
206 207 208 209
  }

  tTrace("TCP:%s, TCP server is cleaned up", pServerObj->label);

J
Jeff Tao 已提交
210
  tfree(pServerObj->pThreadObj);
H
hzcheng 已提交
211 212 213
  tfree(pServerObj);
}

214
static void *taosAcceptTcpConnection(void *arg) {
J
Jeff Tao 已提交
215 216 217 218 219 220 221
  int                connFd = -1;
  struct sockaddr_in caddr;
  int                threadId = 0;
  SThreadObj        *pThreadObj;
  SServerObj        *pServerObj;

  pServerObj = (SServerObj *)arg;
222
  tTrace("%s TCP server is ready, ip:0x%x:%hu", pServerObj->label, pServerObj->ip, pServerObj->port);
J
Jeff Tao 已提交
223 224 225

  while (1) {
    socklen_t addrlen = sizeof(caddr);
226 227 228 229 230 231
    connFd = accept(pServerObj->fd, (struct sockaddr *)&caddr, &addrlen);
    if (connFd == -1) {
      if (errno == EINVAL) {
        tTrace("%s TCP server socket was shutdown, exiting...", pServerObj->label);
        break;
      }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
232

233
      tError("%s TCP accept failure(%s)", pServerObj->label, strerror(errno));
J
Jeff Tao 已提交
234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249
      continue;
    }

    taosKeepTcpAlive(connFd);

    // pick up the thread to handle this connection
    pThreadObj = pServerObj->pThreadObj + threadId;

    SFdObj *pFdObj = taosMallocFdObj(pThreadObj, connFd);
    if (pFdObj) {
      pFdObj->ip = caddr.sin_addr.s_addr;
      pFdObj->port = caddr.sin_port;
      tTrace("%s new connection from %s:%hu, FD:%p, numOfFds:%d", pServerObj->label, 
              inet_ntoa(caddr.sin_addr), pFdObj->port, pFdObj, pThreadObj->numOfFds);
    } else {
      close(connFd);
250 251
      tError("%s failed to malloc FdObj(%s) for connection from:%s:%hu", pServerObj->label, strerror(errno),
             inet_ntoa(caddr.sin_addr), caddr.sin_port);
J
Jeff Tao 已提交
252 253 254 255 256 257
    }  

    // pick up next thread for next connection
    threadId++;
    threadId = threadId % pServerObj->numOfThreads;
  }
258 259 260

  close(pServerObj->fd);
  return NULL;
J
Jeff Tao 已提交
261 262
}

J
jtao1735 已提交
263
void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int num, void *fp, void *shandle) {
J
Jeff Tao 已提交
264 265 266 267 268
  SThreadObj    *pThreadObj;
  pthread_attr_t thattr;

  pThreadObj = (SThreadObj *)malloc(sizeof(SThreadObj));
  memset(pThreadObj, 0, sizeof(SThreadObj));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
269
  tstrncpy(pThreadObj->label, label, sizeof(pThreadObj->label));
J
jtao1735 已提交
270
  pThreadObj->ip = ip;
J
Jeff Tao 已提交
271 272 273 274
  pThreadObj->shandle = shandle;

  if (pthread_mutex_init(&(pThreadObj->mutex), NULL) < 0) {
    tError("%s failed to init TCP client mutex(%s)", label, strerror(errno));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
275
    free(pThreadObj);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
276
    terrno = TAOS_SYSTEM_ERROR(errno); 
J
Jeff Tao 已提交
277 278 279 280 281 282
    return NULL;
  }

  pThreadObj->pollFd = epoll_create(10);  // size does not matter
  if (pThreadObj->pollFd < 0) {
    tError("%s failed to create TCP client epoll", label);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
283
    free(pThreadObj);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
284
    terrno = TAOS_SYSTEM_ERROR(errno); 
J
Jeff Tao 已提交
285 286 287 288 289 290 291 292 293 294
    return NULL;
  }

  pThreadObj->processData = fp;

  pthread_attr_init(&thattr);
  pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
  int code = pthread_create(&(pThreadObj->thread), &thattr, taosProcessTcpData, (void *)(pThreadObj));
  pthread_attr_destroy(&thattr);
  if (code != 0) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
295 296
    close(pThreadObj->pollFd);
    free(pThreadObj);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
297
    terrno = TAOS_SYSTEM_ERROR(errno); 
J
Jeff Tao 已提交
298 299 300 301
    tError("%s failed to create TCP read data thread(%s)", label, strerror(errno));
    return NULL;
  }

B
Bomin Zhang 已提交
302
  tTrace("%s TCP client is initialized, ip:%u:%hu", label, ip, port);
J
Jeff Tao 已提交
303 304 305 306 307 308 309 310

  return pThreadObj;
}

void taosCleanUpTcpClient(void *chandle) {
  SThreadObj *pThreadObj = chandle;
  if (pThreadObj == NULL) return;

311
  taosStopTcpThread(pThreadObj);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
312
  tTrace ("%s, all connections are cleaned up", pThreadObj->label);
J
Jeff Tao 已提交
313 314 315 316

  tfree(pThreadObj);
}

J
jtao1735 已提交
317
void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uint16_t port) {
J
Jeff Tao 已提交
318 319
  SThreadObj *    pThreadObj = shandle;

J
jtao1735 已提交
320
  int fd = taosOpenTcpClientSocket(ip, port, pThreadObj->ip);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
321
  if (fd < 0) return NULL;
J
Jeff Tao 已提交
322 323 324 325 326 327

  SFdObj *pFdObj = taosMallocFdObj(pThreadObj, fd);
  
  if (pFdObj) {
    pFdObj->thandle = thandle;
    pFdObj->port = port;
J
jtao1735 已提交
328 329
    pFdObj->ip = ip;
    tTrace("%s %p, TCP connection to 0x%x:%hu is created, FD:%p numOfFds:%d", 
J
Jeff Tao 已提交
330 331 332 333 334 335 336 337 338 339
            pThreadObj->label, thandle, ip, port, pFdObj, pThreadObj->numOfFds);
  } else {
    close(fd);
    tError("%s failed to malloc client FdObj(%s)", pThreadObj->label, strerror(errno));
  }

  return pFdObj;
}

void taosCloseTcpConnection(void *chandle) {
340
  SFdObj *pFdObj = chandle;
341 342
  if (pFdObj == NULL) return;

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
343 344 345
  pFdObj->thandle = NULL;
  pFdObj->closedByApp = 1;
  shutdown(pFdObj->fd, SHUT_WR);
346 347
}

J
Jeff Tao 已提交
348
int taosSendTcpData(uint32_t ip, uint16_t port, void *data, int len, void *chandle) {
349
  SFdObj *pFdObj = chandle;
350 351 352 353 354 355

  if (chandle == NULL) return -1;

  return (int)send(pFdObj->fd, data, (size_t)len, 0);
}

356 357 358 359 360
/*
 * Handle a connection that broke or was hung up, then free its object.
 * Unless the application closed the connection itself, the sending direction
 * is shut down and the upper layer is called with an empty message (msg NULL,
 * msgLen 0) carrying the connection's thandle, so it can clean up its context.
 */
static void taosReportBrokenLink(SFdObj *pFdObj) {
  SThreadObj *pOwner = pFdObj->pThreadObj;

  if (pFdObj->closedByApp == 0) {
    shutdown(pFdObj->fd, SHUT_WR);

    // an empty message tells the upper layer that the link is gone
    SRecvInfo brokenInfo;
    brokenInfo.msg = NULL;
    brokenInfo.msgLen = 0;
    brokenInfo.ip = 0;
    brokenInfo.port = 0;
    brokenInfo.shandle = pOwner->shandle;
    brokenInfo.thandle = pFdObj->thandle;
    brokenInfo.chandle = NULL;
    brokenInfo.connType = RPC_CONN_TCP;

    (*(pOwner->processData))(&brokenInfo);
  }

  taosFreeFdObj(pFdObj);
}

/*
 * Read one complete RPC message from a connection.
 *
 * The fixed-size SRpcHead is read first; its msgLen field (network byte order)
 * gives the total message length including the head. A buffer of
 * tsRpcOverhead + msgLen bytes is allocated, and the message (head followed by
 * the rest) is placed after the first tsRpcOverhead bytes.
 *
 * On success 0 is returned and pInfo describes the message; pInfo->msg points
 * into the allocated buffer, which the receiver of pInfo then owns. -1 is
 * returned (with nothing left allocated) on a read error, on an invalid
 * length, on allocation failure, or when the application has already closed
 * the connection.
 */
static int taosReadTcpData(SFdObj *pFdObj, SRecvInfo *pInfo) {
  SRpcHead    rpcHead;
  int32_t     msgLen, leftLen, retLen, headLen;
  char       *buffer, *msg;

  SThreadObj *pThreadObj = pFdObj->pThreadObj;

  headLen = taosReadMsg(pFdObj->fd, &rpcHead, sizeof(SRpcHead));
  if (headLen != (int32_t)sizeof(SRpcHead)) {
    tTrace("%s %p, read error, headLen:%d", pThreadObj->label, pFdObj->thandle, headLen);
    return -1; 
  }

  // network to host order (htonl and ntohl perform the same conversion)
  msgLen = (int32_t)htonl((uint32_t)rpcHead.msgLen);

  // msgLen comes from the peer. It must cover at least the head that was just
  // read: a smaller (or negative) value would make leftLen negative and let
  // the memcpy of the head below write past the end of the buffer.
  if (msgLen < headLen) {
    tError("%s %p, invalid msgLen:%d in TCP head", pThreadObj->label, pFdObj->thandle, msgLen);
    return -1;
  }

  buffer = malloc((size_t)msgLen + (size_t)tsRpcOverhead);
  if ( NULL == buffer) {
    tError("%s %p, TCP malloc(size:%d) fail", pThreadObj->label, pFdObj->thandle, msgLen);
    return -1;
  }

  msg = buffer + tsRpcOverhead;
  leftLen = msgLen - headLen;
  retLen = taosReadMsg(pFdObj->fd, msg + headLen, leftLen);

  if (leftLen != retLen) {
    tError("%s %p, read error, leftLen:%d retLen:%d", 
            pThreadObj->label, pFdObj->thandle, leftLen, retLen);
    free(buffer);
    return -1;
  }

  memcpy(msg, &rpcHead, sizeof(SRpcHead));
  
  pInfo->msg = msg;
  pInfo->msgLen = msgLen;
  pInfo->ip = pFdObj->ip;
  pInfo->port = pFdObj->port;
  pInfo->shandle = pThreadObj->shandle;
  pInfo->thandle = pFdObj->thandle;
  pInfo->chandle = pFdObj;
  pInfo->connType = RPC_CONN_TCP;

  // the application gave up on this connection while the message was read
  if (pFdObj->closedByApp) {
    free(buffer); 
    return -1;
  }

  return 0;
}

J
Jeff Tao 已提交
429 430 431 432 433
#define maxEvents 10

static void *taosProcessTcpData(void *param) {
  SThreadObj        *pThreadObj = param;
  SFdObj            *pFdObj;
H
hzcheng 已提交
434
  struct epoll_event events[maxEvents];
435
  SRecvInfo          recvInfo;
436
 
H
hzcheng 已提交
437
  while (1) {
J
Jeff Tao 已提交
438
    int fdNum = epoll_wait(pThreadObj->pollFd, events, maxEvents, -1);
439
    if (pThreadObj->stop) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
440
      tTrace("%s TCP thread get stop event, exiting...", pThreadObj->label);
441 442
      break;
    }
H
hzcheng 已提交
443 444
    if (fdNum < 0) continue;

J
Jeff Tao 已提交
445
    for (int i = 0; i < fdNum; ++i) {
H
hzcheng 已提交
446 447 448
      pFdObj = events[i].data.ptr;

      if (events[i].events & EPOLLERR) {
449 450
        tTrace("%s %p, error happened on FD", pThreadObj->label, pFdObj->thandle);
        taosReportBrokenLink(pFdObj);
H
hzcheng 已提交
451 452 453
        continue;
      }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
454 455
      if (events[i].events & EPOLLRDHUP) {
        tTrace("%s %p, FD RD hang up", pThreadObj->label, pFdObj->thandle);
456
        taosReportBrokenLink(pFdObj);
H
hzcheng 已提交
457 458 459
        continue;
      }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
460 461
      if (events[i].events & EPOLLHUP) {
        tTrace("%s %p, FD hang up", pThreadObj->label, pFdObj->thandle);
462
        taosReportBrokenLink(pFdObj);
463 464
        continue;
      }
H
hzcheng 已提交
465

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
466 467
      if (taosReadTcpData(pFdObj, &recvInfo) < 0) {
        shutdown(pFdObj->fd, SHUT_WR); 
H
hzcheng 已提交
468 469 470
        continue;
      }

471
      pFdObj->thandle = (*(pThreadObj->processData))(&recvInfo);
472
      if (pFdObj->thandle == NULL) taosFreeFdObj(pFdObj);
H
hzcheng 已提交
473 474
    }
  }
J
Jeff Tao 已提交
475 476

  return NULL;
H
hzcheng 已提交
477 478
}

J
Jeff Tao 已提交
479
/*
 * Allocate the object for socket fd, register the socket with pThreadObj's
 * epoll instance (EPOLLIN | EPOLLRDHUP) and put the object at the head of the
 * thread's connection list.
 *
 * Returns the new object, or NULL on failure with terrno set and errno holding
 * the reason. The socket itself is not closed on failure; that stays with the
 * caller.
 */
static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, int fd) {
  struct epoll_event event;

  SFdObj *pFdObj = (SFdObj *)calloc(1, sizeof(SFdObj));
  if (pFdObj == NULL) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return NULL;
  }

  pFdObj->closedByApp = 0;
  pFdObj->fd = fd;
  pFdObj->pThreadObj = pThreadObj;
  pFdObj->signature = pFdObj;

  event.events = EPOLLIN | EPOLLRDHUP;
  event.data.ptr = pFdObj;

  // Register with epoll and link into the list under one lock. As soon as the
  // socket is in the epoll set the data thread may see an event and call
  // taosFreeFdObj(), which takes this mutex before unlinking; holding it here
  // guarantees the object is already in the list when that happens.
  pthread_mutex_lock(&(pThreadObj->mutex));

  if (epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_ADD, fd, &event) < 0) {
    int err = errno;  // keep the reason; unlock/free may overwrite errno
    pthread_mutex_unlock(&(pThreadObj->mutex));
    tfree(pFdObj);
    terrno = TAOS_SYSTEM_ERROR(err);
    errno = err;      // callers log strerror(errno)
    return NULL;
  }

  // notify the data process, add into the FdObj list
  pFdObj->next = pThreadObj->pHead;
  if (pThreadObj->pHead) (pThreadObj->pHead)->prev = pFdObj;
  pThreadObj->pHead = pFdObj;
  pThreadObj->numOfFds++;
  pthread_mutex_unlock(&(pThreadObj->mutex));

  return pFdObj;
}

J
Jeff Tao 已提交
511
/*
 * Release one connection object: remove its socket from the owning thread's
 * epoll instance, close the socket, unlink the object from the thread's list
 * and free it. NULL and objects whose signature no longer matches are ignored;
 * the signature is cleared under the thread mutex so that the object is only
 * torn down once.
 */
static void taosFreeFdObj(SFdObj *pFdObj) {
  if (pFdObj == NULL || pFdObj->signature != pFdObj) {
    return;
  }

  SThreadObj *pOwner = pFdObj->pThreadObj;
  pthread_mutex_lock(&pOwner->mutex);

  // re-check under the lock: the object may have been released in the meantime
  if (pFdObj->signature == NULL) {
    pthread_mutex_unlock(&pOwner->mutex);
    return;
  }
  pFdObj->signature = NULL;

  epoll_ctl(pOwner->pollFd, EPOLL_CTL_DEL, pFdObj->fd, NULL);
  taosCloseSocket(pFdObj->fd);

  pOwner->numOfFds--;
  if (pOwner->numOfFds < 0) {
    tError("%s %p, TCP thread:%d, number of FDs is negative!!!", 
            pOwner->label, pFdObj->thandle, pOwner->threadId);
  }

  // unlink from the doubly linked list; an object without a predecessor is the head
  SFdObj *pBefore = pFdObj->prev;
  SFdObj *pAfter = pFdObj->next;
  if (pBefore == NULL) {
    pOwner->pHead = pAfter;
  } else {
    pBefore->next = pAfter;
  }
  if (pAfter != NULL) {
    pAfter->prev = pBefore;
  }

  pthread_mutex_unlock(&pOwner->mutex);

  tTrace("%s %p, FD:%p is cleaned, numOfFds:%d", 
          pOwner->label, pFdObj->thandle, pFdObj, pOwner->numOfFds);

  tfree(pFdObj);
}