提交 fb3b79b0 编写于 作者: W willzhang4a58

refine mutex in read callback


Former-commit-id: 78132f33
上级 6736e8c4
...@@ -117,6 +117,7 @@ void EpollDataCommNet::AddReadCallBack(void* actor_read_id, void* read_id, ...@@ -117,6 +117,7 @@ void EpollDataCommNet::AddReadCallBack(void* actor_read_id, void* read_id,
break; break;
} else { } else {
actor_read_ctx->read_ctx_list.back()->cbl.push_back(callback); actor_read_ctx->read_ctx_list.back()->cbl.push_back(callback);
return;
} }
} while (0); } while (0);
callback(); callback();
...@@ -125,7 +126,10 @@ void EpollDataCommNet::AddReadCallBack(void* actor_read_id, void* read_id, ...@@ -125,7 +126,10 @@ void EpollDataCommNet::AddReadCallBack(void* actor_read_id, void* read_id,
void EpollDataCommNet::AddReadCallBackDone(void* actor_read_id, void* read_id) { void EpollDataCommNet::AddReadCallBackDone(void* actor_read_id, void* read_id) {
auto actor_read_ctx = static_cast<ActorReadContext*>(actor_read_id); auto actor_read_ctx = static_cast<ActorReadContext*>(actor_read_id);
ReadContext* read_ctx = static_cast<ReadContext*>(read_id); ReadContext* read_ctx = static_cast<ReadContext*>(read_id);
IncreaseDoneCnt(actor_read_ctx, read_ctx); if (IncreaseDoneCnt(read_ctx) == 2) {
FinishOneReadContext(actor_read_ctx, read_ctx);
delete read_ctx;
}
} }
void EpollDataCommNet::ReadDone(void* read_done_id) { void EpollDataCommNet::ReadDone(void* read_done_id) {
...@@ -134,27 +138,26 @@ void EpollDataCommNet::ReadDone(void* read_done_id) { ...@@ -134,27 +138,26 @@ void EpollDataCommNet::ReadDone(void* read_done_id) {
auto actor_read_ctx = std::get<0>(*parsed_read_done_id); auto actor_read_ctx = std::get<0>(*parsed_read_done_id);
auto read_ctx = std::get<1>(*parsed_read_done_id); auto read_ctx = std::get<1>(*parsed_read_done_id);
delete parsed_read_done_id; delete parsed_read_done_id;
IncreaseDoneCnt(actor_read_ctx, read_ctx); if (IncreaseDoneCnt(read_ctx) == 2) {
} {
std::unique_lock<std::mutex> lck(actor_read_ctx->read_ctx_list_mtx);
void EpollDataCommNet::IncreaseDoneCnt(ActorReadContext* actor_read_ctx, FinishOneReadContext(actor_read_ctx, read_ctx);
ReadContext* read_ctx) {
do {
std::unique_lock<std::mutex> lck(read_ctx->done_cnt_mtx);
read_ctx->done_cnt += 1;
if (read_ctx->done_cnt == 2) {
break;
} else {
return;
} }
} while (0); delete read_ctx;
{
std::unique_lock<std::mutex> lck(actor_read_ctx->read_ctx_list_mtx);
CHECK_EQ(actor_read_ctx->read_ctx_list.front(), read_ctx);
actor_read_ctx->read_ctx_list.pop_front();
for (std::function<void()>& callback : read_ctx->cbl) { callback(); }
} }
delete read_ctx; }
int8_t EpollDataCommNet::IncreaseDoneCnt(ReadContext* read_ctx) {
std::unique_lock<std::mutex> lck(read_ctx->done_cnt_mtx);
read_ctx->done_cnt += 1;
return read_ctx->done_cnt;
}
void EpollDataCommNet::FinishOneReadContext(ActorReadContext* actor_read_ctx,
ReadContext* read_ctx) {
CHECK_EQ(actor_read_ctx->read_ctx_list.front(), read_ctx);
actor_read_ctx->read_ctx_list.pop_front();
for (std::function<void()>& callback : read_ctx->cbl) { callback(); }
} }
void EpollDataCommNet::SendActorMsg(int64_t dst_machine_id, void EpollDataCommNet::SendActorMsg(int64_t dst_machine_id,
......
...@@ -47,7 +47,8 @@ class EpollDataCommNet final : public DataCommNet { ...@@ -47,7 +47,8 @@ class EpollDataCommNet final : public DataCommNet {
std::list<ReadContext*> read_ctx_list; std::list<ReadContext*> read_ctx_list;
}; };
EpollDataCommNet(); EpollDataCommNet();
void IncreaseDoneCnt(ActorReadContext* actor_read_ctx, ReadContext* read_ctx); int8_t IncreaseDoneCnt(ReadContext*);
void FinishOneReadContext(ActorReadContext*, ReadContext*);
void InitSockets(); void InitSockets();
SocketHelper* GetSocketHelper(int64_t machine_id); SocketHelper* GetSocketHelper(int64_t machine_id);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册