
Dispytch is a lightweight, async Python framework for event-handling.
It's designed to streamline the development of clean and testable event-driven services.
Features
- Async core: built for modern Python I/O
- FastAPI-style dependency injection: clean, decoupled handlers
- Backend-flexible: Kafka, RabbitMQ and Redis PubSub supported out of the box
- Pydantic-based validation: event schemas are validated using Pydantic
- Built-in retry logic: configurable, resilient, no boilerplate
- Automatic acknowledgement: events are acknowledged automatically after successful processing
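To make the retry bullet concrete, here is a minimal, generic sketch of retrying an async callable with exponential backoff. It is not Dispytch's API or implementation: `call_with_retries` and its parameters (`retries`, `base_delay`, `retry_on`) are hypothetical names chosen for illustration, and the framework's own retry configuration may look quite different.

```python
import asyncio
from typing import Awaitable, Callable, Tuple, Type, TypeVar

T = TypeVar("T")


async def call_with_retries(
    func: Callable[[], Awaitable[T]],
    *,
    retries: int = 3,
    base_delay: float = 0.1,
    retry_on: Tuple[Type[Exception], ...] = (Exception,),
) -> T:
    """Await func(); on a matching exception, retry up to `retries` more times.

    Hypothetical helper for illustration only, not part of Dispytch.
    """
    attempt = 0
    while True:
        try:
            return await func()
        except retry_on:
            if attempt >= retries:
                # Retries exhausted: let the last error propagate to the caller.
                raise
            # Exponential backoff: base_delay, 2 * base_delay, 4 * base_delay, ...
            await asyncio.sleep(base_delay * 2 ** attempt)
            attempt += 1
```

With `retries=3`, the callable runs at most four times in total: the first attempt plus three retries.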
Example: Emitting Events

    import uuid
    from datetime import datetime

    from pydantic import BaseModel
    from dispytch import EventBase


    class User(BaseModel):
        id: str
        email: str
        name: str


    class UserRegistered(EventBase):
        # Topic and event type; the handler example subscribes to the same pair
        __topic__ = "user_events"
        __event_type__ = "user_registered"

        user: User
        timestamp: int


    async def example_emit(emitter):
        # Build the event and hand it to the emitter
        await emitter.emit(
            UserRegistered(
                user=User(
                    id=str(uuid.uuid4()),
                    email="example@mail.com",
                    name="John Doe",
                ),
                timestamp=int(datetime.now().timestamp()),
            )
        )
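Because `example_emit` only relies on the emitter exposing an awaitable `emit` method, the emitting code can be exercised in a test without a broker. The sketch below assumes exactly that and nothing more: `RecordingEmitter`, `FakeUserRegistered`, `emit_registration` and `run_demo` are hypothetical stand-ins written so the snippet runs without Dispytch installed, and they are not part of the library.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, List


class RecordingEmitter:
    """Hypothetical test double: stores events instead of publishing them."""

    def __init__(self) -> None:
        self.emitted: List[Any] = []

    async def emit(self, event: Any) -> None:
        self.emitted.append(event)


@dataclass
class FakeUserRegistered:
    """Stand-in for the UserRegistered event so the sketch runs on its own."""

    user_id: str
    timestamp: int


async def emit_registration(emitter: Any, user_id: str, timestamp: int) -> None:
    # Same shape as example_emit: build an event, then await emitter.emit(...)
    await emitter.emit(FakeUserRegistered(user_id=user_id, timestamp=timestamp))


def run_demo() -> List[Any]:
    emitter = RecordingEmitter()
    asyncio.run(emit_registration(emitter, "user-1", 1700000000))
    return emitter.emitted
```

In a real test suite, the same double could be passed to `example_emit` directly, and the assertions would inspect the recorded `UserRegistered` instance.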
Example: Handling Events

    from typing import Annotated

    from pydantic import BaseModel
    from dispytch import Event, Dependency, HandlerGroup

    from service import UserService, get_user_service


    class User(BaseModel):
        id: str
        email: str
        name: str


    # Define event body schema
    class UserCreatedEvent(BaseModel):
        user: User
        timestamp: int


    user_events = HandlerGroup()


    # Subscribe to the topic and event type used by the emitting example
    @user_events.handler(topic='user_events', event='user_registered')
    async def handle_user_registered(
            event: Event[UserCreatedEvent],
            user_service: Annotated[UserService, Dependency(get_user_service)]
    ):
        user = event.body.user
        timestamp = event.body.timestamp

        print(f"[User Registered] {user.id} - {user.email} at {timestamp}")

        await user_service.do_smth_with_the_user(user)
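The handler imports `UserService` and `get_user_service` from a `service` module that is not shown. A minimal sketch of what that module might contain follows. The class body and the `seen_user_ids` attribute are hypothetical placeholders, and the sketch assumes that `Dependency(...)` accepts a plain callable that returns the object to inject.

```python
from typing import Any, List


class UserService:
    """Hypothetical service; only the method the handler calls is sketched."""

    def __init__(self) -> None:
        self.seen_user_ids: List[str] = []

    async def do_smth_with_the_user(self, user: Any) -> None:
        # Placeholder business logic: remember which users were processed.
        self.seen_user_ids.append(user.id)


def get_user_service() -> UserService:
    """Provider handed to Dependency(...); returns a fresh service instance."""
    return UserService()
```

Keeping construction inside a provider function means the handler never builds its own collaborators, so a test can substitute a different provider or call the handler with a fake service directly.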