mirror of
https://github.com/infiniflow/ragflow.git
synced 2025-12-08 20:42:30 +08:00
feat: improve metadata handling in connector service (#11421)
### What problem does this PR solve?

- Update sync data source to handle metadata properly

### Type of change

- [x] New Feature (non-breaking change which adds functionality)

---------

Co-authored-by: Kevin Hu <kevinhu.sh@gmail.com>
This commit is contained in:
@ -214,9 +214,21 @@ class SyncLogsService(CommonService):
|
|||||||
err, doc_blob_pairs = FileService.upload_document(kb, files, tenant_id, src)
|
err, doc_blob_pairs = FileService.upload_document(kb, files, tenant_id, src)
|
||||||
errs.extend(err)
|
errs.extend(err)
|
||||||
|
|
||||||
|
# Create a mapping from filename to metadata for later use
|
||||||
|
metadata_map = {}
|
||||||
|
for d in docs:
|
||||||
|
if d.get("metadata"):
|
||||||
|
filename = d["semantic_identifier"]+(f"{d['extension']}" if d["semantic_identifier"][::-1].find(d['extension'][::-1])<0 else "")
|
||||||
|
metadata_map[filename] = d["metadata"]
|
||||||
|
|
||||||
kb_table_num_map = {}
|
kb_table_num_map = {}
|
||||||
for doc, _ in doc_blob_pairs:
|
for doc, _ in doc_blob_pairs:
|
||||||
doc_ids.append(doc["id"])
|
doc_ids.append(doc["id"])
|
||||||
|
|
||||||
|
# Set metadata if available for this document
|
||||||
|
if doc["name"] in metadata_map:
|
||||||
|
DocumentService.update_by_id(doc["id"], {"meta_fields": metadata_map[doc["name"]]})
|
||||||
|
|
||||||
if not auto_parse or auto_parse == "0":
|
if not auto_parse or auto_parse == "0":
|
||||||
continue
|
continue
|
||||||
DocumentService.run(tenant_id, doc, kb_table_num_map)
|
DocumentService.run(tenant_id, doc, kb_table_num_map)
|
||||||
|
|||||||
@ -1562,6 +1562,7 @@ class ConfluenceConnector(
|
|||||||
size_bytes=len(page_content.encode("utf-8")), # Calculate size in bytes
|
size_bytes=len(page_content.encode("utf-8")), # Calculate size in bytes
|
||||||
doc_updated_at=datetime_from_string(page["version"]["when"]),
|
doc_updated_at=datetime_from_string(page["version"]["when"]),
|
||||||
primary_owners=primary_owners if primary_owners else None,
|
primary_owners=primary_owners if primary_owners else None,
|
||||||
|
metadata=metadata if metadata else None,
|
||||||
)
|
)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logging.error(f"Error converting page {page.get('id', 'unknown')}: {e}")
|
logging.error(f"Error converting page {page.get('id', 'unknown')}: {e}")
|
||||||
|
|||||||
@ -65,6 +65,7 @@ def _convert_message_to_document(
|
|||||||
blob=message.content.encode("utf-8"),
|
blob=message.content.encode("utf-8"),
|
||||||
extension=".txt",
|
extension=".txt",
|
||||||
size_bytes=len(message.content.encode("utf-8")),
|
size_bytes=len(message.content.encode("utf-8")),
|
||||||
|
metadata=metadata if metadata else None,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@ -94,6 +94,7 @@ class Document(BaseModel):
|
|||||||
blob: bytes
|
blob: bytes
|
||||||
doc_updated_at: datetime
|
doc_updated_at: datetime
|
||||||
size_bytes: int
|
size_bytes: int
|
||||||
|
metadata: Optional[dict[str, Any]] = None
|
||||||
|
|
||||||
|
|
||||||
class BasicExpertInfo(BaseModel):
|
class BasicExpertInfo(BaseModel):
|
||||||
|
|||||||
@ -75,8 +75,9 @@ class SyncBase:
|
|||||||
min_update = min([doc.doc_updated_at for doc in document_batch])
|
min_update = min([doc.doc_updated_at for doc in document_batch])
|
||||||
max_update = max([doc.doc_updated_at for doc in document_batch])
|
max_update = max([doc.doc_updated_at for doc in document_batch])
|
||||||
next_update = max([next_update, max_update])
|
next_update = max([next_update, max_update])
|
||||||
docs = [
|
docs = []
|
||||||
{
|
for doc in document_batch:
|
||||||
|
doc_dict = {
|
||||||
"id": doc.id,
|
"id": doc.id,
|
||||||
"connector_id": task["connector_id"],
|
"connector_id": task["connector_id"],
|
||||||
"source": self.SOURCE_NAME,
|
"source": self.SOURCE_NAME,
|
||||||
@ -86,8 +87,10 @@ class SyncBase:
|
|||||||
"doc_updated_at": doc.doc_updated_at,
|
"doc_updated_at": doc.doc_updated_at,
|
||||||
"blob": doc.blob,
|
"blob": doc.blob,
|
||||||
}
|
}
|
||||||
for doc in document_batch
|
# Add metadata if present
|
||||||
]
|
if doc.metadata:
|
||||||
|
doc_dict["metadata"] = doc.metadata
|
||||||
|
docs.append(doc_dict)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
e, kb = KnowledgebaseService.get_by_id(task["kb_id"])
|
e, kb = KnowledgebaseService.get_by_id(task["kb_id"])
|
||||||
|
|||||||
Reference in New Issue
Block a user