
feat: abstracted worker threads #68

Open
wants to merge 8 commits into develop
Conversation

svvimming
Collaborator

Description

This PR contains all new features added in #67 while also refactoring the work therein to abstract the node worker pool functionality out into its own module for use across multiple projects.

A summary of features added before the abstraction/refactor (May 5th, 2023):

Two main features have been added to the CID importer cron:

  1. Backups of entire CID files retrieved from Web3.Storage to the Open Panda Backblaze bucket
  2. Multithreaded processing:
  • CID retrieval from Web3.Storage, zst unpacking, metadata extraction and import to the database are now handled in a worker thread (cid-batch-import.js in the crons directory). The first part of the main cid-importer.js script is unchanged: a manifest list of CIDs to download is still generated and stored to tmp/cid-files/cid-manifest.txt. However, where files from the manifest list were previously retrieved in batches processed in series, the script now delegates batches to worker threads to be processed in parallel.
  • Two new arguments can be passed to the cid-importer.js script:
    • --threads: an integer specifying the number of workers to add
    • --all: a boolean which, if true, skips the search for the last imported document in the database and retrieves all CIDs starting from the oldest existing upload
  • The previous two arguments still apply:
    • --pagesize: an integer specifying the import/backup batch size
    • --maxpages: an integer specifying how many batches to process; if left unspecified, no limit is placed on the number of batches
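
As an illustration, an invocation combining the new and existing arguments might look like the following (the thread count, page size and page limit are arbitrary example values, not defaults, and the script path assumes it is run from the crons directory):

  node cid-importer.js --threads 4 --all --pagesize 100 --maxpages 10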

Ticket:
https://www.notion.so/agencyundone/Backup-all-dataset-manifests-to-Backblaze-3196a93f141546a3a91602d78b3dbd7f?pvs=4

Most recent features added with this PR (Multithread Batch Processing via Worker Pool Module):

Overview

The worker-pool-batch-processor.js module manages a pool of Node workers to process large lists of data or objects in batches. It is a thin layer built on top of the workerpool npm package that makes processing large datasets in batches quick and straightforward. The worker-pool-batch-processor script exports two functions: CreateWorkerPool and InitializeWorker.

CreateWorkerPool instantiates a pool of workers using the workerpool npm package. It then slices batches off a manifest list and queues them one at a time. The manifest list can be a list of data values, objects or references to files. How this list is processed depends entirely on the script supplied to CreateWorkerPool, which is run by each individual worker. The CreateWorkerPool function is agnostic to the contents of that script and only handles delegating batches to workers and managing the results returned from each worker.
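
For example, with a batchSize of 50, a manifest of 1,000 entries would be split into 20 batches; with threads set to 4, up to four of those batches are processed at once while the rest remain queued until a worker becomes free.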

The function accepts four arguments (a usage sketch follows this list):

  • pathToScript: (String) an absolute path to a script which each worker will run
  • operation: (String) the name of the root function in the script that the worker will interface with
  • manifest: (Array) an array of data to process or use within the script
  • options: (Object) an object with the following options:
    • threads: (Number) the number of workers to run simultaneously
    • batchSize: (Number) the number of manifest items in each batch handed to a worker
    • onBatchResult: (Function) a function called when a worker has finished a batch; it receives the results of that batch processing. It is passed three arguments by the worker:
        1. result: the result returned by the worker
        2. num: the batch number
        3. results: all results to date, including this result as the most recent
          These arguments would typically be used to display the worker pool's progress as each batch is completed.
    • onWorkerPoolComplete: (Function) a function called on completion of processing the entire manifest list, i.e. once all workers and queued tasks have finished. This function is passed two arguments:
        1. results: (Array) an array of the results from each individual batch
        2. errors: (Array) an array of any errors returned by a worker while processing a batch
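
A minimal sketch of a main-thread call follows. It assumes the four arguments are passed positionally, that the module is required via CommonJS from a relative path, and that the worker script name, operation name, manifest contents and option values are purely illustrative:

```js
// Main-thread script (illustrative): builds a manifest and hands it to the pool
const path = require('path')
const { CreateWorkerPool } = require('./worker-pool-batch-processor')

// Hypothetical manifest; in the CID importer this would be the list read from
// tmp/cid-files/cid-manifest.txt
const manifest = ['cid-1', 'cid-2', 'cid-3']

CreateWorkerPool(
  path.resolve(__dirname, 'example-worker.js'), // pathToScript: absolute path to the worker script
  'processBatch',                               // operation: must match the root function name in the worker script
  manifest,                                     // manifest: the full list to be split into batches
  {
    threads: 4,    // workers running simultaneously
    batchSize: 50, // manifest items handed to a worker per batch
    onBatchResult: (result, num, results) => {
      console.log(`Batch ${num} complete (${results.length} batches so far)`)
    },
    onWorkerPoolComplete: (results, errors) => {
      console.log(`Done: ${results.length} batches processed, ${errors.length} errors`)
    }
  }
)
```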

InitializeWorker is a simple initialization function that takes a single argument:

  • operation: (Function) a function that each worker will perform on each batch it receives. The name of this function must match the operation string described above.

Use

The module must be used with two separate scripts:

One runs in the main thread and is the source of the manifest list passed to the worker pool. In this thread/script the CreateWorkerPool function must be imported from the worker-pool-batch-processor.js module and called at the appropriate step with the arguments described above.

Likewise, the InitializeWorker function must be imported in the script containing all processes to be executed in a worker thread. This function must be called as the last step in the script and must be passed the top-most function in that script, whose name must exactly match the operation string passed to CreateWorkerPool.
The operation passed to InitializeWorker must return a Promise which either resolves with the result of a processed batch or rejects on an error or null result.
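
A matching worker script might then look like the sketch below. The processBatch name and the per-item work are hypothetical, and it assumes the operation receives its batch as a single argument; only the InitializeWorker call, the name-matching requirement and the Promise contract come from the description above:

```js
// example-worker.js (illustrative): defines the operation each worker runs per batch
const { InitializeWorker } = require('./worker-pool-batch-processor')

// Name must exactly match the operation string passed to CreateWorkerPool
const processBatch = (batch) => {
  return new Promise((resolve, reject) => {
    try {
      // Hypothetical per-item work; the real CID importer would download,
      // unpack and import each entry in the batch here
      const processed = batch.map(item => ({ item, ok: true }))
      if (!processed.length) { return reject(new Error('Null result for batch')) }
      resolve(processed)
    } catch (e) {
      reject(e)
    }
  })
}

// Must be called as the last step in the script, passing the top-most operation function
InitializeWorker(processBatch)
```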

Ticket:
https://www.notion.so/agencyundone/Abstract-node-workers-and-worker-pools-782d974e21c8455282b033783a40198e?pvs=4
