提交 3f972b90 编写于 作者: S slguan

refact for user manage

上级 81975c11
......@@ -270,7 +270,7 @@ typedef struct SResRec {
struct STSBuf;
typedef struct {
uint8_t code;
int32_t code;
int64_t numOfRows; // num of results in current retrieved
int64_t numOfTotal; // num of total results
int64_t numOfTotalInCurrentClause; // num of total result in current subclause
......
......@@ -196,7 +196,7 @@ int tscSendMsgToServer(SSqlObj *pSql) {
}
void tscProcessMsgFromServer(char type, void *pCont, int contLen, void *ahandle, int32_t code) {
tscPrint("response:%d is received, pCont:%p, contLen:%d code:%d", type, pCont, contLen, code);
tscPrint("response:%s is received, len:%d error:%s", taosMsg[(uint8_t)type], contLen, tstrerror(code));
SSqlObj *pSql = (SSqlObj *)ahandle;
if (pSql == NULL || pSql->signature != pSql) {
tscError("%p sql is already released, signature:%p", pSql, pSql->signature);
......@@ -272,7 +272,7 @@ void tscProcessMsgFromServer(char type, void *pCont, int contLen, void *ahandle,
if (pRes->code != TSDB_CODE_QUERY_CANCELLED) {
assert(type == pCmd->msgType + 1);
pRes->code = (int8_t)code;
pRes->code = (int32_t)code;
pRes->rspType = type;
pRes->rspLen = contLen;
......@@ -1737,17 +1737,19 @@ int32_t tscBuildAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
}
int32_t tscBuildUserMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SCreateUserMsg *pAlterMsg;
char * pMsg, *pStart;
STscObj *pObj = pSql->pTscObj;
SSqlCmd *pCmd = &pSql->cmd;
pCmd->payloadLen = sizeof(SCreateUserMsg);
pMsg = doBuildMsgHeader(pSql, &pStart);
pAlterMsg = (SCreateUserMsg *)pMsg;
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
tscError("%p failed to malloc for query msg", pSql);
return TSDB_CODE_CLI_OUT_OF_MEMORY;
}
SCreateUserMsg *pAlterMsg = (SCreateUserMsg*)pCmd->payload;
SUserInfo *pUser = &pInfo->pDCLInfo->user;
strncpy(pAlterMsg->user, pUser->user.z, pUser->user.n);
pAlterMsg->flag = pUser->type;
if (pUser->type == TSDB_ALTER_USER_PRIVILEGES) {
......@@ -1758,9 +1760,6 @@ int32_t tscBuildUserMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
strncpy(pAlterMsg->pass, pUser->passwd.z, pUser->passwd.n);
}
pMsg += sizeof(SCreateUserMsg);
pCmd->payloadLen = pMsg - pStart;
if (pUser->type == TSDB_ALTER_USER_PASSWD || pUser->type == TSDB_ALTER_USER_PRIVILEGES) {
pCmd->msgType = TSDB_MSG_TYPE_ALTER_USER;
} else {
......@@ -1871,22 +1870,20 @@ int32_t tscBuildDropDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
}
int32_t tscBuildDropAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SDropUserMsg *pDropMsg;
char * pMsg, *pStart;
STscObj *pObj = pSql->pTscObj;
SSqlCmd *pCmd = &pSql->cmd;
pCmd->payloadLen = sizeof(SDropUserMsg);
pCmd->msgType = TSDB_MSG_TYPE_DROP_USER;
pMsg = doBuildMsgHeader(pSql, &pStart);
pDropMsg = (SDropUserMsg *)pMsg;
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
tscError("%p failed to malloc for query msg", pSql);
return TSDB_CODE_CLI_OUT_OF_MEMORY;
}
SDropUserMsg *pDropMsg = (SDropUserMsg*)pCmd->payload;
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
strcpy(pDropMsg->user, pMeterMetaInfo->name);
pMsg += sizeof(SDropUserMsg);
pCmd->payloadLen = pMsg - pStart;
pCmd->msgType = TSDB_MSG_TYPE_DROP_USER;
return TSDB_CODE_SUCCESS;
}
......@@ -1911,38 +1908,27 @@ int32_t tscBuildUseDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
}
int32_t tscBuildShowMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SShowMsg *pShowMsg;
char * pMsg, *pStart;
int msgLen = 0;
SSqlCmd *pCmd = &pSql->cmd;
STscObj *pObj = pSql->pTscObj;
SSqlCmd *pCmd = &pSql->cmd;
pCmd->msgType = TSDB_MSG_TYPE_SHOW;
pCmd->payloadLen = sizeof(SShowMsg) + 100;
int32_t size = minMsgSize() + sizeof(SMgmtHead) + sizeof(SShowMsg) + pCmd->payloadLen + TSDB_EXTRA_PAYLOAD_SIZE;
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, size)) {
tscError("%p failed to malloc for show msg", pSql);
return -1;
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
tscError("%p failed to malloc for query msg", pSql);
return TSDB_CODE_CLI_OUT_OF_MEMORY;
}
pMsg = pCmd->payload + tsRpcHeadSize;
pStart = pMsg;
SMgmtHead *pMgmt = (SMgmtHead *)pMsg;
SShowMsg *pShowMsg = (SShowMsg*)pCmd->payload;
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, pCmd->clauseIndex, 0);
size_t nameLen = strlen(pMeterMetaInfo->name);
size_t nameLen = strlen(pMeterMetaInfo->name);
if (nameLen > 0) {
strcpy(pMgmt->db, pMeterMetaInfo->name); // prefix is set here
strcpy(pShowMsg->db, pMeterMetaInfo->name); // prefix is set here
} else {
strcpy(pMgmt->db, pObj->db);
strcpy(pShowMsg->db, pObj->db);
}
pMsg += sizeof(SMgmtHead);
pShowMsg = (SShowMsg *)pMsg;
SShowInfo *pShowInfo = &pInfo->pDCLInfo->showOpt;
pShowMsg->type = pShowInfo->showType;
if (pShowInfo->showType != TSDB_MGMT_TABLE_VNODES) {
......@@ -1951,22 +1937,15 @@ int32_t tscBuildShowMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
strncpy(pShowMsg->payload, pPattern->z, pPattern->n);
pShowMsg->payloadLen = htons(pPattern->n);
}
pMsg += (sizeof(SShowMsg) + pPattern->n);
} else {
SSQLToken *pIpAddr = &pShowInfo->prefix;
assert(pIpAddr->n > 0 && pIpAddr->type > 0);
strncpy(pShowMsg->payload, pIpAddr->z, pIpAddr->n);
pShowMsg->payloadLen = htons(pIpAddr->n);
pMsg += (sizeof(SShowMsg) + pIpAddr->n);
}
pCmd->payloadLen = pMsg - pStart;
pCmd->msgType = TSDB_MSG_TYPE_SHOW;
assert(msgLen + minMsgSize() <= size);
pCmd->payloadLen = sizeof(SShowMsg) + pShowMsg->payloadLen;
return TSDB_CODE_SUCCESS;
}
......@@ -2186,37 +2165,20 @@ int tscAlterDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
}
int tscBuildRetrieveFromMgmtMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
char *pMsg, *pStart;
int msgLen = 0;
SSqlCmd *pCmd = &pSql->cmd;
STscObj *pObj = pSql->pTscObj;
pMsg = pCmd->payload + tsRpcHeadSize;
pStart = pMsg;
SMgmtHead *pMgmt = (SMgmtHead *)pMsg;
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
size_t nameLen = strlen(pMeterMetaInfo->name);
SSqlCmd *pCmd = &pSql->cmd;
pCmd->msgType = TSDB_MSG_TYPE_RETRIEVE;
pCmd->payloadLen = sizeof(SRetrieveTableMsg);
if (nameLen > 0) {
strcpy(pMgmt->db, pMeterMetaInfo->name);
} else {
strcpy(pMgmt->db, pObj->db);
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
tscError("%p failed to malloc for query msg", pSql);
return TSDB_CODE_CLI_OUT_OF_MEMORY;
}
pMsg += sizeof(SMgmtHead);
*((uint64_t *)pMsg) = pSql->res.qhandle;
pMsg += sizeof(pSql->res.qhandle);
*((uint16_t *)pMsg) = htons(pQueryInfo->type);
pMsg += sizeof(pQueryInfo->type);
msgLen = pMsg - pStart;
pCmd->payloadLen = msgLen;
pCmd->msgType = TSDB_MSG_TYPE_RETRIEVE;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
SRetrieveTableMsg *pRetrieveMsg = (SRetrieveTableMsg*)pCmd->payload;
pRetrieveMsg->qhandle = htobe64(pSql->res.qhandle);
pRetrieveMsg->free = htons(pQueryInfo->type);
return TSDB_CODE_SUCCESS;
}
......@@ -2999,6 +2961,7 @@ int tscProcessShowRsp(SSqlObj *pSql) {
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0);
pShow = (SShowRsp *)pRes->pRsp;
pShow->qhandle = htobe64(pShow->qhandle);
pRes->qhandle = pShow->qhandle;
tscResetForNextRetrieve(pRes);
......
......@@ -251,14 +251,14 @@ typedef struct _user_obj {
char pass[TSDB_KEY_LEN];
char acct[TSDB_USER_LEN];
int64_t createdTime;
char superAuth : 1;
char writeAuth : 1;
char reserved[16];
char updateEnd[1];
int8_t superAuth;
int8_t writeAuth;
int8_t reserved[16];
int8_t updateEnd[1];
struct _user_obj *prev, *next;
struct _acctObj * pAcct;
SQqueryList * pQList; // query list
SStreamList * pSList; // stream list
SQqueryList * pQList; // query list
SStreamList * pSList; // stream list
} SUserObj;
typedef struct {
......@@ -301,15 +301,15 @@ typedef struct {
} SSecInfo;
typedef struct {
char type;
int8_t type;
char db[TSDB_DB_NAME_LEN];
void * pNode;
short numOfColumns;
int rowSize;
int numOfRows;
int numOfReads;
short offset[TSDB_MAX_COLUMNS];
short bytes[TSDB_MAX_COLUMNS];
int16_t numOfColumns;
int32_t rowSize;
int32_t numOfRows;
int32_t numOfReads;
int16_t offset[TSDB_MAX_COLUMNS];
int16_t bytes[TSDB_MAX_COLUMNS];
void * signature;
uint16_t payloadLen; /* length of payload*/
char payload[]; /* payload for wildcard match in show tables */
......
......@@ -223,10 +223,10 @@ typedef struct {
} SShellSubmitRspMsg;
typedef struct SSchema {
uint8_t type;
char name[TSDB_COL_NAME_LEN];
short colId;
short bytes;
uint8_t type;
char name[TSDB_COL_NAME_LEN];
int16_t colId;
int16_t bytes;
} SSchema;
typedef struct {
......
......@@ -82,11 +82,13 @@ struct arguments args = {
int main(int argc, char* argv[]) {
/*setlocale(LC_ALL, "en_US.UTF-8"); */
//
void *taos = taos_connect(NULL, "root", "taosdata", NULL, 0);
printf("ok\n");
taos_query(taos, "create account a pass 'b'");
while (1) {
sleep(1000);
if (argc != 1) {
printf("=== this a test for debug usage\n");
void *taos = taos_connect(NULL, "root", "taosdata", NULL, 0);
taos_query(taos, "drop user a");
while (1) {
sleep(1000);
}
}
//
......
......@@ -73,6 +73,7 @@ int32_t mgmtAddUserIntoAcct(SAcctObj *pAcct, SUserObj *pUser) {
pAcct->pUser = pUser;
pAcct->acctInfo.numOfUsers++;
pUser->pAcct = pAcct;
pthread_mutex_unlock(&pAcct->mutex);
return 0;
......
......@@ -1108,7 +1108,10 @@ static void mgmtProcessMsgFromShell(char type, void *pCont, int contLen, void *a
mError("%s from shell is not processed", taosMsg[(int8_t)type]);
}
}
rpcFreeCont(pCont);
//TODO free may be cause segment fault
//
// rpcFreeCont(pCont);
}
void mgmtInitProcessShellMsg() {
......
......@@ -96,15 +96,16 @@ int32_t mgmtUpdateUser(SUserObj *pUser) {
}
int32_t mgmtCreateUser(SAcctObj *pAcct, char *name, char *pass) {
SUserObj *pUser;
int32_t code;
code = mgmtCheckUserLimit(pAcct);
int32_t code = mgmtCheckUserLimit(pAcct);
if (code != 0) {
return code;
}
pUser = (SUserObj *)sdbGetRow(tsUserSdb, name);
if (name[0] == 0 || pass[0] == 0) {
return TSDB_CODE_INVALID_MSG;
}
SUserObj *pUser = (SUserObj *)sdbGetRow(tsUserSdb, name);
if (pUser != NULL) {
mWarn("user:%s is already there", name);
return TSDB_CODE_USER_ALREADY_EXIST;
......@@ -159,88 +160,99 @@ void mgmtCleanUpUsers() {
}
int32_t mgmtGetUserMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn) {
// int32_t cols = 0;
// SSchema *pSchema = tsGetSchema(pMeta);
//
// pShow->bytes[cols] = TSDB_USER_LEN;
// pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
// strcpy(pSchema[cols].name, "name");
// pSchema[cols].bytes = htons(pShow->bytes[cols]);
// cols++;
//
// pShow->bytes[cols] = 6;
// pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
// strcpy(pSchema[cols].name, "privilege");
// pSchema[cols].bytes = htons(pShow->bytes[cols]);
// cols++;
//
// pShow->bytes[cols] = 8;
// pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
// strcpy(pSchema[cols].name, "created time");
// pSchema[cols].bytes = htons(pShow->bytes[cols]);
// cols++;
//
// pMeta->numOfColumns = htons(cols);
// pShow->numOfColumns = cols;
//
// pShow->offset[0] = 0;
// for (int32_t i = 1; i < cols; ++i) pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
//
// pShow->numOfRows = pConn->pAcct->acctInfo.numOfUsers;
// pShow->pNode = pConn->pAcct->pUser;
// pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
SUserObj *pUser = mgmtGetUserFromConn(pConn);
if (pUser == NULL) {
return TSDB_CODE_INVALID_USER;
}
int32_t cols = 0;
SSchema *pSchema = tsGetSchema(pMeta);
pShow->bytes[cols] = TSDB_USER_LEN;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "name");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 6;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "privilege");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 8;
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
strcpy(pSchema[cols].name, "created time");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pMeta->numOfColumns = htons(cols);
strcpy(pMeta->tableId, "show users");
pShow->numOfColumns = cols;
pShow->offset[0] = 0;
for (int32_t i = 1; i < cols; ++i) {
pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
}
pShow->numOfRows = pUser->pAcct->acctInfo.numOfUsers;
pShow->pNode = pUser->pAcct->pUser;
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
return 0;
}
int32_t mgmtRetrieveUsers(SShowObj *pShow, char *data, int32_t rows, void *pConn) {
int32_t numOfRows = 0;
// SUserObj *pUser = NULL;
// char * pWrite;
// int32_t cols = 0;
//
// while (numOfRows < rows) {
// pUser = (SUserObj *)pShow->pNode;
// if (pUser == NULL) break;
// pShow->pNode = (void *)pUser->next;
//
// cols = 0;
//
// pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
// strcpy(pWrite, pUser->user);
// cols++;
//
// pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
// if (pUser->superAuth) {
// strcpy(pWrite, "super");
// } else if (pUser->writeAuth) {
// strcpy(pWrite, "write");
// } else {
// strcpy(pWrite, "read");
// }
// cols++;
//
// pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
// *(int64_t *)pWrite = pUser->createdTime;
// cols++;
//
// numOfRows++;
// }
// pShow->numOfReads += numOfRows;
int32_t numOfRows = 0;
SUserObj *pUser = NULL;
int32_t cols = 0;
char *pWrite;
while (numOfRows < rows) {
pUser = (SUserObj *)pShow->pNode;
if (pUser == NULL) break;
pShow->pNode = (void *)pUser->next;
cols = 0;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
strcpy(pWrite, pUser->user);
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
if (pUser->superAuth) {
strcpy(pWrite, "super");
} else if (pUser->writeAuth) {
strcpy(pWrite, "write");
} else {
strcpy(pWrite, "read");
}
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int64_t *)pWrite = pUser->createdTime;
cols++;
numOfRows++;
}
pShow->numOfReads += numOfRows;
return numOfRows;
}
void *mgmtUserActionInsert(void *row, char *str, int32_t size, int32_t *ssize) {
SUserObj *pUser = (SUserObj *)row;
SUserObj *pUser = (SUserObj *) row;
SAcctObj *pAcct = mgmtGetAcct(pUser->acct);
pUser->pAcct = pAcct;
mgmtAddUserIntoAcct(pAcct, pUser);
return NULL;
}
void *mgmtUserActionDelete(void *row, char *str, int32_t size, int32_t *ssize) {
SUserObj *pUser = (SUserObj *)row;
SUserObj *pUser = (SUserObj *) row;
SAcctObj *pAcct = mgmtGetAcct(pUser->acct);
mgmtRemoveUserFromAcct(pAcct, pUser);
return NULL;
......@@ -251,30 +263,34 @@ void *mgmtUserActionUpdate(void *row, char *str, int32_t size, int32_t *ssize) {
}
void *mgmtUserActionEncode(void *row, char *str, int32_t size, int32_t *ssize) {
SUserObj *pUser = (SUserObj *)row;
int32_t tsize = pUser->updateEnd - (char *)pUser;
SUserObj *pUser = (SUserObj *) row;
int32_t tsize = pUser->updateEnd - (int8_t *) pUser;
if (size < tsize) {
*ssize = -1;
} else {
memcpy(str, pUser, tsize);
*ssize = tsize;
}
return NULL;
}
void *mgmtUserActionDecode(void *row, char *str, int32_t size, int32_t *ssize) {
SUserObj *pUser = (SUserObj *)malloc(sizeof(SUserObj));
SUserObj *pUser = (SUserObj *) malloc(sizeof(SUserObj));
if (pUser == NULL) return NULL;
memset(pUser, 0, sizeof(SUserObj));
int32_t tsize = pUser->updateEnd - (char *)pUser;
int32_t tsize = pUser->updateEnd - (int8_t *) pUser;
memcpy(pUser, str, tsize);
return (void *)pUser;
}
void *mgmtUserActionReset(void *row, char *str, int32_t size, int32_t *ssize) {
SUserObj *pUser = (SUserObj *)row;
int32_t tsize = pUser->updateEnd - (char *)pUser;
int32_t tsize = pUser->updateEnd - (int8_t *) pUser;
memcpy(pUser, str, tsize);
return NULL;
......@@ -282,11 +298,13 @@ void *mgmtUserActionReset(void *row, char *str, int32_t size, int32_t *ssize) {
void *mgmtUserActionDestroy(void *row, char *str, int32_t size, int32_t *ssize) {
tfree(row);
return NULL;
}
SUserObj *mgmtGetUserFromConn(void *pConn) {
SRpcConnInfo connInfo;
rpcGetConnInfo(pConn, &connInfo);
return mgmtGetUser(connInfo.user);
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册