You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
242 lines
9.1 KiB
242 lines
9.1 KiB
import os |
|
import uuid |
|
from flask import current_app |
|
from web_utils.task import ProcessTask |
|
from web_utils.business_exception import BusinessException |
|
import shutil |
|
from unicodedata import normalize |
|
from urllib.parse import quote as url_encode |
|
from urllib.parse import unquote as url_decode |
|
import string |
|
import json |
|
|
|
def os_path_separators():
    """Return the characters that must be treated as path separators.

    The list always starts with the forward slash and the backslash, and is
    followed by the running platform's ``os.path.sep`` and ``os.path.altsep``
    whenever they are set (so a separator may appear more than once).
    """
    platform_separators = [candidate for candidate in (os.path.sep, os.path.altsep) if candidate]
    return ['/', '\\'] + platform_separators
|
|
|
def sanitize_name(initial_name, slashReplacement = '_', removeUnknownCharacters = True):
    """Reduce ``initial_name`` to a plain ASCII name free of path separators.

    The name goes through three steps:

    1. NFKD normalisation, after which every character without an ASCII
       equivalent is dropped;
    2. every separator reported by ``os_path_separators()`` is replaced with
       ``slashReplacement``;
    3. unless ``removeUnknownCharacters`` is false, only ASCII letters, digits,
       ``-``, ``_``, ``.`` and the characters of ``slashReplacement`` are kept.

    Returns the cleaned-up name.
    Raises BusinessException when the result is empty or contains ``..``.
    """
    # Sort out unicode characters
    ascii_bytes = normalize('NFKD', initial_name).encode('ascii', 'ignore')
    cleaned = ascii_bytes.decode('ascii')

    # Replace path separators
    for separator in os_path_separators():
        cleaned = cleaned.replace(separator, slashReplacement)

    # Ensure only valid characters
    if removeUnknownCharacters:
        allowed = frozenset("-_." + string.ascii_letters + string.digits + slashReplacement)
        cleaned = "".join(filter(allowed.__contains__, cleaned))

    if not cleaned or '..' in cleaned:
        raise BusinessException("Invalid name: " + initial_name)

    return cleaned
|
|
|
class Document:
    """A documentation project cloned from a git repository and built on this server.

    Every document owns the directory
    ``<document root>/<encoded origin>/<sanitized doc name>/<sanitized branch>``
    which holds:

    * ``repo``: the git working copy (a document is valid only if ``repo/.git`` exists),
    * ``apikey``: the API key generated when the document was cloned,
    * ``settings.json``: optional overrides for the default build settings,
    * ``build``: the build output, and ``dist``: the last successful HTML build.
    """

    def __init__(self, origin, doc_name, branch = 'master', allow_invalid = False):
        """Load the document identified by ``origin``, ``doc_name`` and ``branch``.

        ``origin`` may be given raw or encoded (see ``encode_origin``): it is
        decoded whenever it contains a ``!``.

        When the document has no git working copy, BusinessException is raised,
        unless ``allow_invalid`` is true, in which case the instance is only
        flagged with ``valid = False`` (``doc_path`` is then not set).
        """
        self.origin = Document.decode_origin(origin) if '!' in origin else origin
        self.encoded_origin = Document.encode_origin(self.origin)
        self.doc_name = doc_name
        self.branch = branch

        # Init default values. This is done before the validity check so that
        # get_url() / get_pdf_url() also work on the invalid documents that
        # list() returns.
        self.settings = {
            'multiversion': False,
            'default_version': '', # Only used if multiversion is True
            'build_pdf': False, # for multiversion, this can be set to an array of strings indicating each branch or tag for which we want to build the PDF
        }

        doc_path = Document.make_doc_path(self.origin, doc_name, branch)
        if not os.path.isdir(doc_path + "/repo/.git"):
            if not allow_invalid:
                raise BusinessException("This document does not exist: "+self.origin+'/'+doc_name+"@"+branch)
            self.valid = False
            return

        self.doc_path = doc_path
        self.valid = True

        # Read settings.json (if it exists); only the known keys are taken into account
        json_settings_filename = self.doc_path + '/settings.json'
        if os.path.exists(json_settings_filename):
            with open(json_settings_filename, encoding='utf-8') as f:
                json_settings = json.load(f)
            for key in self.settings:
                if key in json_settings:
                    self.settings[key] = json_settings[key]

    def _build_commands(self):
        """Return ``(commands, is_intensive_task)`` for a build of this document.

        ``commands`` is the list of argv lists to run, in order, from the
        ``repo`` directory. ``is_intensive_task`` is true as soon as at least
        one PDF is built.
        """
        build_pdf = self.settings['build_pdf']
        tmp_source = self.doc_path + '/tmp_source'
        is_intensive_task = False

        # update source files from git
        cmd = [
            ['git', 'reset', '--hard', 'HEAD'], # we must first revert any local change ; this fixes an issue with print-theme.css that can be modified during compilation to update the number of pages
            ['git', 'pull'],
        ]

        if self.settings['multiversion']:
            # also fetch all branches and tags, so that sphinx-multiversion knows what versions exist and can pull them
            cmd.append(['git', 'fetch', '--all', '--tags', '--force'])
            cmd.append(['bash', '-c', 'for BRANCH in $(git branch -a | grep remotes | grep -v HEAD | grep -v master); do git branch --track "${BRANCH#remotes/origin/}" "${BRANCH}" || git branch -f "${BRANCH#remotes/origin/}" -t "${BRANCH}"; done'])

            # build the HTML version
            cmd.append(['make', 'html_versions', 'BUILDDIR=../build'])

            if isinstance(build_pdf, list):
                for pdf_branch_name in build_pdf:
                    is_intensive_task = True

                    # Extract the source files to a temporary directory.
                    # The branch name and the paths are handed to bash as
                    # positional parameters instead of being pasted into the
                    # script, so quotes or shell metacharacters in them cannot
                    # alter the command.
                    cmd.append(['rm', '-rf', tmp_source])
                    cmd.append(['mkdir', tmp_source])
                    cmd.append(['bash', '-c', 'git archive "$1" | tar -x -C "$2"', 'bash', pdf_branch_name, tmp_source])

                    # Build the PDF
                    cmd.append(['bash', '-c', 'cd "$1" && make pdf', 'bash', tmp_source])

                    # Copy the generated PDF file to the HTML directory, so that it is accessible for download by users
                    cmd.append(['cp', tmp_source + '/build/weasyprint/vheliotech.pdf', self.doc_path + '/build/html_versions/' + pdf_branch_name + '/' + self.doc_name + '.pdf'])

                    # Clean up
                    cmd.append(['rm', '-rf', tmp_source])
        else:
            # build the HTML version
            cmd.append(['make', 'html', 'BUILDDIR=../build'])

            if build_pdf:
                is_intensive_task = True

                # build the PDF version
                cmd.append(['make', 'pdf', 'BUILDDIR=../build'])

                # Copy the generated PDF file to the HTML directory, so that it is accessible for download by users
                cmd.append(['cp', self.doc_path + '/build/weasyprint/vheliotech.pdf', self.doc_path + '/build/html/' + self.doc_name + '.pdf'])

        # Now that the build is successful, move it to the deployment directory (replacing any existing content)
        cmd.append(['rm', '-rf', self.doc_path + '/dist'])
        if self.settings['multiversion']:
            cmd.append(['mv', self.doc_path + '/build/html_versions/', self.doc_path + '/dist/'])
        else:
            cmd.append(['mv', self.doc_path + '/build/html/', self.doc_path + '/dist/'])

        return cmd, is_intensive_task

    def build(self):
        """Start a task that updates the sources, rebuilds the document and publishes it.

        Returns the started ProcessTask.
        Raises Exception when called on an invalid document.
        """
        if not self.valid:
            raise Exception("Internal error")

        cmd, is_intensive_task = self._build_commands()

        resource_usage = [('document_files', self.doc_path)]
        if is_intensive_task:
            resource_usage.append(('intensive_task', ''))

        task = ProcessTask(cmd, cwd = self.doc_path + "/repo", resource_usage = resource_usage)
        task.start(skip_if_another_pending = self.doc_path)

        return task

    def delete(self):
        """Delete the files of a valid document; raises Exception on an invalid one."""
        if not self.valid:
            raise Exception("Internal error")
        self.delete_folder()

    def delete_folder(self):
        """Remove the document directory, then its parent directories if they are left empty."""
        # doc_path is recomputed because self.doc_path is not set on invalid documents
        doc_path = Document.make_doc_path(self.origin, self.doc_name, self.branch)
        shutil.rmtree(doc_path)

        doc_root = os.path.dirname(doc_path)
        if not os.listdir(doc_root):
            os.rmdir(doc_root)
            # the origin directory can only have become empty if doc_root was just removed
            origin_root = os.path.dirname(doc_root)
            if not os.listdir(origin_root):
                os.rmdir(origin_root)

    def _url_prefix(self):
        """Return the URL of the directory holding the published files (with a trailing slash)."""
        prefix = "/doc/" + self.encoded_origin + "/" + sanitize_name(self.doc_name) + '/' + sanitize_name(self.branch) + "/"
        if self.settings['multiversion']:
            prefix += self.settings['default_version'] + "/"
        return prefix

    def get_url(self):
        """Return the URL of the HTML index (of the default version for multiversion documents)."""
        return self._url_prefix() + "index.html"

    def get_pdf_url(self):
        """Return the URL of the PDF (of the default version for multiversion documents)."""
        return self._url_prefix() + self.doc_name + ".pdf"

    def get_api_key(self):
        """Return the content of the document's ``apikey`` file, without line feeds."""
        with open(self.doc_path + "/apikey") as f:
            return f.read().replace('\n', '')

    @staticmethod
    def encode_origin(origin):
        """Percent-encode ``origin`` (every reserved character included) and use ``!`` in place of ``%``."""
        return url_encode(origin, safe='').replace('%', '!')

    @staticmethod
    def decode_origin(origin):
        """Reverse ``encode_origin``."""
        return url_decode(origin.replace('!', '%'))

    @staticmethod
    def make_doc_path(origin, doc_name, branch):
        """Return the resolved directory of a document, which must be located below the document root.

        Raises BusinessException when the path would not be a sub-directory of
        the document root, three levels deep.
        """
        invalid_path_message = "Invalid document path for "+origin+"/"+doc_name+"@"+branch

        components = [Document.encode_origin(origin), sanitize_name(doc_name), sanitize_name(branch)]
        # An empty, '.' or '..' component would collapse the path onto a parent
        # directory, i.e. onto the files of other documents.
        if any(component in ('', '.', '..') for component in components):
            raise BusinessException(invalid_path_message)

        # The root is resolved too, so that a relative or symlinked root still
        # compares correctly against the resolved document path.
        root = os.path.realpath(get_document_root())
        doc_path = os.path.realpath(os.path.join(root, *components))
        # Compare with a trailing separator so that '/root-other' is not accepted for '/root'
        if not doc_path.startswith(os.path.join(root, '')):
            raise BusinessException(invalid_path_message)
        return doc_path

    @staticmethod
    def get_origin(repo):
        """Return the origin of a repository URL: its parent path without ``https://``.

        Raises BusinessException when the result contains ``!``, which is
        reserved for encoded origins.
        """
        result = sanitize_name(os.path.dirname(repo).replace('https://', ''), '/', False)
        if '!' in result:
            raise BusinessException("Invalid character: !")
        return result

    @staticmethod
    def clone(repo, branch, doc_name, source_dir):
        """Create a new document and start the task that clones its repository.

        Returns the started ProcessTask; the document directory is removed
        again if the task fails.
        Raises BusinessException when the document already exists or when an
        argument is not acceptable.
        """
        # check the document does not already exist
        origin = Document.get_origin(repo)
        doc_path = Document.make_doc_path(origin, doc_name, branch)
        if os.path.isdir(doc_path):
            raise BusinessException("This document already exists: "+origin+"/"+doc_name+"@"+branch)

        if source_dir != sanitize_name(source_dir):
            raise BusinessException("Invalid source directory name: " + source_dir)

        # we have potentially serious security issues related to cloning anything. For example cloning from SSH may use a pre-configured server identity, etc.
        if not repo.startswith("https://"):
            raise BusinessException("Only HTTPS repositories are allowed in current implementation")

        # The branch is passed as-is on git command lines: a leading dash
        # would make git read it as an option.
        if branch.startswith('-'):
            raise BusinessException("Invalid branch name: " + branch)

        # Generate an API key (deliberately not printed: it is a secret)
        apikey = str(uuid.uuid4())

        target_dir = doc_path + "/repo"
        os.makedirs(target_dir, exist_ok = True)
        with open(doc_path + "/apikey", "w") as apikey_file:
            apikey_file.write(apikey)

        cmd = [
            ['git', 'init', '--initial-branch=' + branch],
            ['git', 'remote', 'add', '-f', 'origin', repo],
            #['git', 'sparse-checkout', 'init'],
            #['git', 'sparse-checkout', 'set', source_dir],
            ['git', 'pull', 'origin', branch],
            ['git', 'branch', '--set-upstream-to=origin/' + branch, branch],
        ]

        task = ProcessTask(cmd, cwd = target_dir)
        task.on_fail(lambda : shutil.rmtree(doc_path, ignore_errors = True))
        task.start()

        return task

    @staticmethod
    def list():
        """Return a Document for every ``<origin>/<doc name>/<branch>`` entry of the document root.

        Entries without a git working copy are returned too, with ``valid = False``.
        """
        root = get_document_root()
        result = []
        for origin in os.listdir(root):
            origin_dir = os.path.join(root, origin)
            if not os.path.isdir(origin_dir):
                continue # stray file in the document root
            for doc_name in os.listdir(origin_dir):
                doc_dir = os.path.join(origin_dir, doc_name)
                if not os.path.isdir(doc_dir):
                    continue # stray file in an origin directory
                for branch in os.listdir(doc_dir):
                    result.append(Document(origin, doc_name, branch, allow_invalid = True))
        return result
|
|
|
def get_document_root():
    """Return the ``DOCUMENT_ROOT_DIR`` entry of the current Flask application's configuration."""
    app_config = current_app.config
    return app_config['DOCUMENT_ROOT_DIR']
|
|
|
|