Asynchronous event dispatching/handling library for FastAPI and Starlette

melvinkcx/fastapi-events

fastapi-events

An event dispatching/handling library for FastAPI and Starlette.

Features:

  • Straightforward API for emitting events anywhere in your code.
  • Events are handled after responses are returned, ensuring no impact on response time.
  • Supports event piping to remote queues.
  • Powerful built-in handlers for local and remote event handling.
  • Coroutine functions (async def) are treated as first-class citizens.
  • Write your own handlers; don't be limited to just what fastapi_events provides.
  • (>=0.3.0) Supports event payload validation via Pydantic (see Event Payload Validation With Pydantic).
  • (>=0.4.0) Supports event chaining: dispatching events within handlers (thanks to @ndopj for contributing to the idea).
  • (>=0.7.0) Supports OpenTelemetry. See the OpenTelemetry (OTEL) support section for details.
  • (>=0.9.0) Adds support for FastAPI dependencies in local handlers. See Using Dependencies in Local Handler for details.
  • (>=0.9.1) Now supports Pydantic v2.
  • (>=0.10.0) Enables dispatching Pydantic models as events (thanks to @WilliamStam for contributing to this idea).

If you use or appreciate this project, please consider giving it a star to help it reach more developers. Thanks =)

Installation

pip install fastapi-events

To use it with AWS handlers, install:

pip install fastapi-events[aws]

To use it with GCP handlers, install:

pip install fastapi-events[google]

To enable OpenTelemetry (OTEL) support, install:

pip install fastapi-events[otel]

Usage

fastapi-events supports both FastAPI and Starlette. To use it, simply configure it as middleware.

  • Configuring fastapi-events for FastAPI:

    from fastapi import FastAPI
    from fastapi.requests import Request
    from fastapi.responses import JSONResponse

    from fastapi_events.dispatcher import dispatch
    from fastapi_events.middleware import EventHandlerASGIMiddleware
    from fastapi_events.handlers.local import local_handler


    app = FastAPI()
    app.add_middleware(EventHandlerASGIMiddleware,
                       handlers=[local_handler])  # registering handler(s)


    @app.get("/")
    def index(request: Request) -> JSONResponse:
        dispatch("my-fancy-event", payload={"id": 1})  # Emit events anywhere in your code
        return JSONResponse({})  # JSONResponse needs a content argument
  • Configuring fastapi-events for Starlette:

    from starlette.applications import Starlette
    from starlette.middleware import Middleware
    from starlette.requests import Request
    from starlette.responses import JSONResponse

    from fastapi_events.dispatcher import dispatch
    from fastapi_events.handlers.local import local_handler
    from fastapi_events.middleware import EventHandlerASGIMiddleware

    app = Starlette(middleware=[
        Middleware(EventHandlerASGIMiddleware,
                   handlers=[local_handler])  # registering handlers
    ])

    @app.route("/")
    async def root(request: Request) -> JSONResponse:
        dispatch("new event", payload={"id": 1})  # Emit events anywhere in your code
        return JSONResponse({})  # JSONResponse needs a content argument
  • Configuring fastapi-events for Starlite:

    from starlite.app import Starlite
    from starlite.enums import MediaType
    from starlite.handlers import get
    from starlite.middleware import DefineMiddleware

    from fastapi_events.dispatcher import dispatch
    from fastapi_events.handlers.local import local_handler
    from fastapi_events.middleware import EventHandlerASGIMiddleware

    @get(path="/", media_type=MediaType.TEXT)
    async def root() -> str:
        dispatch("new event", payload={"id": 1})  # Emit events anywhere in your code
        return "OK"

    app = Starlite(
        middleware=[
            DefineMiddleware(EventHandlerASGIMiddleware,
                             handlers=[local_handler])  # registering handlers
        ],
        route_handlers=[root],
    )

Dispatching events

Events can be dispatched anywhere in the code, provided that they are dispatched before a response is generated.

Option 1 - using dict

# anywhere in code

from fastapi_events.dispatcher import dispatch

dispatch(
    "cat-requested-a-fish",  # Event name, accepts any valid string
    payload={"cat_id": "fd375d23-b0c9-4271-a9e0-e028c4cd7230"}  # Event payload, accepts any arbitrary data
)

dispatch("a_cat_is_spotted")  # This works too!

Option 2 - using Pydantic model

New feature since version 0.10.0

It is now possible to dispatch Pydantic models as events. A special thanks to @WilliamStam for introducing this remarkable idea.

# anywhere in code
import pydantic
from fastapi_events.dispatcher import dispatch


class CatRequestedAFishEvent(pydantic.BaseModel):
    __event_name__ = "cat-requested-a-fish"

    cat_id: pydantic.UUID4


# Option 2 - dispatching event with pydantic model
dispatch(CatRequestedAFishEvent(cat_id="fd375d23-b0c9-4271-a9e0-e028c4cd7230"))

# which is equivalent to:
dispatch("cat-requested-a-fish", payload={"cat_id": "fd375d23-b0c9-4271-a9e0-e028c4cd7230"})

Event Payload Validation With Pydantic

Since version 0.3.0, event payload validation is possible. To enable this feature, register a Pydantic model with the corresponding event name.

>=0.10.0: The event name can now be defined as part of the payload schema via __event_name__.

import uuid
from enum import Enum
from datetime import datetime

from pydantic import BaseModel
from fastapi_events.registry.payload_schema import registry as payload_schema


class UserEvents(Enum):
    SIGNED_UP = "USER_SIGNED_UP"
    ACTIVATED = "USER_ACTIVATED"


# Registering your event payload schema
@payload_schema.register(event_name=UserEvents.SIGNED_UP)
class SignUpPayload(BaseModel):
    user_id: uuid.UUID
    created_at: datetime

# which is also equivalent to
@payload_schema.register
class SignUpPayload(BaseModel):
    __event_name__ = "USER_SIGNED_UP"

    user_id: uuid.UUID
    created_at: datetime

Wildcards in event names are currently not supported when registering payload schemas.

The payload will be validated automatically without any changes required when invoking the dispatcher.

# Events with payload schema registered
dispatch(UserEvents.SIGNED_UP)  # raises ValidationError, missing payload
dispatch(UserEvents.SIGNED_UP,
         {"user_id": "9e79cdbb-b216-40f7-9a05-20d223dee89a"})  # raises ValidationError, missing `created_at`
dispatch(UserEvents.SIGNED_UP,
         {"user_id": "9e79cdbb-b216-40f7-9a05-20d223dee89a", "created_at": datetime.utcnow()})  # OK!

# Events without payload schema -> No validation will be performed
dispatch(UserEvents.ACTIVATED,
         {"user_id": "9e79cdbb-b216-40f7-9a05-20d223dee89a"})  # OK! no validation will be performed

# Events dispatched with Pydantic model (>=0.10.0) -> Validation will be skipped since it would have been already validated
# If you choose to do this, you must ensure __event_name__ is defined in SignUpPayload
dispatch(SignUpPayload(user_id="9e79cdbb-b216-40f7-9a05-20d223dee89a", created_at=datetime.utcnow()))

Payload validation is optional. Payloads of events without a registered schema will not be validated.

Handling Events

Handle events locally

The flexibility of fastapi-events enables customisation of how events should be handled. To begin, you may want to handle your events locally.

# ex: in handlers.py

from fastapi_events.handlers.local import local_handler
from fastapi_events.typing import Event


@local_handler.register(event_name="cat*")
def handle_all_cat_events(event: Event):
    """
    this handler will match any event prefixed with `cat`.
    ex: "cat_eats_a_fish", "cat_is_cute", etc
    """
    # the `event` argument is nothing more than a tuple of event name and payload
    event_name, payload = event

    # TODO do anything you'd like with the event


@local_handler.register(event_name="cat*")  # Tip: You can register several handlers with the same event name
def handle_all_cat_events_another_way(event: Event):
    pass


@local_handler.register(event_name="*")
async def handle_all_events(event: Event):
    # event handlers can be coroutine functions too (`async def`)
    pass

Using Dependencies in Local Handler

New feature in fastapi-events >=0.9.0

Dependencies can now be utilized with local handlers, and sub-dependencies are also supported.

As of now, dependencies utilizing a generator (with the yield keyword) are not yet supported.

# ex: in handlers.py
from fastapi import Depends

from fastapi_events.handlers.local import local_handler
from fastapi_events.typing import Event


async def get_db_conn():
    pass  # return a DB conn


async def get_db_session(
    db_conn=Depends(get_db_conn)
):
    pass  # return a DB session created from `db_conn`


@local_handler.register(event_name="*")
async def handle_all_events(
    event: Event,
    db_session=Depends(get_db_session)
):
    # use the `db_session` here
    pass

Piping Events To Remote Queues

In larger projects, it's common to have dedicated services for handling events separately. For example, fastapi-events includes an AWS SQS forwarder, allowing you to forward events to a remote queue.

  1. Register SQSForwardHandler as a handler:

    app = FastAPI()
    app.add_middleware(EventHandlerASGIMiddleware,
                       handlers=[SQSForwardHandler(queue_url="test-queue",
                                                   region_name="eu-central-1")])  # registering handler(s)
  2. Start dispatching events! By default, events will be serialised into JSON format:

    ["event name", {"payload": "here is the payload"}]

Tip: to pipe events to multiple queues, provide multiple handlers while adding EventHandlerASGIMiddleware.
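
As a rough illustration of the default wire format described above, the following standard-library sketch shows how an (event name, payload) pair maps onto that two-element JSON array, and how a consumer on the other side of the queue might unpack it. The helper names serialise_event and parse_message are hypothetical and are not part of fastapi-events; the handler's actual serialisation code may differ.

```python
import json
from typing import Any, Tuple

# Mirrors the (event name, payload) shape described in this README
Event = Tuple[str, Any]


def serialise_event(event: Event) -> str:
    """Hypothetical helper: encode an event as a two-element JSON array."""
    event_name, payload = event
    return json.dumps([event_name, payload])


def parse_message(body: str) -> Event:
    """Hypothetical consumer-side helper: decode a message body into an event."""
    event_name, payload = json.loads(body)
    return event_name, payload


message_body = serialise_event(("event name", {"payload": "here is the payload"}))
```

A consumer service reading from the queue could call parse_message() on each message body to recover the event name and payload.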

Built-in handlers

Here is a list of built-in event handlers:

  • LocalHandler / local_handler:

    • import from fastapi_events.handlers.local
    • for handling events locally. See the examples above
    • event name pattern matching is done using Unix shell-style matching (fnmatch)
  • SQSForwardHandler:

    • import from fastapi_events.handlers.aws
    • to forward events to an AWS SQS queue
  • EchoHandler:

    • import from fastapi_events.handlers.echo
    • to forward events to stdout with pprint. Great for debugging purposes
  • GoogleCloudSimplePubSubHandler:

    • import from fastapi_events.handlers.gcp
    • to publish events to a single pubsub topic
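
The list above notes that the local handler matches event names with Unix shell-style patterns (fnmatch). The sketch below uses Python's standard fnmatch module to show which patterns would match which event names. The registered_patterns mapping and matching_handlers function are hypothetical stand-ins, and fnmatchcase is chosen here only to keep the behaviour identical across platforms; which fnmatch function the library itself calls is not stated in this README.

```python
from fnmatch import fnmatchcase
from typing import Dict, List

# Hypothetical stand-in for a handler registry: pattern -> handler names
registered_patterns: Dict[str, List[str]] = {
    "cat*": ["handle_all_cat_events"],
    "*": ["handle_all_events"],
}


def matching_handlers(event_name: str) -> List[str]:
    """Return the names of handlers whose pattern matches the event name."""
    matched: List[str] = []
    for pattern, handler_names in registered_patterns.items():
        if fnmatchcase(event_name, pattern):
            matched.extend(handler_names)
    return matched


cat_handlers = matching_handlers("cat_eats_a_fish")
dog_handlers = matching_handlers("dog_barks")
```

Under these assumptions, an event named "cat_eats_a_fish" reaches both handlers, while "dog_barks" reaches only the catch-all "*" handler.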

Creating Custom Handlers

Creating your own handler is as simple as inheriting from the BaseEventHandler class in fastapi_events.handlers.base.

To handle events, fastapi_events calls one of these methods, following this priority order:

  1. handle_many(events): The coroutine function should expect the backlog of the events collected.

  2. handle(event): If handle_many() is not defined in your custom handler, handle() will be called by iterating through the events in the backlog.

from typing import Iterable

from fastapi_events.typing import Event
from fastapi_events.handlers.base import BaseEventHandler


class MyOwnEventHandler(BaseEventHandler):
    async def handle(self, event: Event) -> None:
        """
        Handle events one by one
        """
        pass

    async def handle_many(self, events: Iterable[Event]) -> None:
        """
        Handle events by batch
        """
        pass
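
To make the priority order concrete, here is a minimal standard-library sketch of the fallback behaviour described above. SketchBaseHandler is a hypothetical stand-in, not the library's BaseEventHandler; it assumes the default handle_many() simply awaits handle() for each event in the backlog, which is how this README describes the behaviour.

```python
import asyncio
from typing import Any, Iterable, List, Tuple

Event = Tuple[str, Any]


class SketchBaseHandler:
    """Hypothetical stand-in for BaseEventHandler, not the library class."""

    async def handle(self, event: Event) -> None:
        pass

    async def handle_many(self, events: Iterable[Event]) -> None:
        # Assumed default: fall back to handle() for each event in the backlog
        for event in events:
            await self.handle(event)


class OneByOneHandler(SketchBaseHandler):
    """Overrides only handle(), so the inherited handle_many() iterates."""

    def __init__(self) -> None:
        self.seen: List[str] = []

    async def handle(self, event: Event) -> None:
        self.seen.append(event[0])


class BatchHandler(SketchBaseHandler):
    """Overrides handle_many(), so the whole backlog arrives in one call."""

    def __init__(self) -> None:
        self.batch_sizes: List[int] = []

    async def handle_many(self, events: Iterable[Event]) -> None:
        self.batch_sizes.append(len(list(events)))


backlog: List[Event] = [("cat_is_cute", None), ("cat_eats_a_fish", {"id": 1})]
one_by_one, batch = OneByOneHandler(), BatchHandler()
asyncio.run(one_by_one.handle_many(backlog))
asyncio.run(batch.handle_many(backlog))
```

Overriding handle_many() is the natural choice when a backend accepts batches, while overriding handle() alone keeps per-event logic simple.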

OpenTelemetry (OTEL) support

Since version 0.7.0, OpenTelemetry support has been added as an optional feature.

To enable it, make sure you install the following optional modules:

pip install fastapi-events[otel]

Note that no instrumentation library is needed, as fastapi_events supports OTEL natively.

Spans will be created when:

  • fastapi_events.dispatcher.dispatch is invoked,
  • fastapi_events.handlers.local.LocalHandler is handling an event

Support for other handlers will be added in the future.

Cookbook

1) Suppressing Events / Disabling dispatch() Globally

If you wish to globally suppress events, especially during testing, you can achieve this without having to mock or patch the dispatch() function. Simply set the environment variable FASTAPI_EVENTS_DISABLE_DISPATCH to 1, True, or any other truthy value.
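
One way to set the variable for the duration of a test is sketched below, using only the standard library. The helper run_with_dispatch_disabled and the function example_test are hypothetical. This README does not say whether the library reads the variable at import time or on each dispatch() call, so if in doubt, set it before importing your application.

```python
import os
from unittest import mock

DISABLE_FLAG = "FASTAPI_EVENTS_DISABLE_DISPATCH"  # variable name taken from this README


def run_with_dispatch_disabled(test_func):
    """Run test_func with the flag set, restoring the previous environment afterwards."""
    with mock.patch.dict(os.environ, {DISABLE_FLAG: "1"}):
        return test_func()


def example_test() -> str:
    # Code under test would call dispatch() here; this sketch only inspects the flag
    return os.environ[DISABLE_FLAG]


flag_before = os.environ.get(DISABLE_FLAG)
flag_during_test = run_with_dispatch_disabled(example_test)
flag_after = os.environ.get(DISABLE_FLAG)
```

Because mock.patch.dict restores the environment on exit, other tests that rely on real dispatching are not affected.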

2) Validating Event Payload During Dispatch

This feature requires Pydantic, which is included with FastAPI. If you're using Starlette, ensure that Pydantic is installed separately.

See Event Payload Validation With Pydantic.

3) Dispatching events within handlers (Event Chaining)

It is now possible to dispatch events from within other event handlers. You'll need version 0.4 or above.

Comparison between events dispatched within the request-response cycle and events dispatched within event handlers:

|  | dispatched within request-response cycle | dispatched within event handlers |
| --- | --- | --- |
| processing of events | will be handled after the response has been made | will be scheduled to the running event loop immediately |
| order of processing | always after the response is made | not guaranteed |
| supports payload schema validation with Pydantic | Yes | Yes |
| can be disabled globally with FASTAPI_EVENTS_DISABLE_DISPATCH | Yes | Yes |

4) Dispatching events outside of a request

One of the goals of fastapi-events is to dispatch events without the need to manage a specific instance of EventHandlerASGIMiddleware. By default, this is handled using ContextVars. However, there are scenarios where users may want to dispatch events outside the standard request sequence. This can be achieved by generating a custom identifier for the middleware.

By default, the middleware identifier is generated from the object ID of the EventHandlerASGIMiddleware instance and is managed internally without user intervention. If a user needs to dispatch events outside of a request-response lifecycle, they can generate a custom middleware_id value and pass it to EventHandlerASGIMiddleware during its creation. This value can then be used with dispatch() to ensure the correct EventHandlerASGIMiddleware instance is selected.

It's important to note that dispatching events during a request does not require the middleware_id. The dispatcher will automatically discover the appropriate event handler.

In the following example, the ID is generated using the object ID of the FastAPI instance. The middleware identifier must be a unique int, but there are no other restrictions.

import asyncio

from fastapi import FastAPI
from fastapi.requests import Request
from fastapi.responses import JSONResponse

from fastapi_events.dispatcher import dispatch
from fastapi_events.middleware import EventHandlerASGIMiddleware
from fastapi_events.handlers.local import local_handler

app = FastAPI()
event_handler_id: int = id(app)
app.add_middleware(EventHandlerASGIMiddleware,
                   handlers=[local_handler],  # registering handler(s)
                   middleware_id=event_handler_id)  # register custom middleware id


async def dispatch_task() -> None:
    """background task to dispatch autonomous events"""

    for i in range(100):
        # without the middleware_id, this call would raise a LookupError
        dispatch("date", payload={"idx": i}, middleware_id=event_handler_id)
        await asyncio.sleep(1)


@app.on_event("startup")
async def startup_event() -> None:
    asyncio.create_task(dispatch_task())


@app.get("/")
def index(request: Request) -> JSONResponse:
    dispatch("hello", payload={"id": 1})  # Emit events anywhere in your code
    return JSONResponse({"detail": {"msg": "hello world"}})
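
To see why a dispatch outside a request fails without a middleware_id, the following standard-library sketch imitates the ContextVar-backed queue hinted at by the traceback in the FAQ of this README. Everything here (event_store, sketch_dispatch, sketch_request_cycle) is a hypothetical simplification for illustration, not the library's actual implementation.

```python
from collections import deque
from contextvars import ContextVar
from typing import Any, Deque, List, Optional, Tuple

Event = Tuple[str, Any]

# No default value: get() raises LookupError until something calls set()
event_store: ContextVar[Deque[Event]] = ContextVar("event_store")


def sketch_dispatch(event_name: str, payload: Optional[Any] = None) -> None:
    """Hypothetical dispatcher: append to the queue bound to the current context."""
    queue = event_store.get()
    queue.append((event_name, payload))


def sketch_request_cycle() -> List[Event]:
    """Hypothetical middleware: bind a queue, run the endpoint, then drain the queue."""
    token = event_store.set(deque())
    try:
        sketch_dispatch("hello", {"id": 1})  # the endpoint dispatches during the request
        return list(event_store.get())  # events collected for handling after the response
    finally:
        event_store.reset(token)  # the queue is unbound once the request is over


handled = sketch_request_cycle()

try:
    sketch_dispatch("date", {"idx": 0})  # no request in progress, so no queue is bound
    outside_request_error = None
except LookupError as exc:
    outside_request_error = exc
```

In this simplified model, a custom middleware_id plays the role of an explicit key for finding the right queue when no request context is available.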

FAQs:

  1. I'm getting LookupError when dispatch() is used:

    def dispatch(event_name: str, payload: Optional[Any] = None) -> None:
    >       q: Deque[Event] = event_store.get()
    E       LookupError: <ContextVar name='fastapi_context' at 0x400a1f12b0>

    Answer:

    The proper functioning of dispatch() relies on ContextVars. Various factors can lead to a LookupError, with a common cause being the invocation of dispatch() outside the request-response lifecycle of FastAPI/Starlette, such as calling dispatch() after a response has been returned.

    If you encounter this issue, a workaround is available by using a user-defined middleware_id. Refer to Dispatching events outside of a request for details.

    If you're encountering this during testing, consider disabling dispatch() for testing purposes. Refer to Suppressing Events / Disabling dispatch() Globally for details.

  2. My event handlers are not registered / Local handlers are not being executed:

    Answer:

    Local handlers are registered only when the module that defines them is actually imported. To ensure that module is loaded at runtime, import it in your __init__.py (or anywhere else that is imported on application startup).
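
The reason behind the answer to question 2 is that decorator-based registration only happens when the module body executes. The standard-library sketch below demonstrates this with a hypothetical registry and register decorator that stand in for local_handler; the exec() call stands in for importing a handlers.py module.

```python
import types
from typing import Any, Callable, Dict, List

# Hypothetical registry standing in for local_handler's internal bookkeeping
registry: Dict[str, List[Callable[[Any], Any]]] = {}


def register(event_name: str) -> Callable:
    """Hypothetical decorator: record the handler when the decorator runs."""
    def decorator(func: Callable[[Any], Any]) -> Callable[[Any], Any]:
        registry.setdefault(event_name, []).append(func)
        return func
    return decorator


# Source of a pretend handlers.py; nothing is registered until this code is executed
HANDLERS_SOURCE = '''
@register("cat*")
def handle_all_cat_events(event):
    return event
'''

handlers_module = types.ModuleType("handlers")
handlers_module.register = register

registered_before_import = dict(registry)
exec(HANDLERS_SOURCE, handlers_module.__dict__)  # stands in for `import handlers`
registered_after_import = {name: len(funcs) for name, funcs in registry.items()}
```

The registry is empty until the module code runs, which is why a handlers module that is never imported leaves its handlers unregistered.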

Feedback, Questions?

Feedback and questions of any kind are welcome! Please create an issue here.