93 lines
2.7 KiB
Python
93 lines
2.7 KiB
Python
import base64
|
|
import json
|
|
import os
|
|
import sqlite3
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
import magic
|
|
from flask import Flask, abort, render_template, send_from_directory
|
|
from flask_httpauth import HTTPBasicAuth
|
|
|
|
app = Flask(__name__)

# Fail fast at import time: the app needs both the SQLite database path and
# the HTTP basic-auth password to be configured through the environment.
if not os.environ.get('SQLITE3_DB'):
    raise Exception('SQLITE3_DB not set.')
if not os.environ.get('HTTP_AUTH_PASS'):
    raise Exception('HTTP_AUTH_PASS not set.')

try:
    # HTTP_AUTH_PASS holds the base64-encoded password; newlines are removed
    # from the decoded text.
    user_password = base64.b64decode(os.environ.get('HTTP_AUTH_PASS')).decode().replace('\n', '')
except ValueError as err:
    # Malformed base64 raises binascii.Error and non-UTF-8 payloads raise
    # UnicodeDecodeError; both are ValueError subclasses, so catching
    # ValueError reports either failure with the same hint.
    raise Exception('Cannot decode password. Is it base64 encoded?') from err

auth = HTTPBasicAuth()

# Username -> plain-text password table consulted by the basic-auth callback.
users = {
    'admin': user_password,
}
|
|
|
|
|
|
@auth.get_password
def get_pw(username):
    """Return the password stored for *username*, or ``None`` if unknown."""
    # dict.get already yields None for a missing key, so no membership
    # test is needed first.
    return users.get(username)
|
|
|
|
|
|
def get_db_connection():
    """Open a connection to the database named by ``SQLITE3_DB``.

    The connection uses ``sqlite3.Row`` as its row factory, so result rows
    can be indexed by column name. The caller is responsible for closing
    the returned connection.
    """
    database_path = os.environ.get('SQLITE3_DB')
    connection = sqlite3.connect(database_path)
    connection.row_factory = sqlite3.Row
    return connection
|
|
|
|
|
|
def dict_from_row(row):
    """Convert a row object into a plain, mutable ``dict``.

    *row* must provide ``keys()`` and iterate over its values in the same
    order (as ``sqlite3.Row`` does); names and values are paired by position.
    """
    column_names = row.keys()
    return dict(zip(column_names, row))
|
|
|
|
|
|
@app.route('/')
@auth.login_required
def index():
    """Render the landing page with the folder list and the sync history.

    Reads every ``(name, table_name)`` pair from ``folders_mapping`` and all
    rows of ``syncs`` (newest first). Each sync's ``timestamp`` is converted
    with ``datetime.fromtimestamp`` into a ``YYYY-MM-DD HH:MM:SS`` string
    before ``index.html`` is rendered.
    """
    conn = get_db_connection()
    try:
        folders = conn.execute('SELECT name, table_name FROM folders_mapping').fetchall()
        sync_rows = conn.execute('SELECT * FROM syncs ORDER BY timestamp DESC').fetchall()
    finally:
        # Release the connection even when one of the queries fails.
        conn.close()

    # sqlite3.Row is read-only; copy into dicts so the timestamp can be
    # replaced by its formatted form.
    syncs = [dict_from_row(row) for row in sync_rows]
    for sync in syncs:
        sync['timestamp'] = datetime.fromtimestamp(sync['timestamp']).strftime('%Y-%m-%d %H:%M:%S')
    return render_template('index.html', folders=folders, syncs=syncs)
|
|
|
|
|
|
@app.route('/folder/<table_name>')
@auth.login_required
def folder(table_name):
    """Render the list of emails stored in one folder table, newest first.

    *table_name* comes from the URL. SQLite cannot bind identifiers as query
    parameters, so the name is first checked against the ``table_name``
    column of ``folders_mapping`` (the table ``index`` lists folders from);
    an unknown name aborts with 404 instead of reaching the SQL text.
    Each email's ``timestamp`` is converted into a ``YYYY-MM-DD HH:MM:SS``
    string before ``folder.html`` is rendered.
    """
    conn = get_db_connection()
    try:
        known = conn.execute(
            'SELECT 1 FROM folders_mapping WHERE table_name = ?', (table_name,)
        ).fetchone()
        if known is None:
            abort(404)
        # Safe to interpolate: table_name matched a registered folder table.
        rows = conn.execute(f'SELECT * FROM {table_name} ORDER BY timestamp DESC').fetchall()
    finally:
        # Runs for query errors and for the 404 abort alike.
        conn.close()

    emails = [dict_from_row(row) for row in rows]
    for message in emails:
        message['timestamp'] = datetime.fromtimestamp(message['timestamp']).strftime('%Y-%m-%d %H:%M:%S')
    return render_template('folder.html', emails=emails, table_name=table_name)
|
|
|
|
|
|
@app.route('/email/<table_name>/<id>')
@auth.login_required
def email(table_name, id):
    """Render a single email together with its attachment list.

    *table_name* and *id* come from the URL. The table name is checked
    against the ``table_name`` column of ``folders_mapping`` before it is
    placed in the SQL text (identifiers cannot be bound as parameters), and
    *id* is passed as a bound parameter. An unknown table or a missing row
    aborts with 404. The ``attachments`` column is parsed as JSON.
    """
    conn = get_db_connection()
    try:
        known = conn.execute(
            'SELECT 1 FROM folders_mapping WHERE table_name = ?', (table_name,)
        ).fetchone()
        if known is None:
            abort(404)
        # Safe to interpolate: table_name matched a registered folder table.
        row = conn.execute(f'SELECT * FROM {table_name} WHERE id = ?', (id,)).fetchone()
    finally:
        # Runs for query errors and for the 404 abort alike.
        conn.close()

    if row is None:
        # No such email; previously this crashed in dict_from_row(None).
        abort(404)

    message = dict_from_row(row)
    message['timestamp'] = datetime.fromtimestamp(message['timestamp']).strftime('%Y-%m-%d %H:%M:%S')
    # A NULL or empty attachments column is treated as "no attachments"
    # rather than letting json.loads raise.
    raw_attachments = message['attachments']
    attachments = json.loads(raw_attachments) if raw_attachments else []
    return render_template('email.html', email=message, attachments=attachments)
|
|
|
|
|
|
@app.route('/attachments/<path:filename>')
@auth.login_required
def download_file(filename):
    """Serve a stored attachment with a MIME type sniffed from its content.

    *filename* comes from the URL and may contain sub-directories. Paths that
    are absolute or contain ``..`` components, and paths that do not name an
    existing file, abort with 404 before the file is opened for sniffing.
    """
    requested = Path(filename)
    if requested.is_absolute() or '..' in requested.parts:
        # Never let libmagic open anything outside the attachments directory.
        abort(404)

    # send_from_directory resolves a relative directory against the
    # application root, so sniff the MIME type from that same location
    # instead of the current working directory.
    target = Path(app.root_path, 'attachments') / requested
    if not target.is_file():
        abort(404)

    mimetype = magic.from_file(str(target), mime=True)
    return send_from_directory('attachments', filename, mimetype=mimetype)
|
|
|
|
|
|
if __name__ == '__main__':
    # The interactive debugger lets anyone who can reach the server execute
    # arbitrary code, so it must not be on by default while listening on all
    # interfaces. Set FLASK_DEBUG=1 to opt in for local development.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').lower() in ('1', 'true', 'yes', 'on')
    app.run(host='0.0.0.0', debug=debug_enabled)
|