Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent on_cancellation_job & on_completion_job deserialization failure blocking cleanup #24

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
# Changelog
## 0.8.0
* Failure to deserialize on_cancellation_job or on_completion_job will not prevent clean up of the job group.
* Adds `failed_at` to `delayed_job_groups` table.
Use `bundle exec rails g delayed_job_groups_plugin:add_failed_at_to_delayed_job_groups` to generate the migration to
add this column.

### 0.7.0
* Add support for ruby 3
* Drop support for ruby < 2.6
Expand Down
16 changes: 15 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,24 @@ job_group.cancel
Configuration to allow failed jobs not to cancel the group
```ruby
# We can optionally pass options that will allow jobs to fail without cancelling the group.
# This also allows the on_completion job to fire once all jobs have either succeeded or failed.
# This also allows the on_completion job to fire once all jobs have either succeeded or failed.
job_group = Delayed::JobGroups::JobGroup.create!(failure_cancels_group: false)
```

### Job Group Plugin Options

The job group plugin can be configured in an initializer (e.g. `config/initializers/delayed_job_groups_plugin.rb`) as follows:

```ruby
Delayed::JobGroups.configure do |configuration|
configuration.error_reporter = Proc.new { |error| Bugsnag.notify(error) }
end
```

The plugin supports the following options (all of which are optional):

* `error_reporter` - a callback proc that accepts an `Exception` if the plugin encounters an unexpected error. This can be useful for reporting to an error monitoring system.

## Supported Platforms

* Only the Delayed Job Active Record backend is supported.
Expand Down
9 changes: 9 additions & 0 deletions lib/delayed/job_groups/configuration.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# frozen_string_literal: true

module Delayed
module JobGroups
class Configuration
attr_accessor :error_reporter
end
end
end
25 changes: 23 additions & 2 deletions lib/delayed/job_groups/job_group.rb
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,19 @@ def unblock
end

def cancel
Delayed::Job.enqueue(on_cancellation_job, on_cancellation_job_options || {}) if on_cancellation_job
destroy
self.class.transaction do
# Deserialization of the job or its options can fail
Delayed::Job.enqueue(on_cancellation_job, on_cancellation_job_options || {}) if on_cancellation_job
destroy
end
rescue TypeError, LoadError, NameError, ArgumentError, SyntaxError, Psych::SyntaxError => e
Delayed::Worker.logger.info('Failed to deserialize the on_cancellation_job or on_cancellation_job_options ' \
"for job_group_id=#{id}. Skipping on_cancellation_job to clean up job group.")
error_reporter.call(e) if error_reporter
self.class.transaction do
update_columns(failed_at: Time.now)
queued_jobs.delete_all
end
end

def self.check_for_completion(job_group_id)
Expand Down Expand Up @@ -80,8 +91,18 @@ def ready_for_completion?
end

def complete
# Deserialization of the job or its options can fail
Delayed::Job.enqueue(on_completion_job, on_completion_job_options || {}) if on_completion_job
destroy
rescue TypeError, LoadError, NameError, ArgumentError, SyntaxError, Psych::SyntaxError => e
Delayed::Worker.logger.info('Failed to deserialize the on_completion_job or on_completion_job_options for ' \
"job_group_id=#{id}. Skipping on_completion_job to clean up job group.")
error_reporter.call(e) if error_reporter
update_columns(failed_at: Time.now)
end

def error_reporter
Delayed::JobGroups.configuration.error_reporter
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/delayed/job_groups/version.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@

module Delayed
module JobGroups
VERSION = '0.7.0'
VERSION = '0.8.0'
end
end
15 changes: 15 additions & 0 deletions lib/delayed_job_groups_plugin.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
require 'active_record'
require 'delayed_job'
require 'delayed_job_active_record'
require 'delayed/job_groups/configuration'
require 'delayed/job_groups/compatibility'
require 'delayed/job_groups/job_extensions'
require 'delayed/job_groups/job_group'
Expand All @@ -20,3 +21,17 @@
end

Delayed::Worker.plugins << Delayed::JobGroups::Plugin

module Delayed
module JobGroups
@configuration = Delayed::JobGroups::Configuration.new

class << self
attr_reader :configuration

def configure
yield(configuration) if block_given?
end
end
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# frozen_string_literal: true

require 'rails/generators'
require 'rails/generators/migration'
require 'rails/generators/active_record'

module DelayedJobGroupsPlugin
class AddFailedAtToDelayedJobGroupsGenerator < Rails::Generators::Base
include Rails::Generators::Migration

source_paths << File.join(File.dirname(__FILE__), 'templates')

def create_migration_file
migration_template('add_failed_at_to_delayed_job_groups.erb', 'db/migrate/add_failed_at_to_delayed_job_groups.rb')
end

def self.next_migration_number(dirname)
ActiveRecord::Generators::Base.next_migration_number(dirname)
end

end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# frozen_string_literal: true

class AddFailedAtToDelayedJobGroups < ActiveRecord::Migration[<%= ActiveRecord::VERSION::MAJOR %>.<%= ActiveRecord::VERSION::MINOR %>]

def up
add_column(:delayed_job_groups, :failed_at, :timestamp)
end

def down
remove_column(:delayed_job_groups, :failed_at)
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ class CreateDelayedJobGroups < ActiveRecord::Migration[<%= ActiveRecord::VERSION
t.boolean :failure_cancels_group, default: true, null: false
t.boolean :queueing_complete, default: false, null: false
t.boolean :blocked, default: false, null: false
t.timestamp :failed_at
end
end

Expand Down
1 change: 1 addition & 0 deletions spec/db/schema.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,6 @@
t.boolean :failure_cancels_group, default: true, null: false
t.boolean :queueing_complete, default: false, null: false
t.boolean :blocked, default: false, null: false
t.timestamp :failed_at
end
end
108 changes: 98 additions & 10 deletions spec/delayed/job_groups/job_group_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,51 @@
expect(job_group).to have_been_destroyed
end
end

context "on_completion_job refers to missing class" do
let(:error_reporter) { Proc.new { |_error| } }

# The on_completion_job needs the class to be defined this way in order to serialize it
# rubocop:disable RSpec/LeakyConstantDeclaration,Style/ClassAndModuleChildren,Lint/ConstantDefinitionInBlock
before do
module Delayed::JobGroups::JobGroupTestHelper
class OnCompletionJob

end
end

allow(error_reporter).to receive(:call)
end
# rubocop:enable RSpec/LeakyConstantDeclaration,Style/ClassAndModuleChildren,Lint/ConstantDefinitionInBlock


around do |example|
original_error_reporter = Delayed::JobGroups.configuration.error_reporter
Delayed::JobGroups.configuration.error_reporter = error_reporter
example.run
Delayed::JobGroups.configuration.error_reporter = original_error_reporter
end

it "handles missing on_completion_job" do
on_completion_job = Delayed::JobGroups::JobGroupTestHelper::OnCompletionJob.new
job_group = Delayed::JobGroups::JobGroup.create!(on_completion_job: on_completion_job,
on_completion_job_options: {})
job = Delayed::Job.create!(job_group_id: job_group.id)
job_group.mark_queueing_complete
job.destroy

# Remove the class for on_completion_job
Delayed::JobGroups::JobGroupTestHelper.module_eval do
remove_const 'OnCompletionJob'
end

# Deserialization fails
expect { Delayed::JobGroups::JobGroup.check_for_completion(job_group.id) }.not_to raise_error
expect(error_reporter).to have_received(:call)
expect(job_group).not_to have_been_destroyed
expect(job_group.reload.failed_at).to be_present
end
end
end

describe "#enqueue" do
Expand Down Expand Up @@ -212,20 +257,63 @@
let!(:queued_job) { Delayed::Job.create!(job_group_id: job_group.id) }
let!(:running_job) { Delayed::Job.create!(job_group_id: job_group.id, locked_at: Time.now, locked_by: 'test') }

before do
job_group.cancel
end
context "with no on_cancellation_job" do
before do
job_group.cancel
end

it "destroys the job group" do
expect(job_group).to have_been_destroyed
end
it "destroys the job group" do
expect(job_group).to have_been_destroyed
end

it "destroys queued jobs" do
expect(queued_job).to have_been_destroyed
it "destroys queued jobs" do
expect(queued_job).to have_been_destroyed
end

it "does not destroy running jobs" do
expect(running_job).not_to have_been_destroyed
end
end

it "does not destroy running jobs" do
expect(running_job).not_to have_been_destroyed
context "on_cancellation_job refers to missing class" do
let(:error_reporter) { Proc.new { |_error| } }

# The on_cancellation_job needs the class to be defined this way in order to serialize it
# rubocop:disable RSpec/LeakyConstantDeclaration,Style/ClassAndModuleChildren,Lint/ConstantDefinitionInBlock
before do
module Delayed::JobGroups::JobGroupTestHelper
class OnCancellationJob

end
end

allow(error_reporter).to receive(:call)
end
# rubocop:enable RSpec/LeakyConstantDeclaration,Style/ClassAndModuleChildren,Lint/ConstantDefinitionInBlock

around do |example|
original_error_reporter = Delayed::JobGroups.configuration.error_reporter
Delayed::JobGroups.configuration.error_reporter = error_reporter
example.run
Delayed::JobGroups.configuration.error_reporter = original_error_reporter
end

it "handles missing on_cancellation_job" do
on_cancellation_job = Delayed::JobGroups::JobGroupTestHelper::OnCancellationJob.new
job_group = Delayed::JobGroups::JobGroup.create!(on_cancellation_job: on_cancellation_job,
on_cancellation_job_options: {})

# Remove the class for on_cancellation_job
Delayed::JobGroups::JobGroupTestHelper.module_eval do
remove_const 'OnCancellationJob'
end

# Deserialization fails
expect { job_group.cancel }.not_to raise_error
expect(error_reporter).to have_received(:call)
expect(job_group).not_to have_been_destroyed
expect(job_group.reload.failed_at).to be_present
end
end
end

Expand Down