提交 99cf013b 编写于 作者: Y youngowlf

2.0.4 release.

Support boost-1.66
Fix race condition when controlling the number of service thread at runtime.
Fix compiler warnings.
Add verification for object allocation.
Add log if stop_timer failed.
Hide variable server in server_socket_base, use get_server function instead.
A boolean value instead of void will be returned from set_timer and update_timer_info.
Move function show_info from server_socket_base and client_socket_base to tcp::show_info.
上级 401b4da3
...@@ -6,21 +6,22 @@ st_asio_wrapper is an asynchronous c/s framework based on Boost.Asio, besides al ...@@ -6,21 +6,22 @@ st_asio_wrapper is an asynchronous c/s framework based on Boost.Asio, besides al
1. Based on message just like UDP with several couple of build-in packer and unpacker;</br> 1. Based on message just like UDP with several couple of build-in packer and unpacker;</br>
2. Support packer and unpacker customization, and replacing packer and unpacker at run-time;</br> 2. Support packer and unpacker customization, and replacing packer and unpacker at run-time;</br>
3. Automatically reconnect to the server after link broken;</br> 3. Automatically reconnect to the server after link broken;</br>
4. Widely support timers;</br> 4. Support object pool, object reusing and restoration;</br>
5. Support object pool, object reusing;</br> 5. Worker thread management;</br>
6. Support message buffer;</br> 6. Support message buffer;</br>
7. Support ssl;</br> 7. Widely support timers;</br>
8. Support TCP/UDP.</br> 8. Support TCP/UDP;</br>
9. Support ssl;</br>
Quick start: Quick start:
- -
### server: ### server:
Derive your own socket from `server_socket_base`, you must at least re-write the `on_msg` or `on_msg_handle` virtual function and handle messages in it;</br> Derive your own socket from `server_socket_base`, you must at least re-write the `on_msg` or `on_msg_handle` virtual function and handle messages in it;</br>
Create a `service_pump` object, create a `server_base` object, call `service_pump::start_service`;</br> Create a `service_pump` object, create a `server_base<your_socket>` object, call `service_pump::start_service`;</br>
Call `server_socket_base::send_msg` when you have messages need to send.</br> Call `server_socket_base::send_msg` when you have messages need to send.</br>
### client: ### client:
Derive your own socket from `client_socket_base`, you must at least re-write the `on_msg` or `on_msg_handle` virtual function and handle messages in it;</br> Derive your own socket from `client_socket_base`, you must at least re-write the `on_msg` or `on_msg_handle` virtual function and handle messages in it;</br>
Create a `service_pump` object, create a `tcp::client_base` object, set server address via `client_socket_base::set_server_addr`, call `service_pump::start_service`;</br> Create a `service_pump` object, create a `multi_client_base<your_socket>` object, add some socket via `multi_client_base::add_socket`, call `service_pump::start_service`;</br>
Call `client_socket_base::send_msg` when you have messages need to send.</br> Call `client_socket_base::send_msg` when you have messages need to send.</br>
Directory structure: Directory structure:
......
module = concurrent_client module = concurrent_client
ext_libs = -lboost_timer -lboost_chrono ext_libs = -lboost_timer
include ../config.mk include ../config.mk
...@@ -13,18 +13,19 @@ else ...@@ -13,18 +13,19 @@ else
dir = release dir = release
endif endif
cflag += -DBOOST_ASIO_NO_DEPRECATED cflag += -DBOOST_ASIO_NO_DEPRECATED
common_libs = -lboost_system -lboost_thread -lboost_chrono
kernel = ${shell uname -s} kernel = ${shell uname -s}
ifeq (${kernel}, SunOS) ifeq (${kernel}, SunOS)
cflag += -pthreads ${ext_cflag} ${boost_include_dir} cflag += -pthreads ${ext_cflag} ${boost_include_dir}
lflag += -pthreads -lsocket -lnsl ${boost_lib_dir} -lboost_system -lboost_thread ${ext_libs} lflag += -pthreads -lsocket -lnsl ${boost_lib_dir} ${common_libs} ${ext_libs}
else else
ifeq (${kernel}, FreeBSD) ifeq (${kernel}, FreeBSD)
cflag += -pthread ${ext_cflag} ${boost_include_dir} cflag += -pthread ${ext_cflag} ${boost_include_dir}
lflag += -pthread ${boost_lib_dir} -lboost_system -lboost_thread -lboost_atomic ${ext_libs} lflag += -pthread ${boost_lib_dir} ${common_libs} -lboost_atomic ${ext_libs}
else # here maybe still have other kernels need to be separated out else # here maybe still have other kernels need to be separated out
cflag += -pthread ${ext_cflag} ${boost_include_dir} cflag += -pthread ${ext_cflag} ${boost_include_dir}
lflag += -pthread ${boost_lib_dir} -lboost_system -lboost_thread ${ext_libs} lflag += -pthread ${boost_lib_dir} ${common_libs} ${ext_libs}
endif endif
endif endif
......
...@@ -355,7 +355,7 @@ void send_msg_concurrently(echo_client& client, size_t send_thread_num, size_t m ...@@ -355,7 +355,7 @@ void send_msg_concurrently(echo_client& client, size_t send_thread_num, size_t m
begin_time.stop(); begin_time.stop();
double used_time = (double) begin_time.elapsed().wall / 1000000000; double used_time = (double) begin_time.elapsed().wall / 1000000000;
printf(" finished in %f seconds, speed: %f(*2) MBps.\n", used_time, total_msg_bytes / used_time / 1024 / 1024); printf(" finished in %f seconds, TPS: %f(*2), speed: %f(*2) MBps.\n", used_time, link_num * msg_num / used_time, total_msg_bytes / used_time / 1024 / 1024);
} }
static bool is_testing; static bool is_testing;
......
module = echo_client module = echo_client
ext_libs = -lboost_timer -lboost_chrono ext_libs = -lboost_timer
include ../config.mk include ../config.mk
...@@ -108,7 +108,7 @@ protected: ...@@ -108,7 +108,7 @@ protected:
{ {
//the type of tcp::server_socket_base::server now can be controlled by derived class(echo_socket), //the type of tcp::server_socket_base::server now can be controlled by derived class(echo_socket),
//which is actually i_echo_server, so, we can invoke i_echo_server::test virtual function. //which is actually i_echo_server, so, we can invoke i_echo_server::test virtual function.
server.test(); get_server().test();
echo_socket_base::on_recv_error(ec); echo_socket_base::on_recv_error(ec);
} }
...@@ -171,14 +171,9 @@ public: ...@@ -171,14 +171,9 @@ public:
normal_socket(i_server& server_) : normal_socket_base(server_) {} normal_socket(i_server& server_) : normal_socket_base(server_) {}
protected: protected:
virtual bool do_start() //demo client needs heartbeat (macro ST_ASIO_HEARTBEAT_INTERVAL been defined), pleae note that the interval (here is 5) must be equal to
{ //macro ST_ASIO_HEARTBEAT_INTERVAL defined in demo client, and macro ST_ASIO_HEARTBEAT_MAX_ABSENCE must has the same value as demo client's.
//demo client needs heartbeat (macro ST_ASIO_HEARTBEAT_INTERVAL been defined), pleae note that the interval (here is 5) must be equal to virtual void on_connect() {start_heartbeat(5);}
//macro ST_ASIO_HEARTBEAT_INTERVAL defined in demo client, and macro ST_ASIO_HEARTBEAT_MAX_ABSENCE must has the same value as demo client's.
start_heartbeat(5);
return normal_socket_base::do_start();
}
}; };
#endif #endif
......
module = file_client module = file_client
ext_cflag = -D_FILE_OFFSET_BITS=64 ext_cflag = -D_FILE_OFFSET_BITS=64
ext_libs = -lboost_timer -lboost_chrono ext_libs = -lboost_timer
include ../config.mk include ../config.mk
...@@ -120,7 +120,7 @@ void file_socket::handle_msg(out_msg_ctype& msg) ...@@ -120,7 +120,7 @@ void file_socket::handle_msg(out_msg_ctype& msg)
{ {
boost::uint_fast64_t id; boost::uint_fast64_t id;
memcpy(&id, boost::next(msg.data(), ORDER_LEN), sizeof(boost::uint_fast64_t)); memcpy(&id, boost::next(msg.data(), ORDER_LEN), sizeof(boost::uint_fast64_t));
server.restore_socket(ST_THIS shared_from_this(), id); get_server().restore_socket(ST_THIS shared_from_this(), id);
} }
default: default:
break; break;
......
...@@ -93,8 +93,8 @@ private: ...@@ -93,8 +93,8 @@ private:
atomic_type& atomic; atomic_type& atomic;
}; };
class service_pump;
class object; class object;
class service_pump;
namespace tcp namespace tcp
{ {
class i_server class i_server
......
...@@ -331,6 +331,31 @@ ...@@ -331,6 +331,31 @@
* *
* REPLACEMENTS: * REPLACEMENTS:
* *
* ===============================================================
* 2017.12.25 version 2.0.4
*
* SPECIAL ATTENTION (incompatible with old editions):
*
* HIGHLIGHT:
* Support boost-1.66
*
* FIX:
* Fix race condition when controlling the number of service thread at runtime.
* Fix compiler warnings.
*
* ENHANCEMENTS:
* Add verification for object allocation.
* Add log if stop_timer failed.
* Hide variable server in server_socket_base, use get_server function instead.
* A boolean value instead of void will be returned from set_timer and update_timer_info.
*
* DELETION:
*
* REFACTORING:
* Move function show_info from server_socket_base and client_socket_base to tcp::show_info.
*
* REPLACEMENTS:
*
*/ */
#ifndef ST_ASIO_CONFIG_H_ #ifndef ST_ASIO_CONFIG_H_
...@@ -340,8 +365,8 @@ ...@@ -340,8 +365,8 @@
# pragma once # pragma once
#endif // defined(_MSC_VER) && (_MSC_VER >= 1200) #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
#define ST_ASIO_VER 20003 //[x]xyyzz -> [x]x.[y]y.[z]z #define ST_ASIO_VER 20004 //[x]xyyzz -> [x]x.[y]y.[z]z
#define ST_ASIO_VERSION "2.0.3" #define ST_ASIO_VERSION "2.0.4"
//boost and compiler check //boost and compiler check
#ifdef _MSC_VER #ifdef _MSC_VER
...@@ -583,7 +608,7 @@ namespace boost {namespace asio {typedef io_service io_context;}} ...@@ -583,7 +608,7 @@ namespace boost {namespace asio {typedef io_service io_context;}}
//#define ST_ASIO_REUSE_SSL_STREAM //#define ST_ASIO_REUSE_SSL_STREAM
//if you need ssl::client_socket_base to be able to reconnect the server, or to open object pool in ssl::object_pool, you must define this macro. //if you need ssl::client_socket_base to be able to reconnect the server, or to open object pool in ssl::object_pool, you must define this macro.
//I tried many ways, onle one way can make boost::asio::ssl::stream reusable, which is: //I tried many ways, only one way can make boost::asio::ssl::stream reusable, which is:
// don't call any shutdown functions of boost::asio::ssl::stream, just call boost::asio::ip::tcp::socket's shutdown function, // don't call any shutdown functions of boost::asio::ssl::stream, just call boost::asio::ip::tcp::socket's shutdown function,
// this seems not a normal procedure, but it works, I believe that asio's defect caused this problem. // this seems not a normal procedure, but it works, I believe that asio's defect caused this problem.
......
...@@ -80,8 +80,10 @@ public: ...@@ -80,8 +80,10 @@ public:
bool try_dequeue(T& item) {typename Lockable::lock_guard lock(*this); return try_dequeue_(item);} bool try_dequeue(T& item) {typename Lockable::lock_guard lock(*this); return try_dequeue_(item);}
//not thread safe //not thread safe
bool enqueue_(const T& item) {ST_THIS emplace_back(item); return true;} bool enqueue_(const T& item)
bool enqueue_(T& item) {ST_THIS emplace_back(); ST_THIS back().swap(item); return true;} //after this, item will becomes empty, please note. {try {ST_THIS emplace_back(item);} catch (const std::exception& e) {unified_out::error_out("cannot hold more objects (%s)", e.what()); return false;} return true;}
bool enqueue_(T& item) //after this, item will becomes empty, please note.
{try {ST_THIS emplace_back();} catch (const std::exception& e) {unified_out::error_out("cannot hold more objects (%s)", e.what()); return false;} ST_THIS back().swap(item); return true;}
void move_items_in_(boost::container::list<T>& can) {ST_THIS splice(ST_THIS end(), can);} void move_items_in_(boost::container::list<T>& can) {ST_THIS splice(ST_THIS end(), can);}
bool try_dequeue_(T& item) {if (ST_THIS empty()) return false; item.swap(ST_THIS front()); ST_THIS pop_front(); return true;} bool try_dequeue_(T& item) {if (ST_THIS empty()) return false; item.swap(ST_THIS front()); ST_THIS pop_front(); return true;}
}; };
......
...@@ -54,6 +54,9 @@ protected: ...@@ -54,6 +54,9 @@ protected:
{ {
assert(object_ptr && !object_ptr->is_equal_to(-1)); assert(object_ptr && !object_ptr->is_equal_to(-1));
if (!object_ptr)
return false;
boost::lock_guard<boost::mutex> lock(object_can_mutex); boost::lock_guard<boost::mutex> lock(object_can_mutex);
return object_can.size() < max_size_ ? object_can.emplace(object_ptr->id(), object_ptr).second : false; return object_can.size() < max_size_ ? object_can.emplace(object_ptr->id(), object_ptr).second : false;
} }
...@@ -71,7 +74,7 @@ protected: ...@@ -71,7 +74,7 @@ protected:
if (exist) if (exist)
{ {
boost::lock_guard<boost::mutex> lock(invalid_object_can_mutex); boost::lock_guard<boost::mutex> lock(invalid_object_can_mutex);
invalid_object_can.emplace_back(object_ptr); try {invalid_object_can.emplace_back(object_ptr);} catch (const std::exception& e) {unified_out::error_out("cannot hold more objects (%s)", e.what());}
} }
return exist; return exist;
...@@ -114,6 +117,20 @@ protected: ...@@ -114,6 +117,20 @@ protected:
return old_object_ptr; return old_object_ptr;
} }
#define CREATE_OBJECT_1_ARG(first_way) \
BOOST_AUTO(object_ptr, first_way()); \
if (!object_ptr) \
try {object_ptr = boost::make_shared<Object>(arg);} catch (const std::exception& e) {unified_out::error_out("cannot create object (%s)", e.what());} \
init_object(object_ptr); \
return object_ptr;
#define CREATE_OBJECT_2_ARG(first_way) \
BOOST_AUTO(object_ptr, first_way()); \
if (!object_ptr) \
try {object_ptr = boost::make_shared<Object>(arg1, arg2);} catch (const std::exception& e) {unified_out::error_out("cannot create object (%s)", e.what());} \
init_object(object_ptr); \
return object_ptr;
#if defined(ST_ASIO_REUSE_OBJECT) && !defined(ST_ASIO_RESTORE_OBJECT) #if defined(ST_ASIO_REUSE_OBJECT) && !defined(ST_ASIO_RESTORE_OBJECT)
object_type reuse_object() object_type reuse_object()
{ {
...@@ -124,46 +141,14 @@ protected: ...@@ -124,46 +141,14 @@ protected:
return object_ptr; return object_ptr;
} }
template<typename Arg> template<typename Arg> object_type create_object(Arg& arg) {CREATE_OBJECT_1_ARG(reuse_object);}
object_type create_object(Arg& arg) template<typename Arg1, typename Arg2> object_type create_object(Arg1& arg1, Arg2& arg2) {CREATE_OBJECT_2_ARG(reuse_object);}
{
BOOST_AUTO(object_ptr, reuse_object());
if (!object_ptr)
object_ptr = boost::make_shared<Object>(arg);
init_object(object_ptr);
return object_ptr;
}
template<typename Arg1, typename Arg2>
object_type create_object(Arg1& arg1, Arg2& arg2)
{
BOOST_AUTO(object_ptr, reuse_object());
if (!object_ptr)
object_ptr = boost::make_shared<Object>(arg1, arg2);
init_object(object_ptr);
return object_ptr;
}
#else #else
template<typename Arg> template<typename Arg> object_type create_object(Arg& arg) {CREATE_OBJECT_1_ARG(object_type);}
object_type create_object(Arg& arg) template<typename Arg1, typename Arg2> object_type create_object(Arg1& arg1, Arg2& arg2) {CREATE_OBJECT_2_ARG(object_type);}
{
BOOST_AUTO(object_ptr, boost::make_shared<Object>(arg));
init_object(object_ptr);
return object_ptr;
}
template<typename Arg1, typename Arg2>
object_type create_object(Arg1& arg1, Arg2& arg2)
{
BOOST_AUTO(object_ptr, boost::make_shared<Object>(arg1, arg2));
init_object(object_ptr);
return object_ptr;
}
#endif #endif
object_type create_object() {return create_object(boost::ref(sp));} object_type create_object() {return create_object(boost::ref(get_service_pump()));}
public: public:
//to configure unordered_set(for example, set factor or reserved size), not thread safe, so must be called before service_pump startup. //to configure unordered_set(for example, set factor or reserved size), not thread safe, so must be called before service_pump startup.
...@@ -256,7 +241,7 @@ public: ...@@ -256,7 +241,7 @@ public:
for (BOOST_AUTO(iter, object_can.begin()); iter != object_can.end();) for (BOOST_AUTO(iter, object_can.begin()); iter != object_can.end();)
if (iter->second->obsoleted()) if (iter->second->obsoleted())
{ {
objects.emplace_back(iter->second); try {objects.emplace_back(iter->second);} catch (const std::exception& e) {unified_out::error_out("cannot hold more objects (%s)", e.what());}
iter = object_can.erase(iter); iter = object_can.erase(iter);
} }
else else
......
...@@ -63,11 +63,11 @@ public: ...@@ -63,11 +63,11 @@ public:
service_pump() : started(false) service_pump() : started(false)
#ifdef ST_ASIO_DECREASE_THREAD_AT_RUNTIME #ifdef ST_ASIO_DECREASE_THREAD_AT_RUNTIME
, real_thread_num(0), del_thread_num(0), del_thread_req(false) , real_thread_num(0), del_thread_num(0)
#endif #endif
#ifdef ST_ASIO_AVOID_AUTO_STOP_SERVICE #ifdef ST_ASIO_AVOID_AUTO_STOP_SERVICE
#if BOOST_ASIO_VERSION >= 101100 #if BOOST_ASIO_VERSION >= 101100
, work(boost::asio::make_work_guard(boost::ref(*this))) , work(get_executor())
#else #else
, work(boost::make_shared<boost::asio::io_service::work>(boost::ref(*this))) , work(boost::make_shared<boost::asio::io_service::work>(boost::ref(*this)))
#endif #endif
...@@ -171,8 +171,8 @@ public: ...@@ -171,8 +171,8 @@ public:
void add_service_thread(int thread_num) {for (int i = 0; i < thread_num; ++i) service_threads.create_thread(boost::bind(&service_pump::run, this));} void add_service_thread(int thread_num) {for (int i = 0; i < thread_num; ++i) service_threads.create_thread(boost::bind(&service_pump::run, this));}
#ifdef ST_ASIO_DECREASE_THREAD_AT_RUNTIME #ifdef ST_ASIO_DECREASE_THREAD_AT_RUNTIME
void del_service_thread(int thread_num) {if (thread_num > 0) {del_thread_num.fetch_add(thread_num, boost::memory_order_relaxed); del_thread_req = true;}} void del_service_thread(int thread_num) {if (thread_num > 0) {del_thread_num += thread_num;}}
int service_thread_num() const {return real_thread_num.load(boost::memory_order_relaxed);} int service_thread_num() const {return real_thread_num;}
#endif #endif
protected: protected:
...@@ -222,7 +222,6 @@ protected: ...@@ -222,7 +222,6 @@ protected:
size_t run() size_t run()
{ {
size_t n = 0; size_t n = 0;
boost::unique_lock<boost::mutex> lock(del_thread_mutex, boost::defer_lock);
std::stringstream os; std::stringstream os;
os << "service thread[" << boost::this_thread::get_id() << "] begin."; os << "service thread[" << boost::this_thread::get_id() << "] begin.";
...@@ -230,21 +229,17 @@ protected: ...@@ -230,21 +229,17 @@ protected:
++real_thread_num; ++real_thread_num;
while (true) while (true)
{ {
if (del_thread_req) if (del_thread_num > 0)
{ {
if (--del_thread_num >= 0) if (--del_thread_num >= 0)
{ {
lock.lock(); if (--real_thread_num > 0) //forbid to stop all service thread
if (real_thread_num > 1)
break; break;
else else
lock.unlock(); ++real_thread_num;
} }
else else
{
del_thread_req = false;
++del_thread_num; ++del_thread_num;
}
} }
//we cannot always decrease service thread timely (because run_one can block). //we cannot always decrease service thread timely (because run_one can block).
...@@ -257,9 +252,11 @@ protected: ...@@ -257,9 +252,11 @@ protected:
if (this_n > 0) if (this_n > 0)
n += this_n; //n can overflow, please note. n += this_n; //n can overflow, please note.
else else
{
--real_thread_num;
break; break;
}
} }
--real_thread_num;
os.str(""); os.str("");
os << "service thread[" << boost::this_thread::get_id() << "] end."; os << "service thread[" << boost::this_thread::get_id() << "] end.";
unified_out::info_out(os.str().data()); unified_out::info_out(os.str().data());
...@@ -293,12 +290,10 @@ protected: ...@@ -293,12 +290,10 @@ protected:
#ifdef ST_ASIO_DECREASE_THREAD_AT_RUNTIME #ifdef ST_ASIO_DECREASE_THREAD_AT_RUNTIME
atomic_int_fast32_t real_thread_num; atomic_int_fast32_t real_thread_num;
atomic_int_fast32_t del_thread_num; atomic_int_fast32_t del_thread_num;
boost::mutex del_thread_mutex;
bool del_thread_req;
#endif #endif
#ifdef ST_ASIO_AVOID_AUTO_STOP_SERVICE #ifdef ST_ASIO_AVOID_AUTO_STOP_SERVICE
#if ASIO_VERSION >= 101100 #if BOOST_ASIO_VERSION >= 101100
boost::asio::executor_work_guard<executor_type> work; boost::asio::executor_work_guard<executor_type> work;
#else #else
boost::shared_ptr<boost::asio::io_service::work> work; boost::shared_ptr<boost::asio::io_service::work> work;
......
...@@ -366,7 +366,7 @@ protected: ...@@ -366,7 +366,7 @@ protected:
{ {
scope_atomic_lock<> lock(send_atomic); scope_atomic_lock<> lock(send_atomic);
if (!sending && lock.locked()) if (!sending && lock.locked())
return sending = true; return (sending = true);
} }
return false; return false;
...@@ -378,7 +378,7 @@ protected: ...@@ -378,7 +378,7 @@ protected:
{ {
scope_atomic_lock<> lock(dispatch_atomic); scope_atomic_lock<> lock(dispatch_atomic);
if (!dispatching && lock.locked()) if (!dispatching && lock.locked())
return dispatching = true; return (dispatching = true);
} }
return false; return false;
......
...@@ -53,8 +53,13 @@ public: ...@@ -53,8 +53,13 @@ public:
typename Pool::object_type add_socket(unsigned short port, const std::string& ip = ST_ASIO_SERVER_IP) typename Pool::object_type add_socket(unsigned short port, const std::string& ip = ST_ASIO_SERVER_IP)
{ {
BOOST_AUTO(socket_ptr, ST_THIS create_object()); BOOST_AUTO(socket_ptr, ST_THIS create_object());
socket_ptr->set_server_addr(port, ip); if (!socket_ptr)
return ST_THIS add_socket(socket_ptr, false) ? socket_ptr : typename Pool::object_type(); return typename Pool::object_type();
else
{
socket_ptr->set_server_addr(port, ip);
return ST_THIS add_socket(socket_ptr, false) ? socket_ptr : typename Pool::object_type();
}
} }
/////////////////////////////////////////////////// ///////////////////////////////////////////////////
......
...@@ -62,7 +62,7 @@ public: ...@@ -62,7 +62,7 @@ public:
void force_shutdown(bool reconnect = false) void force_shutdown(bool reconnect = false)
{ {
if (super::FORCE_SHUTTING_DOWN != ST_THIS status) if (super::FORCE_SHUTTING_DOWN != ST_THIS status)
show_info("client link:", "been shut down."); ST_THIS show_info("client link:", "been shut down.");
need_reconnect = reconnect; need_reconnect = reconnect;
super::force_shutdown(); super::force_shutdown();
...@@ -77,28 +77,12 @@ public: ...@@ -77,28 +77,12 @@ public:
if (ST_THIS is_broken()) if (ST_THIS is_broken())
return force_shutdown(reconnect); return force_shutdown(reconnect);
else if (!ST_THIS is_shutting_down()) else if (!ST_THIS is_shutting_down())
show_info("client link:", "being shut down gracefully."); ST_THIS show_info("client link:", "being shut down gracefully.");
need_reconnect = reconnect; need_reconnect = reconnect;
super::graceful_shutdown(sync); super::graceful_shutdown(sync);
} }
void show_info(const char* head, const char* tail) const
{
boost::system::error_code ec;
BOOST_AUTO(ep, ST_THIS lowest_layer().local_endpoint(ec));
if (!ec)
unified_out::info_out("%s %s:%hu %s", head, ep.address().to_string().data(), ep.port(), tail);
}
void show_info(const char* head, const char* tail, const boost::system::error_code& ec) const
{
boost::system::error_code ec2;
BOOST_AUTO(ep, ST_THIS lowest_layer().local_endpoint(ec2));
if (!ec2)
unified_out::info_out("%s %s:%hu %s (%d %s)", head, ep.address().to_string().data(), ep.port(), tail, ec.value(), ec.message().data());
}
protected: protected:
virtual bool do_start() //connect virtual bool do_start() //connect
{ {
...@@ -122,7 +106,7 @@ protected: ...@@ -122,7 +106,7 @@ protected:
virtual void on_unpack_error() {unified_out::info_out("can not unpack msg."); force_shutdown();} virtual void on_unpack_error() {unified_out::info_out("can not unpack msg."); force_shutdown();}
virtual void on_recv_error(const boost::system::error_code& ec) virtual void on_recv_error(const boost::system::error_code& ec)
{ {
show_info("client link:", "broken/been shut down", ec); ST_THIS show_info("client link:", "broken/been shut down", ec);
force_shutdown(ST_THIS is_shutting_down() ? need_reconnect : prepare_reconnect(ec) >= 0); force_shutdown(ST_THIS is_shutting_down() ? need_reconnect : prepare_reconnect(ec) >= 0);
ST_THIS status = super::BROKEN; ST_THIS status = super::BROKEN;
...@@ -131,7 +115,7 @@ protected: ...@@ -131,7 +115,7 @@ protected:
virtual void on_async_shutdown_error() {force_shutdown(need_reconnect);} virtual void on_async_shutdown_error() {force_shutdown(need_reconnect);}
virtual bool on_heartbeat_error() virtual bool on_heartbeat_error()
{ {
show_info("client link:", "broke unexpectedly."); ST_THIS show_info("client link:", "broke unexpectedly.");
force_shutdown(ST_THIS is_shutting_down() ? need_reconnect : prepare_reconnect(boost::system::error_code(boost::asio::error::network_down)) >= 0); force_shutdown(ST_THIS is_shutting_down() ? need_reconnect : prepare_reconnect(boost::system::error_code(boost::asio::error::network_down)) >= 0);
return false; return false;
} }
......
...@@ -42,7 +42,7 @@ public: ...@@ -42,7 +42,7 @@ public:
void force_shutdown() void force_shutdown()
{ {
if (super::FORCE_SHUTTING_DOWN != ST_THIS status) if (super::FORCE_SHUTTING_DOWN != ST_THIS status)
show_info("server link:", "been shut down."); ST_THIS show_info("server link:", "been shut down.");
super::force_shutdown(); super::force_shutdown();
} }
...@@ -56,33 +56,20 @@ public: ...@@ -56,33 +56,20 @@ public:
if (ST_THIS is_broken()) if (ST_THIS is_broken())
return force_shutdown(); return force_shutdown();
else if (!ST_THIS is_shutting_down()) else if (!ST_THIS is_shutting_down())
show_info("server link:", "being shut down gracefully."); ST_THIS show_info("server link:", "being shut down gracefully.");
super::graceful_shutdown(sync); super::graceful_shutdown(sync);
} }
void show_info(const char* head, const char* tail) const
{
boost::system::error_code ec;
BOOST_AUTO(ep, ST_THIS lowest_layer().remote_endpoint(ec));
if (!ec)
unified_out::info_out("%s %s:%hu %s", head, ep.address().to_string().data(), ep.port(), tail);
}
void show_info(const char* head, const char* tail, const boost::system::error_code& ec) const
{
boost::system::error_code ec2;
BOOST_AUTO(ep, ST_THIS lowest_layer().remote_endpoint(ec2));
if (!ec2)
unified_out::info_out("%s %s:%hu %s (%d %s)", head, ep.address().to_string().data(), ep.port(), tail, ec.value(), ec.message().data());
}
protected: protected:
Server& get_server() {return server;}
const Server& get_server() const {return server;}
virtual void on_unpack_error() {unified_out::error_out("can not unpack msg."); force_shutdown();} virtual void on_unpack_error() {unified_out::error_out("can not unpack msg."); force_shutdown();}
//do not forget to force_shutdown this socket(in del_socket(), there's a force_shutdown() invocation) //do not forget to force_shutdown this socket(in del_socket(), there's a force_shutdown() invocation)
virtual void on_recv_error(const boost::system::error_code& ec) virtual void on_recv_error(const boost::system::error_code& ec)
{ {
show_info("server link:", "broken/been shut down", ec); ST_THIS show_info("server link:", "broken/been shut down", ec);
#ifdef ST_ASIO_CLEAR_OBJECT_INTERVAL #ifdef ST_ASIO_CLEAR_OBJECT_INTERVAL
force_shutdown(); force_shutdown();
...@@ -93,9 +80,9 @@ protected: ...@@ -93,9 +80,9 @@ protected:
} }
virtual void on_async_shutdown_error() {force_shutdown();} virtual void on_async_shutdown_error() {force_shutdown();}
virtual bool on_heartbeat_error() {show_info("server link:", "broke unexpectedly."); force_shutdown(); return false;} virtual bool on_heartbeat_error() {ST_THIS show_info("server link:", "broke unexpectedly."); force_shutdown(); return false;}
protected: private:
Server& server; Server& server;
}; };
......
...@@ -64,6 +64,34 @@ public: ...@@ -64,6 +64,34 @@ public:
bool is_connected() const {return CONNECTED == status;} bool is_connected() const {return CONNECTED == status;}
bool is_shutting_down() const {return FORCE_SHUTTING_DOWN == status || GRACEFUL_SHUTTING_DOWN == status;} bool is_shutting_down() const {return FORCE_SHUTTING_DOWN == status || GRACEFUL_SHUTTING_DOWN == status;}
void show_info(const char* head, const char* tail) const
{
boost::system::error_code ec;
BOOST_AUTO(local_ep, ST_THIS lowest_layer().local_endpoint(ec));
if (!ec)
{
BOOST_AUTO(remote_ep, ST_THIS lowest_layer().remote_endpoint(ec));
if (!ec)
unified_out::info_out("%s (%s:%hu %s:%hu) %s", head,
local_ep.address().to_string().data(), local_ep.port(),
remote_ep.address().to_string().data(), remote_ep.port(), tail);
}
}
void show_info(const char* head, const char* tail, const boost::system::error_code& ec) const
{
boost::system::error_code ec2;
BOOST_AUTO(local_ep, ST_THIS lowest_layer().local_endpoint(ec2));
if (!ec2)
{
BOOST_AUTO(remote_ep, ST_THIS lowest_layer().remote_endpoint(ec2));
if (!ec2)
unified_out::info_out("%s (%s:%hu %s:%hu) %s (%d %s)", head,
local_ep.address().to_string().data(), local_ep.port(),
remote_ep.address().to_string().data(), remote_ep.port(), tail, ec.value(), ec.message().data());
}
}
//get or change the unpacker at runtime //get or change the unpacker at runtime
//changing unpacker at runtime is not thread-safe, this operation can only be done in on_msg(), reset() or constructor, please pay special attention //changing unpacker at runtime is not thread-safe, this operation can only be done in on_msg(), reset() or constructor, please pay special attention
//we can resolve this defect via mutex, but i think it's not worth, because this feature is not frequently used //we can resolve this defect via mutex, but i think it's not worth, because this feature is not frequently used
...@@ -188,8 +216,7 @@ protected: ...@@ -188,8 +216,7 @@ protected:
virtual bool do_send_msg(in_msg_type& msg) virtual bool do_send_msg(in_msg_type& msg)
{ {
last_send_msg.emplace_back(); last_send_msg.emplace_back(boost::ref(msg));
last_send_msg.back().swap(msg);
boost::asio::async_write(ST_THIS next_layer(), ST_ASIO_SEND_BUFFER_TYPE(last_send_msg.back().data(), last_send_msg.back().size()), boost::asio::async_write(ST_THIS next_layer(), ST_ASIO_SEND_BUFFER_TYPE(last_send_msg.back().data(), last_send_msg.back().size()),
ST_THIS make_handler_error_size(boost::bind(&socket_base::send_handler, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred))); ST_THIS make_handler_error_size(boost::bind(&socket_base::send_handler, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)));
return true; return true;
......
...@@ -15,11 +15,11 @@ ...@@ -15,11 +15,11 @@
#include <boost/asio/ssl.hpp> #include <boost/asio/ssl.hpp>
#include "../../object_pool.h"
#include "../client_socket.h"
#include "../client.h" #include "../client.h"
#include "../server_socket.h"
#include "../server.h" #include "../server.h"
#include "../client_socket.h"
#include "../server_socket.h"
#include "../../object_pool.h"
namespace st_asio_wrapper { namespace ssl { namespace st_asio_wrapper { namespace ssl {
...@@ -157,9 +157,8 @@ public: ...@@ -157,9 +157,8 @@ public:
object_pool(service_pump& service_pump_, boost::asio::ssl::context::method m) : super(service_pump_), ctx(m) {} object_pool(service_pump& service_pump_, boost::asio::ssl::context::method m) : super(service_pump_), ctx(m) {}
boost::asio::ssl::context& context() {return ctx;} boost::asio::ssl::context& context() {return ctx;}
typename object_pool::object_type create_object() {return create_object(boost::ref(ST_THIS sp));} typename object_pool::object_type create_object() {return create_object(boost::ref(ST_THIS get_service_pump()));}
template<typename Arg> template<typename Arg> typename object_pool::object_type create_object(Arg& arg) {return super::create_object(arg, boost::ref(ctx));}
typename object_pool::object_type create_object(Arg& arg) {return super::create_object(arg, boost::ref(ctx));}
protected: protected:
boost::asio::ssl::context ctx; boost::asio::ssl::context ctx;
...@@ -200,7 +199,7 @@ private: ...@@ -200,7 +199,7 @@ private:
if (!ec) if (!ec)
super::do_start(); //return to tcp::server_socket_base::do_start super::do_start(); //return to tcp::server_socket_base::do_start
else else
this->server.del_socket(ST_THIS shared_from_this()); this->get_server().del_socket(ST_THIS shared_from_this());
} }
}; };
......
...@@ -71,21 +71,24 @@ public: ...@@ -71,21 +71,24 @@ public:
{tid id = -1; do_something_to_all(boost::lambda::bind(&timer_info::id, boost::lambda::_1) = ++boost::lambda::var(id));} {tid id = -1; do_something_to_all(boost::lambda::bind(&timer_info::id, boost::lambda::_1) = ++boost::lambda::var(id));}
//after this call, call_back cannot be used again, please note. //after this call, call_back cannot be used again, please note.
void update_timer_info(tid id, size_t interval, boost::function<bool(tid)>& call_back, bool start = false) bool update_timer_info(tid id, size_t interval, boost::function<bool(tid)>& call_back, bool start = false)
{ {
timer_info& ti = timer_can[id]; timer_info& ti = timer_can[id];
if (timer_info::TIMER_FAKE == ti.status) if (timer_info::TIMER_FAKE == ti.status)
ti.timer = boost::make_shared<timer_type>(boost::ref(io_context_)); try {ti.timer = boost::make_shared<timer_type>(boost::ref(io_context_));}
catch (const std::exception& e) {unified_out::error_out("cannot create timer %d (%s)", ti.id, e.what()); return false;}
ti.status = timer_info::TIMER_OK; ti.status = timer_info::TIMER_OK;
ti.interval_ms = interval; ti.interval_ms = interval;
ti.call_back.swap(call_back); ti.call_back.swap(call_back);
if (start) if (start)
start_timer(ti); start_timer(ti);
return true;
} }
void update_timer_info(tid id, size_t interval, const boost::function<bool (tid)>& call_back, bool start = false) bool update_timer_info(tid id, size_t interval, const boost::function<bool (tid)>& call_back, bool start = false)
{BOOST_AUTO(unused, call_back); update_timer_info(id, interval, unused, start);} {BOOST_AUTO(unused, call_back); return update_timer_info(id, interval, unused, start);}
void change_timer_status(tid id, timer_info::timer_status status) {timer_can[id].status = status;} void change_timer_status(tid id, timer_info::timer_status status) {timer_can[id].status = status;}
void change_timer_interval(tid id, size_t interval) {timer_can[id].interval_ms = interval;} void change_timer_interval(tid id, size_t interval) {timer_can[id].interval_ms = interval;}
...@@ -95,8 +98,8 @@ public: ...@@ -95,8 +98,8 @@ public:
void change_timer_call_back(tid id, const boost::function<bool(tid)>& call_back) {BOOST_AUTO(unused, call_back); change_timer_call_back(id, unused);} void change_timer_call_back(tid id, const boost::function<bool(tid)>& call_back) {BOOST_AUTO(unused, call_back); change_timer_call_back(id, unused);}
//after this call, call_back cannot be used again, please note. //after this call, call_back cannot be used again, please note.
void set_timer(tid id, size_t interval, boost::function<bool(tid)>& call_back) {update_timer_info(id, interval, call_back, true);} bool set_timer(tid id, size_t interval, boost::function<bool(tid)>& call_back) {return update_timer_info(id, interval, call_back, true);}
void set_timer(tid id, size_t interval, const boost::function<bool(tid)>& call_back) {update_timer_info(id, interval, call_back, true);} bool set_timer(tid id, size_t interval, const boost::function<bool(tid)>& call_back) {return update_timer_info(id, interval, call_back, true);}
bool start_timer(tid id) bool start_timer(tid id)
{ {
...@@ -129,7 +132,7 @@ protected: ...@@ -129,7 +132,7 @@ protected:
{ {
assert(timer_info::TIMER_OK == ti.status); assert(timer_info::TIMER_OK == ti.status);
#if BOOST_ASIO_VERSION >= 101100 #if BOOST_ASIO_VERSION >= 101100 && (defined(ST_ASIO_USE_STEADY_TIMER) || defined(ST_ASIO_USE_SYSTEM_TIMER))
ti.timer->expires_after(milliseconds(ti.interval_ms)); ti.timer->expires_after(milliseconds(ti.interval_ms));
#else #else
ti.timer->expires_from_now(milliseconds(ti.interval_ms)); ti.timer->expires_from_now(milliseconds(ti.interval_ms));
...@@ -141,7 +144,7 @@ protected: ...@@ -141,7 +144,7 @@ protected:
{ {
if (timer_info::TIMER_OK == ti.status) //enable stopping timers that has been stopped if (timer_info::TIMER_OK == ti.status) //enable stopping timers that has been stopped
{ {
try {ti.timer->cancel();} catch (const boost::system::system_error& e) {} try {ti.timer->cancel();} catch (const boost::system::system_error& e) {unified_out::error_out("cannot stop timer %d (%d %s)", ti.id, e.code().value(), e.what());}
ti.status = timer_info::TIMER_CANCELED; ti.status = timer_info::TIMER_CANCELED;
} }
} }
......
...@@ -142,7 +142,7 @@ protected: ...@@ -142,7 +142,7 @@ protected:
virtual bool do_send_msg(in_msg_type& msg) virtual bool do_send_msg(in_msg_type& msg)
{ {
last_send_msg.swap(msg); last_send_msg = msg;
boost::lock_guard<boost::mutex> lock(shutdown_mutex); boost::lock_guard<boost::mutex> lock(shutdown_mutex);
ST_THIS next_layer().async_send_to(ST_ASIO_SEND_BUFFER_TYPE(last_send_msg.data(), last_send_msg.size()), last_send_msg.peer_addr, ST_THIS next_layer().async_send_to(ST_ASIO_SEND_BUFFER_TYPE(last_send_msg.data(), last_send_msg.size()), last_send_msg.peer_addr,
ST_THIS make_handler_error_size(boost::bind(&socket_base::send_handler, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred))); ST_THIS make_handler_error_size(boost::bind(&socket_base::send_handler, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)));
......
...@@ -36,8 +36,13 @@ public: ...@@ -36,8 +36,13 @@ public:
typename Pool::object_type add_socket(unsigned short port, const std::string& ip = std::string()) typename Pool::object_type add_socket(unsigned short port, const std::string& ip = std::string())
{ {
BOOST_AUTO(socket_ptr, ST_THIS create_object()); BOOST_AUTO(socket_ptr, ST_THIS create_object());
socket_ptr->set_local_addr(port, ip); if (!socket_ptr)
return ST_THIS add_socket(socket_ptr) ? socket_ptr : typename Pool::object_type(); return typename Pool::object_type();
else
{
socket_ptr->set_local_addr(port, ip);
return ST_THIS add_socket(socket_ptr) ? socket_ptr : typename Pool::object_type();
}
} }
//functions with a socket_ptr parameter will remove the link from object pool first, then call corresponding function //functions with a socket_ptr parameter will remove the link from object pool first, then call corresponding function
......
module = pingpong_client module = pingpong_client
ext_libs = -lboost_timer -lboost_chrono ext_libs = -lboost_timer
include ../config.mk include ../config.mk
...@@ -201,7 +201,7 @@ int main(int argc, const char* argv[]) ...@@ -201,7 +201,7 @@ int main(int argc, const char* argv[])
boost::uint64_t total_msg_bytes = link_num; total_msg_bytes *= msg_len; total_msg_bytes *= msg_num; boost::uint64_t total_msg_bytes = link_num; total_msg_bytes *= msg_len; total_msg_bytes *= msg_num;
double used_time = (double) begin_time.elapsed().wall / 1000000000; double used_time = (double) begin_time.elapsed().wall / 1000000000;
printf("finished in %f seconds, speed: %f(*2) MBps.\n", used_time, total_msg_bytes / used_time / 1024 / 1024); printf("finished in %f seconds, TPS: %f(*2), speed: %f(*2) MBps.\n", used_time, link_num * msg_num / used_time, total_msg_bytes / used_time / 1024 / 1024);
delete[] init_msg; delete[] init_msg;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册