Help with pytest #887

Answered by davorrunje
ChadDa3mon asked this question in Q&A

Please try this:

from typing import Any, Dict

import pytest
from faststream import FastStream, Logger
from faststream.kafka import KafkaBroker, TestKafkaBroker

crazy_dict = {"msg": "test", "n": 2, "d": {"a": 1, "b": 2}}

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)

@broker.subscriber("test")
async def handle(msg: Dict[str, Any], logger: Logger):
    logger.info(f"handle(msg={msg})")
    return "response"


@pytest.mark.asyncio
async def test_handle():
    # TestKafkaBroker patches the broker so no running Kafka is needed
    async with TestKafkaBroker(broker) as br:
        await br.publish(crazy_dict, "test")

        # check that the handler received the published message body
        handle.mock.assert_called_with(crazy_dict)

If you run it with pytest -s, you …
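
As a side note on the assertion used in the test: the sketch below assumes that handle.mock follows the standard unittest.mock semantics. It uses a plain MagicMock named handler_mock as a hypothetical stand-in (it is not the object FastStream attaches to the subscriber) to show that assert_called_with compares only the most recent call and matches arguments by equality.

```python
from unittest.mock import MagicMock

# Hypothetical stand-in for `handle.mock`; a plain MagicMock,
# not the object FastStream attaches to the subscriber.
handler_mock = MagicMock()

crazy_dict = {"msg": "test", "n": 2, "d": {"a": 1, "b": 2}}

# Simulate two deliveries to the handler.
handler_mock({"msg": "first"})
handler_mock(crazy_dict)

# Passes: only the most recent call is compared.
handler_mock.assert_called_with(crazy_dict)

# An equal but distinct dict also matches, because arguments are compared with ==.
handler_mock.assert_called_with({"msg": "test", "n": 2, "d": {"a": 1, "b": 2}})

# assert_called_once_with is stricter: it raises here because there were two calls.
try:
    handler_mock.assert_called_once_with(crazy_dict)
    called_once = True
except AssertionError:
    called_once = False
```

If a test publishes several messages to the same topic, checking each one right after its publish call (or using assert_called_once_with for a single publish) avoids relying on call order.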

Answer selected by Lancetnik