提交 815c1495 编写于 作者: Y youngwolf 提交者: youngwolf

Move callback sockets from ascs::ext to ascs::ext::callbacks

Add callback registration support for server and object_pool.
Rename file ascs/ext/socket.h to ascs/ext/callbacks.h
上级 8357aaea
......@@ -38,7 +38,7 @@ public:
};
#include <ascs/ext/tcp.h>
#include <ascs/ext/socket.h>
#include <ascs/ext/callbacks.h>
using namespace ascs;
using namespace ascs::tcp;
using namespace ascs::ext;
......@@ -51,7 +51,8 @@ using namespace ascs::ext::tcp::proxy;
#define STATISTIC "statistic"
//we only want close reconnecting mechanism on these sockets, so it cannot be done by defining macro ASCS_RECONNECT to false
class short_client : public multi_client_base<c_socket<socks4::client_socket>>
//method 1
class short_client : public multi_client_base<callbacks::c_socket<socks4::client_socket>>
{
public:
short_client(service_pump& service_pump_) : multi_client_base(service_pump_) {set_server_addr(ASCS_SERVER_PORT);}
......@@ -79,6 +80,20 @@ private:
std::string ip;
};
//method 2
//class short_client : public multi_client_base<socks4::client_socket, callbacks::object_pool<object_pool<socks4::client_socket>>>
//{
//public:
// we now can not call register_on_connect on socket_ptr, since it's not wrapped by callbacks::c_socket
// socket_ptr->register_on_connect([](socks4::client_socket* socket) {socket->set_reconnect(false);}, true); //close reconnection mechanism
//};
//
//but now, we're able to call register_on_create on client2, since its object pool is wrapped by callbacks::object_pool
//short_client client2(...);
//client2.register_on_create([](object_pool<socks4::client_socket>*, object_pool<socks4::client_socket>::object_ctype& object_ptr) {
// object_ptr->set_reconnect(false); //close reconnection mechanism
//});
std::thread create_sync_recv_thread(client_socket& client)
{
return std::thread([&]() {
......
......@@ -48,7 +48,7 @@
//configuration
#include <ascs/ext/tcp.h>
#include <ascs/ext/socket.h>
#include <ascs/ext/callbacks.h>
using namespace ascs;
using namespace ascs::tcp;
using namespace ascs::ext;
......@@ -182,21 +182,11 @@ protected:
};
#endif
//demonstrate how to accept just one client at server endpoint
class normal_server : public server_base<normal_socket>
{
public:
normal_server(service_pump& service_pump_) : server_base(service_pump_) {}
protected:
virtual int async_accept_num() {return 1;}
virtual bool on_accept(object_ctype& socket_ptr) {stop_listen(); return true;}
};
class short_connection : public s_socket<server_socket_base<packer<>, unpacker<>>>
class short_connection : public callbacks::s_socket<server_socket_base<packer<>, unpacker<>>>
{
private:
typedef s_socket<server_socket_base<ext::packer<>, ext::unpacker<>>> super;
typedef callbacks::s_socket<server_socket_base<ext::packer<>, ext::unpacker<>>> super;
typedef server_socket_base<ext::packer<>, ext::unpacker<>> raw_socket;
public:
short_connection(i_server& server_) : super(server_)
......@@ -257,7 +247,12 @@ int main(int argc, const char* argv[])
//demonstrate how to use singel_service
//because of normal_socket, this server cannot support fixed_length_packer/fixed_length_unpacker and prefix_suffix_packer/prefix_suffix_unpacker,
//the reason is these packer and unpacker need additional initializations that normal_socket not implemented, see echo_socket's constructor for more details.
single_service_pump<normal_server> normal_server_;
single_service_pump<callbacks::server<server_base<normal_socket>>> normal_server_;
//following statements demonstrate how to accept just one client at server endpoint
normal_server_.register_async_accept_num([](server_base<normal_socket>*) {return 1;});
normal_server_.register_on_accept([](server_base<normal_socket>* server, server_base<normal_socket>::object_ctype&) {server->stop_listen(); return true;}, false);
//demonstrate how to use singel_service
single_service_pump<server_base<short_connection>> short_server;
unsigned short port = ASCS_SERVER_PORT;
......@@ -340,16 +335,16 @@ int main(int argc, const char* argv[])
// */
/*
//if all clients used the same protocol, we can pack msg one time, and send it repeatedly like this:
packer p;
packer<> p;
auto msg = p.pack_msg(str.data(), str.size() + 1);
//send \0 character too, because demo client used basic_buffer as its msg type, it will not append \0 character automatically as std::string does,
//so need \0 character when printing it.
if (!msg.empty())
((normal_server&) normal_server_).do_something_to_all([&](server_base<normal_socket>::object_ctype& item) {item->direct_send_msg(msg);});
((server_base<normal_socket>&) normal_server_).do_something_to_all([&](server_base<normal_socket>::object_ctype& item) {item->direct_send_msg(msg);});
*/
/*
//if demo client is using stream_unpacker
((normal_server&) normal_server_).do_something_to_all([&](server_base<normal_socket>::object_ctype& item) {item->direct_send_msg(str);});
((server_base<normal_socket>&) normal_server_).do_something_to_all([&](server_base<normal_socket>::object_ctype& item) {item->direct_send_msg(str);});
//or
normal_server_.broadcast_native_msg(str);
*/
......
......@@ -7,33 +7,33 @@
* QQ: 676218192
* Community on QQ: 198941541
*
* customize socket by event registration instead of overwrite virtual functions.
* customize sockets/server/object_pool by event registration instead of overwrite virtual functions.
*/
#ifndef _ASCS_EXT_SOCKET_H_
#define _ASCS_EXT_SOCKET_H_
#ifndef _ASCS_EXT_CALLBACKS_H_
#define _ASCS_EXT_CALLBACKS_H_
#include <functional>
#include "../base.h"
namespace ascs { namespace ext {
namespace ascs { namespace ext { namespace callbacks {
#define call_cb_void(fun) virtual void fun() {if (cb_##fun.first) cb_##fun.first(this); if (cb_##fun.second) Socket::fun();}
#define call_cb_1_void(fun, p) {if (cb_##fun.first) cb_##fun.first(this, p); if (cb_##fun.second) Socket::fun(p);}
#define call_cb_2_void(fun, p1, p2) {if (cb_##fun.first) cb_##fun.first(this, p1, p2); if (cb_##fun.second) Socket::fun(p1, p2);}
#define call_cb_void(super, fun) virtual void fun() {if (cb_##fun.first) cb_##fun.first(this); if (cb_##fun.second) super::fun();}
#define call_cb_1_void(super, fun, p) {if (cb_##fun.first) cb_##fun.first(this, p); if (cb_##fun.second) super::fun(p);}
#define call_cb_2_void(super, fun, p1, p2) {if (cb_##fun.first) cb_##fun.first(this, p1, p2); if (cb_##fun.second) super::fun(p1, p2);}
#define call_cb_combine(fun) virtual bool fun() {auto re = cb_##fun.first ? cb_##fun.first(this) : true; if (re && cb_##fun.second) re = Socket::fun(); return re;}
#define call_cb_1_return(init, fun, p) {auto re = init; if (cb_##fun.first) re = cb_##fun.first(this, p); if (cb_##fun.second) re = Socket::fun(p); return re;}
#define call_cb_combine(super, fun) virtual bool fun() {auto re = cb_##fun.first ? cb_##fun.first(this) : true; if (re && cb_##fun.second) re = super::fun(); return re;}
#define call_cb_return(super, type, fun) virtual type fun() {auto re = type(); if (cb_##fun.first) re = cb_##fun.first(this); if (cb_##fun.second) re = super::fun(); return re;}
#define call_cb_1_combine(super, fun, p) {auto re = cb_##fun.first ? cb_##fun.first(this, p) : true; if (re && cb_##fun.second) re = super::fun(p); return re;}
#define call_cb_1_return(super, type, fun, p) {auto re = type(); if (cb_##fun.first) re = cb_##fun.first(this, p); if (cb_##fun.second) re = super::fun(p); return re;}
#define call_cb_2_combine(super, fun, p1, p2) {auto re = cb_##fun.first ? cb_##fun.first(this, p1, p2) : true; if (re && cb_##fun.second) re = super::fun(p1, p2); return re;}
#define register_cb(fun, init) \
template<typename CallBack> void register_##fun(CallBack&& cb, bool pass_on = init) {cb_##fun.first = std::forward<CallBack>(cb); cb_##fun.second = pass_on;}
template<typename Socket> class g_socket : public Socket //udp socket will use g_socket only
{
public:
typedef Socket raw_socket;
public:
template<typename Arg> g_socket(Arg& arg) : Socket(arg) {first_init();}
template<typename Arg1, typename Arg2> g_socket(Arg1& arg1, Arg2&& arg2) : Socket(arg1, std::forward<Arg2>(arg2)) {first_init();}
......@@ -63,37 +63,37 @@ public:
#endif
public:
call_cb_combine(obsoleted)
call_cb_combine(is_ready)
call_cb_void(send_heartbeat)
call_cb_void(reset)
call_cb_combine(Socket, obsoleted)
call_cb_combine(Socket, is_ready)
call_cb_void(Socket, send_heartbeat)
call_cb_void(Socket, reset)
protected:
call_cb_combine(on_heartbeat_error)
virtual void on_send_error(const asio::error_code& ec, typename Socket::in_container_type& msg_can) call_cb_2_void(on_send_error, ec, msg_can)
virtual void on_recv_error(const asio::error_code& ec) call_cb_1_void(on_recv_error, ec)
call_cb_void(on_close)
call_cb_void(after_close)
call_cb_combine(Socket, on_heartbeat_error)
virtual void on_send_error(const asio::error_code& ec, typename Socket::in_container_type& msg_can) call_cb_2_void(Socket, on_send_error, ec, msg_can)
virtual void on_recv_error(const asio::error_code& ec) call_cb_1_void(Socket, on_recv_error, ec)
call_cb_void(Socket, on_close)
call_cb_void(Socket, after_close)
#ifdef ASCS_SYNC_DISPATCH
virtual size_t on_msg(std::list<typename Socket::out_msg_type>& msg_can) call_cb_1_return((size_t) 0, on_msg, msg_can)
virtual size_t on_msg(std::list<typename Socket::out_msg_type>& msg_can) call_cb_1_return(Socket, size_t, on_msg, msg_can)
#endif
#ifdef ASCS_DISPATCH_BATCH_MSG
virtual size_t on_msg_handle(typename Socket::out_queue_type& msg_can) call_cb_1_return((size_t) 0, on_msg_handle, msg_can)
virtual size_t on_msg_handle(typename Socket::out_queue_type& msg_can) call_cb_1_return(Socket, size_t, on_msg_handle, msg_can)
#else
virtual bool on_msg_handle(typename Socket::out_msg_type& msg) call_cb_1_return(false, on_msg_handle, msg)
virtual bool on_msg_handle(typename Socket::out_msg_type& msg) call_cb_1_combine(Socket, on_msg_handle, msg)
#endif
#ifdef ASCS_WANT_MSG_SEND_NOTIFY
virtual void on_msg_send(typename Socket::in_msg_type& msg) call_cb_1_void(on_msg_send, msg)
virtual void on_msg_send(typename Socket::in_msg_type& msg) call_cb_1_void(Socket, on_msg_send, msg)
#endif
#ifdef ASCS_WANT_ALL_MSG_SEND_NOTIFY
virtual void on_all_msg_send(typename Socket::in_msg_type& msg) call_cb_1_void(on_all_msg_send, msg)
virtual void on_all_msg_send(typename Socket::in_msg_type& msg) call_cb_1_void(Socket, on_all_msg_send, msg)
#endif
#ifdef ASCS_SHRINK_SEND_BUFFER
virtual size_t calc_shrink_size(size_t current_size) call_cb_1_return((size_t) 0, calc_shrink_size, current_size)
virtual void on_msg_discard(typename Socket::in_container_type& msg_can) call_cb_1_void(on_msg_discard, msg_can)
virtual size_t calc_shrink_size(size_t current_size) call_cb_1_return(Socket, size_t, calc_shrink_size, current_size)
virtual void on_msg_discard(typename Socket::in_container_type& msg_can) call_cb_1_void(Socket, on_msg_discard, msg_can)
#endif
private:
......@@ -175,9 +175,9 @@ public:
register_cb(on_async_shutdown_error, true)
protected:
call_cb_void(on_connect)
call_cb_void(on_unpack_error)
call_cb_void(on_async_shutdown_error)
call_cb_void(Socket, on_connect)
call_cb_void(Socket, on_unpack_error)
call_cb_void(Socket, on_async_shutdown_error)
private:
void first_init()
......@@ -202,7 +202,7 @@ public:
register_cb(prepare_reconnect, false)
protected:
virtual int prepare_reconnect(const asio::error_code& ec) call_cb_1_return(0, prepare_reconnect, ec)
virtual int prepare_reconnect(const asio::error_code& ec) call_cb_1_return(Socket, int, prepare_reconnect, ec)
private:
void first_init() {cb_prepare_reconnect.second = true;}
......@@ -220,7 +220,7 @@ public:
register_cb(take_over, false)
public:
virtual void take_over(std::shared_ptr<typename Socket::type_of_object_restore> socket_ptr) call_cb_1_void(take_over, socket_ptr)
virtual void take_over(std::shared_ptr<typename Socket::type_of_object_restore> socket_ptr) call_cb_1_void(Socket, take_over, socket_ptr)
private:
void first_init() {cb_take_over.second = true;}
......@@ -229,6 +229,56 @@ private:
std::pair<std::function<void(Socket*, std::shared_ptr<typename Socket::type_of_object_restore>)>, bool> cb_take_over;
};
}} //namespace
template<typename Server> class server : public Server //for server
{
public:
template<typename Arg> server(Arg& arg) : Server(arg) {first_init();}
template<typename Arg1, typename Arg2> server(Arg1& arg1, Arg2&& arg2) : Server(arg1, std::forward<Arg2>(arg2)) {first_init();}
register_cb(async_accept_num, false)
register_cb(start_next_accept, true)
register_cb(on_accept, true)
register_cb(on_accept_error, true)
protected:
call_cb_return(Server, int, async_accept_num)
call_cb_void(Server, start_next_accept)
virtual bool on_accept(typename Server::object_ctype& socket_ptr) call_cb_1_combine(Server, on_accept, socket_ptr)
virtual bool on_accept_error(const asio::error_code& ec, typename Server::object_ctype& socket_ptr) call_cb_2_combine(Server, on_accept_error, ec, socket_ptr)
private:
void first_init()
{
cb_async_accept_num.second = true;
cb_start_next_accept.second = true;
cb_on_accept.second = true;
cb_on_accept_error.second = true;
}
private:
std::pair<std::function<int(Server*)>, bool> cb_async_accept_num;
std::pair<std::function<void(Server*)>, bool> cb_start_next_accept;
std::pair<std::function<bool(Server*, typename Server::object_ctype&)>, bool> cb_on_accept;
std::pair<std::function<bool(Server*, const asio::error_code&, typename Server::object_ctype&)>, bool> cb_on_accept_error;
};
template<typename ObjectPool> class object_pool : public ObjectPool
{
public:
template<typename Arg> object_pool(Arg& arg) : ObjectPool(arg) {first_init();}
register_cb(on_create, false)
protected:
virtual void on_create(typename ObjectPool::object_ctype& object_ptr) call_cb_1_void(ObjectPool, on_create, object_ptr)
private:
void first_init() {cb_on_create.second = true;}
private:
std::pair<std::function<void(ObjectPool*, typename ObjectPool::object_ctype&)>, bool> cb_on_create;
};
}}} //namespace
#endif /* _ASCS_EXT_SOCKET_H_ */
#endif /* _ASCS_EXT_CALLBACKS_H_ */
......@@ -199,12 +199,12 @@ public:
void graceful_shutdown() {this->do_something_to_all([](typename Pool::object_ctype& item) {item->graceful_shutdown();});}
protected:
virtual int async_accept_num() {return ASCS_ASYNC_ACCEPT_NUM;}
virtual bool init() {return start_listen() ? (this->start(), true) : false;}
virtual void uninit() {this->stop(); stop_listen(); force_shutdown();} //if you wanna graceful shutdown, call graceful_shutdown before stop_service.
virtual bool on_accept(typename Pool::object_ctype& socket_ptr) {return true;}
virtual int async_accept_num() {return ASCS_ASYNC_ACCEPT_NUM;}
virtual void start_next_accept() {std::lock_guard<std::mutex> lock(mutex); do_async_accept(create_object());}
virtual bool on_accept(typename Pool::object_ctype& socket_ptr) {return true;}
//if you want to ignore this error and continue to accept new connections immediately, return true in this virtual function;
//if you want to ignore this error and continue to accept new connections after a specific delay, start a timer immediately and return false
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册