diff --git a/agent/component/data_operations.py b/agent/component/data_operations.py new file mode 100644 index 000000000..e0934433e --- /dev/null +++ b/agent/component/data_operations.py @@ -0,0 +1,201 @@ +from abc import ABC +import ast +import os +from agent.component.base import ComponentBase, ComponentParamBase +from api.utils.api_utils import timeout + +class DataOperationsParam(ComponentParamBase): + """ + Define the Data Operations component parameters. + """ + def __init__(self): + super().__init__() + self.inputs = [] + self.operations = "literal_eval" + self.select_keys = [] + self.filter_values=[] + self.updates=[] + self.remove_keys=[] + self.rename_keys=[] + self.outputs = { + "result": { + "value": [], + "type": "Array of Object" + } + } + + def check(self): + self.check_valid_value(self.operations, "Support operations", ["select_keys", "literal_eval","combine","filter_values","append_or_update","remove_keys","rename_keys"]) + + + +class DataOperations(ComponentBase,ABC): + component_name = "DataOperations" + + def get_input_form(self) -> dict[str, dict]: + return { + k: {"name": o.get("name", ""), "type": "line"} + for input_item in (self._param.inputs or []) + for k, o in self.get_input_elements_from_text(input_item).items() + } + + @timeout(int(os.environ.get("COMPONENT_EXEC_TIMEOUT", 10*60))) + def _invoke(self, **kwargs): + self.input_objects=[] + inputs = getattr(self._param, "inputs", None) + if not isinstance(inputs, (list, tuple)): + inputs = [inputs] + for input_ref in self._param.inputs: + input_object=self._canvas.get_variable_value(input_ref) + if input_object is None: + continue + if isinstance(input_object,dict): + self.input_objects.append(input_object) + elif isinstance(input_object,list): + self.input_objects.extend(x for x in input_object if isinstance(x, dict)) + else: + continue + if self._param.operations == "select_keys": + self._select_keys() + elif self._param.operations == "literal_eval": + self._literal_eval() + elif self._param.operations == "combine": + self._combine() + elif self._param.operations == "filter_values": + self._filter_values() + elif self._param.operations == "append_or_update": + self._append_or_update() + elif self._param.operations == "remove_keys": + self._remove_keys() + else: + self._rename_keys() + + def _select_keys(self): + filter_criteria: list[str] = self._param.select_keys + results = [{key: value for key, value in data_dict.items() if key in filter_criteria} for data_dict in self.input_objects] + self.set_output("result", results) + + + def _recursive_eval(self, data): + if isinstance(data, dict): + return {k: self.recursive_eval(v) for k, v in data.items()} + if isinstance(data, list): + return [self.recursive_eval(item) for item in data] + if isinstance(data, str): + try: + if ( + data.strip().startswith(("{", "[", "(", "'", '"')) + or data.strip().lower() in ("true", "false", "none") + or data.strip().replace(".", "").isdigit() + ): + return ast.literal_eval(data) + except (ValueError, SyntaxError, TypeError, MemoryError): + return data + else: + return data + return data + + def _literal_eval(self): + self.set_output("result", self._recursive_eval(self.input_objects)) + + def _combine(self): + result={} + for obj in self.input_objects(): + for key, value in obj.items(): + if key not in result: + result[key] = value + elif isinstance(result[key], list): + if isinstance(value, list): + result[key].extend(value) + else: + result[key].append(value) + else: + result[key] = ( + [result[key], value] if not isinstance(value, list) else [result[key], *value] + ) + self.set_output("result", result) + + def norm(self,v): + s = "" if v is None else str(v) + return s + + def match_rule(self, obj, rule): + key = rule.get("key") + op = (rule.get("operator") or "equals").lower() + target = self.norm(rule.get("value")) + if key not in obj: + return False + val = obj.get(key, None) + v = self.norm(val) + if op == "=": + return v == target + if op == "≠": + return v != target + if op == "contains": + return target in v + if op == "start with": + return v.startswith(target) + if op == "end with": + return v.endswith(target) + return False + + def _filter_values(self): + results=[] + rules = (getattr(self._param, "filter_values", None) or []) + for obj in self.input_objects(): + if not rules: + results.append(obj) + continue + if all(self.match_rule(obj, r) for r in rules): + results.append(obj) + self.set_output("result", results) + + + def _append_or_update(self): + results=[] + updates = getattr(self._param, "updates", []) or [] + for obj in self.input_objects(): + new_obj = dict(obj) + for item in updates: + if not isinstance(item, dict): + continue + k = (item.get("key") or "").strip() + if not k: + continue + new_obj[k] = item.get("value") + results.append(new_obj) + self.set_output("result", results) + + def _remove_keys(self): + results = [] + remove_keys = getattr(self._param, "remove_keys", []) or [] + + for obj in (self.input_objects or []): + new_obj = dict(obj) + for k in remove_keys: + if not isinstance(k, str): + continue + new_obj.pop(k, None) + results.append(new_obj) + self.set_output("result", results) + + def _rename_keys(self): + results = [] + rename_pairs = getattr(self._param, "rename_keys", []) or [] + + for obj in (self.input_objects or []): + new_obj = dict(obj) + for pair in rename_pairs: + if not isinstance(pair, dict): + continue + old = (pair.get("old_key") or "").strip() + new = (pair.get("new_key") or "").strip() + if not old or not new or old == new: + continue + if old in new_obj: + new_obj[new] = new_obj.pop(old) + results.append(new_obj) + self.set_output("result", results) + + def thoughts(self) -> str: + return "DataOperation in progress"