diff --git a/fastkafka/_application/tester.py b/fastkafka/_application/tester.py
index 682bad2..dcf0928 100644
--- a/fastkafka/_application/tester.py
+++ b/fastkafka/_application/tester.py
@@ -13,7 +13,9 @@ from .. import KafkaEvent
 from .app import FastKafka
+from .._components.helpers import unwrap_list_type
 from .._components.meta import delegates, export, patch
+from .._components.producer_decorator import unwrap_from_kafka_event
 from .._testing.apache_kafka_broker import ApacheKafkaBroker
 from .._testing.in_memory_broker import InMemoryBroker
 from .._testing.local_redpanda_broker import LocalRedpandaBroker
@@ -148,11 +150,7 @@ async def __aexit__(self, *args: Any) -> None:
 def mirror_producer(topic: str, producer_f: Callable[..., Any]) -> Callable[..., Any]:
     msg_type = inspect.signature(producer_f).return_annotation
 
-    if hasattr(msg_type, "__origin__") and msg_type.__origin__ == KafkaEvent:
-        msg_type = msg_type.__args__[0]
-
-    if hasattr(msg_type, "__origin__") and msg_type.__origin__ == list:
-        msg_type = msg_type.__args__[0]
+    msg_type_unwrapped = unwrap_list_type(unwrap_from_kafka_event(msg_type))
 
     async def skeleton_func(msg: BaseModel) -> None:
         pass
@@ -168,7 +166,7 @@ async def skeleton_func(msg: BaseModel) -> None:
         parameters=[
             inspect.Parameter(
                 name="msg",
-                annotation=msg_type,
+                annotation=msg_type_unwrapped,
                 kind=inspect.Parameter.POSITIONAL_OR_KEYWORD,
             )
         ]
@@ -182,6 +180,8 @@ async def skeleton_func(msg: BaseModel) -> None:
 def mirror_consumer(topic: str, consumer_f: Callable[..., Any]) -> Callable[..., Any]:
     msg_type = inspect.signature(consumer_f).parameters["msg"]
 
+    msg_type_unwrapped = unwrap_list_type(msg_type)
+
     async def skeleton_func(msg: BaseModel) -> BaseModel:
         return msg
 
@@ -192,7 +192,9 @@ async def skeleton_func(msg: BaseModel) -> BaseModel:
     mirror_func.__name__ = "to_" + topic
 
     # adjust arg and return val
-    sig = sig.replace(parameters=[msg_type], return_annotation=msg_type.annotation)
+    sig = sig.replace(
+        parameters=[msg_type], return_annotation=msg_type_unwrapped.annotation
+    )
     mirror_func.__signature__ = sig  # type: ignore
 
     return mirror_func
diff --git a/fastkafka/_components/aiokafka_consumer_loop.py b/fastkafka/_components/aiokafka_consumer_loop.py
index d8defa9..2d131dc 100644
--- a/fastkafka/_components/aiokafka_consumer_loop.py
+++ b/fastkafka/_components/aiokafka_consumer_loop.py
@@ -60,6 +60,14 @@ class EventMetadata:
 
     @staticmethod
     def create_event_metadata(record: ConsumerRecord) -> "EventMetadata":  # type: ignore
+        """Creates an instance of EventMetadata from a ConsumerRecord.
+
+        Args:
+            record: The Kafka ConsumerRecord.
+
+        Returns:
+            The created EventMetadata instance.
+        """
         return EventMetadata(
             topic=record.topic,
             partition=record.partition,
@@ -75,10 +83,15 @@ def create_event_metadata(record: ConsumerRecord) -> "EventMetadata":  # type: i
         )
 
 # %% ../../nbs/011_ConsumerLoop.ipynb 11
-AsyncConsume = Callable[[BaseModel], Awaitable[None]]
-AsyncConsumeMeta = Callable[[BaseModel, EventMetadata], Awaitable[None]]
-SyncConsume = Callable[[BaseModel], None]
-SyncConsumeMeta = Callable[[BaseModel, EventMetadata], None]
+AsyncConsume = Callable[[Union[List[BaseModel], BaseModel]], Awaitable[None]]
+AsyncConsumeMeta = Callable[
+    [Union[List[BaseModel], BaseModel], Union[List[EventMetadata], EventMetadata]],
+    Awaitable[None],
+]
+SyncConsume = Callable[[Union[List[BaseModel], BaseModel]], None]
+SyncConsumeMeta = Callable[
+    [Union[List[BaseModel], BaseModel], Union[List[EventMetadata], EventMetadata]], None
+]
 
 ConsumeCallable = Union[AsyncConsume, AsyncConsumeMeta, SyncConsume, SyncConsumeMeta]
 
@@ -96,26 +109,30 @@ def _callback_parameters_wrapper(
     """
 
     async def _params_wrap(
-        msg: BaseModel,
-        meta: EventMetadata,
+        msg: Union[BaseModel, List[BaseModel]],
+        meta: Union[EventMetadata, List[EventMetadata]],
         callback: Union[AsyncConsume, AsyncConsumeMeta] = callback,
     ) -> None:
         types = list(get_type_hints(callback).values())
-        args: List[Union[BaseModel, EventMetadata]] = [msg]
+        args: List[
+            Union[BaseModel, List[BaseModel], EventMetadata, List[EventMetadata]]
+        ] = [msg]
         if EventMetadata in types:
             args.insert(types.index(EventMetadata), meta)
+        if List[EventMetadata] in types:
+            args.insert(types.index(List[EventMetadata]), meta)
         await callback(*args)  # type: ignore
 
     return _params_wrap
 
-# %% ../../nbs/011_ConsumerLoop.ipynb 16
+# %% ../../nbs/011_ConsumerLoop.ipynb 17
 def _prepare_callback(callback: ConsumeCallable) -> AsyncConsumeMeta:
     """
     Prepares a callback to be used in the consumer loop.
     1. If callback is sync, asyncify it
     2. Wrap the callback into a safe callback for exception handling
 
-    Params:
+    Args:
         callback: async callable that will be prepared for use in consumer
 
     Returns:
@@ -126,35 +143,113 @@ def _prepare_callback(callback: ConsumeCallable) -> AsyncConsumeMeta:
     )
     return _callback_parameters_wrapper(async_callback)
 
-# %% ../../nbs/011_ConsumerLoop.ipynb 18
-async def _stream_msgs(  # type: ignore
-    msgs: Dict[TopicPartition, bytes],
-    send_stream: anyio.streams.memory.MemoryObjectSendStream[Any],
-) -> None:
+# %% ../../nbs/011_ConsumerLoop.ipynb 24
+def _get_single_msg_handlers(  # type: ignore
+    *,
+    consumer: AIOKafkaConsumer,
+    callback: AsyncConsumeMeta,
+    decoder_fn: Callable[[bytes, ModelMetaclass], Any],
+    msg_type: Type[BaseModel],
+    **kwargs: Any,
+) -> Tuple[
+    Callable[
+        [
+            ConsumerRecord,
+            AsyncConsumeMeta,
+            Callable[[bytes, ModelMetaclass], Any],
+            Type[BaseModel],
+        ],
+        Awaitable[None],
+    ],
+    Callable[[AIOKafkaConsumer, Any], Awaitable[List[ConsumerRecord]]],
+]:
     """
-    Decodes and streams the message and topic to the send_stream.
+    Retrieves the message handlers for consuming single messages from a Kafka topic.
 
-    Params:
-        msgs:
-        send_stream:
+    Args:
+        consumer: The Kafka consumer instance.
+        callback: The callback function to handle the consumed message.
+        decoder_fn: The function to decode the consumed message.
+        msg_type: The type of the consumed message.
+        **kwargs: Additional keyword arguments for the consumer.
+
+    Returns:
+        The handle_msg function and poll_consumer function.
""" - for topic_partition, topic_msgs in msgs.items(): - topic = topic_partition.topic - try: - await send_stream.send(topic_msgs) - except Exception as e: - logger.warning( - f"_stream_msgs(): Unexpected exception '{e.__repr__()}' caught and ignored for topic='{topic_partition.topic}', partition='{topic_partition.partition}' and messages: {topic_msgs!r}" - ) + async def handle_msg( # type: ignore + record: ConsumerRecord, + callback: AsyncConsumeMeta = callback, + decoder_fn: Callable[[bytes, ModelMetaclass], Any] = decoder_fn, + msg_type: Type[BaseModel] = msg_type, + ) -> None: + await callback( + decoder_fn(record.value, msg_type), + EventMetadata.create_event_metadata(record), + ) + + async def poll_consumer( # type: ignore + consumer: AIOKafkaConsumer = consumer, kwargs: Any = kwargs + ) -> List[ConsumerRecord]: + msgs = await consumer.getmany(**kwargs) + return [msg for msg_group in msgs.values() for msg in msg_group] + + return handle_msg, poll_consumer + +# %% ../../nbs/011_ConsumerLoop.ipynb 26 +def _get_batch_msg_handlers( # type: ignore + *, + consumer: AIOKafkaConsumer, + callback: AsyncConsumeMeta, + decoder_fn: Callable[[bytes, ModelMetaclass], Any], + msg_type: Type[BaseModel], + **kwargs: Any, +) -> Tuple[ + Callable[ + [ + List[ConsumerRecord], + AsyncConsumeMeta, + Callable[[bytes, ModelMetaclass], Any], + Type[BaseModel], + ], + Awaitable[None], + ], + Callable[[AIOKafkaConsumer, Any], Awaitable[List[List[ConsumerRecord]]]], +]: + """ + Retrieves the message handlers for consuming messages in batches from a Kafka topic. + + Args: + consumer: The Kafka consumer instance. + callback: The callback function to handle the consumed messages. + decoder_fn: The function to decode the consumed messages. + msg_type: The type of the consumed messages. + **kwargs: Additional keyword arguments for the consumer. -def _decode_streamed_msgs( # type: ignore - msgs: List[ConsumerRecord], msg_type: BaseModel -) -> List[BaseModel]: - decoded_msgs = [msg_type.parse_raw(msg.value.decode("utf-8")) for msg in msgs] - return decoded_msgs + Returns: + The handle_msg function and poll_consumer function. + """ -# %% ../../nbs/011_ConsumerLoop.ipynb 23 + async def handle_msg( # type: ignore + records: List[ConsumerRecord], + callback: AsyncConsumeMeta = callback, + decoder_fn: Callable[[bytes, ModelMetaclass], Any] = decoder_fn, + msg_type: Type[BaseModel] = msg_type, + ) -> None: + await callback( + [decoder_fn(record.value, msg_type) for record in records], + [EventMetadata.create_event_metadata(record) for record in records], + ) + + async def poll_consumer( # type: ignore + consumer: AIOKafkaConsumer = consumer, kwargs: Any = kwargs + ) -> List[List[ConsumerRecord]]: + msgs = await consumer.getmany(**kwargs) + return [value for value in msgs.values() if len(value) > 0] + + return handle_msg, poll_consumer + +# %% ../../nbs/011_ConsumerLoop.ipynb 28 @delegates(AIOKafkaConsumer.getmany) async def _aiokafka_consumer_loop( # type: ignore consumer: AIOKafkaConsumer, @@ -163,7 +258,7 @@ async def _aiokafka_consumer_loop( # type: ignore decoder_fn: Callable[[bytes, ModelMetaclass], Any], callback: ConsumeCallable, max_buffer_size: int = 100_000, - msg_type: Type[BaseModel], + msg_type: Union[Type[List[BaseModel]], Type[BaseModel]], is_shutting_down_f: Callable[[], bool], executor: Union[str, StreamExecutor, None] = None, **kwargs: Any, @@ -172,7 +267,7 @@ async def _aiokafka_consumer_loop( # type: ignore Consumer loop for infinite pooling of the AIOKafka consumer for new messages. 
Calls consumer.getmany() and after the consumer return messages or times out, messages are decoded and streamed to defined callback. - Params: + Args: topic: Topic to subscribe decoder_fn: Function to decode the messages consumed from the topic callbacks: Dict of callbacks mapped to their respective topics @@ -184,29 +279,35 @@ async def _aiokafka_consumer_loop( # type: ignore prepared_callback = _prepare_callback(callback) - async def handle_msg( # type: ignore - record: ConsumerRecord, - callback: AsyncConsumeMeta = prepared_callback, - decoder_fn: Callable[[bytes, ModelMetaclass], Any] = decoder_fn, - msg_type: Type[BaseModel] = msg_type, - ) -> None: - await callback( - decoder_fn(record.value, msg_type), - EventMetadata.create_event_metadata(record), + if hasattr(msg_type, "__origin__") and msg_type.__origin__ == list: + handle_msg, poll_consumer = _get_batch_msg_handlers( + consumer=consumer, + callback=prepared_callback, + decoder_fn=decoder_fn, + msg_type=msg_type.__args__[0], # type: ignore + **kwargs, + ) + else: + handle_msg, poll_consumer = _get_single_msg_handlers( + consumer=consumer, + callback=prepared_callback, + decoder_fn=decoder_fn, + msg_type=msg_type, # type: ignore + **kwargs, ) - async def poll_consumer(kwargs: Any = kwargs) -> List[ConsumerRecord]: # type: ignore - msgs = await consumer.getmany(**kwargs) - return [msg for msg_group in msgs.values() for msg in msg_group] - - await get_executor(executor).run(is_shutting_down_f, poll_consumer, handle_msg) + await get_executor(executor).run( + is_shutting_down_f=is_shutting_down_f, + generator=poll_consumer, # type: ignore + processor=handle_msg, # type: ignore + ) -# %% ../../nbs/011_ConsumerLoop.ipynb 28 +# %% ../../nbs/011_ConsumerLoop.ipynb 35 def sanitize_kafka_config(**kwargs: Any) -> Dict[str, Any]: """Sanitize Kafka config""" return {k: "*" * len(v) if "pass" in k.lower() else v for k, v in kwargs.items()} -# %% ../../nbs/011_ConsumerLoop.ipynb 30 +# %% ../../nbs/011_ConsumerLoop.ipynb 37 @delegates(AIOKafkaConsumer) @delegates(_aiokafka_consumer_loop, keep=True) async def aiokafka_consumer_loop( @@ -216,7 +317,7 @@ async def aiokafka_consumer_loop( timeout_ms: int = 100, max_buffer_size: int = 100_000, callback: ConsumeCallable, - msg_type: Type[BaseModel], + msg_type: Union[Type[List[BaseModel]], Type[BaseModel]], is_shutting_down_f: Callable[[], bool], executor: Union[str, StreamExecutor, None] = None, **kwargs: Any, diff --git a/fastkafka/_components/asyncapi.py b/fastkafka/_components/asyncapi.py index 2197bf7..47f6799 100644 --- a/fastkafka/_components/asyncapi.py +++ b/fastkafka/_components/asyncapi.py @@ -25,8 +25,13 @@ from .docs_dependencies import _check_npm_with_local from .logger import get_logger -from .producer_decorator import KafkaEvent, ProduceCallable -from .aiokafka_consumer_loop import ConsumeCallable +from fastkafka._components.producer_decorator import ( + KafkaEvent, + ProduceCallable, + unwrap_from_kafka_event, +) +from .aiokafka_consumer_loop import ConsumeCallable, EventMetadata +from .helpers import unwrap_list_type # %% ../../nbs/014_AsyncAPI.ipynb 3 logger = get_logger(__name__) @@ -156,11 +161,8 @@ def _get_msg_cls_for_producer(f: ProduceCallable) -> Type[Any]: f"Producer function must have a defined return value, got {return_type} as return value" ) - if hasattr(return_type, "__origin__") and return_type.__origin__ == KafkaEvent: - return_type = return_type.__args__[0] - - if hasattr(return_type, "__origin__") and return_type.__origin__ == list: - return_type = 
return_type.__args__[0] + return_type = unwrap_from_kafka_event(return_type) + return_type = unwrap_list_type(return_type) if not hasattr(return_type, "json"): raise ValueError(f"Producer function return value must have json method") @@ -171,24 +173,30 @@ def _get_msg_cls_for_consumer(f: ConsumeCallable) -> Type[Any]: types = get_type_hints(f) return_type = types.pop("return", type(None)) types_list = list(types.values()) + # @app.consumer does not return a value + if return_type != type(None): + raise ValueError( + f"Consumer function cannot return any value, got {return_type}" + ) # @app.consumer first consumer argument must be a msg which is a subclass of BaseModel try: - if issubclass(BaseModel, types_list[0]): + msg_type = types_list[0] + + msg_type = unwrap_list_type(msg_type) + + if not issubclass(msg_type, BaseModel): raise ValueError( f"Consumer function first param must be a BaseModel subclass msg, got {types_list}" ) + + return msg_type # type: ignore + except IndexError: raise ValueError( f"Consumer function first param must be a BaseModel subclass msg, got {types_list}" ) - # @app.consumer does not return a value - if return_type != type(None): - raise ValueError( - f"Consumer function cannot return any value, got {return_type}" - ) - return types_list[0] # type: ignore -# %% ../../nbs/014_AsyncAPI.ipynb 25 +# %% ../../nbs/014_AsyncAPI.ipynb 27 def _get_topic_dict( f: Callable[[Any], Any], direction: str = "publish" ) -> Dict[str, Any]: @@ -209,7 +217,7 @@ def _get_topic_dict( msg_schema["description"] = f.__doc__ # type: ignore return {direction: msg_schema} -# %% ../../nbs/014_AsyncAPI.ipynb 28 +# %% ../../nbs/014_AsyncAPI.ipynb 30 def _get_channels_schema( consumers: Dict[str, ConsumeCallable], producers: Dict[str, ProduceCallable], @@ -220,7 +228,7 @@ def _get_channels_schema( topics[topic] = _get_topic_dict(f, d) return topics -# %% ../../nbs/014_AsyncAPI.ipynb 30 +# %% ../../nbs/014_AsyncAPI.ipynb 32 def _get_kafka_msg_classes( consumers: Dict[str, ConsumeCallable], producers: Dict[str, ProduceCallable], @@ -236,7 +244,7 @@ def _get_kafka_msg_definitions( ) -> Dict[str, Dict[str, Any]]: return schema(_get_kafka_msg_classes(consumers, producers)) # type: ignore -# %% ../../nbs/014_AsyncAPI.ipynb 32 +# %% ../../nbs/014_AsyncAPI.ipynb 34 def _get_example(cls: Type[BaseModel]) -> BaseModel: kwargs: Dict[str, Any] = {} for k, v in cls.__fields__.items(): @@ -253,7 +261,7 @@ def _get_example(cls: Type[BaseModel]) -> BaseModel: return json.loads(cls(**kwargs).json()) # type: ignore -# %% ../../nbs/014_AsyncAPI.ipynb 34 +# %% ../../nbs/014_AsyncAPI.ipynb 36 def _add_example_to_msg_definitions( msg_cls: Type[BaseModel], msg_schema: Dict[str, Dict[str, Any]] ) -> None: @@ -282,7 +290,7 @@ def _get_msg_definitions_with_examples( return msg_schema -# %% ../../nbs/014_AsyncAPI.ipynb 36 +# %% ../../nbs/014_AsyncAPI.ipynb 38 def _get_security_schemes(kafka_brokers: KafkaBrokers) -> Dict[str, Any]: security_schemes = {} for key, kafka_broker in kafka_brokers.brokers.items(): @@ -292,7 +300,7 @@ def _get_security_schemes(kafka_brokers: KafkaBrokers) -> Dict[str, Any]: ) return security_schemes -# %% ../../nbs/014_AsyncAPI.ipynb 38 +# %% ../../nbs/014_AsyncAPI.ipynb 40 def _get_components_schema( consumers: Dict[str, ConsumeCallable], producers: Dict[str, ProduceCallable], @@ -325,7 +333,7 @@ def _sub_values(d: Any, substitutions: Dict[str, str] = substitutions) -> Any: return _sub_values(components) # type: ignore -# %% ../../nbs/014_AsyncAPI.ipynb 40 +# %% ../../nbs/014_AsyncAPI.ipynb 
42 def _get_servers_schema(kafka_brokers: KafkaBrokers) -> Dict[str, Any]: servers = json.loads(kafka_brokers.json(sort_keys=False))["brokers"] @@ -334,7 +342,7 @@ def _get_servers_schema(kafka_brokers: KafkaBrokers) -> Dict[str, Any]: servers[key]["security"] = [{f"{key}_default_security": []}] return servers # type: ignore -# %% ../../nbs/014_AsyncAPI.ipynb 42 +# %% ../../nbs/014_AsyncAPI.ipynb 44 def _get_asyncapi_schema( consumers: Dict[str, ConsumeCallable], producers: Dict[str, ProduceCallable], @@ -355,7 +363,7 @@ def _get_asyncapi_schema( "components": components, } -# %% ../../nbs/014_AsyncAPI.ipynb 44 +# %% ../../nbs/014_AsyncAPI.ipynb 46 def yaml_file_cmp(file_1: Union[Path, str], file_2: Union[Path, str]) -> bool: try: import yaml @@ -371,7 +379,7 @@ def _read(f: Union[Path, str]) -> Dict[str, Any]: d = [_read(f) for f in [file_1, file_2]] return d[0] == d[1] -# %% ../../nbs/014_AsyncAPI.ipynb 45 +# %% ../../nbs/014_AsyncAPI.ipynb 47 def _generate_async_spec( *, consumers: Dict[str, ConsumeCallable], @@ -415,7 +423,7 @@ def _generate_async_spec( ) return False -# %% ../../nbs/014_AsyncAPI.ipynb 47 +# %% ../../nbs/014_AsyncAPI.ipynb 49 def _generate_async_docs( *, spec_path: Path, @@ -447,7 +455,7 @@ def _generate_async_docs( f"Generation of async docs failed, used '$ {' '.join(cmd)}'{p.stdout.decode()}" ) -# %% ../../nbs/014_AsyncAPI.ipynb 49 +# %% ../../nbs/014_AsyncAPI.ipynb 51 def export_async_spec( *, consumers: Dict[str, ConsumeCallable], diff --git a/fastkafka/_components/helpers.py b/fastkafka/_components/helpers.py index 63b8a69..1d3b517 100644 --- a/fastkafka/_components/helpers.py +++ b/fastkafka/_components/helpers.py @@ -1,7 +1,7 @@ # AUTOGENERATED! DO NOT EDIT! File to edit: ../../nbs/998_Internal_Helpers.ipynb. # %% auto 0 -__all__ = ['in_notebook', 'change_dir', 'ImportFromStringError', 'true_after'] +__all__ = ['in_notebook', 'change_dir', 'ImportFromStringError', 'true_after', 'unwrap_list_type'] # %% ../../nbs/998_Internal_Helpers.ipynb 2 def in_notebook() -> bool: @@ -23,7 +23,7 @@ def in_notebook() -> bool: import sys from datetime import datetime, timedelta from functools import wraps -from inspect import signature +from inspect import signature, Parameter from pathlib import Path from typing import * @@ -100,3 +100,25 @@ def _true_after(seconds: Union[int, float] = seconds, t: datetime = t) -> bool: return (datetime.now() - t) > timedelta(seconds=seconds) return _true_after + +# %% ../../nbs/998_Internal_Helpers.ipynb 12 +def unwrap_list_type(var_type: Union[Type, Parameter]) -> Union[Type, Parameter]: + """ + Unwraps the type of a list. + + Vars: + var_type: Type to unwrap. + + Returns: + Unwrapped type if the given type is a list, otherwise returns the same type. + + Example: + - Input: List[str] + Output: str + - Input: int + Output: int + """ + if hasattr(var_type, "__origin__") and var_type.__origin__ == list: + return var_type.__args__[0] # type: ignore + else: + return var_type diff --git a/fastkafka/_components/producer_decorator.py b/fastkafka/_components/producer_decorator.py index ffadca8..b36e5d9 100644 --- a/fastkafka/_components/producer_decorator.py +++ b/fastkafka/_components/producer_decorator.py @@ -1,8 +1,8 @@ # AUTOGENERATED! DO NOT EDIT! File to edit: ../../nbs/013_ProducerDecorator.ipynb. 
# %% auto 0 -__all__ = ['BaseSubmodel', 'ProduceReturnTypes', 'ProduceCallable', 'KafkaEvent', 'release_callback', 'produce_single', - 'send_batch', 'produce_batch', 'producer_decorator'] +__all__ = ['BaseSubmodel', 'ProduceReturnTypes', 'ProduceCallable', 'KafkaEvent', 'unwrap_from_kafka_event', 'release_callback', + 'produce_single', 'send_batch', 'produce_batch', 'producer_decorator'] # %% ../../nbs/013_ProducerDecorator.ipynb 1 import asyncio @@ -13,6 +13,7 @@ from asyncio import iscoroutinefunction # do not use the version from inspect from collections import namedtuple from dataclasses import dataclass +from inspect import Parameter from typing import * from aiokafka import AIOKafkaProducer @@ -41,6 +42,28 @@ class KafkaEvent(Generic[BaseSubmodel]): key: Optional[bytes] = None # %% ../../nbs/013_ProducerDecorator.ipynb 5 +def unwrap_from_kafka_event(var_type: Union[Type, Parameter]) -> Union[Type, Parameter]: + """ + Unwraps the type from a KafkaEvent. + + Vars: + var_type: Type to unwrap. + + Returns: + Type: Unwrapped type if the given type is a KafkaEvent, otherwise returns the same type. + + Example: + - Input: KafkaEvent[str] + Output: str + - Input: int + Output: int + """ + if hasattr(var_type, "__origin__") and var_type.__origin__ == KafkaEvent: + return var_type.__args__[0] # type: ignore + else: + return var_type + +# %% ../../nbs/013_ProducerDecorator.ipynb 7 ProduceReturnTypes = Union[ BaseModel, KafkaEvent[BaseModel], List[BaseModel], KafkaEvent[List[BaseModel]] ] @@ -49,17 +72,17 @@ class KafkaEvent(Generic[BaseSubmodel]): Callable[..., ProduceReturnTypes], Callable[..., Awaitable[ProduceReturnTypes]] ] -# %% ../../nbs/013_ProducerDecorator.ipynb 8 +# %% ../../nbs/013_ProducerDecorator.ipynb 10 def _wrap_in_event( message: Union[BaseModel, List[BaseModel], KafkaEvent] ) -> KafkaEvent: return message if type(message) == KafkaEvent else KafkaEvent(message) -# %% ../../nbs/013_ProducerDecorator.ipynb 11 +# %% ../../nbs/013_ProducerDecorator.ipynb 13 def release_callback(fut: asyncio.Future) -> None: pass -# %% ../../nbs/013_ProducerDecorator.ipynb 12 +# %% ../../nbs/013_ProducerDecorator.ipynb 14 async def produce_single( # type: ignore producer: AIOKafkaProducer, topic: str, @@ -71,7 +94,7 @@ async def produce_single( # type: ignore ) fut.add_done_callback(release_callback) -# %% ../../nbs/013_ProducerDecorator.ipynb 14 +# %% ../../nbs/013_ProducerDecorator.ipynb 16 async def send_batch( # type: ignore producer: AIOKafkaProducer, topic: str, batch: BatchBuilder, key: Optional[bytes] ) -> None: @@ -108,7 +131,7 @@ async def produce_batch( # type: ignore await send_batch(producer, topic, batch, wrapped_val.key) -# %% ../../nbs/013_ProducerDecorator.ipynb 16 +# %% ../../nbs/013_ProducerDecorator.ipynb 18 def producer_decorator( producer_store: Dict[str, Any], func: ProduceCallable, diff --git a/fastkafka/_components/task_streaming.py b/fastkafka/_components/task_streaming.py index 5662552..de3f47a 100644 --- a/fastkafka/_components/task_streaming.py +++ b/fastkafka/_components/task_streaming.py @@ -35,12 +35,30 @@ def __init__( self.finished = False async def add(self, item: Task) -> None: + """ + Adds a task to the task pool. + + Args: + item: The task to be added. + + Returns: + None + """ while len(self.pool) >= self.size: await asyncio.sleep(0) self.pool.add(item) item.add_done_callback(self.discard) def discard(self, task: Task) -> None: + """ + Discards a completed task from the task pool. + + Args: + task: The completed task to be discarded. 
+ + Returns: + None + """ e = task.exception() if e is not None and self.on_error is not None: try: @@ -53,6 +71,12 @@ def discard(self, task: Task) -> None: self.pool.discard(task) def __len__(self) -> int: + """ + Returns the number of tasks in the task pool. + + Returns: + The number of tasks in the task pool. + """ return len(self.pool) async def __aenter__(self) -> "TaskPool": @@ -66,6 +90,16 @@ async def __aexit__(self, *args: Any, **kwargs: Any) -> None: @staticmethod def log_error(logger: Logger) -> Callable[[Exception], None]: + """ + Creates a decorator that logs errors using the specified logger. + + Args: + logger: The logger to use for error logging. + + Returns: + The decorator function. + """ + def _log_error(e: Exception, logger: Logger = logger) -> None: logger.warning(f"{e=}") @@ -78,10 +112,25 @@ def __init__(self) -> None: self.exception_found = False def on_error(self, e: Exception) -> None: + """ + Handles an error by storing the exception. + + Args: + e: The exception to be handled. + + Returns: + None + """ self.exceptions.append(e) self.exception_found = True def _monitor_step(self) -> None: + """ + Raises the next exception in the queue. + + Returns: + None + """ if len(self.exceptions) > 0: e = self.exceptions.pop(0) raise e @@ -99,15 +148,16 @@ class StreamExecutor(ABC): @abstractmethod async def run( # type: ignore self, + *, is_shutting_down_f: Callable[[], bool], - produce_func: Callable[[], Awaitable[ConsumerRecord]], - consume_func: Callable[[ConsumerRecord], Awaitable[None]], + generator: Callable[[], Awaitable[ConsumerRecord]], + processor: Callable[[ConsumerRecord], Awaitable[None]], ) -> None: pass # %% ../../nbs/011_TaskStreaming.ipynb 20 def _process_items_task( # type: ignore - consume_func: Callable[[ConsumerRecord], Awaitable[None]], task_pool: TaskPool + processor: Callable[[ConsumerRecord], Awaitable[None]], task_pool: TaskPool ) -> Callable[ [ anyio.streams.memory.MemoryObjectReceiveStream, @@ -118,12 +168,12 @@ def _process_items_task( # type: ignore ]: async def _process_items_wrapper( # type: ignore receive_stream: anyio.streams.memory.MemoryObjectReceiveStream, - consume_func: Callable[[ConsumerRecord], Awaitable[None]] = consume_func, + processor: Callable[[ConsumerRecord], Awaitable[None]] = processor, task_pool=task_pool, ): async with receive_stream: async for msg in receive_stream: - task: asyncio.Task = asyncio.create_task(consume_func(msg)) # type: ignore + task: asyncio.Task = asyncio.create_task(processor(msg)) # type: ignore await task_pool.add(task) return _process_items_wrapper @@ -148,18 +198,30 @@ def __init__( # type: ignore async def run( # type: ignore self, + *, is_shutting_down_f: Callable[[], bool], - produce_func: Callable[[], Awaitable[ConsumerRecord]], - consume_func: Callable[[ConsumerRecord], Awaitable[None]], + generator: Callable[[], Awaitable[ConsumerRecord]], + processor: Callable[[ConsumerRecord], Awaitable[None]], ) -> None: send_stream, receive_stream = anyio.create_memory_object_stream( max_buffer_size=self.max_buffer_size ) + """ + Runs the dynamic task executor. + + Args: + is_shutting_down_f: Function to check if the executor is shutting down. + generator: Generator function for retrieving consumer records. + processor: Processor function for processing consumer records. 
+ + Returns: + None + """ async with self.exception_monitor, self.task_pool: async with anyio.create_task_group() as tg: tg.start_soon( - _process_items_task(consume_func, self.task_pool), receive_stream + _process_items_task(processor, self.task_pool), receive_stream ) async with send_stream: while not is_shutting_down_f(): @@ -168,13 +230,13 @@ async def run( # type: ignore and self.throw_exceptions ): break - msgs = await produce_func() + msgs = await generator() for msg in msgs: await send_stream.send(msg) # %% ../../nbs/011_TaskStreaming.ipynb 30 def _process_items_coro( # type: ignore - consume_func: Callable[[ConsumerRecord], Awaitable[None]], + processor: Callable[[ConsumerRecord], Awaitable[None]], throw_exceptions: bool, ) -> Callable[ [ @@ -186,13 +248,13 @@ def _process_items_coro( # type: ignore ]: async def _process_items_wrapper( # type: ignore receive_stream: anyio.streams.memory.MemoryObjectReceiveStream, - consume_func: Callable[[ConsumerRecord], Awaitable[None]] = consume_func, + processor: Callable[[ConsumerRecord], Awaitable[None]] = processor, throw_exceptions: bool = throw_exceptions, ) -> Awaitable[None]: async with receive_stream: async for msg in receive_stream: try: - await consume_func(msg) + await processor(msg) except Exception as e: if throw_exceptions: raise e @@ -213,26 +275,51 @@ def __init__( # type: ignore async def run( # type: ignore self, + *, is_shutting_down_f: Callable[[], bool], - produce_func: Callable[[], Awaitable[ConsumerRecord]], - consume_func: Callable[[ConsumerRecord], Awaitable[None]], + generator: Callable[[], Awaitable[ConsumerRecord]], + processor: Callable[[ConsumerRecord], Awaitable[None]], ) -> None: + """ + Runs the sequential executor. + + Args: + is_shutting_down_f: Function to check if the executor is shutting down. + generator: Generator function for retrieving consumer records. + processor: Processor function for processing consumer records. + + Returns: + None + """ + send_stream, receive_stream = anyio.create_memory_object_stream( max_buffer_size=self.max_buffer_size ) async with anyio.create_task_group() as tg: tg.start_soon( - _process_items_coro(consume_func, self.throw_exceptions), receive_stream + _process_items_coro(processor, self.throw_exceptions), receive_stream ) async with send_stream: while not is_shutting_down_f(): - msgs = await produce_func() + msgs = await generator() for msg in msgs: await send_stream.send(msg) # %% ../../nbs/011_TaskStreaming.ipynb 34 def get_executor(executor: Union[str, StreamExecutor, None] = None) -> StreamExecutor: + """ + Returns an instance of the specified executor. + + Args: + executor: Executor instance or name of the executor. + + Returns: + Instance of the specified executor. + + Raises: + AttributeError: If the executor is not found. 
+ """ if isinstance(executor, StreamExecutor): return executor elif executor is None: diff --git a/fastkafka/_modidx.py b/fastkafka/_modidx.py index b095f0c..0625356 100644 --- a/fastkafka/_modidx.py +++ b/fastkafka/_modidx.py @@ -101,12 +101,12 @@ 'fastkafka/_components/aiokafka_consumer_loop.py'), 'fastkafka._components.aiokafka_consumer_loop._callback_parameters_wrapper': ( 'consumerloop.html#_callback_parameters_wrapper', 'fastkafka/_components/aiokafka_consumer_loop.py'), - 'fastkafka._components.aiokafka_consumer_loop._decode_streamed_msgs': ( 'consumerloop.html#_decode_streamed_msgs', - 'fastkafka/_components/aiokafka_consumer_loop.py'), + 'fastkafka._components.aiokafka_consumer_loop._get_batch_msg_handlers': ( 'consumerloop.html#_get_batch_msg_handlers', + 'fastkafka/_components/aiokafka_consumer_loop.py'), + 'fastkafka._components.aiokafka_consumer_loop._get_single_msg_handlers': ( 'consumerloop.html#_get_single_msg_handlers', + 'fastkafka/_components/aiokafka_consumer_loop.py'), 'fastkafka._components.aiokafka_consumer_loop._prepare_callback': ( 'consumerloop.html#_prepare_callback', 'fastkafka/_components/aiokafka_consumer_loop.py'), - 'fastkafka._components.aiokafka_consumer_loop._stream_msgs': ( 'consumerloop.html#_stream_msgs', - 'fastkafka/_components/aiokafka_consumer_loop.py'), 'fastkafka._components.aiokafka_consumer_loop.aiokafka_consumer_loop': ( 'consumerloop.html#aiokafka_consumer_loop', 'fastkafka/_components/aiokafka_consumer_loop.py'), 'fastkafka._components.aiokafka_consumer_loop.sanitize_kafka_config': ( 'consumerloop.html#sanitize_kafka_config', @@ -212,7 +212,9 @@ 'fastkafka._components.helpers.in_notebook': ( 'internal_helpers.html#in_notebook', 'fastkafka/_components/helpers.py'), 'fastkafka._components.helpers.true_after': ( 'internal_helpers.html#true_after', - 'fastkafka/_components/helpers.py')}, + 'fastkafka/_components/helpers.py'), + 'fastkafka._components.helpers.unwrap_list_type': ( 'internal_helpers.html#unwrap_list_type', + 'fastkafka/_components/helpers.py')}, 'fastkafka._components.logger': { 'fastkafka._components.logger.cached_log': ( 'logger.html#cached_log', 'fastkafka/_components/logger.py'), 'fastkafka._components.logger.get_default_logger_configuration': ( 'logger.html#get_default_logger_configuration', @@ -264,7 +266,9 @@ 'fastkafka._components.producer_decorator.release_callback': ( 'producerdecorator.html#release_callback', 'fastkafka/_components/producer_decorator.py'), 'fastkafka._components.producer_decorator.send_batch': ( 'producerdecorator.html#send_batch', - 'fastkafka/_components/producer_decorator.py')}, + 'fastkafka/_components/producer_decorator.py'), + 'fastkafka._components.producer_decorator.unwrap_from_kafka_event': ( 'producerdecorator.html#unwrap_from_kafka_event', + 'fastkafka/_components/producer_decorator.py')}, 'fastkafka._components.task_streaming': { 'fastkafka._components.task_streaming.DynamicTaskExecutor': ( 'taskstreaming.html#dynamictaskexecutor', 'fastkafka/_components/task_streaming.py'), 'fastkafka._components.task_streaming.DynamicTaskExecutor.__init__': ( 'taskstreaming.html#dynamictaskexecutor.__init__', diff --git a/nbs/011_ConsumerLoop.ipynb b/nbs/011_ConsumerLoop.ipynb index 368e56d..c986420 100644 --- a/nbs/011_ConsumerLoop.ipynb +++ b/nbs/011_ConsumerLoop.ipynb @@ -173,6 +173,14 @@ "\n", " @staticmethod\n", " def create_event_metadata(record: ConsumerRecord) -> \"EventMetadata\": # type: ignore\n", + " \"\"\"Creates an instance of EventMetadata from a ConsumerRecord.\n", + "\n", + " Args:\n", + " 
record: The Kafka ConsumerRecord.\n", + "\n", + " Returns:\n", + " The created EventMetadata instance.\n", + " \"\"\"\n", " return EventMetadata(\n", " topic=record.topic,\n", " partition=record.partition,\n", @@ -235,10 +243,10 @@ "source": [ "# | export\n", "\n", - "AsyncConsume = Callable[[BaseModel], Awaitable[None]]\n", - "AsyncConsumeMeta = Callable[[BaseModel, EventMetadata], Awaitable[None]]\n", - "SyncConsume = Callable[[BaseModel], None]\n", - "SyncConsumeMeta = Callable[[BaseModel, EventMetadata], None]\n", + "AsyncConsume = Callable[[Union[List[BaseModel], BaseModel]], Awaitable[None]]\n", + "AsyncConsumeMeta = Callable[[Union[List[BaseModel], BaseModel], Union[List[EventMetadata], EventMetadata]], Awaitable[None]]\n", + "SyncConsume = Callable[[Union[List[BaseModel], BaseModel]], None]\n", + "SyncConsumeMeta = Callable[[Union[List[BaseModel], BaseModel], Union[List[EventMetadata], EventMetadata]], None]\n", "\n", "ConsumeCallable = Union[\n", " AsyncConsume, AsyncConsumeMeta, SyncConsume, SyncConsumeMeta\n", @@ -266,16 +274,19 @@ " Returns:\n", " Wrapped callback with filtered params\n", " \"\"\"\n", + "\n", " async def _params_wrap(\n", - " msg: BaseModel,\n", - " meta: EventMetadata,\n", + " msg: Union[BaseModel, List[BaseModel]],\n", + " meta: Union[EventMetadata, List[EventMetadata]],\n", " callback: Union[AsyncConsume, AsyncConsumeMeta] = callback,\n", " ) -> None:\n", " types = list(get_type_hints(callback).values())\n", - " args: List[Union[BaseModel, EventMetadata]] = [msg]\n", + " args: List[Union[BaseModel, List[BaseModel], EventMetadata, List[EventMetadata]]] = [msg]\n", " if EventMetadata in types:\n", " args.insert(types.index(EventMetadata), meta)\n", - " await callback(*args) # type: ignore\n", + " if List[EventMetadata] in types:\n", + " args.insert(types.index(List[EventMetadata]), meta)\n", + " await callback(*args) # type: ignore\n", "\n", " return _params_wrap" ] @@ -323,6 +334,21 @@ "await with_meta(\"Example_msg\", \"Some_meta\")" ] }, + { + "cell_type": "code", + "execution_count": null, + "id": "ec76eaf3", + "metadata": {}, + "outputs": [], + "source": [ + "@_callback_parameters_wrapper\n", + "async def with_meta(msg: List[BaseModel], meta: List[EventMetadata]):\n", + " assert msg == \"Example_msg\"\n", + " assert meta == \"Some_meta\"\n", + "\n", + "await with_meta(\"Example_msg\", \"Some_meta\")" + ] + }, { "cell_type": "code", "execution_count": null, @@ -341,7 +367,7 @@ " 1. If callback is sync, asyncify it\n", " 2. 
Wrap the callback into a safe callback for exception handling\n", "\n", - " Params:\n", + " Args:\n", " callback: async callable that will be prepared for use in consumer\n", "\n", " Returns:\n", @@ -381,9 +407,6 @@ "metadata": {}, "outputs": [], "source": [ - "# | export\n", - "\n", - "\n", "async def _stream_msgs( # type: ignore\n", " msgs: Dict[TopicPartition, bytes],\n", " send_stream: anyio.streams.memory.MemoryObjectSendStream[Any],\n", @@ -391,7 +414,7 @@ " \"\"\"\n", " Decodes and streams the message and topic to the send_stream.\n", "\n", - " Params:\n", + " Args:\n", " msgs:\n", " send_stream:\n", " \"\"\"\n", @@ -541,6 +564,226 @@ " mock.assert_has_calls([call(msg) for msg in msgs.values()])" ] }, + { + "cell_type": "code", + "execution_count": null, + "id": "e5275963", + "metadata": {}, + "outputs": [], + "source": [ + "# | export\n", + "\n", + "\n", + "def _get_single_msg_handlers( # type: ignore\n", + " *,\n", + " consumer: AIOKafkaConsumer,\n", + " callback: AsyncConsumeMeta,\n", + " decoder_fn: Callable[[bytes, ModelMetaclass], Any],\n", + " msg_type: Type[BaseModel],\n", + " **kwargs: Any,\n", + ") -> Tuple[\n", + " Callable[\n", + " [\n", + " ConsumerRecord,\n", + " AsyncConsumeMeta,\n", + " Callable[[bytes, ModelMetaclass], Any],\n", + " Type[BaseModel],\n", + " ],\n", + " Awaitable[None],\n", + " ],\n", + " Callable[[AIOKafkaConsumer, Any], Awaitable[List[ConsumerRecord]]],\n", + "]:\n", + " \n", + " \"\"\"\n", + " Retrieves the message handlers for consuming single messages from a Kafka topic.\n", + "\n", + " Args:\n", + " consumer: The Kafka consumer instance.\n", + " callback: The callback function to handle the consumed message.\n", + " decoder_fn: The function to decode the consumed message.\n", + " msg_type: The type of the consumed message.\n", + " **kwargs: Additional keyword arguments for the consumer.\n", + "\n", + " Returns:\n", + " The handle_msg function and poll_consumer function.\n", + " \"\"\"\n", + " async def handle_msg( # type: ignore\n", + " record: ConsumerRecord,\n", + " callback: AsyncConsumeMeta = callback,\n", + " decoder_fn: Callable[[bytes, ModelMetaclass], Any] = decoder_fn,\n", + " msg_type: Type[BaseModel] = msg_type,\n", + " ) -> None:\n", + " await callback(\n", + " decoder_fn(record.value, msg_type),\n", + " EventMetadata.create_event_metadata(record),\n", + " )\n", + "\n", + " async def poll_consumer( # type: ignore\n", + " consumer: AIOKafkaConsumer = consumer, kwargs: Any = kwargs\n", + " ) -> List[ConsumerRecord]:\n", + " msgs = await consumer.getmany(**kwargs)\n", + " return [msg for msg_group in msgs.values() for msg in msg_group]\n", + "\n", + " return handle_msg, poll_consumer" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "cc43f2f7", + "metadata": {}, + "outputs": [], + "source": [ + "topic_partitions = [(\"topic_0\", 0), (\"topic_0\", 1)]\n", + "\n", + "msg = MyMessage(url=\"http://www.acme.com\", port=22)\n", + "msgs = {\n", + " TopicPartition(topic, partition): [\n", + " create_consumer_record(topic=topic, partition=partition, msg=msg)\n", + " ]\n", + " for topic, partition in topic_partitions\n", + "}\n", + "record = create_consumer_record(topic=topic, partition=partition, msg=msg)\n", + "\n", + "consumer = AsyncMock()\n", + "consumer.getmany.return_value = msgs\n", + "\n", + "callback = AsyncMock()\n", + "decoder_fn = json_decoder\n", + "msg_type = MyMessage\n", + "\n", + "handle_msg, poll_consumer = _get_single_msg_handlers(\n", + " consumer=consumer, callback=callback, decoder_fn=decoder_fn, 
msg_type=msg_type\n", + ")\n", + "\n", + "got_msgs = await poll_consumer()\n", + "assert len(msgs.values()) == len(got_msgs)\n", + "\n", + "for msg in got_msgs:\n", + " await handle_msg(msg)\n", + "\n", + "callback.assert_has_awaits(\n", + " [\n", + " call(\n", + " json_decoder(msg.value, msg_type), EventMetadata.create_event_metadata(msg)\n", + " )\n", + " for msg in got_msgs\n", + " ]\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "5ddd1b59", + "metadata": {}, + "outputs": [], + "source": [ + "# | export\n", + "\n", + "\n", + "def _get_batch_msg_handlers( # type: ignore\n", + " *,\n", + " consumer: AIOKafkaConsumer,\n", + " callback: AsyncConsumeMeta,\n", + " decoder_fn: Callable[[bytes, ModelMetaclass], Any],\n", + " msg_type: Type[BaseModel],\n", + " **kwargs: Any,\n", + ") -> Tuple[\n", + " Callable[\n", + " [\n", + " List[ConsumerRecord],\n", + " AsyncConsumeMeta,\n", + " Callable[[bytes, ModelMetaclass], Any],\n", + " Type[BaseModel],\n", + " ],\n", + " Awaitable[None],\n", + " ],\n", + " Callable[[AIOKafkaConsumer, Any], Awaitable[List[List[ConsumerRecord]]]],\n", + "]:\n", + " \"\"\"\n", + " Retrieves the message handlers for consuming messages in batches from a Kafka topic.\n", + "\n", + " Args:\n", + " consumer: The Kafka consumer instance.\n", + " callback: The callback function to handle the consumed messages.\n", + " decoder_fn: The function to decode the consumed messages.\n", + " msg_type: The type of the consumed messages.\n", + " **kwargs: Additional keyword arguments for the consumer.\n", + "\n", + " Returns:\n", + " The handle_msg function and poll_consumer function.\n", + " \"\"\"\n", + "\n", + " async def handle_msg( # type: ignore\n", + " records: List[ConsumerRecord],\n", + " callback: AsyncConsumeMeta = callback,\n", + " decoder_fn: Callable[[bytes, ModelMetaclass], Any] = decoder_fn,\n", + " msg_type: Type[BaseModel] = msg_type,\n", + " ) -> None:\n", + " await callback(\n", + " [decoder_fn(record.value, msg_type) for record in records],\n", + " [EventMetadata.create_event_metadata(record) for record in records],\n", + " )\n", + "\n", + " async def poll_consumer( # type: ignore\n", + " consumer: AIOKafkaConsumer = consumer, kwargs: Any = kwargs\n", + " ) -> List[List[ConsumerRecord]]:\n", + " msgs = await consumer.getmany(**kwargs)\n", + " return [value for value in msgs.values() if len(value)>0]\n", + "\n", + " return handle_msg, poll_consumer" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "18367eb4", + "metadata": {}, + "outputs": [], + "source": [ + "topic_partitions = [(\"topic_0\", 0), (\"topic_0\", 1)]\n", + "\n", + "msg = MyMessage(url=\"http://www.acme.com\", port=22)\n", + "msgs = {\n", + " TopicPartition(topic, partition): [\n", + " create_consumer_record(topic=topic, partition=partition, msg=msg)\n", + " ]\n", + " for topic, partition in topic_partitions\n", + "}\n", + "record = create_consumer_record(topic=topic, partition=partition, msg=msg)\n", + "\n", + "consumer = AsyncMock()\n", + "consumer.getmany.return_value = msgs\n", + "\n", + "callback = AsyncMock()\n", + "decoder_fn = json_decoder\n", + "msg_type = MyMessage\n", + "\n", + "handle_msg, poll_consumer = _get_batch_msg_handlers(\n", + " consumer=consumer, callback=callback, decoder_fn=decoder_fn, msg_type=msg_type\n", + ")\n", + "\n", + "got_msgs = await poll_consumer()\n", + "assert len(msgs.values()) == len(got_msgs)\n", + "\n", + "for msgs in got_msgs:\n", + " assert len(msgs) == 1\n", + "\n", + "for msg in got_msgs:\n", + " await 
handle_msg(msg)\n", + "\n", + "callback.assert_has_awaits(\n", + " [\n", + " call(\n", + " [json_decoder(msg_unwrapped.value, msg_type) for msg_unwrapped in msg],\n", + " [EventMetadata.create_event_metadata(msg_unwrapped) for msg_unwrapped in msg],\n", + " )\n", + " for msg in got_msgs\n", + " ]\n", + ")" + ] + }, { "cell_type": "code", "execution_count": null, @@ -559,7 +802,7 @@ " decoder_fn: Callable[[bytes, ModelMetaclass], Any],\n", " callback: ConsumeCallable,\n", " max_buffer_size: int = 100_000,\n", - " msg_type: Type[BaseModel],\n", + " msg_type: Union[Type[List[BaseModel]], Type[BaseModel]],\n", " is_shutting_down_f: Callable[[], bool],\n", " executor: Union[str, StreamExecutor, None] = None,\n", " **kwargs: Any,\n", @@ -568,7 +811,7 @@ " Consumer loop for infinite pooling of the AIOKafka consumer for new messages. Calls consumer.getmany()\n", " and after the consumer return messages or times out, messages are decoded and streamed to defined callback.\n", "\n", - " Params:\n", + " Args:\n", " topic: Topic to subscribe\n", " decoder_fn: Function to decode the messages consumed from the topic\n", " callbacks: Dict of callbacks mapped to their respective topics\n", @@ -580,22 +823,28 @@ "\n", " prepared_callback = _prepare_callback(callback)\n", "\n", - " async def handle_msg( # type: ignore\n", - " record: ConsumerRecord,\n", - " callback: AsyncConsumeMeta = prepared_callback,\n", - " decoder_fn: Callable[[bytes, ModelMetaclass], Any] = decoder_fn,\n", - " msg_type: Type[BaseModel] = msg_type,\n", - " ) -> None:\n", - " await callback(\n", - " decoder_fn(record.value, msg_type),\n", - " EventMetadata.create_event_metadata(record),\n", + " if hasattr(msg_type, \"__origin__\") and msg_type.__origin__ == list:\n", + " handle_msg, poll_consumer = _get_batch_msg_handlers(\n", + " consumer=consumer,\n", + " callback=prepared_callback,\n", + " decoder_fn=decoder_fn,\n", + " msg_type=msg_type.__args__[0], # type: ignore\n", + " **kwargs,\n", + " )\n", + " else:\n", + " handle_msg, poll_consumer = _get_single_msg_handlers(\n", + " consumer=consumer,\n", + " callback=prepared_callback,\n", + " decoder_fn=decoder_fn,\n", + " msg_type=msg_type, # type: ignore\n", + " **kwargs,\n", " )\n", "\n", - " async def poll_consumer(kwargs: Any = kwargs) -> List[ConsumerRecord]: # type: ignore\n", - " msgs = await consumer.getmany(**kwargs)\n", - " return [msg for msg_group in msgs.values() for msg in msg_group]\n", - "\n", - " await get_executor(executor).run(is_shutting_down_f, poll_consumer, handle_msg)" + " await get_executor(executor).run(\n", + " is_shutting_down_f=is_shutting_down_f,\n", + " generator=poll_consumer, # type: ignore\n", + " processor=handle_msg, # type: ignore\n", + " )" ] }, { @@ -612,6 +861,16 @@ " return _is_shutting_down_f" ] }, + { + "cell_type": "code", + "execution_count": null, + "id": "3020fa4a", + "metadata": {}, + "outputs": [], + "source": [ + "from fastkafka._components.task_streaming import SequentialExecutor" + ] + }, { "cell_type": "code", "execution_count": null, @@ -638,7 +897,9 @@ "f = asyncio.Future()\n", "f.set_result(msgs)\n", "mock_consumer.configure_mock(**{\"getmany.return_value\": f})\n", - "mock_callback = Mock()\n", + "\n", + "def f(msg: MyMessage): pass\n", + "mock_callback = MagicMock(spec=f)\n", "\n", "\n", "for is_async in [True, False]:\n", @@ -661,6 +922,57 @@ "print(\"ok\")" ] }, + { + "cell_type": "code", + "execution_count": null, + "id": "a6854bb7", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ 
+ "ok\n" + ] + } + ], + "source": [ + "topic = \"topic_0\"\n", + "partition = 0\n", + "msg = MyMessage(url=\"http://www.acme.com\", port=22)\n", + "record = create_consumer_record(topic=topic, partition=partition, msg=msg)\n", + "\n", + "mock_consumer = MagicMock()\n", + "msgs = {TopicPartition(topic, 0): [record]}\n", + "\n", + "f = asyncio.Future()\n", + "f.set_result(msgs)\n", + "mock_consumer.configure_mock(**{\"getmany.return_value\": f})\n", + "\n", + "def f(msg: List[MyMessage]): pass\n", + "mock_callback = MagicMock(spec=f)\n", + "\n", + "\n", + "for is_async in [True, False]:\n", + " for executor_type in [\"DynamicTaskExecutor\", \"SequentialExecutor\"]:\n", + " await _aiokafka_consumer_loop(\n", + " consumer=mock_consumer,\n", + " topic=topic,\n", + " decoder_fn=json_decoder,\n", + " max_buffer_size=100,\n", + " timeout_ms=10,\n", + " callback=asyncer.asyncify(mock_callback) if is_async else mock_callback,\n", + " msg_type=List[MyMessage],\n", + " is_shutting_down_f=is_shutting_down_f(mock_consumer.getmany),\n", + " executor_type=executor_type,\n", + " )\n", + "\n", + " assert mock_consumer.getmany.call_count == 1\n", + " mock_callback.assert_called_once_with([msg])\n", + "\n", + "print(\"ok\")" + ] + }, { "cell_type": "code", "execution_count": null, @@ -832,7 +1144,7 @@ " timeout_ms: int = 100,\n", " max_buffer_size: int = 100_000,\n", " callback: ConsumeCallable,\n", - " msg_type: Type[BaseModel],\n", + " msg_type: Union[Type[List[BaseModel]], Type[BaseModel]],\n", " is_shutting_down_f: Callable[[], bool],\n", " executor: Union[str, StreamExecutor, None] = None,\n", " **kwargs: Any,\n", @@ -908,7 +1220,7 @@ { "data": { "application/vnd.jupyter.widget-view+json": { - "model_id": "d2ff2c5cf7924175a7e020b8b6334b64", + "model_id": "61317fa5eb244e60a286e4edcdcf9a51", "version_major": 2, "version_minor": 0 }, @@ -941,10 +1253,10 @@ "[INFO] __main__: msgs_received=9000\n", "[INFO] __main__: aiokafka_consumer_loop(): Consumer stopped.\n", "[INFO] __main__: aiokafka_consumer_loop() finished.\n", - "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 48208...\n", - "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 48208 terminated.\n", - "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 47846...\n", - "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 47846 terminated.\n", + "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 112786...\n", + "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 112786 terminated.\n", + "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 112426...\n", + "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 112426 terminated.\n", "[INFO] fastkafka._components.test_dependencies: Java is already installed.\n", "[INFO] fastkafka._components.test_dependencies: Kafka is installed.\n", "[INFO] fastkafka._testing.apache_kafka_broker: Starting zookeeper...\n", @@ -955,7 +1267,7 @@ { "data": { "application/vnd.jupyter.widget-view+json": { - "model_id": "c305aaaa1a244e52842a053f02ac6825", + "model_id": "75d0b57876d943438e5add00860e967c", "version_major": 2, "version_minor": 0 }, @@ -988,10 +1300,10 @@ "[INFO] __main__: msgs_received=9000\n", "[INFO] __main__: aiokafka_consumer_loop(): Consumer stopped.\n", "[INFO] __main__: aiokafka_consumer_loop() finished.\n", - "[INFO] 
fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 49350...\n", - "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 49350 terminated.\n", - "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 48989...\n", - "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 48989 terminated.\n" + "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 113934...\n", + "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 113934 terminated.\n", + "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 113569...\n", + "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 113569 terminated.\n" ] } ], @@ -1030,6 +1342,130 @@ " assert msgs_sent == msgs_received, f\"{msgs_sent} != {msgs_received}\"" ] }, + { + "cell_type": "code", + "execution_count": null, + "id": "fde91e3e", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "[INFO] fastkafka._components.test_dependencies: Java is already installed.\n", + "[INFO] fastkafka._components.test_dependencies: Kafka is installed.\n", + "[INFO] fastkafka._testing.apache_kafka_broker: Starting zookeeper...\n", + "[INFO] fastkafka._testing.apache_kafka_broker: Starting kafka...\n", + "[INFO] fastkafka._testing.apache_kafka_broker: Local Kafka broker up and running on 127.0.0.1:9092\n" + ] + }, + { + "data": { + "application/vnd.jupyter.widget-view+json": { + "model_id": "17f7bd19cb6c490484c2483ebed0a6b6", + "version_major": 2, + "version_minor": 0 + }, + "text/plain": [ + "producing to 'test_topic': 0%| | 0/9178 [00:00 None:\n", + " \"\"\"\n", + " Adds a task to the task pool.\n", + "\n", + " Args:\n", + " item: The task to be added.\n", + "\n", + " Returns:\n", + " None\n", + " \"\"\"\n", " while len(self.pool) >= self.size:\n", " await asyncio.sleep(0)\n", " self.pool.add(item)\n", " item.add_done_callback(self.discard)\n", "\n", " def discard(self, task: Task) -> None:\n", + " \"\"\"\n", + " Discards a completed task from the task pool.\n", + "\n", + " Args:\n", + " task: The completed task to be discarded.\n", + "\n", + " Returns:\n", + " None\n", + " \"\"\"\n", " e = task.exception()\n", " if e is not None and self.on_error is not None:\n", " try:\n", @@ -311,6 +344,12 @@ " self.pool.discard(task)\n", "\n", " def __len__(self) -> int:\n", + " \"\"\"\n", + " Returns the number of tasks in the task pool.\n", + "\n", + " Returns:\n", + " The number of tasks in the task pool.\n", + " \"\"\"\n", " return len(self.pool)\n", "\n", " async def __aenter__(self) -> \"TaskPool\":\n", @@ -324,6 +363,15 @@ "\n", " @staticmethod\n", " def log_error(logger: Logger) -> Callable[[Exception], None]:\n", + " \"\"\"\n", + " Creates a decorator that logs errors using the specified logger.\n", + "\n", + " Args:\n", + " logger: The logger to use for error logging.\n", + "\n", + " Returns:\n", + " The decorator function.\n", + " \"\"\"\n", " def _log_error(e: Exception, logger: Logger = logger) -> None:\n", " logger.warning(f\"{e=}\")\n", " return _log_error" @@ -404,10 +452,25 @@ " self.exception_found = False\n", " \n", " def on_error(self, e: Exception) -> None:\n", + " \"\"\"\n", + " Handles an error by storing the exception.\n", + "\n", + " Args:\n", + " e: The exception to be handled.\n", + "\n", + " Returns:\n", + " None\n", + " \"\"\"\n", " 
self.exceptions.append(e)\n", " self.exception_found = True\n", " \n", " def _monitor_step(self) -> None:\n", + " \"\"\"\n", + " Raises the next exception in the queue.\n", + "\n", + " Returns:\n", + " None\n", + " \"\"\"\n", " if len(self.exceptions) > 0:\n", " e = self.exceptions.pop(0)\n", " raise e\n", @@ -485,9 +548,10 @@ " @abstractmethod\n", " async def run( # type: ignore\n", " self,\n", + " *,\n", " is_shutting_down_f: Callable[[], bool],\n", - " produce_func: Callable[[], Awaitable[ConsumerRecord]],\n", - " consume_func: Callable[[ConsumerRecord], Awaitable[None]],\n", + " generator: Callable[[], Awaitable[ConsumerRecord]],\n", + " processor: Callable[[ConsumerRecord], Awaitable[None]],\n", " ) -> None:\n", " pass" ] @@ -539,7 +603,7 @@ "\n", "\n", "def _process_items_task( # type: ignore\n", - " consume_func: Callable[[ConsumerRecord], Awaitable[None]],\n", + " processor: Callable[[ConsumerRecord], Awaitable[None]],\n", " task_pool: TaskPool\n", ") -> Callable[\n", " [\n", @@ -551,12 +615,12 @@ "]:\n", " async def _process_items_wrapper( # type: ignore\n", " receive_stream: anyio.streams.memory.MemoryObjectReceiveStream,\n", - " consume_func: Callable[[ConsumerRecord], Awaitable[None]] = consume_func,\n", + " processor: Callable[[ConsumerRecord], Awaitable[None]] = processor,\n", " task_pool= task_pool,\n", " ):\n", " async with receive_stream:\n", " async for msg in receive_stream:\n", - " task: asyncio.Task = asyncio.create_task(consume_func(msg)) # type: ignore\n", + " task: asyncio.Task = asyncio.create_task(processor(msg)) # type: ignore\n", " await task_pool.add(task)\n", "\n", " return _process_items_wrapper" @@ -591,17 +655,29 @@ "\n", " async def run( # type: ignore\n", " self,\n", + " *,\n", " is_shutting_down_f: Callable[[], bool],\n", - " produce_func: Callable[[], Awaitable[ConsumerRecord]],\n", - " consume_func: Callable[[ConsumerRecord], Awaitable[None]],\n", + " generator: Callable[[], Awaitable[ConsumerRecord]],\n", + " processor: Callable[[ConsumerRecord], Awaitable[None]],\n", " ) -> None:\n", " send_stream, receive_stream = anyio.create_memory_object_stream(\n", " max_buffer_size=self.max_buffer_size\n", " )\n", + " \"\"\"\n", + " Runs the dynamic task executor.\n", + "\n", + " Args:\n", + " is_shutting_down_f: Function to check if the executor is shutting down.\n", + " generator: Generator function for retrieving consumer records.\n", + " processor: Processor function for processing consumer records.\n", + "\n", + " Returns:\n", + " None\n", + " \"\"\"\n", "\n", " async with self.exception_monitor, self.task_pool:\n", " async with anyio.create_task_group() as tg:\n", - " tg.start_soon(_process_items_task(consume_func, self.task_pool), receive_stream)\n", + " tg.start_soon(_process_items_task(processor, self.task_pool), receive_stream)\n", " async with send_stream:\n", " while not is_shutting_down_f():\n", " if (\n", @@ -609,7 +685,7 @@ " and self.throw_exceptions\n", " ):\n", " break\n", - " msgs = await produce_func()\n", + " msgs = await generator()\n", " for msg in msgs:\n", " await send_stream.send(msg)" ] @@ -672,9 +748,9 @@ "stream = DynamicTaskExecutor()\n", "\n", "await stream.run(\n", - " is_shutting_down_f(),\n", - " produce_func=produce,\n", - " consume_func=consume,\n", + " is_shutting_down_f=is_shutting_down_f(),\n", + " generator=produce,\n", + " processor=consume,\n", ")" ] }, @@ -691,9 +767,9 @@ "stream = DynamicTaskExecutor()\n", "\n", "await stream.run(\n", - " is_shutting_down_f(),\n", - " produce_func=mock_produce,\n", - " 
consume_func=mock_consume,\n", + " is_shutting_down_f=is_shutting_down_f(),\n", + " generator=mock_produce,\n", + " processor=mock_consume,\n", ")\n", "\n", "mock_produce.assert_awaited()\n", @@ -713,9 +789,9 @@ "stream = DynamicTaskExecutor()\n", "\n", "await stream.run(\n", - " is_shutting_down_f(),\n", - " produce_func=mock_produce,\n", - " consume_func=mock_consume,\n", + " is_shutting_down_f=is_shutting_down_f(),\n", + " generator=mock_produce,\n", + " processor=mock_consume,\n", ")\n", "\n", "mock_produce.assert_called()\n", @@ -739,9 +815,9 @@ "\n", "with pytest.raises(RuntimeError) as e:\n", " await stream.run(\n", - " is_shutting_down_f(num_msgs),\n", - " produce_func=mock_produce,\n", - " consume_func=mock_consume,\n", + " is_shutting_down_f=is_shutting_down_f(num_msgs),\n", + " generator=mock_produce,\n", + " processor=mock_consume,\n", " )\n", "\n", "mock_produce.assert_called()\n", @@ -784,9 +860,9 @@ "stream = DynamicTaskExecutor()\n", "\n", "await stream.run(\n", - " is_shutting_down_f(num_msgs),\n", - " produce_func=mock_produce,\n", - " consume_func=mock_consume,\n", + " is_shutting_down_f=is_shutting_down_f(num_msgs),\n", + " generator=mock_produce,\n", + " processor=mock_consume,\n", ")\n", "\n", "mock_produce.assert_called()\n", @@ -812,7 +888,7 @@ "\n", "\n", "def _process_items_coro( # type: ignore\n", - " consume_func: Callable[[ConsumerRecord], Awaitable[None]],\n", + " processor: Callable[[ConsumerRecord], Awaitable[None]],\n", " throw_exceptions: bool,\n", ") -> Callable[\n", " [\n", @@ -824,13 +900,13 @@ "]:\n", " async def _process_items_wrapper( # type: ignore\n", " receive_stream: anyio.streams.memory.MemoryObjectReceiveStream,\n", - " consume_func: Callable[[ConsumerRecord], Awaitable[None]] = consume_func,\n", + " processor: Callable[[ConsumerRecord], Awaitable[None]] = processor,\n", " throw_exceptions: bool = throw_exceptions,\n", " ) -> Awaitable[None]:\n", " async with receive_stream:\n", " async for msg in receive_stream:\n", " try:\n", - " await consume_func(msg)\n", + " await processor(msg)\n", " except Exception as e:\n", " if throw_exceptions:\n", " raise e\n", @@ -861,20 +937,32 @@ "\n", " async def run( # type: ignore\n", " self,\n", + " *,\n", " is_shutting_down_f: Callable[[], bool],\n", - " produce_func: Callable[[], Awaitable[ConsumerRecord]],\n", - " consume_func: Callable[[ConsumerRecord], Awaitable[None]],\n", + " generator: Callable[[], Awaitable[ConsumerRecord]],\n", + " processor: Callable[[ConsumerRecord], Awaitable[None]],\n", " ) -> None:\n", + " \"\"\"\n", + " Runs the sequential executor.\n", + "\n", + " Args:\n", + " is_shutting_down_f: Function to check if the executor is shutting down.\n", + " generator: Generator function for retrieving consumer records.\n", + " processor: Processor function for processing consumer records.\n", + "\n", + " Returns:\n", + " None\n", + " \"\"\"\n", "\n", " send_stream, receive_stream = anyio.create_memory_object_stream(\n", " max_buffer_size=self.max_buffer_size\n", " )\n", " \n", " async with anyio.create_task_group() as tg:\n", - " tg.start_soon(_process_items_coro(consume_func, self.throw_exceptions), receive_stream)\n", + " tg.start_soon(_process_items_coro(processor, self.throw_exceptions), receive_stream)\n", " async with send_stream:\n", " while not is_shutting_down_f():\n", - " msgs = await produce_func()\n", + " msgs = await generator()\n", " for msg in msgs:\n", " await send_stream.send(msg)" ] @@ -895,7 +983,11 @@ "stream = SequentialExecutor(throw_exceptions=True)\n", "\n", "with 
pytest.raises(ExceptionGroup) as e:\n", - " await stream.run(is_shutting_down_f(num_msgs), produce_func=mock_produce, consume_func=mock_consume)\n", + " await stream.run(\n", + " is_shutting_down_f=is_shutting_down_f(num_msgs),\n", + " generator=mock_produce,\n", + " processor=mock_consume,\n", + " )\n", "\n", "mock_produce.assert_called()\n", "mock_consume.assert_awaited_with(\"msg\")" @@ -937,9 +1029,9 @@ "stream = SequentialExecutor()\n", "\n", "await stream.run(\n", - " is_shutting_down_f(num_msgs),\n", - " mock_produce,\n", - " mock_consume,\n", + " is_shutting_down_f=is_shutting_down_f(num_msgs),\n", + " generator=mock_produce,\n", + " processor=mock_consume,\n", ")\n", "\n", "mock_produce.assert_called()\n", @@ -957,6 +1049,18 @@ "\n", "\n", "def get_executor(executor: Union[str, StreamExecutor, None] = None) -> StreamExecutor:\n", + " \"\"\"\n", + " Returns an instance of the specified executor.\n", + "\n", + " Args:\n", + " executor: Executor instance or name of the executor.\n", + "\n", + " Returns:\n", + " Instance of the specified executor.\n", + "\n", + " Raises:\n", + " AttributeError: If the executor is not found.\n", + " \"\"\"\n", " if isinstance(executor, StreamExecutor):\n", " return executor\n", " elif executor is None:\n", diff --git a/nbs/013_ProducerDecorator.ipynb b/nbs/013_ProducerDecorator.ipynb index dc61680..fbb73ab 100644 --- a/nbs/013_ProducerDecorator.ipynb +++ b/nbs/013_ProducerDecorator.ipynb @@ -27,6 +27,7 @@ "from asyncio import iscoroutinefunction # do not use the version from inspect\n", "from collections import namedtuple\n", "from dataclasses import dataclass\n", + "from inspect import Parameter\n", "from typing import *\n", "\n", "from aiokafka import AIOKafkaProducer\n", @@ -101,6 +102,49 @@ "assert event.key == b\"123\"" ] }, + { + "cell_type": "code", + "execution_count": null, + "id": "4e7de730", + "metadata": {}, + "outputs": [], + "source": [ + "# | export\n", + "\n", + "\n", + "def unwrap_from_kafka_event(var_type: Union[Type, Parameter]) -> Union[Type, Parameter]:\n", + " \"\"\"\n", + " Unwraps the type from a KafkaEvent.\n", + "\n", + " Vars:\n", + " var_type: Type to unwrap.\n", + "\n", + " Returns:\n", + " Type: Unwrapped type if the given type is a KafkaEvent, otherwise returns the same type.\n", + "\n", + " Example:\n", + " - Input: KafkaEvent[str]\n", + " Output: str\n", + " - Input: int\n", + " Output: int\n", + " \"\"\"\n", + " if hasattr(var_type, \"__origin__\") and var_type.__origin__ == KafkaEvent:\n", + " return var_type.__args__[0] # type: ignore\n", + " else:\n", + " return var_type" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "7277bcf0", + "metadata": {}, + "outputs": [], + "source": [ + "assert unwrap_from_kafka_event(KafkaEvent[int]) == int\n", + "assert unwrap_from_kafka_event(int) == int" + ] + }, { "cell_type": "code", "execution_count": null, diff --git a/nbs/014_AsyncAPI.ipynb b/nbs/014_AsyncAPI.ipynb index 53bd090..1bcaa48 100644 --- a/nbs/014_AsyncAPI.ipynb +++ b/nbs/014_AsyncAPI.ipynb @@ -39,8 +39,9 @@ "\n", "from fastkafka._components.docs_dependencies import _check_npm_with_local\n", "from fastkafka._components.logger import get_logger\n", - "from fastkafka._components.producer_decorator import KafkaEvent, ProduceCallable\n", - "from fastkafka._components.aiokafka_consumer_loop import ConsumeCallable" + "from fastkafka._components.producer_decorator import KafkaEvent, ProduceCallable, unwrap_from_kafka_event\n", + "from fastkafka._components.aiokafka_consumer_loop import ConsumeCallable, 
EventMetadata\n", + "from fastkafka._components.helpers import unwrap_list_type" ] }, { @@ -757,13 +758,13 @@ "text/html": [ "
{\n",
        "'consumers': {\n",
-       "│   │   'my_topic_1': <function on_my_topic_one at 0x7fa211b16050>,\n",
-       "│   │   'my_topic_2': <function on_my_topic_2 at 0x7fa211b164d0>\n",
+       "│   │   'my_topic_1': <function on_my_topic_one at 0x7fbc1208ee60>,\n",
+       "│   │   'my_topic_2': <function on_my_topic_2 at 0x7fbc124bdbd0>\n",
        "},\n",
        "'producers': {\n",
-       "│   │   'my_topic_3': <function to_my_topic_3 at 0x7fa2115f1e10>,\n",
-       "│   │   'my_topic_4': <function to_my_topic_4 at 0x7fa2115f1ea0>,\n",
-       "│   │   'my_topic_5': <function to_my_topic_5 at 0x7fa2115f1f30>\n",
+       "│   │   'my_topic_3': <function to_my_topic_3 at 0x7fbc11d655a0>,\n",
+       "│   │   'my_topic_4': <function to_my_topic_4 at 0x7fbc11d65630>,\n",
+       "│   │   'my_topic_5': <function to_my_topic_5 at 0x7fbc11d656c0>\n",
        "}\n",
        "}\n",
        "
\n" @@ -771,13 +772,13 @@ "text/plain": [ "\u001b[1m{\u001b[0m\n", "\u001b[2;32m│ \u001b[0m\u001b[32m'consumers'\u001b[0m: \u001b[1m{\u001b[0m\n", - "\u001b[2;32m│ │ \u001b[0m\u001b[32m'my_topic_1'\u001b[0m: \u001b[1m<\u001b[0m\u001b[1;95mfunction\u001b[0m\u001b[39m on_my_topic_one at \u001b[0m\u001b[1;36m0x7fa211b16050\u001b[0m\u001b[39m>,\u001b[0m\n", - "\u001b[2;32m│ │ \u001b[0m\u001b[32m'my_topic_2'\u001b[0m\u001b[39m: \u001b[0m\n", + "\u001b[2;32m│ │ \u001b[0m\u001b[32m'my_topic_1'\u001b[0m: \u001b[1m<\u001b[0m\u001b[1;95mfunction\u001b[0m\u001b[39m on_my_topic_one at \u001b[0m\u001b[1;36m0x7fbc1208ee60\u001b[0m\u001b[39m>,\u001b[0m\n", + "\u001b[2;32m│ │ \u001b[0m\u001b[32m'my_topic_2'\u001b[0m\u001b[39m: \u001b[0m\n", "\u001b[2;32m│ \u001b[0m\u001b[1;39m}\u001b[0m\u001b[39m,\u001b[0m\n", "\u001b[2;32m│ \u001b[0m\u001b[32m'producers'\u001b[0m\u001b[39m: \u001b[0m\u001b[1;39m{\u001b[0m\n", - "\u001b[2;32m│ │ \u001b[0m\u001b[32m'my_topic_3'\u001b[0m\u001b[39m: ,\u001b[0m\n", - "\u001b[2;32m│ │ \u001b[0m\u001b[32m'my_topic_4'\u001b[0m\u001b[39m: ,\u001b[0m\n", - "\u001b[2;32m│ │ \u001b[0m\u001b[32m'my_topic_5'\u001b[0m\u001b[39m: \u001b[0m\n", + "\u001b[2;32m│ │ \u001b[0m\u001b[32m'my_topic_3'\u001b[0m\u001b[39m: ,\u001b[0m\n", + "\u001b[2;32m│ │ \u001b[0m\u001b[32m'my_topic_4'\u001b[0m\u001b[39m: ,\u001b[0m\n", + "\u001b[2;32m│ │ \u001b[0m\u001b[32m'my_topic_5'\u001b[0m\u001b[39m: \u001b[0m\n", "\u001b[2;32m│ \u001b[0m\u001b[1m}\u001b[0m\n", "\u001b[1m}\u001b[0m\n" ] @@ -828,7 +829,12 @@ "async def on_my_topic_2(msg: MyMsgEmail) -> None:\n", " raise NotImplemented\n", "\n", - "\n", + "async def on_my_topic_2_meta(msg: MyMsgEmail, meta: EventMetadata) -> None:\n", + " raise NotImplemented\n", + " \n", + "async def on_my_topic_2_batch(msg: List[MyMsgEmail]) -> None:\n", + " raise NotImplemented\n", + " \n", "async def to_my_topic_3(msg) -> MyMsgUrl:\n", " raise NotImplemented\n", "\n", @@ -869,11 +875,8 @@ " f\"Producer function must have a defined return value, got {return_type} as return value\"\n", " )\n", " \n", - " if hasattr(return_type, \"__origin__\") and return_type.__origin__ == KafkaEvent:\n", - " return_type = return_type.__args__[0]\n", - " \n", - " if hasattr(return_type, \"__origin__\") and return_type.__origin__ == list:\n", - " return_type = return_type.__args__[0]\n", + " return_type = unwrap_from_kafka_event(return_type)\n", + " return_type = unwrap_list_type(return_type)\n", "\n", " if not hasattr(return_type, \"json\"):\n", " raise ValueError(f\"Producer function return value must have json method\")\n", @@ -982,22 +985,28 @@ " types = get_type_hints(f)\n", " return_type = types.pop(\"return\", type(None))\n", " types_list = list(types.values())\n", + " # @app.consumer does not return a value\n", + " if return_type != type(None):\n", + " raise ValueError(\n", + " f\"Consumer function cannot return any value, got {return_type}\"\n", + " )\n", " # @app.consumer first consumer argument must be a msg which is a subclass of BaseModel\n", - " try: \n", - " if issubclass(BaseModel, types_list[0]):\n", + " try:\n", + " msg_type = types_list[0]\n", + "\n", + " msg_type = unwrap_list_type(msg_type)\n", + "\n", + " if not issubclass(msg_type, BaseModel):\n", " raise ValueError(\n", " f\"Consumer function first param must be a BaseModel subclass msg, got {types_list}\"\n", " )\n", + "\n", + " return msg_type # type: ignore\n", + " \n", " except IndexError:\n", - " raise ValueError(\n", - " f\"Consumer function first param must be a BaseModel subclass msg, got {types_list}\"\n", 
- " )\n", - " # @app.consumer does not return a value\n", - " if return_type != type(None):\n", " raise ValueError(\n", - " f\"Consumer function cannot return any value, got {return_type}\"\n", - " )\n", - " return types_list[0] # type: ignore" + " f\"Consumer function first param must be a BaseModel subclass msg, got {types_list}\"\n", + " )" ] }, { @@ -1023,6 +1032,52 @@ "assert actual == expected" ] }, + { + "cell_type": "code", + "execution_count": null, + "id": "0d0625f2", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "__main__.MyMsgEmail" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "expected = MyMsgEmail\n", + "actual = _get_msg_cls_for_consumer(on_my_topic_2_meta)\n", + "display(actual)\n", + "assert actual == expected" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "d701a5c9", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "__main__.MyMsgEmail" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "expected = MyMsgEmail\n", + "actual = _get_msg_cls_for_consumer(on_my_topic_2_batch)\n", + "display(actual)\n", + "assert actual == expected" + ] + }, { "cell_type": "code", "execution_count": null, @@ -1691,7 +1746,7 @@ }, { "cell_type": "raw", - "id": "e7f6e6cd", + "id": "eb42773f", "metadata": {}, "source": [ "expected = {\n", @@ -2622,23 +2677,11 @@ "[INFO] __main__: Old async specifications at '/tmp/003_AsyncAPI/asyncapi/spec/asyncapi.yml' does not exist.\n", "[INFO] __main__: New async specifications generated at: '/tmp/003_AsyncAPI/asyncapi/spec/asyncapi.yml'\n", "[INFO] __main__: Async docs generated at '/tmp/003_AsyncAPI/asyncapi/docs'\n", - "[INFO] __main__: Output of '$ npx -y -p @asyncapi/generator ag /tmp/003_AsyncAPI/asyncapi/spec/asyncapi.yml @asyncapi/html-template -o /tmp/003_AsyncAPI/asyncapi/docs --force-write'npm WARN deprecated har-validator@5.1.5: this library is no longer supported\n", - "npm WARN deprecated uuid@3.4.0: Please upgrade to version 7 or higher. Older versions may use Math.random() in certain circumstances, which is known to be problematic. See https://v8.dev/blog/math-random for details.\n", - "npm WARN deprecated readdir-scoped-modules@1.1.0: This functionality has been moved to @npmcli/fs\n", - "npm WARN deprecated @npmcli/move-file@1.1.2: This functionality has been moved to @npmcli/fs\n", - "npm WARN deprecated request@2.88.2: request has been deprecated, see https://github.com/request/request/issues/3142\n", - "npm WARN deprecated mkdirp@0.3.5: Legacy versions of mkdirp are no longer supported. Please update to mkdirp 1.x. (Note that the API surface has changed to use Promises in 1.x.)\n", - "npm WARN deprecated mkdirp@0.3.5: Legacy versions of mkdirp are no longer supported. Please update to mkdirp 1.x. (Note that the API surface has changed to use Promises in 1.x.)\n", - "\u001b[32m\n", + "[INFO] __main__: Output of '$ npx -y -p @asyncapi/generator ag /tmp/003_AsyncAPI/asyncapi/spec/asyncapi.yml @asyncapi/html-template -o /tmp/003_AsyncAPI/asyncapi/docs --force-write'\u001b[32m\n", "\n", "Done! ✨\u001b[0m\n", "\u001b[33mCheck out your shiny new generated files at \u001b[0m\u001b[35m/tmp/003_AsyncAPI/asyncapi/docs\u001b[0m\u001b[33m.\u001b[0m\n", "\n", - "npm notice \n", - "npm notice New minor version of npm available! 
9.5.0 -> 9.6.5\n", - "npm notice Changelog: \n", - "npm notice Run `npm install -g npm@9.6.5` to update!\n", - "npm notice \n", "\n" ] } diff --git a/nbs/016_Tester.ipynb b/nbs/016_Tester.ipynb index e8aa9a8..12512bb 100644 --- a/nbs/016_Tester.ipynb +++ b/nbs/016_Tester.ipynb @@ -28,7 +28,9 @@ "\n", "from fastkafka import KafkaEvent\n", "from fastkafka._application.app import FastKafka\n", + "from fastkafka._components.helpers import unwrap_list_type\n", "from fastkafka._components.meta import delegates, export, patch\n", + "from fastkafka._components.producer_decorator import unwrap_from_kafka_event\n", "from fastkafka._testing.apache_kafka_broker import ApacheKafkaBroker\n", "from fastkafka._testing.in_memory_broker import InMemoryBroker\n", "from fastkafka._testing.local_redpanda_broker import LocalRedpandaBroker" @@ -431,12 +433,12 @@ "[INFO] fastkafka._application.app: _create_producer() : created producer using the config: '{'bootstrap_servers': '127.0.0.1:9788'}'\n", "[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() starting...\n", "[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer created using the following parameters: {'bootstrap_servers': '127.0.0.1:9788', 'auto_offset_reset': 'earliest', 'max_poll_records': 100}\n", - "[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() starting...\n", - "[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer created using the following parameters: {'bootstrap_servers': '127.0.0.1:9788', 'auto_offset_reset': 'latest', 'max_poll_records': 100}\n", "[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer started.\n", "[INFO] aiokafka.consumer.subscription_state: Updating subscribed topics to: frozenset({'preprocessed_signals'})\n", "[INFO] aiokafka.consumer.consumer: Subscribed to topic(s): {'preprocessed_signals'}\n", "[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer subscribed.\n", + "[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() starting...\n", + "[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer created using the following parameters: {'bootstrap_servers': '127.0.0.1:9788', 'auto_offset_reset': 'latest', 'max_poll_records': 100}\n", "[INFO] aiokafka.consumer.group_coordinator: Metadata for topic has changed from {} to {'preprocessed_signals': 1}. 
\n", "[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer started.\n", "[INFO] aiokafka.consumer.subscription_state: Updating subscribed topics to: frozenset({'predictions'})\n", @@ -449,10 +451,10 @@ "[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() finished.\n", "[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer stopped.\n", "[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() finished.\n", - "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 113359...\n", - "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 113359 terminated.\n", - "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 112987...\n", - "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 112987 terminated.\n", + "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 249907...\n", + "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 249907 terminated.\n", + "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 249547...\n", + "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 249547 terminated.\n", "ok\n" ] } @@ -496,11 +498,7 @@ "def mirror_producer(topic: str, producer_f: Callable[..., Any]) -> Callable[..., Any]:\n", " msg_type = inspect.signature(producer_f).return_annotation\n", "\n", - " if hasattr(msg_type, \"__origin__\") and msg_type.__origin__ == KafkaEvent:\n", - " msg_type = msg_type.__args__[0]\n", - "\n", - " if hasattr(msg_type, \"__origin__\") and msg_type.__origin__ == list:\n", - " msg_type = msg_type.__args__[0]\n", + " msg_type_unwrapped = unwrap_list_type(unwrap_from_kafka_event(msg_type))\n", "\n", " async def skeleton_func(msg: BaseModel) -> None:\n", " pass\n", @@ -516,7 +514,7 @@ " parameters=[\n", " inspect.Parameter(\n", " name=\"msg\",\n", - " annotation=msg_type,\n", + " annotation=msg_type_unwrapped,\n", " kind=inspect.Parameter.POSITIONAL_OR_KEYWORD,\n", " )\n", " ]\n", @@ -593,6 +591,8 @@ "def mirror_consumer(topic: str, consumer_f: Callable[..., Any]) -> Callable[..., Any]:\n", " msg_type = inspect.signature(consumer_f).parameters[\"msg\"]\n", "\n", + " msg_type_unwrapped = unwrap_list_type(msg_type)\n", + " \n", " async def skeleton_func(msg: BaseModel) -> BaseModel:\n", " return msg\n", "\n", @@ -603,7 +603,7 @@ " mirror_func.__name__ = \"to_\" + topic\n", "\n", " # adjust arg and return val\n", - " sig = sig.replace(parameters=[msg_type], return_annotation=msg_type.annotation)\n", + " sig = sig.replace(parameters=[msg_type], return_annotation=msg_type_unwrapped.annotation)\n", "\n", " mirror_func.__signature__ = sig # type: ignore\n", " return mirror_func" @@ -776,7 +776,7 @@ } ], "source": [ - "# Test KafkaEvent mirroring\n", + "# Test KafkaEvent mirroring and consumer batching\n", "\n", "\n", "class TestMsg(BaseModel):\n", @@ -787,7 +787,7 @@ "\n", "\n", "@second_app.consumes()\n", - "async def on_preprocessed_signals(msg: TestMsg):\n", + "async def on_preprocessed_signals(msg: List[TestMsg]):\n", " await to_predictions(TestMsg(msg=\"prediction\"))\n", "\n", "\n", @@ -930,38 +930,32 @@ "[INFO] fastkafka._components.test_dependencies: Java is already installed.\n", "[INFO] fastkafka._components.test_dependencies: Kafka is installed.\n", "[INFO] 
fastkafka._testing.apache_kafka_broker: Starting zookeeper...\n", - "stdout=, stderr=, returncode=1\n", - "[INFO] fastkafka._testing.apache_kafka_broker: zookeeper startup falied, generating a new port and retrying...\n", - "[INFO] fastkafka._testing.apache_kafka_broker: port=44383\n", "[INFO] fastkafka._testing.apache_kafka_broker: Starting kafka...\n", - "stdout=, stderr=, returncode=1\n", - "[INFO] fastkafka._testing.apache_kafka_broker: kafka startup falied, generating a new port and retrying...\n", - "[INFO] fastkafka._testing.apache_kafka_broker: port=52761\n", - "[INFO] fastkafka._testing.apache_kafka_broker: Local Kafka broker up and running on 127.0.0.1:52761\n", - "[INFO] fastkafka._application.app: _create_producer() : created producer using the config: '{'bootstrap_servers': '127.0.0.1:52761'}'\n", - "[INFO] fastkafka._application.app: _create_producer() : created producer using the config: '{'bootstrap_servers': '127.0.0.1:52761'}'\n", - "[INFO] fastkafka._application.app: _create_producer() : created producer using the config: '{'bootstrap_servers': '127.0.0.1:52761'}'\n", - "[INFO] fastkafka._application.app: _create_producer() : created producer using the config: '{'bootstrap_servers': '127.0.0.1:52761'}'\n", + "[INFO] fastkafka._testing.apache_kafka_broker: Local Kafka broker up and running on 127.0.0.1:9092\n", + "[INFO] fastkafka._application.app: _create_producer() : created producer using the config: '{'bootstrap_servers': '127.0.0.1:9092'}'\n", + "[INFO] fastkafka._application.app: _create_producer() : created producer using the config: '{'bootstrap_servers': '127.0.0.1:9092'}'\n", + "[INFO] fastkafka._application.app: _create_producer() : created producer using the config: '{'bootstrap_servers': '127.0.0.1:9092'}'\n", + "[INFO] fastkafka._application.app: _create_producer() : created producer using the config: '{'bootstrap_servers': '127.0.0.1:9092'}'\n", "[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() starting...\n", - "[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer created using the following parameters: {'bootstrap_servers': '127.0.0.1:52761', 'auto_offset_reset': 'earliest', 'max_poll_records': 100}\n", - "[INFO] fastkafka._application.app: _create_producer() : created producer using the config: '{'bootstrap_servers': '127.0.0.1:52761'}'\n", + "[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer created using the following parameters: {'bootstrap_servers': '127.0.0.1:9092', 'auto_offset_reset': 'earliest', 'max_poll_records': 100}\n", + "[INFO] fastkafka._application.app: _create_producer() : created producer using the config: '{'bootstrap_servers': '127.0.0.1:9092'}'\n", "[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer started.\n", "[INFO] aiokafka.consumer.subscription_state: Updating subscribed topics to: frozenset({'preprocessed_signals'})\n", "[INFO] aiokafka.consumer.consumer: Subscribed to topic(s): {'preprocessed_signals'}\n", "[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer subscribed.\n", "[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() starting...\n", - "[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer created using the following parameters: {'bootstrap_servers': '127.0.0.1:52761', 'auto_offset_reset': 'earliest', 'max_poll_records': 100}\n", + "[INFO] fastkafka._components.aiokafka_consumer_loop: 
aiokafka_consumer_loop(): Consumer created using the following parameters: {'bootstrap_servers': '127.0.0.1:9092', 'auto_offset_reset': 'earliest', 'max_poll_records': 100}\n", "[INFO] aiokafka.consumer.group_coordinator: Metadata for topic has changed from {} to {'preprocessed_signals': 1}. \n", "[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer started.\n", "[INFO] aiokafka.consumer.subscription_state: Updating subscribed topics to: frozenset({'preprocessed_signals'})\n", "[INFO] aiokafka.consumer.consumer: Subscribed to topic(s): {'preprocessed_signals'}\n", "[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer subscribed.\n", "[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() starting...\n", - "[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer created using the following parameters: {'bootstrap_servers': '127.0.0.1:52761', 'auto_offset_reset': 'earliest', 'max_poll_records': 100}\n", + "[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer created using the following parameters: {'bootstrap_servers': '127.0.0.1:9092', 'auto_offset_reset': 'earliest', 'max_poll_records': 100}\n", "[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() starting...\n", - "[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer created using the following parameters: {'bootstrap_servers': '127.0.0.1:52761', 'auto_offset_reset': 'earliest', 'max_poll_records': 100}\n", + "[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer created using the following parameters: {'bootstrap_servers': '127.0.0.1:9092', 'auto_offset_reset': 'earliest', 'max_poll_records': 100}\n", "[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() starting...\n", - "[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer created using the following parameters: {'bootstrap_servers': '127.0.0.1:52761', 'auto_offset_reset': 'earliest', 'max_poll_records': 100}\n", + "[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer created using the following parameters: {'bootstrap_servers': '127.0.0.1:9092', 'auto_offset_reset': 'earliest', 'max_poll_records': 100}\n", "[INFO] aiokafka.consumer.group_coordinator: Metadata for topic has changed from {} to {'preprocessed_signals': 1}. \n", "[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer started.\n", "[INFO] aiokafka.consumer.subscription_state: Updating subscribed topics to: frozenset({'topic1'})\n", @@ -975,9 +969,9 @@ "[INFO] aiokafka.consumer.subscription_state: Updating subscribed topics to: frozenset({'predictions'})\n", "[INFO] aiokafka.consumer.consumer: Subscribed to topic(s): {'predictions'}\n", "[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer subscribed.\n", + "[INFO] aiokafka.consumer.group_coordinator: Metadata for topic has changed from {} to {'predictions': 1}. \n", "[INFO] aiokafka.consumer.group_coordinator: Metadata for topic has changed from {} to {'topic2': 1}. \n", "[INFO] aiokafka.consumer.group_coordinator: Metadata for topic has changed from {} to {'topic1': 1}. \n", - "[INFO] aiokafka.consumer.group_coordinator: Metadata for topic has changed from {} to {'predictions': 1}. 
\n", "Sending prediction: msg='prediction'\n", "Sending prediction: msg='prediction'\n", "[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer stopped.\n", @@ -990,10 +984,10 @@ "[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() finished.\n", "[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer stopped.\n", "[INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() finished.\n", - "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 117420...\n", - "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 117420 terminated.\n", - "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 116007...\n", - "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 116007 terminated.\n", + "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 247517...\n", + "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 247517 terminated.\n", + "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 247156...\n", + "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 247156 terminated.\n", "ok\n" ] } diff --git a/nbs/998_Internal_Helpers.ipynb b/nbs/998_Internal_Helpers.ipynb index 5c05a08..5048eca 100644 --- a/nbs/998_Internal_Helpers.ipynb +++ b/nbs/998_Internal_Helpers.ipynb @@ -77,7 +77,7 @@ "import sys\n", "from datetime import datetime, timedelta\n", "from functools import wraps\n", - "from inspect import signature\n", + "from inspect import signature, Parameter\n", "from pathlib import Path\n", "from typing import *\n", "\n", @@ -248,6 +248,48 @@ "time.sleep(0.1)\n", "assert f()" ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "652fe0a0", + "metadata": {}, + "outputs": [], + "source": [ + "# | export\n", + "\n", + "def unwrap_list_type(var_type: Union[Type, Parameter]) -> Union[Type, Parameter]:\n", + " \"\"\"\n", + " Unwraps the type of a list.\n", + "\n", + " Vars:\n", + " var_type: Type to unwrap.\n", + "\n", + " Returns:\n", + " Unwrapped type if the given type is a list, otherwise returns the same type.\n", + "\n", + " Example:\n", + " - Input: List[str]\n", + " Output: str\n", + " - Input: int\n", + " Output: int\n", + " \"\"\"\n", + " if hasattr(var_type, \"__origin__\") and var_type.__origin__ == list:\n", + " return var_type.__args__[0] # type: ignore\n", + " else:\n", + " return var_type" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "cac74052", + "metadata": {}, + "outputs": [], + "source": [ + "assert unwrap_list_type(List[int]) == int\n", + "assert unwrap_list_type(int) == int" + ] } ], "metadata": { diff --git a/nbs/guides/Guide_11_Consumes_Basics.ipynb b/nbs/guides/Guide_11_Consumes_Basics.ipynb index d1d58c3..703f057 100644 --- a/nbs/guides/Guide_11_Consumes_Basics.ipynb +++ b/nbs/guides/Guide_11_Consumes_Basics.ipynb @@ -1167,39 +1167,6 @@ "assert \"Got msg: msg='Hello world'\" in consumer_task.value[1].decode(\"UTF-8\")" ] }, - { - "cell_type": "code", - "execution_count": null, - "id": "73446389", - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "[6814]: [INFO] fastkafka._application.app: set_kafka_broker() : Setting bootstrap_servers value to '127.0.0.1:50361'\n", - "[6814]: [INFO] 
fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() starting...\n", -    "[6814]: [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer created using the following parameters: {'bootstrap_servers': '127.0.0.1:50361', 'auto_offset_reset': 'earliest', 'max_poll_records': 100}\n", -    "[6814]: [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer started.\n", -    "[6814]: [INFO] aiokafka.consumer.subscription_state: Updating subscribed topics to: frozenset({'hello_world'})\n", -    "[6814]: [INFO] aiokafka.consumer.consumer: Subscribed to topic(s): {'hello_world'}\n", -    "[6814]: [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer subscribed.\n", -    "[6814]: [INFO] aiokafka.consumer.group_coordinator: Metadata for topic has changed from {} to {'hello_world': 1}. \n", -    "[6814]: ConsumerRecord(topic='hello_world', partition=0, offset=0, timestamp=1683803949271, timestamp_type=0, key=None, value=b'{\"msg\": \"Hello world\"}', checksum=None, serialized_key_size=-1, serialized_value_size=22, headers=())\n", -    "[6814]: [INFO] consumer_example: Got msg: msg='Hello world'\n", -    "Starting process cleanup, this may take a few seconds...\n", -    "[INFO] fastkafka._server: terminate_asyncio_process(): Terminating the process 6814...\n", -    "[6814]: [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer stopped.\n", -    "[6814]: [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() finished.\n", -    "[INFO] fastkafka._server: terminate_asyncio_process(): Process 6814 terminated.\n", -    "\n" -   ] -  } - ], - "source": [ - "print(consumer_task.value[1].decode(\"UTF-8\"))" - ] - }, { "cell_type": "markdown", "id": "9572ebf3", diff --git a/nbs/guides/Guide_12_Batch_Consuming.ipynb b/nbs/guides/Guide_12_Batch_Consuming.ipynb new file mode 100644 index 0000000..fcda102 --- /dev/null +++ b/nbs/guides/Guide_12_Batch_Consuming.ipynb @@ -0,0 +1,365 @@ +{ + "cells": [ +  { +   "cell_type": "markdown", +   "id": "84b22f68", +   "metadata": {}, +   "source": [ +    "# Batch consuming" +   ] +  }, +  { +   "cell_type": "code", +   "execution_count": null, +   "id": "7a35b05c", +   "metadata": {}, +   "outputs": [], +   "source": [ +    "# | hide\n", +    "\n", +    "import asyncio\n", +    "\n", +    "import asyncer\n", +    "from IPython.display import Markdown as md\n", +    "\n", +    "from fastkafka._components._subprocess import terminate_asyncio_process\n", +    "from fastkafka._testing.apache_kafka_broker import run_and_match\n", +    "from fastkafka.testing import ApacheKafkaBroker, run_script_and_cancel" +   ] +  }, +  { +   "cell_type": "markdown", +   "id": "8c655e4f", +   "metadata": {}, +   "source": [ +    "If you want to consume data in batches, the `@consumes` decorator makes that possible for you. If you annotate the consumed msg object as a `list` of messages, the consumer will call your consuming function with a batch of messages consumed from a single partition. Let's demonstrate that now." +   ] +  }, +  { +   "cell_type": "markdown", +   "id": "18535f2f", +   "metadata": {}, +   "source": [ +    "## Consume function with batching\n", +    "\n", +    "To consume messages in batches, you need to wrap your message type into a list and the `@consumes` decorator will take care of the rest for you. Your consuming function will then be called with batches of messages grouped by partition."
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "d09190cd", + "metadata": {}, + "outputs": [ + { + "data": { + "text/markdown": [ + "```python\n", + "@app.consumes(auto_offset_reset=\"earliest\")\n", + "async def on_hello_world(msg: List[HelloWorld]):\n", + " logger.info(f\"Got msg batch: {msg}\")\n", + "\n", + "```" + ], + "text/plain": [ + "" + ] + }, + "execution_count": null, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "# | echo: false\n", + "\n", + "consumes_decorator_batch = \"\"\"@app.consumes(auto_offset_reset=\"earliest\")\n", + "async def on_hello_world(msg: List[HelloWorld]):\n", + " logger.info(f\"Got msg batch: {msg}\")\n", + "\"\"\"\n", + "md(f\"```python\\n{consumes_decorator_batch}\\n```\")" + ] + }, + { + "cell_type": "markdown", + "id": "f66b68d7", + "metadata": {}, + "source": [ + "## App example\n", + "\n", + "We will modify the app example from [@consumes basics](/docs/guides/Guide_11_Consumes_Basics.md) guide to consume `HelloWorld` messages batch. The final app will look like this (make sure you replace the `` and `` with the actual values):" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "673c7f8a", + "metadata": {}, + "outputs": [], + "source": [ + "# | hide\n", + "\n", + "app = \"\"\"\n", + "import asyncio\n", + "from typing import List\n", + "from pydantic import BaseModel, Field\n", + "\n", + "from fastkafka import FastKafka\n", + "from fastkafka._components.logger import get_logger\n", + "\n", + "logger = get_logger(__name__)\n", + "\n", + "class HelloWorld(BaseModel):\n", + " msg: str = Field(\n", + " ...,\n", + " example=\"Hello\",\n", + " description=\"Demo hello world message\",\n", + " )\n", + "\n", + "kafka_brokers = {\n", + " \"demo_broker\": {\n", + " \"url\": \"\",\n", + " \"description\": \"local demo kafka broker\",\n", + " \"port\": \"\",\n", + " }\n", + "}\n", + "\n", + "app = FastKafka(kafka_brokers=kafka_brokers)\n", + "\n", + "\"\"\"" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "2abb4c3d", + "metadata": {}, + "outputs": [ + { + "data": { + "text/markdown": [ + "```python\n", + "\n", + "import asyncio\n", + "from typing import List\n", + "from pydantic import BaseModel, Field\n", + "\n", + "from fastkafka import FastKafka\n", + "from fastkafka._components.logger import get_logger\n", + "\n", + "logger = get_logger(__name__)\n", + "\n", + "class HelloWorld(BaseModel):\n", + " msg: str = Field(\n", + " ...,\n", + " example=\"Hello\",\n", + " description=\"Demo hello world message\",\n", + " )\n", + "\n", + "kafka_brokers = {\n", + " \"demo_broker\": {\n", + " \"url\": \"\",\n", + " \"description\": \"local demo kafka broker\",\n", + " \"port\": \"\",\n", + " }\n", + "}\n", + "\n", + "app = FastKafka(kafka_brokers=kafka_brokers)\n", + "\n", + "@app.consumes(auto_offset_reset=\"earliest\")\n", + "async def on_hello_world(msg: List[HelloWorld]):\n", + " logger.info(f\"Got msg batch: {msg}\")\n", + "\n", + "```" + ], + "text/plain": [ + "" + ] + }, + "execution_count": null, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "# | echo: false\n", + "\n", + "batch_example = app + consumes_decorator_batch\n", + "\n", + "md(f\"```python\\n{batch_example}\\n```\")" + ] + }, + { + "cell_type": "markdown", + "id": "d4ec6dab", + "metadata": {}, + "source": [ + "## Send the messages to kafka topic" + ] + }, + { + "cell_type": "markdown", + "id": "c81e65bc", + "metadata": {}, + "source": [ + "Lets send a couple of `HelloWorld` 
messages to the *hello_world* topic and check if our consumer kafka application has logged the received messages batch. In your terminal, run the following command at least two times to create multiple messages in your kafka queue:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "6ef181f6", + "metadata": {}, + "outputs": [ + { + "data": { + "text/markdown": [ + "```shell\n", + "echo {\\\"msg\\\": \\\"Hello world\\\"} | kafka-console-producer.sh --topic=hello_world --bootstrap-server=\n", + "```" + ], + "text/plain": [ + "" + ] + }, + "execution_count": null, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "# | echo: false\n", + "\n", + "producer_cmd = 'echo {\\\\\"msg\\\\\": \\\\\"Hello world\\\\\"} | kafka-console-producer.sh --topic=hello_world --bootstrap-server='\n", + "md(f\"```shell\\n{producer_cmd}\\n```\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "01604778", + "metadata": {}, + "outputs": [ + { + "data": { + "text/markdown": [ + "Now we can run the app. Copy the code of the example app in consumer_example.py and run it by running\n", + "```shell\n", + "fastkafka run --num-workers=1 --kafka-broker=demo_broker consumer_example:app\n", + "```" + ], + "text/plain": [ + "" + ] + }, + "execution_count": null, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "# | echo: false\n", + "\n", + "script_file = \"consumer_example.py\"\n", + "filename = script_file.split(\".py\")[0]\n", + "cmd = f\"fastkafka run --num-workers=1 --kafka-broker=demo_broker {filename}:app\"\n", + "md(\n", + " f\"Now we can run the app. Copy the code of the example app in {script_file} and run it by running\\n```shell\\n{cmd}\\n```\"\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "a66904c8", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "[INFO] fastkafka._testing.apache_kafka_broker: ApacheKafkaBroker.start(): entering...\n", + "[WARNING] fastkafka._testing.apache_kafka_broker: ApacheKafkaBroker.start(): (<_UnixSelectorEventLoop running=True closed=False debug=False>) is already running!\n", + "[WARNING] fastkafka._testing.apache_kafka_broker: ApacheKafkaBroker.start(): calling nest_asyncio.apply()\n", + "[INFO] fastkafka._components.test_dependencies: Java is already installed.\n", + "[INFO] fastkafka._components.test_dependencies: But not exported to PATH, exporting...\n", + "[INFO] fastkafka._components.test_dependencies: Kafka is installed.\n", + "[INFO] fastkafka._components.test_dependencies: But not exported to PATH, exporting...\n", + "[INFO] fastkafka._testing.apache_kafka_broker: Starting zookeeper...\n", + "[INFO] fastkafka._testing.apache_kafka_broker: Starting kafka...\n", + "[INFO] fastkafka._testing.apache_kafka_broker: Local Kafka broker up and running on 127.0.0.1:9092\n", + "[INFO] fastkafka._testing.apache_kafka_broker: .start(): returning 127.0.0.1:9092\n", + "[INFO] fastkafka._testing.apache_kafka_broker: ApacheKafkaBroker.start(): exited.\n", + "[INFO] fastkafka._testing.apache_kafka_broker: ApacheKafkaBroker.stop(): entering...\n", + "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 60656...\n", + "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Process 60656 terminated.\n", + "[INFO] fastkafka._components._subprocess: terminate_asyncio_process(): Terminating the process 60277...\n", + "[INFO] fastkafka._components._subprocess: 
terminate_asyncio_process(): Process 60277 terminated.\n", +    "[INFO] fastkafka._testing.apache_kafka_broker: ApacheKafkaBroker.stop(): exited.\n" +   ] +  } + ], + "source": [ +    "# | hide\n", +    "\n", +    "\n", +    "with ApacheKafkaBroker(\n", +    "    topics=[\"hello_world\"], apply_nest_asyncio=True\n", +    ") as bootstrap_server:\n", +    "    async with asyncer.create_task_group() as task_group:\n", +    "        server_url = bootstrap_server.split(\":\")[0]\n", +    "        server_port = bootstrap_server.split(\":\")[1]\n", +    "\n", +    "        producer_tasks = [task_group.soonify(asyncio.create_subprocess_shell)(\n", +    "            cmd=producer_cmd.replace(\n", +    "                \"\", bootstrap_server\n", +    "            ),\n", +    "            stdout=asyncio.subprocess.PIPE,\n", +    "            stderr=asyncio.subprocess.PIPE,\n", +    "        ) for _ in range(2)]\n", +    "        \n", +    "        await asyncio.sleep(5)\n", +    "        \n", +    "        consumer_task = task_group.soonify(run_script_and_cancel)(\n", +    "            script=batch_example.replace(\n", +    "                \"\", server_url\n", +    "            ).replace(\"\", server_port),\n", +    "            script_file=script_file,\n", +    "            cmd=cmd,\n", +    "            cancel_after=20,\n", +    "        )\n", +    "\n", +    "assert \"Got msg batch: [HelloWorld(msg='Hello world'), HelloWorld(msg='Hello world')]\" in consumer_task.value[1].decode(\"UTF-8\")" +   ] +  }, +  { +   "cell_type": "code", +   "execution_count": null, +   "id": "e86e202e", +   "metadata": {}, +   "outputs": [], +   "source": [ +    "# | echo: false\n", +    "\n", +    "print(consumer_task.value[1].decode(\"UTF-8\"))" +   ] +  }, +  { +   "cell_type": "markdown", +   "id": "9292901c", +   "metadata": {}, +   "source": [ +    "You should see your Kafka messages being logged in batches by your consumer." +   ] +  } + ], + "metadata": { +  "kernelspec": { +   "display_name": "python3", +   "language": "python", +   "name": "python3" +  } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/nbs/sidebar.yml b/nbs/sidebar.yml index eb53e91..f3a9cfc 100644 --- a/nbs/sidebar.yml +++ b/nbs/sidebar.yml @@ -8,6 +8,7 @@ website: - section: Writing services contents: - guides/Guide_11_Consumes_Basics.ipynb + - guides/Guide_12_Batch_Consuming.ipynb - guides/Guide_21_Produces_Basics.ipynb - guides/Guide_22_Partition_Keys.ipynb - guides/Guide_23_Batch_Producing.ipynb
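For orientation, the consumer-facing surface this diff adds can be summarized in the minimal sketch below. It is illustrative only, not code taken from the PR: the broker values and topic names are placeholders, the `EventMetadata` import path mirrors the one used in the notebooks above, and the handler that takes a message batch together with a metadata batch is an assumption based on the updated `AsyncConsumeMeta` alias and the `List[EventMetadata]` handling in `_callback_parameters_wrapper`.

```python
# Minimal sketch (assumptions noted above) of batch consumers with FastKafka.
from typing import List

from pydantic import BaseModel, Field

from fastkafka import FastKafka
# Import path as used in the notebooks in this diff; a released package may re-export it.
from fastkafka._components.aiokafka_consumer_loop import EventMetadata


class HelloWorld(BaseModel):
    msg: str = Field(..., example="Hello", description="Demo hello world message")


kafka_brokers = {
    "demo_broker": {
        "url": "localhost",  # placeholder broker address
        "description": "local demo kafka broker",
        "port": "9092",  # placeholder broker port
    }
}

app = FastKafka(kafka_brokers=kafka_brokers)


@app.consumes(auto_offset_reset="earliest")
async def on_hello_world(msg: List[HelloWorld]) -> None:
    # Annotating msg as List[...] switches this consumer to batch mode:
    # the handler receives all records polled from a single partition at once.
    print(f"Got msg batch: {msg}")


@app.consumes(auto_offset_reset="earliest")
async def on_hello_world_with_meta(
    msg: List[HelloWorld], meta: List[EventMetadata]
) -> None:
    # When List[EventMetadata] appears in the type hints, the callback wrapper
    # added in this PR inserts the matching metadata batch as the second argument.
    print(f"Got {len(msg)} messages, first offset: {meta[0].offset}")
```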