提交 7b0c98ce 编写于 作者: E eidheim

Fixed pointer bug

上级 12d9e532
......@@ -26,7 +26,7 @@ int main() {
response_stream << response;
//server.send is an asynchronous function
server.send(connection.id, response_stream, [](const boost::system::error_code& ec){
server.send(connection, response_stream, [](const boost::system::error_code& ec){
if(!ec)
cout << "Message sent successfully" << endl;
else {
......@@ -67,12 +67,12 @@ int main() {
string response=message_stream.str()+" from "+to_string((size_t)connection.id);
for(auto connection_id: server.get_connection_ids()) {
for(auto connection_pointer: server.get_connection_pointers()) {
stringstream response_stream;
response_stream << response;
//server.send is an asynchronous function
server.send(connection_id, response_stream);
server.send(*connection_pointer, response_stream);
}
};
......
......@@ -26,7 +26,7 @@ int main() {
response_stream << response;
//server.send is an asynchronous function
server.send(connection.id, response_stream, [](const boost::system::error_code& ec){
server.send(connection, response_stream, [](const boost::system::error_code& ec){
if(!ec)
cout << "Message sent successfully" << endl;
else {
......@@ -67,12 +67,12 @@ int main() {
string response=message_stream.str()+" from "+to_string((size_t)connection.id);
for(auto connection_id: server.get_connection_ids()) {
for(auto connection_pointer: server.get_connection_pointers()) {
stringstream response_stream;
response_stream << response;
//server.send is an asynchronous function
server.send(connection_id, response_stream);
server.send(*connection_pointer, response_stream);
}
};
......@@ -80,4 +80,4 @@ int main() {
server.start();
return 0;
}
}
\ No newline at end of file
......@@ -9,12 +9,13 @@
#include <unordered_map>
#include <thread>
#include <mutex>
#include <unordered_set>
#include <set>
#include <iostream>
namespace SimpleWeb {
struct Connection {
class Connection {
public:
std::string method, path, http_version;
std::shared_ptr<std::istream> message;
......@@ -27,8 +28,7 @@ namespace SimpleWeb {
void* id;
};
class WebSocketCallbacks {
public:
struct WebSocketCallbacks {
std::function<void(Connection&)> onopen;
std::function<void(Connection&)> onmessage;
std::function<void(Connection&, const boost::system::error_code&)> onerror;
......@@ -40,10 +40,7 @@ namespace SimpleWeb {
template <class socket_type>
class SocketServerBase {
public:
endpoint_type endpoint;
SocketServerBase(unsigned short port, size_t num_threads=1) : m_endpoint(boost::asio::ip::tcp::v4(), port),
acceptor(m_io_service, m_endpoint), num_threads(num_threads) {}
endpoint_type endpoint;
void start() {
accept();
......@@ -66,8 +63,13 @@ namespace SimpleWeb {
//message_header: 129=one fragment, text, 130=one fragment, binary
//See http://tools.ietf.org/html/rfc6455#section-5.2 for more information
void send(void* connection_id, std::ostream& stream, const std::function<void(const boost::system::error_code&)>& callback=nullptr,
void send(Connection& connection, std::ostream& stream,
const std::function<void(const boost::system::error_code&)>& callback=nullptr,
unsigned char message_header=129) const {
if(!connections.count(&connection)) {
throw std::invalid_argument("Connection closed");
}
std::shared_ptr<boost::asio::streambuf> write_buffer(new boost::asio::streambuf);
std::ostream response(write_buffer.get());
......@@ -97,27 +99,28 @@ namespace SimpleWeb {
response << stream.rdbuf();
//Needs to copy the callback-function in case its destroyed
boost::asio::async_write(*(socket_type*)connection_id, *write_buffer,
[this, write_buffer, callback, connection_id](const boost::system::error_code& ec, size_t bytes_transferred) {
//Need to copy the callback-function in case its destroyed
boost::asio::async_write(*static_cast<socket_type*>(connection.id), *write_buffer,
[this, write_buffer, callback]
(const boost::system::error_code& ec, size_t bytes_transferred) {
if(callback) {
callback(ec);
}
});
}
std::unordered_set<void*> get_connection_ids() {
connection_ids_mutex.lock();
auto copy=connection_ids;
connection_ids_mutex.unlock();
std::set<Connection*> get_connection_pointers() {
connections_mutex.lock();
auto copy=connections;
connections_mutex.unlock();
return copy;
}
protected:
const std::string ws_magic_string="258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
std::unordered_set<void*> connection_ids;
std::mutex connection_ids_mutex;
std::set<Connection*> connections;
std::mutex connections_mutex;
boost::asio::io_service m_io_service;
boost::asio::ip::tcp::endpoint m_endpoint;
......@@ -125,7 +128,13 @@ namespace SimpleWeb {
size_t num_threads;
std::vector<std::thread> threads;
virtual void accept() {}
size_t timeout_request;
size_t timeout_idle;
SocketServerBase(unsigned short port, size_t num_threads=1) : m_endpoint(boost::asio::ip::tcp::v4(), port),
acceptor(m_io_service, m_endpoint), num_threads(num_threads) {}
virtual void accept()=0;
void process_request_and_start_connection(std::shared_ptr<socket_type> socket) {
//Create new read_buffer for async_read_until()
......@@ -133,7 +142,8 @@ namespace SimpleWeb {
std::shared_ptr<boost::asio::streambuf> read_buffer(new boost::asio::streambuf);
boost::asio::async_read_until(*socket, *read_buffer, "\r\n\r\n",
[this, socket, read_buffer](const boost::system::error_code& ec, size_t bytes_transferred) {
[this, socket, read_buffer]
(const boost::system::error_code& ec, size_t bytes_transferred) {
if(!ec) {
//Convert to istream to extract string-lines
std::istream stream(read_buffer.get());
......@@ -208,13 +218,14 @@ namespace SimpleWeb {
connection->id=socket.get();
//Capture write_buffer in lambda so it is not destroyed before async_write is finished
boost::asio::async_write(*socket, *write_buffer,
[this, socket, write_buffer, read_buffer, &an_endpoint, connection](const boost::system::error_code& ec, size_t bytes_transferred) {
[this, socket, write_buffer, read_buffer, &an_endpoint, connection]
(const boost::system::error_code& ec, size_t bytes_transferred) {
if(!ec) {
connection_open(socket.get(), an_endpoint.second, *connection);
connection_open(an_endpoint.second, *connection);
read_write_messages(socket, read_buffer, an_endpoint.second, connection);
}
else
connection_error(socket.get(), an_endpoint.second, *connection, ec);
connection_error(an_endpoint.second, *connection, ec);
});
}
return;
......@@ -222,9 +233,11 @@ namespace SimpleWeb {
}
}
void read_write_messages(std::shared_ptr<socket_type> socket, std::shared_ptr<boost::asio::streambuf> read_buffer, WebSocketCallbacks& websocketcallbacks, std::shared_ptr<Connection> connection) {
void read_write_messages(std::shared_ptr<socket_type> socket, std::shared_ptr<boost::asio::streambuf> read_buffer,
WebSocketCallbacks& websocketcallbacks, std::shared_ptr<Connection> connection) {
boost::asio::async_read(*socket, *read_buffer, boost::asio::transfer_exactly(2),
[this, socket, read_buffer, &websocketcallbacks, connection](const boost::system::error_code& ec, size_t bytes_transferred) {
[this, socket, read_buffer, &websocketcallbacks, connection]
(const boost::system::error_code& ec, size_t bytes_transferred) {
if(!ec) {
std::istream stream(read_buffer.get());
......@@ -239,7 +252,8 @@ namespace SimpleWeb {
if(length==126) {
//2 next bytes is the size of content
boost::asio::async_read(*socket, *read_buffer, boost::asio::transfer_exactly(2),
[this, socket, read_buffer, &websocketcallbacks, connection, opcode](const boost::system::error_code& ec, size_t bytes_transferred) {
[this, socket, read_buffer, &websocketcallbacks, connection, opcode]
(const boost::system::error_code& ec, size_t bytes_transferred) {
if(!ec) {
std::istream stream(read_buffer.get());
......@@ -255,13 +269,14 @@ namespace SimpleWeb {
read_write_message_content(socket, read_buffer, length, websocketcallbacks, connection, opcode);
}
else
connection_error(socket.get(), websocketcallbacks, *connection, ec);
connection_error(websocketcallbacks, *connection, ec);
});
}
else if(length==127) {
//8 next bytes is the size of content
boost::asio::async_read(*socket, *read_buffer, boost::asio::transfer_exactly(8),
[this, socket, read_buffer, &websocketcallbacks, connection, opcode](const boost::system::error_code& ec, size_t bytes_transferred) {
[this, socket, read_buffer, &websocketcallbacks, connection, opcode]
(const boost::system::error_code& ec, size_t bytes_transferred) {
if(!ec) {
std::istream stream(read_buffer.get());
......@@ -277,21 +292,24 @@ namespace SimpleWeb {
read_write_message_content(socket, read_buffer, length, websocketcallbacks, connection, opcode);
}
else
connection_error(socket.get(), websocketcallbacks, *connection, ec);
connection_error(websocketcallbacks, *connection, ec);
});
}
else
read_write_message_content(socket, read_buffer, length, websocketcallbacks, connection, opcode);
}
else
connection_error(socket.get(), websocketcallbacks, *connection, ec);
connection_error(websocketcallbacks, *connection, ec);
});
}
void read_write_message_content(std::shared_ptr<socket_type> socket, std::shared_ptr<boost::asio::streambuf> read_buffer, size_t length,
WebSocketCallbacks& websocketcallbacks, std::shared_ptr<Connection> connection, unsigned char opcode) {
void read_write_message_content(std::shared_ptr<socket_type> socket,
std::shared_ptr<boost::asio::streambuf> read_buffer,
size_t length, WebSocketCallbacks& websocketcallbacks,
std::shared_ptr<Connection> connection, unsigned char opcode) {
boost::asio::async_read(*socket, *read_buffer, boost::asio::transfer_exactly(4+length),
[this, socket, read_buffer, length, &websocketcallbacks, connection, opcode](const boost::system::error_code& ec, size_t bytes_transferred) {
[this, socket, read_buffer, length, &websocketcallbacks, connection, opcode]
(const boost::system::error_code& ec, size_t bytes_transferred) {
if(!ec) {
std::istream stream(read_buffer.get());
......@@ -318,7 +336,7 @@ namespace SimpleWeb {
status=(byte1<<8)+byte2;
}
connection_close(socket.get(), websocketcallbacks, *connection, status);
connection_close(websocketcallbacks, *connection, status);
return;
}
......@@ -329,34 +347,30 @@ namespace SimpleWeb {
read_write_messages(socket, read_buffer, websocketcallbacks, connection);
}
else
connection_error(socket.get(), websocketcallbacks, *connection, ec);
connection_error(websocketcallbacks, *connection, ec);
});
}
void connection_open(void* socket, const WebSocketCallbacks& websocketcallbacks, Connection& connection) {
connection_ids_mutex.lock();
connection_ids.insert(socket);
connection_ids_mutex.unlock();
void connection_open(const WebSocketCallbacks& websocketcallbacks, Connection& connection) {
connections_mutex.lock();
connections.insert(&connection);
connections_mutex.unlock();
if(websocketcallbacks.onopen)
websocketcallbacks.onopen(connection);
}
void connection_close(void* socket, const WebSocketCallbacks& websocketcallbacks, Connection& connection, int status) {
//((socket_type*)socket)->shutdown(boost::asio::ip::tcp::socket::shutdown_both);
//((socket_type*)socket)->close();
connection_ids_mutex.lock();
connection_ids.erase(socket);
connection_ids_mutex.unlock();
void connection_close(const WebSocketCallbacks& websocketcallbacks, Connection& connection, int status) {
connections_mutex.lock();
connections.erase(&connection);
connections_mutex.unlock();
if(websocketcallbacks.onclose)
websocketcallbacks.onclose(connection, status);
}
void connection_error(void* socket, const WebSocketCallbacks& websocketcallbacks, Connection& connection, const boost::system::error_code& ec) {
//((socket_type*)socket)->shutdown(boost::asio::ip::tcp::socket::shutdown_both);
//((socket_type*)socket)->close();
connection_ids_mutex.lock();
connection_ids.erase(socket);
connection_ids_mutex.unlock();
void connection_error(const WebSocketCallbacks& websocketcallbacks, Connection& connection, const boost::system::error_code& ec) {
connections_mutex.lock();
connections.erase(&connection);
connections_mutex.unlock();
if(websocketcallbacks.onerror) {
boost::system::error_code ec_tmp=ec;
websocketcallbacks.onerror(connection, ec_tmp);
......
......@@ -11,7 +11,7 @@ namespace SimpleWeb {
class Server<WSS> : public SocketServerBase<WSS> {
public:
Server(unsigned short port, size_t num_threads, const std::string& cert_file, const std::string& private_key_file) :
SocketServerBase<WSS>::SocketServerBase(port, num_threads), context(boost::asio::ssl::context::sslv23) {
SocketServerBase<WSS>::SocketServerBase(port, num_threads), context(boost::asio::ssl::context::sslv23) {
context.use_certificate_chain_file(cert_file);
context.use_private_key_file(private_key_file, boost::asio::ssl::context::pem);
}
......@@ -29,7 +29,8 @@ namespace SimpleWeb {
accept();
if(!ec) {
(*socket).async_handshake(boost::asio::ssl::stream_base::server, [this, socket](const boost::system::error_code& ec) {
(*socket).async_handshake(boost::asio::ssl::stream_base::server,
[this, socket](const boost::system::error_code& ec) {
if(!ec) {
process_request_and_start_connection(socket);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册