imap-archiver/iarchiver/database.py

86 lines
3.8 KiB
Python

import json
import re
import sqlite3
import time
from pathlib import Path
from typing import List
from iarchiver.hash import murmur3_chunked
from iarchiver.mail_conn import FileAttachment, FileAttachmentEncoder
def is_valid_table_name(table_name):
    """Return True if *table_name* is a plain ASCII SQL identifier.

    A valid name starts with an ASCII letter or underscore and continues
    with ASCII letters, digits or underscores only. The empty string is
    rejected.
    """
    # fullmatch instead of match + '$': '$' also matches just before a
    # trailing newline, which would let 'name\n' through as "valid".
    return re.fullmatch(r'[a-zA-Z_][a-zA-Z0-9_]*', table_name) is not None
def sanitize_table_name(name):
    """Derive a table-name candidate from a mail folder name.

    Steps, in order:
      1. every '/' is replaced by '_';
      2. every character not matched by the regex class ``\\w`` is removed
         (underscores are kept);
      3. if the result starts with a digit, an underscore is prepended.

    The result may be an empty string when nothing survives step 2.
    """
    # Flatten the folder hierarchy separator before stripping anything.
    flattened = name.replace('/', '_')
    # Drop every run of non-word characters.
    cleaned = re.sub(r'\W+', '', flattened)
    # Slicing keeps the empty-string case safe without a separate check.
    if cleaned[:1].isdigit():
        return '_' + cleaned
    return cleaned
class EmailDatabase:
    """SQLite-backed archive of emails, stored in one table per mail folder.

    Two bookkeeping tables are created on construction:

    * ``folders_mapping`` maps the original folder name to the sanitized
      table name that holds its messages.
    * ``syncs`` records one row per finished sync run.

    Folder tables are named with ``sanitize_table_name``. Distinct folder
    names can sanitize to the same table name (for example ``a/b`` and
    ``a_b``); in that case both folders share one table and only the first
    mapping row is kept, because the mapping insert uses ``OR IGNORE``.
    """

    # Tables owned by the archiver itself; folder tables must not shadow them.
    __restricted_strings = ['folders_mapping', 'syncs']

    def __init__(self, filepath: Path):
        """Open (or create) the database file and ensure bookkeeping tables exist.

        Args:
            filepath: Location of the SQLite file; ``~`` is expanded and the
                path is made absolute and resolved before connecting.
        """
        filepath = filepath.expanduser().absolute().resolve()
        self.conn = sqlite3.connect(filepath)
        # The connection context manager commits on success and rolls back
        # if a statement raises.
        with self.conn:
            self.conn.execute('CREATE TABLE IF NOT EXISTS folders_mapping (name TEXT UNIQUE, table_name TEXT UNIQUE)')
            self.conn.execute('CREATE TABLE IF NOT EXISTS syncs (timestamp INTEGER UNIQUE, type TEXT, new_emails INTEGER, new_attachments INTEGER, duration INTEGER)')

    def __checked_table_name(self, table_name: str) -> str:
        """Sanitize *table_name* and reject names that cannot be used safely.

        Raises:
            ValueError: If nothing is left after sanitizing, or the result
                collides with a bookkeeping table or SQLite's reserved
                ``sqlite_`` prefix.
        """
        sanitized = sanitize_table_name(table_name)
        if not sanitized:
            raise ValueError(f'Invalid table name, nothing left after sanitizing: {table_name!r}')
        # SQLite compares table names case-insensitively, so 'Syncs' would
        # silently resolve to the existing 'syncs' table.
        lowered = sanitized.lower()
        if lowered in self.__restricted_strings or lowered.startswith('sqlite_'):
            raise ValueError(f'Invalid table name, conflicts with system tables: {table_name}')
        return sanitized

    def __create_table(self, table_name: str):
        """Ensure the table for folder *table_name* exists and is registered.

        Returns:
            The sanitized table name that was created or already existed.

        Raises:
            ValueError: If the folder name cannot be turned into a usable
                table name.
        """
        sanitized_table_name = self.__checked_table_name(table_name)
        with self.conn:
            # The identifier is double-quoted so folder names that sanitize
            # to an SQL keyword (e.g. "Order", "Index") still work. The
            # sanitized name contains only word characters, so it cannot
            # contain a quote itself.
            self.conn.execute(f'CREATE TABLE IF NOT EXISTS "{sanitized_table_name}" (timestamp INTEGER, to_email TEXT, from_email TEXT, subject TEXT, raw_content TEXT, raw_content_hash TEXT, attachments TEXT, id INTEGER PRIMARY KEY AUTOINCREMENT)')
            self.conn.execute('INSERT OR IGNORE INTO folders_mapping (name, table_name) VALUES (?, ?)', (table_name, sanitized_table_name))
        return sanitized_table_name

    def insert_email(self, folder: str, timestamp: int, subject: str, raw_content: str, to_email: str, from_email: str, attachments: List[FileAttachment]):
        """Store one email in its folder table unless it is already present.

        A message counts as already present when a row with the same
        ``timestamp`` and the same hash of ``raw_content`` exists in the
        folder's table.

        Args:
            folder: Mail folder name; its table is created on demand.
            timestamp: Message timestamp stored in the ``timestamp`` column.
            subject: Message subject.
            raw_content: Raw message text; hashed for duplicate detection.
            to_email: Recipient field.
            from_email: Sender field.
            attachments: Attachments, serialized to JSON with
                ``FileAttachmentEncoder``.

        Returns:
            True if a new row was inserted, False if it was a duplicate.

        Raises:
            ValueError: If *folder* cannot be mapped to a usable table name.
        """
        raw_content_hash = murmur3_chunked(raw_content.encode())
        sanitized_table_name = self.__create_table(folder)

        # Only existence matters, so avoid pulling the full row (which
        # includes the raw message body) back out of the database.
        stmt_check = f'SELECT 1 FROM "{sanitized_table_name}" WHERE timestamp = ? AND raw_content_hash = ? LIMIT 1'
        already_stored = self.conn.execute(stmt_check, (timestamp, raw_content_hash)).fetchone()
        if already_stored is not None:
            return False

        stmt = f'INSERT INTO "{sanitized_table_name}" (timestamp, to_email, from_email, subject, raw_content, raw_content_hash, attachments) VALUES (?, ?, ?, ?, ?, ?, ?)'
        with self.conn:
            self.conn.execute(stmt, (timestamp, to_email, from_email, subject, raw_content, raw_content_hash, json.dumps(attachments, cls=FileAttachmentEncoder)))
        return True

    def finish_sync(self, sync_type: str, new_emails: int, new_attachments: int, duration: int):
        """Record a finished sync run in the ``syncs`` table.

        Args:
            sync_type: Label stored in the ``type`` column.
            new_emails: Number of emails added by this run.
            new_attachments: Number of attachments added by this run.
            duration: Duration of the run as supplied by the caller.

        Returns:
            The whole-second Unix timestamp under which the run was stored.

        Raises:
            sqlite3.IntegrityError: If another sync was already recorded in
                the same second (``timestamp`` is UNIQUE).
        """
        now = int(time.time())
        with self.conn:
            self.conn.execute('INSERT INTO syncs (timestamp, type, new_emails, new_attachments, duration) VALUES (?, ?, ?, ?, ?)', (now, sync_type, new_emails, new_attachments, duration))
        return now

    def have_we_done_a_full_sync_at_all(self):
        """Return the timestamp of the earliest recorded sync, or None.

        NOTE(review): despite the name, the query does not filter on the
        ``type`` column, so any recorded sync counts. Confirm against the
        callers whether a type filter is intended.
        """
        row = self.conn.execute("SELECT timestamp FROM syncs ORDER BY timestamp LIMIT 1").fetchone()
        if row is not None:
            return row[0]
        return None