diff --git a/00-ProgramElement/1-NolockQueue/include/lock_free_queue.hpp b/00-ProgramElement/1-NolockQueue/include/lock_free_queue.hpp
new file mode 100644
index 0000000000000000000000000000000000000000..1b4135d0f5f123d962c7a7cfd4881f366ab8ef20
--- /dev/null
+++ b/00-ProgramElement/1-NolockQueue/include/lock_free_queue.hpp
@@ -0,0 +1,131 @@
+/*
+ * Copyright (c) TD-Tech. 2022. All rights reserved.
+ * Description: lock free queue
+ * Author: ousixin
+ */
+
+#pragma once
+#include <atomic>
+#include <limits>
+#include <thread>
+
+namespace add {
+template <typename T, size_t cap>
+class LockFreeQueue final {
+    static_assert(cap > 0u, "capacity of LockFreeQueue should be greater than 0");
+    // a redundant slot is reserved to distinguish an empty queue from a full one
+    static constexpr size_t nMax = cap + 1u;
+public:
+    LockFreeQueue() {}
+    ~LockFreeQueue() {}
+    bool push(const T& elem)
+    {
+        size_t wIdx = writeIdx_.load();
+        size_t updateIdx;
+        size_t woff;
+        do {
+            if (wIdx > std::numeric_limits<size_t>::max() - 1u) {  // wrap before the index overflows
+                updateIdx = ((wIdx % nMax) + 1u) % nMax;
+            } else {
+                updateIdx = wIdx + 1u;
+            }
+            // offset to write
+            woff = wIdx % nMax;
+            // full
+            if (woff == headIdx_.load() % nMax) {
+                return false;
+            }
+        } while (!writeIdx_.compare_exchange_strong(wIdx, updateIdx));
+        datas_[woff] = elem;
+        while (true) {  // publish slots in the same order they were claimed
+            size_t expect = wIdx;
+            if (!tailIdx_.compare_exchange_weak(expect, updateIdx)) {
+                std::this_thread::yield();
+            } else {
+                break;
+            }
+        }
+        count_.fetch_add(1);
+        return true;
+    }
+    bool push(T&& elem)
+    {
+        size_t wIdx = writeIdx_.load();
+        size_t updateIdx;
+        size_t woff;
+        do {
+            if (wIdx > std::numeric_limits<size_t>::max() - 1u) {  // wrap before the index overflows
+                updateIdx = ((wIdx % nMax) + 1u) % nMax;
+            } else {
+                updateIdx = wIdx + 1u;
+            }
+            // offset to write
+            woff = wIdx % nMax;
+            // full
+            if (woff == headIdx_.load() % nMax) {
+                return false;
+            }
+        } while (!writeIdx_.compare_exchange_strong(wIdx, updateIdx));
+        datas_[woff] = std::move(elem);
+        while (true) {  // publish slots in the same order they were claimed
+            size_t expect = wIdx;
+            if (!tailIdx_.compare_exchange_weak(expect, updateIdx)) {
+                std::this_thread::yield();
+            } else {
+                break;
+            }
+        }
+        count_.fetch_add(1);
+        return true;
+    }
+    bool pop(T& elem)
+    {
+        size_t rBeginIdx = headIdx_.load();
+        size_t updateIdx;
+        size_t roff;
+        do {
+            if (rBeginIdx > std::numeric_limits<size_t>::max() - 1u) {  // wrap before the index overflows
+                updateIdx = ((rBeginIdx % nMax) + 1u) % nMax;
+            } else {
+                updateIdx = rBeginIdx + 1u;
+            }
+            // offset to read: headIdx_ stays one slot behind, so the next element is at headIdx_ + 1
+            roff = updateIdx % nMax;
+            // empty
+            if (roff == tailIdx_.load() % nMax) {
+                return false;
+            }
+            elem = datas_[roff]; // copy rather than move: the CAS below may fail and retry
+        } while (!headIdx_.compare_exchange_strong(rBeginIdx, updateIdx));
+        count_.fetch_sub(1);
+        return true;
+    }
+    size_t capacity() const
+    {
+        return cap;
+    }
+    size_t size() const
+    {
+        return count_.load();
+    }
+    bool empty() const
+    {
+        return count_.load() == 0u;
+    }
+    bool full() const
+    {
+        return count_.load() == cap;
+    }
+private:
+    LockFreeQueue(const LockFreeQueue&) = delete;
+    LockFreeQueue& operator=(const LockFreeQueue&) = delete;
+
+private:
+    T datas_[nMax];
+    std::atomic_size_t headIdx_ { nMax - 1 };
+    std::atomic_size_t tailIdx_ { 0 };
+    std::atomic_size_t writeIdx_ { 0 };
+    std::atomic_size_t count_ { 0 };
+};
+
+} // namespace add
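
For reference, a minimal usage sketch (not part of the commit): one producer thread and one consumer thread sharing a queue of ints. The include path and the <element type, capacity> template-argument order are assumptions based on the header above; push() and pop() return false instead of blocking, so the caller chooses its own back-off, here a simple yield-and-retry loop.

// usage_sketch.cpp -- illustrative only, assumes lock_free_queue.hpp is on the include path
#include <cstdio>
#include <thread>
#include "lock_free_queue.hpp"

int main()
{
    add::LockFreeQueue<int, 1024u> queue;   // holds up to 1024 elements
    constexpr int total = 10000;

    std::thread producer([&queue] {
        for (int i = 0; i < total; ++i) {
            while (!queue.push(i)) {         // queue full: back off and retry
                std::this_thread::yield();
            }
        }
    });

    std::thread consumer([&queue] {
        long long sum = 0;
        for (int received = 0; received < total; ) {
            int value;
            if (queue.pop(value)) {          // got an element
                sum += value;
                ++received;
            } else {                         // queue empty: back off and retry
                std::this_thread::yield();
            }
        }
        std::printf("sum = %lld\n", sum);    // expected 49995000 for 0..9999
    });

    producer.join();
    consumer.join();
    return 0;
}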