Event Service Agent Kata

Ports (Adapter Contracts)

← Back to Design Docs ← Documentation Home

Minimal domain-facing ports to keep the system simple and broker/storage agnostic. Adapters implement these interfaces.

Shared Types

```typescript ignore export type TenantId = string & { readonly __tag: ‘TenantId’ } export type CorrelationId = string & { readonly __tag: ‘CorrelationId’ } export type EnvelopeId = string & { readonly __tag: ‘EnvelopeId’ }

export interface RequestContext { tenantId: TenantId correlationId: CorrelationId }


## MessageEnvelope

Minimal envelope shared by publishers/consumers. See [Semantics](/event-service-agent-kata/design/messages.html#semantics-essentials).

**Note**: MessageEnvelope exists as both:

- **TypeScript interface** (type-level documentation, below)
- **Effect Schema** (`MessageEnvelope` in `@event-service-agent/schemas`) for runtime validation

The schema provides JSON serialization (`parseJson`/`encodeJson`) and validation at infrastructure boundaries.

```typescript ignore
export interface MessageEnvelope<T = unknown> {
	id: EnvelopeId // unique id
	type: string // message type
	tenantId: TenantId
	aggregateId?: string // ordering key (e.g., serviceCallId)
	timestampMs: number // producer timestamp
	correlationId?: CorrelationId
	causationId?: string
	payload: T
}

Storage Roles

ClockPort

Adapter responsibilities: Provide stable UTC milliseconds; no blocking; test fake allowed.
Used in: Orchestration (timeouts), Timer (scheduling).

```typescript ignore export interface ClockPort { nowMs(): number // Epoch millis (UTC) }


## EventBusPort

Adapter responsibilities: Publish to broker; consume from topics/partitions; preserve per-aggregate order via partition key.\
Used in: [Orchestration], [Execution], [Timer], [API] (publish only).

**Design Note**: The envelope is self-contained with all routing metadata (tenantId, correlationId, aggregateId). No separate context parameter needed - this eliminates duplication and ensures single source of truth.

```typescript ignore
import * as Data from 'effect/Data'
import type * as Effect from 'effect/Effect'
import type { NonEmptyReadonlyArray } from 'effect/Array'

export interface EventBusPort {
	publish(envelopes: NonEmptyReadonlyArray<MessageEnvelope>): Effect.Effect<void, PublishError>

	subscribe<E>(
		topics: string[],
		handler: (env: MessageEnvelope) => Effect.Effect<void, E>,
	): Effect.Effect<void, SubscribeError>
}

export class PublishError extends Data.TaggedError('PublishError')<{
	readonly cause: string
}> {}

export class SubscribeError extends Data.TaggedError('SubscribeError')<{
	readonly cause: string
}> {}

Routing: Adapter extracts tenantId and optional aggregateId from envelope to construct partition/routing key (e.g., tenantId.serviceCallId).

TimerPort

Adapter responsibilities: Schedule/cancel one-shot timers with at-least-once delivery semantics.
Used in: Orchestration.

```typescript ignore export interface TimerSpec { id: string // idempotent key dueTimeMs: number // epoch millis when due tenantId: TenantId }

export interface TimerPort { schedule(spec: TimerSpec, ctx: RequestContext): Promise cancel(id: string, tenantId: TenantId, ctx: RequestContext): Promise }


## HttpClientPort

Adapter responsibilities: Perform HTTP call and return status/headers/body without extra concerns.\
Used in: [Execution].

```typescript ignore
export type HttpMethod = 'GET' | 'POST' | 'PUT' | 'PATCH' | 'DELETE'

export interface HttpRequest {
	method: HttpMethod
	url: string
	headers?: Record<string, string>
	body?: unknown
}

export interface HttpResponse<T = unknown> {
	status: number
	headers: Record<string, string>
	body?: T
}

export interface HttpClientPort {
	request<T = unknown>(req: HttpRequest, ctx: RequestContext): Promise<HttpResponse<T>>
}

PersistencePort (Domain DB)

Role: domain CRUD with conditional transitions; Orchestration is the only writer.
Used in: Orchestration (write), API (read-only).

Note: responseMeta and errorMeta follow the payload shapes defined in the messages catalog — see ExecutionSucceeded (responseMeta) and ExecutionFailed (errorMeta) in design/messages.md.

```typescript ignore export interface PersistencePort { // Read models for API getServiceCall(tenantId: string, serviceCallId: string, ctx: RequestContext): Promise<unknown | null> listServiceCalls( tenantId: string, filters: Record<string, unknown>, paging: { limit: number; offset: number }, ctx: RequestContext, ): Promise<unknown[]>

// Single-writer transitions (guarded updates)
createServiceCall(dto: unknown, ctx: RequestContext): Promise<void>
setRunning(tenantId: string, serviceCallId: string, startedAtMs: number, ctx: RequestContext): Promise<boolean> // returns true if updated from Scheduled
setSucceeded(
	tenantId: string,
	serviceCallId: string,
	finishedAtMs: number,
	responseMeta: unknown,
	ctx: RequestContext,
): Promise<boolean>
setFailed(
	tenantId: string,
	serviceCallId: string,
	finishedAtMs: number,
	errorMeta: unknown,
	ctx: RequestContext,
): Promise<boolean> } ```

OutboxPublisher

Role: ensure messages are published after DB commit. Orchestration appends messages to an outbox table within the same transaction; a dispatcher publishes them to the broker in order.
Used in: Orchestration.

```typescript ignore export interface OutboxPublisher { append(envelopes: MessageEnvelope[], ctx: RequestContext): Promise // called within DB tx dispatch(batchSize?: number): Promise // background loop }


## TimerEventBusPort

Role: Domain-specific event publishing/subscribing for Timer module. Accepts/returns pure domain events (`DueTimeReached`), adapter handles envelope wrapping and broker delegation.\
Used in: [Timer] (workflows publish events, handlers consume commands).

**Design Pattern**: Port accepts **pure domain events**, not infrastructure DTOs. Adapter wraps events in `MessageEnvelope` and delegates to shared `EventBusPort`. See [Hexagonal Architecture: Event Publishing Pattern](/event-service-agent-kata/design/hexagonal-architecture-layers.html#event-publishing-pattern-domain-events-at-port-boundary) for rationale.

```typescript ignore
import type * as Effect from 'effect/Effect'
import type * as Messages from '@event-service-agent/schemas/messages'
import type { MessageMetadata } from '@event-service-agent/platform/context'

export interface TimerEventBusPort {
	/**
	 * Publish DueTimeReached domain event with MessageMetadata Context
	 *
	 * MessageMetadata is a Context.Tag from the platform layer that carries correlation and causation identifiers for
	 * distributed tracing.
	 *
	 * Note the asymmetry in how MessageMetadata is referenced:
	 *
	 * - In Effect R parameter: Use the tag itself (MessageMetadata)
	 * - In handler parameters: Use the extracted type (MessageMetadata.Type)
	 *
	 * Workflow provides MessageMetadata via Effect.provideService:
	 *
	 * - CorrelationId: From timer aggregate (original ScheduleTimer command)
	 * - CausationId: Option.none() (time-triggered, not command-caused)
	 *
	 * Adapter responsibility:
	 *
	 * - Extract MessageMetadata from Context: `yield* MessageMetadata`
	 * - Generate EnvelopeId (UUID v7)
	 * - Wrap event in MessageEnvelope with metadata fields
	 * - Delegate to EventBusPort.publish([envelope])
	 *
	 * @param event - Pure domain event (DueTimeReached.Type)
	 *
	 * @returns Effect requiring MessageMetadata Context
	 */
	publishDueTimeReached(
		event: Messages.Timer.Events.DueTimeReached.Type,
	): Effect.Effect<void, PublishError, MessageMetadata>

	/**
	 * Subscribe to ScheduleTimer commands from Orchestration
	 *
	 * Asymmetric pattern: Adapter passes MessageMetadata as handler parameter (not via Context). This aligns with Effect
	 * ecosystem conventions:
	 *
	 * - Publishing: Workflow provides Context (ambient data)
	 * - Subscribing: Adapter passes parameter (explicit data flow)
	 *
	 * Adapter responsibility:
	 *
	 * - Subscribe to timer.commands topic
	 * - Parse MessageEnvelope<ScheduleTimer>
	 * - Extract MessageMetadata from envelope:
	 *
	 *   - CorrelationId: envelope.correlationId
	 *   - CausationId: Option.some(envelope.id) // Command envelope becomes causation
	 * - Invoke handler with command + metadata
	 *
	 * @param handler - Command handler (workflow invocation)
	 */
	subscribeToScheduleTimerCommands<E, R>(
		handler: (
			command: Messages.Orchestration.Commands.ScheduleTimer.Type,
			metadata: MessageMetadata.Type,
		) => Effect.Effect<void, E, R>,
	): Effect.Effect<void, Ports.SubscribeError | E, R>
}

MessageMetadata Context Pattern (ADR-0013):

Notes