Skip to content

Commit

Permalink
Merge pull request #1 from eventpop/dtinth/enhance-dynamodb
Browse files Browse the repository at this point in the history
Add TaskDisplayName and increment AttemptCount in DynamoDB
  • Loading branch information
dtinth authored May 17, 2024
2 parents ea03937 + d8f1a43 commit f7a5d64
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 24 deletions.
2 changes: 1 addition & 1 deletion scripts/generate-job.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ writeFileSync(
JSON.stringify({
id: "test-" + crypto.randomUUID(),
displayName: "Test Job",
tasks: Array.from({ length: 200 }, (_, i) => ({
tasks: Array.from({ length: 50 }, (_, i) => ({
id: `task-${i}`,
displayName: `Task ${i}`,
})),
Expand Down
53 changes: 30 additions & 23 deletions src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -184,21 +184,15 @@ yargs(process.argv.slice(2))
);
}
}, 15000);
await updateTaskStatusInDynamoDB(
ctx,
job.id,
body.id,
"RUNNING",
true
);
await updateTaskStatusInDynamoDB(ctx, job.id, body, "RUNNING", true);
console.log(`::group::${title}`);
const durationTracker = createDurationTracker();
try {
await execa(command[0], command.slice(1), { stdio: "inherit" });
await updateTaskStatusInDynamoDB(
ctx,
job.id,
body.id,
body,
"COMPLETED",
false
);
Expand All @@ -217,7 +211,7 @@ yargs(process.argv.slice(2))
await updateTaskStatusInDynamoDB(
ctx,
job.id,
body.id,
body,
"FAILED",
false
);
Expand Down Expand Up @@ -374,32 +368,45 @@ async function checkTaskCompletionStatus(
async function updateTaskStatusInDynamoDB(
{ dynamodbClient }: Context,
jobId: string,
taskId: string,
task: z.infer<typeof Task>,
status: string,
isStart: boolean
) {
const workerId = process.env.PARALLELIZER_WORKER_ID || os.hostname();
const timestamp = new Date().toISOString();
const updateExpression =
"set #status = :status, #timestamp = :timestamp, #workerId = :workerId";
let updateExpression =
"set #status = :status, #taskDisplayName = :taskDisplayName, #timestamp = :timestamp, #workerId = :workerId";
let expressionAttributeNames: Record<string, string> = {
"#status": "Status",
"#taskDisplayName": "TaskDisplayName",
"#timestamp": isStart ? "StartedAt" : "FinishedAt",
"#workerId": "WorkerId",
};
let expressionAttributeValues: Record<string, dynamodb.AttributeValue> = {
":status": { S: status },
":taskDisplayName": { S: task.displayName },
":timestamp": { S: timestamp },
":workerId": { S: workerId },
};

if (isStart) {
updateExpression +=
", #attemptCount = if_not_exists(#attemptCount, :zero) + :inc";
expressionAttributeNames["#attemptCount"] = "AttemptCount";
expressionAttributeValues[":zero"] = { N: "0" };
expressionAttributeValues[":inc"] = { N: "1" };
}

await dynamodbClient.send(
new dynamodb.UpdateItemCommand({
TableName: env.PARALLELIZER_DYNAMODB_TABLE,
Key: {
TaskListId: { S: jobId },
TaskId: { S: taskId },
TaskId: { S: task.id },
},
UpdateExpression: updateExpression,
ExpressionAttributeNames: {
"#status": "Status",
"#timestamp": isStart ? "StartedAt" : "FinishedAt",
"#workerId": "WorkerId",
},
ExpressionAttributeValues: {
":status": { S: status },
":timestamp": { S: timestamp },
":workerId": { S: workerId },
},
ExpressionAttributeNames: expressionAttributeNames,
ExpressionAttributeValues: expressionAttributeValues,
})
);
}
Expand Down

0 comments on commit f7a5d64

Please sign in to comment.