mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-01 08:21:50 +08:00
Compare commits
1 Commits
fix/plugin
...
hermes/her
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b1898911ff |
@@ -295,6 +295,16 @@ def auth_remove_command(args) -> None:
|
|||||||
raise SystemExit(f'No credential matching "{target}" for provider {provider}.')
|
raise SystemExit(f'No credential matching "{target}" for provider {provider}.')
|
||||||
print(f"Removed {provider} credential #{index} ({removed.label})")
|
print(f"Removed {provider} credential #{index} ({removed.label})")
|
||||||
|
|
||||||
|
# If this was an env-seeded credential, also clear the env var from .env
|
||||||
|
# so it doesn't get re-seeded on the next load_pool() call.
|
||||||
|
if removed.source.startswith("env:"):
|
||||||
|
env_var = removed.source[len("env:"):]
|
||||||
|
if env_var:
|
||||||
|
from hermes_cli.config import remove_env_value
|
||||||
|
cleared = remove_env_value(env_var)
|
||||||
|
if cleared:
|
||||||
|
print(f"Cleared {env_var} from .env")
|
||||||
|
|
||||||
|
|
||||||
def auth_reset_command(args) -> None:
|
def auth_reset_command(args) -> None:
|
||||||
provider = _normalize_provider(getattr(args, "provider", ""))
|
provider = _normalize_provider(getattr(args, "provider", ""))
|
||||||
|
|||||||
@@ -1900,6 +1900,51 @@ def save_env_value(key: str, value: str):
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def remove_env_value(key: str) -> bool:
|
||||||
|
"""Remove a key from ~/.hermes/.env and os.environ.
|
||||||
|
|
||||||
|
Returns True if the key was found and removed, False otherwise.
|
||||||
|
"""
|
||||||
|
if is_managed():
|
||||||
|
managed_error(f"remove {key}")
|
||||||
|
return False
|
||||||
|
if not _ENV_VAR_NAME_RE.match(key):
|
||||||
|
raise ValueError(f"Invalid environment variable name: {key!r}")
|
||||||
|
env_path = get_env_path()
|
||||||
|
if not env_path.exists():
|
||||||
|
os.environ.pop(key, None)
|
||||||
|
return False
|
||||||
|
|
||||||
|
read_kw = {"encoding": "utf-8", "errors": "replace"} if _IS_WINDOWS else {}
|
||||||
|
write_kw = {"encoding": "utf-8"} if _IS_WINDOWS else {}
|
||||||
|
|
||||||
|
with open(env_path, **read_kw) as f:
|
||||||
|
lines = f.readlines()
|
||||||
|
lines = _sanitize_env_lines(lines)
|
||||||
|
|
||||||
|
new_lines = [line for line in lines if not line.strip().startswith(f"{key}=")]
|
||||||
|
found = len(new_lines) < len(lines)
|
||||||
|
|
||||||
|
if found:
|
||||||
|
fd, tmp_path = tempfile.mkstemp(dir=str(env_path.parent), suffix='.tmp', prefix='.env_')
|
||||||
|
try:
|
||||||
|
with os.fdopen(fd, 'w', **write_kw) as f:
|
||||||
|
f.writelines(new_lines)
|
||||||
|
f.flush()
|
||||||
|
os.fsync(f.fileno())
|
||||||
|
os.replace(tmp_path, env_path)
|
||||||
|
except BaseException:
|
||||||
|
try:
|
||||||
|
os.unlink(tmp_path)
|
||||||
|
except OSError:
|
||||||
|
pass
|
||||||
|
raise
|
||||||
|
_secure_file(env_path)
|
||||||
|
|
||||||
|
os.environ.pop(key, None)
|
||||||
|
return found
|
||||||
|
|
||||||
|
|
||||||
def save_anthropic_oauth_token(value: str, save_fn=None):
|
def save_anthropic_oauth_token(value: str, save_fn=None):
|
||||||
"""Persist an Anthropic OAuth/setup token and clear the API-key slot."""
|
"""Persist an Anthropic OAuth/setup token and clear the API-key slot."""
|
||||||
writer = save_fn or save_env_value
|
writer = save_fn or save_env_value
|
||||||
|
|||||||
@@ -13,6 +13,7 @@ from hermes_cli.config import (
|
|||||||
load_config,
|
load_config,
|
||||||
load_env,
|
load_env,
|
||||||
migrate_config,
|
migrate_config,
|
||||||
|
remove_env_value,
|
||||||
save_config,
|
save_config,
|
||||||
save_env_value,
|
save_env_value,
|
||||||
save_env_value_secure,
|
save_env_value_secure,
|
||||||
@@ -149,6 +150,49 @@ class TestSaveEnvValueSecure:
|
|||||||
assert env_mode == 0o600
|
assert env_mode == 0o600
|
||||||
|
|
||||||
|
|
||||||
|
class TestRemoveEnvValue:
|
||||||
|
def test_removes_key_from_env_file(self, tmp_path):
|
||||||
|
env_path = tmp_path / ".env"
|
||||||
|
env_path.write_text("KEY_A=value_a\nKEY_B=value_b\nKEY_C=value_c\n")
|
||||||
|
with patch.dict(os.environ, {"HERMES_HOME": str(tmp_path), "KEY_B": "value_b"}):
|
||||||
|
result = remove_env_value("KEY_B")
|
||||||
|
assert result is True
|
||||||
|
content = env_path.read_text()
|
||||||
|
assert "KEY_B" not in content
|
||||||
|
assert "KEY_A=value_a" in content
|
||||||
|
assert "KEY_C=value_c" in content
|
||||||
|
|
||||||
|
def test_clears_os_environ(self, tmp_path):
|
||||||
|
env_path = tmp_path / ".env"
|
||||||
|
env_path.write_text("MY_KEY=my_value\n")
|
||||||
|
with patch.dict(os.environ, {"HERMES_HOME": str(tmp_path), "MY_KEY": "my_value"}):
|
||||||
|
remove_env_value("MY_KEY")
|
||||||
|
assert "MY_KEY" not in os.environ
|
||||||
|
|
||||||
|
def test_returns_false_when_key_not_found(self, tmp_path):
|
||||||
|
env_path = tmp_path / ".env"
|
||||||
|
env_path.write_text("OTHER_KEY=value\n")
|
||||||
|
with patch.dict(os.environ, {"HERMES_HOME": str(tmp_path)}):
|
||||||
|
result = remove_env_value("MISSING_KEY")
|
||||||
|
assert result is False
|
||||||
|
# File should be untouched
|
||||||
|
assert env_path.read_text() == "OTHER_KEY=value\n"
|
||||||
|
|
||||||
|
def test_handles_missing_env_file(self, tmp_path):
|
||||||
|
with patch.dict(os.environ, {"HERMES_HOME": str(tmp_path), "GHOST_KEY": "ghost"}):
|
||||||
|
result = remove_env_value("GHOST_KEY")
|
||||||
|
assert result is False
|
||||||
|
# os.environ should still be cleared
|
||||||
|
assert "GHOST_KEY" not in os.environ
|
||||||
|
|
||||||
|
def test_clears_os_environ_even_when_not_in_file(self, tmp_path):
|
||||||
|
env_path = tmp_path / ".env"
|
||||||
|
env_path.write_text("OTHER=stuff\n")
|
||||||
|
with patch.dict(os.environ, {"HERMES_HOME": str(tmp_path), "ORPHAN_KEY": "orphan"}):
|
||||||
|
remove_env_value("ORPHAN_KEY")
|
||||||
|
assert "ORPHAN_KEY" not in os.environ
|
||||||
|
|
||||||
|
|
||||||
class TestSaveConfigAtomicity:
|
class TestSaveConfigAtomicity:
|
||||||
"""Verify save_config uses atomic writes (tempfile + os.replace)."""
|
"""Verify save_config uses atomic writes (tempfile + os.replace)."""
|
||||||
|
|
||||||
|
|||||||
@@ -521,3 +521,139 @@ def test_auth_list_prefers_explicit_reset_time(monkeypatch, capsys):
|
|||||||
out = capsys.readouterr().out
|
out = capsys.readouterr().out
|
||||||
assert "device_code_exhausted" in out
|
assert "device_code_exhausted" in out
|
||||||
assert "7d 0h left" in out
|
assert "7d 0h left" in out
|
||||||
|
|
||||||
|
|
||||||
|
def test_auth_remove_env_seeded_clears_env_var(tmp_path, monkeypatch):
|
||||||
|
"""Removing an env-seeded credential should also clear the env var from .env
|
||||||
|
so the entry doesn't get re-seeded on the next load_pool() call."""
|
||||||
|
hermes_home = tmp_path / "hermes"
|
||||||
|
hermes_home.mkdir(parents=True, exist_ok=True)
|
||||||
|
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
|
||||||
|
|
||||||
|
# Write a .env with an OpenRouter key
|
||||||
|
env_path = hermes_home / ".env"
|
||||||
|
env_path.write_text("OPENROUTER_API_KEY=sk-or-test-key-12345\nOTHER_KEY=keep-me\n")
|
||||||
|
monkeypatch.setenv("OPENROUTER_API_KEY", "sk-or-test-key-12345")
|
||||||
|
|
||||||
|
# Seed the pool with the env entry
|
||||||
|
_write_auth_store(
|
||||||
|
tmp_path,
|
||||||
|
{
|
||||||
|
"version": 1,
|
||||||
|
"credential_pool": {
|
||||||
|
"openrouter": [
|
||||||
|
{
|
||||||
|
"id": "env-1",
|
||||||
|
"label": "OPENROUTER_API_KEY",
|
||||||
|
"auth_type": "api_key",
|
||||||
|
"priority": 0,
|
||||||
|
"source": "env:OPENROUTER_API_KEY",
|
||||||
|
"access_token": "sk-or-test-key-12345",
|
||||||
|
}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
from hermes_cli.auth_commands import auth_remove_command
|
||||||
|
|
||||||
|
class _Args:
|
||||||
|
provider = "openrouter"
|
||||||
|
target = "1"
|
||||||
|
|
||||||
|
auth_remove_command(_Args())
|
||||||
|
|
||||||
|
# Env var should be cleared from os.environ
|
||||||
|
import os
|
||||||
|
assert os.environ.get("OPENROUTER_API_KEY") is None
|
||||||
|
|
||||||
|
# Env var should be removed from .env file
|
||||||
|
env_content = env_path.read_text()
|
||||||
|
assert "OPENROUTER_API_KEY" not in env_content
|
||||||
|
# Other keys should still be there
|
||||||
|
assert "OTHER_KEY=keep-me" in env_content
|
||||||
|
|
||||||
|
|
||||||
|
def test_auth_remove_env_seeded_does_not_resurrect(tmp_path, monkeypatch):
|
||||||
|
"""After removing an env-seeded credential, load_pool should NOT re-create it."""
|
||||||
|
hermes_home = tmp_path / "hermes"
|
||||||
|
hermes_home.mkdir(parents=True, exist_ok=True)
|
||||||
|
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
|
||||||
|
|
||||||
|
# Write .env with an OpenRouter key
|
||||||
|
env_path = hermes_home / ".env"
|
||||||
|
env_path.write_text("OPENROUTER_API_KEY=sk-or-test-key-12345\n")
|
||||||
|
monkeypatch.setenv("OPENROUTER_API_KEY", "sk-or-test-key-12345")
|
||||||
|
|
||||||
|
_write_auth_store(
|
||||||
|
tmp_path,
|
||||||
|
{
|
||||||
|
"version": 1,
|
||||||
|
"credential_pool": {
|
||||||
|
"openrouter": [
|
||||||
|
{
|
||||||
|
"id": "env-1",
|
||||||
|
"label": "OPENROUTER_API_KEY",
|
||||||
|
"auth_type": "api_key",
|
||||||
|
"priority": 0,
|
||||||
|
"source": "env:OPENROUTER_API_KEY",
|
||||||
|
"access_token": "sk-or-test-key-12345",
|
||||||
|
}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
from hermes_cli.auth_commands import auth_remove_command
|
||||||
|
|
||||||
|
class _Args:
|
||||||
|
provider = "openrouter"
|
||||||
|
target = "1"
|
||||||
|
|
||||||
|
auth_remove_command(_Args())
|
||||||
|
|
||||||
|
# Now reload the pool — the entry should NOT come back
|
||||||
|
from agent.credential_pool import load_pool
|
||||||
|
pool = load_pool("openrouter")
|
||||||
|
assert not pool.has_credentials()
|
||||||
|
|
||||||
|
|
||||||
|
def test_auth_remove_manual_entry_does_not_touch_env(tmp_path, monkeypatch):
|
||||||
|
"""Removing a manually-added credential should NOT touch .env."""
|
||||||
|
hermes_home = tmp_path / "hermes"
|
||||||
|
hermes_home.mkdir(parents=True, exist_ok=True)
|
||||||
|
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
|
||||||
|
monkeypatch.delenv("OPENROUTER_API_KEY", raising=False)
|
||||||
|
|
||||||
|
env_path = hermes_home / ".env"
|
||||||
|
env_path.write_text("SOME_KEY=some-value\n")
|
||||||
|
|
||||||
|
_write_auth_store(
|
||||||
|
tmp_path,
|
||||||
|
{
|
||||||
|
"version": 1,
|
||||||
|
"credential_pool": {
|
||||||
|
"openrouter": [
|
||||||
|
{
|
||||||
|
"id": "manual-1",
|
||||||
|
"label": "my-key",
|
||||||
|
"auth_type": "api_key",
|
||||||
|
"priority": 0,
|
||||||
|
"source": "manual",
|
||||||
|
"access_token": "sk-or-manual-key",
|
||||||
|
}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
from hermes_cli.auth_commands import auth_remove_command
|
||||||
|
|
||||||
|
class _Args:
|
||||||
|
provider = "openrouter"
|
||||||
|
target = "1"
|
||||||
|
|
||||||
|
auth_remove_command(_Args())
|
||||||
|
|
||||||
|
# .env should be untouched
|
||||||
|
assert env_path.read_text() == "SOME_KEY=some-value\n"
|
||||||
|
|||||||
Reference in New Issue
Block a user