From 39ef2ffba9d61b6fc48eaaa43aae3fdee3ad61a3 Mon Sep 17 00:00:00 2001 From: Yongteng Lei Date: Wed, 30 Jul 2025 09:48:20 +0800 Subject: [PATCH] Feat: parsing supports jsonl or ldjson format (#9087) ### What problem does this PR solve? Supports jsonl or ldjson format. Feature request from [discussion](https://github.com/orgs/infiniflow/discussions/8774). ### Type of change - [x] New Feature (non-breaking change which adds functionality) --- api/utils/file_utils.py | 2 +- deepdoc/parser/json_parser.py | 78 ++++++++++++++++++++++++++++------- rag/app/naive.py | 2 +- 3 files changed, 64 insertions(+), 18 deletions(-) diff --git a/api/utils/file_utils.py b/api/utils/file_utils.py index 7fefc54a6..c349453bb 100644 --- a/api/utils/file_utils.py +++ b/api/utils/file_utils.py @@ -155,7 +155,7 @@ def filename_type(filename): if re.match(r".*\.pdf$", filename): return FileType.PDF.value - if re.match(r".*\.(eml|doc|docx|ppt|pptx|yml|xml|htm|json|csv|txt|ini|xls|xlsx|wps|rtf|hlp|pages|numbers|key|md|py|js|java|c|cpp|h|php|go|ts|sh|cs|kt|html|sql)$", filename): + if re.match(r".*\.(eml|doc|docx|ppt|pptx|yml|xml|htm|json|jsonl|ldjson|csv|txt|ini|xls|xlsx|wps|rtf|hlp|pages|numbers|key|md|py|js|java|c|cpp|h|php|go|ts|sh|cs|kt|html|sql)$", filename): return FileType.DOC.value if re.match(r".*\.(wav|flac|ape|alac|wavpack|wv|mp3|aac|ogg|vorbis|opus)$", filename): diff --git a/deepdoc/parser/json_parser.py b/deepdoc/parser/json_parser.py index dc73246ef..249ce1ccb 100644 --- a/deepdoc/parser/json_parser.py +++ b/deepdoc/parser/json_parser.py @@ -22,24 +22,22 @@ import json from typing import Any from rag.nlp import find_codec + + class RAGFlowJsonParser: - def __init__( - self, max_chunk_size: int = 2000, min_chunk_size: int | None = None - ): + def __init__(self, max_chunk_size: int = 2000, min_chunk_size: int | None = None): super().__init__() self.max_chunk_size = max_chunk_size * 2 - self.min_chunk_size = ( - min_chunk_size - if min_chunk_size is not None - else max(max_chunk_size - 200, 50) - ) + self.min_chunk_size = min_chunk_size if min_chunk_size is not None else max(max_chunk_size - 200, 50) def __call__(self, binary): encoding = find_codec(binary) txt = binary.decode(encoding, errors="ignore") - json_data = json.loads(txt) - chunks = self.split_json(json_data, True) - sections = [json.dumps(line, ensure_ascii=False) for line in chunks if line] + + if self.is_jsonl_format(txt): + sections = self._parse_jsonl(txt) + else: + sections = self._parse_json(txt) return sections @staticmethod @@ -60,14 +58,11 @@ class RAGFlowJsonParser: return {k: self._list_to_dict_preprocessing(v) for k, v in data.items()} elif isinstance(data, list): # Convert the list to a dictionary with index-based keys - return { - str(i): self._list_to_dict_preprocessing(item) - for i, item in enumerate(data) - } + return {str(i): self._list_to_dict_preprocessing(item) for i, item in enumerate(data)} else: # Base case: the item is neither a dict nor a list, so return it unchanged return data - + def _json_split( self, data, @@ -131,3 +126,54 @@ class RAGFlowJsonParser: # Convert to string return [json.dumps(chunk, ensure_ascii=ensure_ascii) for chunk in chunks] + + def _parse_json(self, content: str) -> list[str]: + sections = [] + try: + json_data = json.loads(content) + chunks = self.split_json(json_data, True) + sections = [json.dumps(line, ensure_ascii=False) for line in chunks if line] + except json.JSONDecodeError: + pass + return sections + + def _parse_jsonl(self, content: str) -> list[str]: + lines = content.strip().splitlines() + all_chunks = [] + for line in lines: + if not line.strip(): + continue + try: + data = json.loads(line) + chunks = self.split_json(data, convert_lists=True) + all_chunks.extend(json.dumps(chunk, ensure_ascii=False) for chunk in chunks if chunk) + except json.JSONDecodeError: + continue + return all_chunks + + def is_jsonl_format(self, txt: str, sample_limit: int = 10, threshold: float = 0.8) -> bool: + lines = [line.strip() for line in txt.strip().splitlines() if line.strip()] + if not lines: + return False + + try: + json.loads(txt) + return False + except json.JSONDecodeError: + pass + + sample_limit = min(len(lines), sample_limit) + sample_lines = lines[:sample_limit] + valid_lines = sum(1 for line in sample_lines if self._is_valid_json(line)) + + if not valid_lines: + return False + + return (valid_lines / len(sample_lines)) >= threshold + + def _is_valid_json(self, line: str) -> bool: + try: + json.loads(line) + return True + except json.JSONDecodeError: + return False diff --git a/rag/app/naive.py b/rag/app/naive.py index 7873e63b9..bf8f3f3a9 100644 --- a/rag/app/naive.py +++ b/rag/app/naive.py @@ -499,7 +499,7 @@ def chunk(filename, binary=None, from_page=0, to_page=100000, sections = [(_, "") for _ in sections if _] callback(0.8, "Finish parsing.") - elif re.search(r"\.json$", filename, re.IGNORECASE): + elif re.search(r"\.(json|jsonl|ldjson)$", filename, re.IGNORECASE): callback(0.1, "Start to parse.") chunk_token_num = int(parser_config.get("chunk_token_num", 128)) sections = JsonParser(chunk_token_num)(binary)