Skip to content

🚀 Quickstart

Get your event-driven flow running with Dispytch in four simple steps.


1. Define Your Event

Subclass EventBase to declare your event’s route, along with its payload:

Kafka:
from dispytch import EventBase
from dispytch.kafka import KafkaEventRoute


class MyEvent(EventBase):
    """Example event with a ``user_id`` and a ``value`` payload.

    ``__route__`` declares the Kafka topic the event is sent to.
    """

    __route__ = KafkaEventRoute(topic="my_topic")

    user_id: str
    value: int
RabbitMQ:
from dispytch import EventBase
from dispytch.rabbitmq import RabbitMQEventRoute


class MyEvent(EventBase):
    """Example event with a ``user_id`` and a ``value`` payload.

    ``__route__`` declares the RabbitMQ exchange and routing key the event
    is sent with.
    """

    __route__ = RabbitMQEventRoute(exchange="my.exchange", routing_key="my.routing.key")

    user_id: str
    value: int
Redis:
from dispytch import EventBase
from dispytch.redis import RedisEventRoute


class MyEvent(EventBase):
    """Example event with a ``user_id`` and a ``value`` payload.

    ``__route__`` declares the Redis channel the event is sent to.
    """

    __route__ = RedisEventRoute(channel="my.channel")

    user_id: str
    value: int

2. Emit Events

Create an EventEmitter with your configured backend producer, then emit events asynchronously:

Kafka:
from aiokafka import AIOKafkaProducer
from dispytch import EventEmitter, EventBase
from dispytch.kafka import KafkaProducer, KafkaEventRoute


async def main():
    """Start a Kafka producer, emit one ``MyEvent``, then stop the producer.

    ``MyEvent`` is the event class declared in step 1. Run this coroutine
    with ``asyncio.run(main())``.
    """
    raw_kafka_producer = AIOKafkaProducer(bootstrap_servers="localhost:9092")
    await raw_kafka_producer.start()  # REMEMBER TO START THE PRODUCER!

    try:
        producer = KafkaProducer(raw_kafka_producer)
        emitter = EventEmitter(producer)

        await emitter.emit(
            MyEvent(user_id="user_123", value=123)
        )
        print("Event sent!")
    finally:
        # Always stop the producer so pending data is flushed and the
        # broker connections are closed, even if emit() raised.
        await raw_kafka_producer.stop()
RabbitMQ:
import aio_pika
from dispytch import EventEmitter, EventBase
from dispytch.rabbitmq import RabbitMQProducer, RabbitMQEventRoute


async def main():
    """Connect to RabbitMQ, emit one ``MyEvent``, then close the connection.

    ``MyEvent`` is the event class declared in step 1. Run this coroutine
    with ``asyncio.run(main())``.
    """
    connection = await aio_pika.connect('amqp://guest:guest@localhost:5672')

    # The connection is closed on exit from this block, even if emit() raised.
    async with connection:
        channel = await connection.channel()
        exchange = await channel.declare_exchange('my.exchange', aio_pika.ExchangeType.DIRECT)

        producer = RabbitMQProducer([exchange])
        emitter = EventEmitter(producer)

        await emitter.emit(
            MyEvent(user_id="user_123", value=123)
        )
        print("Event sent!")
Redis:
# !!! Important: Use the asyncio-compatible Redis client from redis.asyncio
from redis.asyncio import Redis
from dispytch import EventEmitter, EventBase
from dispytch.redis import RedisProducer, RedisEventRoute


async def main():
    """Emit one ``MyEvent`` through Redis, then close the client.

    ``MyEvent`` is the event class declared in step 1. Run this coroutine
    with ``asyncio.run(main())``.
    """
    # The client is closed on exit from this block, even if emit() raised.
    async with Redis() as redis:
        producer = RedisProducer(redis)
        emitter = EventEmitter(producer)

        await emitter.emit(
            MyEvent(user_id="user_123", value=123)
        )
        print("Event sent!")

3. Register Event Handlers

Organize handlers with Router. Define the event schema using Pydantic BaseModel, then decorate your handler function:

Kafka:
from pydantic import BaseModel
from dispytch import Router, Event
from dispytch.kafka import KafkaEventSubscription


class MyEvent(BaseModel):
    """Payload schema the handler expects for incoming events."""

    user_id: str
    value: int


my_router = Router()


@my_router.handler(KafkaEventSubscription(topic="my_topic"))
async def handle_user_event(event: Event[MyEvent]):
    """Print the payload of each event received on the ``my_topic`` topic."""
    # The MyEvent payload fields are read from the event's body.
    payload = event.body
    print(f"Received user event from user: {payload.user_id} with value: {payload.value}")
RabbitMQ:
from pydantic import BaseModel
from dispytch import Router, Event
from dispytch.rabbitmq import RabbitMQEventSubscription


class MyEvent(BaseModel):
    """Payload schema the handler expects for incoming events."""

    user_id: str
    value: int


my_router = Router()


@my_router.handler(RabbitMQEventSubscription(routing_key="my.routing.key"))
async def handle_user_event(event: Event[MyEvent]):
    """Print the payload of each event received with ``my.routing.key``."""
    # The MyEvent payload fields are read from the event's body.
    payload = event.body
    print(f"Received user event from user: {payload.user_id} with value: {payload.value}")
Redis:
from pydantic import BaseModel
from dispytch import Router, Event
from dispytch.redis import RedisEventSubscription


class MyEvent(BaseModel):
    """Payload schema the handler expects for incoming events."""

    user_id: str
    value: int


my_router = Router()


@my_router.handler(RedisEventSubscription(channel="my.channel"))
async def handle_user_event(event: Event[MyEvent]):
    """Print the payload of each event received on the ``my.channel`` channel."""
    # The MyEvent payload fields are read from the event's body.
    payload = event.body
    print(f"Received user event from user: {payload.user_id} with value: {payload.value}")

4. Start the Dispatcher

Connect your backend consumer to an EventDispatcher, register your router(s) and start the dispatcher:

Kafka:
import asyncio
from aiokafka import AIOKafkaConsumer
from dispytch.dispatcher import EventDispatcher
from dispytch.kafka import KafkaConsumer
from routers import my_router


async def main():
    """Consume ``my_topic`` from Kafka and dispatch events to ``my_router``."""
    raw_consumer = AIOKafkaConsumer(
        "my_topic",
        bootstrap_servers="localhost:9092",
        group_id='consumer_group_id',
        auto_offset_reset='earliest',
        enable_auto_commit=False,  # must be false, dispytch handles offsets
    )
    consumer = KafkaConsumer(raw_consumer)
    await consumer.start()  # Remember to start the consumer

    event_dispatcher = EventDispatcher(consumer)
    event_dispatcher.add_router(my_router)
    await event_dispatcher.start()


if __name__ == "__main__":
    asyncio.run(main())
RabbitMQ:
import asyncio
import aio_pika
from dispytch.dispatcher import EventDispatcher
from dispytch.rabbitmq import RabbitMQConsumer
from routers import my_router


async def setup_queue():
    """Declare the exchange and queue, bind them, and return the queue.

    Declares the direct exchange ``my.exchange`` and the durable queue
    ``my.events.queue``, bound with routing key ``my.routing.key``.
    """
    amqp_connection = await aio_pika.connect_robust("amqp://guest:guest@localhost:5672")
    amqp_channel = await amqp_connection.channel()

    events_exchange = await amqp_channel.declare_exchange(
        "my.exchange",
        aio_pika.ExchangeType.DIRECT,
    )
    events_queue = await amqp_channel.declare_queue("my.events.queue", durable=True)
    await events_queue.bind(events_exchange, routing_key="my.routing.key")

    return events_queue


async def main():
    """Consume the queue from ``setup_queue()`` and dispatch to ``my_router``."""
    events_queue = await setup_queue()

    consumer = RabbitMQConsumer(events_queue)
    await consumer.start()  # Remember to start the consumer

    event_dispatcher = EventDispatcher(consumer)
    event_dispatcher.add_router(my_router)
    await event_dispatcher.start()


if __name__ == "__main__":
    asyncio.run(main())
Redis:
import asyncio
from redis.asyncio import Redis
from dispytch.dispatcher import EventDispatcher
from dispytch.redis import RedisConsumer
from routers import my_router


async def main():
    """Subscribe to ``my.channel`` on Redis and dispatch events to ``my_router``."""
    client = Redis()
    channel_listener = client.pubsub()
    await channel_listener.psubscribe("my.channel")

    consumer = RedisConsumer(channel_listener)

    event_dispatcher = EventDispatcher(consumer)
    event_dispatcher.add_router(my_router)
    await event_dispatcher.start()


if __name__ == "__main__":
    asyncio.run(main())

That’s It!

And there you have it: events defined, emitted, and handled, fully decoupled via DI and middleware. Simple, clean, and async.