Commit 7a1b4780 authored by: youngwolf

Introduce macro ASCS_EXPOSE_SEND_INTERFACE to expose send_msg() interface.

Introduce some helper classes (ascs::socket2, socket3 and socket4) to reduce the number of template parameters.
Refactoring: move unpacker and strand from tcp::socket_base and udp::socket_base to ascs::socket.
Parent: 8354c6b7
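For context, the new ascs::socket2, socket3 and socket4 aliases (defined near the end of socket.h in this diff) derive the in/out message types from the packer and unpacker instead of taking them as extra template arguments. Below is a rough sketch of the difference for a hypothetical user-defined base type; my_packer and my_unpacker are placeholder names, not part of this commit.

//before this commit: the message types had to be spelled out next to the packer
//using base_t = ascs::socket<asio::ip::tcp::socket, my_packer,
//	my_packer::msg_type, my_unpacker::msg_type,
//	ASCS_INPUT_QUEUE, ASCS_INPUT_CONTAINER, ASCS_OUTPUT_QUEUE, ASCS_OUTPUT_CONTAINER>;

//after this commit: socket2 deduces both message types from the packer and the unpacker
using base_t = ascs::socket2<asio::ip::tcp::socket, my_packer, my_unpacker,
	ASCS_INPUT_QUEUE, ASCS_INPUT_CONTAINER, ASCS_OUTPUT_QUEUE, ASCS_OUTPUT_CONTAINER>;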
......@@ -854,10 +854,17 @@ static_assert(ASCS_MSG_HANDLING_INTERVAL >= 0, "the interval of msg handling mus
//call on_msg_handle; if it failed, retry ASCS_MSG_HANDLING_INTERVAL milliseconds later.
//this value can be changed via ascs::socket::msg_handling_interval(size_t) at runtime.
//#define ASCS_EXPOSE_SEND_INTERFACE
//for some reason (which I have not met yet), message sending has stopped while some messages are still left in the sending buffer; they won't be
// sent until new messages come in. define this macro to expose the send_msg() interface, then you can call it manually to fix this situation.
//during message sending, calling send_msg() will fail; this is by design, to keep asio::io_context from using up all virtual memory. this also
// means that before sending has really started, you can greedily call send_msg() and may exhaust all virtual memory, please note.
//#define ASCS_PASSIVE_RECV
//to gain the ability to change the unpacker at runtime. with this macro, ascs will not receive messages automatically (except the first one),
// so you need to call recv_msg() manually; if you need to change the unpacker, do it before the recv_msg() invocation, please note.
//during async message receiving, calling recv_msg() will fail; this is by design, to keep asio::io_context from using up all virtual memory.
//during message receiving, calling recv_msg() will fail; this is by design, to keep asio::io_context from using up all virtual memory. this also
// means that before receiving has really started, you can greedily call recv_msg() and may exhaust all virtual memory, please note.
//because the user can greedily call recv_msg(), it's your responsibility to keep the recv buffer from overflowing, please pay special attention.
//this macro also enables you to pause message receiving; then, if there are no other tasks (like timers), service_pump will stop itself.
// to avoid this, please define the macro ASCS_AVOID_AUTO_STOP_SERVICE.
......
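Both macros documented above are opt-in and, like the rest of the ascs configuration macros, have to be defined before any ascs header is included. A hedged sketch of using the newly exposed send_msg() to restart a stalled send loop follows; ascs::ext::tcp::single_client is used only for illustration, and if your concrete socket type also declares data-taking send_msg overloads, a using-declaration may be needed (depending on the ascs version) to keep the parameterless base overload visible.

#define ASCS_EXPOSE_SEND_INTERFACE //must precede every ascs header
#include <ascs/ext/tcp.h>

//restart the send loop so messages left behind in the sending buffer get flushed;
//send_msg() is a no-op while sending is already in progress or the link is not
//ready, so calling it here is harmless.
void kick_send_loop(ascs::ext::tcp::single_client& client)
{
	if (!client.is_sending())
		client.send_msg();
}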
......@@ -20,7 +20,7 @@
namespace ascs
{
template<typename Socket, typename Packer, typename InMsgType, typename OutMsgType,
template<typename Socket, typename Packer, typename Unpacker, typename InMsgType, typename OutMsgType,
template<typename> class InQueue, template<typename> class InContainer, template<typename> class OutQueue, template<typename> class OutContainer>
class socket : public timer<tracked_executor>
{
......@@ -36,15 +36,16 @@ public:
static const tid TIMER_END = TIMER_BEGIN + 10;
protected:
socket(asio::io_context& io_context_) : super(io_context_), next_layer_(io_context_), strand(io_context_) {first_init();}
template<typename Arg>
socket(asio::io_context& io_context_, Arg&& arg) : super(io_context_), next_layer_(io_context_, std::forward<Arg>(arg)), strand(io_context_) {first_init();}
socket(asio::io_context& io_context_) : super(io_context_), rw_strand(io_context_), next_layer_(io_context_), dis_strand(io_context_) {first_init();}
template<typename Arg> socket(asio::io_context& io_context_, Arg&& arg) :
super(io_context_), rw_strand(io_context_), next_layer_(io_context_, std::forward<Arg>(arg)), dis_strand(io_context_) {first_init();}
//helper function, just call it in constructor
void first_init()
{
_id = -1;
packer_ = std::make_shared<Packer>();
unpacker_ = std::make_shared<Unpacker>();
sending = false;
#ifdef ASCS_PASSIVE_RECV
reading = false;
......@@ -72,6 +73,7 @@ protected:
stat.reset();
packer_->reset();
unpacker_->reset();
sending = false;
#ifdef ASCS_PASSIVE_RECV
reading = false;
......@@ -130,6 +132,19 @@ public:
}
}
#ifdef ASCS_PASSIVE_RECV
bool is_reading() const {return reading;}
void recv_msg() {if (!reading && is_ready()) dispatch_strand(rw_strand, [this]() {this->do_recv_msg();});}
#else
private:
void recv_msg() {dispatch_strand(rw_strand, [this]() {this->do_recv_msg();});}
#endif
#ifndef ASCS_EXPOSE_SEND_INTERFACE
private:
#endif
void send_msg() {if (!sending && is_ready()) dispatch_strand(rw_strand, [this]() {this->do_send_msg();});}
public:
void start_heartbeat(int interval, int max_absence = ASCS_HEARTBEAT_MAX_ABSENCE)
{
assert(interval > 0 && max_absence > 0);
......@@ -161,9 +176,6 @@ public:
}
bool is_sending() const {return sending;}
#ifdef ASCS_PASSIVE_RECV
bool is_reading() const {return reading;}
#endif
bool is_dispatching() const {return dispatching;}
bool is_recv_idle() const {return recv_idle_began;}
......@@ -181,11 +193,21 @@ public:
//get or change the packer at runtime
//changing the packer at runtime is not thread-safe (if we're sending messages concurrently), please pay special attention,
//we could resolve this defect via a mutex, but I don't think it's worth it, because this feature is not frequently used
//we could resolve this defect via a mutex, but I don't think it's worth it, because this feature is not commonly needed and you know how to avoid
// the race condition between message sending and packer replacement (ascs never sends messages automatically except with the macro
// ASCS_HEARTBEAT_INTERVAL, please note).
std::shared_ptr<i_packer<typename Packer::msg_type>> packer() {return packer_;}
std::shared_ptr<const i_packer<typename Packer::msg_type>> packer() const {return packer_;}
void packer(const std::shared_ptr<i_packer<typename Packer::msg_type>>& _packer_) {packer_ = _packer_;}
//get or change the unpacker at runtime
std::shared_ptr<i_unpacker<typename Unpacker::msg_type>> unpacker() {return unpacker_;}
std::shared_ptr<const i_unpacker<typename Unpacker::msg_type>> unpacker() const {return unpacker_;}
#ifdef ASCS_PASSIVE_RECV
//changing the unpacker must be done before calling ascs::socket::recv_msg(), and requires the ASCS_PASSIVE_RECV macro.
void unpacker(const std::shared_ptr<i_unpacker<typename Unpacker::msg_type>>& _unpacker_) {unpacker_ = _unpacker_;}
#endif
//if you invoke send_msg or send_native_msg with can_overflow = true, it will always succeed whether the sending buffer is overflowed or not;
//this can exhaust all virtual memory, please pay special attention.
bool is_send_buffer_available() const {return send_buffer.size_in_byte() < ASCS_MAX_SEND_BUF;}
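With the unpacker now owned by ascs::socket (see the getters and setter just above), swapping the wire format at runtime works the same way for TCP and UDP sockets. A minimal sketch under ASCS_PASSIVE_RECV; my_socket and my_new_unpacker are placeholder names, not part of this commit.

//replace the unpacker first, then explicitly restart receiving: under
//ASCS_PASSIVE_RECV nothing is read until recv_msg() is called, and recv_msg()
//is a no-op while a read is already in flight or the link is not ready.
void switch_protocol(my_socket& sock)
{
	sock.unpacker(std::make_shared<my_new_unpacker>());
	sock.recv_msg();
}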
......@@ -420,7 +442,7 @@ protected:
{
if (msg.empty())
unified_out::error_out("found an empty message, please check your packer.");
else if (send_buffer.enqueue(std::forward<T>(msg)) && !sending && is_ready())
else if (send_buffer.enqueue(std::forward<T>(msg)))
send_msg();
//even if we meet an empty message (most likely because of a too-big message or insufficient memory), we still return true, why?
......@@ -435,8 +457,7 @@ protected:
in_container_type temp_buffer;
ascs::do_something_to_all(msg_can, [&size_in_byte, &temp_buffer](InMsgType& msg) {size_in_byte += msg.size(); temp_buffer.emplace_back(std::move(msg));});
send_buffer.move_items_in(temp_buffer, size_in_byte);
if (!sending && is_ready())
send_msg();
send_msg();
return true;
}
......@@ -457,9 +478,8 @@ protected:
auto f = p->get_future();
if (!send_buffer.enqueue(std::move(unused)))
return sync_call_result::NOT_APPLICABLE;
else if (!sending && is_ready())
send_msg();
send_msg();
return 0 == duration || std::future_status::ready == f.wait_for(std::chrono::milliseconds(duration)) ? f.get() : sync_call_result::TIMEOUT;
}
......@@ -478,16 +498,15 @@ protected:
auto p = temp_buffer.back().p;
auto f = p->get_future();
send_buffer.move_items_in(temp_buffer, size_in_byte);
if (!sending && is_ready())
send_msg();
send_msg();
return 0 == duration || std::future_status::ready == f.wait_for(std::chrono::milliseconds(duration)) ? f.get() : sync_call_result::TIMEOUT;
}
#endif
private:
virtual void recv_msg() = 0;
virtual void send_msg() = 0;
virtual void do_recv_msg() = 0;
virtual bool do_send_msg(bool in_strand = false) = 0;
//please do not change the id at runtime via the following function, unless this socket is not managed by object_pool;
//it should only be used by object_pool when reusing or creating a new socket.
......@@ -543,7 +562,7 @@ private:
}
//do not use dispatch_strand here, because the handler (do_dispatch_msg) may call this function, which can lead to stack overflow.
void dispatch_msg() {if (!dispatching) post_strand(strand, [this]() {this->do_dispatch_msg();});}
void dispatch_msg() {if (!dispatching) post_strand(dis_strand, [this]() {this->do_dispatch_msg();});}
void do_dispatch_msg()
{
#ifdef ASCS_DISPATCH_BATCH_MSG
......@@ -627,6 +646,7 @@ private:
protected:
struct statistic stat;
std::shared_ptr<i_packer<typename Packer::msg_type>> packer_;
std::shared_ptr<i_unpacker<typename Unpacker::msg_type>> unpacker_;
list<OutMsgType> temp_msg_can;
in_queue_type send_buffer;
......@@ -635,6 +655,7 @@ protected:
#ifdef ASCS_PASSIVE_RECV
volatile bool reading;
#endif
asio::io_context::strand rw_strand;
private:
bool recv_idle_began;
......@@ -651,7 +672,7 @@ private:
Socket next_layer_;
std::atomic_flag start_atomic;
asio::io_context::strand strand;
asio::io_context::strand dis_strand;
#ifdef ASCS_SYNC_RECV
enum sync_recv_status {NOT_REQUESTED, REQUESTED, RESPONDED, RESPONDED_FAILURE};
......@@ -664,6 +685,18 @@ private:
unsigned msg_resuming_interval_, msg_handling_interval_;
};
template<typename Socket, typename Packer, typename Unpacker,
template<typename> class InQueue, template<typename> class InContainer, template<typename> class OutQueue, template<typename> class OutContainer>
using socket2 = socket<Socket, Packer, Unpacker, typename Packer::msg_type, typename Unpacker::msg_type, InQueue, InContainer, OutQueue, OutContainer>;
template<typename Socket, typename Packer, typename Unpacker, template<typename> class InMsgWrapper, template<typename> class OutMsgWrapper,
template<typename> class InQueue, template<typename> class InContainer, template<typename> class OutQueue, template<typename> class OutContainer>
using socket3 = socket<Socket, Packer, Unpacker, InMsgWrapper<typename Packer::msg_type>, OutMsgWrapper<typename Unpacker::msg_type>, InQueue, InContainer, OutQueue, OutContainer>;
template<typename Socket, typename Packer, typename Unpacker, template<typename> class MsgWrapper,
template<typename> class InQueue, template<typename> class InContainer, template<typename> class OutQueue, template<typename> class OutContainer>
using socket4 = socket3<Socket, Packer, Unpacker, MsgWrapper, MsgWrapper, InQueue, InContainer, OutQueue, OutContainer>;
} //namespace
#endif /* _ASCS_SOCKET_H_ */
......@@ -19,7 +19,7 @@ namespace ascs { namespace tcp {
template <typename Socket, typename Packer, typename Unpacker,
template<typename> class InQueue, template<typename> class InContainer, template<typename> class OutQueue, template<typename> class OutContainer>
class socket_base : public socket<Socket, Packer, typename Packer::msg_type, typename Unpacker::msg_type, InQueue, InContainer, OutQueue, OutContainer>
class socket_base : public socket2<Socket, Packer, Unpacker, InQueue, InContainer, OutQueue, OutContainer>
{
public:
typedef typename Packer::msg_type in_msg_type;
......@@ -28,16 +28,13 @@ public:
typedef typename Unpacker::msg_ctype out_msg_ctype;
private:
typedef socket<Socket, Packer, in_msg_type, out_msg_type, InQueue, InContainer, OutQueue, OutContainer> super;
typedef socket2<Socket, Packer, Unpacker, InQueue, InContainer, OutQueue, OutContainer> super;
protected:
enum link_status {CONNECTED, FORCE_SHUTTING_DOWN, GRACEFUL_SHUTTING_DOWN, BROKEN};
socket_base(asio::io_context& io_context_) : super(io_context_), strand(io_context_) {first_init();}
template<typename Arg> socket_base(asio::io_context& io_context_, Arg&& arg) : super(io_context_, std::forward<Arg>(arg)), strand(io_context_) {first_init();}
//helper function, just call it in constructor
void first_init() {status = link_status::BROKEN; unpacker_ = std::make_shared<Unpacker>();}
socket_base(asio::io_context& io_context_) : super(io_context_), status(link_status::BROKEN) {}
template<typename Arg> socket_base(asio::io_context& io_context_, Arg&& arg) : super(io_context_, std::forward<Arg>(arg)), status(link_status::BROKEN) {}
public:
static const typename super::tid TIMER_BEGIN = super::TIMER_END;
......@@ -59,7 +56,7 @@ public:
//notice: when reusing this socket, object_pool will invoke this function, so if you want to do some additional initialization
// for this socket, do it here and in the constructor.
//for tcp::single_client_base and ssl::single_client_base, this virtual function will never be called, please note.
virtual void reset() {status = link_status::BROKEN; sending_msgs.clear(); unpacker_->reset(); super::reset();}
virtual void reset() {status = link_status::BROKEN; sending_msgs.clear(); super::reset();}
//SOCKET status
bool is_broken() const {return link_status::BROKEN == status;}
......@@ -113,15 +110,6 @@ public:
this->is_dispatching(), status, this->is_recv_idle());
}
//get or change the unpacker at runtime
std::shared_ptr<i_unpacker<out_msg_type>> unpacker() {return unpacker_;}
std::shared_ptr<const i_unpacker<out_msg_type>> unpacker() const {return unpacker_;}
#ifdef ASCS_PASSIVE_RECV
//changing unpacker must before calling ascs::socket::recv_msg, and define ASCS_PASSIVE_RECV macro.
void unpacker(const std::shared_ptr<i_unpacker<out_msg_type>>& _unpacker_) {unpacker_ = _unpacker_;}
virtual void recv_msg() {if (!reading && is_ready()) this->dispatch_strand(strand, [this]() {this->do_recv_msg();});}
#endif
///////////////////////////////////////////////////
//msg sending interface
//if the message is already packed, do call direct_send_msg or direct_sync_send_msg to reduce unnecessary memory replication, if you will not
......@@ -208,11 +196,6 @@ protected:
virtual void on_async_shutdown_error() = 0;
private:
#ifndef ASCS_PASSIVE_RECV
virtual void recv_msg() {this->dispatch_strand(strand, [this]() {this->do_recv_msg();});}
#endif
virtual void send_msg() {this->dispatch_strand(strand, [this]() {this->do_send_msg(false);});}
using super::close;
using super::handle_error;
using super::handle_msg;
......@@ -234,7 +217,7 @@ private:
return unpacker_->completion_condition(ec, bytes_transferred);
}
void do_recv_msg()
virtual void do_recv_msg()
{
#ifdef ASCS_PASSIVE_RECV
if (reading)
......@@ -250,7 +233,7 @@ private:
reading = true;
#endif
asio::async_read(this->next_layer(), recv_buff,
[this](const asio::error_code& ec, size_t bytes_transferred)->size_t {return this->completion_checker(ec, bytes_transferred);}, make_strand_handler(strand,
[this](const asio::error_code& ec, size_t bytes_transferred)->size_t {return this->completion_checker(ec, bytes_transferred);}, make_strand_handler(rw_strand,
this->make_handler_error_size([this](const asio::error_code& ec, size_t bytes_transferred) {this->recv_handler(ec, bytes_transferred);})));
}
}
......@@ -289,7 +272,7 @@ private:
}
}
bool do_send_msg(bool in_strand)
virtual bool do_send_msg(bool in_strand = false)
{
if (!in_strand && sending)
return true;
......@@ -309,7 +292,7 @@ private:
if ((sending = !sending_buffer.empty()))
{
sending_msgs.front().restart();
asio::async_write(this->next_layer(), sending_buffer, make_strand_handler(strand,
asio::async_write(this->next_layer(), sending_buffer, make_strand_handler(rw_strand,
this->make_handler_error_size([this](const asio::error_code& ec, size_t bytes_transferred) {this->send_handler(ec, bytes_transferred);})));
return true;
}
......@@ -378,6 +361,7 @@ protected:
private:
using super::stat;
using super::packer_;
using super::unpacker_;
using super::temp_msg_can;
using super::send_buffer;
......@@ -386,11 +370,10 @@ private:
#ifdef ASCS_PASSIVE_RECV
using super::reading;
#endif
using super::rw_strand;
std::shared_ptr<i_unpacker<out_msg_type>> unpacker_;
typename super::in_container_type sending_msgs;
std::vector<asio::const_buffer> sending_buffer; //just to reduce memory allocation and keep the size of sending items (linear complexity, it's very important).
asio::io_context::strand strand;
};
}} //namespace
......
......@@ -20,7 +20,7 @@ namespace ascs { namespace udp {
template <typename Packer, typename Unpacker, typename Matrix = i_matrix, typename Socket = asio::ip::udp::socket,
template<typename> class InQueue = ASCS_INPUT_QUEUE, template<typename> class InContainer = ASCS_INPUT_CONTAINER,
template<typename> class OutQueue = ASCS_OUTPUT_QUEUE, template<typename> class OutContainer = ASCS_OUTPUT_CONTAINER>
class socket_base : public socket<Socket, Packer, udp_msg<typename Packer::msg_type>, udp_msg<typename Unpacker::msg_type>, InQueue, InContainer, OutQueue, OutContainer>
class socket_base : public socket4<Socket, Packer, Unpacker, udp_msg, InQueue, InContainer, OutQueue, OutContainer>
{
public:
typedef udp_msg<typename Packer::msg_type> in_msg_type;
......@@ -29,11 +29,11 @@ public:
typedef const out_msg_type out_msg_ctype;
private:
typedef socket<Socket, Packer, in_msg_type, out_msg_type, InQueue, InContainer, OutQueue, OutContainer> super;
typedef socket4<Socket, Packer, Unpacker, udp_msg, InQueue, InContainer, OutQueue, OutContainer> super;
public:
socket_base(asio::io_context& io_context_) : super(io_context_), strand(io_context_) {first_init();}
socket_base(Matrix& matrix_) : super(matrix_.get_service_pump()), strand(matrix_.get_service_pump()) {first_init(&matrix_);}
socket_base(asio::io_context& io_context_) : super(io_context_), has_bound(false), matrix(nullptr) {}
socket_base(Matrix& matrix_) : super(matrix_.get_service_pump()), has_bound(false), matrix(&matrix_) {}
virtual bool is_ready() {return has_bound;}
virtual void send_heartbeat()
......@@ -54,7 +54,6 @@ public:
has_bound = false;
sending_msg.clear();
unpacker_->reset();
super::reset();
}
......@@ -64,7 +63,7 @@ public:
const asio::ip::udp::endpoint& get_peer_addr() const {return peer_addr;}
void disconnect() {force_shutdown();}
void force_shutdown() {show_info("link:", "been shutting down."); this->dispatch_strand(strand, [this]() {this->shutdown();});}
void force_shutdown() {show_info("link:", "been shutting down."); this->dispatch_strand(rw_strand, [this]() {this->shutdown();});}
void graceful_shutdown() {force_shutdown();}
void show_info(const char* head, const char* tail) const {unified_out::info_out("%s %s:%hu %s", head, local_addr.address().to_string().data(), local_addr.port(), tail);}
......@@ -87,17 +86,6 @@ public:
this->is_dispatching(), this->is_recv_idle());
}
//get or change the unpacker at runtime
//changing unpacker at runtime is not thread-safe, this operation can only be done in on_msg(), reset() or constructor, please pay special attention
//we can resolve this defect via mutex, but i think it's not worth, because this feature is not frequently used
std::shared_ptr<i_unpacker<typename Unpacker::msg_type>> unpacker() {return unpacker_;}
std::shared_ptr<const i_unpacker<typename Unpacker::msg_type>> unpacker() const {return unpacker_;}
#ifdef ASCS_PASSIVE_RECV
//changing unpacker must before calling ascs::socket::recv_msg, and define ASCS_PASSIVE_RECV macro.
void unpacker(const std::shared_ptr<i_unpacker<typename Unpacker::msg_type>>& _unpacker_) {unpacker_ = _unpacker_;}
virtual void recv_msg() {if (!reading && is_ready()) this->dispatch_strand(strand, [this]() {this->do_recv_msg();});}
#endif
///////////////////////////////////////////////////
//msg sending interface
//if the message is already packed, do call direct_send_msg or direct_sync_send_msg to reduce unnecessary memory replication, if you will not
......@@ -121,9 +109,6 @@ public:
///////////////////////////////////////////////////
protected:
//helper function, just call it in constructor
void first_init(Matrix* matrix_ = nullptr) {has_bound = false; unpacker_ = std::make_shared<Unpacker>(); matrix = matrix_;}
Matrix* get_matrix() {return matrix;}
const Matrix* get_matrix() const {return matrix;}
......@@ -182,11 +167,6 @@ protected:
#endif
private:
#ifndef ASCS_PASSIVE_RECV
virtual void recv_msg() {this->dispatch_strand(strand, [this]() {this->do_recv_msg();});}
#endif
virtual void send_msg() {this->dispatch_strand(strand, [this]() {this->do_send_msg(false);});}
using super::close;
using super::handle_error;
using super::handle_msg;
......@@ -197,7 +177,7 @@ private:
void shutdown() {close();}
void do_recv_msg()
virtual void do_recv_msg()
{
#ifdef ASCS_PASSIVE_RECV
if (reading)
......@@ -212,7 +192,7 @@ private:
#ifdef ASCS_PASSIVE_RECV
reading = true;
#endif
this->next_layer().async_receive_from(recv_buff, temp_addr, make_strand_handler(strand,
this->next_layer().async_receive_from(recv_buff, temp_addr, make_strand_handler(rw_strand,
this->make_handler_error_size([this](const asio::error_code& ec, size_t bytes_transferred) {this->recv_handler(ec, bytes_transferred);})));
}
}
......@@ -252,7 +232,7 @@ private:
}
}
bool do_send_msg(bool in_strand)
virtual bool do_send_msg(bool in_strand = false)
{
if (!in_strand && sending)
return true;
......@@ -262,7 +242,7 @@ private:
stat.send_delay_sum += statistic::now() - sending_msg.begin_time;
sending_msg.restart();
this->next_layer().async_send_to(asio::buffer(sending_msg.data(), sending_msg.size()), sending_msg.peer_addr, make_strand_handler(strand,
this->next_layer().async_send_to(asio::buffer(sending_msg.data(), sending_msg.size()), sending_msg.peer_addr, make_strand_handler(rw_strand,
this->make_handler_error_size([this](const asio::error_code& ec, size_t bytes_transferred) {this->send_handler(ec, bytes_transferred);})));
return true;
}
......@@ -338,6 +318,7 @@ private:
private:
using super::stat;
using super::packer_;
using super::unpacker_;
using super::temp_msg_can;
using super::send_buffer;
......@@ -346,16 +327,15 @@ private:
#ifdef ASCS_PASSIVE_RECV
using super::reading;
#endif
using super::rw_strand;
bool has_bound;
typename super::in_msg sending_msg;
std::shared_ptr<i_unpacker<typename Unpacker::msg_type>> unpacker_;
asio::ip::udp::endpoint local_addr;
asio::ip::udp::endpoint temp_addr; //used when receiving messages
asio::ip::udp::endpoint peer_addr;
Matrix* matrix;
asio::io_context::strand strand;
};
}} //namespace
......