Module 7 · Phase 3 · ~90 min · Real Go & live Canton
Subscribe to the participant's transaction stream. Watch updates flow as they happen. Run cantonctl stream in one terminal, cantonctl submit in another, see your submission appear. This is the foundational shape of every Canton indexer, integration, and bridge.
The Update Service (com.daml.ledger.api.v2.UpdateService) is the entry point. It exposes a server-streaming RPC, typically called GetUpdates or GetTransactions depending on Canton version. Inputs:
Returns a stream of Update messages, each containing one or more CreatedEvent / ArchivedEvent entries.
import updatepb "example.com/cantonctl/proto/com/daml/ledger/api/v2"
type Client struct {
conn *grpc.ClientConn
commands commandpb.CommandServiceClient
updates updatepb.UpdateServiceClient
cfg Config
}
// In Dial:
c.updates = updatepb.NewUpdateServiceClient(conn)
The shape: a single method that opens the gRPC stream, reads in a loop, and emits parsed events on a channel for the caller. Cancel via the passed context.
type Event struct {
Offset int64 // or string, depending on Canton version
UpdateID string
Kind string // "created" or "archived"
ContractID string
TemplateName string
Witnesses []string
Fields map[string]any // for created events; protobuf Value rendered to Go any
}
func (c *Client) Stream(ctx context.Context, party string, beginOffset int64) (<-chan Event, error) {
req := &updatepb.GetUpdatesRequest{
BeginExclusive: /* version-dependent shape */,
Filter: &updatepb.TransactionFilter{
FiltersByParty: map[string]*updatepb.Filters{
party: {}, // empty filter = all templates
},
},
Verbose: true,
}
stream, err := c.updates.GetUpdates(ctx, req)
if err != nil {
return nil, fmt.Errorf("open stream: %w", err)
}
out := make(chan Event, 16)
go func() {
defer close(out)
for {
upd, err := stream.Recv()
if err == io.EOF { return }
if err != nil {
slog.Error("stream recv", "err", err)
return
}
for _, ev := range parseEvents(upd) {
select {
case out <- ev:
case <-ctx.Done():
return
}
}
}
}()
return out, nil
}
parseEvents walks the protobuf Update message, extracts CreatedEvent and ArchivedEvent entries, and converts the Daml Value protobuf back to Go-friendly types. Tedious but mechanical.
In cmd/cantonctl/main.go:
case "stream":
party := *streamParty
ch, err := c.Stream(ctx, party, 0)
if err != nil { die(err) }
for ev := range ch {
slog.Info("event",
"kind", ev.Kind,
"offset", ev.Offset,
"contract_id", ev.ContractID,
"template", ev.TemplateName,
)
}
The for ev := range ch exits cleanly when ctx is canceled (Ctrl-C), because the goroutine in Stream closes the channel on ctx.Done.
Open two terminals.
Terminal A:
go run ./cmd/cantonctl stream --party Alice
Terminal B:
go run ./cmd/cantonctl submit \
--party Alice \
--package <pkg> --module Iou --entity Iou \
--field issuer=Alice --field owner=Bob \
--field amount=42.0 --field currency=EUR
Within a moment, terminal A should print a "created" event for the new contract. That's a real Canton end-to-end round trip in Go.
Then Ctrl-C terminal A. The stream should close cleanly — log a "shutting down" message, no panic, no goroutine leak.
Production-grade improvements (worth doing if time permits, otherwise skip):
lastOffset + 1.Stream output reflects submitted events in real time. Ctrl-C exits cleanly. No goroutines linger after exit (verifiable with defer pprof.Lookup("goroutine").WriteTo(os.Stderr, 1) at process end if you want to be sure).