Compare commits

...

2 Commits

Author SHA1 Message Date
teknium1
bb5f847093 security: enforce 0600/0700 file permissions on sensitive files
Enforces owner-only permissions on files and directories that contain
secrets or sensitive data. Previously, all files were created with
default umask permissions, which could allow other users on shared
systems to read API keys, config, and cron job data.

Changes:
- hermes_cli/config.py: Added _secure_dir()/_secure_file() helpers,
  ensure_hermes_home() sets 0700 on all dirs, save_config() sets 0600
  (save_env_value already had this from a prior commit)
- cron/jobs.py: Added matching helpers, ensure_dirs() sets 0700,
  save_jobs() and save_job_output() set 0600/0700
- cli.py: save_config_value() sets 0600 after writing config
- 8 new tests in tests/test_file_permissions.py

All chmod calls wrapped in try/except for Windows compatibility.

Cherry-picked from PR #757, rebased onto current main with conflict
resolution (save_config now uses atomic_yaml_write).

Closes #757
2026-03-11 00:28:30 -07:00
teknium1
de47aa6921 fix: /new, /reset, and /clear all start a fresh session
Previously /new and /reset were identical — both just cleared the
in-memory conversation history while keeping the same session_id.
This created confusing 'split sessions' in the DB where half the
messages belonged to a conversation the agent no longer remembered.

Now all three commands (/new, /reset, /clear) call new_session()
which properly:
- Flushes memories before switching
- Ends the current session in the DB
- Generates a fresh session_id
- Clears conversation history and resets state
- Updates the agent's session_id
- Invalidates the system prompt cache

/reset is kept as an alias for /new. /clear additionally clears
the screen and redraws the banner.

Closes #641
Inspired by PR #749 by Bartok9
2026-03-11 00:20:45 -07:00
5 changed files with 236 additions and 21 deletions

62
cli.py
View File

@@ -1060,6 +1060,12 @@ def save_config_value(key_path: str, value: any) -> bool:
with open(config_path, 'w') as f:
yaml.dump(config, f, default_flow_style=False, sort_keys=False)
# Enforce owner-only permissions on config files (contain API keys)
try:
os.chmod(config_path, 0o600)
except (OSError, NotImplementedError):
pass
return True
except Exception as e:
logger.error("Failed to save config: %s", e)
@@ -2148,15 +2154,53 @@ class HermesCLI:
flush_tool_summary()
print()
def reset_conversation(self):
"""Reset the conversation history."""
def new_session(self, silent=False):
"""Start a new session with a fresh session ID.
Ends the current session in the DB, generates a new session_id,
clears conversation history, and updates the agent. Both /new
and /reset use this — there is no "keep the same session" reset.
Args:
silent: If True, suppress the confirmation print (used by /clear
which does its own screen clear + banner redraw).
"""
# Flush memories from current conversation before switching
if self.agent and self.conversation_history:
try:
self.agent.flush_memories(self.conversation_history)
except Exception:
pass
# End current session in DB
if self._session_db and self.session_id:
try:
self._session_db.end_session(self.session_id, "new_session")
except Exception:
pass
# Generate fresh session ID
timestamp_str = datetime.now().strftime("%Y%m%d_%H%M%S")
short_uuid = uuid.uuid4().hex[:6]
self.session_id = f"{timestamp_str}_{short_uuid}"
# Reset state
self.conversation_history = []
print("(^_^)b Conversation reset!")
self._pending_title = None
self._resumed = False
# Update agent's session ID and invalidate cached system prompt
if self.agent:
self.agent.session_id = self.session_id
if hasattr(self.agent, '_invalidate_system_prompt'):
self.agent._invalidate_system_prompt()
if not silent:
print(f"(^_^)v New session started!")
def reset_conversation(self):
"""Reset the conversation by starting a new session."""
self.new_session()
def save_conversation(self):
"""Save the current conversation to a file."""
@@ -2548,12 +2592,8 @@ class HermesCLI:
elif cmd_lower == "/config":
self.show_config()
elif cmd_lower == "/clear":
# Flush memories before clearing
if self.agent and self.conversation_history:
try:
self.agent.flush_memories(self.conversation_history)
except Exception:
pass
# Start a new session (flush memories, end old session, new ID)
self.new_session(silent=True)
# Clear terminal screen. Inside the TUI, Rich's console.clear()
# goes through patch_stdout's StdoutProxy which swallows the
# screen-clear escape sequences. Use prompt_toolkit's output
@@ -2565,8 +2605,6 @@ class HermesCLI:
out.flush()
else:
self.console.clear()
# Reset conversation
self.conversation_history = []
# Show fresh banner. Inside the TUI we must route Rich output
# through ChatConsole (which uses prompt_toolkit's native ANSI
# renderer) instead of self.console (which writes raw to stdout
@@ -2647,7 +2685,7 @@ class HermesCLI:
else:
_cprint(" Session database not available.")
elif cmd_lower in ("/reset", "/new"):
self.reset_conversation()
self.new_session()
elif cmd_lower.startswith("/model"):
# Use original case so model names like "Anthropic/Claude-Opus-4" are preserved
parts = cmd_original.split(maxsplit=1)

View File

@@ -32,10 +32,29 @@ JOBS_FILE = CRON_DIR / "jobs.json"
OUTPUT_DIR = CRON_DIR / "output"
def _secure_dir(path: Path):
"""Set directory to owner-only access (0700). No-op on Windows."""
try:
os.chmod(path, 0o700)
except (OSError, NotImplementedError):
pass
def _secure_file(path: Path):
"""Set file to owner-only read/write (0600). No-op on Windows."""
try:
if path.exists():
os.chmod(path, 0o600)
except (OSError, NotImplementedError):
pass
def ensure_dirs():
"""Ensure cron directories exist."""
"""Ensure cron directories exist with secure permissions."""
CRON_DIR.mkdir(parents=True, exist_ok=True)
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
_secure_dir(CRON_DIR)
_secure_dir(OUTPUT_DIR)
# =============================================================================
@@ -223,6 +242,7 @@ def save_jobs(jobs: List[Dict[str, Any]]):
f.flush()
os.fsync(f.fileno())
os.replace(tmp_path, JOBS_FILE)
_secure_file(JOBS_FILE)
except BaseException:
try:
os.unlink(tmp_path)
@@ -400,11 +420,13 @@ def save_job_output(job_id: str, output: str):
ensure_dirs()
job_output_dir = OUTPUT_DIR / job_id
job_output_dir.mkdir(parents=True, exist_ok=True)
_secure_dir(job_output_dir)
timestamp = _hermes_now().strftime("%Y-%m-%d_%H-%M-%S")
output_file = job_output_dir / f"{timestamp}.md"
with open(output_file, 'w', encoding='utf-8') as f:
f.write(output)
_secure_file(output_file)
return output_file

View File

@@ -21,10 +21,10 @@ COMMANDS = {
"/provider": "Show available providers and current provider",
"/prompt": "View/set custom system prompt",
"/personality": "Set a predefined personality",
"/clear": "Clear screen and reset conversation (fresh start)",
"/clear": "Clear screen and start a new session",
"/history": "Show conversation history",
"/new": "Start a new conversation (reset history)",
"/reset": "Reset conversation only (keep screen)",
"/new": "Start a new session (fresh session ID + history)",
"/reset": "Start a new session (alias for /new)",
"/retry": "Retry the last message (resend to agent)",
"/undo": "Remove the last user/assistant exchange",
"/save": "Save the current conversation",

View File

@@ -47,13 +47,32 @@ def get_project_root() -> Path:
"""Get the project installation directory."""
return Path(__file__).parent.parent.resolve()
def _secure_dir(path):
"""Set directory to owner-only access (0700). No-op on Windows."""
try:
os.chmod(path, 0o700)
except (OSError, NotImplementedError):
pass
def _secure_file(path):
"""Set file to owner-only read/write (0600). No-op on Windows."""
try:
if os.path.exists(str(path)):
os.chmod(path, 0o600)
except (OSError, NotImplementedError):
pass
def ensure_hermes_home():
"""Ensure ~/.hermes directory structure exists."""
"""Ensure ~/.hermes directory structure exists with secure permissions."""
home = get_hermes_home()
(home / "cron").mkdir(parents=True, exist_ok=True)
(home / "sessions").mkdir(parents=True, exist_ok=True)
(home / "logs").mkdir(parents=True, exist_ok=True)
(home / "memories").mkdir(parents=True, exist_ok=True)
home.mkdir(parents=True, exist_ok=True)
_secure_dir(home)
for subdir in ("cron", "sessions", "logs", "memories"):
d = home / subdir
d.mkdir(parents=True, exist_ok=True)
_secure_dir(d)
# =============================================================================
@@ -872,6 +891,7 @@ def save_config(config: Dict[str, Any]):
normalized,
extra_content=_COMMENTED_SECTIONS if sections else None,
)
_secure_file(config_path)
def load_env() -> Dict[str, str]:

View File

@@ -0,0 +1,135 @@
"""Tests for file permissions hardening on sensitive files."""
import json
import os
import stat
import tempfile
import unittest
from pathlib import Path
from unittest.mock import patch
class TestCronFilePermissions(unittest.TestCase):
"""Verify cron files get secure permissions."""
def setUp(self):
self.tmpdir = tempfile.mkdtemp()
self.cron_dir = Path(self.tmpdir) / "cron"
self.output_dir = self.cron_dir / "output"
def tearDown(self):
import shutil
shutil.rmtree(self.tmpdir, ignore_errors=True)
@patch("cron.jobs.CRON_DIR")
@patch("cron.jobs.OUTPUT_DIR")
@patch("cron.jobs.JOBS_FILE")
def test_ensure_dirs_sets_0700(self, mock_jobs_file, mock_output, mock_cron):
mock_cron.__class__ = Path
# Use real paths
cron_dir = Path(self.tmpdir) / "cron"
output_dir = cron_dir / "output"
with patch("cron.jobs.CRON_DIR", cron_dir), \
patch("cron.jobs.OUTPUT_DIR", output_dir):
from cron.jobs import ensure_dirs
ensure_dirs()
cron_mode = stat.S_IMODE(os.stat(cron_dir).st_mode)
output_mode = stat.S_IMODE(os.stat(output_dir).st_mode)
self.assertEqual(cron_mode, 0o700)
self.assertEqual(output_mode, 0o700)
@patch("cron.jobs.CRON_DIR")
@patch("cron.jobs.OUTPUT_DIR")
@patch("cron.jobs.JOBS_FILE")
def test_save_jobs_sets_0600(self, mock_jobs_file, mock_output, mock_cron):
cron_dir = Path(self.tmpdir) / "cron"
output_dir = cron_dir / "output"
jobs_file = cron_dir / "jobs.json"
with patch("cron.jobs.CRON_DIR", cron_dir), \
patch("cron.jobs.OUTPUT_DIR", output_dir), \
patch("cron.jobs.JOBS_FILE", jobs_file):
from cron.jobs import save_jobs
save_jobs([{"id": "test", "prompt": "hello"}])
file_mode = stat.S_IMODE(os.stat(jobs_file).st_mode)
self.assertEqual(file_mode, 0o600)
def test_save_job_output_sets_0600(self):
output_dir = Path(self.tmpdir) / "output"
with patch("cron.jobs.OUTPUT_DIR", output_dir), \
patch("cron.jobs.CRON_DIR", Path(self.tmpdir)), \
patch("cron.jobs.ensure_dirs"):
output_dir.mkdir(parents=True, exist_ok=True)
from cron.jobs import save_job_output
output_file = save_job_output("test-job", "test output content")
file_mode = stat.S_IMODE(os.stat(output_file).st_mode)
self.assertEqual(file_mode, 0o600)
# Job output dir should also be 0700
job_dir = output_dir / "test-job"
dir_mode = stat.S_IMODE(os.stat(job_dir).st_mode)
self.assertEqual(dir_mode, 0o700)
class TestConfigFilePermissions(unittest.TestCase):
"""Verify config files get secure permissions."""
def setUp(self):
self.tmpdir = tempfile.mkdtemp()
def tearDown(self):
import shutil
shutil.rmtree(self.tmpdir, ignore_errors=True)
def test_save_config_sets_0600(self):
config_path = Path(self.tmpdir) / "config.yaml"
with patch("hermes_cli.config.get_config_path", return_value=config_path), \
patch("hermes_cli.config.ensure_hermes_home"):
from hermes_cli.config import save_config
save_config({"model": "test/model"})
file_mode = stat.S_IMODE(os.stat(config_path).st_mode)
self.assertEqual(file_mode, 0o600)
def test_save_env_value_sets_0600(self):
env_path = Path(self.tmpdir) / ".env"
with patch("hermes_cli.config.get_env_path", return_value=env_path), \
patch("hermes_cli.config.ensure_hermes_home"):
from hermes_cli.config import save_env_value
save_env_value("TEST_KEY", "test_value")
file_mode = stat.S_IMODE(os.stat(env_path).st_mode)
self.assertEqual(file_mode, 0o600)
def test_ensure_hermes_home_sets_0700(self):
home = Path(self.tmpdir) / ".hermes"
with patch("hermes_cli.config.get_hermes_home", return_value=home):
from hermes_cli.config import ensure_hermes_home
ensure_hermes_home()
home_mode = stat.S_IMODE(os.stat(home).st_mode)
self.assertEqual(home_mode, 0o700)
for subdir in ("cron", "sessions", "logs", "memories"):
subdir_mode = stat.S_IMODE(os.stat(home / subdir).st_mode)
self.assertEqual(subdir_mode, 0o700, f"{subdir} should be 0700")
class TestSecureHelpers(unittest.TestCase):
"""Test the _secure_file and _secure_dir helpers."""
def test_secure_file_nonexistent_no_error(self):
from cron.jobs import _secure_file
_secure_file(Path("/nonexistent/path/file.json")) # Should not raise
def test_secure_dir_nonexistent_no_error(self):
from cron.jobs import _secure_dir
_secure_dir(Path("/nonexistent/path")) # Should not raise
if __name__ == "__main__":
unittest.main()