fix:data operations update (#11013)

### What problem does this PR solve?

change:data operations update

### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)
This commit is contained in:
buua436
2025-11-05 19:59:10 +08:00
committed by GitHub
parent e658beee38
commit f29a3dd651
2 changed files with 37 additions and 9 deletions

View File

@ -153,6 +153,33 @@ class Graph:
def get_tenant_id(self):
return self._tenant_id
def get_value_with_variable(self,value: str) -> Any:
pat = re.compile(r"\{* *\{([a-zA-Z:0-9]+@[A-Za-z:0-9_.-]+|sys\.[a-z_]+)\} *\}*")
out_parts = []
last = 0
for m in pat.finditer(value):
out_parts.append(value[last:m.start()])
key = m.group(1)
v = self.get_variable_value(key)
if v is None:
rep = ""
elif isinstance(v, partial):
buf = []
for chunk in v():
buf.append(chunk)
rep = "".join(buf)
elif isinstance(v, str):
rep = v
else:
rep = json.dumps(v, ensure_ascii=False)
out_parts.append(rep)
last = m.end()
out_parts.append(value[last:])
return("".join(out_parts))
def get_variable_value(self, exp: str) -> Any:
exp = exp.strip("{").strip("}").strip(" ").strip("{").strip("}")
if exp.find("@") < 0:

View File

@ -10,7 +10,7 @@ class DataOperationsParam(ComponentParamBase):
"""
def __init__(self):
super().__init__()
self.inputs = []
self.query = []
self.operations = "literal_eval"
self.select_keys = []
self.filter_values=[]
@ -35,17 +35,17 @@ class DataOperations(ComponentBase,ABC):
def get_input_form(self) -> dict[str, dict]:
return {
k: {"name": o.get("name", ""), "type": "line"}
for input_item in (self._param.inputs or [])
for input_item in (self._param.query or [])
for k, o in self.get_input_elements_from_text(input_item).items()
}
@timeout(int(os.environ.get("COMPONENT_EXEC_TIMEOUT", 10*60)))
def _invoke(self, **kwargs):
self.input_objects=[]
inputs = getattr(self._param, "inputs", None)
inputs = getattr(self._param, "query", None)
if not isinstance(inputs, (list, tuple)):
inputs = [inputs]
for input_ref in self._param.inputs:
for input_ref in inputs:
input_object=self._canvas.get_variable_value(input_ref)
if input_object is None:
continue
@ -57,7 +57,7 @@ class DataOperations(ComponentBase,ABC):
continue
if self._param.operations == "select_keys":
self._select_keys()
elif self._param.operations == "literal_eval":
elif self._param.operations == "recursive_eval":
self._literal_eval()
elif self._param.operations == "combine":
self._combine()
@ -100,7 +100,7 @@ class DataOperations(ComponentBase,ABC):
def _combine(self):
result={}
for obj in self.input_objects():
for obj in self.input_objects:
for key, value in obj.items():
if key not in result:
result[key] = value
@ -123,6 +123,7 @@ class DataOperations(ComponentBase,ABC):
key = rule.get("key")
op = (rule.get("operator") or "equals").lower()
target = self.norm(rule.get("value"))
target = self._canvas.get_value_with_variable(target) or target
if key not in obj:
return False
val = obj.get(key, None)
@ -142,7 +143,7 @@ class DataOperations(ComponentBase,ABC):
def _filter_values(self):
results=[]
rules = (getattr(self._param, "filter_values", None) or [])
for obj in self.input_objects():
for obj in self.input_objects:
if not rules:
results.append(obj)
continue
@ -154,7 +155,7 @@ class DataOperations(ComponentBase,ABC):
def _append_or_update(self):
results=[]
updates = getattr(self._param, "updates", []) or []
for obj in self.input_objects():
for obj in self.input_objects:
new_obj = dict(obj)
for item in updates:
if not isinstance(item, dict):
@ -162,7 +163,7 @@ class DataOperations(ComponentBase,ABC):
k = (item.get("key") or "").strip()
if not k:
continue
new_obj[k] = item.get("value")
new_obj[k] = self._canvas.get_value_with_variable(item.get("value")) or item.get("value")
results.append(new_obj)
self.set_output("result", results)