mirror of
https://github.com/haris-musa/excel-mcp-server.git
synced 2025-12-08 17:12:41 +08:00
@ -58,21 +58,6 @@ def create_pivot_table(
|
||||
# Create range string
|
||||
data_range_str = f"{get_column_letter(start_col)}{start_row}:{get_column_letter(end_col)}{end_row}"
|
||||
|
||||
# Read source data
|
||||
try:
|
||||
data = read_excel_range(filepath, sheet_name, start_cell, end_cell)
|
||||
if not data:
|
||||
raise PivotError("No data found in range")
|
||||
except Exception as e:
|
||||
raise PivotError(f"Failed to read source data: {str(e)}")
|
||||
|
||||
# Validate aggregation function
|
||||
valid_agg_funcs = ["sum", "average", "count", "min", "max"]
|
||||
if agg_func.lower() not in valid_agg_funcs:
|
||||
raise ValidationError(
|
||||
f"Invalid aggregation function. Must be one of: {', '.join(valid_agg_funcs)}"
|
||||
)
|
||||
|
||||
# Clean up field names by removing aggregation suffixes
|
||||
def clean_field_name(field: str) -> str:
|
||||
field = str(field).strip()
|
||||
@ -81,17 +66,39 @@ def create_pivot_table(
|
||||
return field[:-len(suffix)]
|
||||
return field
|
||||
|
||||
# Read source data and convert to list of dicts
|
||||
try:
|
||||
data_as_list = read_excel_range(filepath, sheet_name, start_cell, end_cell)
|
||||
if not data_as_list or len(data_as_list) < 2:
|
||||
raise PivotError("Source data must have a header row and at least one data row.")
|
||||
|
||||
headers = [str(h) for h in data_as_list[0]]
|
||||
data = [dict(zip(headers, row)) for row in data_as_list[1:]]
|
||||
|
||||
if not data:
|
||||
raise PivotError("No data rows found after header.")
|
||||
|
||||
except Exception as e:
|
||||
raise PivotError(f"Failed to read or process source data: {str(e)}")
|
||||
|
||||
# Validate aggregation function
|
||||
valid_agg_funcs = ["sum", "average", "count", "min", "max"]
|
||||
if agg_func.lower() not in valid_agg_funcs:
|
||||
raise ValidationError(
|
||||
f"Invalid aggregation function. Must be one of: {', '.join(valid_agg_funcs)}"
|
||||
)
|
||||
|
||||
# Validate field names exist in data
|
||||
if data:
|
||||
first_row = data[0]
|
||||
available_fields = {clean_field_name(str(header)).lower() for header in first_row.keys()}
|
||||
available_fields_raw = data[0].keys()
|
||||
available_fields = {clean_field_name(str(header)).lower() for header in available_fields_raw}
|
||||
|
||||
for field_list, field_type in [(rows, "row"), (values, "value")]:
|
||||
for field in field_list:
|
||||
if clean_field_name(str(field)).lower() not in available_fields:
|
||||
raise ValidationError(
|
||||
f"Invalid {field_type} field '{field}'. "
|
||||
f"Available fields: {', '.join(sorted(available_fields))}"
|
||||
f"Available fields: {', '.join(sorted(available_fields_raw))}"
|
||||
)
|
||||
|
||||
if columns:
|
||||
@ -99,17 +106,9 @@ def create_pivot_table(
|
||||
if clean_field_name(str(field)).lower() not in available_fields:
|
||||
raise ValidationError(
|
||||
f"Invalid column field '{field}'. "
|
||||
f"Available fields: {', '.join(sorted(available_fields))}"
|
||||
f"Available fields: {', '.join(sorted(available_fields_raw))}"
|
||||
)
|
||||
|
||||
# Skip header row if it matches our fields
|
||||
if all(
|
||||
any(clean_field_name(str(header)).lower() == clean_field_name(str(field)).lower()
|
||||
for field in rows + values)
|
||||
for header in first_row.keys()
|
||||
):
|
||||
data = data[1:]
|
||||
|
||||
# Clean up row and value field names
|
||||
cleaned_rows = [clean_field_name(field) for field in rows]
|
||||
cleaned_values = [clean_field_name(field) for field in values]
|
||||
|
||||
Reference in New Issue
Block a user