提交 67e9cd9d 编写于 作者: Y youngwolf

Release 2.4.0

Add reliable UDP socket;
service_pump support multiple io_context;
Support connected UDP socket;
Enhance the reusability of st_asio_wrapper's ssl sockets.
上级 9f2e7448
...@@ -10,8 +10,9 @@ st_asio_wrapper is an asynchronous c/s framework based on Boost.Asio, besides al ...@@ -10,8 +10,9 @@ st_asio_wrapper is an asynchronous c/s framework based on Boost.Asio, besides al
5. Worker thread management;</br> 5. Worker thread management;</br>
6. Support message buffer;</br> 6. Support message buffer;</br>
7. Widely support timers;</br> 7. Widely support timers;</br>
8. Support TCP/UDP;</br> 8. Support TCP/UDP and Unix domin TCP/UDP;</br>
9. Support ssl;</br> 9. Support reliable UDP (base on KCP -- https://github.com/skywind3000/kcp);</br>
10. Support ssl;</br>
Quick start: Quick start:
- -
......
...@@ -57,7 +57,7 @@ public: ...@@ -57,7 +57,7 @@ public:
short_connection(i_matrix& matrix_) : socks4::client_socket(matrix_) {} short_connection(i_matrix& matrix_) : socks4::client_socket(matrix_) {}
protected: protected:
virtual void on_connect() {close_reconnect(); client_socket::on_connect();} //close reconnecting mechanism virtual void on_connect() {set_reconnect(false); client_socket::on_connect();} //close reconnecting mechanism
}; };
class short_client : public multi_client_base<short_connection> class short_client : public multi_client_base<short_connection>
......
...@@ -31,25 +31,41 @@ using namespace st_asio_wrapper::ext::tcp; ...@@ -31,25 +31,41 @@ using namespace st_asio_wrapper::ext::tcp;
class echo_socket : public client_socket class echo_socket : public client_socket
{ {
public: public:
echo_socket(i_matrix& matrix_) : client_socket(matrix_), msg_len(ST_ASIO_MSG_BUFFER_SIZE - ST_ASIO_HEAD_LEN) {unpacker()->stripped(false);} echo_socket(i_matrix& matrix_) : client_socket(matrix_), max_delay(1.f), msg_len(ST_ASIO_MSG_BUFFER_SIZE - ST_ASIO_HEAD_LEN) {unpacker()->stripped(false);}
void begin(float max_delay_, size_t msg_len_) {max_delay = max_delay_; msg_len = msg_len_;}
void begin(size_t msg_len_) {msg_len = msg_len_;}
void check_delay(float max_delay) {if (is_connected() && (double) last_send_time.elapsed().wall / 1000000000 > max_delay) force_shutdown();}
protected: protected:
bool check_delay(bool restart_timer)
{
boost::lock_guard<boost::mutex> lock(mutex);
if (is_connected() && (double) last_send_time.elapsed().wall / 1000000000 > max_delay)
{
force_shutdown();
return false;
}
else if (restart_timer)
{
last_send_time.stop();
last_send_time.start();
}
return true;
}
virtual void on_connect() virtual void on_connect()
{ {
boost::asio::ip::tcp::no_delay option(true); boost::asio::ip::tcp::no_delay option(true);
lowest_layer().set_option(option); lowest_layer().set_option(option);
char* buff = new char[msg_len]; char* buff = new char[msg_len];
memset(buff, 'Y', msg_len); //what should we send? memset(buff, '$', msg_len); //what should we send?
last_send_time.stop(); last_send_time.stop();
last_send_time.start(); last_send_time.start();
send_msg(buff, msg_len, true); send_msg(buff, msg_len, true);
delete[] buff; delete[] buff;
set_timer(TIMER_END, 5000, boost::lambda::if_then_else_return(boost::lambda::bind(&echo_socket::check_delay, this, false), true, false));
client_socket::on_connect(); client_socket::on_connect();
} }
...@@ -80,30 +96,21 @@ protected: ...@@ -80,30 +96,21 @@ protected:
//msg handling end //msg handling end
private: private:
void handle_msg(out_msg_type& msg) void handle_msg(out_msg_type& msg) {if (check_delay(true)) direct_send_msg(msg, true);}
{
last_send_time.stop();
last_send_time.start();
direct_send_msg(msg, true);
}
private: private:
float max_delay;
size_t msg_len; size_t msg_len;
boost::timer::cpu_timer last_send_time; boost::timer::cpu_timer last_send_time;
boost::mutex mutex;
}; };
class echo_client : public multi_client_base<echo_socket> class echo_client : public multi_client_base<echo_socket>
{ {
public: public:
echo_client(service_pump& service_pump_) : multi_client_base<echo_socket>(service_pump_) {} echo_client(service_pump& service_pump_) : multi_client_base<echo_socket>(service_pump_) {}
void begin(float max_delay, size_t msg_len) {do_something_to_all(boost::bind(&echo_socket::begin, boost::placeholders::_1, max_delay, msg_len));}
void begin(float max_delay, size_t msg_len)
{
do_something_to_all(boost::bind(&echo_socket::begin, boost::placeholders::_1, msg_len));
set_timer(TIMER_END, 5000, (boost::lambda::bind(&echo_client::check_delay, this, max_delay), true));
}
void check_delay(float max_delay) {do_something_to_all(boost::bind(&echo_socket::check_delay, boost::placeholders::_1, max_delay));}
}; };
int main(int argc, const char* argv[]) int main(int argc, const char* argv[])
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
//#define ST_ASIO_USE_STEADY_TIMER //#define ST_ASIO_USE_STEADY_TIMER
#define ST_ASIO_USE_SYSTEM_TIMER #define ST_ASIO_USE_SYSTEM_TIMER
#define ST_ASIO_AVOID_AUTO_STOP_SERVICE #define ST_ASIO_AVOID_AUTO_STOP_SERVICE
#define ST_ASIO_DECREASE_THREAD_AT_RUNTIME //#define ST_ASIO_DECREASE_THREAD_AT_RUNTIME
//#define ST_ASIO_MAX_SEND_BUF 65536 //#define ST_ASIO_MAX_SEND_BUF 65536
//#define ST_ASIO_MAX_RECV_BUF 65536 //#define ST_ASIO_MAX_RECV_BUF 65536
//if there's a huge number of links, please reduce messge buffer via ST_ASIO_MAX_SEND_BUF and ST_ASIO_MAX_RECV_BUF macro. //if there's a huge number of links, please reduce messge buffer via ST_ASIO_MAX_SEND_BUF and ST_ASIO_MAX_RECV_BUF macro.
...@@ -165,6 +165,16 @@ protected: ...@@ -165,6 +165,16 @@ protected:
} }
#endif #endif
//demonstrate strict reference balance between multiple io_context.
virtual bool change_io_context()
{
if (NULL == get_matrix())
return false;
reset_next_layer(get_matrix()->get_service_pump().assign_io_context());
return true;
}
private: private:
void handle_msg(out_msg_ctype& msg) void handle_msg(out_msg_ctype& msg)
{ {
...@@ -406,7 +416,7 @@ void start_test(int repeat_times, char mode, echo_client& client, size_t send_th ...@@ -406,7 +416,7 @@ void start_test(int repeat_times, char mode, echo_client& client, size_t send_th
int main(int argc, const char* argv[]) int main(int argc, const char* argv[])
{ {
printf("usage: %s [<service thread number=1> [<send thread number=8> [<port=%d> [<ip=%s> [link num=16]]]]]\n", argv[0], ST_ASIO_SERVER_PORT, ST_ASIO_SERVER_IP); printf("usage: %s [<service thread number=4> [<send thread number=8> [<port=%d> [<ip=%s> [link num=16]]]]]\n", argv[0], ST_ASIO_SERVER_PORT, ST_ASIO_SERVER_IP);
if (argc >= 2 && (0 == strcmp(argv[1], "--help") || 0 == strcmp(argv[1], "-h"))) if (argc >= 2 && (0 == strcmp(argv[1], "--help") || 0 == strcmp(argv[1], "-h")))
return 0; return 0;
else else
...@@ -421,6 +431,12 @@ int main(int argc, const char* argv[]) ...@@ -421,6 +431,12 @@ int main(int argc, const char* argv[])
/////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////
service_pump sp; service_pump sp;
#ifndef ST_ASIO_DECREASE_THREAD_AT_RUNTIME
//if you want to decrease service thread at runtime, then you cannot use multiple io_context, if somebody indeed needs it, please let me know.
//with multiple io_context, the number of service thread must be bigger than or equal to the number of io_context, please note.
//with multiple io_context, please also define macro ST_ASIO_AVOID_AUTO_STOP_SERVICE.
sp.set_io_context_num(4);
#endif
echo_client client(sp); echo_client client(sp);
//echo client means to cooperate with echo server while doing performance test, it will not send msgs back as echo server does, //echo client means to cooperate with echo server while doing performance test, it will not send msgs back as echo server does,
//otherwise, dead loop will occur, network resource will be exhausted. //otherwise, dead loop will occur, network resource will be exhausted.
...@@ -457,7 +473,7 @@ int main(int argc, const char* argv[]) ...@@ -457,7 +473,7 @@ int main(int argc, const char* argv[])
//or just add up total message size), under this scenario, just one service thread without receiving buffer will obtain the best IO throughput. //or just add up total message size), under this scenario, just one service thread without receiving buffer will obtain the best IO throughput.
//the server has such behavior too. //the server has such behavior too.
sp.start_service(thread_num); sp.start_service(std::max(thread_num, sp.get_io_context_num()));
while(sp.is_running()) while(sp.is_running())
{ {
std::string str; std::string str;
...@@ -478,8 +494,10 @@ int main(int argc, const char* argv[]) ...@@ -478,8 +494,10 @@ int main(int argc, const char* argv[])
client.list_all_object(); client.list_all_object();
else if (INCREASE_THREAD == str) else if (INCREASE_THREAD == str)
sp.add_service_thread(1); sp.add_service_thread(1);
#ifdef ST_ASIO_DECREASE_THREAD_AT_RUNTIME
else if (DECREASE_THREAD == str) else if (DECREASE_THREAD == str)
sp.del_service_thread(1); sp.del_service_thread(1);
#endif
else if (is_testing) else if (is_testing)
puts("testing has not finished yet!"); puts("testing has not finished yet!");
else if (QUIT_COMMAND == str) else if (QUIT_COMMAND == str)
......
...@@ -12,7 +12,7 @@ ...@@ -12,7 +12,7 @@
//#define ST_ASIO_USE_SYSTEM_TIMER //#define ST_ASIO_USE_SYSTEM_TIMER
#define ST_ASIO_ALIGNED_TIMER #define ST_ASIO_ALIGNED_TIMER
#define ST_ASIO_AVOID_AUTO_STOP_SERVICE #define ST_ASIO_AVOID_AUTO_STOP_SERVICE
#define ST_ASIO_DECREASE_THREAD_AT_RUNTIME //#define ST_ASIO_DECREASE_THREAD_AT_RUNTIME
//#define ST_ASIO_MAX_SEND_BUF 65536 //#define ST_ASIO_MAX_SEND_BUF 65536
//#define ST_ASIO_MAX_RECV_BUF 65536 //#define ST_ASIO_MAX_RECV_BUF 65536
//if there's a huge number of links, please reduce messge buffer via ST_ASIO_MAX_SEND_BUF and ST_ASIO_MAX_RECV_BUF macro. //if there's a huge number of links, please reduce messge buffer via ST_ASIO_MAX_SEND_BUF and ST_ASIO_MAX_RECV_BUF macro.
...@@ -153,13 +153,15 @@ protected: ...@@ -153,13 +153,15 @@ protected:
virtual bool on_msg_handle(out_msg_type& msg) {return send_msg(msg);} virtual bool on_msg_handle(out_msg_type& msg) {return send_msg(msg);}
#endif #endif
//msg handling end //msg handling end
//demonstrate strict reference balance between multiple io_context.
virtual bool change_io_context() {reset_next_layer(get_server().get_service_pump().assign_io_context()); return true;}
}; };
typedef server_base<echo_socket, object_pool<echo_socket>, i_echo_server> echo_server_base; class echo_server : public server2<echo_socket, i_echo_server>
class echo_server : public echo_server_base
{ {
public: public:
echo_server(service_pump& service_pump_) : echo_server_base(service_pump_) {} echo_server(service_pump& service_pump_) : server2<echo_socket, i_echo_server>(service_pump_) {}
protected: protected:
//from i_echo_server, pure virtual function, we must implement it. //from i_echo_server, pure virtual function, we must implement it.
...@@ -243,7 +245,7 @@ private: ...@@ -243,7 +245,7 @@ private:
*/ */
int main(int argc, const char* argv[]) int main(int argc, const char* argv[])
{ {
printf("usage: %s [<service thread number=1> [<port=%d> [ip=0.0.0.0]]]\n", argv[0], ST_ASIO_SERVER_PORT); printf("usage: %s [<service thread number=4> [<port=%d> [ip=0.0.0.0]]]\n", argv[0], ST_ASIO_SERVER_PORT);
puts("normal server's port will be 100 larger."); puts("normal server's port will be 100 larger.");
if (argc >= 2 && (0 == strcmp(argv[1], "--help") || 0 == strcmp(argv[1], "-h"))) if (argc >= 2 && (0 == strcmp(argv[1], "--help") || 0 == strcmp(argv[1], "-h")))
return 0; return 0;
...@@ -259,6 +261,12 @@ int main(int argc, const char* argv[]) ...@@ -259,6 +261,12 @@ int main(int argc, const char* argv[])
context.run(); context.run();
*/ */
service_pump sp; service_pump sp;
#ifndef ST_ASIO_DECREASE_THREAD_AT_RUNTIME
//if you want to decrease service thread at runtime, then you cannot use multiple io_context, if somebody indeed needs it, please let me know.
//with multiple io_context, the number of service thread must be bigger than or equal to the number of io_context, please note.
//with multiple io_context, please also define macro ST_ASIO_AVOID_AUTO_STOP_SERVICE.
sp.set_io_context_num(4);
#endif
echo_server echo_server_(sp); //echo server echo_server echo_server_(sp); //echo server
//demonstrate how to use singel_service //demonstrate how to use singel_service
...@@ -286,7 +294,7 @@ int main(int argc, const char* argv[]) ...@@ -286,7 +294,7 @@ int main(int argc, const char* argv[])
global_packer->prefix_suffix("begin", "end"); global_packer->prefix_suffix("begin", "end");
#endif #endif
sp.start_service(thread_num); sp.start_service(std::max(thread_num, sp.get_io_context_num()));
normal_server_.start_service(1); normal_server_.start_service(1);
short_server.start_service(1); short_server.start_service(1);
while(sp.is_running()) while(sp.is_running())
...@@ -329,8 +337,10 @@ int main(int argc, const char* argv[]) ...@@ -329,8 +337,10 @@ int main(int argc, const char* argv[])
} }
else if (INCREASE_THREAD == str) else if (INCREASE_THREAD == str)
sp.add_service_thread(1); sp.add_service_thread(1);
#ifdef ST_ASIO_DECREASE_THREAD_AT_RUNTIME
else if (DECREASE_THREAD == str) else if (DECREASE_THREAD == str)
sp.del_service_thread(1); sp.del_service_thread(1);
#endif
else else
{ {
// /* // /*
......
...@@ -29,7 +29,7 @@ atomic<boost::int_fast64_t> received_size; ...@@ -29,7 +29,7 @@ atomic<boost::int_fast64_t> received_size;
int main(int argc, const char* argv[]) int main(int argc, const char* argv[])
{ {
puts("this is a file transfer client."); puts("this is a file transmission client.");
printf("usage: %s [<port=%d> [<ip=%s> [link num=1]]]\n", argv[0], ST_ASIO_SERVER_PORT, ST_ASIO_SERVER_IP); printf("usage: %s [<port=%d> [<ip=%s> [link num=1]]]\n", argv[0], ST_ASIO_SERVER_PORT, ST_ASIO_SERVER_IP);
if (argc >= 2 && (0 == strcmp(argv[1], "--help") || 0 == strcmp(argv[1], "-h"))) if (argc >= 2 && (0 == strcmp(argv[1], "--help") || 0 == strcmp(argv[1], "-h")))
return 0; return 0;
...@@ -37,6 +37,12 @@ int main(int argc, const char* argv[]) ...@@ -37,6 +37,12 @@ int main(int argc, const char* argv[])
puts("type " QUIT_COMMAND " to end."); puts("type " QUIT_COMMAND " to end.");
service_pump sp; service_pump sp;
#ifndef ST_ASIO_DECREASE_THREAD_AT_RUNTIME
//if you want to decrease service thread at runtime, then you cannot use multiple io_context, if somebody indeed needs it, please let me know.
//with multiple io_context, the number of service thread must be bigger than or equal to the number of io_context, please note.
//with multiple io_context, please also define macro ST_ASIO_AVOID_AUTO_STOP_SERVICE.
sp.set_io_context_num(4);
#endif
file_client client(sp); file_client client(sp);
if (argc > 3) if (argc > 3)
......
...@@ -253,7 +253,7 @@ private: ...@@ -253,7 +253,7 @@ private:
file_size = -1; file_size = -1;
received_size = 0; received_size = 0;
printf("transfer %s begin.\n", file_name.data()); printf("transmit %s begin.\n", file_name.data());
if (find(0)->get_file(file_name)) if (find(0)->get_file(file_name))
{ {
do_something_to_all(boost::lambda::if_then(0U != boost::lambda::bind((boost::uint_fast64_t (file_socket::*)() const) &file_socket::id, *boost::lambda::_1), do_something_to_all(boost::lambda::if_then(0U != boost::lambda::bind((boost::uint_fast64_t (file_socket::*)() const) &file_socket::id, *boost::lambda::_1),
...@@ -264,7 +264,7 @@ private: ...@@ -264,7 +264,7 @@ private:
break; break;
} }
else else
printf("transfer %s failed!\n", file_name.data()); printf("transmit %s failed!\n", file_name.data());
} }
} }
......
...@@ -5,9 +5,9 @@ ...@@ -5,9 +5,9 @@
#define ST_ASIO_DEFAULT_PACKER packer2<> #define ST_ASIO_DEFAULT_PACKER packer2<>
//#define ST_ASIO_RECV_BUFFER_TYPE std::vector<boost::asio::mutable_buffer> //scatter-gather buffer, it's very useful under certain situations (for example, ring buffer). //#define ST_ASIO_RECV_BUFFER_TYPE std::vector<boost::asio::mutable_buffer> //scatter-gather buffer, it's very useful under certain situations (for example, ring buffer).
//#define ST_ASIO_SCATTERED_RECV_BUFFER //used by unpackers, not belongs to st_asio_wrapper //#define ST_ASIO_SCATTERED_RECV_BUFFER //used by unpackers, not belongs to st_asio_wrapper
//note, these two macro are not requisite, i'm just showing how to use them. //note, these two macro are not requisite, I'm just showing how to use them.
//all other definitions are in the makefile, because we have two cpp files, defining them in more than one place is riskful ( //all other definitions are in the makefile, because we have two cpp files, defining them in more than one place is risky (
// we may define them to different values between the two cpp files) // we may define them to different values between the two cpp files)
//configuration //configuration
...@@ -19,22 +19,63 @@ ...@@ -19,22 +19,63 @@
#define STATISTIC "statistic" #define STATISTIC "statistic"
#define LIST_ALL_CLIENT "list all client" #define LIST_ALL_CLIENT "list all client"
#if !defined(_MSC_VER) && !defined(__MINGW64__) && !defined(__MINGW32__)
void signal_handler(service_pump& sp, boost::asio::signal_set& signal_receiver, const boost::system::error_code& ec, int signal_number)
{
if (!ec)
return sp.end_service();
signal_receiver.async_wait(boost::bind(&signal_handler, boost::ref(sp), boost::ref(signal_receiver), boost::placeholders::_1, boost::placeholders::_2));
}
#endif
int main(int argc, const char* argv[]) int main(int argc, const char* argv[])
{ {
puts("this is a file transfer server."); puts("this is a file transmission server.");
#if defined(_MSC_VER) || defined(__MINGW64__) || defined(__MINGW32__)
printf("usage: %s [<port=%d> [ip=0.0.0.0]]\n", argv[0], ST_ASIO_SERVER_PORT); printf("usage: %s [<port=%d> [ip=0.0.0.0]]\n", argv[0], ST_ASIO_SERVER_PORT);
#else
printf("usage: %s [-d] [<port=%d> [ip=0.0.0.0]]\n", argv[0], ST_ASIO_SERVER_PORT);
#endif
if (argc >= 2 && (0 == strcmp(argv[1], "--help") || 0 == strcmp(argv[1], "-h"))) if (argc >= 2 && (0 == strcmp(argv[1], "--help") || 0 == strcmp(argv[1], "-h")))
return 0; return 0;
else else
puts("type " QUIT_COMMAND " to end."); puts("type " QUIT_COMMAND " to end.");
int index = 0;
if (argc >= 2 && 0 == strcmp(argv[1], "-d"))
{
#if defined(_MSC_VER) || defined(__MINGW64__) || defined(__MINGW32__)
puts("on windows, -d is not supported!");
return 1;
#endif
index = 1;
}
service_pump sp; service_pump sp;
#ifndef ST_ASIO_DECREASE_THREAD_AT_RUNTIME
//if you want to decrease service thread at runtime, then you cannot use multiple io_context, if somebody indeed needs it, please let me know.
//with multiple io_context, the number of service thread must be bigger than or equal to the number of io_context, please note.
//with multiple io_context, please also define macro ST_ASIO_AVOID_AUTO_STOP_SERVICE.
sp.set_io_context_num(8);
#endif
server_base<file_socket> file_server_(sp); server_base<file_socket> file_server_(sp);
if (argc > 2) if (argc > 2 + index)
file_server_.set_server_addr(atoi(argv[1]), argv[2]); file_server_.set_server_addr(atoi(argv[1 + index]), argv[2 + index]);
else if (argc > 1) else if (argc > 1 + index)
file_server_.set_server_addr(atoi(argv[1])); file_server_.set_server_addr(atoi(argv[1 + index]));
#if !defined(_MSC_VER) && !defined(__MINGW64__) && !defined(__MINGW32__)
if (1 == index)
{
boost::asio::signal_set signal_receiver(sp, SIGINT, SIGTERM);
signal_receiver.async_wait(boost::bind(&signal_handler, boost::ref(sp), boost::ref(signal_receiver), boost::placeholders::_1, boost::placeholders::_2));
sp.run_service();
return 0;
}
#endif
sp.start_service(); sp.start_service();
while(sp.is_running()) while(sp.is_running())
......
...@@ -3,9 +3,9 @@ ...@@ -3,9 +3,9 @@
#define ST_ASIO_DEFAULT_PACKER packer2<> #define ST_ASIO_DEFAULT_PACKER packer2<>
//#define ST_ASIO_RECV_BUFFER_TYPE std::vector<boost::asio::mutable_buffer> //scatter-gather buffer, it's very useful under certain situations (for example, ring buffer). //#define ST_ASIO_RECV_BUFFER_TYPE std::vector<boost::asio::mutable_buffer> //scatter-gather buffer, it's very useful under certain situations (for example, ring buffer).
//#define ST_ASIO_SCATTERED_RECV_BUFFER //used by unpackers, not belongs to st_asio_wrapper //#define ST_ASIO_SCATTERED_RECV_BUFFER //used by unpackers, not belongs to st_asio_wrapper
//note, these two macro are not requisite, i'm just showing how to use them. //note, these two macro are not requisite, I'm just showing how to use them.
//all other definitions are in the makefile, because we have two cpp files, defining them in more than one place is riskful ( //all other definitions are in the makefile, because we have two cpp files, defining them in more than one place is risky (
// we may define them to different values between the two cpp files) // we may define them to different values between the two cpp files)
//configuration //configuration
......
...@@ -260,6 +260,7 @@ public: ...@@ -260,6 +260,7 @@ public:
virtual void dump_left_data() const {} virtual void dump_left_data() const {}
//heartbeat must not be included in msg_can, otherwise you must handle heartbeat at where you handle normal messages. //heartbeat must not be included in msg_can, otherwise you must handle heartbeat at where you handle normal messages.
virtual bool parse_msg(size_t bytes_transferred, container_type& msg_can) = 0; virtual bool parse_msg(size_t bytes_transferred, container_type& msg_can) = 0;
virtual void compose_msg(const char* data, size_t size, container_type& msg_can) {} //reliable UDP socket needs this
virtual size_t completion_condition(const boost::system::error_code& ec, size_t bytes_transferred) {return 0;} virtual size_t completion_condition(const boost::system::error_code& ec, size_t bytes_transferred) {return 0;}
virtual buffer_type prepare_next_recv() = 0; virtual buffer_type prepare_next_recv() = 0;
...@@ -579,20 +580,20 @@ template<typename T> struct obj_with_begin_time_promise : public obj_with_begin_ ...@@ -579,20 +580,20 @@ template<typename T> struct obj_with_begin_time_promise : public obj_with_begin_
#endif #endif
//free functions, used to do something to any container(except map and multimap) optionally with any mutex //free functions, used to do something to any container(except map and multimap) optionally with any mutex
template<typename _Can, typename _Mutex, typename _Predicate> template<typename _Can, typename _Mutex, typename _Predicate, template<typename> class LockType>
void do_something_to_all(_Can& __can, _Mutex& __mutex, const _Predicate& __pred) void do_something_to_all(_Can& __can, _Mutex& __mutex, const _Predicate& __pred)
{ {
boost::lock_guard<boost::mutex> lock(__mutex); LockType<_Mutex> lock(__mutex);
for (BOOST_AUTO(iter, __can.begin()); iter != __can.end(); ++iter) __pred(*iter); for (BOOST_AUTO(iter, __can.begin()); iter != __can.end(); ++iter) __pred(*iter);
} }
template<typename _Can, typename _Predicate> template<typename _Can, typename _Predicate>
void do_something_to_all(_Can& __can, const _Predicate& __pred) {for (BOOST_AUTO(iter, __can.begin()); iter != __can.end(); ++iter) __pred(*iter);} void do_something_to_all(_Can& __can, const _Predicate& __pred) {for (BOOST_AUTO(iter, __can.begin()); iter != __can.end(); ++iter) __pred(*iter);}
template<typename _Can, typename _Mutex, typename _Predicate> template<typename _Can, typename _Mutex, typename _Predicate, template<typename> class LockType>
void do_something_to_one(_Can& __can, _Mutex& __mutex, const _Predicate& __pred) void do_something_to_one(_Can& __can, _Mutex& __mutex, const _Predicate& __pred)
{ {
boost::lock_guard<boost::mutex> lock(__mutex); LockType<_Mutex> lock(__mutex);
for (BOOST_AUTO(iter, __can.begin()); iter != __can.end(); ++iter) if (__pred(*iter)) break; for (BOOST_AUTO(iter, __can.begin()); iter != __can.end(); ++iter) if (__pred(*iter)) break;
} }
...@@ -608,22 +609,22 @@ size_t get_size_in_byte(const _Can& __can) ...@@ -608,22 +609,22 @@ size_t get_size_in_byte(const _Can& __can)
} }
//member functions, used to do something to any member container(except map and multimap) optionally with any member mutex //member functions, used to do something to any member container(except map and multimap) optionally with any member mutex
#define DO_SOMETHING_TO_ALL_MUTEX(CAN, MUTEX) DO_SOMETHING_TO_ALL_MUTEX_NAME(do_something_to_all, CAN, MUTEX) #define DO_SOMETHING_TO_ALL_MUTEX(CAN, MUTEX, LOCKTYPE) DO_SOMETHING_TO_ALL_MUTEX_NAME(do_something_to_all, CAN, MUTEX, LOCKTYPE)
#define DO_SOMETHING_TO_ALL(CAN) DO_SOMETHING_TO_ALL_NAME(do_something_to_all, CAN) #define DO_SOMETHING_TO_ALL(CAN) DO_SOMETHING_TO_ALL_NAME(do_something_to_all, CAN)
#define DO_SOMETHING_TO_ALL_MUTEX_NAME(NAME, CAN, MUTEX) \ #define DO_SOMETHING_TO_ALL_MUTEX_NAME(NAME, CAN, MUTEX, LOCKTYPE) \
template<typename _Predicate> void NAME(const _Predicate& __pred) {boost::lock_guard<boost::mutex> lock(MUTEX); for (BOOST_AUTO(iter, CAN.begin()); iter != CAN.end(); ++iter) __pred(*iter);} template<typename _Predicate> void NAME(const _Predicate& __pred) {LOCKTYPE lock(MUTEX); for (BOOST_AUTO(iter, CAN.begin()); iter != CAN.end(); ++iter) __pred(*iter);}
#define DO_SOMETHING_TO_ALL_NAME(NAME, CAN) \ #define DO_SOMETHING_TO_ALL_NAME(NAME, CAN) \
template<typename _Predicate> void NAME(const _Predicate& __pred) {for (BOOST_AUTO(iter, CAN.begin()); iter != CAN.end(); ++iter) __pred(*iter);} \ template<typename _Predicate> void NAME(const _Predicate& __pred) {for (BOOST_AUTO(iter, CAN.begin()); iter != CAN.end(); ++iter) __pred(*iter);} \
template<typename _Predicate> void NAME(const _Predicate& __pred) const {for (BOOST_AUTO(iter, CAN.begin()); iter != CAN.end(); ++iter) __pred(*iter);} template<typename _Predicate> void NAME(const _Predicate& __pred) const {for (BOOST_AUTO(iter, CAN.begin()); iter != CAN.end(); ++iter) __pred(*iter);}
#define DO_SOMETHING_TO_ONE_MUTEX(CAN, MUTEX) DO_SOMETHING_TO_ONE_MUTEX_NAME(do_something_to_one, CAN, MUTEX) #define DO_SOMETHING_TO_ONE_MUTEX(CAN, MUTEX, LOCKTYPE) DO_SOMETHING_TO_ONE_MUTEX_NAME(do_something_to_one, CAN, MUTEX, LOCKTYPE)
#define DO_SOMETHING_TO_ONE(CAN) DO_SOMETHING_TO_ONE_NAME(do_something_to_one, CAN) #define DO_SOMETHING_TO_ONE(CAN) DO_SOMETHING_TO_ONE_NAME(do_something_to_one, CAN)
#define DO_SOMETHING_TO_ONE_MUTEX_NAME(NAME, CAN, MUTEX) \ #define DO_SOMETHING_TO_ONE_MUTEX_NAME(NAME, CAN, MUTEX, LOCKTYPE) \
template<typename _Predicate> void NAME(const _Predicate& __pred) \ template<typename _Predicate> void NAME(const _Predicate& __pred) \
{boost::lock_guard<boost::mutex> lock(MUTEX); for (BOOST_AUTO(iter, CAN.begin()); iter != CAN.end(); ++iter) if (__pred(*iter)) break;} {LOCKTYPE lock(MUTEX); for (BOOST_AUTO(iter, CAN.begin()); iter != CAN.end(); ++iter) if (__pred(*iter)) break;}
#define DO_SOMETHING_TO_ONE_NAME(NAME, CAN) \ #define DO_SOMETHING_TO_ONE_NAME(NAME, CAN) \
template<typename _Predicate> void NAME(const _Predicate& __pred) {for (BOOST_AUTO(iter, CAN.begin()); iter != CAN.end(); ++iter) if (__pred(*iter)) break;} \ template<typename _Predicate> void NAME(const _Predicate& __pred) {for (BOOST_AUTO(iter, CAN.begin()); iter != CAN.end(); ++iter) if (__pred(*iter)) break;} \
......
...@@ -520,13 +520,13 @@ ...@@ -520,13 +520,13 @@
* HIGHLIGHT: * HIGHLIGHT:
* *
* FIX: * FIX:
* If give up connecting (prepare_reconnect returns -1 or call close_reconnect), st_asio_wrapper::socket::started() still returns true (should be false). * If give up connecting (prepare_reconnect returns -1 or call set_reconnect(false)), st_asio_wrapper::socket::started() still returns true (should be false).
* *
* ENHANCEMENTS: * ENHANCEMENTS:
* Expose server_base's acceptor via next_layer(). * Expose server_base's acceptor via next_layer().
* Prefix suffix packer and unpacker support heartbeat. * Prefix suffix packer and unpacker support heartbeat.
* New demo socket_management demonstrates how to manage sockets if you use other keys rather than the original id. * New demo socket_management demonstrates how to manage sockets if you use other keys rather than the original id.
* Control reconnecting more flexibly, see function client_socket_base::open_reconnect and client_socket_base::close_reconnect for more details. * Control reconnecting more flexibly, see function client_socket_base::set_reconnect and client_socket_base::is_reconnect for more details.
* client_socket_base support binding to a specific local address. * client_socket_base support binding to a specific local address.
* *
* DELETION: * DELETION:
...@@ -806,6 +806,40 @@ ...@@ -806,6 +806,40 @@
* *
* REPLACEMENTS: * REPLACEMENTS:
* *
* ===============================================================
* 2021.9.18 version 2.4.0
*
* SPECIAL ATTENTION (incompatible with old editions):
* client_socket's function open_reconnect and close_reconnect have been replaced by function set_reconnect(bool).
*
* HIGHLIGHT:
* service_pump support multiple io_context, just needs the number of service thread to be bigger than or equal to the number of io_context.
* Introduce virtual function change_io_context() to st_asio_wrapper::socket to balance the reference of multiple io_context strictly,
* this is because after a socket been reused, its next_layer still based on previous io_context, this may break the reference balance of
* multiple io_context, re-write this virtual function to re-create the next_layer base on the io_context which has the least references.
* ssl's server_socket_base and client_socket_base already did this, please note.
* Support reliable UDP (based on KCP -- https://github.com/skywind3000/kcp.git), thus introduce new macro ST_ASIO_RELIABLE_UDP_NSND_QUE to
* specify the default value of the max size of ikcpcb::nsnd_que (congestion control).
* Support connected UDP socket, set macro ST_ASIO_UDP_CONNECT_MODE to true to open it, you must also provide peer's ip address via set_peer_addr,
* function set_connect_mode can open it too (before start_service). For connected UDP socket, the peer_addr parameter in send_msg (series)
* will be ignored, please note.
*
* FIX:
* single_service_pump support ssl single_client(_base).
*
* ENHANCEMENTS:
* Enhance the reusability of st_asio_wrapper's ssl sockets, now they can be reused (include reconnecting) just as normal socket.
* Suppress error logs for empty heartbeat (suppose you want to stop heartbeat but keep heartbeat checking).
*
* DELETION:
* Delete macro ST_ASIO_REUSE_SSL_STREAM, now st_asio_wrapper's ssl sockets can be reused just as normal socket.
*
* REFACTORING:
* Re-implement the reusability (object reuse and reconnecting) of st_asio_wrapper's ssl sockets.
*
* REPLACEMENTS:
* client_socket's function open_reconnect and close_reconnect have been replaced by function set_reconnect(bool).
*
*/ */
#ifndef ST_ASIO_CONFIG_H_ #ifndef ST_ASIO_CONFIG_H_
...@@ -815,8 +849,8 @@ ...@@ -815,8 +849,8 @@
# pragma once # pragma once
#endif // defined(_MSC_VER) && (_MSC_VER >= 1200) #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
#define ST_ASIO_VER 20302 //[x]xyyzz -> [x]x.[y]y.[z]z #define ST_ASIO_VER 20400 //[x]xyyzz -> [x]x.[y]y.[z]z
#define ST_ASIO_VERSION "2.3.2" #define ST_ASIO_VERSION "2.4.0"
//#define ST_ASIO_HIDE_WARNINGS //#define ST_ASIO_HIDE_WARNINGS
...@@ -896,10 +930,16 @@ ...@@ -896,10 +930,16 @@
#endif #endif
#if BOOST_ASIO_VERSION < 101100 #if BOOST_ASIO_VERSION < 101100
namespace boost {namespace asio {typedef io_service io_context;}} namespace boost {namespace asio {typedef io_service io_context; typedef io_context execution_context;}}
#define make_strand_handler(S, F) S.wrap(F) #define make_strand_handler(S, F) S.wrap(F)
#elif BOOST_ASIO_VERSION == 101100
namespace boost {namespace asio {typedef io_service io_context;}}
#define make_strand_handler(S, F) boost::asio::wrap(S, F)
#elif BOOST_ASIO_VERSION < 101700
namespace boost {namespace asio {typedef executor any_io_executor;}}
#define make_strand_handler(S, F) boost::asio::bind_executor(S, F)
#else #else
#define make_strand_handler(S, F) boost::asio::bind_executor(S, F) #define make_strand_handler(S, F) boost::asio::bind_executor(S, F)
#endif #endif
//boost and compiler check //boost and compiler check
...@@ -1098,6 +1138,17 @@ namespace boost {namespace asio {typedef io_service io_context;}} ...@@ -1098,6 +1138,17 @@ namespace boost {namespace asio {typedef io_service io_context;}}
#define ST_ASIO_UDP_DEFAULT_IP_VERSION boost::asio::ip::udp::v4() #define ST_ASIO_UDP_DEFAULT_IP_VERSION boost::asio::ip::udp::v4()
#endif #endif
#ifndef ST_ASIO_UDP_CONNECT_MODE
#define ST_ASIO_UDP_CONNECT_MODE false
#endif
//max value that ikcpcb::nsnd_que can get to, then st_asio_wrapper will suspend message sending (congestion control).
#ifndef ST_ASIO_RELIABLE_UDP_NSND_QUE
#define ST_ASIO_RELIABLE_UDP_NSND_QUE 1024
#elif ST_ASIO_RELIABLE_UDP_NSND_QUE < 0
#error kcp send queue must be bigger than or equal to zero.
#endif
//close port reuse //close port reuse
//#define ST_ASIO_NOT_REUSE_ADDRESS //#define ST_ASIO_NOT_REUSE_ADDRESS
...@@ -1119,7 +1170,7 @@ namespace boost {namespace asio {typedef io_service io_context;}} ...@@ -1119,7 +1170,7 @@ namespace boost {namespace asio {typedef io_service io_context;}}
//buffer type used when receiving messages (unpacker's prepare_next_recv() need to return this type) //buffer type used when receiving messages (unpacker's prepare_next_recv() need to return this type)
#ifndef ST_ASIO_RECV_BUFFER_TYPE #ifndef ST_ASIO_RECV_BUFFER_TYPE
#if BOOST_ASIO_VERSION >= 101100 #if BOOST_ASIO_VERSION > 101100
#define ST_ASIO_RECV_BUFFER_TYPE boost::asio::mutable_buffer #define ST_ASIO_RECV_BUFFER_TYPE boost::asio::mutable_buffer
#else #else
#define ST_ASIO_RECV_BUFFER_TYPE boost::asio::mutable_buffers_1 #define ST_ASIO_RECV_BUFFER_TYPE boost::asio::mutable_buffers_1
...@@ -1145,12 +1196,6 @@ namespace boost {namespace asio {typedef io_service io_context;}} ...@@ -1145,12 +1196,6 @@ namespace boost {namespace asio {typedef io_service io_context;}}
//#define ST_ASIO_ALWAYS_SEND_HEARTBEAT //#define ST_ASIO_ALWAYS_SEND_HEARTBEAT
//always send heartbeat in each ST_ASIO_HEARTBEAT_INTERVAL seconds without checking if we're sending other messages or not. //always send heartbeat in each ST_ASIO_HEARTBEAT_INTERVAL seconds without checking if we're sending other messages or not.
//#define ST_ASIO_REUSE_SSL_STREAM
//if you need ssl::client_socket_base to be able to reconnect the server, or to open object pool in ssl::object_pool, you must define this macro.
//I tried many ways, only one way can make boost::asio::ssl::stream reusable, which is:
// don't call any shutdown functions of boost::asio::ssl::stream, just call boost::asio::ip::tcp::socket's shutdown function,
// this seems not a normal procedure, but it works, I believe that asio's defect caused this problem.
//#define ST_ASIO_AVOID_AUTO_STOP_SERVICE //#define ST_ASIO_AVOID_AUTO_STOP_SERVICE
//wrap service_pump with boost::asio::io_service::work (boost::asio::executor_work_guard), then it will never run out until you explicitly call stop_service(). //wrap service_pump with boost::asio::io_service::work (boost::asio::executor_work_guard), then it will never run out until you explicitly call stop_service().
...@@ -1182,6 +1227,11 @@ namespace boost {namespace asio {typedef io_service io_context;}} ...@@ -1182,6 +1227,11 @@ namespace boost {namespace asio {typedef io_service io_context;}}
//during message sending, calling send_msg() will fail, this is by design to avoid boost::asio::io_context using up all virtual memory, this also //during message sending, calling send_msg() will fail, this is by design to avoid boost::asio::io_context using up all virtual memory, this also
// means that before the sending really started, you can greedily call send_msg() and may exhaust all virtual memory, please note. // means that before the sending really started, you can greedily call send_msg() and may exhaust all virtual memory, please note.
//#define ST_ASIO_ARBITRARY_SEND
//dispatch an async do_send_msg invocation for each message, this feature brings 2 behaviors:
// 1. it can also fix the situation i described for macro ST_ASIO_EXPOSE_SEND_INTERFACE,
// 2. it brings better effeciency for specific ENV, try to find them by you own.
//#define ST_ASIO_PASSIVE_RECV //#define ST_ASIO_PASSIVE_RECV
//to gain the ability of changing the unpacker at runtime, with this macro, st_asio_wrapper will not do message receiving automatically (except //to gain the ability of changing the unpacker at runtime, with this macro, st_asio_wrapper will not do message receiving automatically (except
// the first one, if macro ST_ASIO_SYNC_RECV been defined, the first one will be omitted too), so you need to manually call recv_msg(), // the first one, if macro ST_ASIO_SYNC_RECV been defined, the first one will be omitted too), so you need to manually call recv_msg(),
......
/*
* reliable_udp.h
*
* Created on: 2021-9-3
* Author: youngwolf
* email: mail2tao@163.com
* QQ: 676218192
* Community on QQ: 198941541
*
* reliable udp related conveniences.
*/
#ifndef ST_ASIO_EXT_RELIABLE_UDP_H_
#define ST_ASIO_EXT_RELIABLE_UDP_H_
#include "packer.h"
#include "unpacker.h"
#include "../udp/reliable_socket.h"
#include "../udp/socket_service.h"
#include "../single_service_pump.h"
#ifndef ST_ASIO_DEFAULT_PACKER
#define ST_ASIO_DEFAULT_PACKER st_asio_wrapper::ext::packer<>
#endif
#ifndef ST_ASIO_DEFAULT_UDP_UNPACKER
#define ST_ASIO_DEFAULT_UDP_UNPACKER st_asio_wrapper::ext::udp_unpacker
#endif
namespace st_asio_wrapper { namespace ext { namespace udp {
typedef st_asio_wrapper::udp::reliable_socket_base<ST_ASIO_DEFAULT_PACKER, ST_ASIO_DEFAULT_UDP_UNPACKER> reliable_socket;
typedef st_asio_wrapper::udp::single_socket_service_base<reliable_socket> single_reliable_socket_service;
typedef st_asio_wrapper::udp::multi_socket_service_base<reliable_socket> multi_reliable_socket_service;
}}} //namespace
#endif /* ST_ASIO_EXT_RELIABLE_UDP_H_ */
...@@ -29,9 +29,28 @@ ...@@ -29,9 +29,28 @@
namespace st_asio_wrapper { namespace ext { namespace ssl { namespace st_asio_wrapper { namespace ext { namespace ssl {
typedef st_asio_wrapper::ssl::client_socket_base<ST_ASIO_DEFAULT_PACKER, ST_ASIO_DEFAULT_UNPACKER> client_socket; typedef st_asio_wrapper::ssl::client_socket_base<ST_ASIO_DEFAULT_PACKER, ST_ASIO_DEFAULT_UNPACKER> client_socket;
template<typename Matrix = i_matrix>
class client_socket2 : public st_asio_wrapper::ssl::client_socket_base<ST_ASIO_DEFAULT_PACKER, ST_ASIO_DEFAULT_UNPACKER, Matrix>
{
private:
typedef st_asio_wrapper::ssl::client_socket_base<ST_ASIO_DEFAULT_PACKER, ST_ASIO_DEFAULT_UNPACKER, Matrix> super;
public:
client_socket2(boost::asio::io_context& io_context_, boost::asio::ssl::context& ctx_) : super(io_context_, ctx_) {}
client_socket2(Matrix& matrix_, boost::asio::ssl::context& ctx_) : super(matrix_, ctx_) {}
};
typedef client_socket connector; typedef client_socket connector;
typedef st_asio_wrapper::ssl::single_client_base<client_socket> single_client; typedef st_asio_wrapper::ssl::single_client_base<client_socket> single_client;
typedef st_asio_wrapper::ssl::multi_client_base<client_socket> multi_client; typedef st_asio_wrapper::ssl::multi_client_base<client_socket> multi_client;
template<typename Socket, typename Matrix = i_matrix>
class multi_client2 : public st_asio_wrapper::ssl::multi_client_base<Socket, st_asio_wrapper::ssl::object_pool<Socket>, Matrix>
{
private:
typedef st_asio_wrapper::ssl::multi_client_base<Socket, st_asio_wrapper::ssl::object_pool<Socket>, Matrix> super;
public:
multi_client2(service_pump& service_pump_, const boost::asio::ssl::context::method& m) : super(service_pump_, m) {}
};
typedef multi_client client; typedef multi_client client;
typedef st_asio_wrapper::ssl::server_socket_base<ST_ASIO_DEFAULT_PACKER, ST_ASIO_DEFAULT_UNPACKER> server_socket; typedef st_asio_wrapper::ssl::server_socket_base<ST_ASIO_DEFAULT_PACKER, ST_ASIO_DEFAULT_UNPACKER> server_socket;
...@@ -39,12 +58,21 @@ template <typename Server = st_asio_wrapper::tcp::i_server> ...@@ -39,12 +58,21 @@ template <typename Server = st_asio_wrapper::tcp::i_server>
class server_socket2 : public st_asio_wrapper::ssl::server_socket_base<ST_ASIO_DEFAULT_PACKER, ST_ASIO_DEFAULT_UNPACKER, Server> class server_socket2 : public st_asio_wrapper::ssl::server_socket_base<ST_ASIO_DEFAULT_PACKER, ST_ASIO_DEFAULT_UNPACKER, Server>
{ {
private: private:
typedef st_asio_wrapper::ssl::server_socket_base<ST_ASIO_DEFAULT_PACKER, ST_ASIO_DEFAULT_UNPACKER, Server> super; typedef st_asio_wrapper::ssl::server_socket_base<ST_ASIO_DEFAULT_PACKER, ST_ASIO_DEFAULT_UNPACKER, Server> super;
public: public:
server_socket2(Server& server_, boost::asio::ssl::context& ctx) : super(server_, ctx) {} server_socket2(Server& server_, boost::asio::ssl::context& ctx) : super(server_, ctx) {}
}; };
typedef st_asio_wrapper::ssl::server_base<server_socket> server; typedef st_asio_wrapper::ssl::server_base<server_socket> server;
template<typename Socket, typename Server = st_asio_wrapper::tcp::i_server>
class server2 : public st_asio_wrapper::ssl::server_base<Socket, st_asio_wrapper::ssl::object_pool<Socket>, Server>
{
private:
typedef st_asio_wrapper::ssl::server_base<Socket, st_asio_wrapper::ssl::object_pool<Socket>, Server> super;
public:
server2(service_pump& service_pump_, const boost::asio::ssl::context::method& m) : super(service_pump_, m) {}
};
}}} //namespace }}} //namespace
......
...@@ -33,9 +33,28 @@ ...@@ -33,9 +33,28 @@
namespace st_asio_wrapper { namespace ext { namespace tcp { namespace st_asio_wrapper { namespace ext { namespace tcp {
typedef st_asio_wrapper::tcp::client_socket_base<ST_ASIO_DEFAULT_PACKER, ST_ASIO_DEFAULT_UNPACKER> client_socket; typedef st_asio_wrapper::tcp::client_socket_base<ST_ASIO_DEFAULT_PACKER, ST_ASIO_DEFAULT_UNPACKER> client_socket;
template<typename Matrix = i_matrix>
class client_socket2 : public st_asio_wrapper::tcp::client_socket_base<ST_ASIO_DEFAULT_PACKER, ST_ASIO_DEFAULT_UNPACKER, Matrix>
{
private:
typedef st_asio_wrapper::tcp::client_socket_base<ST_ASIO_DEFAULT_PACKER, ST_ASIO_DEFAULT_UNPACKER, Matrix> super;
public:
client_socket2(boost::asio::io_context& io_context_) : super(io_context_) {}
client_socket2(Matrix& matrix_) : super(matrix_) {}
};
typedef client_socket connector; typedef client_socket connector;
typedef st_asio_wrapper::tcp::single_client_base<client_socket> single_client; typedef st_asio_wrapper::tcp::single_client_base<client_socket> single_client;
typedef st_asio_wrapper::tcp::multi_client_base<client_socket> multi_client; typedef st_asio_wrapper::tcp::multi_client_base<client_socket> multi_client;
template<typename Socket, typename Matrix = i_matrix>
class multi_client2 : public st_asio_wrapper::tcp::multi_client_base<Socket, object_pool<Socket>, Matrix>
{
private:
typedef st_asio_wrapper::tcp::multi_client_base<Socket, object_pool<Socket>, Matrix> super;
public:
multi_client2(service_pump& service_pump_) : super(service_pump_) {}
};
typedef multi_client client; typedef multi_client client;
typedef st_asio_wrapper::tcp::server_socket_base<ST_ASIO_DEFAULT_PACKER, ST_ASIO_DEFAULT_UNPACKER> server_socket; typedef st_asio_wrapper::tcp::server_socket_base<ST_ASIO_DEFAULT_PACKER, ST_ASIO_DEFAULT_UNPACKER> server_socket;
...@@ -43,48 +62,99 @@ template <typename Server = st_asio_wrapper::tcp::i_server> ...@@ -43,48 +62,99 @@ template <typename Server = st_asio_wrapper::tcp::i_server>
class server_socket2 : public st_asio_wrapper::tcp::server_socket_base<ST_ASIO_DEFAULT_PACKER, ST_ASIO_DEFAULT_UNPACKER, Server> class server_socket2 : public st_asio_wrapper::tcp::server_socket_base<ST_ASIO_DEFAULT_PACKER, ST_ASIO_DEFAULT_UNPACKER, Server>
{ {
private: private:
typedef st_asio_wrapper::tcp::server_socket_base<ST_ASIO_DEFAULT_PACKER, ST_ASIO_DEFAULT_UNPACKER, Server> super; typedef st_asio_wrapper::tcp::server_socket_base<ST_ASIO_DEFAULT_PACKER, ST_ASIO_DEFAULT_UNPACKER, Server> super;
public: public:
server_socket2(Server& server_) : super(server_) {} server_socket2(Server& server_) : super(server_) {}
template<typename Arg> server_socket2(Server& server_, Arg& arg) : super(server_, arg) {} template<typename Arg> server_socket2(Server& server_, Arg& arg) : super(server_, arg) {}
}; };
typedef st_asio_wrapper::tcp::server_base<server_socket> server; typedef st_asio_wrapper::tcp::server_base<server_socket> server;
template<typename Socket, typename Server = st_asio_wrapper::tcp::i_server>
class server2 : public st_asio_wrapper::tcp::server_base<Socket, object_pool<Socket>, Server>
{
private:
typedef st_asio_wrapper::tcp::server_base<Socket, object_pool<Socket>, Server> super;
public:
server2(service_pump& service_pump_) : super(service_pump_) {}
};
#ifdef BOOST_ASIO_HAS_LOCAL_SOCKETS #ifdef BOOST_ASIO_HAS_LOCAL_SOCKETS
typedef st_asio_wrapper::tcp::unix_client_socket_base<ST_ASIO_DEFAULT_PACKER, ST_ASIO_DEFAULT_UNPACKER> unix_client_socket; typedef st_asio_wrapper::tcp::unix_client_socket_base<ST_ASIO_DEFAULT_PACKER, ST_ASIO_DEFAULT_UNPACKER> unix_client_socket;
template<typename Matrix = i_matrix>
class unix_client_socket2 : public st_asio_wrapper::tcp::unix_client_socket_base<ST_ASIO_DEFAULT_PACKER, ST_ASIO_DEFAULT_UNPACKER, Matrix>
{
private:
typedef st_asio_wrapper::tcp::unix_client_socket_base<ST_ASIO_DEFAULT_PACKER, ST_ASIO_DEFAULT_UNPACKER, Matrix> super;
public:
unix_client_socket2(boost::asio::io_context& io_context_) : super(io_context_) {}
unix_client_socket2(Matrix& matrix_) : super(matrix_) {}
};
typedef st_asio_wrapper::tcp::single_client_base<unix_client_socket> unix_single_client; typedef st_asio_wrapper::tcp::single_client_base<unix_client_socket> unix_single_client;
typedef st_asio_wrapper::tcp::multi_client_base<unix_client_socket> unix_multi_client; typedef st_asio_wrapper::tcp::multi_client_base<unix_client_socket> unix_multi_client;
//typedef multi_client2 unix_multi_client2; //multi_client2 can be used for unix socket too, but we cannot typedef it.
typedef st_asio_wrapper::tcp::unix_server_socket_base<ST_ASIO_DEFAULT_PACKER, ST_ASIO_DEFAULT_UNPACKER> unix_server_socket; typedef st_asio_wrapper::tcp::unix_server_socket_base<ST_ASIO_DEFAULT_PACKER, ST_ASIO_DEFAULT_UNPACKER> unix_server_socket;
template<typename Server = st_asio_wrapper::tcp::i_server> template<typename Server = st_asio_wrapper::tcp::i_server>
class unix_server_socket2 : public st_asio_wrapper::tcp::unix_server_socket_base<ST_ASIO_DEFAULT_PACKER, ST_ASIO_DEFAULT_UNPACKER, Server> class unix_server_socket2 : public st_asio_wrapper::tcp::unix_server_socket_base<ST_ASIO_DEFAULT_PACKER, ST_ASIO_DEFAULT_UNPACKER, Server>
{ {
private: private:
typedef st_asio_wrapper::tcp::unix_server_socket_base<ST_ASIO_DEFAULT_PACKER, ST_ASIO_DEFAULT_UNPACKER, Server> super; typedef st_asio_wrapper::tcp::unix_server_socket_base<ST_ASIO_DEFAULT_PACKER, ST_ASIO_DEFAULT_UNPACKER, Server> super;
public: public:
unix_server_socket2(Server& server_) : super(server_) {} unix_server_socket2(Server& server_) : super(server_) {}
template<typename Arg> unix_server_socket2(Server& server_, Arg& arg) : super(server_, arg) {} template<typename Arg> unix_server_socket2(Server& server_, Arg& arg) : super(server_, arg) {}
}; };
typedef st_asio_wrapper::tcp::unix_server_base<unix_server_socket> unix_server; typedef st_asio_wrapper::tcp::unix_server_base<unix_server_socket> unix_server;
template<typename Socket, typename Server = st_asio_wrapper::tcp::i_server>
class unix_server2 : public st_asio_wrapper::tcp::unix_server_base<Socket, object_pool<Socket>, Server>
{
private:
typedef st_asio_wrapper::tcp::unix_server_base<Socket, object_pool<Socket>, Server> super;
public:
unix_server2(service_pump& service_pump_) : super(service_pump_) {}
};
#endif #endif
namespace proxy { namespace proxy {
namespace socks4 { namespace socks4 {
typedef st_asio_wrapper::tcp::proxy::socks4::client_socket_base<ST_ASIO_DEFAULT_PACKER, ST_ASIO_DEFAULT_UNPACKER> client_socket; typedef st_asio_wrapper::tcp::proxy::socks4::client_socket_base<ST_ASIO_DEFAULT_PACKER, ST_ASIO_DEFAULT_UNPACKER> client_socket;
template<typename Matrix = i_matrix>
class client_socket2 : public st_asio_wrapper::tcp::proxy::socks4::client_socket_base<ST_ASIO_DEFAULT_PACKER, ST_ASIO_DEFAULT_UNPACKER, Matrix>
{
private:
typedef st_asio_wrapper::tcp::proxy::socks4::client_socket_base<ST_ASIO_DEFAULT_PACKER, ST_ASIO_DEFAULT_UNPACKER, Matrix> super;
public:
client_socket2(boost::asio::io_context& io_context_) : super(io_context_) {}
client_socket2(Matrix& matrix_) : super(matrix_) {}
};
typedef client_socket connector; typedef client_socket connector;
typedef st_asio_wrapper::tcp::single_client_base<client_socket> single_client; typedef st_asio_wrapper::tcp::single_client_base<client_socket> single_client;
typedef st_asio_wrapper::tcp::multi_client_base<client_socket> multi_client; typedef st_asio_wrapper::tcp::multi_client_base<client_socket> multi_client;
//typedef st_asio_wrapper::ext::tcp::multi_client2 multi_client2; //multi_client2 can be used for socks4 too, but we cannot typedef it.
typedef multi_client client; typedef multi_client client;
} }
namespace socks5 { namespace socks5 {
typedef st_asio_wrapper::tcp::proxy::socks5::client_socket_base<ST_ASIO_DEFAULT_PACKER, ST_ASIO_DEFAULT_UNPACKER> client_socket; typedef st_asio_wrapper::tcp::proxy::socks5::client_socket_base<ST_ASIO_DEFAULT_PACKER, ST_ASIO_DEFAULT_UNPACKER> client_socket;
template<typename Matrix = i_matrix>
class client_socket2 : public st_asio_wrapper::tcp::proxy::socks5::client_socket_base<ST_ASIO_DEFAULT_PACKER, ST_ASIO_DEFAULT_UNPACKER, Matrix>
{
private:
typedef st_asio_wrapper::tcp::proxy::socks5::client_socket_base<ST_ASIO_DEFAULT_PACKER, ST_ASIO_DEFAULT_UNPACKER, Matrix> super;
public:
client_socket2(boost::asio::io_context& io_context_) : super(io_context_) {}
client_socket2(Matrix& matrix_) : super(matrix_) {}
};
typedef client_socket connector; typedef client_socket connector;
typedef st_asio_wrapper::tcp::single_client_base<client_socket> single_client; typedef st_asio_wrapper::tcp::single_client_base<client_socket> single_client;
typedef st_asio_wrapper::tcp::multi_client_base<client_socket> multi_client; typedef st_asio_wrapper::tcp::multi_client_base<client_socket> multi_client;
//typedef st_asio_wrapper::ext::tcp::multi_client2 multi_client2; //multi_client2 can be used for socks5 too, but we cannot typedef it.
typedef multi_client client; typedef multi_client client;
} }
......
...@@ -30,14 +30,44 @@ ...@@ -30,14 +30,44 @@
namespace st_asio_wrapper { namespace ext { namespace udp { namespace st_asio_wrapper { namespace ext { namespace udp {
typedef st_asio_wrapper::udp::socket_base<ST_ASIO_DEFAULT_PACKER, ST_ASIO_DEFAULT_UDP_UNPACKER> socket; typedef st_asio_wrapper::udp::socket_base<ST_ASIO_DEFAULT_PACKER, ST_ASIO_DEFAULT_UDP_UNPACKER> socket;
template<typename Matrix = i_matrix>
class socket2 : public st_asio_wrapper::udp::socket_base<ST_ASIO_DEFAULT_PACKER, ST_ASIO_DEFAULT_UDP_UNPACKER, Matrix>
{
private:
typedef st_asio_wrapper::udp::socket_base<ST_ASIO_DEFAULT_PACKER, ST_ASIO_DEFAULT_UDP_UNPACKER, Matrix> super;
public:
socket2(boost::asio::io_context& io_context_) : super(io_context_) {}
socket2(Matrix& matrix_) : super(matrix_) {}
};
typedef st_asio_wrapper::udp::single_socket_service_base<socket> single_socket_service; typedef st_asio_wrapper::udp::single_socket_service_base<socket> single_socket_service;
typedef st_asio_wrapper::udp::multi_socket_service_base<socket> multi_socket_service; typedef st_asio_wrapper::udp::multi_socket_service_base<socket> multi_socket_service;
template<typename Socket, typename Matrix = i_matrix>
class multi_socket_service2 : public st_asio_wrapper::udp::multi_socket_service_base<Socket, object_pool<Socket>, Matrix>
{
private:
typedef st_asio_wrapper::udp::multi_socket_service_base<Socket, object_pool<Socket>, Matrix> super;
public:
multi_socket_service2(service_pump& service_pump_) : super(service_pump_) {}
};
typedef multi_socket_service socket_service; typedef multi_socket_service socket_service;
#ifdef BOOST_ASIO_HAS_LOCAL_SOCKETS #ifdef BOOST_ASIO_HAS_LOCAL_SOCKETS
typedef st_asio_wrapper::udp::unix_socket_base<ST_ASIO_DEFAULT_PACKER, ST_ASIO_DEFAULT_UDP_UNPACKER> unix_socket; typedef st_asio_wrapper::udp::unix_socket_base<ST_ASIO_DEFAULT_PACKER, ST_ASIO_DEFAULT_UDP_UNPACKER> unix_socket;
template<typename Matrix = i_matrix>
class unix_socket2 : public st_asio_wrapper::udp::unix_socket_base<ST_ASIO_DEFAULT_PACKER, ST_ASIO_DEFAULT_UDP_UNPACKER, Matrix>
{
private:
typedef st_asio_wrapper::udp::unix_socket_base<ST_ASIO_DEFAULT_PACKER, ST_ASIO_DEFAULT_UDP_UNPACKER, Matrix> super;
public:
unix_socket2(boost::asio::io_context& io_context_) : super(io_context_) {}
unix_socket2(Matrix& matrix_) : super(matrix_) {}
};
typedef st_asio_wrapper::udp::single_socket_service_base<unix_socket> unix_single_socket_service; typedef st_asio_wrapper::udp::single_socket_service_base<unix_socket> unix_single_socket_service;
typedef st_asio_wrapper::udp::multi_socket_service_base<unix_socket> unix_multi_socket_service; typedef st_asio_wrapper::udp::multi_socket_service_base<unix_socket> unix_multi_socket_service;
//typedef multi_socket_service2 unix_multi_socket_service2; //multi_socket_service2 can be used for unix socket too, but we cannot typedef it.
#endif #endif
}}} //namespace }}} //namespace
......
...@@ -156,7 +156,7 @@ public: ...@@ -156,7 +156,7 @@ public:
//this is just to satisfy the compiler, it's not a real scatter-gather buffer, //this is just to satisfy the compiler, it's not a real scatter-gather buffer,
//if you introduce a ring buffer, then you will have the chance to provide a real scatter-gather buffer. //if you introduce a ring buffer, then you will have the chance to provide a real scatter-gather buffer.
virtual typename super::buffer_type prepare_next_recv() {assert(remain_len < ST_ASIO_MSG_BUFFER_SIZE); return typename super::buffer_type(1, boost::asio::buffer(raw_buff) + remain_len);} virtual typename super::buffer_type prepare_next_recv() {assert(remain_len < ST_ASIO_MSG_BUFFER_SIZE); return typename super::buffer_type(1, boost::asio::buffer(raw_buff) + remain_len);}
#elif BOOST_ASIO_VERSION < 101100 #elif BOOST_ASIO_VERSION <= 101100
virtual typename super::buffer_type prepare_next_recv() {assert(remain_len < ST_ASIO_MSG_BUFFER_SIZE); return boost::asio::buffer(boost::asio::buffer(raw_buff) + remain_len);} virtual typename super::buffer_type prepare_next_recv() {assert(remain_len < ST_ASIO_MSG_BUFFER_SIZE); return boost::asio::buffer(boost::asio::buffer(raw_buff) + remain_len);}
#else #else
virtual typename super::buffer_type prepare_next_recv() {assert(remain_len < ST_ASIO_MSG_BUFFER_SIZE); return boost::asio::buffer(raw_buff) + remain_len;} virtual typename super::buffer_type prepare_next_recv() {assert(remain_len < ST_ASIO_MSG_BUFFER_SIZE); return boost::asio::buffer(raw_buff) + remain_len;}
...@@ -317,7 +317,7 @@ public: ...@@ -317,7 +317,7 @@ public:
return typename super::buffer_type(1, boost::asio::buffer(const_cast<char*>(big_msg.data()), big_msg.size()) + remain_len); return typename super::buffer_type(1, boost::asio::buffer(const_cast<char*>(big_msg.data()), big_msg.size()) + remain_len);
} }
#elif BOOST_ASIO_VERSION < 101100 #elif BOOST_ASIO_VERSION <= 101100
virtual typename super::buffer_type prepare_next_recv() virtual typename super::buffer_type prepare_next_recv()
{ {
assert(remain_len < (big_msg.empty() ? raw_buff.size() : big_msg.size())); assert(remain_len < (big_msg.empty() ? raw_buff.size() : big_msg.size()));
...@@ -372,6 +372,7 @@ public: ...@@ -372,6 +372,7 @@ public:
virtual void reset() {} virtual void reset() {}
virtual bool parse_msg(size_t bytes_transferred, container_type& msg_can) virtual bool parse_msg(size_t bytes_transferred, container_type& msg_can)
{assert(bytes_transferred <= ST_ASIO_MSG_BUFFER_SIZE); msg_can.emplace_back(raw_buff.data(), bytes_transferred); return true;} {assert(bytes_transferred <= ST_ASIO_MSG_BUFFER_SIZE); msg_can.emplace_back(raw_buff.data(), bytes_transferred); return true;}
virtual void compose_msg(const char* data, size_t size, container_type& msg_can) {assert(NULL != data && size > 0); msg_can.emplace_back(data, size);}
#ifdef ST_ASIO_SCATTERED_RECV_BUFFER #ifdef ST_ASIO_SCATTERED_RECV_BUFFER
//this is just to satisfy the compiler, it's not a real scatter-gather buffer, //this is just to satisfy the compiler, it's not a real scatter-gather buffer,
...@@ -444,6 +445,8 @@ public: ...@@ -444,6 +445,8 @@ public:
msg_can.emplace_back(new std::string(raw_buff.data(), bytes_transferred)); msg_can.emplace_back(new std::string(raw_buff.data(), bytes_transferred));
return true; return true;
} }
virtual void compose_msg(const char* data, size_t size, typename super::container_type& msg_can)
{assert(NULL != data && size > 0); msg_can.emplace_back(new std::string(data, size));}
#ifdef ST_ASIO_SCATTERED_RECV_BUFFER #ifdef ST_ASIO_SCATTERED_RECV_BUFFER
//this is just to satisfy the compiler, it's not a real scatter-gather buffer, //this is just to satisfy the compiler, it's not a real scatter-gather buffer,
...@@ -710,7 +713,7 @@ public: ...@@ -710,7 +713,7 @@ public:
//if you introduce a ring buffer, then you will have the chance to provide a real scatter-gather buffer. //if you introduce a ring buffer, then you will have the chance to provide a real scatter-gather buffer.
#ifdef ST_ASIO_SCATTERED_RECV_BUFFER #ifdef ST_ASIO_SCATTERED_RECV_BUFFER
virtual buffer_type prepare_next_recv() {assert(remain_len < ST_ASIO_MSG_BUFFER_SIZE); return buffer_type(1, boost::asio::buffer(raw_buff) + remain_len);} virtual buffer_type prepare_next_recv() {assert(remain_len < ST_ASIO_MSG_BUFFER_SIZE); return buffer_type(1, boost::asio::buffer(raw_buff) + remain_len);}
#elif BOOST_ASIO_VERSION < 101100 #elif BOOST_ASIO_VERSION <= 101100
virtual buffer_type prepare_next_recv() {assert(remain_len < ST_ASIO_MSG_BUFFER_SIZE); return boost::asio::buffer(boost::asio::buffer(raw_buff) + remain_len);} virtual buffer_type prepare_next_recv() {assert(remain_len < ST_ASIO_MSG_BUFFER_SIZE); return boost::asio::buffer(boost::asio::buffer(raw_buff) + remain_len);}
#else #else
virtual buffer_type prepare_next_recv() {assert(remain_len < ST_ASIO_MSG_BUFFER_SIZE); return boost::asio::buffer(raw_buff) + remain_len;} virtual buffer_type prepare_next_recv() {assert(remain_len < ST_ASIO_MSG_BUFFER_SIZE); return boost::asio::buffer(raw_buff) + remain_len;}
......
...@@ -18,14 +18,14 @@ ...@@ -18,14 +18,14 @@
namespace st_asio_wrapper namespace st_asio_wrapper
{ {
class service_pump : public boost::asio::io_context class service_pump
{ {
public: public:
class i_service class i_service
{ {
protected: protected:
i_service(service_pump& service_pump_) : sp(service_pump_), started_(false), id_(0), data(NULL) {service_pump_.add(this);} i_service(service_pump& service_pump_) : sp(service_pump_), started_(false), id_(0), data(NULL) {sp.add(this);}
virtual ~i_service() {} virtual ~i_service() {sp.remove(this);}
public: public:
//for the same i_service, start_service and stop_service are not thread safe, //for the same i_service, start_service and stop_service are not thread safe,
...@@ -56,29 +56,152 @@ public: ...@@ -56,29 +56,152 @@ public:
void* data; //magic data, you can use it in any way void* data; //magic data, you can use it in any way
}; };
protected:
struct context
{
boost::asio::io_context io_context;
unsigned refs;
#ifdef ST_ASIO_AVOID_AUTO_STOP_SERVICE
#if BOOST_ASIO_VERSION > 101100
boost::asio::executor_work_guard<boost::asio::io_context::executor_type> work;
#else
boost::shared_ptr<boost::asio::io_service::work> work;
#endif
#endif
boost::thread_group threads;
#if BOOST_ASIO_VERSION >= 101200
context(int concurrency_hint = BOOST_ASIO_CONCURRENCY_HINT_SAFE) : io_context(concurrency_hint), refs(0)
#else
context() : refs(0)
#endif
#ifdef ST_ASIO_AVOID_AUTO_STOP_SERVICE
#if BOOST_ASIO_VERSION > 101100
, work(io_context.get_executor())
#else
, work(boost::make_shared<boost::asio::io_service::work>(boost::ref(io_context)))
//this wrapper boost::ref (and many others in st_asio_wrapper) is just for gcc 4.7, terrible gcc 4.7
#endif
#endif
{}
};
public: public:
typedef i_service* object_type; typedef i_service* object_type;
typedef const object_type object_ctype; typedef const object_type object_ctype;
typedef boost::container::list<object_type> container_type; typedef boost::container::list<object_type> container_type;
#if BOOST_ASIO_VERSION >= 101200 #if BOOST_ASIO_VERSION >= 101200
service_pump(int concurrency_hint = BOOST_ASIO_CONCURRENCY_HINT_SAFE) : boost::asio::io_context(concurrency_hint), started(false) #ifdef ST_ASIO_DECREASE_THREAD_AT_RUNTIME
service_pump(int concurrency_hint = BOOST_ASIO_CONCURRENCY_HINT_SAFE) : started(false), real_thread_num(0), del_thread_num(0), single_io_context(true)
{context_can.emplace_back(concurrency_hint);}
#else #else
service_pump() : started(false) service_pump(int concurrency_hint = BOOST_ASIO_CONCURRENCY_HINT_SAFE) : started(false), single_io_context(true) {context_can.emplace_back(concurrency_hint);}
bool set_io_context_num(int io_context_num, int concurrency_hint = BOOST_ASIO_CONCURRENCY_HINT_SAFE) //call this before adding any services to this service_pump
{
if (io_context_num < 1 || is_service_started() || context_can.size() > 1) //can only be called once
return false;
for (int i = 1; i < io_context_num; ++i)
context_can.emplace_back(concurrency_hint);
single_io_context = context_can.size() < 2;
return true;
}
#endif #endif
#else
#ifdef ST_ASIO_DECREASE_THREAD_AT_RUNTIME #ifdef ST_ASIO_DECREASE_THREAD_AT_RUNTIME
, real_thread_num(0), del_thread_num(0) service_pump() : started(false), real_thread_num(0), del_thread_num(0), single_io_context(true), context_can(1) {}
#endif
#ifdef ST_ASIO_AVOID_AUTO_STOP_SERVICE
#if BOOST_ASIO_VERSION >= 101100
, work(get_executor())
#else #else
, work(boost::make_shared<boost::asio::io_service::work>(boost::ref(*this))) service_pump() : started(false), single_io_context(true), context_can(1) {}
bool set_io_context_num(int io_context_num) //call this before adding any services to this service_pump
{
if (io_context_num < 1 || is_service_started() || context_can.size() > 1) //can only be called once
return false;
context_can.resize(io_context_num);
single_io_context = context_can.size() < 2;
return true;
}
#endif #endif
#endif #endif
{}
virtual ~service_pump() {stop_service();} virtual ~service_pump() {stop_service();}
int get_io_context_num() const {return (int) context_can.size();}
void get_io_context_refs(boost::container::list<unsigned>& refs)
{
if (!single_io_context)
{
boost::lock_guard<boost::mutex> lock(context_can_mutex);
for (BOOST_AUTO(iter, context_can.begin()); iter != context_can.end(); ++iter)
refs.push_back(iter->refs);
}
}
operator boost::asio::io_context& () {return assign_io_context();}
#if BOOST_ASIO_VERSION > 101100
boost::asio::io_context::executor_type get_executor() {return assign_io_context().get_executor();}
#endif
boost::asio::io_context& assign_io_context(bool increase_ref = true) //pick the context which has the least references
{
if (single_io_context)
return context_can.front().io_context;
context* ctx = NULL;
unsigned refs = 0;
boost::lock_guard<boost::mutex> lock(context_can_mutex);
for (BOOST_AUTO(iter, context_can.begin()); iter != context_can.end(); ++iter)
{
if (0 == iter->refs || 0 == refs || refs > iter->refs)
{
refs = iter->refs;
ctx = &*iter;
}
if (0 == iter->refs)
break;
}
if (NULL != ctx)
{
if (increase_ref)
++ctx->refs;
return ctx->io_context;
}
throw "no available io_context!";
}
void return_io_context(const boost::asio::execution_context& io_context)
{
if (!single_io_context)
{
boost::lock_guard<boost::mutex> lock(context_can_mutex);
for (BOOST_AUTO(iter, context_can.begin()); iter != context_can.end(); ++iter)
if (&io_context == &iter->io_context)
{
--iter->refs;
break;
}
}
}
void assign_io_context(const boost::asio::execution_context& io_context)
{
if (!single_io_context)
{
boost::lock_guard<boost::mutex> lock(context_can_mutex);
for (BOOST_AUTO(iter, context_can.begin()); iter != context_can.end(); ++iter)
if (&io_context == &iter->io_context)
{
++iter->refs;
break;
}
}
}
object_type find(int id) object_type find(int id)
{ {
boost::lock_guard<boost::mutex> lock(service_can_mutex); boost::lock_guard<boost::mutex> lock(service_can_mutex);
...@@ -122,6 +245,9 @@ public: ...@@ -122,6 +245,9 @@ public:
st_asio_wrapper::do_something_to_all(temp_service_can, boost::bind(&service_pump::stop_and_free, this, boost::placeholders::_1)); st_asio_wrapper::do_something_to_all(temp_service_can, boost::bind(&service_pump::stop_and_free, this, boost::placeholders::_1));
} }
//stop io_context directly, call this only if the stop_service invocation cannot stop the io_context
void stop() {for (BOOST_AUTO(iter, context_can.begin()); iter != context_can.end(); ++iter) iter->io_context.stop();}
void start_service(int thread_num = ST_ASIO_SERVICE_THREAD_NUM) {if (!is_service_started()) do_service(thread_num);} void start_service(int thread_num = ST_ASIO_SERVICE_THREAD_NUM) {if (!is_service_started()) do_service(thread_num);}
//stop the service, must be invoked explicitly when the service need to stop, for example, close the application //stop the service, must be invoked explicitly when the service need to stop, for example, close the application
void stop_service() void stop_service()
...@@ -155,8 +281,7 @@ public: ...@@ -155,8 +281,7 @@ public:
{ {
if (!is_service_started()) if (!is_service_started())
{ {
do_service(thread_num - 1); do_service(thread_num, true);
run();
wait_service(); wait_service();
} }
} }
...@@ -169,39 +294,93 @@ public: ...@@ -169,39 +294,93 @@ public:
if (is_service_started()) if (is_service_started())
{ {
#ifdef ST_ASIO_AVOID_AUTO_STOP_SERVICE #ifdef ST_ASIO_AVOID_AUTO_STOP_SERVICE
work.reset(); for (BOOST_AUTO(iter, context_can.begin()); iter != context_can.end(); ++iter)
iter->work.reset();
#endif #endif
do_something_to_all(boost::mem_fn(&i_service::stop_service)); do_something_to_all(boost::mem_fn(&i_service::stop_service));
} }
} }
bool is_running() const {return !stopped();} bool is_running() const
{
for (BOOST_AUTO(iter, context_can.begin()); iter != context_can.end(); ++iter)
if (!iter->io_context.stopped())
return true;
return false;
}
bool is_service_started() const {return started;} bool is_service_started() const {return started;}
void add_service_thread(int thread_num) {for (int i = 0; i < thread_num; ++i) service_threads.create_thread(boost::bind(&service_pump::run, this));} //not thread safe
#if BOOST_ASIO_VERSION >= 101200
void add_service_thread(int thread_num, bool block = false, int io_context_num = 0, int concurrency_hint = BOOST_ASIO_CONCURRENCY_HINT_SAFE)
#else
void add_service_thread(int thread_num, bool block = false, int io_context_num = 0)
#endif
{
if (io_context_num > 0)
{
if (thread_num < io_context_num)
{
unified_out::error_out("thread_num must be bigger than or equal to io_context_num.");
return;
}
else
{
single_io_context = false;
boost::lock_guard<boost::mutex> lock(context_can_mutex);
#if BOOST_ASIO_VERSION >= 101200
for (int i = 0; i < io_context_num; ++i)
context_can.emplace_back(concurrency_hint);
#else
context_can.resize((size_t) io_context_num + context_can.size());
#endif
}
}
for (int i = 0; i < thread_num; ++i)
{
BOOST_AUTO(ctx, assign_thread());
if (NULL == ctx)
unified_out::error_out("no available io_context!");
else if (block && i + 1 == thread_num)
run(ctx); //block at here
else
ctx->threads.create_thread(boost::bind(&service_pump::run, this, ctx));
}
}
#ifdef ST_ASIO_DECREASE_THREAD_AT_RUNTIME #ifdef ST_ASIO_DECREASE_THREAD_AT_RUNTIME
void del_service_thread(int thread_num) {if (thread_num > 0) {del_thread_num += thread_num;}} void del_service_thread(int thread_num) {if (thread_num > 0) del_thread_num += thread_num;}
int service_thread_num() const {return real_thread_num;} int service_thread_num() const {return real_thread_num;}
#endif #endif
protected: protected:
void do_service(int thread_num) void do_service(int thread_num, bool block = false)
{ {
if (thread_num <= 0 || (size_t) thread_num < context_can.size())
{
unified_out::error_out("thread_num must be bigger than or equal to io_context_num.");
return;
}
started = true; started = true;
unified_out::info_out("service pump started."); unified_out::info_out("service pump started.");
for (BOOST_AUTO(iter, context_can.begin()); iter != context_can.end(); ++iter)
#if BOOST_ASIO_VERSION >= 101100 #if BOOST_ASIO_VERSION >= 101100
restart(); //this is needed when restart service iter->io_context.restart(); //this is needed when restart service
#else #else
reset(); //this is needed when restart service iter->io_context.reset(); //this is needed when restart service
#endif #endif
do_something_to_all(boost::mem_fn(&i_service::start_service)); do_something_to_all(boost::mem_fn(&i_service::start_service));
add_service_thread(thread_num); add_service_thread(thread_num, block);
} }
void wait_service() void wait_service()
{ {
service_threads.join_all(); for (BOOST_AUTO(iter, context_can.begin()); iter != context_can.end(); ++iter)
iter->threads.join_all();
started = false; started = false;
#ifdef ST_ASIO_DECREASE_THREAD_AT_RUNTIME #ifdef ST_ASIO_DECREASE_THREAD_AT_RUNTIME
...@@ -228,7 +407,7 @@ protected: ...@@ -228,7 +407,7 @@ protected:
#endif #endif
#ifdef ST_ASIO_DECREASE_THREAD_AT_RUNTIME #ifdef ST_ASIO_DECREASE_THREAD_AT_RUNTIME
size_t run() size_t run(context* ctx)
{ {
size_t n = 0; size_t n = 0;
...@@ -254,9 +433,9 @@ protected: ...@@ -254,9 +433,9 @@ protected:
//we cannot always decrease service thread timely (because run_one can block). //we cannot always decrease service thread timely (because run_one can block).
size_t this_n = 0; size_t this_n = 0;
#ifdef ST_ASIO_NO_TRY_CATCH #ifdef ST_ASIO_NO_TRY_CATCH
this_n = boost::asio::io_context::run_one(); this_n = ctx->io_context.run_one();
#else #else
try {this_n = boost::asio::io_context::run_one();} catch (const std::exception& e) {if (!on_exception(e)) break;} try {this_n = ctx->io_context.run_one();} catch (const std::exception& e) {if (!on_exception(e)) break;}
#endif #endif
if (this_n > 0) if (this_n > 0)
n += this_n; //n can overflow, please note. n += this_n; //n can overflow, please note.
...@@ -273,13 +452,36 @@ protected: ...@@ -273,13 +452,36 @@ protected:
return n; return n;
} }
#elif !defined(ST_ASIO_NO_TRY_CATCH) #elif !defined(ST_ASIO_NO_TRY_CATCH)
size_t run() {while (true) {try {return boost::asio::io_context::run();} catch (const std::exception& e) {if (!on_exception(e)) return 0;}}} size_t run(context* ctx) {while (true) {try {return ctx->io_context.run();} catch (const std::exception& e) {if (!on_exception(e)) return 0;}}}
#else
size_t run(context* ctx) {return ctx->io_context.run();}
#endif #endif
DO_SOMETHING_TO_ALL_MUTEX(service_can, service_can_mutex) DO_SOMETHING_TO_ALL_MUTEX(service_can, service_can_mutex, boost::lock_guard<boost::mutex>)
DO_SOMETHING_TO_ONE_MUTEX(service_can, service_can_mutex) DO_SOMETHING_TO_ONE_MUTEX(service_can, service_can_mutex, boost::lock_guard<boost::mutex>)
private: private:
context* assign_thread() //pick the context which has the least threads
{
context* ctx = NULL;
size_t num = 0;
for (BOOST_AUTO(iter, context_can.begin()); iter != context_can.end(); ++iter)
{
size_t this_num = iter->threads.size();
if (0 == this_num || 0 == num || num > this_num)
{
num = this_num;
ctx = &*iter;
}
if (0 == this_num)
break;
}
return ctx;
}
void add(object_type i_service_) void add(object_type i_service_)
{ {
assert(NULL != i_service_); assert(NULL != i_service_);
...@@ -296,20 +498,15 @@ private: ...@@ -296,20 +498,15 @@ private:
bool started; bool started;
container_type service_can; container_type service_can;
boost::mutex service_can_mutex; boost::mutex service_can_mutex;
boost::thread_group service_threads;
#ifdef ST_ASIO_DECREASE_THREAD_AT_RUNTIME #ifdef ST_ASIO_DECREASE_THREAD_AT_RUNTIME
atomic_int_fast32_t real_thread_num; atomic_int_fast32_t real_thread_num;
atomic_int_fast32_t del_thread_num; atomic_int_fast32_t del_thread_num;
#endif #endif
#ifdef ST_ASIO_AVOID_AUTO_STOP_SERVICE bool single_io_context;
#if BOOST_ASIO_VERSION >= 101100 boost::container::list<context> context_can;
boost::asio::executor_work_guard<executor_type> work; boost::mutex context_can_mutex;
#else
boost::shared_ptr<boost::asio::io_service::work> work;
#endif
#endif
}; };
} //namespace } //namespace
......
...@@ -28,8 +28,11 @@ public: ...@@ -28,8 +28,11 @@ public:
public: public:
#if BOOST_ASIO_VERSION >= 101200 #if BOOST_ASIO_VERSION >= 101200
single_service_pump(int concurrency_hint = BOOST_ASIO_CONCURRENCY_HINT_SAFE) : service_pump(concurrency_hint), Service(boost::ref(*(service_pump*) this)) {} single_service_pump(int concurrency_hint = BOOST_ASIO_CONCURRENCY_HINT_SAFE) : service_pump(concurrency_hint), Service(boost::ref(*(service_pump*) this)) {}
template<typename Arg> single_service_pump(Arg& arg, int concurrency_hint = BOOST_ASIO_CONCURRENCY_HINT_SAFE) :
service_pump(concurrency_hint), Service(boost::ref(*(service_pump*) this), arg) {}
#else #else
single_service_pump() : Service(boost::ref(*(service_pump*) this)) {} single_service_pump() : Service(boost::ref(*(service_pump*) this)) {}
template<typename Arg> single_service_pump(Arg& arg) : Service(boost::ref(*(service_pump*) this), arg) {}
#endif #endif
}; };
......
...@@ -63,6 +63,28 @@ protected: ...@@ -63,6 +63,28 @@ protected:
start_atomic.store(0, boost::memory_order_relaxed); start_atomic.store(0, boost::memory_order_relaxed);
} }
//guarantee no operations (include asynchronous operations) be performed on this socket during call following reset_next_layer functions.
#if BOOST_ASIO_VERSION <= 101100
void reset_next_layer() {reset_next_layer(next_layer_.get_io_service());}
template<typename Arg> void reset_next_layer(Arg& arg) {reset_next_layer(next_layer_.get_io_service(), arg);}
#elif BOOST_ASIO_VERSION < 101300
void reset_next_layer() {reset_next_layer(static_cast<boost::asio::io_context&>(next_layer_.get_executor().context()));}
template<typename Arg>
void reset_next_layer(Arg& arg) {reset_next_layer(static_cast<boost::asio::io_context&>(next_layer_.get_executor().context()), arg);}
#else
void reset_next_layer() {reset_next_layer((const boost::asio::any_io_executor&) next_layer_.get_executor());}
template<typename Arg> void reset_next_layer(Arg& arg) {reset_next_layer(next_layer_.get_executor(), arg);}
#endif
void reset_next_layer(boost::asio::io_context& io_context) {(&next_layer_)->~Socket(); new (&next_layer_) Socket(io_context);}
template<typename Arg>
void reset_next_layer(boost::asio::io_context& io_context, Arg& arg) {(&next_layer_)->~Socket(); new (&next_layer_) Socket(io_context, arg);}
#if BOOST_ASIO_VERSION >= 101300
void reset_next_layer(const boost::asio::any_io_executor& executor) {(&next_layer_)->~Socket(); new (&next_layer_) Socket(executor);}
template<typename Arg>
void reset_next_layer(const boost::asio::any_io_executor& executor, Arg& arg) {(&next_layer_)->~Socket(); new (&next_layer_) Socket(executor, arg);}
#endif
void reset() void reset()
{ {
bool need_clean_up = is_timer(TIMER_DELAY_CLOSE); bool need_clean_up = is_timer(TIMER_DELAY_CLOSE);
...@@ -147,9 +169,13 @@ private: ...@@ -147,9 +169,13 @@ private:
public: public:
#endif #endif
#ifndef ST_ASIO_EXPOSE_SEND_INTERFACE #ifndef ST_ASIO_EXPOSE_SEND_INTERFACE
private: protected:
#endif #endif
#ifdef ST_ASIO_ARBITRARY_SEND
void send_msg() {dispatch_strand(rw_strand, boost::bind(&socket::do_send_msg, this, false));}
#else
void send_msg() {if (!sending && is_ready()) dispatch_strand(rw_strand, boost::bind(&socket::do_send_msg, this, false));} void send_msg() {if (!sending && is_ready()) dispatch_strand(rw_strand, boost::bind(&socket::do_send_msg, this, false));}
#endif
public: public:
void start_heartbeat(int interval, int max_absence = ST_ASIO_HEARTBEAT_MAX_ABSENCE) void start_heartbeat(int interval, int max_absence = ST_ASIO_HEARTBEAT_MAX_ABSENCE)
...@@ -314,9 +340,23 @@ protected: ...@@ -314,9 +340,23 @@ protected:
// include user timers(created by set_timer()) and user async calls(started via post(), dispatch() or defer()), this means you can clean up any resource // include user timers(created by set_timer()) and user async calls(started via post(), dispatch() or defer()), this means you can clean up any resource
// in this socket except this socket itself, because this socket maybe is being maintained by object_pool. // in this socket except this socket itself, because this socket maybe is being maintained by object_pool.
//otherwise (bigger than zero), socket simply call this callback ST_ASIO_DELAY_CLOSE seconds later after link down, no any guarantees. //otherwise (bigger than zero), socket simply call this callback ST_ASIO_DELAY_CLOSE seconds later after link down, no any guarantees.
//if you overwrote this callback, do not forget to call parent class' on_close at the end.
virtual void on_close() {unified_out::info_out(ST_ASIO_LLF " on_close()", id());} virtual void on_close() {unified_out::info_out(ST_ASIO_LLF " on_close()", id());}
virtual void after_close() {} //a good case for using this is to reconnect the server, please refer to client_socket_base. virtual void after_close() {} //a good case for using this is to reconnect the server, please refer to client_socket_base.
//reused socket still based on previous io_context, and may break the reference balance of multiple io_context, if you want to balance it strictly,
// re-write this virtual function to re-create the next_layer base on the io_context which has the least references and return true, like:
//virtual bool change_io_context() {ST_THIS reset_next_layer(ST_THIS get_server().get_service_pump().assign_io_context()); return true;} or
//virtual bool change_io_context()
//{
// if (NULL == ST_THIS get_matrix())
// return false;
//
// ST_THIS reset_next_layer(ST_THIS get_matrix()->get_service_pump().assign_io_context());
// return true;
//}
virtual bool change_io_context() {return false;}
#ifdef ST_ASIO_SYNC_DISPATCH #ifdef ST_ASIO_SYNC_DISPATCH
//return positive value if handled some messages (include all messages), if some msg left behind, socket will re-dispatch them asynchronously //return positive value if handled some messages (include all messages), if some msg left behind, socket will re-dispatch them asynchronously
//notice: using inconstant reference is for the ability of swapping //notice: using inconstant reference is for the ability of swapping
......
...@@ -65,20 +65,31 @@ public: ...@@ -65,20 +65,31 @@ public:
virtual const char* type_name() const {return "TCP (client endpoint)";} virtual const char* type_name() const {return "TCP (client endpoint)";}
virtual int type_id() const {return 1;} virtual int type_id() const {return 1;}
virtual void reset() {need_reconnect = ST_ASIO_RECONNECT; super::reset();} virtual void reset()
{
need_reconnect = ST_ASIO_RECONNECT;
if (NULL != matrix)
if (!ST_THIS change_io_context())
#if BOOST_ASIO_VERSION < 101100
matrix->get_service_pump().assign_io_context(ST_THIS next_layer().get_io_service());
#else
matrix->get_service_pump().assign_io_context(ST_THIS next_layer().get_executor().context());
#endif
super::reset();
}
bool set_server_addr(unsigned short port, const std::string& ip = ST_ASIO_SERVER_IP) {return set_addr(server_addr, port, ip);} bool set_server_addr(unsigned short port, const std::string& ip = ST_ASIO_SERVER_IP) {return set_addr(server_addr, port, ip);}
bool set_server_addr(const std::string& file_name) {server_addr = typename Family::endpoint(file_name); return true;} bool set_server_addr(const std::string& file_name) {server_addr = typename Family::endpoint(file_name); return true;}
const typename Family::endpoint& get_server_addr() const {return server_addr;} const typename Family::endpoint& get_server_addr() const {return server_addr;}
//if you don't want to reconnect to the server after link broken, define macro ST_ASIO_RECONNECT as false, call close_reconnect() in on_connect() //if you don't want to reconnect to the server after link broken, define macro ST_ASIO_RECONNECT as false, call set_reconnect(false) in on_connect()
// or rewrite after_close() virtual function and do nothing in it. // or rewrite after_close() virtual function and do nothing in it.
//if you want to control the retry times and delay time after reconnecting failed, rewrite prepare_reconnect virtual function. //if you want to control the retry times and delay time after reconnecting failed, rewrite prepare_reconnect virtual function.
//disconnect(bool), force_shutdown(bool) and graceful_shutdown(bool, bool) can overwrite reconnecting behavior, please note. //disconnect(bool), force_shutdown(bool) and graceful_shutdown(bool, bool) can overwrite reconnecting behavior, please note.
//reset() virtual function will set reconnecting behavior according to macro ST_ASIO_RECONNECT, please note. //reset() virtual function will set reconnecting behavior according to macro ST_ASIO_RECONNECT, please note.
//if prepare_reconnect returns negative value, reconnecting will be closed, please note. //if prepare_reconnect returns negative value, reconnecting will be closed, please note.
void open_reconnect() {need_reconnect = true;} void set_reconnect(bool reconnect) {need_reconnect = reconnect;}
void close_reconnect() {need_reconnect = false;} bool is_reconnect() const {return need_reconnect;}
//if the connection is broken unexpectedly, generic_client_socket will try to reconnect to the server automatically (if need_reconnect is true). //if the connection is broken unexpectedly, generic_client_socket will try to reconnect to the server automatically (if need_reconnect is true).
void disconnect(bool reconnect = false) {force_shutdown(reconnect);} void disconnect(bool reconnect = false) {force_shutdown(reconnect);}
...@@ -159,6 +170,17 @@ protected: ...@@ -159,6 +170,17 @@ protected:
return false; return false;
} }
virtual void on_close()
{
if (!need_reconnect && NULL != matrix)
#if BOOST_ASIO_VERSION < 101100
matrix->get_service_pump().return_io_context(ST_THIS next_layer().get_io_service());
#else
matrix->get_service_pump().return_io_context(ST_THIS next_layer().get_executor().context());
#endif
super::on_close();
}
//reconnect at here rather than in on_recv_error to make sure no async invocations performed on this socket before reconnecting. //reconnect at here rather than in on_recv_error to make sure no async invocations performed on this socket before reconnecting.
//if you don't want to reconnect the server after link broken, rewrite this virtual function and do nothing in it or call close_reconnt(). //if you don't want to reconnect the server after link broken, rewrite this virtual function and do nothing in it or call close_reconnt().
//if you want to control the retry times and delay time after reconnecting failed, rewrite prepare_reconnect virtual function. //if you want to control the retry times and delay time after reconnecting failed, rewrite prepare_reconnect virtual function.
......
...@@ -43,7 +43,11 @@ public: ...@@ -43,7 +43,11 @@ public:
buff[0] = 4; buff[0] = 4;
buff[1] = 1; buff[1] = 1;
*((unsigned short*) boost::next(buff, 2)) = htons(target_addr.port()); *((unsigned short*) boost::next(buff, 2)) = htons(target_addr.port());
#if BOOST_ASIO_VERSION == 101100
memcpy(boost::next(buff, 4), boost::asio::ip::address_cast<boost::asio::ip::address_v4>(target_addr.address()).to_bytes().data(), 4);
#else
memcpy(boost::next(buff, 4), target_addr.address().to_v4().to_bytes().data(), 4); memcpy(boost::next(buff, 4), target_addr.address().to_v4().to_bytes().data(), 4);
#endif
memcpy(boost::next(buff, 8), "st_asio", sizeof("st_asio")); memcpy(boost::next(buff, 8), "st_asio", sizeof("st_asio"));
req_len = 8 + sizeof("st_asio"); req_len = 8 + sizeof("st_asio");
...@@ -221,14 +225,22 @@ private: ...@@ -221,14 +225,22 @@ private:
else if (target_addr.address().is_v4()) else if (target_addr.address().is_v4())
{ {
buff[3] = 1; buff[3] = 1;
#if BOOST_ASIO_VERSION == 101100
memcpy(boost::next(buff, 4), boost::asio::ip::address_cast<boost::asio::ip::address_v4>(target_addr.address()).to_bytes().data(), 4);
#else
memcpy(boost::next(buff, 4), target_addr.address().to_v4().to_bytes().data(), 4); memcpy(boost::next(buff, 4), target_addr.address().to_v4().to_bytes().data(), 4);
#endif
*((unsigned short*) boost::next(buff, 8)) = htons(target_addr.port()); *((unsigned short*) boost::next(buff, 8)) = htons(target_addr.port());
req_len = 10; req_len = 10;
} }
else //ipv6 else //ipv6
{ {
buff[3] = 4; buff[3] = 4;
#if BOOST_ASIO_VERSION == 101100
memcpy(boost::next(buff, 4), boost::asio::ip::address_cast<boost::asio::ip::address_v6>(target_addr.address()).to_bytes().data(), 16);
#else
memcpy(boost::next(buff, 4), target_addr.address().to_v6().to_bytes().data(), 16); memcpy(boost::next(buff, 4), target_addr.address().to_v6().to_bytes().data(), 16);
#endif
*((unsigned short*) boost::next(buff, 20)) = htons(target_addr.port()); *((unsigned short*) boost::next(buff, 20)) = htons(target_addr.port());
req_len = 22; req_len = 22;
} }
......
...@@ -82,7 +82,7 @@ public: ...@@ -82,7 +82,7 @@ public:
else else
unified_out::info_out("finished pre-creating server sockets."); unified_out::info_out("finished pre-creating server sockets.");
#if BOOST_ASIO_VERSION >= 101100 #if BOOST_ASIO_VERSION > 101100
acceptor.listen(boost::asio::socket_base::max_listen_connections, ec); assert(!ec); acceptor.listen(boost::asio::socket_base::max_listen_connections, ec); assert(!ec);
#else #else
acceptor.listen(boost::asio::socket_base::max_connections, ec); assert(!ec); acceptor.listen(boost::asio::socket_base::max_connections, ec); assert(!ec);
......
...@@ -33,6 +33,16 @@ public: ...@@ -33,6 +33,16 @@ public:
virtual const char* type_name() const {return "TCP (server endpoint)";} virtual const char* type_name() const {return "TCP (server endpoint)";}
virtual int type_id() const {return 2;} virtual int type_id() const {return 2;}
virtual void reset()
{
if (!ST_THIS change_io_context())
#if BOOST_ASIO_VERSION < 101100
server.get_service_pump().assign_io_context(ST_THIS lowest_layer().get_io_service());
#else
server.get_service_pump().assign_io_context(ST_THIS lowest_layer().get_executor().context());
#endif
super::reset();
}
virtual void take_over(boost::shared_ptr<generic_server_socket> socket_ptr) {} //restore this socket from socket_ptr virtual void take_over(boost::shared_ptr<generic_server_socket> socket_ptr) {} //restore this socket from socket_ptr
void disconnect() {force_shutdown();} void disconnect() {force_shutdown();}
...@@ -78,6 +88,11 @@ protected: ...@@ -78,6 +88,11 @@ protected:
virtual void on_async_shutdown_error() {force_shutdown();} virtual void on_async_shutdown_error() {force_shutdown();}
virtual bool on_heartbeat_error() {ST_THIS show_info("server link:", "broke unexpectedly."); force_shutdown(); return false;} virtual bool on_heartbeat_error() {ST_THIS show_info("server link:", "broke unexpectedly."); force_shutdown(); return false;}
#if BOOST_ASIO_VERSION < 101100
virtual void on_close() {server.get_service_pump().return_io_context(ST_THIS lowest_layer().get_io_service()); super::on_close();}
#else
virtual void on_close() {server.get_service_pump().return_io_context(ST_THIS lowest_layer().get_executor().context()); super::on_close();}
#endif
private: private:
Server& server; Server& server;
......
...@@ -49,7 +49,9 @@ public: ...@@ -49,7 +49,9 @@ public:
in_msg_type msg; in_msg_type msg;
ST_THIS packer()->pack_heartbeat(msg); ST_THIS packer()->pack_heartbeat(msg);
dur.end(); dur.end();
do_direct_send_msg(msg);
if (!msg.empty())
do_direct_send_msg(msg);
} }
//reset all, be ensure that there's no any operations performed on this socket when invoke it //reset all, be ensure that there's no any operations performed on this socket when invoke it
...@@ -404,7 +406,9 @@ private: ...@@ -404,7 +406,9 @@ private:
#endif #endif
#endif #endif
sending_msgs.clear(); sending_msgs.clear();
#ifndef ST_ASIO_ARBITRARY_SEND
if (!do_send_msg(true) && !send_buffer.empty()) //send msg in sequence if (!do_send_msg(true) && !send_buffer.empty()) //send msg in sequence
#endif
do_send_msg(true); //just make sure no pending msgs do_send_msg(true); //just make sure no pending msgs
} }
else else
......
...@@ -26,38 +26,13 @@ namespace st_asio_wrapper { namespace ssl { ...@@ -26,38 +26,13 @@ namespace st_asio_wrapper { namespace ssl {
template <typename Socket> template <typename Socket>
class socket : public Socket class socket : public Socket
{ {
#ifndef ST_ASIO_REUSE_SSL_STREAM
#ifdef ST_ASIO_REUSE_OBJECT
#error please define ST_ASIO_REUSE_SSL_STREAM macro explicitly if you need boost::asio::ssl::stream to be reusable!
#endif
#if ST_ASIO_RECONNECT
#ifdef _MSC_VER
#pragma message("without macro ST_ASIO_REUSE_SSL_STREAM, ssl::client_socket_base is not able to reconnect the server.")
#else
#warning without macro ST_ASIO_REUSE_SSL_STREAM, ssl::client_socket_base is not able to reconnect the server.
#endif
#endif
#endif
public: public:
template<typename Arg> socket(Arg& arg, boost::asio::ssl::context& ctx) : Socket(arg, ctx) {} template<typename Arg> socket(Arg& arg, boost::asio::ssl::context& ctx) : Socket(arg, ctx) {}
protected: protected:
virtual void on_recv_error(const boost::system::error_code& ec) virtual void on_recv_error(const boost::system::error_code& ec)
{ {
#ifndef ST_ASIO_REUSE_SSL_STREAM shutdown_ssl(true);
if (ST_THIS is_ready())
{
ST_THIS status = Socket::GRACEFUL_SHUTTING_DOWN;
ST_THIS show_info("ssl link:", "been shut down.");
boost::system::error_code ec;
ST_THIS next_layer().shutdown(ec);
if (ec && boost::asio::error::eof != ec) //the endpoint who initiated a shutdown operation will get error eof.
unified_out::info_out(ST_ASIO_LLF " shutdown ssl link failed (maybe intentionally because of reusing)", ST_THIS id());
}
#endif
Socket::on_recv_error(ec); Socket::on_recv_error(ec);
} }
...@@ -72,13 +47,9 @@ protected: ...@@ -72,13 +47,9 @@ protected:
void shutdown_ssl(bool sync = true) void shutdown_ssl(bool sync = true)
{ {
if (!ST_THIS is_ready()) if (!ST_THIS is_ready())
{
Socket::force_shutdown();
return; return;
}
ST_THIS status = Socket::GRACEFUL_SHUTTING_DOWN; ST_THIS status = Socket::GRACEFUL_SHUTTING_DOWN;
if (!sync) if (!sync)
{ {
ST_THIS show_info("ssl link:", "been shutting down."); ST_THIS show_info("ssl link:", "been shutting down.");
...@@ -87,11 +58,11 @@ protected: ...@@ -87,11 +58,11 @@ protected:
else else
{ {
ST_THIS show_info("ssl link:", "been shut down."); ST_THIS show_info("ssl link:", "been shut down.");
boost::system::error_code ec; boost::system::error_code ec;
ST_THIS next_layer().shutdown(ec); ST_THIS next_layer().shutdown(ec);
if (ec && boost::asio::error::eof != ec) //the endpoint who initiated a shutdown operation will get error eof. if (ec && boost::asio::error::eof != ec) //the endpoint who initiated a shutdown operation will get error eof.
unified_out::info_out(ST_ASIO_LLF " shutdown ssl link failed (maybe intentionally because of reusing)", ST_THIS id()); unified_out::info_out(ST_ASIO_LLF " shutdown ssl link failed: %s", ST_THIS id(), ec.message().data());
} }
} }
...@@ -112,29 +83,43 @@ private: ...@@ -112,29 +83,43 @@ private:
typedef socket<tcp::client_socket_base<Packer, Unpacker, Matrix, boost::asio::ssl::stream<boost::asio::ip::tcp::socket>, InQueue, InContainer, OutQueue, OutContainer> > super; typedef socket<tcp::client_socket_base<Packer, Unpacker, Matrix, boost::asio::ssl::stream<boost::asio::ip::tcp::socket>, InQueue, InContainer, OutQueue, OutContainer> > super;
public: public:
client_socket_base(boost::asio::io_context& io_context_, boost::asio::ssl::context& ctx) : super(io_context_, ctx) {} client_socket_base(boost::asio::io_context& io_context_, boost::asio::ssl::context& ctx_) : super(io_context_, ctx_), ctx(ctx_) {}
client_socket_base(Matrix& matrix_, boost::asio::ssl::context& ctx) : super(matrix_, ctx) {} client_socket_base(Matrix& matrix_, boost::asio::ssl::context& ctx_) : super(matrix_, ctx_), ctx(ctx_) {}
virtual const char* type_name() const {return "SSL (client endpoint)";} virtual const char* type_name() const {return "SSL (client endpoint)";}
virtual int type_id() const {return 3;} virtual int type_id() const {return 3;}
#ifndef ST_ASIO_REUSE_SSL_STREAM
void disconnect(bool reconnect = false) {force_shutdown(reconnect);} void disconnect(bool reconnect = false) {force_shutdown(reconnect);}
void force_shutdown(bool reconnect = false) {graceful_shutdown(reconnect);} void force_shutdown(bool reconnect = false) {graceful_shutdown(reconnect);}
void graceful_shutdown(bool reconnect = false, bool sync = true) void graceful_shutdown(bool reconnect = false, bool sync = true)
{ {
if (reconnect) if (ST_THIS is_ready())
unified_out::error_out(ST_ASIO_LLF " reconnecting mechanism is not available, please define macro ST_ASIO_REUSE_SSL_STREAM", ST_THIS id()); {
ST_THIS set_reconnect(reconnect);
shutdown_ssl(sync); shutdown_ssl(sync);
}
else
super::force_shutdown(reconnect);
} }
protected: protected:
virtual int prepare_reconnect(const boost::system::error_code& ec) {return -1;} virtual void on_unpack_error() {unified_out::info_out(ST_ASIO_LLF " can not unpack msg.", ST_THIS id()); ST_THIS unpacker()->dump_left_data(); force_shutdown(ST_THIS is_reconnect());}
#else virtual void after_close()
protected: {
#endif if (ST_THIS is_reconnect())
virtual void on_unpack_error() {unified_out::info_out(ST_ASIO_LLF " can not unpack msg.", ST_THIS id()); ST_THIS unpacker()->dump_left_data(); ST_THIS force_shutdown();} ST_THIS reset_next_layer(ctx);
super::after_close();
}
virtual bool change_io_context()
{
if (NULL == ST_THIS get_matrix())
return false;
ST_THIS reset_next_layer(ST_THIS get_matrix()->get_service_pump().assign_io_context(), ctx);
return true;
}
private: private:
virtual void connect_handler(const boost::system::error_code& ec) //intercept tcp::client_socket_base::connect_handler virtual void connect_handler(const boost::system::error_code& ec) //intercept tcp::client_socket_base::connect_handler
...@@ -153,10 +138,6 @@ private: ...@@ -153,10 +138,6 @@ private:
{ {
ST_THIS on_handshake(ec); ST_THIS on_handshake(ec);
#if ST_ASIO_RECONNECT && !defined(ST_ASIO_REUSE_SSL_STREAM)
ST_THIS close_reconnect();
#endif
if (!ec) if (!ec)
super::connect_handler(ec); //return to tcp::client_socket_base::connect_handler super::connect_handler(ec); //return to tcp::client_socket_base::connect_handler
else else
...@@ -164,6 +145,9 @@ private: ...@@ -164,6 +145,9 @@ private:
} }
using super::shutdown_ssl; using super::shutdown_ssl;
private:
boost::asio::ssl::context& ctx;
}; };
template<typename Object> template<typename Object>
...@@ -192,16 +176,14 @@ private: ...@@ -192,16 +176,14 @@ private:
typedef socket<tcp::server_socket_base<Packer, Unpacker, Server, boost::asio::ssl::stream<boost::asio::ip::tcp::socket>, InQueue, InContainer, OutQueue, OutContainer> > super; typedef socket<tcp::server_socket_base<Packer, Unpacker, Server, boost::asio::ssl::stream<boost::asio::ip::tcp::socket>, InQueue, InContainer, OutQueue, OutContainer> > super;
public: public:
server_socket_base(Server& server_, boost::asio::ssl::context& ctx) : super(server_, ctx) {} server_socket_base(Server& server_, boost::asio::ssl::context& ctx_) : super(server_, ctx_), ctx(ctx_) {}
virtual const char* type_name() const {return "SSL (server endpoint)";} virtual const char* type_name() const {return "SSL (server endpoint)";}
virtual int type_id() const {return 4;} virtual int type_id() const {return 4;}
#ifndef ST_ASIO_REUSE_SSL_STREAM
void disconnect() {force_shutdown();} void disconnect() {force_shutdown();}
void force_shutdown() {graceful_shutdown();} //must with async mode (the default value), because server_base::uninit will call this function void force_shutdown() {graceful_shutdown();} //must with async mode (the default value), because server_base::uninit will call this function
void graceful_shutdown(bool sync = false) {shutdown_ssl(sync);} void graceful_shutdown(bool sync = false) {if (ST_THIS is_ready()) shutdown_ssl(sync); else super::force_shutdown();}
#endif
protected: protected:
virtual bool do_start() //intercept tcp::server_socket_base::do_start (to add handshake) virtual bool do_start() //intercept tcp::server_socket_base::do_start (to add handshake)
...@@ -214,6 +196,8 @@ protected: ...@@ -214,6 +196,8 @@ protected:
virtual void on_unpack_error() {unified_out::info_out(ST_ASIO_LLF " can not unpack msg.", ST_THIS id()); ST_THIS unpacker()->dump_left_data(); ST_THIS force_shutdown();} virtual void on_unpack_error() {unified_out::info_out(ST_ASIO_LLF " can not unpack msg.", ST_THIS id()); ST_THIS unpacker()->dump_left_data(); ST_THIS force_shutdown();}
virtual bool change_io_context() {ST_THIS reset_next_layer(ST_THIS get_server().get_service_pump().assign_io_context(), ctx); return true;}
private: private:
void handle_handshake(const boost::system::error_code& ec) void handle_handshake(const boost::system::error_code& ec)
{ {
...@@ -226,6 +210,9 @@ private: ...@@ -226,6 +210,9 @@ private:
} }
using super::shutdown_ssl; using super::shutdown_ssl;
private:
boost::asio::ssl::context& ctx;
}; };
template<typename Socket, typename Pool = object_pool<Socket>, typename Server = tcp::i_server> class server_base : public tcp::server_base<Socket, Pool, Server> template<typename Socket, typename Pool = object_pool<Socket>, typename Server = tcp::i_server> class server_base : public tcp::server_base<Socket, Pool, Server>
...@@ -238,10 +225,10 @@ template<typename Socket> class single_client_base : public tcp::single_client_b ...@@ -238,10 +225,10 @@ template<typename Socket> class single_client_base : public tcp::single_client_b
public: public:
single_client_base(service_pump& service_pump_, boost::asio::ssl::context& ctx) : tcp::single_client_base<Socket>(service_pump_, ctx) {} single_client_base(service_pump& service_pump_, boost::asio::ssl::context& ctx) : tcp::single_client_base<Socket>(service_pump_, ctx) {}
}; };
template<typename Socket, typename Pool = object_pool<Socket> > class multi_client_base : public tcp::multi_client_base<Socket, Pool> template<typename Socket, typename Pool = object_pool<Socket>, typename Matrix = i_matrix> class multi_client_base : public tcp::multi_client_base<Socket, Pool, Matrix>
{ {
public: public:
multi_client_base(service_pump& service_pump_, const boost::asio::ssl::context::method& m) : tcp::multi_client_base<Socket, Pool>(service_pump_, m) {} multi_client_base(service_pump& service_pump_, const boost::asio::ssl::context::method& m) : tcp::multi_client_base<Socket, Pool, Matrix>(service_pump_, m) {}
}; };
}} //namespace }} //namespace
......
...@@ -106,7 +106,7 @@ public: ...@@ -106,7 +106,7 @@ public:
{BOOST_AUTO(unused, call_back); return create_or_update_timer(id, interval, unused, start);} {BOOST_AUTO(unused, call_back); return create_or_update_timer(id, interval, unused, start);}
bool change_timer_status(tid id, typename timer_info::timer_status status) {BOOST_AUTO(ti, find_timer(id)); return NULL != ti ? ti->status = status, true : false;} bool change_timer_status(tid id, typename timer_info::timer_status status) {BOOST_AUTO(ti, find_timer(id)); return NULL != ti ? ti->status = status, true : false;}
bool change_timer_interval(tid id, size_t interval) {BOOST_AUTO(ti, find_timer(id)); return NULL != ti ? ti->interval_ms = interval, true : false;} bool change_timer_interval(tid id, unsigned interval) {BOOST_AUTO(ti, find_timer(id)); return NULL != ti ? ti->interval_ms = interval, true : false;}
//after this call, call_back cannot be used again, please note. //after this call, call_back cannot be used again, please note.
bool change_timer_call_back(tid id, boost::function<bool(tid)>& call_back) {BOOST_AUTO(ti, find_timer(id)); return NULL != ti ? ti->call_back.swap(call_back), true : false;} bool change_timer_call_back(tid id, boost::function<bool(tid)>& call_back) {BOOST_AUTO(ti, find_timer(id)); return NULL != ti ? ti->call_back.swap(call_back), true : false;}
...@@ -136,8 +136,8 @@ public: ...@@ -136,8 +136,8 @@ public:
boost::lambda::bind((void (timer::*) (timer_info&)) &timer::stop_timer, this, boost::lambda::_1))); boost::lambda::bind((void (timer::*) (timer_info&)) &timer::stop_timer, this, boost::lambda::_1)));
} }
DO_SOMETHING_TO_ALL_MUTEX(timer_can, timer_can_mutex) DO_SOMETHING_TO_ALL_MUTEX(timer_can, timer_can_mutex, boost::lock_guard<boost::mutex>)
DO_SOMETHING_TO_ONE_MUTEX(timer_can, timer_can_mutex) DO_SOMETHING_TO_ONE_MUTEX(timer_can, timer_can_mutex, boost::lock_guard<boost::mutex>)
protected: protected:
bool start_timer(timer_info& ti, unsigned interval_ms) bool start_timer(timer_info& ti, unsigned interval_ms)
......
/*
* reliable_socket.h
*
* Created on: 2021-9-3
* Author: youngwolf
* email: mail2tao@163.com
* QQ: 676218192
* Community on QQ: 198941541
*
* reliable UDP socket
*/
#ifndef ST_ASIO_RELIABLE_UDP_SOCKET_H_
#define ST_ASIO_RELIABLE_UDP_SOCKET_H_
#include <ikcp.h>
#include "socket.h"
namespace st_asio_wrapper { namespace udp {
template <typename Packer, typename Unpacker, typename Matrix = i_matrix,
template<typename> class InQueue = ST_ASIO_INPUT_QUEUE, template<typename> class InContainer = ST_ASIO_INPUT_CONTAINER,
template<typename> class OutQueue = ST_ASIO_OUTPUT_QUEUE, template<typename> class OutContainer = ST_ASIO_OUTPUT_CONTAINER>
class reliable_socket_base : public socket_base<Packer, Unpacker, Matrix, InQueue, InContainer, OutQueue, OutContainer>
{
private:
typedef socket_base<Packer, Unpacker, Matrix, InQueue, InContainer, OutQueue, OutContainer> super;
public:
static const typename super::tid TIMER_BEGIN = super::TIMER_END;
static const typename super::tid TIMER_CC = TIMER_BEGIN;
static const typename super::tid TIMER_KCP_UPDATE = TIMER_BEGIN + 1;
static const typename super::tid TIMER_END = TIMER_BEGIN + 5;
public:
reliable_socket_base(boost::asio::io_context& io_context_) : super(io_context_), kcp(NULL), need_kcp_check(false), max_nsnd_que(ST_ASIO_RELIABLE_UDP_NSND_QUE) {}
reliable_socket_base(Matrix& matrix_) : super(matrix_), kcp(NULL), need_kcp_check(false), max_nsnd_que(ST_ASIO_RELIABLE_UDP_NSND_QUE) {}
~reliable_socket_base() {release_kcp();}
ikcpcb* get_kcpcb() {return kcp;}
ikcpcb* create_kcpcb(IUINT32 conv, void* user) {if (ST_THIS started()) return NULL; release_kcp(); return (kcp = ikcp_create(conv, user));}
IUINT32 get_max_nsnd_que() const {return max_nsnd_que;}
void set_max_nsnd_que(IUINT32 max_nsnd_que_) {max_nsnd_que = max_nsnd_que_;}
int output(const char* buf, int len)
{
boost::system::error_code ec;
ST_THIS next_layer().send(boost::asio::buffer(buf, (size_t) len), 0, ec);
return ec ? (unified_out::error_out(ST_ASIO_LLF " send msg error (%d)", ST_THIS id(), ec.value()), 0) : len;
}
//from kpc's test.h
/* get system time */
static inline void itimeofday(long *sec, long *usec)
{
#if defined(__unix)
struct timeval time;
gettimeofday(&time, NULL);
if (sec) *sec = time.tv_sec;
if (usec) *usec = time.tv_usec;
#else
static long mode = 0, addsec = 0;
BOOL retval;
static IINT64 freq = 1;
IINT64 qpc;
if (mode == 0) {
retval = QueryPerformanceFrequency((LARGE_INTEGER*)&freq);
freq = (freq == 0)? 1 : freq;
retval = QueryPerformanceCounter((LARGE_INTEGER*)&qpc);
addsec = (long)time(NULL);
addsec = addsec - (long)((qpc / freq) & 0x7fffffff);
mode = 1;
}
retval = QueryPerformanceCounter((LARGE_INTEGER*)&qpc);
retval = retval * 2;
if (sec) *sec = (long)(qpc / freq) + addsec;
if (usec) *usec = (long)((qpc % freq) * 1000000 / freq);
#endif
}
/* get clock in millisecond 64 */
static inline IINT64 iclock64(void)
{
long s, u;
IINT64 value;
itimeofday(&s, &u);
value = ((IINT64)s) * 1000 + (u / 1000);
return value;
}
static inline IUINT32 iclock()
{
return (IUINT32)(iclock64() & 0xfffffffful);
}
//from kpc's test.h
protected:
virtual bool do_start()
{
if (NULL != kcp)
ST_THIS set_timer(TIMER_KCP_UPDATE, kcp_check(), boost::bind(&reliable_socket_base::timer_handler, this, boost::placeholders::_1));
return super::do_start();
}
virtual void on_close() {release_kcp(); super::on_close();}
virtual bool check_send_cc() //congestion control, return true means can continue to send messages
{
boost::lock_guard<boost::mutex> lock(mutex);
if (NULL == kcp || kcp->nsnd_que <= max_nsnd_que)
return true;
ST_THIS set_timer(TIMER_CC, 10, boost::bind(&reliable_socket_base::timer_handler, this, boost::placeholders::_1));
return false;
}
virtual bool do_send_msg(const typename super::in_msg& sending_msg)
{
boost::lock_guard<boost::mutex> lock(mutex);
if (NULL == kcp)
return false;
int re = ikcp_send(kcp, sending_msg.data(), (long) sending_msg.size());
if (re < 0)
unified_out::error_out("ikcp_send return error: %d", re);
else
need_kcp_check = true;
return true;
}
virtual void pre_handle_msg(typename Unpacker::container_type& msg_can)
{
boost::lock_guard<boost::mutex> lock(mutex);
if (NULL == kcp)
return;
for (BOOST_AUTO(iter, msg_can.begin()); iter != msg_can.end(); ++iter)
{
int re = ikcp_input(kcp, iter->data(), (long) iter->size());
if (re < 0)
unified_out::error_out("ikcp_input return error: %d", re);
else
need_kcp_check = true;
}
msg_can.clear();
char buff[ST_ASIO_MSG_BUFFER_SIZE];
while (true)
{
int re = ikcp_recv(kcp, buff, sizeof(buff));
if (re < 0)
break;
ST_THIS unpacker()->compose_msg(buff, (size_t) re, msg_can);
}
}
private:
IUINT32 kcp_check()
{
IUINT32 now = iclock();
return ikcp_check(kcp, now) - now;
}
bool timer_handler(typename super::tid id)
{
switch (id)
{
case TIMER_CC:
if (NULL != kcp)
{
boost::unique_lock<boost::mutex> lock(mutex);
if (kcp->nsnd_que > max_nsnd_que)
return true; //continue CC
lock.unlock();
super::resume_sending();
}
break;
case TIMER_KCP_UPDATE:
if (NULL != kcp)
{
boost::lock_guard<boost::mutex> lock(mutex);
ikcp_update(kcp, iclock());
if (need_kcp_check)
{
need_kcp_check = false;
ST_THIS change_timer_interval(TIMER_KCP_UPDATE, kcp_check());
}
return true;
}
break;
default:
assert(false);
break;
}
return false;
}
void release_kcp() {if (NULL != kcp) ikcp_release(kcp); kcp = NULL;}
private:
ikcpcb* kcp;
boost::mutex mutex;
bool need_kcp_check;
IUINT32 max_nsnd_que;
};
}} //namespace
#endif /* ST_ASIO_RELIABLE_UDP_SOCKET_H_ */
...@@ -57,11 +57,11 @@ public: ...@@ -57,11 +57,11 @@ public:
} }
protected: protected:
generic_socket(boost::asio::io_context& io_context_) : super(io_context_), has_bound(false), matrix(NULL) {} generic_socket(boost::asio::io_context& io_context_) : super(io_context_), is_bound(false), is_connected(false), connect_mode(ST_ASIO_UDP_CONNECT_MODE), matrix(NULL) {}
generic_socket(Matrix& matrix_) : super(matrix_.get_service_pump()), has_bound(false), matrix(&matrix_) {} generic_socket(Matrix& matrix_) : super(matrix_.get_service_pump()), is_bound(false), is_connected(false), connect_mode(ST_ASIO_UDP_CONNECT_MODE), matrix(&matrix_) {}
public: public:
virtual bool is_ready() {return has_bound;} virtual bool is_ready() {return is_bound;}
virtual void send_heartbeat() virtual void send_heartbeat()
{ {
in_msg_type msg(peer_addr); in_msg_type msg(peer_addr);
...@@ -78,12 +78,23 @@ public: ...@@ -78,12 +78,23 @@ public:
//for udp::single_service_base, this virtual function will never be called, please note. //for udp::single_service_base, this virtual function will never be called, please note.
virtual void reset() virtual void reset()
{ {
has_bound = false; is_connected = is_bound = false;
sending_msg.clear(); sending_msg.clear();
if (NULL != matrix)
if (!ST_THIS change_io_context())
#if BOOST_ASIO_VERSION < 101100
matrix->get_service_pump().assign_io_context(ST_THIS lowest_layer().get_io_service());
#else
matrix->get_service_pump().assign_io_context(ST_THIS lowest_layer().get_executor().context());
#endif
super::reset(); super::reset();
} }
bool connected() const {return is_connected;}
void set_connect_mode() {connect_mode = true;}
bool get_connect_mode() const {return connect_mode;}
bool set_local_addr(unsigned short port, const std::string& ip = std::string()) {return set_addr(local_addr, port, ip);} bool set_local_addr(unsigned short port, const std::string& ip = std::string()) {return set_addr(local_addr, port, ip);}
bool set_local_addr(const std::string& file_name) {local_addr = typename Family::endpoint(file_name); return true;} bool set_local_addr(const std::string& file_name) {local_addr = typename Family::endpoint(file_name); return true;}
const typename Family::endpoint& get_local_addr() const {return local_addr;} const typename Family::endpoint& get_local_addr() const {return local_addr;}
...@@ -173,6 +184,7 @@ protected: ...@@ -173,6 +184,7 @@ protected:
const Matrix* get_matrix() const {return matrix;} const Matrix* get_matrix() const {return matrix;}
virtual bool bind(const typename Family::endpoint& local_addr) {return true;} virtual bool bind(const typename Family::endpoint& local_addr) {return true;}
virtual bool connect(const typename Family::endpoint& peer_addr) {return false;}
virtual bool do_start() virtual bool do_start()
{ {
...@@ -184,7 +196,7 @@ protected: ...@@ -184,7 +196,7 @@ protected:
if (ec) if (ec)
{ {
unified_out::error_out("cannot create socket: %s", ec.message().data()); unified_out::error_out("cannot create socket: %s", ec.message().data());
return (has_bound = false); return (is_bound = false);
} }
#ifndef ST_ASIO_NOT_REUSE_ADDRESS #ifndef ST_ASIO_NOT_REUSE_ADDRESS
...@@ -193,9 +205,11 @@ protected: ...@@ -193,9 +205,11 @@ protected:
} }
if (!bind(local_addr)) if (!bind(local_addr))
return (has_bound = false); return (is_bound = false);
else if (connect_mode)
is_connected = connect(peer_addr);
return (has_bound = true) && super::do_start(); return (is_bound = true) && super::do_start();
} }
//msg was failed to send and udp::generic_socket will not hold it any more, if you want to re-send it in the future, //msg was failed to send and udp::generic_socket will not hold it any more, if you want to re-send it in the future,
...@@ -221,9 +235,27 @@ protected: ...@@ -221,9 +235,27 @@ protected:
return true; return true;
} }
virtual void on_close()
{
#ifdef ST_ASIO_SYNC_SEND #ifdef ST_ASIO_SYNC_SEND
virtual void on_close() {if (sending_msg.p) sending_msg.p->set_value(NOT_APPLICABLE); super::on_close();} if (sending_msg.p)
sending_msg.p->set_value(NOT_APPLICABLE);
#endif
if (NULL != matrix)
#if BOOST_ASIO_VERSION < 101100
matrix->get_service_pump().return_io_context(ST_THIS lowest_layer().get_io_service());
#else
matrix->get_service_pump().return_io_context(ST_THIS lowest_layer().get_executor().context());
#endif #endif
super::on_close();
}
//reliable UDP socket needs following virtual functions to specify different behaviors.
virtual bool check_send_cc() {return true;} //congestion control, return true means can continue to send messages
virtual bool do_send_msg(const typename super::in_msg& sending_msg) {return false;} //customize message sending, for connected socket only
virtual void pre_handle_msg(typename Unpacker::container_type& msg_can) {}
void resume_sending() {sending = false; super::send_msg();} //for reliable UDP socket only
private: private:
using super::close; using super::close;
...@@ -251,8 +283,12 @@ private: ...@@ -251,8 +283,12 @@ private:
#ifdef ST_ASIO_PASSIVE_RECV #ifdef ST_ASIO_PASSIVE_RECV
reading = true; reading = true;
#endif #endif
ST_THIS next_layer().async_receive_from(recv_buff, temp_addr, make_strand_handler(rw_strand, if (is_connected)
ST_THIS make_handler_error_size(boost::bind(&generic_socket::recv_handler, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)))); ST_THIS next_layer().async_receive(recv_buff, make_strand_handler(rw_strand,
ST_THIS make_handler_error_size(boost::bind(&generic_socket::recv_handler, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred))));
else
ST_THIS next_layer().async_receive_from(recv_buff, temp_addr, make_strand_handler(rw_strand,
ST_THIS make_handler_error_size(boost::bind(&generic_socket::recv_handler, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred))));
} }
} }
...@@ -265,11 +301,14 @@ private: ...@@ -265,11 +301,14 @@ private:
typename Unpacker::container_type msg_can; typename Unpacker::container_type msg_can;
ST_THIS unpacker()->parse_msg(bytes_transferred, msg_can); ST_THIS unpacker()->parse_msg(bytes_transferred, msg_can);
if (is_connected)
pre_handle_msg(msg_can);
#ifdef ST_ASIO_PASSIVE_RECV #ifdef ST_ASIO_PASSIVE_RECV
reading = false; //clear reading flag before call handle_msg() to make sure that recv_msg() can be called successfully in on_msg_handle() reading = false; //clear reading flag before call handle_msg() to make sure that recv_msg() can be called successfully in on_msg_handle()
#endif #endif
for (BOOST_AUTO(iter, msg_can.begin()); iter != msg_can.end(); ++iter) for (BOOST_AUTO(iter, msg_can.begin()); iter != msg_can.end(); ++iter)
temp_msg_can.emplace_back(temp_addr, boost::ref(*iter)); temp_msg_can.emplace_back(is_connected ? peer_addr : temp_addr, boost::ref(*iter));
if (handle_msg()) //if macro ST_ASIO_PASSIVE_RECV been defined, handle_msg will always return false if (handle_msg()) //if macro ST_ASIO_PASSIVE_RECV been defined, handle_msg will always return false
do_recv_msg(); //receive msg in sequence do_recv_msg(); //receive msg in sequence
} }
...@@ -297,13 +336,20 @@ private: ...@@ -297,13 +336,20 @@ private:
if (!in_strand && sending) if (!in_strand && sending)
return true; return true;
if ((sending = send_buffer.try_dequeue(sending_msg))) if (is_connected && !check_send_cc())
sending = true;
else if ((sending = send_buffer.try_dequeue(sending_msg)))
{ {
stat.send_delay_sum += statistic::now() - sending_msg.begin_time; stat.send_delay_sum += statistic::now() - sending_msg.begin_time;
sending_msg.restart(); sending_msg.restart();
ST_THIS next_layer().async_send_to(boost::asio::buffer(sending_msg.data(), sending_msg.size()), sending_msg.peer_addr, make_strand_handler(rw_strand, if (!is_connected)
ST_THIS make_handler_error_size(boost::bind(&generic_socket::send_handler, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)))); ST_THIS next_layer().async_send_to(boost::asio::buffer(sending_msg.data(), sending_msg.size()), sending_msg.peer_addr, make_strand_handler(rw_strand,
ST_THIS make_handler_error_size(boost::bind(&generic_socket::send_handler, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred))));
else if (do_send_msg(sending_msg))
ST_THIS post_strand(rw_strand, boost::bind(&generic_socket::send_handler, this, boost::system::error_code(), sending_msg.size()));
else
ST_THIS next_layer().async_send(boost::asio::buffer(sending_msg.data(), sending_msg.size()), make_strand_handler(rw_strand,
ST_THIS make_handler_error_size(boost::bind(&generic_socket::send_handler, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred))));
return true; return true;
} }
...@@ -363,7 +409,7 @@ private: ...@@ -363,7 +409,7 @@ private:
#endif #endif
using super::rw_strand; using super::rw_strand;
bool has_bound; bool is_bound, is_connected, connect_mode;
typename super::in_msg sending_msg; typename super::in_msg sending_msg;
typename Family::endpoint local_addr; typename Family::endpoint local_addr;
typename Family::endpoint temp_addr; //used when receiving messages typename Family::endpoint temp_addr; //used when receiving messages
...@@ -400,6 +446,23 @@ protected: ...@@ -400,6 +446,23 @@ protected:
return true; return true;
} }
virtual bool connect(const boost::asio::ip::udp::endpoint& peer_addr)
{
if (0 != peer_addr.port() || !peer_addr.address().is_unspecified())
{
boost::system::error_code ec;
ST_THIS lowest_layer().connect(peer_addr, ec);
if (ec)
unified_out::error_out("cannot connect to the peer");
else
return true;
}
else
unified_out::error_out("invalid peer ip address");
return false;
}
}; };
#ifdef BOOST_ASIO_HAS_LOCAL_SOCKETS #ifdef BOOST_ASIO_HAS_LOCAL_SOCKETS
...@@ -431,6 +494,23 @@ protected: ...@@ -431,6 +494,23 @@ protected:
return true; return true;
} }
virtual bool connect(const boost::asio::local::datagram_protocol::endpoint& peer_addr)
{
if (!peer_addr.path().empty())
{
boost::system::error_code ec;
ST_THIS lowest_layer().connect(peer_addr, ec);
if (ec)
unified_out::error_out("cannot connect to the peer");
else
return true;
}
else
unified_out::error_out("invalid peer path");
return false;
}
}; };
#endif #endif
......
...@@ -4,7 +4,6 @@ ...@@ -4,7 +4,6 @@
//configuration //configuration
#define ST_ASIO_SERVER_PORT 9527 #define ST_ASIO_SERVER_PORT 9527
#define ST_ASIO_REUSE_OBJECT //use objects pool #define ST_ASIO_REUSE_OBJECT //use objects pool
#define ST_ASIO_REUSE_SSL_STREAM
//#define ST_ASIO_DEFAULT_PACKER packer2<unique_buffer<std::string>, std::string> //#define ST_ASIO_DEFAULT_PACKER packer2<unique_buffer<std::string>, std::string>
//#define ST_ASIO_DEFAULT_UNPACKER unpacker2<> //#define ST_ASIO_DEFAULT_UNPACKER unpacker2<>
#define ST_ASIO_HEARTBEAT_INTERVAL 5 //SSL has supported heartbeat because we used user data instead of OOB to implement #define ST_ASIO_HEARTBEAT_INTERVAL 5 //SSL has supported heartbeat because we used user data instead of OOB to implement
...@@ -32,6 +31,8 @@ int main(int argc, const char* argv[]) ...@@ -32,6 +31,8 @@ int main(int argc, const char* argv[])
service_pump sp; service_pump sp;
server server_(sp, boost::asio::ssl::context::sslv23_server); server server_(sp, boost::asio::ssl::context::sslv23_server);
server_.set_start_object_id(1000);
server_.context().set_options(boost::asio::ssl::context::default_workarounds | boost::asio::ssl::context::no_sslv2 | boost::asio::ssl::context::single_dh_use); server_.context().set_options(boost::asio::ssl::context::default_workarounds | boost::asio::ssl::context::no_sslv2 | boost::asio::ssl::context::single_dh_use);
server_.context().set_verify_mode(boost::asio::ssl::context::verify_peer | boost::asio::ssl::context::verify_fail_if_no_peer_cert); server_.context().set_verify_mode(boost::asio::ssl::context::verify_peer | boost::asio::ssl::context::verify_fail_if_no_peer_cert);
server_.context().load_verify_file("client_certs/server.crt"); server_.context().load_verify_file("client_certs/server.crt");
...@@ -62,7 +63,7 @@ int main(int argc, const char* argv[]) ...@@ -62,7 +63,7 @@ int main(int argc, const char* argv[])
ctx.load_verify_file("certs/server.crt"); ctx.load_verify_file("certs/server.crt");
ctx.use_certificate_chain_file("client_certs/server.crt"); ctx.use_certificate_chain_file("client_certs/server.crt");
ctx.use_private_key_file("client_certs/server.key", boost::asio::ssl::context::pem); ctx.use_private_key_file("client_certs/server.key", boost::asio::ssl::context::pem);
ctx.use_tmp_dh_file("client_certs/dh1024.pem"); ctx.use_tmp_dh_file("client_certs/dh2048.pem");
single_client client_(sp, ctx); single_client client_(sp, ctx);
*/ */
...@@ -89,22 +90,6 @@ int main(int argc, const char* argv[]) ...@@ -89,22 +90,6 @@ int main(int argc, const char* argv[])
printf("link #: " ST_ASIO_SF ", valid links: " ST_ASIO_SF ", invalid links: " ST_ASIO_SF "\n", client_.size(), client_.valid_size(), client_.invalid_object_size()); printf("link #: " ST_ASIO_SF ", valid links: " ST_ASIO_SF ", invalid links: " ST_ASIO_SF "\n", client_.size(), client_.valid_size(), client_.invalid_object_size());
client_.list_all_object(); client_.list_all_object();
} }
#ifndef ST_ASIO_REUSE_SSL_STREAM
else if (RESTART_COMMAND == str || RECONNECT == str)
puts("please define macro ST_ASIO_REUSE_SSL_STREAM to test this feature.");
else if (SHUTDOWN_LINK == str)
// server_.at(0)->graceful_shutdown();
// server_.at(0)->graceful_shutdown(true);
// server_.at(0)->force_shutdown();
client_.graceful_shutdown(client_.at(0));
// client_.graceful_shutdown(client_.at(0), false);
// client_.force_shutdown(client_.at(0));
// client_.graceful_shutdown(); //if you used single_client
// client_.graceful_shutdown(false, false); //if you used single_client
// client_.force_shutdown(); //if you used single_client
#else
else if (RESTART_COMMAND == str) else if (RESTART_COMMAND == str)
{ {
sp.stop_service(&client_); sp.stop_service(&client_);
...@@ -145,7 +130,6 @@ int main(int argc, const char* argv[]) ...@@ -145,7 +130,6 @@ int main(int argc, const char* argv[])
// client_.at(0)->graceful_shutdown(false, false); // client_.at(0)->graceful_shutdown(false, false);
// client_.graceful_shutdown(client_.at(0), false); // client_.graceful_shutdown(client_.at(0), false);
// client_.graceful_shutdown(false, false); //if you used single_client // client_.graceful_shutdown(false, false); //if you used single_client
#endif
else else
server_.broadcast_msg(str); server_.broadcast_msg(str);
} }
......
module = udp_test module = udp_test
ext_cflag = -I../../kcp/
ext_libs = -L../../kcp/ -lkcp
include ../config.mk include ../config.mk
...@@ -11,23 +11,24 @@ ...@@ -11,23 +11,24 @@
//you will see them cross together on the receiver's screen. //you will see them cross together on the receiver's screen.
//with this macro, if heartbeat not applied, macro ST_ASIO_AVOID_AUTO_STOP_SERVICE must be defined to avoid the service_pump run out. //with this macro, if heartbeat not applied, macro ST_ASIO_AVOID_AUTO_STOP_SERVICE must be defined to avoid the service_pump run out.
#define ST_ASIO_AVOID_AUTO_STOP_SERVICE #define ST_ASIO_AVOID_AUTO_STOP_SERVICE
#define ST_ASIO_UDP_CONNECT_MODE true
//#define ST_ASIO_HEARTBEAT_INTERVAL 5 //neither udp_unpacker nor udp_unpacker2 support heartbeat message, so heartbeat will be treated as normal message. //#define ST_ASIO_HEARTBEAT_INTERVAL 5 //neither udp_unpacker nor udp_unpacker2 support heartbeat message, so heartbeat will be treated as normal message.
//#define ST_ASIO_DEFAULT_UDP_UNPACKER udp_unpacker2<> //#define ST_ASIO_DEFAULT_UDP_UNPACKER udp_unpacker2<>
//configuration //configuration
#include "../include/ext/udp.h" #include "../include/ext/reliable_udp.h"
using namespace st_asio_wrapper; using namespace st_asio_wrapper;
#define QUIT_COMMAND "quit" #define QUIT_COMMAND "quit"
#define RESTART_COMMAND "restart" #define RESTART_COMMAND "restart"
void sync_recv_thread(ext::udp::single_socket_service& service) void sync_recv_thread(ext::udp::reliable_socket& socket)
{ {
list<ext::udp::single_socket_service::out_msg_type> msg_can; list<ext::udp::reliable_socket::out_msg_type> msg_can;
sync_call_result re = SUCCESS; sync_call_result re = SUCCESS;
do do
{ {
re = service.sync_recv_msg(msg_can, 50); //st_asio_wrapper will not maintain messages in msg_can anymore after sync_recv_msg return, please note. re = socket.sync_recv_msg(msg_can, 50); //st_asio_wrapper will not maintain messages in msg_can anymore after sync_recv_msg return, please note.
if (SUCCESS == re) if (SUCCESS == re)
{ {
for (BOOST_AUTO(iter, msg_can.begin()); iter != msg_can.end(); ++iter) for (BOOST_AUTO(iter, msg_can.begin()); iter != msg_can.end(); ++iter)
...@@ -38,6 +39,10 @@ void sync_recv_thread(ext::udp::single_socket_service& service) ...@@ -38,6 +39,10 @@ void sync_recv_thread(ext::udp::single_socket_service& service)
puts("sync recv end."); puts("sync recv end.");
} }
//because st_asio_wrapper is header only, it cannot provide the implementation of below global function, but kcp needs it,
//you're supposed to provide it and call reliable_socket_base::output directly in it, like:
int output(const char* buf, int len, ikcpcb* kcp, void* user) {return ((ext::udp::single_reliable_socket_service*) user)->output(buf, len);}
int main(int argc, const char* argv[]) int main(int argc, const char* argv[])
{ {
printf("usage: %s <my port> <peer port> [peer ip=127.0.0.1]\n", argv[0]); printf("usage: %s <my port> <peer port> [peer ip=127.0.0.1]\n", argv[0]);
...@@ -49,10 +54,15 @@ int main(int argc, const char* argv[]) ...@@ -49,10 +54,15 @@ int main(int argc, const char* argv[])
puts("type " QUIT_COMMAND " to end."); puts("type " QUIT_COMMAND " to end.");
service_pump sp; service_pump sp;
ext::udp::single_socket_service service(sp); ext::udp::single_reliable_socket_service service(sp);
service.set_local_addr((unsigned short) atoi(argv[1])); //for multicast, do not bind to a specific IP, just port is enough service.set_local_addr((unsigned short) atoi(argv[1])); //for multicast, do not bind to a specific IP, just port is enough
service.set_peer_addr((unsigned short) atoi(argv[2]), argc >= 4 ? argv[3] : "127.0.0.1"); service.set_peer_addr((unsigned short) atoi(argv[2]), argc >= 4 ? argv[3] : "127.0.0.1");
//reliable_socket cannot become reliable without below statement, instead, it downgrade to normal UDP socket
service.create_kcpcb(0, (void*) &service);
//without below statement, your application will core dump
ikcp_setoutput(service.get_kcpcb(), &output);
sp.start_service(); sp.start_service();
//for broadcast //for broadcast
// service.lowest_layer().set_option(boost::asio::socket_base::broadcast(true)); //usage: ./udp_test 5000 5000 "255.255.255.255" // service.lowest_layer().set_option(boost::asio::socket_base::broadcast(true)); //usage: ./udp_test 5000 5000 "255.255.255.255"
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册