Feat: add code_executor_manager (#7814)

### What problem does this PR solve?

Add code_executor_manager. #4977.

### Type of change

- [x] New Feature (non-breaking change which adds functionality)
This commit is contained in:
Yongteng Lei
2025-05-23 16:33:38 +08:00
committed by GitHub
parent db4371c745
commit 2d7c1368f0
39 changed files with 3240 additions and 0 deletions

View File

@ -0,0 +1,23 @@
FROM python:3.11-slim-bookworm
RUN grep -rl 'deb.debian.org' /etc/apt/ | xargs sed -i 's|http[s]*://deb.debian.org|https://mirrors.tuna.tsinghua.edu.cn|g' && \
apt-get update && \
apt-get install -y curl gcc && \
rm -rf /var/lib/apt/lists/*
RUN curl -fsSL https://mirrors.aliyun.com/docker-ce/linux/static/stable/x86_64/docker-24.0.7.tgz -o docker.tgz && \
tar -xzf docker.tgz && \
mv docker/docker /usr/bin/docker && \
rm -rf docker docker.tgz
COPY --from=ghcr.io/astral-sh/uv:0.7.5 /uv /uvx /bin/
ENV UV_INDEX_URL=https://pypi.tuna.tsinghua.edu.cn/simple
WORKDIR /app
COPY executor_manager/ .
RUN uv pip install --system -r requirements.txt
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "9385"]

View File

@ -0,0 +1,15 @@
#
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

View File

@ -0,0 +1,44 @@
#
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import base64
from core.logger import logger
from fastapi import Request
from models.enums import ResultStatus
from models.schemas import CodeExecutionRequest, CodeExecutionResult
from services.execution import execute_code
from services.limiter import limiter
from services.security import analyze_code_security
async def healthz_handler():
return {"status": "ok"}
@limiter.limit("5/second")
async def run_code_handler(req: CodeExecutionRequest, request: Request):
logger.info("🟢 Received /run request")
code = base64.b64decode(req.code_b64).decode("utf-8")
is_safe, issues = analyze_code_security(code, language=req.language)
if not is_safe:
issue_details = "\n".join([f"Line {lineno}: {issue}" for issue, lineno in issues])
return CodeExecutionResult(status=ResultStatus.PROGRAM_RUNNER_ERROR, stdout="", stderr=issue_details, exit_code=-999, detail="Code is unsafe")
try:
return await execute_code(req)
except Exception as e:
return CodeExecutionResult(status=ResultStatus.PROGRAM_RUNNER_ERROR, stdout="", stderr=str(e), exit_code=-999, detail="unhandled_exception")

View File

@ -0,0 +1,23 @@
#
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from fastapi import APIRouter
from api.handlers import healthz_handler, run_code_handler
router = APIRouter()
router.get("/healthz")(healthz_handler)
router.post("/run")(run_code_handler)

View File

@ -0,0 +1,15 @@
#
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

View File

@ -0,0 +1,44 @@
#
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os
from contextlib import asynccontextmanager
from fastapi import FastAPI
from util import format_timeout_duration, parse_timeout_duration
from core.container import init_containers, teardown_containers
from core.logger import logger
TIMEOUT = 10
@asynccontextmanager
async def _lifespan(app: FastAPI):
"""Asynchronous lifecycle management"""
size = int(os.getenv("SANDBOX_EXECUTOR_MANAGER_POOL_SIZE", 1))
success_count, total_task_count = await init_containers(size)
logger.info(f"\n📊 Container pool initialization complete: {success_count}/{total_task_count} available")
yield
await teardown_containers()
def init():
TIMEOUT = parse_timeout_duration(os.getenv("SANDBOX_TIMEOUT"))
logger.info(f"Global timeout: {format_timeout_duration(TIMEOUT)}")
return _lifespan

View File

@ -0,0 +1,190 @@
#
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import asyncio
import contextlib
import os
import time
from queue import Empty, Queue
from threading import Lock
from models.enums import SupportLanguage
from util import env_setting_enabled, is_valid_memory_limit
from utils.common import async_run_command
from core.logger import logger
_CONTAINER_QUEUES: dict[SupportLanguage, Queue] = {}
_CONTAINER_LOCK: Lock = Lock()
async def init_containers(size: int) -> tuple[int, int]:
global _CONTAINER_QUEUES
_CONTAINER_QUEUES = {SupportLanguage.PYTHON: Queue(), SupportLanguage.NODEJS: Queue()}
with _CONTAINER_LOCK:
while not _CONTAINER_QUEUES[SupportLanguage.PYTHON].empty():
_CONTAINER_QUEUES[SupportLanguage.PYTHON].get_nowait()
while not _CONTAINER_QUEUES[SupportLanguage.NODEJS].empty():
_CONTAINER_QUEUES[SupportLanguage.NODEJS].get_nowait()
create_tasks = []
for i in range(size):
name = f"sandbox_python_{i}"
logger.info(f"🛠️ Creating Python container {i + 1}/{size}")
create_tasks.append(_prepare_container(name, SupportLanguage.PYTHON))
name = f"sandbox_nodejs_{i}"
logger.info(f"🛠️ Creating Node.js container {i + 1}/{size}")
create_tasks.append(_prepare_container(name, SupportLanguage.NODEJS))
results = await asyncio.gather(*create_tasks, return_exceptions=True)
success_count = sum(1 for r in results if r is True)
total_task_count = len(create_tasks)
return success_count, total_task_count
async def teardown_containers():
with _CONTAINER_LOCK:
while not _CONTAINER_QUEUES[SupportLanguage.PYTHON].empty():
name = _CONTAINER_QUEUES[SupportLanguage.PYTHON].get_nowait()
await async_run_command("docker", "rm", "-f", name, timeout=5)
while not _CONTAINER_QUEUES[SupportLanguage.NODEJS].empty():
name = _CONTAINER_QUEUES[SupportLanguage.NODEJS].get_nowait()
await async_run_command("docker", "rm", "-f", name, timeout=5)
async def _prepare_container(name: str, language: SupportLanguage) -> bool:
"""Prepare a single container"""
with contextlib.suppress(Exception):
await async_run_command("docker", "rm", "-f", name, timeout=5)
if await create_container(name, language):
_CONTAINER_QUEUES[language].put(name)
return True
return False
async def create_container(name: str, language: SupportLanguage) -> bool:
"""Asynchronously create a container"""
create_args = [
"docker",
"run",
"-d",
"--runtime=runsc",
"--name",
name,
"--read-only",
"--tmpfs",
"/workspace:rw,exec,size=100M,uid=65534,gid=65534",
"--tmpfs",
"/tmp:rw,exec,size=50M",
"--user",
"nobody",
"--workdir",
"/workspace",
]
if os.getenv("SANDBOX_MAX_MEMORY"):
memory_limit = os.getenv("SANDBOX_MAX_MEMORY") or "256m"
if is_valid_memory_limit(memory_limit):
logger.info(f"SANDBOX_MAX_MEMORY: {os.getenv('SANDBOX_MAX_MEMORY')}")
else:
logger.info("Invalid SANDBOX_MAX_MEMORY, using default value: 256m")
memory_limit = "256m"
create_args.extend(["--memory", memory_limit])
else:
logger.info("Set default SANDBOX_MAX_MEMORY: 256m")
create_args.extend(["--memory", "256m"])
if env_setting_enabled("SANDBOX_ENABLE_SECCOMP", "false"):
logger.info(f"SANDBOX_ENABLE_SECCOMP: {os.getenv('SANDBOX_ENABLE_SECCOMP')}")
create_args.extend(["--security-opt", "seccomp=/app/seccomp-profile-default.json"])
if language == SupportLanguage.PYTHON:
create_args.append(os.getenv("SANDBOX_BASE_PYTHON_IMAGE", "sandbox-base-python:latest"))
elif language == SupportLanguage.NODEJS:
create_args.append(os.getenv("SANDBOX_BASE_NODEJS_IMAGE", "sandbox-base-nodejs:latest"))
logger.info(f"Sandbox config:\n\t {create_args}")
try:
returncode, _, stderr = await async_run_command(*create_args, timeout=10)
if returncode != 0:
logger.error(f"❌ Container creation failed {name}: {stderr}")
return False
if language == SupportLanguage.NODEJS:
copy_cmd = ["docker", "exec", name, "bash", "-c", "cp -a /app/node_modules /workspace/"]
returncode, _, stderr = await async_run_command(*copy_cmd, timeout=10)
if returncode != 0:
logger.error(f"❌ Failed to prepare dependencies for {name}: {stderr}")
return False
return await container_is_running(name)
except Exception as e:
logger.error(f"❌ Container creation exception {name}: {str(e)}")
return False
async def recreate_container(name: str, language: SupportLanguage) -> bool:
"""Asynchronously recreate a container"""
logger.info(f"🛠️ Recreating container: {name}")
try:
await async_run_command("docker", "rm", "-f", name, timeout=5)
return await create_container(name, language)
except Exception as e:
logger.error(f"❌ Container {name} recreation failed: {str(e)}")
return False
async def release_container(name: str, language: SupportLanguage):
"""Asynchronously release a container"""
with _CONTAINER_LOCK:
if await container_is_running(name):
_CONTAINER_QUEUES[language].put(name)
logger.info(f"🟢 Released container: {name} (remaining available: {_CONTAINER_QUEUES[language].qsize()})")
else:
logger.warning(f"⚠️ Container {name} has crashed, attempting to recreate...")
if await recreate_container(name, language):
_CONTAINER_QUEUES[language].put(name)
logger.info(f"✅ Container {name} successfully recreated and returned to queue")
async def allocate_container_blocking(language: SupportLanguage, timeout=10) -> str:
"""Asynchronously allocate an available container"""
start_time = time.time()
while time.time() - start_time < timeout:
try:
name = _CONTAINER_QUEUES[language].get_nowait()
with _CONTAINER_LOCK:
if not await container_is_running(name) and not await recreate_container(name, language):
continue
return name
except Empty:
await asyncio.sleep(0.1)
return ""
async def container_is_running(name: str) -> bool:
"""Asynchronously check the container status"""
try:
returncode, stdout, _ = await async_run_command("docker", "inspect", "-f", "{{.State.Running}}", name, timeout=2)
return returncode == 0 and stdout.strip() == "true"
except Exception:
return False

View File

@ -0,0 +1,19 @@
#
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("sandbox")

View File

@ -0,0 +1,25 @@
#
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from api.routes import router as api_router
from core.config import init
from fastapi import FastAPI
from services.limiter import limiter, rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
app = FastAPI(lifespan=init())
app.include_router(api_router)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, rate_limit_exceeded_handler)

View File

@ -0,0 +1,15 @@
#
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

View File

@ -0,0 +1,47 @@
#
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from enum import Enum
class SupportLanguage(str, Enum):
PYTHON = "python"
NODEJS = "nodejs"
class ResultStatus(str, Enum):
SUCCESS = "success"
PROGRAM_ERROR = "program_error"
RESOURCE_LIMIT_EXCEEDED = "resource_limit_exceeded"
UNAUTHORIZED_ACCESS = "unauthorized_access"
RUNTIME_ERROR = "runtime_error"
PROGRAM_RUNNER_ERROR = "program_runner_error"
class ResourceLimitType(str, Enum):
TIME = "time"
MEMORY = "memory"
OUTPUT = "output"
class UnauthorizedAccessType(str, Enum):
DISALLOWED_SYSCALL = "disallowed_syscall"
FILE_ACCESS = "file_access"
NETWORK_ACCESS = "network_access"
class RuntimeErrorType(str, Enum):
SIGNALLED = "signalled"
NONZERO_EXIT = "nonzero_exit"

View File

@ -0,0 +1,53 @@
#
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import base64
from typing import Optional
from pydantic import BaseModel, Field, field_validator
from models.enums import ResourceLimitType, ResultStatus, RuntimeErrorType, SupportLanguage, UnauthorizedAccessType
class CodeExecutionResult(BaseModel):
status: ResultStatus
stdout: str
stderr: str
exit_code: int
detail: Optional[str] = None
# Resource usage
time_used_ms: Optional[float] = None
memory_used_kb: Optional[float] = None
# Error details
resource_limit_type: Optional[ResourceLimitType] = None
unauthorized_access_type: Optional[UnauthorizedAccessType] = None
runtime_error_type: Optional[RuntimeErrorType] = None
class CodeExecutionRequest(BaseModel):
code_b64: str = Field(..., description="Base64 encoded code string")
language: SupportLanguage = Field(default=SupportLanguage.PYTHON, description="Programming language")
arguments: Optional[dict] = Field(default={}, description="Arguments")
@field_validator("code_b64")
@classmethod
def validate_base64(cls, v: str) -> str:
try:
base64.b64decode(v, validate=True)
return v
except Exception as e:
raise ValueError(f"Invalid base64 encoding: {str(e)}")

View File

@ -0,0 +1,3 @@
fastapi
uvicorn
slowapi

View File

@ -0,0 +1,55 @@
{
"defaultAction": "SCMP_ACT_ERRNO",
"archMap": [
{
"architecture": "SCMP_ARCH_X86_64",
"subArchitectures": [
"SCMP_ARCH_X86",
"SCMP_ARCH_X32"
]
}
],
"syscalls": [
{
"names": [
"read",
"write",
"exit",
"sigreturn",
"brk",
"mmap",
"munmap",
"rt_sigaction",
"rt_sigprocmask",
"futex",
"clone",
"execve",
"arch_prctl",
"access",
"openat",
"close",
"stat",
"fstat",
"lstat",
"getpid",
"gettid",
"getuid",
"getgid",
"geteuid",
"getegid",
"clock_gettime",
"nanosleep",
"uname",
"writev",
"readlink",
"getrandom",
"statx",
"faccessat2",
"pread64",
"pwrite64",
"rt_sigreturn"
],
"action": "SCMP_ACT_ALLOW"
}
]
}

View File

@ -0,0 +1,15 @@
#
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

View File

@ -0,0 +1,245 @@
#
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import asyncio
import base64
import json
import os
import time
import uuid
from core.config import TIMEOUT
from core.container import allocate_container_blocking, release_container
from core.logger import logger
from models.enums import ResourceLimitType, ResultStatus, RuntimeErrorType, SupportLanguage, UnauthorizedAccessType
from models.schemas import CodeExecutionRequest, CodeExecutionResult
from utils.common import async_run_command
async def execute_code(req: CodeExecutionRequest):
"""Fully asynchronous execution logic"""
language = req.language
container = await allocate_container_blocking(language)
if not container:
return CodeExecutionResult(
status=ResultStatus.PROGRAM_RUNNER_ERROR,
stdout="",
stderr="Container pool is busy",
exit_code=-10,
detail="no_available_container",
)
task_id = str(uuid.uuid4())
workdir = f"/tmp/sandbox_{task_id}"
os.makedirs(workdir, mode=0o700, exist_ok=True)
try:
if language == SupportLanguage.PYTHON:
code_name = "main.py"
# code
code_path = os.path.join(workdir, code_name)
with open(code_path, "wb") as f:
f.write(base64.b64decode(req.code_b64))
# runner
runner_name = "runner.py"
runner_path = os.path.join(workdir, runner_name)
with open(runner_path, "w") as f:
f.write("""import json
import os
import sys
sys.path.insert(0, os.path.dirname(__file__))
from main import main
if __name__ == "__main__":
args = json.loads(sys.argv[1])
result = main(**args)
if result is not None:
print(result)
""")
elif language == SupportLanguage.NODEJS:
code_name = "main.js"
code_path = os.path.join(workdir, "main.js")
with open(code_path, "wb") as f:
f.write(base64.b64decode(req.code_b64))
runner_name = "runner.js"
runner_path = os.path.join(workdir, "runner.js")
with open(runner_path, "w") as f:
f.write("""
const fs = require('fs');
const path = require('path');
const args = JSON.parse(process.argv[2]);
const mainPath = path.join(__dirname, 'main.js');
if (fs.existsSync(mainPath)) {
const { main } = require(mainPath);
if (typeof args === 'object' && args !== null) {
main(args).then(result => {
if (result !== null) {
console.log(result);
}
}).catch(err => {
console.error('Error in main function:', err);
});
} else {
console.error('Error: args is not a valid object:', args);
}
} else {
console.error('main.js not found in the current directory');
}
""")
# dirs
returncode, _, stderr = await async_run_command("docker", "exec", container, "mkdir", "-p", f"/workspace/{task_id}", timeout=5)
if returncode != 0:
raise RuntimeError(f"Directory creation failed: {stderr}")
# archive
tar_proc = await asyncio.create_subprocess_exec("tar", "czf", "-", "-C", workdir, code_name, runner_name, stdout=asyncio.subprocess.PIPE)
tar_stdout, _ = await tar_proc.communicate()
# unarchive
docker_proc = await asyncio.create_subprocess_exec(
"docker", "exec", "-i", container, "tar", "xzf", "-", "-C", f"/workspace/{task_id}", stdin=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await docker_proc.communicate(input=tar_stdout)
if docker_proc.returncode != 0:
raise RuntimeError(stderr.decode())
# exec
start_time = time.time()
try:
logger.info(f"Passed in args: {req.arguments}")
args_json = json.dumps(req.arguments or {})
run_args = [
"docker",
"exec",
"--workdir",
f"/workspace/{task_id}",
container,
"timeout",
str(TIMEOUT),
language,
]
# flags
if language == SupportLanguage.PYTHON:
run_args.extend(["-I", "-B"])
elif language == SupportLanguage.NODEJS:
run_args.extend([])
else:
assert True, "Will never reach here"
run_args.extend([runner_name, args_json])
returncode, stdout, stderr = await async_run_command(
*run_args,
timeout=TIMEOUT + 5,
)
time_used_ms = (time.time() - start_time) * 1000
logger.info("----------------------------------------------")
logger.info(f"Code: {str(base64.b64decode(req.code_b64))}")
logger.info(f"{returncode=}")
logger.info(f"{stdout=}")
logger.info(f"{stderr=}")
logger.info(f"{args_json=}")
if returncode == 0:
return CodeExecutionResult(
status=ResultStatus.SUCCESS,
stdout=str(stdout),
stderr=stderr,
exit_code=0,
time_used_ms=time_used_ms,
)
elif returncode == 124:
return CodeExecutionResult(
status=ResultStatus.RESOURCE_LIMIT_EXCEEDED,
stdout="",
stderr="Execution timeout",
exit_code=-124,
resource_limit_type=ResourceLimitType.TIME,
time_used_ms=time_used_ms,
)
elif returncode == 137:
return CodeExecutionResult(
status=ResultStatus.RESOURCE_LIMIT_EXCEEDED,
stdout="",
stderr="Memory limit exceeded (killed by OOM)",
exit_code=-137,
resource_limit_type=ResourceLimitType.MEMORY,
time_used_ms=time_used_ms,
)
return analyze_error_result(stderr, returncode)
except asyncio.TimeoutError:
await async_run_command("docker", "exec", container, "pkill", "-9", language)
return CodeExecutionResult(
status=ResultStatus.RESOURCE_LIMIT_EXCEEDED,
stdout="",
stderr="Execution timeout",
exit_code=-1,
resource_limit_type=ResourceLimitType.TIME,
time_used_ms=(time.time() - start_time) * 1000,
)
except Exception as e:
logger.error(f"Execution exception: {str(e)}")
return CodeExecutionResult(status=ResultStatus.PROGRAM_RUNNER_ERROR, stdout="", stderr=str(e), exit_code=-3, detail="internal_error")
finally:
# cleanup
cleanup_tasks = [async_run_command("docker", "exec", container, "rm", "-rf", f"/workspace/{task_id}"), async_run_command("rm", "-rf", workdir)]
await asyncio.gather(*cleanup_tasks, return_exceptions=True)
await release_container(container, language)
def analyze_error_result(stderr: str, exit_code: int) -> CodeExecutionResult:
"""Analyze the error result and classify it"""
if "Permission denied" in stderr:
return CodeExecutionResult(
status=ResultStatus.UNAUTHORIZED_ACCESS,
stdout="",
stderr=stderr,
exit_code=exit_code,
unauthorized_access_type=UnauthorizedAccessType.FILE_ACCESS,
)
elif "Operation not permitted" in stderr:
return CodeExecutionResult(
status=ResultStatus.UNAUTHORIZED_ACCESS,
stdout="",
stderr=stderr,
exit_code=exit_code,
unauthorized_access_type=UnauthorizedAccessType.DISALLOWED_SYSCALL,
)
elif "MemoryError" in stderr:
return CodeExecutionResult(
status=ResultStatus.RESOURCE_LIMIT_EXCEEDED,
stdout="",
stderr=stderr,
exit_code=exit_code,
resource_limit_type=ResourceLimitType.MEMORY,
)
else:
return CodeExecutionResult(
status=ResultStatus.PROGRAM_ERROR,
stdout="",
stderr=stderr,
exit_code=exit_code,
runtime_error_type=RuntimeErrorType.NONZERO_EXIT,
)

View File

@ -0,0 +1,38 @@
#
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from fastapi import Request
from fastapi.responses import JSONResponse
from models.enums import ResultStatus
from models.schemas import CodeExecutionResult
from slowapi import Limiter
from slowapi.errors import RateLimitExceeded
from slowapi.util import get_remote_address
limiter = Limiter(key_func=get_remote_address)
async def rate_limit_exceeded_handler(request: Request, exc: Exception) -> JSONResponse:
if isinstance(exc, RateLimitExceeded):
return JSONResponse(
content=CodeExecutionResult(
status=ResultStatus.PROGRAM_RUNNER_ERROR,
stdout="",
stderr="Too many requests, please try again later",
exit_code=-429,
detail="Too many requests, please try again later",
).model_dump(),
)
raise exc

View File

@ -0,0 +1,173 @@
#
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import ast
from typing import List, Tuple
from core.logger import logger
from models.enums import SupportLanguage
class SecurePythonAnalyzer(ast.NodeVisitor):
"""
An AST-based analyzer for detecting unsafe Python code patterns.
"""
DANGEROUS_IMPORTS = {"os", "subprocess", "sys", "shutil", "socket", "ctypes", "pickle", "threading", "multiprocessing", "asyncio", "http.client", "ftplib", "telnetlib"}
DANGEROUS_CALLS = {
"eval",
"exec",
"open",
"__import__",
"compile",
"input",
"system",
"popen",
"remove",
"rename",
"rmdir",
"chdir",
"chmod",
"chown",
"getattr",
"setattr",
"globals",
"locals",
"shutil.rmtree",
"subprocess.call",
"subprocess.Popen",
"ctypes",
"pickle.load",
"pickle.loads",
"pickle.dump",
"pickle.dumps",
}
def __init__(self):
self.unsafe_items: List[Tuple[str, int]] = []
def visit_Import(self, node: ast.Import):
"""Check for dangerous imports."""
for alias in node.names:
if alias.name.split(".")[0] in self.DANGEROUS_IMPORTS:
self.unsafe_items.append((f"Import: {alias.name}", node.lineno))
self.generic_visit(node)
def visit_ImportFrom(self, node: ast.ImportFrom):
"""Check for dangerous imports from specific modules."""
if node.module and node.module.split(".")[0] in self.DANGEROUS_IMPORTS:
self.unsafe_items.append((f"From Import: {node.module}", node.lineno))
self.generic_visit(node)
def visit_Call(self, node: ast.Call):
"""Check for dangerous function calls."""
if isinstance(node.func, ast.Name) and node.func.id in self.DANGEROUS_CALLS:
self.unsafe_items.append((f"Call: {node.func.id}", node.lineno))
self.generic_visit(node)
def visit_Attribute(self, node: ast.Attribute):
"""Check for dangerous attribute access."""
if isinstance(node.value, ast.Name) and node.value.id in self.DANGEROUS_IMPORTS:
self.unsafe_items.append((f"Attribute Access: {node.value.id}.{node.attr}", node.lineno))
self.generic_visit(node)
def visit_BinOp(self, node: ast.BinOp):
"""Check for possible unsafe operations like concatenating strings with commands."""
# This could be useful to detect `eval("os." + "system")`
if isinstance(node.left, ast.Constant) and isinstance(node.right, ast.Constant):
self.unsafe_items.append(("Possible unsafe string concatenation", node.lineno))
self.generic_visit(node)
def visit_FunctionDef(self, node: ast.FunctionDef):
"""Check for dangerous function definitions (e.g., user-defined eval)."""
if node.name in self.DANGEROUS_CALLS:
self.unsafe_items.append((f"Function Definition: {node.name}", node.lineno))
self.generic_visit(node)
def visit_Assign(self, node: ast.Assign):
"""Check for assignments to variables that might lead to dangerous operations."""
for target in node.targets:
if isinstance(target, ast.Name) and target.id in self.DANGEROUS_CALLS:
self.unsafe_items.append((f"Assignment to dangerous variable: {target.id}", node.lineno))
self.generic_visit(node)
def visit_Lambda(self, node: ast.Lambda):
"""Check for lambda functions with dangerous operations."""
if isinstance(node.body, ast.Call) and isinstance(node.body.func, ast.Name) and node.body.func.id in self.DANGEROUS_CALLS:
self.unsafe_items.append(("Lambda with dangerous function call", node.lineno))
self.generic_visit(node)
def visit_ListComp(self, node: ast.ListComp):
"""Check for list comprehensions with dangerous operations."""
# First, visit the generators to check for any issues there
for elem in node.generators:
if isinstance(elem, ast.comprehension):
self.generic_visit(elem)
if isinstance(node.elt, ast.Call) and isinstance(node.elt.func, ast.Name) and node.elt.func.id in self.DANGEROUS_CALLS:
self.unsafe_items.append(("List comprehension with dangerous function call", node.lineno))
self.generic_visit(node)
def visit_DictComp(self, node: ast.DictComp):
"""Check for dictionary comprehensions with dangerous operations."""
# Check for dangerous calls in both the key and value expressions of the dictionary comprehension
if isinstance(node.key, ast.Call) and isinstance(node.key.func, ast.Name) and node.key.func.id in self.DANGEROUS_CALLS:
self.unsafe_items.append(("Dict comprehension with dangerous function call in key", node.lineno))
if isinstance(node.value, ast.Call) and isinstance(node.value.func, ast.Name) and node.value.func.id in self.DANGEROUS_CALLS:
self.unsafe_items.append(("Dict comprehension with dangerous function call in value", node.lineno))
# Visit other sub-nodes (e.g., the generators in the comprehension)
self.generic_visit(node)
def visit_SetComp(self, node: ast.SetComp):
"""Check for set comprehensions with dangerous operations."""
for elt in node.generators:
if isinstance(elt, ast.comprehension):
self.generic_visit(elt)
if isinstance(node.elt, ast.Call) and isinstance(node.elt.func, ast.Name) and node.elt.func.id in self.DANGEROUS_CALLS:
self.unsafe_items.append(("Set comprehension with dangerous function call", node.lineno))
self.generic_visit(node)
def visit_Yield(self, node: ast.Yield):
"""Check for yield statements that could be used to produce unsafe values."""
if isinstance(node.value, ast.Call) and isinstance(node.value.func, ast.Name) and node.value.func.id in self.DANGEROUS_CALLS:
self.unsafe_items.append(("Yield with dangerous function call", node.lineno))
self.generic_visit(node)
def analyze_code_security(code: str, language: SupportLanguage) -> Tuple[bool, List[Tuple[str, int]]]:
"""
Analyze the provided code string and return whether it's safe and why.
:param code: The source code to analyze.
:param language: The programming language of the code.
:return: (is_safe: bool, issues: List of (description, line number))
"""
if language == SupportLanguage.PYTHON:
try:
tree = ast.parse(code)
analyzer = SecurePythonAnalyzer()
analyzer.visit(tree)
return len(analyzer.unsafe_items) == 0, analyzer.unsafe_items
except Exception as e:
logger.error(f"[SafeCheck] Python parsing failed: {str(e)}")
return False, [(f"Parsing Error: {str(e)}", -1)]
else:
logger.warning(f"[SafeCheck] Unsupported language for security analysis: {language} — defaulting to SAFE (manual review recommended)")
return True, [(f"Unsupported language for security analysis: {language} — defaulted to SAFE, manual review recommended", -1)]

View File

@ -0,0 +1,76 @@
#
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os
import re
def is_enabled(value: str) -> bool:
return str(value).strip().lower() in {"1", "true", "yes", "on"}
def env_setting_enabled(env_key: str, default: str = "false") -> bool:
value = os.getenv(env_key, default)
return is_enabled(value)
def is_valid_memory_limit(mem: str | None) -> bool:
"""
Return True if the input string is a valid Docker memory limit (e.g. '256m', '1g').
Units allowed: b, k, m, g (case-insensitive).
Disallows zero or negative values.
"""
if not mem or not isinstance(mem, str):
return False
mem = mem.strip().lower()
return re.fullmatch(r"[1-9]\d*(b|k|m|g)", mem) is not None
def parse_timeout_duration(timeout: str | None, default_seconds: int = 10) -> int:
"""
Parses a string like '90s', '2m', '1m30s' into total seconds (int).
Supports 's', 'm' (lower or upper case). Returns default if invalid.
'1m30s' -> 90
"""
if not timeout or not isinstance(timeout, str):
return default_seconds
timeout = timeout.strip().lower()
pattern = r"^(?:(\d+)m)?(?:(\d+)s)?$"
match = re.fullmatch(pattern, timeout)
if not match:
return default_seconds
minutes = int(match.group(1)) if match.group(1) else 0
seconds = int(match.group(2)) if match.group(2) else 0
total = minutes * 60 + seconds
return total if total > 0 else default_seconds
def format_timeout_duration(seconds: int) -> str:
"""
Formats an integer number of seconds into a string like '1m30s'.
90 -> '1m30s'
"""
if seconds < 60:
return f"{seconds}s"
minutes, sec = divmod(seconds, 60)
if sec == 0:
return f"{minutes}m"
return f"{minutes}m{sec}s"

View File

@ -0,0 +1,15 @@
#
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

View File

@ -0,0 +1,36 @@
#
# Copyright 2025 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import asyncio
from typing import Tuple
async def async_run_command(*args, timeout: float = 5) -> Tuple[int, str, str]:
"""Safe asynchronous command execution tool"""
proc = await asyncio.create_subprocess_exec(*args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
try:
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
if proc.returncode is None:
raise RuntimeError("Process finished but returncode is None")
return proc.returncode, stdout.decode(), stderr.decode()
except asyncio.TimeoutError:
proc.kill()
await proc.wait()
raise RuntimeError("Command timed out")
except Exception as e:
proc.kill()
await proc.wait()
raise e