提交 bd102bd3 编写于 作者: xiaonuo911teamo's avatar xiaonuo911teamo

1.第一次编写规范化整理

上级 9d904914
......@@ -3,3 +3,4 @@ inner-depend/
code/.DS_Store
opt/linux-x86_64
build/
.vscode/
......@@ -21,7 +21,7 @@ function create_dir() {
mkdir -p $1
}
function get_cpu_num(){
function get_cpu_num() {
return `cat /proc/cpuinfo | grep processor | wc -l`
}
......@@ -65,7 +65,7 @@ function native_x86() {
export TYPE_CHAR=$TYPE_CHAR'n'
}
function set_release(){
function set_release() {
export CMAKE_BUILD_TYPE="-DCMAKE_BUILD_TYPE=MinSizeRel"
}
......
......@@ -39,7 +39,7 @@ struct Cake {
DoubleBufferData<Cake> cake; // 表示蛋糕
std::thread producer([&](){
std::thread producer([&]() {
while(1) {
if (!cake.is_update()) { // get_data 和 set_data会自动更新内部的is_updated状态
AppUtil::sleep_ms(2000);
......@@ -48,7 +48,7 @@ std::thread producer([&](){
}
});
std::thread customer([&](){
std::thread customer([&]() {
int plan[6] = {1, 2, 5, 7, 8, 10}; // 计划第几秒去取蛋糕
for (int i = 0; i < 6;) {
static int64_t init_time = AppUtil::get_current_ms();
......
......@@ -12,16 +12,16 @@ public:
_data_buffer_updated(false), _has_data(false) {}
DoubleBufferData(const T& data):
_data_buffer_updated(false){
_data_buffer_updated(false) {
set_data(data);
}
DoubleBufferData& operator=(const T& data){
DoubleBufferData& operator=(const T& data) {
set_data(data);
return *this;
}
void set_data(const T& data){
void set_data(const T& data) {
_data_mutex.lock();
_data_buffer = data;
_data_buffer_updated = true;
......@@ -29,7 +29,7 @@ public:
_data_mutex.unlock();
}
const T get_data(){
const T get_data() {
T data;
_data_mutex.lock();
data = _data_buffer;
......@@ -38,7 +38,7 @@ public:
return data;
}
const T peek_data(){
const T peek_data() {
T data;
_data_mutex.lock();
data = _data_buffer;
......@@ -47,7 +47,7 @@ public:
}
// 数据是否有更新
bool is_updated(){
bool is_updated() {
return _data_buffer_updated;
}
......
......@@ -27,7 +27,7 @@ public:
void push_data(const std::deque<T>& data) {
std::lock_guard<std::mutex> lg(data_mutex);
_data_buffer.clear();
for (auto& t : data){
for (auto& t : data) {
_data_buffer.push_back(t);
_data_buffer_updated = true;
......
......@@ -27,7 +27,7 @@ public:
void push_data(const std::vector<T>& data) {
std::lock_guard<std::mutex> lg(_data_mutex);
_data_buffer.clear();
for (auto& t : data){
for (auto& t : data) {
_data_buffer.push_back(t);
if (_data_buffer.size() > _max_size) {
_data_buffer.pop_back();
......
......@@ -27,7 +27,7 @@ class Diagnose{
public:
// 功能等同于Message中的add_server_func
// 此处register_server的含义是为了注册一个用于诊断的服务,所以内部自动在服务名称后面追加了 "_diag"
static void register_server(const std::string& name, std::function<std::string(const std::string&)> func){
static void register_server(const std::string& name, std::function<std::string(const std::string&)> func) {
if (Messager::has_server<std::string(const std::string&)>(name + "__diag")) {
WARNING() << " diag server named " << name << " is already exist!";
} else {
......@@ -39,7 +39,7 @@ public:
template<class T>
static void register_diag_action(const std::string& name,
std::function<void(const T&)> func,
TriggerType trigger_type = TRUE, int trigger_times = 1){
TriggerType trigger_type = TRUE, int trigger_times = 1) {
auto& trigger_times_map = get_trigger_times_map<T>();
auto& condition_map = get_condition_map<T>();
auto& last_condition_map = get_last_condition_map<T>();
......@@ -47,50 +47,50 @@ public:
trigger_times_id++;
int trigger_times_id_captured = trigger_times_id;
auto trigger_func = [=, &trigger_times_map](const T& data, bool is_change){
auto trigger_func = [=, &trigger_times_map](const T& data, bool is_change) {
auto& func_trigger_times = trigger_times_map[trigger_times_id_captured];
if (is_change){
if (is_change) {
func_trigger_times = 0;
}
if (++func_trigger_times >= trigger_times){
if (++func_trigger_times >= trigger_times) {
func(data);
func_trigger_times = 0;
}
};
Messager::subcribe<bool, T>(name + "__diag", [=, &condition_map, &last_condition_map](bool condition, const T& data){
Messager::subcribe<bool, T>(name + "__diag", [=, &condition_map, &last_condition_map](bool condition, const T& data) {
auto& last_condition = condition_map[name];
bool is_changed_true_false = last_condition ^ condition;
bool has_last_last_condition = last_condition_map.find(name) != last_condition_map.end();
bool is_changed_up_down = false;
if (has_last_last_condition){
if (has_last_last_condition) {
is_changed_up_down = !last_condition_map[name] ^ last_condition;
}
switch (trigger_type) {
case TRUE:
if (condition){
if (condition) {
trigger_func(data, is_changed_true_false);
}
break;
case FALSE:
if (!condition){
if (!condition) {
trigger_func(data, is_changed_true_false);
}
break;
case UP:
if (!last_condition && condition){
if (!last_condition && condition) {
trigger_func(data, is_changed_up_down);
}
break;
case DOWN:
if (last_condition && !condition){
if (last_condition && !condition) {
trigger_func(data, is_changed_up_down);
}
break;
case UPDOWN:
if (last_condition ^ condition){
if (last_condition ^ condition) {
trigger_func(data, is_changed_up_down);
}
break;
......@@ -101,15 +101,15 @@ public:
static void register_diag_action(const std::string& name,
std::function<void()> func,
TriggerType trigger_type = TRUE, int trigger_times = 1){
register_diag_action<bool>(name, [func](const bool&){
TriggerType trigger_type = TRUE, int trigger_times = 1) {
register_diag_action<bool>(name, [func](const bool&) {
func();
}, trigger_type, trigger_times);
}
// 激活以name为名的action,信号量为condition,就是0或1,data对应处理函数中需要的参数
template<class T>
static void fire_diag_condition(const std::string& name, bool condition, const T& data){
static void fire_diag_condition(const std::string& name, bool condition, const T& data) {
auto& condition_map = get_condition_map<T>();
auto& last_condition_map = get_last_condition_map<T>();
if (condition_map.find(name) != condition_map.end()) {
......@@ -119,18 +119,18 @@ public:
condition_map[name] = condition;
}
static void fire_diag_condition(const std::string& name, bool condition){
static void fire_diag_condition(const std::string& name, bool condition) {
fire_diag_condition(name, condition, true);
}
// 调用以name为名的服务,以value为参数,使用data为返回值
// 服务name使用register_server进行注册
// 此处只实现了string类型的特化版本,用于输出诊断监控的数据信息,详情可看test
static bool diag_call(const std::string& name, const std::string& value, std::string& data){
static bool diag_call(const std::string& name, const std::string& value, std::string& data) {
return diag_call("name:" + name + ";value:" + value, data);
}
// 从字串 'name:***;value:***',解析出name和value,调用diag(name, value, data)
static bool diag_call(const std::string& input, std::string& data){
static bool diag_call(const std::string& input, std::string& data) {
static std::regex regex("\\s*name:\\s*(\\w*)\\s*;\\s*value:\\s*(\\w*)\\s*");
std::smatch result;
bool retval = false;
......@@ -139,7 +139,7 @@ public:
std::string value = result[2];
auto func = Messager::get_server_func<std::string(const std::string&)>(name + "__diag");
if (func){
if (func) {
data = func(value);
retval = true;
} else {
......@@ -154,13 +154,13 @@ public:
public:
// 手动注册diag_call中调用服务的储存结构,不手动调用时,会自动注册
static void register_diag_server_map(){
static void register_diag_server_map() {
Messager::register_server_map<std::string(const std::string&)>();
}
// 手动注册模板为T,中断触发系统,不手动调用时,会自动注册
template<class T>
static void register_condition_data_map(){
static void register_condition_data_map() {
Messager::register_data_map<bool, T>();
get_condition_map<T>();
get_last_condition_map<T>();
......@@ -169,19 +169,19 @@ public:
private:
template<class T>
static std::map<std::string, bool> &get_condition_map(){
static std::map<std::string, bool> &get_condition_map() {
static std::map<std::string, bool> condition_map;
return condition_map;
}
template<class T>
static std::map<std::string, bool> &get_last_condition_map(){
static std::map<std::string, bool> &get_last_condition_map() {
static std::map<std::string, bool> condition_map;
return condition_map;
}
template<class T>
static std::map<int, int> &get_trigger_times_map(){
static std::map<int, int> &get_trigger_times_map() {
static std::map<int, int> trigger_times_map;
return trigger_times_map;
}
......
......@@ -38,13 +38,13 @@ private:
std::mutex _mutex;
public:
~Frequence(){
~Frequence() {
stop();
wait();
}
public:
static void trigger_once(const std::string& name){
static void trigger_once(const std::string& name) {
instance().trigger_inner(name);
}
......@@ -59,7 +59,7 @@ public:
private:
// 3秒累计一次频率
Frequence(): TimerElement(3000, "Frequence"){
Frequence(): TimerElement(3000, "Frequence") {
start();
}
......@@ -86,7 +86,7 @@ private:
return frequence;
}
static Frequence& instance(){
static Frequence& instance() {
static Frequence frequence;
return frequence;
}
......
......@@ -58,7 +58,7 @@ public:
_address.sin_port = htons(_port);
inet_aton("0.0.0.0", &_address.sin_addr);
_address_len = sizeof(struct sockaddr_in);
if (bind(this->_fd, (struct sockaddr*)&_address, _address_len) == -1){
if (bind(this->_fd, (struct sockaddr*)&_address, _address_len) == -1) {
FATAL() << "[listen_on_port] with [port=" << port << "] Cannot bind socket";
}
}
......
......@@ -6,56 +6,56 @@ namespace iv_log {
class Debug : public LogInterface{
public:
Debug(const std::string& file, int line) : LogInterface([&](const std::stringstream& ss){
Debug(const std::string& file, int line) : LogInterface([&](const std::stringstream& ss) {
Messager::publish("log_debug", ss.str());
std::cout << ss.str();
}, file, line, DEBUG){
}, file, line, DEBUG) {
}
};
class Info : public LogInterface{
public:
Info(const std::string& file, int line) : LogInterface([&](const std::stringstream& ss){
Info(const std::string& file, int line) : LogInterface([&](const std::stringstream& ss) {
Messager::publish("log_info", ss.str());
std::cout << ss.str();
}, file, line, INFO){
}, file, line, INFO) {
}
};
class Warning : public LogInterface{
public:
Warning(const std::string& file, int line) : LogInterface([&](const std::stringstream& ss){
Warning(const std::string& file, int line) : LogInterface([&](const std::stringstream& ss) {
Messager::publish("log_warning", ss.str());
std::cerr << ss.str();
}, file, line, WARNING){
}, file, line, WARNING) {
}
};
class Error : public LogInterface{
public:
Error(const std::string& file, int line) : LogInterface([&](const std::stringstream& ss){
Error(const std::string& file, int line) : LogInterface([&](const std::stringstream& ss) {
Messager::publish("log_error", ss.str());
std::cerr << ss.str();
}, file, line, ERROR){
}, file, line, ERROR) {
}
};
class Fatal : public LogInterface{
public:
Fatal(const std::string& file, int line) : LogInterface([&](const std::stringstream& ss){
Fatal(const std::string& file, int line) : LogInterface([&](const std::stringstream& ss) {
Messager::publish("log_fatal", ss.str());
std::cerr << ss.str();
throw std::bad_exception();
}, file, line, FATAL){
}, file, line, FATAL) {
}
};
class Direct : public LogInterface{
public:
Direct(const std::string& file, int line) : LogInterface([&](const std::stringstream& ss){
Direct(const std::string& file, int line) : LogInterface([&](const std::stringstream& ss) {
Messager::publish("log_direct", ss.str());
std::cerr << ss.str();
}, file, line, DIRECT){
}, file, line, DIRECT) {
}
};
}
......@@ -17,7 +17,7 @@ enum LogLevel{
class LogInterface{
public:
LogInterface(const std::function<void(const std::stringstream&)> logging_func,
const std::string& file, int line, LogLevel level){
const std::string& file, int line, LogLevel level) {
_logging_func = logging_func;
_level = level;
_ss << "[" << get_log_level_string() << " "
......@@ -28,7 +28,7 @@ public:
}
template<class T>
LogInterface& operator << (const T& t){
LogInterface& operator << (const T& t) {
_ss << t;
return *this;
}
......@@ -37,17 +37,17 @@ public:
return *this;
}
~LogInterface(){
if (get_global_log_level() <= _level){
~LogInterface() {
if (get_global_log_level() <= _level) {
_ss << std::endl;
_logging_func(_ss);
}
}
static void set_log_level(LogLevel log_level){
static void set_log_level(LogLevel log_level) {
get_global_log_level() = log_level;
}
static std::string log_level_to_string(LogLevel log_level){
static std::string log_level_to_string(LogLevel log_level) {
switch (log_level) {
case DEBUG:
return "DEBUG";
......@@ -66,19 +66,21 @@ public:
}
private:
static LogLevel& get_global_log_level(){
static LogLevel& get_global_log_level() {
static LogLevel log_level = INFO;
return log_level;
}
std::string get_log_level_string(){
std::string get_log_level_string() {
return log_level_to_string(_level);
}
private:
LogInterface& operator = (const LogInterface&);
private:
std::stringstream _ss;
std::string _file;
int _line;
LogLevel _level = INFO;
std::function<void(const std::stringstream&)> _logging_func;
LogInterface& operator = (const LogInterface&);
};
......@@ -115,7 +115,7 @@ public:
template<typename T>
static void add_server_func(const std::string &key, std::function<T> func) {
auto &server_func = get_server_func<T>(key);
if (server_func){
if (server_func) {
publish("log_fatal", "server_func is already exists, key: " + key);
throw std::bad_exception();
}
......@@ -125,7 +125,7 @@ public:
template<typename T>
static bool has_server(const std::string &key) {
auto &server_func = get_server_func<T>(key);
if (server_func){
if (server_func) {
return true;
} else {
return false;
......@@ -183,8 +183,8 @@ public:
static std::vector<std::string> get_server_list() {
std::vector<std::string> keys;
auto& server_map = get_server_map<T>();
for (auto& server : server_map){
if (server.second){
for (auto& server : server_map) {
if (server.second) {
keys.push_back(server.first);
}
}
......
......@@ -7,36 +7,36 @@
// 模块控制器,可同时控制多个模块启动,停止
class PipeController {
public:
// 注册模块,到pipe_elements
// 注册模块,到_pipe_elements
void register_element(std::shared_ptr<basic::PipeElement> element) {
pipe_elements.push_back(element);
_pipe_elements.push_back(element);
}
// 加入模块,到pipe_elements,与register_element是两种方式
// 加入模块,到_pipe_elements,与register_element是两种方式
template<class T>
void add_element() {
auto element = std::make_shared<T>();
pipe_elements.push_back(element);
_pipe_elements.push_back(element);
}
template<class T>
void add_element(T *ele) {
std::shared_ptr<T> sele(ele);
pipe_elements.push_back(sele);
_pipe_elements.push_back(sele);
}
void start() {
for (auto iter : pipe_elements) {
for (auto iter : _pipe_elements) {
iter->start();
}
}
void stop() {
for (auto iter : pipe_elements) {
for (auto iter : _pipe_elements) {
iter->stop();
}
}
void wait() {
for (auto iter : pipe_elements) {
for (auto iter : _pipe_elements) {
iter->wait();
}
}
......@@ -56,6 +56,6 @@ public:
private:
// 保存多个element, 进行同步管理
std::vector<std::shared_ptr<basic::PipeElement>> pipe_elements;
std::vector<std::shared_ptr<basic::PipeElement>> _pipe_elements;
};
......@@ -17,128 +17,128 @@ namespace basic {
class PipeElement {
public:
// 构造PipeElement,循环处理模块
// @wait_cond 是否等待submit处理
// @_wait_cond 是否等待submit处理
// @name 模块名称
explicit PipeElement(bool wait_cond = true, std::string name = "default") :
running(false), busy(false),
wait_cond(wait_cond),
submitted(false) , thr_name(name){
explicit PipeElement(bool _wait_cond = true, std::string name = "default") :
_running(false), _busy(false),
_wait_cond(_wait_cond),
_submitted(false) , _thr_name(name) {
DIRECT() << "Task " << name << " loaded!";
}
virtual ~PipeElement() = default;
// 模块启动函数
virtual void start() {
if (running) {
ERROR() << "start pipe element failed, pipe element is already running!";
if (_running) {
ERROR() << "start pipe element failed, pipe element is already _running!";
return;
}
thd = std::thread(
_thd = std::thread(
[&] {
thread_initial();
running = true;
_running = true;
if (appPref.has_string_key("affinity.CpuIndex")){
if (appPref.has_string_key("affinity.CpuIndex")) {
auto affinity_str = appPref.get_string_data("affinity.CpuIndex");
AppUtil::set_thread_affinity(std::stoi(affinity_str));
}
auto affinity_key = "affinity." + thr_name;
if (appPref.has_string_key(affinity_key)){
auto affinity_key = "affinity." + _thr_name;
if (appPref.has_string_key(affinity_key)) {
auto affinity_str = appPref.get_string_data(affinity_key);
AppUtil::set_thread_affinity(std::stoi(affinity_str));
}
while (running) {
if (wait_cond && !submitted) {
std::unique_lock<std::mutex> lck(cv_mutex);
func_cv.wait(lck);
while (_running) {
if (_wait_cond && !_submitted) {
std::unique_lock<std::mutex> lck(_cv_mutex);
_func_cv.wait(lck);
if (!running) {
if (!_running) {
break;
}
}
busy = true;
submitted = false;
_busy = true;
_submitted = false;
this->thread_func();
busy = false;
_busy = false;
}
thread_closing();
});
pthread_setname_np(thd.native_handle(), thr_name.c_str());
auto prio_key = "priority." + thr_name;
if (appPref.has_string_key(prio_key)){
pthread_setname_np(_thd.native_handle(), _thr_name.c_str());
auto prio_key = "_priority." + _thr_name;
if (appPref.has_string_key(prio_key)) {
auto prio_str = appPref.get_string_data(prio_key);
set_pthd_schd_param(SCHED_RR, std::stoi(prio_str));
INFO() << "Task " << thr_name << " priority: " << prio_str;
INFO() << "Task " << _thr_name << " _priority: " << prio_str;
} else {
WARNING() << "Task " << thr_name << " using default priority";
WARNING() << "Task " << _thr_name << " using default _priority";
}
}
// 模块停止函数
virtual void stop() {
if (!running) {
ERROR() << "stop failed, pipe element is not running!";
if (!_running) {
ERROR() << "stop failed, pipe element is not _running!";
return;
}
running = false;
_running = false;
submit();
}
// 等待模块退出函数
void wait() {
thd.join();
_thd.join();
}
// 提交函数,在wait_cond=true时,主动调用处理函数
void submit() {
submitted = true;
func_cv.notify_one();
_submitted = true;
_func_cv.notify_one();
}
// 获取busy状态
bool is_busy() {
return busy;
return _busy;
}
// 直接设置 wait_cond 状态
// 直接设置 _wait_cond 状态
void set_wait_cond(bool value) {
wait_cond = value;
_wait_cond = value;
}
// 返回模块的运行状态
bool is_running() const {
return running;
return _running;
}
// 设置线程的优先级和优先策略
void set_pthd_schd_param(int in_policy, int in_priority ) {
sched_param param;
param.sched_priority = in_priority;
pthread_setschedparam(thd.native_handle(), in_policy, &param);
pthread_setschedparam(_thd.native_handle(), in_policy, &param);
this->policy = in_policy;
this->priority = in_priority;
this->_policy = in_policy;
this->_priority = in_priority;
}
protected:
std::thread thd;
std::condition_variable func_cv;
std::mutex cv_mutex;
std::atomic_bool running; // 运行状态
std::atomic_bool busy;
std::atomic_bool wait_cond;
std::atomic_bool submitted;
std::string thr_name;
int priority; // 优先级
int policy; // 优先策略
private:
virtual void thread_func() {};
virtual void thread_initial() {}
virtual void thread_closing() {}
protected:
std::thread _thd;
std::condition_variable _func_cv;
std::mutex _cv_mutex;
std::atomic_bool _running; // 运行状态
std::atomic_bool _busy;
std::atomic_bool _wait_cond;
std::atomic_bool _submitted;
std::string _thr_name;
int _priority; // 优先级
int _policy; // 优先策略
};
}
......@@ -16,33 +16,33 @@ namespace basic {
// 任务池模块,将待处理的任务一一加入任务池,会自动进行处理
class TaskPoolElement : public PipeElement {
public:
explicit TaskPoolElement(const std::string& name) : PipeElement(false, name){
explicit TaskPoolElement(const std::string& name) : PipeElement(false, name) {
start();
}
~TaskPoolElement(){
~TaskPoolElement() {
stop();
wait();
}
// 将func加入任务池
void asysc(const std::function<void()>& func){
void asysc(const std::function<void()>& func) {
std::unique_lock<std::mutex> lk(_mutex);
_funcs.push(func);
lk.unlock();
cv.notify_one();
_cv.notify_one();
}
// 当前任务数量
size_t task_size(){
size_t task_size() {
return _funcs.size();
}
private:
virtual void thread_func() override {
std::unique_lock<std::mutex> lk(_mutex);
cv.wait(lk, [this](){
_cv.wait(lk, [this]() {
return !_funcs.empty();
});
auto func = _funcs.front();
......@@ -51,8 +51,10 @@ private:
func();
};
private:
std::mutex _mutex;
std::condition_variable cv;
std::condition_variable _cv;
std::queue<std::function<void()>> _funcs;
};
}
......@@ -15,7 +15,7 @@ public:
virtual ~TimerElement() = default;
virtual void start() {
if (running) {
if (_running) {
ERROR() << "start pipe element failed, pipe element is already running!";
return;
}
......@@ -46,14 +46,14 @@ public:
_interval = value * 1000;
}
protected:
int _interval;
uint64_t _last_time = 0;
private:
virtual void thread_func() {
timer_func();
}
virtual void timer_func() {}
protected:
int _interval;
uint64_t _last_time = 0;
};
\ No newline at end of file
......@@ -15,5 +15,6 @@ private:
Messager::publish("timer_trigger");
}
private:
int64_t _precision;
};
\ No newline at end of file
......@@ -68,7 +68,7 @@ public:
// @pattern 可带有通配符的路径,如 /home/nvidia/.../config/ini/*.ini
//
// ini文件的编写规则另见其他说明
static void load_ini_config(const std::string& config_pattern){
static void load_ini_config(const std::string& config_pattern) {
std::vector<std::string> config_files;
if (glob_files(config_pattern, config_files) == true) {
for (auto& config_file : config_files) {
......
......@@ -12,7 +12,7 @@ public:
}
protected:
AppPreference(){
AppPreference() {
};
private:
std::map<std::string, std::string> strings;
......
......@@ -43,7 +43,7 @@
static uint64_t __last_time = 0;\
static double v = 0;\
uint64_t __time = AppUtil::get_current_ms();\
if (__time > __last_time + 3000){ \
if (__time > __last_time + 3000) { \
v = 1000.0 * __count / (__time - __last_time);\
__last_time = __time;\
__count = 0;\
......@@ -54,7 +54,7 @@
static uint64_t __last_time = 0;\
uint64_t __time = AppUtil::get_current_ms();\
bool do_exec = false;\
if (__time > __last_time + interval){ \
if (__time > __last_time + interval) { \
__last_time = __time;\
do_exec = true;\
}\
......@@ -64,7 +64,7 @@
static int64_t __last_time = 0;\
int64_t __time = timpstamp;\
bool do_exec = false;\
if (abs(__time - __last_time) > interval){ \
if (abs(__time - __last_time) > interval) { \
__last_time = __time;\
do_exec = true;\
}\
......@@ -99,7 +99,7 @@ public:
}
// 读取文件path,以string输出,string格式限制大小<2G
static std::string get_file_text(const std::string& path){
static std::string get_file_text(const std::string& path) {
std::ifstream ifs(path);
std::stringstream ss;
if (!ifs.is_open()) {
......@@ -204,7 +204,7 @@ public:
if (fp != nullptr) {
std::vfprintf(fp, format, ap);
if ((file != "stdout") && (file != "stderr")){
if ((file != "stdout") && (file != "stderr")) {
fclose(fp);
fp = nullptr;
}
......@@ -364,7 +364,7 @@ public:
struct dirent *dp;
dirp = opendir(path.c_str());
while ((dp = readdir(dirp)) != NULL) {
if(std::regex_match(dp->d_name, std::regex(filter))){
if(std::regex_match(dp->d_name, std::regex(filter))) {
//忽略 . 和 ..
if ((0 == strcmp(cur_dir, dp->d_name)) || (0 == strcmp(up_dir, dp->d_name)) ) {
continue;
......@@ -373,7 +373,7 @@ public:
}
}
(void) closedir(dirp);
std::sort(file_list.begin(), file_list.end(), [](const std::string& s1, const std::string& s2){
std::sort(file_list.begin(), file_list.end(), [](const std::string& s1, const std::string& s2) {
return s1.compare(s2) < 0;
});
return file_list;
......@@ -451,7 +451,7 @@ public:
std::vector<std::string> files = get_file_list(dir_name, filter);
int file_size = files.size();
for (int index = 0; file_size > num_limited; index++, file_size--){
for (int index = 0; file_size > num_limited; index++, file_size--) {
std::string file = dir_name + "/" + files[index];
remove_rf(file.c_str());
}
......
......@@ -41,7 +41,7 @@ public:
return *this;
}
bool is_finished(){
bool is_finished() {
return file.peek() == EOF;
};
private:
......
......@@ -11,13 +11,13 @@ typedef void (*VoidFunc)();
class DlUtils{
DlUtils();
static std::map<std::string, void*>& get_plug_map(){
static std::map<std::string, void*>& get_plug_map() {
static std::map<std::string, void*> plug_map;
return plug_map;
}
public:
static void initial_path(char* argv[]){
static void initial_path(char* argv[]) {
char absolute_path[10240];
realpath(argv[0], absolute_path);
std::string exe_file(absolute_path);
......@@ -28,12 +28,12 @@ public:
std::string lib_folder = app_folder + "/lib";
std::string config_folder = app_folder + "/config";
auto config_dir = opendir(config_folder.c_str());
if (config_dir){
if (config_dir) {
closedir(config_dir);
} else {
config_folder = root_folder + "/config";
config_dir = opendir(config_folder.c_str());
if (config_dir){
if (config_dir) {
closedir(config_dir);
} else {
FATAL() << "no config dir found!";
......@@ -54,7 +54,7 @@ public:
Diagnose::register_server("load_plugin", [&](const std::string& name) {
std::string message;
if (DlUtils::try_load_plugin(name.c_str(), message)){
if (DlUtils::try_load_plugin(name.c_str(), message)) {
DlUtils::run_plugin(name.c_str());
}
return message;
......@@ -66,13 +66,13 @@ public:
lib_file += name;
lib_file += ".so";
std::stringstream ss;
if (get_plug_map().find(name) != get_plug_map().end()){
if (get_plug_map().find(name) != get_plug_map().end()) {
ss << "load_plugin: plugin " << name << " is already loaded!";
message = ss.str();
return false;
}
auto plugin = dlopen(lib_file.c_str(), RTLD_LAZY);
if (!plugin){
if (!plugin) {
ss << "load_plugin: load library faild, error " << dlerror() << " path:" << lib_file;
message = ss.str();
return false;
......@@ -80,7 +80,7 @@ public:
std::string load_func_name = std::string("load_") + name;
VoidFunc load = (VoidFunc)dlsym(plugin, load_func_name.c_str());
if (!load){
if (!load) {
ss << "load_plugin: load func exec faild, error " << dlerror() << " path:" << load_func_name;
message = ss.str();
return false;
......@@ -92,41 +92,41 @@ public:
return true;
}
static void load_plugin(const char* name){
static void load_plugin(const char* name) {
std::string message;
bool res = try_load_plugin(name, message);
FATAL_IF_NOT(res) << message;
}
static void run_plugin(const char* name){
static void run_plugin(const char* name) {
std::string run_func_name = std::string("run_") + name;
if (get_plug_map().find(name) == get_plug_map().end()){
if (get_plug_map().find(name) == get_plug_map().end()) {
FATAL() << "run_plugin: plugin " << name << " is not loaded!";
}
auto plugin = get_plug_map()[name];
VoidFunc run = (VoidFunc)dlsym(plugin, run_func_name.c_str());
if (!run){
if (!run) {
FATAL() << "run_plugin: run func exec faild, error " << dlerror();
}
run();
}
static void unload_plugin(const char* name){
static void unload_plugin(const char* name) {
std::string unload_func_name = std::string("unload_") + name;
if (get_plug_map().find(name) == get_plug_map().end()){
if (get_plug_map().find(name) == get_plug_map().end()) {
FATAL() << "unload_plugin: plugin " << name << " is not loaded!";
}
auto plugin = get_plug_map()[name];
VoidFunc unload = (VoidFunc)dlsym(plugin, unload_func_name.c_str());
if (!unload){
if (!unload) {
FATAL() << "unload_plugin: unload func exec faild, error " << dlerror();
}
unload();
}
static std::string path_go_back(const std::string& path, int times = 1){
static std::string path_go_back(const std::string& path, int times = 1) {
std::string path_res = path;
for (int i = 0; i < times; i++){
for (int i = 0; i < times; i++) {
path_res = path_res.substr(0, path_res.find_last_of('/'));
}
return path_res;
......@@ -135,7 +135,7 @@ public:
private:
static void initial_log()
{
if (appPref.has_string_key("log.log_level")){
if (appPref.has_string_key("log.log_level")) {
int log_level = std::stoi(appPref.get_string_data("log.log_level"));
bool level_check = log_level < LogLevel::INFO || log_level > LogLevel::FATAL;
log_level = level_check ? LogLevel::DIRECT : log_level;
......
......@@ -25,7 +25,7 @@ public:
}
~TimerCounter() {
if (_enable_cout){
if (_enable_cout) {
long delta = get_time_ms_elapsed();
if (delta > _threshold) {
//INFO() << _flag << " Time elapsed: " << delta << "ms";
......
......@@ -32,42 +32,42 @@ protected:
}
public:
void set_send_timeout(int time_ms){
void set_send_timeout(int time_ms) {
int rc = 0;
rc = zmq_setsockopt (_zmq_socket, ZMQ_SNDTIMEO, &time_ms, sizeof(time_ms));
FATAL_IF(rc != 0) << "zmq_setsockopt faild! rc = " << rc << "; errno = " << errno;
_send_time_out = time_ms;
}
void set_recv_timeout(int time_ms){
void set_recv_timeout(int time_ms) {
int rc = 0;
rc = zmq_setsockopt (_zmq_socket, ZMQ_RCVTIMEO, &time_ms, sizeof(time_ms));
FATAL_IF(rc != 0) << "zmq_setsockopt faild! rc = " << rc << "; errno = " << errno;
_recv_time_out = time_ms;
}
void set_send_queue_size(int size){
void set_send_queue_size(int size) {
int rc = 0;
rc = zmq_setsockopt(_zmq_socket, ZMQ_SNDHWM, &size, sizeof(size));
FATAL_IF(rc != 0) << "zmq_setsockopt faild! rc = " << rc << "; errno = " << errno;
}
void set_send_buffer_size(int size){
void set_send_buffer_size(int size) {
int rc = 0;
rc = zmq_setsockopt(_zmq_socket, ZMQ_SNDBUF, &size, sizeof(size));
FATAL_IF(rc != 0) << "zmq_setsockopt faild! rc = " << rc << "; errno = " << errno;
}
int shutdown(){
int shutdown() {
return zmq_ctx_shutdown(_zmq_context);
}
protected:
void bind(const std::string& url){
void bind(const std::string& url) {
int rc = zmq_bind (_zmq_socket, url.c_str());
FATAL_IF(rc != 0) << "zmq_bind faild! url: " << url << "; errno = " << errno;
}
void connect(const std::string& url){
void connect(const std::string& url) {
int rc = zmq_connect(_zmq_socket, url.c_str());
FATAL_IF(rc != 0) << "zmq_connect faild! url: " << url << "; errno = " << errno;
}
......@@ -81,15 +81,15 @@ protected:
class ZmqPublisher : public ZmqInterface{
public:
ZmqPublisher(){
ZmqPublisher() {
_zmq_socket = zmq_socket(_zmq_context, ZMQ_PUB);
}
void register_publisher(const std::string& url){
void register_publisher(const std::string& url) {
bind(url);
}
int publish(const std::string& data){
int publish(const std::string& data) {
int size = zmq_send(_zmq_socket, (char*)data.c_str(), data.size(), 0);
return size;
}
......@@ -98,21 +98,21 @@ public:
class ZmqSubscriber : public ZmqInterface{
public:
ZmqSubscriber(){
ZmqSubscriber() {
_zmq_socket = zmq_socket(_zmq_context, ZMQ_SUB);
set_filter("");
}
void subscribe(const std::string& url){
void subscribe(const std::string& url) {
connect(url);
}
int receive(std::string& buffer){
int receive(std::string& buffer) {
int size = zmq_recv(_zmq_socket, (char*)buffer.c_str(), buffer.size(), 0);
return size;
}
void set_filter(const std::string& filter){
void set_filter(const std::string& filter) {
int rc = zmq_setsockopt(_zmq_socket, ZMQ_SUBSCRIBE, filter.c_str(), 0);
FATAL_IF(rc != 0) << "zmq_setsockopt faild! rc = " << rc << "; errno = " << errno;
}
......@@ -120,20 +120,20 @@ public:
class ZmqResponser : public ZmqInterface{
public:
ZmqResponser(){
ZmqResponser() {
_zmq_socket = zmq_socket(_zmq_context, ZMQ_REP);
_recv_buffer.resize(200 * 1024);
}
void set_reponse_func(const std::function<std::string(const std::string&)>& func){
void set_reponse_func(const std::function<std::string(const std::string&)>& func) {
_rep_func = func;
}
void register_responser(const std::string& url){
void register_responser(const std::string& url) {
bind(url);
}
void receive_and_respose(){
void receive_and_respose() {
int size = zmq_recv(_zmq_socket, (char*)_recv_buffer.c_str(), _recv_buffer.size(), 0);
if (size > 0 && _rep_func) {
auto rep_str = _rep_func(_recv_buffer.substr(0, size));
......@@ -147,18 +147,18 @@ private:
class ZmqRequester : public ZmqInterface{
public:
ZmqRequester(){
ZmqRequester() {
_zmq_socket = zmq_socket(_zmq_context, ZMQ_REQ);
_recv_buffer.resize(200 * 1024);
}
void register_requester(const std::string& url){
void register_requester(const std::string& url) {
connect(url);
_url_list.push_back(url);
}
bool request_and_receive(const std::string& req_data, std::string& rep_data){
bool request_and_receive(const std::string& req_data, std::string& rep_data) {
int size = zmq_send(_zmq_socket, req_data.data(), req_data.length(), 0);
if (size > 0) {
size = zmq_recv(_zmq_socket, (char*)_recv_buffer.c_str(), _recv_buffer.size(), 0);
......@@ -167,10 +167,10 @@ public:
return size >= 0;
}
void reconnect(){
void reconnect() {
zmq_close(_zmq_socket);
_zmq_socket = zmq_socket(_zmq_context, ZMQ_REQ);
for (auto& url : _url_list){
for (auto& url : _url_list) {
connect(url);
}
set_recv_timeout(_recv_time_out);
......
......@@ -14,7 +14,7 @@
#include <utils/app_config.hpp>
MmapAsyncWrite::MmapAsyncWrite() : PipeElement(true, "MmapAsyncWrite"){
MmapAsyncWrite::MmapAsyncWrite() : PipeElement(true, "MmapAsyncWrite") {
_is_init = false;
}
......
......@@ -7,7 +7,7 @@
#include <utils/app_preference.hpp>
// TODO: 死循环, CPU占用问题
InputResultZmqElement::InputResultZmqElement() : basic::PipeElement(false, "InputResultZmq"){
InputResultZmqElement::InputResultZmqElement() : basic::PipeElement(false, "InputResultZmq") {
auto input_result_url = appPref.get_string_data("zmq.input_result_url");
_zmq_subscriber.subscribe(input_result_url);
_zmq_subscriber.set_recv_timeout(100);
......@@ -21,10 +21,10 @@ void InputResultZmqElement::thread_func()
message::BytesItem bytes;
std::string sub_str = _zmq_buffer.substr(0, size);
bytes.ParseFromString(sub_str);
if (bytes.id() == 3001){
if (bytes.id() == 3001) {
std::string data = bytes.data();
Messager::publish("log_remote", "[remote]" + data);
} else if (bytes.id() == 3002){
} else if (bytes.id() == 3002) {
QnxProcList result;
result.ParseFromString(bytes.data());
Messager::publish("performance_result", result);
......
......@@ -5,15 +5,15 @@ PipeController g_controllor;
extern "C"{
void LOAD_PLUGIN(){
void LOAD_PLUGIN() {
g_controllor.add_element<InputResultZmqElement>();
}
void RUN_PLUGIN(){
void RUN_PLUGIN() {
g_controllor.start();
}
void UNLOAD_PLUGIN(){
void UNLOAD_PLUGIN() {
g_controllor.stop();
g_controllor.wait();
}
......
......@@ -17,7 +17,7 @@
MainLoop main_loop;
TimerTrigger g_timer_trigger;
void stop(int){
void stop(int) {
main_loop.stop();
g_timer_trigger.stop();
}
......@@ -28,22 +28,22 @@ int main(int argc, char *argv[])
std::stringstream ss;
ss << appPref.get_string_data("exe_file") << " ";
main_loop.start();
for (int i = 1; i < argc; i++){
for (int i = 1; i < argc; i++) {
DlUtils::load_plugin(argv[i]);
ss << argv[i] << " ";
}
g_timer_trigger.start();
for (int i = 1; i < argc; i++){
for (int i = 1; i < argc; i++) {
DlUtils::run_plugin(argv[i]);
}
signal(SIGINT, &stop);
main_loop.wait();
g_timer_trigger.wait();
for (int i = 1; i < argc; i++){
for (int i = 1; i < argc; i++) {
DlUtils::unload_plugin(argv[i]);
}
ss << " &";
if (main_loop.is_restart()){
if (main_loop.is_restart()) {
system(ss.str().c_str());
}
}
......@@ -3,9 +3,9 @@
MainLoop::MainLoop() : TimerElement(1000, "MainLoop")
{
Diagnose::register_server("exec_cmd", [&](const std::string& cmd){
Diagnose::register_server("exec_cmd", [&](const std::string& cmd) {
std::string message;
if (cmd == "quit"){
if (cmd == "quit") {
stop();
} else if (cmd == "restart") {
stop();
......
......@@ -5,11 +5,11 @@
#include <message/messager.hpp>
#include <utils/app_preference.hpp>
OutputResultZmqElement::OutputResultZmqElement() : basic::PipeElement(true, "OutputResultZmq"){
OutputResultZmqElement::OutputResultZmqElement() : basic::PipeElement(true, "OutputResultZmq") {
auto output_result_url = appPref.get_string_data("zmq.output_result_url");
_zmq_publisher.register_publisher(output_result_url);
auto log_func = [&](const std::string& log){
auto log_func = [&](const std::string& log) {
auto data_info = std::make_shared<message::BytesItem>();
data_info->set_data(log);
data_info->set_id(3001);
......@@ -39,14 +39,14 @@ OutputResultZmqElement::OutputResultZmqElement() : basic::PipeElement(true, "Out
void OutputResultZmqElement::thread_func()
{
auto buffer_data = _buffered_data.get_data();
for (const auto& data : buffer_data){
for (const auto& data : buffer_data) {
std::string buffer;
data->SerializeToString(&buffer);
_zmq_publisher.publish(buffer);
}
}
OutputResultZmqElement::~OutputResultZmqElement(){
OutputResultZmqElement::~OutputResultZmqElement() {
}
......
......@@ -5,15 +5,15 @@ PipeController g_controllor;
extern "C"{
void LOAD_PLUGIN(){
void LOAD_PLUGIN() {
g_controllor.add_element<OutputResultZmqElement>();
}
void RUN_PLUGIN(){
void RUN_PLUGIN() {
g_controllor.start();
}
void UNLOAD_PLUGIN(){
void UNLOAD_PLUGIN() {
g_controllor.stop();
g_controllor.wait();
}
......
......@@ -2,38 +2,38 @@
#include <stdio.h>
#include <sys/stat.h>
FLogger::FLogger(const std::string &dir) : _dir(dir){
FLogger::FLogger(const std::string &dir) : _dir(dir) {
mkdir(dir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
}
void FLogger::error(const std::string &log){
void FLogger::error(const std::string &log) {
_error_size += log.size();
if(check_error_size()){
if(check_error_size()) {
_error_size = log.size();
}
_error_ostream << log;
}
void FLogger::info(const std::string &log){
void FLogger::info(const std::string &log) {
_info_size += log.size();
if(check_info_size()){
if(check_info_size()) {
_info_size = log.size();
}
_info_ostream << log;
}
void FLogger::flush(){
void FLogger::flush() {
_error_ostream.flush();
_info_ostream.flush();
}
void FLogger::remove_oldest_log(){
void FLogger::remove_oldest_log() {
auto files = AppUtil::get_file_list(_dir, ".*\\.log");
if (!files.empty()){
if (!files.empty()) {
std::string file = _dir + "/" + files.front();
std::remove(file.c_str());
bool failed = std::ifstream(file).is_open();
if (failed){
if (failed) {
ERROR() << "remove error log file faild: " << file;
}
}
......@@ -41,29 +41,29 @@ void FLogger::remove_oldest_log(){
// 检查当前ERROR.log文件的写入大小, 超过上限值, 会写入新文件
// 同时, 会检查最多保留的log个数, INFO+ERROR log总个数有一个上限值 _max_num
bool FLogger::check_error_size(){
bool FLogger::check_error_size() {
bool res = _error_size > _max_size;
if (res || !_error_ostream.is_open()){
if (res || !_error_ostream.is_open()) {
_error_ostream = std::ofstream(_dir + "/" + AppUtil::now_date() + "_" + AppUtil::now_time() + "_ERROR.log");
check_file_num();
}
return res;
}
bool FLogger::check_info_size(){
bool FLogger::check_info_size() {
bool res = _info_size > _max_size;
if (res|| !_info_ostream.is_open()){
if (res|| !_info_ostream.is_open()) {
_info_ostream = std::ofstream(_dir + "/" + AppUtil::now_date() + "_" + AppUtil::now_time() + "_INFO.log");
check_file_num();
}
return res;
}
void FLogger::check_file_num(){
void FLogger::check_file_num() {
auto files = AppUtil::get_file_list(_dir, ".*\\.log");
if (files.size() > max_num()){ // 使用 max_num() 并不比直接使用_max_num效率差, 编译优化后, 效率是相同的.
if (files.size() > max_num()) { // 使用 max_num() 并不比直接使用_max_num效率差, 编译优化后, 效率是相同的.
int i = files.size() - max_num();
for (; i > 0; i--){
for (; i > 0; i--) {
remove_oldest_log();
}
}
......
......@@ -9,7 +9,7 @@ class FLogger
{
public:
FLogger(const std::string& dir);
~FLogger(){
~FLogger() {
};
......
......@@ -5,15 +5,15 @@ PipeController g_controllor;
extern "C"{
void LOAD_PLUGIN(){
void LOAD_PLUGIN() {
g_controllor.add_element<ServerLogElement>();
}
void RUN_PLUGIN(){
void RUN_PLUGIN() {
g_controllor.start();
}
void UNLOAD_PLUGIN(){
void UNLOAD_PLUGIN() {
g_controllor.stop();
g_controllor.wait();
}
......
......@@ -4,29 +4,29 @@
ServerLogElement::ServerLogElement() :
TimerElement(1000, "ServerLog"),
_buffer_error(1000), _buffer_info(1000),
_floger(appPref.get_string_data("log.log_dir")){
_floger(appPref.get_string_data("log.log_dir")) {
_floger.set_max_num(AppUtil::safe_stoi(appPref.get_string_data("log.log_num")));
_floger.set_max_size(AppUtil::safe_stoi(appPref.get_string_data("log.log_size")) * 1024);
Messager::subcribe<std::string>("log_debug", [&](const std::string& log){
Messager::subcribe<std::string>("log_debug", [&](const std::string& log) {
info(log);
});
Messager::subcribe<std::string>("log_info", [&](const std::string& log){
Messager::subcribe<std::string>("log_info", [&](const std::string& log) {
info(log);
});
Messager::subcribe<std::string>("log_direct", [&](const std::string& log){
Messager::subcribe<std::string>("log_direct", [&](const std::string& log) {
info(log);
});
Messager::subcribe<std::string>("log_warning", [&](const std::string& log){
Messager::subcribe<std::string>("log_warning", [&](const std::string& log) {
error(log);
});
Messager::subcribe<std::string>("log_error", [&](const std::string& log){
Messager::subcribe<std::string>("log_error", [&](const std::string& log) {
error(log);
});
Messager::subcribe<std::string>("log_fatal", [&](const std::string& log){
Messager::subcribe<std::string>("log_fatal", [&](const std::string& log) {
error(log);
});
Messager::subcribe<std::string>("log_remote", [&](const std::string& log){
Messager::subcribe<std::string>("log_remote", [&](const std::string& log) {
info(log);
});
}
......@@ -48,15 +48,15 @@ void ServerLogElement::info(const std::string &log)
void ServerLogElement::timer_func()
{
if (_buffer_error.is_updated()){
if (_buffer_error.is_updated()) {
auto errors = _buffer_error.get_data();
for (auto error : errors){
for (auto error : errors) {
_floger.error(error);
}
}
if (_buffer_info.is_updated()){
if (_buffer_info.is_updated()) {
auto infos = _buffer_info.get_data();
for (auto info : infos){
for (auto info : infos) {
_floger.info(info);
}
}
......
......@@ -14,20 +14,22 @@ public:
public:
static void initial();
const static ServerLogElement& instance(){
const static ServerLogElement& instance() {
static ServerLogElement l_instance;
return l_instance;
}
private:
void error(const std::string& log);
void info(const std::string& log);
private:
virtual void timer_func() override;
private:
DoubleBufferedVector<std::string> _buffer_error;
DoubleBufferedVector<std::string> _buffer_info;
FLogger _floger;
private:
virtual void timer_func() override;
};
......@@ -5,15 +5,15 @@
PipeController g_controllor;
extern "C"{
void LOAD_PLUGIN(){
void LOAD_PLUGIN() {
g_controllor.add_element<ServerProcTask>();
}
void RUN_PLUGIN(){
void RUN_PLUGIN() {
g_controllor.start();
}
void UNLOAD_PLUGIN(){
void UNLOAD_PLUGIN() {
g_controllor.stop();
g_controllor.wait();
}
......
......@@ -2,23 +2,23 @@
#include <diag/diagnose.hpp>
#include <utils/app_preference.hpp>
void ProcUtils::get_proc_list(QnxProcList &list){
void ProcUtils::get_proc_list(QnxProcList &list) {
parse_procfs(list);
}
void ProcUtils::get_proc_list_str(std::string &list_str){
void ProcUtils::get_proc_list_str(std::string &list_str) {
QnxProcList list;
parse_procfs(list);
proc_list_to_string(list, list_str, -1, -1);
}
void ProcUtils::get_proc_list_str_byid(std::string &list_str, int id){
void ProcUtils::get_proc_list_str_byid(std::string &list_str, int id) {
QnxProcList list;
parse_procfs(list);
proc_list_to_string(list, list_str, id, -1);
}
void ProcUtils::get_current_proc_list_str(std::string &list_str){
void ProcUtils::get_current_proc_list_str(std::string &list_str) {
QnxProcList list;
parse_procfs(list);
proc_list_to_string(list, list_str, getpid(), -1);
......
......@@ -6,7 +6,7 @@
#include <utils/app_preference.hpp>
#include <utils/data_recoder.hpp>
ServerProcTask::ServerProcTask() : TimerElement(1000, "ServerProcTask"){
ServerProcTask::ServerProcTask() : TimerElement(1000, "ServerProcTask") {
_proc_log_dir = appPref.get_string_data("log.log_dir") + "/proc/";
// Linux 中创建文件夹的函数,但是没有找到对应 -p 的参数
if (0 != mkdir(_proc_log_dir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH)) {
......@@ -16,19 +16,19 @@ ServerProcTask::ServerProcTask() : TimerElement(1000, "ServerProcTask"){
_proc_log = std::stod(appPref.get_string_data("log.proc_log")) > 0;
_proc_remote = std::stod(appPref.get_string_data("proc.is_remote")) > 0;
Diagnose::register_server("get_proc", [&](const std::string&){
Diagnose::register_server("get_proc", [&](const std::string&) {
return _proc_text.get_data();
});
Diagnose::register_server("get_proc_pb", [&](const std::string&){
Diagnose::register_server("get_proc_pb", [&](const std::string&) {
return _proc_list.get_data().SerializeAsString();
});
if (_proc_remote){
if (_proc_remote) {
set_interval(get_interval() / 2);
Messager::subcribe<QnxProcList>(
"performance_result",
[this](const QnxProcList& data){
[this](const QnxProcList& data) {
_proc_list = data;
});
}
......@@ -39,8 +39,8 @@ void ServerProcTask::timer_func()
static int i = 0;
std::cout << "-------------------------------: " << i++ << std::endl;
QnxProcList list;
if (_proc_remote){
if (!_proc_list.is_updated()){
if (_proc_remote) {
if (!_proc_list.is_updated()) {
return;
}
list = _proc_list.get_data();
......@@ -52,7 +52,7 @@ void ServerProcTask::timer_func()
std::string list_str;
proc_list_to_string(list, list_str, -1, -1);
_proc_text = list_str;
if (_proc_log){
if (_proc_log) {
// TODO: 修改命名规则,带有':'的文件名,不方便处理
static std::string log_path = _proc_log_dir + AppUtil::now_date()
+ "_" + AppUtil::now_time() + ".proc";
......
......@@ -11,6 +11,8 @@ public:
// TimerElement interface
private:
void timer_func();
private:
DoubleBufferData<std::string> _proc_text;
DoubleBufferData<QnxProcList> _proc_list;
std::string _proc_log_dir;
......
......@@ -11,8 +11,8 @@
std::string thread_name_filter(const std::string& topic)
{
std::string rostopic;
for (auto& c : topic){
if ((c >= '0' && c <= '9') || (c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z') || c == '_'){
for (auto& c : topic) {
if ((c >= '0' && c <= '9') || (c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z') || c == '_') {
rostopic += c;
}
}
......@@ -399,7 +399,7 @@ void memory_utilization(float& fmemtotal, float& fmemfree, float& fmemvalid) {
}
// 获取总的CPU使用率
float total_cpu_usage(){
float total_cpu_usage() {
std::ifstream stream("/proc/stat");
int user = 0;
int nice = 0;
......@@ -423,7 +423,7 @@ float total_cpu_usage(){
}
// 获取pid进程的tid线程的调度优先级和运行的cpu id
void get_priority_and_core(pid_t pid, pid_t tid, int & prio, int& core){
void get_priority_and_core(pid_t pid, pid_t tid, int & prio, int& core) {
std::string line;
std::vector<std::string> tokens;
std::string stat_file;
......@@ -433,7 +433,7 @@ void get_priority_and_core(pid_t pid, pid_t tid, int & prio, int& core){
std::getline(stream, line);
std::istringstream linestream(line);
std::string token;
while(linestream >> token){
while(linestream >> token) {
tokens.push_back(token);
}
}
......@@ -451,7 +451,7 @@ float cpu_utilization(pid_t pid, pid_t tid = -1) {
std::vector<std::string> tokens;
std::string stat_file;
bool is_thread = (tid > 0);
if (is_thread){
if (is_thread) {
stat_file = "/proc/" + std::to_string(pid) + "/task/" + std::to_string(tid) + "/stat";
}else {
stat_file = "/proc/" + std::to_string(pid) + "/stat";
......@@ -461,7 +461,7 @@ float cpu_utilization(pid_t pid, pid_t tid = -1) {
std::getline(stream, line);
std::istringstream linestream(line);
std::string token;
while(linestream >> token){
while(linestream >> token) {
tokens.push_back(token);
}
stream.close();
......@@ -472,7 +472,7 @@ float cpu_utilization(pid_t pid, pid_t tid = -1) {
long stime = stol(tokens[14]);
long ustime = utime + stime;
long time = AppUtil::get_current_ms();
if (is_thread){
if (is_thread) {
static std::map<pid_t, long> last_ustime_map;
static std::map<pid_t, long> last_time_map;
cpu_usage =0.1f * (last_ustime_map[tid] - ustime) / (last_time_map[tid] - time) * sysconf(_SC_CLK_TCK);
......@@ -505,7 +505,7 @@ bool parse_procfs(QnxProcList& info) {
while ((dirent = readdir(dir)) != nullptr) {
if (isdigit(dirent->d_name[0])) {
int32_t pid = atoi(dirent->d_name);
if (pid <= 0){
if (pid <= 0) {
WARNING() << "pid <= 0: pid " << pid;
continue;
}
......@@ -629,7 +629,7 @@ bool parse_procfs(QnxProcList& info, pid_t pid) {
std::sort(
info.mutable_proc_list()->begin(),
info.mutable_proc_list()->end(),
[](const QnxProcStatus& a, const QnxProcStatus& b){
[](const QnxProcStatus& a, const QnxProcStatus& b) {
return a.cpu_used() > b.cpu_used();
});
......@@ -681,7 +681,7 @@ void proc_list_to_string(const QnxProcList& info, std::string& proc_str, pid_t p
int64_t i = 0;
auto proc_list = info.proc_list();
for (auto iter = proc_list.begin(); iter != proc_list.end(); ++iter) {
if (pid > 0 && iter->pid() != pid){
if (pid > 0 && iter->pid() != pid) {
continue;
}
sprintf(buffer,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册