### What problem does this PR solve?

fixes https://github.com/infiniflow/ragflow/issues/12774

Add a CLI tool for migrating RAGFlow data from Elasticsearch to OceanBase, enabling users to switch their document storage backend.

- Automatic discovery and migration of all `ragflow_*` indices
- Schema conversion with vector dimension auto-detection
- Batch processing with progress tracking and resume capability
- Data consistency validation and migration report generation

**Note**: Due to network issues, I was unable to pull the required Docker images (Elasticsearch, OceanBase) to run the full end-to-end verification. Unit tests have been verified to pass. I will complete the e2e verification when network conditions allow, and submit a follow-up PR if any fixes are needed.

```bash
============================= test session starts ==============================
platform darwin -- Python 3.13.6, pytest-9.0.2, pluggy-1.6.0
rootdir: /Users/sevenc/code/ai/oceanbase/ragflow/tools/es-to-oceanbase-migration
configfile: pyproject.toml
testpaths: tests
plugins: anyio-4.12.1, asyncio-1.3.0, cov-7.0.0
collected 86 items

tests/test_progress.py::TestMigrationProgress::test_create_basic_progress PASSED [  1%]
tests/test_progress.py::TestMigrationProgress::test_create_progress_with_counts PASSED [  2%]
tests/test_progress.py::TestMigrationProgress::test_progress_default_values PASSED [  3%]
tests/test_progress.py::TestMigrationProgress::test_progress_status_values PASSED [  4%]
tests/test_progress.py::TestProgressManager::test_create_progress_manager PASSED [  5%]
tests/test_progress.py::TestProgressManager::test_create_progress_manager_creates_dir PASSED [  6%]
tests/test_progress.py::TestProgressManager::test_create_progress PASSED [  8%]
tests/test_progress.py::TestProgressManager::test_save_and_load_progress PASSED [  9%]
tests/test_progress.py::TestProgressManager::test_load_nonexistent_progress PASSED [ 10%]
tests/test_progress.py::TestProgressManager::test_delete_progress PASSED [ 11%]
tests/test_progress.py::TestProgressManager::test_update_progress PASSED [ 12%]
tests/test_progress.py::TestProgressManager::test_update_progress_multiple_batches PASSED [ 13%]
tests/test_progress.py::TestProgressManager::test_mark_completed PASSED [ 15%]
tests/test_progress.py::TestProgressManager::test_mark_failed PASSED [ 16%]
tests/test_progress.py::TestProgressManager::test_mark_paused PASSED [ 17%]
tests/test_progress.py::TestProgressManager::test_can_resume_running PASSED [ 18%]
tests/test_progress.py::TestProgressManager::test_can_resume_paused PASSED [ 19%]
tests/test_progress.py::TestProgressManager::test_can_resume_completed PASSED [ 20%]
tests/test_progress.py::TestProgressManager::test_can_resume_nonexistent PASSED [ 22%]
tests/test_progress.py::TestProgressManager::test_get_resume_info PASSED [ 23%]
tests/test_progress.py::TestProgressManager::test_get_resume_info_nonexistent PASSED [ 24%]
tests/test_progress.py::TestProgressManager::test_progress_file_path PASSED [ 25%]
tests/test_progress.py::TestProgressManager::test_progress_file_content PASSED [ 26%]
tests/test_schema.py::TestRAGFlowSchemaConverter::test_analyze_ragflow_mapping PASSED [ 27%]
tests/test_schema.py::TestRAGFlowSchemaConverter::test_detect_vector_size PASSED [ 29%]
tests/test_schema.py::TestRAGFlowSchemaConverter::test_unknown_fields PASSED [ 30%]
tests/test_schema.py::TestRAGFlowSchemaConverter::test_get_column_definitions PASSED [ 31%]
tests/test_schema.py::TestRAGFlowDataConverter::test_convert_basic_document PASSED [ 32%]
tests/test_schema.py::TestRAGFlowDataConverter::test_convert_with_vector PASSED [ 33%]
tests/test_schema.py::TestRAGFlowDataConverter::test_convert_array_fields PASSED [ 34%]
tests/test_schema.py::TestRAGFlowDataConverter::test_convert_json_fields PASSED [ 36%]
tests/test_schema.py::TestRAGFlowDataConverter::test_convert_unknown_fields_to_extra PASSED [ 37%]
tests/test_schema.py::TestRAGFlowDataConverter::test_convert_kb_id_list PASSED [ 38%]
tests/test_schema.py::TestRAGFlowDataConverter::test_convert_content_with_weight_dict PASSED [ 39%]
tests/test_schema.py::TestRAGFlowDataConverter::test_convert_batch PASSED [ 40%]
tests/test_schema.py::TestVectorFieldPattern::test_valid_patterns PASSED [ 41%]
tests/test_schema.py::TestVectorFieldPattern::test_invalid_patterns PASSED [ 43%]
tests/test_schema.py::TestVectorFieldPattern::test_extract_dimension PASSED [ 44%]
tests/test_schema.py::TestConstants::test_array_columns PASSED [ 45%]
tests/test_schema.py::TestConstants::test_json_columns PASSED [ 46%]
tests/test_schema.py::TestConstants::test_ragflow_columns_completeness PASSED [ 47%]
tests/test_schema.py::TestConstants::test_fts_columns PASSED [ 48%]
tests/test_schema.py::TestConstants::test_ragflow_columns_types PASSED [ 50%]
tests/test_schema.py::TestRAGFlowSchemaConverterEdgeCases::test_empty_mapping PASSED [ 51%]
tests/test_schema.py::TestRAGFlowSchemaConverterEdgeCases::test_mapping_without_properties PASSED [ 52%]
tests/test_schema.py::TestRAGFlowSchemaConverterEdgeCases::test_multiple_vector_fields PASSED [ 53%]
tests/test_schema.py::TestRAGFlowSchemaConverterEdgeCases::test_get_column_definitions_without_analysis PASSED [ 54%]
tests/test_schema.py::TestRAGFlowSchemaConverterEdgeCases::test_get_vector_fields PASSED [ 55%]
tests/test_schema.py::TestRAGFlowDataConverterEdgeCases::test_convert_empty_document PASSED [ 56%]
tests/test_schema.py::TestRAGFlowDataConverterEdgeCases::test_convert_document_without_source PASSED [ 58%]
tests/test_schema.py::TestRAGFlowDataConverterEdgeCases::test_convert_boolean_to_integer PASSED [ 59%]
tests/test_schema.py::TestRAGFlowDataConverterEdgeCases::test_convert_invalid_integer PASSED [ 60%]
tests/test_schema.py::TestRAGFlowDataConverterEdgeCases::test_convert_float_field PASSED [ 61%]
tests/test_schema.py::TestRAGFlowDataConverterEdgeCases::test_convert_array_with_special_characters PASSED [ 62%]
tests/test_schema.py::TestRAGFlowDataConverterEdgeCases::test_convert_already_json_array PASSED [ 63%]
tests/test_schema.py::TestRAGFlowDataConverterEdgeCases::test_convert_single_value_to_array PASSED [ 65%]
tests/test_schema.py::TestRAGFlowDataConverterEdgeCases::test_detect_vector_fields_from_document PASSED [ 66%]
tests/test_schema.py::TestRAGFlowDataConverterEdgeCases::test_convert_with_default_values PASSED [ 67%]
tests/test_schema.py::TestRAGFlowDataConverterEdgeCases::test_convert_list_content PASSED [ 68%]
tests/test_schema.py::TestRAGFlowDataConverterEdgeCases::test_convert_batch_empty PASSED [ 69%]
tests/test_schema.py::TestRAGFlowDataConverterEdgeCases::test_existing_extra_field_merged PASSED [ 70%]
tests/test_verify.py::TestVerificationResult::test_create_basic_result PASSED [ 72%]
tests/test_verify.py::TestVerificationResult::test_result_default_values PASSED [ 73%]
tests/test_verify.py::TestVerificationResult::test_result_with_counts PASSED [ 74%]
tests/test_verify.py::TestMigrationVerifier::test_verify_counts_match PASSED [ 75%]
tests/test_verify.py::TestMigrationVerifier::test_verify_counts_mismatch PASSED [ 76%]
tests/test_verify.py::TestMigrationVerifier::test_verify_samples_all_match PASSED [ 77%]
tests/test_verify.py::TestMigrationVerifier::test_verify_samples_some_missing PASSED [ 79%]
tests/test_verify.py::TestMigrationVerifier::test_verify_samples_data_mismatch PASSED [ 80%]
tests/test_verify.py::TestMigrationVerifier::test_values_equal_none_values PASSED [ 81%]
tests/test_verify.py::TestMigrationVerifier::test_values_equal_array_columns PASSED [ 82%]
tests/test_verify.py::TestMigrationVerifier::test_values_equal_json_columns PASSED [ 83%]
tests/test_verify.py::TestMigrationVerifier::test_values_equal_kb_id_list PASSED [ 84%]
tests/test_verify.py::TestMigrationVerifier::test_values_equal_content_with_weight_dict PASSED [ 86%]
tests/test_verify.py::TestMigrationVerifier::test_determine_result_passed PASSED [ 87%]
tests/test_verify.py::TestMigrationVerifier::test_determine_result_failed_count PASSED [ 88%]
tests/test_verify.py::TestMigrationVerifier::test_determine_result_failed_samples PASSED [ 89%]
tests/test_verify.py::TestMigrationVerifier::test_generate_report PASSED [ 90%]
tests/test_verify.py::TestMigrationVerifier::test_generate_report_with_missing PASSED [ 91%]
tests/test_verify.py::TestMigrationVerifier::test_generate_report_with_mismatches PASSED [ 93%]
tests/test_verify.py::TestValueComparison::test_string_comparison PASSED [ 94%]
tests/test_verify.py::TestValueComparison::test_integer_comparison PASSED [ 95%]
tests/test_verify.py::TestValueComparison::test_float_comparison PASSED [ 96%]
tests/test_verify.py::TestValueComparison::test_boolean_comparison PASSED [ 97%]
tests/test_verify.py::TestValueComparison::test_empty_array_comparison PASSED [ 98%]
tests/test_verify.py::TestValueComparison::test_nested_json_comparison PASSED [100%]

======================= 86 passed, 88 warnings in 0.66s ========================
```

### Type of change

- [ ] Bug Fix (non-breaking change which fixes an issue)
- [x] New Feature (non-breaking change which adds functionality)
- [ ] Documentation Update
- [ ] Refactoring
- [ ] Performance Improvement
- [ ] Other (please describe):

# RAGFlow ES to OceanBase Migration Tool

A CLI tool for migrating RAGFlow data from Elasticsearch to OceanBase. This tool is specifically designed for RAGFlow's data structure and handles schema conversion, vector data mapping, batch import, and resume capability.

## Features

- RAGFlow-Specific: Designed for RAGFlow's fixed data schema
- ES 8+ Support: Uses the `search_after` API for efficient data scrolling
- Vector Support: Auto-detects vector field dimensions from ES mapping
- Batch Processing: Configurable batch size for optimal performance
- Resume Capability: Save and resume migration progress
- Data Consistency Validation: Compare document counts and sample data
- Migration Report Generation: Generate detailed migration reports
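
The `search_after` scrolling and batch processing listed above can be illustrated with a small sketch. It is not the tool's code: `iter_batches`, `PageFetcher`, and `make_fake_fetcher` are hypothetical names, and an in-memory list stands in for Elasticsearch so the example runs without a cluster. With the official Python client, the fetcher would presumably wrap a `search` call that passes `sort` and `search_after`.

```python
from typing import Any, Callable, Iterator, Optional

Hit = dict[str, Any]
# A page fetcher takes (batch_size, search_after) and returns hits in a stable
# sort order; each hit carries its own "sort" values, as ES responses do.
PageFetcher = Callable[[int, Optional[list]], list[Hit]]


def iter_batches(fetch_page: PageFetcher, batch_size: int) -> Iterator[list[Hit]]:
    """Yield batches until the source is exhausted, using search_after cursors."""
    search_after: Optional[list] = None
    while True:
        hits = fetch_page(batch_size, search_after)
        if not hits:
            return
        yield hits
        # The sort values of the last hit become the cursor for the next page.
        search_after = hits[-1]["sort"]


def make_fake_fetcher(doc_ids: list[str]) -> PageFetcher:
    """In-memory stand-in for Elasticsearch, sorted by document id (assumption)."""
    ordered = sorted(doc_ids)

    def fetch(size: int, search_after: Optional[list]) -> list[Hit]:
        start = 0
        if search_after is not None:
            # Continue strictly after the cursor value.
            start = next(
                (i for i, d in enumerate(ordered) if d > search_after[0]),
                len(ordered),
            )
        return [{"_id": d, "sort": [d]} for d in ordered[start:start + size]]

    return fetch


if __name__ == "__main__":
    fetcher = make_fake_fetcher([f"chunk_{i:03d}" for i in range(7)])
    for batch in iter_batches(fetcher, batch_size=3):
        print([hit["_id"] for hit in batch])
```

Because the cursor is just the last hit's sort values, the same value can be stored and reused later, which is what makes this style of pagination a natural fit for resumable migrations.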
## Quick Start

This section provides a complete guide to verify the migration works correctly with a real RAGFlow deployment.

### Prerequisites

- RAGFlow source code cloned
- Docker and Docker Compose installed
- This migration tool installed (`uv pip install -e .`)
### Step 1: Start RAGFlow with Elasticsearch Backend

First, start RAGFlow using Elasticsearch as the document storage backend (default configuration).

```bash
# Navigate to RAGFlow docker directory
cd /path/to/ragflow/docker

# Ensure DOC_ENGINE=elasticsearch in .env (this is the default)
# DOC_ENGINE=elasticsearch

# Start RAGFlow with Elasticsearch (--profile cpu for CPU, --profile gpu for GPU)
docker compose --profile elasticsearch --profile cpu up -d

# Wait for services to be ready (this may take a few minutes)
docker compose ps

# Check ES is running
curl -X GET "http://localhost:9200/_cluster/health?pretty"
```
### Step 2: Create Test Data in RAGFlow

- Open RAGFlow Web UI: http://localhost:9380
- Create a new Knowledge Base
- Upload some test documents (PDF, TXT, DOCX, etc.)
- Wait for the documents to be parsed and indexed
- Test the knowledge base with some queries to ensure it works
### Step 3: Verify ES Data (Optional)

Before migration, verify that the data exists in Elasticsearch. This step is optional, but it gives you a baseline for comparison after migration.

```bash
# Navigate to migration tool directory (from ragflow root)
cd tools/es-to-oceanbase-migration

# Activate the virtual environment if not already done
source .venv/bin/activate

# Check connection and list indices
es-ob-migrate status --es-host localhost --es-port 9200

# First, find your actual index name (pattern: ragflow_{tenant_id})
curl -X GET "http://localhost:9200/_cat/indices/ragflow_*?v"

# List all knowledge bases in the index
# Replace ragflow_{tenant_id} with your actual index from the curl output above
es-ob-migrate list-kb --es-host localhost --es-port 9200 --index ragflow_{tenant_id}

# View sample documents
es-ob-migrate sample --es-host localhost --es-port 9200 --index ragflow_{tenant_id} --size 5

# Check schema
es-ob-migrate schema --es-host localhost --es-port 9200 --index ragflow_{tenant_id}
```
### Step 4: Start OceanBase for Migration

Start RAGFlow's OceanBase service as the migration target:

```bash
# Navigate to ragflow docker directory (from tools/es-to-oceanbase-migration)
cd ../../docker

# Start only OceanBase service from RAGFlow docker compose
docker compose --profile oceanbase up -d

# Wait for OceanBase to be ready
docker compose logs -f oceanbase
```
### Step 5: Run Migration

Execute the migration from Elasticsearch to OceanBase:

```bash
cd ../tools/es-to-oceanbase-migration

# Option A: Migrate ALL ragflow_* indices (Recommended)
# If --index and --table are omitted, the tool auto-discovers all ragflow_* indices
es-ob-migrate migrate \
  --es-host localhost --es-port 9200 \
  --ob-host localhost --ob-port 2881 \
  --ob-user "root@ragflow" --ob-password "infini_rag_flow" \
  --ob-database ragflow_doc \
  --batch-size 1000 \
  --verify

# Option B: Migrate a specific index
# Use the SAME name for both --index and --table
# The index name pattern is: ragflow_{tenant_id}
# Find your tenant_id from Step 3's curl output
es-ob-migrate migrate \
  --es-host localhost --es-port 9200 \
  --ob-host localhost --ob-port 2881 \
  --ob-user "root@ragflow" --ob-password "infini_rag_flow" \
  --ob-database ragflow_doc \
  --index ragflow_{tenant_id} \
  --table ragflow_{tenant_id} \
  --batch-size 1000 \
  --verify
```
Expected output:

```
RAGFlow ES to OceanBase Migration
Source: localhost:9200/ragflow_{tenant_id}
Target: localhost:2881/ragflow_doc.ragflow_{tenant_id}

Step 1: Checking connections...
  ES cluster status: green
  OceanBase connection: OK (version: 4.3.5.1)

Step 2: Analyzing ES index...
  Auto-detected vector dimension: 1024
  Known RAGFlow fields: 25
  Total documents: 1,234

Step 3: Creating OceanBase table...
  Created table 'ragflow_{tenant_id}' with RAGFlow schema

Step 4: Migrating data...
  Migrating... ━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 1,234/1,234

Step 5: Verifying migration...
  ✓ Document counts match: 1,234
  ✓ Sample verification: 100/100 matched

Migration completed successfully!
  Total: 1,234 documents
  Migrated: 1,234 documents
  Failed: 0 documents
  Duration: 45.2 seconds
```
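
Options A and B in Step 5 differ only in whether `--index` and `--table` are given. The selection rule can be sketched as follows; `plan_migrations` is a hypothetical helper, and rejecting `--table` without `--index` is an assumption rather than documented behavior.

```python
from typing import Optional


def plan_migrations(
    discovered: list[str],
    index: Optional[str] = None,
    table: Optional[str] = None,
) -> list[tuple[str, str]]:
    """Return (source_index, target_table) pairs for one migration run."""
    if index is None:
        if table is not None:
            # Assumption: a lone --table is ambiguous, so this sketch rejects it.
            raise ValueError("--table only makes sense together with --index")
        # Option A: every discovered ragflow_* index maps to a same-named table.
        return [(n, n) for n in sorted(discovered) if n.startswith("ragflow_")]
    # Option B: a single index; the table name defaults to the index name.
    return [(index, table or index)]


if __name__ == "__main__":
    print(plan_migrations(["ragflow_b", "logs-2024", "ragflow_a"]))
    print(plan_migrations([], index="ragflow_abc123"))
```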
### Step 6: Stop RAGFlow and Switch to OceanBase Backend

```bash
# Navigate to ragflow docker directory
cd ../../docker

# Stop only Elasticsearch and RAGFlow (but keep OceanBase running)
docker compose --profile elasticsearch --profile cpu down

# Edit .env file, change:
# DOC_ENGINE=elasticsearch -> DOC_ENGINE=oceanbase
#
# The OceanBase connection settings are already configured by default in .env
```
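
If you prefer to script the `.env` change instead of editing the file by hand, a minimal sketch could look like the following. `set_doc_engine` is a hypothetical helper, not part of the migration tool; it rewrites only the active `DOC_ENGINE=` line and leaves commented-out lines alone.

```python
import re


def set_doc_engine(env_text: str, engine: str) -> str:
    """Return env_text with every active DOC_ENGINE line set to `engine`."""
    # ^ with MULTILINE anchors at line starts, so "# DOC_ENGINE=..." is skipped.
    pattern = re.compile(r"^DOC_ENGINE=.*$", flags=re.MULTILINE)
    if not pattern.search(env_text):
        raise ValueError("no active DOC_ENGINE line found")
    # A function replacement avoids backslash handling in the new value.
    return pattern.sub(lambda _match: f"DOC_ENGINE={engine}", env_text)


if __name__ == "__main__":
    sample = "# DOC_ENGINE=infinity\nDOC_ENGINE=elasticsearch\nOTHER=1\n"
    print(set_doc_engine(sample, "oceanbase"))
```

To apply it, read `.env` with `pathlib.Path.read_text()`, pass the text through the function, and write the result back; keeping a backup copy of the file first is a sensible precaution.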
### Step 7: Start RAGFlow with OceanBase Backend

```bash
# OceanBase should still be running from Step 4
# Start RAGFlow with OceanBase profile (OceanBase is already running)
docker compose --profile oceanbase --profile cpu up -d

# Wait for services to start
docker compose ps

# Check logs for any errors
docker compose logs -f ragflow-cpu
```
### Step 8: Data Integrity Verification (Optional)

Run the verification command to compare ES and OceanBase data:

```bash
es-ob-migrate verify \
  --es-host localhost --es-port 9200 \
  --ob-host localhost --ob-port 2881 \
  --ob-user "root@ragflow" --ob-password "infini_rag_flow" \
  --ob-database ragflow_doc \
  --index ragflow_{tenant_id} \
  --table ragflow_{tenant_id} \
  --sample-size 100
```
Expected output:

```
╭─────────────────────────────────────────────────────────────╮
│                Migration Verification Report                │
├─────────────────────────────────────────────────────────────┤
│ ES Index: ragflow_{tenant_id}                               │
│ OB Table: ragflow_{tenant_id}                               │
├─────────────────────────────────────────────────────────────┤
│ Document Counts                                             │
│   ES: 1,234                                                 │
│   OB: 1,234                                                 │
│   Match: ✓ Yes                                              │
├─────────────────────────────────────────────────────────────┤
│ Sample Verification (100 documents)                         │
│   Matched: 100                                              │
│   Match Rate: 100.0%                                        │
├─────────────────────────────────────────────────────────────┤
│ Result: ✓ PASSED                                            │
╰─────────────────────────────────────────────────────────────╯
```
### Step 9: Verify RAGFlow Works with OceanBase

- Open RAGFlow Web UI: http://localhost:9380
- Navigate to your Knowledge Base
- Try the same queries you tested before migration
## CLI Reference

### `es-ob-migrate migrate`

Run data migration from Elasticsearch to OceanBase.

| Option | Default | Description |
|---|---|---|
| `--es-host` | `localhost` | Elasticsearch host |
| `--es-port` | `9200` | Elasticsearch port |
| `--es-user` | None | ES username (if auth required) |
| `--es-password` | None | ES password |
| `--ob-host` | `localhost` | OceanBase host |
| `--ob-port` | `2881` | OceanBase port |
| `--ob-user` | `root@test` | OceanBase user (format: `user@tenant`) |
| `--ob-password` | `""` | OceanBase password |
| `--ob-database` | `test` | OceanBase database name |
| `-i, --index` | None | Source ES index (omit to migrate all `ragflow_*` indices) |
| `-t, --table` | None | Target OB table (omit to use same name as index) |
| `--batch-size` | `1000` | Documents per batch |
| `--resume` | `False` | Resume from previous progress |
| `--verify/--no-verify` | `True` | Verify after migration |

Example:

```bash
# Migrate all ragflow_* indices
es-ob-migrate migrate \
  --es-host localhost --es-port 9200 \
  --ob-host localhost --ob-port 2881 \
  --ob-user "root@ragflow" --ob-password "infini_rag_flow" \
  --ob-database ragflow_doc

# Migrate a specific index
es-ob-migrate migrate \
  --es-host localhost --es-port 9200 \
  --ob-host localhost --ob-port 2881 \
  --ob-user "root@ragflow" --ob-password "infini_rag_flow" \
  --ob-database ragflow_doc \
  --index ragflow_abc123 --table ragflow_abc123

# Resume interrupted migration
es-ob-migrate migrate \
  --es-host localhost --es-port 9200 \
  --ob-host localhost --ob-port 2881 \
  --ob-user "root@ragflow" --ob-password "infini_rag_flow" \
  --ob-database ragflow_doc \
  --index ragflow_abc123 --table ragflow_abc123 \
  --resume
```

Resume Feature:

Migration progress is automatically saved to the `.migration_progress/` directory. If a migration is interrupted (network error, timeout, etc.), use `--resume` to continue from where it stopped:

- Progress file: `.migration_progress/{index_name}_progress.json`
- Contains: total count, migrated count, last document ID, timestamp
- On resume: skips already migrated documents, continues from last position

Output:

```
RAGFlow ES to OceanBase Migration
Source: localhost:9200/ragflow_abc123
Target: localhost:2881/ragflow_doc.ragflow_abc123

Step 1: Checking connections...
  ES cluster status: green
  OceanBase connection: OK

Step 2: Analyzing ES index...
  Auto-detected vector dimension: 1024
  Total documents: 1,234

Step 3: Creating OceanBase table...
  Created table 'ragflow_abc123' with RAGFlow schema

Step 4: Migrating data...
  Migrating... ━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 1,234/1,234

Migration completed successfully!
  Total: 1,234 documents
  Duration: 45.2 seconds
```
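
The resume behavior of `migrate` (a per-index progress file holding total count, migrated count, last document ID, and a timestamp) can be pictured with a small sketch. The file name pattern comes from the description of the resume feature; the `Progress` dataclass, its field names, and the helper functions are assumptions for illustration, not the tool's actual classes.

```python
import json
import time
from dataclasses import asdict, dataclass
from pathlib import Path
from typing import Optional


@dataclass
class Progress:
    index_name: str
    total: int
    migrated: int = 0
    last_doc_id: Optional[str] = None
    updated_at: float = 0.0


def progress_path(base_dir: Path, index_name: str) -> Path:
    # Mirrors the documented pattern: .migration_progress/{index_name}_progress.json
    return base_dir / f"{index_name}_progress.json"


def save_progress(base_dir: Path, progress: Progress) -> None:
    base_dir.mkdir(parents=True, exist_ok=True)
    progress.updated_at = time.time()
    path = progress_path(base_dir, progress.index_name)
    path.write_text(json.dumps(asdict(progress)))


def load_progress(base_dir: Path, index_name: str) -> Optional[Progress]:
    path = progress_path(base_dir, index_name)
    if not path.exists():
        return None  # nothing to resume from
    return Progress(**json.loads(path.read_text()))


def record_batch(base_dir: Path, progress: Progress, batch_ids: list[str]) -> None:
    """Persist progress after a batch has been written to the target."""
    progress.migrated += len(batch_ids)
    progress.last_doc_id = batch_ids[-1]
    save_progress(base_dir, progress)
```

On resume, a tool following this pattern would load the file, then continue reading the source from `last_doc_id` onward rather than starting over.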
### `es-ob-migrate list-indices`

List all RAGFlow indices (`ragflow_*`) in Elasticsearch.

Example:

```bash
es-ob-migrate list-indices --es-host localhost --es-port 9200
```

Output:

```
RAGFlow Indices in Elasticsearch:

Index Name                          Documents  Type
ragflow_abc123def456789             1234       Document Chunks
ragflow_doc_meta_abc123def456789    56         Document Metadata

Total: 2 ragflow_* indices found
```
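
The `Type` column in the `list-indices` output appears to follow the index name prefix. A minimal sketch of that mapping, inferred only from the two example rows (the function name and the exact rule are assumptions):

```python
def classify_index(name: str) -> str:
    """Guess the index type from its name, based on the example output."""
    if not name.startswith("ragflow_"):
        raise ValueError(f"not a RAGFlow index: {name!r}")
    # Check the longer prefix first, since it also starts with "ragflow_".
    if name.startswith("ragflow_doc_meta_"):
        return "Document Metadata"
    return "Document Chunks"


if __name__ == "__main__":
    for index in ("ragflow_abc123def456789", "ragflow_doc_meta_abc123def456789"):
        print(index, "->", classify_index(index))
```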
### `es-ob-migrate schema`

Preview schema analysis from ES mapping.

Example:

```bash
es-ob-migrate schema --es-host localhost --es-port 9200 --index ragflow_abc123
```

Output:

```
RAGFlow Schema Analysis for index: ragflow_abc123

Vector Fields:
  q_1024_vec: dense_vector (dim=1024)

Known RAGFlow Fields (25):
  id, kb_id, doc_id, docnm_kwd, content_with_weight, content_ltks,
  available_int, important_kwd, question_kwd, tag_kwd, page_num_int...

Unknown Fields (stored in 'extra' column):
  custom_field_1, custom_field_2
```
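
The three groups in the `schema` output (vector fields, known fields, unknown fields) suggest a simple classification pass over the ES mapping. The sketch below assumes vector fields follow the `q_<dim>_vec` naming seen in the output and uses only the partial field list shown there; the real tool knows 25 fields and may apply different rules.

```python
import re

# Assumed naming convention, taken from the example field q_1024_vec.
VECTOR_FIELD = re.compile(r"^q_(\d+)_vec$")

# Partial list copied from the example output; not the complete set of 25.
KNOWN_FIELDS = {
    "id", "kb_id", "doc_id", "docnm_kwd", "content_with_weight", "content_ltks",
    "available_int", "important_kwd", "question_kwd", "tag_kwd", "page_num_int",
}


def analyze_mapping(mapping: dict) -> dict:
    """Split mapping properties into vector, known, and unknown fields."""
    vectors: dict[str, int] = {}
    known: list[str] = []
    unknown: list[str] = []
    for name, spec in mapping.get("properties", {}).items():
        match = VECTOR_FIELD.match(name)
        if match:
            # Prefer the explicit "dims" of a dense_vector mapping; otherwise
            # fall back to the dimension encoded in the field name.
            vectors[name] = int(spec.get("dims", match.group(1)))
        elif name in KNOWN_FIELDS:
            known.append(name)
        else:
            unknown.append(name)  # would be stored in the 'extra' column
    return {"vector_fields": vectors, "known": sorted(known), "unknown": sorted(unknown)}


if __name__ == "__main__":
    example = {
        "properties": {
            "q_1024_vec": {"type": "dense_vector", "dims": 1024},
            "kb_id": {"type": "keyword"},
            "custom_field_1": {"type": "text"},
        }
    }
    print(analyze_mapping(example))
```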
### `es-ob-migrate verify`

Verify migration data consistency between ES and OceanBase.

Example:

```bash
es-ob-migrate verify \
  --es-host localhost --es-port 9200 \
  --ob-host localhost --ob-port 2881 \
  --ob-user "root@ragflow" --ob-password "infini_rag_flow" \
  --ob-database ragflow_doc \
  --index ragflow_abc123 --table ragflow_abc123 \
  --sample-size 100
```

Output:

```
╭─────────────────────────────────────────────────────────────╮
│                Migration Verification Report                │
├─────────────────────────────────────────────────────────────┤
│ ES Index: ragflow_abc123                                    │
│ OB Table: ragflow_abc123                                    │
├─────────────────────────────────────────────────────────────┤
│ Document Counts                                             │
│   ES: 1,234                                                 │
│   OB: 1,234                                                 │
│   Match: ✓ Yes                                              │
├─────────────────────────────────────────────────────────────┤
│ Sample Verification (100 documents)                         │
│   Matched: 100                                              │
│   Match Rate: 100.0%                                        │
├─────────────────────────────────────────────────────────────┤
│ Result: ✓ PASSED                                            │
╰─────────────────────────────────────────────────────────────╯
```
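
The report above combines two checks: a document count comparison and a per-document comparison on a sample. The following sketch shows one way those numbers could be derived. Plain dictionaries stand in for ES and OceanBase, `VerificationSummary` and `verify` are hypothetical names, and the pass rule (counts equal and every sampled document identical) is an assumption; the tool's own criteria may be more lenient.

```python
from dataclasses import dataclass


@dataclass
class VerificationSummary:
    es_count: int
    ob_count: int
    sampled: int
    matched: int

    @property
    def counts_match(self) -> bool:
        return self.es_count == self.ob_count

    @property
    def match_rate(self) -> float:
        # An empty sample is treated as fully matched in this sketch.
        return 100.0 * self.matched / self.sampled if self.sampled else 100.0

    @property
    def passed(self) -> bool:
        return self.counts_match and self.matched == self.sampled


def verify(
    es_docs: dict[str, dict],
    ob_rows: dict[str, dict],
    sample_ids: list[str],
) -> VerificationSummary:
    """Compare totals, then compare each sampled document field by field."""
    matched = sum(
        1
        for doc_id in sample_ids
        if doc_id in ob_rows and ob_rows[doc_id] == es_docs.get(doc_id)
    )
    return VerificationSummary(len(es_docs), len(ob_rows), len(sample_ids), matched)


if __name__ == "__main__":
    es = {"a": {"kb_id": "kb1"}, "b": {"kb_id": "kb2"}}
    summary = verify(es, dict(es), ["a", "b"])
    print(summary.counts_match, f"{summary.match_rate:.1f}%", summary.passed)
```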
### `es-ob-migrate list-kb`

List all knowledge bases in an ES index.

Example:

```bash
es-ob-migrate list-kb --es-host localhost --es-port 9200 --index ragflow_abc123
```

Output:

```
Knowledge Bases in index 'ragflow_abc123':

KB ID                      Documents
kb_001_finance_docs        456
kb_002_technical_manual    321
kb_003_product_faq         457

Total: 3 knowledge bases, 1234 documents
```
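
Per-knowledge-base counts like those above are the kind of result an Elasticsearch terms aggregation on `kb_id` returns. How the tool actually queries ES is not stated here, so treat the following as a sketch: the aggregation name `kb_ids` and both helper functions are assumptions, while the request and response shapes follow the standard terms aggregation format.

```python
def kb_count_query(max_kbs: int = 1000) -> dict:
    """Build a search body that returns only per-kb_id document counts."""
    return {
        "size": 0,  # no hits needed, only the aggregation
        "aggs": {"kb_ids": {"terms": {"field": "kb_id", "size": max_kbs}}},
    }


def parse_kb_counts(response: dict) -> list[tuple[str, int]]:
    """Extract (kb_id, document_count) pairs from a terms aggregation response."""
    buckets = response["aggregations"]["kb_ids"]["buckets"]
    return [(bucket["key"], bucket["doc_count"]) for bucket in buckets]


if __name__ == "__main__":
    # Hand-written response using the numbers from the example output.
    fake_response = {
        "aggregations": {
            "kb_ids": {
                "buckets": [
                    {"key": "kb_001_finance_docs", "doc_count": 456},
                    {"key": "kb_002_technical_manual", "doc_count": 321},
                    {"key": "kb_003_product_faq", "doc_count": 457},
                ]
            }
        }
    }
    counts = parse_kb_counts(fake_response)
    print(len(counts), "knowledge bases,", sum(n for _, n in counts), "documents")
```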
### `es-ob-migrate sample`

Show sample documents from ES index.

Example:

```bash
es-ob-migrate sample --es-host localhost --es-port 9200 --index ragflow_abc123 --size 2
```

Output:

```
Sample Documents from 'ragflow_abc123':

Document 1:
  id: chunk_001_abc123
  kb_id: kb_001_finance_docs
  doc_id: doc_001
  docnm_kwd: quarterly_report.pdf
  content_with_weight: The company reported Q3 revenue of $1.2B...
  available_int: 1

Document 2:
  id: chunk_002_def456
  kb_id: kb_001_finance_docs
  doc_id: doc_001
  docnm_kwd: quarterly_report.pdf
  content_with_weight: Operating expenses decreased by 5%...
  available_int: 1
```
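
Documents like these have to be reshaped into table rows during migration. The unit test names in the PR description (boolean to integer, single value to array, unknown fields to `extra`) hint at the rules involved. The sketch below illustrates those three rules only; the column sets are hypothetical, vector fields are left out, and the real converter handles more cases.

```python
import json
from typing import Any

# Hypothetical column groupings for illustration; the tool defines its own.
ARRAY_COLUMNS = {"important_kwd", "question_kwd", "tag_kwd"}
KNOWN_COLUMNS = {
    "id", "kb_id", "doc_id", "docnm_kwd", "content_with_weight", "available_int",
} | ARRAY_COLUMNS


def convert_hit(hit: dict[str, Any]) -> dict[str, Any]:
    """Turn one ES hit into a flat row suitable for a relational table."""
    row: dict[str, Any] = {"id": hit.get("_id")}
    extra: dict[str, Any] = {}
    for key, value in hit.get("_source", {}).items():
        if key not in KNOWN_COLUMNS:
            extra[key] = value  # unknown fields are collected separately
        elif key in ARRAY_COLUMNS:
            # A single value is wrapped so the column always holds a JSON array.
            values = value if isinstance(value, list) else [value]
            row[key] = json.dumps(values, ensure_ascii=False)
        elif isinstance(value, bool):
            row[key] = int(value)  # booleans become 0/1 integers
        else:
            row[key] = value
    if extra:
        row["extra"] = json.dumps(extra, ensure_ascii=False)
    return row


if __name__ == "__main__":
    hit = {
        "_id": "chunk_001_abc123",
        "_source": {
            "kb_id": "kb_001_finance_docs",
            "available_int": True,
            "tag_kwd": "finance",
            "custom_field_1": "x",
        },
    }
    print(convert_hit(hit))
```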
### `es-ob-migrate status`

Check connection status to ES and OceanBase.

Example:

```bash
es-ob-migrate status \
  --es-host localhost --es-port 9200 \
  --ob-host localhost --ob-port 2881 \
  --ob-user "root@ragflow" --ob-password "infini_rag_flow"
```

Output:

```
Connection Status:

Elasticsearch:
  Host: localhost:9200
  Status: ✓ Connected
  Cluster: ragflow-cluster
  Version: 8.11.0
  Indices: 5

OceanBase:
  Host: localhost:2881
  Status: ✓ Connected
  Version: 4.3.5.1
```