import concurrent.futures import email import imaplib import logging from email.header import decode_header from email.utils import parsedate_to_datetime from json import JSONEncoder from pathlib import Path from typing import List import chardet from iarchiver.email import extract_emails, normalize_for_imap_folder from iarchiver.hash import murmur3_chunked class FileAttachment: def __init__(self, file_name: str, file_hash: str): self.filename = file_name self.hash = file_hash def to_dict(self): return {'filename': self.filename, 'hash': self.hash} class FileAttachmentEncoder(JSONEncoder): def default(self, o): if isinstance(o, FileAttachment): return o.to_dict() return super().default(o) class MailConnection: def __init__(self, host: str, username: str, password: str, attachments_dir: Path): self.mail = imaplib.IMAP4_SSL(host) self.mail.login(username, password) self.attachments_dir = attachments_dir.expanduser().absolute().resolve() self.folder_structure = {} self.logger = logging.getLogger('iarchiver.mail_conn') self.logger.setLevel(logging.INFO) def load_folders(self): self.folder_structure = [f.decode().split(' "/" ')[1].replace('"', '').replace("\\'", "'") for f in self.mail.list()[1]] return self.folder_structure def __fetch_email(self, i): result, data = self.mail.uid('fetch', str(i), '(BODY[])') # fetch the raw email if data[0] is None: return raw_email_bytes = data[0][1] try: detected = chardet.detect(raw_email_bytes) except TypeError as e: self.logger.critical(f'Failed to decode an email. Timeout? Server error? - "{e}"') return encoding = detected['encoding'] if not encoding: encoding = 'utf-8' raw_email = raw_email_bytes.decode(encoding, errors='replace') email_message = email.message_from_string(raw_email) date_header = email_message['Date'] if not date_header: date_header = 'Thu, 1 Jan 1970 00:00:00 +0000' parsed_date = email.utils.parsedate_to_datetime(date_header) unix_timestamp = int(parsed_date.timestamp()) from_addr = email_message['From'] to_addr = email_message['To'] if not from_addr and not to_addr: return if not from_addr: from_addr = '' if not to_addr: to_addr = '' from_header = ', '.join(extract_emails(from_addr)) to_header = ', '.join(extract_emails(to_addr)) if '@' not in from_header: from_header = from_addr if '@' not in to_header: to_header = to_addr subject_header = email_message['Subject'] if subject_header: subject = decode_header(subject_header)[0][0] if isinstance(subject, bytes): try: detected = chardet.detect(subject) encoding = detected['encoding'] if not encoding: encoding = 'utf-8' subject = subject.decode(encoding, errors='replace') except UnicodeDecodeError: subject = subject.decode('utf-8') else: return attachments = [] if email_message.is_multipart(): for part in email_message.walk(): # content_type = part.get_content_type() content_disposition = str(part.get("Content-Disposition")) if "attachment" in content_disposition: filename = part.get_filename() if filename: # The filename of the file is the hash of its content, which should de-duplicate files. filecontents = part.get_payload(decode=True) if not filecontents: continue filehash = murmur3_chunked(filecontents) part.set_payload(f'MD5:{filehash}') # replace the attachment with its hash filepath = self.attachments_dir / filehash file_obj = FileAttachment(filename, filehash) if not filepath.is_file(): with open(filepath, 'wb') as f: f.write(filecontents) attachments.append(file_obj) raw_email_clean = email_message.as_string() return unix_timestamp, to_header, from_header, subject, raw_email_clean, attachments def fetch_folder(self, folder: str, search_criterion: List[str] = None, max_threads: int = 1): """ Don't use multiple threads because most mail servers don't allow the client to multiplex. """ if not search_criterion: search_criterion = ['ALL'] self.mail.select(normalize_for_imap_folder(folder)) for search_item in search_criterion: result, data = self.mail.uid('search', search_item) mail_ids = data[0] id_list = mail_ids.split() if not len(id_list): # Empty folder return first_email_id = int(id_list[0]) latest_email_id = int(id_list[-1]) with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor: futures = {executor.submit(self.__fetch_email, i) for i in range(latest_email_id, first_email_id, -1)} for future in concurrent.futures.as_completed(futures): result = future.result() if result is not None: yield result