imap-archiver/server.py

93 lines
2.7 KiB
Python

import base64
import json
import os
import sqlite3
from datetime import datetime
from pathlib import Path

import magic
from flask import Flask, abort, render_template, send_from_directory
from flask_httpauth import HTTPBasicAuth

# Flask application object; the route decorators further down register on it.
app = Flask(__name__)

# Fail fast at import time when required configuration is missing or empty.
if not os.environ.get('SQLITE3_DB'):
    raise Exception('SQLITE3_DB not set.')
if not os.environ.get('HTTP_AUTH_PASS'):
    raise Exception('HTTP_AUTH_PASS not set.')

# HTTP_AUTH_PASS carries the password base64-encoded; newlines are removed
# from the decoded text.
try:
    user_password = base64.b64decode(os.environ.get('HTTP_AUTH_PASS')).decode().replace('\n', '')
except ValueError as err:
    # ValueError covers both failure modes: binascii.Error (malformed base64,
    # e.g. incorrect padding) and UnicodeDecodeError (decoded bytes are not
    # valid UTF-8). Catching only UnicodeDecodeError let the former escape
    # without the explanatory message.
    raise Exception('Cannot decode password. Is it base64 encoded?') from err

auth = HTTPBasicAuth()

# Single hard-coded account: user 'admin' with the configured password.
users = {
    'admin': user_password,
}
@auth.get_password
def get_pw(username):
    """Return the stored password for *username*, or None if the user is unknown."""
    # dict.get already yields None for a missing key.
    return users.get(username)
def get_db_connection():
    """Open the SQLite database named by the SQLITE3_DB environment variable.

    Rows fetched through the returned connection are ``sqlite3.Row`` objects.
    The connection is not closed here; that is left to the caller.
    """
    db_path = os.environ.get('SQLITE3_DB')
    connection = sqlite3.connect(db_path)
    connection.row_factory = sqlite3.Row
    return connection
def dict_from_row(row):
    """Build a plain dict pairing each name from ``row.keys()`` with the row's values.

    Names and values are matched by position, in iteration order of *row*.
    """
    return {column: value for column, value in zip(row.keys(), row)}
@app.route('/')
@auth.login_required
def index():
    """Render the landing page: the folder list and the sync history.

    Reads ``name`` and ``table_name`` from ``folders_mapping`` and every row
    of ``syncs`` (newest first), then renders ``index.html`` with them.
    """
    conn = get_db_connection()
    try:
        folders = conn.execute('SELECT name, table_name FROM folders_mapping').fetchall()
        sync_rows = conn.execute('SELECT * FROM syncs ORDER BY timestamp DESC').fetchall()
    finally:
        # Release the connection even when one of the queries raises.
        conn.close()

    # sqlite3.Row is read-only, so copy into dicts before rewriting a field.
    syncs = [dict_from_row(row) for row in sync_rows]
    for sync in syncs:
        # Replace the numeric timestamp with a local-time display string.
        sync['timestamp'] = datetime.fromtimestamp(sync['timestamp']).strftime('%Y-%m-%d %H:%M:%S')
    return render_template('index.html', folders=folders, syncs=syncs)
@app.route('/folder/<table_name>')
@auth.login_required
def folder(table_name):
    """Render the list of e-mails stored in one folder table, newest first.

    ``table_name`` comes straight from the URL and has to be interpolated
    into the SQL text (identifiers cannot be bound as parameters). To prevent
    SQL injection it is only accepted when it appears in
    ``folders_mapping.table_name``; any other value yields a 404.
    NOTE(review): assumes every browsable folder table is listed in
    folders_mapping -- confirm against the archiver that fills the database.
    """
    conn = get_db_connection()
    try:
        known = conn.execute(
            'SELECT 1 FROM folders_mapping WHERE table_name = ?', (table_name,)
        ).fetchone()
        if known is None:
            abort(404)
        # Safe to interpolate: table_name was matched against the mapping table.
        rows = conn.execute(f'SELECT * FROM {table_name} ORDER BY timestamp DESC').fetchall()
    finally:
        # Runs for the abort() above as well as for query errors.
        conn.close()

    # sqlite3.Row is read-only, so copy into dicts before rewriting a field.
    emails = [dict_from_row(row) for row in rows]
    for message in emails:
        # Replace the numeric timestamp with a local-time display string.
        message['timestamp'] = datetime.fromtimestamp(message['timestamp']).strftime('%Y-%m-%d %H:%M:%S')
    return render_template('folder.html', emails=emails, table_name=table_name)
@app.route('/email/<table_name>/<id>')
@auth.login_required
def email(table_name, id):
    """Render a single e-mail, looked up by ``id`` in the given folder table.

    ``table_name`` comes straight from the URL and has to be interpolated
    into the SQL text, so it is only accepted when it appears in
    ``folders_mapping.table_name``. An unknown table or an unknown ``id``
    yields a 404 instead of an unhandled exception.
    NOTE(review): assumes every browsable folder table is listed in
    folders_mapping -- confirm against the archiver that fills the database.
    """
    conn = get_db_connection()
    try:
        known = conn.execute(
            'SELECT 1 FROM folders_mapping WHERE table_name = ?', (table_name,)
        ).fetchone()
        if known is None:
            abort(404)
        # Safe to interpolate: table_name was matched against the mapping
        # table. The id is bound as a regular parameter.
        row = conn.execute(f'SELECT * FROM {table_name} WHERE id = ?', (id,)).fetchone()
    finally:
        # Runs for the abort() above as well as for query errors.
        conn.close()

    if row is None:
        # fetchone() returns None when no row matches the id.
        abort(404)

    message = dict_from_row(row)
    # Replace the numeric timestamp with a local-time display string.
    message['timestamp'] = datetime.fromtimestamp(message['timestamp']).strftime('%Y-%m-%d %H:%M:%S')

    # The attachments column holds JSON text; a NULL or empty value is
    # treated as "no attachments" rather than crashing json.loads.
    raw_attachments = message['attachments']
    attachments = json.loads(raw_attachments) if raw_attachments else []
    return render_template('email.html', email=message, attachments=attachments)
@app.route('/attachments/<path:filename>')
@auth.login_required
def download_file(filename):
    """Serve a file from the ``attachments`` directory with a sniffed MIME type.

    The MIME type is detected from the file content with ``magic``. Because
    that means opening the file before ``send_from_directory`` has validated
    the path, the path is checked here first: anything that resolves outside
    the attachments directory, or that is not an existing file, yields a 404.
    """
    # send_from_directory resolves a relative directory against the
    # application root, so the file is sniffed at that same location.
    base_dir = os.path.normpath(os.path.join(app.root_path, 'attachments'))
    target = os.path.normpath(os.path.join(base_dir, filename))

    # normpath collapses any '..' segments; commonpath then tells whether the
    # result is still inside base_dir.
    if os.path.commonpath([base_dir, target]) != base_dir or not os.path.isfile(target):
        abort(404)

    mimetype = magic.from_file(target, mime=True)
    return send_from_directory('attachments', filename, mimetype=mimetype)
if __name__ == '__main__':
    # The server listens on every interface, and Flask's debug mode enables
    # the interactive Werkzeug debugger, which can execute arbitrary code.
    # Debug mode is therefore opt-in through FLASK_DEBUG and off by default.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').strip().lower() in ('1', 'true', 'yes', 'on')
    app.run(host='0.0.0.0', debug=debug_enabled)