mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-10 12:18:44 +08:00
Compare commits
1 Commits
brooklyn/g
...
bb/tui-gat
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6b0c21b277 |
183
ui-tui/src/__tests__/gatewayClient.test.ts
Normal file
183
ui-tui/src/__tests__/gatewayClient.test.ts
Normal file
@@ -0,0 +1,183 @@
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'
|
||||
|
||||
import { GatewayClient } from '../gatewayClient.js'
|
||||
|
||||
interface ListenerEntry {
|
||||
callback: (event: any) => void
|
||||
once: boolean
|
||||
}
|
||||
|
||||
class FakeWebSocket {
|
||||
static CONNECTING = 0
|
||||
static OPEN = 1
|
||||
static CLOSING = 2
|
||||
static CLOSED = 3
|
||||
static instances: FakeWebSocket[] = []
|
||||
|
||||
readyState = FakeWebSocket.CONNECTING
|
||||
sent: string[] = []
|
||||
readonly url: string
|
||||
private listeners = new Map<string, ListenerEntry[]>()
|
||||
|
||||
constructor(url: string) {
|
||||
this.url = url
|
||||
FakeWebSocket.instances.push(this)
|
||||
}
|
||||
|
||||
static reset() {
|
||||
FakeWebSocket.instances = []
|
||||
}
|
||||
|
||||
addEventListener(type: string, callback: (event: any) => void, options?: unknown) {
|
||||
const once =
|
||||
typeof options === 'object' &&
|
||||
options !== null &&
|
||||
'once' in options &&
|
||||
Boolean((options as { once?: unknown }).once)
|
||||
const entries = this.listeners.get(type) ?? []
|
||||
|
||||
entries.push({ callback, once })
|
||||
this.listeners.set(type, entries)
|
||||
}
|
||||
|
||||
removeEventListener(type: string, callback: (event: any) => void) {
|
||||
const entries = this.listeners.get(type)
|
||||
|
||||
if (!entries) {
|
||||
return
|
||||
}
|
||||
|
||||
this.listeners.set(
|
||||
type,
|
||||
entries.filter(entry => entry.callback !== callback)
|
||||
)
|
||||
}
|
||||
|
||||
send(payload: string) {
|
||||
if (this.readyState !== FakeWebSocket.OPEN) {
|
||||
throw new Error('socket not open')
|
||||
}
|
||||
|
||||
this.sent.push(payload)
|
||||
}
|
||||
|
||||
close(code = 1000) {
|
||||
if (this.readyState === FakeWebSocket.CLOSED) {
|
||||
return
|
||||
}
|
||||
|
||||
this.readyState = FakeWebSocket.CLOSED
|
||||
this.emit('close', { code })
|
||||
}
|
||||
|
||||
open() {
|
||||
this.readyState = FakeWebSocket.OPEN
|
||||
this.emit('open', {})
|
||||
}
|
||||
|
||||
message(data: string) {
|
||||
this.emit('message', { data })
|
||||
}
|
||||
|
||||
private emit(type: string, event: any) {
|
||||
const entries = [...(this.listeners.get(type) ?? [])]
|
||||
|
||||
for (const entry of entries) {
|
||||
entry.callback(event)
|
||||
if (entry.once) {
|
||||
this.removeEventListener(type, entry.callback)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
describe('GatewayClient websocket attach mode', () => {
|
||||
const originalWebSocket = globalThis.WebSocket
|
||||
|
||||
beforeEach(() => {
|
||||
FakeWebSocket.reset()
|
||||
;(globalThis as { WebSocket?: unknown }).WebSocket = FakeWebSocket as unknown as typeof WebSocket
|
||||
})
|
||||
|
||||
afterEach(() => {
|
||||
delete process.env.HERMES_TUI_GATEWAY_URL
|
||||
delete process.env.HERMES_TUI_SIDECAR_URL
|
||||
FakeWebSocket.reset()
|
||||
|
||||
if (originalWebSocket) {
|
||||
globalThis.WebSocket = originalWebSocket
|
||||
} else {
|
||||
delete (globalThis as { WebSocket?: unknown }).WebSocket
|
||||
}
|
||||
})
|
||||
|
||||
it('waits for websocket open and resolves RPC requests', async () => {
|
||||
process.env.HERMES_TUI_GATEWAY_URL = 'ws://gateway.test/api/ws?token=abc'
|
||||
const gw = new GatewayClient()
|
||||
|
||||
gw.start()
|
||||
const gatewaySocket = FakeWebSocket.instances[0]!
|
||||
const req = gw.request<{ ok: boolean }>('session.create', { cols: 80 })
|
||||
|
||||
expect(gatewaySocket.sent).toHaveLength(0)
|
||||
gatewaySocket.open()
|
||||
await vi.waitFor(() => expect(gatewaySocket.sent).toHaveLength(1))
|
||||
|
||||
const frame = JSON.parse(gatewaySocket.sent[0] ?? '{}') as { id: string; method: string }
|
||||
expect(frame.method).toBe('session.create')
|
||||
|
||||
gatewaySocket.message(JSON.stringify({ id: frame.id, jsonrpc: '2.0', result: { ok: true } }))
|
||||
await expect(req).resolves.toEqual({ ok: true })
|
||||
|
||||
gw.kill()
|
||||
})
|
||||
|
||||
it('mirrors event frames to sidecar websocket when configured', async () => {
|
||||
process.env.HERMES_TUI_GATEWAY_URL = 'ws://gateway.test/api/ws?token=abc'
|
||||
process.env.HERMES_TUI_SIDECAR_URL = 'ws://gateway.test/api/pub?token=abc&channel=demo'
|
||||
|
||||
const gw = new GatewayClient()
|
||||
const seen: string[] = []
|
||||
|
||||
gw.on('event', ev => seen.push(ev.type))
|
||||
gw.start()
|
||||
|
||||
const gatewaySocket = FakeWebSocket.instances[0]!
|
||||
gatewaySocket.open()
|
||||
await vi.waitFor(() => expect(FakeWebSocket.instances).toHaveLength(2))
|
||||
|
||||
const sidecarSocket = FakeWebSocket.instances[1]!
|
||||
|
||||
sidecarSocket.open()
|
||||
gw.drain()
|
||||
|
||||
const eventFrame = JSON.stringify({
|
||||
jsonrpc: '2.0',
|
||||
method: 'event',
|
||||
params: { type: 'tool.start', payload: { tool_id: 't1' } }
|
||||
})
|
||||
gatewaySocket.message(eventFrame)
|
||||
|
||||
expect(seen).toContain('tool.start')
|
||||
expect(sidecarSocket.sent).toContain(eventFrame)
|
||||
|
||||
gw.kill()
|
||||
})
|
||||
|
||||
it('emits exit when attached websocket closes', () => {
|
||||
process.env.HERMES_TUI_GATEWAY_URL = 'ws://gateway.test/api/ws?token=abc'
|
||||
const gw = new GatewayClient()
|
||||
const exits: Array<null | number> = []
|
||||
|
||||
gw.on('exit', code => exits.push(code))
|
||||
gw.start()
|
||||
|
||||
const gatewaySocket = FakeWebSocket.instances[0]!
|
||||
|
||||
gatewaySocket.open()
|
||||
gw.drain()
|
||||
gatewaySocket.close(1011)
|
||||
|
||||
expect(exits).toEqual([1011])
|
||||
})
|
||||
})
|
||||
@@ -13,10 +13,26 @@ const MAX_BUFFERED_EVENTS = 2000
|
||||
const MAX_LOG_PREVIEW = 240
|
||||
const STARTUP_TIMEOUT_MS = Math.max(5000, parseInt(process.env.HERMES_TUI_STARTUP_TIMEOUT_MS ?? '15000', 10) || 15000)
|
||||
const REQUEST_TIMEOUT_MS = Math.max(30000, parseInt(process.env.HERMES_TUI_RPC_TIMEOUT_MS ?? '120000', 10) || 120000)
|
||||
const WS_CONNECTING = 0
|
||||
const WS_OPEN = 1
|
||||
const WS_CLOSING = 2
|
||||
const WS_CLOSED = 3
|
||||
|
||||
const truncateLine = (line: string) =>
|
||||
line.length > MAX_LOG_LINE_BYTES ? `${line.slice(0, MAX_LOG_LINE_BYTES)}… [truncated ${line.length} bytes]` : line
|
||||
|
||||
const resolveGatewayAttachUrl = () => {
|
||||
const raw = process.env.HERMES_TUI_GATEWAY_URL?.trim()
|
||||
|
||||
return raw ? raw : null
|
||||
}
|
||||
|
||||
const resolveSidecarUrl = () => {
|
||||
const raw = process.env.HERMES_TUI_SIDECAR_URL?.trim()
|
||||
|
||||
return raw ? raw : null
|
||||
}
|
||||
|
||||
const resolvePython = (root: string) => {
|
||||
const configured = process.env.HERMES_PYTHON?.trim() || process.env.PYTHON?.trim()
|
||||
|
||||
@@ -43,6 +59,22 @@ const asGatewayEvent = (value: unknown): GatewayEvent | null =>
|
||||
? (value as GatewayEvent)
|
||||
: null
|
||||
|
||||
const asWireText = (raw: unknown): string | null => {
|
||||
if (typeof raw === 'string') {
|
||||
return raw
|
||||
}
|
||||
|
||||
if (raw instanceof ArrayBuffer) {
|
||||
return new TextDecoder().decode(raw)
|
||||
}
|
||||
|
||||
if (ArrayBuffer.isView(raw)) {
|
||||
return new TextDecoder().decode(raw)
|
||||
}
|
||||
|
||||
return null
|
||||
}
|
||||
|
||||
interface Pending {
|
||||
id: string
|
||||
method: string
|
||||
@@ -53,6 +85,11 @@ interface Pending {
|
||||
|
||||
export class GatewayClient extends EventEmitter {
|
||||
private proc: ChildProcess | null = null
|
||||
private ws: WebSocket | null = null
|
||||
private wsConnectPromise: Promise<void> | null = null
|
||||
private sidecarWs: WebSocket | null = null
|
||||
private attachUrl: null | string = null
|
||||
private sidecarUrl: null | string = null
|
||||
private reqId = 0
|
||||
private logs = new CircularBuffer<string>(MAX_GATEWAY_LOG_LINES)
|
||||
private pending = new Map<string, Pending>()
|
||||
@@ -88,14 +125,35 @@ export class GatewayClient extends EventEmitter {
|
||||
this.bufferedEvents.push(ev)
|
||||
}
|
||||
|
||||
start() {
|
||||
const root = process.env.HERMES_PYTHON_SRC_ROOT ?? resolve(import.meta.dirname, '../../')
|
||||
const python = resolvePython(root)
|
||||
const cwd = process.env.HERMES_CWD || root
|
||||
const env = { ...process.env }
|
||||
const pyPath = env.PYTHONPATH?.trim()
|
||||
env.PYTHONPATH = pyPath ? `${root}${delimiter}${pyPath}` : root
|
||||
private clearReadyTimer() {
|
||||
if (this.readyTimer) {
|
||||
clearTimeout(this.readyTimer)
|
||||
this.readyTimer = null
|
||||
}
|
||||
}
|
||||
|
||||
private closeSidecarSocket() {
|
||||
try {
|
||||
this.sidecarWs?.close()
|
||||
} catch {
|
||||
// best effort
|
||||
} finally {
|
||||
this.sidecarWs = null
|
||||
}
|
||||
}
|
||||
|
||||
private closeGatewaySocket() {
|
||||
try {
|
||||
this.ws?.close()
|
||||
} catch {
|
||||
// best effort
|
||||
} finally {
|
||||
this.ws = null
|
||||
this.wsConnectPromise = null
|
||||
}
|
||||
}
|
||||
|
||||
private resetStartupState() {
|
||||
this.ready = false
|
||||
this.bufferedEvents.clear()
|
||||
this.pendingExit = undefined
|
||||
@@ -103,15 +161,10 @@ export class GatewayClient extends EventEmitter {
|
||||
this.stderrRl?.close()
|
||||
this.stdoutRl = null
|
||||
this.stderrRl = null
|
||||
this.clearReadyTimer()
|
||||
}
|
||||
|
||||
if (this.proc && !this.proc.killed && this.proc.exitCode === null) {
|
||||
this.proc.kill()
|
||||
}
|
||||
|
||||
if (this.readyTimer) {
|
||||
clearTimeout(this.readyTimer)
|
||||
}
|
||||
|
||||
private startReadyTimer(python: string, cwd: string) {
|
||||
this.readyTimer = setTimeout(() => {
|
||||
if (this.ready) {
|
||||
return
|
||||
@@ -130,7 +183,95 @@ export class GatewayClient extends EventEmitter {
|
||||
payload: { cwd, python, stderr_tail: stderrTail }
|
||||
})
|
||||
}, STARTUP_TIMEOUT_MS)
|
||||
}
|
||||
|
||||
private handleTransportExit(code: null | number, reason?: string) {
|
||||
this.clearReadyTimer()
|
||||
this.closeSidecarSocket()
|
||||
this.rejectPending(new Error(reason || `gateway exited${code === null ? '' : ` (${code})`}`))
|
||||
|
||||
if (this.subscribed) {
|
||||
this.emit('exit', code)
|
||||
} else {
|
||||
this.pendingExit = code
|
||||
}
|
||||
}
|
||||
|
||||
private connectSidecarMirror() {
|
||||
this.closeSidecarSocket()
|
||||
|
||||
if (!this.sidecarUrl) {
|
||||
return
|
||||
}
|
||||
|
||||
if (typeof WebSocket === 'undefined') {
|
||||
this.pushLog(`[sidecar] WebSocket unavailable; skipping mirror to ${this.sidecarUrl}`)
|
||||
return
|
||||
}
|
||||
|
||||
try {
|
||||
const ws = new WebSocket(this.sidecarUrl)
|
||||
|
||||
this.sidecarWs = ws
|
||||
ws.addEventListener('close', () => {
|
||||
if (this.sidecarWs === ws) {
|
||||
this.sidecarWs = null
|
||||
}
|
||||
})
|
||||
ws.addEventListener('error', () => {
|
||||
this.pushLog('[sidecar] mirror connection error')
|
||||
})
|
||||
} catch (err) {
|
||||
this.pushLog(`[sidecar] failed to connect: ${err instanceof Error ? err.message : String(err)}`)
|
||||
this.sidecarWs = null
|
||||
}
|
||||
}
|
||||
|
||||
private mirrorEventToSidecar(rawFrame: string) {
|
||||
const ws = this.sidecarWs
|
||||
|
||||
if (!ws || ws.readyState !== WS_OPEN) {
|
||||
return
|
||||
}
|
||||
|
||||
try {
|
||||
ws.send(rawFrame)
|
||||
} catch {
|
||||
// best effort
|
||||
}
|
||||
}
|
||||
|
||||
private handleWebSocketFrame(raw: unknown) {
|
||||
const text = asWireText(raw)
|
||||
|
||||
if (!text) {
|
||||
return
|
||||
}
|
||||
|
||||
try {
|
||||
const frame = JSON.parse(text) as Record<string, unknown>
|
||||
|
||||
if (frame.method === 'event') {
|
||||
this.mirrorEventToSidecar(text)
|
||||
}
|
||||
|
||||
this.dispatch(frame)
|
||||
} catch {
|
||||
const preview = text.trim().slice(0, MAX_LOG_PREVIEW) || '(empty frame)'
|
||||
|
||||
this.pushLog(`[protocol] malformed websocket frame: ${preview}`)
|
||||
this.publish({ type: 'gateway.protocol_error', payload: { preview } })
|
||||
}
|
||||
}
|
||||
|
||||
private startSpawnedGateway(root: string) {
|
||||
const python = resolvePython(root)
|
||||
const cwd = process.env.HERMES_CWD || root
|
||||
const env = { ...process.env }
|
||||
const pyPath = env.PYTHONPATH?.trim()
|
||||
|
||||
env.PYTHONPATH = pyPath ? `${root}${delimiter}${pyPath}` : root
|
||||
this.startReadyTimer(python, cwd)
|
||||
this.proc = spawn(python, ['-m', 'tui_gateway.entry'], { cwd, env, stdio: ['pipe', 'pipe', 'pipe'] })
|
||||
|
||||
this.stdoutRl = createInterface({ input: this.proc.stdout! })
|
||||
@@ -158,27 +299,118 @@ export class GatewayClient extends EventEmitter {
|
||||
})
|
||||
|
||||
this.proc.on('error', err => {
|
||||
this.pushLog(`[spawn] ${err.message}`)
|
||||
const line = `[spawn] ${err.message}`
|
||||
|
||||
this.pushLog(line)
|
||||
this.publish({ type: 'gateway.stderr', payload: { line } })
|
||||
this.rejectPending(new Error(`gateway error: ${err.message}`))
|
||||
this.publish({ type: 'gateway.stderr', payload: { line: `[spawn] ${err.message}` } })
|
||||
})
|
||||
|
||||
this.proc.on('exit', code => {
|
||||
if (this.readyTimer) {
|
||||
clearTimeout(this.readyTimer)
|
||||
this.readyTimer = null
|
||||
}
|
||||
|
||||
this.rejectPending(new Error(`gateway exited${code === null ? '' : ` (${code})`}`))
|
||||
|
||||
if (this.subscribed) {
|
||||
this.emit('exit', code)
|
||||
} else {
|
||||
this.pendingExit = code
|
||||
}
|
||||
this.handleTransportExit(code)
|
||||
})
|
||||
}
|
||||
|
||||
private startAttachedGateway(attachUrl: string) {
|
||||
this.startReadyTimer('websocket', attachUrl)
|
||||
|
||||
if (typeof WebSocket === 'undefined') {
|
||||
const line = `[startup] WebSocket API unavailable; cannot attach to ${attachUrl}`
|
||||
|
||||
this.pushLog(line)
|
||||
this.publish({ type: 'gateway.stderr', payload: { line } })
|
||||
this.handleTransportExit(1, 'gateway websocket unavailable')
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
try {
|
||||
const ws = new WebSocket(attachUrl)
|
||||
let settled = false
|
||||
|
||||
this.ws = ws
|
||||
this.wsConnectPromise = new Promise<void>((resolve, reject) => {
|
||||
ws.addEventListener(
|
||||
'open',
|
||||
() => {
|
||||
if (!settled) {
|
||||
settled = true
|
||||
resolve()
|
||||
}
|
||||
|
||||
this.connectSidecarMirror()
|
||||
},
|
||||
{ once: true }
|
||||
)
|
||||
|
||||
ws.addEventListener(
|
||||
'error',
|
||||
() => {
|
||||
this.pushLog('[startup] gateway websocket connect error')
|
||||
|
||||
if (!settled) {
|
||||
settled = true
|
||||
reject(new Error('gateway websocket connection failed'))
|
||||
}
|
||||
},
|
||||
{ once: true }
|
||||
)
|
||||
ws.addEventListener(
|
||||
'close',
|
||||
ev => {
|
||||
if (!settled) {
|
||||
settled = true
|
||||
reject(new Error(`gateway websocket closed (${ev.code}) during connect`))
|
||||
}
|
||||
},
|
||||
{ once: true }
|
||||
)
|
||||
})
|
||||
|
||||
ws.addEventListener('message', ev => this.handleWebSocketFrame(ev.data))
|
||||
ws.addEventListener('close', ev => {
|
||||
if (this.ws === ws) {
|
||||
this.ws = null
|
||||
this.wsConnectPromise = null
|
||||
}
|
||||
this.handleTransportExit(ev.code)
|
||||
})
|
||||
ws.addEventListener('error', () => {
|
||||
const line = '[gateway] websocket transport error'
|
||||
|
||||
this.pushLog(line)
|
||||
this.publish({ type: 'gateway.stderr', payload: { line } })
|
||||
})
|
||||
} catch (err) {
|
||||
this.pushLog(`[startup] failed to connect websocket gateway: ${err instanceof Error ? err.message : String(err)}`)
|
||||
this.handleTransportExit(1, 'gateway websocket startup failed')
|
||||
}
|
||||
}
|
||||
|
||||
start() {
|
||||
const root = process.env.HERMES_PYTHON_SRC_ROOT ?? resolve(import.meta.dirname, '../../')
|
||||
const attachUrl = resolveGatewayAttachUrl()
|
||||
const sidecarUrl = resolveSidecarUrl()
|
||||
|
||||
this.attachUrl = attachUrl
|
||||
this.sidecarUrl = sidecarUrl
|
||||
this.resetStartupState()
|
||||
|
||||
if (this.proc && !this.proc.killed && this.proc.exitCode === null) {
|
||||
this.proc.kill()
|
||||
}
|
||||
this.proc = null
|
||||
this.closeGatewaySocket()
|
||||
this.closeSidecarSocket()
|
||||
|
||||
if (attachUrl) {
|
||||
this.startAttachedGateway(attachUrl)
|
||||
return
|
||||
}
|
||||
|
||||
this.startSpawnedGateway(root)
|
||||
}
|
||||
|
||||
private dispatch(msg: Record<string, unknown>) {
|
||||
const id = msg.id as string | undefined
|
||||
const p = id ? this.pending.get(id) : undefined
|
||||
@@ -258,7 +490,73 @@ export class GatewayClient extends EventEmitter {
|
||||
return this.logs.tail(Math.max(1, limit)).join('\n')
|
||||
}
|
||||
|
||||
private async ensureAttachedWebSocket(method: string): Promise<WebSocket> {
|
||||
if (!this.attachUrl) {
|
||||
throw new Error('gateway not running')
|
||||
}
|
||||
|
||||
if (!this.ws || this.ws.readyState === WS_CLOSED || this.ws.readyState === WS_CLOSING) {
|
||||
this.start()
|
||||
}
|
||||
|
||||
if (this.ws?.readyState === WS_CONNECTING) {
|
||||
try {
|
||||
await this.wsConnectPromise
|
||||
} catch (err) {
|
||||
throw err instanceof Error ? err : new Error(String(err))
|
||||
}
|
||||
}
|
||||
|
||||
if (!this.ws || this.ws.readyState !== WS_OPEN) {
|
||||
throw new Error(`gateway not connected: ${method}`)
|
||||
}
|
||||
|
||||
return this.ws
|
||||
}
|
||||
|
||||
private requestOverWebSocket<T = unknown>(method: string, params: Record<string, unknown> = {}): Promise<T> {
|
||||
return this.ensureAttachedWebSocket(method).then(
|
||||
ws =>
|
||||
new Promise<T>((resolve, reject) => {
|
||||
const id = `r${++this.reqId}`
|
||||
const timeout = setTimeout(this.onTimeout, REQUEST_TIMEOUT_MS, id)
|
||||
|
||||
timeout.unref?.()
|
||||
this.pending.set(id, {
|
||||
id,
|
||||
method,
|
||||
reject,
|
||||
resolve: v => resolve(v as T),
|
||||
timeout
|
||||
})
|
||||
|
||||
try {
|
||||
ws.send(JSON.stringify({ id, jsonrpc: '2.0', method, params }))
|
||||
} catch (e) {
|
||||
const pending = this.pending.get(id)
|
||||
|
||||
if (pending) {
|
||||
clearTimeout(pending.timeout)
|
||||
this.pending.delete(id)
|
||||
}
|
||||
|
||||
reject(e instanceof Error ? e : new Error(String(e)))
|
||||
}
|
||||
})
|
||||
)
|
||||
}
|
||||
|
||||
request<T = unknown>(method: string, params: Record<string, unknown> = {}): Promise<T> {
|
||||
const attachUrl = resolveGatewayAttachUrl()
|
||||
|
||||
if (attachUrl) {
|
||||
if (this.attachUrl !== attachUrl) {
|
||||
this.attachUrl = attachUrl
|
||||
}
|
||||
|
||||
return this.requestOverWebSocket<T>(method, params)
|
||||
}
|
||||
|
||||
if (!this.proc?.stdin || this.proc.killed || this.proc.exitCode !== null) {
|
||||
this.start()
|
||||
}
|
||||
@@ -299,5 +597,8 @@ export class GatewayClient extends EventEmitter {
|
||||
|
||||
kill() {
|
||||
this.proc?.kill()
|
||||
this.closeGatewaySocket()
|
||||
this.closeSidecarSocket()
|
||||
this.clearReadyTimer()
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user