Test: Update test cases to reduce execution time (#6470)

### What problem does this PR solve?

Refactors the HTTP API test utilities to cut suite execution time: `create_datasets` is renamed to `batch_create_datasets`, the document-upload helper is consolidated as `bulk_upload_documents`, and chunk-management helpers (`add_chunk`, `list_chunks`, `batch_add_chunks`) are added so tests can create their fixtures in batches instead of issuing one request per test.

### Type of change

- [x] update test cases
Author: liu an
Date: 2025-03-25 09:17:05 +08:00
Committed by: GitHub
Parent: 390086c6ab
Commit: b6f3242c6c

17 changed files with 704 additions and 695 deletions


@@ -74,7 +74,7 @@ def delete_dataset(auth, payload=None):
     return res.json()
-def create_datasets(auth, num):
+def batch_create_datasets(auth, num):
     ids = []
     for i in range(num):
         res = create_dataset(auth, {"name": f"dataset_{i}"})
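
The rename makes the batch semantics of the helper explicit. As a usage sketch, this is roughly how a suite might share one batch of datasets across tests through a module-scoped pytest fixture; the fixture name, the `get_http_api_auth` auth fixture, and the `common` module path are illustrative assumptions, not part of this diff:

```python
import pytest

from common import batch_create_datasets  # assumed helper module path


@pytest.fixture(scope="module")
def dataset_ids(get_http_api_auth):
    # Create five datasets once per test module rather than once per test;
    # batching setup like this is where the execution time is saved.
    return batch_create_datasets(get_http_api_auth, 5)


def test_datasets_created(dataset_ids):
    assert len(dataset_ids) == 5
```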
@@ -111,18 +111,6 @@ def upload_documnets(auth, dataset_id, files_path=None):
     f.close()
-def batch_upload_documents(auth, dataset_id, num, tmp_path):
-    fps = []
-    for i in range(num):
-        fp = create_txt_file(tmp_path / f"ragflow_test_upload_{i}.txt")
-        fps.append(fp)
-    res = upload_documnets(auth, dataset_id, fps)
-    document_ids = []
-    for document in res["data"]:
-        document_ids.append(document["id"])
-    return document_ids
 def download_document(auth, dataset_id, document_id, save_path):
     url = f"{HOST_ADDRESS}{FILE_API_URL}/{document_id}".format(dataset_id=dataset_id)
     res = requests.get(url=url, auth=auth, stream=True)
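
The deleted `batch_upload_documents` is not dropped; it reappears below as `bulk_upload_documents`. Since the context lines cut off inside `download_document`, here is a sketch of how such a streaming helper is typically completed; the chunked write is an assumption based on `stream=True`, not code from this PR:

```python
import requests


def download_document(auth, dataset_id, document_id, save_path):
    # FILE_API_URL carries a {dataset_id} placeholder, hence the extra
    # str.format() applied on top of the f-string.
    url = f"{HOST_ADDRESS}{FILE_API_URL}/{document_id}".format(dataset_id=dataset_id)
    res = requests.get(url=url, auth=auth, stream=True)
    # Assumed continuation: write the response body to disk in chunks so
    # large files never have to fit in memory.
    with open(save_path, "wb") as f:
        for block in res.iter_content(chunk_size=8192):
            f.write(block)
    return res
```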
@@ -172,8 +160,39 @@ def stop_parse_documnet(auth, dataset_id, payload=None):
     return res.json()
+def bulk_upload_documents(auth, dataset_id, num, tmp_path):
+    fps = []
+    for i in range(num):
+        fp = create_txt_file(tmp_path / f"ragflow_test_upload_{i}.txt")
+        fps.append(fp)
+    res = upload_documnets(auth, dataset_id, fps)
+    document_ids = []
+    for document in res["data"]:
+        document_ids.append(document["id"])
+    return document_ids
+# CHUNK MANAGEMENT WITHIN DATASET
+def add_chunk(auth, dataset_id, document_id, payload=None):
+    url = f"{HOST_ADDRESS}{CHUNK_API_URL}".format(dataset_id=dataset_id, document_id=document_id)
+    res = requests.post(url=url, headers=HEADERS, auth=auth, json=payload)
+    return res.json()
+def list_chunks(auth, dataset_id, document_id, params=None):
+    url = f"{HOST_ADDRESS}{CHUNK_API_URL}".format(dataset_id=dataset_id, document_id=document_id)
+    res = requests.get(
+        url=url,
+        headers=HEADERS,
+        auth=auth,
+        params=params,
+    )
+    return res.json()
+def batch_add_chunks(auth, dataset_id, document_id, num):
+    ids = []
+    for i in range(num):
+        res = add_chunk(auth, dataset_id, document_id, {"content": f"ragflow test {i}"})
+        ids.append(res["data"]["chunk"]["id"])
+    return ids
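
Taken together, the new helpers let a chunk test assemble all of its fixtures in a few calls. A rough end-to-end sketch; the `get_http_api_auth` fixture and the `code == 0` success convention in the response envelope are assumptions, not part of this diff:

```python
def test_batch_add_chunks(get_http_api_auth, tmp_path):
    # One dataset and three uploaded documents, created in bulk.
    dataset_id = batch_create_datasets(get_http_api_auth, 1)[0]
    document_ids = bulk_upload_documents(get_http_api_auth, dataset_id, 3, tmp_path)

    # Attach four chunks to the first document, then list them back.
    chunk_ids = batch_add_chunks(get_http_api_auth, dataset_id, document_ids[0], 4)
    res = list_chunks(get_http_api_auth, dataset_id, document_ids[0])

    assert len(chunk_ids) == 4
    assert res["code"] == 0  # assumed success code in the API envelope
```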