diff --git a/scripts/generate-job.mjs b/scripts/generate-job.mjs index 632c6c4..1f0ace8 100644 --- a/scripts/generate-job.mjs +++ b/scripts/generate-job.mjs @@ -6,7 +6,7 @@ writeFileSync( JSON.stringify({ id: "test-" + crypto.randomUUID(), displayName: "Test Job", - tasks: Array.from({ length: 200 }, (_, i) => ({ + tasks: Array.from({ length: 50 }, (_, i) => ({ id: `task-${i}`, displayName: `Task ${i}`, })), diff --git a/src/main.ts b/src/main.ts index 7add4b4..d3a0392 100644 --- a/src/main.ts +++ b/src/main.ts @@ -184,13 +184,7 @@ yargs(process.argv.slice(2)) ); } }, 15000); - await updateTaskStatusInDynamoDB( - ctx, - job.id, - body.id, - "RUNNING", - true - ); + await updateTaskStatusInDynamoDB(ctx, job.id, body, "RUNNING", true); console.log(`::group::${title}`); const durationTracker = createDurationTracker(); try { @@ -198,7 +192,7 @@ yargs(process.argv.slice(2)) await updateTaskStatusInDynamoDB( ctx, job.id, - body.id, + body, "COMPLETED", false ); @@ -217,7 +211,7 @@ yargs(process.argv.slice(2)) await updateTaskStatusInDynamoDB( ctx, job.id, - body.id, + body, "FAILED", false ); @@ -374,32 +368,45 @@ async function checkTaskCompletionStatus( async function updateTaskStatusInDynamoDB( { dynamodbClient }: Context, jobId: string, - taskId: string, + task: z.infer, status: string, isStart: boolean ) { const workerId = process.env.PARALLELIZER_WORKER_ID || os.hostname(); const timestamp = new Date().toISOString(); - const updateExpression = - "set #status = :status, #timestamp = :timestamp, #workerId = :workerId"; + let updateExpression = + "set #status = :status, #taskDisplayName = :taskDisplayName, #timestamp = :timestamp, #workerId = :workerId"; + let expressionAttributeNames: Record = { + "#status": "Status", + "#taskDisplayName": "TaskDisplayName", + "#timestamp": isStart ? "StartedAt" : "FinishedAt", + "#workerId": "WorkerId", + }; + let expressionAttributeValues: Record = { + ":status": { S: status }, + ":taskDisplayName": { S: task.displayName }, + ":timestamp": { S: timestamp }, + ":workerId": { S: workerId }, + }; + + if (isStart) { + updateExpression += + ", #attemptCount = if_not_exists(#attemptCount, :zero) + :inc"; + expressionAttributeNames["#attemptCount"] = "AttemptCount"; + expressionAttributeValues[":zero"] = { N: "0" }; + expressionAttributeValues[":inc"] = { N: "1" }; + } + await dynamodbClient.send( new dynamodb.UpdateItemCommand({ TableName: env.PARALLELIZER_DYNAMODB_TABLE, Key: { TaskListId: { S: jobId }, - TaskId: { S: taskId }, + TaskId: { S: task.id }, }, UpdateExpression: updateExpression, - ExpressionAttributeNames: { - "#status": "Status", - "#timestamp": isStart ? "StartedAt" : "FinishedAt", - "#workerId": "WorkerId", - }, - ExpressionAttributeValues: { - ":status": { S: status }, - ":timestamp": { S: timestamp }, - ":workerId": { S: workerId }, - }, + ExpressionAttributeNames: expressionAttributeNames, + ExpressionAttributeValues: expressionAttributeValues, }) ); }