From 1995114d2c0383a3045a7a6f37df339c1560e786 Mon Sep 17 00:00:00 2001 From: Thai Pangsakulyanont Date: Fri, 17 May 2024 14:05:28 +0700 Subject: [PATCH] Refactor main.ts to extract specific functionalities --- src/createContext.ts | 17 +++ src/createDurationTracker.ts | 11 ++ src/db.ts | 121 ++++++++++++++++++++ src/env.ts | 10 ++ src/main.ts | 208 +---------------------------------- src/queue.ts | 31 ++++++ src/schema.ts | 28 +++++ 7 files changed, 224 insertions(+), 202 deletions(-) create mode 100644 src/createContext.ts create mode 100644 src/createDurationTracker.ts create mode 100644 src/db.ts create mode 100644 src/env.ts create mode 100644 src/queue.ts create mode 100644 src/schema.ts diff --git a/src/createContext.ts b/src/createContext.ts new file mode 100644 index 0000000..4eb30c3 --- /dev/null +++ b/src/createContext.ts @@ -0,0 +1,17 @@ +import * as dynamodb from "@aws-sdk/client-dynamodb"; +import * as sqs from "@aws-sdk/client-sqs"; +import os from "os"; + +interface Context { + sqsClient: sqs.SQSClient; + dynamodbClient: dynamodb.DynamoDBClient; +} + +function createContext(): Context { + return { + dynamodbClient: new dynamodb.DynamoDBClient({}), + sqsClient: new sqs.SQSClient({}), + }; +} + +export { Context, createContext }; diff --git a/src/createDurationTracker.ts b/src/createDurationTracker.ts new file mode 100644 index 0000000..26a0d81 --- /dev/null +++ b/src/createDurationTracker.ts @@ -0,0 +1,11 @@ +function createDurationTracker() { + const start = Date.now(); + return { + formatDuration: () => { + const end = Date.now(); + return ((end - start) / 1000).toFixed(2); + }, + }; +} + +export { createDurationTracker }; diff --git a/src/db.ts b/src/db.ts new file mode 100644 index 0000000..9cbea90 --- /dev/null +++ b/src/db.ts @@ -0,0 +1,121 @@ +import * as dynamodb from "@aws-sdk/client-dynamodb"; + +async function ensureDynamodbTableCreated({ dynamodbClient }: Context) { + try { + await dynamodbClient.send( + new dynamodb.CreateTableCommand({ + TableName: env.PARALLELIZER_DYNAMODB_TABLE, + KeySchema: [ + { + AttributeName: "TaskListId", + KeyType: "HASH", + }, + { + AttributeName: "TaskId", + KeyType: "RANGE", + }, + ], + AttributeDefinitions: [ + { + AttributeName: "TaskListId", + AttributeType: "S", + }, + { + AttributeName: "TaskId", + AttributeType: "S", + }, + ], + BillingMode: "PAY_PER_REQUEST", + }) + ); + } catch (error) { + if ( + error instanceof dynamodb.ResourceInUseException || + error instanceof dynamodb.TableAlreadyExistsException + ) { + return; + } + throw error; + } +} + +async function getPreviouslyRunTaskStatuses( + { dynamodbClient }: Context, + jobId: string +) { + const response = await dynamodbClient.send( + new dynamodb.QueryCommand({ + TableName: env.PARALLELIZER_DYNAMODB_TABLE, + KeyConditionExpression: "TaskListId = :taskListId", + ExpressionAttributeValues: { + ":taskListId": { S: jobId }, + }, + }) + ); + return response; +} + +async function checkTaskCompletionStatus( + { dynamodbClient }: Context, + jobId: string, + taskId: string +): Promise { + const response = await dynamodbClient.send( + new dynamodb.GetItemCommand({ + TableName: env.PARALLELIZER_DYNAMODB_TABLE, + Key: { + TaskListId: { S: jobId }, + TaskId: { S: taskId }, + }, + }) + ); + return response.Item?.Status?.S || "PENDING"; +} + +async function updateTaskStatusInDynamoDB( + { dynamodbClient }: Context, + jobId: string, + task: z.infer, + status: string, + isStart: boolean +) { + const workerId = process.env.PARALLELIZER_WORKER_ID || os.hostname(); + const timestamp = new Date().toISOString(); + let updateExpression = + "set #status = :status, #taskDisplayName = :taskDisplayName, #timestamp = :timestamp, #workerId = :workerId"; + let expressionAttributeNames: Record = { + "#status": "Status", + "#taskDisplayName": "TaskDisplayName", + "#timestamp": isStart ? "StartedAt" : "FinishedAt", + "#workerId": "WorkerId", + }; + let expressionAttributeValues: Record = { + ":status": { S: status }, + ":taskDisplayName": { S: task.displayName }, + ":timestamp": { S: timestamp }, + ":workerId": { S: workerId }, + }; + + if (isStart) { + updateExpression += + ", #attemptCount = if_not_exists(#attemptCount, :zero) + :inc"; + expressionAttributeNames["#attemptCount"] = "AttemptCount"; + expressionAttributeValues[":zero"] = { N: "0" }; + expressionAttributeValues[":inc"] = { N: "1" }; + } + + await dynamodbClient.send( + new dynamodb.UpdateItemCommand({ + TableName: env.PARALLELIZER_DYNAMODB_TABLE, + Key: { + TaskListId: { S: jobId }, + TaskId: { S: task.id }, + }, + UpdateExpression: updateExpression, + ExpressionAttributeNames: expressionAttributeNames, + ExpressionAttributeValues: expressionAttributeValues, + }) + ); +} + +export { ensureDynamodbTableCreated, getPreviouslyRunTaskStatuses, checkTaskCompletionStatus, updateTaskStatusInDynamoDB }; diff --git a/src/env.ts b/src/env.ts new file mode 100644 index 0000000..edfd9cd --- /dev/null +++ b/src/env.ts @@ -0,0 +1,10 @@ +import { z } from "zod"; +import { Env } from "@(-.-)/env"; + +const envSchema = z.object({ + PARALLELIZER_DYNAMODB_TABLE: z.string().default("parallelizer"), +}); + +const env = Env(envSchema); + +export { env }; diff --git a/src/main.ts b/src/main.ts index d3a0392..0ba50df 100644 --- a/src/main.ts +++ b/src/main.ts @@ -1,7 +1,4 @@ #!/usr/bin/env node -import { Env } from "@(-.-)/env"; -import * as dynamodb from "@aws-sdk/client-dynamodb"; -import * as sqs from "@aws-sdk/client-sqs"; import { execa } from "execa"; import fs from "fs"; import { chunk } from "lodash-es"; @@ -10,37 +7,12 @@ import pMap from "p-map"; import process from "process"; import yargs from "yargs"; import { z } from "zod"; - -const env = Env( - z.object({ - PARALLELIZER_DYNAMODB_TABLE: z.string().default("parallelizer"), - }) -); - -const Task = z.object({ - id: z.string(), - displayName: z.string(), - spec: z.record(z.unknown()).default({}), -}); - -const TaskListFile = z.object({ - id: z - .string() - .describe( - "Unique ID of the task list. " + - "Different CI workflow runs should have different queue names. " + - "This ID can be reused when retrying a failed workflow run " + - "(previously-complete tasks will be skipped)." - ), - displayName: z.string().default(""), - tasks: z - .array(Task) - .describe( - "An array of tasks that needs to be run. " + - "The tasks will be queued in the order they are listed in this array, " + - "however, they may be run in parallel and out of order." - ), -}); +import { env } from "./env"; +import { Task, TaskListFile } from "./schema"; +import { Context, createContext } from "./createContext"; +import { createDurationTracker } from "./createDurationTracker"; +import { ensureDynamodbTableCreated, getPreviouslyRunTaskStatuses, checkTaskCompletionStatus, updateTaskStatusInDynamoDB } from "./db"; +import { ensureQueueCreated, updateMessageVisibilityTimeout } from "./queue"; yargs(process.argv.slice(2)) .demandCommand() @@ -257,171 +229,3 @@ yargs(process.argv.slice(2)) } ) .parse(); - -interface Context { - sqsClient: sqs.SQSClient; - dynamodbClient: dynamodb.DynamoDBClient; -} - -function createContext(): Context { - return { - dynamodbClient: new dynamodb.DynamoDBClient({}), - sqsClient: new sqs.SQSClient({}), - }; -} - -function createDurationTracker() { - const start = Date.now(); - return { - formatDuration: () => { - const end = Date.now(); - return ((end - start) / 1000).toFixed(2); - }, - }; -} - -async function ensureDynamodbTableCreated({ dynamodbClient }: Context) { - try { - await dynamodbClient.send( - new dynamodb.CreateTableCommand({ - TableName: env.PARALLELIZER_DYNAMODB_TABLE, - KeySchema: [ - { - AttributeName: "TaskListId", - KeyType: "HASH", - }, - { - AttributeName: "TaskId", - KeyType: "RANGE", - }, - ], - AttributeDefinitions: [ - { - AttributeName: "TaskListId", - AttributeType: "S", - }, - { - AttributeName: "TaskId", - AttributeType: "S", - }, - ], - BillingMode: "PAY_PER_REQUEST", - }) - ); - } catch (error) { - if ( - error instanceof dynamodb.ResourceInUseException || - error instanceof dynamodb.TableAlreadyExistsException - ) { - return; - } - throw error; - } -} - -async function ensureQueueCreated({ sqsClient }: Context, queueId: string) { - const createQueueResponse = await sqsClient.send( - new sqs.CreateQueueCommand({ - QueueName: queueId, - tags: { - "ci-parallelizer": "true", - }, - }) - ); - const queueUrl = createQueueResponse.QueueUrl; - return queueUrl; -} - -async function getPreviouslyRunTaskStatuses( - { dynamodbClient }: Context, - jobId: string -) { - const response = await dynamodbClient.send( - new dynamodb.QueryCommand({ - TableName: env.PARALLELIZER_DYNAMODB_TABLE, - KeyConditionExpression: "TaskListId = :taskListId", - ExpressionAttributeValues: { - ":taskListId": { S: jobId }, - }, - }) - ); - return response; -} - -async function checkTaskCompletionStatus( - { dynamodbClient }: Context, - jobId: string, - taskId: string -): Promise { - const response = await dynamodbClient.send( - new dynamodb.GetItemCommand({ - TableName: env.PARALLELIZER_DYNAMODB_TABLE, - Key: { - TaskListId: { S: jobId }, - TaskId: { S: taskId }, - }, - }) - ); - return response.Item?.Status?.S || "PENDING"; -} - -async function updateTaskStatusInDynamoDB( - { dynamodbClient }: Context, - jobId: string, - task: z.infer, - status: string, - isStart: boolean -) { - const workerId = process.env.PARALLELIZER_WORKER_ID || os.hostname(); - const timestamp = new Date().toISOString(); - let updateExpression = - "set #status = :status, #taskDisplayName = :taskDisplayName, #timestamp = :timestamp, #workerId = :workerId"; - let expressionAttributeNames: Record = { - "#status": "Status", - "#taskDisplayName": "TaskDisplayName", - "#timestamp": isStart ? "StartedAt" : "FinishedAt", - "#workerId": "WorkerId", - }; - let expressionAttributeValues: Record = { - ":status": { S: status }, - ":taskDisplayName": { S: task.displayName }, - ":timestamp": { S: timestamp }, - ":workerId": { S: workerId }, - }; - - if (isStart) { - updateExpression += - ", #attemptCount = if_not_exists(#attemptCount, :zero) + :inc"; - expressionAttributeNames["#attemptCount"] = "AttemptCount"; - expressionAttributeValues[":zero"] = { N: "0" }; - expressionAttributeValues[":inc"] = { N: "1" }; - } - - await dynamodbClient.send( - new dynamodb.UpdateItemCommand({ - TableName: env.PARALLELIZER_DYNAMODB_TABLE, - Key: { - TaskListId: { S: jobId }, - TaskId: { S: task.id }, - }, - UpdateExpression: updateExpression, - ExpressionAttributeNames: expressionAttributeNames, - ExpressionAttributeValues: expressionAttributeValues, - }) - ); -} - -async function updateMessageVisibilityTimeout( - sqsClient: sqs.SQSClient, - queueUrl: string, - receiptHandle: string, - visibilityTimeout: number -) { - await sqsClient.send( - new sqs.ChangeMessageVisibilityCommand({ - QueueUrl: queueUrl, - ReceiptHandle: receiptHandle, - VisibilityTimeout: visibilityTimeout, - }) - ); -} diff --git a/src/queue.ts b/src/queue.ts new file mode 100644 index 0000000..e62095f --- /dev/null +++ b/src/queue.ts @@ -0,0 +1,31 @@ +import * as sqs from "@aws-sdk/client-sqs"; + +async function ensureQueueCreated({ sqsClient }: { sqsClient: sqs.SQSClient }, queueId: string) { + const createQueueResponse = await sqsClient.send( + new sqs.CreateQueueCommand({ + QueueName: queueId, + tags: { + "ci-parallelizer": "true", + }, + }) + ); + const queueUrl = createQueueResponse.QueueUrl; + return queueUrl; +} + +async function updateMessageVisibilityTimeout( + sqsClient: sqs.SQSClient, + queueUrl: string, + receiptHandle: string, + visibilityTimeout: number +) { + await sqsClient.send( + new sqs.ChangeMessageVisibilityCommand({ + QueueUrl: queueUrl, + ReceiptHandle: receiptHandle, + VisibilityTimeout: visibilityTimeout, + }) + ); +} + +export { ensureQueueCreated, updateMessageVisibilityTimeout }; diff --git a/src/schema.ts b/src/schema.ts new file mode 100644 index 0000000..63164f0 --- /dev/null +++ b/src/schema.ts @@ -0,0 +1,28 @@ +import { z } from "zod"; + +const Task = z.object({ + id: z.string(), + displayName: z.string(), + spec: z.record(z.unknown()).default({}), +}); + +const TaskListFile = z.object({ + id: z + .string() + .describe( + "Unique ID of the task list. " + + "Different CI workflow runs should have different queue names. " + + "This ID can be reused when retrying a failed workflow run " + + "(previously-complete tasks will be skipped)." + ), + displayName: z.string().default(""), + tasks: z + .array(Task) + .describe( + "An array of tasks that needs to be run. " + + "The tasks will be queued in the order they are listed in this array, " + + "however, they may be run in parallel and out of order." + ), +}); + +export { Task, TaskListFile };