Dispytch is an asynchronous Python framework designed to streamline the development of event-driven services.
🚀 Features
- 🧠 Async core – built for modern Python I/O
- 🔌 FastAPI-style dependency injection – clean, decoupled handlers
- 📬 Pluggable transport layer – with Kafka, RabbitMQ and Redis Pub/Sub out of the box
- 🧾 Pydantic v2 validation – event schemas are validated using Pydantic
- 🔁 Built-in retry logic – configurable, resilient, no boilerplate
- ✅ Automatic acknowledgement – events are acknowledged automatically
- ⚠️ Error handling – handle failures and prevent message loss with a dead-letter queue (DLQ)
- ⚙️ Composable middleware – set up logging, metrics, filtering, observability
✨ Example: Emitting Events
Kafka RabbitMQ Redis Pub Sub
import uuid
from datetime import datetime
from pydantic import BaseModel
from dispytch import EventEmitter , EventBase
from dispytch.kafka import KafkaEventRoute
class User(BaseModel):
    """User payload embedded in user events."""

    id: str
    email: str
    name: str
class UserEvent(EventBase):
    """Base event whose route is the ``user_events`` Kafka topic."""

    __route__ = KafkaEventRoute(
        topic="user_events",
    )
class UserRegistered(UserEvent):
    """User event whose ``type`` field defaults to ``"user_registered"``."""

    type: str = "user_registered"
    user: User
    timestamp: int
async def example_emit(emitter: EventEmitter):
    """Emit a ``UserRegistered`` event for a sample user through *emitter*."""
    await emitter.emit(
        UserRegistered(
            user=User(
                id=str(uuid.uuid4()),
                email="example@mail.com",
                name="John Doe",
            ),
            # Current time as whole seconds since the Unix epoch.
            timestamp=int(datetime.now().timestamp()),
        )
    )
import uuid
from datetime import datetime
from pydantic import BaseModel
from dispytch import EventEmitter , EventBase
from dispytch.rabbitmq import RabbitMQEventRoute
class User(BaseModel):
    """User payload embedded in user events."""

    id: str
    email: str
    name: str
class UserRegistered(EventBase):
    """Event routed to the ``user.events`` exchange with key ``user.registered``."""

    __route__ = RabbitMQEventRoute(
        exchange="user.events",
        routing_key="user.registered",
    )

    user: User
    timestamp: int
async def example_emit(emitter: EventEmitter):
    """Emit a ``UserRegistered`` event for a sample user through *emitter*."""
    await emitter.emit(
        UserRegistered(
            user=User(
                id=str(uuid.uuid4()),
                email="example@mail.com",
                name="John Doe",
            ),
            # Current time as whole seconds since the Unix epoch.
            timestamp=int(datetime.now().timestamp()),
        )
    )
import uuid
from datetime import datetime
from pydantic import BaseModel
from dispytch import EventEmitter , EventBase
from dispytch.redis import RedisEventRoute
class UserNotification(EventBase):
    """Notification event routed to a Redis channel templated on ``user_id``."""

    # The stray spaces around ``{user_id}`` were extraction artifacts; the
    # channel template is a single dotted name.
    __route__ = RedisEventRoute(
        channel="user.{user_id}.notification",
    )

    user_id: int
    message: str
    timestamp: int
async def example_emit(emitter: EventEmitter, user_id: int):
    """Emit a sample ``UserNotification`` for *user_id* through *emitter*."""
    await emitter.emit(
        UserNotification(
            user_id=user_id,
            message="Hello from Dispytch example",
            # Current time as whole seconds since the Unix epoch.
            timestamp=int(datetime.now().timestamp()),
        )
    )
✨ Example: Handling Events
Kafka RabbitMQ Redis Pub Sub
from typing import Annotated
from pydantic import BaseModel
from dispytch import Event , Dependency , Router
from dispytch.kafka import KafkaEventSubscription
from dispytch.middleware import Filter
# Service Dependency
class UserService:
    """In-memory service that records every user handed to it."""

    def __init__(self):
        # Users seen so far, in the order they were processed.
        self.users = []

    async def do_smth_with_the_user(self, user):
        """Print *user* and append it to ``self.users``."""
        print("Doing something with user", user)
        self.users.append(user)
def get_user_service():
    """Return a new ``UserService`` instance."""
    return UserService()
# Event Schemas
class User(BaseModel):
    """User payload expected inside incoming user events."""

    id: str
    email: str
    name: str
class UserCreatedEvent(BaseModel):
    """Schema of the event body consumed by the handler."""

    type: str
    user: User
    timestamp: int
# Event handler
user_events = Router()


@user_events.handler(
    KafkaEventSubscription(topic="user_events"),
    # Filter on the event's "type" field so only "user_registered" events
    # satisfy the predicate.
    middlewares=[Filter(lambda ctx: ctx.event["type"] == "user_registered")],
)
async def handle_user_registered(
    event: Event[UserCreatedEvent],
    user_service: Annotated[UserService, Dependency(get_user_service)],
):
    """Print the registered user and pass it to the injected ``UserService``."""
    user = event.user
    timestamp = event.timestamp
    print(f"[User Registered] {user.id} - {user.email} at {timestamp}")
    await user_service.do_smth_with_the_user(user)
from typing import Annotated
from pydantic import BaseModel
from dispytch import Event , Dependency , Router
from dispytch.rabbitmq import RabbitMQEventSubscription
# Service Dependency
class UserService:
    """In-memory service that records every user handed to it."""

    def __init__(self):
        # Users seen so far, in the order they were processed.
        self.users = []

    async def do_smth_with_the_user(self, user):
        """Print *user* and append it to ``self.users``."""
        print("Doing something with user", user)
        self.users.append(user)
def get_user_service():
    """Return a new ``UserService`` instance."""
    return UserService()
# Event Schemas
class User(BaseModel):
    """User payload expected inside incoming user events."""

    id: str
    email: str
    name: str
class UserCreatedEvent(BaseModel):
    """Schema of the event body consumed by the handler."""

    type: str
    user: User
    timestamp: int
# Event handler
user_events = Router()


@user_events.handler(RabbitMQEventSubscription(routing_key="user.registered"))
async def handle_user_registered(
    event: Event[UserCreatedEvent],
    user_service: Annotated[UserService, Dependency(get_user_service)],
):
    """Print the registered user and pass it to the injected ``UserService``."""
    user = event.user
    timestamp = event.timestamp
    print(f"[User Registered] {user.id} - {user.email} at {timestamp}")
    await user_service.do_smth_with_the_user(user)
from typing import Annotated
from pydantic import BaseModel
from dispytch import Event , Dependency , Router , SubscriptionParam
from dispytch.redis import RedisEventSubscription
# Event Schemas
class UserNotification(BaseModel):
    """Schema of the notification body consumed by the handler."""

    message: str
    timestamp: int
# Event handler
user_events = Router()


# The stray spaces around ``{user_id}`` were extraction artifacts; the channel
# template is a single dotted name whose placeholder matches the handler's
# ``user_id`` parameter.
@user_events.handler(RedisEventSubscription(channel="user.{user_id}.notification"))
async def handle_user_notification(
    event: Event[UserNotification],
    user_id: Annotated[int, SubscriptionParam()],
):
    """Print the notification message together with the ``user_id`` parameter."""
    print(f"[User Notification] From: {user_id} - {event.message} at {event.timestamp}")