提交 7589a79f 编写于 作者: Y youngwolf

Introduce memory fence to synchronize socket's status -- sending and reading

上级 35defdff
......@@ -809,10 +809,8 @@
*
* SPECIAL ATTENTION (incompatible with old editions):
* Graceful shutdown does not support sync mode anymore.
* Use post_strand instead of dispatch_strand in send_msg and recv_msg, because we don't synchronize socket's member variable sending and reading,
* there's still a race condition even in the same strand because of memory synchronization.
* But we can difine macro ASCS_USE_DISPATCH_IN_IO to use dispatch_strand back, because dispatch_strand can be more efficient in specific cases.
* The macro will be removed after I confirmed that dispatch_strand indeed introduces race condition.
* Introduce memory fence to synchronize socket's status -- sending and reading, dispatch_strand is not enough, except post_strand,
* but dispatch_strand is more efficient than post_strand.
*
* HIGHLIGHT:
* Make shutdown thread safe.
......
......@@ -46,9 +46,9 @@ protected:
_id = -1;
packer_ = std::make_shared<Packer>();
unpacker_ = std::make_shared<Unpacker>();
sending = false;
clear_sending();
#ifdef ASCS_PASSIVE_RECV
reading = false;
clear_reading();
#endif
#ifdef ASCS_SYNC_RECV
sr_status = sync_recv_status::NOT_REQUESTED;
......@@ -89,9 +89,9 @@ protected:
stat.reset();
packer_->reset();
unpacker_->reset();
sending = false;
clear_sending();
#ifdef ASCS_PASSIVE_RECV
reading = false;
clear_reading();
#endif
#ifdef ASCS_SYNC_RECV
sr_status = sync_recv_status::NOT_REQUESTED;
......@@ -167,12 +167,8 @@ public:
}
#ifdef ASCS_PASSIVE_RECV
bool is_reading() const {return reading;}
#ifdef ASCS_USE_DISPATCH_IN_IO
void recv_msg() {if (!reading && is_ready()) dispatch_in_io_strand([this]() {this->do_recv_msg();});}
#else
void recv_msg() {if (!reading && is_ready()) post_in_io_strand([this]() {this->do_recv_msg();});}
#endif
bool is_reading() const {return 1 == reading.load(std::memory_order_relaxed);}
void recv_msg() {if (is_ready() && !is_reading()) dispatch_in_io_strand([this]() {this->do_recv_msg();});}
#else
private:
void recv_msg() {dispatch_in_io_strand([this]() {this->do_recv_msg();});}
......@@ -181,18 +177,10 @@ public:
#ifndef ASCS_EXPOSE_SEND_INTERFACE
protected:
#endif
#ifdef ASCS_USE_DISPATCH_IN_IO
#ifdef ASCS_ARBITRARY_SEND
void send_msg() {dispatch_in_io_strand([this]() {this->do_send_msg();});}
#else
void send_msg() {if (!sending && is_ready()) dispatch_in_io_strand([this]() {this->do_send_msg();});}
#endif
#else
#ifdef ASCS_ARBITRARY_SEND
void send_msg() {post_in_io_strand([this]() {this->do_send_msg();});}
#else
void send_msg() {if (!sending && is_ready()) post_in_io_strand([this]() {this->do_send_msg();});}
#endif
void send_msg() {if (is_ready() && !is_sending()) dispatch_in_io_strand([this]() {this->do_send_msg();});}
#endif
public:
......@@ -220,7 +208,7 @@ public:
return false;
#ifndef ASCS_ALWAYS_SEND_HEARTBEAT
if (!sending && now - stat.last_send_time >= interval) //don't need to send heartbeat if we're sending messages
if (!is_sending() && now - stat.last_send_time >= interval) //don't need to send heartbeat if we're sending messages
#endif
send_heartbeat();
}
......@@ -228,7 +216,7 @@ public:
return true;
}
bool is_sending() const {return sending;}
bool is_sending() const {return 1 == sending.load(std::memory_order_relaxed);}
bool is_dispatching() const {return dispatching;}
bool is_recv_idle() const {return recv_idle_began;}
......@@ -432,6 +420,14 @@ protected:
bool shrink_send_buffer() const {return is_send_buffer_available();}
#endif
#ifdef ASCS_PASSIVE_RECV
void clear_reading() {reading.store(0, std::memory_order_release);}
bool test_and_set_reading() {return 1 == reading.exchange(1, std::memory_order_acq_rel);}
#endif
void clear_sending() {sending.store(0, std::memory_order_release);}
bool test_and_set_sending() {return 1 == sending.exchange(1, std::memory_order_acq_rel);}
//subclass notify shutdown event
bool close(bool use_close = false) //if not use_close, shutdown (both direction) will be used
{
......@@ -721,7 +717,7 @@ private:
dispatching_msg.clear();
#endif
dispatching = false;
dispatch_msg(); //dispatch msg in sequence
post_in_dis_strand([this]() {this->do_dispatch_msg();}); //dispatch msg in sequence
}
}
else
......@@ -766,11 +762,6 @@ protected:
std::list<OutMsgType> temp_msg_can;
in_queue_type send_buffer;
volatile bool sending;
#ifdef ASCS_PASSIVE_RECV
volatile bool reading;
#endif
asio::io_context::strand rw_strand;
private:
......@@ -792,6 +783,10 @@ private:
uint_fast64_t _id;
Socket next_layer_;
#ifdef ASCS_PASSIVE_RECV
std::atomic_size_t reading;
#endif
std::atomic_size_t sending;
std::atomic_flag start_atomic;
asio::io_context::strand dis_strand;
......
......@@ -322,13 +322,15 @@ private:
virtual void do_recv_msg()
{
#ifdef ASCS_PASSIVE_RECV
if (reading || !is_ready())
if (!is_ready())
return;
else if (this->test_and_set_reading())
return;
#endif
#ifdef ASCS_PASSIVE_RECV
if (this->async_read(make_strand_handler(rw_strand,
if (!this->async_read(make_strand_handler(rw_strand,
this->make_handler_error_size([this](const asio::error_code& ec, size_t bytes_transferred) {this->recv_handler(ec, bytes_transferred);}))))
reading = true;
this->clear_reading();
#else
this->async_read(make_strand_handler(rw_strand,
this->make_handler_error_size([this](const asio::error_code& ec, size_t bytes_transferred) {this->recv_handler(ec, bytes_transferred);})));
......@@ -338,7 +340,7 @@ private:
void recv_handler(const asio::error_code& ec, size_t bytes_transferred)
{
#ifdef ASCS_PASSIVE_RECV
reading = false; //clear reading flag before calling handle_msg() to make sure that recv_msg() is available in on_msg() and on_msg_handle()
this->clear_reading(); //clear reading flag before calling handle_msg() to make sure that recv_msg() is available in on_msg() and on_msg_handle()
#endif
auto need_next_recv = false;
if (bytes_transferred > 0)
......@@ -378,7 +380,7 @@ private:
virtual bool do_send_msg(bool in_strand = false)
{
if (!in_strand && sending)
if (!in_strand && this->test_and_set_sending())
return true;
auto end_time = statistic::now();
......@@ -391,14 +393,14 @@ private:
if (!sending_buffer.empty())
{
sending = true;
sending_msgs.front().restart();
this->async_write(sending_buffer, make_strand_handler(rw_strand,
this->make_handler_error_size([this](const asio::error_code& ec, size_t bytes_transferred) {this->send_handler(ec, bytes_transferred);})));
return true;
}
else
this->clear_sending();
sending = false;
return false;
}
......@@ -410,7 +412,7 @@ private:
stat.send_byte_sum += bytes_transferred;
stat.send_time_sum += statistic::now() - sending_msgs.front().begin_time;
stat.send_msg_sum += sending_buffer.size();
stat.send_msg_sum += sending_buffer.size(); //before gcc 5.0, std::list::size() has linear complexity, very embarrassing!
#ifdef ASCS_SYNC_SEND
ascs::do_something_to_all(sending_msgs, [](typename super::in_msg& item) {if (item.p) {item.p->set_value(sync_call_result::SUCCESS);}});
#endif
......@@ -436,10 +438,12 @@ private:
#endif
#endif
sending_msgs.clear();
#ifndef ASCS_ARBITRARY_SEND
#ifdef ASCS_ARBITRARY_SEND
do_send_msg(true);
#else
if (!do_send_msg(true) && !send_buffer.empty()) //send msg in sequence
super::send_msg(); //just make sure no pending msgs
#endif
do_send_msg(true); //just make sure no pending msgs
}
else
{
......@@ -449,7 +453,7 @@ private:
on_send_error(ec, sending_msgs);
sending_msgs.clear(); //clear sending messages after on_send_error, then user can decide how to deal with them in on_send_error
sending = false;
this->clear_sending();
}
}
......@@ -481,15 +485,12 @@ private:
using super::temp_msg_can;
using super::send_buffer;
using super::sending;
#ifdef ASCS_PASSIVE_RECV
using super::reading;
#endif
using super::rw_strand;
//before gcc 5.0, std::list::size() has linear complexity, very embarrassing!
//so use std::vector (member variable) to reduce memory allocation and keep the number of sending msgs (its size() has constant complexity, it's very important).
typename super::in_container_type sending_msgs;
std::vector<asio::const_buffer> sending_buffer; //just to reduce memory allocation and keep the size of sending items (linear complexity, it's very important).
std::vector<asio::const_buffer> sending_buffer;
};
}} //namespace
......
......@@ -243,7 +243,7 @@ protected:
virtual bool do_send_msg(const typename super::in_msg& sending_msg) {return false;} //customize message sending, for connected socket only
virtual void pre_handle_msg(typename Unpacker::container_type& msg_can) {}
void resume_sending() {sending = false; super::send_msg();} //for reliable UDP socket only
void resume_sending() {this->clear_sending(); super::send_msg();} //for reliable UDP socket only
private:
using super::close;
......@@ -265,7 +265,7 @@ private:
virtual void do_recv_msg()
{
#ifdef ASCS_PASSIVE_RECV
if (reading)
if (this->test_and_set_reading())
return;
#endif
auto recv_buff = this->unpacker()->prepare_next_recv();
......@@ -274,20 +274,25 @@ private:
unified_out::error_out(ASCS_LLF " the unpacker returned an empty buffer, quit receiving!", this->id());
else
{
#ifdef ASCS_PASSIVE_RECV
reading = true;
#endif
if (is_connected)
this->next_layer().async_receive(recv_buff, make_strand_handler(rw_strand,
this->make_handler_error_size([this](const asio::error_code& ec, size_t bytes_transferred) {this->recv_handler(ec, bytes_transferred);})));
else
this->next_layer().async_receive_from(recv_buff, temp_addr, make_strand_handler(rw_strand,
this->make_handler_error_size([this](const asio::error_code& ec, size_t bytes_transferred) {this->recv_handler(ec, bytes_transferred);})));
return;
}
#ifdef ASCS_PASSIVE_RECV
this->clear_reading();
#endif
}
void recv_handler(const asio::error_code& ec, size_t bytes_transferred)
{
#ifdef ASCS_PASSIVE_RECV
this->clear_reading(); //clear reading flag before calling handle_msg() to make sure that recv_msg() is available in on_msg() and on_msg_handle()
#endif
if (!ec && bytes_transferred > 0)
{
stat.last_recv_time = time(nullptr);
......@@ -298,9 +303,6 @@ private:
if (is_connected)
pre_handle_msg(msg_can);
#ifdef ASCS_PASSIVE_RECV
reading = false; //clear reading flag before call handle_msg() to make sure that recv_msg() can be called successfully in on_msg_handle()
#endif
ascs::do_something_to_all(msg_can, [this](typename Unpacker::msg_type& msg) {
this->temp_msg_can.emplace_back(this->is_connected ? this->peer_addr : this->temp_addr, std::move(msg));
});
......@@ -309,9 +311,6 @@ private:
}
else
{
#ifdef ASCS_PASSIVE_RECV
reading = false; //clear reading flag before call handle_msg() to make sure that recv_msg() can be called successfully in on_msg_handle()
#endif
#if defined(_MSC_VER) || defined(__CYGWIN__) || defined(__MINGW32__) || defined(__MINGW64__)
if (ec && asio::error::connection_refused != ec && asio::error::connection_reset != ec)
#else
......@@ -328,14 +327,12 @@ private:
virtual bool do_send_msg(bool in_strand = false)
{
if (!in_strand && sending)
if (!in_strand && this->test_and_set_sending())
return true;
if (is_connected && !check_send_cc())
sending = true;
else if (is_connected && !check_send_cc())
;
else if (send_buffer.try_dequeue(sending_msg))
{
sending = true;
stat.send_delay_sum += statistic::now() - sending_msg.begin_time;
sending_msg.restart();
if (!is_connected)
......@@ -348,8 +345,9 @@ private:
this->make_handler_error_size([this](const asio::error_code& ec, size_t bytes_transferred) {this->send_handler(ec, bytes_transferred);})));
return true;
}
else
this->clear_sending();
sending = false;
return false;
}
......@@ -385,13 +383,12 @@ private:
sending_msg.clear(); //clear sending message after on_send_error, then user can decide how to deal with it in on_send_error
if (ec && (asio::error::not_socket == ec || asio::error::bad_descriptor == ec))
return;
this->clear_sending();
//send msg in sequence
//on windows, sending a msg to addr_any may cause errors, please note
//for UDP, sending error will not stop subsequent sending.
if (!do_send_msg(true) && !send_buffer.empty())
do_send_msg(true); //just make sure no pending msgs
else if (!do_send_msg(true) && !send_buffer.empty())
super::send_msg(); //just make sure no pending msgs
}
private:
......@@ -399,11 +396,6 @@ private:
using super::temp_msg_can;
using super::send_buffer;
using super::sending;
#ifdef ASCS_PASSIVE_RECV
using super::reading;
#endif
using super::rw_strand;
bool is_bound, is_connected, connect_mode;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册