Feat: add auto parse to connector. (#11099)

### What problem does this PR solve?

#10953

### Type of change

- [x] New Feature (non-breaking change which adds functionality)
Author: Kevin Hu
Date: 2025-11-07 16:49:29 +08:00 (committed by GitHub)
Parent: 526ba3388f
Commit: dd1c8c5779
5 changed files with 18 additions and 11 deletions


```diff
@@ -122,7 +122,7 @@ def update():
         if not e:
             return get_data_error_result(
                 message="Database error (Knowledgebase rename)!")
-        errors = Connector2KbService.link_connectors(kb.id, [conn["id"] for conn in connectors], current_user.id)
+        errors = Connector2KbService.link_connectors(kb.id, [conn for conn in connectors], current_user.id)
         if errors:
             logging.error("Link KB errors: ", errors)
         kb = kb.to_dict()
```
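
Note: `update()` now forwards the full connector dicts rather than just their ids, so per-connector settings such as `auto_parse` reach `link_connectors`. A minimal sketch of the connector list shape this implies — only `id` and `auto_parse` are confirmed by the diff, everything else is illustrative:

```python
# Hypothetical connector entries sent to the KB update endpoint.
# Only "id" and "auto_parse" appear in this PR; the values are placeholders.
connectors = [
    {"id": "conn_abc", "auto_parse": "1"},  # parse documents automatically after sync
    {"id": "conn_def", "auto_parse": "0"},  # sync only; parse manually later
]
```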


```diff
@@ -73,7 +73,9 @@ class ConnectorService(CommonService):
             return
         SyncLogsService.filter_delete([SyncLogs.connector_id==connector_id, SyncLogs.kb_id==kb_id])
         docs = DocumentService.query(source_type=f"{conn.source}/{conn.id}")
-        return FileService.delete_docs([d.id for d in docs], tenant_id)
+        err = FileService.delete_docs([d.id for d in docs], tenant_id)
+        SyncLogsService.schedule(connector_id, kb_id, reindex=True)
+        return err


 class SyncLogsService(CommonService):
@@ -226,16 +228,20 @@ class Connector2KbService(CommonService):
     model = Connector2Kb

     @classmethod
-    def link_connectors(cls, kb_id:str, connector_ids: list[str], tenant_id:str):
+    def link_connectors(cls, kb_id:str, connectors: list[dict], tenant_id:str):
         arr = cls.query(kb_id=kb_id)
         old_conn_ids = [a.connector_id for a in arr]
-        for conn_id in connector_ids:
+        connector_ids = []
+        for conn in connectors:
+            conn_id = conn["id"]
+            connector_ids.append(conn_id)
             if conn_id in old_conn_ids:
                 continue
             cls.save(**{
                 "id": get_uuid(),
                 "connector_id": conn_id,
-                "kb_id": kb_id
+                "kb_id": kb_id,
+                "auto_parse": conn.get("auto_parse", "1")
             })
             SyncLogsService.schedule(conn_id, kb_id, reindex=True)
```
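
`link_connectors` now accepts connector dicts, collects the ids itself, and stores `auto_parse` on each new `Connector2Kb` link, defaulting to `"1"` when the caller omits it; the `ConnectorService` unlink path additionally schedules a reindex after deleting the connector's documents. A small usage sketch of the new call shape — the kb, connector, and tenant ids below are placeholders:

```python
# Sketch of calling the new signature; ids are illustrative only.
Connector2KbService.link_connectors(
    kb_id="kb_123",
    connectors=[
        {"id": "conn_abc"},                     # auto_parse defaults to "1"
        {"id": "conn_def", "auto_parse": "0"},  # explicit opt-out
    ],
    tenant_id="tenant_xyz",
)
```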


```diff
@@ -63,7 +63,7 @@ def _convert_message_to_document(
         semantic_identifier=semantic_identifier,
         doc_updated_at=doc_updated_at,
         blob=message.content.encode("utf-8"),
-        extension="txt",
+        extension=".txt",
         size_bytes=len(message.content.encode("utf-8")),
     )
@@ -275,7 +275,7 @@ class DiscordConnector(LoadConnector, PollConnector):
             semantic_identifier=f"{min_updated_at} -> {max_updated_at}",
             doc_updated_at=max_updated_at,
             blob=blob,
-            extension="txt",
+            extension=".txt",
             size_bytes=size_bytes,
         )
```
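
The leading dot presumably brings the Discord connector's `extension` in line with `os.path.splitext`, which always returns extensions with the dot, so any downstream comparison against that form now matches. For reference:

```python
import os

# os.path.splitext keeps the leading dot, so code comparing against its
# output expects ".txt" rather than "txt". The filename here is made up.
_, ext = os.path.splitext("discord_transcript.txt")
assert ext == ".txt"
```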


```diff
@@ -1,6 +1,5 @@
 import logging
 from collections.abc import Generator
-from datetime import datetime, timezone
 from typing import Any, Optional

 from retry import retry
@@ -33,7 +32,7 @@ from common.data_source.utils import (
     batch_generator,
     fetch_notion_data,
     properties_to_str,
-    filter_pages_by_time
+    filter_pages_by_time, datetime_from_string
 )
@@ -293,9 +292,9 @@ class NotionConnector(LoadConnector, PollConnector):
                 blob=blob,
                 source=DocumentSource.NOTION,
                 semantic_identifier=page_title,
-                extension="txt",
+                extension=".txt",
                 size_bytes=len(blob),
-                doc_updated_at=datetime.fromisoformat(page.last_edited_time).astimezone(timezone.utc)
+                doc_updated_at=datetime_from_string(page.last_edited_time)
             )
             if self.recursive_index_enabled and all_child_page_ids:
```
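
The Notion connector now parses `last_edited_time` with `datetime_from_string` from `common.data_source.utils` instead of calling `datetime.fromisoformat` directly. Its implementation is not part of this diff; presumably it behaves roughly like the sketch below, returning a timezone-aware UTC datetime and tolerating the trailing `Z` that Notion timestamps carry (which `fromisoformat` rejects on Python < 3.11):

```python
from datetime import datetime, timezone

def datetime_from_string(value: str) -> datetime:
    """Assumed behaviour only: parse an ISO-8601 string into a UTC-aware datetime."""
    if value.endswith("Z"):
        # fromisoformat() on Python < 3.11 does not accept a trailing "Z".
        value = value[:-1] + "+00:00"
    dt = datetime.fromisoformat(value)
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    return dt.astimezone(timezone.utc)
```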


```diff
@@ -63,6 +63,8 @@ class SyncBase:
         if task["poll_range_start"]:
             next_update = task["poll_range_start"]
         for document_batch in document_batch_generator:
+            if not document_batch:
+                continue
             min_update = min([doc.doc_updated_at for doc in document_batch])
             max_update = max([doc.doc_updated_at for doc in document_batch])
             next_update = max([next_update, max_update])
```
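
The new guard matters because a connector can yield an empty batch, and `min()`/`max()` on an empty sequence raise `ValueError`, which would abort the whole sync loop:

```python
# Why empty batches must be skipped before computing the update window:
try:
    min([])
except ValueError as e:
    print(e)  # "min() arg is an empty sequence"
```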