diff --git a/src/app.py b/src/app.py index 262980c..5e3c523 100644 --- a/src/app.py +++ b/src/app.py @@ -2,6 +2,7 @@ import os import random import string from web_utils.business_exception import BusinessException +from web_utils.task import Resource from flask import Flask, render_template import data.document @@ -38,6 +39,11 @@ def create_app(): if app.config['BEHIND_REVERSE_PROXY']: from werkzeug.middleware.proxy_fix import ProxyFix app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1, x_prefix=1) + + Resource.register('__thread', app.config['MAX_CONCURRENT_TASK_THREADS']) + Resource.register('__process', app.config['MAX_CONCURRENT_TASK_PROCESSES']) + Resource.register('document_files', 1, False) + Resource.register('intensive_task', app.config['MAX_CONCURRENT_INTENSIVE_TASKS']) from api import api_document app.register_blueprint(api_document.bp) diff --git a/src/app_config.py b/src/app_config.py index a0c629e..cccc8ba 100644 --- a/src/app_config.py +++ b/src/app_config.py @@ -8,3 +8,7 @@ DOCUMENT_ROOT_DIR = os.path.realpath(DATA_ROOT_DIR+'/doc') ADMIN_PASSWORD = '' # You must override this in config.py or the application won't start BEHIND_REVERSE_PROXY = False + +MAX_CONCURRENT_TASK_THREADS = 10 +MAX_CONCURRENT_TASK_PROCESSES = 5 +MAX_CONCURRENT_INTENSIVE_TASKS = 1 diff --git a/src/data/document.py b/src/data/document.py index 8098f02..12956fe 100644 --- a/src/data/document.py +++ b/src/data/document.py @@ -74,6 +74,8 @@ class Document: multiversion_build = self.settings['multiversion'] cmd = [] + + is_intensive_task = False # update source files from git cmd.append(['git', 'pull']) @@ -88,6 +90,8 @@ class Document: if type(self.settings['build_pdf']) is list: for pdf_branch_name in self.settings['build_pdf']: + is_intensive_task = True + # Extract the source files to a temporary directory cmd.append(['rm', '-rf', self.doc_path + '/tmp_source']) cmd.append(['mkdir', self.doc_path + '/tmp_source']) @@ -107,6 +111,8 @@ class Document: 
# Registry of resource types, keyed by type id (filled by Resource.register).
resources = {}

# Every known task, keyed by task id. Finished tasks are dropped 60 seconds
# after they finish (see Task.__update_pending_tasks).
tasks = {}

# Tasks that were started but are still waiting for their resources.
pending_tasks = {}

# Lock order is always task_management_mutex first, then resource_mutex.
task_management_mutex = RLock()
resource_mutex = RLock()


class Resource:
    """A countable resource that tasks must hold while they run.

    A "single" resource type has one pool, addressed with resource_id ''.
    A non-single type has one independent pool per resource_id (for
    example one pool per document path). Each pool starts with `amount`
    units and is created lazily on first acquisition.
    """

    def __init__(self, type_id, amount, is_single_resource):
        self.type_id = type_id
        self.amount = amount
        self.is_single_resource = is_single_resource
        # resource_id -> units currently free; a missing key means "untouched".
        self.available_amount = {}

    def get_available_amount(self, resource_id):
        """Return the number of free units in the pool `resource_id`."""
        assert(resource_id == '' or not self.is_single_resource)
        with resource_mutex:
            return self.available_amount.get(resource_id, self.amount)

    def acquire(self, resource_id, amount):
        """Take `amount` units from the pool `resource_id`.

        Raises AssertionError when the pool does not hold enough units.
        The lock is released even in that case.
        """
        assert(resource_id == '' or not self.is_single_resource)
        with resource_mutex:
            available_amount = self.available_amount.setdefault(resource_id, self.amount)
            assert(available_amount >= amount)
            self.available_amount[resource_id] = available_amount - amount

    def release(self, resource_id, amount):
        """Give `amount` units back to the pool `resource_id`.

        Raises AssertionError when the pool was never acquired from or when
        the release would exceed the pool capacity.
        """
        assert(resource_id == '' or not self.is_single_resource)
        with resource_mutex:
            assert(resource_id in self.available_amount)
            available_amount = self.available_amount[resource_id]
            assert(available_amount + amount <= self.amount)
            self.available_amount[resource_id] = available_amount + amount

    @staticmethod
    def register(type_id, amount, is_single_resource = True):
        """Declare a new resource type; each type id may be registered once."""
        assert(type_id not in resources)
        resources[type_id] = Resource(type_id, amount, is_single_resource)

    @staticmethod
    def get(type_id):
        """Return the registered Resource for `type_id` (KeyError if unknown)."""
        return resources[type_id]


class Task:
    """A unit of background work that runs in its own thread once all the
    resources it needs are free.

    Subclasses provide `run()`. `resource_usage` is an iterable of
    `(resource_type_id, resource_id)` pairs; one unit of each is held while
    the task runs. One unit of the '__thread' resource is always added.
    """

    def __init__(self, resource_usage):
        self.__thread = None
        self.task_id = str(uuid.uuid4())
        self.__mutex = Lock()
        self.__output = StringIO()
        self.__resource_usage = list(resource_usage)
        self.__resource_usage.append(('__thread', ''))
        self.__has_logged_waiting = ''
        self.__skip_id = ''
        self.__finished_at = None

        # Check all required resources are registered and addressed the right
        # way ('' for single resources, a real id for per-id resources).
        for required_resource in self.__resource_usage:
            resource = Resource.get(required_resource[0])
            single = (required_resource[1] == '')
            assert(single == resource.is_single_resource)

        with task_management_mutex:
            tasks[self.task_id] = self

    def print(self, str):
        """Append `str` to the task's captured output."""
        with self.__mutex:
            self.__output.write(str)

    # NOTE(review): the diff elides this method's header; only its closing
    # lines (mutex release + `return result`) are visible. The name
    # `get_output` and the use of `getvalue()` are assumptions - confirm
    # against the full file.
    def get_output(self):
        """Return everything written to the task output so far."""
        with self.__mutex:
            return self.__output.getvalue()

    def start(self, skip_if_another_pending = None):
        """Queue the task and start it as soon as its resources are free.

        When `skip_if_another_pending` is not None and a task that was
        started with the same value is still pending, this task is not
        queued at all: it only records a "skipped" message in its output.
        """
        with task_management_mutex:
            self.__skip_id = skip_if_another_pending
            skip_me = skip_if_another_pending is not None and any(
                other_task.__skip_id == skip_if_another_pending
                for other_task in pending_tasks.values()
            )

            if skip_me:
                print('Task ' + self.task_id + ' skipped')
                self.print('Task skipped (another identical task is already pending for execution)\n')
                # A skipped task never runs, so mark it finished right away;
                # otherwise it would never be purged from `tasks`.
                self.__finished_at = datetime.now()
            else:
                pending_tasks[self.task_id] = self

        Task.__update_pending_tasks()

    def is_alive(self):
        """Return True while the task is pending or its thread is running."""
        with task_management_mutex:
            if self.__thread is not None:
                return self.__thread.is_alive()
            return self.task_id in pending_tasks

    def join(self):
        """Block until the task has finished (or was skipped)."""
        while True:
            with task_management_mutex:
                thread = self.__thread
                still_pending = self.task_id in pending_tasks
            if thread is not None:
                # Joined outside the lock so the finishing thread can take it.
                thread.join()
                return
            if not still_pending:
                return
            # Still waiting for resources: poll until a thread exists.
            time.sleep(1)

    def __start_thread(self):
        print('Starting task ' + self.task_id)
        with task_management_mutex:
            assert(self.__thread is None)
            self.__thread = Thread(target=self.__run)
            # Started under the lock so nobody can observe a created but
            # not yet started thread through is_alive()/join().
            self.__thread.start()

    def __run(self):
        try:
            self.run()
        finally:
            # Runs even when run() raises: a failed task must still give its
            # resources back, or every task waiting for them stalls forever.
            print('Task ' + self.task_id + ' has finished')
            self.__finished_at = datetime.now()
            Task.__free_resources(self)
            Task.__update_pending_tasks()

    @staticmethod
    def get(task_id):
        """Return the task registered under `task_id`.

        Raises KeyError for an unknown (or already dropped) id; the
        management lock is released in that case too.
        """
        with task_management_mutex:
            return tasks[task_id]

    @staticmethod
    def __update_pending_tasks():
        """Purge old finished tasks, then start every pending task whose
        resources are all available."""
        with task_management_mutex:
            # Clear finished tasks
            now = datetime.now()
            to_remove = [
                key for key, task in tasks.items()
                if task.__finished_at is not None
                and (now - task.__finished_at).total_seconds() > 60
            ]
            for key in to_remove:
                print('Dropping task ' + key)
                tasks.pop(key)

            # Iterate over a snapshot: started tasks are removed from
            # pending_tasks inside the loop. Every runnable task is started,
            # not only the first one, so resources freed by one finishing
            # task can unblock several waiters at once.
            for key, task in list(pending_tasks.items()):
                with resource_mutex:
                    wait_causes = []
                    for required_resource in task.__resource_usage:
                        type_id = required_resource[0]
                        resource_id = required_resource[1]
                        if Resource.get(type_id).get_available_amount(resource_id) < 1:
                            wait_causes.append(
                                type_id + (' ('+resource_id+')' if resource_id != '' else '')
                            )

                    if wait_causes:
                        # Log again only when none of the current causes is
                        # the one remembered from the previous message.
                        if task.__has_logged_waiting not in wait_causes:
                            task.__has_logged_waiting = wait_causes[-1]
                            task.print('Waiting for resource(s): '+', '.join(wait_causes)+'\n')
                        continue

                    for required_resource in task.__resource_usage:
                        Resource.get(required_resource[0]).acquire(required_resource[1], 1)

                pending_tasks.pop(key)
                task.__start_thread()

    @staticmethod
    def __free_resources(task):
        """Return one unit of every resource held by `task`."""
        with task_management_mutex:
            with resource_mutex:
                for required_resource in task.__resource_usage:
                    Resource.get(required_resource[0]).release(required_resource[1], 1)