From 8c366626269a9225311fc2eeff8c0ffcddc31d1a Mon Sep 17 00:00:00 2001 From: Chinmay Garde Date: Wed, 28 Apr 2021 16:09:01 -0700 Subject: [PATCH] Fix and re-enable flaky test `MessageLoopTaskQueue::ConcurrentQueueAndTaskCreatingCounts`. (#25826) --- fml/message_loop_task_queues_unittests.cc | 96 ++++++++++++----------- 1 file changed, 49 insertions(+), 47 deletions(-) diff --git a/fml/message_loop_task_queues_unittests.cc b/fml/message_loop_task_queues_unittests.cc index 43310505b..9939baaf0 100644 --- a/fml/message_loop_task_queues_unittests.cc +++ b/fml/message_loop_task_queues_unittests.cc @@ -6,6 +6,8 @@ #include "flutter/fml/message_loop_task_queues.h" +#include +#include #include #include "flutter/fml/synchronization/count_down_latch.h" @@ -192,63 +194,63 @@ TEST(MessageLoopTaskQueue, QueueDoNotOwnUnmergedTaskQueueId) { ASSERT_FALSE(task_queue->Owns(_kUnmerged, _kUnmerged)); } -// TODO(chunhtai): This unit-test is flaky and sometimes fails asynchronizely -// after the test has finished. -// https://github.com/flutter/flutter/issues/43858 -TEST(MessageLoopTaskQueue, DISABLED_ConcurrentQueueAndTaskCreatingCounts) { +//------------------------------------------------------------------------------ +/// Verifies that tasks can be added to task queues concurrently. +/// +TEST(MessageLoopTaskQueue, ConcurrentQueueAndTaskCreatingCounts) { auto task_queues = fml::MessageLoopTaskQueues::GetInstance(); - const int base_queue_id = task_queues->CreateTaskQueue(); - const int num_queues = 100; - std::atomic_bool created[num_queues * 3]; - std::atomic_int num_tasks[num_queues * 3]; - std::mutex task_count_mutex[num_queues * 3]; - std::atomic_int done = 0; + // kThreadCount threads post kThreadTaskCount tasks each to kTaskQueuesCount + // task queues. Each thread picks a task queue randomly for each task. + constexpr size_t kThreadCount = 4; + constexpr size_t kTaskQueuesCount = 2; + constexpr size_t kThreadTaskCount = 500; - for (int i = 0; i < num_queues * 3; i++) { - num_tasks[i] = 0; - created[i] = false; + std::vector task_queue_ids; + for (size_t i = 0; i < kTaskQueuesCount; ++i) { + task_queue_ids.emplace_back(task_queues->CreateTaskQueue()); } - auto creation_func = [&] { - for (int i = 0; i < num_queues; i++) { - fml::TaskQueueId queue_id = task_queues->CreateTaskQueue(); - int limit = queue_id - base_queue_id; - created[limit] = true; - - for (int cur_q = 1; cur_q < limit; cur_q++) { - if (created[cur_q]) { - std::scoped_lock counter(task_count_mutex[cur_q]); - int cur_num_tasks = rand() % 10; - for (int k = 0; k < cur_num_tasks; k++) { - task_queues->RegisterTask( - fml::TaskQueueId(base_queue_id + cur_q), [] {}, - fml::TimePoint::Now()); - } - num_tasks[cur_q] += cur_num_tasks; - } - } + ASSERT_EQ(task_queue_ids.size(), kTaskQueuesCount); + + fml::CountDownLatch tasks_posted_latch(kThreadCount); + + auto thread_main = [&]() { + for (size_t i = 0; i < kThreadTaskCount; i++) { + const auto current_task_queue_id = + task_queue_ids[std::rand() % kTaskQueuesCount]; + const auto empty_task = []() {}; + // The timepoint doesn't matter as the queue is never going to be drained. + const auto task_timepoint = fml::TimePoint::Now(); + + task_queues->RegisterTask(current_task_queue_id, empty_task, + task_timepoint); } - done++; + + tasks_posted_latch.CountDown(); }; - std::thread creation_1(creation_func); - std::thread creation_2(creation_func); - - while (done < 2) { - for (int i = 0; i < num_queues * 3; i++) { - if (created[i]) { - std::scoped_lock counter(task_count_mutex[i]); - int num_pending = task_queues->GetNumPendingTasks( - fml::TaskQueueId(base_queue_id + i)); - int num_added = num_tasks[i]; - ASSERT_EQ(num_pending, num_added); - } - } + std::vector threads; + + for (size_t i = 0; i < kThreadCount; i++) { + threads.emplace_back(std::thread{thread_main}); } - creation_1.join(); - creation_2.join(); + ASSERT_EQ(threads.size(), kThreadCount); + + for (size_t i = 0; i < kThreadCount; i++) { + threads[i].join(); + } + + // All tasks have been posted by now. Check that they are all pending. + + size_t pending_tasks = 0u; + std::for_each(task_queue_ids.begin(), task_queue_ids.end(), + [&](const auto& queue) { + pending_tasks += task_queues->GetNumPendingTasks(queue); + }); + + ASSERT_EQ(pending_tasks, kThreadCount * kThreadTaskCount); } TEST(MessageLoopTaskQueue, RegisterTaskWakesUpOwnerQueue) { -- GitLab