Skip to content

Commit

Permalink
feat: implement endpoint for retrieving invocation's details (#1187)
Browse files Browse the repository at this point in the history
* implement endpoint for retrieving invocation's details

* make fmt
  • Loading branch information
miguelhrocha authored Jan 24, 2025
1 parent 50064aa commit 4d997de
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 2 deletions.
41 changes: 40 additions & 1 deletion server/src/http_objects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use axum::{
http::StatusCode,
response::{IntoResponse, Response},
};
use data_model::ComputeGraphCode;
use data_model::{ComputeGraphCode, GraphInvocationCtx};
use indexify_utils::get_epoch_time_in_ms;
use serde::{Deserialize, Serialize};
use tracing::error;
Expand Down Expand Up @@ -531,6 +531,45 @@ pub struct InvocationId {
pub id: String,
}

#[derive(Debug, Serialize, Deserialize, ToSchema)]
pub struct Invocation {
pub id: String,
pub completed: bool,
pub outstanding_tasks: u64,
pub task_analytics: HashMap<String, TaskAnalytics>,
pub graph_version: String,
}

impl From<GraphInvocationCtx> for Invocation {
fn from(value: GraphInvocationCtx) -> Self {
let mut task_analytics = HashMap::new();
for (k, v) in value.fn_task_analytics {
task_analytics.insert(
k,
TaskAnalytics {
pending_tasks: v.pending_tasks,
successful_tasks: v.successful_tasks,
failed_tasks: v.failed_tasks,
},
);
}
Self {
id: value.invocation_id.to_string(),
completed: value.completed,
outstanding_tasks: value.outstanding_tasks,
task_analytics,
graph_version: value.graph_version.0,
}
}
}

#[derive(Debug, Serialize, Deserialize, ToSchema)]
pub struct TaskAnalytics {
pub pending_tasks: u64,
pub successful_tasks: u64,
pub failed_tasks: u64,
}

#[derive(Debug, Serialize, Deserialize, ToSchema)]
pub struct FunctionURI {
pub namespace: String,
Expand Down
30 changes: 29 additions & 1 deletion server/src/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@ use tracing::{error, info};
use utoipa::{OpenApi, ToSchema};
use utoipa_swagger_ui::SwaggerUi;

use crate::executors::{self, EXECUTOR_TIMEOUT};
use crate::{
executors::{self, EXECUTOR_TIMEOUT},
http_objects::Invocation,
};

mod download;
mod internal_ingest;
Expand Down Expand Up @@ -282,6 +285,7 @@ pub fn namespace_routes(route_state: RouteState) -> Router {
"/compute_graphs/{compute_graph}/replay",
post(replay_compute_graph).with_state(route_state.clone()),
)
.route("/compute_graphs/{compute_graph}/invocations/{invocation_id}", get(find_invocation).with_state(route_state.clone()))
.route(
"/compute_graphs/{compute_graph}/invocations/{invocation_id}",
delete(delete_invocation).with_state(route_state.clone()),
Expand Down Expand Up @@ -804,6 +808,30 @@ async fn list_outputs(
}))
}

#[utoipa::path(
get,
path = "/namespaces/{namespace}/compute_graphs/{compute_graph}/invocations/{invocation_id}",
tag = "retrieve",
responses(
(status = 200, description = "Details about a given invocation", body = Invocation),
(status = NOT_FOUND, description = "Invocation not found"),
(status = INTERNAL_SERVER_ERROR, description = "Internal Server Error")
),
)]
async fn find_invocation(
Path((namespace, compute_graph, invocation_id)): Path<(String, String, String)>,
State(state): State<RouteState>,
) -> Result<Json<Invocation>, IndexifyAPIError> {
let invocation_ctx = state
.indexify_state
.reader()
.invocation_ctx(&namespace, &compute_graph, &invocation_id)
.map_err(IndexifyAPIError::internal_error)?
.ok_or(IndexifyAPIError::not_found("invocation not found"))?;

Ok(Json(invocation_ctx.into()))
}

/// Delete a specific invocation
#[utoipa::path(
delete,
Expand Down

0 comments on commit 4d997de

Please sign in to comment.