Skip to content

Commit

Permalink
Merge pull request #15 from DrDub/main
Browse files Browse the repository at this point in the history
Priority Queue Multithreaded Test
  • Loading branch information
DNedic committed Dec 27, 2023
2 parents 9046fea + 8976cb6 commit d2d8225
Showing 1 changed file with 88 additions and 0 deletions.
88 changes: 88 additions & 0 deletions tests/spsc/priority_queue.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include <algorithm>
#include <math.h>
#include <thread>

#include <catch2/catch_test_macros.hpp>

Expand Down Expand Up @@ -66,6 +67,93 @@ TEST_CASE("Write multiple with different priority and read back ensuring "
REQUIRE(read == 1024);
}

TEST_CASE("Multithreaded read/write", "[pq_multithreaded]") {
lockfree::spsc::PriorityQueue<uint64_t, 10, 4> queue;
std::vector<std::thread> threads;
std::vector<uint64_t> written;
std::vector<uint64_t> read;

/* The following code pushes a large number of values into a priority
queue using four different priorities. The priority is also encoded
into the value pushed, as its two lower significance bits.
Both consumer and producer store in two vectors the numbers written
and read.
After the multi-threaded execution, special code (described in detail
below) checks no higher priority value was written to the priority
queue at the time a lower priority one was read.
*/

// consumer, it just pops values from the queue and stores them in the
// main thread vector
threads.emplace_back([&]() {
uint64_t value = 0;
uint64_t cnt = 0;
do {
bool pop_success = queue.Pop(value);
if (pop_success) {
read.push_back(value);
cnt++;
}
} while (cnt < TEST_MT_TRANSFER_CNT);
});

// producer, uses alternative priorities and pushes a counter shifted to
// accommodate the priority on its lower bits.
threads.emplace_back([&]() {
uint64_t cnt = 0;
uint64_t value = 0;
uint8_t prio = 0;
do {
value = cnt << 2 + prio;
bool push_success = queue.Push(value, prio);
if (push_success) {
written.push_back(value);
prio = (prio + 1) % 4; // this could be also randomly generated
cnt++;
}
} while (cnt < TEST_MT_TRANSFER_CNT + 1);
});
for (auto &t : threads) {
t.join();
}
/* The following code checks that at all times no higher priority value was
present in the `written` vector.
It needs to keep track which values were already read, it does that with
the help of the `consumed` Boolean vector.
*/
std::vector<bool> consumed(written.size(), false);
uint64_t value1, value2;
uint8_t prio1, prio2;
bool found;
for(size_t idx=0; idx<read.size(); idx++) {
// the value was read
value1 = read[idx];
// extract the priority encoded in the value
prio1 = value1 & ((1<<2) - 1);

found = false;
for(size_t idx2=0; idx2<written.size(); idx2++) {
if(consumed[idx2]) { // consumed values are skipped
continue;
}
// find when the value was written
value2 = written[idx2];
prio2 = value2 & ((1<<2) - 1);
if(written[idx2] == value1) {
consumed[idx2] = true; // this value is now accounted for
found = true;
break;
} else { // intermediate value, should be lower priority
REQUIRE(prio2 <= prio1);
}
}
REQUIRE(found);
}
}

TEST_CASE("Optional API", "[pq_optional_api]") {
lockfree::spsc::PriorityQueue<int16_t, 20, 3> queue;

Expand Down

0 comments on commit d2d8225

Please sign in to comment.