140 lines
4.9 KiB
Python
140 lines
4.9 KiB
Python
import multiprocessing
|
|
from datetime import datetime
|
|
from multiprocessing import Process
|
|
|
|
from flask import Blueprint, jsonify, request
|
|
|
|
from .queue import job_queue, job_status, queued_jobs
|
|
from .. import shared
|
|
from ..job_tracker import Job, JobManager
|
|
from ..list.video_list import VideoList
|
|
from ... import opts
|
|
from ...helpers.misc import get_elapsed_time_from_ms
|
|
from ...mysql import query
|
|
from ...process.main import do_download
|
|
|
|
# Flask blueprint that groups the job endpoints defined in this module
# (/start, /status, /result, /log and /active).
job_route = Blueprint('job', __name__)
|
|
|
|
# Register the Job class under the type id 'Job' so that manager instances
# expose it as `manager.Job(...)`.
# NOTE(review): presumably JobManager is a multiprocessing BaseManager
# subclass and this follows BaseManager.register semantics — confirm in
# job_tracker.
JobManager.register('Job', Job)
|
|
|
|
|
|
@job_route.route('/start', methods=['POST'])
def submit_job():
    """Queue a download job for a video list and ensure a worker is running.

    Expects a JSON object body with:
      * ``list`` (required): name or id of the target ``VideoList``.
      * ``ignore-downloaded`` (optional, default ``False``): forwarded
        unchanged to the worker through the job queue.

    Returns:
        A ``(json, status_code)`` pair:
          * 400 when the body is not a non-empty JSON object, ``list`` is
            missing, or no list matches.
          * 409 when a job for the same list is already reported as running
            (the running job's id is included).
          * 200 with ``status`` (``'started'`` or ``'queued'``), ``message``,
            ``job_id`` and ``queue_size`` otherwise.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict) or not data:
        return jsonify({'error': 'Data should be a key-value mapping.'}), 400

    target_list = data.get('list')
    ignore_downloaded = data.get('ignore-downloaded', False)
    if not target_list:
        return jsonify({'error': 'list parameter is required'}), 400

    # The caller may identify the list either by name or by id.
    video_list = VideoList.query.filter(
        (VideoList.name == target_list) | (VideoList.id == target_list)
    ).first()
    if not video_list:
        return jsonify({'error': 'list not found'}), 400

    # Refuse to start a second job for a list that already has one running.
    running_lists = {
        tracked.list_id(): tracked
        for _job_id, tracked in shared.global_job_tracker.jobs.items()
        if tracked.status() == 'running'
    }
    if video_list.id in running_lists:
        return jsonify({
            'error': 'job for list already running',
            'job_id': running_lists[video_list.id].id(),
        }), 409

    # NOTE(review): a new manager is started for every request; presumably
    # this is what lets the Job object be shared with the worker process —
    # confirm its lifetime is handled elsewhere.
    manager = JobManager()
    manager.start()
    job = manager.Job(video_list.id)
    shared.global_job_tracker.add_job(job)
    job_id = job.id()

    query(
        'INSERT INTO `jobs` (`job_id`, `result`, `started`) VALUES (%s, %s, UNIX_TIMESTAMP())',
        (job_id, 'running'),
    )

    # Add the job to the queue and the list of queued jobs.
    job_queue.put((job, video_list.id, video_list.url, opts.base_output, ignore_downloaded))
    queued_jobs.append(job_id)

    # Key the status by the id *value*. The previous code used the bound
    # method `job.id` as the key, so the /status lookup by id string could
    # never find the entry written here.
    job_status[job_id] = 'queued'

    # Start the worker process only if none is alive; otherwise the job just
    # waits in the queue for the existing worker.
    queued = True
    if job_queue.qsize() > 0 and not any(
        child.name == 'do_download' and child.is_alive()
        for child in multiprocessing.active_children()
    ):
        Process(target=do_download, name='do_download').start()
        queued = False

    message = 'Job queued' if queued else 'Job started'
    status = 'queued' if queued else 'started'

    # Report the queue size excluding the job just submitted. Read qsize()
    # once so the check and the subtraction see the same value.
    adjusted_queue_size = max(job_queue.qsize() - 1, 0)

    return jsonify({
        'status': status,
        'message': message,
        'job_id': job_id,
        'queue_size': adjusted_queue_size,
    }), 200
|
|
|
|
|
|
@job_route.route('/status', methods=['GET'])
def job_get_status():
    """Report the tracked state of a job and how long it has run.

    Reads the job id from the ``id`` query parameter.

    Returns:
        A ``(json, status_code)`` pair:
          * 400 when ``id`` is missing, or when the job has no entry in
            ``job_status`` or in the global job tracker.
          * 200 with ``in_queue``, ``job`` (the job's ``dict()``) and
            ``elapsed`` in whole seconds otherwise.
    """
    requested_id = request.args.get('id')
    if not requested_id:
        return jsonify({'error': 'id parameter is required'}), 400

    in_queue = requested_id in queued_jobs

    if not job_status.get(requested_id):
        return jsonify({'error': 'Job not found'}), 400

    tracked_job = shared.global_job_tracker.get_job(requested_id)
    if not tracked_job:
        return jsonify({'error': 'Job not found'}), 400

    if tracked_job.status() == 'running':
        # Still running: measure from the start time up to now.
        elapsed = get_elapsed_time_from_ms(tracked_job.start_time())
    else:
        # Not running: measure between the recorded end and start times,
        # which are treated as millisecond epoch timestamps.
        finished_at = datetime.fromtimestamp(tracked_job.end_time() / 1000.0)
        started_at = datetime.fromtimestamp(tracked_job.start_time() / 1000.0)
        elapsed = finished_at - started_at
    elapsed_s = int(elapsed.total_seconds())

    payload = {'in_queue': in_queue, 'job': tracked_job.dict(), 'elapsed': elapsed_s}
    return jsonify(payload), 200
|
|
|
|
|
|
@job_route.route('/result', methods=['GET'])
def job_result():
    """Return the stored ``jobs`` row for a job id, without its ``id`` column.

    Reads the job id from the ``id`` query parameter.

    Returns:
        A ``(json, status_code)`` pair:
          * 400 when ``id`` is missing or no row matches.
          * 500 when more than one row matches.
          * 200 with the row's remaining columns otherwise.
    """
    requested_id = request.args.get('id')
    if not requested_id:
        return jsonify({'error': 'id parameter is required'}), 400

    rows = query('SELECT * FROM `jobs` WHERE `job_id`=%s', (requested_id,), dictionary=True)
    match_count = len(rows)
    if match_count > 1:
        return jsonify({'error': 'multiple jobs for that ID'}), 500
    if match_count == 0:
        return jsonify({'error': 'no jobs for that ID'}), 400

    # Strip the `id` column before returning the row.
    record = rows[0]
    record.pop('id')
    return jsonify(record), 200
|
|
|
|
|
|
@job_route.route('/log', methods=['GET'])
def job_log():
    """Return a job's log lines together with the job's summary fields.

    Reads the job id from the ``id`` query parameter and joins the ``jobs``
    row with its rows in ``logging_job_output``.

    Returns:
        A ``(json, status_code)`` pair:
          * 400 when ``id`` is missing, no job matches, or the first joined
            row has no ``level`` value (no log rows).
          * 200 with ``items`` (each joined row minus the job-level columns)
            plus ``job_id``, ``result`` and ``started`` from the first row.
    """
    requested_id = request.args.get('id')
    if not requested_id:
        return jsonify({'error': 'id parameter is required'}), 400

    rows = query('''
        SELECT j.*, l.*
        FROM `jobs` j
        LEFT JOIN `logging_job_output` l ON j.job_id = l.job_id
        WHERE j.job_id=%s
        ''', (requested_id,), dictionary=True)
    if len(rows) == 0:
        return jsonify({'error': 'Job not found'}), 400

    first_row = rows[0]
    # A LEFT JOIN with no matching log rows yields one row with NULL log columns.
    if first_row['level'] is None:
        return jsonify({'error': 'No logs for this job'}), 400

    # Columns reported once at the top level (and the `id` column) are
    # dropped from each individual log item.
    job_level_columns = ("job_id", "result", "started", 'id')
    items = []
    for row in rows:
        entry = row.copy()
        for column in job_level_columns:
            del entry[column]
        items.append(entry)

    return jsonify({
        'items': items,
        'job_id': first_row['job_id'],
        'result': first_row['result'],
        'started': first_row['started'],
    }), 200
|
|
|
|
|
|
@job_route.route('/active', methods=['GET'])
def job_job_status():
    """List running jobs and the current queue.

    Returns:
        A ``(json, 200)`` pair with ``jobs`` (tracker keys of jobs whose
        status is ``'running'``), ``queue`` (a copy of ``queued_jobs``) and
        ``queue_size`` (``job_queue.qsize()``).
    """
    running_ids = []
    for tracked_id, tracked_job in shared.global_job_tracker.jobs.items():
        if tracked_job.status() == 'running':
            running_ids.append(tracked_id)

    payload = {
        'jobs': running_ids,
        'queue': list(queued_jobs),
        'queue_size': job_queue.qsize(),
    }
    return jsonify(payload), 200
|