diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index 0a1b292fdc59fc..ce822cfa534166 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -492,6 +492,14 @@ core: type: string example: ~ default: "Disabled" + max_templated_field_size: + description: | + The maximum size of the rendered template field. If the value to be stored in the rendered template + field exceeds this size, it's redacted. + version_added: 2.9.0 + type: integer + example: ~ + default: "1024" database: description: ~ options: diff --git a/airflow/serialization/helpers.py b/airflow/serialization/helpers.py index aa3ceb644e96b8..1539a44c8199ee 100644 --- a/airflow/serialization/helpers.py +++ b/airflow/serialization/helpers.py @@ -18,8 +18,10 @@ from __future__ import annotations +import sys from typing import Any +from airflow.configuration import conf from airflow.settings import json @@ -38,7 +40,26 @@ def is_jsonable(x): else: return True + max_size = conf.getint("core", "max_templated_field_size") + if not is_jsonable(template_field): + if isinstance(template_field, (list, tuple)): + if sys.getsizeof(template_field[0]) > max_size: + return ( + "Value redacted as it is too large to be stored in the database. " + "You can change this behaviour in [core]max_templated_field_size" + ) + elif sys.getsizeof(template_field) > max_size: + return ( + "Value redacted as it is too large to be stored in the database. " + "You can change this behaviour in [core]max_templated_field_size" + ) return str(template_field) else: + if template_field and isinstance(template_field, (list, tuple)): + if len(template_field) and sys.getsizeof(template_field[0]) > max_size: + return ( + "Value redacted as it is too large to be stored in the database. " + "You can change this behaviour in [core]max_templated_field_size" + ) return template_field diff --git a/tests/models/test_renderedtifields.py b/tests/models/test_renderedtifields.py index a7b25a317119b0..8245c8aa69be86 100644 --- a/tests/models/test_renderedtifields.py +++ b/tests/models/test_renderedtifields.py @@ -27,6 +27,7 @@ import pytest from airflow import settings +from airflow.decorators import task as task_decorator from airflow.models import Variable from airflow.models.renderedtifields import RenderedTaskInstanceFields as RTIF from airflow.operators.bash import BashOperator @@ -318,3 +319,63 @@ def test_redact(self, redact, dag_maker): "env": "val 2", "cwd": "val 3", } + + def test_large_string_is_not_stored(self, dag_maker, session): + """ + Test that large string is not stored in the database + """ + large_string = "a" * 2560 + with dag_maker("test_large_objects_are_not_stored"): + + @task_decorator + def gentask(): + return large_string + + @task_decorator + def consumer_task(value): + return value + + consumer_task(gentask()) + + dr = dag_maker.create_dagrun() + ti, ti2 = dr.task_instances + ti.xcom_push(value=large_string, key="return_value") + rtif = RTIF(ti=ti2) + rtif.write() + assert rtif.rendered_fields == { + "op_args": "Value redacted as it is too large to be stored in the database. " + "You can change this behaviour in [core]max_templated_field_size", + "op_kwargs": {}, + "templates_dict": None, + } + + def test_large_objects_are_not_stored(self, dag_maker, session): + """ + Test that large objects are not stored in the database + """ + import pandas as pd + + large_dataframe = pd.DataFrame({"a": range(1000)}) + with dag_maker("test_large_objects_are_not_stored"): + + @task_decorator + def gentask(): + return large_dataframe + + @task_decorator + def consumer_task(value): + return value + + consumer_task(gentask()) + + dr = dag_maker.create_dagrun() + ti, ti2 = dr.task_instances + ti.xcom_push(value=large_dataframe, key="return_value") + rtif = RTIF(ti=ti2) + rtif.write() + assert rtif.rendered_fields == { + "op_args": "Value redacted as it is too large to be stored in the database. " + "You can change this behaviour in [core]max_templated_field_size", + "op_kwargs": {}, + "templates_dict": None, + }