提交 8b6f831c 编写于 作者: T tbbdev

Commit oneTBB source code 5ba997e1

上级 6aa706d8
......@@ -49,9 +49,9 @@ if (NOT ${CMAKE_CXX_COMPILER_ID} STREQUAL Intel)
set(TBB_DSE_FLAG $<$<NOT:$<VERSION_LESS:${CMAKE_CXX_COMPILER_VERSION},6.0>>:-flifetime-dse=1>)
endif()
# Workaround for heavy tests
# Workaround for heavy tests and too many symbols in debug (relocation truncated to fit: R_MIPS_CALL16)
if ("${CMAKE_SYSTEM_PROCESSOR}" MATCHES "mips")
set(TBB_TEST_COMPILE_FLAGS ${TBB_TEST_COMPILE_FLAGS} -DTBB_TEST_LOW_WORKLOAD)
set(TBB_TEST_COMPILE_FLAGS ${TBB_TEST_COMPILE_FLAGS} -DTBB_TEST_LOW_WORKLOAD $<$<CONFIG:DEBUG>:-mxgot>)
endif()
# TBB malloc settings
......
......@@ -45,7 +45,10 @@ endforeach()
unset(HWLOC_TARGET_NAME)
if (NOT HWLOC_TARGET_EXPLICITLY_DEFINED)
if (NOT HWLOC_TARGET_EXPLICITLY_DEFINED AND
# No hwloc auto detection for cross compilation
NOT CMAKE_CROSSCOMPILING
)
find_package(PkgConfig QUIET)
if (PKG_CONFIG_FOUND)
pkg_search_module(HWLOC hwloc)
......
......@@ -152,67 +152,66 @@ public:
typedef receiver<T> successor_type;
reservable_predecessor_cache( successor_type* owner )
: predecessor_cache<T,M>(owner), reserved_src(NULL)
: predecessor_cache<T,M>(owner), reserved_src(nullptr)
{
// Do not work with the passed pointer here as it may not be fully initialized yet
}
bool
try_reserve( output_type &v ) {
bool try_reserve( output_type &v ) {
bool msg = false;
do {
predecessor_type* pred = nullptr;
{
typename mutex_type::scoped_lock lock(this->my_mutex);
if ( reserved_src || this->internal_empty() )
if ( reserved_src.load(std::memory_order_relaxed) || this->internal_empty() )
return false;
reserved_src = &this->internal_pop();
pred = &this->internal_pop();
reserved_src.store(pred, std::memory_order_relaxed);
}
// Try to get from this sender
msg = reserved_src->try_reserve( v );
msg = pred->try_reserve( v );
if (msg == false) {
typename mutex_type::scoped_lock lock(this->my_mutex);
// Relinquish ownership of the edge
register_successor( *reserved_src, *this->my_owner );
reserved_src = NULL;
register_successor( *pred, *this->my_owner );
reserved_src.store(nullptr, std::memory_order_relaxed);
} else {
// Retain ownership of the edge
this->add( *reserved_src );
this->add( *pred);
}
} while ( msg == false );
return msg;
}
bool
try_release( ) {
reserved_src->try_release( );
reserved_src = NULL;
bool try_release() {
reserved_src.load(std::memory_order_relaxed)->try_release();
reserved_src.store(nullptr, std::memory_order_relaxed);
return true;
}
bool
try_consume( ) {
reserved_src->try_consume( );
reserved_src = NULL;
bool try_consume() {
reserved_src.load(std::memory_order_relaxed)->try_consume();
reserved_src.store(nullptr, std::memory_order_relaxed);
return true;
}
void reset( ) {
reserved_src = NULL;
predecessor_cache<T,M>::reset( );
void reset() {
reserved_src.store(nullptr, std::memory_order_relaxed);
predecessor_cache<T, M>::reset();
}
void clear() {
reserved_src = NULL;
predecessor_cache<T,M>::clear();
reserved_src.store(nullptr, std::memory_order_relaxed);
predecessor_cache<T, M>::clear();
}
private:
predecessor_type *reserved_src;
std::atomic<predecessor_type*> reserved_src;
};
......
......@@ -18,6 +18,7 @@
#define __TBB_assert_impl_H
#include "oneapi/tbb/detail/_config.h"
#include "oneapi/tbb/detail/_utils.h"
#include <cstdio>
#include <cstdlib>
......@@ -53,9 +54,23 @@ static void assertion_failure_impl(const char* location, int line, const char* e
}
}
// Do not move the definition into the assertion_failure function because it will require "magic statics".
// It will bring a dependency on C++ runtime on some platforms while assert_impl.h is reused in tbbmalloc
// that should not depend on C++ runtime
static std::atomic<do_once_state> assertion_state;
void __TBB_EXPORTED_FUNC assertion_failure(const char* location, int line, const char* expression, const char* comment) {
static std::once_flag flag;
std::call_once(flag, [&](){ assertion_failure_impl(location, line, expression, comment); });
#if __TBB_MSVC_UNREACHABLE_CODE_IGNORED
// Workaround for erroneous "unreachable code" during assertion throwing using call_once
#pragma warning (push)
#pragma warning (disable: 4702)
#endif
// We cannot use std::call_once because it brings a dependency on C++ runtime on some platforms
// while assert_impl.h is reused in tbbmalloc that should not depend on C++ runtime
atomic_do_once([&](){ assertion_failure_impl(location, line, expression, comment); }, assertion_state);
#if __TBB_MSVC_UNREACHABLE_CODE_IGNORED
#pragma warning (pop)
#endif
}
//! Report a runtime warning.
......
......@@ -59,6 +59,8 @@ target_compile_options(tbbmalloc
${TBB_COMMON_COMPILE_FLAGS}
)
enable_language(C)
# Avoid use of target_link_libraries here as it changes /DEF option to \DEF on Windows.
set_target_properties(tbbmalloc PROPERTIES
DEFINE_SYMBOL ""
......@@ -66,8 +68,11 @@ set_target_properties(tbbmalloc PROPERTIES
SOVERSION ${TBBMALLOC_BINARY_VERSION}
LINK_FLAGS ${TBB_LINK_DEF_FILE_FLAG}${CMAKE_CURRENT_SOURCE_DIR}/def/${TBB_DEF_FILE_PREFIX}-tbbmalloc.def
LINK_DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/def/${TBB_DEF_FILE_PREFIX}-tbbmalloc.def
LINKER_LANGUAGE C
)
set(CMAKE_CXX_IMPLICIT_LINK_LIBRARIES "")
# Prefer using target_link_options instead of target_link_libraries to specify link options because
# target_link_libraries may incorrectly handle some options (on Windows, for example).
if (COMMAND target_link_options)
......
......@@ -706,7 +706,7 @@ void AllLocalCaches::markUnused()
#if MALLOC_CHECK_RECURSION
MallocMutex RecursiveMallocCallProtector::rmc_mutex;
pthread_t RecursiveMallocCallProtector::owner_thread;
std::atomic<pthread_t> RecursiveMallocCallProtector::owner_thread;
std::atomic<void*> RecursiveMallocCallProtector::autoObjPtr;
bool RecursiveMallocCallProtector::mallocRecursionDetected;
#if __FreeBSD__
......
......@@ -638,7 +638,7 @@ class RecursiveMallocCallProtector {
// pointer to an automatic data of holding thread
static std::atomic<void*> autoObjPtr;
static MallocMutex rmc_mutex;
static pthread_t owner_thread;
static std::atomic<pthread_t> owner_thread;
/* Under FreeBSD 8.0 1st call to any pthread function including pthread_self
leads to pthread initialization, that causes malloc calls. As 1st usage of
RecursiveMallocCallProtector can be before pthread initialized, pthread calls
......@@ -670,7 +670,7 @@ public:
RecursiveMallocCallProtector() : lock_acquired(NULL) {
lock_acquired = new (scoped_lock_space) MallocMutex::scoped_lock( rmc_mutex );
if (canUsePthread)
owner_thread = pthread_self();
owner_thread.store(pthread_self(), std::memory_order_relaxed);
autoObjPtr.store(&scoped_lock_space, std::memory_order_relaxed);
}
~RecursiveMallocCallProtector() {
......@@ -685,7 +685,7 @@ public:
// Some thread has an active recursive call protector; check if the current one.
// Exact pthread_self based test
if (canUsePthread) {
if (pthread_equal( owner_thread, pthread_self() )) {
if (pthread_equal( owner_thread.load(std::memory_order_relaxed), pthread_self() )) {
mallocRecursionDetected = true;
return true;
} else
......@@ -706,7 +706,7 @@ public:
is already on, so can do it. */
if (!canUsePthread) {
canUsePthread = true;
owner_thread = pthread_self();
owner_thread.store(pthread_self(), std::memory_order_relaxed);
}
#endif
free(malloc(1));
......
......@@ -39,6 +39,9 @@ function(tbb_add_test)
)
if (ANDROID_PLATFORM)
# Expand the linker rpath by the CMAKE_LIBRARY_OUTPUT_DIRECTORY path since clang compiler from Android SDK
# doesn't respect the -L flag.
target_link_libraries(${_tbb_test_TARGET_NAME} PRIVATE "-Wl,--rpath-link,${CMAKE_LIBRARY_OUTPUT_DIRECTORY}")
target_link_libraries(${_tbb_test_TARGET_NAME} PRIVATE -rdynamic) # for the test_dynamic_link
add_test(NAME ${_tbb_test_TARGET_NAME}
COMMAND ${CMAKE_COMMAND}
......
......@@ -23,6 +23,10 @@
#define __TBB_TEST_CPP20_COMPARISONS __TBB_CPP20_COMPARISONS_PRESENT
#endif
#if __TBB_TEST_CPP20_COMPARISONS
#include <compare>
#endif
namespace comparisons_testing {
template <bool ExpectEqual, bool ExpectLess, typename T>
......
......@@ -1175,14 +1175,14 @@ void TestETSIteratorComparisons() {
// Fill the ets
const std::size_t expected_ets_size = 2;
std::atomic<std::size_t> sync_counter(0);
auto fill_ets_body = [&](){
auto fill_ets_body = [&](int){
ets.local() = 42;
++sync_counter;
while(sync_counter != expected_ets_size)
std::this_thread::yield();
};
oneapi::tbb::parallel_invoke(fill_ets_body, fill_ets_body);
utils::NativeParallelFor(2, fill_ets_body);
TestETSIteratorComparisonsBasic<typename ets_type::iterator>(ets);
const ets_type& cets = ets;
......
......@@ -54,15 +54,17 @@ TEST_CASE("Stress test") {
// Need to prolong lifetime of the exposed concurrent_monitor
tbb::task_scheduler_handle handler = tbb::task_scheduler_handle::get();
utils::SpinBarrier barrier(threads_number);
tbb::detail::r1::concurrent_monitor test_monitor;
{
tbb::task_arena arena(threads_number - 1, 0);
utils::SpinBarrier barrier(threads_number);
std::size_t iter_on_operation = 1000;
std::size_t operation_number = std::size_t(notification_types::notify_number) * iter_on_operation;
auto thread_func = [&] {
auto thread_func = [&, operation_number] {
for (std::size_t i = 0; i < operation_number; ++i) {
tbb::detail::r1::concurrent_monitor::thread_context context{std::uintptr_t(1)};
test_monitor.prepare_wait(context);
......
......@@ -21,6 +21,7 @@
#include <limits.h> // for INT_MAX
#include <thread>
#include <vector>
#include "tbb/parallel_for.h"
#include "tbb/parallel_reduce.h"
......@@ -746,20 +747,34 @@ TEST_CASE("parallel_for and parallel_reduce cancellation test #4") {
// Tests for tbb::parallel_for_each
////////////////////////////////////////////////////////////////////////////////
#define ITER_RANGE 1000
#define ITEMS_TO_FEED 50
#define INNER_ITER_RANGE 100
#define OUTER_ITER_RANGE 50
std::size_t get_iter_range_size() {
// Set the minimal iteration sequence size to 50 to improve test complexity on small machines
return std::max(50, g_NumThreads * 2);
}
template<typename Iterator>
struct adaptive_range {
std::vector<std::size_t> my_array;
#define PREPARE_RANGE(Iterator, rangeSize) \
size_t test_vector[rangeSize + 1]; \
for (int i =0; i < rangeSize; i++) \
test_vector[i] = i; \
Iterator begin(&test_vector[0]); \
Iterator end(&test_vector[rangeSize])
adaptive_range(std::size_t size) : my_array(size + 1) {}
using iterator = Iterator;
iterator begin() const {
return iterator{&my_array.front()};
}
iterator begin() {
return iterator{&my_array.front()};
}
iterator end() const {
return iterator{&my_array.back()};
}
iterator end() {
return iterator{&my_array.back()};
}
};
void Feed ( tbb::feeder<size_t> &feeder, size_t val ) {
if (g_FedTasksCount < ITEMS_TO_FEED) {
if (g_FedTasksCount < 50) {
++g_FedTasksCount;
feeder.add(val);
}
......@@ -805,9 +820,9 @@ public:
template <class Iterator, class simple_body>
void Test1_parallel_for_each () {
ResetGlobals();
PREPARE_RANGE(Iterator, ITER_RANGE);
auto range = adaptive_range<Iterator>(get_iter_range_size());
TRY();
tbb::parallel_for_each<Iterator, simple_body>(begin, end, simple_body() );
tbb::parallel_for_each(std::begin(range), std::end(range), simple_body() );
CATCH_AND_ASSERT();
REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception");
g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived cancellation");
......@@ -822,8 +837,8 @@ class OuterParForEachBody {
public:
void operator()( size_t& /*value*/ ) const {
++g_OuterParCalls;
PREPARE_RANGE(Iterator, INNER_ITER_RANGE);
tbb::parallel_for_each<Iterator, SimpleParForEachBody>(begin, end, SimpleParForEachBody());
auto range = adaptive_range<Iterator>(get_iter_range_size());
tbb::parallel_for_each(std::begin(range), std::end(range), SimpleParForEachBody());
}
};
......@@ -844,9 +859,9 @@ public:
template <class Iterator, class outer_body>
void Test2_parallel_for_each () {
ResetGlobals();
PREPARE_RANGE(Iterator, ITER_RANGE);
auto range = adaptive_range<Iterator>(get_iter_range_size());
TRY();
tbb::parallel_for_each<Iterator, outer_body >(begin, end, outer_body() );
tbb::parallel_for_each(std::begin(range), std::end(range), outer_body() );
CATCH_AND_ASSERT();
REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception");
g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived cancellation");
......@@ -861,8 +876,8 @@ public:
void operator()( size_t& /*value*/ ) const {
tbb::task_group_context ctx(tbb::task_group_context::isolated);
++g_OuterParCalls;
PREPARE_RANGE(Iterator, INNER_ITER_RANGE);
tbb::parallel_for_each<Iterator, SimpleParForEachBody>(begin, end, SimpleParForEachBody(), ctx);
auto range = adaptive_range<Iterator>(get_iter_range_size());
tbb::parallel_for_each(std::begin(range), std::end(range), SimpleParForEachBody(), ctx);
}
};
......@@ -884,13 +899,13 @@ public:
template <class Iterator, class outer_body>
void Test3_parallel_for_each () {
ResetGlobals();
PREPARE_RANGE(Iterator, OUTER_ITER_RANGE);
intptr_t innerCalls = INNER_ITER_RANGE,
auto range = adaptive_range<Iterator>(get_iter_range_size());
intptr_t innerCalls = get_iter_range_size(),
// The assumption here is the same as in outer parallel fors.
minExecuted = (g_NumThreads - 1) * innerCalls;
g_Master = std::this_thread::get_id();
TRY();
tbb::parallel_for_each<Iterator, outer_body >(begin, end, outer_body());
tbb::parallel_for_each(std::begin(range), std::end(range), outer_body());
CATCH_AND_ASSERT();
// figure actual number of expected executions given the number of outer PDos started.
minExecuted = (g_OuterParCalls - 1) * innerCalls;
......@@ -911,9 +926,9 @@ public:
void operator()( size_t& /*value*/ ) const {
tbb::task_group_context ctx(tbb::task_group_context::isolated);
++g_OuterParCalls;
PREPARE_RANGE(Iterator, INNER_ITER_RANGE);
auto range = adaptive_range<Iterator>(get_iter_range_size());
TRY();
tbb::parallel_for_each<Iterator, SimpleParForEachBody>(begin, end, SimpleParForEachBody(), ctx);
tbb::parallel_for_each(std::begin(range), std::end(range), SimpleParForEachBody(), ctx);
CATCH();
}
};
......@@ -937,14 +952,14 @@ public:
template <class Iterator, class outer_body_with_eh>
void Test4_parallel_for_each () {
ResetGlobals( true, true );
PREPARE_RANGE(Iterator, OUTER_ITER_RANGE);
auto range = adaptive_range<Iterator>(get_iter_range_size());
g_Master = std::this_thread::get_id();
TRY();
tbb::parallel_for_each<Iterator, outer_body_with_eh>(begin, end, outer_body_with_eh());
tbb::parallel_for_each(std::begin(range), std::end(range), outer_body_with_eh());
CATCH();
REQUIRE_MESSAGE (!l_ExceptionCaughtAtCurrentLevel, "All exceptions must have been handled in the parallel_for_each body");
intptr_t innerCalls = INNER_ITER_RANGE,
outerCalls = OUTER_ITER_RANGE + g_FedTasksCount,
intptr_t innerCalls = get_iter_range_size(),
outerCalls = get_iter_range_size() + g_FedTasksCount,
maxExecuted = outerCalls * innerCalls,
minExecuted = 0;
g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived exception");
......@@ -985,10 +1000,10 @@ public:
template <class Iterator>
void Test5_parallel_for_each () {
ResetGlobals();
PREPARE_RANGE(Iterator, ITER_RANGE);
auto range = adaptive_range<Iterator>(get_iter_range_size());
g_Master = std::this_thread::get_id();
TRY();
tbb::parallel_for_each<Iterator, ParForEachBodyWithThrowingFeederTasks>(begin, end, ParForEachBodyWithThrowingFeederTasks());
tbb::parallel_for_each(std::begin(range), std::end(range), ParForEachBodyWithThrowingFeederTasks());
CATCH();
if (g_SolitaryException) {
// Failure occurs when g_ExceptionInMaster is false, but all the 1 values in the range
......@@ -1120,8 +1135,8 @@ class ParForEachWorker {
tbb::task_group_context &my_ctx;
public:
void operator()() const {
PREPARE_RANGE(Iterator, INNER_ITER_RANGE);
tbb::parallel_for_each<Iterator, B>( begin, end, B(), my_ctx );
auto range = adaptive_range<Iterator>(get_iter_range_size());
tbb::parallel_for_each( std::begin(range), std::end(range), B(), my_ctx );
}
ParForEachWorker ( tbb::task_group_context& ctx ) : my_ctx(ctx) {}
......@@ -1131,7 +1146,8 @@ public:
template <class Iterator, class body_to_cancel>
void TestCancelation1_parallel_for_each () {
ResetGlobals( false );
intptr_t threshold = 10;
// Threshold should leave more than max_threads tasks to test the cancellation. Set the threshold to get_iter_range_size()/4 since get_iter_range_size() >= max_threads*2
intptr_t threshold = get_iter_range_size() / 4;
tbb::task_group tg;
tbb::task_group_context ctx;
Cancellator cancellator(ctx, threshold);
......@@ -1184,7 +1200,6 @@ TEST_CASE("parallel_for_each cancellation test #1") {
for ( size_t j = 0; j < 4; ++j ) {
g_ExceptionInMaster = (j & 1) != 0;
g_SolitaryException = (j & 2) != 0;
RunWithSimpleBody(TestCancelation1_parallel_for_each, ParForEachBodyToCancel);
}
}
......@@ -1213,26 +1228,23 @@ TEST_CASE("parallel_for_each cancellation test #2") {
////////////////////////////////////////////////////////////////////////////////
// Tests for tbb::parallel_pipeline
////////////////////////////////////////////////////////////////////////////////
#define NUM_ITEMS 100
const size_t c_DataEndTag = size_t(~0);
int g_NumTokens = 0;
// Simple input filter class, it assigns 1 to all array members
// It stops when it receives item equal to -1
class InputFilter {
mutable std::atomic<size_t> m_Item{};
mutable size_t m_Buffer[NUM_ITEMS + 1];
mutable std::vector<size_t> m_Buffer;
public:
InputFilter() {
m_Item = 0;
for (size_t i = 0; i < NUM_ITEMS; ++i )
m_Buffer.resize(get_iter_range_size());
for (size_t i = 0; i < get_iter_range_size(); ++i )
m_Buffer[i] = 1;
m_Buffer[NUM_ITEMS] = c_DataEndTag;
}
InputFilter(const InputFilter& other) : m_Item(other.m_Item.load()) {
for (size_t i = 0; i < NUM_ITEMS; ++i )
m_Buffer.resize(get_iter_range_size());
for (size_t i = 0; i < get_iter_range_size(); ++i )
m_Buffer[i] = other.m_Buffer[i];
}
......@@ -1244,7 +1256,7 @@ public:
if(item == 1) {
++g_PipelinesStarted; // count on emitting the first item.
}
if ( item >= NUM_ITEMS ) {
if ( item >= get_iter_range_size() ) {
control.stop();
return nullptr;
}
......@@ -1252,7 +1264,7 @@ public:
return &m_Buffer[item];
}
size_t* buffer() { return m_Buffer; }
size_t* buffer() { return m_Buffer.data(); }
}; // class InputFilter
#if TBB_USE_EXCEPTIONS
......@@ -1332,7 +1344,7 @@ void Test1_pipeline ( const FilterSet& filters ) {
SimplePipeline testPipeline(filters);
TRY();
testPipeline.run();
if ( g_CurExecuted == 2 * NUM_ITEMS ) {
if ( g_CurExecuted == 2 * static_cast<int>(get_iter_range_size()) ) {
// all the items were processed, though an exception was supposed to occur.
if(!g_ExceptionInMaster && g_NonMasterExecutedThrow > 0) {
// if !g_ExceptionInMaster, the external thread is not allowed to throw.
......@@ -1411,7 +1423,7 @@ void Test3_pipeline ( const FilterSet& filters ) {
ResetGlobals();
g_NestedPipelines = true;
g_Master = std::this_thread::get_id();
intptr_t innerCalls = NUM_ITEMS,
intptr_t innerCalls = get_iter_range_size(),
minExecuted = (g_NumThreads - 1) * innerCalls;
CustomPipeline<InputFilter, OuterFilterWithIsolatedCtx> testPipeline(filters);
TRY();
......@@ -1438,12 +1450,12 @@ void Test3_pipeline ( const FilterSet& filters ) {
//
// So g_CurExecuted should be about
//
// (2*NUM_ITEMS) * (g_PipelinesStarted - 2) + 1
// (2*get_iter_range_size()) * (g_PipelinesStarted - 2) + 1
// ^ executions for each completed pipeline
// ^ completing pipelines (remembering two will not complete)
// ^ one for the inner throwing pipeline
minExecuted = (2*NUM_ITEMS) * (g_PipelinesStarted - 2) + 1;
minExecuted = (2*get_iter_range_size()) * (g_PipelinesStarted - 2) + 1;
// each failing pipeline must execute at least two tasks
REQUIRE_MESSAGE(g_CurExecuted >= minExecuted, "Too few tasks survived exception");
// no more than g_NumThreads tasks will be executed in a cancelled context. Otherwise
......@@ -1487,12 +1499,12 @@ void Test4_pipeline ( const FilterSet& filters ) {
}
#endif
ResetGlobals( true, true );
// each outer pipeline stage will start NUM_ITEMS inner pipelines.
// each inner pipeline that doesn't throw will process NUM_ITEMS items.
// each outer pipeline stage will start get_iter_range_size() inner pipelines.
// each inner pipeline that doesn't throw will process get_iter_range_size() items.
// for solitary exception there will be one pipeline that only processes one stage, one item.
// innerCalls should be 2*NUM_ITEMS
intptr_t innerCalls = 2*NUM_ITEMS,
outerCalls = 2*NUM_ITEMS,
// innerCalls should be 2*get_iter_range_size()
intptr_t innerCalls = 2*get_iter_range_size(),
outerCalls = 2*get_iter_range_size(),
maxExecuted = outerCalls * innerCalls; // the number of invocations of the inner pipelines
CustomPipeline<InputFilter, OuterFilterWithEhBody> testPipeline(filters);
TRY();
......@@ -1611,7 +1623,9 @@ public:
void TestCancelation1_pipeline () {
ResetGlobals();
g_ThrowException = false;
RunCancellationTest<PipelineLauncher<FilterToCancel>, Cancellator>(10);
// Threshold should leave more than max_threads tasks to test the cancellation. Set the threshold to get_iter_range_size()/4 since get_iter_range_size() >= max_threads*2
intptr_t threshold = get_iter_range_size() / 4;
RunCancellationTest<PipelineLauncher<FilterToCancel>, Cancellator>(threshold);
g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived cancellation");
REQUIRE_MESSAGE (g_CurExecuted < g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks were executed after cancellation");
}
......
......@@ -14,11 +14,14 @@
limitations under the License.
*/
#include "common/config.h"
#include "tbb/parallel_for.h"
#include "tbb/global_control.h"
#include "common/test.h"
#include "common/utils.h"
#include "common/utils_concurrency_limit.h"
#include <atomic>
#include <condition_variable>
......@@ -65,6 +68,10 @@ public:
//! Test for exception when too many threads
//! \brief \ref resource_usage
TEST_CASE("Too many threads") {
if (utils::get_platform_max_threads() < 2) {
// The test expects that the scheduler will try to create at least one thread.
return;
}
std::thread /* isolate test */ ([] {
std::vector<Thread> threads;
stop = false;
......
......@@ -261,13 +261,13 @@ void TestFunctionNode() {
// rejecting
serial_fn_state0 = 0;
tbb::flow::make_edge(fnode0, qnode1);
tbb::flow::make_edge(qnode0, fnode0);
INFO("Testing rejecting function_node:");
CHECK_MESSAGE( (!fnode0.my_queue), "node should have no queue");
CHECK_MESSAGE( (!fnode0.my_successors.empty()), "successor edge not added");
std::thread t([&] {
g.reset(); // attach to the current arena
tbb::flow::make_edge(fnode0, qnode1);
tbb::flow::make_edge(qnode0, fnode0); // TODO: investigate why it always creates a forwarding task
INFO("Testing rejecting function_node:");
CHECK_MESSAGE( (!fnode0.my_queue), "node should have no queue");
CHECK_MESSAGE( (!fnode0.my_successors.empty()), "successor edge not added");
qnode0.try_put(1);
qnode0.try_put(2); // rejecting node should reject, reverse.
g.wait_for_all();
......@@ -302,10 +302,11 @@ void TestFunctionNode() {
INFO("\n");