#pragma once #include #include "reclaimer.h" template class ConcreteReclaimer; template class ConcurrentQueue { static_assert(std::is_copy_constructible_v, "T requires copy constructor"); struct Node; struct RegularNode; friend ConcreteReclaimer; public: ConcurrentQueue() : head_(new Node), tail_(head_.load(std::memory_order_relaxed)), size_(0) {} ~ConcurrentQueue() { Node* p = head_.load(std::memory_order_acquire); while (p != nullptr) { Node* tmp = p; p = p->next.load(std::memory_order_acquire); tmp->Release(); } } ConcurrentQueue(const ConcurrentQueue&) = delete; ConcurrentQueue(ConcurrentQueue&&) = delete; ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete; ConcurrentQueue& operator=(ConcurrentQueue&& other) = delete; template void Emplace(Args&&... args); void Enqueue(const T& value) { static_assert(std::is_copy_constructible::value, "T must be copy constructible"); Emplace(value); }; void Enqueue(T&& value) { static_assert(std::is_constructible_v, "T must be constructible with T&&"); Emplace(std::forward(value)); } bool Dequeue(T& data); size_t size() const { return size_.load(std::memory_order_relaxed); } private: Node* get_head() const { return head_.load(std::memory_order_acquire); } Node* get_tail() const { return tail_.load(std::memory_order_acquire); } // Get safe node and its next, ensure next is the succeed of node // and both pointer are safety. // REQUIRE: atomic_node is head_ or tail_. void AcquireSafeNodeAndNext(std::atomic& atomic_node, Node** node_ptr, Node** next_ptr, HazardPointer& node_hp, HazardPointer& next_hp); // Invoke this function when the node can be reclaimed static void OnDeleteNode(void* ptr) { static_cast(ptr)->Release(); } struct Node { Node() : next(nullptr) {} virtual ~Node() = default; virtual void Release() { delete this; } Node* get_next() const { return next.load(std::memory_order_acquire); } std::atomic next; }; struct RegularNode : Node { template RegularNode(Args&&... args) : value(std::forward(args)...) {} ~RegularNode() override = default; void Release() override { delete this; } T value; }; std::atomic head_; std::atomic tail_; std::atomic size_; static Reclaimer::HazardPointerList global_hp_list_; }; template Reclaimer::HazardPointerList ConcurrentQueue::global_hp_list_; template class ConcreteReclaimer : public Reclaimer { friend ConcurrentQueue; private: ConcreteReclaimer(HazardPointerList& hp_list) : Reclaimer(hp_list) {} ~ConcreteReclaimer() override = default; static ConcreteReclaimer& GetInstance() { thread_local static ConcreteReclaimer reclaimer( ConcurrentQueue::global_hp_list_); return reclaimer; } }; template void ConcurrentQueue::AcquireSafeNodeAndNext(std::atomic& atomic_node, Node** node_ptr, Node** next_ptr, HazardPointer& node_hp, HazardPointer& next_hp) { Node* node = atomic_node.load(std::memory_order_acquire); Node* next; Node* temp_node; Node* temp_next; auto& reclaimer = ConcreteReclaimer::GetInstance(); do { do { // 1.UnMark old node; node_hp.UnMark(); temp_node = node; // 2. Mark node. node_hp = HazardPointer(&reclaimer, node); node = atomic_node.load(std::memory_order_acquire); // 3. Make sure the node is still the one we mark before. } while (temp_node != node); // 4. UnMark old next. next_hp.UnMark(); next = node->get_next(); temp_next = next; // 5. Mark next. next_hp = HazardPointer(&reclaimer, next); next = node->get_next(); // 6. Make sure the next is still the succeed of first. } while (temp_next != next); *node_ptr = node; *next_ptr = next; } template template void ConcurrentQueue::Emplace(Args&&... args) { static_assert(std::is_constructible_v, "T must be constructible with Args&&..."); RegularNode* new_node = new RegularNode(std::forward(args)...); Node* tail; Node* next; HazardPointer tail_hp, next_hp; while (true) { AcquireSafeNodeAndNext(tail_, &tail, &next, tail_hp, next_hp); if (tail != get_tail()) continue; // Are tail and next consistent? if (nullptr == next) { // Was tail point to last node? // Try to link node at the end of the linked list. if (tail->next.compare_exchange_strong(next, new_node)) break; } else { // Try to swing tail to the next node. tail_.compare_exchange_weak(tail, next); } } // Enqueue is done. Try to swing tail to the inserted node. tail_.compare_exchange_weak(tail, new_node); size_.fetch_add(1, std::memory_order_relaxed); } template bool ConcurrentQueue::Dequeue(T& value) { HazardPointer head_hp; HazardPointer next_hp; Node* head; Node* next; Node* tail; while (true) { AcquireSafeNodeAndNext(head_, &head, &next, head_hp, next_hp); tail = get_tail(); if (head != get_head()) continue; // Are head, tail, and next consistent? if (nullptr == next) return false; // Queue is empty; if (head == tail) { // Is queue empty or tail falling behind? // Tail is falling behind. Try to advance it. tail_.compare_exchange_weak(tail, next); } else { // Try to swing head to the next node. if (head_.compare_exchange_strong(head, next)) break; } } size_.fetch_sub(1, std::memory_order_relaxed); auto& reclaimer = ConcreteReclaimer::GetInstance(); reclaimer.ReclaimLater(head, ConcurrentQueue::OnDeleteNode); reclaimer.ReclaimNoHazardPointer(); value = std::move(static_cast(next)->value); return true; }