You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
254 lines
7.5 KiB
254 lines
7.5 KiB
from threading import Thread, Lock, RLock |
|
from io import StringIO |
|
from subprocess import Popen, PIPE, STDOUT, DEVNULL |
|
import uuid |
|
from datetime import datetime |
|
from web_utils.business_exception import BusinessException |
|
import time |
|
|
|
resources = {} |
|
|
|
tasks = {} |
|
pending_tasks = {} |
|
|
|
task_management_mutex = RLock() |
|
resource_mutex = RLock() |
|
|
|
class Resource: |
|
def __init__(self, type_id, amount, is_single_resource): |
|
self.type_id = type_id |
|
self.amount = amount |
|
self.is_single_resource = is_single_resource |
|
self.available_amount = {} |
|
|
|
def get_available_amount(self, resource_id): |
|
assert(resource_id == '' or not self.is_single_resource) |
|
resource_mutex.acquire() |
|
if resource_id in self.available_amount: |
|
result = self.available_amount[resource_id] |
|
else: |
|
result = self.amount |
|
resource_mutex.release() |
|
return result |
|
|
|
def acquire(self, resource_id, amount): |
|
assert(resource_id == '' or not self.is_single_resource) |
|
resource_mutex.acquire() |
|
if resource_id not in self.available_amount: |
|
self.available_amount[resource_id] = self.amount |
|
available_amount = self.available_amount[resource_id] |
|
assert(available_amount >= amount) |
|
self.available_amount[resource_id] = available_amount - amount |
|
resource_mutex.release() |
|
|
|
def release(self, resource_id, amount): |
|
assert(resource_id == '' or not self.is_single_resource) |
|
resource_mutex.acquire() |
|
assert(resource_id in self.available_amount) |
|
available_amount = self.available_amount[resource_id] |
|
assert(available_amount + amount <= self.amount) |
|
self.available_amount[resource_id] = available_amount + amount |
|
resource_mutex.release() |
|
|
|
@staticmethod |
|
def register(type_id, amount, is_single_resource = True): |
|
assert(type_id not in resources) |
|
resources[type_id] = Resource(type_id, amount, is_single_resource) |
|
|
|
@staticmethod |
|
def get(type_id): |
|
return resources[type_id] |
|
|
|
class Task: |
|
def __init__(self, resource_usage): |
|
self.__thread = None |
|
self.task_id = str(uuid.uuid4()) |
|
self.__mutex = Lock() |
|
self.__output = StringIO() |
|
self.__resource_usage = list(resource_usage) |
|
self.__resource_usage.append(('__thread', '')) |
|
self.__has_logged_waiting = '' |
|
self.__skip_id = '' |
|
self.__finished_at = None |
|
|
|
# Check all required resources are registered |
|
for required_resource in self.__resource_usage: |
|
resource = Resource.get(required_resource[0]) |
|
single = (required_resource[1] == '') |
|
assert(single == resource.is_single_resource) |
|
|
|
task_management_mutex.acquire() |
|
tasks[self.task_id] = self |
|
task_management_mutex.release() |
|
|
|
def print(self, str): |
|
self.__mutex.acquire() |
|
self.__output.write(str) |
|
self.__mutex.release() |
|
|
|
def get_output_str(self): |
|
self.__mutex.acquire() |
|
result = self.__output.getvalue() |
|
self.__mutex.release() |
|
return result |
|
|
|
def start(self, skip_if_another_pending = None): |
|
task_management_mutex.acquire() |
|
|
|
skip_me = False |
|
self.__skip_id = skip_if_another_pending |
|
if skip_if_another_pending is not None: |
|
for id, other_task in pending_tasks.items(): |
|
if other_task.__skip_id == skip_if_another_pending: |
|
skip_me = True |
|
break |
|
|
|
if skip_me: |
|
print('Task ' + self.task_id + ' skipped') |
|
self.print('Task skipped (another identical task is already pending for execution)\n') |
|
else: |
|
pending_tasks[self.task_id] = self |
|
|
|
task_management_mutex.release() |
|
Task.__update_pending_tasks() |
|
|
|
def is_alive(self): |
|
task_management_mutex.acquire() |
|
alive = self.task_id in pending_tasks |
|
if self.__thread is not None: |
|
alive = self.__thread.is_alive() |
|
task_management_mutex.release() |
|
return alive |
|
|
|
def join(self): |
|
task_management_mutex.acquire() |
|
|
|
while self.__thread is None and self.is_alive(): |
|
task_management_mutex.release() |
|
time.sleep(1) |
|
task_management_mutex.acquire() |
|
|
|
if self.__thread is not None: |
|
task_management_mutex.release() |
|
self.__thread.join() |
|
else: |
|
task_management_mutex.release() |
|
|
|
def __start_thread(self): |
|
print('Starting task ' + self.task_id) |
|
task_management_mutex.acquire() |
|
assert(self.__thread is None) |
|
self.__thread = Thread(target=self.__run) |
|
task_management_mutex.release() |
|
self.__thread.start() |
|
|
|
def __run(self): |
|
try: |
|
self.run() |
|
print('Task ' + self.task_id + ' has finished') |
|
self.__finished_at = datetime.now() |
|
finally: |
|
Task.__free_resources(self) |
|
Task.__update_pending_tasks() |
|
|
|
@staticmethod |
|
def get(task_id): |
|
task_management_mutex.acquire() |
|
result = tasks[task_id] |
|
task_management_mutex.release() |
|
return result |
|
|
|
@staticmethod |
|
def __update_pending_tasks(): |
|
task_management_mutex.acquire() |
|
|
|
# Clear finished tasks |
|
now = datetime.now() |
|
to_remove = [] |
|
for key, task in tasks.items(): |
|
if task.__finished_at is not None and (now - task.__finished_at).total_seconds() > 60: |
|
to_remove.append(key) |
|
for key in to_remove: |
|
print('Dropping task ' + key) |
|
tasks.pop(key) |
|
|
|
for key, task in pending_tasks.items(): |
|
can_start = True |
|
|
|
all_wait_causes = '' |
|
has_same_wait_cause = False |
|
|
|
resource_mutex.acquire() |
|
for required_resource in task.__resource_usage: |
|
resource = Resource.get(required_resource[0]) |
|
if resource.get_available_amount(required_resource[1]) < 1: |
|
can_start = False |
|
wait_cause = required_resource[0] + (' ('+required_resource[1]+')' if required_resource[1] != '' else '') |
|
if wait_cause == task.__has_logged_waiting: |
|
has_same_wait_cause = True |
|
all_wait_causes = all_wait_causes + (', ' if all_wait_causes != '' else '') + wait_cause |
|
|
|
if not can_start: |
|
if not has_same_wait_cause: |
|
task.__has_logged_waiting = wait_cause |
|
task.print('Waiting for resource(s): '+all_wait_causes+'\n') |
|
resource_mutex.release() |
|
continue |
|
|
|
for required_resource in task.__resource_usage: |
|
resource = Resource.get(required_resource[0]) |
|
resource.acquire(required_resource[1], 1) |
|
resource_mutex.release() |
|
|
|
pending_tasks.pop(key) |
|
task.__start_thread() |
|
break |
|
|
|
task_management_mutex.release() |
|
|
|
@staticmethod |
|
def __free_resources(task): |
|
task_management_mutex.acquire() |
|
resource_mutex.acquire() |
|
for required_resource in task.__resource_usage: |
|
resource = Resource.get(required_resource[0]) |
|
resource.release(required_resource[1], 1) |
|
resource_mutex.release() |
|
task_management_mutex.release() |
|
|
|
class ProcessTask(Task): |
|
def __init__(self, commands, cwd = None, hide_passwords = [], resource_usage = []): |
|
resource_usage = list(resource_usage) |
|
resource_usage.append(('__process', '')) |
|
Task.__init__(self, resource_usage) |
|
self.__commands = commands |
|
self.__cwd = cwd |
|
self.__hide_passwords = hide_passwords |
|
|
|
def on_fail(self, callback): |
|
self.__fail_callback = callback |
|
|
|
def run(self): |
|
is_first_command = True |
|
for command in self.__commands: |
|
command_display = ' '.join(command) |
|
for password in self.__hide_passwords: |
|
command_display = command_display.replace(password, '******') |
|
|
|
if not is_first_command: |
|
self.print('\n') |
|
is_first_command = False |
|
self.print('> ' + command_display + '\n') |
|
|
|
# We use setsid to make a non-interactive session, otherwise any command that expects an input (confirmation, password, etc.) would hang forever |
|
self.__process = Popen(['setsid'] + command, cwd = self.__cwd, stdin=DEVNULL, stdout = PIPE, stderr = STDOUT, shell = False) |
|
|
|
for line in self.__process.stdout: |
|
self.print(line.decode()) |
|
|
|
self.__process.wait() |
|
|
|
if self.__process.returncode != 0: |
|
if self.__fail_callback: |
|
self.__fail_callback() |
|
raise BusinessException("Command failed ("+str(self.__process.returncode)+")\n"+self.get_output_str()) |