Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow jobs touching #165

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions HOOKS.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ There are a variety of hooks available that are triggered during the lifecycle o
* `on_failure`: Called with the exception and job args if any exception occurs
while performing the job (or hooks).

* `on_touch`: Called with the job args when the job is touched.

Hooks are just methods prefixed with the hook type. For example:

```ruby
Expand Down
51 changes: 43 additions & 8 deletions lib/backburner/job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def process
# b) ttr == 1, so that we don't accidentally set it to never time out
# NB: A ttr of 1 will likely result in race conditions between
# Backburner and beanstalkd and should probably be avoided
timeout_job_after(task.ttr > 1 ? task.ttr - 1 : task.ttr) { job_class.perform(*args) }
start_job { job_class.perform(*args) }
end
task.delete
# Invoke after perform hook
Expand All @@ -73,6 +73,11 @@ def retry(count, delay)
task.release(delay: delay)
end

def touch
@hooks.invoke_hook_events(job_name, :on_touch, *args)
task.touch
end

protected

# Returns the class for the job handler
Expand Down Expand Up @@ -101,16 +106,46 @@ def try_job_class
nil
end

# Timeout job within specified block after given time.
# Start the specified block using the same timeout as beaneater.
#
# @example
# start_job { do_something! }
#
def start_job(&block)
return yield if task.stats.ttr == 0

current_thread = Thread.current
block_thread = Thread.start do
begin
yield
rescue JobTimeout => e
current_thread.raise JobTimeout, "#{name}(#{(@args||[]).join(', ')}) hit #{task.stats.ttr}s timeout.\nbacktrace: #{e.backtrace}"
rescue => e
current_thread.raise e
end
end
timer_thread = job_timer(block_thread)
block_thread.join
timer_thread.kill
end

# Start a thread checking the time left of the job from beanstalk.
# If timed out, bury the job and raise an error on the job's thread to make it stop.
#
# @example
# timeout_job_after(3) { do_something! }
# job_timer(thread)
#
def timeout_job_after(secs, &block)
begin
Timeout::timeout(secs) { yield }
rescue Timeout::Error => e
raise JobTimeout, "#{name}(#{(@args||[]).join(', ')}) hit #{secs}s timeout.\nbacktrace: #{e.backtrace}"
def job_timer(watched_thread)
Thread.start do
while(task.stats.time_left > 0) do
sleep(task.stats.time_left)
end

if watched_thread.alive?
# If we don't bury the job here, beaneater return a NOT_FOUND error when work_one_job tries to bury it
task.bury
watched_thread.raise JobTimeout.new
end
end
end

Expand Down
2 changes: 1 addition & 1 deletion lib/backburner/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ def work_one_job(conn = connection)
job.retry(num_retries + 1, delay)
self.log_job_end(job.name, "#{retry_status}, retrying in #{delay}s") if job_started_at
else # retries failed, bury
job.bury
job.bury if job.stats.state != "buried"
self.log_job_end(job.name, "#{retry_status}, burying") if job_started_at
end

Expand Down
7 changes: 7 additions & 0 deletions test/hooks_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,13 @@
end
end

describe 'with on_touch' do
it "should support successful invocation" do
out = silenced { @hooks.invoke_hook_events(HookedObjectSuccess, :on_touch, 10) }
assert_match(/!!on_touch_foo!! \[10\]/, out)
end
end

describe "with on_reconnect" do
it "should support successful invocation" do
out = silenced { @hooks.invoke_hook_events(HookedWorker.new, :on_reconnect)}
Expand Down
8 changes: 8 additions & 0 deletions test/job_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,14 @@ def self.perform(x); raise RuntimeError; end
.with("AnUnknownClass", :on_retry, 0, is_a(Integer), anything)
@job.retry(0, 0)
end

it "should call touch for task" do
@task.expects(:touch).once
@job = Backburner::Job.new(@task)
Backburner::Hooks.expects(:invoke_hook_events)
.with("AnUnknownClass", :on_touch, anything)
@job.touch
end
end
end # simple delegation

Expand Down