未验证 提交 8c366626 编写于 作者: C Chinmay Garde 提交者: GitHub

Fix and re-enable flaky test `MessageLoopTaskQueue::ConcurrentQueueAndTaskCreatingCounts`. (#25826)

上级 3f955f0c
......@@ -6,6 +6,8 @@
#include "flutter/fml/message_loop_task_queues.h"
#include <algorithm>
#include <cstdlib>
#include <thread>
#include "flutter/fml/synchronization/count_down_latch.h"
......@@ -192,63 +194,63 @@ TEST(MessageLoopTaskQueue, QueueDoNotOwnUnmergedTaskQueueId) {
ASSERT_FALSE(task_queue->Owns(_kUnmerged, _kUnmerged));
}
// TODO(chunhtai): This unit-test is flaky and sometimes fails asynchronizely
// after the test has finished.
// https://github.com/flutter/flutter/issues/43858
TEST(MessageLoopTaskQueue, DISABLED_ConcurrentQueueAndTaskCreatingCounts) {
//------------------------------------------------------------------------------
/// Verifies that tasks can be added to task queues concurrently.
///
TEST(MessageLoopTaskQueue, ConcurrentQueueAndTaskCreatingCounts) {
auto task_queues = fml::MessageLoopTaskQueues::GetInstance();
const int base_queue_id = task_queues->CreateTaskQueue();
const int num_queues = 100;
std::atomic_bool created[num_queues * 3];
std::atomic_int num_tasks[num_queues * 3];
std::mutex task_count_mutex[num_queues * 3];
std::atomic_int done = 0;
// kThreadCount threads post kThreadTaskCount tasks each to kTaskQueuesCount
// task queues. Each thread picks a task queue randomly for each task.
constexpr size_t kThreadCount = 4;
constexpr size_t kTaskQueuesCount = 2;
constexpr size_t kThreadTaskCount = 500;
for (int i = 0; i < num_queues * 3; i++) {
num_tasks[i] = 0;
created[i] = false;
std::vector<TaskQueueId> task_queue_ids;
for (size_t i = 0; i < kTaskQueuesCount; ++i) {
task_queue_ids.emplace_back(task_queues->CreateTaskQueue());
}
auto creation_func = [&] {
for (int i = 0; i < num_queues; i++) {
fml::TaskQueueId queue_id = task_queues->CreateTaskQueue();
int limit = queue_id - base_queue_id;
created[limit] = true;
for (int cur_q = 1; cur_q < limit; cur_q++) {
if (created[cur_q]) {
std::scoped_lock counter(task_count_mutex[cur_q]);
int cur_num_tasks = rand() % 10;
for (int k = 0; k < cur_num_tasks; k++) {
task_queues->RegisterTask(
fml::TaskQueueId(base_queue_id + cur_q), [] {},
fml::TimePoint::Now());
}
num_tasks[cur_q] += cur_num_tasks;
}
}
ASSERT_EQ(task_queue_ids.size(), kTaskQueuesCount);
fml::CountDownLatch tasks_posted_latch(kThreadCount);
auto thread_main = [&]() {
for (size_t i = 0; i < kThreadTaskCount; i++) {
const auto current_task_queue_id =
task_queue_ids[std::rand() % kTaskQueuesCount];
const auto empty_task = []() {};
// The timepoint doesn't matter as the queue is never going to be drained.
const auto task_timepoint = fml::TimePoint::Now();
task_queues->RegisterTask(current_task_queue_id, empty_task,
task_timepoint);
}
done++;
tasks_posted_latch.CountDown();
};
std::thread creation_1(creation_func);
std::thread creation_2(creation_func);
while (done < 2) {
for (int i = 0; i < num_queues * 3; i++) {
if (created[i]) {
std::scoped_lock counter(task_count_mutex[i]);
int num_pending = task_queues->GetNumPendingTasks(
fml::TaskQueueId(base_queue_id + i));
int num_added = num_tasks[i];
ASSERT_EQ(num_pending, num_added);
}
}
std::vector<std::thread> threads;
for (size_t i = 0; i < kThreadCount; i++) {
threads.emplace_back(std::thread{thread_main});
}
creation_1.join();
creation_2.join();
ASSERT_EQ(threads.size(), kThreadCount);
for (size_t i = 0; i < kThreadCount; i++) {
threads[i].join();
}
// All tasks have been posted by now. Check that they are all pending.
size_t pending_tasks = 0u;
std::for_each(task_queue_ids.begin(), task_queue_ids.end(),
[&](const auto& queue) {
pending_tasks += task_queues->GetNumPendingTasks(queue);
});
ASSERT_EQ(pending_tasks, kThreadCount * kThreadTaskCount);
}
TEST(MessageLoopTaskQueue, RegisterTaskWakesUpOwnerQueue) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册