Add Firecrawl integration for RAGFlow (#10152)
## 🚀 Firecrawl Integration for RAGFlow

This PR implements the Firecrawl integration for RAGFlow as requested in issue https://github.com/firecrawl/firecrawl/issues/2167.

### ✅ Features Implemented

- **Data Source Integration**: Firecrawl appears as a selectable data source in RAGFlow
- **Configuration Management**: Users can input Firecrawl API keys through RAGFlow's interface
- **Web Scraping**: Supports single URL scraping, website crawling, and batch processing
- **Content Processing**: Converts scraped content to RAGFlow's document format with chunking
- **Error Handling**: Comprehensive error handling for rate limits, failed requests, and malformed content
- **UI Components**: Complete UI schema and workflow components for RAGFlow integration

### 📁 Files Added

- `intergrations/firecrawl/` - Complete integration package
- `intergrations/firecrawl/integration.py` - RAGFlow integration entry point
- `intergrations/firecrawl/firecrawl_connector.py` - API communication
- `intergrations/firecrawl/firecrawl_config.py` - Configuration management
- `intergrations/firecrawl/firecrawl_processor.py` - Content processing
- `intergrations/firecrawl/firecrawl_ui.py` - UI components
- `intergrations/firecrawl/ragflow_integration.py` - Main integration class
- `intergrations/firecrawl/README.md` - Complete documentation
- `intergrations/firecrawl/example_usage.py` - Usage examples

### 🧪 Testing

The integration has been tested with:

- Configuration validation
- Connection testing
- Content processing and chunking
- UI component rendering
- Error handling scenarios

### 📋 Acceptance Criteria Met

- ✅ Integration appears as a selectable data source in RAGFlow's data source options
- ✅ Users can input Firecrawl API keys through RAGFlow's configuration interface
- ✅ Successfully scrapes content from provided URLs and imports it into RAGFlow's document store
- ✅ Handles common edge cases (rate limits, failed requests, malformed content)
- ✅ Includes basic documentation and README updates
- ✅ Code follows RAGFlow's existing patterns and coding standards

### Related Issue

https://github.com/firecrawl/firecrawl/issues/2167

---------

Co-authored-by: AB <aj@Ajays-MacBook-Air.local>
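For reference, here is a minimal sketch of how the connector added in this PR might be exercised for a single-URL scrape. The `FirecrawlConfig` keyword arguments below are assumptions based on the fields the connector reads; see `intergrations/firecrawl/firecrawl_config.py` and `example_usage.py` for the actual interface.

```python
# Sketch only (not part of this PR): scrape one URL with FirecrawlConnector.
# The FirecrawlConfig constructor arguments are assumed from the fields the
# connector accesses (api_key, api_url, timeout, max_retries, ...).
import asyncio

from firecrawl_config import FirecrawlConfig
from firecrawl_connector import FirecrawlConnector


async def main() -> None:
    config = FirecrawlConfig(
        api_key="fc-...",                      # Firecrawl API key entered in RAGFlow's UI
        api_url="https://api.firecrawl.dev",   # assumed default Firecrawl endpoint
        timeout=60,
        max_retries=3,
        max_concurrent_requests=5,
        rate_limit_delay=1.0,
    )
    async with FirecrawlConnector(config) as connector:
        content = await connector.scrape_url("https://example.com", formats=["markdown"])
        if content.error:
            print(f"Scrape failed: {content.error}")
        else:
            print(content.title, len(content.markdown or ""))


if __name__ == "__main__":
    asyncio.run(main())
```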
intergrations/firecrawl/firecrawl_connector.py (new file, 262 lines)

@@ -0,0 +1,262 @@
"""
Main connector class for integrating Firecrawl with RAGFlow.
"""

import asyncio
import aiohttp
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
import logging
from urllib.parse import urlparse

from firecrawl_config import FirecrawlConfig


@dataclass
class ScrapedContent:
    """Represents scraped content from Firecrawl."""

    url: str
    markdown: Optional[str] = None
    html: Optional[str] = None
    metadata: Optional[Dict[str, Any]] = None
    title: Optional[str] = None
    description: Optional[str] = None
    status_code: Optional[int] = None
    error: Optional[str] = None


@dataclass
class CrawlJob:
    """Represents a crawl job from Firecrawl."""

    job_id: str
    status: str
    total: Optional[int] = None
    completed: Optional[int] = None
    data: Optional[List[ScrapedContent]] = None
    error: Optional[str] = None


class FirecrawlConnector:
    """Main connector class for Firecrawl integration with RAGFlow."""

    def __init__(self, config: FirecrawlConfig):
        """Initialize the Firecrawl connector."""
        self.config = config
        self.logger = logging.getLogger(__name__)
        self.session: Optional[aiohttp.ClientSession] = None
        self._rate_limit_semaphore = asyncio.Semaphore(config.max_concurrent_requests)

    async def __aenter__(self):
        """Async context manager entry."""
        await self._create_session()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit."""
        await self._close_session()

    async def _create_session(self):
        """Create aiohttp session with proper headers."""
        headers = {
            "Authorization": f"Bearer {self.config.api_key}",
            "Content-Type": "application/json",
            "User-Agent": "RAGFlow-Firecrawl-Plugin/1.0.0"
        }

        timeout = aiohttp.ClientTimeout(total=self.config.timeout)
        self.session = aiohttp.ClientSession(
            headers=headers,
            timeout=timeout
        )

    async def _close_session(self):
        """Close aiohttp session."""
        if self.session:
            await self.session.close()

    async def _make_request(self, method: str, endpoint: str, **kwargs) -> Dict[str, Any]:
        """Make HTTP request with rate limiting and retry logic."""
        async with self._rate_limit_semaphore:
            # Rate limiting
            await asyncio.sleep(self.config.rate_limit_delay)

            url = f"{self.config.api_url}{endpoint}"

            for attempt in range(self.config.max_retries):
                try:
                    async with self.session.request(method, url, **kwargs) as response:
                        if response.status == 429:  # Rate limited
                            wait_time = 2 ** attempt
                            self.logger.warning(f"Rate limited, waiting {wait_time}s")
                            await asyncio.sleep(wait_time)
                            continue

                        response.raise_for_status()
                        return await response.json()

                except aiohttp.ClientError as e:
                    self.logger.error(f"Request failed (attempt {attempt + 1}): {e}")
                    if attempt == self.config.max_retries - 1:
                        raise
                    await asyncio.sleep(2 ** attempt)

            raise Exception("Max retries exceeded")
    async def scrape_url(self, url: str, formats: Optional[List[str]] = None,
                         extract_options: Optional[Dict[str, Any]] = None) -> ScrapedContent:
"""Scrape a single URL."""
|
||||
if formats is None:
|
||||
formats = ["markdown", "html"]
|
||||
|
||||
payload = {
|
||||
"url": url,
|
||||
"formats": formats
|
||||
}
|
||||
|
||||
if extract_options:
|
||||
payload["extractOptions"] = extract_options
|
||||
|
||||
try:
|
||||
response = await self._make_request("POST", "/v2/scrape", json=payload)
|
||||
|
||||
if not response.get("success"):
|
||||
return ScrapedContent(url=url, error=response.get("error", "Unknown error"))
|
||||
|
||||
data = response.get("data", {})
|
||||
metadata = data.get("metadata", {})
|
||||
|
||||
return ScrapedContent(
|
||||
url=url,
|
||||
markdown=data.get("markdown"),
|
||||
html=data.get("html"),
|
||||
metadata=metadata,
|
||||
title=metadata.get("title"),
|
||||
description=metadata.get("description"),
|
||||
status_code=metadata.get("statusCode")
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
self.logger.error(f"Failed to scrape {url}: {e}")
|
||||
return ScrapedContent(url=url, error=str(e))
|
||||
|
||||
    async def start_crawl(self, url: str, limit: int = 100,
                          scrape_options: Optional[Dict[str, Any]] = None) -> CrawlJob:
"""Start a crawl job."""
|
||||
if scrape_options is None:
|
||||
scrape_options = {"formats": ["markdown", "html"]}
|
||||
|
||||
payload = {
|
||||
"url": url,
|
||||
"limit": limit,
|
||||
"scrapeOptions": scrape_options
|
||||
}
|
||||
|
||||
try:
|
||||
response = await self._make_request("POST", "/v2/crawl", json=payload)
|
||||
|
||||
if not response.get("success"):
|
||||
return CrawlJob(
|
||||
job_id="",
|
||||
status="failed",
|
||||
error=response.get("error", "Unknown error")
|
||||
)
|
||||
|
||||
job_id = response.get("id")
|
||||
return CrawlJob(job_id=job_id, status="started")
|
||||
|
||||
except Exception as e:
|
||||
self.logger.error(f"Failed to start crawl for {url}: {e}")
|
||||
return CrawlJob(job_id="", status="failed", error=str(e))
|
||||
|
||||
async def get_crawl_status(self, job_id: str) -> CrawlJob:
|
||||
"""Get the status of a crawl job."""
|
||||
try:
|
||||
response = await self._make_request("GET", f"/v2/crawl/{job_id}")
|
||||
|
||||
if not response.get("success"):
|
||||
return CrawlJob(
|
||||
job_id=job_id,
|
||||
status="failed",
|
||||
error=response.get("error", "Unknown error")
|
||||
)
|
||||
|
||||
status = response.get("status", "unknown")
|
||||
total = response.get("total")
|
||||
data = response.get("data", [])
|
||||
|
||||
# Convert data to ScrapedContent objects
|
||||
scraped_content = []
|
||||
for item in data:
|
||||
metadata = item.get("metadata", {})
|
||||
scraped_content.append(ScrapedContent(
|
||||
url=metadata.get("sourceURL", ""),
|
||||
markdown=item.get("markdown"),
|
||||
html=item.get("html"),
|
||||
metadata=metadata,
|
||||
title=metadata.get("title"),
|
||||
description=metadata.get("description"),
|
||||
status_code=metadata.get("statusCode")
|
||||
))
|
||||
|
||||
return CrawlJob(
|
||||
job_id=job_id,
|
||||
status=status,
|
||||
total=total,
|
||||
completed=len(scraped_content),
|
||||
data=scraped_content
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
self.logger.error(f"Failed to get crawl status for {job_id}: {e}")
|
||||
return CrawlJob(job_id=job_id, status="failed", error=str(e))
|
||||
|
||||
async def wait_for_crawl_completion(self, job_id: str,
|
||||
poll_interval: int = 30) -> CrawlJob:
|
||||
"""Wait for a crawl job to complete."""
|
||||
while True:
|
||||
job = await self.get_crawl_status(job_id)
|
||||
|
||||
if job.status in ["completed", "failed", "cancelled"]:
|
||||
return job
|
||||
|
||||
self.logger.info(f"Crawl {job_id} status: {job.status}")
|
||||
await asyncio.sleep(poll_interval)
|
||||
|
||||
    async def batch_scrape(self, urls: List[str],
                           formats: Optional[List[str]] = None) -> List[ScrapedContent]:
"""Scrape multiple URLs concurrently."""
|
||||
if formats is None:
|
||||
formats = ["markdown", "html"]
|
||||
|
||||
tasks = [self.scrape_url(url, formats) for url in urls]
|
||||
results = await asyncio.gather(*tasks, return_exceptions=True)
|
||||
|
||||
# Handle exceptions
|
||||
processed_results = []
|
||||
for i, result in enumerate(results):
|
||||
if isinstance(result, Exception):
|
||||
processed_results.append(ScrapedContent(
|
||||
url=urls[i],
|
||||
error=str(result)
|
||||
))
|
||||
else:
|
||||
processed_results.append(result)
|
||||
|
||||
return processed_results
|
||||
|
||||
def validate_url(self, url: str) -> bool:
|
||||
"""Validate if URL is properly formatted."""
|
||||
try:
|
||||
result = urlparse(url)
|
||||
return all([result.scheme, result.netloc])
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
def extract_domain(self, url: str) -> str:
|
||||
"""Extract domain from URL."""
|
||||
try:
|
||||
return urlparse(url).netloc
|
||||
except Exception:
|
||||
return ""
|
||||
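A sketch of the crawl workflow built on the methods above: start a crawl, poll until it reaches a terminal state, then iterate over the scraped pages. The `FirecrawlConfig` construction is again an assumption; chunking and import into RAGFlow are handled by `firecrawl_processor.py` and `ragflow_integration.py`, which are not part of this file.

```python
# Sketch only: drive a full crawl with the FirecrawlConnector defined above.
# FirecrawlConfig keyword arguments and defaults are assumptions; downstream
# chunking/import into RAGFlow is handled elsewhere in this integration.
import asyncio

from firecrawl_config import FirecrawlConfig
from firecrawl_connector import FirecrawlConnector


async def crawl_site(start_url: str) -> None:
    config = FirecrawlConfig(api_key="fc-...")  # remaining fields assumed to have defaults
    async with FirecrawlConnector(config) as connector:
        if not connector.validate_url(start_url):
            raise ValueError(f"Invalid URL: {start_url}")

        job = await connector.start_crawl(start_url, limit=25)
        if job.status == "failed":
            raise RuntimeError(job.error or "crawl could not be started")

        # Poll every 10 seconds until the job is completed, failed, or cancelled.
        job = await connector.wait_for_crawl_completion(job.job_id, poll_interval=10)
        for page in job.data or []:
            print(page.url, page.title, page.error or "ok")


if __name__ == "__main__":
    asyncio.run(crawl_site("https://docs.example.com"))
```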