Compare commits

...

1 Commits

Author SHA1 Message Date
dagbs
08b628649e fix(openviking): add atexit safety net for session commit
Ensures pending sessions are committed on process exit even if
shutdown_memory_provider is never called (gateway crash, SIGKILL,
or exception in _async_flush_memories preventing shutdown).

Also reorders on_session_end to wait for the pending sync thread
before checking turn_count, so the last turn's messages are flushed.

Based on PR #4919 by dagbs.
2026-04-06 16:45:13 -07:00

View File

@@ -23,6 +23,7 @@ Capabilities:
from __future__ import annotations from __future__ import annotations
import atexit
import json import json
import logging import logging
import os import os
@@ -37,6 +38,30 @@ _DEFAULT_ENDPOINT = "http://127.0.0.1:1933"
_TIMEOUT = 30.0 _TIMEOUT = 30.0
# ---------------------------------------------------------------------------
# Process-level atexit safety net — ensures pending sessions are committed
# even if shutdown_memory_provider is never called (e.g. gateway crash,
# SIGKILL, or exception in _async_flush_memories preventing shutdown).
# ---------------------------------------------------------------------------
_last_active_provider: Optional["OpenVikingMemoryProvider"] = None
def _atexit_commit_sessions():
"""Fire on_session_end for the last active provider on process exit."""
global _last_active_provider
provider = _last_active_provider
if provider is None:
return
_last_active_provider = None
try:
provider.on_session_end([])
except Exception:
pass # best-effort at shutdown time
atexit.register(_atexit_commit_sessions)
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# HTTP helper — uses httpx to avoid requiring the openviking SDK # HTTP helper — uses httpx to avoid requiring the openviking SDK
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@@ -277,6 +302,10 @@ class OpenVikingMemoryProvider(MemoryProvider):
logger.warning("httpx not installed — OpenViking plugin disabled") logger.warning("httpx not installed — OpenViking plugin disabled")
self._client = None self._client = None
# Register as the last active provider for atexit safety net
global _last_active_provider
_last_active_provider = self
def system_prompt_block(self) -> str: def system_prompt_block(self) -> str:
if not self._client: if not self._client:
return "" return ""
@@ -387,13 +416,18 @@ class OpenVikingMemoryProvider(MemoryProvider):
OpenViking automatically extracts 6 categories of memories: OpenViking automatically extracts 6 categories of memories:
profile, preferences, entities, events, cases, and patterns. profile, preferences, entities, events, cases, and patterns.
""" """
if not self._client or self._turn_count == 0: if not self._client:
return return
# Wait for any pending sync to finish first # Wait for any pending sync to finish first — do this before the
# turn_count check so the last turn's messages are flushed even if
# the count hasn't been incremented yet.
if self._sync_thread and self._sync_thread.is_alive(): if self._sync_thread and self._sync_thread.is_alive():
self._sync_thread.join(timeout=10.0) self._sync_thread.join(timeout=10.0)
if self._turn_count == 0:
return
try: try:
self._client.post(f"/api/v1/sessions/{self._session_id}/commit") self._client.post(f"/api/v1/sessions/{self._session_id}/commit")
logger.info("OpenViking session %s committed (%d turns)", self._session_id, self._turn_count) logger.info("OpenViking session %s committed (%d turns)", self._session_id, self._turn_count)
@@ -449,6 +483,10 @@ class OpenVikingMemoryProvider(MemoryProvider):
for t in (self._sync_thread, self._prefetch_thread): for t in (self._sync_thread, self._prefetch_thread):
if t and t.is_alive(): if t and t.is_alive():
t.join(timeout=5.0) t.join(timeout=5.0)
# Clear atexit reference so it doesn't double-commit
global _last_active_provider
if _last_active_provider is self:
_last_active_provider = None
# -- Tool implementations ------------------------------------------------ # -- Tool implementations ------------------------------------------------