未验证 提交 833674c1 编写于 作者: Z zhagnlu 提交者: GitHub

add glog configurable function and redirect aws log to segcore log (#25664)

Signed-off-by: Nluzhang <luzhang@zilliz.com>
Co-authored-by: Nluzhang <luzhang@zilliz.com>
上级 b986e3af
# if true, only log to stdout
--logtostdout=true
--logtostderr=false
--alsologtostderr=false
# `INFO``, ``WARNING``, ``ERROR``, and ``FATAL`` are 0, 1, 2, and 3
--minloglevel=0
--log_dir=/var/lib/milvus/logs/
# MB
--max_log_size=200
--stop_logging_if_full_disk=true
\ No newline at end of file
......@@ -87,6 +87,9 @@ minio:
# Custom endpoint for fetch IAM role credentials. when useIAM is true & cloudProvider is "aws".
# Leave it empty if you want to use AWS default endpoint
iamEndpoint:
# Log level for aws sdk log.
# Supported level: off, fatal, error, warn, info, debug, trace
logLevel: error
# Milvus supports four MQ: rocksmq(based on RockDB), natsmq(embedded nats-server), Pulsar and Kafka.
# You can change your mq by setting mq.type field.
......
......@@ -68,7 +68,8 @@ class MilvusConan(ConanFile):
"aws-sdk-cpp:transfer": False,
"gtest:build_gmock": False,
"boost:without_locale": False,
"glog:with_gflags": False,
"glog:with_gflags": True,
"glog:shared": True,
"prometheus-cpp:with_pull": False,
"fmt:header_only": True,
}
......
......@@ -84,6 +84,7 @@ typedef struct CStorageConfig {
const char* root_path;
const char* storage_type;
const char* iam_endpoint;
const char* log_level;
bool useSSL;
bool useIAM;
} CStorageConfig;
......
......@@ -34,6 +34,14 @@ KnowhereInitImpl(const char* conf_file) {
knowhere::KnowhereConfig::SetEarlyStopThreshold(0);
knowhere::KnowhereConfig::ShowVersion();
google::InitGoogleLogging("milvus");
#ifdef EMBEDDED_MILVUS
// always disable all logs for embedded milvus
google::SetCommandLineOption("minloglevel", "4");
#endif
if (conf_file != nullptr) {
gflags::SetCommandLineOption("flagfile", conf_file);
}
};
std::call_once(init_knowhere_once_, init);
......
......@@ -65,4 +65,11 @@ SegcoreSetSimdType(const char* value) {
return ret;
}
extern "C" void
SegcoreCloseGlog() {
if (google::IsGoogleLoggingInitialized()) {
google::ShutdownGoogleLogging();
}
}
} // namespace milvus::segcore
......@@ -37,6 +37,9 @@ SegcoreSetSimdType(const char*);
void
SegcoreSetKnowhereThreadPoolNum(const uint32_t num_threads);
void
SegcoreCloseGlog();
#ifdef __cplusplus
}
#endif
......@@ -18,6 +18,7 @@
#include <aws/core/auth/AWSCredentials.h>
#include <aws/core/auth/AWSCredentialsProviderChain.h>
#include <aws/core/auth/STSCredentialsProvider.h>
#include <aws/core/utils/logging/ConsoleLogSystem.h>
#include <aws/s3/model/CreateBucketRequest.h>
#include <aws/s3/model/DeleteBucketRequest.h>
#include <aws/s3/model/DeleteObjectRequest.h>
......@@ -89,7 +90,14 @@ ConvertFromAwsString(const Aws::String& aws_str) {
}
void
MinioChunkManager::InitSDKAPI(RemoteStorageType type, bool useIAM) {
AwsLogger::ProcessFormattedStatement(Aws::String&& statement) {
LOG_SEGCORE_INFO_ << "[AWS LOG] " << statement;
}
void
MinioChunkManager::InitSDKAPI(RemoteStorageType type,
bool useIAM,
const std::string& log_level_str) {
std::scoped_lock lock{client_mutex_};
const size_t initCount = init_count_++;
if (initCount == 0) {
......@@ -112,8 +120,30 @@ MinioChunkManager::InitSDKAPI(RemoteStorageType type, bool useIAM) {
GOOGLE_CLIENT_FACTORY_ALLOCATION_TAG, credentials);
};
}
sdk_options_.loggingOptions.logLevel =
Aws::Utils::Logging::LogLevel::Info;
LOG_SEGCORE_INFO_ << "init aws with log level:" << log_level_str;
auto get_aws_log_level = [](const std::string& level_str) {
Aws::Utils::Logging::LogLevel level =
Aws::Utils::Logging::LogLevel::Off;
if (level_str == "fatal") {
level = Aws::Utils::Logging::LogLevel::Fatal;
} else if (level_str == "error") {
level = Aws::Utils::Logging::LogLevel::Error;
} else if (level_str == "warn") {
level = Aws::Utils::Logging::LogLevel::Warn;
} else if (level_str == "info") {
level = Aws::Utils::Logging::LogLevel::Info;
} else if (level_str == "debug") {
level = Aws::Utils::Logging::LogLevel::Debug;
} else if (level_str == "trace") {
level = Aws::Utils::Logging::LogLevel::Trace;
}
return level;
};
auto log_level = get_aws_log_level(log_level_str);
sdk_options_.loggingOptions.logLevel = log_level;
sdk_options_.loggingOptions.logger_create_fn = [log_level]() {
return std::make_shared<AwsLogger>(log_level);
};
Aws::InitAPI(sdk_options_);
}
}
......@@ -222,7 +252,7 @@ MinioChunkManager::MinioChunkManager(const StorageConfig& storage_config)
storageType = RemoteStorageType::S3;
}
InitSDKAPI(storageType, storage_config.useIAM);
InitSDKAPI(storageType, storage_config.useIAM, storage_config.log_level);
// The ClientConfiguration default constructor will take a long time.
// For more details, please refer to https://github.com/aws/aws-sdk-cpp/issues/1440
......
......@@ -23,6 +23,7 @@
#include <aws/core/http/URI.h>
#include <aws/core/http/curl/CurlHttpClient.h>
#include <aws/core/http/standard/StandardHttpRequest.h>
#include <aws/core/utils/logging/FormattedLogSystem.h>
#include <aws/s3/S3Client.h>
#include <google/cloud/credentials.h>
#include <google/cloud/internal/oauth2_credentials.h>
......@@ -44,6 +45,25 @@ namespace milvus::storage {
enum class RemoteStorageType { S3 = 0, GOOGLE_CLOUD = 1, ALIYUN_CLOUD = 2 };
/**
* @brief user defined aws logger, redirect aws log to segcore log
*/
class AwsLogger : public Aws::Utils::Logging::FormattedLogSystem {
public:
AwsLogger(Aws::Utils::Logging::LogLevel log_level)
: Aws::Utils::Logging::FormattedLogSystem(log_level) {
}
virtual ~AwsLogger() {
}
virtual void
Flush() override {
}
protected:
virtual void
ProcessFormattedStatement(Aws::String&& statement) override;
};
/**
* @brief This MinioChunkManager is responsible for read and write file in S3.
*/
......@@ -149,7 +169,9 @@ class MinioChunkManager : public ChunkManager {
std::vector<std::string>
ListObjects(const char* bucket_name, const char* prefix = nullptr);
void
InitSDKAPI(RemoteStorageType type, bool useIAM);
InitSDKAPI(RemoteStorageType type,
bool useIAM,
const std::string& log_level);
void
ShutdownSDKAPI();
void
......
......@@ -89,6 +89,7 @@ struct StorageConfig {
std::string root_path = "files";
std::string storage_type = "minio";
std::string iam_endpoint = "";
std::string log_level = "error";
bool useSSL = false;
bool useIAM = false;
};
......
......@@ -64,6 +64,7 @@ InitRemoteChunkManagerSingleton(CStorageConfig c_storage_config) {
std::string(c_storage_config.storage_type);
storage_config.iam_endpoint =
std::string(c_storage_config.iam_endpoint);
storage_config.log_level = std::string(c_storage_config.log_level);
storage_config.useSSL = c_storage_config.useSSL;
storage_config.useIAM = c_storage_config.useIAM;
milvus::storage::RemoteChunkManagerSingleton::GetInstance().Init(
......
......@@ -144,9 +144,9 @@ func (i *IndexNode) Register() error {
}
func (i *IndexNode) initSegcore() {
cEasyloggingYaml := C.CString(path.Join(Params.BaseTable.GetConfigDir(), paramtable.DefaultEasyloggingYaml))
C.IndexBuilderInit(cEasyloggingYaml)
C.free(unsafe.Pointer(cEasyloggingYaml))
cGlogConf := C.CString(path.Join(Params.BaseTable.GetConfigDir(), paramtable.DefaultGlogConf))
C.IndexBuilderInit(cGlogConf)
C.free(unsafe.Pointer(cGlogConf))
// override index builder SIMD type
cSimdType := C.CString(Params.CommonCfg.SimdType.GetValue())
......@@ -170,6 +170,10 @@ func (i *IndexNode) initSegcore() {
initcore.InitLocalChunkManager(localDataRootPath)
}
func (i *IndexNode) CloseSegcore() {
initcore.CleanGlogManager()
}
func (i *IndexNode) initSession() error {
i.session = sessionutil.NewSession(i.loopCtx, Params.EtcdCfg.MetaRootPath.GetValue(), i.etcdCli)
if i.session == nil {
......@@ -254,6 +258,7 @@ func (i *IndexNode) Stop() error {
i.session.Stop()
}
i.CloseSegcore()
log.Info("Index node stopped.")
})
return nil
......
......@@ -180,9 +180,9 @@ func (node *QueryNode) Register() error {
// InitSegcore set init params of segCore, such as chunckRows, SIMD type...
func (node *QueryNode) InitSegcore() error {
cEasyloggingYaml := C.CString(path.Join(paramtable.Get().BaseTable.GetConfigDir(), paramtable.DefaultEasyloggingYaml))
C.SegcoreInit(cEasyloggingYaml)
C.free(unsafe.Pointer(cEasyloggingYaml))
cGlogConf := C.CString(path.Join(paramtable.Get().BaseTable.GetConfigDir(), paramtable.DefaultGlogConf))
C.SegcoreInit(cGlogConf)
C.free(unsafe.Pointer(cGlogConf))
// override segcore chunk size
cChunkRows := C.int64_t(paramtable.Get().QueryNodeCfg.ChunkRows.GetAsInt64())
......@@ -227,6 +227,12 @@ func (node *QueryNode) InitSegcore() error {
return initcore.InitRemoteChunkManager(paramtable.Get())
}
func (node *QueryNode) CloseSegcore() {
// safe stop
initcore.CleanRemoteChunkManager()
initcore.CleanGlogManager()
}
// Init function init historical and streaming module to manage segments
func (node *QueryNode) Init() error {
var initError error
......@@ -433,8 +439,7 @@ func (node *QueryNode) Stop() error {
node.manager.Segment.Clear()
}
// safe stop
initcore.CleanRemoteChunkManager()
node.CloseSegcore()
})
return nil
}
......
......@@ -17,11 +17,12 @@
package initcore
/*
#cgo pkg-config: milvus_common milvus_storage
#cgo pkg-config: milvus_common milvus_storage milvus_segcore
#include <stdlib.h>
#include <stdint.h>
#include "common/init_c.h"
#include "segcore/segcore_init_c.h"
#include "storage/storage_c.h"
*/
import "C"
......@@ -62,6 +63,7 @@ func InitRemoteChunkManager(params *paramtable.ComponentParam) error {
cRootPath := C.CString(params.MinioCfg.RootPath.GetValue())
cStorageType := C.CString(params.CommonCfg.StorageType.GetValue())
cIamEndPoint := C.CString(params.MinioCfg.IAMEndpoint.GetValue())
cLogLevel := C.CString(params.MinioCfg.LogLevel.GetValue())
defer C.free(unsafe.Pointer(cAddress))
defer C.free(unsafe.Pointer(cBucketName))
defer C.free(unsafe.Pointer(cAccessKey))
......@@ -69,6 +71,7 @@ func InitRemoteChunkManager(params *paramtable.ComponentParam) error {
defer C.free(unsafe.Pointer(cRootPath))
defer C.free(unsafe.Pointer(cStorageType))
defer C.free(unsafe.Pointer(cIamEndPoint))
defer C.free(unsafe.Pointer(cLogLevel))
storageConfig := C.CStorageConfig{
address: cAddress,
bucket_name: cBucketName,
......@@ -79,6 +82,7 @@ func InitRemoteChunkManager(params *paramtable.ComponentParam) error {
iam_endpoint: cIamEndPoint,
useSSL: C.bool(params.MinioCfg.UseSSL.GetAsBool()),
useIAM: C.bool(params.MinioCfg.UseIAM.GetAsBool()),
log_level: cLogLevel,
}
status := C.InitRemoteChunkManagerSingleton(storageConfig)
......@@ -89,6 +93,10 @@ func CleanRemoteChunkManager() {
C.CleanRemoteChunkManagerSingleton()
}
func CleanGlogManager() {
C.SegcoreCloseGlog()
}
// HandleCStatus deals with the error returned from CGO
func HandleCStatus(status *C.CStatus, extraInfo string) error {
if status.error_code == 0 {
......
......@@ -32,7 +32,7 @@ import (
type UniqueID = typeutil.UniqueID
const (
DefaultEasyloggingYaml = "easylogging.yaml"
DefaultGlogConf = "glog.conf"
DefaultMinioHost = "localhost"
DefaultMinioPort = "9000"
DefaultMinioAccessKey = "minioadmin"
......@@ -47,6 +47,7 @@ const (
DefaultLogFormat = "text"
DefaultLogLevelForBase = "debug"
DefaultRootPath = ""
DefaultMinioLogLevel = "error"
DefaultKnowhereThreadPoolNumRatioInBuild = 1
)
......
......@@ -804,6 +804,7 @@ type MinioConfig struct {
UseIAM ParamItem `refreshable:"false"`
CloudProvider ParamItem `refreshable:"false"`
IAMEndpoint ParamItem `refreshable:"false"`
LogLevel ParamItem `refreshable:"false"`
}
func (p *MinioConfig) Init(base *BaseTable) {
......@@ -920,4 +921,12 @@ Leave it empty if you want to use AWS default endpoint`,
Export: true,
}
p.IAMEndpoint.Init(base.mgr)
p.LogLevel = ParamItem{
Key: "minio.logLevel",
DefaultValue: DefaultMinioLogLevel,
Version: "2.3.0",
Doc: `Log level for aws sdk log. Supported level: off, fatal, error, warn, info, debug, trace`,
Export: true,
}
p.LogLevel.Init(base.mgr)
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册