提交 648b247d 编写于 作者: Y youngwolf 提交者: youngowlf

Send more msgs in one async_write call.

上级 0ddfbfc2
......@@ -187,7 +187,7 @@ protected:
};
//protocol: length + body
//this unpacker demonstrate how to forbid memory copying while parsing msgs (let asio write msg directly).
//this unpacker demonstrates how to avoid memory replication while parsing msgs (let asio write msgs directly).
class non_copy_unpacker : public i_unpacker<basic_buffer>
{
public:
......@@ -260,10 +260,10 @@ public:
private:
ST_ASIO_HEAD_TYPE head;
//please note that we don't have a fixed size array with maximum size any more(like the default unpacker).
//this is very useful if you have very few but very large msgs, fox example:
//you have a very large msg(1M size), but all others are very small, if you use a fixed size array to hold msgs in the unpackers,
//all the unpackers must have an array with at least 1M size, each st_socket will have a unpacker, this will cause your application occupy very large memory but with very low utilization ratio.
//this unbuffered_unpacker will resolve above problem, and with another benefit: no memory replication needed any more.
//this is very useful if you have a few types of msgs which are very large, for example: you have a type of very large msg (1M size),
//but all others are very small, if you use the default unpacker, all unpackers must have a fixed buffer with at least 1M size, each st_socket has a unpacker,
//this will cause your application to occupy very large memory but with very low utilization ratio.
//this non_copy_unpacker will resolve the above problem, and with another benefit: no memory replication is needed any more.
msg_type raw_buff;
int step; //-1-error format, 0-want the head, 1-want the body
};
......@@ -391,8 +391,7 @@ public:
//a return value of 0 indicates that the read operation is complete. a non-zero value indicates the maximum number
//of bytes to be read on the next call to the stream's async_read_some function. ---boost::asio::async_read
//read as many as possible to reduce asynchronous call-back(st_tcp_socket_base::recv_handler), and don't forget to handle
//stick package carefully in parse_msg function.
//read as many bytes as possible to reduce asynchronous call-backs, and don't forget to handle sticky packets carefully in the parse_msg function.
virtual size_t completion_condition(const boost::system::error_code& ec, size_t bytes_transferred)
{
if (ec)
......
......@@ -115,7 +115,7 @@ namespace st_asio_wrapper
};
//packer concept
//just provide msg_type definition, you should not call any functions of it, and send msgs directly
//it just provides the msg_type definition; you should not call any of its functions, but send msgs directly
template<typename MsgType>
class dummy_packer : public i_packer<MsgType>
{
......
......@@ -111,7 +111,8 @@ public:
uint_fast64_t send_msg_sum; //not counted msgs in sending buffer
uint_fast64_t send_byte_sum; //not counted msgs in sending buffer
stat_duration send_delay_sum; //from send_(native_)msg, post_(native_)msg(exclude msg packing) to asio::async_write
stat_duration send_time_sum; //from asio::async_write to send_handler, this indicate your network's speed or load
stat_duration send_time_sum; //from asio::async_write to send_handler
//the above two items indicate your network's speed or load
//recv corresponding statistic
uint_fast64_t recv_msg_sum; //include msgs in receiving buffer
......@@ -186,7 +187,6 @@ protected:
recv_msg_buffer.clear();
temp_msg_buffer.clear();
last_send_msg.clear();
last_dispatch_msg.clear();
}
......@@ -418,7 +418,7 @@ protected:
if (dispatch_all)
{
#ifndef ST_ASIO_DISCARD_MSG_WHEN_LINK_DOWN
st_asio_wrapper::do_something_to_all(recv_msg_buffer, [this](OutMsgType& msg) {ST_THIS on_msg_handle(msg, true);});
st_asio_wrapper::do_something_to_all(recv_msg_buffer, [this](out_msg& msg) {ST_THIS on_msg_handle(msg, true);});
#endif
recv_msg_buffer.clear();
}
......@@ -522,7 +522,6 @@ protected:
uint_fast64_t _id;
Socket next_layer_;
in_msg last_send_msg;
out_msg last_dispatch_msg;
boost::shared_ptr<i_packer<typename Packer::msg_type>> packer_;
......
......@@ -36,10 +36,7 @@ public:
size_t valid_size()
{
size_t size = 0;
ST_THIS do_something_to_all([&size](typename Pool::object_ctype& item) {
if (item->is_connected())
++size;
});
ST_THIS do_something_to_all([&size](typename Pool::object_ctype& item) {if (item->is_connected()) ++size;});
return size;
}
......
......@@ -13,6 +13,8 @@
#ifndef ST_ASIO_WRAPPER_TCP_SOCKET_H_
#define ST_ASIO_WRAPPER_TCP_SOCKET_H_
#include <vector>
#include "st_asio_wrapper_socket.h"
#ifndef ST_ASIO_GRACEFUL_CLOSE_MAX_DURATION
......@@ -115,13 +117,31 @@ protected:
ST_THIS sending = false;
else if (!ST_THIS sending && !ST_THIS send_msg_buffer.empty())
{
ST_THIS stat.send_delay_sum += super::statistic::local_time() - ST_THIS send_msg_buffer.front().begin_time;
ST_THIS sending = true;
ST_THIS last_send_msg.swap(ST_THIS send_msg_buffer.front());
ST_THIS send_msg_buffer.pop_front();
#ifdef ST_ASIO_WANT_MSG_SEND_NOTIFY
const size_t max_send_size = 0;
#else
const size_t max_send_size = boost::asio::detail::default_max_transfer_size;
#endif
size_t size = 0;
std::vector<boost::asio::const_buffer> bufs;
auto end_time = super::statistic::local_time();
last_send_msg.clear();
for (auto iter = std::begin(ST_THIS send_msg_buffer); true;)
{
size += iter->size();
bufs.push_back(boost::asio::buffer(iter->data(), iter->size()));
ST_THIS stat.send_delay_sum += end_time - iter->begin_time;
++iter;
if (size >= max_send_size || iter == std::end(ST_THIS send_msg_buffer))
{
last_send_msg.splice(std::end(last_send_msg), ST_THIS send_msg_buffer, std::begin(ST_THIS send_msg_buffer), iter);
last_send_msg.front().restart();
break;
}
}
ST_THIS last_send_msg.restart();
boost::asio::async_write(ST_THIS next_layer(), boost::asio::buffer(ST_THIS last_send_msg.data(), ST_THIS last_send_msg.size()),
boost::asio::async_write(ST_THIS next_layer(), bufs,
ST_THIS make_handler_error_size(boost::bind(&st_tcp_socket_base::send_handler, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)));
}
......@@ -201,13 +221,11 @@ private:
{
if (!ec)
{
assert(bytes_transferred == ST_THIS last_send_msg.size());
ST_THIS stat.send_time_sum += super::statistic::local_time() - ST_THIS last_send_msg.begin_time;
ST_THIS stat.send_time_sum += super::statistic::local_time() - last_send_msg.front().begin_time;
ST_THIS stat.send_byte_sum += bytes_transferred;
++ST_THIS stat.send_msg_sum;
ST_THIS stat.send_msg_sum += last_send_msg.size();
#ifdef ST_ASIO_WANT_MSG_SEND_NOTIFY
ST_THIS on_msg_send(ST_THIS last_send_msg);
ST_THIS on_msg_send(last_send_msg.front());
#endif
}
else
......@@ -220,16 +238,17 @@ private:
if (!ec)
#ifdef ST_ASIO_WANT_ALL_MSG_SEND_NOTIFY
if (!do_send_msg())
ST_THIS on_all_msg_send(ST_THIS last_send_msg);
ST_THIS on_all_msg_send(last_send_msg.back());
#else
do_send_msg();
#endif
if (!ST_THIS sending)
ST_THIS last_send_msg.clear();
last_send_msg.clear();
}
protected:
typename super::in_container_type last_send_msg;
boost::shared_ptr<i_unpacker<out_msg_type>> unpacker_;
int close_state; //2-the first step of graceful close, 1-force close, 0-normal state
};
......
......@@ -133,14 +133,14 @@ protected:
ST_THIS sending = false;
else if (!ST_THIS sending && !ST_THIS send_msg_buffer.empty())
{
ST_THIS stat.send_delay_sum += super::statistic::local_time() - ST_THIS send_msg_buffer.front().begin_time;
ST_THIS sending = true;
ST_THIS last_send_msg.swap(ST_THIS send_msg_buffer.front());
ST_THIS stat.send_delay_sum += super::statistic::local_time() - ST_THIS send_msg_buffer.front().begin_time;
ST_THIS send_msg_buffer.front().swap(last_send_msg);
ST_THIS send_msg_buffer.pop_front();
boost::shared_lock<boost::shared_mutex> lock(close_mutex);
ST_THIS last_send_msg.restart();
ST_THIS next_layer().async_send_to(boost::asio::buffer(ST_THIS last_send_msg.data(), ST_THIS last_send_msg.size()), ST_THIS last_send_msg.peer_addr,
last_send_msg.restart();
ST_THIS next_layer().async_send_to(boost::asio::buffer(last_send_msg.data(), last_send_msg.size()), last_send_msg.peer_addr,
ST_THIS make_handler_error_size(boost::bind(&st_udp_socket_base::send_handler, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)));
}
......@@ -211,13 +211,13 @@ private:
{
if (!ec)
{
assert(bytes_transferred == ST_THIS last_send_msg.size());
assert(bytes_transferred == last_send_msg.size());
ST_THIS stat.send_time_sum += super::statistic::local_time() - ST_THIS last_send_msg.begin_time;
ST_THIS stat.send_time_sum += super::statistic::local_time() - last_send_msg.begin_time;
ST_THIS stat.send_byte_sum += bytes_transferred;
++ST_THIS stat.send_msg_sum;
#ifdef ST_ASIO_WANT_MSG_SEND_NOTIFY
ST_THIS on_msg_send(ST_THIS last_send_msg);
ST_THIS on_msg_send(last_send_msg);
#endif
}
else
......@@ -231,17 +231,18 @@ private:
//for UDP in st_asio_wrapper, sending error will not stop the following sending.
#ifdef ST_ASIO_WANT_ALL_MSG_SEND_NOTIFY
if (!do_send_msg())
ST_THIS on_all_msg_send(ST_THIS last_send_msg);
ST_THIS on_all_msg_send(last_send_msg);
#else
do_send_msg();
#endif
if (!ST_THIS sending)
ST_THIS last_send_msg.clear();
last_send_msg.clear();
}
protected:
boost::shared_ptr<i_udp_unpacker<typename Unpacker::msg_type>> unpacker_;
typename super::in_msg last_send_msg;
boost::shared_ptr<i_udp_unpacker<typename Packer::msg_type>> unpacker_;
boost::asio::ip::udp::endpoint peer_addr, local_addr;
boost::shared_mutex close_mutex;
......
......@@ -45,6 +45,8 @@ public:
}
protected:
virtual void on_connect() {boost::asio::ip::tcp::no_delay option(true); lowest_layer().set_option(option); st_connector::on_connect();}
//msg handling
#ifndef ST_ASIO_FORCE_TO_USE_MSG_RECV_BUFFER
virtual bool on_msg(out_msg_type& msg) {handle_msg(msg); return true;}
......
......@@ -44,6 +44,9 @@ public:
return stat;
}
protected:
virtual bool on_accept(object_ctype& client_ptr) {boost::asio::ip::tcp::no_delay option(true); client_ptr->lowest_layer().set_option(option); return true;}
};
int main(int argc, const char* argv[])
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册