提交 eb93b242 编写于 作者: Y youngwolf

Demonstrate how to implement short connections (in echo_server and client).

client_socket_base support binding to a specific local address.
Write detailed logs if failed to create or bind socket.
上级 cf86cc62
......@@ -13,6 +13,7 @@
//if the server send messages quickly enough, you will see them cross together.
#define ASCS_ALIGNED_TIMER
#define ASCS_CUSTOM_LOG
#define ASCS_WANT_ALL_MSG_SEND_NOTIFY
#define ASCS_DEFAULT_UNPACKER non_copy_unpacker
//#define ASCS_DEFAULT_UNPACKER stream_unpacker
......@@ -51,6 +52,7 @@ public:
#include <ascs/ext/tcp.h>
using namespace ascs;
using namespace ascs::tcp;
using namespace ascs::ext;
using namespace ascs::ext::tcp;
......@@ -58,6 +60,55 @@ using namespace ascs::ext::tcp;
#define RESTART_COMMAND "restart"
#define RECONNECT "reconnect"
//demonstrates how to access client in client_socket (just like access server in server_socket)
class i_controller
{
public:
virtual void on_all_msg_send(uint_fast64_t id) = 0;
//add more interfaces if needed
};
class short_connection : public client_socket
{
public:
short_connection(asio::io_context& io_context_) : client_socket(io_context_), controller(nullptr) {}
void set_controller(i_controller* _controller) {controller = _controller;}
i_controller* get_controller() const {return controller;}
protected:
virtual void on_connect() {close_reconnect();}
#ifdef ASCS_WANT_ALL_MSG_SEND_NOTIFY
virtual void on_all_msg_send(in_msg_type& msg) {controller->on_all_msg_send(id());}
#endif
private:
i_controller* controller;
};
class short_client : public multi_client_base<short_connection>, protected i_controller
{
public:
short_client(service_pump& service_pump_) : multi_client_base(service_pump_) {}
void set_server_addr(unsigned short _port, const std::string& _ip = ASCS_SERVER_IP) {port = _port; ip = _ip;}
bool send_msg(const std::string& msg) {return send_msg(msg, port, ip);}
bool send_msg(const std::string& msg, unsigned short port, const std::string& ip)
{
auto socket_ptr = add_socket(port, ip);
return socket_ptr ? socket_ptr->set_controller(this), socket_ptr->send_msg(msg) : false;
}
protected:
virtual void on_all_msg_send(uint_fast64_t id) {}
private:
unsigned short port;
std::string ip;
};
std::thread create_sync_recv_thread(single_client& client)
{
return std::thread([&client]() {
......@@ -86,15 +137,19 @@ int main(int argc, const char* argv[])
service_pump sp;
single_client client(sp);
short_client client2(sp); //without single_client, we need to define ASCS_AVOID_AUTO_STOP_SERVICE macro to forbid service_pump stopping services automatically
// argv[2] = "::1" //ipv6
// argv[2] = "127.0.0.1" //ipv4
unsigned short port = ASCS_SERVER_PORT + 100;
std::string ip = ASCS_SERVER_IP;
if (argc > 1)
port = (unsigned short) atoi(argv[1]);
if (argc > 2)
client.set_server_addr(atoi(argv[1]), argv[2]);
else if (argc > 1)
client.set_server_addr(atoi(argv[1]), ASCS_SERVER_IP);
else
client.set_server_addr(ASCS_SERVER_PORT + 100, ASCS_SERVER_IP);
ip = argv[2];
client.set_server_addr(port, ip);
client2.set_server_addr(port + 1, ip);
sp.start_service();
auto t = create_sync_recv_thread(client);
......@@ -118,8 +173,12 @@ int main(int argc, const char* argv[])
else if (RECONNECT == str)
client.graceful_shutdown(true);
else
client.sync_safe_send_msg(str, 100);
//client.safe_send_msg(str);
{
client.sync_safe_send_msg(str + " (from normal client)", 100);
//client.safe_send_msg(str + " (from normal client)");
client2.send_msg(str + " (from short client)");
}
}
return 0;
......
......@@ -170,12 +170,33 @@ public:
normal_socket(i_server& server_) : server_socket_base(server_) {}
protected:
//demo client needs heartbeat (macro ASCS_HEARTBEAT_INTERVAL been defined), pleae note that the interval (here is 5) must be equal to
//demo client needs heartbeat (macro ASCS_HEARTBEAT_INTERVAL been defined), please note that the interval (here is 5) must be equal to
//macro ASCS_HEARTBEAT_INTERVAL defined in demo client, and macro ASCS_HEARTBEAT_MAX_ABSENCE must has the same value as demo client's.
virtual void on_connect() {start_heartbeat(5);}
};
#endif
class short_connection : public server_socket_base<packer, unpacker>
{
public:
short_connection(i_server& server_) : server_socket_base(server_) {}
protected:
//msg handling
#ifdef ASCS_SYNC_DISPATCH
//do not hold msg_can for further using, return from on_msg as quickly as possible
virtual size_t on_msg(std::list<out_msg_type>& msg_can) {auto re = server_socket_base::on_msg(msg_can); force_shutdown(); return re;}
#endif
#ifdef ASCS_DISPATCH_BATCH_MSG
//do not hold msg_can for further using, access msg_can and return from on_msg_handle as quickly as possible
virtual size_t on_msg_handle(out_queue_type& msg_can) {auto re = server_socket_base::on_msg_handle(msg_can); force_shutdown(); return re;}
#else
virtual bool on_msg_handle(out_msg_type& msg) {auto re = server_socket_base::on_msg_handle(msg); force_shutdown(); return re;}
#endif
//msg handling end
};
int main(int argc, const char* argv[])
{
printf("usage: %s [<service thread number=1> [<port=%d> [ip=0.0.0.0]]]\n", argv[0], ASCS_SERVER_PORT);
......@@ -190,21 +211,20 @@ int main(int argc, const char* argv[])
//this server cannot support fixed_length_packer/fixed_length_unpacker and prefix_suffix_packer/prefix_suffix_unpacker,
//the reason is these packer and unpacker need additional initializations that normal_socket not implemented,
//see echo_socket's constructor for more details.
server_base<normal_socket> server_(sp);
server_base<normal_socket> normal_server(sp);
server_base<short_connection> short_server(sp);
echo_server echo_server_(sp); //echo server
unsigned short port = ASCS_SERVER_PORT;
std::string ip;
if (argc > 2)
port = (unsigned short) atoi(argv[2]);
if (argc > 3)
{
server_.set_server_addr(atoi(argv[2]) + 100, argv[3]);
echo_server_.set_server_addr(atoi(argv[2]), argv[3]);
}
else if (argc > 2)
{
server_.set_server_addr(atoi(argv[2]) + 100);
echo_server_.set_server_addr(atoi(argv[2]));
}
else
server_.set_server_addr(ASCS_SERVER_PORT + 100);
ip = argv[3];
normal_server.set_server_addr(port + 100, ip);
short_server.set_server_addr(port + 101, ip);
echo_server_.set_server_addr(port, ip);
auto thread_num = 1;
if (argc > 1)
......@@ -230,19 +250,19 @@ int main(int argc, const char* argv[])
}
else if (STATISTIC == str)
{
printf("normal server, link #: " ASCS_SF ", invalid links: " ASCS_SF "\n", server_.size(), server_.invalid_object_size());
printf("normal server, link #: " ASCS_SF ", invalid links: " ASCS_SF "\n", normal_server.size(), normal_server.invalid_object_size());
printf("echo server, link #: " ASCS_SF ", invalid links: " ASCS_SF "\n\n", echo_server_.size(), echo_server_.invalid_object_size());
puts(echo_server_.get_statistic().to_string().data());
}
else if (STATUS == str)
{
server_.list_all_status();
normal_server.list_all_status();
echo_server_.list_all_status();
}
else if (LIST_ALL_CLIENT == str)
{
puts("clients from normal server:");
server_.list_all_object();
normal_server.list_all_object();
puts("clients from echo server:");
echo_server_.list_all_object();
}
......@@ -254,7 +274,7 @@ int main(int argc, const char* argv[])
{
// /*
//broadcast series functions call pack_msg for each client respectively, because clients may used different protocols(so different type of packers, of course)
server_.broadcast_msg(str.data(), str.size() + 1, false);
normal_server.broadcast_msg(str.data(), str.size() + 1, false);
//send \0 character too, because demo client used basic_buffer as its msg type, it will not append \0 character automatically as std::string does,
//so need \0 character when printing it.
// */
......@@ -265,11 +285,11 @@ int main(int argc, const char* argv[])
//send \0 character too, because demo client used basic_buffer as its msg type, it will not append \0 character automatically as std::string does,
//so need \0 character when printing it.
if (!msg.empty())
server_.do_something_to_all([&msg](server_base<normal_socket>::object_ctype& item) {item->direct_send_msg(msg);});
normal_server.do_something_to_all([&msg](server_base<normal_socket>::object_ctype& item) {item->direct_send_msg(msg);});
*/
/*
//if demo client is using stream_unpacker
server_.do_something_to_all([&str](server_base<normal_socket>::object_ctype& item) {item->direct_send_msg(str);});
normal_server.do_something_to_all([&str](server_base<normal_socket>::object_ctype& item) {item->direct_send_msg(str);});
*/
}
}
......
......@@ -459,6 +459,7 @@
* Prefix suffix packer and unpacker support heartbeat.
* New demo socket_management demonstrates how to manage sockets if you use other keys rather than the original id.
* Control reconnecting more flexibly, see function client_socket_base::open_reconnect and client_socket_base::close_reconnect for more details.
* client_socket_base support binding to a specific local address.
*
* DELETION:
*
......@@ -650,8 +651,9 @@ static_assert(ASCS_GRACEFUL_SHUTDOWN_MAX_DURATION > 0, "graceful shutdown durati
#endif
static_assert(ASCS_ASYNC_ACCEPT_NUM > 0, "async accept number must be bigger than zero.");
//in set_server_addr, if the IP is empty, ASCS_TCP_DEFAULT_IP_VERSION will define the IP version, or the IP version will be deduced by the IP address.
//asio::ip::tcp::v4() means ipv4 and asio::ip::tcp::v6() means ipv6.
//in server_base::set_server_addr and set_local_addr, if the IP is empty, ASCS_(TCP/UDP)_DEFAULT_IP_VERSION will define the IP version,
// or the IP version will be deduced by the IP address.
//asio::ip::(tcp/udp)::v4() means ipv4 and asio::ip::(tcp/udp)::v6() means ipv6.
#ifndef ASCS_TCP_DEFAULT_IP_VERSION
#define ASCS_TCP_DEFAULT_IP_VERSION asio::ip::tcp::v4()
#endif
......
......@@ -38,21 +38,10 @@ public:
// call superclass' reset function, before reusing this socket, object_pool will invoke this function
virtual void reset() {need_reconnect = true; super::reset();}
bool set_server_addr(unsigned short port, const std::string& ip = ASCS_SERVER_IP)
{
asio::error_code ec;
#if ASIO_VERSION >= 101100
auto addr = asio::ip::make_address(ip, ec);
#else
auto addr = asio::ip::address::from_string(ip, ec);
#endif
if (ec)
return false;
server_addr = asio::ip::tcp::endpoint(addr, port);
return true;
}
bool set_server_addr(unsigned short port, const std::string& ip = ASCS_SERVER_IP) {return set_addr(server_addr, port, ip);}
const asio::ip::tcp::endpoint& get_server_addr() const {return server_addr;}
bool set_local_addr(unsigned short port, const std::string& ip = std::string()) {return set_addr(local_addr, port, ip);}
const asio::ip::tcp::endpoint& get_local_addr() const {return local_addr;}
//if you don't want to reconnect to the server after link broken, call close_reconnect() or rewrite after_close() virtual function and do nothing in it,
//if you want to control the retry times and delay time after reconnecting failed, rewrite prepare_reconnect virtual function.
......@@ -92,7 +81,29 @@ protected:
{
assert(!this->is_connected());
this->lowest_layer().async_connect(server_addr, this->make_handler_error([this](const asio::error_code& ec) {this->connect_handler(ec);}));
auto& lowest_object = this->lowest_layer();
if (0 != local_addr.port() || !local_addr.address().is_unspecified())
{
asio::error_code ec;
if (!lowest_object.is_open()) //user maybe has opened this socket (to set options for example)
{
lowest_object.open(local_addr.protocol(), ec); assert(!ec);
if (ec)
{
unified_out::error_out("cannot create socket: %s", ec.message().data());
return false;
}
}
lowest_object.bind(local_addr, ec); assert(!ec);
if (ec)
{
unified_out::error_out("cannot bind socket: %s", ec.message().data());
return false;
}
}
lowest_object.async_connect(server_addr, this->make_handler_error([this](const asio::error_code& ec) {this->connect_handler(ec);}));
return true;
}
......@@ -154,9 +165,34 @@ private:
return false;
}
bool set_addr(asio::ip::tcp::endpoint& endpoint, unsigned short port, const std::string& ip)
{
if (ip.empty())
endpoint = asio::ip::tcp::endpoint(ASCS_TCP_DEFAULT_IP_VERSION, port);
else
{
asio::error_code ec;
#if ASIO_VERSION >= 101100
auto addr = asio::ip::make_address(ip, ec); assert(!ec);
#else
auto addr = asio::ip::address::from_string(ip, ec); assert(!ec);
#endif
if (ec)
{
unified_out::error_out("invalid IP address %s.", ip.data());
return false;
}
endpoint = asio::ip::tcp::endpoint(addr, port);
}
return true;
}
private:
bool need_reconnect;
asio::ip::tcp::endpoint server_addr;
asio::ip::tcp::endpoint local_addr;
};
}} //namespace
......
......@@ -49,18 +49,6 @@ public:
{
has_bound = false;
asio::error_code ec;
if (!this->lowest_layer().is_open()) {this->lowest_layer().open(local_addr.protocol(), ec); assert(!ec);} //user maybe has opened this socket (to set options for example)
if (this->lowest_layer().is_open())
{
#ifndef ASCS_NOT_REUSE_ADDRESS
this->lowest_layer().set_option(asio::socket_base::reuse_address(true), ec); assert(!ec);
#endif
this->lowest_layer().bind(local_addr, ec); assert(!ec);
if (!(has_bound = !ec))
unified_out::error_out("bind failed.");
}
last_send_msg.clear();
unpacker_->reset();
super::reset();
......@@ -127,7 +115,37 @@ public:
///////////////////////////////////////////////////
protected:
virtual bool do_start() {return has_bound ? super::do_start() : false;}
virtual bool do_start()
{
auto& lowest_object = this->lowest_layer();
if (!lowest_object.is_open()) //user maybe has opened this socket (to set options for example)
{
asio::error_code ec;
lowest_object.open(local_addr.protocol(), ec); assert(!ec);
if (ec)
{
unified_out::error_out("cannot create socket: %s", ec.message().data());
return (has_bound = false);
}
#ifndef ASCS_NOT_REUSE_ADDRESS
lowest_object.set_option(asio::socket_base::reuse_address(true), ec); assert(!ec);
#endif
}
if (0 != local_addr.port() || !local_addr.address().is_unspecified())
{
asio::error_code ec;
lowest_object.bind(local_addr, ec); assert(!ec);
if (ec)
{
unified_out::error_out("cannot bind socket: %s", ec.message().data());
return (has_bound = false);
}
}
return (has_bound = true) && super::do_start();
}
//msg was failed to send and udp::socket_base will not hold it any more, if you want to re-send it in the future,
// you must take over it and re-send (at any time) it via direct_send_msg.
......@@ -169,11 +187,12 @@ private:
this->stop_all_timer();
close();
if (this->lowest_layer().is_open())
auto& lowest_object = this->lowest_layer();
if (lowest_object.is_open())
{
asio::error_code ec;
this->lowest_layer().shutdown(asio::ip::udp::socket::shutdown_both, ec);
this->lowest_layer().close(ec);
lowest_object.shutdown(asio::ip::udp::socket::shutdown_both, ec);
lowest_object.close(ec);
}
}
......@@ -290,12 +309,15 @@ private:
{
asio::error_code ec;
#if ASIO_VERSION >= 101100
auto addr = asio::ip::make_address(ip, ec);
auto addr = asio::ip::make_address(ip, ec); assert(!ec);
#else
auto addr = asio::ip::address::from_string(ip, ec);
auto addr = asio::ip::address::from_string(ip, ec); assert(!ec);
#endif
if (ec)
{
unified_out::error_out("invalid IP address %s.", ip.data());
return false;
}
endpoint = asio::ip::udp::endpoint(addr, port);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册