Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
oceanbase
oblogproxy
提交
0f647e8e
O
oblogproxy
项目概览
oceanbase
/
oblogproxy
9 个月 前同步成功
通知
24
Star
29
Fork
10
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
O
oblogproxy
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
提交
0f647e8e
编写于
7月 25, 2022
作者:
F
Fankux
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fix obmysql auth
上级
302085df
变更
21
隐藏空白更改
内联
并排
Showing
21 changed file
with
444 addition
and
442 deletion
+444
-442
CMakeLists.txt
CMakeLists.txt
+23
-14
cmake/gtest.cmake
cmake/gtest.cmake
+2
-2
cmake/oblogmsg.cmake
cmake/oblogmsg.cmake
+1
-1
script/run.sh
script/run.sh
+31
-0
src/codec/codec_endian.h
src/codec/codec_endian.h
+20
-51
src/codec/legacy_decoder.cpp
src/codec/legacy_decoder.cpp
+4
-4
src/codec/legacy_encoder.cpp
src/codec/legacy_encoder.cpp
+12
-12
src/codec/msg_buf.h
src/codec/msg_buf.h
+6
-11
src/codec/protobuf_decoder.cpp
src/codec/protobuf_decoder.cpp
+1
-2
src/codec/protobuf_encoder.cpp
src/codec/protobuf_encoder.cpp
+2
-2
src/communication/communicator.cpp
src/communication/communicator.cpp
+1
-1
src/communication/http.cpp
src/communication/http.cpp
+1
-1
src/obaccess/mysql_protocol.cpp
src/obaccess/mysql_protocol.cpp
+101
-108
src/obaccess/mysql_protocol.h
src/obaccess/mysql_protocol.h
+3
-5
src/obaccess/ob_access.cpp
src/obaccess/ob_access.cpp
+39
-23
src/obaccess/ob_access.h
src/obaccess/ob_access.h
+3
-0
src/obaccess/ob_mysql_packet.cpp
src/obaccess/ob_mysql_packet.cpp
+154
-150
src/obaccess/ob_mysql_packet.h
src/obaccess/ob_mysql_packet.h
+27
-38
src/obaccess/oblog_config.h
src/obaccess/oblog_config.h
+1
-1
src/test/test_common.cpp
src/test/test_common.cpp
+0
-7
src/test/test_ob_mysql.cpp
src/test/test_ob_mysql.cpp
+12
-9
未找到文件。
CMakeLists.txt
浏览文件 @
0f647e8e
...
...
@@ -4,10 +4,11 @@ project(oblogproxy CXX)
macro
(
ob_define VAR DEFAULT
)
if
(
NOT DEFINED
${
VAR
}
)
set
(
${
VAR
}
${
DEFAULT
}
)
endif
()
endif
()
endmacro
()
ob_define
(
OBLOGPROXY_RELEASEID 1
)
option
(
WITH_DEBUG
"With debug symbols"
ON
)
option
(
WITH_ASAN
"Compile with AddressSanitizer"
OFF
)
option
(
WITH_TEST
"With Tests"
OFF
)
...
...
@@ -15,6 +16,7 @@ option(WITH_DEMO "With Demos" OFF)
option
(
WITH_JNI_LIB
"With oblogreader jni lib"
OFF
)
option
(
WITH_GLOG
"With google log"
ON
)
option
(
WITH_DEPS
"With precompiled deps"
ON
)
option
(
WITH_LOGMSG
"compiled logmsg from source files"
OFF
)
option
(
USE_OBCDC_NS
"With libobcdc"
ON
)
option
(
USE_LIBOBLOG
"With precompiled liboblog"
OFF
)
option
(
USE_CXX11_ABI
"Build with C++11 ABI"
OFF
)
...
...
@@ -31,9 +33,10 @@ SET(FIND_LIBOBLOG ON)
if
(
USE_OBCDC_NS
)
SET
(
OBCDC_NAME
"libobcdc"
)
SET
(
OBCDC_NAME_VAR
"-DUSE_OBCDC_NS"
)
else
()
else
()
SET
(
OBCDC_NAME
"liboblog"
)
endif
()
if
(
WITH_DEPS
)
execute_process
(
COMMAND bash deps/dep_create.sh
${
OMS_PROJECT_BUILD_PATH
}
/deps
...
...
@@ -172,14 +175,21 @@ add_library(PROTO_OBJS OBJECT ${PROTO_SRCS} ${PROTO_HDRS})
message
(
"protoc:
${
PROTOBUF_PROTOC_EXECUTABLE
}
, proto srcs :
${
PROTO_SRCS
}
"
)
# oblogmsg
SET
(
OBLOGMSG_MAPPING
"-DLOGMSG_BY_LIBOBLOG=1 -DLogMsgLocalInit=
\"
if((_t_s_lmb=new(std::nothrow)LogMsgBuf())==nullptr){OMS_ERROR<<
\\\"
Failed to alloc LogMsgBuf
\\\"
;stop();return;}
\"
-DLogMsgLocalDestroy=
\"
delete _t_s_lmb
\"
"
)
SET
(
OBLOGMSG_INCLUDE_DIR
${
LIBOBLOG_INCLUDE_PATH
}
${
LIBOBLOG_INCLUDE_PATH
}
/oblogmsg
)
set
(
LOGMSG_BY_LIBOBLOG_DEFINE
""
)
if
(
LOGMSG_BY_LIBOBLOG
)
set
(
LOGMSG_BY_LIBOBLOG_DEFINE
"-DLOGMSG_BY_LIBOBLOG=1"
)
SET
(
OBLOGMSG_MAPPING
"
${
LOGMSG_BY_LIBOBLOG_DEFINE
}
-DLogMsgLocalInit=
\"
if((_t_s_lmb=new(std::nothrow)LogMsgBuf())==nullptr){OMS_ERROR<<
\\\"
Failed to alloc LogMsgBuf
\\\"
;stop();return;}
\"
-DLogMsgLocalDestroy=
\"
delete _t_s_lmb
\"
"
)
SET
(
OBLOGMSG_INCLUDE_DIR
${
LIBOBLOG_INCLUDE_PATH
}
${
LIBOBLOG_INCLUDE_PATH
}
/oblogmsg
)
SET
(
OBLOGMSG_LIBRARIES
${
LIBOBLOG_LIBRARIES
}
)
GET_FILENAME_COMPONENT
(
OBLOGMSG_LIB_DIR
${
OBLOGMSG_LIBRARIES
}
DIRECTORY
)
SET
(
OBLOGMSG_LIBRARIES
${
LIBOBLOG_LIBRARIES
}
)
GET_FILENAME_COMPONENT
(
OBLOGMSG_LIB_DIR
${
OBLOGMSG_LIBRARIES
}
DIRECTORY
)
ADD_LIBRARY
(
oblogmsg STATIC IMPORTED GLOBAL
)
SET_PROPERTY
(
TARGET oblogmsg PROPERTY IMPORTED_LOCATION
${
OBLOGMSG_LIBRARIES
}
)
ADD_LIBRARY
(
oblogmsg STATIC IMPORTED GLOBAL
)
SET_PROPERTY
(
TARGET oblogmsg PROPERTY IMPORTED_LOCATION
${
OBLOGMSG_LIBRARIES
}
)
else
()
include
(
oblogmsg
)
endif
()
# oblog
if
(
FIND_LIBOBLOG
)
...
...
@@ -310,7 +320,7 @@ target_include_directories(common PUBLIC ${COMMON_INC})
if
(
USE_OBCDC_NS
)
set
(
DEP_OBCDC_LIB obcdc
)
else
()
else
()
set
(
DEP_OBCDC_LIB oblog
)
endif
()
# oblogreader
...
...
@@ -353,7 +363,7 @@ add_dependencies(logproxy logproxy_static)
target_include_directories
(
logproxy PUBLIC
${
DEP_INC
}
${
LOGPROXY_INC
}
)
target_link_directories
(
logproxy PUBLIC
${
DEP_LIB_PATH
}
)
target_link_libraries
(
logproxy
${
BASE_LIBS
}
${
DEP_OBCDC_LIB
}
${
DEP_LIBS
}
)
target_link_options
(
logproxy PUBLIC
"
${
ASAN_LINK_OPTION
}
"
)
target_link_options
(
logproxy PUBLIC
-static-libstdc++
${
ASAN_LINK_OPTION
}
)
if
(
WITH_DEMO
)
# demo client
...
...
@@ -363,18 +373,17 @@ if (WITH_DEMO)
target_include_directories
(
demo_client PUBLIC
${
COMMON_INC
}
)
target_link_directories
(
demo_client PUBLIC
${
DEP_LIB_PATH
}
)
target_link_libraries
(
demo_client libcommon.a
${
DEP_LIBS
}
)
target_link_libraries
(
demo_client libcommon.a
${
DEP_LIBS
}
)
target_link_options
(
demo_client PUBLIC
${
ASAN_LINK_OPTION
}
)
target_link_options
(
demo_client PUBLIC -static-libstdc++
${
ASAN_LINK_OPTION
}
)
endif
()
if
(
WITH_TEST
)
# test_base
file
(
GLOB TEST_BASE_SRC ./src/test/test_entry.cpp
)
add_executable
(
test_base
${
TEST_BASE_SRC
}
)
add_dependencies
(
test_base
logproxy_static
gtest
)
add_dependencies
(
test_base
common oblogmsg
gtest
)
target_include_directories
(
test_base PUBLIC
${
LOGPROXY_INC
}
)
target_link_directories
(
test_base PUBLIC
${
DEP_LIB_PATH
}
)
target_link_libraries
(
test_base
${
BASE_LIBS
}
${
DEP_LIBS
}
)
target_link_libraries
(
test_base
libcommon.a
${
DEP_LIBS
}
)
target_link_options
(
test_base PUBLIC
${
ASAN_LINK_OPTION
}
)
# test_oblogreader
...
...
cmake/gtest.cmake
浏览文件 @
0f647e8e
...
...
@@ -16,7 +16,7 @@ ExternalProject_Add(
${
EXTERNAL_PROJECT_LOG_ARGS
}
DEPENDS gflags
GIT_REPOSITORY
"https://github.com/google/googletest.git"
GIT_TAG
"release-1.
6
.0"
GIT_TAG
"release-1.
11
.0"
PREFIX
${
GTEST_SOURCES_DIR
}
UPDATE_COMMAND
""
CMAKE_ARGS -DCMAKE_CXX_COMPILER=
${
CMAKE_CXX_COMPILER
}
...
...
@@ -31,7 +31,7 @@ ExternalProject_Add(
-DCMAKE_INSTALL_LIBDIR=
${
GTEST_INSTALL_DIR
}
/lib
-DCMAKE_POSITION_INDEPENDENT_CODE=ON
-DBUILD_GMOCK=OFF
-D
build_gtest
_samples=OFF
-D
gtest_build
_samples=OFF
-Dgtest_build_tests=OFF
-DCMAKE_BUILD_TYPE=
${
THIRD_PARTY_BUILD_TYPE
}
-DCMAKE_PREFIX_PATH=
${
prefix_path
}
...
...
cmake/oblogmsg.cmake
浏览文件 @
0f647e8e
...
...
@@ -7,7 +7,7 @@ endif ()
SET
(
OBLOGMSG_SOURCES_DIR
${
THIRD_PARTY_PATH
}
/oblogmsg
)
SET
(
OBLOGMSG_DOWNLOAD_DIR
"
${
OBLOGMSG_SOURCES_DIR
}
/src/extern_oblogmsg"
)
SET
(
OBLOGMSG_INSTALL_DIR
${
THIRD_PARTY_PATH
}
/install/oblogmsg
)
SET
(
OBLOGMSG_INCLUDE_DIR
"
${
OBLOGMSG_INSTALL_DIR
}
/include"
CACHE PATH
"oblogmsg include directory."
FORCE
)
SET
(
OBLOGMSG_INCLUDE_DIR
"
${
OBLOGMSG_INSTALL_DIR
}
/include
;
${
THIRD_PARTY_PATH
}
/install/oblogmsg/drcmessage
"
CACHE PATH
"oblogmsg include directory."
FORCE
)
SET
(
OBLOGMSG_LIB_DIR
"
${
OBLOGMSG_INSTALL_DIR
}
/lib/"
CACHE FILEPATH
"oblogmsg library directory."
FORCE
)
SET
(
OBLOGMSG_LIBRARIES
"liboblogmsg.a"
CACHE FILEPATH
"oblogmsg library."
FORCE
)
...
...
script/run.sh
浏览文件 @
0f647e8e
...
...
@@ -74,6 +74,34 @@ stop() {
kill_proc_9
}
do_config_sys
()
{
username
=
$1
password
=
$2
if
[[
-z
"
${
username
}
"
]]
||
[[
-z
"
${
password
}
"
]]
;
then
echo
"No input sys username or password"
exit
-1
fi
username_x
=
`
./bin/
${
BIN
}
-x
${
username
}
`
password_x
=
`
./bin/
${
BIN
}
-x
${
password
}
`
cp
./conf/conf.json ./conf/conf.json.new
sed
-r
-i
's/"ob_sys_username"[ ]*:[ ]*"[0-9a-zA-Z]+/"ob_sys_username": "'
${
username_x
}
'/'
./conf/conf.json.new
sed
-r
-i
's/"ob_sys_password"[ ]*:[ ]*"[0-9a-zA-Z]+/"ob_sys_password": "'
${
password_x
}
'/'
./conf/conf.json.new
diff ./conf/conf.json ./conf/conf.json.new
echo
""
read
-r
-p
"!!DANGER!! About to update logproxy conf/conf.json, Please confirm? [Y/n] "
response
if
[
"
${
response
}
"
!=
"y"
]
&&
[
"
${
response
}
"
!=
"Y"
]
;
then
echo
"Cancel!"
rm
-rf
./conf/conf.json.new
exit
0
fi
cp
./conf/conf.json ./conf/conf.json.bak
mv
./conf/conf.json.new ./conf/conf.json
}
case
C
"
$1
"
in
Cstop
)
stop
...
...
@@ -93,6 +121,9 @@ Cstatus)
echo
"status :
${
status
}
"
exit
${
status
}
;;
Cconfig_sys
)
do_config_sys
$2
$3
;;
C
*
)
echo
"Usage:
$0
{start|stop|status}"
;;
...
...
src/codec/codec_endian.h
浏览文件 @
0f647e8e
...
...
@@ -17,75 +17,44 @@
namespace
oceanbase
{
namespace
logproxy
{
enum
class
Endian
{
BIG
,
LITTLE
,
};
#if __BYTE_ORDER__ == __ORDER_LITTLE_ENDIAN__
inline
bool
is_little_endian
()
{
int32_t
i
=
0x01020304
;
char
*
buf
=
(
char
*
)
&
i
;
return
buf
[
0
]
==
0x04
;
}
#define le_to_cpu(integer) (integer)
#define cpu_to_le(integer) (integer)
template
<
class
Integer
>
Integer
bswap
(
Integer
integer
)
inline
uint16_t
be_to_cpu
(
uint16_t
integer
)
{
char
*
src_buf
=
(
char
*
)
&
integer
;
Integer
ret
=
0
;
char
*
dst_buf
=
(
char
*
)
&
ret
;
const
int
bytes
=
sizeof
(
integer
);
for
(
int
i
=
0
;
i
<
bytes
;
i
++
)
{
dst_buf
[
i
]
=
src_buf
[
bytes
-
i
-
1
];
}
return
ret
;
return
__builtin_bswap16
(
integer
);
}
template
<
class
Integer
>
Integer
le_to_cpu
(
Integer
integer
)
inline
uint32_t
be_to_cpu
(
uint32_t
integer
)
{
if
(
is_little_endian
())
{
return
integer
;
}
return
bswap
<
Integer
>
(
integer
);
return
__builtin_bswap32
(
integer
);
}
template
<
class
Integer
>
Integer
cpu_to_le
(
Integer
integer
)
inline
uint64_t
be_to_cpu
(
uint64_t
integer
)
{
return
le_to_cpu
<
Integer
>
(
integer
);
return
__builtin_bswap64
(
integer
);
}
template
<
class
Integer
>
Integer
be_to_cpu
(
Integer
integer
)
inline
uint16_t
cpu_to_be
(
uint16_t
integer
)
{
if
(
!
is_little_endian
())
{
return
integer
;
}
return
bswap
<
Integer
>
(
integer
);
return
__builtin_bswap16
(
integer
);
}
template
<
class
Integer
>
Integer
cpu_to_be
(
Integer
integer
)
inline
uint32_t
cpu_to_be
(
uint32_t
integer
)
{
return
be_to_cpu
<
Integer
>
(
integer
);
return
__builtin_bswap32
(
integer
);
}
template
<
class
Integer
,
Endian
endian
>
Integer
transform_endian
(
Integer
integer
)
inline
uint64_t
cpu_to_be
(
uint64_t
integer
)
{
if
(
endian
==
Endian
::
BIG
)
{
if
(
!
is_little_endian
())
{
return
integer
;
}
return
bswap
(
integer
);
}
else
{
if
(
is_little_endian
())
{
return
integer
;
}
return
bswap
(
integer
);
}
return
__builtin_bswap64
(
integer
);
}
#else
#endif
}
// namespace logproxy
}
// namespace oceanbase
src/codec/legacy_decoder.cpp
浏览文件 @
0f647e8e
...
...
@@ -30,7 +30,7 @@ static PacketError decode_v1(Channel* ch, Message*& message)
OMS_ERROR
<<
"Failed to read message payload size, ch:"
<<
ch
->
peer
().
id
()
<<
", error:"
<<
strerror
(
errno
);
return
PacketError
::
NETWORK_ERROR
;
}
payload_size
=
be_to_cpu
<
uint32_t
>
(
payload_size
);
payload_size
=
be_to_cpu
(
payload_size
);
// FIXME.. use an mem pool
char
*
payload_buf
=
(
char
*
)
malloc
(
payload_size
);
if
(
nullptr
==
payload_buf
)
{
...
...
@@ -86,12 +86,12 @@ PacketError LegacyDecoder::decode(Channel* ch, MessageVersion version, Message*&
}
// type
int32_t
type
=
-
1
;
u
int32_t
type
=
-
1
;
if
(
ch
->
readn
((
char
*
)
&
type
,
4
)
!=
OMS_OK
)
{
OMS_ERROR
<<
"Failed to read message header, ch:"
<<
ch
->
peer
().
id
()
<<
", error:"
<<
strerror
(
errno
);
return
PacketError
::
NETWORK_ERROR
;
}
type
=
be_to_cpu
<
int32_t
>
(
type
);
type
=
be_to_cpu
(
type
);
if
(
!
is_type_available
(
type
))
{
OMS_ERROR
<<
"Invalid packet type:"
<<
type
<<
", ch:"
<<
ch
->
peer
().
id
();
return
PacketError
::
PROTOCOL_ERROR
;
...
...
@@ -117,7 +117,7 @@ static int read_varstr(Channel* ch, std::string& val)
if
(
ch
->
readn
((
char
*
)
&
len
,
4
)
!=
OMS_OK
)
{
return
OMS_FAILED
;
}
len
=
be_to_cpu
<
uint32_t
>
(
len
);
len
=
be_to_cpu
(
len
);
char
*
buf
=
(
char
*
)
malloc
(
len
);
FreeGuard
<
char
*>
ff
(
buf
);
...
...
src/codec/legacy_encoder.cpp
浏览文件 @
0f647e8e
...
...
@@ -64,8 +64,8 @@ static int compress_data(const RecordDataMessage& msg, MsgBuf& buffer)
size_t
offset
=
0
;
for
(
auto
&
ptr
:
ptrs
)
{
size_t
block_size
=
ptr
.
second
;
uint32_t
seq_be
=
cpu_to_be
<
uint32_t
>
(
idx
++
);
uint32_t
size_be
=
cpu_to_be
<
uint32_t
>
(
block_size
);
uint32_t
seq_be
=
cpu_to_be
(
idx
++
);
uint32_t
size_be
=
cpu_to_be
((
uint32_t
)
block_size
);
memcpy
(
raw
+
offset
,
&
seq_be
,
4
);
memcpy
(
raw
+
offset
+
4
,
&
size_be
,
4
);
memcpy
(
raw
+
offset
+
8
,
ptr
.
first
,
block_size
);
...
...
@@ -81,9 +81,9 @@ static int compress_data(const RecordDataMessage& msg, MsgBuf& buffer)
OMS_DEBUG
<<
"compress packet raw from size:"
<<
total_size
<<
" to compressed size:"
<<
compressed_size
;
}
uint32_t
packet_len_be
=
cpu_to_be
<
uint32_t
>
(
compressed_size
+
9
);
uint32_t
orginal_size_be
=
cpu_to_be
<
uint32_t
>
(
total_size
);
uint32_t
compressed_size_be
=
cpu_to_be
<
uint32_t
>
(
compressed_size
);
uint32_t
packet_len_be
=
cpu_to_be
((
uint32_t
)
compressed_size
+
9
);
uint32_t
orginal_size_be
=
cpu_to_be
(
total_size
);
uint32_t
compressed_size_be
=
cpu_to_be
((
uint32_t
)
compressed_size
);
char
*
buf
=
(
char
*
)
malloc
(
13
);
memcpy
(
buf
,
&
packet_len_be
,
4
);
memset
(
buf
+
4
,
(
uint8_t
)
CompressType
::
LZ4
,
1
);
...
...
@@ -170,8 +170,8 @@ LegacyEncoder::LegacyEncoder()
<<
", size: "
<<
header
->
m_size
;
}
uint32_t
seq_be
=
cpu_to_be
<
uint32_t
>
(
idx
++
);
uint32_t
size_be
=
cpu_to_be
<
uint32_t
>
(
size
);
uint32_t
seq_be
=
cpu_to_be
(
idx
++
);
uint32_t
size_be
=
cpu_to_be
((
uint32_t
)
size
);
buffer
.
push_back_copy
((
char
*
)
&
seq_be
,
4
);
buffer
.
push_back_copy
((
char
*
)
&
size_be
,
4
);
buffer
.
push_back
((
char
*
)
logmsg_buf
,
size
,
false
);
...
...
@@ -179,8 +179,8 @@ LegacyEncoder::LegacyEncoder()
total_size
+=
(
size
+
8
);
}
uint32_t
packet_len_be
=
cpu_to_be
<
uint32_t
>
(
total_size
+
9
);
total_size
=
cpu_to_be
<
uint32_t
>
(
total_size
);
uint32_t
packet_len_be
=
cpu_to_be
(
total_size
+
9
);
total_size
=
cpu_to_be
(
total_size
);
char
*
buf
=
(
char
*
)
malloc
(
4
+
1
+
4
+
4
);
memcpy
(
buf
,
&
packet_len_be
,
4
);
...
...
@@ -208,9 +208,9 @@ LegacyEncoder::LegacyEncoder()
}
// Error message
uint32_t
code_be
=
cpu_to_be
<
uint32_t
>
(
msg
.
code
);
uint32_t
code_be
=
cpu_to_be
((
uint32_t
)
msg
.
code
);
memcpy
(
buf
,
&
code_be
,
4
);
uint32_t
varlen_be
=
cpu_to_be
<
uint32_t
>
(
msg
.
message
.
size
());
uint32_t
varlen_be
=
cpu_to_be
((
uint32_t
)
msg
.
message
.
size
());
memcpy
(
buf
+
4
,
&
varlen_be
,
4
);
if
(
msg
.
message
.
size
()
!=
0
)
{
memcpy
(
buf
+
8
,
msg
.
message
.
c_str
(),
msg
.
message
.
size
());
...
...
@@ -237,7 +237,7 @@ int LegacyEncoder::encode(const Message& msg, MsgBuf& buffer)
memset
(
buf
,
0
,
2
);
// response type code
uint32_t
msg_type_be
=
cpu_to_be
<
uint32_t
>
((
uint32_t
)
msg
.
type
());
uint32_t
msg_type_be
=
cpu_to_be
((
uint32_t
)
msg
.
type
());
memcpy
(
buf
+
2
,
&
msg_type_be
,
4
);
buffer
.
push_front
(
buf
,
len
);
return
ret
;
...
...
src/codec/msg_buf.h
浏览文件 @
0f647e8e
...
...
@@ -63,11 +63,6 @@ public:
bool
_owned
=
true
;
};
public:
using
ChunksType
=
std
::
deque
<
Chunk
>
;
using
ChunkIterator
=
ChunksType
::
iterator
;
using
ChunkConstIterator
=
ChunksType
::
const_iterator
;
public:
MsgBuf
()
=
default
;
...
...
@@ -107,12 +102,12 @@ public:
return
_chunks
.
size
();
}
ChunkConstI
terator
begin
()
const
std
::
deque
<
Chunk
>::
const_i
terator
begin
()
const
{
return
_chunks
.
begin
();
}
ChunkConstI
terator
end
()
const
std
::
deque
<
Chunk
>::
const_i
terator
end
()
const
{
return
_chunks
.
end
();
}
...
...
@@ -120,7 +115,7 @@ public:
size_t
byte_size
()
const
;
private:
ChunksType
_chunks
;
std
::
deque
<
Chunk
>
_chunks
;
};
// TODO messageBufferWriter
...
...
@@ -150,14 +145,14 @@ public:
int
read_uint64
(
uint64_t
&
i
);
template
<
class
Integer
,
Endian
endian
=
Endian
::
LITTLE
>
template
<
class
Integer
>
int
read_int
(
Integer
&
i
)
{
int
ret
=
read
((
char
*
)
&
i
,
sizeof
(
i
));
if
(
ret
!=
0
)
{
return
ret
;
}
i
=
transform_endian
<
Integer
,
endian
>
(
i
);
i
=
le_to_cpu
(
i
);
return
0
;
}
...
...
@@ -182,7 +177,7 @@ public:
private:
const
MsgBuf
&
_buffer
;
MsgBuf
::
ChunkConstI
terator
_iter
;
std
::
deque
<
MsgBuf
::
Chunk
>::
const_i
terator
_iter
;
size_t
_pos
=
0
;
size_t
_byte_size
=
0
;
size_t
_read_size
=
0
;
...
...
src/codec/protobuf_decoder.cpp
浏览文件 @
0f647e8e
...
...
@@ -42,7 +42,6 @@ PacketError ProtobufDecoder::decode(Channel* ch, MessageVersion version, Message
// type
int8_t
type
=
-
1
;
memcpy
(
&
type
,
header_buf
,
1
);
type
=
be_to_cpu
<
int8_t
>
(
type
);
if
(
!
is_type_available
(
type
))
{
OMS_ERROR
<<
"Invalid packet type:"
<<
type
<<
", ch:"
<<
ch
->
peer
().
id
();
return
PacketError
::
PROTOCOL_ERROR
;
...
...
@@ -51,7 +50,7 @@ PacketError ProtobufDecoder::decode(Channel* ch, MessageVersion version, Message
// payload size
uint32_t
payload_size
=
0
;
memcpy
(
&
payload_size
,
header_buf
+
1
,
4
);
payload_size
=
be_to_cpu
<
uint32_t
>
(
payload_size
);
payload_size
=
be_to_cpu
(
payload_size
);
// TODO... suppose that no large message
if
(
payload_size
>
Config
::
instance
().
max_packet_bytes
.
val
())
{
...
...
src/codec/protobuf_encoder.cpp
浏览文件 @
0f647e8e
...
...
@@ -67,7 +67,7 @@ static char* encode_message_header(MessageType type, int packet_size, bool magic
offset
=
sizeof
(
PACKET_MAGIC
);
}
int16_t
version
=
cpu_to_be
<
int16_t
>
((
int16_t
)
MessageVersion
::
V2
);
uint16_t
version
=
cpu_to_be
((
u
int16_t
)
MessageVersion
::
V2
);
memcpy
(
buffer
+
offset
,
&
version
,
sizeof
(
version
));
offset
+=
sizeof
(
version
);
...
...
@@ -75,7 +75,7 @@ static char* encode_message_header(MessageType type, int packet_size, bool magic
memcpy
(
buffer
+
offset
,
&
message_type
,
sizeof
(
message_type
));
offset
+=
sizeof
(
message_type
);
int32_t
pb_packet_size
=
cpu_to_be
<
int32_t
>
(
packet_size
);
uint32_t
pb_packet_size
=
cpu_to_be
((
uint32_t
)
packet_size
);
memcpy
(
buffer
+
offset
,
&
pb_packet_size
,
sizeof
(
pb_packet_size
));
return
buffer
;
}
...
...
src/communication/communicator.cpp
浏览文件 @
0f647e8e
...
...
@@ -392,7 +392,7 @@ PacketError Communicator::receive_message(Channel* ch, Message*& msg)
}
}
version
=
be_to_cpu
<
uint16_t
>
(
version
);
version
=
be_to_cpu
(
version
);
if
(
!
is_version_available
(
version
))
{
OMS_ERROR
<<
"Invalid packet version:"
<<
version
<<
", ch:"
<<
ch
->
peer
().
id
();
return
PacketError
::
PROTOCOL_ERROR
;
...
...
src/communication/http.cpp
浏览文件 @
0f647e8e
...
...
@@ -75,7 +75,7 @@ static void http_request_error_cb(enum evhttp_request_error error, void* arg)
void
http_conn_close_cb
(
struct
evhttp_connection
*
,
void
*
arg
)
{
OMS_
WARN
<<
"HTTP request conn closed"
;
OMS_
DEBUG
<<
"HTTP request conn closed"
;
event_base_loopexit
(((
HttpContext
*
)
arg
)
->
base
,
nullptr
);
}
...
...
src/obaccess/mysql_protocol.cpp
浏览文件 @
0f647e8e
...
...
@@ -46,9 +46,84 @@ int MysqlProtocol::connect_to_server()
{
int
ret
=
connect
(
_hostname
.
c_str
(),
_port
,
false
,
_detect_timeout
,
_sockfd
);
if
(
ret
!=
OMS_OK
)
{
OMS_ERROR
<<
"Failed to connect to server
. server="
<<
_hostname
<<
':'
<<
_port
;
OMS_ERROR
<<
"Failed to connect to server
: "
<<
_hostname
<<
':'
<<
_port
<<
", user: "
<<
_username
;
}
else
{
OMS_INFO
<<
"Connect to server success. server="
<<
_hostname
<<
':'
<<
_port
;
OMS_INFO
<<
"Connect to server success: "
<<
_hostname
<<
':'
<<
_port
<<
", user: "
<<
_username
;
}
return
ret
;
}
int
MysqlProtocol
::
login
(
const
std
::
string
&
host
,
int
port
,
const
std
::
string
&
username
,
const
std
::
string
&
passwd_sha1
,
const
std
::
string
&
database
)
{
_hostname
=
host
;
_port
=
port
;
_username
=
username
;
_passwd_sha1
=
passwd_sha1
;
// https://dev.mysql.com/doc/internals/en/secure-password-authentication.html
// 1 connect to the server
int
ret
=
connect_to_server
();
if
(
ret
<
0
)
{
OMS_ERROR
<<
"Failed to connect to server "
<<
_hostname
<<
':'
<<
_port
;
return
OMS_CONNECT_FAILED
;
}
// 2 receive initial handshake
MsgBuf
msgbuf
;
uint8_t
sequence
=
0
;
uint32_t
packet_length
=
0
;
ret
=
recv_mysql_packet
(
_sockfd
,
_detect_timeout
,
packet_length
,
sequence
,
msgbuf
);
if
(
ret
!=
OMS_OK
)
{
OMS_ERROR
<<
"Failed to receive handshake packet from: "
<<
_hostname
<<
':'
<<
_port
<<
", error: "
<<
strerror
(
errno
);
return
ret
;
}
OMS_DEBUG
<<
"Receive handshake packet from server: "
<<
_hostname
<<
':'
<<
_port
<<
", user: "
<<
_username
;
MySQLInitialHandShakePacket
handshake_packet
;
ret
=
handshake_packet
.
decode
(
msgbuf
);
if
(
ret
!=
OMS_OK
||
!
handshake_packet
.
scramble_valid
())
{
OMS_ERROR
<<
"Failed to decode_payload initial handshake packet or does not has a valid scramble:"
<<
handshake_packet
.
scramble_valid
()
<<
". length="
<<
packet_length
<<
", server: "
<<
_hostname
<<
':'
<<
_port
;
return
ret
;
}
const
std
::
vector
<
char
>&
scramble
=
handshake_packet
.
scramble
();
// 3 calculate the password combined with scramble buffer -> auth information
std
::
vector
<
char
>
auth
;
ret
=
calc_mysql_auth_info
(
scramble
,
auth
);
if
(
ret
!=
OMS_OK
)
{
OMS_ERROR
<<
"Failed to calc login info from: "
<<
_hostname
<<
':'
<<
_port
<<
", user: "
<<
_username
;
return
ret
;
}
// 4 send the handshake response with auth information
ret
=
send_auth
(
auth
,
database
);
if
(
ret
!=
OMS_OK
)
{
OMS_ERROR
<<
"Failed to send handshake response message to "
<<
_hostname
<<
':'
<<
_port
<<
", user="
<<
_username
;
return
OMS_FAILED
;
}
// 5 receive response from server
msgbuf
.
reset
();
ret
=
recv_mysql_packet
(
_sockfd
,
_detect_timeout
,
packet_length
,
sequence
,
msgbuf
);
if
(
ret
!=
OMS_OK
)
{
OMS_ERROR
<<
"Failed to recv handshake auth response from: "
<<
_hostname
<<
':'
<<
_port
<<
", user: "
<<
_username
;
return
OMS_FAILED
;
}
MySQLOkPacket
ok_packet
;
ret
=
ok_packet
.
decode
(
msgbuf
);
if
(
ret
==
OMS_OK
)
{
OMS_INFO
<<
"Auth user success of server: "
<<
_hostname
<<
':'
<<
_port
<<
", user: "
<<
_username
;
}
else
{
MySQLErrorPacket
error_packet
;
error_packet
.
decode
(
msgbuf
);
OMS_ERROR
<<
"Auth user failed of server: "
<<
_hostname
<<
':'
<<
_port
<<
", user: "
<<
_username
;
}
return
ret
;
}
...
...
@@ -61,7 +136,7 @@ static inline void my_xor(const unsigned char* s1, const unsigned char* s2, uint
}
}
int
MysqlProtocol
::
calc_mysql_auth_info
(
const
std
::
vector
<
char
>&
scramble
_buffer
,
std
::
vector
<
char
>&
auth_info
)
int
MysqlProtocol
::
calc_mysql_auth_info
(
const
std
::
vector
<
char
>&
scramble
,
std
::
vector
<
char
>&
auth
)
{
// SHA1( password ) XOR SHA1( "20-bytes random data from server" <concat> SHA1( SHA1( password ) ) )
// SHA1(password) -> stage1
...
...
@@ -82,8 +157,8 @@ int MysqlProtocol::calc_mysql_auth_info(const std::vector<char>& scramble_buffer
}
std
::
vector
<
char
>
scramble_combined
;
scramble_combined
.
reserve
(
scramble
_buffer
.
size
()
+
passwd_stage2
.
size
());
scramble_combined
.
assign
(
scramble
_buffer
.
begin
(),
scramble_buffer
.
end
());
scramble_combined
.
reserve
(
scramble
.
size
()
+
passwd_stage2
.
size
());
scramble_combined
.
assign
(
scramble
.
begin
(),
scramble
.
end
());
scramble_combined
.
insert
(
scramble_combined
.
end
(),
passwd_stage2
.
begin
(),
passwd_stage2
.
end
());
sha1
.
reset
();
...
...
@@ -107,127 +182,45 @@ int MysqlProtocol::calc_mysql_auth_info(const std::vector<char>& scramble_buffer
return
OMS_FAILED
;
}
auth
_info
.
resize
(
sha_combined
.
size
());
auth
.
resize
(
sha_combined
.
size
());
my_xor
((
const
unsigned
char
*
)
_passwd_sha1
.
data
(),
(
const
unsigned
char
*
)
sha_combined
.
data
(),
auth
_info
.
size
(),
(
unsigned
char
*
)
auth
_info
.
data
());
auth
.
size
(),
(
unsigned
char
*
)
auth
.
data
());
return
OMS_OK
;
}
int
MysqlProtocol
::
send_auth
_info
(
const
std
::
vector
<
char
>&
auth_info
,
uint8_t
sequenc
e
)
int
MysqlProtocol
::
send_auth
(
const
std
::
vector
<
char
>&
auth_info
,
const
std
::
string
&
databas
e
)
{
My
sqlHandShakeResponsePacket
hand_shake_response_packet
(
_username
,
""
,
auth_info
,
sequence
);
My
SQLHandShakeResponsePacket
handshake_response_packet
(
_username
,
database
,
auth_info
);
MsgBuf
msgbuf
;
int
ret
=
hand
_
shake_response_packet
.
encode
(
msgbuf
);
int
ret
=
handshake_response_packet
.
encode
(
msgbuf
);
if
(
ret
!=
OMS_OK
)
{
OMS_ERROR
<<
"Failed to encode hand
shake response packet"
;
OMS_ERROR
<<
"Failed to encode handshake response packet"
;
return
-
1
;
}
for
(
const
auto
&
iter
:
msgbuf
)
{
ret
=
writen
(
_sockfd
,
iter
.
buffer
(),
iter
.
size
());
if
(
ret
!=
OMS_OK
)
{
OMS_ERROR
<<
"Failed to send hand shake response message. error="
<<
strerror
(
errno
)
<<
". peer="
<<
_hostname
<<
":"
<<
_port
;
return
ret
;
}
}
return
OMS_OK
;
}
int
MysqlProtocol
::
is_mysql_response_ok
()
{
MsgBuf
msgbuf
;
int
ret
=
recv_mysql_packet
(
_sockfd
,
_detect_timeout
,
msgbuf
);
ret
=
send_mysql_packet
(
_sockfd
,
msgbuf
,
0
);
if
(
ret
!=
OMS_OK
)
{
OMS_ERROR
<<
"Failed to receive ok packet from server "
<<
_hostname
<<
':'
<<
_port
<<
", error="
<<
strerror
(
errno
);
return
OMS_FAILED
;
}
MysqlOkPacket
ok_packet
;
return
ok_packet
.
decode
(
msgbuf
);
}
int
MysqlProtocol
::
login
(
const
std
::
string
&
host
,
int
port
,
const
std
::
string
&
username
,
const
std
::
string
&
passwd_sha1
,
const
std
::
string
&
database
)
{
_hostname
=
host
;
_port
=
port
;
_username
=
username
;
_passwd_sha1
=
passwd_sha1
;
// https://dev.mysql.com/doc/internals/en/secure-password-authentication.html
// 1 connect to the server
int
ret
=
connect_to_server
();
if
(
ret
<
0
)
{
OMS_ERROR
<<
"Failed to connect to server "
<<
_hostname
<<
':'
<<
_port
;
return
OMS_CONNECT_FAILED
;
}
// 2 receive initial hand shake
uint32_t
packet_length
=
0
;
uint8_t
sequence
=
0
;
MsgBuf
msgbuf
;
ret
=
recv_mysql_packet
(
_sockfd
,
_detect_timeout
,
packet_length
,
sequence
,
msgbuf
);
if
(
ret
!=
OMS_OK
)
{
OMS_ERROR
<<
"Failed to receive hand shake packet. error="
<<
strerror
(
errno
)
<<
". server="
<<
_hostname
<<
':'
<<
_port
;
return
ret
;
}
OMS_DEBUG
<<
"receive hand shake packet from server="
<<
_hostname
<<
':'
<<
_port
;
MysqlInitialHandShakePacket
handshake_packet
;
ret
=
handshake_packet
.
decode
(
msgbuf
);
if
(
ret
!=
OMS_OK
||
!
handshake_packet
.
scramble_buffer_valid
())
{
OMS_ERROR
<<
"Failed to decode_payload initial hand shake packet or does not has a valid scramble:"
<<
handshake_packet
.
scramble_buffer_valid
()
<<
". length="
<<
packet_length
<<
", server="
<<
_hostname
<<
':'
<<
_port
;
return
ret
;
}
const
std
::
vector
<
char
>&
scramble_buffer
=
handshake_packet
.
scramble_buffer
();
// 3 calculate the password combined with scramble buffer -> auth information
std
::
vector
<
char
>
auth_info
;
ret
=
calc_mysql_auth_info
(
scramble_buffer
,
auth_info
);
if
(
ret
!=
OMS_OK
)
{
OMS_ERROR
<<
"Failed to calc login info. server="
<<
_hostname
<<
':'
<<
_port
<<
", user="
<<
_username
;
OMS_ERROR
<<
"Failed to send handshake response to server: "
<<
_hostname
<<
":"
<<
_port
;
return
ret
;
}
// 4 send the hand shake response with auth information
ret
=
send_auth_info
(
auth_info
,
sequence
+
1
);
if
(
ret
!=
OMS_OK
)
{
OMS_ERROR
<<
"Failed to send hand shake response message to "
<<
_hostname
<<
':'
<<
_port
<<
", user="
<<
_username
;
return
OMS_FAILED
;
}
// 5 receive response from server
ret
=
is_mysql_response_ok
();
if
(
ret
==
OMS_OK
)
{
OMS_INFO
<<
"Auth user success. server="
<<
_hostname
<<
':'
<<
_port
<<
", user="
<<
_username
;
}
else
{
OMS_DEBUG
<<
"Auth user failed. server="
<<
_hostname
<<
':'
<<
_port
<<
", user="
<<
_username
;
}
return
ret
;
return
OMS_OK
;
}
int
MysqlProtocol
::
query
(
const
std
::
string
&
sql
,
My
sql
ResultSet
&
rs
)
int
MysqlProtocol
::
query
(
const
std
::
string
&
sql
,
My
SQL
ResultSet
&
rs
)
{
OMS_INFO
<<
"
query obmysql
:"
<<
sql
;
OMS_INFO
<<
"
Query obmysql SQL
:"
<<
sql
;
MysqlQueryPacket
packet
(
sql
);
MsgBuf
msgbuf
;
MySQLQueryPacket
packet
(
sql
);
int
ret
=
packet
.
encode_inplace
(
msgbuf
);
if
(
ret
!=
OMS_OK
)
{
OMS_ERROR
<<
"Failed to encode
mysql sql packet ,
ret:"
<<
ret
;
OMS_ERROR
<<
"Failed to encode
observer sql packet,
ret:"
<<
ret
;
return
ret
;
}
ret
=
send_mysql_packet
(
_sockfd
,
msgbuf
);
ret
=
send_mysql_packet
(
_sockfd
,
msgbuf
,
0
);
if
(
ret
!=
OMS_OK
)
{
OMS_ERROR
<<
"Failed to send query packet to server:"
<<
_hostname
<<
':'
<<
_port
;
return
OMS_FAILED
;
...
...
@@ -239,11 +232,11 @@ int MysqlProtocol::query(const std::string& sql, MysqlResultSet& rs)
return
OMS_FAILED
;
}
My
sql
QueryResponsePacket
query_resp
;
My
SQL
QueryResponsePacket
query_resp
;
ret
=
query_resp
.
decode
(
msgbuf
);
if
(
ret
!=
OMS_OK
||
query_resp
.
col_count
()
==
0
)
{
OMS_ERROR
<<
"Failed to query
mysql
:"
<<
query_resp
.
_err
.
_message
<<
(
query_resp
.
col_count
()
==
0
?
", unexpected column count 0"
:
""
);
OMS_ERROR
<<
"Failed to query
observer
:"
<<
query_resp
.
_err
.
_message
<<
(
query_resp
.
col_count
()
==
0
?
", unexpected column count
:
0"
:
""
);
return
OMS_FAILED
;
}
...
...
@@ -257,7 +250,7 @@ int MysqlProtocol::query(const std::string& sql, MysqlResultSet& rs)
return
OMS_FAILED
;
}
My
sql
Col
column
;
My
SQL
Col
column
;
column
.
decode
(
msgbuf
);
rs
.
cols
[
i
]
=
column
;
}
...
...
@@ -268,7 +261,7 @@ int MysqlProtocol::query(const std::string& sql, MysqlResultSet& rs)
OMS_ERROR
<<
"Failed to recv eof packet from server:"
<<
_hostname
<<
':'
<<
_port
;
return
OMS_FAILED
;
}
My
sql
EofPacket
eof
;
My
SQL
EofPacket
eof
;
ret
=
eof
.
decode
(
msgbuf
);
if
(
ret
!=
OMS_OK
)
{
OMS_ERROR
<<
"Failed to decode eof packet from server:"
<<
_hostname
<<
':'
<<
_port
;
...
...
@@ -287,7 +280,7 @@ int MysqlProtocol::query(const std::string& sql, MysqlResultSet& rs)
break
;
}
My
sql
Row
row
(
query_resp
.
col_count
());
My
SQL
Row
row
(
query_resp
.
col_count
());
row
.
decode
(
msgbuf
);
rs
.
rows
.
emplace_back
(
row
);
}
...
...
src/obaccess/mysql_protocol.h
浏览文件 @
0f647e8e
...
...
@@ -37,7 +37,7 @@ public:
int
login
(
const
std
::
string
&
host
,
int
port
,
const
std
::
string
&
username
,
const
std
::
string
&
passwd_sha1
,
const
std
::
string
&
database
=
""
);
int
query
(
const
std
::
string
&
sql
,
My
sql
ResultSet
&
rs
);
int
query
(
const
std
::
string
&
sql
,
My
SQL
ResultSet
&
rs
);
/**
* 设置接收网络消息包时,首次检测是否有消息到达的超时时间。单位毫秒
...
...
@@ -59,11 +59,9 @@ public:
private:
int
connect_to_server
();
int
calc_mysql_auth_info
(
const
std
::
vector
<
char
>&
scramble
_buffer
,
std
::
vector
<
char
>&
auth_info
);
int
calc_mysql_auth_info
(
const
std
::
vector
<
char
>&
scramble
,
std
::
vector
<
char
>&
auth
);
int
send_auth_info
(
const
std
::
vector
<
char
>&
auth_info
,
uint8_t
sequence
);
int
is_mysql_response_ok
();
int
send_auth
(
const
std
::
vector
<
char
>&
auth_info
,
const
std
::
string
&
database
);
private:
std
::
string
_username
;
...
...
src/obaccess/ob_access.cpp
浏览文件 @
0f647e8e
...
...
@@ -96,21 +96,26 @@ static int parse_cluster_url(const std::string& cluster_url, std::vector<ObAcces
return
OMS_OK
;
}
int
ObAccess
::
init
(
const
OblogConfig
&
config
)
int
ObAccess
::
init
(
const
OblogConfig
&
hs_
config
)
{
if
(
!
config
.
cluster_url
.
empty
())
{
if
(
parse_cluster_url
(
config
.
cluster_url
.
val
(),
_servers
)
!=
OMS_OK
)
{
_user
=
hs_config
.
user
.
val
();
_user_to_conn
=
_user
;
std
::
vector
<
std
::
string
>
sections
;
if
(
!
hs_config
.
cluster_url
.
empty
())
{
if
(
parse_cluster_url
(
hs_config
.
cluster_url
.
val
(),
_servers
)
!=
OMS_OK
)
{
return
OMS_FAILED
;
}
_user_to_conn
=
ObUsername
(
_user
).
name_without_cluster
();
}
else
{
const
std
::
string
&
root_servers
=
config
.
root_servers
.
val
();
const
std
::
string
&
root_servers
=
hs_
config
.
root_servers
.
val
();
if
(
root_servers
.
empty
())
{
OMS_ERROR
<<
"Failed to init ObAccess caused by empty root_servers"
;
return
OMS_FAILED
;
}
std
::
vector
<
std
::
string
>
sections
;
int
ret
=
split
(
root_servers
,
';'
,
sections
);
if
(
ret
==
0
)
{
OMS_ERROR
<<
"Failed to init ObAccess caused by invalid root_servers"
;
...
...
@@ -138,26 +143,27 @@ int ObAccess::init(const OblogConfig& config)
}
}
_user
=
config
.
user
.
val
();
hex2bin
(
config
.
password
.
val
().
c_str
(),
config
.
password
.
val
().
size
(),
_password_sha1
);
hex2bin
(
hs_config
.
password
.
val
().
c_str
(),
hs_config
.
password
.
val
().
size
(),
_password_sha1
);
if
(
_user
.
empty
()
||
_password_sha1
.
empty
())
{
if
(
_user
_to_conn
.
empty
()
||
_password_sha1
.
empty
())
{
OMS_ERROR
<<
"Failed to init ObAccess caused by empty user or password"
;
return
OMS_FAILED
;
}
_sys_user
=
config
.
sys_user
.
empty
()
?
Config
::
instance
().
ob_sys_username
.
val
()
:
config
.
sys_user
.
val
();
if
(
config
.
sys_password
.
empty
())
{
MysqlProtocol
::
do_sha_password
(
Config
::
instance
().
ob_sys_password
.
val
(),
_sys_password_sha1
);
_sys_user
=
!
hs_config
.
sys_user
.
empty
()
?
hs_config
.
sys_user
.
val
()
:
Config
::
instance
().
ob_sys_username
.
val
();
// preconfiged sys password was encrypted, otherwise was plain text as handshake param
if
(
!
hs_config
.
sys_password
.
empty
())
{
MysqlProtocol
::
do_sha_password
(
hs_config
.
sys_password
.
val
(),
_sys_password_sha1
);
}
else
{
MysqlProtocol
::
do_sha_password
(
config
.
sys_password
.
val
(),
_sys_password_sha1
);
MysqlProtocol
::
do_sha_password
(
Config
::
instance
().
ob_
sys_password
.
val
(),
_sys_password_sha1
);
}
if
(
_sys_user
.
empty
()
||
_sys_password_sha1
.
empty
())
{
OMS_ERROR
<<
"Failed to init ObAccess caused by empty sys_user or sys_password"
;
return
OMS_FAILED
;
}
int
ret
=
_table_whites
.
from
(
config
.
table_whites
.
val
());
int
ret
=
_table_whites
.
from
(
hs_
config
.
table_whites
.
val
());
if
(
ret
!=
OMS_OK
)
{
return
ret
;
}
...
...
@@ -180,19 +186,19 @@ int ObAccess::auth_sys(const ServerInfo& server)
{
MysqlProtocol
auther
;
// all tenant must be sys tenant;
int
ret
=
auther
.
login
(
server
.
host
,
server
.
port
,
_user
,
_password_sha1
);
int
ret
=
auther
.
login
(
server
.
host
,
server
.
port
,
_user
_to_conn
,
_password_sha1
);
if
(
ret
!=
OMS_OK
)
{
return
ret
;
}
My
sql
ResultSet
rs
;
My
SQL
ResultSet
rs
;
ret
=
auther
.
query
(
"show tenant"
,
rs
);
if
(
ret
!=
OMS_OK
||
rs
.
rows
.
empty
())
{
OMS_ERROR
<<
"Failed to auth, show tenant for sys all match mode, ret:"
<<
ret
;
return
OMS_FAILED
;
}
const
My
sql
Row
&
row
=
rs
.
rows
.
front
();
const
My
SQL
Row
&
row
=
rs
.
rows
.
front
();
const
std
::
string
&
tenant
=
row
.
fields
().
front
();
if
(
tenant
.
size
()
<
3
||
strncasecmp
(
"sys"
,
tenant
.
c_str
(),
3
)
!=
0
)
{
OMS_ERROR
<<
"Failed to auth, all tenant mode must be sys tenant, current: "
<<
tenant
;
...
...
@@ -213,9 +219,9 @@ int ObAccess::auth_tenant(const ServerInfo& server)
ObUsername
ob_user
(
_user
);
// 2. for each of tenant servers, login it.
My
sql
ResultSet
rs
;
My
SQL
ResultSet
rs
;
for
(
auto
&
tenant_entry
:
_table_whites
.
tenants
)
{
OMS_INFO
<<
"About to auth tenant:"
<<
tenant_entry
.
first
<<
" of user:"
<<
_user
;
OMS_INFO
<<
"About to auth tenant:"
<<
tenant_entry
.
first
<<
" of user:"
<<
_user
_to_conn
;
rs
.
reset
();
ret
=
sys_auther
.
query
(
...
...
@@ -235,15 +241,12 @@ int ObAccess::auth_tenant(const ServerInfo& server)
<<
", col count:"
<<
rs
.
col_count
<<
", ret:"
<<
ret
;
return
OMS_FAILED
;
}
const
My
sql
Row
&
row
=
rs
.
rows
.
front
();
const
My
SQL
Row
&
row
=
rs
.
rows
.
front
();
const
std
::
string
&
host
=
row
.
fields
()[
0
];
const
uint16_t
sql_port
=
atoi
(
row
.
fields
()[
1
].
c_str
());
MysqlProtocol
user_auther
;
std
::
string
conn_user
=
ob_user
.
username
;
conn_user
.
append
(
"@"
);
conn_user
.
append
(
ob_user
.
tenant
.
empty
()
?
tenant_entry
.
first
:
ob_user
.
tenant
);
ret
=
user_auther
.
login
(
host
,
sql_port
,
conn_user
,
_password_sha1
);
ret
=
user_auther
.
login
(
host
,
sql_port
,
ob_user
.
name_without_cluster
(
tenant_entry
.
first
),
_password_sha1
);
if
(
ret
!=
OMS_OK
)
{
OMS_ERROR
<<
"Failed to auth from tenant server: "
<<
host
<<
":"
<<
sql_port
<<
", ret:"
<<
ret
;
return
ret
;
...
...
@@ -288,5 +291,18 @@ ObUsername::ObUsername(const std::string& full_name)
}
}
std
::
string
ObUsername
::
name_without_cluster
(
const
std
::
string
&
in_tenant
)
{
std
::
string
conn_user
=
username
;
if
(
!
tenant
.
empty
())
{
conn_user
.
append
(
"@"
);
conn_user
.
append
(
tenant
);
}
else
if
(
!
in_tenant
.
empty
())
{
conn_user
.
append
(
"@"
);
conn_user
.
append
(
in_tenant
);
}
return
conn_user
;
}
}
// namespace logproxy
}
// namespace oceanbase
src/obaccess/ob_access.h
浏览文件 @
0f647e8e
...
...
@@ -46,6 +46,7 @@ private:
private:
std
::
vector
<
ServerInfo
>
_servers
;
std
::
string
_user
;
std
::
string
_user_to_conn
;
std
::
string
_password_sha1
;
std
::
string
_sys_user
;
std
::
string
_sys_password_sha1
;
...
...
@@ -60,6 +61,8 @@ struct ObUsername {
std
::
string
username
;
explicit
ObUsername
(
const
std
::
string
&
full_name
);
std
::
string
name_without_cluster
(
const
std
::
string
&
in_tenant
=
""
);
};
}
// namespace logproxy
...
...
src/obaccess/ob_mysql_packet.cpp
浏览文件 @
0f647e8e
...
...
@@ -18,6 +18,7 @@
#include "communication/io.h"
#include "common/log.h"
#include "common/common.h"
#include "common/guard.hpp"
// https://dev.mysql.com/doc/internals/en/capability-flags.html
// copy from include/mysql_com.h (mariadb source code)
...
...
@@ -84,26 +85,25 @@ int recv_mysql_packet(int fd, int timeout, uint32_t& packet_length, uint8_t& seq
uint32_t
packet_header
=
0
;
ret
=
readn
(
fd
,
&
packet_header
,
4
);
if
(
ret
!=
OMS_OK
)
{
OMS_ERROR
<<
"Failed to read packet length
. error=
"
<<
strerror
(
errno
);
OMS_ERROR
<<
"Failed to read packet length
, errno: "
<<
errno
<<
", error:
"
<<
strerror
(
errno
);
return
ret
;
}
sequence
=
packet_header
&
0xFF000000
;
packet_length
=
le_to_cpu
<
uint32_t
>
(
packet_header
&
0x00FFFFFF
);
sequence
=
(
packet_header
&
0xFF000000
)
>>
3
;
packet_length
=
le_to_cpu
(
packet_header
&
0x00FFFFFF
);
if
(
packet_length
>=
UINT32_MAX
-
sizeof
(
packet_header
))
{
OMS_ERROR
<<
"Got invalid packet length, too length
. length=
"
<<
packet_length
;
OMS_ERROR
<<
"Got invalid packet length, too length
:
"
<<
packet_length
;
return
OMS_FAILED
;
}
char
*
buffer
=
(
char
*
)
malloc
(
packet_length
);
if
(
nullptr
==
buffer
)
{
OMS_ERROR
<<
"Failed to malloc memory for mysql hand
shake packet. length=
"
<<
packet_length
;
OMS_ERROR
<<
"Failed to malloc memory for mysql hand
shake packet length:
"
<<
packet_length
;
return
OMS_FAILED
;
}
ret
=
readn
(
fd
,
buffer
,
packet_length
);
if
(
ret
!=
OMS_OK
)
{
OMS_ERROR
<<
"Failed to read mysql hand
shake packet. length="
<<
packet_length
<<
", error=
"
<<
strerror
(
errno
);
OMS_ERROR
<<
"Failed to read mysql hand
shake packet. length: "
<<
packet_length
<<
", error:
"
<<
strerror
(
errno
);
free
(
buffer
);
return
ret
;
}
...
...
@@ -120,13 +120,20 @@ int recv_mysql_packet(int fd, int timeout, MsgBuf& msgbuf)
return
recv_mysql_packet
(
fd
,
timeout
,
packet_length
,
sequence
,
msgbuf
);
}
int
send_mysql_packet
(
int
fd
,
MsgBuf
&
msgbuf
)
int
send_mysql_packet
(
int
fd
,
MsgBuf
&
msgbuf
,
uint8_t
sequence
)
{
// [24bit] packet_size
// [8bit] sequence
uint32_t
packet_length
=
msgbuf
.
byte_size
();
// we set sequence=0 here
packet_length
=
cpu_to_le
<
uint32_t
>
(
packet_length
&
0x00FFFFFF
);
packet_length
=
cpu_to_le
(
packet_length
&
0x00FFFFFF
);
packet_length
=
packet_length
|
(
sequence
<<
3
);
////// DEBUG ONLY ///////
std
::
string
hexstr
;
dumphex
((
char
*
)
&
packet_length
,
4
,
hexstr
);
OMS_DEBUG
<<
"MySQL packet header: "
<<
hexstr
<<
", value: "
<<
packet_length
;
////// DEBUG ONLY ///////
int
ret
=
writen
(
fd
,
&
packet_length
,
4
);
if
(
ret
!=
OMS_OK
)
{
OMS_ERROR
<<
"Failed to send packet, error:"
<<
strerror
(
errno
);
...
...
@@ -135,7 +142,7 @@ int send_mysql_packet(int fd, MsgBuf& msgbuf)
for
(
const
auto
&
iter
:
msgbuf
)
{
ret
=
writen
(
fd
,
iter
.
buffer
(),
iter
.
size
());
if
(
ret
!=
OMS_OK
)
{
OMS_ERROR
<<
"Failed to send packet, error:"
<<
strerror
(
errno
);
OMS_ERROR
<<
"Failed to send packet, err
no:"
<<
errno
<<
", err
or:"
<<
strerror
(
errno
);
return
ret
;
}
}
...
...
@@ -143,7 +150,7 @@ int send_mysql_packet(int fd, MsgBuf& msgbuf)
}
// https://dev.mysql.com/doc/internals/en/packet-OK_Packet.html
int
My
sql
OkPacket
::
decode
(
const
MsgBuf
&
msgbuf
)
int
My
SQL
OkPacket
::
decode
(
const
MsgBuf
&
msgbuf
)
{
MsgBufReader
reader
(
msgbuf
);
uint8_t
packet_type
;
...
...
@@ -151,9 +158,9 @@ int MysqlOkPacket::decode(const MsgBuf& msgbuf)
return
(
ret
==
OMS_OK
&&
packet_type
==
0x00
)
?
OMS_OK
:
OMS_FAILED
;
}
const
uint8_t
My
sql
EofPacket
::
_s_packet_type
=
0xfe
;
const
uint8_t
My
SQL
EofPacket
::
_s_packet_type
=
0xfe
;
int
My
sql
EofPacket
::
decode
(
const
MsgBuf
&
msgbuf
)
int
My
SQL
EofPacket
::
decode
(
const
MsgBuf
&
msgbuf
)
{
size_t
len
=
msgbuf
.
byte_size
();
if
(
len
>=
9
)
{
...
...
@@ -176,7 +183,7 @@ int MysqlEofPacket::decode(const MsgBuf& msgbuf)
/**
* https://dev.mysql.com/doc/internals/en/packet-ERR_Packet.html
*/
int
My
sql
ErrorPacket
::
decode
(
const
MsgBuf
&
msgbuf
)
int
My
SQL
ErrorPacket
::
decode
(
const
MsgBuf
&
msgbuf
)
{
MsgBufReader
reader
(
msgbuf
);
...
...
@@ -195,8 +202,8 @@ int MysqlErrorPacket::decode(const MsgBuf& msgbuf)
}
// 5 bytes SQL state
_sql_state
.
resize
(
5
);
ret
=
reader
.
read
((
char
*
)
_sql_state
.
data
(),
5
);
_sql_state
.
resize
(
6
);
ret
=
reader
.
read
((
char
*
)
_sql_state
.
data
(),
6
);
if
(
ret
!=
OMS_OK
)
{
OMS_ERROR
<<
"Failed to skip SQL state"
;
return
OMS_FAILED
;
...
...
@@ -210,57 +217,58 @@ int MysqlErrorPacket::decode(const MsgBuf& msgbuf)
return
OMS_FAILED
;
}
OMS_DEBUG
<<
"
mysql error packet: ["
<<
_code
<<
"]
"
<<
_message
;
OMS_DEBUG
<<
"
Error packet: ["
<<
_code
<<
"]["
<<
_sql_state_marker
<<
_sql_state
<<
"]
"
<<
_message
;
return
OMS_OK
;
}
int
MysqlInitialHandShakePacket
::
decode
(
const
MsgBuf
&
msgbuf
)
// reference:
// https://dev.mysql.com/doc/internals/en/connection-phase-packets.html#packet-Protocol::Handshake
int
MySQLInitialHandShakePacket
::
decode
(
const
MsgBuf
&
msgbuf
)
{
// reference:
// https://dev.mysql.com/doc/internals/en/connection-phase-packets.html#packet-Protocol::Handshake
MsgBufReader
buffer_reader
(
msgbuf
);
int
ret
=
buffer_reader
.
forward
(
4
);
// packet length
if
(
ret
!=
OMS_OK
)
{
OMS_ERROR
<<
"Failed to skip the packet length while decoding mysql InitialHandShakePacket"
;
return
ret
;
}
// read protocol version
ret
=
buffer_reader
.
read_uint8
(
_protocol_version
);
int
ret
=
buffer_reader
.
read_uint8
(
_protocol_version
);
if
(
ret
!=
OMS_OK
)
{
OMS_ERROR
<<
"Failed to read protocol version while decoding mysql InitialHandShakePacket"
;
return
ret
;
}
if
(
_protocol_version
!=
0x0a
)
{
OMS_ERROR
<<
"Unsupported packet version. version="
<<
_protocol_version
;
OMS_ERROR
<<
"Unsupported packet version: "
<<
_protocol_version
;
return
OMS_FAILED
;
}
// skip the string version
char
c
=
0xFF
;
do
{
// server version
std
::
string
server_version
;
while
(
ret
==
OMS_OK
&&
buffer_reader
.
has_more
())
{
char
c
=
0
;
ret
=
buffer_reader
.
read
(
&
c
,
1
);
}
while
(
ret
==
OMS_OK
&&
c
!=
0
);
if
(
c
==
'\0'
)
{
break
;
}
server_version
.
append
(
1
,
c
);
}
if
(
ret
!=
OMS_OK
)
{
OMS_ERROR
<<
"Failed to read buffer while decoding mysql InitialHandShakePacket. buffer read "
<<
buffer_reader
.
read_size
();
}
OMS_DEBUG
<<
"Observer version: "
<<
server_version
;
int32_t
connection_id
;
ret
=
buffer_reader
.
read_int
<
int32_t
,
Endian
::
LITTLE
>
(
connection_id
);
ret
=
buffer_reader
.
read_int
<
int32_t
>
(
connection_id
);
if
(
ret
!=
OMS_OK
)
{
OMS_ERROR
<<
"Failed to read connection id while decoding mysql InitialHandShakePacket"
;
return
ret
;
}
OMS_DEBUG
<<
"Connection id: "
<<
connection_id
;
_scramble_buffer_valid
=
false
;
_scramble_buffer
.
reserve
(
20
);
_scramble
.
reserve
(
20
);
// scramble part 1.
// string[8] auth-plugin-data-part-1
_scramble
_buffer
.
resize
(
8
);
ret
=
buffer_reader
.
read
(
_scramble
_buffer
.
data
(),
8
);
_scramble
.
resize
(
8
);
ret
=
buffer_reader
.
read
(
_scramble
.
data
(),
8
);
if
(
ret
!=
OMS_OK
)
{
OMS_ERROR
<<
"Failed to read auth-plugin-data-part-1 while decoding mysql InitialHandShakePacket"
;
return
ret
;
...
...
@@ -280,9 +288,10 @@ int MysqlInitialHandShakePacket::decode(const MsgBuf& msgbuf)
return
ret
;
}
_scramble_valid
=
false
;
if
(
!
buffer_reader
.
has_more
())
{
OMS_DEBUG
<<
"Decode done. field end with capabilities flag lower byte_size"
;
_scramble_
buffer_
valid
=
true
;
_scramble_valid
=
true
;
return
OMS_OK
;
}
...
...
@@ -294,21 +303,24 @@ int MysqlInitialHandShakePacket::decode(const MsgBuf& msgbuf)
}
// capability flags (upper 2 bytes)
ret
=
buffer_reader
.
read
((
char
*
)
&
_capabilities_flag
+
2
,
2
);
ret
=
buffer_reader
.
read
((
(
char
*
)
&
_capabilities_flag
)
+
2
,
2
);
if
(
ret
!=
OMS_OK
)
{
OMS_ERROR
<<
"Failed to read the 'capability flags (upper 2 byte_size)' while decoding mysql InitialHandShakePacket"
;
return
ret
;
}
_capabilities_flag
=
le_to_cpu
<
uint32_t
>
(
_capabilities_flag
);
_capabilities_flag
=
le_to_cpu
(
_capabilities_flag
);
// if capabilities & CLIENT_PLUGIN_AUTH then length of auth-plugin-data
uint8_t
auth_plugin_data_len
=
0
;
ret
=
buffer_reader
.
read_uint8
(
auth_plugin_data_len
);
if
(
ret
!=
OMS_OK
)
{
OMS_ERROR
<<
"Failed to read the 'length of auth-plugin-data' while decoding mysql InitialHandShakePacket"
;
return
ret
;
if
(
_capabilities_flag
&
CLIENT_PLUGIN_AUTH
)
{
ret
=
buffer_reader
.
read_uint8
(
auth_plugin_data_len
);
if
(
ret
!=
OMS_OK
)
{
OMS_ERROR
<<
"Failed to read the 'length of auth-plugin-data' while decoding mysql InitialHandShakePacket"
;
return
ret
;
}
}
else
{
buffer_reader
.
forward
(
1
);
}
// reserved (all [00])
...
...
@@ -322,30 +334,49 @@ int MysqlInitialHandShakePacket::decode(const MsgBuf& msgbuf)
// then auth-plugin-data-part-2 ($len=MAX(13, length of auth-plugin-data - 8))
if
(
_capabilities_flag
&
CLIENT_SECURE_CONNECTION
)
{
const
int
auth_plugin_data_part_2_len
=
std
::
max
(
13
,
auth_plugin_data_len
-
8
);
_scramble
_buffer
.
resize
(
8
+
auth_plugin_data_part_2_len
-
1
);
ret
=
buffer_reader
.
read
(
_scramble
_buffer
.
data
()
+
8
,
auth_plugin_data_part_2_len
-
1
);
_scramble
.
resize
(
8
+
auth_plugin_data_part_2_len
);
ret
=
buffer_reader
.
read
(
_scramble
.
data
()
+
8
,
auth_plugin_data_part_2_len
);
if
(
ret
!=
OMS_OK
)
{
OMS_ERROR
<<
"Failed to read the part 2 of 'scramble buffer' while decoding mysql InitialHandShakePacket"
;
return
ret
;
}
_scramble_buffer_valid
=
true
;
_scramble_valid
=
true
;
}
if
(
_capabilities_flag
&
CLIENT_PLUGIN_AUTH
)
{
_auth_plugin_name
.
reserve
(
buffer_reader
.
remain_size
());
while
(
ret
==
OMS_OK
&&
buffer_reader
.
has_more
())
{
char
c
=
0
;
ret
=
buffer_reader
.
read
(
&
c
,
1
);
if
(
c
==
'\0'
)
{
break
;
}
_auth_plugin_name
.
append
(
1
,
c
);
}
OMS_DEBUG
<<
"auth plugin name: "
<<
_auth_plugin_name
;
}
if
(
_auth_plugin_name
!=
"mysql_native_password"
)
{
OMS_ERROR
<<
"Unsupport auth plugin name: "
<<
_auth_plugin_name
;
return
OMS_FAILED
;
}
// fix ob response length 21 scramble
if
(
_scramble
.
size
()
>
20
)
{
_scramble
.
resize
(
20
);
}
return
ret
;
}
bool
My
sqlInitialHandShakePacket
::
scramble_buffer
_valid
()
const
bool
My
SQLInitialHandShakePacket
::
scramble
_valid
()
const
{
return
_scramble_
buffer_
valid
;
return
_scramble_valid
;
}
const
std
::
vector
<
char
>&
My
sqlInitialHandShakePacket
::
scramble_buffer
()
const
const
std
::
vector
<
char
>&
My
SQLInitialHandShakePacket
::
scramble
()
const
{
return
_scramble
_buffer
;
return
_scramble
;
}
uint8_t
My
sql
InitialHandShakePacket
::
sequence
()
const
uint8_t
My
SQL
InitialHandShakePacket
::
sequence
()
const
{
return
_sequence
;
}
...
...
@@ -355,17 +386,23 @@ uint8_t MysqlInitialHandShakePacket::sequence() const
* 在buffer中写入一个\0结尾的字符串
* @return 返回非负数,表示写入的数据大小,否则失败
*/
int
write_null_terminate_string
(
char
*
buf
,
int
capacity
,
const
char
*
s
)
static
inline
int
write_null_terminate_string
(
char
*
buf
,
size_t
capacity
,
const
char
*
s
,
size_t
len
)
{
int
str_len
=
strlen
(
s
);
if
(
str_len
+
1
>
capacity
)
{
if
(
len
+
1
>
capacity
)
{
return
OMS_FAILED
;
}
memcpy
(
buf
,
s
,
str_len
+
1
);
return
str_len
+
1
;
memcpy
(
buf
,
s
,
len
+
1
);
buf
[
len
]
=
'\0'
;
return
len
+
1
;
}
int
write_string
(
char
*
buf
,
int
capacity
,
const
char
*
s
,
int
str_len
)
static
inline
int
write_null_terminate_string
(
char
*
buf
,
size_t
capacity
,
const
std
::
string
&
str
)
{
return
write_null_terminate_string
(
buf
,
capacity
,
str
.
data
(),
str
.
size
());
}
static
inline
int
write_string
(
char
*
buf
,
size_t
capacity
,
const
char
*
s
,
size_t
str_len
)
{
if
(
str_len
>
capacity
)
{
return
OMS_FAILED
;
...
...
@@ -374,7 +411,7 @@ int write_string(char* buf, int capacity, const char* s, int str_len)
return
str_len
;
}
int
write_lenenc_uint
(
char
*
buf
,
in
t
capacity
,
uint64_t
integer
)
static
inline
int
write_lenenc_uint
(
char
*
buf
,
size_
t
capacity
,
uint64_t
integer
)
{
// https://dev.mysql.com/doc/internals/en/integer.html#packet-Protocol::LengthEncodedInteger
...
...
@@ -388,7 +425,7 @@ int write_lenenc_uint(char* buf, int capacity, uint64_t integer)
}
else
if
(
integer
<
(
1
<<
16UL
))
{
if
(
capacity
>=
3
)
{
buf
[
0
]
=
0xFC
;
uint16_t
num
=
cpu_to_le
<
uint16_t
>
((
uint16_t
)
integer
);
uint16_t
num
=
cpu_to_le
((
uint16_t
)
integer
);
memcpy
(
buf
+
1
,
&
num
,
sizeof
(
num
));
return
3
;
}
else
{
...
...
@@ -397,7 +434,7 @@ int write_lenenc_uint(char* buf, int capacity, uint64_t integer)
}
else
if
(
integer
<
(
1
<<
24UL
))
{
if
(
capacity
>=
4
)
{
buf
[
0
]
=
0xFD
;
uint32_t
num
=
cpu_to_le
<
uint32_t
>
((
uint32_t
)
integer
);
uint32_t
num
=
cpu_to_le
((
uint32_t
)
integer
);
memcpy
(
buf
+
1
,
(
char
*
)
&
num
+
1
,
3
);
return
4
;
}
else
{
...
...
@@ -406,7 +443,7 @@ int write_lenenc_uint(char* buf, int capacity, uint64_t integer)
}
else
if
(
integer
<
ULLONG_MAX
)
{
if
(
capacity
>=
9
)
{
buf
[
0
]
=
0xFE
;
uint64_t
num
=
cpu_to_le
<
uint64_t
>
((
uint64_t
)
integer
);
uint64_t
num
=
cpu_to_le
((
uint64_t
)
integer
);
memcpy
(
buf
+
1
,
&
num
,
sizeof
(
num
));
return
9
;
}
else
{
...
...
@@ -423,30 +460,12 @@ int write_lenenc_uint(char* buf, int capacity, uint64_t integer)
return
OMS_FAILED
;
}
My
sqlHandShakeResponsePacket
::
Mysql
HandShakeResponsePacket
(
const
std
::
string
&
username
,
const
std
::
string
&
database
,
const
std
::
vector
<
char
>&
auth_response
,
int8_t
sequence
)
:
_username
(
username
),
_database
(
database
),
_auth_response
(
auth_response
)
,
_sequence
(
sequence
)
My
SQLHandShakeResponsePacket
::
MySQL
HandShakeResponsePacket
(
const
std
::
string
&
username
,
const
std
::
string
&
database
,
const
std
::
vector
<
char
>&
auth_response
)
:
_username
(
username
),
_database
(
database
),
_auth_response
(
auth_response
)
{}
uint32_t
MysqlHandShakeResponsePacket
::
calc_capabilities_flag
()
{
uint32_t
capabilities_flag
=
CLIENT_MYSQL
|
CLIENT_LONG_FLAG
|
CLIENT_LOCAL_FILES
|
CLIENT_PROTOCOL_41
// | CLIENT_INTERACTIVE
|
CLIENT_TRANSACTIONS
|
CLIENT_SECURE_CONNECTION
|
CLIENT_MULTI_STATEMENTS
|
CLIENT_MULTI_RESULTS
|
CLIENT_PS_MULTI_RESULTS
|
CLIENT_PLUGIN_AUTH
// | CLIENT_CONNECT_ATTRS
|
CLIENT_PLUGIN_AUTH_LENENC_CLIENT_DATA
|
CLIENT_CAN_HANDLE_EXPIRED_PASSWORDS
|
CLIENT_SUPPORT_ORACLE_MODE
;
// MySQL 鉴权加上这个flag不影响
if
(
!
_database
.
empty
())
{
capabilities_flag
|=
CLIENT_CONNECT_WITH_DB
;
}
return
capabilities_flag
;
}
int
MysqlHandShakeResponsePacket
::
encode
(
MsgBuf
&
msgbuf
)
int
MySQLHandShakeResponsePacket
::
encode
(
MsgBuf
&
msgbuf
)
{
// reference:
// https://dev.mysql.com/doc/internals/en/connection-phase-packets.html#packet-Protocol::HandshakeResponse
...
...
@@ -455,39 +474,43 @@ int MysqlHandShakeResponsePacket::encode(MsgBuf& msgbuf)
const
int
capacity
=
200
;
char
*
buffer
=
(
char
*
)
malloc
(
capacity
);
if
(
nullptr
==
buffer
)
{
OMS_ERROR
<<
"Failed to alloc memory
. count=
"
<<
capacity
;
OMS_ERROR
<<
"Failed to alloc memory
size:
"
<<
capacity
;
return
OMS_FAILED
;
}
int
offset
=
4
;
// 3 bytes: packet size, 1 byte: sequence
FreeGuard
<
char
*>
fg
(
buffer
);
// capability flags, CLIENT_PROTOCOL_41 always set
uint32_t
src_capabilities_flag
=
calc_capabilities_flag
();
uint32_t
dst_capabilities_flag
=
cpu_to_le
<
uint32_t
>
(
src_capabilities_flag
);
memcpy
(
buffer
+
offset
,
&
dst_capabilities_flag
,
sizeof
(
dst_capabilities_flag
));
offset
+=
sizeof
(
dst_capabilities_flag
);
uint32_t
capabilities_flag
=
CLIENT_MYSQL
|
CLIENT_LONG_FLAG
|
CLIENT_PROTOCOL_41
|
CLIENT_TRANSACTIONS
|
CLIENT_SECURE_CONNECTION
|
CLIENT_MULTI_STATEMENTS
|
CLIENT_MULTI_RESULTS
|
CLIENT_PS_MULTI_RESULTS
|
CLIENT_PLUGIN_AUTH
|
CLIENT_PLUGIN_AUTH_LENENC_CLIENT_DATA
|
CLIENT_CAN_HANDLE_EXPIRED_PASSWORDS
|
CLIENT_SUPPORT_ORACLE_MODE
;
// MySQL 鉴权加上这个flag不影响
if
(
!
_database
.
empty
())
{
capabilities_flag
|=
CLIENT_CONNECT_WITH_DB
;
}
uint32_t
offset
=
0
;
uint32_t
dst_capabilities_flag
=
cpu_to_le
(
capabilities_flag
);
memcpy
(
buffer
+
offset
,
&
dst_capabilities_flag
,
4
);
offset
+=
4
;
// max-packet size
// max size of a command packet that the client wants to send to the server.
int32_t
max_packet_size
=
16
*
1024
*
1024
;
max_packet_size
=
cpu_to_le
<
int32_t
>
(
max_packet_size
);
memcpy
(
buffer
+
offset
,
&
max_packet_size
,
sizeof
(
max_packet_size
));
offset
+=
sizeof
(
max_packet_size
);
// character set
const
int8_t
character_set
=
33
;
// // utf8 COLLATE utf8_general_ci
buffer
[
offset
]
=
character_set
;
int32_t
max_packet_size
=
cpu_to_le
(
16
*
1024
*
1024
);
memcpy
(
buffer
+
offset
,
&
max_packet_size
,
4
);
offset
+=
4
;
// character set utf8 COLLATE utf8_general_ci
buffer
[
offset
]
=
33
;
offset
+=
1
;
// string[23] reserved (all [0])
offset
+=
23
;
// string[NUL] username
int
ret
=
write_null_terminate_string
(
buffer
+
offset
,
capacity
-
offset
,
_username
.
c_str
()
);
int
ret
=
write_null_terminate_string
(
buffer
+
offset
,
capacity
-
offset
,
_username
);
if
(
ret
<
0
)
{
OMS_ERROR
<<
"Failed to encode user name"
;
free
(
buffer
);
buffer
=
nullptr
;
return
OMS_FAILED
;
}
offset
+=
ret
;
...
...
@@ -496,24 +519,19 @@ int MysqlHandShakeResponsePacket::encode(MsgBuf& msgbuf)
// then lenenc-int length of auth-response
// and string[n] auth-response
// https://dev.mysql.com/doc/internals/en/string.html#packet-Protocol::LengthEncodedString
if
(
src_
capabilities_flag
&
CLIENT_PLUGIN_AUTH_LENENC_CLIENT_DATA
)
{
if
(
capabilities_flag
&
CLIENT_PLUGIN_AUTH_LENENC_CLIENT_DATA
)
{
ret
=
write_lenenc_uint
(
buffer
+
offset
,
capacity
-
offset
,
_auth_response
.
size
());
if
(
ret
<
0
)
{
OMS_ERROR
<<
"Failed to encode auth response length
. length=
"
<<
_auth_response
.
size
()
<<
", capacity last "
OMS_ERROR
<<
"Failed to encode auth response length
:
"
<<
_auth_response
.
size
()
<<
", capacity last "
<<
capacity
-
offset
;
free
(
buffer
);
buffer
=
nullptr
;
return
OMS_FAILED
;
}
offset
+=
ret
;
ret
=
write_string
(
buffer
+
offset
,
capacity
-
offset
,
_auth_response
.
data
(),
_auth_response
.
size
());
if
(
ret
<
0
)
{
OMS_ERROR
<<
"Failed to encode auth response data
. length=
"
<<
_auth_response
.
size
()
<<
", capacity last %d"
OMS_ERROR
<<
"Failed to encode auth response data
length:
"
<<
_auth_response
.
size
()
<<
", capacity last %d"
<<
capacity
-
offset
;
free
(
buffer
);
buffer
=
nullptr
;
return
OMS_FAILED
;
}
offset
+=
ret
;
...
...
@@ -521,27 +539,21 @@ int MysqlHandShakeResponsePacket::encode(MsgBuf& msgbuf)
// else if capabilities & CLIENT_SECURE_CONNECTION
// then length of auth-response
// and auth-response
else
if
(
src_
capabilities_flag
&
CLIENT_SECURE_CONNECTION
)
{
else
if
(
capabilities_flag
&
CLIENT_SECURE_CONNECTION
)
{
buffer
[
offset
]
=
(
int8_t
)
_auth_response
.
size
();
ret
=
write_string
(
buffer
+
offset
,
capacity
-
offset
,
_auth_response
.
data
(),
_auth_response
.
size
());
if
(
ret
<
0
)
{
OMS_ERROR
<<
"Failed to encode auth response data
. length=
"
<<
_auth_response
.
size
()
<<
", capacity last "
OMS_ERROR
<<
"Failed to encode auth response data
length:
"
<<
_auth_response
.
size
()
<<
", capacity last "
<<
capacity
-
offset
;
free
(
buffer
);
buffer
=
nullptr
;
return
OMS_FAILED
;
}
offset
+=
ret
;
}
// else
// then auth-response
else
{
ret
=
write_null_terminate_string
(
buffer
+
offset
,
capacity
-
offset
,
_auth_response
.
data
());
}
else
{
// then auth-response with null terminate string
ret
=
write_null_terminate_string
(
buffer
+
offset
,
capacity
-
offset
,
_auth_response
.
data
(),
_auth_response
.
size
());
if
(
ret
<
0
)
{
OMS_ERROR
<<
"Failed to encode auth response data
. length=
"
<<
_auth_response
.
size
()
+
1
<<
", capacity last "
OMS_ERROR
<<
"Failed to encode auth response data
length:
"
<<
_auth_response
.
size
()
+
1
<<
", capacity last "
<<
capacity
-
offset
;
free
(
buffer
);
buffer
=
nullptr
;
return
OMS_FAILED
;
}
offset
+=
ret
;
...
...
@@ -549,13 +561,11 @@ int MysqlHandShakeResponsePacket::encode(MsgBuf& msgbuf)
// if capabilities & CLIENT_CONNECT_WITH_DB
// then string[NUL] database
if
(
!
_database
.
empty
()
&&
src_
capabilities_flag
&
CLIENT_CONNECT_WITH_DB
)
{
ret
=
write_null_terminate_string
(
buffer
+
offset
,
capacity
-
offset
,
_database
.
c_str
()
);
if
(
capabilities_flag
&
CLIENT_CONNECT_WITH_DB
)
{
ret
=
write_null_terminate_string
(
buffer
+
offset
,
capacity
-
offset
,
_database
);
if
(
ret
<
0
)
{
OMS_ERROR
<<
"Failed to encode database
. length=
"
<<
_database
.
size
()
+
1
<<
", capacity last "
OMS_ERROR
<<
"Failed to encode database
length:
"
<<
_database
.
size
()
+
1
<<
", capacity last "
<<
capacity
-
offset
;
free
(
buffer
);
buffer
=
nullptr
;
return
OMS_FAILED
;
}
offset
+=
ret
;
...
...
@@ -563,14 +573,11 @@ int MysqlHandShakeResponsePacket::encode(MsgBuf& msgbuf)
// if capabilities & CLIENT_PLUGIN_AUTH
// then string[NUL] auth plugin name
const
char
*
plugin_auth
=
"mysql_native_password"
;
if
(
src_capabilities_flag
&
CLIENT_PLUGIN_AUTH
)
{
ret
=
write_null_terminate_string
(
buffer
+
offset
,
capacity
-
offset
,
plugin_auth
);
if
(
capabilities_flag
&
CLIENT_PLUGIN_AUTH
)
{
const
std
::
string
auth_plugin
=
"mysql_native_password"
;
ret
=
write_null_terminate_string
(
buffer
+
offset
,
capacity
-
offset
,
auth_plugin
);
if
(
ret
<
0
)
{
OMS_ERROR
<<
"Failed to encode database. length="
<<
strlen
(
plugin_auth
)
+
1
<<
", capacity last "
<<
capacity
-
offset
;
free
(
buffer
);
buffer
=
nullptr
;
OMS_ERROR
<<
"Failed to encode auth plugin: "
<<
auth_plugin
<<
", capacity last "
<<
capacity
-
offset
;
return
OMS_FAILED
;
}
offset
+=
ret
;
...
...
@@ -583,16 +590,14 @@ int MysqlHandShakeResponsePacket::encode(MsgBuf& msgbuf)
// do nothing now
// end
uint32_t
packet_size
=
cpu_to_le
<
uint32_t
>
((
uint32_t
)
offset
-
4
);
memcpy
(
buffer
,
(
char
*
)
&
packet_size
,
3
);
buffer
[
3
]
=
_sequence
;
OMS_DEBUG
<<
"mysql hand shake response packet len:"
<<
offset
;
OMS_DEBUG
<<
"Handshake response packet len: "
<<
offset
;
fg
.
release
();
msgbuf
.
push_back
(
buffer
,
offset
);
return
OMS_OK
;
}
int
My
sql
Col
::
decode
(
const
MsgBuf
&
msgbuf
)
int
My
SQL
Col
::
decode
(
const
MsgBuf
&
msgbuf
)
{
MysqlBufReader
reader
(
msgbuf
);
reader
.
read_lenenc_str
(
_catalog
);
...
...
@@ -611,10 +616,10 @@ int MysqlCol::decode(const MsgBuf& msgbuf)
return
OMS_OK
;
}
My
sqlRow
::
Mysql
Row
(
uint64_t
col_count
)
:
_col_count
(
col_count
)
My
SQLRow
::
MySQL
Row
(
uint64_t
col_count
)
:
_col_count
(
col_count
)
{}
int
logproxy
::
My
sql
Row
::
decode
(
const
MsgBuf
&
msgbuf
)
int
logproxy
::
My
SQL
Row
::
decode
(
const
MsgBuf
&
msgbuf
)
{
MysqlBufReader
reader
(
msgbuf
);
for
(
uint64_t
i
=
0
;
i
<
_col_count
;
++
i
)
{
...
...
@@ -625,16 +630,15 @@ int logproxy::MysqlRow::decode(const MsgBuf& msgbuf)
return
OMS_OK
;
}
My
sqlQueryPacket
::
Mysql
QueryPacket
(
const
std
::
string
&
sql
)
:
_sql
(
sql
)
My
SQLQueryPacket
::
MySQL
QueryPacket
(
const
std
::
string
&
sql
)
:
_sql
(
sql
)
{}
/**
* https://dev.mysql.com/doc/internals/en/com-query.html
*/
int
My
sql
QueryPacket
::
encode_inplace
(
MsgBuf
&
msgbuf
)
int
My
SQL
QueryPacket
::
encode_inplace
(
MsgBuf
&
msgbuf
)
{
uint8_t
cmd_id_le
=
cpu_to_le
<
uint8_t
>
(
_cmd_id
);
msgbuf
.
push_back
((
char
*
)
&
cmd_id_le
,
1
,
false
);
msgbuf
.
push_back
((
char
*
)
&
_cmd_id
,
1
,
false
);
msgbuf
.
push_back
((
char
*
)
_sql
.
c_str
(),
_sql
.
size
(),
false
);
return
OMS_OK
;
}
...
...
@@ -642,7 +646,7 @@ int MysqlQueryPacket::encode_inplace(MsgBuf& msgbuf)
/**
* https://dev.mysql.com/doc/internals/en/com-query-response.html
*/
int
My
sql
QueryResponsePacket
::
decode
(
const
MsgBuf
&
msgbuf
)
int
My
SQL
QueryResponsePacket
::
decode
(
const
MsgBuf
&
msgbuf
)
{
MysqlBufReader
reader
(
msgbuf
);
uint8_t
packet_ret
=
0xff
;
...
...
@@ -663,7 +667,7 @@ int MysqlQueryResponsePacket::decode(const MsgBuf& msgbuf)
return
OMS_OK
;
}
void
My
sql
ResultSet
::
reset
()
void
My
SQL
ResultSet
::
reset
()
{
col_count
=
0
;
cols
.
clear
();
...
...
src/obaccess/ob_mysql_packet.h
浏览文件 @
0f647e8e
...
...
@@ -21,9 +21,8 @@ namespace logproxy {
class
MsgBuf
;
/**
* 接收一个mysql消息包
* 参考: https://dev.mysql.com/doc/internals/en/mysql-packet.html
* @param fd 接收消息的描述符
* rece a mysql protocol packet
* @param fd
* @param timout 等待有消息的超时时间(一旦判断有消息到达,就不再关注timeout). 单位 毫秒
* @param[out] packet_length 消息包长度
* @param[out] sequence 消息sequence,参考mysql说明
...
...
@@ -34,9 +33,9 @@ int recv_mysql_packet(int fd, int timeout, uint32_t& packet_length, uint8_t& seq
int
recv_mysql_packet
(
int
fd
,
int
timeout
,
MsgBuf
&
msgbuf
);
int
send_mysql_packet
(
int
fd
,
MsgBuf
&
msgbuf
);
int
send_mysql_packet
(
int
fd
,
MsgBuf
&
msgbuf
,
uint8_t
sequence
);
class
My
sql
Response
{
class
My
SQL
Response
{
public:
friend
class
MysqlProtocol
;
...
...
@@ -44,12 +43,12 @@ protected:
virtual
int
decode
(
const
MsgBuf
&
msgbuf
)
=
0
;
};
class
My
sqlOkPacket
:
public
Mysql
Response
{
class
My
SQLOkPacket
:
public
MySQL
Response
{
public:
int
decode
(
const
MsgBuf
&
msgbuf
)
override
;
};
class
My
sqlEofPacket
:
public
Mysql
Response
{
class
My
SQLEofPacket
:
public
MySQL
Response
{
public:
int
decode
(
const
MsgBuf
&
msgbuf
)
override
;
...
...
@@ -60,7 +59,7 @@ private:
uint16_t
_status_flags
;
};
class
My
sqlErrorPacket
:
public
Mysql
Response
{
class
My
SQLErrorPacket
:
public
MySQL
Response
{
public:
friend
class
MysqlProtocol
;
...
...
@@ -74,17 +73,13 @@ private:
std
::
string
_message
;
};
class
My
sql
InitialHandShakePacket
{
class
My
SQL
InitialHandShakePacket
{
public:
/**
* 连接上mysql之后,mysql就向客户端发送一个握手数据包。握手数据包中包含了一个随机生成的字符串(20字节)。
* 这个随机字符串称为scramble,与密码一起做运算后,发送给mysql server做鉴权
*/
int
decode
(
const
MsgBuf
&
msgbuf
);
bool
scramble_
buffer_
valid
()
const
;
bool
scramble_valid
()
const
;
const
std
::
vector
<
char
>&
scramble
_buffer
()
const
;
const
std
::
vector
<
char
>&
scramble
()
const
;
uint8_t
sequence
()
const
;
...
...
@@ -92,34 +87,28 @@ private:
uint8_t
_sequence
=
0
;
uint8_t
_protocol_version
=
0
;
uint32_t
_capabilities_flag
=
0
;
std
::
vector
<
char
>
_scramble_buffer
;
bool
_scramble_buffer_valid
=
false
;
std
::
string
_auth_plugin_name
;
bool
_scramble_valid
=
false
;
std
::
vector
<
char
>
_scramble
;
};
class
My
sql
HandShakeResponsePacket
{
class
My
SQL
HandShakeResponsePacket
{
public:
My
sqlHandShakeResponsePacket
(
const
std
::
string
&
username
,
const
std
::
string
&
database
,
const
std
::
vector
<
char
>&
auth_response
,
int8_t
sequenc
e
);
My
SQLHandShakeResponsePacket
(
const
std
::
string
&
username
,
const
std
::
string
&
database
,
const
std
::
vector
<
char
>&
auth_respons
e
);
/**
* 客户端创建socket连接成功mysql server后,MySQL会发一个握手包,之后客户端向MySQL server回复一个消息。
* 这里就负责这条消息的编码。
*/
int
encode
(
MsgBuf
&
msgbuf
);
private:
uint32_t
calc_capabilities_flag
();
private:
std
::
string
_username
;
std
::
string
_database
;
std
::
vector
<
char
>
_auth_response
;
int8_t
_sequence
=
0
;
};
class
My
sql
QueryPacket
{
class
My
SQL
QueryPacket
{
public:
explicit
My
sql
QueryPacket
(
const
std
::
string
&
sql
);
explicit
My
SQL
QueryPacket
(
const
std
::
string
&
sql
);
// use memory in-stack, none any heap memory would be alloc
int
encode_inplace
(
MsgBuf
&
msgbuf
);
...
...
@@ -129,7 +118,7 @@ private:
const
std
::
string
&
_sql
;
};
class
My
sqlCol
:
public
Mysql
Response
{
class
My
SQLCol
:
public
MySQL
Response
{
public:
int
decode
(
const
MsgBuf
&
msgbuf
)
override
;
...
...
@@ -149,11 +138,11 @@ private:
uint16_t
_filler
;
};
class
My
sqlRow
:
public
Mysql
Response
{
class
My
SQLRow
:
public
MySQL
Response
{
public:
friend
class
MysqlProtocol
;
explicit
My
sql
Row
(
uint64_t
col_count
);
explicit
My
SQL
Row
(
uint64_t
col_count
);
int
decode
(
const
MsgBuf
&
msgbuf
)
override
;
...
...
@@ -172,7 +161,7 @@ private:
std
::
vector
<
std
::
string
>
_fields
;
};
class
My
sqlQueryResponsePacket
:
public
Mysql
Response
{
class
My
SQLQueryResponsePacket
:
public
MySQL
Response
{
public:
friend
class
MysqlProtocol
;
...
...
@@ -186,15 +175,15 @@ public:
private:
uint64_t
_col_count
=
0
;
My
sql
ErrorPacket
_err
;
My
SQL
ErrorPacket
_err
;
};
struct
My
sql
ResultSet
{
struct
My
SQL
ResultSet
{
void
reset
();
uint64_t
col_count
=
0
;
std
::
vector
<
My
sql
Col
>
cols
;
std
::
vector
<
My
sql
Row
>
rows
;
std
::
vector
<
My
SQL
Col
>
cols
;
std
::
vector
<
My
SQL
Row
>
rows
;
};
}
// namespace logproxy
...
...
src/obaccess/oblog_config.h
浏览文件 @
0f647e8e
...
...
@@ -60,7 +60,7 @@ struct DbTable {
};
struct
TenantDbTable
{
bool
all_tenant
;
bool
all_tenant
=
false
;
std
::
map
<
std
::
string
,
DbTable
>
tenants
;
int
from
(
const
std
::
string
&
table_whites
);
...
...
src/test/test_common.cpp
浏览文件 @
0f647e8e
...
...
@@ -29,13 +29,6 @@ TEST(COMMON, hex2bin)
std
::
string
binstr
;
hex2bin
(
hexstr
.
data
(),
hexstr
.
size
(),
binstr
);
ASSERT_STREQ
(
binstr
.
c_str
(),
text
);
binstr
.
clear
();
hexstr
.
insert
(
0
,
1
,
' '
);
hexstr
.
insert
(
hexstr
.
size
()
/
2
,
1
,
' '
);
hexstr
.
insert
(
hexstr
.
size
(),
1
,
' '
);
hex2bin
(
hexstr
.
data
(),
hexstr
.
size
(),
binstr
);
ASSERT_STREQ
(
binstr
.
c_str
(),
text
);
}
TEST
(
COMMON
,
json
)
...
...
src/test/test_ob_mysql.cpp
浏览文件 @
0f647e8e
...
...
@@ -51,7 +51,7 @@ int do_sha_password(const std::string& pswd, std::string& sha_password)
}
void
login
(
const
std
::
string
&
host
,
int
port
,
const
std
::
string
&
user
,
const
std
::
string
&
passwd
,
const
std
::
string
&
sql
,
My
sql
ResultSet
&
rs
)
const
std
::
string
&
sql
,
My
SQL
ResultSet
&
rs
)
{
std
::
string
sha_password
;
int
ret
=
do_sha_password
(
passwd
,
sha_password
);
...
...
@@ -65,7 +65,7 @@ void login(const std::string& host, int port, const std::string& user, const std
LogStream
ls
(
0
,
""
,
0
,
nullptr
);
ls
<<
"| column counts:"
<<
rs
.
cols
.
size
()
<<
" |
\n
"
;
for
(
My
sql
Row
&
row
:
rs
.
rows
)
{
for
(
My
SQL
Row
&
row
:
rs
.
rows
)
{
ls
<<
"| "
;
for
(
const
std
::
string
&
field
:
row
.
fields
())
{
ls
<<
field
<<
" | "
;
...
...
@@ -89,32 +89,35 @@ static std::string sys_password_sha1;
static
std
::
string
cluster_url
=
""
;
static
std
::
vector
<
std
::
string
>
sqls
=
{
"show tenant"
,
"select version()"
,
// static std::vector<std::string> sqls = {"select 1"};
static
std
::
vector
<
std
::
string
>
sqls
=
{
"show tenant"
,
"select version()"
};
static
std
::
vector
<
std
::
string
>
sqls_for_sys
=
{
"SELECT server.svr_ip, server.svr_port, server.zone, tenant.tenant_id, tenant.tenant_name from "
"oceanbase.__all_resource_pool AS pool, oceanbase.__all_unit AS unit, oceanbase.__all_server AS "
"server, oceanbase.__all_tenant AS tenant WHERE tenant.tenant_id = pool.tenant_id AND "
"unit.resource_pool_id = pool.resource_pool_id AND unit.svr_ip = server.svr_ip AND "
"unit.svr_port = server.svr_port AND tenant.tenant_name='"
+
tenant
+
"'"
};
tenant
+
"'"
};
TEST
(
MYSQL_AUTH
,
query
)
{
My
sql
ResultSet
rs
;
My
SQL
ResultSet
rs
;
for
(
auto
&
sql
:
sqls
)
{
login
(
host
,
port
,
user
,
password
,
sql
,
rs
);
}
for
(
auto
&
sql
:
sqls_for_sys
)
{
login
(
host
,
port
,
sys_user
,
sys_password
,
sql
,
rs
);
}
}
TEST
(
MYSQL_AUTH
,
auth_ok
)
{
do_sha_password
(
password
,
password_sha1
);
do_sha_password
(
sys_password
,
sys_password_sha1
);
Config
::
instance
().
ob_sys_username
.
set
(
sys_user
);
Config
::
instance
().
ob_sys_password
.
set
(
sys_password
);
do_sha_password
(
password
,
password_sha1
);
do_sha_password
(
sys_password
,
sys_password_sha1
);
OblogConfig
config
(
"first_start_timestamp=0 rootserver_list="
+
host
+
":2881:"
+
std
::
to_string
(
port
)
+
" cluster_user="
+
user
+
" cluster_password="
+
dumphex
(
password_sha1
)
+
" tb_white_list="
+
tenant
+
".*.*"
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录