Overview
Nestipy supports both monolithic and microservice architectures. The microservice layer mirrors NestJS patterns while keeping the same DI, guards, interceptors, and filters you already use for HTTP.
Getting Started
Create a microservice using NestipyFactory.create_microservice():
import random
from granian.constants import Interfaces
from app_module import AppModule
from nestipy.core import NestipyFactory
from nestipy.microservice import MicroserviceOption, Transport
app = NestipyFactory.create_microservice(
AppModule,
[MicroserviceOption(transport=Transport.TCP)],
)
if __name__ == "__main__":
app.listen(
"main:app",
address="0.0.0.0",
port=random.randint(5000, 7000),
interface=Interfaces.ASGI,
reload=True,
)
Lifecycle Overview
This diagram shows how a microservice instance starts, listens, and shuts down.
flowchart TB
A["NestipyFactory.create_microservice"] --> B["Create ClientProxy"]
B --> C["MicroServiceServer.listen"]
C --> D["Subscribe to channel"]
D --> E["Listen for incoming messages"]
E --> F{"Event or Request?"}
F -->|Event| G["Handle event callback"]
F -->|Request| H["Handle request callback"]
H --> I["Serialize response and publish"]
G --> J["Continue listening"]
I --> J
J --> K["Shutdown"]
K --> L["Before close and disconnect"]
Message and Event Patterns
Nestipy supports two patterns:
- Request-response with
@MessagePattern - Fire-and-forget with
@EventPattern
Request-response:
from typing import Annotated
from nestipy.common import Controller
from nestipy.ioc import Inject
from nestipy.microservice import MessagePattern, Payload
@Controller()
class AppController:
service: Annotated[AppService, Inject()]
@MessagePattern("test")
async def get(self, data: Annotated[str, Payload()]) -> str:
return await self.service.get()
Event pattern:
from typing import Annotated
from nestipy.common import Controller
from nestipy.ioc import Inject
from nestipy.microservice import EventPattern, Payload
@Controller()
class AppController:
service: Annotated[AppService, Inject()]
@EventPattern("test")
async def get(self, data: Annotated[str, Payload()]) -> None:
await self.service.get()
Client Registration
Register clients with ClientsModule:
from nestipy.common import Module
from nestipy.microservice import ClientsModule, ClientsConfig
from nestipy.microservice import MicroserviceOption, Transport
@Module(
imports=[
ClientsModule.register(
[
ClientsConfig(
name="MATH_SERVICE",
option=MicroserviceOption(transport=Transport.TCP),
)
]
)
],
)
class AppModule:
pass
Or register a provider manually:
from typing import Annotated
from nestipy.common import Module, ModuleProviderDict, ConfigModule, ConfigService
from nestipy.ioc import Inject
from nestipy.microservice import ClientProxy, ClientModuleFactory
from nestipy.microservice import MicroserviceOption, Transport, TCPClientOption
async def factory(config: Annotated[ConfigService, Inject()]) -> ClientProxy:
return ClientModuleFactory.create(
MicroserviceOption(
transport=Transport.TCP,
option=TCPClientOption(
host=config.token.get("HOST"),
port=config.get("PORT"),
),
)
)
@Module(
providers=[
ModuleProviderDict(
token="MATH_SERVICE",
factory=factory,
inject=[ConfigModule.for_root()],
)
]
)
class AppModule:
pass
Inject the client:
from typing import Annotated
from nestipy.common import Controller
from nestipy.microservice import Client, ClientProxy
@Controller()
class AppController:
client: Annotated[ClientProxy, Client("MATH_SERVICE")]
Sending and Emitting
Use send for request-response and emit for events:
response = await client.send("sum", {"a": 1, "b": 2})
await client.emit("audit", {"action": "sum"})
Context and Payload
Payload()injects the message payload.Ctx()orContextinjects the RPC context.
from typing import Annotated
from nestipy.microservice import Payload, Ctx, Context
@MessagePattern("ping")
async def handle(
data: Annotated[dict, Payload()],
ctx: Annotated[Context, Ctx()],
):
return {"ok": True}
Options and Defaults
MicroserviceOption supports these key fields:
transportselects the transport.optionis the transport-specific configuration object.serializercontrols request serialization. Default is JSON.channel_keyis the message channel prefix. Default isnestipy.timeoutis the client request timeout in seconds. Default is30.
Guards, Interceptors, Filters
Guards, interceptors, and filters work the same way as HTTP. The main difference is the RPC context and RpcExceptionFilter.
from nestipy.common import Catch
from nestipy.microservice import RpcException, RpcExceptionFilter
@Catch(RpcException)
class MyRpcExceptionFilter(RpcExceptionFilter):
async def catch(self, exception: "RpcException", host: "ArgumentHost"):
return exception.message
Hybrid Application
To run HTTP and microservices together, connect microservices to an HTTP app:
from granian.constants import Interfaces
from app_module import AppModule
from nestipy.core import NestipyFactory
from nestipy.microservice import MicroserviceOption, Transport
app = NestipyFactory.connect_microservice(
AppModule,
[MicroserviceOption(transport=Transport.TCP)],
)
app.start_all_microservices()
if __name__ == "__main__":
app.listen(
"main:app",
address="0.0.0.0",
port=8000,
interface=Interfaces.ASGI,
reload=True,
)
Call app.start_all_microservices() to start microservice servers after HTTP boot.
Support us
Nestipy is a project released under the MIT license, meaning it's open source and freely available for use and modification. Its development thrives with the generous contributions of these fantastic individuals.