Module 5 · Lesson 3 · ~25 min read

Streaming — Server, Client, and Bidirectional

Streaming RPCs are where gRPC pulls ahead of REST. Canton's transaction stream — the one your indexers and integrations will consume — is server streaming. Get the patterns right and a robust integration is straightforward.

Server streaming — one request, many responses

The proto:

rpc StreamUpdates(StreamRequest) returns (stream Update);

Generated client interface:

type SubmitterClient interface {
    StreamUpdates(ctx context.Context, in *StreamRequest, opts ...grpc.CallOption) (Submitter_StreamUpdatesClient, error)
}

type Submitter_StreamUpdatesClient interface {
    Recv() (*Update, error)
    grpc.ClientStream
}

The client call returns a stream handle. You loop over Recv() until it returns io.EOF (clean end) or another error.

stream, err := client.StreamUpdates(ctx, &StreamRequest{Party: "Alice", Offset: 0})
if err != nil { return err }

for {
    upd, err := stream.Recv()
    if err == io.EOF { break }       // server closed cleanly
    if err != nil { return err }     // network error, deadline, etc.
    handle(upd)
}

Server-side implementation:

func (s *submitterServer) StreamUpdates(req *StreamRequest, stream Submitter_StreamUpdatesServer) error {
    for upd := range s.subscribe(req.Party, req.Offset) {
        if err := stream.Send(upd); err != nil {
            return err
        }
        select {
        case <-stream.Context().Done():
            return stream.Context().Err()
        default:
        }
    }
    return nil  // returning ends the stream cleanly (client sees io.EOF)
}

Notice stream.Context().Done(): the server stream carries its own context, and that context is canceled when the client gives up. Check it periodically so you can stop work — otherwise the goroutine producing updates will keep going after the client is gone.

Client streaming — many requests, one response

rpc UploadDar(stream DarChunk) returns (UploadResponse);

stream, err := client.UploadDar(ctx)
if err != nil { return err }
for _, chunk := range chunks {
    if err := stream.Send(chunk); err != nil { return err }
}
resp, err := stream.CloseAndRecv()  // signals "done sending" + reads the response

Less common in Canton land but useful when uploading large payloads in chunks.

Bidirectional streaming — both sides stream

rpc CompletionStream(stream CompletionRequest) returns (stream Completion);

You need two goroutines on the client: one writing requests, one reading responses. They share the stream handle.

stream, err := client.CompletionStream(ctx)
if err != nil { return err }

// Sender
go func() {
    for req := range outbound {
        if err := stream.Send(req); err != nil { return }
    }
    stream.CloseSend()  // signals "no more sends from this side"
}()

// Receiver
for {
    resp, err := stream.Recv()
    if err == io.EOF { break }
    if err != nil { return err }
    handle(resp)
}

Don't read and write the same stream from a single goroutine: Recv blocks, so a goroutine waiting on Recv can't Send, and vice versa. Calling Send and Recv concurrently from separate goroutines is safe in grpc-go. Two goroutines, one for each direction, is the rule.

The state machine for any streaming call

State                                What it means
Stream opened                        Client called the RPC. Connection established.
Sending / receiving                  Normal operation.
Half-closed (client done sending)    CloseSend() called. Server can still send.
Server done                          Server returned from handler. Client's Recv() returns io.EOF.
Either side errored                  Stream torn down. Both sides see the error on next op.
Context canceled                     Stream torn down. Both sides see context.Canceled or DeadlineExceeded.

Backpressure — gRPC handles it for you, mostly

HTTP/2 has flow control built in. If the receiver stops reading, the sender's Send eventually blocks once the flow-control window fills, so a slow consumer naturally throttles the producer. No separate backpressure mechanism is needed.

What this means in practice: a slow indexer won't be flooded with updates, it will simply fall behind the live ledger. If you need to decouple receiving from processing, put a bounded queue of your own between Recv() and the work; an unbounded buffer only hides the problem.

Reconnect / resume

A streaming connection can drop. For a transaction-stream consumer:

  1. Track the last offset you've successfully processed and persisted.
  2. On disconnect, sleep with exponential backoff.
  3. Re-establish the stream with StreamRequest{Offset: lastOffset + 1}.
  4. Resume.

Canton's Ledger API is designed for this — every transaction has a stable offset. You build the indexer to be idempotent, store the offset alongside the data, and crashes/network blips become resumes.

Common mistakes

Don't

Forget to call CloseSend on a client-streaming or bidirectional stream. Without it, the server doesn't know you're done, and the response never comes.

Don't

Treat err != nil as fatal indiscriminately. io.EOF from Recv is the clean end-of-stream signal — handle it as success.

Don't

Use a bare, uncancelable context for an "indefinite" stream. Even long-running streams need a way to stop: use context.WithCancel and call cancel on shutdown signals (SIGTERM).

Takeaways