mirror of
https://github.com/haris-musa/excel-mcp-server.git
synced 2025-12-08 17:12:41 +08:00
simplify the format of data in batch read and write function to list of lists (#16)
This commit is contained in:
@ -51,8 +51,14 @@ def read_excel_range(
|
|||||||
except ValueError as e:
|
except ValueError as e:
|
||||||
raise DataError(f"Invalid end cell format: {str(e)}")
|
raise DataError(f"Invalid end cell format: {str(e)}")
|
||||||
else:
|
else:
|
||||||
# For single cell, use same coordinates
|
# Dynamically expand range until all values are empty
|
||||||
end_row, end_col = start_row, start_col
|
end_row, end_col = start_row, start_col
|
||||||
|
while end_row <= ws.max_row and any(ws.cell(row=end_row, column=c).value is not None for c in range(start_col, ws.max_column + 1)):
|
||||||
|
end_row += 1
|
||||||
|
while end_col <= ws.max_column and any(ws.cell(row=r, column=end_col).value is not None for r in range(start_row, ws.max_row + 1)):
|
||||||
|
end_col += 1
|
||||||
|
end_row -= 1 # Adjust back to last non-empty row
|
||||||
|
end_col -= 1 # Adjust back to last non-empty column
|
||||||
|
|
||||||
# Validate range bounds
|
# Validate range bounds
|
||||||
if start_row > ws.max_row or start_col > ws.max_column:
|
if start_row > ws.max_row or start_col > ws.max_column:
|
||||||
@ -62,30 +68,12 @@ def read_excel_range(
|
|||||||
)
|
)
|
||||||
|
|
||||||
data = []
|
data = []
|
||||||
# If it's a single cell or single row, just read the values directly
|
for row in range(start_row, end_row + 1):
|
||||||
if start_row == end_row:
|
row_data = []
|
||||||
row_data = {}
|
|
||||||
for col in range(start_col, end_col + 1):
|
for col in range(start_col, end_col + 1):
|
||||||
cell = ws.cell(row=start_row, column=col)
|
|
||||||
col_name = f"Column_{col}"
|
|
||||||
row_data[col_name] = cell.value
|
|
||||||
if any(v is not None for v in row_data.values()):
|
|
||||||
data.append(row_data)
|
|
||||||
else:
|
|
||||||
# Multiple rows - use header row
|
|
||||||
headers = []
|
|
||||||
for col in range(start_col, end_col + 1):
|
|
||||||
cell_value = ws.cell(row=start_row, column=col).value
|
|
||||||
headers.append(str(cell_value) if cell_value is not None else f"Column_{col}")
|
|
||||||
|
|
||||||
# Get data rows
|
|
||||||
max_rows = min(start_row + 5, end_row) if preview_only else end_row
|
|
||||||
for row in range(start_row + 1, max_rows + 1):
|
|
||||||
row_data = {}
|
|
||||||
for col, header in enumerate(headers, start=start_col):
|
|
||||||
cell = ws.cell(row=row, column=col)
|
cell = ws.cell(row=row, column=col)
|
||||||
row_data[header] = cell.value
|
row_data.append(cell.value)
|
||||||
if any(v is not None for v in row_data.values()):
|
if any(v is not None for v in row_data):
|
||||||
data.append(row_data)
|
data.append(row_data)
|
||||||
|
|
||||||
wb.close()
|
wb.close()
|
||||||
@ -100,7 +88,7 @@ def read_excel_range(
|
|||||||
def write_data(
|
def write_data(
|
||||||
filepath: str,
|
filepath: str,
|
||||||
sheet_name: str | None,
|
sheet_name: str | None,
|
||||||
data: list[dict[str, Any]] | None,
|
data: list[list] | None,
|
||||||
start_cell: str = "A1",
|
start_cell: str = "A1",
|
||||||
) -> dict[str, str]:
|
) -> dict[str, str]:
|
||||||
"""Write data to Excel sheet with workbook handling
|
"""Write data to Excel sheet with workbook handling
|
||||||
@ -230,7 +218,7 @@ def _determine_header_behavior(worksheet, start_row, start_col, data):
|
|||||||
|
|
||||||
def _write_data_to_worksheet(
|
def _write_data_to_worksheet(
|
||||||
worksheet: Worksheet,
|
worksheet: Worksheet,
|
||||||
data: list[dict[str, Any]],
|
data: list[list],
|
||||||
start_cell: str = "A1",
|
start_cell: str = "A1",
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Write data to worksheet with intelligent header handling"""
|
"""Write data to worksheet with intelligent header handling"""
|
||||||
@ -246,46 +234,10 @@ def _write_data_to_worksheet(
|
|||||||
except ValueError as e:
|
except ValueError as e:
|
||||||
raise DataError(f"Invalid start cell format: {str(e)}")
|
raise DataError(f"Invalid start cell format: {str(e)}")
|
||||||
|
|
||||||
# Validate data structure
|
# Write data
|
||||||
if not all(isinstance(row, dict) for row in data):
|
for i, row in enumerate(data):
|
||||||
raise DataError("All data rows must be dictionaries")
|
for j, val in enumerate(row):
|
||||||
|
worksheet.cell(row=start_row + i, column=start_col + j, value=val)
|
||||||
# Get headers from first data row's keys
|
|
||||||
headers = list(data[0].keys())
|
|
||||||
|
|
||||||
# Check if first row appears to be headers (keys match values)
|
|
||||||
first_row_is_headers = _looks_like_headers(data[0])
|
|
||||||
|
|
||||||
# Determine if we should write headers based on context
|
|
||||||
should_write_headers = _determine_header_behavior(
|
|
||||||
worksheet, start_row, start_col, data
|
|
||||||
)
|
|
||||||
|
|
||||||
# Determine what data to write
|
|
||||||
actual_data = data
|
|
||||||
|
|
||||||
# Only skip the first row if it contains headers AND we're writing headers
|
|
||||||
if first_row_is_headers and should_write_headers:
|
|
||||||
actual_data = data[1:]
|
|
||||||
elif first_row_is_headers and not should_write_headers:
|
|
||||||
actual_data = data
|
|
||||||
|
|
||||||
# Write headers if needed
|
|
||||||
current_row = start_row
|
|
||||||
if should_write_headers:
|
|
||||||
for i, header in enumerate(headers):
|
|
||||||
cell = worksheet.cell(row=current_row, column=start_col + i)
|
|
||||||
cell.value = header
|
|
||||||
cell.font = Font(bold=True)
|
|
||||||
current_row += 1 # Move down after writing headers
|
|
||||||
|
|
||||||
# Write actual data
|
|
||||||
for i, row_dict in enumerate(actual_data):
|
|
||||||
if not all(h in row_dict for h in headers):
|
|
||||||
raise DataError(f"Row {i+1} is missing required headers")
|
|
||||||
for j, header in enumerate(headers):
|
|
||||||
cell = worksheet.cell(row=current_row + i, column=start_col + j)
|
|
||||||
cell.value = row_dict.get(header, "")
|
|
||||||
except DataError as e:
|
except DataError as e:
|
||||||
logger.error(str(e))
|
logger.error(str(e))
|
||||||
raise
|
raise
|
||||||
|
|||||||
@ -91,7 +91,10 @@ def apply_formula(
|
|||||||
cell: str,
|
cell: str,
|
||||||
formula: str,
|
formula: str,
|
||||||
) -> str:
|
) -> str:
|
||||||
"""Apply Excel formula to cell."""
|
"""
|
||||||
|
Apply Excel formula to cell.
|
||||||
|
Excel formula will write to cell with verification.
|
||||||
|
"""
|
||||||
try:
|
try:
|
||||||
full_path = get_excel_path(filepath)
|
full_path = get_excel_path(filepath)
|
||||||
# First validate the formula
|
# First validate the formula
|
||||||
@ -188,7 +191,12 @@ def read_data_from_excel(
|
|||||||
end_cell: str = None,
|
end_cell: str = None,
|
||||||
preview_only: bool = False
|
preview_only: bool = False
|
||||||
) -> str:
|
) -> str:
|
||||||
"""Read data from Excel worksheet."""
|
"""
|
||||||
|
Read data from Excel worksheet.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Data from Excel worksheet as json string. list of lists or empty list if no data found. sublists are assumed to be rows.
|
||||||
|
"""
|
||||||
try:
|
try:
|
||||||
full_path = get_excel_path(filepath)
|
full_path = get_excel_path(filepath)
|
||||||
from excel_mcp.data import read_excel_range
|
from excel_mcp.data import read_excel_range
|
||||||
@ -206,16 +214,19 @@ def read_data_from_excel(
|
|||||||
def write_data_to_excel(
|
def write_data_to_excel(
|
||||||
filepath: str,
|
filepath: str,
|
||||||
sheet_name: str,
|
sheet_name: str,
|
||||||
data: List[Dict],
|
data: List[List],
|
||||||
start_cell: str = "A1",
|
start_cell: str = "A1",
|
||||||
) -> str:
|
) -> str:
|
||||||
"""Write data to Excel worksheet.
|
"""
|
||||||
|
Write data to Excel worksheet.
|
||||||
|
Excel formula will write to cell without any verification.
|
||||||
|
|
||||||
|
PARAMETERS:
|
||||||
|
filepath: Path to Excel file
|
||||||
|
sheet_name: Name of worksheet to write to
|
||||||
|
data: List of lists containing data to write to the worksheet, sublists are assumed to be rows
|
||||||
|
start_cell: Cell to start writing to, default is "A1"
|
||||||
|
|
||||||
The function automatically detects the context and handles headers intelligently:
|
|
||||||
- Headers are added when writing to a new area
|
|
||||||
- Headers are not duplicated when writing below existing headers
|
|
||||||
- Title areas (rows 1-4) are treated specially
|
|
||||||
- If the first row of data appears to be headers, it will be used accordingly
|
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
full_path = get_excel_path(filepath)
|
full_path = get_excel_path(filepath)
|
||||||
|
|||||||
3
uv.lock
generated
3
uv.lock
generated
@ -1,4 +1,5 @@
|
|||||||
version = 1
|
version = 1
|
||||||
|
revision = 1
|
||||||
requires-python = ">=3.10"
|
requires-python = ">=3.10"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
@ -66,7 +67,7 @@ wheels = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "excel-mcp-server"
|
name = "excel-mcp-server"
|
||||||
version = "0.1.0"
|
version = "0.1.1"
|
||||||
source = { editable = "." }
|
source = { editable = "." }
|
||||||
dependencies = [
|
dependencies = [
|
||||||
{ name = "mcp", extra = ["cli"] },
|
{ name = "mcp", extra = ["cli"] },
|
||||||
|
|||||||
Reference in New Issue
Block a user