Skip to content

Subscription

Subscriptions stream events over WebSocket. Use @Subscription() to return an async iterator.

Basic Subscription

Python
import asyncio
from typing import AsyncIterator, Annotated

from nestipy.graphql import Resolver
from nestipy.graphql.decorator import Subscription
from nestipy.ioc import Arg


@Resolver()
class EventsResolver:
    @Subscription()
    async def ticks(self, count: Annotated[int, Arg()] = 5) -> AsyncIterator[int]:
        for i in range(count):
            yield i
            await asyncio.sleep(0.5)

PubSub Helper

Nestipy includes a simple PubSub helper for subscriptions:

Python
from typing import AsyncIterator
from nestipy.graphql import Resolver, PubSub
from nestipy.graphql.decorator import Subscription


pubsub = PubSub()


@Resolver()
class EventsResolver:
    @Subscription()
    async def events(self) -> AsyncIterator[str]:
        async for value in pubsub.async_iterator("events"):
            yield value


# Somewhere else in your code
pubsub.publish("events", "hello")

Tips

  • Use PubSub for simple in-process eventing.
  • For production, consider a distributed pub/sub system if you scale horizontally.

Support us

Nestipy is a project released under the MIT license, meaning it's open source and freely available for use and modification. Its development thrives with the generous contributions of these fantastic individuals.