Skip to content

Commit

Permalink
refactor: implement databricks volumes v2 dest connector (#3334)
Browse files Browse the repository at this point in the history
  • Loading branch information
vangheem committed Jul 3, 2024
1 parent 493bfcc commit 6e4d9cc
Show file tree
Hide file tree
Showing 11 changed files with 468 additions and 2 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,10 @@ jobs:
ASTRA_DB_TOKEN: ${{secrets.ASTRA_DB_TOKEN}}
ASTRA_DB_ENDPOINT: ${{secrets.ASTRA_DB_ENDPOINT}}
CLARIFAI_API_KEY: ${{secrets.CLARIFAI_API_KEY}}
DATABRICKS_HOST: ${{secrets.DATABRICKS_HOST}}
DATABRICKS_USERNAME: ${{secrets.DATABRICKS_USERNAME}}
DATABRICKS_PASSWORD: ${{secrets.DATABRICKS_PASSWORD}}
DATABRICKS_CATALOG: ${{secrets.DATABRICKS_CATALOG}}
TABLE_OCR: "tesseract"
OCR_AGENT: "unstructured.partition.utils.ocr_models.tesseract_ocr.OCRAgentTesseract"
CI: "true"
Expand Down
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
## 0.14.10-dev8
## 0.14.10-dev9

### Enhancements

Expand Down
63 changes: 63 additions & 0 deletions test_unstructured_ingest/dest/databricks-volumes.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
#!/usr/bin/env bash

set -e

SRC_PATH=$(dirname "$(realpath "$0")")
SCRIPT_DIR=$(dirname "$SRC_PATH")
cd "$SCRIPT_DIR"/.. || exit 1
OUTPUT_FOLDER_NAME=databricks-volumes
OUTPUT_DIR=$SCRIPT_DIR/structured-output/$OUTPUT_FOLDER_NAME
WORK_DIR=$SCRIPT_DIR/workdir/$OUTPUT_FOLDER_NAME
DOWNLOAD_DIR=$SCRIPT_DIR/download/$OUTPUT_FOLDER_NAME
DESTINATION_PATH=$SCRIPT_DIR/databricks-volumes
CI=${CI:-"false"}

RANDOM_SUFFIX=$((RANDOM % 100000 + 1))

DATABRICKS_VOLUME="test-platform"
DATABRICKS_VOLUME_PATH="databricks-volumes-test-output-$RANDOM_SUFFIX"

# shellcheck disable=SC1091
source "$SCRIPT_DIR"/cleanup.sh

function cleanup() {
python "$SCRIPT_DIR"/python/test-databricks-volumes.py cleanup \
--host "$DATABRICKS_HOST" \
--username "$DATABRICKS_USERNAME" \
--password "$DATABRICKS_PASSWORD" \
--volume "$DATABRICKS_VOLUME" \
--catalog "$DATABRICKS_CATALOG" \
--volume-path "$DATABRICKS_VOLUME_PATH"

cleanup_dir "$DESTINATION_PATH"
cleanup_dir "$OUTPUT_DIR"
cleanup_dir "$WORK_DIR"
if [ "$CI" == "true" ]; then
cleanup_dir "$DOWNLOAD_DIR"
fi
}

trap cleanup EXIT

PYTHONPATH=. ./unstructured/ingest/main.py \
local \
--output-dir "$OUTPUT_DIR" \
--strategy fast \
--verbose \
--input-path example-docs/fake-memo.pdf \
--work-dir "$WORK_DIR" \
databricks-volumes \
--host "$DATABRICKS_HOST" \
--username "$DATABRICKS_USERNAME" \
--password "$DATABRICKS_PASSWORD" \
--volume "$DATABRICKS_VOLUME" \
--catalog "$DATABRICKS_CATALOG" \
--volume-path "$DATABRICKS_VOLUME_PATH"

python "$SCRIPT_DIR"/python/test-databricks-volumes.py test \
--host "$DATABRICKS_HOST" \
--username "$DATABRICKS_USERNAME" \
--password "$DATABRICKS_PASSWORD" \
--volume "$DATABRICKS_VOLUME" \
--catalog "$DATABRICKS_CATALOG" \
--volume-path "$DATABRICKS_VOLUME_PATH"
79 changes: 79 additions & 0 deletions test_unstructured_ingest/python/test-databricks-volumes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
#!/usr/bin/env python
import json

import click
from databricks.sdk import WorkspaceClient


@click.group()
def cli():
pass


def _get_volume_path(catalog: str, volume: str, volume_path: str):
return f"/Volumes/{catalog}/default/{volume}/{volume_path}"


@cli.command()
@click.option("--host", type=str, required=True)
@click.option("--username", type=str, required=True)
@click.option("--password", type=str, required=True)
@click.option("--catalog", type=str, required=True)
@click.option("--volume", type=str, required=True)
@click.option("--volume-path", type=str, required=True)
def test(
host: str,
username: str,
password: str,
catalog: str,
volume: str,
volume_path: str,
):
client = WorkspaceClient(host=host, username=username, password=password)
files = list(
client.files.list_directory_contents(_get_volume_path(catalog, volume, volume_path))
)

assert len(files) == 1

resp = client.files.download(files[0].path)
data = json.loads(resp.contents.read())

assert len(data) == 5
assert [v["type"] for v in data] == [
"UncategorizedText",
"Title",
"NarrativeText",
"UncategorizedText",
"Title",
]

print("Databricks test passed!")


@cli.command()
@click.option("--host", type=str, required=True)
@click.option("--username", type=str, required=True)
@click.option("--password", type=str, required=True)
@click.option("--catalog", type=str, required=True)
@click.option("--volume", type=str, required=True)
@click.option("--volume-path", type=str, required=True)
def cleanup(
host: str,
username: str,
password: str,
catalog: str,
volume: str,
volume_path: str,
):
client = WorkspaceClient(host=host, username=username, password=password)

for file in client.files.list_directory_contents(
_get_volume_path(catalog, volume, volume_path)
):
client.files.delete(file.path)
client.files.delete_directory(_get_volume_path(catalog, volume, volume_path))


if __name__ == "__main__":
cli()
1 change: 1 addition & 0 deletions test_unstructured_ingest/test-ingest-dest.sh
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ all_tests=(
'vectara.sh'
'singlestore.sh'
'weaviate.sh'
'databricks-volumes.sh'
)

full_python_matrix_tests=(
Expand Down
2 changes: 1 addition & 1 deletion unstructured/__version__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.14.10-dev8" # pragma: no cover
__version__ = "0.14.10-dev9" # pragma: no cover
2 changes: 2 additions & 0 deletions unstructured/ingest/v2/cli/cmds/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from .astra import astra_dest_cmd
from .chroma import chroma_dest_cmd
from .databricks_volumes import databricks_volumes_dest_cmd
from .elasticsearch import elasticsearch_dest_cmd, elasticsearch_src_cmd
from .fsspec.azure import azure_dest_cmd, azure_src_cmd
from .fsspec.box import box_dest_cmd, box_src_cmd
Expand Down Expand Up @@ -59,6 +60,7 @@
singlestore_dest_cmd,
weaviate_dest_cmd,
mongodb_dest_cmd,
databricks_volumes_dest_cmd,
]

duplicate_dest_names = [
Expand Down
161 changes: 161 additions & 0 deletions unstructured/ingest/v2/cli/cmds/databricks_volumes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
from dataclasses import dataclass

import click

from unstructured.ingest.v2.cli.base import DestCmd
from unstructured.ingest.v2.cli.interfaces import CliConfig
from unstructured.ingest.v2.processes.connectors.databricks_volumes import CONNECTOR_TYPE


@dataclass
class DatabricksVolumesCliConnectionConfig(CliConfig):
@staticmethod
def get_cli_options() -> list[click.Option]:
options = [
click.Option(
["--host"],
type=str,
default=None,
help="The Databricks host URL for either the "
"Databricks workspace endpoint or the "
"Databricks accounts endpoint.",
),
click.Option(
["--account-id"],
type=str,
default=None,
help="The Databricks account ID for the Databricks "
"accounts endpoint. Only has effect when Host is "
"either https://accounts.cloud.databricks.com/ (AWS), "
"https://accounts.azuredatabricks.net/ (Azure), "
"or https://accounts.gcp.databricks.com/ (GCP).",
),
click.Option(
["--username"],
type=str,
default=None,
help="The Databricks username part of basic authentication. "
"Only possible when Host is *.cloud.databricks.com (AWS).",
),
click.Option(
["--password"],
type=str,
default=None,
help="The Databricks password part of basic authentication. "
"Only possible when Host is *.cloud.databricks.com (AWS).",
),
click.Option(["--client-id"], type=str, default=None),
click.Option(["--client-secret"], type=str, default=None),
click.Option(
["--token"],
type=str,
default=None,
help="The Databricks personal access token (PAT) (AWS, Azure, and GCP) or "
"Azure Active Directory (Azure AD) token (Azure).",
),
click.Option(
["--azure-workspace-resource-id"],
type=str,
default=None,
help="The Azure Resource Manager ID for the Azure Databricks workspace, "
"which is exchanged for a Databricks host URL.",
),
click.Option(
["--azure-client-secret"],
type=str,
default=None,
help="The Azure AD service principal’s client secret.",
),
click.Option(
["--azure-client-id"],
type=str,
default=None,
help="The Azure AD service principal’s application ID.",
),
click.Option(
["--azure-tenant-id"],
type=str,
default=None,
help="The Azure AD service principal’s tenant ID.",
),
click.Option(
["--azure-environment"],
type=str,
default=None,
help="The Azure environment type (such as Public, UsGov, China, and Germany) for a "
"specific set of API endpoints. Defaults to PUBLIC.",
),
click.Option(
["--auth-type"],
type=str,
default=None,
help="When multiple auth attributes are available in the "
"environment, use the auth type specified by this "
"argument. This argument also holds the currently "
"selected auth.",
),
click.Option(["--cluster-id"], type=str, default=None),
click.Option(["--google-credentials"], type=str, default=None),
click.Option(["--google-service-account"], type=str, default=None),
]
return options


@dataclass
class DatabricksVolumesCliUploaderConfig(CliConfig):
@staticmethod
def get_cli_options() -> list[click.Option]:
options = [
click.Option(
["--volume"], type=str, required=True, help="Name of volume in the Unity Catalog"
),
click.Option(
["--catalog"],
type=str,
required=True,
help="Name of the catalog in the Databricks Unity Catalog service",
),
click.Option(
["--volume-path"],
type=str,
required=False,
default=None,
help="Optional path within the volume to write to",
),
click.Option(
["--overwrite"],
type=bool,
is_flag=True,
help="If true, an existing file will be overwritten.",
),
click.Option(
["--encoding"],
type=str,
required=True,
default="utf-8",
help="Encoding applied to the data when written to the volume",
),
click.Option(
["--schema"],
type=str,
required=True,
default="default",
help="Schema associated with the volume to write to in the Unity Catalog service",
),
]
return options


@dataclass
class DatabricksVolumesCliUploadStagerConfig(CliConfig):
@staticmethod
def get_cli_options() -> list[click.Option]:
return []


databricks_volumes_dest_cmd = DestCmd(
cmd_name=CONNECTOR_TYPE,
connection_config=DatabricksVolumesCliConnectionConfig,
uploader_config=DatabricksVolumesCliUploaderConfig,
upload_stager_config=DatabricksVolumesCliUploadStagerConfig,
)
Loading

0 comments on commit 6e4d9cc

Please sign in to comment.