mirror of
https://github.com/infiniflow/ragflow.git
synced 2025-12-08 20:42:30 +08:00
Feat: parse email (#10181)
### What problem does this PR solve? - Dataflow support email. - Fix old email parser. - Add new depends to parse msg file. ### Type of change - [x] Bug Fix (non-breaking change which fixes an issue) - [x] New Feature (non-breaking change which adds functionality) - [x] Other (please describe): add new depends.
This commit is contained in:
@ -13,7 +13,9 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import io
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import random
|
||||
from functools import partial
|
||||
|
||||
@ -57,7 +59,10 @@ class ParserParam(ProcessParamBase):
|
||||
"image": [
|
||||
"text"
|
||||
],
|
||||
"email": [],
|
||||
"email": [
|
||||
"text",
|
||||
"json"
|
||||
],
|
||||
"text": [
|
||||
"text",
|
||||
"json"
|
||||
@ -112,7 +117,11 @@ class ParserParam(ProcessParamBase):
|
||||
"output_format": "json",
|
||||
},
|
||||
"email": {
|
||||
"fields": []
|
||||
"suffix": [
|
||||
"eml", "msg"
|
||||
],
|
||||
"fields": ["from", "to", "cc", "bcc", "date", "subject", "body", "attachments", "metadata"],
|
||||
"output_format": "json",
|
||||
},
|
||||
"text": {
|
||||
"suffix": [
|
||||
@ -194,6 +203,11 @@ class ParserParam(ProcessParamBase):
|
||||
audio_language = audio_config.get("lang", "")
|
||||
self.check_empty(audio_language, "Language")
|
||||
|
||||
email_config = self.setups.get("email", "")
|
||||
if email_config:
|
||||
email_output_format = email_config.get("output_format", "")
|
||||
self.check_valid_value(email_output_format, "Email output format abnormal.", self.allowed_output_format["email"])
|
||||
|
||||
def get_input_form(self) -> dict[str, dict]:
|
||||
return {}
|
||||
|
||||
@ -384,6 +398,124 @@ class Parser(ProcessBase):
|
||||
|
||||
self.set_output("text", txt)
|
||||
|
||||
def _email(self, from_upstream: ParserFromUpstream):
|
||||
self.callback(random.randint(1, 5) / 100.0, "Start to work on an email.")
|
||||
|
||||
blob = from_upstream.blob
|
||||
name = from_upstream.name
|
||||
|
||||
email_content = {}
|
||||
conf = self._param.setups["email"]
|
||||
target_fields = conf["fields"]
|
||||
|
||||
_, ext = os.path.splitext(name)
|
||||
if ext == ".eml":
|
||||
# handle eml file
|
||||
from email import policy
|
||||
from email.parser import BytesParser
|
||||
|
||||
msg = BytesParser(policy=policy.default).parse(io.BytesIO(blob))
|
||||
email_content['metadata'] = {}
|
||||
# handle header info
|
||||
for header, value in msg.items():
|
||||
# get fields like from, to, cc, bcc, date, subject
|
||||
if header.lower() in target_fields:
|
||||
email_content[header.lower()] = value
|
||||
# get metadata
|
||||
elif header.lower() not in ["from", "to", "cc", "bcc", "date", "subject"]:
|
||||
email_content["metadata"][header.lower()] = value
|
||||
# get body
|
||||
if "body" in target_fields:
|
||||
body_text, body_html = [], []
|
||||
def _add_content(m, content_type):
|
||||
if content_type == "text/plain":
|
||||
body_text.append(
|
||||
m.get_payload(decode=True).decode(m.get_content_charset())
|
||||
)
|
||||
elif content_type == "text/html":
|
||||
body_html.append(
|
||||
m.get_payload(decode=True).decode(m.get_content_charset())
|
||||
)
|
||||
elif "multipart" in content_type:
|
||||
if m.is_multipart():
|
||||
for part in m.iter_parts():
|
||||
_add_content(part, part.get_content_type())
|
||||
|
||||
_add_content(msg, msg.get_content_type())
|
||||
|
||||
email_content["text"] = body_text
|
||||
email_content["text_html"] = body_html
|
||||
# get attachment
|
||||
if "attachments" in target_fields:
|
||||
attachments = []
|
||||
for part in msg.iter_attachments():
|
||||
content_disposition = part.get("Content-Disposition")
|
||||
if content_disposition:
|
||||
dispositions = content_disposition.strip().split(";")
|
||||
if dispositions[0].lower() == "attachment":
|
||||
filename = part.get_filename()
|
||||
payload = part.get_payload(decode=True)
|
||||
attachments.append({
|
||||
"filename": filename,
|
||||
"payload": payload,
|
||||
})
|
||||
email_content["attachments"] = attachments
|
||||
else:
|
||||
# handle msg file
|
||||
import extract_msg
|
||||
print("handle a msg file.")
|
||||
msg = extract_msg.Message(blob)
|
||||
# handle header info
|
||||
basic_content = {
|
||||
"from": msg.sender,
|
||||
"to": msg.to,
|
||||
"cc": msg.cc,
|
||||
"bcc": msg.bcc,
|
||||
"date": msg.date,
|
||||
"subject": msg.subject,
|
||||
}
|
||||
email_content.update({k: v for k, v in basic_content.items() if k in target_fields})
|
||||
# get metadata
|
||||
email_content['metadata'] = {
|
||||
'message_id': msg.messageId,
|
||||
'in_reply_to': msg.inReplyTo,
|
||||
}
|
||||
# get body
|
||||
if "body" in target_fields:
|
||||
email_content["text"] = msg.body # usually empty. try text_html instead
|
||||
email_content["text_html"] = msg.htmlBody
|
||||
# get attachments
|
||||
if "attachments" in target_fields:
|
||||
attachments = []
|
||||
for t in msg.attachments:
|
||||
attachments.append({
|
||||
"filename": t.name,
|
||||
"payload": t.data # binary
|
||||
})
|
||||
email_content["attachments"] = attachments
|
||||
|
||||
if conf["output_format"] == "json":
|
||||
self.set_output("json", [email_content])
|
||||
else:
|
||||
content_txt = ''
|
||||
for k, v in email_content.items():
|
||||
if isinstance(v, str):
|
||||
# basic info
|
||||
content_txt += f'{k}:{v}' + "\n"
|
||||
elif isinstance(v, dict):
|
||||
# metadata
|
||||
content_txt += f'{k}:{json.dumps(v)}' + "\n"
|
||||
elif isinstance(v, list):
|
||||
# attachments or others
|
||||
for fb in v:
|
||||
if isinstance(fb, dict):
|
||||
# attachments
|
||||
content_txt += f'{fb["filename"]}:{fb["payload"]}' + "\n"
|
||||
else:
|
||||
# str, usually plain text
|
||||
content_txt += fb
|
||||
self.set_output("text", content_txt)
|
||||
|
||||
async def _invoke(self, **kwargs):
|
||||
function_map = {
|
||||
"pdf": self._pdf,
|
||||
@ -394,6 +526,7 @@ class Parser(ProcessBase):
|
||||
"text": self._text,
|
||||
"image": self._image,
|
||||
"audio": self._audio,
|
||||
"email": self._email,
|
||||
}
|
||||
try:
|
||||
from_upstream = ParserFromUpstream.model_validate(kwargs)
|
||||
|
||||
Reference in New Issue
Block a user