Skip to content

Commit

Permalink
Merge pull request #732 from nolar/queueing-consistency
Browse files Browse the repository at this point in the history
Prevent loss of events under stress-load or sync-blockers
  • Loading branch information
nolar authored Jul 4, 2021
2 parents 129fe4c + 4e22ff5 commit d147687
Showing 1 changed file with 15 additions and 4 deletions.
19 changes: 15 additions & 4 deletions kopf/_core/reactor/queueing.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,16 +282,27 @@ async def worker(
try:
while not shouldstop:

# Try ASAP, but give it few seconds for the new events to arrive, maybe.
# If the queue is empty for some time, then indeed finish the object's worker.
# If the queue is filled, use the latest event only (within the short timeframe).
# Try ASAP, but give it a few seconds for the new events to arrive.
# If the queue is empty for some time, then finish the object's worker.
# If the queue is filled, use the latest event only (within a short time window).
# If an EOS marker is received, handle the last real event, then finish the worker ASAP.
try:
raw_event = await asyncio.wait_for(
backlog.get(),
timeout=settings.batching.idle_timeout)
except asyncio.TimeoutError:
break
# A tricky part! Under high-load or with synchronous blocks of asyncio event-loop,
# it is possible that the timeout happens while the queue is filled: depending on
# the order in which the coros/waiters are checked once control returns to asyncio.
# As a work-around, we double-check the queue and exit only if it is truly empty;
# if not, run as normally. IMPORTANT: There MUST be NO async/await-code between
# "break" and "finally", so that the queue is not populated again.
# TODO: LATER: Test the described scenario. I have found no ways to simulate
# a timeout while the queue is filled -- neither with pure Python nor with mocks.
if backlog.empty():
break
else:
continue
else:
try:
while True:
Expand Down

0 comments on commit d147687

Please sign in to comment.