diff --git a/etc/openqa/openqa.ini b/etc/openqa/openqa.ini index c355b2cf5eb..cf845414dc5 100644 --- a/etc/openqa/openqa.ini +++ b/etc/openqa/openqa.ini @@ -263,6 +263,10 @@ concurrent = 0 #assets_default_limit = 100000 ## Maximum number of next jobs to include asset listing request (to prevent performance issues) #assets_max_limit = 200000 +## Maximum number of online workers (to prevent performance issues) +#max_online_workers = 1000 +## Retry delay for limited workers in seconds +#worker_limit_retry_delay = 900 [archiving] ## Moves logs of jobs which are preserved during the cleanup because they are considered important diff --git a/lib/OpenQA/Schema/Result/Workers.pm b/lib/OpenQA/Schema/Result/Workers.pm index e928a93794a..40ff4b89904 100644 --- a/lib/OpenQA/Schema/Result/Workers.pm +++ b/lib/OpenQA/Schema/Result/Workers.pm @@ -168,10 +168,7 @@ sub unprepare_for_work { return $self; } -sub info { - my $self = shift; - my ($live) = ref $_[0] eq 'HASH' ? @{$_[0]}{qw(live)} : @_; - +sub info ($self) { my $settings = { id => $self->id, host => $self->host, @@ -190,14 +187,8 @@ sub info { my $cs = $self->currentstep; $settings->{currentstep} = $cs if $cs; } - my $alive = $settings->{alive} = $settings->{connected} = $self->dead ? 0 : 1; - $settings->{websocket} = $live ? $alive : 0; - - # note: The keys "connected" and "websocket" are only provided for compatibility. The "live" - # parameter makes no actual difference anymore. (`t_seen` is decreased when a worker - # disconnects from the ws server so relying on it is as live as it gets.) - - return $settings; + $settings->{alive} = $settings->{connected} = $settings->{websocket} = $self->dead ? 0 : 1; + return $settings; # The keys "connected" and "websocket" are only provided for compatibility. 
} sub send_command { diff --git a/lib/OpenQA/Setup.pm b/lib/OpenQA/Setup.pm index 7071e165225..bedfe749b06 100644 --- a/lib/OpenQA/Setup.pm +++ b/lib/OpenQA/Setup.pm @@ -182,6 +182,8 @@ sub read_config ($app) { job_settings_max_recent_jobs => 20000, assets_default_limit => 100000, assets_max_limit => 200000, + max_online_workers => 1000, + worker_limit_retry_delay => ONE_HOUR / 4, }, archiving => { archive_preserved_important_jobs => 0, diff --git a/lib/OpenQA/WebAPI/Controller/API/V1/Worker.pm b/lib/OpenQA/WebAPI/Controller/API/V1/Worker.pm index b55463572e5..db40a3cd672 100644 --- a/lib/OpenQA/WebAPI/Controller/API/V1/Worker.pm +++ b/lib/OpenQA/WebAPI/Controller/API/V1/Worker.pm @@ -45,14 +45,12 @@ websocket status. sub list ($self) { my $validation = $self->validation; $validation->optional('limit')->num; - $validation->optional('live')->num(1); $validation->optional('offset')->num; return $self->reply->validation_error({format => 'json'}) if $validation->has_error; my $limits = OpenQA::App->singleton->config->{misc_limits}; my $limit = min($limits->{generic_max_limit}, $validation->param('limit') // $limits->{generic_default_limit}); my $offset = $validation->param('offset') // 0; - my $live = $validation->param('live'); my @all = $self->schema->resultset('Workers')->search({}, {rows => $limit + 1, offset => $offset})->all; @@ -63,7 +61,7 @@ sub list ($self) { my $ret = []; for my $worker (@all) { next unless $worker->id; - push @$ret, $worker->info($live); + push @$ret, $worker->info; } $self->render(json => {workers => $ret}); @@ -225,7 +223,7 @@ sub show { my ($self) = @_; my $worker = $self->schema->resultset('Workers')->find($self->param('workerid')); if ($worker) { - $self->render(json => {worker => $worker->info(1)}); + $self->render(json => {worker => $worker->info}); } else { $self->reply->not_found; diff --git a/lib/OpenQA/WebAPI/Controller/Admin/Workers.pm b/lib/OpenQA/WebAPI/Controller/Admin/Workers.pm index e558e01ce47..a0e01155a00 100644 --- 
a/lib/OpenQA/WebAPI/Controller/Admin/Workers.pm +++ b/lib/OpenQA/WebAPI/Controller/Admin/Workers.pm @@ -8,12 +8,11 @@ use OpenQA::Utils; use OpenQA::WebAPI::ServerSideDataTable; use Scalar::Util 'looks_like_number'; -sub _extend_info ($w, $live = undef) { - $live //= 0; - my $info = $w->info($live); +sub _extend_info ($w) { + my $info = $w->info; $info->{name} = $w->name; my $error = $info->{error}; - if ($live && $error && ($error =~ qr/(graceful disconnect) at (.*)/)) { + if ($error && ($error =~ qr/(graceful disconnect|limited) at (.*)/)) { $info->{offline_note} = $1; $info->{t_seen} = $2 . 'Z'; $info->{alive} = undef; @@ -60,7 +59,7 @@ sub index ($self) { sub show ($self) { my $w = $self->schema->resultset('Workers')->find($self->param('worker_id')) or return $self->reply->not_found; - $self->stash(worker => _extend_info($w, 1)); + $self->stash(worker => _extend_info($w)); $self->render('admin/workers/show'); } diff --git a/lib/OpenQA/WebSockets/Controller/Worker.pm b/lib/OpenQA/WebSockets/Controller/Worker.pm index 1cc0a212b59..29935ad6056 100644 --- a/lib/OpenQA/WebSockets/Controller/Worker.pm +++ b/lib/OpenQA/WebSockets/Controller/Worker.pm @@ -21,19 +21,17 @@ use constant LOG_WORKER_STATUS_MESSAGES => $ENV{OPENQA_LOG_WORKER_STATUS_MESSAGE sub ws { my ($self) = @_; my $status = $self->status; - my $transaction = $self->tx; # add worker connection my $worker_id = $self->param('workerid'); return $self->render(text => 'No worker ID', status => 400) unless $worker_id; - my $worker = $status->add_worker_connection($worker_id, $transaction); - return $self->render(text => 'Unknown worker', status => 400) unless $worker; + return undef unless defined $status->add_worker_connection($worker_id, $self); # upgrade connection to websocket by subscribing to events $self->on(json => \&_message); $self->on(finish => \&_finish); $self->inactivity_timeout(0); # Do not force connection close due to inactivity - $transaction->max_websocket_size(10485760); + 
$self->tx->max_websocket_size(10485760); } sub _finish { diff --git a/lib/OpenQA/WebSockets/Model/Status.pm b/lib/OpenQA/WebSockets/Model/Status.pm index c70f74b008c..25216c3c6d3 100644 --- a/lib/OpenQA/WebSockets/Model/Status.pm +++ b/lib/OpenQA/WebSockets/Model/Status.pm @@ -2,8 +2,10 @@ # SPDX-License-Identifier: GPL-2.0-or-later package OpenQA::WebSockets::Model::Status; -use Mojo::Base -base; +use Mojo::Base -base, -signatures; +use DateTime; +use Time::Seconds; use OpenQA::Schema; use OpenQA::Schema::Result::Workers (); use OpenQA::Jobs::Constants; @@ -13,15 +15,30 @@ has [qw(workers worker_by_transaction)] => sub { {} }; sub singleton { state $status ||= __PACKAGE__->new } -sub add_worker_connection { - my ($self, $worker_id, $transaction) = @_; +sub _is_limit_exceeded ($self, $worker_db, $worker_is_new, $controller) { + my $misc_limits = $controller->app->config->{misc_limits}; + my $limit = $misc_limits->{max_online_workers}; + return 0 if !defined($limit) || $limit > keys %{$self->worker_by_transaction}; + $worker_db->discard_changes unless $worker_is_new; + return 0 if defined($worker_db->job_id); # allow workers that work on a job + $worker_db->update({t_seen => undef, error => 'limited at ' . 
DateTime->now(time_zone => 'UTC')}); + $controller->res->headers->append('Retry-After' => $misc_limits->{worker_limit_retry_delay}); + $controller->render(text => 'Limit of worker connections exceeded', status => 429); + return 1; +} + +sub add_worker_connection ($self, $worker_id, $controller) { # add new worker entry if no exists yet my $workers = $self->workers; my $worker = $workers->{$worker_id}; - if (!defined $worker) { - my $schema = OpenQA::Schema->singleton; - return undef unless my $db = $schema->resultset('Workers')->find($worker_id); + my $worker_is_new = !defined $worker; + if ($worker_is_new) { + my $db = OpenQA::Schema->singleton->resultset('Workers')->find($worker_id); + if (!$db) { + $controller->render(text => 'Unknown worker', status => 400); + return undef; + } $worker = $workers->{$worker_id} = { id => $worker_id, db => $db, @@ -34,12 +51,16 @@ sub add_worker_connection { log_debug "Finishing current connection of worker $worker_id before accepting new one"; $current_tx->finish(1008 => 'only one connection per worker allowed, finishing old one in favor of new one'); } + else { + return undef if $self->_is_limit_exceeded($worker->{db}, $worker_is_new, $controller); + } - $self->worker_by_transaction->{$transaction} = $worker; + my $new_tx = $controller->tx; + $self->worker_by_transaction->{$new_tx} = $worker; # assign the transaction to have always the most recent web socket connection for a certain worker # available - $worker->{tx} = $transaction; + $worker->{tx} = $new_tx; return $worker; } diff --git a/lib/OpenQA/Worker.pm b/lib/OpenQA/Worker.pm index eec9d863ff9..a186099bf27 100644 --- a/lib/OpenQA/Worker.pm +++ b/lib/OpenQA/Worker.pm @@ -682,7 +682,7 @@ sub _handle_client_status_changed ($self, $client, $event_data) { } # handle failures where it makes sense to reconnect elsif ($status eq 'failed') { - my $interval = $ENV{OPENQA_WORKER_CONNECT_INTERVAL} // 10; + my $interval = $event_data->{retry_after} // 
$ENV{OPENQA_WORKER_CONNECT_INTERVAL} // 10; log_warning("$error_message - trying again in $interval seconds"); Mojo::IOLoop->timer($interval => sub { $client->register() }); # stop current job if not accepted yet but out of acceptance attempts diff --git a/lib/OpenQA/Worker/WebUIConnection.pm b/lib/OpenQA/Worker/WebUIConnection.pm index 1c31f0e852f..4abcb22f073 100644 --- a/lib/OpenQA/Worker/WebUIConnection.pm +++ b/lib/OpenQA/Worker/WebUIConnection.pm @@ -164,9 +164,10 @@ sub _setup_websocket_connection ($self, $websocket_url = undef) { $self->websocket_connection(undef); my $error = $tx->error; + my $retry_after = $tx->res->headers->header('Retry-After'); my $error_message = "Unable to upgrade to ws connection via $websocket_url"; $error_message .= ", code $error->{code}" if ($error && $error->{code}); - $self->_set_status(failed => {error_message => $error_message}); + $self->_set_status(failed => {error_message => $error_message, retry_after => $retry_after}); return undef; } diff --git a/t/24-worker-webui-connection.t b/t/24-worker-webui-connection.t index 56f1a943418..b6f62075d1a 100644 --- a/t/24-worker-webui-connection.t +++ b/t/24-worker-webui-connection.t @@ -124,7 +124,7 @@ subtest 'attempt to register and send a command' => sub { ); Mojo::IOLoop->start; } - qr/Connection error: Can't connect:.*(remaining tries: 0)/s, 'error logged'; + qr/Connection error:.*(remaining tries: 0)/s, 'error logged'; is($callback_invoked, 1, 'callback has been invoked'); is($client->worker->stop_current_job_called, 0, 'not attempted to stop current job because it is from different web UI'); @@ -156,13 +156,21 @@ subtest 'attempt to register and send a command' => sub { [{status => 'registering', error_message => undef}, {status => 'failed'}], 'events emitted', ) - and like( - $error_message, - qr{Failed to register at http://test-host - connection error: Can't connect:.*}, - 'error message', - )) or diag explain \@happened_events; + and like($error_message, qr{Failed to 
register at http://test-host - connection error:.*}, 'error message') + ) or diag explain \@happened_events; }; +subtest 'attempt to setup websocket connection' => sub { + my @expected_events = ( + {status => 'establishing_ws', error_message => undef}, + {status => 'failed', error_message => 'Unable to upgrade to ws connection via http://test-host/api/v1/ws/42'}, + ); + @happened_events = (); + $client->_setup_websocket_connection; + $client->once(status_changed => sub ($status, @) { Mojo::IOLoop->stop if $status eq 'failed' }); + Mojo::IOLoop->start; + is_deeply \@happened_events, \@expected_events, 'events emitted' or diag explain \@happened_events; +}; subtest 'retry behavior' => sub { # use fake Mojo::UserAgent and Mojo::Transaction diff --git a/t/27-websockets.t b/t/27-websockets.t index 61946fddbc8..a336ee9c108 100644 --- a/t/27-websockets.t +++ b/t/27-websockets.t @@ -29,6 +29,12 @@ use Mojo::JSON; my $schema = OpenQA::Test::Database->new->create(fixtures_glob => '01-jobs.pl 02-workers.pl 03-users.pl'); my $t = Test::Mojo->new('OpenQA::WebSockets'); my $t2 = client(Test::Mojo->new('OpenQA::WebSockets')); +my $misc_limits = $t->app->config->{misc_limits} //= {}; +my $workers = $schema->resultset('Workers'); +my $jobs = $schema->resultset('Jobs'); +my $worker = $workers->find({host => 'localhost', instance => 1}); +my $worker_id = $worker->id; +my $status = OpenQA::WebSockets::Model::Status->singleton->workers; subtest 'Authentication' => sub { my $app = $t->app; @@ -59,14 +65,20 @@ subtest 'Exception' => sub { }; subtest 'API' => sub { - $t->tx($t->ua->start($t->ua->build_websocket_tx('/ws/23')))->status_is(400)->content_like(qr/Unknown worker/); + $t->tx($t->ua->start($t->ua->build_websocket_tx('/ws/23'))); + $t->status_is(400, 'no ws connection for unregistered worker'); + $t->content_like(qr/Unknown worker/, 'error about unknown worker'); + + $worker->update({job_id => undef}); + $misc_limits->{max_online_workers} = 0; + 
$misc_limits->{worker_limit_retry_delay} = 42; + $t->tx($t->ua->start($t->ua->build_websocket_tx('/ws/1'))); + $t->status_is(429, 'no ws connection for limited worker')->content_like(qr/Limit.*exceeded/, 'error about limit'); + $worker->discard_changes; + like $worker->error, qr/^limited at .*/, 'worker flagged as limited via error field excluding it from assignments'; }; -my $workers = $schema->resultset('Workers'); -my $jobs = $schema->resultset('Jobs'); -my $worker = $workers->find({host => 'localhost', instance => 1}); -my $worker_id = $worker->id; -my $status = OpenQA::WebSockets::Model::Status->singleton->workers; +$misc_limits->{max_online_workers} = undef; $status->{$worker_id} = {id => $worker_id, db => $worker}; subtest 'web socket message handling' => sub { diff --git a/t/api/01-workers.t b/t/api/01-workers.t index 48a04bb5ba3..8a8fb71d93a 100644 --- a/t/api/01-workers.t +++ b/t/api/01-workers.t @@ -76,13 +76,14 @@ my @workers = ( instance => 1 }); -$t->get_ok('/api/v1/workers?live=1') - ->json_is('' => {workers => \@workers}, 'workers present with deprecated live flag'); -diag explain $t->tx->res->json unless $t->success; -$_->{websocket} = 0 for @workers; -$t->get_ok('/api/v1/workers')->json_is('' => {workers => \@workers}, "workers present with deprecated websocket flag"); +$t->get_ok('/api/v1/workers')->status_is(200, 'listing of all workers'); +$t->json_is('' => {workers => \@workers}, 'workers present with deprecated websocket flag'); diag explain $t->tx->res->json unless $t->success; +$t->get_ok('/api/v1/workers/2')->status_is(200, 'info for existing individual worker'); +$t->json_is('' => {worker => $workers[1]}, 'info for correct worker returned'); +$t->get_ok('/api/v1/workers/3')->status_is(404, 'no info for non-existing worker'); + my %worker_key = (host => 'localhost', instance => 1); my %registration_params = %worker_key; $t->post_ok('/api/v1/workers', form => \%registration_params)->status_is(400, 'worker with missing parameters refused') 
diff --git a/t/config.t b/t/config.t index 8420cdbb7e9..43eb4cb3422 100644 --- a/t/config.t +++ b/t/config.t @@ -162,6 +162,8 @@ subtest 'Test configuration default modes' => sub { job_settings_max_recent_jobs => 20000, assets_default_limit => 100000, assets_max_limit => 200000, + max_online_workers => 1000, + worker_limit_retry_delay => ONE_HOUR / 4, }, archiving => { archive_preserved_important_jobs => 0,