From 7701b5b72fb6f2a5f06c599ceeace1611a4b0af7 Mon Sep 17 00:00:00 2001 From: dtinth on MBP M1 Date: Tue, 21 May 2024 17:19:06 +0700 Subject: [PATCH] add walkthrough --- README.md | 68 ++++++++++++++++++++++++++++++++++++++-- scripts/generate-job.mjs | 4 +-- scripts/work.mjs | 10 ++++++ src/main.ts | 16 +++++----- src/queue.ts | 8 +++-- src/schema.ts | 1 - 6 files changed, 92 insertions(+), 15 deletions(-) create mode 100644 scripts/work.mjs diff --git a/README.md b/README.md index 4e0819b..26b54b2 100644 --- a/README.md +++ b/README.md @@ -40,9 +40,68 @@ Additionally, **test grouping** can be done by running multiple test files in a ## Basic concepts -A **Parallelizer Job** is a collection of _Parallelizer Tasks._ Each parallelizer job has a unique ID, which will be used as the queue ID in SQS. This ID can be reused when retrying a job. This allows completed tasks to be skipped and only the failed tasks to be retried. The ID can be up to 80 characters long and can contain only alphanumeric characters, hyphens, and underscores. +A **Parallelizer Job** is a collection of _Parallelizer Tasks._ Each parallelizer job has a unique ID, which will be used as the queue ID in SQS. This ID can be reused when retrying a job. This allows completed tasks to be skipped and only the failed tasks to be retried. -A **Parallelizer Task** represents a testing task. Usually, it is a test file that needs to be run. It can also be a group of test files that are run together. Each task has a unique ID, which is used to identify the task in the queue. The ID can be up to 80 characters long and can contain only alphanumeric characters, hyphens, and underscores. +A **Parallelizer Task** represents a testing task. Usually, it is a test file that needs to be run. It can also be a group of test files that are run together. Each task has a unique ID, which is used to identify the task in the queue. + +## Walkthrough + +Before starting, run `pnpm install` to install dependencies and `pnpm run build` to compile the project. + +### Step 1: Create a Parallelizer Job file + +A Parallelizer Job file is a JSON file that contains the list of tasks to be run. The JSON file should have the following properties: + +- `id`: The ID of the job. It can contain alphanumeric characters, dashes, and underscores. +- `tasks`: An array of tasks. Each task should have the following properties: + - `id`: The ID of the task. It can contain alphanumeric characters, dashes, and underscores. + - `displayName`: The display name of the task. This is used for logging purposes. + - `spec`: An object that contains the task specification. This can be anything that is needed to run the task, such as the path to the test file (or test files), the testing framework to use, etc. + +To generate a sample job file, run the following command: + +```sh +node scripts/generate-job.mjs +``` + +…then look at the `tmp/job.json` file. + +### Step 2: Load the job into the queue + +Run the following command to load the job into the queue: + +```sh +node dist/main.js prepare --job-file=tmp/job.json +``` + +### Step 3: Work on the queue + +Run the following command to start working on the queue: + +```sh +node dist/main.js work --job-file=tmp/job.json node scripts/work.mjs +``` + +You can run the above command in multiple terminals to simulate multiple workers. + +The `work` command consumes tasks from the queue, and runs the worker command specified in the argument with the task ID appended. For example, the above command would run `node scripts/work.mjs `. The worker should return a status code of `0` if the task is successful, or a non-zero status code if the task failed. + +Note that Parallelizer doesn’t handle retries, so the worker should handle retries as needed. + +Additionally, the following environment variables are available to the worker: + +- `PARALLELIZER_JOB_ID`: The ID of the job. +- `PARALLELIZER_TASK_ID`: The ID of the task (same as the task ID passed to the worker command). + +### Step 4: Check the task status + +Run the following command to check the status of the tasks: + +```sh +node dist/main.js status --job-file=tmp/job.json --out-file=tmp/status.json +``` + +A summary table will be printed to the console, and the detailed status of each task will be written to the `tmp/status.json` file. ## Development @@ -61,5 +120,8 @@ node scripts/generate-job.mjs node dist/main.js prepare --job-file=tmp/job.json # Start a worker (this can be run in multiple terminals to simulate multiple workers) -node dist/main.js work --job-file=tmp/job.json echo +node dist/main.js work --job-file=tmp/job.json node scripts/work.mjs + +# Check the status of the tasks +node dist/main.js status --job-file=tmp/job.json --out-file=tmp/status.json ``` diff --git a/scripts/generate-job.mjs b/scripts/generate-job.mjs index 556a3ef..6a7d9f6 100644 --- a/scripts/generate-job.mjs +++ b/scripts/generate-job.mjs @@ -3,12 +3,12 @@ import { mkdirSync, writeFileSync } from "fs"; mkdirSync("tmp", { recursive: true }); const job = { id: "test-" + crypto.randomUUID(), - displayName: "Test Job", tasks: Array.from({ length: 50 }, (_, i) => ({ id: `task-${i}`, displayName: `Task ${i}`, + spec: { random: ~~(Math.random() * 10 + 1) }, })), }; -writeFileSync("tmp/job.json", JSON.stringify(job)); +writeFileSync("tmp/job.json", JSON.stringify(job, null, 2)); console.log("Job file generated:", "tmp/job.json"); console.log("Job ID:", job.id); diff --git a/scripts/work.mjs b/scripts/work.mjs new file mode 100644 index 0000000..3db10b2 --- /dev/null +++ b/scripts/work.mjs @@ -0,0 +1,10 @@ +import { readFileSync } from "fs"; + +const job = JSON.parse(readFileSync("tmp/job.json", "utf8")); +const task = job.tasks.find((task) => task.id === process.argv[2]); +const value = task.spec.random; +const ms = value * 100; +await new Promise((resolve) => setTimeout(resolve, ms)); +console.log( + `Task ${task.id} (${task.displayName}) completed in ${ms}ms with value ${value}!` +); diff --git a/src/main.ts b/src/main.ts index e311eb4..cade2cb 100644 --- a/src/main.ts +++ b/src/main.ts @@ -38,7 +38,7 @@ yargs(process.argv.slice(2)) console.log("Reading job file:", jobFile); const jobFileContents = fs.readFileSync(jobFile, "utf-8"); const job = TaskListFile.parse(JSON.parse(jobFileContents)); - console.log("Queue name:", job.id); + console.log("Job ID:", job.id); console.log("Number of tasks:", job.tasks.length); const ctx: Context = createContext(); @@ -123,12 +123,12 @@ yargs(process.argv.slice(2)) console.log("Reading job file:", jobFile); const jobFileContents = fs.readFileSync(jobFile, "utf-8"); const job = TaskListFile.parse(JSON.parse(jobFileContents)); - console.log("Queue name:", job.id); + console.log("Job ID:", job.id); const sqsClient = new sqs.SQSClient({}); const queueUrlResponse = await sqsClient.send( new sqs.GetQueueUrlCommand({ - QueueName: job.id, + QueueName: env.PARALLELIZER_SQS_PREFIX + job.id, }) ); const queueUrl = queueUrlResponse.QueueUrl!; @@ -181,7 +181,10 @@ yargs(process.argv.slice(2)) try { await execa(command[0], command.slice(1), { stdio: "inherit", - env: { PARALLELIZER_TASK_ID: body.id }, + env: { + PARALLELIZER_TASK_ID: body.id, + PARALLELIZER_JOB_ID: job.id, + }, }); await updateTaskStatusInDynamoDB( ctx, @@ -262,8 +265,7 @@ yargs(process.argv.slice(2)) }, "out-file": { type: "string", - description: - "Path to the output file. If not provided, prints to stdout.", + description: "Path to the output JSON file.", }, }, async (args) => { @@ -271,7 +273,7 @@ yargs(process.argv.slice(2)) console.log("Reading job file:", jobFile); const jobFileContents = fs.readFileSync(jobFile, "utf-8"); const job = TaskListFile.parse(JSON.parse(jobFileContents)); - console.log("Queue name:", job.id); + console.log("Job ID:", job.id); console.log("Number of tasks:", job.tasks.length); const ctx: Context = createContext(); const statuses = await getPreviouslyRunTaskStatuses(ctx, job.id); diff --git a/src/queue.ts b/src/queue.ts index e62095f..c0119c7 100644 --- a/src/queue.ts +++ b/src/queue.ts @@ -1,9 +1,13 @@ import * as sqs from "@aws-sdk/client-sqs"; +import { env } from "./env"; -async function ensureQueueCreated({ sqsClient }: { sqsClient: sqs.SQSClient }, queueId: string) { +async function ensureQueueCreated( + { sqsClient }: { sqsClient: sqs.SQSClient }, + queueId: string +) { const createQueueResponse = await sqsClient.send( new sqs.CreateQueueCommand({ - QueueName: queueId, + QueueName: env.PARALLELIZER_SQS_PREFIX + queueId, tags: { "ci-parallelizer": "true", }, diff --git a/src/schema.ts b/src/schema.ts index 63164f0..1466873 100644 --- a/src/schema.ts +++ b/src/schema.ts @@ -15,7 +15,6 @@ const TaskListFile = z.object({ "This ID can be reused when retrying a failed workflow run " + "(previously-complete tasks will be skipped)." ), - displayName: z.string().default(""), tasks: z .array(Task) .describe(