Skip to content

Commit

Permalink
add flag for batch enqueue delayed
Browse files Browse the repository at this point in the history
  • Loading branch information
onyxraven committed Aug 19, 2024
1 parent d7452f7 commit 45ce5fe
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 49 deletions.
9 changes: 3 additions & 6 deletions .github/workflows/ruby.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ on:
pull_request:
branches: [master]


# This allows a subsequently queued workflow run to interrupt previous runs
concurrency:
group: '${{ github.workflow }} @ ${{ github.event.pull_request.head.label || github.head_ref || github.ref }}'
Expand All @@ -33,16 +32,16 @@ jobs:
ruby-version:
- "3.0"
- "3.1"
# - "3.2"
# - "3.3"
- "3.2"
- "3.3"
resque-version:
# - "master"
# - "~> 2.6"
- "~> 1.27"
rufus-scheduler:
# - "~> 3.6.0"
# - "~> 3.7.0"
- "~> 3.8.0"
# - "~> 3.8.0"
- "~> 3.9"
redis-version:
- "~> 3.3"
Expand All @@ -57,8 +56,6 @@ jobs:
RUFUS_SCHEDULER: "${{ matrix.rufus-scheduler }}"
COVERAGE: 1

name: "ruby ${{matrix.ruby-version}}, resque ${{matrix.resque-version}}, rufus-scheduler ${{matrix.rufus-scheduler}}, redis-rb ${{matrix.redis-version}}"

steps:
- uses: actions/checkout@v4
- uses: ruby/setup-ruby@v1
Expand Down
25 changes: 18 additions & 7 deletions lib/resque/scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -204,25 +204,36 @@ def enqueue_next_item(timestamp)
item
end

def batch_delayed_items?
enable_delayed_requeue_batches || delayed_requeue_batch_size <= 1
end

# Enqueues all delayed jobs for a timestamp
def enqueue_delayed_items_for_timestamp(timestamp)
count = 0
batch_size = delayed_requeue_batch_size
actual_batch_size = nil
batch_size = batch_delayed_items? ? delayed_requeue_batch_size : 1

log "Processing delayed items for timestamp #{timestamp}, in batches of #{batch_size}"
message = "Processing delayed items for timestamp #{timestamp}"
message += ", in batches of #{batch_size}" if batch_delayed_items?
log message

loop do
actual_batch_size = 0

handle_shutdown do
# Continually check that it is still the master
if am_master
actual_batch_size = enqueue_items_in_batch_for_timestamp(timestamp,
batch_size)
if batch_delayed_items?
actual_batch_size = enqueue_items_in_batch_for_timestamp(timestamp, batch_size)
log "queued batch of #{actual_batch_size} jobs" if actual_batch_size != -1
else
item = enqueue_next_item(timestamp)
actual_batch_size = item.nil? ? 0 : 1
end
end
end

count += actual_batch_size
log "queued #{count} jobs" if actual_batch_size != -1

# continue processing until there are no more items in this
# timestamp. If we don't have a full batch, this is the last one.
Expand All @@ -231,7 +242,7 @@ def enqueue_delayed_items_for_timestamp(timestamp)
break if actual_batch_size < batch_size
end

log "finished queueing #{count} total jobs for timestamp #{timestamp}" if count != -1
log "finished queueing #{count} total jobs for timestamp #{timestamp}"
end

def timestamp_key(timestamp)
Expand Down
8 changes: 8 additions & 0 deletions lib/resque/scheduler/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,20 @@ def app_name
@app_name ||= environment['APP_NAME']
end

attr_writer :delayed_requeue_batch_size

def delayed_requeue_batch_size
@delayed_requeue_batch_size ||= \
ENV['DELAYED_REQUEUE_BATCH_SIZE'].to_i if environment['DELAYED_REQUEUE_BATCH_SIZE']
@delayed_requeue_batch_size ||= 100
end

attr_writer :enable_delayed_requeue_batches

def enable_delayed_requeue_batches
@enable_delayed_requeue_batches ||= to_bool(environment['ENABLE_DELAYED_REQUEUE_BATCH'])
end

# Amount of time in seconds to sleep between polls of the delayed
# queue. Defaults to 5
attr_writer :poll_sleep_amount
Expand Down
127 changes: 91 additions & 36 deletions test/delayed_queue_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -391,60 +391,115 @@ def assert_resque_key_exists?(key)
assert_equal(1, Resque.delayed_timestamp_peek(t, 0, 3).length)
end

test 'enqueue_delayed_items_for_timestamp enqueues jobs in 2 batches' do
t = Time.now + 60
context "non-batch delayed item queue" do
batch_enabled = Resque::Scheduler.enable_delayed_requeue_batches
batch_size = Resque::Scheduler.delayed_requeue_batch_size
setup do
Resque::Scheduler.quiet = true
Resque.data_store.redis.flushall
Resque::Scheduler.enable_delayed_requeue_batches = false
Resque::Scheduler.delayed_requeue_batch_size = 1
end

# create 120 jobs
120.times { Resque.enqueue_at(t, SomeIvarJob) }
assert_equal(120, Resque.delayed_timestamp_size(t))
teardown do
Resque::Scheduler.enable_delayed_requeue_batches = batch_enabled
Resque::Scheduler.delayed_requeue_batch_size = batch_size
end

Resque::Scheduler.enqueue_delayed_items_for_timestamp(t)
assert_equal(0, Resque.delayed_timestamp_size(t))
test 'enqueue_delayed_items_for_timestamp enqueues jobs for the timestamp' do
t = Time.now + 60

# assert that the active queue is now 120
assert_equal(120, Resque.size(Resque.queue_from_class(SomeIvarJob)))
end
Resque::Scheduler.expects(:enqueue_items_in_batch_for_timestamp).never

test 'enqueue_delayed_items_for_timestamp enqueues jobs in one batch for the timestamp' do
t = Time.now + 60
# create 90 jobs
90.times { Resque.enqueue_at(t, SomeIvarJob) }
assert_equal(90, Resque.delayed_timestamp_size(t))

# create 90 jobs
90.times { Resque.enqueue_at(t, SomeIvarJob) }
assert_equal(90, Resque.delayed_timestamp_size(t))
Resque::Scheduler.enqueue_delayed_items_for_timestamp(t)
assert_equal(0, Resque.delayed_timestamp_size(t))

Resque::Scheduler.enqueue_delayed_items_for_timestamp(t)
assert_equal(0, Resque.delayed_timestamp_size(t))
# assert that the active queue is now 90
assert_equal(90, Resque.size(Resque.queue_from_class(SomeIvarJob)))
end

# assert that the active queue is now 90
assert_equal(90, Resque.size(Resque.queue_from_class(SomeIvarJob)))
# TODO clean up timestamp?
end

# test to make sure the timestamp is cleaned up
context "batch delayed item queue" do
batch_enabled = Resque::Scheduler.enable_delayed_requeue_batches
batch_size = Resque::Scheduler.delayed_requeue_batch_size
setup do
Resque::Scheduler.quiet = true
Resque.data_store.redis.flushall
Resque::Scheduler.enable_delayed_requeue_batches = true
Resque::Scheduler.delayed_requeue_batch_size = 100
end

test 'enqueue_delayed_items_for_timestamp handles a watch failure' do
t = Time.now + 60
teardown do
Resque::Scheduler.enable_delayed_requeue_batches = batch_enabled
Resque::Scheduler.delayed_requeue_batch_size = batch_size
end

# create 100 jobs
100.times { Resque.enqueue_at(t, SomeIvarJob) }
assert_equal(100, Resque.delayed_timestamp_size(t))
test 'enqueue_delayed_items_for_timestamp enqueues jobs in 2 batches' do
t = Time.now + 60

Resque.redis.stubs(:watch).returns(nil)
Resque::Scheduler.expects(:enqueue_next_item).never

Resque.expects(:clean_up_timestamp).never
# create 120 jobs
120.times { Resque.enqueue_at(t, SomeIvarJob) }
assert_equal(120, Resque.delayed_timestamp_size(t))

Resque::Scheduler.enqueue_delayed_items_for_timestamp(t)
end
Resque::Scheduler.enqueue_delayed_items_for_timestamp(t)
assert_equal(0, Resque.delayed_timestamp_size(t))

test 'enqueue_delayed_items_for_timestamp cleans up a timestamp' do
t = Time.now + 60
# assert that the active queue is now 120
assert_equal(120, Resque.size(Resque.queue_from_class(SomeIvarJob)))
end

# create 100 jobs
100.times { Resque.enqueue_at(t, SomeIvarJob) }
assert_equal(100, Resque.delayed_timestamp_size(t))
test 'enqueue_delayed_items_for_timestamp enqueues jobs in one batch for the timestamp' do
t = Time.now + 60

Resque.expects(:clean_up_timestamp).once
Resque::Scheduler.expects(:enqueue_next_item).never

# create 90 jobs
90.times { Resque.enqueue_at(t, SomeIvarJob) }
assert_equal(90, Resque.delayed_timestamp_size(t))

Resque::Scheduler.enqueue_delayed_items_for_timestamp(t)
assert_equal(0, Resque.delayed_timestamp_size(t))

# assert that the active queue is now 90
assert_equal(90, Resque.size(Resque.queue_from_class(SomeIvarJob)))
end

# test to make sure the timestamp is cleaned up

test 'enqueue_delayed_items_for_timestamp handles a watch failure' do
t = Time.now + 60

# create 100 jobs
100.times { Resque.enqueue_at(t, SomeIvarJob) }
assert_equal(100, Resque.delayed_timestamp_size(t))

Resque.redis.stubs(:watch).returns(nil)

Resque.expects(:clean_up_timestamp).never

Resque::Scheduler.enqueue_delayed_items_for_timestamp(t)
end

test 'enqueue_delayed_items_for_timestamp cleans up a timestamp' do
t = Time.now + 60

# create 100 jobs
100.times { Resque.enqueue_at(t, SomeIvarJob) }
assert_equal(100, Resque.delayed_timestamp_size(t))

Resque.expects(:clean_up_timestamp).once

Resque::Scheduler.enqueue_delayed_items_for_timestamp(t)
end

Resque::Scheduler.enqueue_delayed_items_for_timestamp(t)
end

test 'enqueue_delayed_items_for_timestamp creates jobs ' \
Expand Down

0 comments on commit 45ce5fe

Please sign in to comment.