From ee3babc469194589993a8574a34e3a5544917ad8 Mon Sep 17 00:00:00 2001
From: Youen
Date: Thu, 18 May 2023 23:12:14 +0200
Subject: [PATCH] Added code to avoid building the same document multiple times
 concurrently

Also avoids compiling too many PDF at the same time (configurable)
Also skips useless build tasks (if another build task is already pending to start)
---
Review notes (this area is not part of the commit message):

* Layout: this patch was recovered from a copy whose line breaks had been
  collapsed. Blank context lines were placed so that every hunk matches its
  `@@` line counts; 4-space indentation and the nesting depth of the
  src/data/document.py context lines are assumed -- verify against the
  repository before applying.
* Task.__run(): self.run() is not wrapped in try/finally, so an exception
  raised by run() skips Task.__free_resources() and the resources acquired
  for that task are never released.
* Task.start(): a skipped task stays in `tasks` with __finished_at still
  None, so the "Clear finished tasks" loop never drops it.
* Task.__update_pending_tasks(): the loop breaks after starting one task, so
  at most one pending task is started per call.
* Resource.acquire()/release(): a failing assert leaves resource_mutex held,
  because the locks are not released through a with/finally block.

 src/app.py            |   6 ++
 src/app_config.py     |   4 +
 src/data/document.py  |  14 ++-
 src/web_utils/task.py | 205 ++++++++++++++++++++++++++++++++++++++++--
 4 files changed, 218 insertions(+), 11 deletions(-)

diff --git a/src/app.py b/src/app.py
index 262980c..5e3c523 100644
--- a/src/app.py
+++ b/src/app.py
@@ -2,6 +2,7 @@ import os
 import random
 import string
 from web_utils.business_exception import BusinessException
+from web_utils.task import Resource
 
 from flask import Flask, render_template
 import data.document
@@ -38,6 +39,11 @@ def create_app():
     if app.config['BEHIND_REVERSE_PROXY']:
         from werkzeug.middleware.proxy_fix import ProxyFix
         app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1, x_prefix=1)
+
+    Resource.register('__thread', app.config['MAX_CONCURRENT_TASK_THREADS'])
+    Resource.register('__process', app.config['MAX_CONCURRENT_TASK_PROCESSES'])
+    Resource.register('document_files', 1, False)
+    Resource.register('intensive_task', app.config['MAX_CONCURRENT_INTENSIVE_TASKS'])
 
     from api import api_document
     app.register_blueprint(api_document.bp)
diff --git a/src/app_config.py b/src/app_config.py
index a0c629e..cccc8ba 100644
--- a/src/app_config.py
+++ b/src/app_config.py
@@ -8,3 +8,7 @@ DOCUMENT_ROOT_DIR = os.path.realpath(DATA_ROOT_DIR+'/doc')
 ADMIN_PASSWORD = '' # You must override this in config.py or the application won't start
 
 BEHIND_REVERSE_PROXY = False
+
+MAX_CONCURRENT_TASK_THREADS = 10
+MAX_CONCURRENT_TASK_PROCESSES = 5
+MAX_CONCURRENT_INTENSIVE_TASKS = 1
diff --git a/src/data/document.py b/src/data/document.py
index 8098f02..12956fe 100644
--- a/src/data/document.py
+++ b/src/data/document.py
@@ -74,6 +74,8 @@ class Document:
         multiversion_build = self.settings['multiversion']
 
         cmd = []
+
+        is_intensive_task = False
 
         # update source files from git
         cmd.append(['git', 'pull'])
@@ -88,6 +90,8 @@ class Document:
 
             if type(self.settings['build_pdf']) is list:
                 for pdf_branch_name in self.settings['build_pdf']:
+                    is_intensive_task = True
+
                     # Extract the source files to a temporary directory
                     cmd.append(['rm', '-rf', self.doc_path + '/tmp_source'])
                     cmd.append(['mkdir', self.doc_path + '/tmp_source'])
@@ -107,6 +111,8 @@ class Document:
             cmd.append(['make', 'html', 'BUILDDIR=../build'])
 
             if self.settings['build_pdf']:
+                is_intensive_task = True
+
                 # build the PDF version
                 cmd.append(['make', 'pdf', 'BUILDDIR=../build'])
 
@@ -120,8 +126,12 @@ class Document:
         else:
             cmd.append(['mv', self.doc_path + '/build/html/', self.doc_path + '/dist/'])
 
-        task = ProcessTask(cmd, cwd = self.doc_path + "/repo")
-        task.start()
+        resource_usage = [('document_files', self.doc_path)]
+        if is_intensive_task:
+            resource_usage.append(('intensive_task', ''))
+
+        task = ProcessTask(cmd, cwd = self.doc_path + "/repo", resource_usage = resource_usage)
+        task.start(skip_if_another_pending = self.doc_path)
 
         return task
 
diff --git a/src/web_utils/task.py b/src/web_utils/task.py
index a8f7622..7212429 100644
--- a/src/web_utils/task.py
+++ b/src/web_utils/task.py
@@ -1,18 +1,85 @@
-from threading import Thread, Lock
+from threading import Thread, Lock, RLock
 from io import StringIO
 from subprocess import Popen, PIPE, STDOUT, DEVNULL
 import uuid
+from datetime import datetime
 from web_utils.business_exception import BusinessException
+import time
+
+resources = {}
 
 tasks = {}
+pending_tasks = {}
+
+task_management_mutex = RLock()
+resource_mutex = RLock()
+
+class Resource:
+    def __init__(self, type_id, amount, is_single_resource):
+        self.type_id = type_id
+        self.amount = amount
+        self.is_single_resource = is_single_resource
+        self.available_amount = {}
+
+    def get_available_amount(self, resource_id):
+        assert(resource_id == '' or not self.is_single_resource)
+        resource_mutex.acquire()
+        if resource_id in self.available_amount:
+            result = self.available_amount[resource_id]
+        else:
+            result = self.amount
+        resource_mutex.release()
+        return result
+
+    def acquire(self, resource_id, amount):
+        assert(resource_id == '' or not self.is_single_resource)
+        resource_mutex.acquire()
+        if resource_id not in self.available_amount:
+            self.available_amount[resource_id] = self.amount
+        available_amount = self.available_amount[resource_id]
+        assert(available_amount >= amount)
+        self.available_amount[resource_id] = available_amount - amount
+        resource_mutex.release()
+
+    def release(self, resource_id, amount):
+        assert(resource_id == '' or not self.is_single_resource)
+        resource_mutex.acquire()
+        assert(resource_id in self.available_amount)
+        available_amount = self.available_amount[resource_id]
+        assert(available_amount + amount <= self.amount)
+        self.available_amount[resource_id] = available_amount + amount
+        resource_mutex.release()
+
+    @staticmethod
+    def register(type_id, amount, is_single_resource = True):
+        assert(type_id not in resources)
+        resources[type_id] = Resource(type_id, amount, is_single_resource)
+
+    @staticmethod
+    def get(type_id):
+        return resources[type_id]
 
-class Task(Thread):
-    def __init__(self):
-        Thread.__init__(self)
-        self.task_id = uuid.uuid4()
+class Task:
+    def __init__(self, resource_usage):
+        self.__thread = None
+        self.task_id = str(uuid.uuid4())
         self.__mutex = Lock()
         self.__output = StringIO()
-        tasks[str(self.task_id)] = self
+        self.__resource_usage = list(resource_usage)
+        self.__resource_usage.append(('__thread', ''))
+        self.__has_logged_waiting = ''
+        self.__skip_id = ''
+        self.__finished_at = None
+
+        # Check all required resources are registered
+        for required_resource in self.__resource_usage:
+            resource = Resource.get(required_resource[0])
+            single = (required_resource[1] == '')
+            assert(single == resource.is_single_resource)
+
+        task_management_mutex.acquire()
+        tasks[self.task_id] = self
+        task_management_mutex.release()
 
     def print(self, str):
         self.__mutex.acquire()
@@ -25,13 +92,133 @@ class Task(Thread):
         self.__mutex.release()
         return result
 
+    def start(self, skip_if_another_pending = None):
+        task_management_mutex.acquire()
+
+        skip_me = False
+        self.__skip_id = skip_if_another_pending
+        if skip_if_another_pending is not None:
+            for id, other_task in pending_tasks.items():
+                if other_task.__skip_id == skip_if_another_pending:
+                    skip_me = True
+                    break
+
+        if skip_me:
+            print('Task ' + self.task_id + ' skipped')
+            self.print('Task skipped (another identical task is already pending for execution)\n')
+        else:
+            pending_tasks[self.task_id] = self
+
+        task_management_mutex.release()
+        Task.__update_pending_tasks()
+
+    def is_alive(self):
+        task_management_mutex.acquire()
+        alive = self.task_id in pending_tasks
+        if self.__thread is not None:
+            alive = self.__thread.is_alive()
+        task_management_mutex.release()
+        return alive
+
+    def join(self):
+        task_management_mutex.acquire()
+
+        while self.__thread is None and self.is_alive():
+            task_management_mutex.release()
+            time.sleep(1)
+            task_management_mutex.acquire()
+
+        if self.__thread is not None:
+            task_management_mutex.release()
+            self.__thread.join()
+        else:
+            task_management_mutex.release()
+
+    def __start_thread(self):
+        print('Starting task ' + self.task_id)
+        task_management_mutex.acquire()
+        assert(self.__thread is None)
+        self.__thread = Thread(target=self.__run)
+        task_management_mutex.release()
+        self.__thread.start()
+
+    def __run(self):
+        self.run()
+        print('Task ' + self.task_id + ' has finished')
+        self.__finished_at = datetime.now()
+        Task.__free_resources(self)
+        Task.__update_pending_tasks()
+
     @staticmethod
     def get(task_id):
-        return tasks[task_id]
+        task_management_mutex.acquire()
+        result = tasks[task_id]
+        task_management_mutex.release()
+        return result
 
+    @staticmethod
+    def __update_pending_tasks():
+        task_management_mutex.acquire()
+
+        # Clear finished tasks
+        now = datetime.now()
+        to_remove = []
+        for key, task in tasks.items():
+            if task.__finished_at is not None and (now - task.__finished_at).total_seconds() > 60:
+                to_remove.append(key)
+        for key in to_remove:
+            print('Dropping task ' + key)
+            tasks.pop(key)
+
+        for key, task in pending_tasks.items():
+            can_start = True
+
+            all_wait_causes = ''
+            has_same_wait_cause = False
+
+            resource_mutex.acquire()
+            for required_resource in task.__resource_usage:
+                resource = Resource.get(required_resource[0])
+                if resource.get_available_amount(required_resource[1]) < 1:
+                    can_start = False
+                    wait_cause = required_resource[0] + (' ('+required_resource[1]+')' if required_resource[1] != '' else '')
+                    if wait_cause == task.__has_logged_waiting:
+                        has_same_wait_cause = True
+                    all_wait_causes = all_wait_causes + (', ' if all_wait_causes != '' else '') + wait_cause
+
+            if not can_start:
+                if not has_same_wait_cause:
+                    task.__has_logged_waiting = wait_cause
+                    task.print('Waiting for resource(s): '+all_wait_causes+'\n')
+                resource_mutex.release()
+                continue
+
+            for required_resource in task.__resource_usage:
+                resource = Resource.get(required_resource[0])
+                resource.acquire(required_resource[1], 1)
+            resource_mutex.release()
+
+            pending_tasks.pop(key)
+            task.__start_thread()
+            break
+
+        task_management_mutex.release()
+
+    @staticmethod
+    def __free_resources(task):
+        task_management_mutex.acquire()
+        resource_mutex.acquire()
+        for required_resource in task.__resource_usage:
+            resource = Resource.get(required_resource[0])
+            resource.release(required_resource[1], 1)
+        resource_mutex.release()
+        task_management_mutex.release()
+
 class ProcessTask(Task):
-    def __init__(self, commands, cwd = None, hide_passwords = []):
-        Task.__init__(self)
+    def __init__(self, commands, cwd = None, hide_passwords = [], resource_usage = []):
+        resource_usage = list(resource_usage)
+        resource_usage.append(('__process', ''))
+        Task.__init__(self, resource_usage)
         self.__commands = commands
         self.__cwd = cwd
         self.__hide_passwords = hide_passwords