Event Service Agent Kata

ADR-0011: Message Schema Validation with Effect Schema

Note: Section 3 “Central Registry (Contracts Package)” has been superseded by ADR-0012. Message schemas now live in @event-service-agent/schemas, not @event-service-agent/contracts. See ADR-0012 for the architectural rationale and migration plan.


Problem

Current message definitions (packages/contracts/src/messages/messages.ts) use plain TypeScript interfaces with const enum discriminators. This provides:

When implementing adapters (e.g., timer-event-bus.adapter.ts), developers must:

  1. Manually construct objects matching interface shape
  2. Hope data is valid (no runtime checks)
  3. Write custom encoding/decoding logic
  4. Risk runtime failures from invalid data

Core Issue: Plain interfaces conflate two concerns:

Domain Modeling Made Functional (DMMF) principle: “Parse, Don’t Validate” — transform untrusted input into trusted domain types once at boundaries.


Context

Current Architecture

```typescript ignore // packages/contracts/src/messages/messages.ts interface DueTimeReached extends Event { readonly reachedAt?: Iso8601DateTime.Type }

// Adapter usage (NO validation) const event: Timer.Events.DueTimeReached = { type: ‘DueTimeReached’, tenantId: rawData.tenantId, // ⚠️ Could be invalid! serviceCallId: rawData.serviceCallId, reachedAt: rawData.reachedAt, }


### Effect Schema Solution

Effect Schema implements DMMF "Parse, Don't Validate" pattern:

```typescript ignore
// Domain event as validated Schema
class DueTimeReached extends Schema.TaggedClass<DueTimeReached>()('DueTimeReached', {
	tenantId: TenantId, // Branded, validated
	serviceCallId: ServiceCallId, // Branded, validated
	reachedAt: Schema.optional(Iso8601DateTime),
}) {}

// At adapter boundary (parse once)
const event = yield * Schema.decode(DueTimeReached)(rawData)
//    ^^^^^ Either succeeds (valid) or fails (ParseError)

// Domain receives GUARANTEED valid event
yield * workflow.handle(event)

Decision

Adopt Effect Schema for all domain messages (events and commands) using incremental migration:

1. Module Ownership

Each module owns its message schemas:

packages/
  timer/src/domain/
    events.domain.ts          ← export class DueTimeReached
  orchestration/src/domain/
    events.domain.ts          ← Orchestration events
    commands.domain.ts        ← Orchestration commands
  execution/src/domain/
    events.domain.ts          ← Execution events
  api/src/domain/
    commands.domain.ts        ← API commands

2. Schema Class Pattern

Use Schema.TaggedClass with direct field repetition (Option A from analysis):

```typescript ignore export class DueTimeReached extends Schema.TaggedClass()('DueTimeReached', { tenantId: TenantId, // Base field (repeated) serviceCallId: ServiceCallId, // Base field (repeated) reachedAt: Schema.optional(Iso8601DateTime), // Event-specific }) {}


**Rationale**:

- Only 2 base fields (`tenantId`, `serviceCallId`) — repetition acceptable
- Explicit and obvious (no magic)
- No abstraction tax
- Can refactor to helper function later if pattern emerges

**Commands that create aggregates** omit `serviceCallId`:

```typescript ignore
export class SubmitServiceCall extends Schema.TaggedClass<SubmitServiceCall>()('SubmitServiceCall', {
	tenantId: TenantId,
	// NO serviceCallId - will be generated by Orchestration
	name: Schema.NonEmpty,
	dueAt: Iso8601DateTime,
	requestSpec: RequestSpec,
}) {}

Schema Class Conventions

Static Methods (for ergonomics):

```typescript ignore export class DueTimeReached extends Schema.TaggedClass()('DueTimeReached', { tenantId: TenantId, serviceCallId: ServiceCallId, reachedAt: Schema.optional(Iso8601DateTime), }) { /** * Decode from DTO shape (Encoded → validated Type) * * Expects structured DTO with correct field names, not arbitrary unknown. For JSON strings, use * Schema.parseJson(DueTimeReached) instead. * * Returns Effect<DueTimeReached, ParseError> */ static readonly decode = Schema.decode(DueTimeReached)

/**
 * Encode to DTO shape (validated Type → Encoded)
 *
 * Produces JSON-serializable DTO with unbranded types. Example output: { _tag: 'DueTimeReached', tenantId: string,
 * ... }
 *
 * Returns Effect<DueTimeReached.Dto, ParseError>
 */
static readonly encode = Schema.encode(DueTimeReached)

/**
 * Safe constructor (alternative to `new DueTimeReached(...)`) Validates at construction time
 */
// static readonly make = (props: {
//   tenantId: TenantId.Type
//   serviceCallId: ServiceCallId.Type
//   reachedAt?: Iso8601DateTime.Type
// }) => new DueTimeReached(props) } ```

Usage:

```typescript ignore // Decode from DTO shape (already parsed from JSON) const dto = { _tag: ‘DueTimeReached’, tenantId: ‘tenant-123’, … } const event = yield * DueTimeReached.decode(dto);

// Parse from JSON string (combines JSON.parse + decode) const jsonString = ‘{“_tag”:”DueTimeReached”,”tenantId”:”tenant-123”,…}’ const event = yield * Schema.parseJson(DueTimeReached)(jsonString);

// Construct in domain const event = new DueTimeReached({ tenantId, serviceCallId, reachedAt: Option.some(now), });

// Encode for wire (produces DTO) const dto = yield * DueTimeReached.encode(event); // dto: { _tag: ‘DueTimeReached’, tenantId: string, serviceCallId: string, … }


**Type Namespace Pattern** (match current messages.ts structure):

**DTO Type Exports** (for adapter boundaries):

```typescript ignore
// Domain type (validated, used in workflows)
export type DueTimeReached = Schema.Schema.Type<typeof Timer.Events.DueTimeReached>

// DTO type (wire format, used in adapters)
export type DueTimeReachedDTO = Schema.Schema.Encoded<typeof Timer.Events.DueTimeReached>
// Inferred type:
// {
//   type: 'DueTimeReached'
//   tenantId: string        // ← Unbrand for serialization
//   serviceCallId: string   // ← Unbrand for serialization
//   reachedAt?: string      // ← ISO8601 string
// }

3. Central Registry (Superseded by ADR-0012)

⚠️ SUPERSEDED: This section described placing schemas in @event-service-agent/contracts. This approach was reconsidered during implementation and superseded by ADR-0012.

Current Decision (ADR-0012): Schemas live in @event-service-agent/schemas package. See ADR-0012 for rationale (avoids circular dependencies, preserves DX, clear separation).

Create packages/contracts/src/messages/schemas.ts exporting Schema unions:

Original proposal (no longer valid):

```typescript ignore // ❌ NOT IMPLEMENTED - Creates circular dependency import { DueTimeReached } from ‘@event-service-agent/timer/domain’ import { ServiceCallScheduled } from ‘@event-service-agent/orchestration/domain’ // … import all schemas

export const DomainEvent = Schema.Union( DueTimeReached, ServiceCallScheduled, // … all events )


**Actual implementation** (per ADR-0012):

```typescript ignore
// ✅ Schemas defined in @event-service-agent/schemas
// packages/schemas/src/messages/timer/events.schema.ts
export class DueTimeReached extends Schema.TaggedClass(...) { }

// packages/schemas/src/envelope/domain-message.schema.ts
export const DomainMessage = Schema.Union(
  DueTimeReached,
  ServiceCallScheduled,
  // ... all messages
);

For complete architecture, see ADR-0012: Package Structure - Schemas and Platform Split.

Usage in Broker Adapter:

```typescript ignore const decodeMessage = Schema.decode(DomainMessage)

function handleIncoming(raw: unknown) { return Effect.gen(function* () { const message = yield* decodeMessage(raw) // ^^^^^^^ DomainEvent | DomainCommand (validated!)

	// Type-safe routing (exhaustive switch)
	switch (message.type) {
		case 'DueTimeReached':
			return yield* routeToOrchestration(message)
		case 'StartExecution':
			return yield* routeToExecution(message)
			// TypeScript enforces exhaustiveness
	}
}) } ```

4. Keep Interface Documentation

Retain packages/contracts/src/messages/messages.ts as DTO reference:

5. MessageEnvelope Schema & Serialization

Problem: Envelope Wrapping & JSON Serialization

Domain events need to be:

  1. Encoded to DTOs (Schema.encode)
  2. Wrapped in MessageEnvelope with routing metadata
  3. Serialized to JSON for NATS wire format
  4. Deserialized and validated on the consumer side

Four-Layer Serialization Flow:

Layer 1: Domain Events (branded types, validated)
  ↓ Schema.encode (Type → DTO)
Layer 2: Adapter wraps DTO in MessageEnvelope
  ↓ envelope = { id, type, tenantId, payload: dto, ... }
Layer 3: EventBusPort abstraction (payload: unknown)
  ↓ Type-erased for polymorphism
Layer 4: NATS adapter JSON serialization
  ↓ JSON.stringify(envelope) → wire bytes

Solution: MessageEnvelope with Union

Create Effect Schema for envelope structure validation with union of all domain messages:

```typescript ignore // packages/contracts/src/types/message-envelope.schema.ts import * as Schema from ‘effect/Schema’

/**

// Type helper for envelope with typed payload export declare namespace MessageEnvelope { type Type = Schema.Schema.Type type Encoded = Schema.Schema.Encoded }


**Key Design Decision**: Use **union of all messages** (not `Schema.Unknown`) because:

1. ✅ **Single-phase decode**: JSON → validated envelope with typed payload
2. ✅ **Pattern matching works**: `payload` is `DomainMessage` union type
3. ✅ **Effect Schema handles discrimination**: Tries each union member automatically
4. ✅ **Type safety preserved**: TypeScript narrows on `payload._tag`
5. ✅ **No manual routing logic**: Schema validates correct message structure

**Why not generic `Schema.Unknown`?**

- ❌ Loses type information after decode
- ❌ Requires manual second decode step
- ❌ Can't pattern match on payload (unknown type)
- ❌ Defeats purpose of Effect Schema validation

The union approach provides both **runtime validation** and **compile-time type safety** in a single decode operation.

#### Envelope Construction Pattern

Adapters construct validated envelopes using **direct Schema class instantiation** combined with **MessageMetadata Context**:

```typescript ignore
// Real implementation from timer-event-bus.adapter.ts
publishDueTimeReached: Effect.fn('publishDueTimeReached')(function* (dueTimeReached: DueTimeReached.Type) {
	// 1. Extract MessageMetadata from Effect Context
	//    Workflow provides this via Effect.provideService
	const metadata = yield* MessageMetadata

	// 2. Generate envelope ID (UUID v7)
	const envelopeId = yield* EnvelopeId.makeUUID7()

	// 3. Get current timestamp for envelope metadata
	const timestampMs = yield* clock.now()

	// 4. Construct validated envelope via Schema class
	//    Schema validates structure + payload in single step
	const envelope: MessageEnvelope.Type = new MessageEnvelope({
		id: envelopeId,
		type: dueTimeReached._tag, // Discriminator ('DueTimeReached')
		payload: dueTimeReached, // Domain event (already validated)
		tenantId: dueTimeReached.tenantId,
		timestampMs,

		// Extract correlation metadata from Context
		correlationId: metadata.correlationId, // Option<CorrelationId>
		causationId: metadata.causationId, // Option<EnvelopeId>

		// Preserve aggregate ID for ordering
		aggregateId: Option.some(dueTimeReached.serviceCallId),
	})

	// 5. Publish via EventBusPort (JSON serialization at NATS layer)
	yield* eventBus.publish([envelope])
})

Key Aspects:

MessageMetadata Context Pattern (see ADR-0013):

```typescript ignore // Workflow provisions metadata when publishing yield * eventBus.publishDueTimeReached(event).pipe( Effect.provideService(MessageMetadata, { correlationId: timer.correlationId, // From aggregate causationId: Option.none(), // Time-triggered }), )

// Port signature requires MessageMetadata // publishDueTimeReached: (event: DueTimeReached.Type) => Effect<void, PublishError, MessageMetadata>

// Adapter extracts from Context const metadata = yield * MessageMetadata


**Consuming (Wire → Domain)**:

```typescript ignore
// In NATS adapter or consumer handler
function* handleMessage(jsonString: string) {
	// 1. Parse JSON → validate envelope structure + payload (single step!)
	const envelope = yield* MessageEnvelope.parseJson(jsonString)
	// envelope.payload is DomainMessage union (validated!)

	// 2. Pattern match on discriminator for type-safe routing
	const result = yield* match(envelope.payload, {
		DueTimeReached: (payload) => {
			// TypeScript knows payload is DueTimeReached
			// Pass metadata as parameter (subscribing pattern)
			return handleDueTimeReached(payload, {
				correlationId: envelope.correlationId,
				causationId: Option.some(envelope.id), // Current envelope becomes causation
			})
		},

		ServiceCallScheduled: (payload) => {
			// TypeScript knows payload is ServiceCallScheduled
			return handleScheduled(payload, {
				correlationId: envelope.correlationId,
				causationId: Option.some(envelope.id),
			})
		},
		// ... exhaustive match for all message types
	})

	return result
}

Alternative: Manual discrimination:

```typescript ignore // If not using match helper const envelope = yield * MessageEnvelope.parseJson(jsonString)

switch (envelope.payload._tag) { case ‘DueTimeReached’: // TypeScript narrows to DueTimeReached return yield * handleDueTimeReached(envelope.payload)

case 'ServiceCallScheduled':
	// TypeScript narrows to ServiceCallScheduled
	return yield * handleScheduled(envelope.payload)

default:
	// TypeScript enforces exhaustiveness checking
	const _exhaustive: never = envelope.payload
	return yield * Effect.fail(new UnknownMessageType({ type: envelope.type })) } ```

Key Benefits

Single-Phase Decode: Envelope + payload validated in one operation
Type-Safe Pattern Matching: Payload is typed union, not unknown
Exhaustiveness Checking: TypeScript ensures all message types handled
Effect Schema Discrimination: Automatically tries union members, validates structure
No Manual Routing Logic: Schema handles validation and type narrowing
Boundary Validation: JSON → fully validated envelope at infrastructure layer
Type Safety: EnvelopeId, TenantId, and payload all validated with branded types
Direct Schema Construction: MessageEnvelope class provides type safety + validation without helpers

6. Incremental Migration

Phase 1 (PL-4.4): Timer module only

Phase 2 (PL-14): Envelope infrastructure

Phase 3 (PL-24): MessageMetadata Context

Phase 4 (Future): All modules


Consequences

Positive

Runtime Safety: Impossible to construct invalid messages
Boundary Parsing: Single validation point (adapters)
Type-Safe Routing: Discriminated unions in broker
Encode/Decode: Automatic JSON serialization
Invariant Enforcement: Non-empty strings, valid timestamps, branded types
DMMF Compliance: “Parse, Don’t Validate” pattern
Incremental: Migrate module-by-module (low risk)
Module Autonomy: Each module owns its contracts

Negative

⚠️ Learning Curve: Developers must understand Effect Schema
⚠️ Field Repetition: Base fields (tenantId, serviceCallId) repeated in every schema
⚠️ Migration Effort: ~2-4 hours for full migration (9 events + 3 commands)
⚠️ Dependency: Couples domain to Effect Schema (acceptable given Effect-TS stack commitment)

Neutral


Implementation Notes

Serialization Layer Boundaries

Clear separation of concerns across layers:

  1. Domain Layer: Pure domain events (e.g., DueTimeReached.Type) with Schema validation
  2. Workflow Layer: Provides MessageMetadata via Effect.provideService when publishing
  3. Adapter Layer:
    • Extracts MessageMetadata from Context (publishing)
    • Constructs MessageEnvelope via Schema class (type safety + validation)
    • Passes MessageMetadata as parameter to handlers (subscribing)
  4. EventBusPort: Type-safe abstraction with MessageEnvelope.Type and MessageMetadata dependency
  5. NATS Adapter: MessageEnvelope.encodeJson/MessageEnvelope.parseJson for JSON serialization

Key Insights:

Envelope Payload Design: Union vs Schema.Unknown

Decision: Use Schema.Union of all domain messages, NOT Schema.Unknown

Rationale:

When consuming messages from the wire (NATS), we need to:

  1. Parse JSON string → validate envelope structure
  2. Validate payload is a valid domain message
  3. Type-safely route based on message type

Why Union Works:

```typescript ignore // Envelope with union payload payload: DomainMessage // Union of all message schemas

// Consuming: const envelope = yield * MessageEnvelope.parseJson(jsonString) // envelope.payload: DomainEvent | DomainCommand (typed!)

match(envelope.payload, { DueTimeReached: (p) => handle(p), // TypeScript knows type! ServiceCallScheduled: (p) => handle(p), // … exhaustive match })


**Why Schema.Unknown Doesn't Work**:

```typescript ignore
// Envelope with unknown payload
payload: Schema.Unknown

// Consuming:
const envelope = yield * MessageEnvelope.parseJson(jsonString)
// envelope.payload: unknown (no type info!)

// ❌ Can't pattern match on unknown
// ❌ Need manual second decode: yield* DomainMessage.decode(envelope.payload)
// ❌ Two-phase decode adds complexity and error handling
// ❌ Loses single-responsibility: envelope validation separate from payload

Generic Constructor Considered:

```typescript ignore // Alternative: Generic schema constructor const MessageEnvelope = ( messageSchema: M ) => Schema.Struct({ payload: messageSchema, ... })

// Problem at decode time: const envelope = yield* MessageEnvelope(???).parseJson(jsonString) // ^^^ // Which schema? We don’t know until we decode!


**Conclusion**: Union of domain messages is the idiomatic Effect Schema pattern for discriminated message types. It provides:

- ✅ Single decode operation (envelope + payload)
- ✅ Type-safe discrimination via Effect Schema union handling
- ✅ Pattern matching with exhaustiveness checking
- ✅ No manual routing logic required

Generic constructors would only help at **publishing time** (where type is known), but provide no benefit at **consuming time** (where type is unknown and must be discriminated).

### Base Schema Pattern (Deferred)

Current decision: **Direct repetition** (Option A)

If repetition becomes painful (>5 base fields), consider:

- **Option B**: `EventBase.fields` spread pattern
- **Option C**: `defineEvent()` helper function

Revisit if base fields grow beyond current 2 fields.

### HTTP Request Schemas

`RequestSpec` and `RequestSpecWithoutBody` need Schema definitions:

```typescript ignore
// packages/contracts/src/types/http.type.ts
export class RequestSpec extends Schema.Class<RequestSpec>('RequestSpec')({
	method: Schema.Literal('GET', 'POST', 'PUT', 'PATCH', 'DELETE'),
	url: Schema.String,
	headers: Schema.optional(Schema.Record({ key: Schema.String, value: Schema.String })),
	body: Schema.optional(Schema.String),
}) {}

export class RequestSpecWithoutBody extends Schema.Class<RequestSpecWithoutBody>('RequestSpecWithoutBody')({
	method: Schema.Literal('GET', 'POST', 'PUT', 'PATCH', 'DELETE'),
	url: Schema.String,
	headers: Schema.optional(Schema.Record({ key: Schema.String, value: Schema.String })),
	bodySnippet: Schema.optional(Schema.String),
}) {}

Error Metadata Schemas

ResponseMeta and ErrorMeta (used in events) need schemas:

```typescript ignore export class ResponseMeta extends Schema.Class('ResponseMeta')({ status: Schema.Number, headers: Schema.optional(Schema.Record({ key: Schema.String, value: Schema.String })), bodySnippet: Schema.optional(Schema.String), latencyMs: Schema.optional(Schema.Number), }) {}

export class ErrorMeta extends Schema.Class('ErrorMeta')({ kind: Schema.String, message: Schema.optional(Schema.String), details: Schema.optional(Schema.Record({ key: Schema.String, value: Schema.Unknown })), latencyMs: Schema.optional(Schema.Number), }) {} ```


Alternatives Considered

A. Keep Plain Interfaces + Manual Validation

Rejected: Scatters validation logic, error-prone, no compile-time guarantee of validation coverage.

B. Zod Schemas

Rejected: Effect Schema is native to Effect-TS ecosystem, provides:

C. io-ts

Rejected: Older approach, Effect Schema is more ergonomic and better maintained.

D. JSON Schema + Codegen

Rejected: Adds tooling complexity, loses type-level expressiveness, not idiomatic for Effect-TS.


References


Tracking