diff --git a/batch_runner.py b/batch_runner.py index 49a91c6f69d..8a8093f8207 100644 --- a/batch_runner.py +++ b/batch_runner.py @@ -689,16 +689,12 @@ class BatchRunner: print("\n" + "=" * 70) print("🚀 Starting Batch Processing") print("=" * 70) - - # Load checkpoint - checkpoint_data = self._load_checkpoint() if resume else { - "run_name": self.run_name, - "completed_prompts": [], - "last_updated": None - } - - if resume and checkpoint_data.get("completed_prompts"): - print(f"📂 Resuming from checkpoint ({len(checkpoint_data['completed_prompts'])} prompts already completed)") + + # Always load checkpoint if it exists to skip completed indices + checkpoint_data = self._load_checkpoint() + + if checkpoint_data.get("completed_prompts"): + print(f"📂 Found existing checkpoint - skipping {len(checkpoint_data['completed_prompts'])} already completed prompts") completed_prompts_set = set(checkpoint_data.get("completed_prompts", [])) diff --git a/run_agent.py b/run_agent.py index f7da65d88c8..77cee8e101f 100644 --- a/run_agent.py +++ b/run_agent.py @@ -738,7 +738,6 @@ class AIAgent: if self.save_trajectories: # Use the client wrapper's format method if available to get the exact Hermes format if hasattr(self, 'client') and hasattr(self.client, 'format'): - raise ValueError("reached this point") formatted_messages = self.client.format(messages, self.tools, render_final=True) # We need to adapt this formatted list to the trajectory format expected by _save_trajectory @@ -803,12 +802,16 @@ class AIAgent: # Fallback to original saving method self._save_trajectory(messages, user_message, completed) - # Clean up VM for this task after conversation completes - try: - await asyncio.to_thread(cleanup_vm, effective_task_id) - except Exception as e: - if self.verbose_logging: - logging.warning(f"Failed to cleanup VM for task {effective_task_id}: {e}") + # Clean up VM for this task after conversation completes (fire-and-forget) + # Don't await this - let it run in the background so we don't block returning results + async def cleanup_task(): + try: + await asyncio.to_thread(cleanup_vm, effective_task_id) + except Exception as e: + if self.verbose_logging: + logging.warning(f"Failed to cleanup VM for task {effective_task_id}: {e}") + + asyncio.create_task(cleanup_task()) # Get profiling statistics for this conversation profiling_stats = get_profiler().get_statistics() diff --git a/tools/terminal_tool.py b/tools/terminal_tool.py index e4b4364268a..d5820d1169e 100644 --- a/tools/terminal_tool.py +++ b/tools/terminal_tool.py @@ -100,30 +100,17 @@ def _cleanup_inactive_vms(vm_lifetime_seconds: int = 300): global _active_instances, _active_contexts, _last_activity current_time = time.time() - tasks_to_cleanup = [] + instances_to_cleanup = [] + # Find and extract instances to cleanup while holding lock with _instance_lock: # Find all VMs that have been inactive for too long for task_id, last_time in list(_last_activity.items()): if current_time - last_time > vm_lifetime_seconds: - tasks_to_cleanup.append(task_id) - - # Clean up the inactive VMs - for task_id in tasks_to_cleanup: - try: if task_id in _active_instances: - instance = _active_instances[task_id] - # Terminate the VM instance - if hasattr(instance, 'terminate'): - instance.terminate() - elif hasattr(instance, 'stop'): - instance.stop() - elif hasattr(instance, 'delete'): - instance.delete() - - # Remove from tracking dictionaries + instances_to_cleanup.append((task_id, _active_instances[task_id])) + # Remove from tracking dictionaries immediately del _active_instances[task_id] - print(f"[VM Cleanup] Terminated inactive VM for task: {task_id}") if task_id in _active_contexts: del _active_contexts[task_id] @@ -131,8 +118,18 @@ def _cleanup_inactive_vms(vm_lifetime_seconds: int = 300): if task_id in _last_activity: del _last_activity[task_id] - except Exception as e: - print(f"[VM Cleanup] Error cleaning up VM for task {task_id}: {e}") + # Terminate outside the lock so we don't block other operations + for task_id, instance in instances_to_cleanup: + try: + if hasattr(instance, 'terminate'): + instance.terminate() + elif hasattr(instance, 'stop'): + instance.stop() + elif hasattr(instance, 'delete'): + instance.delete() + print(f"[VM Cleanup] Terminated inactive VM for task: {task_id}") + except Exception as e: + print(f"[VM Cleanup] Error cleaning up VM for task {task_id}: {e}") def _cleanup_thread_worker(): """ @@ -185,28 +182,30 @@ def cleanup_vm(task_id: str): """ global _active_instances, _active_contexts, _last_activity + # Extract instance from dict while holding lock, but don't terminate yet + instance_to_cleanup = None with _instance_lock: + if task_id in _active_instances: + instance_to_cleanup = _active_instances[task_id] + # Remove from tracking dictionaries immediately + del _active_instances[task_id] + + if task_id in _active_contexts: + del _active_contexts[task_id] + + if task_id in _last_activity: + del _last_activity[task_id] + + # Terminate outside the lock so multiple cleanups can run concurrently + if instance_to_cleanup: try: - if task_id in _active_instances: - instance = _active_instances[task_id] - # Terminate the VM instance - if hasattr(instance, 'terminate'): - instance.terminate() - elif hasattr(instance, 'stop'): - instance.stop() - elif hasattr(instance, 'delete'): - instance.delete() - - # Remove from tracking dictionaries - del _active_instances[task_id] - print(f"[VM Cleanup] Manually terminated VM for task: {task_id}") - - if task_id in _active_contexts: - del _active_contexts[task_id] - - if task_id in _last_activity: - del _last_activity[task_id] - + if hasattr(instance_to_cleanup, 'terminate'): + instance_to_cleanup.terminate() + elif hasattr(instance_to_cleanup, 'stop'): + instance_to_cleanup.stop() + elif hasattr(instance_to_cleanup, 'delete'): + instance_to_cleanup.delete() + print(f"[VM Cleanup] Manually terminated VM for task: {task_id}") except Exception as e: print(f"[VM Cleanup] Error manually cleaning up VM for task {task_id}: {e}")