import base64 import json import os import sqlite3 from datetime import datetime from pathlib import Path import magic from flask import Flask, render_template, send_from_directory, abort from flask_httpauth import HTTPBasicAuth app = Flask(__name__) if not os.environ.get('HTTP_AUTH_PASS'): raise Exception('HTTP_AUTH_PASS not set.') if not os.environ.get('EMAIL_ARCHIVE_ROOT'): raise Exception('EMAIL_ARCHIVE_ROOT not set.') try: user_password = base64.b64decode(os.environ.get('HTTP_AUTH_PASS')).decode().replace('\n', '') except UnicodeDecodeError: raise Exception('Cannot decode password. Is it base64 encoded?') auth = HTTPBasicAuth() users = { 'admin': user_password, } @auth.get_password def get_pw(username): if username in users: return users.get(username) return None def dict_from_row(row): return dict(zip(row.keys(), row)) @app.route('/') @auth.login_required def index(): return render_template('index.html', accounts=[x.name for x in list(Path(os.environ.get('EMAIL_ARCHIVE_ROOT')).iterdir())]) @app.route('//') @auth.login_required def account(email_account): conn = get_db_connection(email_account) if not conn: abort(404) folders = conn.execute('SELECT name, table_name FROM folders_mapping').fetchall() syncs = conn.execute('SELECT * FROM syncs ORDER BY timestamp DESC').fetchall() conn.close() syncs = [dict_from_row(sync) for sync in syncs] for sync in syncs: sync['timestamp'] = datetime.fromtimestamp(sync['timestamp']).strftime('%Y-%m-%d %H:%M:%S') return render_template('account.html', folders=folders, syncs=syncs, email_account=email_account) @app.route('//folder/') @auth.login_required def folder(email_account, table_name): conn = get_db_connection(email_account) if not conn: abort(404) emails = conn.execute(f'SELECT * FROM {table_name} ORDER BY timestamp DESC').fetchall() conn.close() emails = [dict_from_row(email) for email in emails] for item in emails: item['timestamp'] = datetime.fromtimestamp(item['timestamp']).strftime('%Y-%m-%d %H:%M:%S') return render_template('folder.html', emails=emails, table_name=table_name, email_account=email_account) @app.route('//email//') @auth.login_required def email(email_account, table_name, id): conn = get_db_connection(email_account) if not conn: abort(404) email = dict_from_row(conn.execute(f'SELECT * FROM {table_name} WHERE id = ?', (id,)).fetchone()) conn.close() email['timestamp'] = datetime.fromtimestamp(email['timestamp']).strftime('%Y-%m-%d %H:%M:%S') attachments = json.loads(email['attachments']) return render_template('email.html', email=email, attachments=attachments, email_account=email_account) @app.route('//attachments/') @auth.login_required def download_file(email_account, filename): p = Path(os.environ.get('EMAIL_ARCHIVE_ROOT'), email_account, 'attachments', filename) if not p.exists(): abort(404) mimetype = magic.from_file(str(p), mime=True) return send_from_directory(Path(os.environ.get('EMAIL_ARCHIVE_ROOT'), email_account, 'attachments'), filename, mimetype=mimetype) def get_db_connection(email_account): try: conn = sqlite3.connect(os.path.join(os.environ.get('EMAIL_ARCHIVE_ROOT'), email_account, 'emails.db')) conn.row_factory = sqlite3.Row return conn except: return None if __name__ == '__main__': app.run(host='0.0.0.0', debug=True)