Skip to content

Commit

Permalink
Merge pull request #5988 from Martchus/worker-limit
Browse files Browse the repository at this point in the history
Allow limiting number of worker websocket connections
  • Loading branch information
mergify[bot] authored Oct 10, 2024
2 parents 837b284 + 8afe0b7 commit 2ebe306
Show file tree
Hide file tree
Showing 13 changed files with 89 additions and 52 deletions.
4 changes: 4 additions & 0 deletions etc/openqa/openqa.ini
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,10 @@ concurrent = 0
#assets_default_limit = 100000
## Maximum number of next jobs to include asset listing request (to prevent performance issues)
#assets_max_limit = 200000
## Maximum number of online workers (to prevent performance issues)
#max_online_workers = 1000
## Retry delay for limited workers in seconds
#worker_limit_retry_delay = 900

[archiving]
## Moves logs of jobs which are preserved during the cleanup because they are considered important
Expand Down
15 changes: 3 additions & 12 deletions lib/OpenQA/Schema/Result/Workers.pm
Original file line number Diff line number Diff line change
Expand Up @@ -168,10 +168,7 @@ sub unprepare_for_work {
return $self;
}

sub info {
my $self = shift;
my ($live) = ref $_[0] eq 'HASH' ? @{$_[0]}{qw(live)} : @_;

sub info ($self) {
my $settings = {
id => $self->id,
host => $self->host,
Expand All @@ -190,14 +187,8 @@ sub info {
my $cs = $self->currentstep;
$settings->{currentstep} = $cs if $cs;
}
my $alive = $settings->{alive} = $settings->{connected} = $self->dead ? 0 : 1;
$settings->{websocket} = $live ? $alive : 0;

# note: The keys "connected" and "websocket" are only provided for compatibility. The "live"
# parameter makes no actual difference anymore. (`t_seen` is decreased when a worker
# disconnects from the ws server so relying on it is as live as it gets.)

return $settings;
$settings->{alive} = $settings->{connected} = $settings->{websocket} = $self->dead ? 0 : 1;
return $settings; # The keys "connected" and "websocket" are only provided for compatibility.
}

sub send_command {
Expand Down
2 changes: 2 additions & 0 deletions lib/OpenQA/Setup.pm
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,8 @@ sub read_config ($app) {
job_settings_max_recent_jobs => 20000,
assets_default_limit => 100000,
assets_max_limit => 200000,
max_online_workers => 1000,
worker_limit_retry_delay => ONE_HOUR / 4,
},
archiving => {
archive_preserved_important_jobs => 0,
Expand Down
6 changes: 2 additions & 4 deletions lib/OpenQA/WebAPI/Controller/API/V1/Worker.pm
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,12 @@ websocket status.
sub list ($self) {
my $validation = $self->validation;
$validation->optional('limit')->num;
$validation->optional('live')->num(1);
$validation->optional('offset')->num;
return $self->reply->validation_error({format => 'json'}) if $validation->has_error;

my $limits = OpenQA::App->singleton->config->{misc_limits};
my $limit = min($limits->{generic_max_limit}, $validation->param('limit') // $limits->{generic_default_limit});
my $offset = $validation->param('offset') // 0;
my $live = $validation->param('live');

my @all = $self->schema->resultset('Workers')->search({}, {rows => $limit + 1, offset => $offset})->all;

Expand All @@ -63,7 +61,7 @@ sub list ($self) {
my $ret = [];
for my $worker (@all) {
next unless $worker->id;
push @$ret, $worker->info($live);
push @$ret, $worker->info;
}

$self->render(json => {workers => $ret});
Expand Down Expand Up @@ -225,7 +223,7 @@ sub show {
my ($self) = @_;
my $worker = $self->schema->resultset('Workers')->find($self->param('workerid'));
if ($worker) {
$self->render(json => {worker => $worker->info(1)});
$self->render(json => {worker => $worker->info});
}
else {
$self->reply->not_found;
Expand Down
9 changes: 4 additions & 5 deletions lib/OpenQA/WebAPI/Controller/Admin/Workers.pm
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,11 @@ use OpenQA::Utils;
use OpenQA::WebAPI::ServerSideDataTable;
use Scalar::Util 'looks_like_number';

sub _extend_info ($w, $live = undef) {
$live //= 0;
my $info = $w->info($live);
sub _extend_info ($w) {
my $info = $w->info;
$info->{name} = $w->name;
my $error = $info->{error};
if ($live && $error && ($error =~ qr/(graceful disconnect) at (.*)/)) {
if ($error && ($error =~ qr/(graceful disconnect|limited) at (.*)/)) {
$info->{offline_note} = $1;
$info->{t_seen} = $2 . 'Z';
$info->{alive} = undef;
Expand Down Expand Up @@ -60,7 +59,7 @@ sub index ($self) {
sub show ($self) {
my $w = $self->schema->resultset('Workers')->find($self->param('worker_id'))
or return $self->reply->not_found;
$self->stash(worker => _extend_info($w, 1));
$self->stash(worker => _extend_info($w));

$self->render('admin/workers/show');
}
Expand Down
6 changes: 2 additions & 4 deletions lib/OpenQA/WebSockets/Controller/Worker.pm
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,17 @@ use constant LOG_WORKER_STATUS_MESSAGES => $ENV{OPENQA_LOG_WORKER_STATUS_MESSAGE
sub ws {
my ($self) = @_;
my $status = $self->status;
my $transaction = $self->tx;

# add worker connection
my $worker_id = $self->param('workerid');
return $self->render(text => 'No worker ID', status => 400) unless $worker_id;
my $worker = $status->add_worker_connection($worker_id, $transaction);
return $self->render(text => 'Unknown worker', status => 400) unless $worker;
return undef unless defined $status->add_worker_connection($worker_id, $self);

# upgrade connection to websocket by subscribing to events
$self->on(json => \&_message);
$self->on(finish => \&_finish);
$self->inactivity_timeout(0); # Do not force connection close due to inactivity
$transaction->max_websocket_size(10485760);
$self->tx->max_websocket_size(10485760);
}

sub _finish {
Expand Down
37 changes: 29 additions & 8 deletions lib/OpenQA/WebSockets/Model/Status.pm
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
# SPDX-License-Identifier: GPL-2.0-or-later

package OpenQA::WebSockets::Model::Status;
use Mojo::Base -base;
use Mojo::Base -base, -signatures;

use DateTime;
use Time::Seconds;
use OpenQA::Schema;
use OpenQA::Schema::Result::Workers ();
use OpenQA::Jobs::Constants;
Expand All @@ -13,15 +15,30 @@ has [qw(workers worker_by_transaction)] => sub { {} };

sub singleton { state $status ||= __PACKAGE__->new }

sub add_worker_connection {
my ($self, $worker_id, $transaction) = @_;
# Decides whether a new websocket connection of a worker has to be refused because the
# configured maximum number of online workers ("max_online_workers" in "misc_limits") is reached.
#
# Parameters:
#   $worker_db     - database row of the worker that tries to connect
#   $worker_is_new - true if the caller has only just looked up $worker_db (no reload needed)
#   $controller    - controller of the connection attempt; used to read the config and to respond
#
# Returns 0 if the connection may proceed (no limit configured, limit not reached yet or the
# worker is currently assigned to a job). Otherwise flags the worker as limited in the database,
# renders a 429 response carrying a "Retry-After" header and returns 1.
sub _is_limit_exceeded ($self, $worker_db, $worker_is_new, $controller) {
    my $limits = $controller->app->config->{misc_limits};
    my $max_online = $limits->{max_online_workers};

    # no limit configured at all
    return 0 unless defined $max_online;

    # still room for further connections
    my $connection_count = keys %{$self->worker_by_transaction};
    return 0 if $max_online > $connection_count;

    # reload a row we have been holding on to so the job assignment checked below is current
    $worker_db->discard_changes unless $worker_is_new;

    # allow workers that work on a job
    return 0 if defined $worker_db->job_id;

    # flag the worker as limited; the timestamp is part of the error message
    my $now = DateTime->now(time_zone => 'UTC');
    $worker_db->update({t_seen => undef, error => 'limited at ' . $now});

    # refuse the connection and tell the worker when to try again
    my $retry_delay = $limits->{worker_limit_retry_delay};
    $controller->res->headers->append('Retry-After' => $retry_delay);
    $controller->render(text => 'Limit of worker connections exceeded', status => 429);
    return 1;
}

sub add_worker_connection ($self, $worker_id, $controller) {

# add new worker entry if none exists yet
my $workers = $self->workers;
my $worker = $workers->{$worker_id};
if (!defined $worker) {
my $schema = OpenQA::Schema->singleton;
return undef unless my $db = $schema->resultset('Workers')->find($worker_id);
my $worker_is_new = !defined $worker;
if ($worker_is_new) {
my $db = OpenQA::Schema->singleton->resultset('Workers')->find($worker_id);
if (!$db) {
$controller->render(text => 'Unknown worker', status => 400);
return undef;
}
$worker = $workers->{$worker_id} = {
id => $worker_id,
db => $db,
Expand All @@ -34,12 +51,16 @@ sub add_worker_connection {
log_debug "Finishing current connection of worker $worker_id before accepting new one";
$current_tx->finish(1008 => 'only one connection per worker allowed, finishing old one in favor of new one');
}
else {
return undef if $self->_is_limit_exceeded($worker->{db}, $worker_is_new, $controller);
}

$self->worker_by_transaction->{$transaction} = $worker;
my $new_tx = $controller->tx;
$self->worker_by_transaction->{$new_tx} = $worker;

# assign the transaction so that the most recent web socket connection for a certain worker is
# always available
$worker->{tx} = $transaction;
$worker->{tx} = $new_tx;

return $worker;
}
Expand Down
2 changes: 1 addition & 1 deletion lib/OpenQA/Worker.pm
Original file line number Diff line number Diff line change
Expand Up @@ -682,7 +682,7 @@ sub _handle_client_status_changed ($self, $client, $event_data) {
}
# handle failures where it makes sense to reconnect
elsif ($status eq 'failed') {
my $interval = $ENV{OPENQA_WORKER_CONNECT_INTERVAL} // 10;
my $interval = $event_data->{retry_after} // $ENV{OPENQA_WORKER_CONNECT_INTERVAL} // 10;
log_warning("$error_message - trying again in $interval seconds");
Mojo::IOLoop->timer($interval => sub { $client->register() });
# stop current job if not accepted yet but out of acceptance attempts
Expand Down
3 changes: 2 additions & 1 deletion lib/OpenQA/Worker/WebUIConnection.pm
Original file line number Diff line number Diff line change
Expand Up @@ -164,9 +164,10 @@ sub _setup_websocket_connection ($self, $websocket_url = undef) {
$self->websocket_connection(undef);

my $error = $tx->error;
my $retry_after = $tx->res->headers->header('Retry-After');
my $error_message = "Unable to upgrade to ws connection via $websocket_url";
$error_message .= ", code $error->{code}" if ($error && $error->{code});
$self->_set_status(failed => {error_message => $error_message});
$self->_set_status(failed => {error_message => $error_message, retry_after => $retry_after});
return undef;
}

Expand Down
20 changes: 14 additions & 6 deletions t/24-worker-webui-connection.t
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ subtest 'attempt to register and send a command' => sub {
);
Mojo::IOLoop->start;
}
qr/Connection error: Can't connect:.*(remaining tries: 0)/s, 'error logged';
qr/Connection error:.*(remaining tries: 0)/s, 'error logged';
is($callback_invoked, 1, 'callback has been invoked');
is($client->worker->stop_current_job_called,
0, 'not attempted to stop current job because it is from different web UI');
Expand Down Expand Up @@ -156,13 +156,21 @@ subtest 'attempt to register and send a command' => sub {
[{status => 'registering', error_message => undef}, {status => 'failed'}],
'events emitted',
)
and like(
$error_message,
qr{Failed to register at http://test-host - connection error: Can't connect:.*},
'error message',
)) or diag explain \@happened_events;
and like($error_message, qr{Failed to register at http://test-host - connection error:.*}, 'error message')
) or diag explain \@happened_events;
};

subtest 'attempt to setup websocket connection' => sub {
my @expected_events = (
{status => 'establishing_ws', error_message => undef},
{status => 'failed', error_message => 'Unable to upgrade to ws connection via http://test-host/api/v1/ws/42'},
);
@happened_events = ();
$client->_setup_websocket_connection;
$client->once(status_changed => sub ($status, @) { Mojo::IOLoop->stop if $status eq 'failed' });
Mojo::IOLoop->start;
is_deeply \@happened_events, \@expected_events, 'events emitted' or diag explain \@happened_events;
};

subtest 'retry behavior' => sub {
# use fake Mojo::UserAgent and Mojo::Transaction
Expand Down
24 changes: 18 additions & 6 deletions t/27-websockets.t
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ use Mojo::JSON;
my $schema = OpenQA::Test::Database->new->create(fixtures_glob => '01-jobs.pl 02-workers.pl 03-users.pl');
my $t = Test::Mojo->new('OpenQA::WebSockets');
my $t2 = client(Test::Mojo->new('OpenQA::WebSockets'));
my $misc_limits = $t->app->config->{misc_limits} //= {};
my $workers = $schema->resultset('Workers');
my $jobs = $schema->resultset('Jobs');
my $worker = $workers->find({host => 'localhost', instance => 1});
my $worker_id = $worker->id;
my $status = OpenQA::WebSockets::Model::Status->singleton->workers;

subtest 'Authentication' => sub {
my $app = $t->app;
Expand Down Expand Up @@ -59,14 +65,20 @@ subtest 'Exception' => sub {
};

subtest 'API' => sub {
$t->tx($t->ua->start($t->ua->build_websocket_tx('/ws/23')))->status_is(400)->content_like(qr/Unknown worker/);
$t->tx($t->ua->start($t->ua->build_websocket_tx('/ws/23')));
$t->status_is(400, 'no ws connection for unregistered worker');
$t->content_like(qr/Unknown worker/, 'error about unknown worker');

$worker->update({job_id => undef});
$misc_limits->{max_online_workers} = 0;
$misc_limits->{worker_limit_retry_delay} = 42;
$t->tx($t->ua->start($t->ua->build_websocket_tx('/ws/1')));
$t->status_is(429, 'no ws connection for limited worker')->content_like(qr/Limit.*exceeded/, 'error about limit');
$worker->discard_changes;
like $worker->error, qr/^limited at .*/, 'worker flagged as limited via error field excluding it from assignments';
};

my $workers = $schema->resultset('Workers');
my $jobs = $schema->resultset('Jobs');
my $worker = $workers->find({host => 'localhost', instance => 1});
my $worker_id = $worker->id;
my $status = OpenQA::WebSockets::Model::Status->singleton->workers;
$misc_limits->{max_online_workers} = undef;
$status->{$worker_id} = {id => $worker_id, db => $worker};

subtest 'web socket message handling' => sub {
Expand Down
11 changes: 6 additions & 5 deletions t/api/01-workers.t
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,14 @@ my @workers = (
instance => 1
});

$t->get_ok('/api/v1/workers?live=1')
->json_is('' => {workers => \@workers}, 'workers present with deprecated live flag');
diag explain $t->tx->res->json unless $t->success;
$_->{websocket} = 0 for @workers;
$t->get_ok('/api/v1/workers')->json_is('' => {workers => \@workers}, "workers present with deprecated websocket flag");
$t->get_ok('/api/v1/workers')->status_is(200, 'listing of all workers');
$t->json_is('' => {workers => \@workers}, 'workers present with deprecated websocket flag');
diag explain $t->tx->res->json unless $t->success;

$t->get_ok('/api/v1/workers/2')->status_is(200, 'info for existing individual worker');
$t->json_is('' => {worker => $workers[1]}, 'info for correct worker returned');
$t->get_ok('/api/v1/workers/3')->status_is(404, 'no info for non-existing worker');

my %worker_key = (host => 'localhost', instance => 1);
my %registration_params = %worker_key;
$t->post_ok('/api/v1/workers', form => \%registration_params)->status_is(400, 'worker with missing parameters refused')
Expand Down
2 changes: 2 additions & 0 deletions t/config.t
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,8 @@ subtest 'Test configuration default modes' => sub {
job_settings_max_recent_jobs => 20000,
assets_default_limit => 100000,
assets_max_limit => 200000,
max_online_workers => 1000,
worker_limit_retry_delay => ONE_HOUR / 4,
},
archiving => {
archive_preserved_important_jobs => 0,
Expand Down

0 comments on commit 2ebe306

Please sign in to comment.