Skip to content

API reference

This page documents every public class, function, and exception in BlazeRPC.


blazerpc.BlazeApp

The main entry point. Creates an application, registers models, and starts the server.

from blazerpc import BlazeApp

app = BlazeApp(
    name="my-service",
    enable_batching=True,
    max_batch_size=32,
    batch_timeout_ms=10.0,
)

Constructor

Parameter Type Default Description
name str "blazerpc" Application name (used in logging and diagnostics).
enable_batching bool True Enable adaptive request batching.
max_batch_size int 32 Maximum number of requests in a single batch.
batch_timeout_ms float 10.0 Maximum time (ms) to wait before dispatching a partial batch.
middleware list[Middleware] \| None None List of middleware instances to attach on server startup.

app.model(name, version="1", streaming=False)

Decorator that registers a function as a model endpoint.

Parameter Type Default Description
name str required Model name. Becomes part of the RPC method name (PredictName).
version str "1" Model version string.
streaming bool False If True, the function must be an async generator that yields responses.
@app.model("sentiment", version="2")
def predict_sentiment(text: list[str]) -> list[float]:
    return model.predict(text)

await app.serve(host="0.0.0.0", port=50051)

Start the gRPC server and block until a shutdown signal is received. Registers the inference servicer, health service, and reflection handlers automatically.

Parameter Type Default Description
host str "0.0.0.0" Bind address.
port int 50051 Listen port.

app.state

An AppState instance for attaching shared resources (loaded models, database pools, config). Accessible from dependency functions via ctx.app_state.

app.state.classifier = load_my_model()
app.state.db_pool = create_pool()

See the dependency injection guide for usage patterns and examples.

app.registry

The underlying ModelRegistry instance. Useful for introspection:

for model in app.registry.list_models():
    print(f"{model.name} v{model.version}")

blazerpc.Context

Per-request context object injected into handler parameters. Add a Context-typed parameter to receive it automatically. Context parameters are not included in the Protobuf request message — clients never send them.

Convention: Place Context before request fields. See Parameter ordering for the full convention.

from blazerpc import Context

@app.model("info")
def info(ctx: Context, text: str) -> str:
    return f"method={ctx.method}, peer={ctx.peer}"
Attribute Type Description
metadata MultiDict \| None gRPC invocation metadata (headers) sent by the client.
peer Any Connection peer info (address, certificate).
method str Full gRPC method path, e.g. "/blazerpc.InferenceService/PredictIris".
app_state AppState Reference to app.state.

See the dependency injection guide for detailed usage and examples.


blazerpc.Depends

Mark a handler parameter as an injected dependency. Use Depends(fn) as the parameter's default value. The dependency function receives the per-request Context and returns the value to inject. Both sync and async functions are supported. Depends parameters are not included in the Protobuf message — clients never send them.

Convention: Place Depends parameters after request fields. See Parameter ordering for the full convention.

from blazerpc import Context, Depends

def get_classifier(ctx: Context):
    return ctx.app_state.classifier

@app.model("predict")
def predict(text: str, clf = Depends(get_classifier)) -> str:
    return clf.predict([text])
Constructor parameter Type Description
fn Callable Dependency function that receives Context.

Models using Depends are excluded from adaptive batching. See Limitations for details.

See the dependency injection guide for detailed usage, async dependencies, and real-world patterns.


blazerpc.BlazeClient

Async gRPC client for calling BlazeRPC model endpoints.

from blazerpc import BlazeClient

async with BlazeClient("127.0.0.1", 50051, registry=app.registry) as client:
    result = await client.predict("echo", text="hello")

    async for chunk in client.stream("tokens", prompt="hi"):
        print(chunk)

Constructor

Parameter Type Default Description
host str "127.0.0.1" Server address.
port int 50051 Server port.
registry ModelRegistry \| None None Model registry used to build Protobuf message classes. Required for predict and stream. Pass app.registry.

await client.predict(model_name, **kwargs)

Make a unary prediction call. Returns the model's result value, deserialized from the Protobuf response.

Parameter Type Description
model_name str The registered model name.
**kwargs Any Input fields matching the model's parameters.

async for chunk in client.stream(model_name, **kwargs)

Make a server-streaming call. Yields each chunk's result value, deserialized from Protobuf.

Parameter Type Description
model_name str The registered model name.
**kwargs Any Input fields matching the model's parameters.

client.close()

Close the underlying gRPC channel. Also called automatically when using the async context manager.


blazerpc.TensorInput

Type annotation for tensor-typed model inputs. Used by the codegen layer to emit TensorProto fields.

from blazerpc import TensorInput
import numpy as np

def classify(image: TensorInput[np.float32, "batch", 224, 224, 3]) -> ...:
    ...

The subscript arguments are dtype followed by shape dimensions. Shape dimensions can be integers or strings (symbolic names like "batch").

blazerpc.TensorOutput

Type annotation for tensor-typed model outputs. Same subscript syntax as TensorInput.

from blazerpc import TensorOutput
import numpy as np

def classify(...) -> TensorOutput[np.float32, "batch", 1000]:
    ...

Exceptions

All exceptions inherit from BlazeRPCError.

BlazeRPCError

Base exception for all BlazeRPC errors.

ValidationError

Raised when input validation fails (bad shapes, types, missing annotations).

Attribute Type Description
field str \| None The field that failed validation.

ModelNotFoundError

Raised when a requested model is not found in the registry.

Attribute Type Description
name str Model name.
version str Model version.

SerializationError

Raised when tensor serialization or deserialization fails.

Attribute Type Description
dtype str \| None The dtype that caused the error.

InferenceError

Raised when model inference fails.

Attribute Type Description
model_name str \| None The model that failed.

ConfigurationError

Raised for invalid configuration (bad import paths, missing settings).


Serialization

blazerpc.runtime.serialization.TensorProto

Wire representation of a tensor. A dataclass with __slots__.

Field Type Description
shape tuple[int, ...] Tensor shape.
dtype str Proto type string (e.g. "float", "int64").
data bytes Raw tensor bytes.

serialize_tensor(arr: np.ndarray) -> TensorProto

Serialize a NumPy array to a TensorProto. Raises SerializationError if the dtype is unsupported.

deserialize_tensor(proto: TensorProto) -> np.ndarray

Deserialize a TensorProto back to a NumPy array. Uses np.frombuffer() for zero-copy reconstruction.


Server

blazerpc.server.grpc.GRPCServer

Production-ready async gRPC server. Wraps grpclib.server.Server with signal handling and graceful shutdown.

server = GRPCServer(handlers, middleware=[...], grace_period=5.0)
await server.start(host="0.0.0.0", port=50051)
Constructor parameter Type Default Description
handlers Sequence[Any] required List of grpclib-compatible handlers.
middleware Sequence[Middleware] \| None None Middleware instances attached to the server before it starts listening.
grace_period float 5.0 Seconds to wait for in-flight requests during shutdown.

build_health_service(servicers=None) -> Health

Create a gRPC health service. Pass servicer instances for per-service health tracking, or None for unconditional SERVING status.

build_reflection_service(handlers=None) -> list

Create gRPC reflection handlers. Pass gRPC service handler objects (e.g. the servicer from build_servicer()) so clients can discover available RPCs.


Middleware

blazerpc.server.middleware.Middleware

Abstract base class for server middleware. Subclass it and implement on_request() and on_response().

class MyMiddleware(Middleware):
    async def on_request(self, event: RecvRequest) -> None:
        ...

    async def on_response(self, event: SendTrailingMetadata) -> None:
        ...

Call middleware.attach(server) to register it on a grpclib.server.Server instance.

LoggingMiddleware

Logs every RPC call with method name, peer address, and response status.

MetricsMiddleware

Exports Prometheus metrics:

  • blazerpc_requests_total{method, status} -- Counter of total requests.
  • blazerpc_request_duration_seconds{method} -- Histogram of request durations.

OTelMetricsMiddleware

Pushes RPC metrics via the OpenTelemetry Metrics API. Install with pip install blazerpc[otel].

Constructor parameter Type Default Description
meter Meter \| None None Custom OTel Meter. Uses global meter provider if None.

Instruments:

  • blazerpc.rpc.count -- Counter with attributes method, status.
  • blazerpc.rpc.duration -- Histogram (seconds) with attribute method.

ExceptionMiddleware

Base class for custom exception-to-gRPC-status mapping. A no-op by default.


Runtime

blazerpc.runtime.batcher.Batcher

Adaptive request batcher. Collects individual requests into batches. Automatically managed by BlazeApp.serve() — batchers are created for each non-streaming, non-DI model and stopped on shutdown. Batchers are not created for streaming models or models that use Context/Depends (see Automatic exclusions).

Constructor parameter Type Default Description
max_size int 32 Maximum items per batch.
timeout_ms float 10.0 Max wait time (ms) before dispatching.

Key methods:

  • await submit(request) -- Submit a request and wait for the batched result.
  • await start(inference_fn) -- Start the background batching loop.
  • await stop() -- Stop the batching loop.

blazerpc.runtime.executor.ModelExecutor

Wraps a registered model function with sync/async bridging.

  • await execute(kwargs) -- Run inference with keyword arguments.
  • await execute_batch(kwargs_list) -- Run inference on a batch of inputs.

Synchronous model functions are offloaded to a thread pool via asyncio.to_thread().


Code generation

blazerpc.codegen.proto.ProtoGenerator

Generates .proto file content from a ModelRegistry.

from blazerpc.codegen.proto import ProtoGenerator

proto_content = ProtoGenerator().generate(app.registry)

blazerpc.codegen.servicer.build_servicer(registry, batchers=None, app_state=None)

Builds a grpclib-compatible InferenceServicer from a ModelRegistry.

from blazerpc.codegen.servicer import build_servicer

servicer = build_servicer(app.registry, batchers={"my_model": batcher}, app_state=app.state)

Contrib

blazerpc.contrib.pytorch

Function / Decorator Description
torch_to_numpy(tensor) -> np.ndarray Detach, move to CPU, and convert to NumPy.
numpy_to_torch(arr, device, dtype) -> Tensor Convert NumPy array to a PyTorch tensor.
@torch_model(device="cpu") Decorator that auto-converts inputs and outputs.

blazerpc.contrib.tensorflow

Function / Decorator Description
tf_to_numpy(tensor) -> np.ndarray Convert a TensorFlow tensor to NumPy.
numpy_to_tf(arr, dtype) -> tf.Tensor Convert NumPy array to a TensorFlow tensor.
@tf_model(dtype=None) Decorator that auto-converts inputs and outputs.

blazerpc.contrib.onnx.ONNXModel

Wrapper around an ONNX Runtime inference session.

from blazerpc.contrib.onnx import ONNXModel

model = ONNXModel("model.onnx", providers=["CUDAExecutionProvider"])
results = model.predict(input_array)
Constructor parameter Type Default Description
model_path str \| Path required Path to the .onnx file.
providers list[str] \| None ["CPUExecutionProvider"] ONNX Runtime execution providers.
session_options Any None Optional ort.SessionOptions.

Methods:

  • predict(*inputs) -> list[np.ndarray] -- Run inference with positional inputs matched to input names.
  • predict_dict(inputs) -> dict[str, np.ndarray] -- Run inference with named inputs, returning named outputs.
  • input_names -> list[str] -- Names of the model's input tensors.
  • output_names -> list[str] -- Names of the model's output tensors.