提交 b2f2aab0 编写于 作者: Y youngwolf

Rollback--Change the return type of socket::on_msg and socket::on_msg_handle from size_t to bool.

上级 59495794
......@@ -114,7 +114,7 @@ protected:
#ifdef ASCS_SYNC_DISPATCH
//do not hold msg_can for further using, return from on_msg as quickly as possible
//access msg_can freely within this callback, it's always thread safe.
virtual bool on_msg(list<out_msg_type>& msg_can)
virtual size_t on_msg(list<out_msg_type>& msg_can)
{
ascs::do_something_to_all(msg_can, [this](out_msg_type& msg) {this->handle_msg(msg);});
msg_can.clear(); //if we left behind some messages in msg_can, they will be dispatched via on_msg_handle asynchronously, which means it's
......@@ -122,20 +122,20 @@ protected:
//here we always consumed all messages, so we can use sync message dispatching, otherwise, we should not use sync message dispatching
//except we can bear message disordering.
return true;
return 1;
}
#endif
#ifdef ASCS_DISPATCH_BATCH_MSG
//do not hold msg_can for further using, access msg_can and return from on_msg_handle as quickly as possible
//can only access msg_can via functions that marked as 'thread safe', if you used non-lock queue, its your responsibility to guarantee
// that new messages will not come until we returned from this callback (for example, pingpong test).
virtual bool on_msg_handle(out_queue_type& msg_can)
virtual size_t on_msg_handle(out_queue_type& msg_can)
{
out_container_type tmp_can;
msg_can.swap(tmp_can); //to consume a part of the messages in msg_can, see echo_server
ascs::do_something_to_all(tmp_can, [this](out_msg_type& msg) {this->handle_msg(msg);});
return true;
return 1;
}
#else
virtual bool on_msg_handle(out_msg_type& msg) {handle_msg(msg); return true;}
......
......@@ -102,10 +102,10 @@ protected:
#ifdef ASCS_SYNC_DISPATCH //do not open this feature
//do not hold msg_can for further using, return from on_msg as quickly as possible
//access msg_can freely within this callback, it's always thread safe.
virtual bool on_msg(list<out_msg_type>& msg_can)
virtual size_t on_msg(list<out_msg_type>& msg_can)
{
if (!is_send_buffer_available())
return false;
return 0;
//here if we cannot handle all messages in msg_can, do not use sync message dispatching except we can bear message disordering,
//this is because on_msg_handle can be invoked concurrently with the next on_msg (new messages arrived) and then disorder messages.
//and do not try to handle all messages here (just for echo_server's business logic) because:
......@@ -116,7 +116,7 @@ protected:
ascs::do_something_to_all(msg_can, [this](out_msg_type& msg) {this->send_msg(std::move(msg), true);});
msg_can.clear();
return true;
return 1;
}
#endif
......@@ -124,17 +124,17 @@ protected:
//do not hold msg_can for further using, access msg_can and return from on_msg_handle as quickly as possible
//can only access msg_can via functions that marked as 'thread safe', if you used non-lock queue, its your responsibility to guarantee
// that new messages will not come until we returned from this callback (for example, pingpong test).
virtual bool on_msg_handle(out_queue_type& msg_can)
virtual size_t on_msg_handle(out_queue_type& msg_can)
{
if (!is_send_buffer_available())
return false;
return 0;
out_container_type tmp_can;
msg_can.move_items_out(tmp_can, 10); //don't be too greedy, here is in a service thread, we should not block this thread for a long time
//following statement can avoid one memory replication if the type of out_msg_type and in_msg_type are identical.
ascs::do_something_to_all(tmp_can, [this](out_msg_type& msg) {this->send_msg(std::move(msg), true);});
return true;
return 1;
}
#else
//following statement can avoid one memory replication if the type of out_msg_type and in_msg_type are identical.
......@@ -190,14 +190,14 @@ protected:
#ifdef ASCS_SYNC_DISPATCH
//do not hold msg_can for further using, return from on_msg as quickly as possible
//access msg_can freely within this callback, it's always thread safe.
virtual bool on_msg(list<out_msg_type>& msg_can) {auto re = server_socket_base::on_msg(msg_can); force_shutdown(); return re;}
virtual size_t on_msg(list<out_msg_type>& msg_can) {auto re = server_socket_base::on_msg(msg_can); force_shutdown(); return re;}
#endif
#ifdef ASCS_DISPATCH_BATCH_MSG
//do not hold msg_can for further using, access msg_can and return from on_msg_handle as quickly as possible
//can only access msg_can via functions that marked as 'thread safe', if you used non-lock queue, its your responsibility to guarantee
// that new messages will not come until we returned from this callback (for example, pingpong test).
virtual bool on_msg_handle(out_queue_type& msg_can) {auto re = server_socket_base::on_msg_handle(msg_can); force_shutdown(); return re;}
virtual size_t on_msg_handle(out_queue_type& msg_can) {auto re = server_socket_base::on_msg_handle(msg_can); force_shutdown(); return re;}
#else
virtual bool on_msg_handle(out_msg_type& msg) {auto re = server_socket_base::on_msg_handle(msg); force_shutdown(); return re;}
#endif
......
......@@ -53,7 +53,7 @@ protected:
//msg handling, must define macro ASCS_SYNC_DISPATCH
//do not hold msg_can for further using, access msg_can and return from on_msg as quickly as possible
//access msg_can freely within this callback, it's always thread safe.
virtual bool on_msg(list<out_msg_type>& msg_can)
virtual size_t on_msg(list<out_msg_type>& msg_can)
{
ascs::do_something_to_all(msg_can, [this](out_msg_type& msg) {this->handle_msg(msg);});
msg_can.clear(); //if we left behind some messages in msg_can, they will be dispatched via on_msg_handle asynchronously, which means it's
......@@ -61,7 +61,7 @@ protected:
//here we always consumed all messages, so we can use sync message dispatching, otherwise, we should not use sync message dispatching
//except we can bear message disordering.
return true;
return 1;
}
//msg handling end
......
......@@ -39,7 +39,7 @@ protected:
//msg handling: send the original msg back (echo server), must define macro ASCS_SYNC_DISPATCH
//do not hold msg_can for further using, access msg_can and return from on_msg as quickly as possible
//access msg_can freely within this callback, it's always thread safe.
virtual bool on_msg(list<out_msg_type>& msg_can)
virtual size_t on_msg(list<out_msg_type>& msg_can)
{
//if the type of out_msg_type and in_msg_type are not identical, the compilation will fail, then you should use send_native_msg instead.
ascs::do_something_to_all(msg_can, [this](out_msg_type& msg) {this->direct_send_msg(std::move(msg));});
......@@ -48,7 +48,7 @@ protected:
//here we always consumed all messages, so we can use sync message dispatching, otherwise, we should not use sync message dispatching
//except we can bear message disordering.
return true;
return 1;
}
//msg handling end
};
......
......@@ -549,7 +549,6 @@
* Rename ext::udp::service to socket_service.
* Rename socket::get_pending_send_msg_num to get_pending_send_msg_size and socket::get_pending_recv_msg_num to get_pending_recv_msg_size,
* and the return value not means message entries any more, but total size of all messages.
* Change the return type of socket::on_msg and socket::on_msg_handle from size_t to bool.
*
* HIGHLIGHT:
* Introduce new class single_service_pump--one service_pump for one service.
......
......@@ -278,27 +278,27 @@ protected:
virtual void after_close() {} //a good case for using this is to reconnect the server, please refer to client_socket_base.
#ifdef ASCS_SYNC_DISPATCH
//return true if handled some messages (include all messages), if some msg left behind, socket will re-dispatch them asynchronously
//return positive value if handled some messages (include all messages), if some msg left behind, socket will re-dispatch them asynchronously
//notice: using inconstant is for the convenience of swapping
virtual bool on_msg(list<OutMsgType>& msg_can)
virtual size_t on_msg(list<OutMsgType>& msg_can)
{
//it's always thread safe in this virtual function, because it blocks message receiving
ascs::do_something_to_all(msg_can, [](OutMsgType& msg) {unified_out::debug_out("recv(" ASCS_SF "): %s", msg.size(), msg.data());});
msg_can.clear(); //have handled all messages
return true;
return 1;
}
#endif
#ifdef ASCS_DISPATCH_BATCH_MSG
//return true if handled some messages (include all messages), if some msg left behind, socket will re-dispatch them asynchronously
//return positive value if handled some messages (include all messages), if some msg left behind, socket will re-dispatch them asynchronously
//notice: using inconstant is for the convenience of swapping
virtual bool on_msg_handle(out_queue_type& msg_can)
virtual size_t on_msg_handle(out_queue_type& msg_can)
{
out_container_type tmp_can;
msg_can.swap(tmp_can);
msg_can.swap(tmp_can); //must be thread safe, or aovid race condition from your business logic
ascs::do_something_to_all(tmp_can, [](OutMsgType& msg) {unified_out::debug_out("recv(" ASCS_SF "): %s", msg.size(), msg.data());});
return true;
return 1;
}
#else
//return true means msg been handled, false means msg cannot be handled right now, and socket will re-dispatch it asynchronously
......@@ -370,9 +370,8 @@ protected:
bool handle_msg()
{
auto changed = false;
auto size_in_byte = ascs::get_size_in_byte(temp_msg_can);
stat.recv_msg_sum += temp_msg_can.size(); //this can have linear complexity on old gcc or Cygwin and Mingw64, please note.;
stat.recv_msg_sum += temp_msg_can.size(); //this can have linear complexity in old gcc or Cygwin and Mingw64, please note.;
stat.recv_byte_sum += size_in_byte;
#ifdef ASCS_SYNC_RECV
std::unique_lock<std::mutex> lock(sync_recv_mutex);
......@@ -387,7 +386,7 @@ protected:
else if (temp_msg_can.empty())
return handled_msg(); //sync_recv_msg() has consumed temp_msg_can
else
changed = true;
size_in_byte = 0; //to re-calculate size_in_byte
}
lock.unlock();
#endif
......@@ -398,7 +397,8 @@ protected:
#endif
{
auto_duration dur(stat.handle_time_sum);
changed = on_msg(temp_msg_can) || changed;
if (on_msg(temp_msg_can) > 0)
size_in_byte = 0; //to re-calculate size_in_byte
empty = temp_msg_can.empty();
}
#elif defined(ASCS_PASSIVE_RECV)
......@@ -415,7 +415,7 @@ protected:
temp_buffer.emplace_back(std::move(*iter));
temp_msg_can.clear();
recv_msg_buffer.move_items_in(temp_buffer, changed ? 0 : size_in_byte);
recv_msg_buffer.move_items_in(temp_buffer, size_in_byte);
dispatch_msg();
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册