mirror of
https://github.com/infiniflow/ragflow.git
synced 2026-01-29 22:56:36 +08:00
## Summary Implement a flexible sandbox provider system supporting both self-managed (Docker) and SaaS (Aliyun Code Interpreter) backends for secure code execution in agent workflows. **Key Changes:** - ✅ Aliyun Code Interpreter provider using official `agentrun-sdk>=0.0.16` - ✅ Self-managed provider with gVisor (runsc) security - ✅ Arguments parameter support for dynamic code execution - ✅ Database-only configuration (removed fallback logic) - ✅ Configuration scripts for quick setup Issue #12479 ## Features ### 🔌 Provider Abstraction Layer **1. Self-Managed Provider** (`agent/sandbox/providers/self_managed.py`) - Wraps existing executor_manager HTTP API - gVisor (runsc) for secure container isolation - Configurable pool size, timeout, retry logic - Languages: Python, Node.js, JavaScript - ⚠️ **Requires**: gVisor installation, Docker, base images **2. Aliyun Code Interpreter** (`agent/sandbox/providers/aliyun_codeinterpreter.py`) - SaaS integration using official agentrun-sdk - Serverless microVM execution with auto-authentication - Hard timeout: 30 seconds max - Credentials: `AGENTRUN_ACCESS_KEY_ID`, `AGENTRUN_ACCESS_KEY_SECRET`, `AGENTRUN_ACCOUNT_ID`, `AGENTRUN_REGION` - Automatically wraps code to call `main()` function **3. E2B Provider** (`agent/sandbox/providers/e2b.py`) - Placeholder for future integration ### ⚙️ Configuration System - `conf/system_settings.json`: Default provider = `aliyun_codeinterpreter` - `agent/sandbox/client.py`: Enforces database-only configuration - Admin UI: `/admin/sandbox-settings` - Configuration validation via `validate_config()` method - Health checks for all providers ### 🎯 Key Capabilities **Arguments Parameter Support:** All providers support passing arguments to `main()` function: ```python # User code def main(name: str, count: int) -> dict: return {"message": f"Hello {name}!" * count} # Executed with: arguments={"name": "World", "count": 3} # Result: {"message": "Hello World!Hello World!Hello World!"} ``` **Self-Describing Providers:** Each provider implements `get_config_schema()` returning form configuration for Admin UI **Error Handling:** Structured `ExecutionResult` with stdout, stderr, exit_code, execution_time ## Configuration Scripts Two scripts for quick Aliyun sandbox setup: **Shell Script (requires jq):** ```bash source scripts/configure_aliyun_sandbox.sh ``` **Python Script (interactive):** ```bash python3 scripts/configure_aliyun_sandbox.py ``` ## Testing ```bash # Unit tests uv run pytest agent/sandbox/tests/test_providers.py -v # Aliyun provider tests uv run pytest agent/sandbox/tests/test_aliyun_codeinterpreter.py -v # Integration tests (requires credentials) uv run pytest agent/sandbox/tests/test_aliyun_codeinterpreter_integration.py -v # Quick SDK validation python3 agent/sandbox/tests/verify_sdk.py ``` **Test Coverage:** - 30 unit tests for provider abstraction - Provider-specific tests for Aliyun - Integration tests with real API - Security tests for executor_manager ## Documentation - `docs/develop/sandbox_spec.md` - Complete architecture specification - `agent/sandbox/tests/MIGRATION_GUIDE.md` - Migration from legacy sandbox - `agent/sandbox/tests/QUICKSTART.md` - Quick start guide - `agent/sandbox/tests/README.md` - Testing documentation ## Breaking Changes ⚠️ **Migration Required:** 1. **Directory Move**: `sandbox/` → `agent/sandbox/` - Update imports: `from sandbox.` → `from agent.sandbox.` 2. **Mandatory Configuration**: - SystemSettings must have `sandbox.provider_type` configured - Removed fallback default values - Configuration must exist in database (from `conf/system_settings.json`) 3. **Aliyun Credentials**: - Requires `AGENTRUN_*` environment variables (not `ALIYUN_*`) - `AGENTRUN_ACCOUNT_ID` is now required (Aliyun primary account ID) 4. **Self-Managed Provider**: - gVisor (runsc) must be installed for security - Install: `go install gvisor.dev/gvisor/runsc@latest` ## Database Schema Changes ```python # SystemSettings.value: CharField → TextField api/db/db_models.py: Changed for unlimited config length # SystemSettingsService.get_by_name(): Fixed query precision api/db/services/system_settings_service.py: startswith → exact match ``` ## Files Changed ### Backend (Python) - `agent/sandbox/providers/base.py` - SandboxProvider ABC interface - `agent/sandbox/providers/manager.py` - ProviderManager - `agent/sandbox/providers/self_managed.py` - Self-managed provider - `agent/sandbox/providers/aliyun_codeinterpreter.py` - Aliyun provider - `agent/sandbox/providers/e2b.py` - E2B provider (placeholder) - `agent/sandbox/client.py` - Unified client (enforces DB-only config) - `agent/tools/code_exec.py` - Updated to use provider system - `admin/server/services.py` - SandboxMgr with registry & validation - `admin/server/routes.py` - 5 sandbox API endpoints - `conf/system_settings.json` - Default: aliyun_codeinterpreter - `api/db/db_models.py` - TextField for SystemSettings.value - `api/db/services/system_settings_service.py` - Exact match query ### Frontend (TypeScript/React) - `web/src/pages/admin/sandbox-settings.tsx` - Settings UI - `web/src/services/admin-service.ts` - Sandbox service functions - `web/src/services/admin.service.d.ts` - Type definitions - `web/src/utils/api.ts` - Sandbox API endpoints ### Documentation - `docs/develop/sandbox_spec.md` - Architecture spec - `agent/sandbox/tests/MIGRATION_GUIDE.md` - Migration guide - `agent/sandbox/tests/QUICKSTART.md` - Quick start - `agent/sandbox/tests/README.md` - Testing guide ### Configuration Scripts - `scripts/configure_aliyun_sandbox.sh` - Shell script (jq) - `scripts/configure_aliyun_sandbox.py` - Python script ### Tests - `agent/sandbox/tests/test_providers.py` - 30 unit tests - `agent/sandbox/tests/test_aliyun_codeinterpreter.py` - Provider tests - `agent/sandbox/tests/test_aliyun_codeinterpreter_integration.py` - Integration tests - `agent/sandbox/tests/verify_sdk.py` - SDK validation ## Architecture ``` Admin UI → Admin API → SandboxMgr → ProviderManager → [SelfManaged|Aliyun|E2B] ↓ SystemSettings ``` ## Usage ### 1. Configure Provider **Via Admin UI:** 1. Navigate to `/admin/sandbox-settings` 2. Select provider (Aliyun Code Interpreter / Self-Managed) 3. Fill in configuration 4. Click "Test Connection" to verify 5. Click "Save" to apply **Via Configuration Scripts:** ```bash # Aliyun provider export AGENTRUN_ACCESS_KEY_ID="xxx" export AGENTRUN_ACCESS_KEY_SECRET="yyy" export AGENTRUN_ACCOUNT_ID="zzz" export AGENTRUN_REGION="cn-shanghai" source scripts/configure_aliyun_sandbox.sh ``` ### 2. Restart Service ```bash cd docker docker compose restart ragflow-server ``` ### 3. Execute Code in Agent ```python from agent.sandbox.client import execute_code result = execute_code( code='def main(name: str) -> dict: return {"message": f"Hello {name}!"}', language="python", timeout=30, arguments={"name": "World"} ) print(result.stdout) # {"message": "Hello World!"} ``` ## Troubleshooting ### "Container pool is busy" (Self-Managed) - **Cause**: Pool exhausted (default: 1 container in `.env`) - **Fix**: Increase `SANDBOX_EXECUTOR_MANAGER_POOL_SIZE` to 5+ ### "Sandbox provider type not configured" - **Cause**: Database missing configuration - **Fix**: Run config script or set via Admin UI ### "gVisor not found" - **Cause**: runsc not installed - **Fix**: `go install gvisor.dev/gvisor/runsc@latest && sudo cp ~/go/bin/runsc /usr/local/bin/` ### Aliyun authentication errors - **Cause**: Wrong environment variable names - **Fix**: Use `AGENTRUN_*` prefix (not `ALIYUN_*`) ## Checklist - [x] All tests passing (30 unit tests + integration tests) - [x] Documentation updated (spec, migration guide, quickstart) - [x] Type definitions added (TypeScript) - [x] Admin UI implemented - [x] Configuration validation - [x] Health checks implemented - [x] Error handling with structured results - [x] Breaking changes documented - [x] Configuration scripts created - [x] gVisor requirements documented Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com> --------- Co-authored-by: Claude Sonnet 4.5 <noreply@anthropic.com>
360 lines
12 KiB
Python
360 lines
12 KiB
Python
#
|
|
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
#
|
|
|
|
"""
|
|
Self-managed sandbox provider implementation.
|
|
|
|
This provider wraps the existing executor_manager HTTP API which manages
|
|
a pool of Docker containers with gVisor for secure code execution.
|
|
"""
|
|
|
|
import base64
|
|
import time
|
|
import uuid
|
|
from typing import Dict, Any, List, Optional
|
|
|
|
import requests
|
|
|
|
from .base import SandboxProvider, SandboxInstance, ExecutionResult
|
|
|
|
|
|
class SelfManagedProvider(SandboxProvider):
|
|
"""
|
|
Self-managed sandbox provider using Daytona/Docker.
|
|
|
|
This provider communicates with the executor_manager HTTP API
|
|
which manages a pool of containers for code execution.
|
|
"""
|
|
|
|
def __init__(self):
|
|
self.endpoint: str = "http://localhost:9385"
|
|
self.timeout: int = 30
|
|
self.max_retries: int = 3
|
|
self.pool_size: int = 10
|
|
self._initialized: bool = False
|
|
|
|
def initialize(self, config: Dict[str, Any]) -> bool:
|
|
"""
|
|
Initialize the provider with configuration.
|
|
|
|
Args:
|
|
config: Configuration dictionary with keys:
|
|
- endpoint: HTTP endpoint (default: "http://localhost:9385")
|
|
- timeout: Request timeout in seconds (default: 30)
|
|
- max_retries: Maximum retry attempts (default: 3)
|
|
- pool_size: Container pool size for info (default: 10)
|
|
|
|
Returns:
|
|
True if initialization successful, False otherwise
|
|
"""
|
|
self.endpoint = config.get("endpoint", "http://localhost:9385")
|
|
self.timeout = config.get("timeout", 30)
|
|
self.max_retries = config.get("max_retries", 3)
|
|
self.pool_size = config.get("pool_size", 10)
|
|
|
|
# Validate endpoint is accessible
|
|
if not self.health_check():
|
|
# Try to fall back to SANDBOX_HOST from settings if we are using localhost
|
|
if "localhost" in self.endpoint or "127.0.0.1" in self.endpoint:
|
|
try:
|
|
from api import settings
|
|
if settings.SANDBOX_HOST and settings.SANDBOX_HOST not in self.endpoint:
|
|
original_endpoint = self.endpoint
|
|
self.endpoint = f"http://{settings.SANDBOX_HOST}:9385"
|
|
if self.health_check():
|
|
import logging
|
|
logging.warning(f"Sandbox self_managed: Connected using settings.SANDBOX_HOST fallback: {self.endpoint} (original: {original_endpoint})")
|
|
self._initialized = True
|
|
return True
|
|
else:
|
|
self.endpoint = original_endpoint # Restore if fallback also fails
|
|
except ImportError:
|
|
pass
|
|
|
|
return False
|
|
|
|
self._initialized = True
|
|
return True
|
|
|
|
def create_instance(self, template: str = "python") -> SandboxInstance:
|
|
"""
|
|
Create a new sandbox instance.
|
|
|
|
Note: For self-managed provider, instances are managed internally
|
|
by the executor_manager's container pool. This method returns
|
|
a logical instance handle.
|
|
|
|
Args:
|
|
template: Programming language (python, nodejs)
|
|
|
|
Returns:
|
|
SandboxInstance object
|
|
|
|
Raises:
|
|
RuntimeError: If instance creation fails
|
|
"""
|
|
if not self._initialized:
|
|
raise RuntimeError("Provider not initialized. Call initialize() first.")
|
|
|
|
# Normalize language
|
|
language = self._normalize_language(template)
|
|
|
|
# The executor_manager manages instances internally via container pool
|
|
# We create a logical instance ID for tracking
|
|
instance_id = str(uuid.uuid4())
|
|
|
|
return SandboxInstance(
|
|
instance_id=instance_id,
|
|
provider="self_managed",
|
|
status="running",
|
|
metadata={
|
|
"language": language,
|
|
"endpoint": self.endpoint,
|
|
"pool_size": self.pool_size,
|
|
}
|
|
)
|
|
|
|
def execute_code(
|
|
self,
|
|
instance_id: str,
|
|
code: str,
|
|
language: str,
|
|
timeout: int = 10,
|
|
arguments: Optional[Dict[str, Any]] = None
|
|
) -> ExecutionResult:
|
|
"""
|
|
Execute code in the sandbox.
|
|
|
|
Args:
|
|
instance_id: ID of the sandbox instance (not used for self-managed)
|
|
code: Source code to execute
|
|
language: Programming language (python, nodejs, javascript)
|
|
timeout: Maximum execution time in seconds
|
|
arguments: Optional arguments dict to pass to main() function
|
|
|
|
Returns:
|
|
ExecutionResult containing stdout, stderr, exit_code, and metadata
|
|
|
|
Raises:
|
|
RuntimeError: If execution fails
|
|
TimeoutError: If execution exceeds timeout
|
|
"""
|
|
if not self._initialized:
|
|
raise RuntimeError("Provider not initialized. Call initialize() first.")
|
|
|
|
# Normalize language
|
|
normalized_lang = self._normalize_language(language)
|
|
|
|
# Prepare request
|
|
code_b64 = base64.b64encode(code.encode("utf-8")).decode("utf-8")
|
|
payload = {
|
|
"code_b64": code_b64,
|
|
"language": normalized_lang,
|
|
"arguments": arguments or {}
|
|
}
|
|
|
|
url = f"{self.endpoint}/run"
|
|
exec_timeout = timeout or self.timeout
|
|
|
|
start_time = time.time()
|
|
|
|
try:
|
|
response = requests.post(
|
|
url,
|
|
json=payload,
|
|
timeout=exec_timeout,
|
|
headers={"Content-Type": "application/json"}
|
|
)
|
|
|
|
execution_time = time.time() - start_time
|
|
|
|
if response.status_code != 200:
|
|
raise RuntimeError(
|
|
f"HTTP {response.status_code}: {response.text}"
|
|
)
|
|
|
|
result = response.json()
|
|
|
|
return ExecutionResult(
|
|
stdout=result.get("stdout", ""),
|
|
stderr=result.get("stderr", ""),
|
|
exit_code=result.get("exit_code", 0),
|
|
execution_time=execution_time,
|
|
metadata={
|
|
"status": result.get("status"),
|
|
"time_used_ms": result.get("time_used_ms"),
|
|
"memory_used_kb": result.get("memory_used_kb"),
|
|
"detail": result.get("detail"),
|
|
"instance_id": instance_id,
|
|
}
|
|
)
|
|
|
|
except requests.Timeout:
|
|
execution_time = time.time() - start_time
|
|
raise TimeoutError(
|
|
f"Execution timed out after {exec_timeout} seconds"
|
|
)
|
|
|
|
except requests.RequestException as e:
|
|
raise RuntimeError(f"HTTP request failed: {str(e)}")
|
|
|
|
def destroy_instance(self, instance_id: str) -> bool:
|
|
"""
|
|
Destroy a sandbox instance.
|
|
|
|
Note: For self-managed provider, instances are returned to the
|
|
internal pool automatically by executor_manager after execution.
|
|
This is a no-op for tracking purposes.
|
|
|
|
Args:
|
|
instance_id: ID of the instance to destroy
|
|
|
|
Returns:
|
|
True (always succeeds for self-managed)
|
|
"""
|
|
# The executor_manager manages container lifecycle internally
|
|
# Container is returned to pool after execution
|
|
return True
|
|
|
|
def health_check(self) -> bool:
|
|
"""
|
|
Check if the provider is healthy and accessible.
|
|
|
|
Returns:
|
|
True if provider is healthy, False otherwise
|
|
"""
|
|
try:
|
|
url = f"{self.endpoint}/healthz"
|
|
response = requests.get(url, timeout=5)
|
|
return response.status_code == 200
|
|
except Exception:
|
|
return False
|
|
|
|
def get_supported_languages(self) -> List[str]:
|
|
"""
|
|
Get list of supported programming languages.
|
|
|
|
Returns:
|
|
List of language identifiers
|
|
"""
|
|
return ["python", "nodejs", "javascript"]
|
|
|
|
@staticmethod
|
|
def get_config_schema() -> Dict[str, Dict]:
|
|
"""
|
|
Return configuration schema for self-managed provider.
|
|
|
|
Returns:
|
|
Dictionary mapping field names to their schema definitions
|
|
"""
|
|
return {
|
|
"endpoint": {
|
|
"type": "string",
|
|
"required": True,
|
|
"label": "Executor Manager Endpoint",
|
|
"placeholder": "http://localhost:9385",
|
|
"default": "http://localhost:9385",
|
|
"description": "HTTP endpoint of the executor_manager service"
|
|
},
|
|
"timeout": {
|
|
"type": "integer",
|
|
"required": False,
|
|
"label": "Request Timeout (seconds)",
|
|
"default": 30,
|
|
"min": 5,
|
|
"max": 300,
|
|
"description": "HTTP request timeout for code execution"
|
|
},
|
|
"max_retries": {
|
|
"type": "integer",
|
|
"required": False,
|
|
"label": "Max Retries",
|
|
"default": 3,
|
|
"min": 0,
|
|
"max": 10,
|
|
"description": "Maximum number of retry attempts for failed requests"
|
|
},
|
|
"pool_size": {
|
|
"type": "integer",
|
|
"required": False,
|
|
"label": "Container Pool Size",
|
|
"default": 10,
|
|
"min": 1,
|
|
"max": 100,
|
|
"description": "Size of the container pool (configured in executor_manager)"
|
|
}
|
|
}
|
|
|
|
def _normalize_language(self, language: str) -> str:
|
|
"""
|
|
Normalize language identifier to executor_manager format.
|
|
|
|
Args:
|
|
language: Language identifier (python, python3, nodejs, javascript)
|
|
|
|
Returns:
|
|
Normalized language identifier
|
|
"""
|
|
if not language:
|
|
return "python"
|
|
|
|
lang_lower = language.lower()
|
|
if lang_lower in ("python", "python3"):
|
|
return "python"
|
|
elif lang_lower in ("javascript", "nodejs"):
|
|
return "nodejs"
|
|
else:
|
|
return language
|
|
|
|
def validate_config(self, config: dict) -> tuple[bool, Optional[str]]:
|
|
"""
|
|
Validate self-managed provider configuration.
|
|
|
|
Performs custom validation beyond the basic schema validation,
|
|
such as checking URL format.
|
|
|
|
Args:
|
|
config: Configuration dictionary to validate
|
|
|
|
Returns:
|
|
Tuple of (is_valid, error_message)
|
|
"""
|
|
# Validate endpoint URL format
|
|
endpoint = config.get("endpoint", "")
|
|
if endpoint:
|
|
# Check if it's a valid HTTP/HTTPS URL or localhost
|
|
import re
|
|
url_pattern = r'^(https?://|http://localhost|http://[\d\.]+:[a-z]+:[/]|http://[\w\.]+:)'
|
|
if not re.match(url_pattern, endpoint):
|
|
return False, f"Invalid endpoint format: {endpoint}. Must start with http:// or https://"
|
|
|
|
# Validate pool_size is positive
|
|
pool_size = config.get("pool_size", 10)
|
|
if isinstance(pool_size, int) and pool_size <= 0:
|
|
return False, "Pool size must be greater than 0"
|
|
|
|
# Validate timeout is reasonable
|
|
timeout = config.get("timeout", 30)
|
|
if isinstance(timeout, int) and (timeout < 1 or timeout > 600):
|
|
return False, "Timeout must be between 1 and 600 seconds"
|
|
|
|
# Validate max_retries
|
|
max_retries = config.get("max_retries", 3)
|
|
if isinstance(max_retries, int) and (max_retries < 0 or max_retries > 10):
|
|
return False, "Max retries must be between 0 and 10"
|
|
|
|
return True, None
|