tsclient.h 14.5 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#ifndef TDENGINE_TSCLIENT_H
#define TDENGINE_TSCLIENT_H

#ifdef __cplusplus
extern "C" {
#endif

S
slguan 已提交
23
#include "os.h"
H
hzcheng 已提交
24 25 26 27
#include "taos.h"
#include "taosmsg.h"
#include "tglobalcfg.h"
#include "tlog.h"
S
slguan 已提交
28
#include "taosdef.h"
H
hzcheng 已提交
29 30
#include "tsqlfunction.h"
#include "tutil.h"
S
slguan 已提交
31
#include "trpc.h"
32
#include "qsqltype.h"
33
#include "qsqlparser.h"
H
hjxilinx 已提交
34
#include "qtsbuf.h"
H
hzcheng 已提交
35

H
hjxilinx 已提交
36 37
/*
 * Address of the first value of output column `col` inside a result buffer.
 *
 * Expands to res->data advanced by
 *   (_queryinfo)->fieldsInfo.pSqlExpr[col]->offset * res->numOfRows.
 *
 * Every parameter and the whole expansion are parenthesized so that arguments such as
 * `pRes + 1` and uses such as `*TSC_GET_RESPTR_BASE(...)` parse as intended.
 *
 * NOTE: `res` is evaluated twice; do not pass an expression with side effects.
 */
#define TSC_GET_RESPTR_BASE(res, _queryinfo, col) \
  ((res)->data + ((_queryinfo)->fieldsInfo.pSqlExpr[(col)]->offset) * (res)->numOfRows)

H
hzcheng 已提交
38 39 40 41
// forward declaration
struct SSqlInfo;

typedef struct SSqlGroupbyExpr {
H
hjxilinx 已提交
42
  int16_t     tableIndex;
S
slguan 已提交
43 44
  int16_t     numOfGroupCols;
  SColIndexEx columnInfo[TSDB_MAX_TAGS];  // group by columns information
H
hjxilinx 已提交
45 46
  int16_t     orderIndex;                 // order by column index
  int16_t     orderType;                  // order by type: asc/desc
S
slguan 已提交
47
} SSqlGroupbyExpr;
H
hzcheng 已提交
48

H
hjxilinx 已提交
49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89
/*
 * Compact summary of a table's shape. It is embedded in STableMeta (inside its union)
 * and in SSTableMeta.
 */
typedef struct STableInfo {
  uint8_t numOfTags;     // number of tags
  uint8_t precision;     // NOTE(review): presumably the timestamp precision of the table - confirm
  int16_t numOfColumns;  // number of columns
  int16_t rowSize;       // NOTE(review): presumably the size of one row in bytes - confirm
} STableInfo;

/*
 * Client-side table meta. The anonymous union holds either a pointer to the super table meta
 * or an inline STableInfo, as described by the member comments below.
 */
typedef struct STableMeta {
  char*   tableId;  // null-terminated string
  
  union {
    // pointer to super table if it is created according to super table
    struct STableMeta* pSTable;
    
    // otherwise, the following information is required.
    STableInfo tableInfo;
  };
  
  uint8_t tableType;  // NOTE(review): presumably selects which union member is valid - confirm
  int16_t sversion;
  int8_t  numOfVpeers;  // NOTE(review): presumably the number of valid entries in vpeerDesc - confirm
  SVnodeDesc vpeerDesc[TSDB_VNODES_SUPPORT];
  int32_t  sid;
  int32_t  vgid;
  uint64_t uid;
  
  // if the table is TSDB_CHILD_TABLE, schema is acquired by super table meta info
  SSchema  schema[];  // flexible array member: must remain the last field
} STableMeta;

/*
 * Table meta variant that always carries an inline STableInfo followed by the schema.
 * NOTE(review): the name suggests this describes a super table - confirm against users.
 */
typedef struct SSTableMeta {
  char* tableId;
  STableInfo tableInfo;
  int32_t  sid;
  int32_t  vgid;
  uint64_t uid;
  SSchema  schema[];  // flexible array member: must remain the last field
} SSTableMeta;

typedef struct STableMetaInfo {
  STableMeta * pTableMeta;       // table meta info
S
slguan 已提交
90
  SSuperTableMeta *pMetricMeta;  // metricmeta
91

H
hjxilinx 已提交
92 93 94 95
  /*
   * 1. keep the vnode index during the multi-vnode super table projection query
   * 2. keep the vnode index for multi-vnode insertion
   */
96
  int32_t vnodeIndex;
H
hjxilinx 已提交
97
  char    name[TSDB_TABLE_ID_LEN + 1];    // (super) table name
98 99
  int16_t numOfTags;                      // total required tags in query, including groupby tags
  int16_t tagColumnIndex[TSDB_MAX_TAGS];  // clause + tag projection
H
hjxilinx 已提交
100
} STableMetaInfo;
H
hzcheng 已提交
101

S
slguan 已提交
102 103 104 105 106 107 108 109 110 111 112
/* Describes one SQL function (expression) appearing in the select clause. */
typedef struct SSqlExpr {
  char        aliasName[TSDB_COL_NAME_LEN + 1];  // alias given with "AS aliasName"
  SColIndexEx colInfo;
  int64_t     uid;            // refactor use the pointer
  int16_t     functionId;     // function id in aAgg array
  int16_t     resType;        // return value type
  int16_t     resBytes;       // length of return value
  int16_t     interResBytes;  // inter result buffer size
  int16_t     numOfParams;    // NOTE(review): presumably the number of used entries in param[] - confirm
  tVariant    param[3];       // parameters are not more than 3
  int32_t     offset;         // sub result column value of arithmetic expression.
} SSqlExpr;

H
hjxilinx 已提交
116 117 118 119 120
/* Identifies a column by a (table index, column index) pair. */
typedef struct SColumnIndex {
  int16_t tableIndex;   // index of the table
  int16_t columnIndex;  // index of the column
} SColumnIndex;

H
hzcheng 已提交
121 122 123 124
typedef struct SFieldInfo {
  int16_t     numOfOutputCols;  // number of column in result
  int16_t     numOfAlloc;       // allocated size
  TAOS_FIELD *pFields;
H
hjxilinx 已提交
125
//  short *     pOffset;
S
slguan 已提交
126 127 128 129 130 131 132 133

  /*
   * define if this column is belong to the queried result, it may be add by parser to faciliate
   * the query process
   *
   * NOTE: these hidden columns always locate at the end of the output columns
   */
  bool *  pVisibleCols;
H
hjxilinx 已提交
134 135 136
  int32_t numOfHiddenCols;   // the number of column not belongs to the queried result columns
  SSqlFunctionExpr** pExpr;  // used for aggregation arithmetic express,such as count(*)+count(*)
  SSqlExpr** pSqlExpr;
H
hzcheng 已提交
137 138 139
} SFieldInfo;

typedef struct SSqlExprInfo {
H
hjxilinx 已提交
140 141 142
  int16_t    numOfAlloc;
  int16_t    numOfExprs;
  SSqlExpr** pExprs;
H
hzcheng 已提交
143 144
} SSqlExprInfo;

S
slguan 已提交
145 146 147 148
/* A column referenced by a query together with the filters attached to it. */
typedef struct SColumnBase {
  SColumnIndex       colIndex;
  int32_t            numOfFilters;  // NOTE(review): presumably the number of entries in filterInfo - confirm
  SColumnFilterInfo *filterInfo;
} SColumnBase;

S
slguan 已提交
151
typedef struct SColumnBaseInfo {
H
hzcheng 已提交
152 153 154
  int16_t      numOfAlloc;
  int16_t      numOfCols;
  SColumnBase *pColList;
S
slguan 已提交
155
} SColumnBaseInfo;
H
hzcheng 已提交
156 157 158

struct SLocalReducer;

S
slguan 已提交
159 160
/* A condition string paired with a uid. */
typedef struct SCond {
  uint64_t uid;   // NOTE(review): presumably the uid of the table the condition applies to - confirm
  char *   cond;  // condition text; ownership is not visible from this header
} SCond;

typedef struct SJoinNode {
S
slguan 已提交
165
  char     tableId[TSDB_TABLE_ID_LEN];
S
slguan 已提交
166 167 168 169 170 171 172 173 174 175
  uint64_t uid;
  int16_t  tagCol;
} SJoinNode;

/* Join description holding a left and a right SJoinNode. */
typedef struct SJoinInfo {
  bool      hasJoin;  // NOTE(review): presumably true when left/right are populated - confirm
  SJoinNode left;
  SJoinNode right;
} SJoinInfo;

H
hzcheng 已提交
176
typedef struct STagCond {
S
slguan 已提交
177 178 179 180 181 182 183 184 185 186 187 188
  // relation between tbname list and query condition, including : TK_AND or TK_OR
  int16_t relType;

  // tbname query condition, only support tbname query condition on one table
  SCond tbnameCond;

  // join condition, only support two tables join currently
  SJoinInfo joinInfo;

  // for different table, the query condition must be seperated
  SCond   cond[TSDB_MAX_JOIN_TABLE_NUM];
  int16_t numOfTagCond;
H
hzcheng 已提交
189 190
} STagCond;

S
slguan 已提交
191
/*
 * Description of one bound parameter. STableDataBlocks keeps an array of these for
 * parameter ('?') binding.
 */
typedef struct SParamInfo {
  int32_t  idx;       // NOTE(review): presumably the index of the parameter - confirm
  char     type;      // NOTE(review): presumably the data type code of the parameter - confirm
  uint8_t  timePrec;  // NOTE(review): presumably the timestamp precision - confirm
  short    bytes;     // NOTE(review): presumably the size of the value in bytes - confirm
  uint32_t offset;    // NOTE(review): presumably the offset of the value inside the data block - confirm
} SParamInfo;

S
slguan 已提交
199
typedef struct STableDataBlocks {
S
slguan 已提交
200
  char    tableId[TSDB_TABLE_ID_LEN];
201 202 203 204
  int8_t  tsSource;     // where does the UNIX timestamp come from, server or client
  bool    ordered;      // if current rows are ordered or not
  int64_t vgid;         // virtual group id
  int64_t prevTS;       // previous timestamp, recorded to decide if the records array is ts ascending
S
slguan 已提交
205
  int32_t numOfTables;  // number of tables in current submit block
206 207

  int32_t  rowSize;  // row size for current table
S
slguan 已提交
208
  uint32_t nAllocSize;
H
hjxilinx 已提交
209
  uint32_t headerSize;  // header for metadata (submit metadata)
S
slguan 已提交
210
  uint32_t size;
211

H
hjxilinx 已提交
212 213 214 215
  /*
   * the metermeta for current table, the metermeta will be used during submit stage, keep a ref
   * to avoid it to be removed from cache
   */
H
hjxilinx 已提交
216
  STableMeta *pTableMeta;
217

H
hzcheng 已提交
218 219 220 221
  union {
    char *filename;
    char *pData;
  };
S
slguan 已提交
222 223

  // for parameter ('?') binding
H
hjxilinx 已提交
224 225 226
  uint32_t    numOfAllocedParams;
  uint32_t    numOfParams;
  SParamInfo *params;
S
slguan 已提交
227
} STableDataBlocks;
H
hzcheng 已提交
228 229

typedef struct SDataBlockList {
S
slguan 已提交
230
  int32_t            idx;
H
hjxilinx 已提交
231 232
  uint32_t           nSize;
  uint32_t           nAlloc;
S
slguan 已提交
233 234
  char *             userParam; /* user assigned parameters for async query */
  void *             udfp;      /* user defined function pointer, used in async model */
S
slguan 已提交
235
  STableDataBlocks **pData;
H
hzcheng 已提交
236 237
} SDataBlockList;

238
typedef struct SQueryInfo {
239 240 241 242
  int16_t  command;  // the command may be different for each subclause, so keep it seperately.
  uint16_t type;     // query/insert/import type
  char     intervalTimeUnit;

243
  int64_t         etime, stime;
244
  int64_t         intervalTime;  // aggregation time interval
H
hjxilinx 已提交
245 246
  int64_t         slidingTime;   // sliding window in mseconds
  SSqlGroupbyExpr groupbyExpr;   // group by tags info
247 248 249 250 251 252 253 254 255 256

  SColumnBaseInfo  colList;
  SFieldInfo       fieldsInfo;
  SSqlExprInfo     exprsInfo;
  SLimitVal        limit;
  SLimitVal        slimit;
  STagCond         tagCond;
  SOrderVal        order;
  int16_t          interpoType;  // interpolate type
  int16_t          numOfTables;
H
hjxilinx 已提交
257
  STableMetaInfo **pMeterInfo;
258
  struct STSBuf *  tsBuf;
259 260
  int64_t *        defaultVal;   // default value for interpolation
  char *           msg;          // pointer to the pCmd->payload to keep error message temporarily
261
  int64_t          clauseLimit;  // limit for current sub clause
H
hjxilinx 已提交
262

263
  // offset value in the original sql expression, NOT sent to virtual node, only applied at client side
H
hjxilinx 已提交
264
  int64_t prjOffset;
265 266
} SQueryInfo;

267 268 269 270 271 272
// Data source of a statement: an SQL string or a data file.
// NOTE(review): presumably the values stored in SSqlCmd.dataSourceType - confirm.
enum {
  DATA_FROM_SQL_STRING = 1,
  DATA_FROM_DATA_FILE = 2,
};

H
hzcheng 已提交
273
typedef struct {
274 275
  int     command;
  uint8_t msgType;
S
slguan 已提交
276 277

  union {
278 279 280 281
    bool   existsCheck;     // check if the table exists or not
    bool   inStream;        // denote if current sql is executed in stream or not
    bool   createOnDemand;  // if the table is missing, on-the-fly create it. during getmeterMeta
    int8_t dataSourceType;  // load data from file or not
S
slguan 已提交
282
  };
H
hjxilinx 已提交
283

284 285 286 287
  union {
    int32_t count;
    int32_t numOfTablesInSubmit;
  };
H
hzcheng 已提交
288

289 290 291 292 293 294 295
  int32_t  clauseIndex;  // index of multiple subclause query
  int8_t   isParseFinish;
  short    numOfCols;
  uint32_t allocSize;
  char *   payload;
  int      payloadLen;

296 297
  SQueryInfo **pQueryInfo;
  int32_t      numOfClause;
H
hzcheng 已提交
298

S
slguan 已提交
299
  // submit data blocks branched according to vnode
300
  SDataBlockList *pDataBlocks;
S
slguan 已提交
301 302 303 304

  // for parameter ('?') binding and batch processing
  int32_t batchSize;
  int32_t numOfParams;
H
hzcheng 已提交
305 306 307 308 309 310 311
} SSqlCmd;

/* Pair of row counters. SSqlRes holds a pointer to these in its pGroupRec field. */
typedef struct SResRec {
  int numOfRows;   // NOTE(review): presumably rows in the current retrieval - confirm
  int numOfTotal;  // NOTE(review): presumably rows retrieved so far in total - confirm
} SResRec;

S
slguan 已提交
312 313
struct STSBuf;

H
hzcheng 已提交
314
typedef struct {
S
slguan 已提交
315
  int32_t       code;
H
hjxilinx 已提交
316 317 318 319 320 321 322 323 324 325 326
  int64_t       numOfRows;                  // num of results in current retrieved
  int64_t       numOfTotal;                 // num of total results
  int64_t       numOfTotalInCurrentClause;  // num of total result in current subclause
  char *        pRsp;
  int           rspType;
  int           rspLen;
  uint64_t      qhandle;
  int64_t       uid;
  int64_t       useconds;
  int64_t       offset;  // offset value from vnode during projection query of stable
  int           row;
H
hjxilinx 已提交
327
  int16_t       numOfCols;
H
hjxilinx 已提交
328 329 330 331 332 333 334
  int16_t       precision;
  int32_t       numOfGroups;
  SResRec *     pGroupRec;
  char *        data;
  void **       tsrow;
  char **       buffer;  // Buffer used to put multibytes encoded using unicode (wchar_t)
  SColumnIndex *pColumnIndex;
H
hzcheng 已提交
335 336 337 338 339 340 341
  struct SLocalReducer *pLocalReducer;
} SSqlRes;

typedef struct _tsc_obj {
  void *           signature;
  void *           pTimer;
  char             mgmtIp[TSDB_USER_LEN];
L
lihui 已提交
342
  uint16_t         mgmtPort;
H
hzcheng 已提交
343 344 345
  char             user[TSDB_USER_LEN];
  char             pass[TSDB_KEY_LEN];
  char             acctId[TSDB_DB_NAME_LEN];
S
slguan 已提交
346
  char             db[TSDB_TABLE_ID_LEN];
H
hzcheng 已提交
347 348 349
  char             sversion[TSDB_VERSION_LEN];
  char             writeAuth : 1;
  char             superAuth : 1;
350 351 352
  struct SSqlObj *pSql;
  struct SSqlObj *pHb;
  struct SSqlObj *sqlList;
H
hzcheng 已提交
353 354 355 356
  struct _sstream *streamList;
  pthread_mutex_t  mutex;
} STscObj;

357
typedef struct SSqlObj {
H
hzcheng 已提交
358 359 360 361
  void *   signature;
  STscObj *pTscObj;
  void (*fp)();
  void (*fetchFp)();
H
hjxilinx 已提交
362 363 364 365 366 367
  void *            param;
  uint32_t          ip;
  short             vnode;
  int64_t           stime;
  uint32_t          queryId;
  void *            pStream;
weixin_48148422's avatar
weixin_48148422 已提交
368
  void *            pSubscription;
H
hjxilinx 已提交
369 370 371
  char *            sqlstr;
  char              retry;
  char              maxRetry;
S
slguan 已提交
372
  SRpcIpSet        *ipList;
H
hjxilinx 已提交
373 374 375 376 377 378
  char              freed : 4;
  char              listed : 4;
  tsem_t            rspSem;
  tsem_t            emptyRspSem;
  SSqlCmd           cmd;
  SSqlRes           res;
H
hjxilinx 已提交
379
  uint8_t           numOfSubs;
380 381
  char *            asyncTblPos;
  void *            pTableHashList;
382 383
  struct SSqlObj **pSubs;
  struct SSqlObj * prev, *next;
H
hzcheng 已提交
384 385 386 387 388 389 390 391 392
} SSqlObj;

/* State of one stream computation, kept in a doubly linked list (prev/next). */
typedef struct _sstream {
  SSqlObj *pSql;
  uint32_t streamId;
  char     listed;
  int64_t  num;  // number of computing count

  /*
   * keep the number of current result in computing,
   * the value will be set to 0 before set timer for next computing
   */
  int64_t numOfRes;

  int64_t useconds;  // total elapsed time
  int64_t ctime;     // stream created time
  int64_t stime;     // stream next executed time
  int64_t etime;     // stream end query time, when time is larger than etime, the stream will be closed
  int64_t interval;
  int64_t slidingTime;
  int16_t precision;
  void *  pTimer;

  void (*fp)();
  void *param;

  void (*callback)(void *);  // Callback function when stream is stopped from client level
  struct _sstream *prev, *next;
} SSqlStream;

S
slguan 已提交
414 415
int32_t tscInitRpc(const char *user, const char *secret);

H
hzcheng 已提交
416
// tscSql API
417
int tsParseSql(SSqlObj *pSql, bool multiVnodeInsertion);
H
hzcheng 已提交
418

419
void tscInitMsgs();
420 421
extern int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo);

422
void tscProcessMsgFromServer(SRpcMsg *rpcMsg);
S
slguan 已提交
423
int  tscProcessSql(SSqlObj *pSql);
H
hzcheng 已提交
424 425 426

void tscAsyncInsertMultiVnodesProxy(void *param, TAOS_RES *tres, int numOfRows);

S
slguan 已提交
427
int  tscRenewMeterMeta(SSqlObj *pSql, char *tableId);
H
hzcheng 已提交
428
void tscQueueAsyncRes(SSqlObj *pSql);
S
slguan 已提交
429

H
hzcheng 已提交
430 431 432 433 434 435 436 437 438 439
void tscQueueAsyncError(void(*fp), void *param);

int tscProcessLocalCmd(SSqlObj *pSql);
int tscCfgDynamicOptions(char *msg);
int taos_retrieve(TAOS_RES *res);

/*
 * transfer function for metric query in stream computing, the function need to be change
 * before send query message to vnode
 */
440 441
int32_t tscTansformSQLFunctionForSTableQuery(SQueryInfo *pQueryInfo);
void    tscRestoreSQLFunctionForMetricQuery(SQueryInfo *pQueryInfo);
H
hzcheng 已提交
442

443
int32_t tscCreateResPointerInfo(SSqlRes *pRes, SQueryInfo *pQueryInfo);
H
hjxilinx 已提交
444
void    tscDestroyResPointerInfo(SSqlRes *pRes);
H
hzcheng 已提交
445

S
slguan 已提交
446
void tscFreeSqlCmdData(SSqlCmd *pCmd);
H
hjxilinx 已提交
447
void tscFreeResData(SSqlObj *pSql);
H
hzcheng 已提交
448

weixin_48148422's avatar
weixin_48148422 已提交
449 450 451 452
/**
 * free query result of the sql object
 * @param pObj
 */
H
hjxilinx 已提交
453
void tscFreeSqlResult(SSqlObj *pSql);
weixin_48148422's avatar
weixin_48148422 已提交
454

H
hzcheng 已提交
455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470
/**
 * Free only part of the resources allocated during a query.
 * Note: this function is multi-thread safe.
 * @param pObj  sql object whose resources are partially released
 */
void tscFreeSqlObjPartial(SSqlObj *pObj);

/**
 * Free a sql object and release its allocated resources.
 * @param pObj  sql object to free: metric/meta information, dynamically allocated payload,
 *              response buffer and the object itself
 */
void tscFreeSqlObj(SSqlObj *pObj);

void tscCloseTscObj(STscObj *pObj);

471 472
void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const char* sqlstr, int32_t sqlLen);

473 474 475 476 477
void tscProcessMultiVnodesInsert(SSqlObj *pSql);
void tscProcessMultiVnodesInsertFromFile(SSqlObj *pSql);
void tscKillMetricQuery(SSqlObj *pSql);
void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen);
bool tscIsUpdateQuery(STscObj *pObj);
478
bool tscHasReachLimitation(SQueryInfo *pQueryInfo, SSqlRes *pRes);
H
hjxilinx 已提交
479

480
char *tscGetErrorMsgPayload(SSqlCmd *pCmd);
481

H
hjxilinx 已提交
482
int32_t tscInvalidSQLErrMsg(char *msg, const char *additionalInfo, const char *sql);
H
hzcheng 已提交
483 484

void tscQueueAsyncFreeResult(SSqlObj *pSql);
H
hjxilinx 已提交
485
int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo);
H
hzcheng 已提交
486

S
slguan 已提交
487 488 489
extern void *     pVnodeConn;
extern void *     pTscMgmtConn;
extern void *     tscCacheHandle;
490
extern int32_t    globalCode;
S
slguan 已提交
491 492 493 494 495 496
extern int        slaveIndex;
extern void *     tscTmr;
extern void *     tscQhandle;
extern int        tscKeepConn[];
extern int        tsInsertHeadSize;
extern int        tscNumOfThreads;
S
slguan 已提交
497
extern SRpcIpSet  tscMgmtIpList;
H
hzcheng 已提交
498

499 500
typedef void (*__async_cb_func_t)(void *param, TAOS_RES *tres, int numOfRows);

H
hzcheng 已提交
501 502 503 504 505
#ifdef __cplusplus
}
#endif

#endif