提交 50d64bd3 编写于 作者: M Megvii Engine Team

fix(pytest/windows/impertive): open impertive pytest on windows

GitOrigin-RevId: 2e8f15607e876aac2cc5926da9c9f347f84cf473
上级 dfc3935d
...@@ -114,7 +114,7 @@ class DataLoader: ...@@ -114,7 +114,7 @@ class DataLoader:
self.__initialized = True self.__initialized = True
def __iter__(self): def __iter__(self):
if platform.system() == "Windows": if platform.system() == "Windows" and self.num_workers > 0:
print( print(
"pyarrow.plasma does not support ParallelDataLoader on windows, changing num_workers to be zero" "pyarrow.plasma does not support ParallelDataLoader on windows, changing num_workers to be zero"
) )
......
...@@ -67,6 +67,9 @@ namespace { ...@@ -67,6 +67,9 @@ namespace {
/* =============== SCQueueSynchronizer =============== */ /* =============== SCQueueSynchronizer =============== */
size_t SCQueueSynchronizer::cached_max_spin = 0; size_t SCQueueSynchronizer::cached_max_spin = 0;
#ifdef WIN32
bool SCQueueSynchronizer::is_into_atexit = false;
#endif
size_t SCQueueSynchronizer::max_spin() { size_t SCQueueSynchronizer::max_spin() {
if (cached_max_spin) if (cached_max_spin)
......
...@@ -72,6 +72,13 @@ namespace mgb { ...@@ -72,6 +72,13 @@ namespace mgb {
return m_worker_started; return m_worker_started;
} }
#ifdef WIN32
static bool is_into_atexit;
void set_finish_called(bool status) {
m_wait_finish_called = status;
}
#endif
static size_t max_spin(); static size_t max_spin();
void start_worker(std::thread thread); void start_worker(std::thread thread);
...@@ -143,14 +150,29 @@ namespace mgb { ...@@ -143,14 +150,29 @@ namespace mgb {
}; };
public: public:
void add_task(const Param &param) { #ifdef WIN32
bool check_is_into_atexit() {
if (SCQueueSynchronizer::is_into_atexit) {
mgb_log_warn(
"add_task after system call atexit happened! "
"ignore it, workround for windows os force INT "
"some thread before shared_ptr destructor "
"finish!!");
m_synchronizer.set_finish_called(true);
}
return SCQueueSynchronizer::is_into_atexit;
}
#endif
void add_task(const Param& param) {
SyncedParam* p = allocate_task(); SyncedParam* p = allocate_task();
new (p->get()) Param(param); new (p->get()) Param(param);
p->init_done.store(true, std::memory_order_release); p->init_done.store(true, std::memory_order_release);
m_synchronizer.producer_add(); m_synchronizer.producer_add();
} }
void add_task(Param &&param) { void add_task(Param&& param) {
SyncedParam* p = allocate_task(); SyncedParam* p = allocate_task();
new (p->get()) Param(std::move(param)); new (p->get()) Param(std::move(param));
p->init_done.store(true, std::memory_order_release); p->init_done.store(true, std::memory_order_release);
...@@ -165,6 +187,10 @@ namespace mgb { ...@@ -165,6 +187,10 @@ namespace mgb {
void wait_all_task_finish() { void wait_all_task_finish() {
auto tgt = m_queue_tail_tid.load(std::memory_order_acquire); auto tgt = m_queue_tail_tid.load(std::memory_order_acquire);
do { do {
#ifdef WIN32
if (check_is_into_atexit())
return;
#endif
// we need a loop because other threads might be adding new // we need a loop because other threads might be adding new
// tasks, and m_queue_tail_tid is increased before // tasks, and m_queue_tail_tid is increased before
// producer_add() // producer_add()
...@@ -184,6 +210,10 @@ namespace mgb { ...@@ -184,6 +210,10 @@ namespace mgb {
void wait_task_queue_empty() { void wait_task_queue_empty() {
size_t tgt, done; size_t tgt, done;
do { do {
#ifdef WIN32
if (check_is_into_atexit())
return;
#endif
m_synchronizer.producer_wait(); m_synchronizer.producer_wait();
// producer_wait() only waits for tasks that are added upon // producer_wait() only waits for tasks that are added upon
// entrance of the function, and new tasks might be added // entrance of the function, and new tasks might be added
...@@ -272,6 +302,17 @@ namespace mgb { ...@@ -272,6 +302,17 @@ namespace mgb {
// reload newest tail // reload newest tail
tail = m_queue_tail; tail = m_queue_tail;
if (!m_synchronizer.worker_started()) { if (!m_synchronizer.worker_started()) {
#ifdef WIN32
if (!SCQueueSynchronizer::is_into_atexit) {
auto cb_atexit = [] {
SCQueueSynchronizer::is_into_atexit = true;
};
auto err = atexit(cb_atexit);
mgb_assert(!err,
"failed to register windows_call_atexit "
"at exit");
}
#endif
m_synchronizer.start_worker(std::thread{ m_synchronizer.start_worker(std::thread{
&AsyncQueueSC::worker_thread_impl, this}); &AsyncQueueSC::worker_thread_impl, this});
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册