Speed up parallel from_group_dataframe #2645

Open
tRosenflanz opened this issue Jan 19, 2025 · 0 comments

Labels
triage Issue waiting for triaging

Comments

tRosenflanz (Contributor) commented Jan 19, 2025

Is your feature request related to a current problem? Please describe.
Parallel timeseries.from_group_dataframe currently passes a sub_df to the workers per group, which can be slow when there are a lot of groups to process due to the per-task parallelization overhead.

Describe proposed solution
Instead of processing each group individually, split the initial dataframe into n_jobs chunks and process each of those chunks sequentially (i.e. with n_jobs=1) inside its worker. This way each worker gets many groups at once, and the parallelization overhead is paid once per chunk instead of once per group.

Describe potential alternatives
A mix of the two approaches could work as well, e.g. falling back to chunked processing only when the number of groups is large; a rough sketch follows.
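
One way such a mix could look, as a rough sketch rather than darts API (the helper name, its signature and the threshold value are illustrative assumptions; ts, data_df, grouper and val are as in the stub under "Additional context"):

from multiprocessing import cpu_count

import numpy as np
from joblib import Parallel, delayed


def from_group_dataframe_mixed(data_df, grouper, val, group_threshold=1_000):
    # hypothetical helper: with few groups, keep the existing per-group parallel
    # path; with many groups, split the group keys into one chunk per worker and
    # let each worker process its chunk sequentially
    group_keys = data_df[grouper].drop_duplicates()
    if len(group_keys) <= group_threshold:
        return ts.TimeSeries.from_group_dataframe(
            data_df,
            group_cols=grouper,
            value_cols=val,
            time_col="date",
            n_jobs=-1,
        )
    jobs = [
        delayed(ts.TimeSeries.from_group_dataframe)(
            data_df.merge(chunk),
            group_cols=grouper,
            value_cols=val,
            time_col="date",
            n_jobs=1,
        )
        for chunk in np.array_split(group_keys, cpu_count())
    ]
    # flatten the per-chunk lists into a single list of TimeSeries
    return sum(Parallel(n_jobs=-1)(jobs), start=[])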

Additional context
Stub of logic to compare the results:

baseline call:

ts.TimeSeries.from_group_dataframe(
    data_df,
    group_cols=grouper,
    value_cols=val,
    time_col="date",
    n_jobs=-1,
)

A (potentially) significantly faster implementation to compare:

from multiprocessing import cpu_count

import numpy as np
from joblib import Parallel, delayed

# ts, data_df, grouper and val as in the baseline call above

def process_group(chunk_df):
    # each worker processes its whole chunk of groups sequentially
    return ts.TimeSeries.from_group_dataframe(
        chunk_df,
        group_cols=grouper,
        value_cols=val,
        time_col="date",
        n_jobs=1,
    )

n_chunks = cpu_count()
# unique group keys, split into one chunk of keys per worker
group_keys = data_df[grouper].drop_duplicates()
list_df = np.array_split(group_keys, n_chunks)
jobs = []
for chunk in list_df:
    # restrict the original dataframe to the groups in this chunk
    chunk_df = data_df.merge(chunk)
    jobs.append(delayed(process_group)(chunk_df))
ret_lst = Parallel(n_jobs=-1)(jobs)
# flatten the per-chunk lists of TimeSeries into a single list
all_series = sum(ret_lst, start=[])
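
To check that the two paths agree, one could compare the group keys recovered from each result. This is a sketch under a couple of assumptions: grouper is a list of column names, and the group columns are kept as static covariates on each TimeSeries (the default behavior of from_group_dataframe).

baseline = ts.TimeSeries.from_group_dataframe(
    data_df,
    group_cols=grouper,
    value_cols=val,
    time_col="date",
    n_jobs=-1,
)

# both paths should yield one TimeSeries per group
assert len(baseline) == len(all_series)

def group_key(series):
    # identify a series by its group values, stored as static covariates
    return tuple(series.static_covariates[grouper].iloc[0])

assert sorted(map(group_key, baseline)) == sorted(map(group_key, all_series))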

On my dataset the latter code is about 4x faster for a dataframe with 30k groups.

I am not certain this is worth putting into the library, but thought it might be worth looking into.

tRosenflanz added the triage (Issue waiting for triaging) label on Jan 19, 2025