Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

How to correct close all submited tasks? #16848

Closed
tonal opened this issue Jan 24, 2025 · 4 comments · Fixed by #16969
Closed

How to correct close all submited tasks? #16848

tonal opened this issue Jan 24, 2025 · 4 comments · Fixed by #16969
Labels
bug Something isn't working

Comments

@tonal
Copy link

tonal commented Jan 24, 2025

Bug summary

How can I cancel tasks that have not yet been completed and can no longer be completed due to exceptions that have occurred in dependent tasks?

# -*- coding: utf-8 -*-
from asyncio import run, sleep

from prefect import flow, task
from prefect.futures import wait


@task
async def task_ok(i:int):
  await sleep(5)
  print('task', i)


@task
async def task_err(i:int):
  await sleep(3)
  raise Exception(f'error {i}')


@flow(log_prints=True)
async def all_run():
  t1 = task_ok.submit(1)
  t2 = task_err.submit(2)
  t3 = task_ok.submit(3, wait_for=[t2])

  wait([t1, t2, t3])


async def amain():
  await all_run()


if __name__ == '__main__':
  run(amain())

Output:

14:00:58.568 | DEBUG   | prefect.profiles - Using profile 'ephemeral'
14:01:03.822 | INFO    | prefect - Starting temporary server on http://127.0.0.1:8909
See https://docs.prefect.io/3.0/manage/self-host#self-host-a-prefect-server for more information on running a dedicated Prefect server.
14:01:17.305 | DEBUG   | prefect.profiles - Using profile 'ephemeral'
14:01:19.457 | DEBUG   | prefect.client - Connecting to API at http://127.0.0.1:8909/api/
14:01:19.636 | DEBUG   | prefect.utilities.urls - No URL found for the Prefect UI, and no default base path provided.
14:01:19.716 | DEBUG   | prefect.task_runner.threadpool - Starting task runner
14:01:19.727 | INFO    | Flow run 'ultramarine-malamute' - Beginning flow run 'ultramarine-malamute' for flow 'all-run'
14:01:19.733 | DEBUG   | prefect.utilities.urls - No URL found for the Prefect UI, and no default base path provided.
14:01:19.733 | DEBUG   | Flow run 'ultramarine-malamute' - Executing flow 'all-run' for flow run 'ultramarine-malamute'...
14:01:19.750 | DEBUG   | Flow run 'ultramarine-malamute' - Submitting task task_ok to thread pool executor...
14:01:19.752 | DEBUG   | Flow run 'ultramarine-malamute' - Submitting task task_err to thread pool executor...
14:01:19.757 | DEBUG   | Flow run 'ultramarine-malamute' - Submitting task task_ok to thread pool executor...
14:01:19.816 | DEBUG   | prefect.client - Connecting to API at http://127.0.0.1:8909/api/
14:01:20.042 | DEBUG   | prefect.client - Connecting to API at http://127.0.0.1:8909/api/
14:01:20.052 | DEBUG   | prefect.client - Connecting to API at http://127.0.0.1:8909/api/
14:01:20.075 | DEBUG   | prefect.client - Connecting to API at http://127.0.0.1:8909/api/
14:01:20.108 | DEBUG   | Task run 'task_err-afa' - Created task run 'task_err-afa' for task 'task_err'
14:01:20.109 | DEBUG   | Task run 'task_ok-f2f' - Created task run 'task_ok-f2f' for task 'task_ok'
14:01:20.109 | DEBUG   | Task run 'task_ok-fcc' - Created task run 'task_ok-fcc' for task 'task_ok'
14:01:20.113 | DEBUG   | Task run 'task_err-afa' - Executing task 'task_err' for task run 'task_err-afa'...
14:01:20.116 | DEBUG   | Task run 'task_ok-f2f' - Executing task 'task_ok' for task run 'task_ok-f2f'...
14:01:20.170 | DEBUG   | prefect.client - Connecting to API at http://127.0.0.1:8909/api/
14:01:23.130 | ERROR   | Task run 'task_err-afa' - Task run failed with exception: Exception('error 2') - Retries are exhausted
Traceback (most recent call last):
  File "/home/tonal/lang/projects/promsoft/epool/props_service/venv/lib/python3.12/site-packages/prefect/task_engine.py", line 1337, in run_context
    yield self
  File "/home/tonal/lang/projects/promsoft/epool/props_service/venv/lib/python3.12/site-packages/prefect/task_engine.py", line 1414, in run_task_async
    await engine.call_task_fn(txn)
  File "/home/tonal/lang/projects/promsoft/epool/props_service/venv/lib/python3.12/site-packages/prefect/task_engine.py", line 1360, in call_task_fn
    result = await call_with_parameters(self.task.fn, parameters)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/tonal/lang/projects/promsoft/epool/props_service/flow4exept.py", line 18, in task_err
    raise Exception(f'error {i}')
Exception: error 2
14:01:23.150 | ERROR   | Task run 'task_err-afa' - Finished in state Failed('Task run encountered an exception Exception: error 2')
14:01:23.161 | ERROR   | Task run 'task_ok-fcc' - Finished in state NotReady("Upstream task run '432d6d28-7599-4e60-8356-7c866ff6d330' did not reach a 'COMPLETED' state.", type=PENDING)
Please wait for all submitted tasks to complete before exiting your flow by calling `.wait()` on the `PrefectFuture` returned from your `.submit()` calls.

Example:

from prefect import flow, task

@task
def say_hello(name):
    print(f"Hello, {name}!")

@flow
def example_flow():
    future = say_hello.submit(name="Marvin")
    future.wait()

example_flow()

14:01:25.132 | INFO    | Task run 'task_ok-f2f' - task 1
14:01:25.141 | INFO    | Task run 'task_ok-f2f' - Finished in state Completed()
14:01:25.263 | DEBUG   | prefect.task_runner.threadpool - Set cancel event
14:01:25.264 | DEBUG   | prefect.task_runner.threadpool - Set cancel event
14:01:25.265 | DEBUG   | prefect.task_runner.threadpool - Set cancel event
14:01:25.266 | DEBUG   | prefect.task_runner.threadpool - Stopping task runner
14:01:25.267 | INFO    | Flow run 'ultramarine-malamute' - Finished in state Completed()
14:01:25.302 | INFO    | prefect - Stopping temporary server on http://127.0.0.1:8909

Version info

Version:             3.1.14
API version:         0.8.4
Python version:      3.12.8
Git commit:          5f1ebb57
Built:               Thu, Jan 23, 2025 1:22 PM
OS/Arch:             linux/x86_64
Profile:             ephemeral
Server type:         ephemeral
Pydantic version:    2.10.6
Server:
  Database:          sqlite
  SQLite version:    3.45.1
Integrations:
  prefect-aws:       0.5.3

Additional context

No response

@tonal tonal added the bug Something isn't working label Jan 24, 2025
@cicdw
Copy link
Member

cicdw commented Jan 24, 2025

Hi @tonal - it seems that Prefect did what you expect, as t3 was prevented from running because of the error that occurred in t2; maybe the error message is a bit misleading though because you did properly wait on all of the tasks.

Let me know if I'm misunderstanding your issue though!

@tonal
Copy link
Author

tonal commented Jan 26, 2025

Yes, the behavior is exactly as expected.
But the error message is misleading.
It would be nice to change it so that what happened is more obvious.
Or at least display the name of the task in the "NotReady" state in the message.

@gavrie
Copy link

gavrie commented Feb 3, 2025

@cicdw I encountered the same issue, and wasted many hours trying to understand what I did wrong. I even asked Marvin on Slack, who didn't know either.

I'd suggest not printing the Please wait for all submitted tasks to complete message in case an exception was raised, since it looks like the user's code is incorrect even when it is.

Even beyond that, it seems to me that that explicitly waiting for t3 in the original example is superfluous, since t2 already waits for it. Is that right?

@cicdw
Copy link
Member

cicdw commented Feb 4, 2025

Should be fixed in the next release assuming #16969 passes tests and review!

And yup @gavrie you are correct that technically t3 is the only future that needs waiting, although it doesn't hurt to include the others

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

Successfully merging a pull request may close this issue.

3 participants