Feat: add data type invoke (#5126)

### What problem does this PR solve?
```
Invoke agent
To be able to interact dynamically with the API, there is a customizable Data Type JSON or FormData, the default is JSON 
```

### Type of change

- [x] New Feature (non-breaking change which adds functionality)

---------

Co-authored-by: Kevin Hu <kevinhu.sh@gmail.com>
This commit is contained in:
so95
2025-02-27 15:15:33 +07:00
committed by GitHub
parent 7a6e70d6b3
commit 11de7599e5
5 changed files with 39 additions and 13 deletions

View File

@ -35,12 +35,14 @@ class InvokeParam(ComponentParamBase):
self.url = ""
self.timeout = 60
self.clean_html = False
self.datatype = "json" # New parameter to determine data posting type
def check(self):
self.check_valid_value(self.method.lower(), "Type of content from the crawler", ['get', 'post', 'put'])
self.check_empty(self.url, "End point URL")
self.check_positive_integer(self.timeout, "Timeout time in second")
self.check_boolean(self.clean_html, "Clean HTML")
self.check_valid_value(self.datatype.lower(), "Data post type", ['json', 'formdata']) # Check for valid datapost value
class Invoke(ComponentBase, ABC):
@ -94,22 +96,36 @@ class Invoke(ComponentBase, ABC):
return Invoke.be_output(response.text)
if method == 'put':
response = requests.put(url=url,
json=args,
headers=headers,
proxies=proxies,
timeout=self._param.timeout)
if self._param.datatype.lower() == 'json':
response = requests.put(url=url,
json=args,
headers=headers,
proxies=proxies,
timeout=self._param.timeout)
else:
response = requests.put(url=url,
data=args,
headers=headers,
proxies=proxies,
timeout=self._param.timeout)
if self._param.clean_html:
sections = HtmlParser()(None, response.content)
return Invoke.be_output("\n".join(sections))
return Invoke.be_output(response.text)
if method == 'post':
response = requests.post(url=url,
json=args,
headers=headers,
proxies=proxies,
timeout=self._param.timeout)
if self._param.datatype.lower() == 'json':
response = requests.post(url=url,
json=args,
headers=headers,
proxies=proxies,
timeout=self._param.timeout)
else:
response = requests.post(url=url,
data=args,
headers=headers,
proxies=proxies,
timeout=self._param.timeout)
if self._param.clean_html:
sections = HtmlParser()(None, response.content)
return Invoke.be_output("\n".join(sections))