from datetime import datetime, timezone
import logging
from typing import Any

import requests

from pyairtable import Api as AirtableApi

from common.data_source.config import AIRTABLE_CONNECTOR_SIZE_THRESHOLD, INDEX_BATCH_SIZE, DocumentSource
from common.data_source.exceptions import ConnectorMissingCredentialError
from common.data_source.interfaces import LoadConnector
from common.data_source.models import Document, GenerateDocumentsOutput
from common.data_source.utils import extract_size_bytes, get_file_ext

# Accepted layouts for a record's ``createdTime``; the second tolerates a
# value that carries no fractional seconds.
_CREATED_TIME_FORMATS = ("%Y-%m-%dT%H:%M:%S.%fZ", "%Y-%m-%dT%H:%M:%SZ")

# Seconds to wait on a single attachment download before giving up on it.
_DOWNLOAD_TIMEOUT_SECONDS = 30


def _parse_created_time(value: Any) -> datetime:
    """Convert a record's ``createdTime`` value into an aware UTC datetime.

    Falls back to the current UTC time when the value is missing or in an
    unrecognised layout, so that one odd record cannot abort the whole sync.
    """
    if isinstance(value, str):
        for fmt in _CREATED_TIME_FORMATS:
            try:
                return datetime.strptime(value, fmt).replace(tzinfo=timezone.utc)
            except ValueError:
                continue
        logging.warning("Unrecognised Airtable createdTime %r; using current time.", value)
    return datetime.now(timezone.utc)


class AirtableClientNotSetUpError(PermissionError):
    """Raised when the Airtable client is accessed before credentials are loaded."""

    def __init__(self) -> None:
        super().__init__(
            "Airtable client is not set up. Did you forget to call load_credentials()?"
        )


class AirtableConnector(LoadConnector):
    """
    Lightweight Airtable connector.

    This connector ingests Airtable attachments as raw blobs without
    parsing file content or generating text/image sections.
    """

    def __init__(
        self,
        base_id: str,
        table_name_or_id: str,
        batch_size: int = INDEX_BATCH_SIZE,
    ) -> None:
        self.base_id = base_id
        self.table_name_or_id = table_name_or_id
        self.batch_size = batch_size
        self._airtable_client: AirtableApi | None = None
        self.size_threshold = AIRTABLE_CONNECTOR_SIZE_THRESHOLD

    # -------------------------
    # Credentials
    # -------------------------
    def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None:
        """Create the Airtable API client from ``credentials["airtable_access_token"]``.

        Raises:
            KeyError: if the ``airtable_access_token`` key is absent.
        """
        self._airtable_client = AirtableApi(credentials["airtable_access_token"])
        return None

    @property
    def airtable_client(self) -> AirtableApi:
        """Return the API client, raising AirtableClientNotSetUpError if unset."""
        if not self._airtable_client:
            raise AirtableClientNotSetUpError()
        return self._airtable_client

    # -------------------------
    # Helpers
    # -------------------------
    @staticmethod
    def _attachments_in(fields: dict[str, Any]) -> list[dict[str, Any]]:
        """Collect the attachment dicts found in one record's fields.

        Only list-valued fields are inspected, and only dict items carrying a
        ``url``, ``filename`` and ``id`` are kept. Lists of plain values
        (strings, numbers, ...) are ignored instead of crashing on ``.get``.
        """
        found: list[dict[str, Any]] = []
        for value in fields.values():
            if not isinstance(value, list):
                continue
            for item in value:
                if not isinstance(item, dict):
                    continue
                if item.get("url") and item.get("filename") and item.get("id"):
                    found.append(item)
        return found

    def _is_too_large(self, size_bytes: Any) -> bool:
        """Tell whether a known integer size exceeds the configured threshold."""
        return (
            self.size_threshold is not None
            and isinstance(size_bytes, int)
            and size_bytes > self.size_threshold
        )

    def _build_document(
        self,
        record_id: Any,
        attachment: dict[str, Any],
        updated_at: datetime,
    ) -> Document | None:
        """Download one attachment and wrap it in a Document.

        Returns None (after logging) when the attachment is over the size
        threshold or its download fails.
        """
        filename = attachment["filename"]

        # Check the size reported in the attachment metadata first, so an
        # oversized file is never downloaded just to be thrown away.
        declared_size = extract_size_bytes(attachment)
        if self._is_too_large(declared_size):
            logging.warning(
                f"{filename} exceeds size threshold of {self.size_threshold}. Skipping."
            )
            return None

        try:
            resp = requests.get(attachment["url"], timeout=_DOWNLOAD_TIMEOUT_SECONDS)
            resp.raise_for_status()
            content = resp.content
        except Exception:
            # Best-effort per attachment: log with traceback and move on so a
            # single broken link does not stop the rest of the table.
            logging.exception(
                f"Failed to download attachment {filename} "
                f"(record={record_id})"
            )
            return None

        # When the metadata carries no usable size, rely on the real payload
        # length, both for the threshold and for the stored size.
        if isinstance(declared_size, int) and declared_size > 0:
            size_bytes = declared_size
        else:
            size_bytes = len(content)
            if self._is_too_large(size_bytes):
                logging.warning(
                    f"{filename} exceeds size threshold of {self.size_threshold}. Skipping."
                )
                return None

        return Document(
            id=f"airtable:{record_id}:{attachment['id']}",
            blob=content,
            source=DocumentSource.AIRTABLE,
            semantic_identifier=filename,
            extension=get_file_ext(filename),
            size_bytes=size_bytes,
            doc_updated_at=updated_at,
        )

    # -------------------------
    # Core logic
    # -------------------------
    def load_from_state(self) -> GenerateDocumentsOutput:
        """
        Fetch all Airtable records and ingest attachments as raw blobs.

        Each attachment is converted into a single Document(blob=...), and
        documents are yielded in lists of at most ``batch_size`` items.

        Raises:
            ConnectorMissingCredentialError: if load_credentials() was not called.
        """
        if not self._airtable_client:
            raise ConnectorMissingCredentialError("Airtable credentials not loaded")

        table = self.airtable_client.table(self.base_id, self.table_name_or_id)
        records = table.all()

        logging.info(
            f"Starting Airtable blob ingestion for table {self.table_name_or_id}, "
            f"{len(records)} records found."
        )

        batch: list[Document] = []

        for record in records:
            record_id = record.get("id")
            # Parsed once per record; every attachment of the record shares it.
            updated_at = _parse_created_time(record.get("createdTime"))

            for attachment in self._attachments_in(record.get("fields") or {}):
                document = self._build_document(record_id, attachment, updated_at)
                if document is None:
                    continue

                batch.append(document)
                if len(batch) >= self.batch_size:
                    yield batch
                    batch = []

        if batch:
            yield batch


if __name__ == "__main__":
    import os

    logging.basicConfig(level=logging.DEBUG)
    connector = AirtableConnector("xxx", "xxx")
    connector.load_credentials({"airtable_access_token": os.environ.get("AIRTABLE_ACCESS_TOKEN")})
    # NOTE(review): validate_connector_settings() is not defined in this file;
    # presumably inherited from LoadConnector — confirm.
    connector.validate_connector_settings()
    document_batches = connector.load_from_state()
    try:
        first_batch = next(document_batches)
        print(f"Loaded {len(first_batch)} documents in first batch.")
        for doc in first_batch:
            print(f"- {doc.semantic_identifier} ({doc.size_bytes} bytes)")
    except StopIteration:
        print("No documents available in Airtable.")
-94,7 +94,7 @@ class Document(BaseModel): blob: bytes doc_updated_at: datetime size_bytes: int - primary_owners: list + primary_owners: Optional[list] = None metadata: Optional[dict[str, Any]] = None diff --git a/pyproject.toml b/pyproject.toml index 065f6a3f3..98dbad9b8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -115,39 +115,40 @@ dependencies = [ "xpinyin==0.7.6", "yfinance==0.2.65", "zhipuai==2.0.1", -# following modules aren't necessary -# "nltk==3.9.1", -# "numpy>=1.26.0,<2.0.0", -# "openai>=1.45.0", -# "openpyxl>=3.1.0,<4.0.0", -# "pandas>=2.2.0,<3.0.0", -# "peewee==3.17.1", -# "pillow>=10.4.0,<13.0.0", -# "protobuf==5.27.2", -# "pymysql>=1.1.1,<2.0.0", -# "python-dotenv==1.0.1", -# "python-dateutil==2.8.2", -# "Quart==0.20.0", -# "requests>=2.32.3,<3.0.0", -# "scikit-learn==1.5.0", -# "selenium==4.22.0", -# "setuptools>=78.1.1,<81.0.0", -# "shapely==2.0.5", -# "six==1.16.0", -# "tabulate==0.9.0", -# "tiktoken==0.7.0", -# "umap_learn==0.5.6", -# "werkzeug==3.0.6", -# "xxhash>=3.5.0,<4.0.0", -# "trio>=0.17.0,<0.29.0", -# "debugpy>=1.8.13", -# "click>=8.1.8", -# "litellm>=1.74.15.post1", -# "lark>=1.2.2", -# "pip>=25.2", -# "imageio-ffmpeg>=0.6.0", -# "cryptography==46.0.3", -# "jinja2>=3.1.0", + # following modules aren't necessary + # "nltk==3.9.1", + # "numpy>=1.26.0,<2.0.0", + # "openai>=1.45.0", + # "openpyxl>=3.1.0,<4.0.0", + # "pandas>=2.2.0,<3.0.0", + # "peewee==3.17.1", + # "pillow>=10.4.0,<13.0.0", + # "protobuf==5.27.2", + # "pymysql>=1.1.1,<2.0.0", + # "python-dotenv==1.0.1", + # "python-dateutil==2.8.2", + # "Quart==0.20.0", + # "requests>=2.32.3,<3.0.0", + # "scikit-learn==1.5.0", + # "selenium==4.22.0", + # "setuptools>=78.1.1,<81.0.0", + # "shapely==2.0.5", + # "six==1.16.0", + # "tabulate==0.9.0", + # "tiktoken==0.7.0", + # "umap_learn==0.5.6", + # "werkzeug==3.0.6", + # "xxhash>=3.5.0,<4.0.0", + # "trio>=0.17.0,<0.29.0", + # "debugpy>=1.8.13", + # "click>=8.1.8", + # "litellm>=1.74.15.post1", + # "lark>=1.2.2", + # "pip>=25.2", + # 
class Airtable(SyncBase):
    """Sync task that pulls attachment files from one Airtable table."""

    SOURCE_NAME: str = FileSource.AIRTABLE

    async def _generate(self, task: dict):
        """
        Sync files from Airtable attachments.

        Builds an AirtableConnector from ``self.conf`` and returns its
        document generator. A full load is used when the task requests a
        reindex, when it carries no ``poll_range_start``, or when the
        connector offers no ``poll_source`` method.

        Raises:
            ValueError: if ``base_id``, ``table_name_or_id`` or the
                ``airtable_access_token`` credential is missing from the config.
        """
        base_id = self.conf.get("base_id")
        table_name_or_id = self.conf.get("table_name_or_id")
        if not base_id or not table_name_or_id:
            # Fail here with a clear message rather than later inside the API call.
            raise ValueError("Missing base_id or table_name_or_id in Airtable config")

        # ``or {}`` also covers a config whose credentials entry is explicitly None.
        credentials = self.conf.get("credentials") or {}
        if "airtable_access_token" not in credentials:
            raise ValueError("Missing airtable_access_token in credentials")

        self.connector = AirtableConnector(
            base_id=base_id,
            table_name_or_id=table_name_or_id,
        )
        self.connector.load_credentials(
            {"airtable_access_token": credentials["airtable_access_token"]}
        )

        poll_start = task.get("poll_range_start")
        # The connector may be load-only; look the method up instead of
        # assuming it exists, and fall back to a full load when it does not.
        poll_source = getattr(self.connector, "poll_source", None)

        if task.get("reindex") == "1" or not poll_start or poll_source is None:
            document_generator = self.connector.load_from_state()
            begin_info = "totally"
        else:
            document_generator = poll_source(
                poll_start.timestamp(),
                datetime.now(timezone.utc).timestamp(),
            )
            begin_info = f"from {poll_start}"

        logging.info(
            "Connect to Airtable: base_id(%s), table(%s) %s",
            base_id,
            table_name_or_id,
            begin_info,
        )

        return document_generator
"sha256:c08a523d2c27e9a7e6e88be640970530b4661a67c3e9dc3e1aa89533a822fd78", size = 29737403, upload-time = "2025-12-24T09:56:16.93Z" }, ] +[[package]] +name = "inflection" +version = "0.5.1" +source = { registry = "https://pypi.tuna.tsinghua.edu.cn/simple" } +sdist = { url = "https://pypi.tuna.tsinghua.edu.cn/packages/e1/7e/691d061b7329bc8d54edbf0ec22fbfb2afe61facb681f9aaa9bff7a27d04/inflection-0.5.1.tar.gz", hash = "sha256:1a29730d366e996aaacffb2f1f1cb9593dc38e2ddd30c91250c6dde09ea9b417", size = 15091, upload-time = "2020-08-22T08:16:29.139Z" } +wheels = [ + { url = "https://pypi.tuna.tsinghua.edu.cn/packages/59/91/aa6bde563e0085a02a435aa99b49ef75b0a4b062635e606dab23ce18d720/inflection-0.5.1-py2.py3-none-any.whl", hash = "sha256:f38b2b640938a4f35ade69ac3d053042959b62a0f1076a5bbaa1b9526605a8a2", size = 9454, upload-time = "2020-08-22T08:16:27.816Z" }, +] + [[package]] name = "iniconfig" version = "2.3.0" @@ -5176,6 +5185,22 @@ wheels = [ { url = "https://pypi.tuna.tsinghua.edu.cn/packages/29/a9/8ce0ca222ef04d602924a1e099be93f5435ca6f3294182a30574d4159ca2/py_mini_racer-0.6.0-py2.py3-none-manylinux1_x86_64.whl", hash = "sha256:42896c24968481dd953eeeb11de331f6870917811961c9b26ba09071e07180e2", size = 5416149, upload-time = "2021-04-22T07:58:25.615Z" }, ] +[[package]] +name = "pyairtable" +version = "3.3.0" +source = { registry = "https://pypi.tuna.tsinghua.edu.cn/simple" } +dependencies = [ + { name = "inflection" }, + { name = "pydantic" }, + { name = "requests" }, + { name = "typing-extensions" }, + { name = "urllib3" }, +] +sdist = { url = "https://pypi.tuna.tsinghua.edu.cn/packages/2c/1d/8a572580e02297cef7ae01053a8b550b7759ea80326cd3231df87b00555b/pyairtable-3.3.0.tar.gz", hash = "sha256:d6d3b77f6feb7a02a84779c2235d37a46605f36030cf20ed99b08bab73108a8c", size = 150168, upload-time = "2025-11-05T20:11:41.435Z" } +wheels = [ + { url = 
"https://pypi.tuna.tsinghua.edu.cn/packages/13/7b/bebb0ebb86353b63740869ed10ac1fef1636ccc6042beb1d8d3956cad02d/pyairtable-3.3.0-py2.py3-none-any.whl", hash = "sha256:38af09c18659918b96539ac4d9730c9656f6ce2088cdff692dd311fa16802acf", size = 101513, upload-time = "2025-11-05T20:11:40.137Z" }, +] + [[package]] name = "pyarrow" version = "22.0.0" @@ -6127,6 +6152,7 @@ dependencies = [ { name = "pdfplumber" }, { name = "pluginlib" }, { name = "psycopg2-binary" }, + { name = "pyairtable" }, { name = "pyclipper" }, { name = "pycryptodomex" }, { name = "pyobvector" }, @@ -6255,6 +6281,7 @@ requires-dist = [ { name = "pdfplumber", specifier = "==0.10.4" }, { name = "pluginlib", specifier = "==0.9.4" }, { name = "psycopg2-binary", specifier = ">=2.9.11,<3.0.0" }, + { name = "pyairtable", specifier = ">=3.3.0" }, { name = "pyclipper", specifier = ">=1.4.0,<2.0.0" }, { name = "pycryptodomex", specifier = "==3.20.0" }, { name = "pyobvector", specifier = "==0.2.18" }, diff --git a/web/src/assets/svg/data-source/airtable.svg b/web/src/assets/svg/data-source/airtable.svg new file mode 100644 index 000000000..a37923102 --- /dev/null +++ b/web/src/assets/svg/data-source/airtable.svg @@ -0,0 +1,14 @@ + + + + + + + + + + + + + + \ No newline at end of file diff --git a/web/src/locales/en.ts b/web/src/locales/en.ts index c973cf9fe..277eb6861 100644 --- a/web/src/locales/en.ts +++ b/web/src/locales/en.ts @@ -922,6 +922,8 @@ Example: Virtual Hosted Style`, dropboxDescription: 'Connect your Dropbox to sync files and folders from a chosen account.', boxDescription: 'Connect your Box drive to sync files and folders.', + airtableDescription: + 'Connect to Airtable and synchronize files from a specified table within a designated workspace.', dropboxAccessTokenTip: 'Generate a long-lived access token in the Dropbox App Console with files.metadata.read, files.content.read, and sharing.read scopes.', moodleDescription: diff --git a/web/src/locales/ru.ts b/web/src/locales/ru.ts index 
e2855d487..4c0c42401 100644 --- a/web/src/locales/ru.ts +++ b/web/src/locales/ru.ts @@ -747,6 +747,8 @@ export default { 'Синхронизируйте страницы и базы данных из Notion для извлечения знаний.', boxDescription: 'Подключите ваш диск Box для синхронизации файлов и папок.', + airtableDescription: + 'Подключите Airtable и синхронизируйте файлы из указанной таблицы в заданном рабочем пространстве.', google_driveDescription: 'Подключите ваш Google Drive через OAuth и синхронизируйте определенные папки или диски.', gmailDescription: diff --git a/web/src/locales/zh.ts b/web/src/locales/zh.ts index cefd66fd1..950e0c060 100644 --- a/web/src/locales/zh.ts +++ b/web/src/locales/zh.ts @@ -853,6 +853,7 @@ General:实体和关系提取提示来自 GitHub - microsoft/graphrag:基于 '请上传由 Google Console 生成的 OAuth JSON。如果仅包含 client credentials,请通过浏览器授权一次以获取长期有效的刷新 Token。', dropboxDescription: '连接 Dropbox,同步指定账号下的文件与文件夹。', boxDescription: '连接你的 Box 云盘以同步文件和文件夹。', + airtableDescription: '连接 Airtable,同步指定工作区下指定表格中的文件。', r2Description: '连接你的 Cloudflare R2 存储桶以导入和同步文件。', dropboxAccessTokenTip: '请在 Dropbox App Console 生成 Access Token,并勾选 files.metadata.read、files.content.read、sharing.read 等必要权限。', diff --git a/web/src/pages/user-setting/data-source/contant.tsx b/web/src/pages/user-setting/data-source/contant.tsx index acb6860ae..14c6e1642 100644 --- a/web/src/pages/user-setting/data-source/contant.tsx +++ b/web/src/pages/user-setting/data-source/contant.tsx @@ -25,6 +25,7 @@ export enum DataSourceKey { R2 = 'r2', OCI_STORAGE = 'oci_storage', GOOGLE_CLOUD_STORAGE = 'google_cloud_storage', + AIRTABLE = 'airtable', // SHAREPOINT = 'sharepoint', // SLACK = 'slack', // TEAMS = 'teams', @@ -104,6 +105,11 @@ export const generateDataSourceInfo = (t: TFunction) => { description: t(`setting.${DataSourceKey.BOX}Description`), icon: , }, + [DataSourceKey.AIRTABLE]: { + name: 'Airtable', + description: t(`setting.${DataSourceKey.AIRTABLE}Description`), + icon: , + }, }; }; @@ -672,6 +678,26 @@ export const 
DataSourceFormFields = { placeholder: 'Defaults root', }, ], + [DataSourceKey.AIRTABLE]: [ + { + label: 'Access Token', + name: 'config.credentials.airtable_access_token', + type: FormFieldType.Text, + required: true, + }, + { + label: 'Base ID', + name: 'config.base_id', + type: FormFieldType.Text, + required: true, + }, + { + label: 'Table Name OR ID', + name: 'config.table_name_or_id', + type: FormFieldType.Text, + required: true, + }, + ], }; export const DataSourceFormDefaultValues = { @@ -858,4 +884,16 @@ export const DataSourceFormDefaultValues = { }, }, }, + [DataSourceKey.AIRTABLE]: { + name: '', + source: DataSourceKey.AIRTABLE, + config: { + name: '', + base_id: '', + table_name_or_id: '', + credentials: { + airtable_access_token: '', + }, + }, + }, };