diff --git a/server/src/http_objects.rs b/server/src/http_objects.rs index 014926c9f..dbf05d67c 100644 --- a/server/src/http_objects.rs +++ b/server/src/http_objects.rs @@ -4,7 +4,7 @@ use axum::{ http::StatusCode, response::{IntoResponse, Response}, }; -use data_model::ComputeGraphCode; +use data_model::{ComputeGraphCode, GraphInvocationCtx}; use indexify_utils::get_epoch_time_in_ms; use serde::{Deserialize, Serialize}; use tracing::error; @@ -531,6 +531,45 @@ pub struct InvocationId { pub id: String, } +#[derive(Debug, Serialize, Deserialize, ToSchema)] +pub struct Invocation { + pub id: String, + pub completed: bool, + pub outstanding_tasks: u64, + pub task_analytics: HashMap, + pub graph_version: String, +} + +impl From for Invocation { + fn from(value: GraphInvocationCtx) -> Self { + let mut task_analytics = HashMap::new(); + for (k, v) in value.fn_task_analytics { + task_analytics.insert( + k, + TaskAnalytics { + pending_tasks: v.pending_tasks, + successful_tasks: v.successful_tasks, + failed_tasks: v.failed_tasks, + }, + ); + } + Self { + id: value.invocation_id.to_string(), + completed: value.completed, + outstanding_tasks: value.outstanding_tasks, + task_analytics, + graph_version: value.graph_version.0, + } + } +} + +#[derive(Debug, Serialize, Deserialize, ToSchema)] +pub struct TaskAnalytics { + pub pending_tasks: u64, + pub successful_tasks: u64, + pub failed_tasks: u64, +} + #[derive(Debug, Serialize, Deserialize, ToSchema)] pub struct FunctionURI { pub namespace: String, diff --git a/server/src/routes.rs b/server/src/routes.rs index c923c65b2..2cc2e1fa9 100644 --- a/server/src/routes.rs +++ b/server/src/routes.rs @@ -41,7 +41,10 @@ use tracing::{error, info}; use utoipa::{OpenApi, ToSchema}; use utoipa_swagger_ui::SwaggerUi; -use crate::executors::{self, EXECUTOR_TIMEOUT}; +use crate::{ + executors::{self, EXECUTOR_TIMEOUT}, + http_objects::Invocation, +}; mod download; mod internal_ingest; @@ -282,6 +285,7 @@ pub fn namespace_routes(route_state: RouteState) -> Router { "/compute_graphs/{compute_graph}/replay", post(replay_compute_graph).with_state(route_state.clone()), ) + .route("/compute_graphs/{compute_graph}/invocations/{invocation_id}", get(find_invocation).with_state(route_state.clone())) .route( "/compute_graphs/{compute_graph}/invocations/{invocation_id}", delete(delete_invocation).with_state(route_state.clone()), @@ -804,6 +808,30 @@ async fn list_outputs( })) } +#[utoipa::path( + get, + path = "/namespaces/{namespace}/compute_graphs/{compute_graph}/invocations/{invocation_id}", + tag = "retrieve", + responses( + (status = 200, description = "Details about a given invocation", body = Invocation), + (status = NOT_FOUND, description = "Invocation not found"), + (status = INTERNAL_SERVER_ERROR, description = "Internal Server Error") + ), +)] +async fn find_invocation( + Path((namespace, compute_graph, invocation_id)): Path<(String, String, String)>, + State(state): State, +) -> Result, IndexifyAPIError> { + let invocation_ctx = state + .indexify_state + .reader() + .invocation_ctx(&namespace, &compute_graph, &invocation_id) + .map_err(IndexifyAPIError::internal_error)? + .ok_or(IndexifyAPIError::not_found("invocation not found"))?; + + Ok(Json(invocation_ctx.into())) +} + /// Delete a specific invocation #[utoipa::path( delete,