[External] : Re: [POTENTIAL BUG] Potential FIFO violation in BlockingQueue under high contention and suggestion for fair mode in ArrayBlockingQueue and LinkedBlockingQueue
김민주
miiiinju00 at gmail.com
Fri Sep 20 06:58:20 UTC 2024
Hi Viktor, Archie, and Daniel,
I hope everything is going well on your end. I really appreciate the thoughtful feedback you've provided so far, and I wanted to follow up on our previous discussion about the potential fairness issue in ArrayBlockingQueue when using ReentrantLock with Condition.await().
In our previous conversation, I raised a concern regarding the separate waiting spaces for ReentrantLock's AQS and Condition.await() in ArrayBlockingQueue. Specifically, my understanding is that when a thread is signaled from Condition.await(), it is re-enqueued at the end of the AQS queue, which could potentially allow newly arriving threads to acquire the lock before threads that have been waiting on Condition.await() for longer. This separation of the AQS queue and the condition waiting space seems to introduce the possibility of FIFO violations and starvation, even in fair mode.
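As a rough illustration of the ordering described above, here is a toy, single-threaded model of the two waiting spaces. It is only a sketch: it does not use AQS or real threads, and the class name TwoQueueModel, the method simulate(), and the capacity of 1 are assumptions made for the example. It replays the T1..T12 scenario from the earlier message, assuming a signaled waiter is appended to the tail of the lock queue.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model only: plain deques stand in for the condition queue and the lock (AQS) queue.
public class TwoQueueModel {

    static String simulate() {
        final int capacity = 1;      // hypothetical capacity, chosen for the example
        int count = capacity;        // the queue starts out full

        Deque<String> conditionQueue = new ArrayDeque<>(); // threads parked in notFull.await()
        Deque<String> lockQueue = new ArrayDeque<>();      // threads waiting for the lock

        // T1..T10 called put() on a full queue and are waiting on the condition.
        for (int i = 1; i <= 10; i++) {
            conditionQueue.addLast("T" + i);
        }

        // T11 holds the lock inside take(); T12 calls put() and queues for the lock.
        lockQueue.addLast("T12");

        // T11 removes one element and signals notFull: the longest-waiting
        // condition waiter (T1) moves to the TAIL of the lock queue.
        count--;
        lockQueue.addLast(conditionQueue.pollFirst());

        // T11 unlocks; the lock is then handed out in lock-queue order.
        StringBuilder completed = new StringBuilder();
        while (!lockQueue.isEmpty()) {
            String thread = lockQueue.pollFirst();
            if (count == capacity) {
                conditionQueue.addLast(thread); // queue is full again: await once more
            } else {
                count++;                        // put succeeds
                completed.append(thread);
            }
        }
        return "put completed by " + completed + "; still waiting: " + conditionQueue;
    }

    public static void main(String[] args) {
        System.out.println(simulate());
    }
}
```

In this model T12 takes the free slot, and T1 ends up behind T2..T10 in the condition queue, which is the FIFO violation under discussion.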
If the potential starvation caused by this behavior is something that needs to be addressed, I’ve been considering a solution that could help. One approach could be tracking the number of threads waiting when the queue is full. Here’s an example of how this might be implemented:
private volatile long putWaitingCount = 0L;

public void put(E e) throws InterruptedException {
    Objects.requireNonNull(e);
    final ReentrantLock lock = this.lock;
    // waitedBefore: a flag indicating whether the current thread has
    // entered the await state during this call.
    boolean waitedBefore = false;
    lock.lockInterruptibly();
    try {
        /*
         * If there are waiting threads, the current thread joins the wait queue.
         * Threads entering this condition are implicitly guaranteed that count == capacity.
         * Therefore, it's safe to use await() without wrapping it in a loop here.
         */
        if (putWaitingCount > 0) {
            ++putWaitingCount;
            waitedBefore = true;
            notFull.await();
        }
        while (count == items.length) {
            if (!waitedBefore) {
                ++putWaitingCount;
                waitedBefore = true;
            }
            notFull.await();
        }
        enqueue(e);
    } finally {
        // If the thread had waited, decrement the waiting thread count.
        if (waitedBefore) {
            --putWaitingCount;
        }
        lock.unlock();
    }
}
In this approach, the number of threads currently waiting on Condition.await() is tracked by a simple counter, putWaitingCount. This ensures that newly arriving threads do not bypass those that have already been waiting when the queue is full.
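For experimenting outside the JDK sources, the same idea can be tried in a stand-alone class. The following is a minimal sketch, not a patch to ArrayBlockingQueue: the class name CountingBoundedQueue, the ArrayDeque storage, and the demo() helper are all assumptions introduced for the example. The put() logic mirrors the counter-based approach above, and the counter is a plain field here because it is only touched while the lock is held.

```java
import java.util.ArrayDeque;
import java.util.Objects;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-alone bounded queue; NOT the JDK's ArrayBlockingQueue.
public class CountingBoundedQueue<E> {
    private final ArrayDeque<E> items = new ArrayDeque<>();
    private final int capacity;
    private final ReentrantLock lock = new ReentrantLock(true); // fair mode
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();
    private long putWaitingCount = 0L; // only accessed while holding lock

    public CountingBoundedQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException("capacity must be positive");
        this.capacity = capacity;
    }

    public void put(E e) throws InterruptedException {
        Objects.requireNonNull(e);
        boolean waitedBefore = false;
        lock.lockInterruptibly();
        try {
            // Earlier producers are still waiting: queue up behind them first.
            if (putWaitingCount > 0) {
                ++putWaitingCount;
                waitedBefore = true;
                notFull.await();
            }
            while (items.size() == capacity) {
                if (!waitedBefore) {
                    ++putWaitingCount;
                    waitedBefore = true;
                }
                notFull.await();
            }
            items.addLast(e);
            notEmpty.signal();
        } finally {
            if (waitedBefore) {
                --putWaitingCount;
            }
            lock.unlock();
        }
    }

    public E take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (items.isEmpty()) {
                notEmpty.await();
            }
            E e = items.pollFirst();
            notFull.signal(); // wakes the longest-waiting producer, if any
            return e;
        } finally {
            lock.unlock();
        }
    }

    // Small smoke test: one producer may block on the full queue until take() runs.
    static String demo() {
        try {
            CountingBoundedQueue<String> q = new CountingBoundedQueue<>(1);
            q.put("a");
            Thread producer = new Thread(() -> {
                try {
                    q.put("b");
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
            });
            producer.start();
            String first = q.take();
            producer.join();
            return first + "," + q.take();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Note that in this sketch a producer that parks because putWaitingCount > 0 is only woken by a later notFull.signal() from take(), so progress depends on consumers continuing to run.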
I’d love to hear your thoughts on whether the starvation issue caused by the separation of the AQS and Condition.await() waiting spaces is worth addressing, and if so, whether the approach I’ve suggested could be effective in resolving it. If this direction seems promising, I’d be happy to continue refining the solution based on your feedback.
Thanks again for your time and consideration. I truly appreciate the opportunity to engage with you and learn through this process.
Best regards,
Minju Kim
> On Sep 11, 2024, at 1:33 PM, 김민주 <miiiinju00 at gmail.com> wrote:
>
> Hi Archie and Viktor,
>
> I apologize for the delay in my response.
>
>
>
> After further consideration, I believe my understanding aligns more closely with Interpretation B.
>
> To clarify my understanding of Interpretation B: threads waiting on a Condition (linked through ConditionNode in ConditionObject) receive signals in the order in which they started waiting, but receiving a signal doesn't guarantee immediate reacquisition of the ReentrantLock. Instead, as far as I understand, the signal simply enqueues the thread at the end of the ReentrantLock's AQS queue.
>
>
>
> As a result, threads that received a signal from ConditionObject do not have priority over the threads already waiting in the AQS queue and are simply added to the end of that queue.
>
> From what I understand, the difference between NonFairSync and FairSync lies in whether a thread can acquire the lock when there are other threads already waiting in the AQS queue, rather than anything related to the threads signaled from a Condition.
>
>
>
> Additionally, as I understand it, the enqueue() and doSignal() methods are declared final in AQS, and both Sync implementations use the AQS methods directly.
>
>
>
> Therefore, I wanted to highlight that this isn't necessarily an issue with AQS or FairSync. Instead, I suspect that the behavior we're seeing could arise from the fact that threads signaled in a BlockingQueue implementation may not execute immediately, which is a characteristic of the implementation.
>
>
>
> However, there may be aspects I’m misunderstanding, and I would appreciate any corrections or clarifications if that’s the case.
>
>
>
> Thank you very much for your time and understanding.
>
> Best regards,
>
> Kim Minju
>
>
>> On Sep 8, 2024, at 11:39 PM, Archie Cobbs <archie.cobbs at gmail.com> wrote:
>>
>> Hi Kim,
>>
>> Thanks for the details.
>>
>> So to summarize:
>> Kim is saying that "Interpretation B" is how it actually works.
>> Viktor is saying that "Interpretation A" is how it actually works.
>> Do I have that right?
>>
>> -Archie
>>
>> P.S. Viktor: my apologies for misspelling your name before
>>
>>
>> On Sat, Sep 7, 2024 at 8:04 PM 김민주 <miiiinju00 at gmail.com> wrote:
>>> Hi Archie,
>>>
>>>
>>>
>>> First of all, I want to apologize if I may not have explained things clearly since English is not my first language. I’m really sorry about that.
>>>
>>>
>>>
>>> Even so, I deeply appreciate your response and taking the time to reply.
>>>
>>>
>>>
>>> First, I would like to confirm whether my understanding is correct.
>>>
>>> From what I know, ReentrantLock is based on AQS, and internally, threads are queued by being linked as Node.
>>>
>>> When ReentrantLock.newCondition is called, a ConditionObject is created. When Condition::await is called, the thread waits based on a ConditionNode, and the AQS head is replaced with AQS::signalNext, allowing the lock to be released. At this point, I understand that the node disappears from the AQS queue and starts waiting within the ConditionNode of the ConditionObject.
>>>
>>> As a result, I understand that the waiting places for ReentrantLock and Condition are different.
>>>
>>> To summarize, when Condition::await() is called, the node that was previously waiting in AQS receives the lock, disappears from the AQS queue, and starts waiting within the ConditionNode of the ConditionObject.
>>>
>>> Additionally, from what I understand, in Condition::doSignal, the first ConditionNode is removed, and then AQS::enqueue adds it to the tail of AQS.
>>>
>>> public class ConditionObject implements Condition, java.io.Serializable {
>>>
>>>     private void doSignal(ConditionNode first, boolean all) {
>>>         while (first != null) {
>>>             ConditionNode next = first.nextWaiter;
>>>             if ((firstWaiter = next) == null)
>>>                 lastWaiter = null;
>>>             if ((first.getAndUnsetStatus(COND) & COND) != 0) {
>>>                 enqueue(first);
>>>                 if (!all)
>>>                     break;
>>>             }
>>>             first = next;
>>>         }
>>>     }
>>>
>>> When enqueue is called:
>>>
>>> public abstract class AbstractQueuedSynchronizer
>>>     extends AbstractOwnableSynchronizer
>>>
>>>     /*
>>>      * Tail of the wait queue. After initialization, modified only via casTail.
>>>      */
>>>     private transient volatile Node tail;
>>>
>>>     final void enqueue(Node node) {
>>>         if (node != null) {
>>>             for (;;) {
>>>                 Node t = tail;
>>>                 node.setPrevRelaxed(t);        // avoid unnecessary fence
>>>                 if (t == null)                 // initialize
>>>                     tryInitializeHead();
>>>                 else if (casTail(t, node)) {
>>>                     t.next = node;
>>>                     if (t.status < 0)          // wake up to clean link
>>>                         LockSupport.unpark(node.waiter);
>>>                     break;
>>>                 }
>>>             }
>>>         }
>>>     }
>>>
>>> From my understanding, this attaches the node to the tail of AQS.
>>>
>>> To elaborate further on the situation:
>>>
>>> Threads T1 to T10 are waiting on Condition::await() because the queue is full.
>>>
>>> (At this point, T1 to T10 are linked through ConditionNode.) [The AQS queue is empty, while ConditionNode holds T1 to T10.]
>>>
>>> T11 calls take() and holds the lock via lock.lockInterruptibly().
>>>
>>> (Since no threads are waiting in the AQS queue, T11 will acquire the lock immediately.)
>>>
>>> [Now, AQS holds T11 at its head, and ConditionNode holds T1 to T10.]
>>>
>>> T12 calls queue.put() and enters the wait queue for lock.lockInterruptibly(). (Since T11 is holding the lock with take(), T12 will be queued behind it in AQS.)
>>>
>>> [Now, AQS holds T11 and T12, while ConditionNode holds T1 to T10.]
>>>
>>> T11 reduces the count and sends a signal, then releases the lock.
>>>
>>> T1 receives the signal and moves to the lock queue. Since ReentrantLock is in fair mode, T12 (which was already waiting) gets priority, and T1 will acquire the lock later.
>>>
>>> (When T11 sends the signal, T1, the first thread linked in ConditionNode, will be enqueued via AQS::enqueue. Now, AQS holds T11, T12, and T1, while ConditionNode holds T2 to T10.)
>>>
>>> T11 releases the lock and wakes up T12.
>>>
>>> [Now, AQS holds T12 and T1, while ConditionNode holds T2 to T10.]
>>>
>>> T12 acquires the lock and proceeds to enqueue in ArrayBlockingQueue without being blocked by while(count==length).
>>>
>>> T12 releases the lock, and the next node in AQS is unparked.
>>>
>>> [Now, AQS holds T1, while ConditionNode holds T2 to T10.]
>>>
>>> T1, having reacquired the lock after Condition::await(), fails to exit the while loop and waits again.
>>>
>>> [Now, ConditionNode holds T1 and T2 to T10.]
>>>
>>>
>>>
>>> This is how I currently understand the situation.
>>>
>>> If there are any mistakes in my understanding, I would greatly appreciate your clarification.
>>>
>>> Best Regards,
>>>
>>> Kim Minju
>>>
>>>
>>>> On Sep 8, 2024, at 3:34 AM, Archie Cobbs <archie.cobbs at gmail.com> wrote:
>>>>
>>>> Hi Kim,
>>>>
>>>> On Sat, Sep 7, 2024 at 10:36 AM 김민주 <miiiinju00 at gmail.com> wrote:
>>>>> Here's a clearer outline of the scenario:
>>>>>
>>>>> 1. Threads T1 to T10 are waiting on Condition::await() because the queue is full.
>>>>> 2. T11 calls take() and holds the lock via lock.lockInterruptibly().
>>>>> 3. T12 calls queue.put() and enters the wait queue for lock.lockInterruptibly(). (As I understand, the wait queues for ReentrantLock and Condition are separate.)
>>>>> 4. T11 reduces the count and sends a signal, then releases the lock.
>>>>> 5. T1 receives the signal and moves to the lock queue. Since the ReentrantLock is in fair mode, T12 (which was already waiting) gets priority, and T1 will acquire the lock later.
>>>>> 6. T12 acquires the lock and successfully enqueues.
>>>> From one reading of the Javadoc, your step #5 ("T12 gets priority") is not supposed to happen that way. Instead, one of T1 through T10 should be guaranteed to acquire the lock.
>>>>
>>>> Here it is again (from ReentrantLock.newCondition()):
>>>>
>>>>> The ordering of lock reacquisition for threads returning from waiting methods is the same as for threads initially acquiring the lock, which is in the default case not specified, but for fair locks favors those threads that have been waiting the longest.
>>>>
>>>>
>>>> But part of the problem here is that this documentation is ambiguous.
>>>>
>>>> The ambiguity is: are ALL threads trying to acquire the lock, whether on an initial attempt or after a condition wakeup, ordered for fairness together in one big pool? → In this case step #5 can't happen. Call this Interpretation A.
>>>>
>>>> Or is this merely saying that threads waiting on a condition reacquire the lock based on when they started waiting on the condition, but there are no ordering guarantees between those threads and any other unrelated threads trying to acquire the lock? → In this case step #5 can happen. Call this Interpretation B.
>>>>
>>>> So I think we first need to clarify which interpretation is correct here, A or B.
>>>>
>>>> On that point, Victor said this:
>>>>
>>>>> I've re-read ReentrantLock and AQS, and from my understanding on the logic the Condition's place in the wait queue should be maintained, which means that T3 shouldn't be able to "barge".
>>>>
>>>>
>>>> So it sounds like Victor is confirming interpretation A. Victor do you agree?
>>>>
>>>> If so, then it seems like we need to do two things:
>>>>
>>>> 1. File a Jira ticket to clarify the Javadoc, e.g. to say something like this:
>>>>
>>>>> The ordering of lock reacquisition for threads returning from waiting methods is the same as for threads initially acquiring the lock, which is in the default case not specified, but for fair locks favors those threads that have been waiting the longest. In the latter case, the ordering consideration includes all threads attempting to acquire the lock, regardless of whether or not they were previously blocked on the condition.
>>>>
>>>>
>>>> 2. Understand why Kim's updated test case is still failing (it must be either a bug in the test or a bug in ArrayBlockingQueue).
>>>>
>>>> -Archie
>>>>
>>>> --
>>>> Archie L. Cobbs
>>>
>>
>>
>> --
>> Archie L. Cobbs
>