Newer dask/distributed versions take very long to start computation. #2

Open · jbusecke opened this issue Mar 2, 2021 · 14 comments

@jbusecke
Collaborator

jbusecke commented Mar 2, 2021

Hi everyone,

here is a relatively recent issue that puzzles me (and prevents me from upgrading to the latest dask/distributed versions).

For large computations, it can take a very long time until any computation is "started", as judged by nothing happening in the task stream / ProgressBar (for the threaded scheduler).

This example (which mimics part of my typical workload with high-resolution ocean model output) has been showing nothing for several minutes now (about 8-10 minutes at the time of writing).

# Let's first create a simple dummy dataset
import dask.array as dsa
import xarray as xr
nt = 3600

so = xr.DataArray(dsa.random.random(size=(1440, 1080, 35, nt), chunks=(1440, 270, 35, 1)), dims=['x', 'y', 'z', 'time'])
thetao = xr.DataArray(dsa.random.random(size=(1440, 1080, 35, nt), chunks=(1440, 270, 35, 1)), dims=['x', 'y', 'z', 'time'])
a = xr.DataArray(dsa.random.random(size=(1440, 1080, 35, nt), chunks=(1440, 270, 35, 1)), dims=['x', 'y', 'z', 'time'])
b = xr.DataArray(dsa.random.random(size=(1440, 1080, 35, nt), chunks=(1440, 270, 35, 1)), dims=['x', 'y', 'z', 'time'])

ds = xr.Dataset({'so':so, 'thetao':thetao, 'a':a, 'b':b})

# Layer on a simple function via xarray.apply_ufunc
def simple_func(salt, temp):
    return salt+temp

ds['simple'] = xr.apply_ufunc(simple_func, ds.so, ds.thetao, dask="parallelized") # This also blows out the memory

ds

[screenshot: repr of the resulting dataset]

Then I set up an adaptive dask gateway cluster

from dask_gateway import GatewayCluster

cluster = GatewayCluster()
cluster.adapt(minimum=6, maximum=20) 
client = cluster.get_client()

I am then trying to write this to the pangeo scratch bucket

# Set up scratch bucket path
import os
PANGEO_SCRATCH = os.environ['PANGEO_SCRATCH']
# -> gs://pangeo-scratch/<username>
import fsspec
mapper = fsspec.get_mapper(f'{PANGEO_SCRATCH}/test/test.zarr')
# mapper can now be used to read / write zarr stores

ds.to_zarr(mapper)

I am running this on the pangeo google deployment with a "Large" server.

My versions are: dask 2021.01.1, distributed 2021.01.1.

I realize that these datasets are quite large, but they are by no means unrealistic for modern climate/earth system models.
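
For a rough sense of scale, here is a quick check (a sketch using the dataset built above; nbytes is derived from shape and dtype, so nothing is loaded):

import numpy as np

# Total size of the dataset if it were held in memory (~1.6 TB per variable, ~7.8 TB for all five)
print(f"{ds.nbytes / 1e12:.1f} TB")

# Chunks per variable: 1 (x) * 4 (y) * 1 (z) * 3600 (time) = 14400
print(np.prod(ds.so.data.numblocks))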

NOTE: I just canceled my first run of this example after ~30 min.

I originally noticed this behavior in one of my research projects when I upgraded from version 2020.12.0 to the latest release (I believe 2021.02.x). Nothing would happen even after I waited for 30+ minutes, so I manually downgraded to get my workflow running again.

That particular workflow runs on a university HPC cluster and is hard to reproduce here, but I tested it with both versions there and the downgrade to 2020.12.0 improved the behavior drastically. Unfortunately it is a bit trickier to change the dask version on the pangeo deployment and the kubernetes dask workers.

  1. Do you have a hunch as to what changes in the recent version could have exacerbated this issue?
  2. On a more general level, is there a way to monitor 'progress' on what the scheduler is doing (I assume it is wrangling with this huge graph)?
@jbusecke
Collaborator Author

jbusecke commented Mar 2, 2021

I just ran the example again and the cluster started showing activity fairly quickly, so I am not entirely sure the above example is the best way to expose this behavior. But perhaps you still have an idea of what could be causing it?

@jrbourbeau

Thanks for the nice example snippet @jbusecke!

Do you have a hunch as to what changes in the recent version could have exacerbated this issue?

Nothing immediately pops out to me. But there has been a lot of recent work on HighLevelGraphs, transmitting graphs to the scheduler, etc. which might have impacted your workflow.

I tried the example locally on my laptop with two changes:

  • Used a LocalCluster instead of GatewayCluster
  • Used a local filesystem fsspec mapper (i.e. mapper = fsspec.get_mapper('test.zarr')) instead of one which uses gcsfs

using dask and distributed 2021.02.0 to see if I could reproduce the long computation start time (a minimal sketch of this local setup is below). However, locally I saw the task stream become active ~30 seconds after I kicked off the compute with ds.to_zarr(mapper). This makes me think that the slowdown isn't due purely to the scheduler processing a large graph.
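
For reference, a minimal sketch of that local setup (cluster sizing and the local path are arbitrary; it assumes the ds built in the snippet above):

from dask.distributed import Client, LocalCluster
import fsspec

# LocalCluster instead of GatewayCluster
cluster = LocalCluster(n_workers=4, threads_per_worker=2)
client = Client(cluster)

# Local filesystem mapper instead of the gcsfs-backed scratch bucket
mapper = fsspec.get_mapper("test.zarr")

# Kick off the write and watch the task stream at client.dashboard_link
ds.to_zarr(mapper)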

A couple of things immediately come to mind:

  • It could be that constructing the graph itself is slow (e.g. when using a non-local file system mapper)
  • It could be that we're network-bound and transmitting the graph from the Client to the scheduler is a bottleneck
  • ...

To test whether or not transmitting the graph to the scheduler is a large issue, could you try turning off low-level task fusion? Instead of

ds.to_zarr(mapper)

do

with dask.config.set({"optimization.fuse.active": False}):
    ds.to_zarr(mapper)

which should hopefully result in a much smaller graph getting sent over the wire to the scheduler.

Additionally, I see there have been recent releases of fsspec and gcsfs. Just for completeness, what versions of these packages are you using?
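
For completeness, one way to report these versions (and check that client, scheduler, and workers agree; assumes the client created from the GatewayCluster above) is something like:

import fsspec
import gcsfs

print("fsspec:", fsspec.__version__)
print("gcsfs:", gcsfs.__version__)

# Compare package versions across the client, scheduler, and workers;
# with check=True, mismatches are reported as an error
client.get_versions(check=True)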

@jrbourbeau

Stepping back a bit, I suspect there will be times in the future when you encounter issues while running on Pangeo resources and it will be useful for others to try and reproduce them. Does Pangeo have any publicly accessible resources we could use to try and reproduce the issues you run into? I know there's Pangeo Cloud and Pangeo's binderhub, but I don't have a good sense for whether these are appropriate for this use case.

@jrbourbeau

Also cc @ian-r-rose

@jbusecke
Collaborator Author

jbusecke commented Mar 2, 2021

Stepping back a bit, I suspect there will be times in the future when you encounter issues while running on Pangeo resources and it will be useful for others to try and reproduce them. Does Pangeo have any publicly accessible resources we could use to try and reproduce the issues you run into? I know there's Pangeo Cloud and Pangeo's binderhub, but I don't have a good sense for whether these are appropriate for this use case.

I did run these on the Pangeo Cloud. It only requires a sign up. Would this be a good place for all of us to have the same playing field?

@jbusecke
Collaborator Author

jbusecke commented Mar 2, 2021

Thank you very much for the suggestions. Will try them now.

@jrbourbeau

I did run these on the Pangeo Cloud. It only requires a sign up.

Great! I'll sign up now. Time to dust off my old ORCID...

@jbusecke
Collaborator Author

jbusecke commented Mar 2, 2021

Using

with dask.config.set({"optimization.fuse.active": False}):
    ds.to_zarr(mapper)

indeed cut the wait time down from ~4 min to less than 1 min! I'll check this in my full-blown workflow to see if it has a similar effect.

@rabernat
Contributor

rabernat commented Mar 2, 2021

I'm curious about the tradeoffs of bypassing optimizations. They might make the computation start faster...but will it run slower?
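
One way to check this empirically once the runs complete (a sketch; it reuses the ds and mapper from above, runs the full, very large, computation twice, and overwrites the store between runs):

import time
import dask

# Full write with low-level fusion enabled (the default)
start = time.perf_counter()
ds.to_zarr(mapper, mode="w")
print(f"with fusion:    {time.perf_counter() - start:.1f} s")

# Full write with low-level fusion disabled
start = time.perf_counter()
with dask.config.set({"optimization.fuse.active": False}):
    ds.to_zarr(mapper, mode="w")
print(f"without fusion: {time.perf_counter() - start:.1f} s")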

@jbusecke
Collaborator Author

jbusecke commented Mar 2, 2021

I didn't run them to completion, but will now 😁

@jrbourbeau

They might make the computation start faster...but will it run slower?

This is a great question to ask! In general things will be slower. Specifically, here are all the array optimizations that are skipped when "optimization.fuse.active" is turned off. Exactly how much slower things are depends on the particular computation -- though I suspect the last optimization, optimize_slices, is particularly useful for common Xarray workloads.

Either moving these optimizations to the HighLevelGraph level (similar to the cull optimization here) or removing the need for a particular optimization altogether through improvements in the distributed scheduler is part of the ongoing scheduler performance improvement effort (Matt gave a recent talk on this topic and here's a blog post which outlines the main parts of these efforts). Ultimately we want to remove the need for the "optimization.fuse.active" config option, but we're not there yet.

I was mostly interested in turning off "optimization.fuse.active" to get a sense for how much of a bottleneck graph transmission from the client to the scheduler is or isn't.
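
To get a rough feel for how much client-side graph processing low-level fusion adds before anything reaches the scheduler, one can time the optimization step on its own (a sketch using the 'simple' variable from the first snippet; it measures optimization time only, not serialization or transmission):

import time
import dask

arr = ds["simple"].data  # the dask array behind the apply_ufunc result above

# Client-side optimization with low-level fusion enabled (the default)
start = time.perf_counter()
dask.optimize(arr)
print(f"optimize with fusion:    {time.perf_counter() - start:.1f} s")

# Client-side optimization with low-level fusion disabled
start = time.perf_counter()
with dask.config.set({"optimization.fuse.active": False}):
    dask.optimize(arr)
print(f"optimize without fusion: {time.perf_counter() - start:.1f} s")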

@mrocklin

mrocklin commented Mar 15, 2021 via email

@mrocklin

I'm curious, does this problem still persist when turning off fusion? If there is something else going on here then I'd like to get to the bottom of it. If not then I would encourage this group to start operating without fusion (I think that you'll be ok) and we can work towards making that the default on our end.
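
If the group does start operating without fusion, a sketch of applying the setting once per session rather than wrapping every call (same config key as above):

import dask

# Disable low-level task fusion globally for this process
dask.config.set({"optimization.fuse.active": False})

# Subsequent .compute() / .to_zarr() calls will pick this up automatically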

@jbusecke
Collaborator Author

I haven't had time to get back to those test cases yet. In other workflows I have not really noticed this anymore, but I'll try to confirm soonish (I'm backed up by paper revisions this/next week).
