service_pump.h 15.5 KB
Newer Older
Y
youngwolf 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
/*
 * service_pump.h
 *
 *  Created on: 2012-3-2
 *      Author: youngwolf
 *		email: mail2tao@163.com
 *		QQ: 676218192
 *		Community on QQ: 198941541
 *
 * this class is used at both server and client endpoints
 */

#ifndef _ASCS_SERVICE_PUMP_H_
#define _ASCS_SERVICE_PUMP_H_

#include "base.h"

namespace ascs
{

21
class service_pump
Y
youngwolf 已提交
22 23 24 25 26
{
public:
	class i_service
	{
	protected:
27 28
		i_service(service_pump& service_pump_) : sp(service_pump_), started_(false), id_(0), data(nullptr) {sp.add(this);}
		virtual ~i_service() {sp.remove(this);}
Y
youngwolf 已提交
29 30 31 32

	public:
		//for the same i_service, start_service and stop_service are not thread safe,
		//to resolve this defect, we must add a mutex member variable to i_service, it's not worth
33 34 35
		void start_service() {if (!started_) started_ = init();}
		void stop_service() {if (started_) uninit(); started_ = false;}
		bool service_started() const {return started_;}
Y
youngwolf 已提交
36 37 38 39 40 41 42 43 44 45 46 47

		void id(int id) {id_ = id;}
		int id() const {return id_;}
		void user_data(void* data_) {data = data_;}
		void* user_data() const {return data;}

		service_pump& get_service_pump() {return sp;}
		const service_pump& get_service_pump() const {return sp;}

	protected:
		virtual bool init() = 0;
		virtual void uninit() = 0;
48
		virtual void finalize() {} //clean up after stop_service
Y
youngwolf 已提交
49 50

	protected:
51
		friend service_pump;
Y
youngwolf 已提交
52 53 54
		service_pump& sp;

	private:
55
		bool started_;
Y
youngwolf 已提交
56 57 58 59
		int id_;
		void* data; //magic data, you can use it in any way
	};

60 61 62
protected:
	struct context
	{
63
		boost::asio::io_context io_context;
64
		unsigned refs;
65
#ifdef ASCS_AVOID_AUTO_STOP_SERVICE
66
#if BOOST_ASIO_VERSION > 101100
67
		boost::asio::executor_work_guard<boost::asio::io_context::executor_type> work;
68
#else
69
		std::shared_ptr<boost::asio::io_context::work> work;
70 71 72 73
#endif
#endif
		std::list<std::thread> threads;

74 75
#if BOOST_ASIO_VERSION >= 101200
		context(int concurrency_hint = BOOST_ASIO_CONCURRENCY_HINT_SAFE) : io_context(concurrency_hint), refs(0)
76 77 78 79
#else
		context() : refs(0)
#endif
#ifdef ASCS_AVOID_AUTO_STOP_SERVICE
80
#if BOOST_ASIO_VERSION > 101100
81 82
			, work(io_context.get_executor())
#else
83
			, work(std::make_shared<boost::asio::io_context::work>(io_context))
84 85 86 87 88
#endif
#endif
		{}
	};

Y
youngwolf 已提交
89 90 91
public:
	typedef i_service* object_type;
	typedef const object_type object_ctype;
Y
yang li 已提交
92
	typedef std::list<object_type> container_type;
Y
youngwolf 已提交
93

94
#if BOOST_ASIO_VERSION >= 101200
95
#ifdef ASCS_DECREASE_THREAD_AT_RUNTIME
96
	service_pump(int concurrency_hint = BOOST_ASIO_CONCURRENCY_HINT_SAFE) : started(false), first(true), real_thread_num(0), del_thread_num(0), single_ctx(true)
97
		{context_can.emplace_back(concurrency_hint);}
98
#else
99 100
	//basically, the parameter multi_ctx is designed to be used by single_service_pump, which means single_service_pump always think it's using multiple io_context
	//for service_pump, you should use set_io_context_num function instead if you really need multiple io_context.
101
	service_pump(int concurrency_hint = BOOST_ASIO_CONCURRENCY_HINT_SAFE, bool multi_ctx = false) : started(false), first(true), single_ctx(!multi_ctx)
102
		{context_can.emplace_back(concurrency_hint);}
103
	bool set_io_context_num(int io_context_num, int concurrency_hint = BOOST_ASIO_CONCURRENCY_HINT_SAFE) //call this before construct any services on this service_pump
104 105 106 107 108 109
	{
		if (io_context_num < 1 || is_service_started() || context_can.size() > 1) //can only be called once
			return false;

		for (auto i = 1; i < io_context_num; ++i)
			context_can.emplace_back(concurrency_hint);
110 111
		if (context_can.size() > 1)
			single_ctx = false;
112 113 114

		return true;
	}
115
#endif
116
#else
Y
youngwolf 已提交
117
#ifdef ASCS_DECREASE_THREAD_AT_RUNTIME
118
	service_pump() : started(false), first(true), real_thread_num(0), del_thread_num(0), single_ctx(true), context_can(1) {}
Y
youngwolf 已提交
119
#else
120 121
	//basically, the parameter multi_ctx is designed to be used by single_service_pump, which means single_service_pump always think it's using multiple io_context
	//for service_pump, you should use set_io_context_num function instead if you really need multiple io_context.
122
	service_pump(bool multi_ctx = false) : started(false), first(true), single_ctx(!multi_ctx), context_can(1) {}
123
	bool set_io_context_num(int io_context_num) //call this before construct any services on this service_pump
124 125 126 127 128
	{
		if (io_context_num < 1 || is_service_started() || context_can.size() > 1) //can only be called once
			return false;

		context_can.resize(io_context_num);
129 130
		if (context_can.size() > 1)
			single_ctx = false;
131 132 133

		return true;
	}
Y
youngwolf 已提交
134 135
#endif
#endif
Y
youngwolf 已提交
136
	//a pump that is still started gets stopped here, see stop_service()
	virtual ~service_pump()
	{
		stop_service();
	}
Y
youngwolf 已提交
137

138 139
	//number of io_context objects owned by this pump
	int get_io_context_num() const
	{
		return static_cast<int>(context_can.size());
	}
	void get_io_context_refs(std::list<unsigned>& refs)
140
		{if (!single_ctx) ascs::do_something_to_all(context_can, context_can_mutex, [&](context& item) {refs.emplace_back(item.refs);});}
141

142 143 144 145
	//do not call below function implicitly or explicitly, before 1.6, a service_pump is also an io_context, so we already have it implicitly,
	// but in 1.6 and later, a service_pump is not an io_context anymore, then it is just provided to accommodate legacy usage in class
	// (unix_)server_base, (unix_)server_socket_base, (unix_)client_socket_base, udp::(unix_)socket_base and their subclasses.
	//according to the implementation, it picks the io_context who has the least references, so the return values are not consistent.
146
	operator boost::asio::io_context& () {return assign_io_context();}
147

148
	//picks the io_context which has the least references.
	//in single io_context mode the first io_context is returned and no reference is counted.
	//otherwise the reference count of the picked io_context is increased if increase_ref is true.
	//throws std::runtime_error if no io_context could be picked.
	boost::asio::io_context& assign_io_context(bool increase_ref = true)
	{
		if (single_ctx)
			return context_can.front().io_context;

		context* picked = nullptr;
		unsigned least = 0; //references of the current candidate, 0 also means 'no candidate yet'

		std::lock_guard<std::mutex> lock(context_can_mutex);
		ascs::do_something_to_one(context_can, [&picked, &least](context& item) -> bool {
			const bool unreferenced = 0 == item.refs;
			if (unreferenced || 0 == least || item.refs < least)
			{
				least = item.refs;
				picked = &item;
			}

			//an unreferenced io_context cannot be beaten; returning true presumably ends the traversal
			return unreferenced;
		});

		if (nullptr == picked)
			throw std::runtime_error("no available io_context");

		if (increase_ref)
			++picked->refs;

		return picked->io_context;
	}

178
	//gives back refs references of io_context, which is looked up by address among the io_context objects of this pump.
	//does nothing in single io_context mode.
	void return_io_context(const boost::asio::execution_context& io_context, unsigned refs = 1)
	{
		if (single_ctx)
			return;

		ascs::do_something_to_one(context_can, context_can_mutex, [&io_context, refs](context& item) -> bool {
			if (&io_context != &item.io_context)
				return false;

			item.refs -= refs;
			return true;
		});
	}
183
	//adds refs references to io_context, which is looked up by address among the io_context objects of this pump.
	//does nothing in single io_context mode.
	void assign_io_context(const boost::asio::execution_context& io_context, unsigned refs = 1)
	{
		if (single_ctx)
			return;

		ascs::do_something_to_one(context_can, context_can_mutex, [&io_context, refs](context& item) -> bool {
			if (&io_context != &item.io_context)
				return false;

			item.refs += refs;
			return true;
		});
	}

Y
youngwolf 已提交
189 190
	//returns the first registered service whose id() equals id, or nullptr if there is none
	object_type find(int id)
	{
		std::lock_guard<std::mutex> lock(service_can_mutex);
		for (auto iter = std::begin(service_can); iter != std::end(service_can); ++iter)
			if (id == (*iter)->id())
				return *iter;

		return nullptr;
	}

	//unregisters i_service_ from this pump, then stops it and passes it to free()
	void remove(object_type i_service_)
	{
		assert(nullptr != i_service_);

		{
			//the lock only protects the container, stop_and_free runs without it
			std::lock_guard<std::mutex> lock(service_can_mutex);
			service_can.remove(i_service_);
		}

		stop_and_free(i_service_);
	}

	void remove(int id)
	{
Y
youngwolf 已提交
209
		std::unique_lock<std::mutex> lock(service_can_mutex);
210
		auto iter = std::find_if(std::begin(service_can), std::end(service_can), [&](object_ctype& item) {return id == item->id();});
Y
youngwolf 已提交
211 212 213 214 215 216 217 218 219 220 221 222 223 224
		if (iter != std::end(service_can))
		{
			auto i_service_ = *iter;
			service_can.erase(iter);
			lock.unlock();

			stop_and_free(i_service_);
		}
	}

	void clear()
	{
		container_type temp_service_can;

Y
youngwolf 已提交
225
		std::unique_lock<std::mutex> lock(service_can_mutex);
Y
youngwolf 已提交
226 227 228
		temp_service_can.splice(std::end(temp_service_can), service_can);
		lock.unlock();

229
		ascs::do_something_to_all(temp_service_can, [this](object_type& item) {stop_and_free(item);});
Y
youngwolf 已提交
230 231
	}

232 233 234
	//stops every io_context directly, call this only if the stop_service invocation cannot stop the io_context
	void stop()
	{
		ascs::do_something_to_all(context_can, [](context& item) {item.io_context.stop();});
	}

Y
youngwolf 已提交
235 236 237 238 239 240 241 242 243 244 245
	//starts the services and thread_num service threads, does nothing if the service is already started
	void start_service(int thread_num = ASCS_SERVICE_THREAD_NUM)
	{
		if (is_service_started())
			return;

		do_service(thread_num);
	}
	//stops the service and waits for it to end. must be invoked explicitly when the service needs to stop,
	// for example, when closing the application. does nothing if the service is not started.
	void stop_service()
	{
		if (!is_service_started())
			return;

		end_service();
		wait_service();
	}

246
	//if you add a service after start_service(), use this to start it
Y
youngwolf 已提交
247 248 249 250 251 252 253 254 255
	void start_service(object_type i_service_, int thread_num = ASCS_SERVICE_THREAD_NUM)
	{
		assert(nullptr != i_service_);

		if (is_service_started())
			i_service_->start_service();
		else
			start_service(thread_num);
	}
256 257 258 259 260
	//doesn't like stop_service(), this will asynchronously stop service i_service_, you must NOT free i_service_ immediately,
	// otherwise, you may lead segment fault if you freed i_service_ before any async operation ends.
	//my suggestion is, DO NOT free i_service_, we can suspend it (by your own implementation and invocation rather than
	// stop_service(object_type)).
	//BTW, all i_services managed by this service_pump can be safely freed after stop_service().
Y
youngwolf 已提交
261 262
	void stop_service(object_type i_service_) {assert(nullptr != i_service_); i_service_->stop_service();}

263
	//this function works like start_service() except that it will block until all services run out
Y
youngwolf 已提交
264 265 266 267
	void run_service(int thread_num = ASCS_SERVICE_THREAD_NUM)
	{
		if (!is_service_started())
		{
268
			do_service(thread_num, true);
Y
youngwolf 已提交
269 270 271
			wait_service();
		}
	}
Y
youngwolf 已提交
272

Y
youngwolf 已提交
273 274 275
	//stop the service, must be invoked explicitly when the service need to stop, for example, close the application
	//only for service pump started by 'run_service', this function will return immediately,
	//only the return from 'run_service' means service pump ended.
Y
youngwolf 已提交
276 277 278 279 280
	void end_service()
	{
		if (is_service_started())
		{
#ifdef ASCS_AVOID_AUTO_STOP_SERVICE
281
			ascs::do_something_to_all(context_can, [](context& item) {item.work.reset();});
Y
youngwolf 已提交
282 283 284 285
#endif
			do_something_to_all([](object_type& item) {item->stop_service();});
		}
	}
Y
youngwolf 已提交
286

287 288 289 290 291 292
	//true if an io_context which is not stopped has been found
	bool is_running() const
	{
		bool any_running = false;
		ascs::do_something_to_one(context_can, [&any_running](const context& item) -> bool {
			any_running = !item.io_context.stopped();
			return any_running;
		});

		return any_running;
	}
293

Y
youngwolf 已提交
294
	//true from do_service until wait_service has finished
	bool is_service_started() const
	{
		return started;
	}
295
	//true until wait_service has finished for the first time
	bool is_first_running() const
	{
		return first;
	}
Y
youngwolf 已提交
296

297
	//not thread safe
298 299
#if BOOST_ASIO_VERSION >= 101200
	void add_service_thread(int thread_num, bool block = false, int io_context_num = 0, int concurrency_hint = BOOST_ASIO_CONCURRENCY_HINT_SAFE)
300 301 302
#else
	void add_service_thread(int thread_num, bool block = false, int io_context_num = 0)
#endif
303
	{
304 305 306 307 308 309 310 311 312
		if (io_context_num > 0)
		{
			if (thread_num < io_context_num)
			{
				unified_out::error_out("thread_num must be bigger than or equal to io_context_num.");
				return;
			}
			else
			{
313
				single_ctx = false;
314
				std::lock_guard<std::mutex> lock(context_can_mutex);
315
#if BOOST_ASIO_VERSION >= 101200
316 317 318 319 320 321 322 323
				for (int i = 0; i < io_context_num; ++i)
					context_can.emplace_back(concurrency_hint);
#else
				context_can.resize((size_t) io_context_num + context_can.size());
#endif
			}
		}

324 325 326 327 328 329 330 331
		for (auto i = 0; i < thread_num; ++i)
		{
			auto ctx = assign_thread();
			if (nullptr == ctx)
				unified_out::error_out("no available io_context!");
			else if (block && i + 1 == thread_num)
				run(ctx); //block at here
			else
332
				ctx->threads.emplace_back([this, ctx]() {run(ctx);});
333 334 335
		}
	}

Y
youngwolf 已提交
336
#ifdef ASCS_DECREASE_THREAD_AT_RUNTIME
	//asks thread_num service threads to quit, non-positive values are ignored.
	//the request is noticed by the service threads in run(), which never lets the last service thread quit this way.
	void del_service_thread(int thread_num)
	{
		if (thread_num > 0)
			del_thread_num += thread_num;
	}

	//number of service threads currently counted by run()
	int service_thread_num() const
	{
		return real_thread_num;
	}
#endif
Y
youngwolf 已提交
340 341

protected:
342
	void do_service(int thread_num, bool block = false)
Y
youngwolf 已提交
343
	{
344 345
		if (thread_num <= 0 || (size_t) thread_num < context_can.size())
		{
346
			unified_out::error_out("thread_num must be bigger than or equal to io_context_num.");
347 348 349
			return;
		}

350 351 352
#ifdef ASCS_AVOID_AUTO_STOP_SERVICE
		if (!is_first_running())
			ascs::do_something_to_all(context_can, [](context& item) {
353
#if BOOST_ASIO_VERSION > 101100
354
				(&item.work)->~executor_work_guard();
355
				new(&item.work) boost::asio::executor_work_guard<boost::asio::io_context::executor_type>(item.io_context.get_executor());
356
#else
357
				item.work = std::make_shared<boost::asio::io_context::work>(item.io_context);
358 359 360 361
#endif
			});
#endif

Y
youngwolf 已提交
362 363 364
		started = true;
		unified_out::info_out("service pump started.");

365
#if BOOST_ASIO_VERSION >= 101100
366
		ascs::do_something_to_all(context_can, [](context& item) {item.io_context.restart();}); //this is needed when restart service
Y
youngwolf 已提交
367
#else
368
		ascs::do_something_to_all(context_can, [](context& item) {item.io_context.reset();}); //this is needed when restart service
Y
youngwolf 已提交
369
#endif
Y
youngwolf 已提交
370
		do_something_to_all([](object_type& item) {item->start_service();});
371
		add_service_thread(thread_num, block);
Y
youngwolf 已提交
372
	}
Y
youngwolf 已提交
373 374 375

	void wait_service()
	{
376 377
		ascs::do_something_to_all(context_can, [](context& item) {ascs::do_something_to_all(item.threads, [](std::thread& t) {t.join();}); item.threads.clear();});
		do_something_to_all([](object_type& item) {item->finalize();});
Y
youngwolf 已提交
378

379
		started = first = false;
Y
youngwolf 已提交
380 381 382
#ifdef ASCS_DECREASE_THREAD_AT_RUNTIME
		del_thread_num = 0;
#endif
Y
youngwolf 已提交
383 384
		unified_out::info_out("service pump end.");
	}
Y
youngwolf 已提交
385 386 387 388 389 390 391 392 393 394

	//stops i_service_, then passes it to free()
	void stop_and_free(object_type i_service_)
	{
		assert(nullptr != i_service_);

		stop_service(i_service_);
		free(i_service_);
	}
	//invoked with every service that has been removed from this pump; does nothing by default,
	// if needed, rewrite this to free the service
	virtual void free(object_type i_service_)
	{
	}

395
#ifndef ASCS_NO_TRY_CATCH
396
	virtual bool on_exception(const std::exception& e)
Y
youngwolf 已提交
397 398
	{
		unified_out::error_out("service pump exception: %s.", e.what());
Y
youngwolf 已提交
399
		return true; //continue, if needed, rewrite this to decide whether to continue or not
Y
youngwolf 已提交
400
	}
Y
youngwolf 已提交
401 402
#endif

403
	//body of a service thread: runs ctx->io_context until it runs out of work, is stopped, or (with
	// ASCS_DECREASE_THREAD_AT_RUNTIME) this thread takes a pending del_service_thread request.
	//an exception caught from the io_context is passed to on_exception (unless ASCS_NO_TRY_CATCH is defined):
	// if it returns true this thread keeps running, otherwise it ends.
	//returns the number of handlers counted by this thread.
	size_t run(context* ctx)
	{
		size_t n = 0;

		std::stringstream os;
		os << "service thread[" << std::this_thread::get_id() << "] begin.";
		unified_out::info_out(os.str().data());

#ifdef ASCS_DECREASE_THREAD_AT_RUNTIME
		++real_thread_num;
		while (true)
		{
			if (del_thread_num > 0)
			{
				//try to take one pending request; the counters are atomic, so several threads can race here safely
				if (--del_thread_num >= 0)
				{
					if (--real_thread_num > 0) //forbid to stop all service thread
						break;
					else
						++real_thread_num;
				}
				else
					++del_thread_num; //another thread took the request first, undo our decrement
			}

			//we cannot always decrease service thread timely (because run_one can block).
			size_t this_n = 0;
#ifdef ASCS_NO_TRY_CATCH
			this_n = ctx->io_context.run_one();
#else
			try {this_n = ctx->io_context.run_one();}
			catch (const std::exception& e)
			{
				//this_n is still 0 here, so falling through would be mistaken for 'no more work' and end this thread
				// even though on_exception asked to continue; go back to run_one directly instead.
				if (on_exception(e))
					continue;

				//this thread ends, keep service_thread_num() accurate
				--real_thread_num;
				break;
			}
#endif
			if (this_n > 0)
				n += this_n; //n can overflow, please note.
			else
			{
				//nothing was run: the io_context is stopped or out of work
				--real_thread_num;
				break;
			}
		}
#elif defined(ASCS_NO_TRY_CATCH)
		n += ctx->io_context.run();
#else
		//call run() again after an exception for as long as on_exception allows it
		while (true) try {n += ctx->io_context.run(); break;} catch (const std::exception& e) {if (!on_exception(e)) break;}
#endif
		os.str("");
		os << "service thread[" << std::this_thread::get_id() << "] end.";
		unified_out::info_out(os.str().data());

		return n;
	}
Y
youngwolf 已提交
454

455 456
	//presumably these macros (defined outside this file) generate the member functions do_something_to_all and
	// do_something_to_one, which visit service_can while holding service_can_mutex via std::lock_guard;
	// do_something_to_all is what end_service, do_service and wait_service call - TODO confirm against the macro definitions.
	DO_SOMETHING_TO_ALL_MUTEX(service_can, service_can_mutex, std::lock_guard<std::mutex>)
	DO_SOMETHING_TO_ONE_MUTEX(service_can, service_can_mutex, std::lock_guard<std::mutex>)
Y
youngwolf 已提交
457 458

private:
459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477
	//picks the context which has the least threads, returns nullptr if none could be picked
	context* assign_thread()
	{
		context* picked = nullptr;
		size_t least = 0; //thread count of the current candidate, 0 also means 'no candidate yet'

		ascs::do_something_to_one(context_can, [&picked, &least](context& item) -> bool {
			const size_t count = item.threads.size();
			const bool idle = 0 == count;
			if (idle || 0 == least || count < least)
			{
				least = count;
				picked = &item;
			}

			//a context without any thread cannot be beaten; returning true presumably ends the traversal
			return idle;
		});

		return picked;
	}

Y
youngwolf 已提交
478 479 480 481
	//registers i_service_ with this pump (invoked by the i_service constructor).
	//a service added to an already started pump is not started automatically, only a warning is logged.
	void add(object_type i_service_)
	{
		assert(nullptr != i_service_);

		{
			//the lock only protects the container
			std::lock_guard<std::mutex> lock(service_can_mutex);
			service_can.emplace_back(i_service_);
		}

		if (is_service_started())
			unified_out::warning_out("service been added, please remember to call start_service for it!");
	}

Y
youngwolf 已提交
490
private:
491
	bool started, first;
Y
youngwolf 已提交
492
	container_type service_can;
Y
youngwolf 已提交
493
	std::mutex service_can_mutex;
Y
youngwolf 已提交
494 495

#ifdef ASCS_DECREASE_THREAD_AT_RUNTIME
Y
youngwolf 已提交
496 497
	std::atomic_int_fast32_t real_thread_num;
	std::atomic_int_fast32_t del_thread_num;
Y
youngwolf 已提交
498
#endif
Y
youngwolf 已提交
499

500
	bool single_ctx;
501 502
	std::list<context> context_can;
	std::mutex context_can_mutex;
Y
youngwolf 已提交
503 504 505 506 507
};

} //namespace

#endif /* _ASCS_SERVICE_PUMP_H_ */