[POTENTIAL BUG] Potential FIFO violation in BlockingQueue under high contention and suggestion for fair mode in ArrayBlockingQueue and LinkedBlockingQueue

김민주 miiiinju00 at gmail.com
Wed Sep 4 07:09:31 UTC 2024


Hello core-libs-dev team,

My name is Kim Minju, and I have investigated a potential issue with both
ArrayBlockingQueue and LinkedBlockingQueue implementations when handling
high contention scenarios. While I am not entirely certain whether this is
a bug or expected behavior,

I would like to share my findings and seek your input. If this is indeed an
issue, I would also like to propose a fix and work on implementing it, with
the community's guidance and support.
Problem Description:

When the queue is full, and both put() operations and item consumption
occur simultaneously, threads that have been waiting to insert elements
using notFull.await() are expected to be unblocked in a strict FIFO order.
However, in my tests, when space becomes available, both the unblocked
threads and newly arriving put() requests compete for the lock, potentially
leading to race conditions. This can result in newer threads acquiring the
lock first and inserting items before previously blocked threads, which
seems to violate FIFO order.

In ArrayBlockingQueue, while a fair mode is available, it does not seem to
fully guarantee strict FIFO order under high contention. Additionally,
LinkedBlockingQueue lacks any fair mode, which seems to exacerbate the
issue, allowing newer threads to preempt waiting threads more easily.

I am unsure if this behavior is by design or if it can be improved. I would
greatly appreciate any feedback from the community regarding whether this
is an intended behavior or a potential issue.
Steps to Reproduce:

Here is a test case that demonstrates how FIFO order can be violated when
threads blocked on put() calls are unblocked and compete with new put()
requests for lock acquisition:

import static org.junit.jupiter.api.Assertions.fail;

import java.lang.Thread.State;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.jupiter.api.Test;

class BlockingQueueFIFOTest {

    @Test
    void testFIFOOrder() throws InterruptedException {
        final int QUEUE_CAPACITY = 10;
        AtomicInteger sequenceGenerator = new AtomicInteger(0);

        // Create a blocking queue with a fixed capacity
        BlockingQueue<Integer> queue = new
LinkedBlockingQueue<>(QUEUE_CAPACITY);
//        queue = new ArrayBlockingQueue<>(QUEUE_CAPACITY, true);

        // Prefill the queue to its capacity
        // This is crucial for the test as it ensures subsequent put
operations will block
        // By filling the queue first, we guarantee that following put
operations
        // will be queued in the exact order they are called (10, 11, 12, ...)
        // This setup creates a controlled environment for testing FIFO behavior
         for(int i = 0;i<QUEUE_CAPACITY;i++) {
            queue.put(sequenceGenerator.getAndIncrement());
        }

        final int PRODUCER_COUNT = 100;
        final int PRODUCER_COUNT_WHEN_CONSUMING = 1000;

        // Create multiple producer threads
        // Each thread attempts to put a single item into the already full queue
        // Since the queue is full, these put operations will block in
the order they are called
        // This configuration ensures items enter the queue's waiting
list in a predictable sequence (10, 11, 12, ...)
        // allowing us to verify if they are later consumed in the same order
        for (int i = 0; i < PRODUCER_COUNT; i++) {
            Thread thread = new Thread(() -> {
                try {
                    queue.put(sequenceGenerator.getAndIncrement());
                } catch (InterruptedException e) {
                    // ignore
                }

            });
            thread.start();
            // Wait for the producer thread to enter BLOCKED state
            // This ensures that the thread is waiting on the full queue
            while (thread.getState() == State.RUNNABLE ||
thread.getState() == State.NEW);
        }

        List<Integer> result = new ArrayList<>();

        // This simulates a real-world high-contention scenario on a full queue
        // When space becomes available, it's not just waiting threads
that compete
        // New put requests arrive simultaneously, adding to the contention
        // This tests if FIFO order is maintained under intense
pressure from both
        Thread producingWhenConsumingThread = new Thread(() -> {
            for (int i = 0; i < PRODUCER_COUNT_WHEN_CONSUMING; i++) {
                Thread thread = new Thread(() -> {
                    try {
                        queue.put(sequenceGenerator.getAndIncrement());
                    } catch (InterruptedException e) {
                        // ignore
                    }

                });
                thread.start();

                // Wait for the producer thread to enter BLOCKED state
                // This ensures that the thread is waiting on the full queue
                while (thread.getState() == State.RUNNABLE ||
thread.getState() == State.NEW);
            }
        });
        producingWhenConsumingThread.start();

        for (int i = 0; i < PRODUCER_COUNT + QUEUE_CAPACITY +
PRODUCER_COUNT_WHEN_CONSUMING ; i++) {
            Integer item = queue.take();
            result.add(item);
        }
        System.out.println("result = " + result);

        for (int i = 0; i < result.size() - 1; i++) {
            if (result.get(i) > result.get(i + 1)) {
                fail("FIFO order violated at index " + i + ": " +
result.get(i) + " > " + result.get(i + 1));
            }
        }
    }
}


Expected Behavior:

When the queue is full and multiple threads are blocked on put() operations
(using notFull.await()), these threads should be unblocked in the order
they were blocked, with newly arriving put() requests waiting until the
blocked threads have inserted their elements. The blocked threads should
have priority over new threads, and lock acquisition should respect this
order.
Actual Behavior (Observed in Testing):

Under high contention, both ArrayBlockingQueue and LinkedBlockingQueue seem
to violate FIFO order due to lock contention between threads that were
blocked on notFull.await() and new put() requests. Newly arriving threads
can sometimes preempt previously blocked threads, resulting in newer items
being inserted ahead of older ones.

Again, I am uncertain if this is an expected consequence of the current
implementation or if there is room for improvement.
Proposed Solution (If This is a Problem):

If this behavior is not intended, I propose modifying both
ArrayBlockingQueue and LinkedBlockingQueue so that if there are threads
waiting on notFull.await(), newly arriving put() requests would be forced
to wait behind the blocked threads. This could help ensure that previously
blocked threads insert their items in FIFO order, minimizing FIFO
violations due to lock contention.

Additionally, I suggest adding an option for *fair mode* in
LinkedBlockingQueue, similar to what is available in ArrayBlockingQueue.
This would further ensure that waiting threads are processed in a
predictable and fair manner, preventing newer threads from preempting
waiting threads and thus maintaining strict FIFO order.
Performance Considerations:

I understand that implementing stricter FIFO guarantees may impact overall
throughput, especially in high-contention scenarios. To address this, I
suggest making these changes configurable, allowing users to choose between
strict FIFO ordering and potentially higher throughput, depending on their
workload needs.

If the community deems this change valuable, I am willing to conduct
thorough performance testing to quantify the impact on throughput and
latency under various workloads, and I could create JMH benchmarks to
measure the performance differences between the current and proposed
implementations.
Request for Sponsorship and Feedback:

As I am not yet an Author in the OpenJDK project, I am seeking guidance and
potential sponsorship to address this issue, should it be deemed necessary.

   - I would appreciate the community's thoughts on whether this behavior
   is expected or a potential issue worth addressing.
   - If the community agrees that this is a significant issue, I welcome
   guidance on the best approach to tackle it within the current Java
   implementation.
   - I am looking for a sponsor who might be interested in supporting this
   effort. With sponsorship, I would be glad to:
      - Conduct a deeper analysis of the problem
      - Create a prototype implementation for review
      - Prepare and submit a formal patch
      - Participate in the review process and make necessary adjustments

Thank you for your time and consideration. I look forward to your feedback
and the opportunity to contribute to the improvement of Java's concurrent
collections.

Best regards,

Kim Minju

miiiinju00 at gmail.com
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://mail.openjdk.org/pipermail/core-libs-dev/attachments/20240904/aa51be9d/attachment.htm>


More information about the core-libs-dev mailing list