Skip to content

errantsky/streamkeeper

Repository files navigation

Streamkeeper

Hex.pm Documentation

An Elixir/OTP implementation of the Durable Streams protocol - a specification for append-only, URL-addressable byte logs.

Table of Contents

Features

  • Full HTTP protocol compliance (PUT, POST, GET, DELETE, HEAD)
  • JSON mode with array flattening
  • Long-polling and Server-Sent Events (SSE) for live updates
  • Phoenix LiveView helper module for easy integration
  • Stream TTL and expiration
  • Sequence ordering enforcement
  • ETag-based caching
  • OTP supervision tree for fault tolerance

Requirements

  • Elixir 1.19+
  • Erlang/OTP 27+

Installation

Add streamkeeper to your dependencies in mix.exs:

def deps do
  [
    {:streamkeeper, "~> 0.3.0"},
    # Choose an HTTP server adapter:
    {:plug_cowboy, "~> 2.7"}  # or {:bandit, "~> 1.0"}
  ]
end

Quick Start

Standalone Server

Start the HTTP server on a specific port:

# Create a router module that mounts the protocol at /v1/stream
defmodule MyApp.StreamRouter do
  use Plug.Router
  plug :match
  plug :dispatch
  forward "/v1/stream", to: DurableStreams.Protocol.Plug
end

# In your application.ex or IEx
{:ok, _} = Plug.Cowboy.http(MyApp.StreamRouter, [], port: 4000)

Phoenix Integration

DurableStreams integrates seamlessly with Phoenix applications. The library starts its own supervision tree automatically when added as a dependency, so no additional setup is required beyond routing.

Step 1: Add the dependency

# mix.exs
def deps do
  [
    {:phoenix, "~> 1.7"},
    {:streamkeeper, "~> 0.3.0"},
    # ... other deps
  ]
end

Step 2: Add the route

# lib/my_app_web/router.ex
defmodule MyAppWeb.Router do
  use MyAppWeb, :router

  # Your existing pipelines...

  # Forward durable streams requests (no pipeline needed - it handles its own parsing)
  forward "/v1/stream", DurableStreams.Protocol.Plug
end

Step 3: Use it!

# Create a stream
curl -X PUT http://localhost:4000/v1/stream/my-stream \
  -H "Content-Type: text/plain"

# Append data
curl -X POST http://localhost:4000/v1/stream/my-stream \
  -H "Content-Type: text/plain" \
  -d "Hello, Phoenix!"

# Read data
curl "http://localhost:4000/v1/stream/my-stream?offset=-1"

Using the programmatic API in Phoenix contexts

You can also use the DurableStreams module directly in your Phoenix controllers, channels, or LiveViews:

defmodule MyAppWeb.StreamController do
  use MyAppWeb, :controller

  def create(conn, %{"id" => stream_id}) do
    case DurableStreams.create(stream_id, content_type: "application/json") do
      {:ok, _} -> json(conn, %{status: "created", stream_id: stream_id})
      {:error, :already_exists} -> json(conn, %{status: "exists", stream_id: stream_id})
    end
  end

  def append(conn, %{"id" => stream_id, "data" => data}) do
    {:ok, offset} = DurableStreams.append(stream_id, Jason.encode!(data))
    json(conn, %{offset: offset})
  end
end

Note: DurableStreams uses its own Phoenix.PubSub instance (DurableStreams.PubSub) which does not conflict with your application's PubSub.

Phoenix LiveView Integration

For LiveView applications, use the DurableStreams.LiveView helper module to handle long-polling with automatic offset tracking and reconnection:

defmodule MyAppWeb.EventsLive do
  use Phoenix.LiveView
  alias DurableStreams.LiveView, as: DSLive

  def mount(_params, _session, socket) do
    {:ok, DSLive.init(socket)}
  end

  def handle_event("subscribe", %{"stream_id" => stream_id}, socket) do
    {:noreply, DSLive.listen(socket, stream_id)}
  end

  def handle_event("unsubscribe", _, socket) do
    {:noreply, DSLive.stop(socket)}
  end

  # Handle stream messages
  def handle_info(msg, socket) do
    if DSLive.stream_message?(msg) do
      case DSLive.handle_message(socket, msg) do
        {:data, messages, socket} ->
          {:noreply, process_messages(socket, messages)}

        {:status, _status, socket} ->
          {:noreply, socket}

        {:complete, socket} ->
          {:noreply, assign(socket, :finished, true)}

        {:error, reason, socket} ->
          {:noreply, assign(socket, :error, reason)}
      end
    else
      {:noreply, socket}
    end
  end

  defp process_messages(socket, messages) do
    Enum.reduce(messages, socket, fn msg, acc ->
      update(acc, :events, &[msg.data | &1])
    end)
  end
end

Available functions:

Function Description
init/2 Initialize stream assigns on socket
listen/3 Start listening to a stream with offset tracking
stop/1 Stop the listener, preserve stream ID and offset
reset/1 Stop and clear all stream state
stream_message?/1 Check if a message is from the stream listener
handle_message/2 Process stream messages, returns {:data, messages, socket}, {:status, status, socket}, {:complete, socket}, or {:error, reason, socket}
status/1 Get current status (:idle, :connecting, :streaming, :disconnected)
stream_id/1 Get current stream ID
offset/1 Get current offset
listening?/1 Check if actively listening

The module uses ds_ prefixed assigns (e.g., @ds_status, @ds_stream_id, @ds_offset) to avoid conflicts with your application's assigns.

Note: Requires phoenix_live_view as an optional dependency. The module is only compiled when Phoenix.LiveView is available.

Programmatic API

Use the DurableStreams.StreamManager module directly:

# Create a stream
{:ok, "my-stream"} = DurableStreams.StreamManager.create("my-stream",
  content_type: "text/plain",
  ttl: 3600  # expires in 1 hour
)

# Append data
{:ok, offset} = DurableStreams.StreamManager.append("my-stream", "Hello, World!")

# Read data
{:ok, result} = DurableStreams.StreamManager.read("my-stream", "-1")
# result.data => "Hello, World!"
# result.offset => "0006478b4bce37b5-0001-98ee"

# Long-poll for new data
{:ok, result} = DurableStreams.StreamManager.read("my-stream", offset,
  live: true,
  timeout: 30_000
)

# Delete stream
:ok = DurableStreams.StreamManager.delete("my-stream")

Retention Policies

Streams can have automatic retention policies that compact old messages:

# Create a stream with retention policy
{:ok, _} = DurableStreams.StreamManager.create("log-stream",
  content_type: "text/plain",
  retention: [
    max_age: :timer.hours(24),     # Remove messages older than 24h
    max_messages: 100_000,          # Keep at most 100k messages
    max_bytes: 50 * 1024 * 1024    # Keep at most 50MB
  ]
)

# When messages are compacted, reading old offsets returns 410 Gone
# The response includes the earliest valid offset

JSON Mode

When a stream is created with content-type: application/json, it operates in JSON mode:

# Create JSON stream
{:ok, _} = DurableStreams.StreamManager.create("json-stream",
  content_type: "application/json"
)

# Arrays are flattened one level
# POST [{"a": 1}, {"b": 2}] stores two messages
{:ok, _} = DurableStreams.StreamManager.append("json-stream",
  DurableStreams.JSON.encode!([%{a: 1}, %{b: 2}])
)

# Read returns array of messages
{:ok, result} = DurableStreams.StreamManager.read_messages("json-stream", "-1")
# result.messages => [%{data: "{\"a\":1}", offset: "..."}, ...]

HTTP API

Method Path Description
PUT /:stream_id Create a stream
POST /:stream_id Append data
GET /:stream_id Read from offset
DELETE /:stream_id Delete stream
HEAD /:stream_id Get metadata

Request Headers

Header Description
Content-Type Stream content type (required for POST, optional for PUT)
Stream-TTL Time-to-live in seconds
Stream-Expires-At ISO 8601 expiration timestamp
Stream-Seq Sequence value for ordering
If-None-Match ETag for conditional GET

Response Headers

Header Description
Stream-Next-Offset Offset for resuming reads
Stream-Up-To-Date True when no more data available
Stream-Cursor Cursor for jitter handling
ETag Entity tag for caching
Location Stream URL (on 201)

Query Parameters

Parameter Description
offset Start reading after this offset (-1 for beginning)
live Enable long-polling (true) or SSE (sse)
timeout Long-poll timeout in seconds

Examples

The examples/ directory contains runnable demonstrations:

Simple Demo

A command-line script showing core API usage:

elixir examples/simple_demo.exs

Shows stream creation, appending, reading, long-polling, and JSON mode.

LLM Token Streaming

The flagship example demonstrating resumable AI token streaming — the primary use case from the Durable Streams announcement:

iex examples/llm_streaming.exs
# Open http://localhost:4000

# Optional: Set API keys for real AI responses (works without them in Demo mode)
export ANTHROPIC_API_KEY=your-key-here
export OPENAI_API_KEY=your-key-here

Demonstrates:

  • Resumable streaming — disconnect mid-response and resume without losing tokens
  • Multi-client broadcast — multiple tabs watch the same AI response
  • Replay capability — re-watch responses from the beginning
  • Multi-provider support — works with Claude, GPT, or Demo mode (no API keys needed)

See examples/README.md for more details.

Configuration

The library uses sensible defaults but can be configured:

# config/config.exs
config :streamkeeper,
  storage: DurableStreams.Storage.ETS,
  default_timeout: 30_000

Development

Setup

# Clone the repository
git clone https://github.com/errantsky/streamkeeper.git
cd streamkeeper

# Install dependencies
mix deps.get

# Compile
mix compile

Running Tests

# Run unit tests
mix test

# Run with coverage
mix test --cover

Conformance Testing

This library passes 100% of the official Durable Streams conformance tests.

Prerequisites:

  • Node.js 18+ (for the conformance test suite)

Running conformance tests:

# Use the mix task (recommended)
mix durable_streams.conformance

Current conformance: 131/131 tests passing (100%)

Code Quality

# Format code
mix format

# Run static analysis (if credo is installed)
mix credo

Architecture

Component Overview

graph TB
    subgraph "HTTP Layer"
        Plug[Protocol.Plug]
        Handlers[HTTP Handlers]
    end

    subgraph "Business Logic"
        SM[StreamManager]
        SS[StreamServer GenServer]
    end

    subgraph "Storage"
        ETS[ETS Storage]
        PubSub[Phoenix.PubSub]
    end

    subgraph "OTP Supervision"
        App[Application]
        Sup[StreamSupervisor]
        Reg[Registry]
    end

    Plug --> Handlers
    Handlers --> SM
    SM --> SS
    SS --> ETS
    SS --> PubSub
    App --> Sup
    App --> Reg
    App --> ETS
    Sup --> SS
Loading

OTP Supervision Tree

graph TB
    App[DurableStreams.Application]
    Sup[DurableStreams.StreamSupervisor<br/>DynamicSupervisor]
    Reg[DurableStreams.Registry<br/>Registry]
    ETS[DurableStreams.Storage.ETS<br/>GenServer]
    PS[Phoenix.PubSub]
    SS1[StreamServer 1]
    SS2[StreamServer 2]
    SS3[StreamServer N]

    App --> Sup
    App --> Reg
    App --> ETS
    App --> PS
    Sup --> SS1
    Sup --> SS2
    Sup --> SS3
Loading

Each stream is managed by its own GenServer process, providing:

  • Process isolation
  • Independent failure handling
  • Concurrent access
  • Automatic cleanup on TTL expiration

Request Flow

sequenceDiagram
    participant C as Client
    participant P as Protocol.Plug
    participant H as Handler
    participant SM as StreamManager
    participant SS as StreamServer
    participant S as Storage.ETS

    C->>P: PUT /v1/stream/:id
    P->>H: Create Handler
    H->>SM: create(id, opts)
    SM->>SS: start_link
    SS->>S: create(id, stream)
    S-->>SS: :ok
    SS-->>SM: {:ok, pid}
    SM-->>H: {:ok, id}
    H-->>C: 201 Created

    C->>P: POST /v1/stream/:id
    P->>H: Append Handler
    H->>SM: append(id, data)
    SM->>SS: GenServer.call(:append)
    SS->>S: append(id, data)
    S-->>SS: {:ok, offset}
    SS-->>SM: {:ok, offset}
    SM-->>H: {:ok, offset}
    H-->>C: 200 OK + offset
Loading

Long-Polling Flow

sequenceDiagram
    participant C as Client
    participant H as Read Handler
    participant SS as StreamServer
    participant S as Storage
    participant PS as PubSub

    C->>H: GET ?offset=X&live=true
    H->>SS: read(offset, live: true)
    SS->>S: read(offset)
    S-->>SS: {:ok, %{data: <<>>}}
    Note over SS: No data, register waiter
    SS->>SS: Add to waiters list

    Note over C,PS: ... time passes ...

    C->>SS: Another client appends
    SS->>S: append(data)
    S-->>SS: {:ok, new_offset}
    SS->>PS: broadcast(:stream_append)
    PS-->>SS: Notify waiters
    SS->>S: read(offset)
    S-->>SS: {:ok, %{data: ...}}
    SS-->>H: {:ok, result}
    H-->>C: 200 OK + data
Loading

Protocol Implementation Notes

This library implements the Durable Streams protocol specification. Below are implementation-specific behaviors and minor differences from the reference specification.

Cursor Format

The protocol suggests using 20-second time intervals from a fixed epoch for cursor values. This implementation uses millisecond timestamps instead, which still ensures:

  • Monotonically increasing values
  • Uniqueness across requests
  • Proper jitter handling when client echoes cursor back

Retention Policy Implementation

Streams can have automatic retention policies that remove old messages based on:

  • max_age: Remove messages older than a specified duration (in milliseconds)
  • max_messages: Keep at most N messages in the stream
  • max_bytes: Keep at most N bytes of data in the stream

When messages are compacted, requests for those offsets return 410 Gone with a Stream-Earliest-Offset header indicating where valid data begins.

Features Not Implemented

Feature Status Notes
429 Too Many Requests Not implemented Rate limiting is left to infrastructure (reverse proxy, load balancer)
501 Not Implemented Not needed All protocol operations are supported

Storage Backend

Currently only ETS (in-memory) storage is provided. For production use with persistence requirements, a custom storage backend implementing DurableStreams.Storage.Behaviour should be used.

License

MIT License - see LICENSE for details.

About

Durable Streams protocol implementation in Elixir

Topics

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Contributors 2

  •  
  •