Files
ragflow/agent/sandbox/tests/sandbox_security_tests_full.py
Zhichang Yu fd11aca8e5 feat: Implement pluggable multi-provider sandbox architecture (#12820)
## Summary

Implement a flexible sandbox provider system supporting both
self-managed (Docker) and SaaS (Aliyun Code Interpreter) backends for
secure code execution in agent workflows.

**Key Changes:**
- Aliyun Code Interpreter provider using official `agentrun-sdk>=0.0.16`
- Self-managed provider with gVisor (runsc) security
- Arguments parameter support for dynamic code execution
- Database-only configuration (removed fallback logic)
- Configuration scripts for quick setup

Issue #12479

## Features

### 🔌 Provider Abstraction Layer

**1. Self-Managed Provider** (`agent/sandbox/providers/self_managed.py`)
- Wraps existing executor_manager HTTP API
- gVisor (runsc) for secure container isolation
- Configurable pool size, timeout, retry logic
- Languages: Python, Node.js, JavaScript
- ⚠️ **Requires**: gVisor installation, Docker, base images

**2. Aliyun Code Interpreter**
(`agent/sandbox/providers/aliyun_codeinterpreter.py`)
- SaaS integration using official agentrun-sdk
- Serverless microVM execution with auto-authentication
- Hard timeout: 30 seconds max
- Credentials: `AGENTRUN_ACCESS_KEY_ID`, `AGENTRUN_ACCESS_KEY_SECRET`,
`AGENTRUN_ACCOUNT_ID`, `AGENTRUN_REGION`
- Automatically wraps code to call `main()` function
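The automatic `main()` wrapping can be pictured with a small sketch. The `wrap_user_code` helper and the driver shape below are illustrative assumptions, not the provider's actual implementation:

```python
import textwrap

def wrap_user_code(user_code: str, arguments: dict) -> str:
    """Append a driver that calls main(**arguments) and prints the result as JSON.

    Hypothetical helper; the real provider may wrap code differently.
    """
    driver = textwrap.dedent(f"""
        import json as _json
        _result = main(**{arguments!r})
        print(_json.dumps(_result))
    """)
    return user_code + "\n" + driver

wrapped = wrap_user_code(
    "def main(name: str) -> dict:\n    return {'message': f'Hello {name}!'}",
    {"name": "World"},
)
print(wrapped)
```

Executing the wrapped script then yields the JSON-serialized return value of `main()` on stdout, which is how arguments reach user code without the user writing any driver boilerplate.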

**3. E2B Provider** (`agent/sandbox/providers/e2b.py`)
- Placeholder for future integration

### ⚙️ Configuration System

- `conf/system_settings.json`: Default provider =
`aliyun_codeinterpreter`
- `agent/sandbox/client.py`: Enforces database-only configuration
- Admin UI: `/admin/sandbox-settings`
- Configuration validation via `validate_config()` method
- Health checks for all providers
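The validation and health-check hooks suggest a provider interface along these lines. This is a minimal sketch under assumed method names and signatures, not the actual `SandboxProvider` ABC:

```python
from abc import ABC, abstractmethod

class SandboxProvider(ABC):
    """Sketch of the provider contract; method shapes are assumptions."""

    @abstractmethod
    def validate_config(self, config: dict) -> list:
        """Return a list of configuration errors; empty means valid."""

    @abstractmethod
    def health_check(self) -> bool:
        """Return True if the backend is reachable."""

class DummyProvider(SandboxProvider):
    """Toy provider used only to exercise the interface."""

    def validate_config(self, config: dict) -> list:
        return [] if config.get("timeout", 0) > 0 else ["timeout must be positive"]

    def health_check(self) -> bool:
        return True

provider = DummyProvider()
print(provider.validate_config({"timeout": 30}))
```

A registry of such providers lets the Admin UI run `validate_config()` before saving and `health_check()` behind a "Test Connection" button.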

### 🎯 Key Capabilities

**Arguments Parameter Support:**
All providers support passing arguments to the `main()` function:
```python
# User code
def main(name: str, count: int) -> dict:
    return {"message": f"Hello {name}!" * count}

# Executed with: arguments={"name": "World", "count": 3}
# Result: {"message": "Hello World!Hello World!Hello World!"}
```

**Self-Describing Providers:**
Each provider implements `get_config_schema()`, returning the form
configuration rendered by the Admin UI.
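A schema returned by such a method might look like the following. The field names and structure here are illustrative assumptions, not the actual schema format:

```python
def get_config_schema() -> dict:
    # Hypothetical shape: enough for a generic form renderer to build
    # inputs, mark secrets, and enforce the provider's hard limits.
    return {
        "provider": "aliyun_codeinterpreter",
        "fields": [
            {"name": "access_key_id", "type": "password", "required": True},
            {"name": "region", "type": "select",
             "options": ["cn-shanghai", "cn-hangzhou"], "default": "cn-shanghai"},
            {"name": "timeout", "type": "number", "default": 30, "max": 30},
        ],
    }

schema = get_config_schema()
print([f["name"] for f in schema["fields"]])
```

Because each provider describes its own form, adding a new backend requires no Admin UI changes beyond registering the provider.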

**Error Handling:**
Structured `ExecutionResult` with stdout, stderr, exit_code,
execution_time

## Configuration Scripts

Two scripts for quick Aliyun sandbox setup:

**Shell Script (requires jq):**
```bash
source scripts/configure_aliyun_sandbox.sh
```

**Python Script (interactive):**
```bash
python3 scripts/configure_aliyun_sandbox.py
```

## Testing

```bash
# Unit tests
uv run pytest agent/sandbox/tests/test_providers.py -v

# Aliyun provider tests
uv run pytest agent/sandbox/tests/test_aliyun_codeinterpreter.py -v

# Integration tests (requires credentials)
uv run pytest agent/sandbox/tests/test_aliyun_codeinterpreter_integration.py -v

# Quick SDK validation
python3 agent/sandbox/tests/verify_sdk.py
```

**Test Coverage:**
- 30 unit tests for provider abstraction
- Provider-specific tests for Aliyun
- Integration tests with real API
- Security tests for executor_manager

## Documentation

- `docs/develop/sandbox_spec.md` - Complete architecture specification
- `agent/sandbox/tests/MIGRATION_GUIDE.md` - Migration from legacy
sandbox
- `agent/sandbox/tests/QUICKSTART.md` - Quick start guide
- `agent/sandbox/tests/README.md` - Testing documentation

## Breaking Changes

⚠️ **Migration Required:**

1. **Directory Move**: `sandbox/` → `agent/sandbox/`
   - Update imports: `from sandbox.` → `from agent.sandbox.`

2. **Mandatory Configuration**:
   - SystemSettings must have `sandbox.provider_type` configured
   - Removed fallback default values
   - Configuration must exist in database (from `conf/system_settings.json`)

3. **Aliyun Credentials**:
   - Requires `AGENTRUN_*` environment variables (not `ALIYUN_*`)
   - `AGENTRUN_ACCOUNT_ID` is now required (Aliyun primary account ID)

4. **Self-Managed Provider**:
   - gVisor (runsc) must be installed for security
   - Install: `go install gvisor.dev/gvisor/runsc@latest`

## Database Schema Changes

```python
# SystemSettings.value: CharField → TextField
api/db/db_models.py: Changed for unlimited config length

# SystemSettingsService.get_by_name(): Fixed query precision
api/db/services/system_settings_service.py: startswith → exact match
```
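The query-precision fix matters because a prefix match can silently return the wrong row. A minimal illustration of the hazard, using a plain dict rather than the actual peewee query (the `_backup` key is hypothetical, invented only to show the collision):

```python
# Hypothetical settings rows; insertion order puts the sibling key first.
settings = {
    "sandbox.provider_type_backup": "self_managed",
    "sandbox.provider_type": "aliyun_codeinterpreter",
}

def get_by_name_startswith(name: str):
    # Old behavior: any key that merely starts with the name can win.
    for key, value in settings.items():
        if key.startswith(name):
            return value
    return None

def get_by_name_exact(name: str):
    # New behavior: only the exact key matches.
    return settings.get(name)

print(get_by_name_startswith("sandbox.provider_type"))
print(get_by_name_exact("sandbox.provider_type"))
```

With prefix matching, `sandbox.provider_type_backup` shadows the intended row; exact matching returns the configured provider.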

## Files Changed

### Backend (Python)
- `agent/sandbox/providers/base.py` - SandboxProvider ABC interface
- `agent/sandbox/providers/manager.py` - ProviderManager
- `agent/sandbox/providers/self_managed.py` - Self-managed provider
- `agent/sandbox/providers/aliyun_codeinterpreter.py` - Aliyun provider
- `agent/sandbox/providers/e2b.py` - E2B provider (placeholder)
- `agent/sandbox/client.py` - Unified client (enforces DB-only config)
- `agent/tools/code_exec.py` - Updated to use provider system
- `admin/server/services.py` - SandboxMgr with registry & validation
- `admin/server/routes.py` - 5 sandbox API endpoints
- `conf/system_settings.json` - Default: aliyun_codeinterpreter
- `api/db/db_models.py` - TextField for SystemSettings.value
- `api/db/services/system_settings_service.py` - Exact match query

### Frontend (TypeScript/React)
- `web/src/pages/admin/sandbox-settings.tsx` - Settings UI
- `web/src/services/admin-service.ts` - Sandbox service functions
- `web/src/services/admin.service.d.ts` - Type definitions
- `web/src/utils/api.ts` - Sandbox API endpoints

### Documentation
- `docs/develop/sandbox_spec.md` - Architecture spec
- `agent/sandbox/tests/MIGRATION_GUIDE.md` - Migration guide
- `agent/sandbox/tests/QUICKSTART.md` - Quick start
- `agent/sandbox/tests/README.md` - Testing guide

### Configuration Scripts
- `scripts/configure_aliyun_sandbox.sh` - Shell script (jq)
- `scripts/configure_aliyun_sandbox.py` - Python script

### Tests
- `agent/sandbox/tests/test_providers.py` - 30 unit tests
- `agent/sandbox/tests/test_aliyun_codeinterpreter.py` - Provider tests
- `agent/sandbox/tests/test_aliyun_codeinterpreter_integration.py` -
Integration tests
- `agent/sandbox/tests/verify_sdk.py` - SDK validation

## Architecture

```
Admin UI → Admin API → SandboxMgr → ProviderManager → [SelfManaged|Aliyun|E2B]
                                      ↓
                                  SystemSettings
```
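The dispatch step in the diagram can be sketched as follows. Class and key names are assumptions for illustration; the real `ProviderManager` likely differs:

```python
class ProviderManager:
    """Sketch: resolve the configured provider from SystemSettings."""

    def __init__(self, registry: dict, settings: dict):
        self.registry = registry  # provider_type -> provider instance
        self.settings = settings  # rows loaded from SystemSettings

    def get_provider(self):
        provider_type = self.settings.get("sandbox.provider_type")
        if provider_type is None:
            # Database-only configuration: no fallback default.
            raise RuntimeError("Sandbox provider type not configured")
        return self.registry[provider_type]

manager = ProviderManager(
    registry={"self_managed": object(), "aliyun_codeinterpreter": "aliyun-stub"},
    settings={"sandbox.provider_type": "aliyun_codeinterpreter"},
)
print(manager.get_provider())
```

Because the manager raises instead of falling back, a missing database row fails loudly at dispatch time rather than silently executing code on an unintended backend.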

## Usage

### 1. Configure Provider

**Via Admin UI:**
1. Navigate to `/admin/sandbox-settings`
2. Select provider (Aliyun Code Interpreter / Self-Managed)
3. Fill in configuration
4. Click "Test Connection" to verify
5. Click "Save" to apply

**Via Configuration Scripts:**
```bash
# Aliyun provider
export AGENTRUN_ACCESS_KEY_ID="xxx"
export AGENTRUN_ACCESS_KEY_SECRET="yyy"
export AGENTRUN_ACCOUNT_ID="zzz"
export AGENTRUN_REGION="cn-shanghai"
source scripts/configure_aliyun_sandbox.sh
```

### 2. Restart Service

```bash
cd docker
docker compose restart ragflow-server
```

### 3. Execute Code in Agent

```python
from agent.sandbox.client import execute_code

result = execute_code(
    code='def main(name: str) -> dict: return {"message": f"Hello {name}!"}',
    language="python",
    timeout=30,
    arguments={"name": "World"}
)

print(result.stdout)  # {"message": "Hello World!"}
```

## Troubleshooting

### "Container pool is busy" (Self-Managed)
- **Cause**: Pool exhausted (default: 1 container in `.env`)
- **Fix**: Increase `SANDBOX_EXECUTOR_MANAGER_POOL_SIZE` to 5+

### "Sandbox provider type not configured"
- **Cause**: Database missing configuration
- **Fix**: Run config script or set via Admin UI

### "gVisor not found"
- **Cause**: runsc not installed
- **Fix**: `go install gvisor.dev/gvisor/runsc@latest && sudo cp
~/go/bin/runsc /usr/local/bin/`

### Aliyun authentication errors
- **Cause**: Wrong environment variable names
- **Fix**: Use `AGENTRUN_*` prefix (not `ALIYUN_*`)

## Checklist

- [x] All tests passing (30 unit tests + integration tests)
- [x] Documentation updated (spec, migration guide, quickstart)
- [x] Type definitions added (TypeScript)
- [x] Admin UI implemented
- [x] Configuration validation
- [x] Health checks implemented
- [x] Error handling with structured results
- [x] Breaking changes documented
- [x] Configuration scripts created
- [x] gVisor requirements documented

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>

2026-01-28 13:28:21 +08:00

#
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import base64
import os
import textwrap
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from enum import Enum
from typing import Dict, Optional
import requests
from pydantic import BaseModel
API_URL = os.getenv("SANDBOX_API_URL", "http://localhost:9385/run")
TIMEOUT = 15
MAX_WORKERS = 5
class ResultStatus(str, Enum):
    SUCCESS = "success"
    PROGRAM_ERROR = "program_error"
    RESOURCE_LIMIT_EXCEEDED = "resource_limit_exceeded"
    UNAUTHORIZED_ACCESS = "unauthorized_access"
    RUNTIME_ERROR = "runtime_error"
    PROGRAM_RUNNER_ERROR = "program_runner_error"


class ResourceLimitType(str, Enum):
    TIME = "time"
    MEMORY = "memory"
    OUTPUT = "output"


class UnauthorizedAccessType(str, Enum):
    DISALLOWED_SYSCALL = "disallowed_syscall"
    FILE_ACCESS = "file_access"
    NETWORK_ACCESS = "network_access"


class RuntimeErrorType(str, Enum):
    SIGNALLED = "signalled"
    NONZERO_EXIT = "nonzero_exit"


class ExecutionResult(BaseModel):
    status: ResultStatus
    stdout: str
    stderr: str
    exit_code: int
    detail: Optional[str] = None
    resource_limit_type: Optional[ResourceLimitType] = None
    unauthorized_access_type: Optional[UnauthorizedAccessType] = None
    runtime_error_type: Optional[RuntimeErrorType] = None


class TestResult(BaseModel):
    name: str
    passed: bool
    duration: float
    expected_failure: bool = False
    result: Optional[ExecutionResult] = None
    error: Optional[str] = None
    validation_error: Optional[str] = None
def encode_code(code: str) -> str:
    return base64.b64encode(code.encode("utf-8")).decode("utf-8")
def execute_single_test(name: str, code: str, language: str, arguments: dict, expect_fail: bool = False) -> TestResult:
    """Execute a single test case."""
    payload = {
        "code_b64": encode_code(textwrap.dedent(code)),
        "language": language,
        "arguments": arguments,
    }
    test_result = TestResult(name=name, passed=False, duration=0, expected_failure=expect_fail)
    really_processed = False
    try:
        while not really_processed:
            start_time = time.perf_counter()
            resp = requests.post(API_URL, json=payload, timeout=TIMEOUT)
            resp.raise_for_status()
            response_data = resp.json()
            if response_data["exit_code"] == -429:  # too many requests
                print(f"[{name}] Reached request limit, retrying...")
                time.sleep(0.5)
                continue
            really_processed = True
            print("-------------------")
            print(f"{name}:\n{response_data}")
            print("-------------------")
            test_result.duration = time.perf_counter() - start_time
            test_result.result = ExecutionResult(**response_data)
            # Validate test result expectations
            validate_test_result(name, expect_fail, test_result)
    except requests.exceptions.RequestException as e:
        test_result.duration = time.perf_counter() - start_time
        test_result.error = f"Request failed: {str(e)}"
        test_result.result = ExecutionResult(
            status=ResultStatus.PROGRAM_RUNNER_ERROR,
            stdout="",
            stderr=str(e),
            exit_code=-999,
            detail="request_failed",
        )
    return test_result
def validate_test_result(name: str, expect_fail: bool, test_result: TestResult):
    """Validate whether the test result meets expectations."""
    if not test_result.result:
        test_result.passed = False
        test_result.validation_error = "No result returned"
        return
    test_result.passed = test_result.result.status == ResultStatus.SUCCESS
    # General validation logic
    if expect_fail:
        # Tests expected to fail should return a non-success status
        if test_result.passed:
            test_result.validation_error = "Expected failure but actually succeeded"
    else:
        # Tests expected to succeed should return a success status
        if not test_result.passed:
            test_result.validation_error = f"Unexpected failure (status={test_result.result.status})"
def get_test_cases() -> Dict[str, dict]:
    """Return test cases (code, whether expected to fail)."""
    return {
        "1 Infinite loop: Should be forcibly terminated": {
            "code": """
def main():
    while True:
        pass
""",
            "should_fail": True,
            "arguments": {},
            "language": "python",
        },
        "2 Infinite loop: Should be forcibly terminated": {
            "code": """
def main():
    while True:
        pass
""",
            "should_fail": True,
            "arguments": {},
            "language": "python",
        },
        "3 Infinite loop: Should be forcibly terminated": {
            "code": """
def main():
    while True:
        pass
""",
            "should_fail": True,
            "arguments": {},
            "language": "python",
        },
        "4 Infinite loop: Should be forcibly terminated": {
            "code": """
def main():
    while True:
        pass
""",
            "should_fail": True,
            "arguments": {},
            "language": "python",
        },
        "5 Infinite loop: Should be forcibly terminated": {
            "code": """
def main():
    while True:
        pass
""",
            "should_fail": True,
            "arguments": {},
            "language": "python",
        },
        "6 Infinite loop: Should be forcibly terminated": {
            "code": """
def main():
    while True:
        pass
""",
            "should_fail": True,
            "arguments": {},
            "language": "python",
        },
        "7 Normal test: Python without dependencies": {
            "code": """
def main():
    return {"data": "hello, world"}
""",
            "should_fail": False,
            "arguments": {},
            "language": "python",
        },
        "8 Normal test: Python with pandas, should pass without any error": {
            "code": """
import pandas as pd

def main():
    data = {'Name': ['Alice', 'Bob', 'Charlie'],
            'Age': [25, 30, 35]}
    df = pd.DataFrame(data)
""",
            "should_fail": False,
            "arguments": {},
            "language": "python",
        },
        "9 Normal test: Nodejs without dependencies, should pass without any error": {
            "code": """
const https = require('https');

async function main(args) {
    return new Promise((resolve, reject) => {
        const req = https.get('https://example.com/', (res) => {
            let data = '';
            res.on('data', (chunk) => {
                data += chunk;
            });
            res.on('end', () => {
                clearTimeout(timeout);
                console.log('Body:', data);
                resolve(data);
            });
        });
        const timeout = setTimeout(() => {
            req.destroy(new Error('Request timeout after 10s'));
        }, 10000);
        req.on('error', (err) => {
            clearTimeout(timeout);
            console.error('Error:', err.message);
            reject(err);
        });
    });
}

module.exports = { main };
""",
            "should_fail": False,
            "arguments": {},
            "language": "nodejs",
        },
        "10 Normal test: Nodejs with axios, should pass without any error": {
            "code": """
const axios = require('axios');

async function main(args) {
    try {
        const response = await axios.get('https://example.com/', {
            timeout: 10000
        });
        console.log('Body:', response.data);
    } catch (error) {
        console.error('Error:', error.message);
    }
}

module.exports = { main };
""",
            "should_fail": False,
            "arguments": {},
            "language": "nodejs",
        },
        "11 Dangerous import: Should fail due to os module import": {
            "code": """
import os

def main():
    pass
""",
            "should_fail": True,
            "arguments": {},
            "language": "python",
        },
        "12 Dangerous import from subprocess: Should fail due to subprocess import": {
            "code": """
from subprocess import Popen

def main():
    pass
""",
            "should_fail": True,
            "arguments": {},
            "language": "python",
        },
        "13 Dangerous call: Should fail due to eval function call": {
            "code": """
def main():
    eval('os.system("echo hello")')
""",
            "should_fail": True,
            "arguments": {},
            "language": "python",
        },
        "14 Dangerous attribute access: Should fail due to shutil.rmtree": {
            "code": """
import shutil

def main():
    shutil.rmtree('/some/path')
""",
            "should_fail": True,
            "arguments": {},
            "language": "python",
        },
        "15 Dangerous binary operation: Should fail due to unsafe concatenation leading to eval": {
            "code": """
def main():
    dangerous_string = "os." + "system"
    eval(dangerous_string + '("echo hello")')
""",
            "should_fail": True,
            "arguments": {},
            "language": "python",
        },
        "16 Dangerous function definition: Should fail due to user-defined eval function": {
            "code": """
def eval_function():
    eval('os.system("echo hello")')

def main():
    eval_function()
""",
            "should_fail": True,
            "arguments": {},
            "language": "python",
        },
        "17 Memory exhaustion(256m): Should fail due to exceeding memory limit(try to allocate 300m)": {
            "code": """
def main():
    x = ['a' * 1024 * 1024] * 300  # 300MB
""",
            "should_fail": True,
            "arguments": {},
            "language": "python",
        },
    }
def print_test_report(results: Dict[str, TestResult]):
    print("\n=== 🔍 Test Report ===")
    max_name_len = max(len(name) for name in results)
    for name, result in results.items():
        status = "✅" if result.passed else "❌"
        if result.expected_failure:
            status = "⚠️" if result.passed else "✅"  # Expected-failure case
        print(f"{status} {name.ljust(max_name_len)} {result.duration:.2f}s")
        if result.error:
            print(f"    REQUEST ERROR: {result.error}")
        if result.validation_error:
            print(f"    VALIDATION ERROR: {result.validation_error}")
        if result.result and not result.passed:
            print(f"    STATUS: {result.result.status}")
            if result.result.stderr:
                print(f"    STDERR: {result.result.stderr[:200]}...")
            if result.result.detail:
                print(f"    DETAIL: {result.result.detail}")
    passed = sum(1 for r in results.values() if ((not r.expected_failure and r.passed) or (r.expected_failure and not r.passed)))
    failed = len(results) - passed
    print("\n=== 📊 Statistics ===")
    print(f"✅ Passed: {passed}")
    print(f"❌ Failed: {failed}")
    print(f"📌 Total: {len(results)}")
def main():
    print(f"🔐 Starting sandbox security tests (API: {API_URL})")
    print(f"🚀 Concurrent threads: {MAX_WORKERS}")
    test_cases = get_test_cases()
    results = {}
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        futures = {}
        for name, detail in test_cases.items():
            # Log when a task is submitted
            print(f"✅ Task submitted: {name}")
            time.sleep(0.4)
            future = executor.submit(execute_single_test, name, detail["code"], detail["language"], detail["arguments"], detail["should_fail"])
            futures[future] = name
        print("\n=== 🚦 Test Progress ===")
        for i, future in enumerate(as_completed(futures)):
            name = futures[future]
            print(f"    {i + 1}/{len(test_cases)} completed: {name}")
            try:
                results[name] = future.result()
            except Exception as e:
                print(f"⚠️ Test {name} execution exception: {str(e)}")
                results[name] = TestResult(name=name, passed=False, duration=0, error=f"Execution exception: {str(e)}")
    print_test_report(results)
    if any(not r.passed and not r.expected_failure for r in results.values()):
        exit(1)


if __name__ == "__main__":
    main()