Browse Source

Added code to avoid building the same document multiple times concurrently

Also avoids compiling too many PDF at the same time (configurable)
Also skips useless build tasks (if another build task is already pending to start)
master
Youen 2 years ago
parent
commit
ee3babc469
  1. 6
      src/app.py
  2. 4
      src/app_config.py
  3. 14
      src/data/document.py
  4. 205
      src/web_utils/task.py

6
src/app.py

@ -2,6 +2,7 @@ import os
import random
import string
from web_utils.business_exception import BusinessException
from web_utils.task import Resource
from flask import Flask, render_template
import data.document
@ -39,6 +40,11 @@ def create_app():
from werkzeug.middleware.proxy_fix import ProxyFix
app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1, x_prefix=1)
Resource.register('__thread', app.config['MAX_CONCURRENT_TASK_THREADS'])
Resource.register('__process', app.config['MAX_CONCURRENT_TASK_PROCESSES'])
Resource.register('document_files', 1, False)
Resource.register('intensive_task', app.config['MAX_CONCURRENT_INTENSIVE_TASKS'])
from api import api_document
app.register_blueprint(api_document.bp)

4
src/app_config.py

@ -8,3 +8,7 @@ DOCUMENT_ROOT_DIR = os.path.realpath(DATA_ROOT_DIR+'/doc')
ADMIN_PASSWORD = '' # You must override this in config.py or the application won't start
BEHIND_REVERSE_PROXY = False
MAX_CONCURRENT_TASK_THREADS = 10
MAX_CONCURRENT_TASK_PROCESSES = 5
MAX_CONCURRENT_INTENSIVE_TASKS = 1

14
src/data/document.py

@ -75,6 +75,8 @@ class Document:
cmd = []
is_intensive_task = False
# update source files from git
cmd.append(['git', 'pull'])
@ -88,6 +90,8 @@ class Document:
if type(self.settings['build_pdf']) is list:
for pdf_branch_name in self.settings['build_pdf']:
is_intensive_task = True
# Extract the source files to a temporary directory
cmd.append(['rm', '-rf', self.doc_path + '/tmp_source'])
cmd.append(['mkdir', self.doc_path + '/tmp_source'])
@ -107,6 +111,8 @@ class Document:
cmd.append(['make', 'html', 'BUILDDIR=../build'])
if self.settings['build_pdf']:
is_intensive_task = True
# build the PDF version
cmd.append(['make', 'pdf', 'BUILDDIR=../build'])
@ -120,8 +126,12 @@ class Document:
else:
cmd.append(['mv', self.doc_path + '/build/html/', self.doc_path + '/dist/'])
task = ProcessTask(cmd, cwd = self.doc_path + "/repo")
task.start()
resource_usage = [('document_files', self.doc_path)]
if is_intensive_task:
resource_usage.append(('intensive_task', ''))
task = ProcessTask(cmd, cwd = self.doc_path + "/repo", resource_usage = resource_usage)
task.start(skip_if_another_pending = self.doc_path)
return task

205
src/web_utils/task.py

@ -1,18 +1,85 @@
from threading import Thread, Lock
from threading import Thread, Lock, RLock
from io import StringIO
from subprocess import Popen, PIPE, STDOUT, DEVNULL
import uuid
from datetime import datetime
from web_utils.business_exception import BusinessException
import time
resources = {}
tasks = {}
pending_tasks = {}
task_management_mutex = RLock()
resource_mutex = RLock()
class Resource:
def __init__(self, type_id, amount, is_single_resource):
self.type_id = type_id
self.amount = amount
self.is_single_resource = is_single_resource
self.available_amount = {}
def get_available_amount(self, resource_id):
assert(resource_id == '' or not self.is_single_resource)
resource_mutex.acquire()
if resource_id in self.available_amount:
result = self.available_amount[resource_id]
else:
result = self.amount
resource_mutex.release()
return result
def acquire(self, resource_id, amount):
assert(resource_id == '' or not self.is_single_resource)
resource_mutex.acquire()
if resource_id not in self.available_amount:
self.available_amount[resource_id] = self.amount
available_amount = self.available_amount[resource_id]
assert(available_amount >= amount)
self.available_amount[resource_id] = available_amount - amount
resource_mutex.release()
def release(self, resource_id, amount):
assert(resource_id == '' or not self.is_single_resource)
resource_mutex.acquire()
assert(resource_id in self.available_amount)
available_amount = self.available_amount[resource_id]
assert(available_amount + amount <= self.amount)
self.available_amount[resource_id] = available_amount + amount
resource_mutex.release()
@staticmethod
def register(type_id, amount, is_single_resource = True):
assert(type_id not in resources)
resources[type_id] = Resource(type_id, amount, is_single_resource)
class Task(Thread):
def __init__(self):
Thread.__init__(self)
self.task_id = uuid.uuid4()
@staticmethod
def get(type_id):
return resources[type_id]
class Task:
def __init__(self, resource_usage):
self.__thread = None
self.task_id = str(uuid.uuid4())
self.__mutex = Lock()
self.__output = StringIO()
tasks[str(self.task_id)] = self
self.__resource_usage = list(resource_usage)
self.__resource_usage.append(('__thread', ''))
self.__has_logged_waiting = ''
self.__skip_id = ''
self.__finished_at = None
# Check all required resources are registered
for required_resource in self.__resource_usage:
resource = Resource.get(required_resource[0])
single = (required_resource[1] == '')
assert(single == resource.is_single_resource)
task_management_mutex.acquire()
tasks[self.task_id] = self
task_management_mutex.release()
def print(self, str):
self.__mutex.acquire()
@ -25,13 +92,133 @@ class Task(Thread):
self.__mutex.release()
return result
def start(self, skip_if_another_pending = None):
task_management_mutex.acquire()
skip_me = False
self.__skip_id = skip_if_another_pending
if skip_if_another_pending is not None:
for id, other_task in pending_tasks.items():
if other_task.__skip_id == skip_if_another_pending:
skip_me = True
break
if skip_me:
print('Task ' + self.task_id + ' skipped')
self.print('Task skipped (another identical task is already pending for execution)\n')
else:
pending_tasks[self.task_id] = self
task_management_mutex.release()
Task.__update_pending_tasks()
def is_alive(self):
task_management_mutex.acquire()
alive = self.task_id in pending_tasks
if self.__thread is not None:
alive = self.__thread.is_alive()
task_management_mutex.release()
return alive
def join(self):
task_management_mutex.acquire()
while self.__thread is None and self.is_alive():
task_management_mutex.release()
time.sleep(1)
task_management_mutex.acquire()
if self.__thread is not None:
task_management_mutex.release()
self.__thread.join()
else:
task_management_mutex.release()
def __start_thread(self):
print('Starting task ' + self.task_id)
task_management_mutex.acquire()
assert(self.__thread is None)
self.__thread = Thread(target=self.__run)
task_management_mutex.release()
self.__thread.start()
def __run(self):
self.run()
print('Task ' + self.task_id + ' has finished')
self.__finished_at = datetime.now()
Task.__free_resources(self)
Task.__update_pending_tasks()
@staticmethod
def get(task_id):
return tasks[task_id]
task_management_mutex.acquire()
result = tasks[task_id]
task_management_mutex.release()
return result
@staticmethod
def __update_pending_tasks():
task_management_mutex.acquire()
# Clear finished tasks
now = datetime.now()
to_remove = []
for key, task in tasks.items():
if task.__finished_at is not None and (now - task.__finished_at).total_seconds() > 60:
to_remove.append(key)
for key in to_remove:
print('Dropping task ' + key)
tasks.pop(key)
for key, task in pending_tasks.items():
can_start = True
all_wait_causes = ''
has_same_wait_cause = False
resource_mutex.acquire()
for required_resource in task.__resource_usage:
resource = Resource.get(required_resource[0])
if resource.get_available_amount(required_resource[1]) < 1:
can_start = False
wait_cause = required_resource[0] + (' ('+required_resource[1]+')' if required_resource[1] != '' else '')
if wait_cause == task.__has_logged_waiting:
has_same_wait_cause = True
all_wait_causes = all_wait_causes + (', ' if all_wait_causes != '' else '') + wait_cause
if not can_start:
if not has_same_wait_cause:
task.__has_logged_waiting = wait_cause
task.print('Waiting for resource(s): '+all_wait_causes+'\n')
resource_mutex.release()
continue
for required_resource in task.__resource_usage:
resource = Resource.get(required_resource[0])
resource.acquire(required_resource[1], 1)
resource_mutex.release()
pending_tasks.pop(key)
task.__start_thread()
break
task_management_mutex.release()
@staticmethod
def __free_resources(task):
task_management_mutex.acquire()
resource_mutex.acquire()
for required_resource in task.__resource_usage:
resource = Resource.get(required_resource[0])
resource.release(required_resource[1], 1)
resource_mutex.release()
task_management_mutex.release()
class ProcessTask(Task):
def __init__(self, commands, cwd = None, hide_passwords = []):
Task.__init__(self)
def __init__(self, commands, cwd = None, hide_passwords = [], resource_usage = []):
resource_usage = list(resource_usage)
resource_usage.append(('__process', ''))
Task.__init__(self, resource_usage)
self.__commands = commands
self.__cwd = cwd
self.__hide_passwords = hide_passwords

Loading…
Cancel
Save