mirror of
https://github.com/infiniflow/ragflow.git
synced 2025-12-08 20:42:30 +08:00
Add iteration for agent. (#4258)
### What problem does this PR solve? #4242 ### Type of change - [x] New Feature (non-breaking change which adds functionality)
This commit is contained in:
@ -18,6 +18,9 @@ import json
|
||||
from abc import ABC
|
||||
from copy import deepcopy
|
||||
from functools import partial
|
||||
|
||||
import pandas as pd
|
||||
|
||||
from agent.component import component_class
|
||||
from agent.component.base import ComponentBase
|
||||
|
||||
@ -83,7 +86,8 @@ class Canvas(ABC):
|
||||
}
|
||||
},
|
||||
"downstream": [],
|
||||
"upstream": []
|
||||
"upstream": [],
|
||||
"parent_id": ""
|
||||
}
|
||||
},
|
||||
"history": [],
|
||||
@ -207,6 +211,14 @@ class Canvas(ABC):
|
||||
waiting.append(c)
|
||||
continue
|
||||
yield "*'{}'* is running...🕞".format(self.get_compnent_name(c))
|
||||
|
||||
if cpn.component_name.lower() == "iteration":
|
||||
st_cpn = cpn.get_start()
|
||||
assert st_cpn, "Start component not found for Iteration."
|
||||
if not st_cpn["obj"].end():
|
||||
cpn = st_cpn["obj"]
|
||||
c = cpn._id
|
||||
|
||||
try:
|
||||
ans = cpn.run(self.history, **kwargs)
|
||||
except Exception as e:
|
||||
@ -215,16 +227,26 @@ class Canvas(ABC):
|
||||
ran += 1
|
||||
raise e
|
||||
self.path[-1].append(c)
|
||||
|
||||
ran += 1
|
||||
|
||||
for m in prepare2run(self.components[self.path[-2][-1]]["downstream"]):
|
||||
downstream = self.components[self.path[-2][-1]]["downstream"]
|
||||
if not downstream and self.components[self.path[-2][-1]].get("parent_id"):
|
||||
cid = self.path[-2][-1]
|
||||
pid = self.components[cid]["parent_id"]
|
||||
o, _ = self.components[cid]["obj"].output(allow_partial=False)
|
||||
oo, _ = self.components[pid]["obj"].output(allow_partial=False)
|
||||
self.components[pid]["obj"].set(pd.concat([oo, o], ignore_index=True))
|
||||
downstream = [pid]
|
||||
|
||||
for m in prepare2run(downstream):
|
||||
yield {"content": m, "running_status": True}
|
||||
|
||||
while 0 <= ran < len(self.path[-1]):
|
||||
logging.debug(f"Canvas.run: {ran} {self.path}")
|
||||
cpn_id = self.path[-1][ran]
|
||||
cpn = self.get_component(cpn_id)
|
||||
if not cpn["downstream"]:
|
||||
if not any([cpn["downstream"], cpn.get("parent_id"), waiting]):
|
||||
break
|
||||
|
||||
loop = self._find_loop()
|
||||
@ -239,7 +261,15 @@ class Canvas(ABC):
|
||||
yield {"content": m, "running_status": True}
|
||||
continue
|
||||
|
||||
for m in prepare2run(cpn["downstream"]):
|
||||
downstream = cpn["downstream"]
|
||||
if not downstream and cpn.get("parent_id"):
|
||||
pid = cpn["parent_id"]
|
||||
_, o = cpn["obj"].output(allow_partial=False)
|
||||
_, oo = self.components[pid]["obj"].output(allow_partial=False)
|
||||
self.components[pid]["obj"].set_output(pd.concat([oo.dropna(axis=1), o.dropna(axis=1)], ignore_index=True))
|
||||
downstream = [pid]
|
||||
|
||||
for m in prepare2run(downstream):
|
||||
yield {"content": m, "running_status": True}
|
||||
|
||||
if ran >= len(self.path[-1]) and waiting:
|
||||
@ -247,6 +277,7 @@ class Canvas(ABC):
|
||||
waiting = []
|
||||
for m in prepare2run(without_dependent_checking):
|
||||
yield {"content": m, "running_status": True}
|
||||
without_dependent_checking = []
|
||||
ran -= 1
|
||||
|
||||
if self.answer:
|
||||
@ -294,7 +325,7 @@ class Canvas(ABC):
|
||||
return False
|
||||
|
||||
for i in range(len(path)):
|
||||
if path[i].lower().find("answer") >= 0:
|
||||
if path[i].lower().find("answer") == 0 or path[i].lower().find("iterationitem") == 0:
|
||||
path = path[:i]
|
||||
break
|
||||
|
||||
|
||||
Reference in New Issue
Block a user