From a96e4d8aa262b103a6f5373079b65fc711d92b61 Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Tue, 12 Mar 2024 15:54:13 -0700 Subject: [PATCH] Prevent large objects from being stored in the RTIF There's no control over the size of objects stored in the rendered taskinstance field. This PR adds control and enable users to be able to customize the size of data that can be stored in this field closes: #28199 --- airflow/config_templates/config.yml | 8 ++++ airflow/serialization/helpers.py | 21 +++++++++ tests/models/test_renderedtifields.py | 61 +++++++++++++++++++++++++++ 3 files changed, 90 insertions(+) diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index 0a1b292fdc59fc..ce822cfa534166 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -492,6 +492,14 @@ core: type: string example: ~ default: "Disabled" + max_templated_field_size: + description: | + The maximum size of the rendered template field. If the value to be stored in the rendered template + field exceeds this size, it's redacted. + version_added: 2.9.0 + type: integer + example: ~ + default: "1024" database: description: ~ options: diff --git a/airflow/serialization/helpers.py b/airflow/serialization/helpers.py index aa3ceb644e96b8..1539a44c8199ee 100644 --- a/airflow/serialization/helpers.py +++ b/airflow/serialization/helpers.py @@ -18,8 +18,10 @@ from __future__ import annotations +import sys from typing import Any +from airflow.configuration import conf from airflow.settings import json @@ -38,7 +40,26 @@ def is_jsonable(x): else: return True + max_size = conf.getint("core", "max_templated_field_size") + if not is_jsonable(template_field): + if isinstance(template_field, (list, tuple)): + if sys.getsizeof(template_field[0]) > max_size: + return ( + "Value redacted as it is too large to be stored in the database. " + "You can change this behaviour in [core]max_templated_field_size" + ) + elif sys.getsizeof(template_field) > max_size: + return ( + "Value redacted as it is too large to be stored in the database. " + "You can change this behaviour in [core]max_templated_field_size" + ) return str(template_field) else: + if template_field and isinstance(template_field, (list, tuple)): + if len(template_field) and sys.getsizeof(template_field[0]) > max_size: + return ( + "Value redacted as it is too large to be stored in the database. " + "You can change this behaviour in [core]max_templated_field_size" + ) return template_field diff --git a/tests/models/test_renderedtifields.py b/tests/models/test_renderedtifields.py index a7b25a317119b0..8245c8aa69be86 100644 --- a/tests/models/test_renderedtifields.py +++ b/tests/models/test_renderedtifields.py @@ -27,6 +27,7 @@ import pytest from airflow import settings +from airflow.decorators import task as task_decorator from airflow.models import Variable from airflow.models.renderedtifields import RenderedTaskInstanceFields as RTIF from airflow.operators.bash import BashOperator @@ -318,3 +319,63 @@ def test_redact(self, redact, dag_maker): "env": "val 2", "cwd": "val 3", } + + def test_large_string_is_not_stored(self, dag_maker, session): + """ + Test that large string is not stored in the database + """ + large_string = "a" * 2560 + with dag_maker("test_large_objects_are_not_stored"): + + @task_decorator + def gentask(): + return large_string + + @task_decorator + def consumer_task(value): + return value + + consumer_task(gentask()) + + dr = dag_maker.create_dagrun() + ti, ti2 = dr.task_instances + ti.xcom_push(value=large_string, key="return_value") + rtif = RTIF(ti=ti2) + rtif.write() + assert rtif.rendered_fields == { + "op_args": "Value redacted as it is too large to be stored in the database. " + "You can change this behaviour in [core]max_templated_field_size", + "op_kwargs": {}, + "templates_dict": None, + } + + def test_large_objects_are_not_stored(self, dag_maker, session): + """ + Test that large objects are not stored in the database + """ + import pandas as pd + + large_dataframe = pd.DataFrame({"a": range(1000)}) + with dag_maker("test_large_objects_are_not_stored"): + + @task_decorator + def gentask(): + return large_dataframe + + @task_decorator + def consumer_task(value): + return value + + consumer_task(gentask()) + + dr = dag_maker.create_dagrun() + ti, ti2 = dr.task_instances + ti.xcom_push(value=large_dataframe, key="return_value") + rtif = RTIF(ti=ti2) + rtif.write() + assert rtif.rendered_fields == { + "op_args": "Value redacted as it is too large to be stored in the database. " + "You can change this behaviour in [core]max_templated_field_size", + "op_kwargs": {}, + "templates_dict": None, + }