Skip to content

Commit

Permalink
Prevent large objects from being stored in the RTIF
Browse files Browse the repository at this point in the history
There's no control over the size of objects stored in the rendered taskinstance
field. This PR adds control and enable users to be able to customize the size of
data that can be stored in this field

closes: apache#28199
  • Loading branch information
ephraimbuddy committed Mar 22, 2024
1 parent 6a225cc commit a96e4d8
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 0 deletions.
8 changes: 8 additions & 0 deletions airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,14 @@ core:
type: string
example: ~
default: "Disabled"
max_templated_field_size:
description: |
The maximum size of the rendered template field. If the value to be stored in the rendered template
field exceeds this size, it's redacted.
version_added: 2.9.0
type: integer
example: ~
default: "1024"
database:
description: ~
options:
Expand Down
21 changes: 21 additions & 0 deletions airflow/serialization/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@

from __future__ import annotations

import sys
from typing import Any

from airflow.configuration import conf
from airflow.settings import json


Expand All @@ -38,7 +40,26 @@ def is_jsonable(x):
else:
return True

max_size = conf.getint("core", "max_templated_field_size")

if not is_jsonable(template_field):
if isinstance(template_field, (list, tuple)):
if sys.getsizeof(template_field[0]) > max_size:
return (
"Value redacted as it is too large to be stored in the database. "
"You can change this behaviour in [core]max_templated_field_size"
)
elif sys.getsizeof(template_field) > max_size:
return (
"Value redacted as it is too large to be stored in the database. "
"You can change this behaviour in [core]max_templated_field_size"
)
return str(template_field)
else:
if template_field and isinstance(template_field, (list, tuple)):
if len(template_field) and sys.getsizeof(template_field[0]) > max_size:
return (
"Value redacted as it is too large to be stored in the database. "
"You can change this behaviour in [core]max_templated_field_size"
)
return template_field
61 changes: 61 additions & 0 deletions tests/models/test_renderedtifields.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import pytest

from airflow import settings
from airflow.decorators import task as task_decorator
from airflow.models import Variable
from airflow.models.renderedtifields import RenderedTaskInstanceFields as RTIF
from airflow.operators.bash import BashOperator
Expand Down Expand Up @@ -318,3 +319,63 @@ def test_redact(self, redact, dag_maker):
"env": "val 2",
"cwd": "val 3",
}

def test_large_string_is_not_stored(self, dag_maker, session):
"""
Test that large string is not stored in the database
"""
large_string = "a" * 2560
with dag_maker("test_large_objects_are_not_stored"):

@task_decorator
def gentask():
return large_string

@task_decorator
def consumer_task(value):
return value

consumer_task(gentask())

dr = dag_maker.create_dagrun()
ti, ti2 = dr.task_instances
ti.xcom_push(value=large_string, key="return_value")
rtif = RTIF(ti=ti2)
rtif.write()
assert rtif.rendered_fields == {
"op_args": "Value redacted as it is too large to be stored in the database. "
"You can change this behaviour in [core]max_templated_field_size",
"op_kwargs": {},
"templates_dict": None,
}

def test_large_objects_are_not_stored(self, dag_maker, session):
"""
Test that large objects are not stored in the database
"""
import pandas as pd

large_dataframe = pd.DataFrame({"a": range(1000)})
with dag_maker("test_large_objects_are_not_stored"):

@task_decorator
def gentask():
return large_dataframe

@task_decorator
def consumer_task(value):
return value

consumer_task(gentask())

dr = dag_maker.create_dagrun()
ti, ti2 = dr.task_instances
ti.xcom_push(value=large_dataframe, key="return_value")
rtif = RTIF(ti=ti2)
rtif.write()
assert rtif.rendered_fields == {
"op_args": "Value redacted as it is too large to be stored in the database. "
"You can change this behaviour in [core]max_templated_field_size",
"op_kwargs": {},
"templates_dict": None,
}

0 comments on commit a96e4d8

Please sign in to comment.