Skip to content

Commit

Permalink
Use len and check the size of the serialized
Browse files Browse the repository at this point in the history
  • Loading branch information
ephraimbuddy committed Mar 21, 2024
1 parent b59e81d commit ffc67d1
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 23 deletions.
25 changes: 9 additions & 16 deletions airflow/serialization/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

from __future__ import annotations

import sys
from typing import Any

from airflow.configuration import conf
Expand All @@ -43,23 +42,17 @@ def is_jsonable(x):
max_size = conf.getint("core", "max_templated_field_size")

if not is_jsonable(template_field):
if isinstance(template_field, (list, tuple)):
if sum(sys.getsizeof(x) for x in template_field) > max_size:
return (
"Value redacted as it is too large to be stored in the database. "
"You can change this behaviour in [core]max_templated_field_size"
)
elif sys.getsizeof(template_field) > max_size:
serialized = str(template_field)
if len(serialized) > max_size:
return (
"Value redacted as it is too large to be stored in the database. "
"Value removed due to size. "
"You can change this behaviour in [core]max_templated_field_size"
)
return str(template_field)
return serialized
else:
if template_field and isinstance(template_field, (list, tuple)):
if sum(sys.getsizeof(x) for x in template_field) > max_size:
return (
"Value redacted as it is too large to be stored in the database. "
"You can change this behaviour in [core]max_templated_field_size"
)
if template_field and len(str(template_field)) > max_size:
return (
"Value removed due to size. "
"You can change this behaviour in [core]max_templated_field_size"
)
return template_field
22 changes: 15 additions & 7 deletions tests/models/test_renderedtifields.py
Original file line number Diff line number Diff line change
Expand Up @@ -345,9 +345,10 @@ def consumer_task(value):
rtif = RTIF(ti=ti2)
rtif.write(session=session)
session.flush()
rtif = session.scalar(select(RTIF).where(RTIF.dag_id == rtif.dag_id, RTIF.task_id == rtif.task_id))
rtif = session.query(RTIF).filter(RTIF.dag_id == rtif.dag_id, RTIF.task_id == rtif.task_id).first()

assert rtif.rendered_fields == {
"op_args": "Value redacted as it is too large to be stored in the database. "
"op_args": "Value removed due to size. "
"You can change this behaviour in [core]max_templated_field_size",
"op_kwargs": {},
"templates_dict": None,
Expand All @@ -357,14 +358,21 @@ def test_large_objects_are_not_stored(self, dag_maker, session):
"""
Test that large objects are not stored in the database
"""
import pandas as pd

large_dataframe = pd.DataFrame({"a": range(1000)})
class A:
def __init__(self):
self.a = "a" * 2560

def __str__(self):
return self.a

large_data = A()

with dag_maker("test_large_objects_are_not_stored"):

@task_decorator
def gentask():
return large_dataframe
return large_data

@task_decorator
def consumer_task(value):
Expand All @@ -374,13 +382,13 @@ def consumer_task(value):

dr = dag_maker.create_dagrun()
ti, ti2 = dr.task_instances
ti.xcom_push(value=large_dataframe, key="return_value")
ti.xcom_push(value=str(large_data), key="return_value")
rtif = RTIF(ti=ti2)
rtif.write(session=session)
session.flush()
rtif = session.scalar(select(RTIF).where(RTIF.dag_id == rtif.dag_id, RTIF.task_id == rtif.task_id))
assert rtif.rendered_fields == {
"op_args": "Value redacted as it is too large to be stored in the database. "
"op_args": "Value removed due to size. "
"You can change this behaviour in [core]max_templated_field_size",
"op_kwargs": {},
"templates_dict": None,
Expand Down

0 comments on commit ffc67d1

Please sign in to comment.