Minimal domain-facing ports to keep the system simple and broker/storage agnostic. Adapters implement these interfaces.
```typescript ignore
export type TenantId = string & { readonly __tag: 'TenantId' }
export type CorrelationId = string & { readonly __tag: 'CorrelationId' }
export type EnvelopeId = string & { readonly __tag: 'EnvelopeId' }

export interface RequestContext {
  tenantId: TenantId
  correlationId: CorrelationId
}
```
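Branded types exist only at compile time, so a raw string has to be cast to the brand somewhere. A minimal sketch of doing that in one place, assuming hypothetical constructors `makeTenantId` and `makeCorrelationId` (these names and the non-empty check are illustrative, not part of the design):

```typescript
type TenantId = string & { readonly __tag: 'TenantId' }
type CorrelationId = string & { readonly __tag: 'CorrelationId' }

interface RequestContext {
  tenantId: TenantId
  correlationId: CorrelationId
}

// Hypothetical smart constructors: the only places where a raw string is cast to a brand.
const makeTenantId = (raw: string): TenantId => {
  if (raw.trim() === '') throw new Error('TenantId must be non-empty')
  return raw as TenantId
}

const makeCorrelationId = (raw: string): CorrelationId => {
  if (raw.trim() === '') throw new Error('CorrelationId must be non-empty')
  return raw as CorrelationId
}

const ctx: RequestContext = {
  tenantId: makeTenantId('tenant-a'),
  correlationId: makeCorrelationId('corr-123'),
}

// A plain string (or a CorrelationId) is rejected where a TenantId is expected:
// const bad: TenantId = 'tenant-a' // compile-time error
```

The brand costs nothing at runtime; the value is still an ordinary string when serialized.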
## MessageEnvelope
Minimal envelope shared by publishers/consumers. See [Semantics](/event-service-agent-kata/design/messages.html#semantics-essentials).
**Note**: MessageEnvelope exists as both:
- **TypeScript interface** (type-level documentation, below)
- **Effect Schema** (`MessageEnvelope` in `@event-service-agent/schemas`) for runtime validation
The schema provides JSON serialization (`parseJson`/`encodeJson`) and validation at infrastructure boundaries.
```typescript ignore
export interface MessageEnvelope<T = unknown> {
  id: EnvelopeId // unique id
  type: string // message type
  tenantId: TenantId
  aggregateId?: string // ordering key (e.g., serviceCallId)
  timestampMs: number // producer timestamp
  correlationId?: CorrelationId
  causationId?: string
  payload: T
}
```
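The runtime schema lives in `@event-service-agent/schemas` and is not reproduced here. As a rough illustration of what validation at an infrastructure boundary has to check, a hand-rolled stand-in might look like the following. The `isMessageEnvelope` and `decodeEnvelope` names are hypothetical, and ids are plain strings for brevity:

```typescript
interface MessageEnvelope<T = unknown> {
  id: string
  type: string
  tenantId: string
  aggregateId?: string
  timestampMs: number
  correlationId?: string
  causationId?: string
  payload: T
}

const isOptionalString = (v: unknown): boolean => v === undefined || typeof v === 'string'

// Hypothetical guard: checks required fields and the types of the optional ones.
const isMessageEnvelope = (value: unknown): value is MessageEnvelope => {
  if (typeof value !== 'object' || value === null) return false
  const v = value as Record<string, unknown>
  return (
    typeof v['id'] === 'string' &&
    typeof v['type'] === 'string' &&
    typeof v['tenantId'] === 'string' &&
    typeof v['timestampMs'] === 'number' &&
    'payload' in v &&
    isOptionalString(v['aggregateId']) &&
    isOptionalString(v['correlationId']) &&
    isOptionalString(v['causationId'])
  )
}

// Decode a raw broker message; anything that is not a well-formed envelope is rejected.
const decodeEnvelope = (raw: string): MessageEnvelope => {
  const parsed: unknown = JSON.parse(raw)
  if (!isMessageEnvelope(parsed)) throw new Error('Invalid MessageEnvelope')
  return parsed
}
```

The payload is left as `unknown` here; narrowing it by `type` is a separate step.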
## ClockPort
Adapter responsibilities: Provide stable UTC milliseconds; no blocking; test fake allowed.\
Used in: Orchestration (timeouts), Timer (scheduling).
```typescript ignore
export interface ClockPort {
  nowMs(): number // Epoch millis (UTC)
}
```
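A minimal sketch of the two adapters implied above: a system clock and a test fake. `FakeClock` and its `advance` method are hypothetical names chosen for illustration:

```typescript
interface ClockPort {
  nowMs(): number // Epoch millis (UTC)
}

// Production adapter: Date.now() returns UTC epoch milliseconds and does not block.
const systemClock: ClockPort = {
  nowMs: () => Date.now(),
}

// Hypothetical test fake: time only moves when the test advances it.
class FakeClock implements ClockPort {
  private current: number

  constructor(startMs: number) {
    this.current = startMs
  }

  nowMs(): number {
    return this.current
  }

  advance(deltaMs: number): void {
    this.current += deltaMs
  }
}

const clock = new FakeClock(1_000)
clock.advance(500)
```

With the fake injected, timeout and scheduling logic can be tested without sleeping.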
## EventBusPort
Adapter responsibilities: Publish to broker; consume from topics/partitions; preserve per-aggregate order via partition key.\
Used in: [Orchestration], [Execution], [Timer], [API] (publish only).
**Design Note**: The envelope is self-contained and carries all routing metadata (`tenantId`, `correlationId`, `aggregateId`). No separate context parameter is needed, which eliminates duplication and keeps a single source of truth.
```typescript ignore
import * as Data from 'effect/Data'
import type * as Effect from 'effect/Effect'
import type { NonEmptyReadonlyArray } from 'effect/Array'

export interface EventBusPort {
  publish(envelopes: NonEmptyReadonlyArray<MessageEnvelope>): Effect.Effect<void, PublishError>
  subscribe<E>(
    topics: string[],
    handler: (env: MessageEnvelope) => Effect.Effect<void, E>,
  ): Effect.Effect<void, SubscribeError>
}

export class PublishError extends Data.TaggedError('PublishError')<{
  readonly cause: string
}> {}

export class SubscribeError extends Data.TaggedError('SubscribeError')<{
  readonly cause: string
}> {}
```
Routing: The adapter extracts `tenantId` and the optional `aggregateId` from the envelope to construct the partition/routing key (e.g., `tenantId.serviceCallId`).
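The routing rule can be sketched as a pure function. The helper names `routingKey` and `partitionFor` are hypothetical, and the hash is only a stand-in: a real broker client normally applies its own partitioner to the key.

```typescript
interface RoutableEnvelope {
  tenantId: string
  aggregateId?: string
}

// Hypothetical helper: tenant-only key when there is no aggregate, else "tenantId.aggregateId".
const routingKey = (env: RoutableEnvelope): string =>
  env.aggregateId === undefined ? env.tenantId : `${env.tenantId}.${env.aggregateId}`

// Stand-in partitioner: the same key always maps to the same partition,
// which is what preserves per-aggregate order.
const partitionFor = (key: string, partitionCount: number): number => {
  let hash = 0
  for (let i = 0; i < key.length; i += 1) {
    hash = (hash * 31 + key.charCodeAt(i)) >>> 0 // keep the hash in unsigned 32-bit range
  }
  return hash % partitionCount
}
```

Because the key includes `tenantId`, two tenants that reuse the same `serviceCallId` still get distinct keys.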
## TimerPort
Adapter responsibilities: Schedule/cancel one-shot timers with at-least-once delivery semantics.\
Used in: Orchestration.
```typescript ignore
export interface TimerSpec {
  id: string // idempotent key
  dueTimeMs: number // epoch millis when due
  tenantId: TenantId
}

export interface TimerPort {
  schedule(spec: TimerSpec, ctx: RequestContext): Promise
```
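At-least-once delivery means a due timer may be handed out more than once, which is why `TimerSpec.id` serves as an idempotent key. The sketch below illustrates that behaviour with a hypothetical in-memory store. `InMemoryTimers`, `due`, and `ack` are illustrative names, not the `TimerPort` API, and keeping the first registration on a duplicate `schedule` is an assumption.

```typescript
interface TimerSpec {
  id: string // idempotent key
  dueTimeMs: number // epoch millis when due
  tenantId: string
}

// Hypothetical in-memory store, keyed by tenant and timer id.
class InMemoryTimers {
  private readonly timers = new Map<string, TimerSpec>()

  private key(tenantId: string, id: string): string {
    return `${tenantId}/${id}`
  }

  // Scheduling the same id twice keeps the first registration (assumed idempotency rule).
  schedule(spec: TimerSpec): void {
    const k = this.key(spec.tenantId, spec.id)
    if (!this.timers.has(k)) this.timers.set(k, spec)
  }

  // Returns whether a timer was actually removed.
  cancel(tenantId: string, id: string): boolean {
    return this.timers.delete(this.key(tenantId, id))
  }

  // Due timers stay in the store until acknowledged, so a crash between
  // delivery and ack results in redelivery (at-least-once).
  due(nowMs: number): TimerSpec[] {
    const result: TimerSpec[] = []
    this.timers.forEach((spec) => {
      if (spec.dueTimeMs <= nowMs) result.push(spec)
    })
    return result
  }

  // Called after the due event was published successfully.
  ack(tenantId: string, id: string): void {
    this.timers.delete(this.key(tenantId, id))
  }
}
```

Consumers of the resulting event have to tolerate duplicates, since redelivery is possible until the acknowledgement lands.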
## HttpClientPort
Adapter responsibilities: Perform HTTP call and return status/headers/body without extra concerns.\
Used in: [Execution].
```typescript ignore
export type HttpMethod = 'GET' | 'POST' | 'PUT' | 'PATCH' | 'DELETE'

export interface HttpRequest {
  method: HttpMethod
  url: string
  headers?: Record<string, string>
  body?: unknown
}

export interface HttpResponse<T = unknown> {
  status: number
  headers: Record<string, string>
  body?: T
}

export interface HttpClientPort {
  request<T = unknown>(req: HttpRequest, ctx: RequestContext): Promise<HttpResponse<T>>
}
```
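One reading of "without extra concerns" is that the adapter does not retry and does not throw on non-2xx responses; the status is simply data. A minimal sketch under that assumption, with an injected fetch-like function so it can be exercised with a stub. `FetchLike` and `makeHttpClient` are hypothetical names, and treating bodies as JSON with a raw-text fallback is also an assumption.

```typescript
type HttpMethod = 'GET' | 'POST' | 'PUT' | 'PATCH' | 'DELETE'

interface HttpRequest {
  method: HttpMethod
  url: string
  headers?: Record<string, string>
  body?: unknown
}

interface HttpResponse<T = unknown> {
  status: number
  headers: Record<string, string>
  body?: T
}

// Hypothetical minimal shape of a fetch-like function.
type FetchLike = (
  url: string,
  init: { method: string; headers: Record<string, string>; body: string | undefined },
) => Promise<{
  status: number
  headers: { forEach(cb: (value: string, key: string) => void): void }
  text(): Promise<string>
}>

const makeHttpClient = (fetchLike: FetchLike) => ({
  // The context is accepted to mirror the port signature; this sketch does not use it.
  async request<T = unknown>(
    req: HttpRequest,
    _ctx: { tenantId: string; correlationId: string },
  ): Promise<HttpResponse<T>> {
    const res = await fetchLike(req.url, {
      method: req.method,
      headers: req.headers ?? {},
      body: req.body === undefined ? undefined : JSON.stringify(req.body),
    })

    const headers: Record<string, string> = {}
    res.headers.forEach((value, key) => {
      headers[key] = value
    })

    const text = await res.text()
    let body: unknown = undefined
    if (text !== '') {
      try {
        body = JSON.parse(text)
      } catch {
        body = text // not JSON: hand back the raw text
      }
    }

    // Non-2xx statuses are returned, not thrown: interpreting them is the caller's job.
    return { status: res.status, headers, body: body as T }
  },
})
```

Keeping status interpretation out of the adapter leaves the success/failure decision with the caller.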
## PersistencePort
Role: domain CRUD with conditional transitions; Orchestration is the only writer.\
Used in: Orchestration (write), API (read-only).\
Note: `responseMeta` and `errorMeta` follow the payload shapes defined in the messages catalog; see `ExecutionSucceeded` (`responseMeta`) and `ExecutionFailed` (`errorMeta`) in design/messages.md.
```typescript ignore
export interface PersistencePort {
  // Read models for API
  getServiceCall(tenantId: string, serviceCallId: string, ctx: RequestContext): Promise<unknown | null>
  listServiceCalls(
    tenantId: string,
    filters: Record<string, unknown>,
    paging: { limit: number; offset: number },
    ctx: RequestContext,
  ): Promise<unknown[]>

  // Single-writer transitions (guarded updates)
  createServiceCall(dto: unknown, ctx: RequestContext): Promise<void>
  setRunning(tenantId: string, serviceCallId: string, startedAtMs: number, ctx: RequestContext): Promise<boolean> // returns true if updated from Scheduled
  setSucceeded(
    tenantId: string,
    serviceCallId: string,
    finishedAtMs: number,
    responseMeta: unknown,
    ctx: RequestContext,
  ): Promise<boolean>
  setFailed(
    tenantId: string,
    serviceCallId: string,
    finishedAtMs: number,
    errorMeta: unknown,
    ctx: RequestContext,
  ): Promise<boolean>
}
```
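The boolean results above describe guarded (compare-and-set) updates: a transition applies only if the row is in the expected state. A synchronous in-memory sketch of that idea follows. `InMemoryServiceCalls` is a hypothetical name, and the `Running` precondition for `setSucceeded` is an assumption; only the `Scheduled` precondition for `setRunning` is stated in the interface.

```typescript
type Status = 'Scheduled' | 'Running' | 'Succeeded' | 'Failed'

interface Row {
  status: Status
  startedAtMs?: number
  finishedAtMs?: number
}

// Hypothetical in-memory store; a SQL adapter would express the same guard
// as an UPDATE with the expected status in its WHERE clause.
class InMemoryServiceCalls {
  private readonly rows = new Map<string, Row>()

  private key(tenantId: string, serviceCallId: string): string {
    return `${tenantId}/${serviceCallId}`
  }

  create(tenantId: string, serviceCallId: string): void {
    this.rows.set(this.key(tenantId, serviceCallId), { status: 'Scheduled' })
  }

  // Returns true only if the row moved from Scheduled to Running.
  setRunning(tenantId: string, serviceCallId: string, startedAtMs: number): boolean {
    const k = this.key(tenantId, serviceCallId)
    const row = this.rows.get(k)
    if (row === undefined || row.status !== 'Scheduled') return false
    this.rows.set(k, { status: 'Running', startedAtMs })
    return true
  }

  // Assumed guard: only a Running call can succeed.
  setSucceeded(tenantId: string, serviceCallId: string, finishedAtMs: number): boolean {
    const k = this.key(tenantId, serviceCallId)
    const row = this.rows.get(k)
    if (row === undefined || row.status !== 'Running') return false
    this.rows.set(k, { ...row, status: 'Succeeded', finishedAtMs })
    return true
  }
}
```

A `false` result lets the caller treat a duplicate or late message as a no-op instead of overwriting state.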
## OutboxPublisher
Role: ensure messages are published after DB commit. Orchestration appends messages to an outbox table within the same transaction; a dispatcher publishes them to the broker in order.\
Used in: Orchestration.
```typescript ignore
export interface OutboxPublisher {
  append(envelopes: MessageEnvelope[], ctx: RequestContext): Promise
```
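The outbox flow described above (append inside the transaction, publish only after commit, keep order) can be sketched in memory. `InMemoryOutbox`, `transaction`, and `dispatch` are hypothetical names, and the "transaction" is simulated by staging rows until the work function returns without throwing:

```typescript
interface Envelope {
  id: string
  type: string
  payload: unknown
}

class InMemoryOutbox {
  private readonly committed: Envelope[] = []
  private dispatched = 0 // cursor: index of the next envelope to publish

  // Simulated transaction: staged envelopes become visible only if `work` does not throw.
  transaction(work: (append: (envelopes: Envelope[]) => void) => void): void {
    const staged: Envelope[] = []
    work((envelopes) => {
      staged.push(...envelopes)
    })
    this.committed.push(...staged) // "commit"
  }

  // Dispatcher: publishes committed envelopes in append order.
  // The cursor advances only after publish returns, so a failed publish is retried later.
  dispatch(publish: (envelope: Envelope) => void): void {
    while (this.dispatched < this.committed.length) {
      const next = this.committed[this.dispatched]
      if (next === undefined) break
      publish(next)
      this.dispatched += 1
    }
  }
}
```

Because the cursor moves only after a successful publish, this sketch can publish an envelope more than once but never skips one.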
## TimerEventBusPort
Role: Domain-specific event publishing/subscribing for the Timer module. The port accepts/returns pure domain events (`DueTimeReached`); the adapter handles envelope wrapping and broker delegation.\
Used in: [Timer] (workflows publish events, handlers consume commands).
**Design Pattern**: Port accepts **pure domain events**, not infrastructure DTOs. Adapter wraps events in `MessageEnvelope` and delegates to shared `EventBusPort`. See [Hexagonal Architecture: Event Publishing Pattern](/event-service-agent-kata/design/hexagonal-architecture-layers.html#event-publishing-pattern-domain-events-at-port-boundary) for rationale.
```typescript ignore
import type * as Effect from 'effect/Effect'
import type * as Messages from '@event-service-agent/schemas/messages'
import type { MessageMetadata } from '@event-service-agent/platform/context'

export interface TimerEventBusPort {
  /**
   * Publish DueTimeReached domain event with MessageMetadata Context
   *
   * MessageMetadata is a Context.Tag from the platform layer that carries correlation and causation identifiers for
   * distributed tracing.
   *
   * Note the asymmetry in how MessageMetadata is referenced:
   *
   * - In Effect R parameter: Use the tag itself (MessageMetadata)
   * - In handler parameters: Use the extracted type (MessageMetadata.Type)
   *
   * Workflow provides MessageMetadata via Effect.provideService:
   *
   * - CorrelationId: From timer aggregate (original ScheduleTimer command)
   * - CausationId: Option.none() (time-triggered, not command-caused)
   *
   * Adapter responsibility:
   *
   * - Extract MessageMetadata from Context: `yield* MessageMetadata`
   * - Generate EnvelopeId (UUID v7)
   * - Wrap event in MessageEnvelope with metadata fields
   * - Delegate to EventBusPort.publish([envelope])
   *
   * @param event - Pure domain event (DueTimeReached.Type)
   *
   * @returns Effect requiring MessageMetadata Context
   */
  publishDueTimeReached(
    event: Messages.Timer.Events.DueTimeReached.Type,
  ): Effect.Effect<void, PublishError, MessageMetadata>

  /**
   * Subscribe to ScheduleTimer commands from Orchestration
   *
   * Asymmetric pattern: Adapter passes MessageMetadata as handler parameter (not via Context). This aligns with Effect
   * ecosystem conventions:
   *
   * - Publishing: Workflow provides Context (ambient data)
   * - Subscribing: Adapter passes parameter (explicit data flow)
   *
   * Adapter responsibility:
   *
   * - Subscribe to timer.commands topic
   * - Parse MessageEnvelope<ScheduleTimer>
   * - Extract MessageMetadata from envelope:
   *
   *   - CorrelationId: envelope.correlationId
   *   - CausationId: Option.some(envelope.id) // Command envelope becomes causation
   * - Invoke handler with command + metadata
   *
   * @param handler - Command handler (workflow invocation)
   */
  subscribeToScheduleTimerCommands<E, R>(
    handler: (
      command: Messages.Orchestration.Commands.ScheduleTimer.Type,
      metadata: MessageMetadata.Type,
    ) => Effect.Effect<void, E, R>,
  ): Effect.Effect<void, Ports.SubscribeError | E, R>
}
```
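The two adapter responsibilities described in the doc comments (wrapping an event on publish, deriving metadata on subscribe) reduce to two small mappings. The sketch below shows them in plain TypeScript without Effect. `Metadata`, `wrapEvent`, and `metadataFromEnvelope` are hypothetical stand-ins, `undefined` replaces `Option.none()`, and id generation is injected because UUID v7 generation is not shown here.

```typescript
interface MessageEnvelope<T = unknown> {
  id: string
  type: string
  tenantId: string
  aggregateId?: string | undefined
  timestampMs: number
  correlationId?: string | undefined
  causationId?: string | undefined
  payload: T
}

// Hypothetical plain stand-in for MessageMetadata.Type.
interface Metadata {
  correlationId: string | undefined
  causationId: string | undefined
}

// Publishing side: wrap a pure domain event in an envelope using the ambient metadata.
const wrapEvent = <T>(args: {
  type: string
  tenantId: string
  aggregateId: string | undefined
  payload: T
  metadata: Metadata
  newId: () => string // injected id generator (assumed to produce an EnvelopeId)
  nowMs: () => number // injected clock
}): MessageEnvelope<T> => ({
  id: args.newId(),
  type: args.type,
  tenantId: args.tenantId,
  aggregateId: args.aggregateId,
  timestampMs: args.nowMs(),
  correlationId: args.metadata.correlationId,
  causationId: args.metadata.causationId,
  payload: args.payload,
})

// Subscribing side: the consumed command's envelope id becomes the causation id.
const metadataFromEnvelope = (envelope: MessageEnvelope): Metadata => ({
  correlationId: envelope.correlationId,
  causationId: envelope.id,
})
```

Chaining the two keeps one correlation id across the whole flow while each message records its direct cause.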
**MessageMetadata Context Pattern** (ADR-0013):
- **Import**: `import { MessageMetadata } from '@event-service-agent/platform/context'`
- **Publishing**: The port requires `MessageMetadata` in the R parameter. The workflow provides it via `Effect.provideService(MessageMetadata, { correlationId, causationId })`. The adapter extracts it with `yield* MessageMetadata`.
- **Subscribing**: The adapter passes `MessageMetadata` as a handler parameter (not via Context). This asymmetry aligns with the Effect ecosystem: Context for ambient cross-cutting concerns, parameters for explicit data flow.
Notes
design/messages.md.