mirror of
https://github.com/infiniflow/ragflow.git
synced 2025-12-08 12:32:30 +08:00
Feat:Data Operations (#11002)
### What problem does this PR solve? new component:Data Operations #10427 ### Type of change - [x] New Feature (non-breaking change which adds functionality)
This commit is contained in:
201
agent/component/data_operations.py
Normal file
201
agent/component/data_operations.py
Normal file
@ -0,0 +1,201 @@
|
||||
from abc import ABC
|
||||
import ast
|
||||
import os
|
||||
from agent.component.base import ComponentBase, ComponentParamBase
|
||||
from api.utils.api_utils import timeout
|
||||
|
||||
class DataOperationsParam(ComponentParamBase):
|
||||
"""
|
||||
Define the Data Operations component parameters.
|
||||
"""
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.inputs = []
|
||||
self.operations = "literal_eval"
|
||||
self.select_keys = []
|
||||
self.filter_values=[]
|
||||
self.updates=[]
|
||||
self.remove_keys=[]
|
||||
self.rename_keys=[]
|
||||
self.outputs = {
|
||||
"result": {
|
||||
"value": [],
|
||||
"type": "Array of Object"
|
||||
}
|
||||
}
|
||||
|
||||
def check(self):
|
||||
self.check_valid_value(self.operations, "Support operations", ["select_keys", "literal_eval","combine","filter_values","append_or_update","remove_keys","rename_keys"])
|
||||
|
||||
|
||||
|
||||
class DataOperations(ComponentBase,ABC):
|
||||
component_name = "DataOperations"
|
||||
|
||||
def get_input_form(self) -> dict[str, dict]:
|
||||
return {
|
||||
k: {"name": o.get("name", ""), "type": "line"}
|
||||
for input_item in (self._param.inputs or [])
|
||||
for k, o in self.get_input_elements_from_text(input_item).items()
|
||||
}
|
||||
|
||||
@timeout(int(os.environ.get("COMPONENT_EXEC_TIMEOUT", 10*60)))
|
||||
def _invoke(self, **kwargs):
|
||||
self.input_objects=[]
|
||||
inputs = getattr(self._param, "inputs", None)
|
||||
if not isinstance(inputs, (list, tuple)):
|
||||
inputs = [inputs]
|
||||
for input_ref in self._param.inputs:
|
||||
input_object=self._canvas.get_variable_value(input_ref)
|
||||
if input_object is None:
|
||||
continue
|
||||
if isinstance(input_object,dict):
|
||||
self.input_objects.append(input_object)
|
||||
elif isinstance(input_object,list):
|
||||
self.input_objects.extend(x for x in input_object if isinstance(x, dict))
|
||||
else:
|
||||
continue
|
||||
if self._param.operations == "select_keys":
|
||||
self._select_keys()
|
||||
elif self._param.operations == "literal_eval":
|
||||
self._literal_eval()
|
||||
elif self._param.operations == "combine":
|
||||
self._combine()
|
||||
elif self._param.operations == "filter_values":
|
||||
self._filter_values()
|
||||
elif self._param.operations == "append_or_update":
|
||||
self._append_or_update()
|
||||
elif self._param.operations == "remove_keys":
|
||||
self._remove_keys()
|
||||
else:
|
||||
self._rename_keys()
|
||||
|
||||
def _select_keys(self):
|
||||
filter_criteria: list[str] = self._param.select_keys
|
||||
results = [{key: value for key, value in data_dict.items() if key in filter_criteria} for data_dict in self.input_objects]
|
||||
self.set_output("result", results)
|
||||
|
||||
|
||||
def _recursive_eval(self, data):
|
||||
if isinstance(data, dict):
|
||||
return {k: self.recursive_eval(v) for k, v in data.items()}
|
||||
if isinstance(data, list):
|
||||
return [self.recursive_eval(item) for item in data]
|
||||
if isinstance(data, str):
|
||||
try:
|
||||
if (
|
||||
data.strip().startswith(("{", "[", "(", "'", '"'))
|
||||
or data.strip().lower() in ("true", "false", "none")
|
||||
or data.strip().replace(".", "").isdigit()
|
||||
):
|
||||
return ast.literal_eval(data)
|
||||
except (ValueError, SyntaxError, TypeError, MemoryError):
|
||||
return data
|
||||
else:
|
||||
return data
|
||||
return data
|
||||
|
||||
def _literal_eval(self):
|
||||
self.set_output("result", self._recursive_eval(self.input_objects))
|
||||
|
||||
def _combine(self):
|
||||
result={}
|
||||
for obj in self.input_objects():
|
||||
for key, value in obj.items():
|
||||
if key not in result:
|
||||
result[key] = value
|
||||
elif isinstance(result[key], list):
|
||||
if isinstance(value, list):
|
||||
result[key].extend(value)
|
||||
else:
|
||||
result[key].append(value)
|
||||
else:
|
||||
result[key] = (
|
||||
[result[key], value] if not isinstance(value, list) else [result[key], *value]
|
||||
)
|
||||
self.set_output("result", result)
|
||||
|
||||
def norm(self,v):
|
||||
s = "" if v is None else str(v)
|
||||
return s
|
||||
|
||||
def match_rule(self, obj, rule):
|
||||
key = rule.get("key")
|
||||
op = (rule.get("operator") or "equals").lower()
|
||||
target = self.norm(rule.get("value"))
|
||||
if key not in obj:
|
||||
return False
|
||||
val = obj.get(key, None)
|
||||
v = self.norm(val)
|
||||
if op == "=":
|
||||
return v == target
|
||||
if op == "≠":
|
||||
return v != target
|
||||
if op == "contains":
|
||||
return target in v
|
||||
if op == "start with":
|
||||
return v.startswith(target)
|
||||
if op == "end with":
|
||||
return v.endswith(target)
|
||||
return False
|
||||
|
||||
def _filter_values(self):
|
||||
results=[]
|
||||
rules = (getattr(self._param, "filter_values", None) or [])
|
||||
for obj in self.input_objects():
|
||||
if not rules:
|
||||
results.append(obj)
|
||||
continue
|
||||
if all(self.match_rule(obj, r) for r in rules):
|
||||
results.append(obj)
|
||||
self.set_output("result", results)
|
||||
|
||||
|
||||
def _append_or_update(self):
|
||||
results=[]
|
||||
updates = getattr(self._param, "updates", []) or []
|
||||
for obj in self.input_objects():
|
||||
new_obj = dict(obj)
|
||||
for item in updates:
|
||||
if not isinstance(item, dict):
|
||||
continue
|
||||
k = (item.get("key") or "").strip()
|
||||
if not k:
|
||||
continue
|
||||
new_obj[k] = item.get("value")
|
||||
results.append(new_obj)
|
||||
self.set_output("result", results)
|
||||
|
||||
def _remove_keys(self):
|
||||
results = []
|
||||
remove_keys = getattr(self._param, "remove_keys", []) or []
|
||||
|
||||
for obj in (self.input_objects or []):
|
||||
new_obj = dict(obj)
|
||||
for k in remove_keys:
|
||||
if not isinstance(k, str):
|
||||
continue
|
||||
new_obj.pop(k, None)
|
||||
results.append(new_obj)
|
||||
self.set_output("result", results)
|
||||
|
||||
def _rename_keys(self):
|
||||
results = []
|
||||
rename_pairs = getattr(self._param, "rename_keys", []) or []
|
||||
|
||||
for obj in (self.input_objects or []):
|
||||
new_obj = dict(obj)
|
||||
for pair in rename_pairs:
|
||||
if not isinstance(pair, dict):
|
||||
continue
|
||||
old = (pair.get("old_key") or "").strip()
|
||||
new = (pair.get("new_key") or "").strip()
|
||||
if not old or not new or old == new:
|
||||
continue
|
||||
if old in new_obj:
|
||||
new_obj[new] = new_obj.pop(old)
|
||||
results.append(new_obj)
|
||||
self.set_output("result", results)
|
||||
|
||||
def thoughts(self) -> str:
|
||||
return "DataOperation in progress"
|
||||
Reference in New Issue
Block a user