Module 7 · Phase 3 · ~90 min · Real Go & live Canton

Phase 3 — Stream Transaction Updates

Subscribe to the participant's transaction stream. Watch updates flow as they happen. Run cantonctl stream in one terminal, cantonctl submit in another, see your submission appear. This is the foundational shape of every Canton indexer, integration, and bridge.

The relevant service

The Update Service (com.daml.ledger.api.v2.UpdateService) is the entry point. It exposes a server-streaming RPC, typically called GetUpdates or GetTransactions depending on Canton version. Inputs:

Returns a stream of Update messages, each containing one or more CreatedEvent / ArchivedEvent entries.

Step 1 — Add the UpdateService stub

import updatepb "example.com/cantonctl/proto/com/daml/ledger/api/v2"

type Client struct {
    conn     *grpc.ClientConn
    commands commandpb.CommandServiceClient
    updates  updatepb.UpdateServiceClient
    cfg      Config
}

// In Dial:
c.updates = updatepb.NewUpdateServiceClient(conn)

Step 2 — A Stream method that emits events on a channel

The shape: a single method that opens the gRPC stream, reads in a loop, and emits parsed events on a channel for the caller. Cancel via the passed context.

type Event struct {
    Offset       int64     // or string, depending on Canton version
    UpdateID     string
    Kind         string    // "created" or "archived"
    ContractID   string
    TemplateName string
    Witnesses    []string
    Fields       map[string]any  // for created events; protobuf Value rendered to Go any
}

func (c *Client) Stream(ctx context.Context, party string, beginOffset int64) (<-chan Event, error) {
    req := &updatepb.GetUpdatesRequest{
        BeginExclusive: /* version-dependent shape */,
        Filter: &updatepb.TransactionFilter{
            FiltersByParty: map[string]*updatepb.Filters{
                party: {},  // empty filter = all templates
            },
        },
        Verbose: true,
    }
    stream, err := c.updates.GetUpdates(ctx, req)
    if err != nil {
        return nil, fmt.Errorf("open stream: %w", err)
    }

    out := make(chan Event, 16)
    go func() {
        defer close(out)
        for {
            upd, err := stream.Recv()
            if err == io.EOF { return }
            if err != nil {
                slog.Error("stream recv", "err", err)
                return
            }
            for _, ev := range parseEvents(upd) {
                select {
                case out <- ev:
                case <-ctx.Done():
                    return
                }
            }
        }
    }()
    return out, nil
}

parseEvents walks the protobuf Update message, extracts CreatedEvent and ArchivedEvent entries, and converts the Daml Value protobuf back to Go-friendly types. Tedious but mechanical.

Step 3 — CLI subcommand

In cmd/cantonctl/main.go:

case "stream":
    party := *streamParty
    ch, err := c.Stream(ctx, party, 0)
    if err != nil { die(err) }
    for ev := range ch {
        slog.Info("event",
            "kind", ev.Kind,
            "offset", ev.Offset,
            "contract_id", ev.ContractID,
            "template", ev.TemplateName,
        )
    }

The for ev := range ch exits cleanly when ctx is canceled (Ctrl-C), because the goroutine in Stream closes the channel on ctx.Done.

Step 4 — Test the round trip

Open two terminals.

Terminal A:

go run ./cmd/cantonctl stream --party Alice

Terminal B:

go run ./cmd/cantonctl submit \
  --party Alice \
  --package <pkg> --module Iou --entity Iou \
  --field issuer=Alice --field owner=Bob \
  --field amount=42.0 --field currency=EUR

Within a moment, terminal A should print a "created" event for the new contract. That's a real Canton end-to-end round trip in Go.

Then Ctrl-C terminal A. The stream should close cleanly — log a "shutting down" message, no panic, no goroutine leak.

Resilience touches

Production-grade improvements (worth doing if time permits, otherwise skip):

  1. Persist the offset after every event you process. On restart, resume from lastOffset + 1.
  2. Reconnect on stream errors with exponential backoff (Module 6 Lesson 3 pattern).
  3. Apply backpressure if your consumer is slow — buffer with a known cap.
  4. Add a Prometheus counter for events received by template name.

Done when

Stream output reflects submitted events in real time. Ctrl-C exits cleanly. No goroutines linger after exit (verifiable with defer pprof.Lookup("goroutine").WriteTo(os.Stderr, 1) at process end if you want to be sure).