mirror of
https://github.com/infiniflow/ragflow.git
synced 2025-12-26 17:16:52 +08:00
### What problem does this PR solve?

Add an Airtable connector and integration for data synchronization.

### Type of change

- [x] New Feature (non-breaking change which adds functionality)
149 lines · 5.5 KiB · Python
from datetime import datetime, timezone
|
|
import logging
|
|
from typing import Any
|
|
|
|
import requests
|
|
|
|
from pyairtable import Api as AirtableApi
|
|
|
|
from common.data_source.config import AIRTABLE_CONNECTOR_SIZE_THRESHOLD, INDEX_BATCH_SIZE, DocumentSource
|
|
from common.data_source.exceptions import ConnectorMissingCredentialError
|
|
from common.data_source.interfaces import LoadConnector
|
|
from common.data_source.models import Document, GenerateDocumentsOutput
|
|
from common.data_source.utils import extract_size_bytes, get_file_ext
|
|
|
|
class AirtableClientNotSetUpError(PermissionError):
    """Raised when the Airtable API client is used before credentials are loaded."""

    def __init__(self) -> None:
        message = "Airtable client is not set up. Did you forget to call load_credentials()?"
        super().__init__(message)
|
|
|
|
|
|
class AirtableConnector(LoadConnector):
    """
    Lightweight Airtable connector.

    This connector ingests Airtable attachments as raw blobs without
    parsing file content or generating text/image sections.
    """

    def __init__(
        self,
        base_id: str,
        table_name_or_id: str,
        batch_size: int = INDEX_BATCH_SIZE,
    ) -> None:
        """
        Args:
            base_id: Airtable base identifier.
            table_name_or_id: Table name or table id within the base.
            batch_size: Maximum number of documents yielded per batch.
        """
        self.base_id = base_id
        self.table_name_or_id = table_name_or_id
        self.batch_size = batch_size
        self._airtable_client: AirtableApi | None = None
        self.size_threshold = AIRTABLE_CONNECTOR_SIZE_THRESHOLD

    # -------------------------
    # Credentials
    # -------------------------
    def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None:
        """Initialize the Airtable API client from ``credentials["airtable_access_token"]``.

        Raises:
            KeyError: if the access token is missing from ``credentials``.
        """
        self._airtable_client = AirtableApi(credentials["airtable_access_token"])
        return None

    @property
    def airtable_client(self) -> AirtableApi:
        """Return the configured client.

        Raises:
            AirtableClientNotSetUpError: if load_credentials() was never called.
        """
        if not self._airtable_client:
            raise AirtableClientNotSetUpError()
        return self._airtable_client

    # -------------------------
    # Core logic
    # -------------------------
    def _parse_created_time(self, created_time: str | None) -> datetime | None:
        """Parse Airtable's ``createdTime`` (e.g. ``2024-01-01T00:00:00.000Z``) as UTC.

        Returns None when the value is missing or malformed instead of
        aborting the whole sync (previously a missing createdTime raised
        TypeError and killed the generator).
        """
        if not created_time:
            return None
        try:
            return datetime.strptime(created_time, "%Y-%m-%dT%H:%M:%S.%fZ").replace(tzinfo=timezone.utc)
        except ValueError:
            logging.warning("Unparseable createdTime %r; leaving doc_updated_at unset.", created_time)
            return None

    def _build_document(
        self,
        record_id: str | None,
        created_time: str | None,
        attachment: dict[str, Any],
    ) -> Document | None:
        """Download one attachment and wrap it in a Document; return None to skip it."""
        url = attachment.get("url")
        filename = attachment.get("filename")
        attachment_id = attachment.get("id")

        # Skip anything that does not look like a downloadable attachment.
        if not url or not filename or not attachment_id:
            return None

        try:
            resp = requests.get(url, timeout=30)
            resp.raise_for_status()
            content = resp.content
        except Exception:
            # Best-effort: one bad attachment must not abort the whole sync.
            # BUG FIX: the message previously logged the placeholder "(unknown)"
            # instead of the attachment filename.
            logging.exception(
                "Failed to download attachment %s (record=%s)", filename, record_id
            )
            return None

        size_bytes = extract_size_bytes(attachment)
        if (
            self.size_threshold is not None
            and isinstance(size_bytes, int)
            and size_bytes > self.size_threshold
        ):
            # BUG FIX: also previously logged "(unknown)" instead of the filename.
            logging.warning(
                "%s exceeds size threshold of %s. Skipping.", filename, self.size_threshold
            )
            return None

        return Document(
            id=f"airtable:{record_id}:{attachment_id}",
            blob=content,
            source=DocumentSource.AIRTABLE,
            semantic_identifier=filename,
            extension=get_file_ext(filename),
            size_bytes=size_bytes if size_bytes else 0,
            doc_updated_at=self._parse_created_time(created_time),
        )

    def load_from_state(self) -> GenerateDocumentsOutput:
        """
        Fetch all Airtable records and ingest attachments as raw blobs.

        Each attachment is converted into a single Document(blob=...).

        Yields:
            Lists of Document, each at most ``self.batch_size`` long.

        Raises:
            ConnectorMissingCredentialError: if load_credentials() was never called.
        """
        if not self._airtable_client:
            raise ConnectorMissingCredentialError("Airtable credentials not loaded")

        table = self.airtable_client.table(self.base_id, self.table_name_or_id)
        records = table.all()

        logging.info(
            "Starting Airtable blob ingestion for table %s, %d records found.",
            self.table_name_or_id,
            len(records),
        )

        batch: list[Document] = []

        for record in records:
            # NOTE: removed a stray debug `print(record)` that leaked full
            # record contents (potentially sensitive) to stdout.
            record_id = record.get("id")
            fields = record.get("fields", {})
            created_time = record.get("createdTime")

            for field_value in fields.values():
                # We only care about attachment fields (lists of dicts with url/filename)
                if not isinstance(field_value, list):
                    continue

                for attachment in field_value:
                    document = self._build_document(record_id, created_time, attachment)
                    if document is None:
                        continue
                    batch.append(document)

                    if len(batch) >= self.batch_size:
                        yield batch
                        batch = []

        if batch:
            yield batch
|
|
|
|
if __name__ == "__main__":
    import os

    logging.basicConfig(level=logging.DEBUG)

    # Smoke test: replace "xxx" with a real base id / table id and export
    # AIRTABLE_ACCESS_TOKEN before running.
    connector = AirtableConnector("xxx", "xxx")
    connector.load_credentials({"airtable_access_token": os.environ.get("AIRTABLE_ACCESS_TOKEN")})
    connector.validate_connector_settings()
    document_batches = connector.load_from_state()
    try:
        first_batch = next(document_batches)
        print(f"Loaded {len(first_batch)} documents in first batch.")
        for doc in first_batch:
            print(f"- {doc.semantic_identifier} ({doc.size_bytes} bytes)")
    except StopIteration:
        # BUG FIX: message previously said "Dropbox" — a copy-paste leftover
        # from the Dropbox connector this file was modeled on.
        print("No documents available in Airtable.")