提交 a739ffa9 编写于 作者: F Fankux

support liboblog312 && compress message by default

上级 72042ac9
......@@ -49,9 +49,16 @@ message(STATUS "DEP_VAR: ${DEP_VAR}")
message(STATUS "JAVA_HOME: ${JAVA_HOME}")
# compiler
find_program(CC NAMES gcc PATHS ${DEP_VAR}/usr/local/oceanbase/devtools/bin/ ${DEP_VAR}/usr/local/gcc-5.2.0/bin/ /usr/bin/ NO_DEFAULT_PATH)
find_program(CXX NAMES g++ PATHS ${DEP_VAR}/usr/local/oceanbase/devtools/bin/ ${DEP_VAR}/usr/local/gcc-5.2.0/bin/ /usr/bin/ NO_DEFAULT_PATH)
find_program(AR NAMES gcc-ar ar PATHS ${DEP_VAR}/usr/local/oceanbase/devtools/bin/ ${DEP_VAR}/usr/local/gcc-5.2.0/bin/ /usr/bin/ NO_DEFAULT_PATH)
execute_process(
COMMAND which gcc
OUTPUT_VARIABLE GCC_BIN
)
GET_FILENAME_COMPONENT(COMPILER_DIR ${GCC_BIN} DIRECTORY)
message(STATUS "COMPILER_DIR: ${COMPILER_DIR}")
find_program(CC NAMES gcc PATHS ${DEP_VAR}/usr/local/oceanbase/devtools/bin/ ${COMPILER_DIR} /usr/bin/ NO_DEFAULT_PATH)
find_program(CXX NAMES g++ PATHS ${DEP_VAR}/usr/local/oceanbase/devtools/bin/ ${COMPILER_DIR} /usr/bin/ NO_DEFAULT_PATH)
find_program(AR NAMES gcc-ar ar PATHS ${DEP_VAR}/usr/local/oceanbase/devtools/bin/ ${COMPILER_DIR} /usr/bin/ NO_DEFAULT_PATH)
SET(CMAKE_C_COMPILER ${CC})
SET(CMAKE_CXX_COMPILER ${CXX})
SET(CMAKE_C_COMPILER_AR ${AR})
......@@ -151,8 +158,14 @@ add_library(PROTO_OBJS OBJECT ${PROTO_SRCS} ${PROTO_HDRS})
message("protoc: ${PROTOBUF_PROTOC_EXECUTABLE}, proto srcs : ${PROTO_SRCS}")
# oblogmsg
include(oblogmsg)
SET(OBLOGMSG_MAPPING "")
SET(OBLOGMSG_MAPPING "-DLOGMSG_BY_LIBOBLOG=1 -DLogMsgLocalInit=\"if((_t_s_lmb=new(std::nothrow)LogMsgBuf())==nullptr){OMS_ERROR<<\\\"Failed to alloc LogMsgBuf\\\";stop();return;}\" -DLogMsgLocalDestroy=\"delete _t_s_lmb\"")
SET(OBLOGMSG_INCLUDE_DIR ${LIBOBLOG_INCLUDE_PATH} ${LIBOBLOG_INCLUDE_PATH}/oblogmsg)
SET(OBLOGMSG_LIBRARIES ${LIBOBLOG_LIBRARIES})
GET_FILENAME_COMPONENT(OBLOGMSG_LIB_DIR ${OBLOGMSG_LIBRARIES} DIRECTORY)
ADD_LIBRARY(oblogmsg STATIC IMPORTED GLOBAL)
SET_PROPERTY(TARGET oblogmsg PROPERTY IMPORTED_LOCATION ${OBLOGMSG_LIBRARIES})
# oblog
if (FIND_LIBOBLOG)
......
......@@ -16,7 +16,9 @@ ExternalProject_Add(
${EXTERNAL_PROJECT_LOG_ARGS}
DEPENDS gflags
GIT_REPOSITORY "https://github.com/google/glog.git"
GIT_TAG "v0.4.0"
GIT_TAG "v0.5.0"
GIT_SUBMODULES ""
GIT_SUBMODULES_RECURSE "false"
PREFIX ${GLOG_SOURCES_DIR}
UPDATE_COMMAND ""
CMAKE_ARGS -DCMAKE_CXX_COMPILER=${CMAKE_CXX_COMPILER}
......@@ -30,7 +32,9 @@ ExternalProject_Add(
-DCMAKE_INSTALL_PREFIX=${GLOG_INSTALL_DIR}
-DCMAKE_INSTALL_LIBDIR=${GLOG_INSTALL_DIR}/lib
-DCMAKE_POSITION_INDEPENDENT_CODE=ON
-DBUILD_SHARED_LIBS=OFF
-DWITH_GFLAGS=ON
-DWITH_GTEST=OFF
-Dgflags_DIR=${GFLAGS_INSTALL_DIR}/lib/cmake/gflags
-DBUILD_TESTING=OFF
-DCMAKE_BUILD_TYPE=${THIRD_PARTY_BUILD_TYPE}
......
......@@ -31,6 +31,8 @@ ExternalProject_Add(
-DCMAKE_INSTALL_LIBDIR=${GTEST_INSTALL_DIR}/lib
-DCMAKE_POSITION_INDEPENDENT_CODE=ON
-DBUILD_GMOCK=OFF
-Dbuild_gtest_samples=OFF
-Dgtest_build_tests=OFF
-DCMAKE_BUILD_TYPE=${THIRD_PARTY_BUILD_TYPE}
-DCMAKE_PREFIX_PATH=${prefix_path}
${EXTERNAL_OPTIONAL_ARGS}
......
......@@ -7,7 +7,7 @@ stable_repo=https://mirrors.aliyun.com/oceanbase/community/stable/el/7/x86_64/
[deps]
devdeps-openssl-static-1.0.1e-3.el7.x86_64.rpm
devdeps-libaio-0.3.112-3.el7.x86_64.rpm
oceanbase-ce-devel-3.1.1-1.el7.x86_64.rpm
oceanbase-ce-devel-3.1.2-10000392021123010.el7.x86_64.rpm
[tools]
obdevtools-gcc-5.2.0-3.el7.x86_64.rpm
\ No newline at end of file
......@@ -7,7 +7,7 @@ stable_repo=https://mirrors.aliyun.com/oceanbase/community/stable/el/8/x86_64/
[deps]
devdeps-openssl-static-1.0.1e-3.el8.x86_64.rpm
devdeps-libaio-0.3.112-3.el8.x86_64.rpm
oceanbase-ce-devel-3.1.1-1.el8.x86_64.rpm
oceanbase-ce-devel-3.1.2-10000392021123010.el8.x86_64.rpm
[tools]
obdevtools-gcc-5.2.0-3.el8.x86_64.rpm
......@@ -85,9 +85,11 @@ public:
} else {
oblog_config.password.set(Config::instance().ob_sys_password.val());
}
reader.init(_client.id.get(), _client.packet_version, ch, oblog_config);
reader.start();
reader.join();
int ret = reader.init(_client.id.get(), _client.packet_version, ch, oblog_config);
if (ret == OMS_OK) {
reader.start();
reader.join();
}
// !!!IMPORTANT!!! we don't quit current thread which work as child process's main thread
// we IGNORE other context inheriting from parent process
......
......@@ -10,6 +10,7 @@
* See the Mulan PubL v2 for more details.
*/
#include <stdint.h>
#include "common/guard.hpp"
#include "communication/channel.h"
#include "codec/decoder.h"
......
......@@ -124,11 +124,14 @@ int RecordDataMessage::encode_log_records(MsgBuf& buffer, size_t& raw_len) const
switch (compress_type) {
case CompressType::PLAIN: {
int ret = encode_log_records_plain(buffer);
if (OMS_OK == ret) {
if (ret == OMS_OK) {
raw_len = buffer.byte_size();
}
return ret;
}
case CompressType::LZ4: {
return encode_log_records_lz4(buffer, raw_len);
}
default: {
OMS_ERROR << "Unsupported compress type: " << (int)compress_type;
return OMS_FAILED;
......
......@@ -16,6 +16,16 @@
namespace oceanbase {
namespace logproxy {
static char dump_line[1024];
static void dump_writer(const char* buf, int size)
{
int len = std::min(1022, size);
memcpy(dump_line, buf, len);
dump_line[len] = '\n';
dump_line[len + 1] = '\0';
printf(dump_line);
}
void init_log(const char* argv0, bool restart)
{
#ifdef WITH_GLOG
......@@ -51,6 +61,9 @@ void init_log(const char* argv0, bool restart)
google::SetLogDestination(google::GLOG_ERROR, "log/logproxy_error.");
google::SetLogDestination(google::GLOG_FATAL, "log/logproxy_error.");
google::InstallFailureSignalHandler();
google::InstallFailureWriter(dump_writer);
FileGcRoutine log_gc("./log", {"logproxy_", bin_name + ".log"});
// log_gc.start();
// log_gc.detach();
......
......@@ -167,9 +167,9 @@ int main(int argc, char** argv)
oblog_config.password.set(dumphex(password_sha1));
}
if (!oblog_config.sys_password.val().empty()) {
std::string sys_password_sha1;
MysqlProtocol::do_sha_password(oblog_config.sys_password.val(), sys_password_sha1);
oblog_config.sys_password.set(dumphex(sys_password_sha1));
// std::string sys_password_sha1;
// MysqlProtocol::do_sha_password(oblog_config.sys_password.val(), sys_password_sha1);
oblog_config.sys_password.set(oblog_config.sys_password.val());
}
Config::instance().verbose.set(true);
......
......@@ -62,9 +62,16 @@ void OblogConfig::set(const std::string& key, const std::string& value)
void OblogConfig::generate_configs(std::map<std::string, std::string>& configs) const
{
for (auto& entry : _configs) {
configs.emplace(entry.first, entry.second->debug_str());
const std::string& val = entry.second->debug_str();
if (val.empty()) {
continue;
}
configs.emplace(entry.first, val);
}
for (auto& entry : _extras) {
if (entry.second.empty()) {
continue;
}
configs.emplace(entry.first, entry.second);
}
}
......
......@@ -28,6 +28,10 @@ namespace logproxy {
static Config& _s_config = Config::instance();
#ifdef LOGMSG_BY_LIBOBLOG
static __thread LogMsgBuf* _t_s_lmb = nullptr;
#endif
SenderRoutine::SenderRoutine(ObLogReader& reader, OblogAccess& oblog, BlockingQueue<ILogRecord*>& rqueue)
: Thread("SenderRoutine"), _reader(reader), _oblog(oblog), _rqueue(rqueue)
{}
......@@ -70,7 +74,7 @@ void SenderRoutine::stop()
void SenderRoutine::run()
{
LogMsgLocalInit();
LogMsgLocalInit;
std::vector<ILogRecord*> records;
records.reserve(_s_config.read_wait_num.val());
......@@ -123,7 +127,11 @@ void SenderRoutine::run()
for (i = 0; i < records.size(); ++i) {
ILogRecord* r = records[i];
size_t size = 0;
#ifdef LOGMSG_BY_LIBOBLOG
const char* rbuf = r->toString(&size, _t_s_lmb, true);
#else
const char* rbuf = r->toString(&size, true);
#endif
if (rbuf == nullptr) {
OMS_ERROR << "failed parse logmsg Record, !!!EXIT!!!";
stop();
......@@ -169,7 +177,7 @@ void SenderRoutine::run()
}
}
LogMsgLocalDestroy();
LogMsgLocalDestroy;
_reader.stop();
}
......
......@@ -57,7 +57,7 @@ TEST(COMPRESS, lz4_flow)
char* raw_block = (char*)malloc(block_size + 1);
ASSERT_TRUE(raw_block != nullptr);
int decompressed_size = LZ4_decompress_safe(compressed + offset + 8, raw_block, compressed_block_size, block_size);
ASSERT_EQ(decompressed_size, block_size);
ASSERT_EQ((uint32_t)decompressed_size, block_size);
raw_block[decompressed_size] = '\0';
OMS_INFO << "decompress block: " << raw_block << ", size:" << block_size
<< ", compressed size:" << compressed_block_size;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册