Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unified file source stage #1184

Open
wants to merge 20 commits into
base: branch-23.11
Choose a base branch
from

Conversation

bsuryadevara
Copy link
Contributor

@bsuryadevara bsuryadevara commented Sep 11, 2023

  • Added unified file source stage

Notes:

  • FileSource stage supports only with use_cpp=False, as directory watcher implementation doesn't exists in C++
  • With this stage addition, what happens to MultiFileSource (It returns fsspec.Openfiles instead of dataframes)?
  • Current FileSourceStage support C++ implementation. Is it going to stay?

Description

Closes #976

@bsuryadevara bsuryadevara requested a review from a team as a code owner September 11, 2023 13:18
@bsuryadevara bsuryadevara marked this pull request as draft September 11, 2023 13:19
@bsuryadevara bsuryadevara self-assigned this Sep 11, 2023
@bsuryadevara bsuryadevara added non-breaking Non-breaking change feature request New feature or request 2 - In Progress labels Sep 11, 2023
@bsuryadevara bsuryadevara removed the feature request New feature or request label Sep 11, 2023
@bsuryadevara bsuryadevara added the feature request New feature or request label Sep 11, 2023
@bsuryadevara bsuryadevara marked this pull request as ready for review September 12, 2023 01:40
…ltifilesource-and-directorywatcher-functionality
morpheus/stages/input/file_source.py Outdated Show resolved Hide resolved
morpheus/stages/input/file_source.py Outdated Show resolved Hide resolved
morpheus/stages/input/file_source.py Show resolved Hide resolved
morpheus/stages/input/file_source.py Outdated Show resolved Hide resolved
morpheus/stages/input/file_source.py Outdated Show resolved Hide resolved
morpheus/stages/input/file_source.py Outdated Show resolved Hide resolved
tests/test_file_source.py Outdated Show resolved Hide resolved
tests/test_file_source.py Outdated Show resolved Hide resolved
tests/test_file_source.py Outdated Show resolved Hide resolved
tests/test_file_source.py Outdated Show resolved Hide resolved
…ltifilesource-and-directorywatcher-functionality
docker/conda/environments/cuda11.8_dev.yml Show resolved Hide resolved

if len(self._protocols) > 1:
raise ValueError("Accepts same protocol input files, but it received multiple protocols.")

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remark: It appears that == 0 and < -1 are invalid values for max_files.

Important: Check that max_files is in a valid range (if you decide to keep -1 as the default, adjust accordingly).

Suggested change
if max_files and max_files <= 0:
raise ValueError(...)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Raising an error if self._files is None or []. We will get at least one value in the self._protocols, so i didn't put an extra check.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's true, but what about max_files? If max_files == 0 or max_files < -1, then this stage won't produce any files. In that case we should either warn or raise an exception.

Copy link
Contributor Author

@bsuryadevara bsuryadevara Sep 18, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The max_files flag takes effect only when set to a value greater than zero; otherwise, it is treated as continuous polling without any imposed limit. Default value is -1, so I thought raising an error or warn would not needed. Let me know if you still want to add the warning message.

morpheus/stages/input/file_source.py Outdated Show resolved Hide resolved
filtered_files = sorted(filtered_files, key=lambda f: f.full_name)

if self._max_files > 0:
filtered_files = filtered_files[:self._max_files - processed_files_count]
Copy link
Contributor

@cwharris cwharris Sep 13, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remark: If processed_files_count > self._max_files we get filtered_files[:n] where n < 0, meaning we'll take the last n files, which doesn't sound like what we want.

Important: make sure we don't accidentally read from the end of the list of filtered_files.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I see we won't get a negative number because processed_files_count is calculated based on _max_files. My bad. No change needed.

morpheus/stages/input/file_source.py Show resolved Hide resolved
Comment on lines 219 to 221
if self._max_files > 0 and self._max_files <= processed_files_count:
logger.debug("Maximum file limit reached. Exiting polling service...")
self._stop_requested = True
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remark: Oh, I see. This is how we are stopping the source when we reach the max file limit. This is fine, but in general cancellation tokens are reserved for flagging from outside of the function that checks them. I think we can move this logic up in to the previous if self._max_files > 0 condition and use break or return there rather than flagging the cancellation token. Up to you.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have updated as suggested. I tried to avoid break and yield (multiple times), which is the reason i choosed this approach.

bsuryadevara and others added 4 commits September 13, 2023 16:18
…irectorywatcher-functionality' of github.com:bsuryadevara/Morpheus into 976-fea-unify-the-filesourcestage-multifilesource-and-directorywatcher-functionality
Comment on lines +201 to +204
if self._max_files <= processed_files_count:
logger.debug("Maximum file limit reached. Exiting polling service...")
yield fsspec.core.OpenFiles(filtered_files, fs=files.fs)
break
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now that I'm seeing the break in this location, I think having it down below line 206 (as you had it before I made my suggestion to change it) makes more sense because we won't need the two yields. That's my bad. I don't think it's necessary to change, I just wanted to point out your first solution was less repetitive than my suggestion. Up to you!

Comment on lines 139 to 142
if self._watch:
generator_function = self._polling_generate_frames_fsspec
else:
if self._watch:
generator_function = self._polling_generate_frames_fsspec
else:
generator_function = self._generate_frames_fsspec
generator_function = self._generate_frames_fsspec
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remark: Nice! This looks cleaner now that DirectoryWatcher is gone!


if len(self._protocols) > 1:
raise ValueError("Accepts same protocol input files, but it received multiple protocols.")

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's true, but what about max_files? If max_files == 0 or max_files < -1, then this stage won't produce any files. In that case we should either warn or raise an exception.

curr_time = time.monotonic()
next_update_epoch = curr_time

while (True):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't knowingly witnessed a clean shutdown of a Morpheus pipeline through the use of ctrl+c. I should check to see if that's even possible (it should be, but might have limits).

watch_interval: float = 1.0,
sort: bool = False,
file_type: FileTypes = FileTypes.Auto,
parser_kwargs: dict = None,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remark: Adding this argument makes me uneasy, since it will be difficult to deprecate in the future if necessary.

Question: Is this being added as a new feature, or is this something that existed on any of the other file source implementations?

…ltifilesource-and-directorywatcher-functionality
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
feature request New feature or request non-breaking Non-breaking change
Projects
Status: Review - Changes Requested
Development

Successfully merging this pull request may close these issues.

[FEA]: Unify the FileSourceStage, MultiFileSource and DirectoryWatcher functionality
3 participants