Skip to content

Commit

Permalink
Refactor main.ts to extract specific functionalities
Browse files Browse the repository at this point in the history
  • Loading branch information
dtinth committed May 17, 2024
1 parent 67c6542 commit 1995114
Show file tree
Hide file tree
Showing 7 changed files with 224 additions and 202 deletions.
17 changes: 17 additions & 0 deletions src/createContext.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import * as dynamodb from "@aws-sdk/client-dynamodb";
import * as sqs from "@aws-sdk/client-sqs";
import os from "os";

interface Context {
sqsClient: sqs.SQSClient;
dynamodbClient: dynamodb.DynamoDBClient;
}

function createContext(): Context {
return {
dynamodbClient: new dynamodb.DynamoDBClient({}),
sqsClient: new sqs.SQSClient({}),
};
}

export { Context, createContext };
11 changes: 11 additions & 0 deletions src/createDurationTracker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
function createDurationTracker() {
const start = Date.now();
return {
formatDuration: () => {
const end = Date.now();
return ((end - start) / 1000).toFixed(2);
},
};
}

export { createDurationTracker };
121 changes: 121 additions & 0 deletions src/db.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
import * as dynamodb from "@aws-sdk/client-dynamodb";

async function ensureDynamodbTableCreated({ dynamodbClient }: Context) {

Check failure on line 3 in src/db.ts

View workflow job for this annotation

GitHub Actions / build

Cannot find name 'Context'.
try {
await dynamodbClient.send(
new dynamodb.CreateTableCommand({
TableName: env.PARALLELIZER_DYNAMODB_TABLE,

Check failure on line 7 in src/db.ts

View workflow job for this annotation

GitHub Actions / build

Cannot find name 'env'.
KeySchema: [
{
AttributeName: "TaskListId",
KeyType: "HASH",
},
{
AttributeName: "TaskId",
KeyType: "RANGE",
},
],
AttributeDefinitions: [
{
AttributeName: "TaskListId",
AttributeType: "S",
},
{
AttributeName: "TaskId",
AttributeType: "S",
},
],
BillingMode: "PAY_PER_REQUEST",
})
);
} catch (error) {
if (
error instanceof dynamodb.ResourceInUseException ||
error instanceof dynamodb.TableAlreadyExistsException
) {
return;
}
throw error;
}
}

async function getPreviouslyRunTaskStatuses(
{ dynamodbClient }: Context,

Check failure on line 43 in src/db.ts

View workflow job for this annotation

GitHub Actions / build

Cannot find name 'Context'.
jobId: string
) {
const response = await dynamodbClient.send(
new dynamodb.QueryCommand({
TableName: env.PARALLELIZER_DYNAMODB_TABLE,

Check failure on line 48 in src/db.ts

View workflow job for this annotation

GitHub Actions / build

Cannot find name 'env'.
KeyConditionExpression: "TaskListId = :taskListId",
ExpressionAttributeValues: {
":taskListId": { S: jobId },
},
})
);
return response;
}

async function checkTaskCompletionStatus(
{ dynamodbClient }: Context,

Check failure on line 59 in src/db.ts

View workflow job for this annotation

GitHub Actions / build

Cannot find name 'Context'.
jobId: string,
taskId: string
): Promise<string> {
const response = await dynamodbClient.send(
new dynamodb.GetItemCommand({
TableName: env.PARALLELIZER_DYNAMODB_TABLE,

Check failure on line 65 in src/db.ts

View workflow job for this annotation

GitHub Actions / build

Cannot find name 'env'.
Key: {
TaskListId: { S: jobId },
TaskId: { S: taskId },
},
})
);
return response.Item?.Status?.S || "PENDING";
}

async function updateTaskStatusInDynamoDB(
{ dynamodbClient }: Context,

Check failure on line 76 in src/db.ts

View workflow job for this annotation

GitHub Actions / build

Cannot find name 'Context'.
jobId: string,
task: z.infer<typeof Task>,

Check failure on line 78 in src/db.ts

View workflow job for this annotation

GitHub Actions / build

Cannot find namespace 'z'.

Check failure on line 78 in src/db.ts

View workflow job for this annotation

GitHub Actions / build

Cannot find name 'Task'. Did you mean 'task'?
status: string,
isStart: boolean
) {
const workerId = process.env.PARALLELIZER_WORKER_ID || os.hostname();

Check failure on line 82 in src/db.ts

View workflow job for this annotation

GitHub Actions / build

Cannot find name 'os'.
const timestamp = new Date().toISOString();
let updateExpression =
"set #status = :status, #taskDisplayName = :taskDisplayName, #timestamp = :timestamp, #workerId = :workerId";
let expressionAttributeNames: Record<string, string> = {
"#status": "Status",
"#taskDisplayName": "TaskDisplayName",
"#timestamp": isStart ? "StartedAt" : "FinishedAt",
"#workerId": "WorkerId",
};
let expressionAttributeValues: Record<string, dynamodb.AttributeValue> = {
":status": { S: status },
":taskDisplayName": { S: task.displayName },
":timestamp": { S: timestamp },
":workerId": { S: workerId },
};

if (isStart) {
updateExpression +=
", #attemptCount = if_not_exists(#attemptCount, :zero) + :inc";
expressionAttributeNames["#attemptCount"] = "AttemptCount";
expressionAttributeValues[":zero"] = { N: "0" };
expressionAttributeValues[":inc"] = { N: "1" };
}

await dynamodbClient.send(
new dynamodb.UpdateItemCommand({
TableName: env.PARALLELIZER_DYNAMODB_TABLE,
Key: {
TaskListId: { S: jobId },
TaskId: { S: task.id },
},
UpdateExpression: updateExpression,
ExpressionAttributeNames: expressionAttributeNames,
ExpressionAttributeValues: expressionAttributeValues,
})
);
}

export { ensureDynamodbTableCreated, getPreviouslyRunTaskStatuses, checkTaskCompletionStatus, updateTaskStatusInDynamoDB };
10 changes: 10 additions & 0 deletions src/env.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import { z } from "zod";
import { Env } from "@(-.-)/env";

const envSchema = z.object({
PARALLELIZER_DYNAMODB_TABLE: z.string().default("parallelizer"),
});

const env = Env(envSchema);

export { env };
208 changes: 6 additions & 202 deletions src/main.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
#!/usr/bin/env node
import { Env } from "@(-.-)/env";
import * as dynamodb from "@aws-sdk/client-dynamodb";
import * as sqs from "@aws-sdk/client-sqs";
import { execa } from "execa";
import fs from "fs";
import { chunk } from "lodash-es";
Expand All @@ -10,37 +7,12 @@ import pMap from "p-map";
import process from "process";
import yargs from "yargs";
import { z } from "zod";

const env = Env(
z.object({
PARALLELIZER_DYNAMODB_TABLE: z.string().default("parallelizer"),
})
);

const Task = z.object({
id: z.string(),
displayName: z.string(),
spec: z.record(z.unknown()).default({}),
});

const TaskListFile = z.object({
id: z
.string()
.describe(
"Unique ID of the task list. " +
"Different CI workflow runs should have different queue names. " +
"This ID can be reused when retrying a failed workflow run " +
"(previously-complete tasks will be skipped)."
),
displayName: z.string().default(""),
tasks: z
.array(Task)
.describe(
"An array of tasks that needs to be run. " +
"The tasks will be queued in the order they are listed in this array, " +
"however, they may be run in parallel and out of order."
),
});
import { env } from "./env";
import { Task, TaskListFile } from "./schema";
import { Context, createContext } from "./createContext";
import { createDurationTracker } from "./createDurationTracker";
import { ensureDynamodbTableCreated, getPreviouslyRunTaskStatuses, checkTaskCompletionStatus, updateTaskStatusInDynamoDB } from "./db";
import { ensureQueueCreated, updateMessageVisibilityTimeout } from "./queue";

yargs(process.argv.slice(2))
.demandCommand()
Expand Down Expand Up @@ -257,171 +229,3 @@ yargs(process.argv.slice(2))
}
)
.parse();

interface Context {
sqsClient: sqs.SQSClient;
dynamodbClient: dynamodb.DynamoDBClient;
}

function createContext(): Context {
return {
dynamodbClient: new dynamodb.DynamoDBClient({}),
sqsClient: new sqs.SQSClient({}),
};
}

function createDurationTracker() {
const start = Date.now();
return {
formatDuration: () => {
const end = Date.now();
return ((end - start) / 1000).toFixed(2);
},
};
}

async function ensureDynamodbTableCreated({ dynamodbClient }: Context) {
try {
await dynamodbClient.send(
new dynamodb.CreateTableCommand({
TableName: env.PARALLELIZER_DYNAMODB_TABLE,
KeySchema: [
{
AttributeName: "TaskListId",
KeyType: "HASH",
},
{
AttributeName: "TaskId",
KeyType: "RANGE",
},
],
AttributeDefinitions: [
{
AttributeName: "TaskListId",
AttributeType: "S",
},
{
AttributeName: "TaskId",
AttributeType: "S",
},
],
BillingMode: "PAY_PER_REQUEST",
})
);
} catch (error) {
if (
error instanceof dynamodb.ResourceInUseException ||
error instanceof dynamodb.TableAlreadyExistsException
) {
return;
}
throw error;
}
}

async function ensureQueueCreated({ sqsClient }: Context, queueId: string) {
const createQueueResponse = await sqsClient.send(
new sqs.CreateQueueCommand({
QueueName: queueId,
tags: {
"ci-parallelizer": "true",
},
})
);
const queueUrl = createQueueResponse.QueueUrl;
return queueUrl;
}

async function getPreviouslyRunTaskStatuses(
{ dynamodbClient }: Context,
jobId: string
) {
const response = await dynamodbClient.send(
new dynamodb.QueryCommand({
TableName: env.PARALLELIZER_DYNAMODB_TABLE,
KeyConditionExpression: "TaskListId = :taskListId",
ExpressionAttributeValues: {
":taskListId": { S: jobId },
},
})
);
return response;
}

async function checkTaskCompletionStatus(
{ dynamodbClient }: Context,
jobId: string,
taskId: string
): Promise<string> {
const response = await dynamodbClient.send(
new dynamodb.GetItemCommand({
TableName: env.PARALLELIZER_DYNAMODB_TABLE,
Key: {
TaskListId: { S: jobId },
TaskId: { S: taskId },
},
})
);
return response.Item?.Status?.S || "PENDING";
}

async function updateTaskStatusInDynamoDB(
{ dynamodbClient }: Context,
jobId: string,
task: z.infer<typeof Task>,
status: string,
isStart: boolean
) {
const workerId = process.env.PARALLELIZER_WORKER_ID || os.hostname();
const timestamp = new Date().toISOString();
let updateExpression =
"set #status = :status, #taskDisplayName = :taskDisplayName, #timestamp = :timestamp, #workerId = :workerId";
let expressionAttributeNames: Record<string, string> = {
"#status": "Status",
"#taskDisplayName": "TaskDisplayName",
"#timestamp": isStart ? "StartedAt" : "FinishedAt",
"#workerId": "WorkerId",
};
let expressionAttributeValues: Record<string, dynamodb.AttributeValue> = {
":status": { S: status },
":taskDisplayName": { S: task.displayName },
":timestamp": { S: timestamp },
":workerId": { S: workerId },
};

if (isStart) {
updateExpression +=
", #attemptCount = if_not_exists(#attemptCount, :zero) + :inc";
expressionAttributeNames["#attemptCount"] = "AttemptCount";
expressionAttributeValues[":zero"] = { N: "0" };
expressionAttributeValues[":inc"] = { N: "1" };
}

await dynamodbClient.send(
new dynamodb.UpdateItemCommand({
TableName: env.PARALLELIZER_DYNAMODB_TABLE,
Key: {
TaskListId: { S: jobId },
TaskId: { S: task.id },
},
UpdateExpression: updateExpression,
ExpressionAttributeNames: expressionAttributeNames,
ExpressionAttributeValues: expressionAttributeValues,
})
);
}

async function updateMessageVisibilityTimeout(
sqsClient: sqs.SQSClient,
queueUrl: string,
receiptHandle: string,
visibilityTimeout: number
) {
await sqsClient.send(
new sqs.ChangeMessageVisibilityCommand({
QueueUrl: queueUrl,
ReceiptHandle: receiptHandle,
VisibilityTimeout: visibilityTimeout,
})
);
}
Loading

0 comments on commit 1995114

Please sign in to comment.