Skip to content

Commit

Permalink
Bugfix/ingest pipeline check (#3303)
Browse files Browse the repository at this point in the history
### Description
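Using an `isinstance` check against the destination registry mapping breaks
when inheritance is used for the associated uploader types, because an
instance of a subclass also passes `isinstance` for every parent uploader
class, so more than one registry entry can match. This adds a
`connector_type` field to all uploaders so that the registry entry can be
fetched deterministically by key when running the check for an associated
stager in the pipeline.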
  • Loading branch information
rbiseck3 committed Jun 27, 2024
1 parent 087adb2 commit 137b149
Show file tree
Hide file tree
Showing 16 changed files with 25 additions and 19 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
## 0.14.9-dev7
## 0.14.9-dev8

### Enhancements

Expand Down
2 changes: 1 addition & 1 deletion unstructured/__version__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.14.9-dev7" # pragma: no cover
__version__ = "0.14.9-dev8" # pragma: no cover
1 change: 1 addition & 0 deletions unstructured/ingest/v2/interfaces/uploader.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class UploadContent:
@dataclass
class Uploader(BaseProcess, BaseConnector, ABC):
upload_config: UploaderConfigT
connector_type: str

def is_async(self) -> bool:
return False
Expand Down
13 changes: 2 additions & 11 deletions unstructured/ingest/v2/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,17 +87,8 @@ def check_destination_connector(self):
# Make sure that if the set destination connector expects a stager, one is also set
if not self.uploader_step:
return
matching_registry_entry = [
v
for v in destination_registry.values()
if isinstance(self.uploader_step.process, v.uploader)
]
if len(matching_registry_entry) > 1:
raise ValueError(
f"More than one entry found in destination registry "
f"for uploader type: {self.uploader_step.process}"
)
registry_entry = matching_registry_entry[0]
uploader_connector_type = self.uploader_step.process.connector_type
registry_entry = destination_registry[uploader_connector_type]
if registry_entry.upload_stager and self.stager_step is None:
raise ValueError(
f"pipeline with uploader type {self.uploader_step.process.__class__.__name__} "
Expand Down
1 change: 1 addition & 0 deletions unstructured/ingest/v2/processes/connectors/astra.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ class AstraUploaderConfig(UploaderConfig):
class AstraUploader(Uploader):
connection_config: AstraConnectionConfig
upload_config: AstraUploaderConfig
connector_type: str = CONNECTOR_TYPE

@requires_dependencies(["astrapy"], extras="astra")
def get_collection(self) -> "AstraDBCollection":
Expand Down
1 change: 1 addition & 0 deletions unstructured/ingest/v2/processes/connectors/chroma.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ class ChromaUploaderConfig(UploaderConfig):

@dataclass
class ChromaUploader(Uploader):
connector_type: str = CONNECTOR_TYPE
upload_config: ChromaUploaderConfig
connection_config: ChromaConnectionConfig
client: Optional["Client"] = field(init=False)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,7 @@ class ElasticsearchUploaderConfig(UploaderConfig):

@dataclass
class ElasticsearchUploader(Uploader):
connector_type: str = CONNECTOR_TYPE
upload_config: ElasticsearchUploaderConfig
connection_config: ElasticsearchConnectionConfig

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ class AzureUploaderConfig(FsspecUploaderConfig):

@dataclass
class AzureUploader(FsspecUploader):
connector_type: str = CONNECTOR_TYPE
connection_config: AzureConnectionConfig
upload_config: AzureUploaderConfig = field(default=None)

Expand Down
7 changes: 5 additions & 2 deletions unstructured/ingest/v2/processes/connectors/fsspec/box.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ class BoxUploaderConfig(FsspecUploaderConfig):


@dataclass
class BoxUpload(FsspecUploader):
class BoxUploader(FsspecUploader):
connector_type: str = CONNECTOR_TYPE
connection_config: BoxConnectionConfig
upload_config: BoxUploaderConfig = field(default=None)

Expand Down Expand Up @@ -131,6 +132,8 @@ async def run_async(self, path: Path, file_data: FileData, **kwargs: Any) -> Non
add_destination_entry(
destination_type=CONNECTOR_TYPE,
entry=DestinationRegistryEntry(
uploader=BoxUpload, uploader_config=BoxUploaderConfig, connection_config=BoxConnectionConfig
uploader=BoxUploader,
uploader_config=BoxUploaderConfig,
connection_config=BoxConnectionConfig,
),
)
5 changes: 3 additions & 2 deletions unstructured/ingest/v2/processes/connectors/fsspec/dropbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ class DropboxUploaderConfig(FsspecUploaderConfig):


@dataclass
class DropboxUpload(FsspecUploader):
class DropboxUploader(FsspecUploader):
connector_type: str = CONNECTOR_TYPE
connection_config: DropboxConnectionConfig
upload_config: DropboxUploaderConfig = field(default=None)

Expand Down Expand Up @@ -130,7 +131,7 @@ async def run_async(self, path: Path, file_data: FileData, **kwargs: Any) -> Non
add_destination_entry(
destination_type=CONNECTOR_TYPE,
entry=DestinationRegistryEntry(
uploader=DropboxUpload,
uploader=DropboxUploader,
uploader_config=DropboxUploaderConfig,
connection_config=DropboxConnectionConfig,
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,7 @@ class FsspecUploaderConfig(FileConfig, UploaderConfig):

@dataclass
class FsspecUploader(Uploader):
connector_type: str = CONNECTOR_TYPE
upload_config: FsspecUploaderConfigT = field(default=None)

@property
Expand Down
1 change: 1 addition & 0 deletions unstructured/ingest/v2/processes/connectors/fsspec/gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ class GcsUploaderConfig(FsspecUploaderConfig):

@dataclass
class GcsUploader(FsspecUploader):
connector_type: str = CONNECTOR_TYPE
connection_config: GcsConnectionConfig
upload_config: GcsUploaderConfig = field(default=None)

Expand Down
5 changes: 3 additions & 2 deletions unstructured/ingest/v2/processes/connectors/fsspec/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,8 @@ class S3UploaderConfig(FsspecUploaderConfig):


@dataclass
class S3Upload(FsspecUploader):
class S3Uploader(FsspecUploader):
connector_type: str = CONNECTOR_TYPE
connection_config: S3ConnectionConfig
upload_config: S3UploaderConfig = field(default=None)

Expand Down Expand Up @@ -156,7 +157,7 @@ async def run_async(self, path: Path, file_data: FileData, **kwargs: Any) -> Non
add_destination_entry(
destination_type=CONNECTOR_TYPE,
entry=DestinationRegistryEntry(
uploader=S3Upload,
uploader=S3Uploader,
uploader_config=S3UploaderConfig,
connection_config=S3ConnectionConfig,
),
Expand Down
1 change: 1 addition & 0 deletions unstructured/ingest/v2/processes/connectors/fsspec/sftp.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ class SftpUploaderConfig(FsspecUploaderConfig):

@dataclass
class SftpUploader(FsspecUploader):
connector_type: str = CONNECTOR_TYPE
connection_config: SftpConnectionConfig
upload_config: SftpUploaderConfig = field(default=None)

Expand Down
1 change: 1 addition & 0 deletions unstructured/ingest/v2/processes/connectors/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ def __post_init__(self):

@dataclass
class LocalUploader(Uploader):
connector_type: str = CONNECTOR_TYPE
upload_config: LocalUploaderConfig = field(default_factory=lambda: LocalUploaderConfig())
connection_config: LocalConnectionConfig = field(
default_factory=lambda: LocalConnectionConfig()
Expand Down
1 change: 1 addition & 0 deletions unstructured/ingest/v2/processes/connectors/weaviate.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ class WeaviateUploaderConfig(UploaderConfig):

@dataclass
class WeaviateUploader(Uploader):
connector_type: str = CONNECTOR_TYPE
upload_config: WeaviateUploaderConfig
connection_config: WeaviateConnectionConfig
client: Optional["Client"] = field(init=False)
Expand Down

0 comments on commit 137b149

Please sign in to comment.