Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve pipeline stop logic to ensure join is called exactly once for all stages #1479

Merged
merged 27 commits into from
Feb 15, 2024

Conversation

efajardo-nv
Copy link
Contributor

@efajardo-nv efajardo-nv commented Jan 25, 2024

Description

  1. Removes the _is_built, _is_started and _is_stopped flags and replaces with single member which holds onto the state enum for: INITIALIZED, BUILT, STARTED, STOPPED, COMPLETED
  2. Changes the meaning of stop() and the meaning of join() for stages
    1. stop() called 0 or 1 times. Only way it can get called is if pipeline.stop() was called indicating the pipeline should try to shut down gracefully.
      1. Users should only implement this method if they have a source stage (or sources in their stage)
    2. join() called exactly 1 time. Only called when the pipeline is complete and all stages are shut down. This is where users should implement any cleanup code
  3. Tests for handling all of these scenarios with the pipeline.

Closes #1477

By Submitting this PR I confirm:

  • I am familiar with the Contributing Guidelines.
  • When the PR is ready for review, new or existing tests cover these changes.
  • When the PR is ready for review, the documentation is up to date with these changes.

@efajardo-nv efajardo-nv added bug Something isn't working non-breaking Non-breaking change labels Jan 25, 2024
@efajardo-nv efajardo-nv self-assigned this Jan 25, 2024
Copy link
Contributor

@mdemoret-nv mdemoret-nv left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think your PR makes the fixes specified in the issue but looking at the code in here, I think we can do more to improve how the pipeline state is handled. I have the following recommendations:

  1. Convert the _is_built, _is_started and _is_stopped flags into a single member which holds onto the state enum.
    1. An enum works well for the state because the pipeline state should only progress forwards
    2. The enum values should look something like: initialized, built, running, stopping, completed
    3. Changes to the state value should be guarded by a mutex to prevent changing the state from multiple threads
  2. We should move all of the logic in the join() method into _start() after the call to self._mrc_executor.start()
    1. This is necessary because there are functions in join() which need to be called 100% of the time. However, its not required by the user to call pipeline.join(). Its its possible with the current API that stages will never have their stop() and join() method called.
    2. To fix this, we just need to introduce an asyncio.Event into the pipeline
    3. If you follow the example in the docs, we pretty much want to use the same pattern.
      1. After self._mrc_executor.start() is called, create a new task which immediately calls self._mrc_executor.join_async(). This will block the task until the pipeline is complete.
      2. After join_async() we should have all of the same code which is currently in Pipeline.join() to loop over all stages calling join().
      3. Finally, the task should set the pipeline state to Complete and call set() on the event object
      4. All that should remain in Pipeline.join() is calling await self._completion_event.wait() which will block that method from returning until the pipeline finishes.
  3. We should change the meaning of stop() and the meaning of join() for stages
    1. stop() should only get called 0 or 1 times. The only way it should get called is if pipeline.stop() was called indicating the pipeline should try to shut down gracefully.
      1. Users should only implement this method if they have a source stage (or sources in their stage)
    2. join() should get called exactly 1 time. It should get called when the pipeline is complete and all stages are shut down. This is where users should implement any cleanup code
  4. We should store the order that stages were built into a list and use this list when iterating over all the stages in stop() and join()
    1. This is a small change but will guarantee that stages are stopped and joined in the same order they are built.
  5. We should add edge condition tests for handling all of these scenarios with the pipeline.
    1. We have a few checks but more robust tests would be very powerful here.

morpheus/pipeline/pipeline.py Outdated Show resolved Hide resolved
@efajardo-nv efajardo-nv marked this pull request as ready for review February 9, 2024 18:59
@efajardo-nv efajardo-nv requested a review from a team as a code owner February 9, 2024 18:59
Copy link
Contributor

@mdemoret-nv mdemoret-nv left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just missing tests on the join() method. Do the same tests for both normal and out of order uses.

morpheus/pipeline/pipeline.py Outdated Show resolved Hide resolved
morpheus/pipeline/pipeline.py Show resolved Hide resolved
morpheus/pipeline/pipeline.py Show resolved Hide resolved
morpheus/pipeline/pipeline.py Outdated Show resolved Hide resolved
tests/pipeline/test_pipeline_state.py Show resolved Hide resolved
tests/pipeline/test_pipeline_state.py Show resolved Hide resolved
@mdemoret-nv mdemoret-nv changed the title Update pipeline stop logic Improve pipeline stop logic to ensure join is called exactly once for all stages Feb 15, 2024
@mdemoret-nv
Copy link
Contributor

/merge

@rapids-bot rapids-bot bot merged commit 5fd661b into nv-morpheus:branch-24.03 Feb 15, 2024
10 checks passed
@efajardo-nv efajardo-nv deleted the pipeline-stop-fix branch July 29, 2024 21:10
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working non-breaking Non-breaking change
Projects
Archived in project
Development

Successfully merging this pull request may close these issues.

[BUG]: stop() called twice for each stage on Control-C
3 participants