Skip to content

Commit

Permalink
feat/custom ingest stager (#3340)
Browse files Browse the repository at this point in the history
### Description
Allow users to pass in a reference to a custom-defined stager via the
CLI. Checks are run on the referenced class to ensure it is a subclass of the
UploadStager interface.
  • Loading branch information
rbiseck3 committed Jul 3, 2024
1 parent f1a2860 commit d86d15c
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 4 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
## 0.14.10-dev7
## 0.14.10-dev8

### Enhancements
* **Update unstructured-client dependency** Change unstructured-client dependency pin back to
Expand Down
2 changes: 1 addition & 1 deletion unstructured/__version__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.14.10-dev7" # pragma: no cover
__version__ = "0.14.10-dev8" # pragma: no cover
42 changes: 41 additions & 1 deletion unstructured/ingest/v2/cli/base/cmd.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import inspect
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from dataclasses import dataclass, field, fields
from typing import Any, Optional, Type, TypeVar

import click

from unstructured.ingest.v2.cli.base.importer import import_from_string
from unstructured.ingest.v2.cli.interfaces import CliConfig
from unstructured.ingest.v2.cli.utils import extract_config
from unstructured.ingest.v2.interfaces import ProcessorConfig
Expand All @@ -14,6 +16,8 @@
DownloaderT,
IndexerT,
UploaderT,
UploadStager,
UploadStagerConfig,
UploadStagerT,
destination_registry,
source_registry,
Expand Down Expand Up @@ -147,8 +151,44 @@ def get_downloader(src: str, options: dict[str, Any]) -> DownloaderT:
downloader_cls = source_entry.downloader
return downloader_cls(**downloader_kwargs)

@staticmethod
def get_custom_stager(
    stager_reference: str, stager_config_kwargs: Optional[dict] = None
) -> Optional[UploadStagerT]:
    """Instantiate a user-supplied upload stager from an import reference.

    Args:
        stager_reference: Reference resolved through ``import_from_string``;
            it must resolve to a class that subclasses ``UploadStager``.
        stager_config_kwargs: Optional keyword arguments used to build the
            stager's config. When empty or ``None`` the stager is created
            without an explicit ``upload_stager_config``.

    Returns:
        An instance of the referenced stager class.

    Raises:
        ValueError: If the reference is not a class, is not an ``UploadStager``
            subclass, exposes no usable ``upload_stager_config`` dataclass
            field, or that field's declared type is not a class deriving
            from ``UploadStagerConfig``.
    """
    stager_cls = import_from_string(stager_reference)
    if not inspect.isclass(stager_cls):
        raise ValueError(
            f"custom stager must be a reference to a python class, got: {type(stager_cls)}"
        )
    if not issubclass(stager_cls, UploadStager):
        raise ValueError(
            "custom stager must be an implementation of the UploadStager interface"
        )

    # fields() raises TypeError for non-dataclasses; report it as the same
    # kind of validation error as the other checks in this method.
    try:
        fields_dict = {f.name: f.type for f in fields(stager_cls)}
    except TypeError as exc:
        raise ValueError("custom stager must be a dataclass") from exc

    # The config class is discovered from the declared type of this field,
    # so a stager without it cannot be configured from the CLI.
    if "upload_stager_config" not in fields_dict:
        raise ValueError("custom stager must declare an 'upload_stager_config' field")
    upload_stager_config_cls = fields_dict["upload_stager_config"]

    if not inspect.isclass(upload_stager_config_cls):
        raise ValueError(
            f"custom stager config must be a class, got: {type(upload_stager_config_cls)}"
        )
    if not issubclass(upload_stager_config_cls, UploadStagerConfig):
        raise ValueError(
            "custom stager config must be an implementation "
            "of the UploadStagerConfig interface"
        )

    upload_stager_kwargs: dict[str, Any] = {}
    if stager_config_kwargs:
        upload_stager_kwargs["upload_stager_config"] = upload_stager_config_cls(
            **stager_config_kwargs
        )
    return stager_cls(**upload_stager_kwargs)

@staticmethod
def get_upload_stager(dest: str, options: dict[str, Any]) -> Optional[UploadStagerT]:
if custom_stager := options.get("custom_stager"):
return BaseCmd.get_custom_stager(
stager_reference=custom_stager,
stager_config_kwargs=options.get("custom_stager_config_kwargs"),
)
dest_entry = destination_registry[dest]
upload_stager_kwargs: dict[str, Any] = {}
if upload_stager_config_cls := dest_entry.upload_stager_config:
Expand Down
22 changes: 21 additions & 1 deletion unstructured/ingest/v2/cli/base/dest.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from unstructured.ingest.v2.cli.base.cmd import BaseCmd
from unstructured.ingest.v2.cli.interfaces import CliConfig
from unstructured.ingest.v2.cli.utils import conform_click_options
from unstructured.ingest.v2.cli.utils import Dict, conform_click_options
from unstructured.ingest.v2.logger import logger


Expand Down Expand Up @@ -53,4 +53,24 @@ def get_cmd(self) -> click.Command:
if x
]
self.add_options(cmd, extras=extras)
cmd.params.append(
click.Option(
["--custom-stager"],
required=False,
type=str,
default=None,
help="Pass a pointer to a custom upload stager to use, "
"must be in format '<module>:<attribute>'",
)
)
cmd.params.append(
click.Option(
["--custom-stager-config-kwargs"],
required=False,
type=Dict(),
default=None,
help="Any kwargs to instantiate the configuration "
"associated with the customer stager",
)
)
return cmd
34 changes: 34 additions & 0 deletions unstructured/ingest/v2/cli/base/importer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import importlib
from typing import Any


class ImportFromStringError(Exception):
    """Error raised when an import string cannot be parsed or resolved."""


def import_from_string(import_str: Any) -> Any:
    """Resolve a ``"<module>:<attribute>"`` string to the object it names.

    Args:
        import_str: The import string. Any non-string value is returned
            unchanged.

    Returns:
        The object found by importing the module part and then walking the
        dot-separated attribute path that follows the colon.

    Raises:
        ImportFromStringError: If the string is not in
            ``"<module>:<attribute>"`` form, the named module itself cannot
            be found, or an attribute in the path does not exist.
        ModuleNotFoundError: If importing the module fails because some
            other module is missing.
    """
    # Anything that is not a string is handed back untouched.
    if not isinstance(import_str, str):
        return import_str

    module_name, _sep, attr_path = import_str.partition(":")
    if not (module_name and attr_path):
        raise ImportFromStringError(
            'Import string "{import_str}" must be in format "<module>:<attribute>".'.format(
                import_str=import_str
            )
        )

    try:
        target = importlib.import_module(module_name)
    except ModuleNotFoundError as exc:
        if exc.name == module_name:
            raise ImportFromStringError(
                'Could not import module "{module_str}".'.format(module_str=module_name)
            )
        # A different module is the one that is missing: surface the
        # original error rather than masking it.
        raise exc from None

    try:
        # Walk the dotted path one attribute at a time.
        for attr_name in attr_path.split("."):
            target = getattr(target, attr_name)
    except AttributeError:
        raise ImportFromStringError(
            'Attribute "{attrs_str}" not found in module "{module_str}".'.format(
                attrs_str=attr_path, module_str=module_name
            )
        )

    return target

0 comments on commit d86d15c

Please sign in to comment.