SSE Stream Resume
How Soliplex uses the AG-UI protocol and the run_event message store
to let clients transparently reconnect to an in-flight run after a
dropped network connection.
Why this exists
AG-UI runs are long-lived, token-by-token SSE streams. A mobile radio
flap, a proxy idle-timeout, or a sleeping laptop can tear the
connection down mid-response. Without resume, the user sees a partial
message and the run appears broken even though the agent is still
running on the server. With resume, the agent keeps running, the
client reconnects with a Last-Event-ID, and the UI catches up to the
current state as if nothing happened.
The mechanism is the browser-standard SSE resume primitive (WHATWG spec) plus a persistent copy of every streamed event so the server can replay from an arbitrary point.
Architecture
Three layers collaborate:
- Protocol layer — every
data:frame on the wire carries anid:prefix of the form{run_id}:{event_index}. The client echoes the most recently seen id back on reconnect via theLast-Event-IDHTTP header. - Storage layer — every event the agent emits is written to the
run_eventtable (ordered by its autoincrement primary key) before being flushed to the live SSE stream. A separatefinishedtimestamp on therunrow tells reconnecting consumers when polling can stop. - Client layer — the Flutter
AgUiStreamClienttracks the last seen id, detects stream drops, reconnects withLast-Event-ID, and emits lifecycleCustomEvents (stream.reconnecting,stream.reconnected,stream.reconnect_failed) so the UI can show a toast.
Event identifier format
run_idis the UUID of theRunrow — identifies which stream this event belongs to when multiple runs overlap in the same thread.event_indexis a monotonically-increasing integer starting at 0, scoped to a single run. It is not a global sequence number; each run has its own. It is not stored as its own column — it is the position of the event within its run'seventslist, whose ordering is guaranteed by the autoincrementid_primary key onrun_event.- Keepalive comments (
: keepalive …) and any non-data:frames pass through without anid:prefix — they do not consume an index and cannot be resumed against.
soliplex.views.streaming.add_sse_event_ids()
is the only place ids are written; on reconnect it is re-entered with
start_index = after_index + 1 so the replayed stream picks up
exactly where the client left off.
Server side
Normal first-connect path
POST /v1/rooms/{room_id}/agui/{thread_id}/{run_id} (no
Last-Event-ID)
- Authorize the user for the room.
- Persist the
RunAgentInputviaadd_run_input(). - Spawn a background task
drive_llm_stream()that consumes the agent's event stream and: - pushes each event onto an unbounded
asyncio.Queue(for the live SSE consumer), and - writes each event to
run_eventwith its monotonicevent_indexviasave_single_event(). - When the agent completes (or errors),
drive_llm_stream()puts a sentinel on the queue and callsfinish_run()which stampsrun.finishedwith the current time. - The HTTP response streams events drained from the queue, encoded
by the AG-UI adapter, tagged with ids by
add_sse_event_ids(), wrapped with keepalives, and returned as aStreamingResponse.
Critical decoupling: drive_llm_stream() is a create_task()
coroutine, not awaited inline. The queue is unbounded. Together
this means a client disconnect cancels only the SSE response — the
background task keeps draining the agent, persisting events, and
marking the run finished. A reconnecting client can then pick up
the full tail, including events written after it disconnected.
Reconnect path
When the request carries a Last-Event-ID: {run_id}:{N} header,
soliplex.views.agui.post_room_agui_thread_id_run_id
takes a completely different path:
parse_last_event_id()splits on the final:and extractsafter_index = N. Malformed values are treated as if the header was absent (i.e. a fresh run is attempted).- Authorize the user for the room (same check as first-connect).
- Skip
add_run_input, skip the agent, skipdrive_llm_stream— the run is already executing (or finished) somewhere else. - Create a
stream_from_db()generator scoped to(user_name, room_id, thread_id, run_id, after_index=N). - Feed that generator through the normal AG-UI encoder and
add_sse_event_ids(start_index=N+1)pipeline. - Return the same
StreamingResponseshape as first-connect; the client cannot tell the difference on the wire.
stream_from_db() loops: call list_run_events_after(after_index)
(which loads run.events and returns the slice past
after_index, paired with each row's position), yield everything
it returns, update after_index to the last yielded index, then
check is_run_finished(). If not finished, sleep
SSE_RECONNECT_POLL_INTERVAL_SECS (0.2 s) and loop. If finished,
do one final drain query (to catch events written between the
last poll and the finished flag being set) and terminate.
Storage model
run run_event
───────── ─────────
id_ (PK) <──┐ id_ (PK, autoincrement) ← ordering key
user_name └── run_id_ (FK → run.id_, ON DELETE CASCADE)
room_id data ← json-dumped AG-UI Event
thread_id_ created
...
finished (nullable)
See soliplex.agui.schema.Run and soliplex.agui.schema.RunEvent
Notes:
- Retention is indefinite — no TTL. Events live until the
parent
runorthreadrow is deleted (both cascade). event_indexis not persisted; it is the position of the row withinrun.events(loaded inid_order). On the write side,drive_llm_stream()keeps a local counter only for log/wire purposes. On the read side,list_run_events_after()slicesrun.eventsfromafter_index + 1and pairs each row with its position.run.finishedis the single source of truth for "no more events coming". Crash-recovery after a process restart relies on it being unset — those runs are effectively stuck (stale events inrun_event, no background task running) and will poll forever unless cleaned up administratively.- Events are persisted before they are counted as delivered — if the DB write fails, the error is logged but the event is still pushed to the live queue. The client sees it; a reconnect would not. This is a tradeoff in favor of live responsiveness.
Tracking the last seen id
AgUiStreamClient.runAgent() parses SSE via the ag_ui package's
SseParser, which exposes SseMessage.id for every dispatched
message. The client holds a local String? lastEventId and updates
it on each non-empty id. The SseParser preserves _lastEventId
across messages per the spec — individual data: frames do not
need to repeat their own id.
When to resume
Resume is triggered only by stream errors, not by clean EOF:
| Termination condition | Client action |
|---|---|
Normal close after RUN_FINISHED/RUN_ERROR |
Done, return. |
| Clean EOF without a terminal event | Done, return (log only). |
CancelledException (user cancel) |
Propagate, never resume. |
Initial-connect AuthException (401/403) |
Propagate. |
Initial-connect NetworkException |
Propagate (no id to resume against). |
Stream error mid-body, lastEventId != null |
Resume. |
NetworkException on reconnect attempt |
Retry, counts toward budget. |
5xx ApiException on reconnect attempt |
Retry, counts toward budget. |
4xx ApiException / AuthException on reconnect |
Synthesize terminal failure, stop. |
A resume attempt reissues the same POST to the same URL with
the same body — plus a Last-Event-ID header. The server
routes it to stream_from_db. The client does not know, and does
not need to know, whether the agent is still running or has
already finished.
Retry policy
Configured by ResumePolicy (defaults):
- 5 attempts per drop, 500 ms → 8 s exponential backoff (×2), ±20 % jitter.
- Attempt counter resets to zero after a successful resume — the budget is per-drop, not cumulative over the lifetime of the run.
- On exhaustion, the client yields a
CustomEvent(name: 'stream.reconnect_failed', …)followed by a syntheticRunErrorEvent(code: 'stream.resume_failed'), then returns cleanly. Downstream state machines see a terminal run via the normalRunErrorEventpath — no resume-specific branches needed in the UI state layer.
Surfacing the lifecycle to the UI
The client emits three synthetic CustomEvents interleaved with
the real run events:
name |
value |
|---|---|
stream.reconnecting |
{attempt, lastEventId, error} |
stream.reconnected |
{attempt} |
stream.reconnect_failed |
{attempts, error} |
These are never sent by the server and never written to the
run_event table — they exist only within a single client run
instance. ReconnectStatus.tryParse() parses them into a typed
sealed class; AgentSession exposes a reconnectStatus signal
that drives the _ReconnectBanner toast above the chat thread.
Logging
All client-side resume activity logs under the
soliplex_client.agui_stream logger name:
- Level 800 (info): attempt start and success.
- Level 900 (warning): stream drop detected, will attempt resume.
- Level 1000 (severe): final give-up after exhausted attempts.
Server-side activity is captured by logfire spans around
drive_llm_stream() and the endpoint handler; reconnects surface
as additional entries under the same endpoint span.
Sequence diagram
Example: a run that starts normally, is interrupted mid-stream by a
network drop, and resumes via Last-Event-ID.
sequenceDiagram
autonumber
participant UI as Flutter UI
participant C as AgUiStreamClient
participant S as FastAPI<br/>(agui endpoint)
participant T as drive_llm_stream<br/>(bg task)
participant DB as run_event table
participant A as Agent
Note over UI,A: Initial connect
UI->>C: runAgent(endpoint, input)
C->>S: POST .../{thread}/{run}<br/>Accept: text/event-stream
S->>DB: add_run_input
S->>T: create_task(drive_llm_stream)
S-->>C: 200 OK (stream opens)
T->>A: consume llm_stream
A-->>T: RunStarted
T->>DB: save_single_event(index=0)
T-->>S: queue.put(RunStarted)
S-->>C: id: run-1:0\ndata: {RUN_STARTED}
A-->>T: TextMessageContent("Hel")
T->>DB: save_single_event(index=1)
T-->>S: queue.put(...)
S-->>C: id: run-1:1\ndata: {..."Hel"}
C-->>UI: RunStartedEvent, TextMessageContentEvent
Note over C,S: Network drop after id=run-1:1
S-xC: TCP reset / proxy idle-close
C->>C: lastEventId = "run-1:1"<br/>attempt = 1
C-->>UI: CustomEvent(stream.reconnecting, attempt=1)
UI->>UI: show "Reconnecting…" toast
Note over T,DB: Agent keeps running in background
A-->>T: TextMessageContent("lo")
T->>DB: save_single_event(index=2)
A-->>T: RunFinished
T->>DB: save_single_event(index=3)
T->>DB: finish_run() (sets run.finished)
Note over UI,A: Resume (500ms backoff later)
C->>S: POST .../{thread}/{run}<br/>Last-Event-ID: run-1:1
S->>S: parse_last_event_id → after_index=1
S->>S: authorize user for room
S-->>C: 200 OK (stream_from_db path)
loop until run.finished and drained
S->>DB: list_run_events_after(after_index)
DB-->>S: [(2, "lo"), (3, RunFinished)]
S-->>C: id: run-1:2\ndata: {..."lo"}
S-->>C: id: run-1:3\ndata: {RUN_FINISHED}
S->>DB: is_run_finished?
DB-->>S: true
end
S-->>C: stream closes
C-->>UI: CustomEvent(stream.reconnected, attempt=1)
UI->>UI: toast → "Reconnected." (auto-dismiss 3s)
C-->>UI: TextMessageContentEvent("lo"),<br/>RunFinishedEvent
C->>C: sawTerminalEvent = true, return
File reference
Backend
| Concern | File | Symbols |
|---|---|---|
| Endpoint dispatch & Last-Event-ID parse | src/soliplex/views/agui.py |
parse_last_event_id, post_room_agui_thread_id_run_id |
| Background event pump + DB writes | src/soliplex/views/agui.py |
drive_llm_stream, save_single_event, finish_run |
| SSE id injection & DB replay | src/soliplex/views/streaming.py |
add_sse_event_ids, stream_from_db |
| Event persistence API | src/soliplex/agui/persistence.py |
ThreadStorage.save_single_event, list_run_events_after, is_run_finished, finish_run |
| Storage schema | src/soliplex/agui/schema.py |
Run, RunEvent |
| Unit tests | tests/unit/views/test_agui_views.py |
test_post_room_agui_reconnect |
| End-to-end tests | tests/functional/test_agui_thread.py |
test_sse_resume_with_last_event_id |
Design notes and limits
- Per-run, not per-thread.
Last-Event-IDscopes to one run. Once a run ends, a new user turn creates a new run with a freshevent_indexcounter — a client'slastEventIdfrom the previous run is not meaningful for the new one. - Authorization is re-checked on every reconnect. Revoked permissions mid-run terminate the resume cleanly (4xx → synthetic terminal event on the client).
- Event persistence is best-effort. DB write failures during
drive_llm_streamare logged but do not stop live delivery — the cost is that those specific events cannot be replayed. A reconnect after such a failure will skip them. - Polling cost. Reconnect readers poll every 200 ms. For long-lived resumes against a slow agent, this is cheap but non-zero; an event-driven notification would be more efficient at the cost of added complexity.
- Crash recovery is partial. A server crash mid-run leaves
run.finishedunset. A reconnecting client will poll forever (modulo its own retry budget). There is no watchdog that force-finishes orphaned runs. - Synthetic client events never round-trip.
stream.*CustomEvents are constructed in-memory by the Dart client; the server neither emits nor persists them, so they will not appear in a replay after a second drop.