提交 807fc26c 编写于 作者: Y youngwolf 提交者: youngwolf

Single thread mode optimization -- eliminate the usage of strand.

上级 8357aaea
......@@ -474,7 +474,14 @@ int main(int argc, const char* argv[])
//or just add up total message size), under this scenario, just one service thread without receiving buffer will obtain the best IO throughput.
//the server has such behavior too.
sp.start_service(std::max(thread_num, sp.get_io_context_num()));
//explicitly apply single thread mode by setting the thread_num to be <= 0
//even the thread_num equals to io_context_num, ascs will not apply single thread mode automatically,
//this is because we can increase thread_num at runtime.
//with single thread mode, we'd better use ASIO_CONCURRENCY_HINT_UNSAFE to construct service_pump.
if (sp.get_io_context_num() > 0)
sp.start_service(0);
else
sp.start_service(std::max(thread_num, sp.get_io_context_num()));
while(sp.is_running())
{
std::string str;
......@@ -513,7 +520,11 @@ int main(int argc, const char* argv[])
//add all clients back
for (size_t i = 0; i < link_num; ++i)
client.add_socket(port, ip);
sp.start_service(std::max(thread_num, sp.get_io_context_num()));
if (sp.get_io_context_num() > 0)
sp.start_service(0);
else
sp.start_service(std::max(thread_num, sp.get_io_context_num()));
}
else
{
......
......@@ -279,7 +279,14 @@ int main(int argc, const char* argv[])
global_packer->prefix_suffix("begin", "end");
#endif
sp.start_service(std::max(thread_num, sp.get_io_context_num()));
//explicitly apply single thread mode by setting the thread_num to be <= 0
//even the thread_num equals to io_context_num, ascs will not apply single thread mode automatically,
//this is because we can increase thread_num at runtime.
//with single thread mode, we'd better use ASIO_CONCURRENCY_HINT_UNSAFE to construct service_pump.
if (sp.get_io_context_num() > 0)
sp.start_service(0);
else
sp.start_service(std::max(thread_num, sp.get_io_context_num()));
normal_server_.start_service(1);
short_server.start_service(1);
while(sp.is_running())
......@@ -300,7 +307,10 @@ int main(int argc, const char* argv[])
{
sp.stop_service();
dump_io_context_refs(sp);
sp.start_service(std::max(thread_num, sp.get_io_context_num()));
if (sp.get_io_context_num() > 0)
sp.start_service(0);
else
sp.start_service(std::max(thread_num, sp.get_io_context_num()));
dump_io_context_refs(sp);
}
else if (STATISTIC == str)
......
......@@ -26,30 +26,44 @@ class executor
{
protected:
virtual ~executor() {}
executor(asio::io_context& _io_context_) : io_context_(_io_context_) {}
executor(asio::io_context& _io_context_) : io_context_(_io_context_), single_thread(false) {}
void set_single_thread() {single_thread = true;}
bool is_single_thread() const {return single_thread;}
public:
typedef std::function<void(const asio::error_code&)> handler_with_error;
typedef std::function<void(const asio::error_code&, size_t)> handler_with_error_size;
bool stopped() const {return io_context_.stopped();}
#if ASIO_VERSION >= 101100
template<typename F> void post(F&& handler) {asio::post(io_context_, std::forward<F>(handler));}
template<typename F> void defer(F&& handler) {asio::defer(io_context_, std::forward<F>(handler));}
template<typename F> void dispatch(F&& handler) {asio::dispatch(io_context_, std::forward<F>(handler));}
template<typename F> void post_strand(asio::io_context::strand& strand, F&& handler) {asio::post(strand, std::forward<F>(handler));}
template<typename F> void defer_strand(asio::io_context::strand& strand, F&& handler) {asio::defer(strand, std::forward<F>(handler));}
template<typename F> void dispatch_strand(asio::io_context::strand& strand, F&& handler) {asio::dispatch(strand, std::forward<F>(handler));}
template<typename F> void post_strand(asio::io_context::strand& strand, F&& handler)
{single_thread ? post(std::forward<F>(handler)) : asio::post(strand, std::forward<F>(handler));}
template<typename F> void defer_strand(asio::io_context::strand& strand, F&& handler)
{single_thread ? defer(std::forward<F>(handler)) : asio::defer(strand, std::forward<F>(handler));}
template<typename F> void dispatch_strand(asio::io_context::strand& strand, F&& handler)
{single_thread ? dispatch(std::forward<F>(handler)) : asio::dispatch(strand, std::forward<F>(handler));}
#else
template<typename F> void post(F&& handler) {io_context_.post(std::forward<F>(handler));}
template<typename F> void dispatch(F&& handler) {io_context_.dispatch(std::forward<F>(handler));}
template<typename F> void post_strand(asio::io_context::strand& strand, F&& handler) {strand.post(std::forward<F>(handler));}
template<typename F> void dispatch_strand(asio::io_context::strand& strand, F&& handler) {strand.dispatch(std::forward<F>(handler));}
template<typename F> void post_strand(asio::io_context::strand& strand, F&& handler)
{single_thread ? post(std::forward<F>(handler)) : strand.post(std::forward<F>(handler));}
template<typename F> void dispatch_strand(asio::io_context::strand& strand, F&& handler)
{single_thread ? dispatch(std::forward<F>(handler)) : strand.dispatch(std::forward<F>(handler));}
#endif
template<typename F> inline F&& make_handler_error(F&& f) const {return std::forward<F>(f);}
template<typename F> inline F&& make_handler_error_size(F&& f) const {return std::forward<F>(f);}
template<typename F> inline handler_with_error make_handler_error(F&& f) const {return std::forward<F>(f);}
template<typename F> inline handler_with_error_size make_handler_error_size(F&& f) const {return std::forward<F>(f);}
protected:
asio::io_context& io_context_;
bool single_thread;
};
} //namespace
......
......@@ -93,12 +93,12 @@ public:
#if ASIO_VERSION >= 101200
#ifdef ASCS_DECREASE_THREAD_AT_RUNTIME
service_pump(int concurrency_hint = ASIO_CONCURRENCY_HINT_SAFE) : started(false), first(false), real_thread_num(0), del_thread_num(0), single_ctx(true)
service_pump(int concurrency_hint = ASIO_CONCURRENCY_HINT_SAFE) : started(false), first(false), real_thread_num(0), del_thread_num(0), single_ctx(true), single_thread(false)
{context_can.emplace_back(concurrency_hint);}
#else
//basically, the parameter multi_ctx is designed to be used by single_service_pump, which means single_service_pump always think it's using multiple io_context
//for service_pump, you should use set_io_context_num function instead if you really need multiple io_context.
service_pump(int concurrency_hint = ASIO_CONCURRENCY_HINT_SAFE, bool multi_ctx = false) : started(false), first(false), single_ctx(!multi_ctx)
service_pump(int concurrency_hint = ASIO_CONCURRENCY_HINT_SAFE, bool multi_ctx = false) : started(false), first(false), single_ctx(!multi_ctx), single_thread(false)
{context_can.emplace_back(concurrency_hint);}
bool set_io_context_num(int io_context_num, int concurrency_hint = ASIO_CONCURRENCY_HINT_SAFE) //call this before construct any services on this service_pump
{
......@@ -115,11 +115,11 @@ public:
#endif
#else
#ifdef ASCS_DECREASE_THREAD_AT_RUNTIME
service_pump() : started(false), first(false), real_thread_num(0), del_thread_num(0), single_ctx(true), context_can(1) {}
service_pump() : started(false), first(false), real_thread_num(0), del_thread_num(0), single_ctx(true), single_thread(false), context_can(1) {}
#else
//basically, the parameter multi_ctx is designed to be used by single_service_pump, which means single_service_pump always think it's using multiple io_context
//for service_pump, you should use set_io_context_num function instead if you really need multiple io_context.
service_pump(bool multi_ctx = false) : started(false), first(false), single_ctx(!multi_ctx), context_can(1) {}
service_pump(bool multi_ctx = false) : started(false), first(false), single_ctx(!multi_ctx), single_thread(false), context_can(1) {}
bool set_io_context_num(int io_context_num) //call this before construct any services on this service_pump
{
if (io_context_num < 1 || is_service_started() || context_can.size() > 1) //can only be called once
......@@ -293,6 +293,7 @@ public:
bool is_service_started() const {return started;}
bool is_first_running() const {return first;}
bool is_single_thread() const {return single_thread;}
//not thread safe
#if ASIO_VERSION >= 101200
......@@ -301,6 +302,35 @@ public:
void add_service_thread(int thread_num, bool block = false, int io_context_num = 0)
#endif
{
if (!is_service_started())
unified_out::error_out("call add_service_thread after start_service, please!");
else
#if ASIO_VERSION >= 101200
do_add_service_thread(false, thread_num, block, io_context_num, concurrency_hint);
#else
do_add_service_thread(false, thread_num, block, io_context_num);
#endif
}
#ifdef ASCS_DECREASE_THREAD_AT_RUNTIME
void del_service_thread(int thread_num) {if (thread_num > 0) del_thread_num += thread_num;}
int service_thread_num() const {return real_thread_num;}
#endif
protected:
//not thread safe
#if ASIO_VERSION >= 101200
void do_add_service_thread(bool first, int thread_num, bool block = false, int io_context_num = 0, int concurrency_hint = ASIO_CONCURRENCY_HINT_SAFE)
#else
void do_add_service_thread(bool first, int thread_num, bool block = false, int io_context_num = 0)
#endif
{
if (!first && is_single_thread() && thread_num > 0 && thread_num != io_context_num)
{
unified_out::error_out("for single thread mode, thread_num must equals to io_context_num!");
return;
}
if (io_context_num > 0)
{
if (thread_num < io_context_num)
......@@ -333,19 +363,20 @@ public:
}
}
#ifdef ASCS_DECREASE_THREAD_AT_RUNTIME
void del_service_thread(int thread_num) {if (thread_num > 0) del_thread_num += thread_num;}
int service_thread_num() const {return real_thread_num;}
#endif
protected:
void do_service(int thread_num, bool block = false)
{
if (thread_num <= 0 || (size_t) thread_num < context_can.size())
if (thread_num <= 0)
{
single_thread = true;
thread_num = (int) context_can.size();
}
else if ((size_t) thread_num < context_can.size())
{
unified_out::error_out("thread_num must be bigger than or equal to io_context_num.");
return;
}
else
single_thread = false;
#ifdef ASCS_AVOID_AUTO_STOP_SERVICE
if (!is_first_running())
......@@ -368,7 +399,7 @@ protected:
ascs::do_something_to_all(context_can, [](context& item) {item.io_context.reset();}); //this is needed when restart service
#endif
do_something_to_all([](object_type& item) {item->start_service();});
add_service_thread(thread_num, block);
do_add_service_thread(true, thread_num, block);
}
void wait_service()
......@@ -497,7 +528,7 @@ private:
std::atomic_int_fast32_t del_thread_num;
#endif
bool single_ctx;
bool single_ctx, single_thread;
std::list<context> context_can;
std::mutex context_can_mutex;
};
......
......@@ -13,6 +13,7 @@
#ifndef _ASCS_CLIENT_SOCKET_H_
#define _ASCS_CLIENT_SOCKET_H_
#include "../service_pump.h"
#include "socket.h"
namespace ascs { namespace tcp {
......@@ -52,8 +53,8 @@ public:
}
protected:
generic_client_socket(asio::io_context& io_context_) : super(io_context_) {first_init();}
template<typename Arg> generic_client_socket(asio::io_context& io_context_, Arg&& arg) : super(io_context_, std::forward<Arg>(arg)) {first_init();}
generic_client_socket(service_pump& service_pump_) : super(service_pump_) {first_init(service_pump_);}
template<typename Arg> generic_client_socket(service_pump& service_pump_, Arg&& arg) : super(service_pump_, std::forward<Arg>(arg)) {first_init(service_pump_); }
generic_client_socket(Matrix& matrix_) : super(matrix_.get_service_pump()) {first_init(&matrix_);}
template<typename Arg> generic_client_socket(Matrix& matrix_, Arg&& arg) : super(matrix_.get_service_pump(), std::forward<Arg>(arg)) {first_init(&matrix_);}
......@@ -116,7 +117,8 @@ public:
protected:
//helper function, just call it in constructor
void first_init(Matrix* matrix_ = nullptr) {need_reconnect = ASCS_RECONNECT; matrix = matrix_;}
void first_init(const service_pump& service_pump_) {need_reconnect = ASCS_RECONNECT; matrix = nullptr; if (service_pump_.is_single_thread()) this->set_single_thread();}
void first_init(Matrix* matrix_) {need_reconnect = ASCS_RECONNECT; matrix = matrix_; if (matrix_->get_service_pump().is_single_thread()) this->set_single_thread();}
Matrix* get_matrix() {return matrix;}
const Matrix* get_matrix() const {return matrix;}
......@@ -222,9 +224,9 @@ private:
typedef generic_client_socket<socket_base<Socket, Packer, Unpacker, InQueue, InContainer, OutQueue, OutContainer, ReaderWriter>, Matrix> super;
public:
client_socket_base(asio::io_context& io_context_) : super(io_context_) {this->set_server_addr(ASCS_SERVER_PORT, ASCS_SERVER_IP);}
client_socket_base(service_pump& service_pump_) : super(service_pump_) {this->set_server_addr(ASCS_SERVER_PORT, ASCS_SERVER_IP);}
template<typename Arg>
client_socket_base(asio::io_context& io_context_, Arg&& arg) : super(io_context_, std::forward<Arg>(arg)) {this->set_server_addr(ASCS_SERVER_PORT, ASCS_SERVER_IP);}
client_socket_base(service_pump& service_pump_, Arg&& arg) : super(service_pump_, std::forward<Arg>(arg)) {this->set_server_addr(ASCS_SERVER_PORT, ASCS_SERVER_IP);}
client_socket_base(Matrix& matrix_) : super(matrix_) {this->set_server_addr(ASCS_SERVER_PORT, ASCS_SERVER_IP);}
template<typename Arg> client_socket_base(Matrix& matrix_, Arg&& arg) : super(matrix_, std::forward<Arg>(arg)) {this->set_server_addr(ASCS_SERVER_PORT, ASCS_SERVER_IP);}
......@@ -276,7 +278,7 @@ private:
typedef generic_client_socket<socket_base<asio::local::stream_protocol::socket, Packer, Unpacker, InQueue, InContainer, OutQueue, OutContainer, ReaderWriter>, Matrix, asio::local::stream_protocol> super;
public:
unix_client_socket_base(asio::io_context& io_context_) : super(io_context_) {this->set_server_addr("./ascs-unix-socket");}
unix_client_socket_base(service_pump& service_pump_) : super(service_pump_) {this->set_server_addr("./ascs-unix-socket");}
unix_client_socket_base(Matrix& matrix_) : super(matrix_) {this->set_server_addr("./ascs-unix-socket");}
};
#endif
......
......@@ -28,7 +28,7 @@ private:
typedef ascs::tcp::client_socket_base<Packer, Unpacker, Matrix, Socket, InQueue, InContainer, OutQueue, OutContainer> super;
public:
client_socket_base(asio::io_context& io_context_) : super(io_context_), req_len(0) {}
client_socket_base(service_pump& service_pump_) : super(service_pump_), req_len(0) {}
client_socket_base(Matrix& matrix_) : super(matrix_), req_len(0) {}
virtual const char* type_name() const {return "SOCKS4 (client endpoint)";}
......@@ -115,7 +115,7 @@ private:
typedef ascs::tcp::client_socket_base<Packer, Unpacker, Matrix, Socket, InQueue, InContainer, OutQueue, OutContainer> super;
public:
client_socket_base(asio::io_context& io_context_) : super(io_context_), req_len(0), res_len(0), step(-1), target_port(0) {}
client_socket_base(service_pump& service_pump_) : super(service_pump_), req_len(0), res_len(0), step(-1), target_port(0) {}
client_socket_base(Matrix& matrix_) : super(matrix_), req_len(0), res_len(0), step(-1), target_port(0) {}
virtual const char* type_name() const {return "SOCKS5 (client endpoint)";}
......
......@@ -26,8 +26,11 @@ private:
typedef Socket super;
public:
generic_server_socket(Server& server_) : super(server_.get_service_pump()), server(server_) {}
template<typename Arg> generic_server_socket(Server& server_, Arg&& arg) : super(server_.get_service_pump(), std::forward<Arg>(arg)), server(server_) {}
generic_server_socket(Server& server_) : super(server_.get_service_pump()), server(server_)
{if (server_.get_service_pump().is_single_thread()) this->set_single_thread();}
template<typename Arg> generic_server_socket(Server& server_, Arg&& arg) : super(server_.get_service_pump(), std::forward<Arg>(arg)), server(server_)
{if (server_.get_service_pump().is_single_thread()) this->set_single_thread();}
~generic_server_socket() {this->clear_io_context_refs();}
virtual const char* type_name() const {return "TCP (server endpoint)";}
......
......@@ -327,13 +327,12 @@ private:
else if (this->test_and_set_reading())
return;
#endif
auto cb = this->make_handler_error_size([this](const asio::error_code& ec, size_t bytes_transferred) {this->recv_handler(ec, bytes_transferred);});
#ifdef ASCS_PASSIVE_RECV
if (!this->async_read(make_strand_handler(rw_strand,
this->make_handler_error_size([this](const asio::error_code& ec, size_t bytes_transferred) {this->recv_handler(ec, bytes_transferred);}))))
if (!this->async_read(this->is_single_thread() ? std::move(cb) : make_strand_handler(rw_strand, std::move(cb))))
this->clear_reading();
#else
this->async_read(make_strand_handler(rw_strand,
this->make_handler_error_size([this](const asio::error_code& ec, size_t bytes_transferred) {this->recv_handler(ec, bytes_transferred);})));
this->async_read(this->is_single_thread() ? std::move(cb) : make_strand_handler(rw_strand, std::move(cb)));
#endif
}
......@@ -401,8 +400,8 @@ private:
if (!sending_buffer.empty())
{
sending_msgs.front().restart();
this->async_write(sending_buffer, make_strand_handler(rw_strand,
this->make_handler_error_size([this](const asio::error_code& ec, size_t bytes_transferred) {this->send_handler(ec, bytes_transferred);})));
auto cb = this->make_handler_error_size([this](const asio::error_code& ec, size_t bytes_transferred) {this->send_handler(ec, bytes_transferred);});
this->async_write(sending_buffer, this->is_single_thread() ? std::move(cb) : make_strand_handler(rw_strand, std::move(cb)));
return true;
}
else
......
......@@ -72,7 +72,7 @@ private:
typedef socket<tcp::client_socket_base<Packer, Unpacker, Matrix, asio::ssl::stream<asio::ip::tcp::socket>, InQueue, InContainer, OutQueue, OutContainer>> super;
public:
client_socket_base(asio::io_context& io_context_, asio::ssl::context& ctx_) : super(io_context_, ctx_) {}
client_socket_base(service_pump& service_pump_, asio::ssl::context& ctx_) : super(service_pump_, ctx_) {}
client_socket_base(Matrix& matrix_, asio::ssl::context& ctx_) : super(matrix_, ctx_) {}
virtual const char* type_name() const {return "SSL (client endpoint)";}
......
......@@ -23,7 +23,10 @@ class tracked_executor
{
protected:
virtual ~tracked_executor() {}
tracked_executor(asio::io_context& _io_context_) : io_context_(_io_context_), aci(std::make_shared<char>('\0')) {}
tracked_executor(asio::io_context& _io_context_) : io_context_(_io_context_), single_thread(false), aci(std::make_shared<char>('\0')) {}
void set_single_thread() {single_thread = true;}
bool is_single_thread() const {return single_thread;}
public:
typedef std::function<void(const asio::error_code&)> handler_with_error;
......@@ -36,14 +39,21 @@ public:
template<typename F> void post(F&& handler) {asio::post(io_context_, [ref_holder(aci), handler(std::forward<F>(handler))]() {handler();});}
template<typename F> void defer(F&& handler) {asio::defer(io_context_, [ref_holder(aci), handler(std::forward<F>(handler))]() {handler();});}
template<typename F> void dispatch(F&& handler) {asio::dispatch(io_context_, [ref_holder(aci), handler(std::forward<F>(handler))]() {handler();});}
template<typename F> void post_strand(asio::io_context::strand& strand, F&& handler) {asio::post(strand, [ref_holder(aci), handler(std::forward<F>(handler))]() {handler();});}
template<typename F> void defer_strand(asio::io_context::strand& strand, F&& handler) {asio::defer(strand, [ref_holder(aci), handler(std::forward<F>(handler))]() {handler();});}
template<typename F> void dispatch_strand(asio::io_context::strand& strand, F&& handler) {asio::dispatch(strand, [ref_holder(aci), handler(std::forward<F>(handler))]() {handler();});}
template<typename F> void post_strand(asio::io_context::strand& strand, F&& handler)
{single_thread ? post(std::forward<F>(handler)) : asio::post(strand, [ref_holder(aci), handler(std::forward<F>(handler))]() {handler();});}
template<typename F> void defer_strand(asio::io_context::strand& strand, F&& handler)
{single_thread ? defer(std::forward<F>(handler)) : asio::defer(strand, [ref_holder(aci), handler(std::forward<F>(handler))]() {handler();});}
template<typename F> void dispatch_strand(asio::io_context::strand& strand, F&& handler)
{single_thread ? dispatch(std::forward<F>(handler)) : asio::dispatch(strand, [ref_holder(aci), handler(std::forward<F>(handler))]() {handler();});}
#else
template<typename F> void post(F&& handler) {io_context_.post([ref_holder(aci), handler(std::forward<F>(handler))]() {handler();});}
template<typename F> void dispatch(F&& handler) {io_context_.dispatch([ref_holder(aci), handler(std::forward<F>(handler))]() {handler();});}
template<typename F> void post_strand(asio::io_context::strand& strand, F&& handler) {strand.post([ref_holder(aci), handler(std::forward<F>(handler))]() {handler();});}
template<typename F> void dispatch_strand(asio::io_context::strand& strand, F&& handler) {strand.dispatch([ref_holder(aci), handler(std::forward<F>(handler))]() {handler();});}
template<typename F> void post_strand(asio::io_context::strand& strand, F&& handler)
{single_thread ? post(std::forward<F>(handler)) : strand.post([ref_holder(aci), handler(std::forward<F>(handler))]() {handler();});}
template<typename F> void dispatch_strand(asio::io_context::strand& strand, F&& handler)
{single_thread ? dispatch(std::forward<F>(handler)) : strand.dispatch([ref_holder(aci), handler(std::forward<F>(handler))]() {handler();});}
#endif
template<typename F> handler_with_error make_handler_error(F&& handler) const {return [ref_holder(aci), handler(std::forward<F>(handler))](const auto& ec) {handler(ec);};}
......@@ -54,14 +64,21 @@ public:
template<typename F> void post(const F& handler) {auto ref_holder(aci); asio::post(io_context_, [=]() {(void) ref_holder; handler();});}
template<typename F> void defer(const F& handler) {auto ref_holder(aci); asio::defer(io_context_, [=]() {(void) ref_holder; handler();});}
template<typename F> void dispatch(const F& handler) {auto ref_holder(aci); asio::dispatch(io_context_, [=]() {(void) ref_holder; handler();});}
template<typename F> void post_strand(asio::io_context::strand& strand, const F& handler) {auto ref_holder(aci); asio::post(strand, [=]() {(void) ref_holder; handler();});}
template<typename F> void defer_strand(asio::io_context::strand& strand, const F& handler) {auto ref_holder(aci); asio::defer(strand, [=]() {(void) ref_holder; handler();});}
template<typename F> void dispatch_strand(asio::io_context::strand& strand, const F& handler) {auto ref_holder(aci); asio::dispatch(strand, [=]() {(void) ref_holder; handler();});}
template<typename F> void post_strand(asio::io_context::strand& strand, const F& handler)
{if (single_thread) return post(handler); auto ref_holder(aci); asio::post(strand, [=]() {(void) ref_holder; handler();});}
template<typename F> void defer_strand(asio::io_context::strand& strand, const F& handler)
{if (single_thread) return defer(handler); auto ref_holder(aci); asio::defer(strand, [=]() {(void) ref_holder; handler();});}
template<typename F> void dispatch_strand(asio::io_context::strand& strand, const F& handler)
{if (single_thread) return dispatch(handler); auto ref_holder(aci); asio::dispatch(strand, [=]() {(void) ref_holder; handler();});}
#else
template<typename F> void post(const F& handler) {auto ref_holder(aci); io_context_.post([=]() {(void) ref_holder; handler();});}
template<typename F> void dispatch(const F& handler) {auto ref_holder(aci); io_context_.dispatch([=]() {(void) ref_holder; handler();});}
template<typename F> void post_strand(asio::io_context::strand& strand, const F& handler) {auto ref_holder(aci); strand.post([=]() {(void) ref_holder; handler();});}
template<typename F> void dispatch_strand(asio::io_context::strand& strand, const F& handler) {auto ref_holder(aci); strand.dispatch([=]() {(void) ref_holder; handler();});}
template<typename F> void post_strand(asio::io_context::strand& strand, const F& handler)
{if (single_thread) return post(handler); auto ref_holder(aci); strand.post([=]() {(void) ref_holder; handler();});}
template<typename F> void dispatch_strand(asio::io_context::strand& strand, const F& handler)
{if (single_thread) return dispatch(handler); auto ref_holder(aci); strand.dispatch([=]() {(void) ref_holder; handler();});}
#endif
template<typename F> handler_with_error make_handler_error(const F& handler) const {auto ref_holder(aci); return [=](const asio::error_code& ec) {(void) ref_holder; handler(ec);};}
......@@ -75,6 +92,7 @@ public:
protected:
asio::io_context& io_context_;
bool single_thread;
private:
std::shared_ptr<char> aci; //asynchronous calling indicator
......
......@@ -37,7 +37,7 @@ public:
static const typename super::tid TIMER_END = TIMER_BEGIN + 5;
public:
reliable_socket_base(asio::io_context& io_context_) : super(io_context_), kcp(nullptr), max_nsnd_que(ASCS_RELIABLE_UDP_NSND_QUE) {}
reliable_socket_base(service_pump& service_pump_) : super(service_pump_), kcp(nullptr), max_nsnd_que(ASCS_RELIABLE_UDP_NSND_QUE) {}
reliable_socket_base(Matrix& matrix_) : super(matrix_), kcp(nullptr), max_nsnd_que(ASCS_RELIABLE_UDP_NSND_QUE) {}
~reliable_socket_base() {release_kcp();}
......
......@@ -13,6 +13,7 @@
#ifndef _ASCS_UDP_SOCKET_H_
#define _ASCS_UDP_SOCKET_H_
#include "../service_pump.h"
#include "../socket.h"
namespace ascs { namespace udp {
......@@ -57,8 +58,11 @@ public:
}
protected:
generic_socket(asio::io_context& io_context_) : super(io_context_), is_bound(false), is_connected(false), connect_mode(ASCS_UDP_CONNECT_MODE), matrix(nullptr) {}
generic_socket(Matrix& matrix_) : super(matrix_.get_service_pump()), is_bound(false), is_connected(false), connect_mode(ASCS_UDP_CONNECT_MODE), matrix(&matrix_) {}
generic_socket(service_pump& service_pump_) : super(service_pump_), is_bound(false), is_connected(false), connect_mode(ASCS_UDP_CONNECT_MODE), matrix(nullptr)
{if (service_pump_.is_single_thread()) this->set_single_thread();}
generic_socket(Matrix& matrix_) : super(matrix_.get_service_pump()), is_bound(false), is_connected(false), connect_mode(ASCS_UDP_CONNECT_MODE), matrix(&matrix_)
{if (matrix_->get_service_pump().is_single_thread()) this->set_single_thread();}
~generic_socket() {this->clear_io_context_refs();}
public:
......@@ -274,12 +278,10 @@ private:
unified_out::error_out(ASCS_LLF " the unpacker returned an empty buffer, quit receiving!", this->id());
else
{
if (is_connected)
this->next_layer().async_receive(recv_buff, make_strand_handler(rw_strand,
this->make_handler_error_size([this](const asio::error_code& ec, size_t bytes_transferred) {this->recv_handler(ec, bytes_transferred);})));
else
this->next_layer().async_receive_from(recv_buff, temp_addr, make_strand_handler(rw_strand,
this->make_handler_error_size([this](const asio::error_code& ec, size_t bytes_transferred) {this->recv_handler(ec, bytes_transferred);})));
auto cb = this->make_handler_error_size([this](const asio::error_code& ec, size_t bytes_transferred) {this->recv_handler(ec, bytes_transferred);});
if (!this->is_single_thread())
cb = make_strand_handler(rw_strand, std::move(cb));
is_connected ? this->next_layer().async_receive(recv_buff, std::move(cb)) : this->next_layer().async_receive_from(recv_buff, temp_addr, std::move(cb));
return;
}
......@@ -342,14 +344,16 @@ private:
{
stat.send_delay_sum += statistic::now() - sending_msg.begin_time;
sending_msg.restart();
if (!is_connected)
this->next_layer().async_send_to(asio::buffer(sending_msg.data(), sending_msg.size()), sending_msg.peer_addr, make_strand_handler(rw_strand,
this->make_handler_error_size([this](const asio::error_code& ec, size_t bytes_transferred) {this->send_handler(ec, bytes_transferred);})));
else if (do_send_msg(sending_msg))
if (is_connected && do_send_msg(sending_msg))
this->post_in_io_strand([this]() {this->send_handler(asio::error_code(), sending_msg.size());});
else
this->next_layer().async_send(asio::buffer(sending_msg.data(), sending_msg.size()), make_strand_handler(rw_strand,
this->make_handler_error_size([this](const asio::error_code& ec, size_t bytes_transferred) {this->send_handler(ec, bytes_transferred);})));
{
auto cb = this->make_handler_error_size([this](const asio::error_code& ec, size_t bytes_transferred) {this->send_handler(ec, bytes_transferred);});
if (!this->is_single_thread())
cb = make_strand_handler(rw_strand, std::move(cb));
auto buf = asio::buffer(sending_msg.data(), sending_msg.size());
is_connected ? this->next_layer().async_send(std::move(buf), std::move(cb)) : this->next_layer().async_send_to(std::move(buf), sending_msg.peer_addr, std::move(cb));
}
return true;
}
else
......@@ -423,7 +427,7 @@ private:
typedef generic_socket<Packer, Unpacker, Matrix, asio::ip::udp::socket, asio::ip::udp, InQueue, InContainer, OutQueue, OutContainer> super;
public:
socket_base(asio::io_context& io_context_) : super(io_context_) {}
socket_base(service_pump& service_pump_) : super(service_pump_) {}
socket_base(Matrix& matrix_) : super(matrix_) {}
protected:
......@@ -471,7 +475,7 @@ private:
typedef generic_socket<Packer, Unpacker, Matrix, asio::local::datagram_protocol::socket, asio::local::datagram_protocol, InQueue, InContainer, OutQueue, OutContainer> super;
public:
unix_socket_base(asio::io_context& io_context_) : super(io_context_) {}
unix_socket_base(service_pump& service_pump_) : super(service_pump_) {}
unix_socket_base(Matrix& matrix_) : super(matrix_) {}
protected:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册