From 4b47d6ac9e7e10f2404910ced0b4f2569d0ba43d Mon Sep 17 00:00:00 2001 From: Tvrtko Sternak <117077296+Sternakt@users.noreply.github.com> Date: Fri, 19 May 2023 12:24:44 +0200 Subject: [PATCH] Add batch consuming option to consumers (#298) * Add batch consumer logic to documentation generation * Modify consumer loop to include batch option for consuming * Add batch consuming guide * Fix mypy and add test to Tester * Merge origin/main into 18-add-batching-for-consumers * Add docs * Implement review changes --- fastkafka/_application/tester.py | 16 +- .../_components/aiokafka_consumer_loop.py | 203 ++++-- fastkafka/_components/asyncapi.py | 62 +- fastkafka/_components/helpers.py | 26 +- fastkafka/_components/producer_decorator.py | 37 +- fastkafka/_components/task_streaming.py | 119 ++- fastkafka/_modidx.py | 16 +- nbs/011_ConsumerLoop.ipynb | 690 ++++++++++++++---- nbs/011_TaskStreaming.ipynb | 184 ++++- nbs/013_ProducerDecorator.ipynb | 44 ++ nbs/014_AsyncAPI.ipynb | 127 ++-- nbs/016_Tester.ipynb | 68 +- nbs/998_Internal_Helpers.ipynb | 44 +- nbs/guides/Guide_11_Consumes_Basics.ipynb | 33 - nbs/guides/Guide_12_Batch_Consuming.ipynb | 365 +++++++++ nbs/sidebar.yml | 1 + 16 files changed, 1639 insertions(+), 396 deletions(-) create mode 100644 nbs/guides/Guide_12_Batch_Consuming.ipynb diff --git a/fastkafka/_application/tester.py b/fastkafka/_application/tester.py index 682bad2..dcf0928 100644 --- a/fastkafka/_application/tester.py +++ b/fastkafka/_application/tester.py @@ -13,7 +13,9 @@ from .. import KafkaEvent from .app import FastKafka +from .._components.helpers import unwrap_list_type from .._components.meta import delegates, export, patch +from .._components.producer_decorator import unwrap_from_kafka_event from .._testing.apache_kafka_broker import ApacheKafkaBroker from .._testing.in_memory_broker import InMemoryBroker from .._testing.local_redpanda_broker import LocalRedpandaBroker @@ -148,11 +150,7 @@ async def __aexit__(self, *args: Any) -> None: def mirror_producer(topic: str, producer_f: Callable[..., Any]) -> Callable[..., Any]: msg_type = inspect.signature(producer_f).return_annotation - if hasattr(msg_type, "__origin__") and msg_type.__origin__ == KafkaEvent: - msg_type = msg_type.__args__[0] - - if hasattr(msg_type, "__origin__") and msg_type.__origin__ == list: - msg_type = msg_type.__args__[0] + msg_type_unwrapped = unwrap_list_type(unwrap_from_kafka_event(msg_type)) async def skeleton_func(msg: BaseModel) -> None: pass @@ -168,7 +166,7 @@ async def skeleton_func(msg: BaseModel) -> None: parameters=[ inspect.Parameter( name="msg", - annotation=msg_type, + annotation=msg_type_unwrapped, kind=inspect.Parameter.POSITIONAL_OR_KEYWORD, ) ] @@ -182,6 +180,8 @@ async def skeleton_func(msg: BaseModel) -> None: def mirror_consumer(topic: str, consumer_f: Callable[..., Any]) -> Callable[..., Any]: msg_type = inspect.signature(consumer_f).parameters["msg"] + msg_type_unwrapped = unwrap_list_type(msg_type) + async def skeleton_func(msg: BaseModel) -> BaseModel: return msg @@ -192,7 +192,9 @@ async def skeleton_func(msg: BaseModel) -> BaseModel: mirror_func.__name__ = "to_" + topic # adjust arg and return val - sig = sig.replace(parameters=[msg_type], return_annotation=msg_type.annotation) + sig = sig.replace( + parameters=[msg_type], return_annotation=msg_type_unwrapped.annotation + ) mirror_func.__signature__ = sig # type: ignore return mirror_func diff --git a/fastkafka/_components/aiokafka_consumer_loop.py b/fastkafka/_components/aiokafka_consumer_loop.py index d8defa9..2d131dc 
100644 --- a/fastkafka/_components/aiokafka_consumer_loop.py +++ b/fastkafka/_components/aiokafka_consumer_loop.py @@ -60,6 +60,14 @@ class EventMetadata: @staticmethod def create_event_metadata(record: ConsumerRecord) -> "EventMetadata": # type: ignore + """Creates an instance of EventMetadata from a ConsumerRecord. + + Args: + record: The Kafka ConsumerRecord. + + Returns: + The created EventMetadata instance. + """ return EventMetadata( topic=record.topic, partition=record.partition, @@ -75,10 +83,15 @@ def create_event_metadata(record: ConsumerRecord) -> "EventMetadata": # type: i ) # %% ../../nbs/011_ConsumerLoop.ipynb 11 -AsyncConsume = Callable[[BaseModel], Awaitable[None]] -AsyncConsumeMeta = Callable[[BaseModel, EventMetadata], Awaitable[None]] -SyncConsume = Callable[[BaseModel], None] -SyncConsumeMeta = Callable[[BaseModel, EventMetadata], None] +AsyncConsume = Callable[[Union[List[BaseModel], BaseModel]], Awaitable[None]] +AsyncConsumeMeta = Callable[ + [Union[List[BaseModel], BaseModel], Union[List[EventMetadata], EventMetadata]], + Awaitable[None], +] +SyncConsume = Callable[[Union[List[BaseModel], BaseModel]], None] +SyncConsumeMeta = Callable[ + [Union[List[BaseModel], BaseModel], Union[List[EventMetadata], EventMetadata]], None +] ConsumeCallable = Union[AsyncConsume, AsyncConsumeMeta, SyncConsume, SyncConsumeMeta] @@ -96,26 +109,30 @@ def _callback_parameters_wrapper( """ async def _params_wrap( - msg: BaseModel, - meta: EventMetadata, + msg: Union[BaseModel, List[BaseModel]], + meta: Union[EventMetadata, List[EventMetadata]], callback: Union[AsyncConsume, AsyncConsumeMeta] = callback, ) -> None: types = list(get_type_hints(callback).values()) - args: List[Union[BaseModel, EventMetadata]] = [msg] + args: List[ + Union[BaseModel, List[BaseModel], EventMetadata, List[EventMetadata]] + ] = [msg] if EventMetadata in types: args.insert(types.index(EventMetadata), meta) + if List[EventMetadata] in types: + args.insert(types.index(List[EventMetadata]), meta) await callback(*args) # type: ignore return _params_wrap -# %% ../../nbs/011_ConsumerLoop.ipynb 16 +# %% ../../nbs/011_ConsumerLoop.ipynb 17 def _prepare_callback(callback: ConsumeCallable) -> AsyncConsumeMeta: """ Prepares a callback to be used in the consumer loop. 1. If callback is sync, asyncify it 2. Wrap the callback into a safe callback for exception handling - Params: + Args: callback: async callable that will be prepared for use in consumer Returns: @@ -126,35 +143,113 @@ def _prepare_callback(callback: ConsumeCallable) -> AsyncConsumeMeta: ) return _callback_parameters_wrapper(async_callback) -# %% ../../nbs/011_ConsumerLoop.ipynb 18 -async def _stream_msgs( # type: ignore - msgs: Dict[TopicPartition, bytes], - send_stream: anyio.streams.memory.MemoryObjectSendStream[Any], -) -> None: +# %% ../../nbs/011_ConsumerLoop.ipynb 24 +def _get_single_msg_handlers( # type: ignore + *, + consumer: AIOKafkaConsumer, + callback: AsyncConsumeMeta, + decoder_fn: Callable[[bytes, ModelMetaclass], Any], + msg_type: Type[BaseModel], + **kwargs: Any, +) -> Tuple[ + Callable[ + [ + ConsumerRecord, + AsyncConsumeMeta, + Callable[[bytes, ModelMetaclass], Any], + Type[BaseModel], + ], + Awaitable[None], + ], + Callable[[AIOKafkaConsumer, Any], Awaitable[List[ConsumerRecord]]], +]: """ - Decodes and streams the message and topic to the send_stream. + Retrieves the message handlers for consuming single messages from a Kafka topic. - Params: - msgs: - send_stream: + Args: + consumer: The Kafka consumer instance. 
+ callback: The callback function to handle the consumed message. + decoder_fn: The function to decode the consumed message. + msg_type: The type of the consumed message. + **kwargs: Additional keyword arguments for the consumer. + + Returns: + The handle_msg function and poll_consumer function. """ - for topic_partition, topic_msgs in msgs.items(): - topic = topic_partition.topic - try: - await send_stream.send(topic_msgs) - except Exception as e: - logger.warning( - f"_stream_msgs(): Unexpected exception '{e.__repr__()}' caught and ignored for topic='{topic_partition.topic}', partition='{topic_partition.partition}' and messages: {topic_msgs!r}" - ) + async def handle_msg( # type: ignore + record: ConsumerRecord, + callback: AsyncConsumeMeta = callback, + decoder_fn: Callable[[bytes, ModelMetaclass], Any] = decoder_fn, + msg_type: Type[BaseModel] = msg_type, + ) -> None: + await callback( + decoder_fn(record.value, msg_type), + EventMetadata.create_event_metadata(record), + ) + + async def poll_consumer( # type: ignore + consumer: AIOKafkaConsumer = consumer, kwargs: Any = kwargs + ) -> List[ConsumerRecord]: + msgs = await consumer.getmany(**kwargs) + return [msg for msg_group in msgs.values() for msg in msg_group] + + return handle_msg, poll_consumer + +# %% ../../nbs/011_ConsumerLoop.ipynb 26 +def _get_batch_msg_handlers( # type: ignore + *, + consumer: AIOKafkaConsumer, + callback: AsyncConsumeMeta, + decoder_fn: Callable[[bytes, ModelMetaclass], Any], + msg_type: Type[BaseModel], + **kwargs: Any, +) -> Tuple[ + Callable[ + [ + List[ConsumerRecord], + AsyncConsumeMeta, + Callable[[bytes, ModelMetaclass], Any], + Type[BaseModel], + ], + Awaitable[None], + ], + Callable[[AIOKafkaConsumer, Any], Awaitable[List[List[ConsumerRecord]]]], +]: + """ + Retrieves the message handlers for consuming messages in batches from a Kafka topic. + + Args: + consumer: The Kafka consumer instance. + callback: The callback function to handle the consumed messages. + decoder_fn: The function to decode the consumed messages. + msg_type: The type of the consumed messages. + **kwargs: Additional keyword arguments for the consumer. -def _decode_streamed_msgs( # type: ignore - msgs: List[ConsumerRecord], msg_type: BaseModel -) -> List[BaseModel]: - decoded_msgs = [msg_type.parse_raw(msg.value.decode("utf-8")) for msg in msgs] - return decoded_msgs + Returns: + The handle_msg function and poll_consumer function. 
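+
+    Example:
+        Editorial sketch, not part of the original patch; mirrors the test
+        cell below and assumes `consumer`, `callback` and a hypothetical
+        pydantic model `MyMessage` are in scope:
+
+            handle_msg, poll_consumer = _get_batch_msg_handlers(
+                consumer=consumer,
+                callback=callback,
+                decoder_fn=json_decoder,
+                msg_type=MyMessage,
+            )
+            batches = await poll_consumer()  # List[List[ConsumerRecord]]
+            for batch in batches:
+                # one callback await per batch: the decoded messages and
+                # their EventMetadata are passed as parallel lists
+                await handle_msg(batch)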
+ """ -# %% ../../nbs/011_ConsumerLoop.ipynb 23 + async def handle_msg( # type: ignore + records: List[ConsumerRecord], + callback: AsyncConsumeMeta = callback, + decoder_fn: Callable[[bytes, ModelMetaclass], Any] = decoder_fn, + msg_type: Type[BaseModel] = msg_type, + ) -> None: + await callback( + [decoder_fn(record.value, msg_type) for record in records], + [EventMetadata.create_event_metadata(record) for record in records], + ) + + async def poll_consumer( # type: ignore + consumer: AIOKafkaConsumer = consumer, kwargs: Any = kwargs + ) -> List[List[ConsumerRecord]]: + msgs = await consumer.getmany(**kwargs) + return [value for value in msgs.values() if len(value) > 0] + + return handle_msg, poll_consumer + +# %% ../../nbs/011_ConsumerLoop.ipynb 28 @delegates(AIOKafkaConsumer.getmany) async def _aiokafka_consumer_loop( # type: ignore consumer: AIOKafkaConsumer, @@ -163,7 +258,7 @@ async def _aiokafka_consumer_loop( # type: ignore decoder_fn: Callable[[bytes, ModelMetaclass], Any], callback: ConsumeCallable, max_buffer_size: int = 100_000, - msg_type: Type[BaseModel], + msg_type: Union[Type[List[BaseModel]], Type[BaseModel]], is_shutting_down_f: Callable[[], bool], executor: Union[str, StreamExecutor, None] = None, **kwargs: Any, @@ -172,7 +267,7 @@ async def _aiokafka_consumer_loop( # type: ignore Consumer loop for infinite pooling of the AIOKafka consumer for new messages. Calls consumer.getmany() and after the consumer return messages or times out, messages are decoded and streamed to defined callback. - Params: + Args: topic: Topic to subscribe decoder_fn: Function to decode the messages consumed from the topic callbacks: Dict of callbacks mapped to their respective topics @@ -184,29 +279,35 @@ async def _aiokafka_consumer_loop( # type: ignore prepared_callback = _prepare_callback(callback) - async def handle_msg( # type: ignore - record: ConsumerRecord, - callback: AsyncConsumeMeta = prepared_callback, - decoder_fn: Callable[[bytes, ModelMetaclass], Any] = decoder_fn, - msg_type: Type[BaseModel] = msg_type, - ) -> None: - await callback( - decoder_fn(record.value, msg_type), - EventMetadata.create_event_metadata(record), + if hasattr(msg_type, "__origin__") and msg_type.__origin__ == list: + handle_msg, poll_consumer = _get_batch_msg_handlers( + consumer=consumer, + callback=prepared_callback, + decoder_fn=decoder_fn, + msg_type=msg_type.__args__[0], # type: ignore + **kwargs, + ) + else: + handle_msg, poll_consumer = _get_single_msg_handlers( + consumer=consumer, + callback=prepared_callback, + decoder_fn=decoder_fn, + msg_type=msg_type, # type: ignore + **kwargs, ) - async def poll_consumer(kwargs: Any = kwargs) -> List[ConsumerRecord]: # type: ignore - msgs = await consumer.getmany(**kwargs) - return [msg for msg_group in msgs.values() for msg in msg_group] - - await get_executor(executor).run(is_shutting_down_f, poll_consumer, handle_msg) + await get_executor(executor).run( + is_shutting_down_f=is_shutting_down_f, + generator=poll_consumer, # type: ignore + processor=handle_msg, # type: ignore + ) -# %% ../../nbs/011_ConsumerLoop.ipynb 28 +# %% ../../nbs/011_ConsumerLoop.ipynb 35 def sanitize_kafka_config(**kwargs: Any) -> Dict[str, Any]: """Sanitize Kafka config""" return {k: "*" * len(v) if "pass" in k.lower() else v for k, v in kwargs.items()} -# %% ../../nbs/011_ConsumerLoop.ipynb 30 +# %% ../../nbs/011_ConsumerLoop.ipynb 37 @delegates(AIOKafkaConsumer) @delegates(_aiokafka_consumer_loop, keep=True) async def aiokafka_consumer_loop( @@ -216,7 +317,7 @@ async def 
aiokafka_consumer_loop( timeout_ms: int = 100, max_buffer_size: int = 100_000, callback: ConsumeCallable, - msg_type: Type[BaseModel], + msg_type: Union[Type[List[BaseModel]], Type[BaseModel]], is_shutting_down_f: Callable[[], bool], executor: Union[str, StreamExecutor, None] = None, **kwargs: Any, diff --git a/fastkafka/_components/asyncapi.py b/fastkafka/_components/asyncapi.py index 2197bf7..47f6799 100644 --- a/fastkafka/_components/asyncapi.py +++ b/fastkafka/_components/asyncapi.py @@ -25,8 +25,13 @@ from .docs_dependencies import _check_npm_with_local from .logger import get_logger -from .producer_decorator import KafkaEvent, ProduceCallable -from .aiokafka_consumer_loop import ConsumeCallable +from fastkafka._components.producer_decorator import ( + KafkaEvent, + ProduceCallable, + unwrap_from_kafka_event, +) +from .aiokafka_consumer_loop import ConsumeCallable, EventMetadata +from .helpers import unwrap_list_type # %% ../../nbs/014_AsyncAPI.ipynb 3 logger = get_logger(__name__) @@ -156,11 +161,8 @@ def _get_msg_cls_for_producer(f: ProduceCallable) -> Type[Any]: f"Producer function must have a defined return value, got {return_type} as return value" ) - if hasattr(return_type, "__origin__") and return_type.__origin__ == KafkaEvent: - return_type = return_type.__args__[0] - - if hasattr(return_type, "__origin__") and return_type.__origin__ == list: - return_type = return_type.__args__[0] + return_type = unwrap_from_kafka_event(return_type) + return_type = unwrap_list_type(return_type) if not hasattr(return_type, "json"): raise ValueError(f"Producer function return value must have json method") @@ -171,24 +173,30 @@ def _get_msg_cls_for_consumer(f: ConsumeCallable) -> Type[Any]: types = get_type_hints(f) return_type = types.pop("return", type(None)) types_list = list(types.values()) + # @app.consumer does not return a value + if return_type != type(None): + raise ValueError( + f"Consumer function cannot return any value, got {return_type}" + ) # @app.consumer first consumer argument must be a msg which is a subclass of BaseModel try: - if issubclass(BaseModel, types_list[0]): + msg_type = types_list[0] + + msg_type = unwrap_list_type(msg_type) + + if not issubclass(msg_type, BaseModel): raise ValueError( f"Consumer function first param must be a BaseModel subclass msg, got {types_list}" ) + + return msg_type # type: ignore + except IndexError: raise ValueError( f"Consumer function first param must be a BaseModel subclass msg, got {types_list}" ) - # @app.consumer does not return a value - if return_type != type(None): - raise ValueError( - f"Consumer function cannot return any value, got {return_type}" - ) - return types_list[0] # type: ignore -# %% ../../nbs/014_AsyncAPI.ipynb 25 +# %% ../../nbs/014_AsyncAPI.ipynb 27 def _get_topic_dict( f: Callable[[Any], Any], direction: str = "publish" ) -> Dict[str, Any]: @@ -209,7 +217,7 @@ def _get_topic_dict( msg_schema["description"] = f.__doc__ # type: ignore return {direction: msg_schema} -# %% ../../nbs/014_AsyncAPI.ipynb 28 +# %% ../../nbs/014_AsyncAPI.ipynb 30 def _get_channels_schema( consumers: Dict[str, ConsumeCallable], producers: Dict[str, ProduceCallable], @@ -220,7 +228,7 @@ def _get_channels_schema( topics[topic] = _get_topic_dict(f, d) return topics -# %% ../../nbs/014_AsyncAPI.ipynb 30 +# %% ../../nbs/014_AsyncAPI.ipynb 32 def _get_kafka_msg_classes( consumers: Dict[str, ConsumeCallable], producers: Dict[str, ProduceCallable], @@ -236,7 +244,7 @@ def _get_kafka_msg_definitions( ) -> Dict[str, Dict[str, Any]]: return 
schema(_get_kafka_msg_classes(consumers, producers)) # type: ignore -# %% ../../nbs/014_AsyncAPI.ipynb 32 +# %% ../../nbs/014_AsyncAPI.ipynb 34 def _get_example(cls: Type[BaseModel]) -> BaseModel: kwargs: Dict[str, Any] = {} for k, v in cls.__fields__.items(): @@ -253,7 +261,7 @@ def _get_example(cls: Type[BaseModel]) -> BaseModel: return json.loads(cls(**kwargs).json()) # type: ignore -# %% ../../nbs/014_AsyncAPI.ipynb 34 +# %% ../../nbs/014_AsyncAPI.ipynb 36 def _add_example_to_msg_definitions( msg_cls: Type[BaseModel], msg_schema: Dict[str, Dict[str, Any]] ) -> None: @@ -282,7 +290,7 @@ def _get_msg_definitions_with_examples( return msg_schema -# %% ../../nbs/014_AsyncAPI.ipynb 36 +# %% ../../nbs/014_AsyncAPI.ipynb 38 def _get_security_schemes(kafka_brokers: KafkaBrokers) -> Dict[str, Any]: security_schemes = {} for key, kafka_broker in kafka_brokers.brokers.items(): @@ -292,7 +300,7 @@ def _get_security_schemes(kafka_brokers: KafkaBrokers) -> Dict[str, Any]: ) return security_schemes -# %% ../../nbs/014_AsyncAPI.ipynb 38 +# %% ../../nbs/014_AsyncAPI.ipynb 40 def _get_components_schema( consumers: Dict[str, ConsumeCallable], producers: Dict[str, ProduceCallable], @@ -325,7 +333,7 @@ def _sub_values(d: Any, substitutions: Dict[str, str] = substitutions) -> Any: return _sub_values(components) # type: ignore -# %% ../../nbs/014_AsyncAPI.ipynb 40 +# %% ../../nbs/014_AsyncAPI.ipynb 42 def _get_servers_schema(kafka_brokers: KafkaBrokers) -> Dict[str, Any]: servers = json.loads(kafka_brokers.json(sort_keys=False))["brokers"] @@ -334,7 +342,7 @@ def _get_servers_schema(kafka_brokers: KafkaBrokers) -> Dict[str, Any]: servers[key]["security"] = [{f"{key}_default_security": []}] return servers # type: ignore -# %% ../../nbs/014_AsyncAPI.ipynb 42 +# %% ../../nbs/014_AsyncAPI.ipynb 44 def _get_asyncapi_schema( consumers: Dict[str, ConsumeCallable], producers: Dict[str, ProduceCallable], @@ -355,7 +363,7 @@ def _get_asyncapi_schema( "components": components, } -# %% ../../nbs/014_AsyncAPI.ipynb 44 +# %% ../../nbs/014_AsyncAPI.ipynb 46 def yaml_file_cmp(file_1: Union[Path, str], file_2: Union[Path, str]) -> bool: try: import yaml @@ -371,7 +379,7 @@ def _read(f: Union[Path, str]) -> Dict[str, Any]: d = [_read(f) for f in [file_1, file_2]] return d[0] == d[1] -# %% ../../nbs/014_AsyncAPI.ipynb 45 +# %% ../../nbs/014_AsyncAPI.ipynb 47 def _generate_async_spec( *, consumers: Dict[str, ConsumeCallable], @@ -415,7 +423,7 @@ def _generate_async_spec( ) return False -# %% ../../nbs/014_AsyncAPI.ipynb 47 +# %% ../../nbs/014_AsyncAPI.ipynb 49 def _generate_async_docs( *, spec_path: Path, @@ -447,7 +455,7 @@ def _generate_async_docs( f"Generation of async docs failed, used '$ {' '.join(cmd)}'{p.stdout.decode()}" ) -# %% ../../nbs/014_AsyncAPI.ipynb 49 +# %% ../../nbs/014_AsyncAPI.ipynb 51 def export_async_spec( *, consumers: Dict[str, ConsumeCallable], diff --git a/fastkafka/_components/helpers.py b/fastkafka/_components/helpers.py index 63b8a69..1d3b517 100644 --- a/fastkafka/_components/helpers.py +++ b/fastkafka/_components/helpers.py @@ -1,7 +1,7 @@ # AUTOGENERATED! DO NOT EDIT! File to edit: ../../nbs/998_Internal_Helpers.ipynb. 
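# ---------------------------------------------------------------------------
# Editorial note, not part of the patch: the helper added below,
# unwrap_list_type, is what lets the AsyncAPI schema generation above and the
# Tester's mirror functions resolve the inner message model from a batched
# annotation. A minimal sketch of the intended behavior, assuming a
# hypothetical pydantic model MyMessage(BaseModel):
#
#     from typing import List
#
#     assert unwrap_list_type(List[MyMessage]) is MyMessage
#     assert unwrap_list_type(MyMessage) is MyMessage
# ---------------------------------------------------------------------------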
 # %% auto 0
-__all__ = ['in_notebook', 'change_dir', 'ImportFromStringError', 'true_after']
+__all__ = ['in_notebook', 'change_dir', 'ImportFromStringError', 'true_after', 'unwrap_list_type']

 # %% ../../nbs/998_Internal_Helpers.ipynb 2
 def in_notebook() -> bool:
@@ -23,7 +23,7 @@ def in_notebook() -> bool:
 import sys
 from datetime import datetime, timedelta
 from functools import wraps
-from inspect import signature
+from inspect import signature, Parameter
 from pathlib import Path
 from typing import *

@@ -100,3 +100,25 @@ def _true_after(seconds: Union[int, float] = seconds, t: datetime = t) -> bool:
         return (datetime.now() - t) > timedelta(seconds=seconds)

     return _true_after
+
+# %% ../../nbs/998_Internal_Helpers.ipynb 12
+def unwrap_list_type(var_type: Union[Type, Parameter]) -> Union[Type, Parameter]:
+    """
+    Unwraps the inner type from a List annotation.
+
+    Args:
+        var_type: Type to unwrap.
+
+    Returns:
+        The inner type if the given type is a List, otherwise the same type.
+
+    Example:
+        - Input: List[str]
+          Output: str
+        - Input: int
+          Output: int
+    """
+    if hasattr(var_type, "__origin__") and var_type.__origin__ == list:
+        return var_type.__args__[0]  # type: ignore
+    else:
+        return var_type
diff --git a/fastkafka/_components/producer_decorator.py b/fastkafka/_components/producer_decorator.py
index ffadca8..b36e5d9 100644
--- a/fastkafka/_components/producer_decorator.py
+++ b/fastkafka/_components/producer_decorator.py
@@ -1,8 +1,8 @@
 # AUTOGENERATED! DO NOT EDIT! File to edit: ../../nbs/013_ProducerDecorator.ipynb.

 # %% auto 0
-__all__ = ['BaseSubmodel', 'ProduceReturnTypes', 'ProduceCallable', 'KafkaEvent', 'release_callback', 'produce_single',
-           'send_batch', 'produce_batch', 'producer_decorator']
+__all__ = ['BaseSubmodel', 'ProduceReturnTypes', 'ProduceCallable', 'KafkaEvent', 'unwrap_from_kafka_event', 'release_callback',
+           'produce_single', 'send_batch', 'produce_batch', 'producer_decorator']

 # %% ../../nbs/013_ProducerDecorator.ipynb 1
 import asyncio
@@ -13,6 +13,7 @@
 from asyncio import iscoroutinefunction  # do not use the version from inspect
 from collections import namedtuple
 from dataclasses import dataclass
+from inspect import Parameter
 from typing import *

 from aiokafka import AIOKafkaProducer
@@ -41,6 +42,28 @@ class KafkaEvent(Generic[BaseSubmodel]):
     key: Optional[bytes] = None

 # %% ../../nbs/013_ProducerDecorator.ipynb 5
+def unwrap_from_kafka_event(var_type: Union[Type, Parameter]) -> Union[Type, Parameter]:
+    """
+    Unwraps the type from a KafkaEvent.
+
+    Args:
+        var_type: Type to unwrap.
+
+    Returns:
+        Unwrapped type if the given type is a KafkaEvent, otherwise the same type.
+ + Example: + - Input: KafkaEvent[str] + Output: str + - Input: int + Output: int + """ + if hasattr(var_type, "__origin__") and var_type.__origin__ == KafkaEvent: + return var_type.__args__[0] # type: ignore + else: + return var_type + +# %% ../../nbs/013_ProducerDecorator.ipynb 7 ProduceReturnTypes = Union[ BaseModel, KafkaEvent[BaseModel], List[BaseModel], KafkaEvent[List[BaseModel]] ] @@ -49,17 +72,17 @@ class KafkaEvent(Generic[BaseSubmodel]): Callable[..., ProduceReturnTypes], Callable[..., Awaitable[ProduceReturnTypes]] ] -# %% ../../nbs/013_ProducerDecorator.ipynb 8 +# %% ../../nbs/013_ProducerDecorator.ipynb 10 def _wrap_in_event( message: Union[BaseModel, List[BaseModel], KafkaEvent] ) -> KafkaEvent: return message if type(message) == KafkaEvent else KafkaEvent(message) -# %% ../../nbs/013_ProducerDecorator.ipynb 11 +# %% ../../nbs/013_ProducerDecorator.ipynb 13 def release_callback(fut: asyncio.Future) -> None: pass -# %% ../../nbs/013_ProducerDecorator.ipynb 12 +# %% ../../nbs/013_ProducerDecorator.ipynb 14 async def produce_single( # type: ignore producer: AIOKafkaProducer, topic: str, @@ -71,7 +94,7 @@ async def produce_single( # type: ignore ) fut.add_done_callback(release_callback) -# %% ../../nbs/013_ProducerDecorator.ipynb 14 +# %% ../../nbs/013_ProducerDecorator.ipynb 16 async def send_batch( # type: ignore producer: AIOKafkaProducer, topic: str, batch: BatchBuilder, key: Optional[bytes] ) -> None: @@ -108,7 +131,7 @@ async def produce_batch( # type: ignore await send_batch(producer, topic, batch, wrapped_val.key) -# %% ../../nbs/013_ProducerDecorator.ipynb 16 +# %% ../../nbs/013_ProducerDecorator.ipynb 18 def producer_decorator( producer_store: Dict[str, Any], func: ProduceCallable, diff --git a/fastkafka/_components/task_streaming.py b/fastkafka/_components/task_streaming.py index 5662552..de3f47a 100644 --- a/fastkafka/_components/task_streaming.py +++ b/fastkafka/_components/task_streaming.py @@ -35,12 +35,30 @@ def __init__( self.finished = False async def add(self, item: Task) -> None: + """ + Adds a task to the task pool. + + Args: + item: The task to be added. + + Returns: + None + """ while len(self.pool) >= self.size: await asyncio.sleep(0) self.pool.add(item) item.add_done_callback(self.discard) def discard(self, task: Task) -> None: + """ + Discards a completed task from the task pool. + + Args: + task: The completed task to be discarded. + + Returns: + None + """ e = task.exception() if e is not None and self.on_error is not None: try: @@ -53,6 +71,12 @@ def discard(self, task: Task) -> None: self.pool.discard(task) def __len__(self) -> int: + """ + Returns the number of tasks in the task pool. + + Returns: + The number of tasks in the task pool. + """ return len(self.pool) async def __aenter__(self) -> "TaskPool": @@ -66,6 +90,16 @@ async def __aexit__(self, *args: Any, **kwargs: Any) -> None: @staticmethod def log_error(logger: Logger) -> Callable[[Exception], None]: + """ + Creates a decorator that logs errors using the specified logger. + + Args: + logger: The logger to use for error logging. + + Returns: + The decorator function. + """ + def _log_error(e: Exception, logger: Logger = logger) -> None: logger.warning(f"{e=}") @@ -78,10 +112,25 @@ def __init__(self) -> None: self.exception_found = False def on_error(self, e: Exception) -> None: + """ + Handles an error by storing the exception. + + Args: + e: The exception to be handled. 
+
+        Returns:
+            None
+        """
         self.exceptions.append(e)
         self.exception_found = True

     def _monitor_step(self) -> None:
+        """
+        Raises the next exception in the queue.
+
+        Returns:
+            None
+        """
         if len(self.exceptions) > 0:
             e = self.exceptions.pop(0)
             raise e
@@ -99,15 +148,16 @@ class StreamExecutor(ABC):
     @abstractmethod
     async def run(  # type: ignore
         self,
+        *,
         is_shutting_down_f: Callable[[], bool],
-        produce_func: Callable[[], Awaitable[ConsumerRecord]],
-        consume_func: Callable[[ConsumerRecord], Awaitable[None]],
+        generator: Callable[[], Awaitable[ConsumerRecord]],
+        processor: Callable[[ConsumerRecord], Awaitable[None]],
     ) -> None:
         pass

 # %% ../../nbs/011_TaskStreaming.ipynb 20
 def _process_items_task(  # type: ignore
-    consume_func: Callable[[ConsumerRecord], Awaitable[None]], task_pool: TaskPool
+    processor: Callable[[ConsumerRecord], Awaitable[None]], task_pool: TaskPool
 ) -> Callable[
     [
         anyio.streams.memory.MemoryObjectReceiveStream,
         Callable[[ConsumerRecord], Awaitable[None]],
         TaskPool,
     ],
     Awaitable[None],
 ]:
     async def _process_items_wrapper(  # type: ignore
         receive_stream: anyio.streams.memory.MemoryObjectReceiveStream,
-        consume_func: Callable[[ConsumerRecord], Awaitable[None]] = consume_func,
+        processor: Callable[[ConsumerRecord], Awaitable[None]] = processor,
         task_pool=task_pool,
     ):
         async with receive_stream:
             async for msg in receive_stream:
-                task: asyncio.Task = asyncio.create_task(consume_func(msg))  # type: ignore
+                task: asyncio.Task = asyncio.create_task(processor(msg))  # type: ignore
                 await task_pool.add(task)

     return _process_items_wrapper
@@ -148,18 +198,30 @@ def __init__(  # type: ignore

     async def run(  # type: ignore
         self,
+        *,
         is_shutting_down_f: Callable[[], bool],
-        produce_func: Callable[[], Awaitable[ConsumerRecord]],
-        consume_func: Callable[[ConsumerRecord], Awaitable[None]],
+        generator: Callable[[], Awaitable[ConsumerRecord]],
+        processor: Callable[[ConsumerRecord], Awaitable[None]],
     ) -> None:
+        """
+        Runs the dynamic task executor.
+
+        Args:
+            is_shutting_down_f: Function to check if the executor is shutting down.
+            generator: Generator function for retrieving consumer records.
+            processor: Processor function for processing consumer records.
+
+        Returns:
+            None
+        """
         send_stream, receive_stream = anyio.create_memory_object_stream(
             max_buffer_size=self.max_buffer_size
         )
         async with self.exception_monitor, self.task_pool:
             async with anyio.create_task_group() as tg:
                 tg.start_soon(
-                    _process_items_task(consume_func, self.task_pool), receive_stream
+                    _process_items_task(processor, self.task_pool), receive_stream
                 )
                 async with send_stream:
                     while not is_shutting_down_f():
@@ -168,13 +230,13 @@ async def run(  # type: ignore
                             and self.throw_exceptions
                         ):
                             break
-                        msgs = await produce_func()
+                        msgs = await generator()
                         for msg in msgs:
                             await send_stream.send(msg)

 # %% ../../nbs/011_TaskStreaming.ipynb 30
 def _process_items_coro(  # type: ignore
-    consume_func: Callable[[ConsumerRecord], Awaitable[None]],
+    processor: Callable[[ConsumerRecord], Awaitable[None]],
     throw_exceptions: bool,
 ) -> Callable[
     [
         anyio.streams.memory.MemoryObjectReceiveStream,
         Callable[[ConsumerRecord], Awaitable[None]],
         bool,
     ],
     Awaitable[None],
 ]:
     async def _process_items_wrapper(  # type: ignore
         receive_stream: anyio.streams.memory.MemoryObjectReceiveStream,
-        consume_func: Callable[[ConsumerRecord], Awaitable[None]] = consume_func,
+        processor: Callable[[ConsumerRecord], Awaitable[None]] = processor,
         throw_exceptions: bool = throw_exceptions,
     ) -> Awaitable[None]:
         async with receive_stream:
             async for msg in receive_stream:
                 try:
-                    await consume_func(msg)
+                    await processor(msg)
                 except Exception as e:
                     if throw_exceptions:
                         raise e
@@ -213,26 +275,51 @@ def __init__(  # type: ignore

     async def run(  # type: ignore
         self,
+        *,
         is_shutting_down_f: Callable[[], bool],
-        produce_func: Callable[[], Awaitable[ConsumerRecord]],
-        consume_func: Callable[[ConsumerRecord], Awaitable[None]],
+        generator: Callable[[], Awaitable[ConsumerRecord]],
+        processor: Callable[[ConsumerRecord], Awaitable[None]],
     ) -> None:
+        """
+        Runs the sequential executor.
+
+        Args:
+            is_shutting_down_f: Function to check if the executor is shutting down.
+            generator: Generator function for retrieving consumer records.
+            processor: Processor function for processing consumer records.
+
+        Returns:
+            None
+        """
+
         send_stream, receive_stream = anyio.create_memory_object_stream(
             max_buffer_size=self.max_buffer_size
         )
         async with anyio.create_task_group() as tg:
             tg.start_soon(
-                _process_items_coro(consume_func, self.throw_exceptions), receive_stream
+                _process_items_coro(processor, self.throw_exceptions), receive_stream
             )
             async with send_stream:
                 while not is_shutting_down_f():
-                    msgs = await produce_func()
+                    msgs = await generator()
                     for msg in msgs:
                         await send_stream.send(msg)

 # %% ../../nbs/011_TaskStreaming.ipynb 34
 def get_executor(executor: Union[str, StreamExecutor, None] = None) -> StreamExecutor:
+    """
+    Returns an instance of the specified executor.
+
+    Args:
+        executor: Executor instance or name of the executor.
+
+    Returns:
+        Instance of the specified executor.
+
+    Raises:
+        AttributeError: If the executor is not found.
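+
+    Example:
+        Editorial sketch, not part of the original patch; the names match the
+        executor classes defined above:
+
+            executor = get_executor("SequentialExecutor")   # look up by name
+            executor = get_executor(DynamicTaskExecutor())  # instances pass through unchanged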
+ """ if isinstance(executor, StreamExecutor): return executor elif executor is None: diff --git a/fastkafka/_modidx.py b/fastkafka/_modidx.py index b095f0c..0625356 100644 --- a/fastkafka/_modidx.py +++ b/fastkafka/_modidx.py @@ -101,12 +101,12 @@ 'fastkafka/_components/aiokafka_consumer_loop.py'), 'fastkafka._components.aiokafka_consumer_loop._callback_parameters_wrapper': ( 'consumerloop.html#_callback_parameters_wrapper', 'fastkafka/_components/aiokafka_consumer_loop.py'), - 'fastkafka._components.aiokafka_consumer_loop._decode_streamed_msgs': ( 'consumerloop.html#_decode_streamed_msgs', - 'fastkafka/_components/aiokafka_consumer_loop.py'), + 'fastkafka._components.aiokafka_consumer_loop._get_batch_msg_handlers': ( 'consumerloop.html#_get_batch_msg_handlers', + 'fastkafka/_components/aiokafka_consumer_loop.py'), + 'fastkafka._components.aiokafka_consumer_loop._get_single_msg_handlers': ( 'consumerloop.html#_get_single_msg_handlers', + 'fastkafka/_components/aiokafka_consumer_loop.py'), 'fastkafka._components.aiokafka_consumer_loop._prepare_callback': ( 'consumerloop.html#_prepare_callback', 'fastkafka/_components/aiokafka_consumer_loop.py'), - 'fastkafka._components.aiokafka_consumer_loop._stream_msgs': ( 'consumerloop.html#_stream_msgs', - 'fastkafka/_components/aiokafka_consumer_loop.py'), 'fastkafka._components.aiokafka_consumer_loop.aiokafka_consumer_loop': ( 'consumerloop.html#aiokafka_consumer_loop', 'fastkafka/_components/aiokafka_consumer_loop.py'), 'fastkafka._components.aiokafka_consumer_loop.sanitize_kafka_config': ( 'consumerloop.html#sanitize_kafka_config', @@ -212,7 +212,9 @@ 'fastkafka._components.helpers.in_notebook': ( 'internal_helpers.html#in_notebook', 'fastkafka/_components/helpers.py'), 'fastkafka._components.helpers.true_after': ( 'internal_helpers.html#true_after', - 'fastkafka/_components/helpers.py')}, + 'fastkafka/_components/helpers.py'), + 'fastkafka._components.helpers.unwrap_list_type': ( 'internal_helpers.html#unwrap_list_type', + 'fastkafka/_components/helpers.py')}, 'fastkafka._components.logger': { 'fastkafka._components.logger.cached_log': ( 'logger.html#cached_log', 'fastkafka/_components/logger.py'), 'fastkafka._components.logger.get_default_logger_configuration': ( 'logger.html#get_default_logger_configuration', @@ -264,7 +266,9 @@ 'fastkafka._components.producer_decorator.release_callback': ( 'producerdecorator.html#release_callback', 'fastkafka/_components/producer_decorator.py'), 'fastkafka._components.producer_decorator.send_batch': ( 'producerdecorator.html#send_batch', - 'fastkafka/_components/producer_decorator.py')}, + 'fastkafka/_components/producer_decorator.py'), + 'fastkafka._components.producer_decorator.unwrap_from_kafka_event': ( 'producerdecorator.html#unwrap_from_kafka_event', + 'fastkafka/_components/producer_decorator.py')}, 'fastkafka._components.task_streaming': { 'fastkafka._components.task_streaming.DynamicTaskExecutor': ( 'taskstreaming.html#dynamictaskexecutor', 'fastkafka/_components/task_streaming.py'), 'fastkafka._components.task_streaming.DynamicTaskExecutor.__init__': ( 'taskstreaming.html#dynamictaskexecutor.__init__', diff --git a/nbs/011_ConsumerLoop.ipynb b/nbs/011_ConsumerLoop.ipynb index 368e56d..c986420 100644 --- a/nbs/011_ConsumerLoop.ipynb +++ b/nbs/011_ConsumerLoop.ipynb @@ -173,6 +173,14 @@ "\n", " @staticmethod\n", " def create_event_metadata(record: ConsumerRecord) -> \"EventMetadata\": # type: ignore\n", + " \"\"\"Creates an instance of EventMetadata from a ConsumerRecord.\n", + "\n", + " Args:\n", + " 
record: The Kafka ConsumerRecord.\n", + "\n", + " Returns:\n", + " The created EventMetadata instance.\n", + " \"\"\"\n", " return EventMetadata(\n", " topic=record.topic,\n", " partition=record.partition,\n", @@ -235,10 +243,10 @@ "source": [ "# | export\n", "\n", - "AsyncConsume = Callable[[BaseModel], Awaitable[None]]\n", - "AsyncConsumeMeta = Callable[[BaseModel, EventMetadata], Awaitable[None]]\n", - "SyncConsume = Callable[[BaseModel], None]\n", - "SyncConsumeMeta = Callable[[BaseModel, EventMetadata], None]\n", + "AsyncConsume = Callable[[Union[List[BaseModel], BaseModel]], Awaitable[None]]\n", + "AsyncConsumeMeta = Callable[[Union[List[BaseModel], BaseModel], Union[List[EventMetadata], EventMetadata]], Awaitable[None]]\n", + "SyncConsume = Callable[[Union[List[BaseModel], BaseModel]], None]\n", + "SyncConsumeMeta = Callable[[Union[List[BaseModel], BaseModel], Union[List[EventMetadata], EventMetadata]], None]\n", "\n", "ConsumeCallable = Union[\n", " AsyncConsume, AsyncConsumeMeta, SyncConsume, SyncConsumeMeta\n", @@ -266,16 +274,19 @@ " Returns:\n", " Wrapped callback with filtered params\n", " \"\"\"\n", + "\n", " async def _params_wrap(\n", - " msg: BaseModel,\n", - " meta: EventMetadata,\n", + " msg: Union[BaseModel, List[BaseModel]],\n", + " meta: Union[EventMetadata, List[EventMetadata]],\n", " callback: Union[AsyncConsume, AsyncConsumeMeta] = callback,\n", " ) -> None:\n", " types = list(get_type_hints(callback).values())\n", - " args: List[Union[BaseModel, EventMetadata]] = [msg]\n", + " args: List[Union[BaseModel, List[BaseModel], EventMetadata, List[EventMetadata]]] = [msg]\n", " if EventMetadata in types:\n", " args.insert(types.index(EventMetadata), meta)\n", - " await callback(*args) # type: ignore\n", + " if List[EventMetadata] in types:\n", + " args.insert(types.index(List[EventMetadata]), meta)\n", + " await callback(*args) # type: ignore\n", "\n", " return _params_wrap" ] @@ -323,6 +334,21 @@ "await with_meta(\"Example_msg\", \"Some_meta\")" ] }, + { + "cell_type": "code", + "execution_count": null, + "id": "ec76eaf3", + "metadata": {}, + "outputs": [], + "source": [ + "@_callback_parameters_wrapper\n", + "async def with_meta(msg: List[BaseModel], meta: List[EventMetadata]):\n", + " assert msg == \"Example_msg\"\n", + " assert meta == \"Some_meta\"\n", + "\n", + "await with_meta(\"Example_msg\", \"Some_meta\")" + ] + }, { "cell_type": "code", "execution_count": null, @@ -341,7 +367,7 @@ " 1. If callback is sync, asyncify it\n", " 2. 
Wrap the callback into a safe callback for exception handling\n", "\n", - " Params:\n", + " Args:\n", " callback: async callable that will be prepared for use in consumer\n", "\n", " Returns:\n", @@ -381,9 +407,6 @@ "metadata": {}, "outputs": [], "source": [ - "# | export\n", - "\n", - "\n", "async def _stream_msgs( # type: ignore\n", " msgs: Dict[TopicPartition, bytes],\n", " send_stream: anyio.streams.memory.MemoryObjectSendStream[Any],\n", @@ -391,7 +414,7 @@ " \"\"\"\n", " Decodes and streams the message and topic to the send_stream.\n", "\n", - " Params:\n", + " Args:\n", " msgs:\n", " send_stream:\n", " \"\"\"\n", @@ -541,6 +564,226 @@ " mock.assert_has_calls([call(msg) for msg in msgs.values()])" ] }, + { + "cell_type": "code", + "execution_count": null, + "id": "e5275963", + "metadata": {}, + "outputs": [], + "source": [ + "# | export\n", + "\n", + "\n", + "def _get_single_msg_handlers( # type: ignore\n", + " *,\n", + " consumer: AIOKafkaConsumer,\n", + " callback: AsyncConsumeMeta,\n", + " decoder_fn: Callable[[bytes, ModelMetaclass], Any],\n", + " msg_type: Type[BaseModel],\n", + " **kwargs: Any,\n", + ") -> Tuple[\n", + " Callable[\n", + " [\n", + " ConsumerRecord,\n", + " AsyncConsumeMeta,\n", + " Callable[[bytes, ModelMetaclass], Any],\n", + " Type[BaseModel],\n", + " ],\n", + " Awaitable[None],\n", + " ],\n", + " Callable[[AIOKafkaConsumer, Any], Awaitable[List[ConsumerRecord]]],\n", + "]:\n", + " \n", + " \"\"\"\n", + " Retrieves the message handlers for consuming single messages from a Kafka topic.\n", + "\n", + " Args:\n", + " consumer: The Kafka consumer instance.\n", + " callback: The callback function to handle the consumed message.\n", + " decoder_fn: The function to decode the consumed message.\n", + " msg_type: The type of the consumed message.\n", + " **kwargs: Additional keyword arguments for the consumer.\n", + "\n", + " Returns:\n", + " The handle_msg function and poll_consumer function.\n", + " \"\"\"\n", + " async def handle_msg( # type: ignore\n", + " record: ConsumerRecord,\n", + " callback: AsyncConsumeMeta = callback,\n", + " decoder_fn: Callable[[bytes, ModelMetaclass], Any] = decoder_fn,\n", + " msg_type: Type[BaseModel] = msg_type,\n", + " ) -> None:\n", + " await callback(\n", + " decoder_fn(record.value, msg_type),\n", + " EventMetadata.create_event_metadata(record),\n", + " )\n", + "\n", + " async def poll_consumer( # type: ignore\n", + " consumer: AIOKafkaConsumer = consumer, kwargs: Any = kwargs\n", + " ) -> List[ConsumerRecord]:\n", + " msgs = await consumer.getmany(**kwargs)\n", + " return [msg for msg_group in msgs.values() for msg in msg_group]\n", + "\n", + " return handle_msg, poll_consumer" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "cc43f2f7", + "metadata": {}, + "outputs": [], + "source": [ + "topic_partitions = [(\"topic_0\", 0), (\"topic_0\", 1)]\n", + "\n", + "msg = MyMessage(url=\"http://www.acme.com\", port=22)\n", + "msgs = {\n", + " TopicPartition(topic, partition): [\n", + " create_consumer_record(topic=topic, partition=partition, msg=msg)\n", + " ]\n", + " for topic, partition in topic_partitions\n", + "}\n", + "record = create_consumer_record(topic=topic, partition=partition, msg=msg)\n", + "\n", + "consumer = AsyncMock()\n", + "consumer.getmany.return_value = msgs\n", + "\n", + "callback = AsyncMock()\n", + "decoder_fn = json_decoder\n", + "msg_type = MyMessage\n", + "\n", + "handle_msg, poll_consumer = _get_single_msg_handlers(\n", + " consumer=consumer, callback=callback, decoder_fn=decoder_fn, 
msg_type=msg_type\n", + ")\n", + "\n", + "got_msgs = await poll_consumer()\n", + "assert len(msgs.values()) == len(got_msgs)\n", + "\n", + "for msg in got_msgs:\n", + " await handle_msg(msg)\n", + "\n", + "callback.assert_has_awaits(\n", + " [\n", + " call(\n", + " json_decoder(msg.value, msg_type), EventMetadata.create_event_metadata(msg)\n", + " )\n", + " for msg in got_msgs\n", + " ]\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "5ddd1b59", + "metadata": {}, + "outputs": [], + "source": [ + "# | export\n", + "\n", + "\n", + "def _get_batch_msg_handlers( # type: ignore\n", + " *,\n", + " consumer: AIOKafkaConsumer,\n", + " callback: AsyncConsumeMeta,\n", + " decoder_fn: Callable[[bytes, ModelMetaclass], Any],\n", + " msg_type: Type[BaseModel],\n", + " **kwargs: Any,\n", + ") -> Tuple[\n", + " Callable[\n", + " [\n", + " List[ConsumerRecord],\n", + " AsyncConsumeMeta,\n", + " Callable[[bytes, ModelMetaclass], Any],\n", + " Type[BaseModel],\n", + " ],\n", + " Awaitable[None],\n", + " ],\n", + " Callable[[AIOKafkaConsumer, Any], Awaitable[List[List[ConsumerRecord]]]],\n", + "]:\n", + " \"\"\"\n", + " Retrieves the message handlers for consuming messages in batches from a Kafka topic.\n", + "\n", + " Args:\n", + " consumer: The Kafka consumer instance.\n", + " callback: The callback function to handle the consumed messages.\n", + " decoder_fn: The function to decode the consumed messages.\n", + " msg_type: The type of the consumed messages.\n", + " **kwargs: Additional keyword arguments for the consumer.\n", + "\n", + " Returns:\n", + " The handle_msg function and poll_consumer function.\n", + " \"\"\"\n", + "\n", + " async def handle_msg( # type: ignore\n", + " records: List[ConsumerRecord],\n", + " callback: AsyncConsumeMeta = callback,\n", + " decoder_fn: Callable[[bytes, ModelMetaclass], Any] = decoder_fn,\n", + " msg_type: Type[BaseModel] = msg_type,\n", + " ) -> None:\n", + " await callback(\n", + " [decoder_fn(record.value, msg_type) for record in records],\n", + " [EventMetadata.create_event_metadata(record) for record in records],\n", + " )\n", + "\n", + " async def poll_consumer( # type: ignore\n", + " consumer: AIOKafkaConsumer = consumer, kwargs: Any = kwargs\n", + " ) -> List[List[ConsumerRecord]]:\n", + " msgs = await consumer.getmany(**kwargs)\n", + " return [value for value in msgs.values() if len(value)>0]\n", + "\n", + " return handle_msg, poll_consumer" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "18367eb4", + "metadata": {}, + "outputs": [], + "source": [ + "topic_partitions = [(\"topic_0\", 0), (\"topic_0\", 1)]\n", + "\n", + "msg = MyMessage(url=\"http://www.acme.com\", port=22)\n", + "msgs = {\n", + " TopicPartition(topic, partition): [\n", + " create_consumer_record(topic=topic, partition=partition, msg=msg)\n", + " ]\n", + " for topic, partition in topic_partitions\n", + "}\n", + "record = create_consumer_record(topic=topic, partition=partition, msg=msg)\n", + "\n", + "consumer = AsyncMock()\n", + "consumer.getmany.return_value = msgs\n", + "\n", + "callback = AsyncMock()\n", + "decoder_fn = json_decoder\n", + "msg_type = MyMessage\n", + "\n", + "handle_msg, poll_consumer = _get_batch_msg_handlers(\n", + " consumer=consumer, callback=callback, decoder_fn=decoder_fn, msg_type=msg_type\n", + ")\n", + "\n", + "got_msgs = await poll_consumer()\n", + "assert len(msgs.values()) == len(got_msgs)\n", + "\n", + "for msgs in got_msgs:\n", + " assert len(msgs) == 1\n", + "\n", + "for msg in got_msgs:\n", + " await 
handle_msg(msg)\n", + "\n", + "callback.assert_has_awaits(\n", + " [\n", + " call(\n", + " [json_decoder(msg_unwrapped.value, msg_type) for msg_unwrapped in msg],\n", + " [EventMetadata.create_event_metadata(msg_unwrapped) for msg_unwrapped in msg],\n", + " )\n", + " for msg in got_msgs\n", + " ]\n", + ")" + ] + }, { "cell_type": "code", "execution_count": null, @@ -559,7 +802,7 @@ " decoder_fn: Callable[[bytes, ModelMetaclass], Any],\n", " callback: ConsumeCallable,\n", " max_buffer_size: int = 100_000,\n", - " msg_type: Type[BaseModel],\n", + " msg_type: Union[Type[List[BaseModel]], Type[BaseModel]],\n", " is_shutting_down_f: Callable[[], bool],\n", " executor: Union[str, StreamExecutor, None] = None,\n", " **kwargs: Any,\n", @@ -568,7 +811,7 @@ " Consumer loop for infinite pooling of the AIOKafka consumer for new messages. Calls consumer.getmany()\n", " and after the consumer return messages or times out, messages are decoded and streamed to defined callback.\n", "\n", - " Params:\n", + " Args:\n", " topic: Topic to subscribe\n", " decoder_fn: Function to decode the messages consumed from the topic\n", " callbacks: Dict of callbacks mapped to their respective topics\n", @@ -580,22 +823,28 @@ "\n", " prepared_callback = _prepare_callback(callback)\n", "\n", - " async def handle_msg( # type: ignore\n", - " record: ConsumerRecord,\n", - " callback: AsyncConsumeMeta = prepared_callback,\n", - " decoder_fn: Callable[[bytes, ModelMetaclass], Any] = decoder_fn,\n", - " msg_type: Type[BaseModel] = msg_type,\n", - " ) -> None:\n", - " await callback(\n", - " decoder_fn(record.value, msg_type),\n", - " EventMetadata.create_event_metadata(record),\n", + " if hasattr(msg_type, \"__origin__\") and msg_type.__origin__ == list:\n", + " handle_msg, poll_consumer = _get_batch_msg_handlers(\n", + " consumer=consumer,\n", + " callback=prepared_callback,\n", + " decoder_fn=decoder_fn,\n", + " msg_type=msg_type.__args__[0], # type: ignore\n", + " **kwargs,\n", + " )\n", + " else:\n", + " handle_msg, poll_consumer = _get_single_msg_handlers(\n", + " consumer=consumer,\n", + " callback=prepared_callback,\n", + " decoder_fn=decoder_fn,\n", + " msg_type=msg_type, # type: ignore\n", + " **kwargs,\n", " )\n", "\n", - " async def poll_consumer(kwargs: Any = kwargs) -> List[ConsumerRecord]: # type: ignore\n", - " msgs = await consumer.getmany(**kwargs)\n", - " return [msg for msg_group in msgs.values() for msg in msg_group]\n", - "\n", - " await get_executor(executor).run(is_shutting_down_f, poll_consumer, handle_msg)" + " await get_executor(executor).run(\n", + " is_shutting_down_f=is_shutting_down_f,\n", + " generator=poll_consumer, # type: ignore\n", + " processor=handle_msg, # type: ignore\n", + " )" ] }, { @@ -612,6 +861,16 @@ " return _is_shutting_down_f" ] }, + { + "cell_type": "code", + "execution_count": null, + "id": "3020fa4a", + "metadata": {}, + "outputs": [], + "source": [ + "from fastkafka._components.task_streaming import SequentialExecutor" + ] + }, { "cell_type": "code", "execution_count": null, @@ -638,7 +897,9 @@ "f = asyncio.Future()\n", "f.set_result(msgs)\n", "mock_consumer.configure_mock(**{\"getmany.return_value\": f})\n", - "mock_callback = Mock()\n", + "\n", + "def f(msg: MyMessage): pass\n", + "mock_callback = MagicMock(spec=f)\n", "\n", "\n", "for is_async in [True, False]:\n", @@ -661,6 +922,57 @@ "print(\"ok\")" ] }, + { + "cell_type": "code", + "execution_count": null, + "id": "a6854bb7", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ 
+ "ok\n" + ] + } + ], + "source": [ + "topic = \"topic_0\"\n", + "partition = 0\n", + "msg = MyMessage(url=\"http://www.acme.com\", port=22)\n", + "record = create_consumer_record(topic=topic, partition=partition, msg=msg)\n", + "\n", + "mock_consumer = MagicMock()\n", + "msgs = {TopicPartition(topic, 0): [record]}\n", + "\n", + "f = asyncio.Future()\n", + "f.set_result(msgs)\n", + "mock_consumer.configure_mock(**{\"getmany.return_value\": f})\n", + "\n", + "def f(msg: List[MyMessage]): pass\n", + "mock_callback = MagicMock(spec=f)\n", + "\n", + "\n", + "for is_async in [True, False]:\n", + " for executor_type in [\"DynamicTaskExecutor\", \"SequentialExecutor\"]:\n", + " await _aiokafka_consumer_loop(\n", + " consumer=mock_consumer,\n", + " topic=topic,\n", + " decoder_fn=json_decoder,\n", + " max_buffer_size=100,\n", + " timeout_ms=10,\n", + " callback=asyncer.asyncify(mock_callback) if is_async else mock_callback,\n", + " msg_type=List[MyMessage],\n", + " is_shutting_down_f=is_shutting_down_f(mock_consumer.getmany),\n", + " executor_type=executor_type,\n", + " )\n", + "\n", + " assert mock_consumer.getmany.call_count == 1\n", + " mock_callback.assert_called_once_with([msg])\n", + "\n", + "print(\"ok\")" + ] + }, { "cell_type": "code", "execution_count": null, @@ -832,7 +1144,7 @@ " timeout_ms: int = 100,\n", " max_buffer_size: int = 100_000,\n", " callback: ConsumeCallable,\n", - " msg_type: Type[BaseModel],\n", + " msg_type: Union[Type[List[BaseModel]], Type[BaseModel]],\n", " is_shutting_down_f: Callable[[], bool],\n", " executor: Union[str, StreamExecutor, None] = None,\n", " **kwargs: Any,\n", @@ -908,7 +1220,7 @@ { "data": { "application/vnd.jupyter.widget-view+json": { - "model_id": "d2ff2c5cf7924175a7e020b8b6334b64", + "model_id": "61317fa5eb244e60a286e4edcdcf9a51", "version_major": 2, "version_minor": 0 }, @@ -941,10 +1253,10 @@ "[INFO] __main__: msgs_received=9000\n", "[INFO] __main__: aiokafka_consumer_loop(): Consumer stopped.\n", "[INFO] __main__: aiokafka_consumer_loop() finished.\n", - "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 48208...\n", - "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 48208 terminated.\n", - "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 47846...\n", - "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 47846 terminated.\n", + "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 112786...\n", + "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 112786 terminated.\n", + "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 112426...\n", + "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 112426 terminated.\n", "[INFO] fastkafka._components.test_dependencies: Java is already installed.\n", "[INFO] fastkafka._components.test_dependencies: Kafka is installed.\n", "[INFO] fastkafka._testing.apache_kafka_broker: Starting zookeeper...\n", @@ -955,7 +1267,7 @@ { "data": { "application/vnd.jupyter.widget-view+json": { - "model_id": "c305aaaa1a244e52842a053f02ac6825", + "model_id": "75d0b57876d943438e5add00860e967c", "version_major": 2, "version_minor": 0 }, @@ -988,10 +1300,10 @@ "[INFO] __main__: msgs_received=9000\n", "[INFO] __main__: aiokafka_consumer_loop(): Consumer stopped.\n", "[INFO] __main__: aiokafka_consumer_loop() finished.\n", - "[INFO] 
fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 49350...\n", - "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 49350 terminated.\n", - "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 48989...\n", - "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 48989 terminated.\n" + "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 113934...\n", + "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 113934 terminated.\n", + "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 113569...\n", + "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 113569 terminated.\n" ] } ], @@ -1030,6 +1342,130 @@ " assert msgs_sent == msgs_received, f\"{msgs_sent} != {msgs_received}\"" ] }, + { + "cell_type": "code", + "execution_count": null, + "id": "fde91e3e", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "[INFO] fastkafka._components.test_dependencies: Java is already installed.\n", + "[INFO] fastkafka._components.test_dependencies: Kafka is installed.\n", + "[INFO] fastkafka._testing.apache_kafka_broker: Starting zookeeper...\n", + "[INFO] fastkafka._testing.apache_kafka_broker: Starting kafka...\n", + "[INFO] fastkafka._testing.apache_kafka_broker: Local Kafka broker up and running on 127.0.0.1:9092\n" + ] + }, + { + "data": { + "application/vnd.jupyter.widget-view+json": { + "model_id": "17f7bd19cb6c490484c2483ebed0a6b6", + "version_major": 2, + "version_minor": 0 + }, + "text/plain": [ + "producing to 'test_topic': 0%| | 0/9178 [00:00, ?it/s]" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "[INFO] __main__: aiokafka_consumer_loop() starting...\n", + "[INFO] __main__: aiokafka_consumer_loop(): Consumer created using the following parameters: {'auto_offset_reset': 'earliest', 'bootstrap_servers': '127.0.0.1:9092'}\n", + "[INFO] __main__: aiokafka_consumer_loop(): Consumer started.\n", + "[INFO] aiokafka.consumer.subscription_state: Updating subscribed topics to: frozenset({'test_topic'})\n", + "[INFO] aiokafka.consumer.consumer: Subscribed to topic(s): {'test_topic'}\n", + "[INFO] __main__: aiokafka_consumer_loop(): Consumer subscribed.\n", + "[INFO] aiokafka.consumer.group_coordinator: Metadata for topic has changed from {} to {'test_topic': 1}. 
\n", + "[INFO] __main__: msgs_received=9178\n", + "[INFO] __main__: aiokafka_consumer_loop(): Consumer stopped.\n", + "[INFO] __main__: aiokafka_consumer_loop() finished.\n", + "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 115108...\n", + "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 115108 terminated.\n", + "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 114746...\n", + "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 114746 terminated.\n", + "[INFO] fastkafka._components.test_dependencies: Java is already installed.\n", + "[INFO] fastkafka._components.test_dependencies: Kafka is installed.\n", + "[INFO] fastkafka._testing.apache_kafka_broker: Starting zookeeper...\n", + "[INFO] fastkafka._testing.apache_kafka_broker: Starting kafka...\n", + "[INFO] fastkafka._testing.apache_kafka_broker: Local Kafka broker up and running on 127.0.0.1:9092\n" + ] + }, + { + "data": { + "application/vnd.jupyter.widget-view+json": { + "model_id": "a988c74fff734d1da038ed08ff1a26fa", + "version_major": 2, + "version_minor": 0 + }, + "text/plain": [ + "producing to 'test_topic': 0%| | 0/9178 [00:00, ?it/s]" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "[INFO] __main__: aiokafka_consumer_loop() starting...\n", + "[INFO] __main__: aiokafka_consumer_loop(): Consumer created using the following parameters: {'auto_offset_reset': 'earliest', 'bootstrap_servers': '127.0.0.1:9092'}\n", + "[INFO] __main__: aiokafka_consumer_loop(): Consumer started.\n", + "[INFO] aiokafka.consumer.subscription_state: Updating subscribed topics to: frozenset({'test_topic'})\n", + "[INFO] aiokafka.consumer.consumer: Subscribed to topic(s): {'test_topic'}\n", + "[INFO] __main__: aiokafka_consumer_loop(): Consumer subscribed.\n", + "[INFO] aiokafka.consumer.group_coordinator: Metadata for topic has changed from {} to {'test_topic': 1}. 
\n", + "[INFO] __main__: msgs_received=9178\n", + "[INFO] __main__: aiokafka_consumer_loop(): Consumer stopped.\n", + "[INFO] __main__: aiokafka_consumer_loop() finished.\n", + "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 116245...\n", + "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 116245 terminated.\n", + "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 115884...\n", + "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 115884 terminated.\n" + ] + } + ], + "source": [ + "for executor in [\"DynamicTaskExecutor\", \"SequentialExecutor\"]:\n", + " topic = \"test_topic\"\n", + " msgs_sent = 9178\n", + " msgs = [\n", + " MyMessage(url=\"http://www.ai.com\", port=port).json().encode(\"utf-8\")\n", + " for port in range(msgs_sent)\n", + " ]\n", + " msgs_received = 0\n", + "\n", + "\n", + " async def count_msg(msg: List[MyMessage], meta: List[EventMetadata]):\n", + " global msgs_received\n", + " msgs_received = msgs_received + len(msg)\n", + " logger.info(f\"{msgs_received=}\")\n", + "\n", + "\n", + " async with ApacheKafkaBroker(topics=[topic]) as bootstrap_server:\n", + " await produce_messages(topic=topic, bootstrap_servers=bootstrap_server, msgs=msgs)\n", + " await aiokafka_consumer_loop(\n", + " topic=topic,\n", + " decoder_fn=json_decoder,\n", + " auto_offset_reset=\"earliest\",\n", + " callback=count_msg,\n", + " msg_type=List[MyMessage],\n", + " is_shutting_down_f=true_after(2),\n", + " bootstrap_servers=bootstrap_server,\n", + " executor=executor,\n", + " )\n", + "\n", + " assert msgs_sent == msgs_received, f\"{msgs_sent} != {msgs_received}\"" + ] + }, { "cell_type": "code", "execution_count": null, @@ -1050,7 +1486,7 @@ { "data": { "application/vnd.jupyter.widget-view+json": { - "model_id": "4db673183b8b4515978557f9e72ee52a", + "model_id": "bcc210b13ae643c6b731b2b079ab286f", "version_major": 2, "version_minor": 0 }, @@ -1072,21 +1508,21 @@ "[INFO] aiokafka.consumer.consumer: Subscribed to topic(s): {'test_topic'}\n", "[INFO] __main__: aiokafka_consumer_loop(): Consumer subscribed.\n", "[INFO] aiokafka.consumer.group_coordinator: Metadata for topic has changed from {} to {'test_topic': 1}. 
\n", - "[INFO] __main__: msgs_received=1000, meta=EventMetadata(topic='test_topic', partition=0, offset=999, timestamp=1683884722296, timestamp_type=0, key=None, value=b'{\"url\": \"http://www.ai.com\", \"port\": 999}', checksum=None, serialized_key_size=-1, serialized_value_size=41, headers=())\n", - "[INFO] __main__: msgs_received=2000, meta=EventMetadata(topic='test_topic', partition=0, offset=1999, timestamp=1683884722312, timestamp_type=0, key=None, value=b'{\"url\": \"http://www.ai.com\", \"port\": 1999}', checksum=None, serialized_key_size=-1, serialized_value_size=42, headers=())\n", - "[INFO] __main__: msgs_received=3000, meta=EventMetadata(topic='test_topic', partition=0, offset=2999, timestamp=1683884722328, timestamp_type=0, key=None, value=b'{\"url\": \"http://www.ai.com\", \"port\": 2999}', checksum=None, serialized_key_size=-1, serialized_value_size=42, headers=())\n", - "[INFO] __main__: msgs_received=4000, meta=EventMetadata(topic='test_topic', partition=0, offset=3999, timestamp=1683884722343, timestamp_type=0, key=None, value=b'{\"url\": \"http://www.ai.com\", \"port\": 3999}', checksum=None, serialized_key_size=-1, serialized_value_size=42, headers=())\n", - "[INFO] __main__: msgs_received=5000, meta=EventMetadata(topic='test_topic', partition=0, offset=4999, timestamp=1683884722356, timestamp_type=0, key=None, value=b'{\"url\": \"http://www.ai.com\", \"port\": 4999}', checksum=None, serialized_key_size=-1, serialized_value_size=42, headers=())\n", - "[INFO] __main__: msgs_received=6000, meta=EventMetadata(topic='test_topic', partition=0, offset=5999, timestamp=1683884722368, timestamp_type=0, key=None, value=b'{\"url\": \"http://www.ai.com\", \"port\": 5999}', checksum=None, serialized_key_size=-1, serialized_value_size=42, headers=())\n", - "[INFO] __main__: msgs_received=7000, meta=EventMetadata(topic='test_topic', partition=0, offset=6999, timestamp=1683884722381, timestamp_type=0, key=None, value=b'{\"url\": \"http://www.ai.com\", \"port\": 6999}', checksum=None, serialized_key_size=-1, serialized_value_size=42, headers=())\n", - "[INFO] __main__: msgs_received=8000, meta=EventMetadata(topic='test_topic', partition=0, offset=7999, timestamp=1683884722393, timestamp_type=0, key=None, value=b'{\"url\": \"http://www.ai.com\", \"port\": 7999}', checksum=None, serialized_key_size=-1, serialized_value_size=42, headers=())\n", - "[INFO] __main__: msgs_received=9000, meta=EventMetadata(topic='test_topic', partition=0, offset=8999, timestamp=1683884722405, timestamp_type=0, key=None, value=b'{\"url\": \"http://www.ai.com\", \"port\": 8999}', checksum=None, serialized_key_size=-1, serialized_value_size=42, headers=())\n", + "[INFO] __main__: msgs_received=1000, meta=EventMetadata(topic='test_topic', partition=0, offset=999, timestamp=1684481140950, timestamp_type=0, key=None, value=b'{\"url\": \"http://www.ai.com\", \"port\": 999}', checksum=None, serialized_key_size=-1, serialized_value_size=41, headers=())\n", + "[INFO] __main__: msgs_received=2000, meta=EventMetadata(topic='test_topic', partition=0, offset=1999, timestamp=1684481140964, timestamp_type=0, key=None, value=b'{\"url\": \"http://www.ai.com\", \"port\": 1999}', checksum=None, serialized_key_size=-1, serialized_value_size=42, headers=())\n", + "[INFO] __main__: msgs_received=3000, meta=EventMetadata(topic='test_topic', partition=0, offset=2999, timestamp=1684481140981, timestamp_type=0, key=None, value=b'{\"url\": \"http://www.ai.com\", \"port\": 2999}', checksum=None, serialized_key_size=-1, 
serialized_value_size=42, headers=())\n", + "[INFO] __main__: msgs_received=4000, meta=EventMetadata(topic='test_topic', partition=0, offset=3999, timestamp=1684481140994, timestamp_type=0, key=None, value=b'{\"url\": \"http://www.ai.com\", \"port\": 3999}', checksum=None, serialized_key_size=-1, serialized_value_size=42, headers=())\n", + "[INFO] __main__: msgs_received=5000, meta=EventMetadata(topic='test_topic', partition=0, offset=4999, timestamp=1684481141006, timestamp_type=0, key=None, value=b'{\"url\": \"http://www.ai.com\", \"port\": 4999}', checksum=None, serialized_key_size=-1, serialized_value_size=42, headers=())\n", + "[INFO] __main__: msgs_received=6000, meta=EventMetadata(topic='test_topic', partition=0, offset=5999, timestamp=1684481141019, timestamp_type=0, key=None, value=b'{\"url\": \"http://www.ai.com\", \"port\": 5999}', checksum=None, serialized_key_size=-1, serialized_value_size=42, headers=())\n", + "[INFO] __main__: msgs_received=7000, meta=EventMetadata(topic='test_topic', partition=0, offset=6999, timestamp=1684481141032, timestamp_type=0, key=None, value=b'{\"url\": \"http://www.ai.com\", \"port\": 6999}', checksum=None, serialized_key_size=-1, serialized_value_size=42, headers=())\n", + "[INFO] __main__: msgs_received=8000, meta=EventMetadata(topic='test_topic', partition=0, offset=7999, timestamp=1684481141044, timestamp_type=0, key=None, value=b'{\"url\": \"http://www.ai.com\", \"port\": 7999}', checksum=None, serialized_key_size=-1, serialized_value_size=42, headers=())\n", + "[INFO] __main__: msgs_received=9000, meta=EventMetadata(topic='test_topic', partition=0, offset=8999, timestamp=1684481141057, timestamp_type=0, key=None, value=b'{\"url\": \"http://www.ai.com\", \"port\": 8999}', checksum=None, serialized_key_size=-1, serialized_value_size=42, headers=())\n", "[INFO] __main__: aiokafka_consumer_loop(): Consumer stopped.\n", "[INFO] __main__: aiokafka_consumer_loop() finished.\n", - "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 50496...\n", - "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 50496 terminated.\n", - "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 50133...\n", - "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 50133 terminated.\n", + "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 117386...\n", + "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 117386 terminated.\n", + "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 117024...\n", + "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 117024 terminated.\n", "[INFO] fastkafka._components.test_dependencies: Java is already installed.\n", "[INFO] fastkafka._components.test_dependencies: Kafka is installed.\n", "[INFO] fastkafka._testing.apache_kafka_broker: Starting zookeeper...\n", @@ -1097,7 +1533,7 @@ { "data": { "application/vnd.jupyter.widget-view+json": { - "model_id": "b2c77c22b9d44d348aee0935f5e5af74", + "model_id": "8248d9d3b5304d73a69e8693c5b55b2a", "version_major": 2, "version_minor": 0 }, @@ -1119,21 +1555,21 @@ "[INFO] aiokafka.consumer.consumer: Subscribed to topic(s): {'test_topic'}\n", "[INFO] __main__: aiokafka_consumer_loop(): Consumer subscribed.\n", "[INFO] aiokafka.consumer.group_coordinator: Metadata for topic has changed from {} to {'test_topic': 1}. 
\n", - "[INFO] __main__: msgs_received=1000, meta=EventMetadata(topic='test_topic', partition=0, offset=999, timestamp=1683884733112, timestamp_type=0, key=None, value=b'{\"url\": \"http://www.ai.com\", \"port\": 999}', checksum=None, serialized_key_size=-1, serialized_value_size=41, headers=())\n", - "[INFO] __main__: msgs_received=2000, meta=EventMetadata(topic='test_topic', partition=0, offset=1999, timestamp=1683884733173, timestamp_type=0, key=None, value=b'{\"url\": \"http://www.ai.com\", \"port\": 1999}', checksum=None, serialized_key_size=-1, serialized_value_size=42, headers=())\n", - "[INFO] __main__: msgs_received=3000, meta=EventMetadata(topic='test_topic', partition=0, offset=2999, timestamp=1683884733187, timestamp_type=0, key=None, value=b'{\"url\": \"http://www.ai.com\", \"port\": 2999}', checksum=None, serialized_key_size=-1, serialized_value_size=42, headers=())\n", - "[INFO] __main__: msgs_received=4000, meta=EventMetadata(topic='test_topic', partition=0, offset=3999, timestamp=1683884733200, timestamp_type=0, key=None, value=b'{\"url\": \"http://www.ai.com\", \"port\": 3999}', checksum=None, serialized_key_size=-1, serialized_value_size=42, headers=())\n", - "[INFO] __main__: msgs_received=5000, meta=EventMetadata(topic='test_topic', partition=0, offset=4999, timestamp=1683884733212, timestamp_type=0, key=None, value=b'{\"url\": \"http://www.ai.com\", \"port\": 4999}', checksum=None, serialized_key_size=-1, serialized_value_size=42, headers=())\n", - "[INFO] __main__: msgs_received=6000, meta=EventMetadata(topic='test_topic', partition=0, offset=5999, timestamp=1683884733225, timestamp_type=0, key=None, value=b'{\"url\": \"http://www.ai.com\", \"port\": 5999}', checksum=None, serialized_key_size=-1, serialized_value_size=42, headers=())\n", - "[INFO] __main__: msgs_received=7000, meta=EventMetadata(topic='test_topic', partition=0, offset=6999, timestamp=1683884733238, timestamp_type=0, key=None, value=b'{\"url\": \"http://www.ai.com\", \"port\": 6999}', checksum=None, serialized_key_size=-1, serialized_value_size=42, headers=())\n", - "[INFO] __main__: msgs_received=8000, meta=EventMetadata(topic='test_topic', partition=0, offset=7999, timestamp=1683884733250, timestamp_type=0, key=None, value=b'{\"url\": \"http://www.ai.com\", \"port\": 7999}', checksum=None, serialized_key_size=-1, serialized_value_size=42, headers=())\n", - "[INFO] __main__: msgs_received=9000, meta=EventMetadata(topic='test_topic', partition=0, offset=8999, timestamp=1683884733263, timestamp_type=0, key=None, value=b'{\"url\": \"http://www.ai.com\", \"port\": 8999}', checksum=None, serialized_key_size=-1, serialized_value_size=42, headers=())\n", + "[INFO] __main__: msgs_received=1000, meta=EventMetadata(topic='test_topic', partition=0, offset=999, timestamp=1684481151297, timestamp_type=0, key=None, value=b'{\"url\": \"http://www.ai.com\", \"port\": 999}', checksum=None, serialized_key_size=-1, serialized_value_size=41, headers=())\n", + "[INFO] __main__: msgs_received=2000, meta=EventMetadata(topic='test_topic', partition=0, offset=1999, timestamp=1684481151312, timestamp_type=0, key=None, value=b'{\"url\": \"http://www.ai.com\", \"port\": 1999}', checksum=None, serialized_key_size=-1, serialized_value_size=42, headers=())\n", + "[INFO] __main__: msgs_received=3000, meta=EventMetadata(topic='test_topic', partition=0, offset=2999, timestamp=1684481151362, timestamp_type=0, key=None, value=b'{\"url\": \"http://www.ai.com\", \"port\": 2999}', checksum=None, serialized_key_size=-1, 
serialized_value_size=42, headers=())\n", + "[INFO] __main__: msgs_received=4000, meta=EventMetadata(topic='test_topic', partition=0, offset=3999, timestamp=1684481151374, timestamp_type=0, key=None, value=b'{\"url\": \"http://www.ai.com\", \"port\": 3999}', checksum=None, serialized_key_size=-1, serialized_value_size=42, headers=())\n", + "[INFO] __main__: msgs_received=5000, meta=EventMetadata(topic='test_topic', partition=0, offset=4999, timestamp=1684481151387, timestamp_type=0, key=None, value=b'{\"url\": \"http://www.ai.com\", \"port\": 4999}', checksum=None, serialized_key_size=-1, serialized_value_size=42, headers=())\n", + "[INFO] __main__: msgs_received=6000, meta=EventMetadata(topic='test_topic', partition=0, offset=5999, timestamp=1684481151399, timestamp_type=0, key=None, value=b'{\"url\": \"http://www.ai.com\", \"port\": 5999}', checksum=None, serialized_key_size=-1, serialized_value_size=42, headers=())\n", + "[INFO] __main__: msgs_received=7000, meta=EventMetadata(topic='test_topic', partition=0, offset=6999, timestamp=1684481151412, timestamp_type=0, key=None, value=b'{\"url\": \"http://www.ai.com\", \"port\": 6999}', checksum=None, serialized_key_size=-1, serialized_value_size=42, headers=())\n", + "[INFO] __main__: msgs_received=8000, meta=EventMetadata(topic='test_topic', partition=0, offset=7999, timestamp=1684481151424, timestamp_type=0, key=None, value=b'{\"url\": \"http://www.ai.com\", \"port\": 7999}', checksum=None, serialized_key_size=-1, serialized_value_size=42, headers=())\n", + "[INFO] __main__: msgs_received=9000, meta=EventMetadata(topic='test_topic', partition=0, offset=8999, timestamp=1684481151437, timestamp_type=0, key=None, value=b'{\"url\": \"http://www.ai.com\", \"port\": 8999}', checksum=None, serialized_key_size=-1, serialized_value_size=42, headers=())\n", "[INFO] __main__: aiokafka_consumer_loop(): Consumer stopped.\n", "[INFO] __main__: aiokafka_consumer_loop() finished.\n", - "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 51641...\n", - "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 51641 terminated.\n", - "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 51281...\n", - "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 51281 terminated.\n" + "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 118525...\n", + "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 118525 terminated.\n", + "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 118165...\n", + "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 118165 terminated.\n" ] } ], @@ -1195,7 +1631,7 @@ { "data": { "application/vnd.jupyter.widget-view+json": { - "model_id": "44cfd22c476c49e183d5a93c3104fc0d", + "model_id": "d5f144bc4beb4e34a4268b3ffc1563f3", "version_major": 2, "version_minor": 0 }, @@ -1228,10 +1664,10 @@ "[INFO] __main__: msgs_received=9000\n", "[INFO] __main__: aiokafka_consumer_loop(): Consumer stopped.\n", "[INFO] __main__: aiokafka_consumer_loop() finished.\n", - "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 52797...\n", - "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 52797 terminated.\n", - "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 
52436...\n", - "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 52436 terminated.\n", + "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 119665...\n", + "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 119665 terminated.\n", + "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 119304...\n", + "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 119304 terminated.\n", "[INFO] fastkafka._components.test_dependencies: Java is already installed.\n", "[INFO] fastkafka._components.test_dependencies: Kafka is installed.\n", "[INFO] fastkafka._testing.apache_kafka_broker: Starting zookeeper...\n", @@ -1242,7 +1678,7 @@ { "data": { "application/vnd.jupyter.widget-view+json": { - "model_id": "6329cf00c38845da9c8ab332d6206800", + "model_id": "021a12b9337845fa80b65c67f1e704c3", "version_major": 2, "version_minor": 0 }, @@ -1275,10 +1711,10 @@ "[INFO] __main__: msgs_received=9000\n", "[INFO] __main__: aiokafka_consumer_loop(): Consumer stopped.\n", "[INFO] __main__: aiokafka_consumer_loop() finished.\n", - "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 53937...\n", - "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 53937 terminated.\n", - "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 53577...\n", - "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 53577 terminated.\n" + "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 120806...\n", + "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 120806 terminated.\n", + "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 120446...\n", + "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 120446 terminated.\n" ] } ], @@ -1338,7 +1774,7 @@ { "data": { "application/vnd.jupyter.widget-view+json": { - "model_id": "33de8d82f1dd46eaa09df48ce3fac03a", + "model_id": "2efd3d503b4e476cb49087054bf1ff26", "version_major": 2, "version_minor": 0 }, @@ -1371,10 +1807,10 @@ "[INFO] __main__: msgs_received=9000\n", "[INFO] __main__: aiokafka_consumer_loop(): Consumer stopped.\n", "[INFO] __main__: aiokafka_consumer_loop() finished.\n", - "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 55077...\n", - "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 55077 terminated.\n", - "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 54717...\n", - "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 54717 terminated.\n", + "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 121949...\n", + "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 121949 terminated.\n", + "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 121587...\n", + "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 121587 terminated.\n", "[INFO] fastkafka._components.test_dependencies: Java is already installed.\n", "[INFO] fastkafka._components.test_dependencies: Kafka is installed.\n", "[INFO] fastkafka._testing.apache_kafka_broker: Starting 
zookeeper...\n", @@ -1385,7 +1821,7 @@ { "data": { "application/vnd.jupyter.widget-view+json": { - "model_id": "f66974c372594677b441a584ec3a3d22", + "model_id": "0c28c5e502a74b9384202583589f632f", "version_major": 2, "version_minor": 0 }, @@ -1418,10 +1854,10 @@ "[INFO] __main__: msgs_received=9000\n", "[INFO] __main__: aiokafka_consumer_loop(): Consumer stopped.\n", "[INFO] __main__: aiokafka_consumer_loop() finished.\n", - "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 56218...\n", - "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 56218 terminated.\n", - "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 55857...\n", - "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 55857 terminated.\n" + "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 123090...\n", + "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 123090 terminated.\n", + "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 122728...\n", + "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 122728 terminated.\n" ] } ], @@ -1483,7 +1919,7 @@ { "data": { "application/vnd.jupyter.widget-view+json": { - "model_id": "2ec97ab4f9b747d38f01e0fec472538d", + "model_id": "d1d439c836234340966fdf39fc32f36e", "version_major": 2, "version_minor": 0 }, @@ -1497,7 +1933,7 @@ { "data": { "application/vnd.jupyter.widget-view+json": { - "model_id": "144a049d228345cda4b1a079fa0a9bf1", + "model_id": "d3a57c79aa65425c9054420d57650276", "version_major": 2, "version_minor": 0 }, @@ -1522,12 +1958,12 @@ "[INFO] __main__: aiokafka_consumer_loop(): Consumer stopped.\n", "[INFO] __main__: aiokafka_consumer_loop() finished.\n", "Messages processed: 50,000\n", - "Time : 4.89 s\n", - "Throughput. : 10,226 msg/s\n", - "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 57361...\n", - "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 57361 terminated.\n", - "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 56999...\n", - "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 56999 terminated.\n", + "Time : 5.18 s\n", + "Throughput. 
: 9,656 msg/s\n", + "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 124229...\n", + "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 124229 terminated.\n", + "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 123868...\n", + "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 123868 terminated.\n", "[INFO] fastkafka._components.test_dependencies: Java is already installed.\n", "[INFO] fastkafka._components.test_dependencies: Kafka is installed.\n", "[INFO] fastkafka._testing.apache_kafka_broker: Starting zookeeper...\n", @@ -1538,7 +1974,7 @@ { "data": { "application/vnd.jupyter.widget-view+json": { - "model_id": "357fad08997442bba8cbe044ec9445fd", + "model_id": "a8c7227cde234d76b2b7e949fb8cb9c8", "version_major": 2, "version_minor": 0 }, @@ -1552,7 +1988,7 @@ { "data": { "application/vnd.jupyter.widget-view+json": { - "model_id": "b3a04af251ee46b7b86fc1060f87a4c6", + "model_id": "77a8955186df47bdab025909670db12e", "version_major": 2, "version_minor": 0 }, @@ -1577,12 +2013,12 @@ "[INFO] __main__: aiokafka_consumer_loop(): Consumer stopped.\n", "[INFO] __main__: aiokafka_consumer_loop() finished.\n", "Messages processed: 50,000\n", - "Time : 3.95 s\n", - "Throughput. : 12,666 msg/s\n", - "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 58500...\n", - "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 58500 terminated.\n", - "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 58140...\n", - "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 58140 terminated.\n" + "Time : 4.04 s\n", + "Throughput. : 12,373 msg/s\n", + "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 125368...\n", + "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 125368 terminated.\n", + "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 125008...\n", + "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 125008 terminated.\n" ] } ], @@ -1649,7 +2085,7 @@ { "data": { "application/vnd.jupyter.widget-view+json": { - "model_id": "3360c7cfc55b4c5bbd3a435d421731c3", + "model_id": "8c5f512f431a4fc89312b0c2105c5bc6", "version_major": 2, "version_minor": 0 }, @@ -1663,7 +2099,7 @@ { "data": { "application/vnd.jupyter.widget-view+json": { - "model_id": "06e18bc1ac074502aa4c2fdebe38dc12", + "model_id": "0d1f8cbbf5a44071b576356169958ac8", "version_major": 2, "version_minor": 0 }, @@ -1688,12 +2124,12 @@ "[INFO] __main__: aiokafka_consumer_loop(): Consumer stopped.\n", "[INFO] __main__: aiokafka_consumer_loop() finished.\n", "Messages processed: 50,000\n", - "Time : 5.95 s\n", - "Throughput. : 8,399 msg/s\n", - "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 59638...\n", - "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 59638 terminated.\n", - "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 59278...\n", - "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 59278 terminated.\n", + "Time : 6.28 s\n", + "Throughput. 
: 7,956 msg/s\n", + "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 126511...\n", + "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 126511 terminated.\n", + "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 126149...\n", + "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 126149 terminated.\n", "[INFO] fastkafka._components.test_dependencies: Java is already installed.\n", "[INFO] fastkafka._components.test_dependencies: Kafka is installed.\n", "[INFO] fastkafka._testing.apache_kafka_broker: Starting zookeeper...\n", @@ -1704,7 +2140,7 @@ { "data": { "application/vnd.jupyter.widget-view+json": { - "model_id": "60ff01678440483e9d8ff4417b805125", + "model_id": "f71de53cc3824f0bbf22fefa145d94c5", "version_major": 2, "version_minor": 0 }, @@ -1718,7 +2154,7 @@ { "data": { "application/vnd.jupyter.widget-view+json": { - "model_id": "616655792fbd46ca9c7ec7d6cb67f2cb", + "model_id": "c514bc4680e14d37aff2e986f6380814", "version_major": 2, "version_minor": 0 }, @@ -1745,10 +2181,10 @@ "Messages processed: 50,000\n", "Time : 5.12 s\n", "Throughput. : 9,758 msg/s\n", - "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 60795...\n", - "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 60795 terminated.\n", - "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 60434...\n", - "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 60434 terminated.\n" + "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 127650...\n", + "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 127650 terminated.\n", + "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 127289...\n", + "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 127289 terminated.\n" ] } ], @@ -1825,7 +2261,7 @@ { "data": { "application/vnd.jupyter.widget-view+json": { - "model_id": "8fe724f3529f40bdba9545ef77b7e8f1", + "model_id": "3cd7bb1479a546fba542eb27d59db1a7", "version_major": 2, "version_minor": 0 }, @@ -1839,7 +2275,7 @@ { "data": { "application/vnd.jupyter.widget-view+json": { - "model_id": "9cbf236144304a7b9c859b669bce6a1a", + "model_id": "495009629efb463dacf28c2a48c53f5f", "version_major": 2, "version_minor": 0 }, @@ -1864,12 +2300,12 @@ "[INFO] __main__: aiokafka_consumer_loop(): Consumer stopped.\n", "[INFO] __main__: aiokafka_consumer_loop() finished.\n", "Messages processed: 100,000\n", - "Time : 8.13 s\n", - "Throughput. : 12,295 msg/s\n", - "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 61938...\n", - "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 61938 terminated.\n", - "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 61578...\n", - "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 61578 terminated.\n" + "Time : 8.31 s\n", + "Throughput. 
: 12,034 msg/s\n",
+    "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 128793...\n",
+    "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 128793 terminated.\n",
+    "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 128430...\n",
+    "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 128430 terminated.\n"
     ]
    }
   ],
diff --git a/nbs/011_TaskStreaming.ipynb b/nbs/011_TaskStreaming.ipynb
index 7d7bcee..128a601 100644
--- a/nbs/011_TaskStreaming.ipynb
+++ b/nbs/011_TaskStreaming.ipynb
@@ -106,7 +106,7 @@
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
-      "model_id": "575adcdde80d4140a9bcc072b0423333",
+      "model_id": "6445f3bcd6b1450797e959f8a136e49c",
       "version_major": 2,
       "version_minor": 0
      },
@@ -120,7 +120,7 @@
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
-      "model_id": "d3badf71946549ec8a37a94f69584400",
+      "model_id": "1a382712296d4e1891d8b8c8b85b9e11",
       "version_major": 2,
       "version_minor": 0
      },
@@ -181,7 +181,7 @@
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
-      "model_id": "d57db15b0430473992f7fb9930709d40",
+      "model_id": "60ff8fff4bcd4a3fa9105b5b4d3daef2",
       "version_major": 2,
       "version_minor": 0
      },
@@ -195,7 +195,7 @@
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
-      "model_id": "982731621e094b88b2833061aa9fb9de",
+      "model_id": "bd50489641544a4992bd42103464a81b",
       "version_major": 2,
       "version_minor": 0
      },
@@ -293,12 +293,30 @@
     "        self.finished = False\n",
     "\n",
     "    async def add(self, item: Task) -> None:\n",
+    "        \"\"\"\n",
+    "        Adds a task to the task pool.\n",
+    "\n",
+    "        Args:\n",
+    "            item: The task to be added.\n",
+    "\n",
+    "        Returns:\n",
+    "            None\n",
+    "        \"\"\"\n",
     "        while len(self.pool) >= self.size:\n",
     "            await asyncio.sleep(0)\n",
     "        self.pool.add(item)\n",
     "        item.add_done_callback(self.discard)\n",
     "\n",
     "    def discard(self, task: Task) -> None:\n",
+    "        \"\"\"\n",
+    "        Discards a completed task from the task pool.\n",
+    "\n",
+    "        Args:\n",
+    "            task: The completed task to be discarded.\n",
+    "\n",
+    "        Returns:\n",
+    "            None\n",
+    "        \"\"\"\n",
     "        e = task.exception()\n",
     "        if e is not None and self.on_error is not None:\n",
     "            try:\n",
@@ -311,6 +329,12 @@
     "        self.pool.discard(task)\n",
     "\n",
     "    def __len__(self) -> int:\n",
+    "        \"\"\"\n",
+    "        Returns the number of tasks in the task pool.\n",
+    "\n",
+    "        Returns:\n",
+    "            The number of tasks in the task pool.\n",
+    "        \"\"\"\n",
     "        return len(self.pool)\n",
     "\n",
     "    async def __aenter__(self) -> \"TaskPool\":\n",
@@ -324,6 +348,15 @@
     "\n",
     "    @staticmethod\n",
     "    def log_error(logger: Logger) -> Callable[[Exception], None]:\n",
+    "        \"\"\"\n",
+    "        Creates an error-logging callback bound to the specified logger.\n",
+    "\n",
+    "        Args:\n",
+    "            logger: The logger to use for error logging.\n",
+    "\n",
+    "        Returns:\n",
+    "            The error-logging callback.\n",
+    "        \"\"\"\n",
     "        def _log_error(e: Exception, logger: Logger = logger) -> None:\n",
     "            logger.warning(f\"{e=}\")\n",
     "        return _log_error"
@@ -404,10 +437,25 @@
     "        self.exception_found = False\n",
     "    \n",
     "    def on_error(self, e: Exception) -> None:\n",
+    "        \"\"\"\n",
+    "        Handles an error by storing the exception.\n",
+    "\n",
+    "        Args:\n",
+    "            e: The exception to be handled.\n",
+    "\n",
+    "        Returns:\n",
+    "            None\n",
+    "        \"\"\"\n",
     "        self.exceptions.append(e)\n",
     "        self.exception_found = True\n",
     "    \n",
     "    def _monitor_step(self) -> None:\n",
+    "        \"\"\"\n",
+    "        Raises the next exception in the queue.\n",
+    "\n",
+    "        Returns:\n",
+    "            None\n",
+    "        \"\"\"\n",
     "        if len(self.exceptions) > 0:\n",
     "            e = self.exceptions.pop(0)\n",
     "            raise e\n",
@@ -485,9 +533,10 @@
     "    @abstractmethod\n",
     "    async def run(  # type: ignore\n",
     "        self,\n",
+    "        *,\n",
     "        is_shutting_down_f: Callable[[], bool],\n",
-    "        produce_func: Callable[[], Awaitable[ConsumerRecord]],\n",
-    "        consume_func: Callable[[ConsumerRecord], Awaitable[None]],\n",
+    "        generator: Callable[[], Awaitable[ConsumerRecord]],\n",
+    "        processor: Callable[[ConsumerRecord], Awaitable[None]],\n",
     "    ) -> None:\n",
     "        pass"
@@ -539,7 +588,7 @@
     "\n",
     "\n",
     "def _process_items_task(  # type: ignore\n",
-    "    consume_func: Callable[[ConsumerRecord], Awaitable[None]],\n",
+    "    processor: Callable[[ConsumerRecord], Awaitable[None]],\n",
     "    task_pool: TaskPool\n",
     ") -> Callable[\n",
     "    [\n",
@@ -551,12 +600,12 @@
     "]:\n",
     "    async def _process_items_wrapper(  # type: ignore\n",
     "        receive_stream: anyio.streams.memory.MemoryObjectReceiveStream,\n",
-    "        consume_func: Callable[[ConsumerRecord], Awaitable[None]] = consume_func,\n",
+    "        processor: Callable[[ConsumerRecord], Awaitable[None]] = processor,\n",
     "        task_pool= task_pool,\n",
     "    ):\n",
     "        async with receive_stream:\n",
     "            async for msg in receive_stream:\n",
-    "                task: asyncio.Task = asyncio.create_task(consume_func(msg))  # type: ignore\n",
+    "                task: asyncio.Task = asyncio.create_task(processor(msg))  # type: ignore\n",
     "                await task_pool.add(task)\n",
     "\n",
     "    return _process_items_wrapper"
@@ -591,17 +640,29 @@
     "\n",
     "    async def run(  # type: ignore\n",
     "        self,\n",
+    "        *,\n",
     "        is_shutting_down_f: Callable[[], bool],\n",
-    "        produce_func: Callable[[], Awaitable[ConsumerRecord]],\n",
-    "        consume_func: Callable[[ConsumerRecord], Awaitable[None]],\n",
+    "        generator: Callable[[], Awaitable[ConsumerRecord]],\n",
+    "        processor: Callable[[ConsumerRecord], Awaitable[None]],\n",
     "    ) -> None:\n",
+    "        \"\"\"\n",
+    "        Runs the dynamic task executor.\n",
+    "\n",
+    "        Args:\n",
+    "            is_shutting_down_f: Function to check if the executor is shutting down.\n",
+    "            generator: Generator function for retrieving consumer records.\n",
+    "            processor: Processor function for processing consumer records.\n",
+    "\n",
+    "        Returns:\n",
+    "            None\n",
+    "        \"\"\"\n",
     "        send_stream, receive_stream = anyio.create_memory_object_stream(\n",
     "            max_buffer_size=self.max_buffer_size\n",
     "        )\n",
     "\n",
     "        async with self.exception_monitor, self.task_pool:\n",
     "            async with anyio.create_task_group() as tg:\n",
-    "                tg.start_soon(_process_items_task(consume_func, self.task_pool), receive_stream)\n",
+    "                tg.start_soon(_process_items_task(processor, self.task_pool), receive_stream)\n",
     "            async with send_stream:\n",
     "                while not is_shutting_down_f():\n",
     "                    if (\n",
@@ -609,7 +670,7 @@
     "                    and self.throw_exceptions\n",
     "                ):\n",
     "                    break\n",
-    "                    msgs = await produce_func()\n",
+    "                    msgs = await generator()\n",
     "                    for msg in msgs:\n",
     "                        await send_stream.send(msg)"
   ]
  },
@@ -672,9 +733,9 @@
     "stream = DynamicTaskExecutor()\n",
     "\n",
     "await stream.run(\n",
-    "    is_shutting_down_f(),\n",
-    "    produce_func=produce,\n",
-    "    consume_func=consume,\n",
+    "    is_shutting_down_f=is_shutting_down_f(),\n",
+    "    generator=produce,\n",
+    "    processor=consume,\n",
     ")"
   ]
  },
@@ -691,9 +752,9 @@
     "stream = DynamicTaskExecutor()\n",
     "\n",
     "await stream.run(\n",
-    "    is_shutting_down_f(),\n",
-    "    produce_func=mock_produce,\n",
-    "    consume_func=mock_consume,\n",
+    "    is_shutting_down_f=is_shutting_down_f(),\n",
+    "    generator=mock_produce,\n",
+    "    processor=mock_consume,\n",
     ")\n",
     "\n",
     "mock_produce.assert_awaited()\n",
@@ -713,9 +774,9 @@
     "stream = DynamicTaskExecutor()\n",
     "\n",
     "await stream.run(\n",
-    "    is_shutting_down_f(),\n",
-    "    produce_func=mock_produce,\n",
-    "    consume_func=mock_consume,\n",
+    "    is_shutting_down_f=is_shutting_down_f(),\n",
+    "    generator=mock_produce,\n",
+    "    processor=mock_consume,\n",
     ")\n",
     "\n",
     "mock_produce.assert_called()\n",
@@ -739,9 +800,9 @@
     "\n",
     "with pytest.raises(RuntimeError) as e:\n",
     "    await stream.run(\n",
-    "        is_shutting_down_f(num_msgs),\n",
-    "        produce_func=mock_produce,\n",
-    "        consume_func=mock_consume,\n",
+    "        is_shutting_down_f=is_shutting_down_f(num_msgs),\n",
+    "        generator=mock_produce,\n",
+    "        processor=mock_consume,\n",
     "    )\n",
     "\n",
     "mock_produce.assert_called()\n",
@@ -784,9 +845,9 @@
     "stream = DynamicTaskExecutor()\n",
     "\n",
     "await stream.run(\n",
-    "    is_shutting_down_f(num_msgs),\n",
-    "    produce_func=mock_produce,\n",
-    "    consume_func=mock_consume,\n",
+    "    is_shutting_down_f=is_shutting_down_f(num_msgs),\n",
+    "    generator=mock_produce,\n",
+    "    processor=mock_consume,\n",
     ")\n",
     "\n",
     "mock_produce.assert_called()\n",
@@ -812,7 +873,7 @@
     "\n",
     "\n",
     "def _process_items_coro(  # type: ignore\n",
-    "    consume_func: Callable[[ConsumerRecord], Awaitable[None]],\n",
+    "    processor: Callable[[ConsumerRecord], Awaitable[None]],\n",
     "    throw_exceptions: bool,\n",
     ") -> Callable[\n",
     "    [\n",
@@ -824,13 +885,13 @@
     "]:\n",
     "    async def _process_items_wrapper(  # type: ignore\n",
     "        receive_stream: anyio.streams.memory.MemoryObjectReceiveStream,\n",
-    "        consume_func: Callable[[ConsumerRecord], Awaitable[None]] = consume_func,\n",
+    "        processor: Callable[[ConsumerRecord], Awaitable[None]] = processor,\n",
     "        throw_exceptions: bool = throw_exceptions,\n",
     "    ) -> Awaitable[None]:\n",
     "        async with receive_stream:\n",
     "            async for msg in receive_stream:\n",
     "                try:\n",
-    "                    await consume_func(msg)\n",
+    "                    await processor(msg)\n",
     "                except Exception as e:\n",
     "                    if throw_exceptions:\n",
     "                        raise e\n",
@@ -861,20 +922,32 @@
     "\n",
     "    async def run(  # type: ignore\n",
     "        self,\n",
+    "        *,\n",
     "        is_shutting_down_f: Callable[[], bool],\n",
-    "        produce_func: Callable[[], Awaitable[ConsumerRecord]],\n",
-    "        consume_func: Callable[[ConsumerRecord], Awaitable[None]],\n",
+    "        generator: Callable[[], Awaitable[ConsumerRecord]],\n",
+    "        processor: Callable[[ConsumerRecord], Awaitable[None]],\n",
     "    ) -> None:\n",
+    "        \"\"\"\n",
+    "        Runs the sequential executor.\n",
+    "\n",
+    "        Args:\n",
+    "            is_shutting_down_f: Function to check if the executor is shutting down.\n",
+    "            generator: Generator function for retrieving consumer records.\n",
+    "            processor: Processor function for processing consumer records.\n",
+    "\n",
+    "        Returns:\n",
+    "            None\n",
+    "        \"\"\"\n",
     "\n",
     "        send_stream, receive_stream = anyio.create_memory_object_stream(\n",
     "            max_buffer_size=self.max_buffer_size\n",
     "        )\n",
     "    \n",
     "        async with anyio.create_task_group() as tg:\n",
-    "            tg.start_soon(_process_items_coro(consume_func, self.throw_exceptions), receive_stream)\n",
+    "            tg.start_soon(_process_items_coro(processor, self.throw_exceptions), receive_stream)\n",
     "            async with send_stream:\n",
     "                while not is_shutting_down_f():\n",
-    "                    msgs = await produce_func()\n",
+    "                    msgs = await generator()\n",
     "                    for msg in msgs:\n",
     "                        await send_stream.send(msg)"
   ]
  },
@@ -895,7 +968,11 @@
     "stream = SequentialExecutor(throw_exceptions=True)\n",
     "\n",
     "with pytest.raises(ExceptionGroup) as e:\n",
-    "    await stream.run(is_shutting_down_f(num_msgs), produce_func=mock_produce, consume_func=mock_consume)\n",
+    "    await stream.run(\n",
+    "        is_shutting_down_f=is_shutting_down_f(num_msgs),\n",
+    "        generator=mock_produce,\n",
+    "        processor=mock_consume,\n",
+    "    )\n",
     "\n",
     "mock_produce.assert_called()\n",
     "mock_consume.assert_awaited_with(\"msg\")"
@@ -937,9 +1014,9 @@
     "stream = SequentialExecutor()\n",
     "\n",
     "await stream.run(\n",
-    "    is_shutting_down_f(num_msgs),\n",
-    "    mock_produce,\n",
-    "    mock_consume,\n",
+    "    is_shutting_down_f=is_shutting_down_f(num_msgs),\n",
+    "    generator=mock_produce,\n",
+    "    processor=mock_consume,\n",
     ")\n",
     "\n",
     "mock_produce.assert_called()\n",
@@ -957,6 +1034,18 @@
     "\n",
     "\n",
     "def get_executor(executor: Union[str, StreamExecutor, None] = None) -> StreamExecutor:\n",
+    "    \"\"\"\n",
+    "    Returns an instance of the specified executor.\n",
+    "\n",
+    "    Args:\n",
+    "        executor: Executor instance or name of the executor.\n",
+    "\n",
+    "    Returns:\n",
+    "        Instance of the specified executor.\n",
+    "\n",
+    "    Raises:\n",
+    "        AttributeError: If the executor is not found.\n",
+    "    \"\"\"\n",
     "    if isinstance(executor, StreamExecutor):\n",
     "        return executor\n",
     "    elif executor is None:\n",
diff --git a/nbs/013_ProducerDecorator.ipynb b/nbs/013_ProducerDecorator.ipynb
index dc61680..fbb73ab 100644
--- a/nbs/013_ProducerDecorator.ipynb
+++ b/nbs/013_ProducerDecorator.ipynb
@@ -27,6 +27,7 @@
     "from asyncio import iscoroutinefunction  # do not use the version from inspect\n",
     "from collections import namedtuple\n",
     "from dataclasses import dataclass\n",
+    "from inspect import Parameter\n",
     "from typing import *\n",
     "\n",
     "from aiokafka import AIOKafkaProducer\n",
@@ -101,6 +102,49 @@
     "assert event.key == b\"123\""
    ]
   },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "4e7de730",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# | export\n",
+    "\n",
+    "\n",
+    "def unwrap_from_kafka_event(var_type: Union[Type, Parameter]) -> Union[Type, Parameter]:\n",
+    "    \"\"\"\n",
+    "    Unwraps the type from a KafkaEvent.\n",
+    "\n",
+    "    Args:\n",
+    "        var_type: Type to unwrap.\n",
+    "\n",
+    "    Returns:\n",
+    "        Type: Unwrapped type if the given type is a KafkaEvent, otherwise returns the same type.\n",
+    "\n",
+    "    Example:\n",
+    "        - Input: 
KafkaEvent[str]\n", + " Output: str\n", + " - Input: int\n", + " Output: int\n", + " \"\"\"\n", + " if hasattr(var_type, \"__origin__\") and var_type.__origin__ == KafkaEvent:\n", + " return var_type.__args__[0] # type: ignore\n", + " else:\n", + " return var_type" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "7277bcf0", + "metadata": {}, + "outputs": [], + "source": [ + "assert unwrap_from_kafka_event(KafkaEvent[int]) == int\n", + "assert unwrap_from_kafka_event(int) == int" + ] + }, { "cell_type": "code", "execution_count": null, diff --git a/nbs/014_AsyncAPI.ipynb b/nbs/014_AsyncAPI.ipynb index 53bd090..1bcaa48 100644 --- a/nbs/014_AsyncAPI.ipynb +++ b/nbs/014_AsyncAPI.ipynb @@ -39,8 +39,9 @@ "\n", "from fastkafka._components.docs_dependencies import _check_npm_with_local\n", "from fastkafka._components.logger import get_logger\n", - "from fastkafka._components.producer_decorator import KafkaEvent, ProduceCallable\n", - "from fastkafka._components.aiokafka_consumer_loop import ConsumeCallable" + "from fastkafka._components.producer_decorator import KafkaEvent, ProduceCallable, unwrap_from_kafka_event\n", + "from fastkafka._components.aiokafka_consumer_loop import ConsumeCallable, EventMetadata\n", + "from fastkafka._components.helpers import unwrap_list_type" ] }, { @@ -757,13 +758,13 @@ "text/html": [ "
{\n", "│ 'consumers': {\n", - "│ │ 'my_topic_1': <function on_my_topic_one at 0x7fa211b16050>,\n", - "│ │ 'my_topic_2': <function on_my_topic_2 at 0x7fa211b164d0>\n", + "│ │ 'my_topic_1': <function on_my_topic_one at 0x7fbc1208ee60>,\n", + "│ │ 'my_topic_2': <function on_my_topic_2 at 0x7fbc124bdbd0>\n", "│ },\n", "│ 'producers': {\n", - "│ │ 'my_topic_3': <function to_my_topic_3 at 0x7fa2115f1e10>,\n", - "│ │ 'my_topic_4': <function to_my_topic_4 at 0x7fa2115f1ea0>,\n", - "│ │ 'my_topic_5': <function to_my_topic_5 at 0x7fa2115f1f30>\n", + "│ │ 'my_topic_3': <function to_my_topic_3 at 0x7fbc11d655a0>,\n", + "│ │ 'my_topic_4': <function to_my_topic_4 at 0x7fbc11d65630>,\n", + "│ │ 'my_topic_5': <function to_my_topic_5 at 0x7fbc11d656c0>\n", "│ }\n", "}\n", "\n" @@ -771,13 +772,13 @@ "text/plain": [ "\u001b[1m{\u001b[0m\n", "\u001b[2;32m│ \u001b[0m\u001b[32m'consumers'\u001b[0m: \u001b[1m{\u001b[0m\n", - "\u001b[2;32m│ │ \u001b[0m\u001b[32m'my_topic_1'\u001b[0m: \u001b[1m<\u001b[0m\u001b[1;95mfunction\u001b[0m\u001b[39m on_my_topic_one at \u001b[0m\u001b[1;36m0x7fa211b16050\u001b[0m\u001b[39m>,\u001b[0m\n", - "\u001b[2;32m│ │ \u001b[0m\u001b[32m'my_topic_2'\u001b[0m\u001b[39m:
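
The unwrap_list_type helper imported above plays the same role for batch annotations that unwrap_from_kafka_event plays for KafkaEvent ones: when the AsyncAPI spec is generated, a List[Message] annotation must be reduced to its element type before the message schema is built. A minimal sketch of that unwrapping, mirroring the KafkaEvent helper shown earlier; the real helper in fastkafka._components.helpers may differ in details:

from typing import List, Type


def unwrap_list_type_sketch(var_type: Type) -> Type:
    # If the annotation is List[X], return the element type X;
    # any other annotation is returned unchanged.
    if hasattr(var_type, "__origin__") and var_type.__origin__ == list:
        return var_type.__args__[0]
    return var_type


assert unwrap_list_type_sketch(List[int]) == int
assert unwrap_list_type_sketch(int) == int
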