Skip to content

Commit

Permalink
fix: make batch processing sequential
Browse files (browse the repository at this point in the history)
Signed-off-by: eshwarprasadS <[email protected]>
  • Loading branch information
eshwarprasadS committed Jan 16, 2025
1 parent e478be3 commit 2b999bf
Showing 1 changed file with 4 additions and 21 deletions.
25 changes: 4 additions & 21 deletions src/instructlab/sdg/pipeline.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -205,27 +205,10 @@ def _generate_single(self, dataset) -> Dataset:
else:
# Split the dataset into batches
input_splits = self._split_dataset(dataset)
output_splits = []
with ThreadPoolExecutor(max_workers=self.ctx.batch_num_workers) as executor:
futures = [
executor.submit(block.generate, input_split)
for input_split in input_splits
]

# Collect the results of each batch
for future in futures:
try:
ds = future.result()
output_splits.append(ds)
except Exception as err:
logger.error("Error in block %s: %s", block_name, err)
raise PipelineBlockError(
exception=err,
block=block,
block_name=block_name,
block_type=block_type,
) from err

# Process each batch in sequence
output_splits = [
block.generate(input_split) for input_split in input_splits
]
# Combine the processed splits back into a single dataset
dataset = concatenate_datasets(output_splits)

Expand Down

0 comments on commit 2b999bf

Please sign in to comment.