Module 5 · Lesson 3 · ~25 min read
Streaming RPCs are where gRPC pulls ahead of REST. Canton's transaction stream — the one your indexers and integrations will consume — is server streaming. Get the patterns right and a robust integration is straightforward.
The proto:
rpc StreamUpdates(StreamRequest) returns (stream Update);
Generated client interface:
type SubmitterClient interface {
	StreamUpdates(ctx context.Context, in *StreamRequest, opts ...grpc.CallOption) (Submitter_StreamUpdatesClient, error)
}
type Submitter_StreamUpdatesClient interface {
	Recv() (*Update, error)
	grpc.ClientStream
}
The client call returns a stream handle. You loop over Recv() until it returns io.EOF (clean end) or another error.
stream, err := client.StreamUpdates(ctx, &StreamRequest{Party: "Alice", Offset: 0})
if err != nil { return err }
for {
	upd, err := stream.Recv()
	if err == io.EOF { break } // server closed cleanly
	if err != nil { return err } // network error, deadline, etc.
	handle(upd)
}
Server-side implementation:
func (s *submitterServer) StreamUpdates(req *StreamRequest, stream Submitter_StreamUpdatesServer) error {
	for upd := range s.subscribe(req.Party, req.Offset) {
		if err := stream.Send(upd); err != nil {
			return err
		}
		select {
		case <-stream.Context().Done():
			return stream.Context().Err()
		default:
		}
	}
	return nil // returning ends the stream cleanly (client sees io.EOF)
}
Notice stream.Context().Done(): the server stream carries its own context, and that context is canceled when the client gives up. Check it periodically so you can stop work — otherwise the goroutine producing updates will keep going after the client is gone.
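The handler above only looks at the context between sends, so if the update source goes quiet the range loop can sit blocked and never notice that the client left. One possible variation, sketched here under the assumption that s.subscribe returns a channel, is to select on the context and the channel together. The names pump, sender, and countingSender are hypothetical stand-ins so the sketch runs without generated gRPC code.

```go
package main

import (
	"context"
	"fmt"
)

// Update stands in for the generated *Update message (hypothetical shape).
type Update struct{ Offset int64 }

// sender is the one method of Submitter_StreamUpdatesServer this sketch needs.
type sender interface {
	Send(*Update) error
}

// pump forwards updates until the source closes, Send fails, or ctx is done.
// Selecting on ctx.Done() lets the handler exit even when no update arrives.
func pump(ctx context.Context, updates <-chan *Update, out sender) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case upd, ok := <-updates:
			if !ok {
				return nil // source finished: the client would see io.EOF
			}
			if err := out.Send(upd); err != nil {
				return err
			}
		}
	}
}

// countingSender is a test double that only counts what was sent.
type countingSender struct{ n int }

func (c *countingSender) Send(*Update) error { c.n++; return nil }

// pumpAfterCancel runs pump with a canceled context and a source that never produces.
func pumpAfterCancel() error {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	return pump(ctx, make(chan *Update), &countingSender{})
}

// pumpAll queues n updates, closes the source, and reports how many were delivered.
func pumpAll(n int) (int, error) {
	updates := make(chan *Update, n)
	for i := 0; i < n; i++ {
		updates <- &Update{Offset: int64(i)}
	}
	close(updates)
	out := &countingSender{}
	err := pump(context.Background(), updates, out)
	return out.n, err
}

func main() {
	fmt.Println(pumpAfterCancel())
	fmt.Println(pumpAll(3))
}
```

In a real handler, ctx would be stream.Context() and out would be the stream itself.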
rpc UploadDar(stream DarChunk) returns (UploadResponse);
stream, err := client.UploadDar(ctx)
if err != nil { return err }
for _, chunk := range chunks {
	if err := stream.Send(chunk); err != nil { return err }
}
resp, err := stream.CloseAndRecv() // signals "done sending" + reads response
Less common in Canton land but useful when uploading large payloads in chunks.
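The upload loop assumes the payload has already been cut into chunks. A minimal sketch of that step follows; the Data field on DarChunk and the splitChunks helper are assumptions for illustration, not part of any generated API. Chunk size is a tuning choice, and it needs to stay under the receiver's message size limit (gRPC-Go defaults to 4 MiB on receive).

```go
package main

import "fmt"

// DarChunk stands in for the generated message; the Data field is an assumption.
type DarChunk struct{ Data []byte }

// splitChunks cuts payload into pieces of at most size bytes.
// The chunks alias the payload's backing array; nothing is copied.
func splitChunks(payload []byte, size int) []*DarChunk {
	if size <= 0 {
		return nil
	}
	var chunks []*DarChunk
	for start := 0; start < len(payload); start += size {
		end := start + size
		if end > len(payload) {
			end = len(payload)
		}
		chunks = append(chunks, &DarChunk{Data: payload[start:end]})
	}
	return chunks
}

func main() {
	// A 10-byte payload in 4-byte chunks yields lengths 4, 4, 2.
	for i, c := range splitChunks(make([]byte, 10), 4) {
		fmt.Println(i, len(c.Data))
	}
}
```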
rpc CompletionStream(stream CompletionRequest) returns (stream Completion);
You need two goroutines on the client: one writing requests, one reading responses. They share the stream handle.
stream, err := client.CompletionStream(ctx)
if err != nil { return err }
// Sender
go func() {
	for req := range outbound {
		if err := stream.Send(req); err != nil { return }
	}
	stream.CloseSend() // signals "no more sends from this side"
}()
// Receiver
for {
	resp, err := stream.Recv()
	if err == io.EOF { break }
	if err != nil { return err }
	handle(resp)
}
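Don't drive both directions of a stream from one goroutine: Recv blocks until a message arrives, so any pending sends stall behind it. Use one goroutine per direction. gRPC-Go allows one goroutine to call Send while another calls Recv, but concurrent calls to Send (or concurrent calls to Recv) on the same stream are not safe.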
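To see the two-goroutine shape run end to end without a server, here is a small simulation. fakeStream is a hypothetical in-memory stand-in that mimics only the Send, CloseSend, and Recv surface of a bidirectional client stream and replies to each request with its double; it is not how gRPC works internally.

```go
package main

import (
	"fmt"
	"io"
)

// fakeStream is an illustrative in-memory stream: every request gets one reply.
type fakeStream struct {
	in  chan int
	out chan int
}

func newFakeStream() *fakeStream {
	s := &fakeStream{in: make(chan int), out: make(chan int)}
	go func() { // the "server": answer each request, then end the stream
		for v := range s.in {
			s.out <- v * 2
		}
		close(s.out)
	}()
	return s
}

func (s *fakeStream) Send(v int) error { s.in <- v; return nil }
func (s *fakeStream) CloseSend() error { close(s.in); return nil }
func (s *fakeStream) Recv() (int, error) {
	v, ok := <-s.out
	if !ok {
		return 0, io.EOF
	}
	return v, nil
}

// exchange sends reqs from one goroutine and collects replies on the caller's.
func exchange(reqs []int) ([]int, error) {
	stream := newFakeStream()
	sendErr := make(chan error, 1)
	go func() { // sender
		defer stream.CloseSend() // half-close even if a Send fails
		for _, r := range reqs {
			if err := stream.Send(r); err != nil {
				sendErr <- err
				return
			}
		}
		sendErr <- nil
	}()
	var got []int
	for { // receiver
		v, err := stream.Recv()
		if err == io.EOF {
			break
		}
		if err != nil {
			return got, err
		}
		got = append(got, v)
	}
	return got, <-sendErr
}

func main() {
	fmt.Println(exchange([]int{1, 2, 3}))
}
```

Because the simulated server will not accept a second request until its first reply has been read, doing all the sends and then all the receives on a single goroutine would deadlock here, which is the failure the one-goroutine-per-direction pattern avoids.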
| State | What it means |
|---|---|
| Stream opened | Client called the RPC. Connection established. |
| Sending / receiving | Normal operation. |
| Half-closed (client done sending) | CloseSend() called. Server can still send. |
| Server done | Server returned from handler. Client's Recv() returns io.EOF. |
| Either side errored | Stream torn down. Both sides see the error on next op. |
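| Context canceled | Stream torn down. The handler's stream.Context() reports context.Canceled or context.DeadlineExceeded; the client's next call returns a status error with code Canceled or DeadlineExceeded. |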
HTTP/2 has flow control. If the receiver doesn't read, the sender's Send eventually blocks. So a slow consumer naturally throttles the producer — no separate backpressure mechanism needed.
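As a rough analogy only, a buffered Go channel behaves like a flow-control window: sends succeed until the buffer is full, then the sender has to wait for the reader. The sketch below counts how many sends fit when nobody is reading. The function name and the window size are made up for illustration, and real HTTP/2 windows are measured in bytes rather than messages.

```go
package main

import "fmt"

// sendsBeforeBlocking counts how many sends succeed into a window of the given
// size when nobody reads. The buffered channel is an analogy for the HTTP/2
// flow-control window, not gRPC's actual implementation.
func sendsBeforeBlocking(window int) int {
	ch := make(chan struct{}, window)
	sent := 0
	for {
		select {
		case ch <- struct{}{}:
			sent++
		default:
			// A real Send would block here until the receiver catches up.
			return sent
		}
	}
}

func main() {
	fmt.Println(sendsBeforeBlocking(8))
}
```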
What this means in practice:
A streaming connection can drop. For a transaction-stream consumer:
On reconnect, resume with StreamRequest{Offset: lastOffset + 1}. Canton's Ledger API is designed for this: every transaction has a stable offset. Build the indexer to be idempotent and store the offset alongside the data, and crashes and network blips become resumes.
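A resume loop along these lines might look like the sketch below. It is written against a hypothetical openFunc that stands in for client.StreamUpdates so it runs without a ledger; consume, flakyLedger, and run are illustrative names, and the Offset field on Update is an assumption. A production version would back off between attempts and persist the offset in the same transaction as the handled data.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// Update stands in for the generated message; the Offset field is an assumption.
type Update struct{ Offset int64 }

// openFunc is a hypothetical stand-in for client.StreamUpdates: it opens a
// stream at the given offset and returns a Recv-like function.
type openFunc func(offset int64) (recv func() (*Update, error), err error)

// consume reads until a clean io.EOF, reopening at lastOffset+1 after errors.
// It gives up after maxRetries consecutive failures and returns the next offset.
func consume(open openFunc, start int64, maxRetries int, handle func(*Update)) (int64, error) {
	next := start
	failures := 0
	for {
		recv, err := open(next)
		if err == nil {
			for {
				var upd *Update
				upd, err = recv()
				if err != nil {
					break
				}
				handle(upd)
				next = upd.Offset + 1 // resume point if the stream drops
				failures = 0
			}
			if err == io.EOF {
				return next, nil
			}
		}
		failures++
		if failures > maxRetries {
			return next, err
		}
		// Real code would sleep with backoff and jitter before reconnecting.
	}
}

// flakyLedger serves offsets [0,total) and drops the first stream after two updates.
func flakyLedger(total int64) openFunc {
	dropped := false
	return func(offset int64) (func() (*Update, error), error) {
		sent := 0
		return func() (*Update, error) {
			if offset >= total {
				return nil, io.EOF
			}
			if !dropped && sent == 2 {
				dropped = true
				return nil, errors.New("connection reset")
			}
			upd := &Update{Offset: offset}
			offset++
			sent++
			return upd, nil
		}, nil
	}
}

// run consumes five updates across one simulated disconnect.
func run() ([]int64, int64, error) {
	var seen []int64
	next, err := consume(flakyLedger(5), 0, 3, func(u *Update) { seen = append(seen, u.Offset) })
	return seen, next, err
}

func main() {
	fmt.Println(run())
}
```

Each offset is handled exactly once in this simulation because the resume point only advances after handle returns; if a crash can land between handling and recording the offset, idempotent handling is what keeps a replay harmless.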
Forgetting to call CloseSend on a bidirectional stream (or CloseAndRecv on a client-streaming one). Without it, the server doesn't know you're done sending, and the response never comes.
Treating every err != nil as fatal. io.EOF from Recv is the clean end-of-stream signal; handle it as success.
Using a context that can never be canceled for an "indefinite" stream. Even long-running streams need a way to stop: use context.WithCancel and call cancel on shutdown signals (SIGTERM).
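One way to wire shutdown signals to a stream's context is the standard library's signal.NotifyContext, which cancels the context when a listed signal arrives. In this sketch runStream is a hypothetical stand-in for a long-lived Recv loop, and the 50 ms timeout in main exists only so the example exits on its own.

```go
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"
)

// runStream stands in for a long-lived Recv loop: it works until ctx is done.
func runStream(ctx context.Context) error {
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			// A real consumer would call stream.Recv() and handle the update here.
		}
	}
}

// stopsOnCancel reports whether runStream returns once its context is canceled.
func stopsOnCancel() bool {
	ctx, cancel := context.WithCancel(context.Background())
	done := make(chan error, 1)
	go func() { done <- runStream(ctx) }()
	cancel()
	select {
	case err := <-done:
		return err == context.Canceled
	case <-time.After(time.Second):
		return false
	}
}

func main() {
	// Cancel the context on Ctrl-C or SIGTERM.
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer stop()

	// Demo only: also stop after 50ms so the example exits without a signal.
	ctx, cancel := context.WithTimeout(ctx, 50*time.Millisecond)
	defer cancel()

	fmt.Println("stream ended:", runStream(ctx))
}
```

With a real gRPC stream, passing this ctx to the RPC call is enough: canceling it makes a blocked Recv return with an error.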
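Server streaming: loop on Recv() until io.EOF.
Client streaming: Send() each message, then CloseAndRecv() for the single response.
Bidirectional: one goroutine per direction; CloseSend signals you're done writing.
Server handlers: check stream.Context().Done() periodically so you can shut down when the client cancels.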