Commit c5d09ddd authored by Serov, Vladimir

ASAN: Use after scope in conformance_concurrent_queue

concurrent_queue_rep holds a reference to the Allocator, which is managed by
concurrent_queue. This reference can become invalid after moving
concurrent_queue_rep from one concurrent_queue to another.

Update concurrent_queue_rep to use the Allocator provided by the caller
instead of holding a reference.
Signed-off-by: Serov, Vladimir <vladimir.serov@intel.com>
Parent fc9a3bbc
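A minimal sketch of the hazard described above, using hypothetical names (toy_queue, toy_rep) rather than the real TBB classes: when the representation stores an Allocator& that refers into its owning queue, the reference can dangle once the queue is moved, whereas passing the allocator into each representation call keeps the representation free of allocator state.

```cpp
#include <memory>
#include <utility>

// Old layout: representation stores a reference to its owner's allocator.
template <typename Allocator>
struct toy_rep {
    explicit toy_rep(Allocator& a) : alloc(a) {}
    Allocator& alloc;  // refers to a member of the queue that constructed this rep
};

template <typename T, typename Allocator = std::allocator<T>>
class toy_queue {
public:
    toy_queue() : rep(new toy_rep<Allocator>(alloc)) {}
    toy_queue(toy_queue&& other) noexcept : rep(other.rep) {
        other.rep = nullptr;
        // Hazard: rep->alloc still refers to other.alloc. Once 'other' goes out
        // of scope, any allocation made through rep->alloc is a use-after-scope.
    }
    toy_queue(const toy_queue&) = delete;
    ~toy_queue() { delete rep; }
private:
    Allocator alloc{};
    toy_rep<Allocator>* rep;
};

// Shape of the fix: the representation keeps no allocator state; every operation
// receives the allocator from whichever queue currently owns the representation.
template <typename Allocator>
struct toy_rep_fixed {
    template <typename... Args>
    void push(Allocator& a, Args&&...) { (void)a; /* allocate pages through 'a' */ }
};

int main() {
    toy_queue<int> a;
    toy_queue<int> b(std::move(a));  // b's representation still references a's allocator
}
```

This mirrors the design choice visible in the diff below: push, pop, abort_push, assign, make_copy, and clear now take a queue_allocator_type& argument from the owning queue instead of reading it from the representation.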
@@ -26,7 +26,7 @@
namespace tbb {
namespace detail {
namespace d1 {
namespace d2 {
// A high-performance thread-safe non-blocking concurrent queue.
// Multiple threads may each push and pop concurrently.
@@ -57,7 +57,7 @@ public:
my_allocator(a), my_queue_representation(nullptr)
{
my_queue_representation = static_cast<queue_representation_type*>(r1::cache_aligned_allocate(sizeof(queue_representation_type)));
queue_allocator_traits::construct(my_allocator, my_queue_representation, my_allocator);
queue_allocator_traits::construct(my_allocator, my_queue_representation);
__TBB_ASSERT(is_aligned(my_queue_representation, max_nfs_size), "alignment error" );
__TBB_ASSERT(is_aligned(&my_queue_representation->head_counter, max_nfs_size), "alignment error" );
@@ -76,13 +76,13 @@ public:
concurrent_queue(const concurrent_queue& src, const allocator_type& a) :
concurrent_queue(a)
{
my_queue_representation->assign(*src.my_queue_representation, copy_construct_item);
my_queue_representation->assign(*src.my_queue_representation, my_allocator, copy_construct_item);
}
concurrent_queue(const concurrent_queue& src) :
concurrent_queue(queue_allocator_traits::select_on_container_copy_construction(src.get_allocator()))
{
my_queue_representation->assign(*src.my_queue_representation, copy_construct_item);
my_queue_representation->assign(*src.my_queue_representation, my_allocator, copy_construct_item);
}
// Move constructors
@@ -101,7 +101,7 @@ public:
internal_swap(src);
} else {
// allocators are different => performing per-element move
my_queue_representation->assign(*src.my_queue_representation, move_construct_item);
my_queue_representation->assign(*src.my_queue_representation, my_allocator, move_construct_item);
src.clear();
}
}
@@ -109,7 +109,7 @@ public:
// Destroy queue
~concurrent_queue() {
clear();
my_queue_representation->clear();
my_queue_representation->clear(my_allocator);
queue_allocator_traits::destroy(my_allocator, my_queue_representation);
r1::cache_aligned_deallocate(my_queue_representation);
}
@@ -177,7 +177,7 @@ private:
template <typename... Args>
void internal_push( Args&&... args ) {
ticket_type k = my_queue_representation->tail_counter++;
my_queue_representation->choose(k).push(k, *my_queue_representation, std::forward<Args>(args)...);
my_queue_representation->choose(k).push(k, *my_queue_representation, my_allocator, std::forward<Args>(args)...);
}
bool internal_try_pop( void* dst ) {
@@ -193,7 +193,7 @@ private:
// Queue had item with ticket k when we looked. Attempt to get that item.
// Another thread snatched the item, retry.
} while (!my_queue_representation->head_counter.compare_exchange_strong(k, k + 1));
} while (!my_queue_representation->choose(k).pop(dst, k, *my_queue_representation));
} while (!my_queue_representation->choose(k).pop(dst, k, *my_queue_representation, my_allocator));
return true;
}
@@ -230,7 +230,7 @@ class concurrent_monitor;
// The concurrent monitor tags for concurrent_bounded_queue.
static constexpr std::size_t cbq_slots_avail_tag = 0;
static constexpr std::size_t cbq_items_avail_tag = 1;
} // namespace d1
} // namespace d2
namespace r1 {
@@ -246,7 +246,7 @@ namespace r1 {
} // namespace r1
namespace d1 {
namespace d2 {
// A high-performance thread-safe blocking concurrent bounded queue.
// Supports boundedness and blocking semantics.
// Multiple threads may each push and pop concurrently.
@@ -260,7 +260,7 @@ class concurrent_bounded_queue {
template <typename FuncType>
void internal_wait(r1::concurrent_monitor* monitors, std::size_t monitor_tag, std::ptrdiff_t target, FuncType pred) {
delegated_function<FuncType> func(pred);
d1::delegated_function<FuncType> func(pred);
r1::wait_bounded_queue_monitor(monitors, monitor_tag, target, func);
}
public:
@@ -285,7 +285,7 @@ public:
my_queue_representation = reinterpret_cast<queue_representation_type*>(
r1::allocate_bounded_queue_rep(sizeof(queue_representation_type)));
my_monitors = reinterpret_cast<r1::concurrent_monitor*>(my_queue_representation + 1);
queue_allocator_traits::construct(my_allocator, my_queue_representation, my_allocator);
queue_allocator_traits::construct(my_allocator, my_queue_representation);
my_capacity = std::size_t(-1) / (queue_representation_type::item_size > 1 ? queue_representation_type::item_size : 2);
__TBB_ASSERT(is_aligned(my_queue_representation, max_nfs_size), "alignment error" );
@@ -305,13 +305,13 @@ public:
concurrent_bounded_queue( const concurrent_bounded_queue& src, const allocator_type& a ) :
concurrent_bounded_queue(a)
{
my_queue_representation->assign(*src.my_queue_representation, copy_construct_item);
my_queue_representation->assign(*src.my_queue_representation, my_allocator, copy_construct_item);
}
concurrent_bounded_queue( const concurrent_bounded_queue& src ) :
concurrent_bounded_queue(queue_allocator_traits::select_on_container_copy_construction(src.get_allocator()))
{
my_queue_representation->assign(*src.my_queue_representation, copy_construct_item);
my_queue_representation->assign(*src.my_queue_representation, my_allocator, copy_construct_item);
}
// Move constructors
@@ -330,7 +330,7 @@ public:
internal_swap(src);
} else {
// allocators are different => performing per-element move
my_queue_representation->assign(*src.my_queue_representation, move_construct_item);
my_queue_representation->assign(*src.my_queue_representation, my_allocator, move_construct_item);
src.clear();
}
}
@@ -338,7 +338,7 @@ public:
// Destroy queue
~concurrent_bounded_queue() {
clear();
my_queue_representation->clear();
my_queue_representation->clear(my_allocator);
queue_allocator_traits::destroy(my_allocator, my_queue_representation);
r1::deallocate_bounded_queue_rep(reinterpret_cast<std::uint8_t*>(my_queue_representation),
sizeof(queue_representation_type));
@@ -456,12 +456,12 @@ private:
try_call( [&] {
internal_wait(my_monitors, cbq_slots_avail_tag, target, pred);
}).on_exception( [&] {
my_queue_representation->choose(ticket).abort_push(ticket, *my_queue_representation);
my_queue_representation->choose(ticket).abort_push(ticket, *my_queue_representation, my_allocator);
});
}
__TBB_ASSERT((static_cast<std::ptrdiff_t>(my_queue_representation->head_counter.load(std::memory_order_relaxed)) > target), nullptr);
my_queue_representation->choose(ticket).push(ticket, *my_queue_representation, std::forward<Args>(args)...);
my_queue_representation->choose(ticket).push(ticket, *my_queue_representation, my_allocator, std::forward<Args>(args)...);
r1::notify_bounded_queue_monitor(my_monitors, cbq_items_avail_tag, ticket);
}
@@ -477,7 +477,7 @@ private:
// Another thread claimed the slot, so retry.
} while (!my_queue_representation->tail_counter.compare_exchange_strong(ticket, ticket + 1));
my_queue_representation->choose(ticket).push(ticket, *my_queue_representation, std::forward<Args>(args)...);
my_queue_representation->choose(ticket).push(ticket, *my_queue_representation, my_allocator, std::forward<Args>(args)...);
r1::notify_bounded_queue_monitor(my_monitors, cbq_items_avail_tag, ticket);
return true;
}
@@ -505,7 +505,7 @@ private:
});
}
__TBB_ASSERT(static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed)) > target, nullptr);
} while (!my_queue_representation->choose(target).pop(dst, target, *my_queue_representation));
} while (!my_queue_representation->choose(target).pop(dst, target, *my_queue_representation, my_allocator));
r1::notify_bounded_queue_monitor(my_monitors, cbq_slots_avail_tag, target);
return true;
@@ -523,7 +523,7 @@ private:
// Queue had item with ticket k when we looked. Attempt to get that item.
// Another thread snatched the item, retry.
} while (!my_queue_representation->head_counter.compare_exchange_strong(ticket, ticket + 1));
} while (!my_queue_representation->choose(ticket).pop(dst, ticket, *my_queue_representation));
} while (!my_queue_representation->choose(ticket).pop(dst, ticket, *my_queue_representation, my_allocator));
r1::notify_bounded_queue_monitor(my_monitors, cbq_slots_avail_tag, ticket);
return true;
@@ -563,13 +563,13 @@ concurrent_bounded_queue( It, It, Alloc = Alloc() )
#endif /* __TBB_CPP17_DEDUCTION_GUIDES_PRESENT */
} //namespace d1
} //namespace d2
} // namespace detail
inline namespace v1 {
using detail::d1::concurrent_queue;
using detail::d1::concurrent_bounded_queue;
using detail::d2::concurrent_queue;
using detail::d2::concurrent_bounded_queue;
using detail::r1::user_abort;
using detail::r1::bad_last_alloc;
@@ -30,7 +30,7 @@
namespace tbb {
namespace detail {
namespace d1 {
namespace d2 {
using ticket_type = std::size_t;
@@ -67,6 +67,7 @@ public:
using allocator_type = Allocator;
using allocator_traits_type = tbb::detail::allocator_traits<allocator_type>;
using queue_allocator_type = typename allocator_traits_type::template rebind_alloc<queue_rep_type>;
static constexpr size_type item_size = sizeof(T);
static constexpr size_type items_per_page = item_size <= 8 ? 32 :
@@ -123,7 +124,7 @@ public:
}
if (tail_counter.load(std::memory_order_relaxed) != k) spin_wait_until_my_turn(tail_counter, k, base);
call_itt_notify(acquired, &tail_counter);
d1::call_itt_notify(d1::acquired, &tail_counter);
if (p) {
spin_mutex::scoped_lock lock( page_mutex );
@@ -141,10 +142,10 @@ public:
}
template<typename... Args>
void push( ticket_type k, queue_rep_type& base, Args&&... args )
void push( ticket_type k, queue_rep_type& base, queue_allocator_type& allocator, Args&&... args )
{
padded_page* p = nullptr;
page_allocator_type page_allocator(base.get_allocator());
page_allocator_type page_allocator(allocator);
size_type index = prepare_page(k, base, page_allocator, p);
__TBB_ASSERT(p != nullptr, "Page was not prepared");
@@ -152,38 +153,38 @@ public:
// variadic capture on GCC 4.8.5
auto value_guard = make_raii_guard([&] {
++base.n_invalid_entries;
call_itt_notify(releasing, &tail_counter);
d1::call_itt_notify(d1::releasing, &tail_counter);
tail_counter.fetch_add(queue_rep_type::n_queue);
});
page_allocator_traits::construct(page_allocator, &(*p)[index], std::forward<Args>(args)...);
// If no exception was thrown, mark item as present.
p->mask.store(p->mask.load(std::memory_order_relaxed) | uintptr_t(1) << index, std::memory_order_relaxed);
call_itt_notify(releasing, &tail_counter);
d1::call_itt_notify(d1::releasing, &tail_counter);
value_guard.dismiss();
tail_counter.fetch_add(queue_rep_type::n_queue);
}
void abort_push( ticket_type k, queue_rep_type& base) {
void abort_push( ticket_type k, queue_rep_type& base, queue_allocator_type& allocator ) {
padded_page* p = nullptr;
prepare_page(k, base, base.get_allocator(), p);
prepare_page(k, base, allocator, p);
++base.n_invalid_entries;
tail_counter.fetch_add(queue_rep_type::n_queue);
}
bool pop( void* dst, ticket_type k, queue_rep_type& base ) {
bool pop( void* dst, ticket_type k, queue_rep_type& base, queue_allocator_type& allocator) {
k &= -queue_rep_type::n_queue;
spin_wait_until_eq(head_counter, k);
call_itt_notify(acquired, &head_counter);
d1::call_itt_notify(d1::acquired, &head_counter);
spin_wait_while_eq(tail_counter, k);
call_itt_notify(acquired, &tail_counter);
d1::call_itt_notify(d1::acquired, &tail_counter);
padded_page *p = head_page.load(std::memory_order_acquire);
__TBB_ASSERT( p, nullptr );
size_type index = modulo_power_of_two( k/queue_rep_type::n_queue, items_per_page );
bool success = false;
{
page_allocator_type page_allocator(base.get_allocator());
page_allocator_type page_allocator(allocator);
micro_queue_pop_finalizer<self_type, value_type, page_allocator_type> finalizer(*this, page_allocator,
k + queue_rep_type::n_queue, index == items_per_page - 1 ? p : nullptr );
if (p->mask.load(std::memory_order_relaxed) & (std::uintptr_t(1) << index)) {
@@ -196,7 +197,7 @@ public:
return success;
}
micro_queue& assign( const micro_queue& src, queue_rep_type& base,
micro_queue& assign( const micro_queue& src, queue_allocator_type& allocator,
item_constructor_type construct_item )
{
head_counter.store(src.head_counter.load(std::memory_order_relaxed), std::memory_order_relaxed);
@@ -211,7 +212,7 @@ public:
size_type end_in_first_page = (index+n_items < items_per_page) ? (index + n_items) : items_per_page;
try_call( [&] {
head_page.store(make_copy(base, srcp, index, end_in_first_page, g_index, construct_item), std::memory_order_relaxed);
head_page.store(make_copy(allocator, srcp, index, end_in_first_page, g_index, construct_item), std::memory_order_relaxed);
}).on_exception( [&] {
head_counter.store(0, std::memory_order_relaxed);
tail_counter.store(0, std::memory_order_relaxed);
@@ -221,7 +222,7 @@ public:
try_call( [&] {
if (srcp != src.tail_page.load(std::memory_order_relaxed)) {
for (srcp = srcp->next; srcp != src.tail_page.load(std::memory_order_relaxed); srcp=srcp->next ) {
cur_page->next = make_copy( base, srcp, 0, items_per_page, g_index, construct_item );
cur_page->next = make_copy( allocator, srcp, 0, items_per_page, g_index, construct_item );
cur_page = cur_page->next;
}
@@ -229,7 +230,7 @@ public:
size_type last_index = modulo_power_of_two(tail_counter.load(std::memory_order_relaxed) / queue_rep_type::n_queue, items_per_page);
if( last_index==0 ) last_index = items_per_page;
cur_page->next = make_copy( base, srcp, 0, last_index, g_index, construct_item );
cur_page->next = make_copy( allocator, srcp, 0, last_index, g_index, construct_item );
cur_page = cur_page->next;
}
tail_page.store(cur_page, std::memory_order_relaxed);
@@ -244,10 +245,10 @@ public:
return *this;
}
padded_page* make_copy( queue_rep_type& base, const padded_page* src_page, size_type begin_in_page,
padded_page* make_copy( queue_allocator_type& allocator, const padded_page* src_page, size_type begin_in_page,
size_type end_in_page, ticket_type& g_index, item_constructor_type construct_item )
{
page_allocator_type page_allocator(base.get_allocator());
page_allocator_type page_allocator(allocator);
padded_page* new_page = page_allocator_traits::allocate(page_allocator, 1);
new_page->next = nullptr;
new_page->mask.store(src_page->mask.load(std::memory_order_relaxed), std::memory_order_relaxed);
@@ -287,10 +288,10 @@ public:
tail_page.store(pg, std::memory_order_relaxed);
}
void clear(queue_rep_type& base) {
void clear(queue_allocator_type& allocator ) {
padded_page* curr_page = head_page.load(std::memory_order_relaxed);
std::size_t index = head_counter.load(std::memory_order_relaxed);
page_allocator_type page_allocator(base.get_allocator());
page_allocator_type page_allocator(allocator);
while (curr_page) {
for (; index != items_per_page - 1; ++index) {
@@ -423,14 +424,13 @@ public:
static constexpr size_type item_size = micro_queue_type::item_size;
static constexpr size_type items_per_page = micro_queue_type::items_per_page;
concurrent_queue_rep( queue_allocator_type& alloc ) : my_queue_allocator(alloc)
{}
concurrent_queue_rep() {}
concurrent_queue_rep( const concurrent_queue_rep& ) = delete;
concurrent_queue_rep& operator=( const concurrent_queue_rep& ) = delete;
void clear() {
page_allocator_type page_allocator(my_queue_allocator);
void clear( queue_allocator_type& alloc ) {
page_allocator_type page_allocator(alloc);
for (size_type i = 0; i < n_queue; ++i) {
padded_page* tail_page = array[i].get_tail_page();
if( is_valid_page(tail_page) ) {
@@ -444,7 +444,7 @@ public:
}
}
void assign( const concurrent_queue_rep& src, item_constructor_type construct_item ) {
void assign( const concurrent_queue_rep& src, queue_allocator_type& alloc, item_constructor_type construct_item ) {
head_counter.store(src.head_counter.load(std::memory_order_relaxed), std::memory_order_relaxed);
tail_counter.store(src.tail_counter.load(std::memory_order_relaxed), std::memory_order_relaxed);
n_invalid_entries.store(src.n_invalid_entries.load(std::memory_order_relaxed), std::memory_order_relaxed);
@@ -453,11 +453,11 @@ public:
size_type queue_idx = 0;
try_call( [&] {
for (; queue_idx < n_queue; ++queue_idx) {
array[queue_idx].assign(src.array[queue_idx], *this, construct_item);
array[queue_idx].assign(src.array[queue_idx], alloc, construct_item);
}
}).on_exception( [&] {
for (size_type i = 0; i < queue_idx + 1; ++i) {
array[i].clear(*this);
array[i].clear(alloc);
}
head_counter.store(0, std::memory_order_relaxed);
tail_counter.store(0, std::memory_order_relaxed);
@@ -486,10 +486,6 @@ public:
return tc - hc - nie;
}
queue_allocator_type& get_allocator() {
return my_queue_allocator;
}
friend class micro_queue<T, Allocator>;
// Map ticket_type to an array index
@@ -507,7 +503,6 @@ public:
alignas(max_nfs_size) std::atomic<ticket_type> head_counter{};
alignas(max_nfs_size) std::atomic<ticket_type> tail_counter{};
alignas(max_nfs_size) std::atomic<size_type> n_invalid_entries{};
queue_allocator_type& my_queue_allocator;
}; // class concurrent_queue_rep
#if _MSC_VER && !defined(__INTEL_COMPILER)
@@ -652,7 +647,7 @@ private:
friend struct concurrent_queue_iterator_provider;
}; // class concurrent_queue_iterator
} // namespace d1
} // namespace d2
} // namespace detail
} // tbb
@@ -58,8 +58,8 @@ void __TBB_EXPORTED_FUNC wait_bounded_queue_monitor( concurrent_monitor* monitor
}
void __TBB_EXPORTED_FUNC abort_bounded_queue_monitors( concurrent_monitor* monitors ) {
concurrent_monitor& items_avail = monitors[d1::cbq_items_avail_tag];
concurrent_monitor& slots_avail = monitors[d1::cbq_slots_avail_tag];
concurrent_monitor& items_avail = monitors[d2::cbq_items_avail_tag];
concurrent_monitor& slots_avail = monitors[d2::cbq_slots_avail_tag];
items_avail.abort_all();
slots_avail.abort_all();