Skip to content

Small router in rust to listen to various streaming sources then initiate and route payloads to prefect jobs

License

Notifications You must be signed in to change notification settings

nicelgueta/prefect-event-router

Repository files navigation

Prefect Event Router

This is an asynchronous event handler written in Rust that is used to handle events that should kick-off a prefect flow. It is a simple handler that listens to some queue/socket-like source to kick off prefect flows.

Why

As an open source user of Prefect, there is no native support for an event-driven architecture (without having to use prefect cloud). Working in a business with incoming events that should trigger a flow, I needed a way to listen to these events and kick off a flow in Prefect all within our private network.

How it works

The event handler is a simple tokio rust application that asynchronously listens to multiple streams and can kicks off a prefect flow depending on the type of message received to any of the target message streams.

Architecture diagram

architecture

Usage

The application works by using a JSON config file for the queue-message type-flow mapping. A config is passed to each async thread thats created to listen to a queue.

Here's an example: example-config.json

{
    "threads": [
        {
            "publisher_type": "AzureStorageQueue",
            "storage_account": "storage-account-name",
            "queue_name": "test",
        },
        // From the bundled example:
        {
            "publisher_type": "StdInput",
        }
    ]
}

The binary listener is then run using the following command. Each thread runs forever until a SIGINT is sent to the main process.

./prefect-event-handler example-config.json

Example

Perhaps the best way to demonstrate how this works is to use the included example in this repo. Here we will setup a local prefect instance in a dedicated local venv, deploy two flows and then run the prefect-event-router to receive messages from stdinput that will be used to kick off our test flows.

Setup

Prequisites: Cargo and Python must be installed prior to running this setup.

To setup, simply run make setup-prefect-test

  1. Setup a local prefect server in a python virtualenv in the prefect-testing/ directiory.
  2. Deploy two flows from prefect-testing/test_flow.py - one without parameters and one with.

Running the example

To see this in full action, open three separate terminals to simulate the 3 processes that work in tandem:

  1. In terminal 1, run: make start-prefect - this will start a local prefect server. Here you can view flows in action from the GUI.
  2. In terminal 2, run: make start-worker - this starts the worker process that will actually execute the flows that have been initiated by our handler.
  3. In the final terminal, run: make run-example - this exceutes our compiled handler process using the test.json configuration provided in this repo.

You may see some SQLAlchemy errors when you start up the prefect processes - just ignore these, they're irrelevant to this test and don't prevent it from working.

The test json:

{
    "threads": [
        {
            "publisher_type": "StdInput"
        }
    ]
}

You can see from this json that our handler will create one listener thread that will await input from stdin. Depending on the message received, it will either execute 1 of two different flows (these were both deployed in the setup).

Messages should be received in JSON format. For the first message, this is the expected input. paste this into the terminal and you should see the prefect flow in terminal 2, beginning to kick-off.

{"flow_name": "Test Flow", "deployment_name": "integration-test"}

Now try dropping in a message for the second flow that takes parameters. You should see the flow in prefect being executed with the payload provided:

{"flow_name": "Test Flow with Params", "deployment_name": "integration-test", "payload": {"name": "Gordon Bennett"}}

Simple!

Once finished, to clear out the example venv and config, just run make reset-prefect-test.

Beyond the example

Setup

A few environment variables are required to get started. The first is the URI to your prefect instance. (If you're running this application on the same machine as your prefect server, this is commonly http://127.0.0.1:4200/api)

export PREFECT_API_URL="https://[email protected]/api"

Server Authentication

NB. Only applies if you've explicitly added authentication to your prefect server. The standard way to authenticate to prefect cloud is by using a JWT. This application supports the same pattern for self-hosted prefect servers that have implemented this feature. Simply set the following environment variable to trigger authentication:

export PREFECT_API_KEY="your-api-key"

Publisher Authentication

Azure

The application uses the DefaultAzureCredential to authenticate with Azure storage accounts. This means that it will use the environment variables or the managed identity of the VM it is running on to authenticate.

Main Features

  • Add simple stdin example
  • Add Azure Storage Queue support
  • Add ØMQ support
  • Add Kafka Support
  • Add a logging system to log the messages received and the flows kicked off

Building

To build the event handler, you need to have rust and cargo installed. You can install rust using rustup. Once you have rust installed, you can build the event handler using the following command:

cargo build --release

About

Small router in rust to listen to various streaming sources then initiate and route payloads to prefect jobs

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published