mirror of
https://github.com/haris-musa/excel-mcp-server.git
synced 2025-12-08 17:12:41 +08:00
Initial commit
This commit is contained in:
491
src/excel_mcp/server.py
Normal file
491
src/excel_mcp/server.py
Normal file
@ -0,0 +1,491 @@
|
||||
import logging
|
||||
import sys
|
||||
import os
|
||||
from typing import Any, List, Dict
|
||||
|
||||
from mcp.server.fastmcp import FastMCP
|
||||
|
||||
# Import exceptions
|
||||
from excel_mcp.exceptions import (
|
||||
ValidationError,
|
||||
WorkbookError,
|
||||
SheetError,
|
||||
DataError,
|
||||
FormattingError,
|
||||
CalculationError,
|
||||
PivotError,
|
||||
ChartError
|
||||
)
|
||||
|
||||
# Import from excel_mcp package with consistent _impl suffixes
|
||||
from excel_mcp.validation import (
|
||||
validate_formula_in_cell_operation as validate_formula_impl,
|
||||
validate_range_in_sheet_operation as validate_range_impl
|
||||
)
|
||||
from excel_mcp.chart import create_chart_in_sheet as create_chart_impl
|
||||
from excel_mcp.workbook import get_workbook_info
|
||||
from excel_mcp.data import write_data
|
||||
from excel_mcp.pivot import create_pivot_table as create_pivot_table_impl
|
||||
from excel_mcp.sheet import (
|
||||
copy_sheet,
|
||||
delete_sheet,
|
||||
rename_sheet,
|
||||
merge_range,
|
||||
unmerge_range,
|
||||
)
|
||||
|
||||
# Configure logging
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
|
||||
handlers=[
|
||||
logging.StreamHandler(sys.stdout),
|
||||
logging.FileHandler("excel-mcp.log")
|
||||
],
|
||||
force=True
|
||||
)
|
||||
|
||||
logger = logging.getLogger("excel-mcp")
|
||||
|
||||
# Get Excel files path from environment or use default
|
||||
EXCEL_FILES_PATH = os.environ.get("EXCEL_FILES_PATH", "./excel_files")
|
||||
|
||||
# Create the directory if it doesn't exist
|
||||
os.makedirs(EXCEL_FILES_PATH, exist_ok=True)
|
||||
|
||||
# Initialize FastMCP server
|
||||
mcp = FastMCP(
|
||||
"excel-mcp",
|
||||
version="0.1.0",
|
||||
description="Excel MCP Server for manipulating Excel files",
|
||||
dependencies=["openpyxl>=3.1.2"],
|
||||
env_vars={
|
||||
"EXCEL_FILES_PATH": {
|
||||
"description": "Path to Excel files directory",
|
||||
"required": False,
|
||||
"default": EXCEL_FILES_PATH
|
||||
}
|
||||
}
|
||||
)
|
||||
|
||||
def get_excel_path(filename: str) -> str:
|
||||
"""Get full path to Excel file.
|
||||
|
||||
Args:
|
||||
filename: Name of Excel file
|
||||
|
||||
Returns:
|
||||
Full path to Excel file
|
||||
"""
|
||||
# If filename is already an absolute path, return it
|
||||
if os.path.isabs(filename):
|
||||
return filename
|
||||
|
||||
# Use the configured Excel files path
|
||||
return os.path.join(EXCEL_FILES_PATH, filename)
|
||||
|
||||
@mcp.tool()
|
||||
def apply_formula(
|
||||
filepath: str,
|
||||
sheet_name: str,
|
||||
cell: str,
|
||||
formula: str,
|
||||
) -> str:
|
||||
"""Apply Excel formula to cell."""
|
||||
try:
|
||||
full_path = get_excel_path(filepath)
|
||||
# First validate the formula
|
||||
validation = validate_formula_impl(full_path, sheet_name, cell, formula)
|
||||
if isinstance(validation, dict) and "error" in validation:
|
||||
return f"Error: {validation['error']}"
|
||||
|
||||
# If valid, apply the formula
|
||||
from excel_mcp.calculations import apply_formula as apply_formula_impl
|
||||
result = apply_formula_impl(full_path, sheet_name, cell, formula)
|
||||
return result["message"]
|
||||
except (ValidationError, CalculationError) as e:
|
||||
return f"Error: {str(e)}"
|
||||
except Exception as e:
|
||||
logger.error(f"Error applying formula: {e}")
|
||||
raise
|
||||
|
||||
@mcp.tool()
|
||||
def validate_formula_syntax(
|
||||
filepath: str,
|
||||
sheet_name: str,
|
||||
cell: str,
|
||||
formula: str,
|
||||
) -> str:
|
||||
"""Validate Excel formula syntax without applying it."""
|
||||
try:
|
||||
full_path = get_excel_path(filepath)
|
||||
result = validate_formula_impl(full_path, sheet_name, cell, formula)
|
||||
return result["message"]
|
||||
except (ValidationError, CalculationError) as e:
|
||||
return f"Error: {str(e)}"
|
||||
except Exception as e:
|
||||
logger.error(f"Error validating formula: {e}")
|
||||
raise
|
||||
|
||||
@mcp.tool()
|
||||
def format_range(
|
||||
filepath: str,
|
||||
sheet_name: str,
|
||||
start_cell: str,
|
||||
end_cell: str = None,
|
||||
bold: bool = False,
|
||||
italic: bool = False,
|
||||
underline: bool = False,
|
||||
font_size: int = None,
|
||||
font_color: str = None,
|
||||
bg_color: str = None,
|
||||
border_style: str = None,
|
||||
border_color: str = None,
|
||||
number_format: str = None,
|
||||
alignment: str = None,
|
||||
wrap_text: bool = False,
|
||||
merge_cells: bool = False,
|
||||
protection: Dict[str, Any] = None,
|
||||
conditional_format: Dict[str, Any] = None
|
||||
) -> str:
|
||||
"""Apply formatting to a range of cells."""
|
||||
try:
|
||||
full_path = get_excel_path(filepath)
|
||||
from excel_mcp.formatting import format_range as format_range_func
|
||||
|
||||
result = format_range_func(
|
||||
filepath=full_path,
|
||||
sheet_name=sheet_name,
|
||||
start_cell=start_cell,
|
||||
end_cell=end_cell,
|
||||
bold=bold,
|
||||
italic=italic,
|
||||
underline=underline,
|
||||
font_size=font_size,
|
||||
font_color=font_color,
|
||||
bg_color=bg_color,
|
||||
border_style=border_style,
|
||||
border_color=border_color,
|
||||
number_format=number_format,
|
||||
alignment=alignment,
|
||||
wrap_text=wrap_text,
|
||||
merge_cells=merge_cells,
|
||||
protection=protection,
|
||||
conditional_format=conditional_format
|
||||
)
|
||||
return "Range formatted successfully"
|
||||
except (ValidationError, FormattingError) as e:
|
||||
return f"Error: {str(e)}"
|
||||
except Exception as e:
|
||||
logger.error(f"Error formatting range: {e}")
|
||||
raise
|
||||
|
||||
@mcp.tool()
|
||||
def read_data_from_excel(
|
||||
filepath: str,
|
||||
sheet_name: str,
|
||||
start_cell: str = "A1",
|
||||
end_cell: str = None,
|
||||
preview_only: bool = False
|
||||
) -> str:
|
||||
"""Read data from Excel worksheet."""
|
||||
try:
|
||||
full_path = get_excel_path(filepath)
|
||||
from excel_mcp.data import read_excel_range
|
||||
result = read_excel_range(full_path, sheet_name, start_cell, end_cell, preview_only)
|
||||
if not result:
|
||||
return "No data found in specified range"
|
||||
# Convert the list of dicts to a formatted string
|
||||
data_str = "\n".join([str(row) for row in result])
|
||||
return data_str
|
||||
except Exception as e:
|
||||
logger.error(f"Error reading data: {e}")
|
||||
raise
|
||||
|
||||
@mcp.tool()
|
||||
def write_data_to_excel(
|
||||
filepath: str,
|
||||
sheet_name: str,
|
||||
data: List[Dict],
|
||||
start_cell: str = "A1",
|
||||
write_headers: bool = True,
|
||||
) -> str:
|
||||
"""Write data to Excel worksheet."""
|
||||
try:
|
||||
full_path = get_excel_path(filepath)
|
||||
result = write_data(full_path, sheet_name, data, start_cell, write_headers)
|
||||
return result["message"]
|
||||
except (ValidationError, DataError) as e:
|
||||
return f"Error: {str(e)}"
|
||||
except Exception as e:
|
||||
logger.error(f"Error writing data: {e}")
|
||||
raise
|
||||
|
||||
@mcp.tool()
|
||||
def create_workbook(filepath: str) -> str:
|
||||
"""Create new Excel workbook."""
|
||||
try:
|
||||
full_path = get_excel_path(filepath)
|
||||
from excel_mcp.workbook import create_workbook as create_workbook_impl
|
||||
result = create_workbook_impl(full_path)
|
||||
return f"Created workbook at {full_path}"
|
||||
except WorkbookError as e:
|
||||
return f"Error: {str(e)}"
|
||||
except Exception as e:
|
||||
logger.error(f"Error creating workbook: {e}")
|
||||
raise
|
||||
|
||||
@mcp.tool()
|
||||
def create_worksheet(filepath: str, sheet_name: str) -> str:
|
||||
"""Create new worksheet in workbook."""
|
||||
try:
|
||||
full_path = get_excel_path(filepath)
|
||||
from excel_mcp.workbook import create_sheet as create_worksheet_impl
|
||||
result = create_worksheet_impl(full_path, sheet_name)
|
||||
return result["message"]
|
||||
except (ValidationError, WorkbookError) as e:
|
||||
return f"Error: {str(e)}"
|
||||
except Exception as e:
|
||||
logger.error(f"Error creating worksheet: {e}")
|
||||
raise
|
||||
|
||||
@mcp.tool()
|
||||
def create_chart(
|
||||
filepath: str,
|
||||
sheet_name: str,
|
||||
data_range: str,
|
||||
chart_type: str,
|
||||
target_cell: str,
|
||||
title: str = "",
|
||||
x_axis: str = "",
|
||||
y_axis: str = ""
|
||||
) -> str:
|
||||
"""Create chart in worksheet."""
|
||||
try:
|
||||
full_path = get_excel_path(filepath)
|
||||
result = create_chart_impl(
|
||||
filepath=full_path,
|
||||
sheet_name=sheet_name,
|
||||
data_range=data_range,
|
||||
chart_type=chart_type,
|
||||
target_cell=target_cell,
|
||||
title=title,
|
||||
x_axis=x_axis,
|
||||
y_axis=y_axis
|
||||
)
|
||||
return result["message"]
|
||||
except (ValidationError, ChartError) as e:
|
||||
return f"Error: {str(e)}"
|
||||
except Exception as e:
|
||||
logger.error(f"Error creating chart: {e}")
|
||||
raise
|
||||
|
||||
@mcp.tool()
|
||||
def create_pivot_table(
|
||||
filepath: str,
|
||||
sheet_name: str,
|
||||
data_range: str,
|
||||
target_cell: str,
|
||||
rows: List[str],
|
||||
values: List[str],
|
||||
columns: List[str] = None,
|
||||
agg_func: str = "mean"
|
||||
) -> str:
|
||||
"""Create pivot table in worksheet."""
|
||||
try:
|
||||
full_path = get_excel_path(filepath)
|
||||
result = create_pivot_table_impl(
|
||||
filepath=full_path,
|
||||
sheet_name=sheet_name,
|
||||
data_range=data_range,
|
||||
target_cell=target_cell,
|
||||
rows=rows,
|
||||
values=values,
|
||||
columns=columns or [],
|
||||
agg_func=agg_func
|
||||
)
|
||||
return result["message"]
|
||||
except (ValidationError, PivotError) as e:
|
||||
return f"Error: {str(e)}"
|
||||
except Exception as e:
|
||||
logger.error(f"Error creating pivot table: {e}")
|
||||
raise
|
||||
|
||||
@mcp.tool()
|
||||
def copy_worksheet(
|
||||
filepath: str,
|
||||
source_sheet: str,
|
||||
target_sheet: str
|
||||
) -> str:
|
||||
"""Copy worksheet within workbook."""
|
||||
try:
|
||||
full_path = get_excel_path(filepath)
|
||||
result = copy_sheet(full_path, source_sheet, target_sheet)
|
||||
return result["message"]
|
||||
except (ValidationError, SheetError) as e:
|
||||
return f"Error: {str(e)}"
|
||||
except Exception as e:
|
||||
logger.error(f"Error copying worksheet: {e}")
|
||||
raise
|
||||
|
||||
@mcp.tool()
|
||||
def delete_worksheet(
|
||||
filepath: str,
|
||||
sheet_name: str
|
||||
) -> str:
|
||||
"""Delete worksheet from workbook."""
|
||||
try:
|
||||
full_path = get_excel_path(filepath)
|
||||
result = delete_sheet(full_path, sheet_name)
|
||||
return result["message"]
|
||||
except (ValidationError, SheetError) as e:
|
||||
return f"Error: {str(e)}"
|
||||
except Exception as e:
|
||||
logger.error(f"Error deleting worksheet: {e}")
|
||||
raise
|
||||
|
||||
@mcp.tool()
|
||||
def rename_worksheet(
|
||||
filepath: str,
|
||||
old_name: str,
|
||||
new_name: str
|
||||
) -> str:
|
||||
"""Rename worksheet in workbook."""
|
||||
try:
|
||||
full_path = get_excel_path(filepath)
|
||||
result = rename_sheet(full_path, old_name, new_name)
|
||||
return result["message"]
|
||||
except (ValidationError, SheetError) as e:
|
||||
return f"Error: {str(e)}"
|
||||
except Exception as e:
|
||||
logger.error(f"Error renaming worksheet: {e}")
|
||||
raise
|
||||
|
||||
@mcp.tool()
|
||||
def get_workbook_metadata(
|
||||
filepath: str,
|
||||
include_ranges: bool = False
|
||||
) -> str:
|
||||
"""Get metadata about workbook including sheets, ranges, etc."""
|
||||
try:
|
||||
full_path = get_excel_path(filepath)
|
||||
result = get_workbook_info(full_path, include_ranges=include_ranges)
|
||||
return str(result)
|
||||
except WorkbookError as e:
|
||||
return f"Error: {str(e)}"
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting workbook metadata: {e}")
|
||||
raise
|
||||
|
||||
@mcp.tool()
|
||||
def merge_cells(filepath: str, sheet_name: str, start_cell: str, end_cell: str) -> str:
|
||||
"""Merge a range of cells."""
|
||||
try:
|
||||
full_path = get_excel_path(filepath)
|
||||
result = merge_range(full_path, sheet_name, start_cell, end_cell)
|
||||
return result["message"]
|
||||
except (ValidationError, SheetError) as e:
|
||||
return f"Error: {str(e)}"
|
||||
except Exception as e:
|
||||
logger.error(f"Error merging cells: {e}")
|
||||
raise
|
||||
|
||||
@mcp.tool()
|
||||
def unmerge_cells(filepath: str, sheet_name: str, start_cell: str, end_cell: str) -> str:
|
||||
"""Unmerge a range of cells."""
|
||||
try:
|
||||
full_path = get_excel_path(filepath)
|
||||
result = unmerge_range(full_path, sheet_name, start_cell, end_cell)
|
||||
return result["message"]
|
||||
except (ValidationError, SheetError) as e:
|
||||
return f"Error: {str(e)}"
|
||||
except Exception as e:
|
||||
logger.error(f"Error unmerging cells: {e}")
|
||||
raise
|
||||
|
||||
@mcp.tool()
|
||||
def copy_range(
|
||||
filepath: str,
|
||||
sheet_name: str,
|
||||
source_start: str,
|
||||
source_end: str,
|
||||
target_start: str,
|
||||
target_sheet: str = None
|
||||
) -> str:
|
||||
"""Copy a range of cells to another location."""
|
||||
try:
|
||||
full_path = get_excel_path(filepath)
|
||||
from excel_mcp.sheet import copy_range_operation
|
||||
result = copy_range_operation(
|
||||
full_path,
|
||||
sheet_name,
|
||||
source_start,
|
||||
source_end,
|
||||
target_start,
|
||||
target_sheet
|
||||
)
|
||||
return result["message"]
|
||||
except (ValidationError, SheetError) as e:
|
||||
return f"Error: {str(e)}"
|
||||
except Exception as e:
|
||||
logger.error(f"Error copying range: {e}")
|
||||
raise
|
||||
|
||||
@mcp.tool()
|
||||
def delete_range(
|
||||
filepath: str,
|
||||
sheet_name: str,
|
||||
start_cell: str,
|
||||
end_cell: str,
|
||||
shift_direction: str = "up"
|
||||
) -> str:
|
||||
"""Delete a range of cells and shift remaining cells."""
|
||||
try:
|
||||
full_path = get_excel_path(filepath)
|
||||
from excel_mcp.sheet import delete_range_operation
|
||||
result = delete_range_operation(
|
||||
full_path,
|
||||
sheet_name,
|
||||
start_cell,
|
||||
end_cell,
|
||||
shift_direction
|
||||
)
|
||||
return result["message"]
|
||||
except (ValidationError, SheetError) as e:
|
||||
return f"Error: {str(e)}"
|
||||
except Exception as e:
|
||||
logger.error(f"Error deleting range: {e}")
|
||||
raise
|
||||
|
||||
@mcp.tool()
|
||||
def validate_excel_range(
|
||||
filepath: str,
|
||||
sheet_name: str,
|
||||
start_cell: str,
|
||||
end_cell: str = None
|
||||
) -> str:
|
||||
"""Validate if a range exists and is properly formatted."""
|
||||
try:
|
||||
full_path = get_excel_path(filepath)
|
||||
range_str = start_cell if not end_cell else f"{start_cell}:{end_cell}"
|
||||
result = validate_range_impl(full_path, sheet_name, range_str)
|
||||
return result["message"]
|
||||
except ValidationError as e:
|
||||
return f"Error: {str(e)}"
|
||||
except Exception as e:
|
||||
logger.error(f"Error validating range: {e}")
|
||||
raise
|
||||
|
||||
async def run_server():
|
||||
"""Run the Excel MCP server."""
|
||||
try:
|
||||
logger.info(f"Starting Excel MCP server (files directory: {EXCEL_FILES_PATH})")
|
||||
await mcp.run_sse_async()
|
||||
except KeyboardInterrupt:
|
||||
logger.info("Server stopped by user")
|
||||
await mcp.shutdown()
|
||||
except Exception as e:
|
||||
logger.error(f"Server failed: {e}")
|
||||
raise
|
||||
finally:
|
||||
logger.info("Server shutdown complete")
|
||||
Reference in New Issue
Block a user