Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add multi-threading like in remfile #78

Open
rly opened this issue May 31, 2024 · 5 comments
Open

Add multi-threading like in remfile #78

rly opened this issue May 31, 2024 · 5 comments

Comments

@rly
Copy link
Contributor

rly commented May 31, 2024

Currently Remfile reads a large number of chunks faster than lindi. I think it is because Remfile uses multi-threading for requests. It would be nice to add that here for reading large numbers of chunks efficiently.

@magland
Copy link
Collaborator

magland commented May 31, 2024

Are you talking about loading multiple zarr/hdf5 chunks in parallel? Or loading a single large zarr/hdf5 chunk more efficiently using multiple threads?

If it's the former:

Right now the slicing is passed on to zarr, which I believe does not do multi-threaded reads. Are you suggesting that we specially handle slicing in a special way in LindiH5pyDataset?

The relevant code would be here

def __getitem__(self, args, new_dtype=None):
if new_dtype is not None:
raise Exception("new_dtype is not supported for zarr.Array")
return self._get_item_for_zarr(self._zarr_array, args)
def _get_item_for_zarr(self, zarr_array: zarr.Array, selection: Any):
# First check whether this is an external array link
external_array_link = zarr_array.attrs.get("_EXTERNAL_ARRAY_LINK", None)
if external_array_link and isinstance(external_array_link, dict):
link_type = external_array_link.get("link_type", None)
if link_type == 'hdf5_dataset':
url = external_array_link.get("url", None)
name = external_array_link.get("name", None)
if url is not None and name is not None:
client = self._get_external_hdf5_client(url)
dataset = client[name]
assert isinstance(dataset, h5py.Dataset)
return dataset[selection]
if self._compound_dtype is not None:
# Compound dtype
# In this case we index into the compound dtype using the name of the field
# For example, if the dtype is [('x', 'f4'), ('y', 'f4')], then we can do
# dataset['x'][0] to get the first x value
assert self._compound_dtype.names is not None
if isinstance(selection, str):
# Find the index of this field in the compound dtype
ind = self._compound_dtype.names.index(selection)
# Get the dtype of this field
dt = self._compound_dtype[ind]
if dt == 'object':
dtype = h5py.Reference
else:
dtype = np.dtype(dt)
# Return a new object that can be sliced further
# It's important that the return type is Any here, because otherwise we get linter problems
ret = LindiH5pyDatasetCompoundFieldSelection(
dataset=self, ind=ind, dtype=dtype
)
return ret
else:
raise TypeError(
f"Compound dataset {self.name} does not support selection with {selection}"
)
# We use zarr's slicing, except in the case of a scalar dataset
if self.ndim == 0:
# make sure selection is ()
if selection != ():
raise TypeError(f'Cannot slice a scalar dataset with {selection}')
# For some reason, with the newest version of zarr (2.18.0) we need to use [:][0] rather than just [0].
# Otherwise we get an error "ValueError: buffer source array is read-only"
return zarr_array[:][0]
return decode_references(zarr_array[selection])

Would we look at the argument being passed in to the slicing in order to see if this could be parallelized, and then spawn multiple threads in this function? I think this would get complicated with all the various slicing possibilities.

If it's the latter:
This could be done, but it's unclear to me how much gain we could achieve. Depends on size of chunks. If ~10 MB, then I don't think there's much to gain using multiple threads - in fact there will probably be a loss of efficiency.

@rly
Copy link
Contributor Author

rly commented Jun 2, 2024

Are you talking about loading multiple zarr/hdf5 chunks in parallel?

Yes.

Right now the slicing is passed on to zarr, which I believe does not do multi-threaded reads. Are you suggesting that we specially handle slicing in a special way in LindiH5pyDataset?
Would we look at the argument being passed in to the slicing in order to see if this could be parallelized, and then spawn multiple threads in this function? I think this would get complicated with all the various slicing possibilities.

I see. Yeah, that would be challenging.

In exploring this, I found a different possible solution:
Zarr can use fsspec's ReferenceFileSystem which is an AsyncFileSystem which seems to support concurrent fetches of chunks: https://filesystem-spec.readthedocs.io/en/latest/async.html

We could defer calls for __getitem__ and getitems to fsspec's ReferenceFileSystem using a zarr FSStore. I think since our RFS spec does not really differ from the original RFS spec, we could also just replace LindiReferenceFileSystemStore with FSStore configured with a ReferenceFileSystem (and move the static methods). I haven't tested that yet, but with the former, I see a speedup of about 1.7X for one particular set of reads. I'll push a draft implementation of this in a branch.

@rly
Copy link
Contributor Author

rly commented Jun 2, 2024

We could also roll our own by overriding getitems in LindiReferenceFileSystemStore which is currently a simple loop that calls __getitem__ for each selection:
https://github.com/zarr-developers/zarr-python/blob/19365e20522e5d67b0b14698a7c6c953c450c5c6/src/zarr/v2/_storage/store.py#L142

@magland
Copy link
Collaborator

magland commented Jun 6, 2024

@rly I think we should try to "roll our own" because lindi takes care of authenticating URLs of embargoed dandisets. Shall I take a crack at that?

@rly
Copy link
Contributor Author

rly commented Jun 8, 2024

Ah, I see. Yeah, that would be great.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants