Create an EventEmitter with your configured backend producer, then emit events asynchronously:
# Emitting an event through the Kafka backend.
# NOTE(review): MyEvent (an EventBase model with user_id/value fields) is
# assumed to be defined in an earlier snippet — confirm against the full docs.
from aiokafka import AIOKafkaProducer

from dispytch import EventEmitter, EventBase
from dispytch.kafka import KafkaProducer, KafkaEventRoute


async def main():
    """Start a raw aiokafka producer, wrap it for dispytch, and emit one event."""
    raw_kafka_producer = AIOKafkaProducer(bootstrap_servers="localhost:19092")
    await raw_kafka_producer.start()  # REMEMBER TO START THE PRODUCER!

    # dispytch's KafkaProducer adapts the raw aiokafka producer for EventEmitter.
    producer = KafkaProducer(raw_kafka_producer)
    emitter = EventEmitter(producer)

    await emitter.emit(MyEvent(user_id="user_123", value=123))
    print("Event sent!")
# Emitting an event through the Redis backend.
# !!! Important: Use the asyncio-compatible Redis client from redis.asyncio
# NOTE(review): MyEvent (an EventBase model with user_id/value fields) is
# assumed to be defined in an earlier snippet — confirm against the full docs.
from redis.asyncio import Redis

from dispytch import EventEmitter, EventBase
from dispytch.redis import RedisProducer, RedisEventRoute


async def main():
    """Wrap an async Redis client for dispytch and emit one event."""
    redis = Redis()

    # dispytch's RedisProducer adapts the redis.asyncio client for EventEmitter.
    producer = RedisProducer(redis)
    emitter = EventEmitter(producer)

    await emitter.emit(MyEvent(user_id="user_123", value=123))
    print("Event sent!")
Organize handlers with Router. Define the event schema using Pydantic BaseModel, then decorate your handler function:
# Defining an event schema and registering a Kafka-topic handler on a Router.
from pydantic import BaseModel

from dispytch import Router, Event
from dispytch.kafka import KafkaEventSubscription


class MyEvent(BaseModel):
    """Payload schema for user events carried on the topic."""
    user_id: str
    value: int


my_router = Router()


# Subscribes this handler to messages arriving on the "my_topic" Kafka topic.
@my_router.handler(KafkaEventSubscription(topic="my_topic"))
async def handle_user_event(event: Event[MyEvent]):
    print(f"Received user event from user: {event.user_id} with value: {event.value}")
# Defining an event schema and registering a RabbitMQ handler on a Router.
from pydantic import BaseModel

from dispytch import Router, Event
from dispytch.rabbitmq import RabbitMQEventSubscription


class MyEvent(BaseModel):
    """Payload schema for user events delivered via the routing key."""
    user_id: str
    value: int


my_router = Router()


# Subscribes this handler to messages published with routing key "my.routing.key".
@my_router.handler(RabbitMQEventSubscription(routing_key="my.routing.key"))
async def handle_user_event(event: Event[MyEvent]):
    print(f"Received user event from user: {event.user_id} with value: {event.value}")
# Defining an event schema and registering a Redis pub/sub handler on a Router.
from pydantic import BaseModel

from dispytch import Router, Event
from dispytch.redis import RedisEventSubscription


class MyEvent(BaseModel):
    """Payload schema for user events published on the channel."""
    user_id: str
    value: int


my_router = Router()


# Subscribes this handler to messages published on the "my.channel" channel.
@my_router.handler(RedisEventSubscription(channel="my.channel"))
async def handle_user_event(event: Event[MyEvent]):
    print(f"Received user event from user: {event.user_id} with value: {event.value}")
# Consuming events from Kafka and dispatching them to registered routers.
import asyncio

from aiokafka import AIOKafkaConsumer

from dispytch.dispatcher import EventDispatcher
from dispytch.kafka import KafkaConsumer
from routers import my_router


async def main():
    """Build the Kafka consumer, attach the router, and run the dispatcher."""
    consumer = KafkaConsumer(AIOKafkaConsumer(
        "my_topic",
        bootstrap_servers="localhost:9092",
        enable_auto_commit=False,  # must be false, dispytch handles offsets
        group_id='consumer_group_id',
        auto_offset_reset='earliest',
    ))
    await consumer.start()  # Remember to start the consumer

    dispatcher = EventDispatcher(consumer)
    dispatcher.add_router(my_router)
    await dispatcher.start()


if __name__ == "__main__":
    asyncio.run(main())
# Consuming events from RabbitMQ and dispatching them to registered routers.
import asyncio

import aio_pika

from dispytch.dispatcher import EventDispatcher
from dispytch.rabbitmq import RabbitMQConsumer
from routers import my_router


async def setup_queue():
    """Declare the exchange and a durable queue bound to the routing key.

    Returns the bound aio_pika queue the consumer will read from.
    """
    connection = await aio_pika.connect_robust("amqp://guest:guest@localhost:5672")
    channel = await connection.channel()
    exchange = await channel.declare_exchange("my.exchange", aio_pika.ExchangeType.DIRECT)
    queue = await channel.declare_queue("my.events.queue", durable=True)
    await queue.bind(exchange, routing_key="my.routing.key")
    return queue


async def main():
    """Wire the RabbitMQ queue into a dispatcher and start processing."""
    queue = await setup_queue()
    consumer = RabbitMQConsumer(queue)
    await consumer.start()  # Remember to start the consumer

    dispatcher = EventDispatcher(consumer)
    dispatcher.add_router(my_router)
    await dispatcher.start()


if __name__ == "__main__":
    asyncio.run(main())