Support writing to Databricks delta table using a module #1483

Open
wants to merge 4 commits into
base: branch-24.06
from

spatil44

Adds support for writing to a Delta Lake table using a module. This will close #1482.

copy-pr-bot bot commented Jan 27, 2024

This pull request requires additional validation before any workflows can run on NVIDIA's runners.

Pull request vetters can view their responsibilities here.

Contributors can view more details about this message here.

@spatil44 spatil44 marked this pull request as ready for review January 30, 2024 06:16
@spatil44 spatil44 requested a review from a team as a code owner January 30, 2024 06:16
delta_table_write_mode = module_config.get("DELTA_WRITE_MODE", "append")
databricks_host=module_config.get("DATABRICKS_HOST", None)
databricks_token=module_config.get("DATABRICKS_TOKEN", None)
databricks_cluster_id=module_config.get("DATABRICKS_CLUSTER_ID", None)
Contributor

Please add error checking here for any config parameter that might cause the builder.remote call to fail later. That gives us immediate feedback when the module is loaded instead of a failure at runtime.
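A minimal sketch of what such load-time checking could look like. The key names are the ones read in the diff above; the function name `validate_module_config`, the choice of which keys are required, and the list of accepted write modes (taken from the save modes Spark's `DataFrameWriter.mode` documents) are assumptions for illustration, not part of this PR.

```python
from typing import Any, Mapping

# Assumption: the module cannot work without these three values.
REQUIRED_KEYS = ("DATABRICKS_HOST", "DATABRICKS_TOKEN", "DATABRICKS_CLUSTER_ID")
# Assumption: accept the save modes documented for Spark's DataFrameWriter.mode.
VALID_WRITE_MODES = ("append", "overwrite", "error", "errorifexists", "ignore")


def validate_module_config(module_config: Mapping[str, Any]) -> None:
    """Raise ValueError listing every problem found, before any remote call is made."""
    problems = []
    for key in REQUIRED_KEYS:
        value = module_config.get(key)
        # Reject missing, non-string, and blank values alike.
        if not isinstance(value, str) or not value.strip():
            problems.append(f"{key} must be a non-empty string")

    mode = module_config.get("DELTA_WRITE_MODE", "append")
    if mode not in VALID_WRITE_MODES:
        problems.append(f"DELTA_WRITE_MODE must be one of {VALID_WRITE_MODES}, got {mode!r}")

    if problems:
        raise ValueError("Invalid module config: " + "; ".join(problems))
```

Calling this right after `builder.get_current_module_config()` would report all bad parameters in one error instead of failing on the first one when the session is created.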

@@ -0,0 +1,113 @@
# Copyright (c) 2022-2023, NVIDIA CORPORATION.
Contributor

Suggested change
# Copyright (c) 2022-2023, NVIDIA CORPORATION.
# Copyright (c) 2024, NVIDIA CORPORATION.

except ImportError as import_exc:
    IMPORT_EXCEPTION = import_exc

def _extract_schema_from_pandas_dataframe(df: pd.DataFrame) -> "sql_types.StructType":
Contributor

Let's add at least one unit test for this helper function. Mocking out everything to deal with the Spark internals in the module's work unit probably isn't worth it right now, but having a test file gives us a spot for sprouting new tests later.

    from databricks.connect import DatabricksSession
    from pyspark.sql import types as sql_types
except ImportError as import_exc:
    IMPORT_EXCEPTION = import_exc
Contributor

Is 'IMPORT_EXCEPTION' used anywhere? If we're not handling the failure to import via some kind of workaround, then let's log an error here and re-raise the ImportError.
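One way the log-and-re-raise pattern could look, as a sketch only. The helper name `import_or_raise` and the log message are hypothetical; the PR itself uses plain `from ... import ...` statements inside a `try` block, and the same `except` body would work there.

```python
import importlib
import logging

logger = logging.getLogger(__name__)


def import_or_raise(module_name: str):
    """Import `module_name`; if it is missing, log an error and re-raise."""
    try:
        return importlib.import_module(module_name)
    except ImportError:
        # Log with context, then let the original ImportError propagate unchanged.
        logger.error("Could not import %r, which this module requires", module_name)
        raise


# Hypothetical usage at module import time:
# databricks_connect = import_or_raise("databricks.connect")
```

A bare `raise` keeps the original traceback, so the caller still sees exactly which import failed.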

Comment on lines +69 to +85
module_config = builder.get_current_module_config()
"""module_config contains all the required configuration parameters, that would otherwise be passed to the stage.
Parameters
----------
config : morpheus.config.Config
Pipeline configuration instance.
delta_path : str, default None
Path of the delta table where the data need to be written or updated.
databricks_host : str, default None
URL of Databricks host to connect to.
databricks_token : str, default None
Access token for Databricks cluster.
databricks_cluster_id : str, default None
Databricks cluster to be used to query the data as per SQL provided.
delta_table_write_mode: str, default "append"
Delta table write mode for storing data.
"""
Contributor

Suggested change
module_config = builder.get_current_module_config()
"""module_config contains all the required configuration parameters, that would otherwise be passed to the stage.
Parameters
----------
config : morpheus.config.Config
Pipeline configuration instance.
delta_path : str, default None
Path of the delta table where the data need to be written or updated.
databricks_host : str, default None
URL of Databricks host to connect to.
databricks_token : str, default None
Access token for Databricks cluster.
databricks_cluster_id : str, default None
Databricks cluster to be used to query the data as per SQL provided.
delta_table_write_mode: str, default "append"
Delta table write mode for storing data.
"""
"""
Parameters
----------
config : morpheus.config.Config
Pipeline configuration instance.
delta_path : str, default None
Path of the delta table where the data need to be written or updated.
databricks_host : str, default None
URL of Databricks host to connect to.
databricks_token : str, default None
Access token for Databricks cluster.
databricks_cluster_id : str, default None
Databricks cluster to be used to query the data as per SQL provided.
delta_table_write_mode: str, default "append"
Delta table write mode for storing data.
"""
module_config = builder.get_current_module_config()
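The reviewer does not state the reason for this reordering, but a likely one is that Python only treats a string literal as a docstring when it is the first statement in the function body. A small illustration, with made-up function names:

```python
def docstring_first():
    """Parameters are documented here."""
    value = 1
    return value


def docstring_after_statement():
    value = 1
    """Parameters are documented here."""
    return value


print(docstring_first.__doc__)            # the string is the function's docstring
print(docstring_after_statement.__doc__)  # None: the string is just an unused expression
```

With the string placed after the `module_config` assignment, tools such as `help()` and documentation generators would not see the parameter descriptions.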

@drobison00 drobison00 added the non-breaking (Non-breaking change), feature request (New feature or request), and improvement (Improvement to existing functionality) labels and removed the improvement (Improvement to existing functionality) label Jan 30, 2024
Contributor

@drobison00 drobison00 left a comment

Something else: if you're feeling ambitious, you might also look at updating the delta lake writer stage so that it just loads this module. That way we don't have duplicated code between the two.

@mdemoret-nv mdemoret-nv changed the title from "Support writing to databricks delta table using Module." to "Support writing to Databricks delta table using a module" Feb 6, 2024
@mdemoret-nv
Contributor

/ok to test

@mdemoret-nv mdemoret-nv changed the base branch from branch-24.03 to branch-24.06 April 6, 2024 00:17
Labels
feature request (New feature or request), non-breaking (Non-breaking change)
Projects
Status: Review - Changes Requested
Development

Successfully merging this pull request may close these issues.

[FEA]: Add support of writing to databricks delta table using Module.
3 participants