110 lines
3.5 KiB
Python
110 lines
3.5 KiB
Python
import base64
|
|
import json
|
|
import os
|
|
import sqlite3
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
import magic
|
|
from flask import Flask, render_template, send_from_directory, abort
|
|
from flask_httpauth import HTTPBasicAuth
|
|
|
|
# Flask application that serves the archived mailboxes.
app = Flask(__name__)

# Fail fast at import time when required configuration is missing.
if not os.environ.get('HTTP_AUTH_PASS'):
    raise Exception('HTTP_AUTH_PASS not set.')
if not os.environ.get('EMAIL_ARCHIVE_ROOT'):
    raise Exception('EMAIL_ARCHIVE_ROOT not set.')

try:
    # HTTP_AUTH_PASS is decoded from base64 to text; newlines are stripped
    # from the decoded password.
    user_password = base64.b64decode(os.environ.get('HTTP_AUTH_PASS')).decode().replace('\n', '')
except ValueError as err:
    # binascii.Error (malformed base64, e.g. bad padding) and
    # UnicodeDecodeError (decoded bytes are not valid UTF-8) are both
    # ValueError subclasses, so either failure produces the same hint.
    raise Exception('Cannot decode password. Is it base64 encoded?') from err

# HTTP Basic authentication with a single account named "admin".
auth = HTTPBasicAuth()
users = {
    'admin': user_password,
}
|
|
|
|
|
|
@auth.get_password
def get_pw(username):
    """Return the password stored for ``username``, or ``None`` when the user is unknown."""
    # dict.get() already yields None for a missing key, so no membership
    # test is needed beforehand.
    return users.get(username)
|
|
|
|
|
|
def dict_from_row(row):
    """Convert a row object into a plain ``dict`` keyed by column name.

    ``row`` must expose ``keys()`` and be iterable over its values in the
    same order, as ``sqlite3.Row`` is.
    """
    column_names = row.keys()
    values = tuple(row)
    # Pair names and values positionally.
    return dict(zip(column_names, values))
|
|
|
|
|
|
@app.route('/')
@auth.login_required
def index():
    """Render the landing page listing the account directories in the archive root."""
    archive_root = Path(os.environ.get('EMAIL_ARCHIVE_ROOT'))
    # iterdir() yields entries in arbitrary order, so sort for a stable
    # listing. Only directories are offered as accounts because each
    # account's database is looked up at <root>/<account>/emails.db.
    accounts = sorted(entry.name for entry in archive_root.iterdir() if entry.is_dir())
    return render_template('index.html', accounts=accounts)
|
|
|
|
|
|
@app.route('/<email_account>/')
@auth.login_required
def account(email_account):
    """Render the overview page for one account: its folders and sync history.

    Responds with 404 when no database connection can be obtained for
    ``email_account``.
    """
    conn = get_db_connection(email_account)
    if not conn:
        abort(404)
    try:
        folders = conn.execute('SELECT name, table_name FROM folders_mapping').fetchall()
        sync_rows = conn.execute('SELECT * FROM syncs ORDER BY timestamp DESC').fetchall()
    finally:
        # Release the connection even when a query fails.
        conn.close()

    syncs = []
    for row in sync_rows:
        sync = dict_from_row(row)
        # Replace the stored numeric timestamp with a display string
        # (local time, as produced by datetime.fromtimestamp).
        sync['timestamp'] = datetime.fromtimestamp(sync['timestamp']).strftime('%Y-%m-%d %H:%M:%S')
        syncs.append(sync)
    return render_template('account.html', folders=folders, syncs=syncs, email_account=email_account)
|
|
|
|
|
|
@app.route('/<email_account>/folder/<table_name>')
@auth.login_required
def folder(email_account, table_name):
    """Render the list of emails stored in one folder table, newest first.

    Responds with 404 when the account database is unavailable or when
    ``table_name`` does not name a table or view in that database.
    """
    conn = get_db_connection(email_account)
    if not conn:
        abort(404)
    try:
        # A table name cannot be bound as a SQL parameter, and table_name
        # comes straight from the URL. Look it up in the schema first so
        # only a real table/view name is ever placed into the statement.
        match = conn.execute(
            "SELECT name FROM sqlite_master "
            "WHERE type IN ('table', 'view') AND name = ? COLLATE NOCASE",
            (table_name,),
        ).fetchone()
        if match is None:
            abort(404)
        # Quote the identifier, doubling any embedded double quotes.
        quoted_table = '"{}"'.format(match[0].replace('"', '""'))
        rows = conn.execute(f'SELECT * FROM {quoted_table} ORDER BY timestamp DESC').fetchall()
    finally:
        # Release the connection even when a query fails or we abort.
        conn.close()

    emails = []
    for row in rows:
        item = dict_from_row(row)
        # Replace the stored numeric timestamp with a display string.
        item['timestamp'] = datetime.fromtimestamp(item['timestamp']).strftime('%Y-%m-%d %H:%M:%S')
        emails.append(item)
    return render_template('folder.html', emails=emails, table_name=table_name, email_account=email_account)
|
|
|
|
|
|
@app.route('/<email_account>/email/<table_name>/<id>')
@auth.login_required
def email(email_account, table_name, id):
    """Render a single email, selected by ``id`` from the folder table ``table_name``.

    Responds with 404 when the account database is unavailable, when
    ``table_name`` does not name a table or view in it, or when no row has
    the requested ``id``.
    """
    conn = get_db_connection(email_account)
    if not conn:
        abort(404)
    try:
        # A table name cannot be bound as a SQL parameter, and table_name
        # comes straight from the URL. Look it up in the schema first so
        # only a real table/view name is ever placed into the statement.
        match = conn.execute(
            "SELECT name FROM sqlite_master "
            "WHERE type IN ('table', 'view') AND name = ? COLLATE NOCASE",
            (table_name,),
        ).fetchone()
        if match is None:
            abort(404)
        # Quote the identifier, doubling any embedded double quotes.
        quoted_table = '"{}"'.format(match[0].replace('"', '""'))
        row = conn.execute(f'SELECT * FROM {quoted_table} WHERE id = ?', (id,)).fetchone()
    finally:
        # Release the connection even when a query fails or we abort.
        conn.close()

    if row is None:
        # Unknown id: report "not found" instead of failing on a None row.
        abort(404)

    message = dict_from_row(row)
    message['timestamp'] = datetime.fromtimestamp(message['timestamp']).strftime('%Y-%m-%d %H:%M:%S')
    # The attachments column is parsed as JSON; an empty or NULL value is
    # treated as "no attachments" rather than a parse failure.
    raw_attachments = message['attachments']
    attachments = json.loads(raw_attachments) if raw_attachments else []
    return render_template('email.html', email=message, attachments=attachments, email_account=email_account)
|
|
|
|
|
|
@app.route('/<email_account>/attachments/<path:filename>')
@auth.login_required
def download_file(email_account, filename):
    """Send an attachment file stored under ``<root>/<email_account>/attachments``.

    The MIME type is detected from the file content. Responds with 404 when
    the requested path would leave the attachments directory or does not
    refer to an existing regular file.
    """
    # Reject traversal attempts before touching the filesystem, so that the
    # existence check and the MIME probe below never run on a path outside
    # the attachments directory.
    normalized = os.path.normpath(filename)
    leaves_directory = (
        email_account == os.pardir
        or os.path.isabs(normalized)
        or normalized == os.pardir
        or normalized.startswith(os.pardir + os.sep)
    )
    if leaves_directory:
        abort(404)

    attachments_dir = Path(os.environ.get('EMAIL_ARCHIVE_ROOT'), email_account, 'attachments')
    target = attachments_dir / filename
    if not target.is_file():
        abort(404)
    mimetype = magic.from_file(str(target), mime=True)
    return send_from_directory(attachments_dir, filename, mimetype=mimetype)
|
|
|
|
|
|
def get_db_connection(email_account):
    """Open the SQLite database of ``email_account``.

    The database is expected at ``<EMAIL_ARCHIVE_ROOT>/<email_account>/emails.db``.

    Returns:
        A ``sqlite3.Connection`` whose rows are ``sqlite3.Row`` objects, or
        ``None`` when the archive root is not configured, the database file
        does not exist, or SQLite cannot open it.
    """
    archive_root = os.environ.get('EMAIL_ARCHIVE_ROOT')
    if not archive_root:
        return None
    db_path = os.path.join(archive_root, email_account, 'emails.db')
    # sqlite3.connect() creates a new, empty database when the file is
    # missing; an account without a database must be reported as absent.
    if not os.path.isfile(db_path):
        return None
    try:
        conn = sqlite3.connect(db_path)
    except sqlite3.Error:
        return None
    conn.row_factory = sqlite3.Row
    return conn
|
|
|
|
|
|
if __name__ == '__main__':
    # Debug mode enables the interactive debugger, which can execute
    # arbitrary code. This server listens on every interface (0.0.0.0), so
    # debug stays off unless FLASK_DEBUG explicitly turns it on.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').strip().lower() in ('1', 'true', 'yes', 'on')
    app.run(host='0.0.0.0', debug=debug_enabled)
|