Skip to content

Commit

Permalink
rfctr: Implement SQL V2 Dest Connector (#3323)
Browse files Browse the repository at this point in the history
  • Loading branch information
vangheem committed Jul 5, 2024
1 parent 6e4d9cc commit 1ce01c3
Show file tree
Hide file tree
Showing 7 changed files with 449 additions and 2 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
## 0.14.10-dev9
## 0.14.10-dev10

### Enhancements

Expand Down
2 changes: 1 addition & 1 deletion unstructured/__version__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.14.10-dev9" # pragma: no cover
__version__ = "0.14.10-dev10" # pragma: no cover
2 changes: 2 additions & 0 deletions unstructured/ingest/v2/cli/cmds/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from .opensearch import opensearch_dest_cmd, opensearch_src_cmd
from .pinecone import pinecone_dest_cmd
from .singlestore import singlestore_dest_cmd
from .sql import sql_dest_cmd
from .weaviate import weaviate_dest_cmd

src_cmds = [
Expand Down Expand Up @@ -61,6 +62,7 @@
weaviate_dest_cmd,
mongodb_dest_cmd,
databricks_volumes_dest_cmd,
sql_dest_cmd,
]

duplicate_dest_names = [
Expand Down
84 changes: 84 additions & 0 deletions unstructured/ingest/v2/cli/cmds/sql.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
from dataclasses import dataclass

import click

from unstructured.ingest.v2.cli.base import DestCmd
from unstructured.ingest.v2.cli.interfaces import CliConfig
from unstructured.ingest.v2.processes.connectors.sql import CONNECTOR_TYPE

SQL_DRIVERS = {"postgresql", "sqlite"}


@dataclass
class SQLCliConnectionConfig(CliConfig):
@staticmethod
def get_cli_options() -> list[click.Option]:
options = [
click.Option(
["--db-type"],
required=True,
type=click.Choice(SQL_DRIVERS),
help="Type of the database backend",
),
click.Option(
["--username"],
default=None,
type=str,
help="DB username",
),
click.Option(
["--password"],
default=None,
type=str,
help="DB password",
),
click.Option(
["--host"],
default=None,
type=str,
help="DB host",
),
click.Option(
["--port"],
default=None,
type=int,
help="DB host connection port",
),
click.Option(
["--database"],
default=None,
type=str,
help="Database name. For sqlite databases, this is the path to the .db file.",
),
]
return options


@dataclass
class SQLCliUploaderConfig(CliConfig):
@staticmethod
def get_cli_options() -> list[click.Option]:
options = [
click.Option(
["--batch-size"],
default=100,
type=int,
help="Number of records per batch",
)
]
return options


@dataclass
class SQLCliUploadStagerConfig(CliConfig):
@staticmethod
def get_cli_options() -> list[click.Option]:
return []


sql_dest_cmd = DestCmd(
cmd_name=CONNECTOR_TYPE,
connection_config=SQLCliConnectionConfig,
uploader_config=SQLCliUploaderConfig,
upload_stager_config=SQLCliUploadStagerConfig,
)
88 changes: 88 additions & 0 deletions unstructured/ingest/v2/examples/example_sql.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
import os
import sqlite3
from pathlib import Path

from unstructured.ingest.v2.interfaces import ProcessorConfig
from unstructured.ingest.v2.logger import logger
from unstructured.ingest.v2.pipeline.pipeline import Pipeline
from unstructured.ingest.v2.processes.chunker import ChunkerConfig
from unstructured.ingest.v2.processes.connectors.local import (
LocalConnectionConfig,
LocalDownloaderConfig,
LocalIndexerConfig,
)
from unstructured.ingest.v2.processes.connectors.sql import (
DatabaseType,
SimpleSqlConfig,
SQLAccessConfig,
SQLUploaderConfig,
SQLUploadStagerConfig,
)
from unstructured.ingest.v2.processes.embedder import EmbedderConfig
from unstructured.ingest.v2.processes.partitioner import PartitionerConfig

base_path = Path(__file__).parent.parent.parent.parent.parent
docs_path = base_path / "example-docs"
work_dir = base_path / "tmp_ingest"
output_path = work_dir / "output"
download_path = work_dir / "download"

SQLITE_DB = "test-sql-db.sqlite"

if __name__ == "__main__":
logger.info(f"Writing all content in: {work_dir.resolve()}")

configs = {
"context": ProcessorConfig(work_dir=str(work_dir.resolve())),
"indexer_config": LocalIndexerConfig(input_path=str(docs_path.resolve()) + "/multisimple/"),
"downloader_config": LocalDownloaderConfig(download_dir=download_path),
"source_connection_config": LocalConnectionConfig(),
"partitioner_config": PartitionerConfig(strategy="fast"),
"chunker_config": ChunkerConfig(
chunking_strategy="by_title",
chunk_include_orig_elements=False,
chunk_max_characters=1500,
chunk_multipage_sections=True,
),
"embedder_config": EmbedderConfig(embedding_provider="langchain-huggingface"),
"stager_config": SQLUploadStagerConfig(),
"uploader_config": SQLUploaderConfig(batch_size=10),
}

if os.path.exists(SQLITE_DB):
os.remove(SQLITE_DB)

connection = sqlite3.connect(database=SQLITE_DB)

query = None
script_path = (
Path(__file__).parent.parent.parent.parent.parent
/ Path("scripts/sql-test-helpers/create-sqlite-schema.sql")
).resolve()
with open(script_path) as f:
query = f.read()
cursor = connection.cursor()
cursor.executescript(query)
connection.close()

# sqlite test first
Pipeline.from_configs(
destination_connection_config=SimpleSqlConfig(
db_type=DatabaseType.SQLITE,
database=SQLITE_DB,
access_config=SQLAccessConfig(),
),
**configs,
).run()

# now, pg with pgvector
Pipeline.from_configs(
destination_connection_config=SimpleSqlConfig(
db_type=DatabaseType.POSTGRESQL,
database="elements",
host="localhost",
port=5433,
access_config=SQLAccessConfig(username="unstructured", password="test"),
),
**configs,
).run()
4 changes: 4 additions & 0 deletions unstructured/ingest/v2/processes/connectors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
from .onedrive import onedrive_source_entry
from .opensearch import CONNECTOR_TYPE as OPENSEARCH_CONNECTOR_TYPE
from .opensearch import opensearch_destination_entry, opensearch_source_entry
from .sql import CONNECTOR_TYPE as SQL_CONNECTOR_TYPE
from .sql import sql_destination_entry
from .weaviate import CONNECTOR_TYPE as WEAVIATE_CONNECTOR_TYPE
from .weaviate import weaviate_destination_entry

Expand Down Expand Up @@ -51,3 +53,5 @@
add_destination_entry(
destination_type=DATABRICKS_VOLUMES_CONNECTOR_TYPE, entry=databricks_volumes_destination_entry
)

add_destination_entry(destination_type=SQL_CONNECTOR_TYPE, entry=sql_destination_entry)
Loading

0 comments on commit 1ce01c3

Please sign in to comment.