Event Service Agent Kata

Messages (Commands & Events)


Purpose

Note on Schema Implementations: This document defines the wire-format contracts (DTOs) for documentation and discussion only. The source of truth for runtime validation and domain types is the Effect Schema implementation in each module’s domain/ folder (e.g., packages/timer/src/domain/events.domain.ts). See ADR-0011 for the schema migration strategy.

Conventions

Semantics

Serialization Flow

Messages traverse architectural layers from domain to wire:

┌─────────────────────────────────────────────────────────────┐
│ Layer 1: Workflow (domain events + metadata provisioning)   │
│   event = new DueTimeReached({ tenantId, serviceCallId })   │
│   yield* eventBus.publish(event).pipe(                      │
│     Effect.provideService(MessageMetadata, {                │
│       correlationId, causationId                            │
│     })                                                      │
│   )                                                         │
└─────────────────────────┬───────────────────────────────────┘
                          │ Port requires MessageMetadata in R
┌─────────────────────────▼───────────────────────────────────┐
│ Layer 2: Adapter (extract Context + wrap envelope)          │
│   metadata = yield* MessageMetadata                         │
│   envelopeId = yield* EnvelopeId.makeUUID7()                │
│   envelope = new MessageEnvelope({                          │
│     id: envelopeId,                                         │
│     payload: event,  // Domain event (validated)            │
│     correlationId: metadata.correlationId,                  │
│     causationId: metadata.causationId,                      │
│     ...                                                     │
│   })                                                        │
└─────────────────────────┬───────────────────────────────────┘
                          │ EventBusPort.publish([envelope])
┌─────────────────────────▼───────────────────────────────────┐
│ Layer 3: EventBusPort (MessageEnvelope abstraction)         │
│   publish: (envelopes: MessageEnvelope[]) => Effect<...>    │
└─────────────────────────┬───────────────────────────────────┘
                          │ MessageEnvelope.encodeJson (NATS adapter)
┌─────────────────────────▼───────────────────────────────────┐
│ Layer 4: Wire (JSON bytes over NATS)                        │
│   '{"id":"...","type":"DueTimeReached","payload":{...}}'    │
└─────────────────────────────────────────────────────────────┘
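The publishing path in the diagram can be sketched without any dependencies. This is a minimal illustration under stated assumptions, not the project's implementation: the real code uses Effect Schema classes and provides MessageMetadata through the Effect context. The `_tag` field, the exact envelope field list, and the helper names `wrapEnvelope`, `encodeEnvelopeJson`, and `makeId` are hypothetical stand-ins.

```typescript
// Hypothetical plain-TypeScript stand-ins for the types named in the diagram.
interface DueTimeReached {
  readonly _tag: "DueTimeReached"; // assumed discriminator field
  readonly tenantId: string;
  readonly serviceCallId: string;
}

interface MessageMetadata {
  readonly correlationId: string;
  readonly causationId: string;
}

interface MessageEnvelope<P> {
  readonly id: string;
  readonly type: string;
  readonly payload: P;
  readonly correlationId: string;
  readonly causationId: string;
}

// Layer 2: the adapter combines the domain event with the metadata supplied
// by the workflow. The id generator is injected; the diagram's
// EnvelopeId.makeUUID7() would play that role in the real code.
function wrapEnvelope(
  event: DueTimeReached,
  metadata: MessageMetadata,
  makeId: () => string
): MessageEnvelope<DueTimeReached> {
  return {
    id: makeId(),
    type: event._tag,
    payload: event,
    correlationId: metadata.correlationId,
    causationId: metadata.causationId,
  };
}

// Layer 3 to Layer 4: serialize the envelope to the JSON text sent on the wire.
function encodeEnvelopeJson(envelope: MessageEnvelope<unknown>): string {
  return JSON.stringify(envelope);
}

// Usage: a fixed id keeps the example deterministic.
const event: DueTimeReached = {
  _tag: "DueTimeReached",
  tenantId: "tenant-1",
  serviceCallId: "call-1",
};
const wire = encodeEnvelopeJson(
  wrapEnvelope(
    event,
    { correlationId: "corr-1", causationId: "cause-1" },
    () => "envelope-1"
  )
);
```

Keeping correlationId and causationId out of the domain event and attaching them only at the adapter matches the layering in the diagram: the workflow supplies metadata, and the envelope is the only place it is stored.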

Key Points:

Consuming (reverse flow):

  1. NATS adapter receives JSON bytes
  2. MessageEnvelope.parseJson validates envelope structure
  3. Route based on envelope.type discriminator
  4. DomainEventSchema.decode(envelope.payload) validates payload
  5. Workflow receives validated domain event

See ADR-0011 for detailed patterns.
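The consuming steps above could look roughly like the following two-stage validation. This is a dependency-free sketch, not the project's code: hand-written checks stand in for MessageEnvelope.parseJson and DomainEventSchema.decode, and the names `ReceivedEnvelope`, `DueTimeReachedPayload`, `parseEnvelope`, `decodeDueTimeReached`, and `consume` are hypothetical. It also assumes the bytes have already been decoded to a string.

```typescript
// Hypothetical stand-ins; the real project validates with Effect Schema.
interface ReceivedEnvelope {
  readonly id: string;
  readonly type: string;
  readonly payload: unknown; // stays opaque until routed
}

interface DueTimeReachedPayload {
  readonly tenantId: string;
  readonly serviceCallId: string;
}

// Step 2: validate the envelope structure only.
function parseEnvelope(json: string): ReceivedEnvelope {
  const raw: unknown = JSON.parse(json);
  if (typeof raw !== "object" || raw === null) {
    throw new Error("envelope must be a JSON object");
  }
  const fields = raw as Record<string, unknown>;
  const id = fields["id"];
  const type = fields["type"];
  if (typeof id !== "string" || typeof type !== "string") {
    throw new Error("envelope requires string 'id' and 'type'");
  }
  return { id, type, payload: fields["payload"] };
}

// Step 4: validate the payload against the shape expected for its type.
function decodeDueTimeReached(payload: unknown): DueTimeReachedPayload {
  if (typeof payload !== "object" || payload === null) {
    throw new Error("payload must be a JSON object");
  }
  const fields = payload as Record<string, unknown>;
  const tenantId = fields["tenantId"];
  const serviceCallId = fields["serviceCallId"];
  if (typeof tenantId !== "string" || typeof serviceCallId !== "string") {
    throw new Error("DueTimeReached requires tenantId and serviceCallId");
  }
  return { tenantId, serviceCallId };
}

// Steps 1 to 5: parse the envelope, route on the type discriminator (step 3),
// then hand the validated payload to the workflow.
function consume(json: string): DueTimeReachedPayload {
  const envelope = parseEnvelope(json);
  switch (envelope.type) {
    case "DueTimeReached":
      return decodeDueTimeReached(envelope.payload);
    default:
      throw new Error(`unsupported message type: ${envelope.type}`);
  }
}
```

Validating the envelope before the payload means a malformed or unknown message is rejected at the routing step, before any domain-specific decoding runs.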

Index by Context

Index by Type

Commands

SubmitServiceCall

StartExecution

ScheduleTimer

Events

ServiceCallSubmitted

ServiceCallScheduled

ServiceCallRunning

ServiceCallSucceeded

ServiceCallFailed

DueTimeReached

ExecutionStarted

ExecutionSucceeded

ExecutionFailed

Notes