Run Events SSE 接口

/runs/events 端点通过 Server-Sent Events (SSE) 流式发送评测运行的实时更新。 客户端只需订阅一次,即可接收 run 状态迁移、进度计数、条目级更新和日志记录。

端点摘要

MethodPathDescription
GET/runs/events打开一个用于接收 run 更新的 SSE 连接

Authentication & CORS

  • 该流遵循与其他 API 调用相同的认证方式(cookies/session headers)。 客户端创建 EventSource 时应设置 withCredentials: true,确保 cookies 会被带上。
  • 默认启用 CORS;只要请求来自可信前端,后端就会返回正确的 Access-Control-Allow-Origin 头。

Query Parameters

NameTypeDefaultNotes
limitnumbernull可选的最大事件数,达到后关闭连接。测试时尤其有用。

Stream 生命周期

  • 服务端会立即发送注释 : ready,表示连接已建立。
  • 如果 15 秒内没有事件,会发送保活注释 : ping;客户端可忽略。
  • 连接会一直保持,直到客户端主动关闭、达到 limit,或 Redis/后端发生错误。

事件类型

每条数据消息都是一个带 type 字段的 JSON 对象。目前支持四类事件。

run_status

当 run 状态发生变化时发出,例如 pending → running → completed、调度了 retry、发起了取消、或 worker 恢复了既有 run。

{
  "type": "run_status",
  "runId": "run-123",
  "status": "running",
  "startedAt": "2025-10-21T08:01:00Z",
  "finishedAt": null,
  "retryCount": 1,
  "retryAfter": "2025-10-21T08:07:30Z",
  "retryRequestedAt": "2025-10-21T08:03:00Z",
  "retryReason": "Worker shutdown requested; retrying run after interruption.",
  "resumedCount": 1,
  "lastResumedAt": "2025-10-21T08:05:11Z",
  "cancelRequestedAt": null
}
  • status 与 REST schema 保持一致(pendingrunningcompletedfailedcanceled)。
  • startedAt / finishedAt 为 UTC 的 ISO-8601 时间戳;未确定时可为 null
  • retryCount 表示已进行的自动重试次数(未重试时为 0)。
  • retryAfter 是下一次计划重试的 UTC 时间;一旦 run 被重新派发或结束,就会变为 null
  • retryRequestedAtretryReason 记录调度器最近一次入队重试的时间与原因。
  • resumedCountlastResumedAt 用于表示 worker 在中断后恢复既有 run 的次数与时间。
  • cancelRequestedAt 表示用户已请求取消正在进行的 run。在这一阶段,UI 应保留当前 item 卡片,直到 provider 请求真正结束。

run_progress

用于报告粗粒度执行计数,让 UI 能显示进度指示器。

{
  "type": "run_progress",
  "runId": "run-123",
  "completed": 12,
  "total": 32
}
  • completed 表示截至当前已处理的数据集条目数。
  • total 是该 run 的预期总条目数(包含 variations)。

run_item

当单个数据集条目开始、产生 activity 快照、完成或失败时发出。 活跃运行面板依赖这类事件来同步当前卡片与最近历史堆栈。

{
  "type": "run_item",
  "runId": "run-123",
  "sequence": 15,
  "total": 100,
  "phase": "activity",
  "item": {
    "itemId": "item-15",
    "datasetItemId": "item-15",
    "sequence": 15,
    "state": "running",
    "promptText": "Which option best describes...",
    "promptPayload": null,
    "answerPayload": null,
    "choices": ["A", "B", "C", "D"],
    "assets": null,
    "section": {
      "id": "section-1",
      "name": "Core Knowledge",
      "order": 1
    }
  },
  "response": null,
  "score": null,
  "latencyMs": null,
  "activity": {
    "requestStartedAt": "2026-03-05T16:20:12Z",
    "firstChunkAt": "2026-03-05T16:20:15Z",
    "lastChunkAt": "2026-03-05T16:20:31Z",
    "requestFinishedAt": null,
    "transport": "streaming",
    "streamingRequested": true,
    "streamingFallback": false,
    "fallbackReason": null,
    "chunkCount": 17,
    "textChunkCount": 12,
    "reasoningChunkCount": 5,
    "responsePreview": "B",
    "responsePreviewTruncated": false,
    "reasoningPreview": "Comparing the answer choices...",
    "reasoningPreviewTruncated": false
  },
  "at": "2026-03-05T16:20:31Z"
}
  • phase 取值为 startedactivitycompletedfailed
  • sequence 是该条目在 run 中的 1-based 顺序。
  • item 是当前题卡所需的阶段快照。
  • response 是当前文本响应快照;请求未结束前可能为 null
  • scorelatencyMs 在条目仍运行中时也可能为 null
  • activity 为可选字段,携带已持久化的请求级可观测性快照:
    • requestStartedAtfirstChunkAtlastChunkAtrequestFinishedAt
    • transportstreamingbuffered
    • streamingRequestedstreamingFallbackfallbackReason
    • chunkCounttextChunkCountreasoningChunkCount
    • responsePreviewresponsePreviewTruncated
    • reasoningPreviewreasoningPreviewTruncated
  • Activity 快照会随着请求推进重复发出,也可能在最终 completed / failed 快照中再次附带,方便客户端无需额外 REST 请求就能结算最终卡片。

run_log

用于流式传输流水线日志(队列事件、错误、评分摘要等)。

{
  "type": "run_log",
  "runId": "run-123",
  "id": "log-b5bcd9",
  "level": "info",
  "message": "Run completed successfully.",
  "data": { "durationMs": 320000 },
  "createdAt": "2025-10-21T08:06:31Z"
}
  • level 取值为 debuginfowarnerror
  • data 在可用时包含结构化 JSON 元数据,也可能为 null

客户端指引

  1. 只初始化一次:应用壳层挂载时创建 EventSource,并通过状态管理(SPA 中使用 TanStack Query)共享更新。
  2. 处理可恢复的 SSE 失败:Redis 或后端中断可能导致流关闭。source.onerror 触发后应按退避策略重连,同时保留最新的 /runs/active 快照,避免 UI 抖动。
  3. 更新缓存:使用 run_statusrun_progress 更新 run 列表;使用 run_item 驱动卡片级 UI。必要时让 SPA 对受影响的 runs/items 做 query invalidation。
  4. 保留进行中取消语义:如果 cancelRequestedAt 已存在,但当前 item 请求尚未结束,不应立刻折叠到终态历史,而应显示“已请求取消”的中间状态。
  5. 及时清理:组件卸载或离开页面时务必调用 source.close(),避免泄漏连接。

Migration Notes

  • 2025-11-05run_status 新增 retry 元数据(retryCountretryAfterretryRequestedAtretryReason)。未使用这些字段的客户端应忽略未知键。
  • 2026-03-12run_status 可能还会包含 resumedCountlastResumedAtcancelRequestedAt;同时 run_item.phase 新增 activity。任何假设 item phase 只有终态的客户端都需要更新 parser / reducer。
  • 升级时,请检查所有假设键顺序或固定 shape 的 SSE parser(例如 zod schema),并把新增字段标记为可选。
  • SPA 目前会把最新的 retry / resume / cancel 快照同时保存在 run 列表与 active board 中。 自定义消费者也可以沿用同样模式:先从 /runs/active hydrate,再合并 run_statusrun_item 事件,重连后必要时回退到新的 read-model 拉取。

示例(React)

useEffect(() => {
  const source = new EventSource("/runs/events", { withCredentials: true })

  source.onmessage = (event) => {
    if (!event.data) return
    const payload = JSON.parse(event.data)
    switch (payload.type) {
      case "run_status":
        // update cached run data
        break
      case "run_progress":
        // update progress indicator
        break
      case "run_item":
        // update active item card / history
        break
      case "run_log":
        // append to log panel
        break
    }
  }

  source.onerror = (err) => {
    console.error("SSE error", err)
    source.close()
    // optional: schedule reconnection
  }

  return () => source.close()
}, [])

故障排查

  • CORS 错误:确认前端来源已被允许,并且客户端(withCredentials)与服务端中间件都启用了 credentials。
  • 日志过量:在显示前先按 level 过滤 run_log 事件。
  • 缺少更新:确认 API 进程能访问 Redis;若 Redis 不可用,广播会静默降级,只在服务端日志中体现。