Files
Se7en 332b11cf96 feat(tools): add Elasticsearch to OceanBase migration tool (#12927)
### What problem does this PR solve?

fixes https://github.com/infiniflow/ragflow/issues/12774

Add a CLI tool for migrating RAGFlow data from Elasticsearch to
OceanBase, enabling users to switch their document storage backend.

- Automatic discovery and migration of all `ragflow_*` indices
- Schema conversion with vector dimension auto-detection
- Batch processing with progress tracking and resume capability
- Data consistency validation and migration report generation

**Note**: Due to network issues, I was unable to pull the required
Docker images (Elasticsearch, OceanBase), so the full end-to-end
verification has not been run yet. All 86 unit tests pass (see the
output below). I will complete the e2e verification when network
conditions allow, and submit a follow-up PR if any fixes are needed.

```bash
============================= test session starts ==============================
platform darwin -- Python 3.13.6, pytest-9.0.2, pluggy-1.6.0
rootdir: /Users/sevenc/code/ai/oceanbase/ragflow/tools/es-to-oceanbase-migration
configfile: pyproject.toml
testpaths: tests
plugins: anyio-4.12.1, asyncio-1.3.0, cov-7.0.0
collected 86 items

tests/test_progress.py::TestMigrationProgress::test_create_basic_progress PASSED [  1%]
tests/test_progress.py::TestMigrationProgress::test_create_progress_with_counts PASSED [  2%]
tests/test_progress.py::TestMigrationProgress::test_progress_default_values PASSED [  3%]
tests/test_progress.py::TestMigrationProgress::test_progress_status_values PASSED [  4%]
tests/test_progress.py::TestProgressManager::test_create_progress_manager PASSED [  5%]
tests/test_progress.py::TestProgressManager::test_create_progress_manager_creates_dir PASSED [  6%]
tests/test_progress.py::TestProgressManager::test_create_progress PASSED [  8%]
tests/test_progress.py::TestProgressManager::test_save_and_load_progress PASSED [  9%]
tests/test_progress.py::TestProgressManager::test_load_nonexistent_progress PASSED [ 10%]
tests/test_progress.py::TestProgressManager::test_delete_progress PASSED [ 11%]
tests/test_progress.py::TestProgressManager::test_update_progress PASSED [ 12%]
tests/test_progress.py::TestProgressManager::test_update_progress_multiple_batches PASSED [ 13%]
tests/test_progress.py::TestProgressManager::test_mark_completed PASSED  [ 15%]
tests/test_progress.py::TestProgressManager::test_mark_failed PASSED     [ 16%]
tests/test_progress.py::TestProgressManager::test_mark_paused PASSED     [ 17%]
tests/test_progress.py::TestProgressManager::test_can_resume_running PASSED [ 18%]
tests/test_progress.py::TestProgressManager::test_can_resume_paused PASSED [ 19%]
tests/test_progress.py::TestProgressManager::test_can_resume_completed PASSED [ 20%]
tests/test_progress.py::TestProgressManager::test_can_resume_nonexistent PASSED [ 22%]
tests/test_progress.py::TestProgressManager::test_get_resume_info PASSED [ 23%]
tests/test_progress.py::TestProgressManager::test_get_resume_info_nonexistent PASSED [ 24%]
tests/test_progress.py::TestProgressManager::test_progress_file_path PASSED [ 25%]
tests/test_progress.py::TestProgressManager::test_progress_file_content PASSED [ 26%]
tests/test_schema.py::TestRAGFlowSchemaConverter::test_analyze_ragflow_mapping PASSED [ 27%]
tests/test_schema.py::TestRAGFlowSchemaConverter::test_detect_vector_size PASSED [ 29%]
tests/test_schema.py::TestRAGFlowSchemaConverter::test_unknown_fields PASSED [ 30%]
tests/test_schema.py::TestRAGFlowSchemaConverter::test_get_column_definitions PASSED [ 31%]
tests/test_schema.py::TestRAGFlowDataConverter::test_convert_basic_document PASSED [ 32%]
tests/test_schema.py::TestRAGFlowDataConverter::test_convert_with_vector PASSED [ 33%]
tests/test_schema.py::TestRAGFlowDataConverter::test_convert_array_fields PASSED [ 34%]
tests/test_schema.py::TestRAGFlowDataConverter::test_convert_json_fields PASSED [ 36%]
tests/test_schema.py::TestRAGFlowDataConverter::test_convert_unknown_fields_to_extra PASSED [ 37%]
tests/test_schema.py::TestRAGFlowDataConverter::test_convert_kb_id_list PASSED [ 38%]
tests/test_schema.py::TestRAGFlowDataConverter::test_convert_content_with_weight_dict PASSED [ 39%]
tests/test_schema.py::TestRAGFlowDataConverter::test_convert_batch PASSED [ 40%]
tests/test_schema.py::TestVectorFieldPattern::test_valid_patterns PASSED [ 41%]
tests/test_schema.py::TestVectorFieldPattern::test_invalid_patterns PASSED [ 43%]
tests/test_schema.py::TestVectorFieldPattern::test_extract_dimension PASSED [ 44%]
tests/test_schema.py::TestConstants::test_array_columns PASSED           [ 45%]
tests/test_schema.py::TestConstants::test_json_columns PASSED            [ 46%]
tests/test_schema.py::TestConstants::test_ragflow_columns_completeness PASSED [ 47%]
tests/test_schema.py::TestConstants::test_fts_columns PASSED             [ 48%]
tests/test_schema.py::TestConstants::test_ragflow_columns_types PASSED   [ 50%]
tests/test_schema.py::TestRAGFlowSchemaConverterEdgeCases::test_empty_mapping PASSED [ 51%]
tests/test_schema.py::TestRAGFlowSchemaConverterEdgeCases::test_mapping_without_properties PASSED [ 52%]
tests/test_schema.py::TestRAGFlowSchemaConverterEdgeCases::test_multiple_vector_fields PASSED [ 53%]
tests/test_schema.py::TestRAGFlowSchemaConverterEdgeCases::test_get_column_definitions_without_analysis PASSED [ 54%]
tests/test_schema.py::TestRAGFlowSchemaConverterEdgeCases::test_get_vector_fields PASSED [ 55%]
tests/test_schema.py::TestRAGFlowDataConverterEdgeCases::test_convert_empty_document PASSED [ 56%]
tests/test_schema.py::TestRAGFlowDataConverterEdgeCases::test_convert_document_without_source PASSED [ 58%]
tests/test_schema.py::TestRAGFlowDataConverterEdgeCases::test_convert_boolean_to_integer PASSED [ 59%]
tests/test_schema.py::TestRAGFlowDataConverterEdgeCases::test_convert_invalid_integer PASSED [ 60%]
tests/test_schema.py::TestRAGFlowDataConverterEdgeCases::test_convert_float_field PASSED [ 61%]
tests/test_schema.py::TestRAGFlowDataConverterEdgeCases::test_convert_array_with_special_characters PASSED [ 62%]
tests/test_schema.py::TestRAGFlowDataConverterEdgeCases::test_convert_already_json_array PASSED [ 63%]
tests/test_schema.py::TestRAGFlowDataConverterEdgeCases::test_convert_single_value_to_array PASSED [ 65%]
tests/test_schema.py::TestRAGFlowDataConverterEdgeCases::test_detect_vector_fields_from_document PASSED [ 66%]
tests/test_schema.py::TestRAGFlowDataConverterEdgeCases::test_convert_with_default_values PASSED [ 67%]
tests/test_schema.py::TestRAGFlowDataConverterEdgeCases::test_convert_list_content PASSED [ 68%]
tests/test_schema.py::TestRAGFlowDataConverterEdgeCases::test_convert_batch_empty PASSED [ 69%]
tests/test_schema.py::TestRAGFlowDataConverterEdgeCases::test_existing_extra_field_merged PASSED [ 70%]
tests/test_verify.py::TestVerificationResult::test_create_basic_result PASSED [ 72%]
tests/test_verify.py::TestVerificationResult::test_result_default_values PASSED [ 73%]
tests/test_verify.py::TestVerificationResult::test_result_with_counts PASSED [ 74%]
tests/test_verify.py::TestMigrationVerifier::test_verify_counts_match PASSED [ 75%]
tests/test_verify.py::TestMigrationVerifier::test_verify_counts_mismatch PASSED [ 76%]
tests/test_verify.py::TestMigrationVerifier::test_verify_samples_all_match PASSED [ 77%]
tests/test_verify.py::TestMigrationVerifier::test_verify_samples_some_missing PASSED [ 79%]
tests/test_verify.py::TestMigrationVerifier::test_verify_samples_data_mismatch PASSED [ 80%]
tests/test_verify.py::TestMigrationVerifier::test_values_equal_none_values PASSED [ 81%]
tests/test_verify.py::TestMigrationVerifier::test_values_equal_array_columns PASSED [ 82%]
tests/test_verify.py::TestMigrationVerifier::test_values_equal_json_columns PASSED [ 83%]
tests/test_verify.py::TestMigrationVerifier::test_values_equal_kb_id_list PASSED [ 84%]
tests/test_verify.py::TestMigrationVerifier::test_values_equal_content_with_weight_dict PASSED [ 86%]
tests/test_verify.py::TestMigrationVerifier::test_determine_result_passed PASSED [ 87%]
tests/test_verify.py::TestMigrationVerifier::test_determine_result_failed_count PASSED [ 88%]
tests/test_verify.py::TestMigrationVerifier::test_determine_result_failed_samples PASSED [ 89%]
tests/test_verify.py::TestMigrationVerifier::test_generate_report PASSED [ 90%]
tests/test_verify.py::TestMigrationVerifier::test_generate_report_with_missing PASSED [ 91%]
tests/test_verify.py::TestMigrationVerifier::test_generate_report_with_mismatches PASSED [ 93%]
tests/test_verify.py::TestValueComparison::test_string_comparison PASSED [ 94%]
tests/test_verify.py::TestValueComparison::test_integer_comparison PASSED [ 95%]
tests/test_verify.py::TestValueComparison::test_float_comparison PASSED  [ 96%]
tests/test_verify.py::TestValueComparison::test_boolean_comparison PASSED [ 97%]
tests/test_verify.py::TestValueComparison::test_empty_array_comparison PASSED [ 98%]
tests/test_verify.py::TestValueComparison::test_nested_json_comparison PASSED [100%]

======================= 86 passed, 88 warnings in 0.66s ========================
```

### Type of change

- [ ] Bug Fix (non-breaking change which fixes an issue)
- [x] New Feature (non-breaking change which adds functionality)
- [ ] Documentation Update
- [ ] Refactoring
- [ ] Performance Improvement
- [ ] Other (please describe):
2026-01-31 16:11:27 +08:00

322 lines
11 KiB
Python

"""
Tests for progress tracking and resume capability.
"""
import json
import os
import tempfile
import pytest
from pathlib import Path
from datetime import datetime
from es_ob_migration.progress import MigrationProgress, ProgressManager
class TestMigrationProgress:
    """Checks for the MigrationProgress dataclass: construction and defaults."""

    def test_create_basic_progress(self):
        """Progress built from index/table names only starts out pending and empty."""
        basic = MigrationProgress(es_index="ragflow_test", ob_table="ragflow_test")

        assert basic.es_index == "ragflow_test"
        assert basic.ob_table == "ragflow_test"
        assert basic.total_documents == 0
        assert basic.migrated_documents == 0
        assert basic.status == "pending"
        # Both timestamps are expected to be non-empty right after construction.
        assert basic.started_at != ""
        assert basic.updated_at != ""

    def test_create_progress_with_counts(self):
        """Explicit document counts passed to the constructor are kept as given."""
        counted = MigrationProgress(
            es_index="ragflow_test",
            ob_table="ragflow_test",
            total_documents=1000,
            migrated_documents=500,
        )

        assert counted.total_documents == 1000
        assert counted.migrated_documents == 500

    def test_progress_default_values(self):
        """Fields that are not supplied fall back to their empty/zero/False defaults."""
        defaults = MigrationProgress(es_index="test_index", ob_table="test_table")

        # Counters and resume cursors.
        assert defaults.failed_documents == 0
        assert defaults.last_sort_values == []
        assert defaults.last_batch_ids == []
        assert defaults.error_message == ""
        # Stage flags must be the literal False, not merely falsy.
        assert defaults.schema_converted is False
        assert defaults.table_created is False
        assert defaults.indexes_created is False

    def test_progress_status_values(self):
        """Each status string passed to the constructor is stored unchanged."""
        statuses = ("pending", "running", "completed", "failed", "paused")
        for wanted in statuses:
            tracked = MigrationProgress(es_index="test", ob_table="test", status=wanted)
            assert tracked.status == wanted
class TestProgressManager:
    """Test ProgressManager: persistence, status transitions and resume helpers."""

    @pytest.fixture
    def temp_dir(self):
        """Yield the path of a temporary directory that is removed after the test."""
        with tempfile.TemporaryDirectory() as tmpdir:
            yield tmpdir

    @pytest.fixture
    def manager(self, temp_dir):
        """Return a ProgressManager constructed with ``progress_dir=temp_dir``."""
        return ProgressManager(progress_dir=temp_dir)

    def test_create_progress_manager(self, temp_dir):
        """Test creating a progress manager on an existing directory."""
        manager = ProgressManager(progress_dir=temp_dir)

        assert manager.progress_dir.exists()

    def test_create_progress_manager_creates_dir(self, temp_dir):
        """Test that constructing a manager creates a missing progress directory."""
        new_dir = os.path.join(temp_dir, "new_progress")

        # Constructed only for its side effect; the instance itself is not needed.
        ProgressManager(progress_dir=new_dir)

        assert Path(new_dir).exists()

    def test_create_progress(self, manager):
        """Test that create_progress returns a populated record in "running" state."""
        progress = manager.create_progress(
            es_index="ragflow_abc123",
            ob_table="ragflow_abc123",
            total_documents=1000,
        )

        assert progress.es_index == "ragflow_abc123"
        assert progress.ob_table == "ragflow_abc123"
        assert progress.total_documents == 1000
        assert progress.status == "running"

    def test_save_and_load_progress(self, manager):
        """Test that fields changed before save_progress survive a load_progress."""
        # Create and save
        progress = manager.create_progress(
            es_index="ragflow_test",
            ob_table="ragflow_test",
            total_documents=500,
        )
        progress.migrated_documents = 250
        progress.last_sort_values = ["doc_250", 1234567890]
        manager.save_progress(progress)

        # Load
        loaded = manager.load_progress("ragflow_test", "ragflow_test")

        assert loaded is not None
        assert loaded.es_index == "ragflow_test"
        assert loaded.total_documents == 500
        assert loaded.migrated_documents == 250
        assert loaded.last_sort_values == ["doc_250", 1234567890]

    def test_load_nonexistent_progress(self, manager):
        """Test that loading progress that was never created returns None."""
        loaded = manager.load_progress("nonexistent", "nonexistent")

        assert loaded is None

    def test_delete_progress(self, manager):
        """Test that delete_progress makes a previously created record unloadable."""
        # Create progress
        manager.create_progress(
            es_index="ragflow_delete_test",
            ob_table="ragflow_delete_test",
            total_documents=100,
        )

        # Verify it exists
        assert manager.load_progress("ragflow_delete_test", "ragflow_delete_test") is not None

        # Delete
        manager.delete_progress("ragflow_delete_test", "ragflow_delete_test")

        # Verify it's gone
        assert manager.load_progress("ragflow_delete_test", "ragflow_delete_test") is None

    def test_update_progress(self, manager):
        """Test that update_progress records the batch count and resume cursors."""
        progress = manager.create_progress(
            es_index="ragflow_update",
            ob_table="ragflow_update",
            total_documents=1000,
        )

        # Update
        manager.update_progress(
            progress,
            migrated_count=100,
            last_sort_values=["doc_100", 9999],
            last_batch_ids=["id1", "id2", "id3"],
        )

        assert progress.migrated_documents == 100
        assert progress.last_sort_values == ["doc_100", 9999]
        assert progress.last_batch_ids == ["id1", "id2", "id3"]

    def test_update_progress_multiple_batches(self, manager):
        """Test that repeated updates accumulate: ten batches of 100 give 1000."""
        progress = manager.create_progress(
            es_index="ragflow_multi",
            ob_table="ragflow_multi",
            total_documents=1000,
        )

        # Update multiple times; the batch index itself is irrelevant.
        for _ in range(10):
            manager.update_progress(progress, migrated_count=100)

        assert progress.migrated_documents == 1000

    def test_mark_completed(self, manager):
        """Test marking migration as completed."""
        progress = manager.create_progress(
            es_index="ragflow_complete",
            ob_table="ragflow_complete",
            total_documents=100,
        )
        progress.migrated_documents = 100

        manager.mark_completed(progress)

        assert progress.status == "completed"

    def test_mark_failed(self, manager):
        """Test that mark_failed sets the status and stores the error message."""
        progress = manager.create_progress(
            es_index="ragflow_fail",
            ob_table="ragflow_fail",
            total_documents=100,
        )

        manager.mark_failed(progress, "Connection timeout")

        assert progress.status == "failed"
        assert progress.error_message == "Connection timeout"

    def test_mark_paused(self, manager):
        """Test marking migration as paused."""
        progress = manager.create_progress(
            es_index="ragflow_pause",
            ob_table="ragflow_pause",
            total_documents=1000,
        )
        progress.migrated_documents = 500

        manager.mark_paused(progress)

        assert progress.status == "paused"

    def test_can_resume_running(self, manager):
        """Test can_resume for a migration that was created and left running."""
        # Only the persisted record matters here, so the return value is unused.
        manager.create_progress(
            es_index="ragflow_resume_running",
            ob_table="ragflow_resume_running",
            total_documents=1000,
        )

        assert manager.can_resume("ragflow_resume_running", "ragflow_resume_running") is True

    def test_can_resume_paused(self, manager):
        """Test can_resume for paused migration."""
        progress = manager.create_progress(
            es_index="ragflow_resume_paused",
            ob_table="ragflow_resume_paused",
            total_documents=1000,
        )
        manager.mark_paused(progress)

        assert manager.can_resume("ragflow_resume_paused", "ragflow_resume_paused") is True

    def test_can_resume_completed(self, manager):
        """Test can_resume for completed migration."""
        progress = manager.create_progress(
            es_index="ragflow_resume_complete",
            ob_table="ragflow_resume_complete",
            total_documents=100,
        )
        progress.migrated_documents = 100
        manager.mark_completed(progress)

        # Completed migrations should not be resumed
        assert manager.can_resume("ragflow_resume_complete", "ragflow_resume_complete") is False

    def test_can_resume_nonexistent(self, manager):
        """Test can_resume for nonexistent migration."""
        assert manager.can_resume("nonexistent", "nonexistent") is False

    def test_get_resume_info(self, manager):
        """Test that get_resume_info exposes the saved counters, cursor and flags."""
        progress = manager.create_progress(
            es_index="ragflow_info",
            ob_table="ragflow_info",
            total_documents=1000,
        )
        progress.migrated_documents = 500
        progress.last_sort_values = ["doc_500", 12345]
        progress.schema_converted = True
        progress.table_created = True
        manager.save_progress(progress)

        info = manager.get_resume_info("ragflow_info", "ragflow_info")

        assert info is not None
        assert info["migrated_documents"] == 500
        assert info["total_documents"] == 1000
        assert info["last_sort_values"] == ["doc_500", 12345]
        assert info["schema_converted"] is True
        assert info["table_created"] is True
        assert info["status"] == "running"

    def test_get_resume_info_nonexistent(self, manager):
        """Test getting resume info for nonexistent migration."""
        info = manager.get_resume_info("nonexistent", "nonexistent")

        assert info is None

    def test_progress_file_path(self, manager):
        """Test that create_progress writes the expected file in progress_dir."""
        # Only the file on disk is checked, so the returned record is unused.
        manager.create_progress(
            es_index="ragflow_abc123",
            ob_table="ragflow_abc123",
            total_documents=100,
        )

        expected_file = manager.progress_dir / "ragflow_abc123_to_ragflow_abc123.json"
        assert expected_file.exists()

    def test_progress_file_content(self, manager):
        """Test that the saved progress file is JSON carrying the record's fields."""
        progress = manager.create_progress(
            es_index="ragflow_json",
            ob_table="ragflow_json",
            total_documents=100,
        )
        progress.migrated_documents = 50
        manager.save_progress(progress)

        # Read file directly; an explicit encoding keeps the read independent of
        # the platform's locale default.
        progress_file = manager.progress_dir / "ragflow_json_to_ragflow_json.json"
        with open(progress_file, encoding="utf-8") as f:
            data = json.load(f)

        assert data["es_index"] == "ragflow_json"
        assert data["ob_table"] == "ragflow_json"
        assert data["total_documents"] == 100
        assert data["migrated_documents"] == 50