rpcTcp.c 15.6 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
slguan 已提交
16
#include "os.h"
H
hzcheng 已提交
17 18
#include "tsocket.h"
#include "tutil.h"
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
19
#include "taoserror.h" 
S
slguan 已提交
20
#include "rpcLog.h"
21
#include "rpcHead.h"
J
Jeff Tao 已提交
22
#include "rpcTcp.h"
H
hzcheng 已提交
23

24 25 26
#ifndef EPOLLWAKEUP
  #define EPOLLWAKEUP (1u << 29)
#endif
H
hzcheng 已提交
27

J
Jeff Tao 已提交
28 29
typedef struct SFdObj {
  void              *signature;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
30 31 32
  int                fd;          // TCP socket FD
  int                closedByApp; // 1: already closed by App
  void              *thandle;     // handle from upper layer, like TAOS
J
Jeff Tao 已提交
33 34 35 36 37
  uint32_t           ip;
  uint16_t           port;
  struct SThreadObj *pThreadObj;
  struct SFdObj     *prev;
  struct SFdObj     *next;
H
hzcheng 已提交
38 39
} SFdObj;

J
Jeff Tao 已提交
40
typedef struct SThreadObj {
H
hzcheng 已提交
41 42
  pthread_t       thread;
  SFdObj *        pHead;
J
Jeff Tao 已提交
43
  pthread_mutex_t mutex;
J
jtao1735 已提交
44
  uint32_t        ip;
45
  bool            stop;
H
hzcheng 已提交
46 47 48 49
  int             pollFd;
  int             numOfFds;
  int             threadId;
  char            label[12];
50 51
  void           *shandle;  // handle passed by upper layer during server initialization
  void           *(*processData)(SRecvInfo *pPacket);
H
hzcheng 已提交
52 53 54
} SThreadObj;

typedef struct {
55
  int         fd;
J
jtao1735 已提交
56
  uint32_t    ip;
L
lihui 已提交
57
  uint16_t    port;
H
hzcheng 已提交
58 59 60 61 62 63 64
  char        label[12];
  int         numOfThreads;
  void *      shandle;
  SThreadObj *pThreadObj;
  pthread_t   thread;
} SServerObj;

J
Jeff Tao 已提交
65 66 67 68
static void   *taosProcessTcpData(void *param);
static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, int fd);
static void    taosFreeFdObj(SFdObj *pFdObj);
static void    taosReportBrokenLink(SFdObj *pFdObj);
69
static void*   taosAcceptTcpConnection(void *arg);
H
hzcheng 已提交
70

J
jtao1735 已提交
71
void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle) {
J
Jeff Tao 已提交
72 73
  SServerObj *pServerObj;
  SThreadObj *pThreadObj;
H
hzcheng 已提交
74

J
Jeff Tao 已提交
75
  pServerObj = (SServerObj *)calloc(sizeof(SServerObj), 1);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
76 77 78 79 80 81
  if (pServerObj == NULL) {
    tError("TCP:%s no enough memory", label);
    terrno = TAOS_SYSTEM_ERROR(errno); 
    return NULL;
  }

J
jtao1735 已提交
82
  pServerObj->ip = ip;
83
  pServerObj->port = port;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
84
  tstrncpy(pServerObj->label, label, sizeof(pServerObj->label));
85 86
  pServerObj->numOfThreads = numOfThreads;

J
Jeff Tao 已提交
87
  pServerObj->pThreadObj = (SThreadObj *)calloc(sizeof(SThreadObj), numOfThreads);
88 89
  if (pServerObj->pThreadObj == NULL) {
    tError("TCP:%s no enough memory", label);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
90
    terrno = TAOS_SYSTEM_ERROR(errno); 
J
Jeff Tao 已提交
91
    free(pServerObj);
92
    return NULL;
H
hzcheng 已提交
93
  }
J
Jeff Tao 已提交
94

J
Jeff Tao 已提交
95
  int code = 0;
96
  pThreadObj = pServerObj->pThreadObj;
J
Jeff Tao 已提交
97
  for (int i = 0; i < numOfThreads; ++i) {
98
    pThreadObj->processData = fp;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
99
    tstrncpy(pThreadObj->label, label, sizeof(pThreadObj->label));
100
    pThreadObj->shandle = shandle;
H
hzcheng 已提交
101

J
Jeff Tao 已提交
102
    code = pthread_mutex_init(&(pThreadObj->mutex), NULL);
J
Jeff Tao 已提交
103 104
    if (code < 0) {
      tError("%s failed to init TCP process data mutex(%s)", label, strerror(errno));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
105
      terrno = TAOS_SYSTEM_ERROR(errno); 
J
Jeff Tao 已提交
106
      break;;
107
    }
H
hzcheng 已提交
108

109 110 111
    pThreadObj->pollFd = epoll_create(10);  // size does not matter
    if (pThreadObj->pollFd < 0) {
      tError("%s failed to create TCP epoll", label);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
112
      terrno = TAOS_SYSTEM_ERROR(errno); 
J
Jeff Tao 已提交
113 114
      code = -1;
      break;
115
    }
H
hzcheng 已提交
116

J
Jeff Tao 已提交
117 118 119
    pthread_attr_t thattr;
    pthread_attr_init(&thattr);
    pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
J
Jeff Tao 已提交
120
    code = pthread_create(&(pThreadObj->thread), &thattr, taosProcessTcpData, (void *)(pThreadObj));
J
Jeff Tao 已提交
121 122 123
    pthread_attr_destroy(&thattr);
    if (code != 0) {
      tError("%s failed to create TCP process data thread(%s)", label, strerror(errno));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
124
      terrno = TAOS_SYSTEM_ERROR(errno); 
J
Jeff Tao 已提交
125
      break;
126
    }
H
hzcheng 已提交
127

128 129
    pThreadObj->threadId = i;
    pThreadObj++;
H
hzcheng 已提交
130 131
  }

J
Jeff Tao 已提交
132 133 134 135 136 137 138
  if (code == 0) { 
    pthread_attr_t thattr;
    pthread_attr_init(&thattr);
    pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
    code = pthread_create(&(pServerObj->thread), &thattr, (void *)taosAcceptTcpConnection, (void *)(pServerObj));
    pthread_attr_destroy(&thattr);
    if (code != 0) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
139
      terrno = TAOS_SYSTEM_ERROR(errno); 
J
Jeff Tao 已提交
140 141
      tError("%s failed to create TCP accept thread(%s)", label, strerror(errno));
    }
H
hzcheng 已提交
142 143
  }

J
Jeff Tao 已提交
144 145 146 147 148
  if (code != 0) {
    free(pServerObj->pThreadObj);
    free(pServerObj);
    pServerObj = NULL;
  } else {
J
jtao1735 已提交
149
    tTrace("%s TCP server is initialized, ip:0x%x port:%hu numOfThreads:%d", label, ip, port, numOfThreads);
J
Jeff Tao 已提交
150
  }
H
hzcheng 已提交
151

152
  return (void *)pServerObj;
H
hzcheng 已提交
153 154
}

155 156 157 158 159 160 161 162
static void taosStopTcpThread(SThreadObj* pThreadObj) {
  pThreadObj->stop = true;

  // signal the thread to stop, try graceful method first,
  // and use pthread_cancel when failed
  struct epoll_event event = { .events = EPOLLIN };
  eventfd_t fd = eventfd(1, 0);
  if (fd == -1) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
163 164
    // failed to create eventfd, call pthread_cancel instead, which may result in data corruption:
    tError("%s, failed to create eventfd(%s)", pThreadObj->label, strerror(errno));
165 166
    pthread_cancel(pThreadObj->thread);
  } else if (epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_ADD, fd, &event) < 0) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
167 168
    // failed to call epoll_ctl, call pthread_cancel instead, which may result in data corruption:
    tError("%s, failed to call epoll_ctl(%s)", pThreadObj->label, strerror(errno));
169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185
    pthread_cancel(pThreadObj->thread);
  }

  pthread_join(pThreadObj->thread, NULL);
  close(pThreadObj->pollFd);
  if (fd != -1) {
    close(fd);
  }

  while (pThreadObj->pHead) {
    SFdObj *pFdObj = pThreadObj->pHead;
    pThreadObj->pHead = pFdObj->next;
    taosFreeFdObj(pFdObj);
  }
}


H
hzcheng 已提交
186
void taosCleanUpTcpServer(void *handle) {
187
  SServerObj *pServerObj = handle;
J
Jeff Tao 已提交
188
  SThreadObj *pThreadObj;
H
hzcheng 已提交
189 190 191

  if (pServerObj == NULL) return;

192
  shutdown(pServerObj->fd, SHUT_RD);
H
hzcheng 已提交
193 194
  pthread_join(pServerObj->thread, NULL);

J
Jeff Tao 已提交
195
  for (int i = 0; i < pServerObj->numOfThreads; ++i) {
H
hzcheng 已提交
196
    pThreadObj = pServerObj->pThreadObj + i;
197
    taosStopTcpThread(pThreadObj);
J
Jeff Tao 已提交
198
    pthread_mutex_destroy(&(pThreadObj->mutex));
H
hzcheng 已提交
199 200 201 202
  }

  tTrace("TCP:%s, TCP server is cleaned up", pServerObj->label);

J
Jeff Tao 已提交
203
  tfree(pServerObj->pThreadObj);
H
hzcheng 已提交
204 205 206
  tfree(pServerObj);
}

207
static void* taosAcceptTcpConnection(void *arg) {
J
Jeff Tao 已提交
208 209 210 211 212 213 214 215
  int                connFd = -1;
  struct sockaddr_in caddr;
  int                threadId = 0;
  SThreadObj        *pThreadObj;
  SServerObj        *pServerObj;

  pServerObj = (SServerObj *)arg;

216 217
  pServerObj->fd = taosOpenTcpServerSocket(pServerObj->ip, pServerObj->port);
  if (pServerObj->fd < 0) return NULL; 
J
Jeff Tao 已提交
218

219
  tTrace("%s TCP server is ready, ip:0x%x:%hu", pServerObj->label, pServerObj->ip, pServerObj->port);
J
Jeff Tao 已提交
220 221 222

  while (1) {
    socklen_t addrlen = sizeof(caddr);
223 224 225 226 227 228
    connFd = accept(pServerObj->fd, (struct sockaddr *)&caddr, &addrlen);
    if (connFd == -1) {
      if (errno == EINVAL) {
        tTrace("%s TCP server socket was shutdown, exiting...", pServerObj->label);
        break;
      }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
229

230
      tError("%s TCP accept failure(%s)", pServerObj->label, strerror(errno));
J
Jeff Tao 已提交
231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246
      continue;
    }

    taosKeepTcpAlive(connFd);

    // pick up the thread to handle this connection
    pThreadObj = pServerObj->pThreadObj + threadId;

    SFdObj *pFdObj = taosMallocFdObj(pThreadObj, connFd);
    if (pFdObj) {
      pFdObj->ip = caddr.sin_addr.s_addr;
      pFdObj->port = caddr.sin_port;
      tTrace("%s new connection from %s:%hu, FD:%p, numOfFds:%d", pServerObj->label, 
              inet_ntoa(caddr.sin_addr), pFdObj->port, pFdObj, pThreadObj->numOfFds);
    } else {
      close(connFd);
247 248
      tError("%s failed to malloc FdObj(%s) for connection from:%s:%hu", pServerObj->label, strerror(errno),
             inet_ntoa(caddr.sin_addr), caddr.sin_port);
J
Jeff Tao 已提交
249 250 251 252 253 254
    }  

    // pick up next thread for next connection
    threadId++;
    threadId = threadId % pServerObj->numOfThreads;
  }
255 256 257

  close(pServerObj->fd);
  return NULL;
J
Jeff Tao 已提交
258 259
}

J
jtao1735 已提交
260
void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int num, void *fp, void *shandle) {
J
Jeff Tao 已提交
261 262 263 264 265
  SThreadObj    *pThreadObj;
  pthread_attr_t thattr;

  pThreadObj = (SThreadObj *)malloc(sizeof(SThreadObj));
  memset(pThreadObj, 0, sizeof(SThreadObj));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
266
  tstrncpy(pThreadObj->label, label, sizeof(pThreadObj->label));
J
jtao1735 已提交
267
  pThreadObj->ip = ip;
J
Jeff Tao 已提交
268 269 270 271
  pThreadObj->shandle = shandle;

  if (pthread_mutex_init(&(pThreadObj->mutex), NULL) < 0) {
    tError("%s failed to init TCP client mutex(%s)", label, strerror(errno));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
272
    free(pThreadObj);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
273
    terrno = TAOS_SYSTEM_ERROR(errno); 
J
Jeff Tao 已提交
274 275 276 277 278 279
    return NULL;
  }

  pThreadObj->pollFd = epoll_create(10);  // size does not matter
  if (pThreadObj->pollFd < 0) {
    tError("%s failed to create TCP client epoll", label);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
280
    free(pThreadObj);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
281
    terrno = TAOS_SYSTEM_ERROR(errno); 
J
Jeff Tao 已提交
282 283 284 285 286 287 288 289 290 291
    return NULL;
  }

  pThreadObj->processData = fp;

  pthread_attr_init(&thattr);
  pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
  int code = pthread_create(&(pThreadObj->thread), &thattr, taosProcessTcpData, (void *)(pThreadObj));
  pthread_attr_destroy(&thattr);
  if (code != 0) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
292 293
    close(pThreadObj->pollFd);
    free(pThreadObj);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
294
    terrno = TAOS_SYSTEM_ERROR(errno); 
J
Jeff Tao 已提交
295 296 297 298 299 300 301 302 303 304 305 306 307
    tError("%s failed to create TCP read data thread(%s)", label, strerror(errno));
    return NULL;
  }

  tTrace("%s TCP client is initialized, ip:%s:%hu", label, ip, port);

  return pThreadObj;
}

void taosCleanUpTcpClient(void *chandle) {
  SThreadObj *pThreadObj = chandle;
  if (pThreadObj == NULL) return;

308
  taosStopTcpThread(pThreadObj);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
309
  tTrace ("%s, all connections are cleaned up", pThreadObj->label);
J
Jeff Tao 已提交
310 311 312 313

  tfree(pThreadObj);
}

J
jtao1735 已提交
314
void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uint16_t port) {
J
Jeff Tao 已提交
315 316
  SThreadObj *    pThreadObj = shandle;

J
jtao1735 已提交
317
  int fd = taosOpenTcpClientSocket(ip, port, pThreadObj->ip);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
318
  if (fd < 0) return NULL;
J
Jeff Tao 已提交
319 320 321 322 323 324

  SFdObj *pFdObj = taosMallocFdObj(pThreadObj, fd);
  
  if (pFdObj) {
    pFdObj->thandle = thandle;
    pFdObj->port = port;
J
jtao1735 已提交
325 326
    pFdObj->ip = ip;
    tTrace("%s %p, TCP connection to 0x%x:%hu is created, FD:%p numOfFds:%d", 
J
Jeff Tao 已提交
327 328 329 330 331 332 333 334 335 336
            pThreadObj->label, thandle, ip, port, pFdObj, pThreadObj->numOfFds);
  } else {
    close(fd);
    tError("%s failed to malloc client FdObj(%s)", pThreadObj->label, strerror(errno));
  }

  return pFdObj;
}

void taosCloseTcpConnection(void *chandle) {
337
  SFdObj *pFdObj = chandle;
338 339
  if (pFdObj == NULL) return;

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
340 341 342
  pFdObj->thandle = NULL;
  pFdObj->closedByApp = 1;
  shutdown(pFdObj->fd, SHUT_WR);
343 344
}

J
Jeff Tao 已提交
345
int taosSendTcpData(uint32_t ip, uint16_t port, void *data, int len, void *chandle) {
346
  SFdObj *pFdObj = chandle;
347 348 349 350 351 352

  if (chandle == NULL) return -1;

  return (int)send(pFdObj->fd, data, (size_t)len, 0);
}

353 354 355 356 357
static void taosReportBrokenLink(SFdObj *pFdObj) {

  SThreadObj *pThreadObj = pFdObj->pThreadObj;

  // notify the upper layer, so it will clean the associated context
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
358 359 360
  if (pFdObj->closedByApp == 0) {
    shutdown(pFdObj->fd, SHUT_WR);

361 362 363 364 365 366 367 368 369 370
    SRecvInfo recvInfo;
    recvInfo.msg = NULL;
    recvInfo.msgLen = 0;
    recvInfo.ip = 0;
    recvInfo.port = 0;
    recvInfo.shandle = pThreadObj->shandle;
    recvInfo.thandle = pFdObj->thandle;;
    recvInfo.chandle = NULL;
    recvInfo.connType = RPC_CONN_TCP;
    (*(pThreadObj->processData))(&recvInfo);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404
  } 

  taosFreeFdObj(pFdObj);
}

static int taosReadTcpData(SFdObj *pFdObj, SRecvInfo *pInfo) {
  SRpcHead    rpcHead;
  int32_t     msgLen, leftLen, retLen, headLen;
  char       *buffer, *msg;

  SThreadObj *pThreadObj = pFdObj->pThreadObj;

  headLen = taosReadMsg(pFdObj->fd, &rpcHead, sizeof(SRpcHead));
  if (headLen != sizeof(SRpcHead)) {
    tTrace("%s %p, read error, headLen:%d", pThreadObj->label, pFdObj->thandle, headLen);
    return -1; 
  }

  msgLen = (int32_t)htonl((uint32_t)rpcHead.msgLen);
  buffer = malloc(msgLen + tsRpcOverhead);
  if ( NULL == buffer) {
    tError("%s %p, TCP malloc(size:%d) fail", pThreadObj->label, pFdObj->thandle, msgLen);
    return -1;
  }

  msg = buffer + tsRpcOverhead;
  leftLen = msgLen - headLen;
  retLen = taosReadMsg(pFdObj->fd, msg + headLen, leftLen);

  if (leftLen != retLen) {
    tError("%s %p, read error, leftLen:%d retLen:%d", 
            pThreadObj->label, pFdObj->thandle, leftLen, retLen);
    free(buffer);
    return -1;
405
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423

  memcpy(msg, &rpcHead, sizeof(SRpcHead));
  
  pInfo->msg = msg;
  pInfo->msgLen = msgLen;
  pInfo->ip = pFdObj->ip;
  pInfo->port = pFdObj->port;
  pInfo->shandle = pThreadObj->shandle;
  pInfo->thandle = pFdObj->thandle;;
  pInfo->chandle = pFdObj;
  pInfo->connType = RPC_CONN_TCP;

  if (pFdObj->closedByApp) {
    free(buffer); 
    return -1;
  }

  return 0;
424 425
}

J
Jeff Tao 已提交
426 427 428 429 430
#define maxEvents 10

static void *taosProcessTcpData(void *param) {
  SThreadObj        *pThreadObj = param;
  SFdObj            *pFdObj;
H
hzcheng 已提交
431
  struct epoll_event events[maxEvents];
432
  SRecvInfo          recvInfo;
433
 
H
hzcheng 已提交
434
  while (1) {
J
Jeff Tao 已提交
435
    int fdNum = epoll_wait(pThreadObj->pollFd, events, maxEvents, -1);
436 437 438 439
    if (pThreadObj->stop) {
      tTrace("%s, tcp thread get stop event, exiting...", pThreadObj->label);
      break;
    }
H
hzcheng 已提交
440 441
    if (fdNum < 0) continue;

J
Jeff Tao 已提交
442
    for (int i = 0; i < fdNum; ++i) {
H
hzcheng 已提交
443 444 445
      pFdObj = events[i].data.ptr;

      if (events[i].events & EPOLLERR) {
446 447
        tTrace("%s %p, error happened on FD", pThreadObj->label, pFdObj->thandle);
        taosReportBrokenLink(pFdObj);
H
hzcheng 已提交
448 449 450
        continue;
      }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
451 452
      if (events[i].events & EPOLLRDHUP) {
        tTrace("%s %p, FD RD hang up", pThreadObj->label, pFdObj->thandle);
453
        taosReportBrokenLink(pFdObj);
H
hzcheng 已提交
454 455 456
        continue;
      }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
457 458
      if (events[i].events & EPOLLHUP) {
        tTrace("%s %p, FD hang up", pThreadObj->label, pFdObj->thandle);
459
        taosReportBrokenLink(pFdObj);
460 461
        continue;
      }
H
hzcheng 已提交
462

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
463 464
      if (taosReadTcpData(pFdObj, &recvInfo) < 0) {
        shutdown(pFdObj->fd, SHUT_WR); 
H
hzcheng 已提交
465 466 467
        continue;
      }

468
      pFdObj->thandle = (*(pThreadObj->processData))(&recvInfo);
469
      if (pFdObj->thandle == NULL) taosFreeFdObj(pFdObj);
H
hzcheng 已提交
470 471
    }
  }
J
Jeff Tao 已提交
472 473

  return NULL;
H
hzcheng 已提交
474 475
}

J
Jeff Tao 已提交
476
static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, int fd) {
H
hzcheng 已提交
477 478
  struct epoll_event event;

J
Jeff Tao 已提交
479
  SFdObj *pFdObj = (SFdObj *)calloc(sizeof(SFdObj), 1);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
480 481 482
  if (pFdObj == NULL) {
    return NULL;
  }
H
hzcheng 已提交
483

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
484
  pFdObj->closedByApp = 0;
J
Jeff Tao 已提交
485 486 487
  pFdObj->fd = fd;
  pFdObj->pThreadObj = pThreadObj;
  pFdObj->signature = pFdObj;
H
hzcheng 已提交
488

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
489
  event.events = EPOLLIN | EPOLLRDHUP;
J
Jeff Tao 已提交
490 491 492
  event.data.ptr = pFdObj;
  if (epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_ADD, fd, &event) < 0) {
    tfree(pFdObj);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
493
    terrno = TAOS_SYSTEM_ERROR(errno); 
J
Jeff Tao 已提交
494
    return NULL;
H
hzcheng 已提交
495 496
  }

J
Jeff Tao 已提交
497 498 499 500 501 502 503
  // notify the data process, add into the FdObj list
  pthread_mutex_lock(&(pThreadObj->mutex));
  pFdObj->next = pThreadObj->pHead;
  if (pThreadObj->pHead) (pThreadObj->pHead)->prev = pFdObj;
  pThreadObj->pHead = pFdObj;
  pThreadObj->numOfFds++;
  pthread_mutex_unlock(&(pThreadObj->mutex));
H
hzcheng 已提交
504

J
Jeff Tao 已提交
505
  return pFdObj;
H
hzcheng 已提交
506 507
}

J
Jeff Tao 已提交
508
static void taosFreeFdObj(SFdObj *pFdObj) {
509 510 511 512

  if (pFdObj == NULL) return;
  if (pFdObj->signature != pFdObj) return;

513
  SThreadObj *pThreadObj = pFdObj->pThreadObj;
J
Jeff Tao 已提交
514
  pthread_mutex_lock(&pThreadObj->mutex);
515

J
Jeff Tao 已提交
516 517 518 519 520 521
  if (pFdObj->signature == NULL) {
    pthread_mutex_unlock(&pThreadObj->mutex);
    return;
  }

  pFdObj->signature = NULL;
522
  epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_DEL, pFdObj->fd, NULL);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
523
  taosCloseSocket(pFdObj->fd);
524 525 526

  pThreadObj->numOfFds--;
  if (pThreadObj->numOfFds < 0)
527 528
    tError("%s %p, TCP thread:%d, number of FDs is negative!!!", 
            pThreadObj->label, pFdObj->thandle, pThreadObj->threadId);
529 530 531 532 533 534 535 536 537 538 539

  if (pFdObj->prev) {
    (pFdObj->prev)->next = pFdObj->next;
  } else {
    pThreadObj->pHead = pFdObj->next;
  }

  if (pFdObj->next) {
    (pFdObj->next)->prev = pFdObj->prev;
  }

J
Jeff Tao 已提交
540
  pthread_mutex_unlock(&pThreadObj->mutex);
541

542 543 544
  tTrace("%s %p, FD:%p is cleaned, numOfFds:%d", 
          pThreadObj->label, pFdObj->thandle, pFdObj, pThreadObj->numOfFds);

545 546
  tfree(pFdObj);
}