Skip to content

Dispytch


Dispytch is an asynchronous Python framework designed to streamline the development of event-driven services.

🚀 Features

  • 🧠 Async core – built for modern Python I/O
  • 🔌 FastAPI-style dependency injection – clean, decoupled handlers
  • 📬 Pluggable transport layer – with Kafka, RabbitMQ and Redis PubSub out-of-the-box
  • 🧾 Pydantic v2 validation – event schemas are validated using pydantic
  • 🔁 Built-in retry logic – configurable, resilient, no boilerplate
  • ✅ Automatic acknowledgement – events are acknowledged automatically
  • ⚠️ Error Handling – handle failures and prevent message loss with DLQ
  • ⚖️ Composable Middleware – set up logging, metrics, filtering, observability

✨ Example: Emitting Events

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import uuid
from datetime import datetime

from pydantic import BaseModel

from dispytch import EventEmitter, EventBase
from dispytch.kafka import KafkaEventRoute


class User(BaseModel):
    """User payload carried inside the ``UserRegistered`` event."""

    # The emit example below fills this with a stringified UUID4.
    id: str
    email: str
    name: str


class UserEvent(EventBase):
    """Base class for user events; its route targets the "user_events" Kafka topic."""

    # NOTE(review): presumably inherited by subclasses and read by the emitter
    # to choose the destination topic — confirm against the framework docs.
    __route__ = KafkaEventRoute(
        topic="user_events"
    )


class UserRegistered(UserEvent):
    """Event carrying a registered user and a timestamp."""

    # Event type tag; defaults to "user_registered".
    type: str = "user_registered"

    user: User
    # Integer timestamp; the emit example passes whole epoch seconds.
    timestamp: int


async def example_emit(emitter: EventEmitter):
    """Emit a single ``UserRegistered`` event for a sample user via *emitter*."""
    sample_user = User(
        id=str(uuid.uuid4()),
        email="example@mail.com",
        name="John Doe",
    )
    # Whole seconds since the epoch, truncated from the float timestamp.
    emitted_at = int(datetime.now().timestamp())
    registration = UserRegistered(user=sample_user, timestamp=emitted_at)
    await emitter.emit(registration)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import uuid
from datetime import datetime

from pydantic import BaseModel

from dispytch import EventEmitter, EventBase
from dispytch.rabbitmq import RabbitMQEventRoute


class User(BaseModel):
    """User payload carried inside the ``UserRegistered`` event."""

    # The emit example below fills this with a stringified UUID4.
    id: str
    email: str
    name: str


class UserRegistered(EventBase):
    """Event carrying a registered user and a timestamp, routed through RabbitMQ."""

    # Route: exchange "user.events" with routing key "user.registered".
    __route__ = RabbitMQEventRoute(
        exchange="user.events",
        routing_key="user.registered"
    )

    user: User
    # Integer timestamp; the emit example passes whole epoch seconds.
    timestamp: int


async def example_emit(emitter: EventEmitter):
    """Emit a single ``UserRegistered`` event for a sample user via *emitter*."""
    sample_user = User(
        id=str(uuid.uuid4()),
        email="example@mail.com",
        name="John Doe",
    )
    # Whole seconds since the epoch, truncated from the float timestamp.
    emitted_at = int(datetime.now().timestamp())
    registration = UserRegistered(user=sample_user, timestamp=emitted_at)
    await emitter.emit(registration)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import uuid
from datetime import datetime

from pydantic import BaseModel

from dispytch import EventEmitter, EventBase
from dispytch.redis import RedisEventRoute


class UserNotification(EventBase):
    """Notification event for one user, routed to a Redis channel."""

    # NOTE(review): the "{user_id}" placeholder is presumably filled from this
    # event's ``user_id`` field at emit time — confirm against the framework docs.
    __route__ = RedisEventRoute(
        channel="user.{user_id}.notification",
    )

    user_id: int
    message: str
    # Integer timestamp; the emit example passes whole epoch seconds.
    timestamp: int


async def example_emit(emitter: EventEmitter, user_id: int):
    """Emit a sample ``UserNotification`` for *user_id* via *emitter*."""
    notification = UserNotification(
        user_id=user_id,
        message="Hello from Dispytch example",
        # Whole seconds since the epoch, truncated from the float timestamp.
        timestamp=int(datetime.now().timestamp()),
    )
    await emitter.emit(notification)

✨ Example: Handling Events

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
from typing import Annotated

from pydantic import BaseModel
from dispytch import Event, Dependency, Router

from dispytch.kafka import KafkaEventSubscription
from dispytch.middleware import Filter


# Service Dependency

class UserService:
    """Minimal in-memory service used as an injectable handler dependency."""

    def __init__(self):
        # Users passed to do_smth_with_the_user, in call order.
        self.users = list()

    async def do_smth_with_the_user(self, user):
        """Print *user* and record it in ``self.users``."""
        print("Doing something with user", user)
        recorded = self.users
        recorded.append(user)


def get_user_service():
    """Dependency provider: build and return a new ``UserService`` on every call."""
    service = UserService()
    return service


# Event Schemas 

class User(BaseModel):
    """User payload nested inside ``UserCreatedEvent``."""

    id: str
    email: str
    name: str


class UserCreatedEvent(BaseModel):
    """Schema of the event that ``handle_user_registered`` receives."""

    # The handler's Filter middleware compares this value to "user_registered".
    type: str
    user: User
    timestamp: int


# Event handler

# Router on which the handler below is registered via @user_events.handler.
user_events = Router()


@user_events.handler(
    KafkaEventSubscription(topic="user_events"),
    middlewares=[Filter(lambda ctx: ctx.event["type"] == "user_registered")],
)
async def handle_user_registered(
    event: Event[UserCreatedEvent],
    user_service: Annotated[UserService, Dependency(get_user_service)],
):
    """Log a registered user from the "user_events" topic and pass it to the service.

    The Filter middleware only lets through events whose "type" equals
    "user_registered".
    """
    user, timestamp = event.user, event.timestamp
    summary = f"[User Registered] {user.id} - {user.email} at {timestamp}"
    print(summary)
    await user_service.do_smth_with_the_user(user)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
from typing import Annotated

from pydantic import BaseModel
from dispytch import Event, Dependency, Router

from dispytch.rabbitmq import RabbitMQEventSubscription


# Service Dependency

class UserService:
    """Minimal in-memory service used as an injectable handler dependency."""

    def __init__(self):
        # Users passed to do_smth_with_the_user, in call order.
        self.users = list()

    async def do_smth_with_the_user(self, user):
        """Print *user* and record it in ``self.users``."""
        print("Doing something with user", user)
        recorded = self.users
        recorded.append(user)


def get_user_service():
    """Dependency provider: build and return a new ``UserService`` on every call."""
    service = UserService()
    return service


# Event Schemas 

class User(BaseModel):
    """User payload nested inside ``UserCreatedEvent``."""

    id: str
    email: str
    name: str


class UserCreatedEvent(BaseModel):
    """Schema of the event that ``handle_user_registered`` receives."""

    type: str
    user: User
    timestamp: int


# Event handler

# Router on which the handler below is registered via @user_events.handler.
user_events = Router()


@user_events.handler(RabbitMQEventSubscription(routing_key="user.registered"))
async def handle_user_registered(
    event: Event[UserCreatedEvent],
    user_service: Annotated[UserService, Dependency(get_user_service)],
):
    """Log a user event subscribed via routing key "user.registered" and pass it to the service."""
    user, timestamp = event.user, event.timestamp
    summary = f"[User Registered] {user.id} - {user.email} at {timestamp}"
    print(summary)
    await user_service.do_smth_with_the_user(user)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
from typing import Annotated

from pydantic import BaseModel
from dispytch import Event, Dependency, Router, SubscriptionParam

from dispytch.redis import RedisEventSubscription


# Event Schemas 

class UserNotification(BaseModel):
    """Schema of the notification that ``handle_user_notification`` receives."""

    message: str
    timestamp: int


# Event handler

# Router on which the handler below is registered via @user_events.handler.
user_events = Router()


@user_events.handler(RedisEventSubscription(channel="user.{user_id}.notification"))
async def handle_user_notification(
    event: Event[UserNotification],
    user_id: Annotated[int, SubscriptionParam()],
):
    """Print a received notification's message and timestamp along with ``user_id``."""
    # NOTE(review): user_id presumably comes from the "{user_id}" segment of
    # the subscribed channel via SubscriptionParam — confirm in the framework docs.
    line = f"[User Notification] From: {user_id} - {event.message} at {event.timestamp}"
    print(line)