You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
177 lines
6.7 KiB
177 lines
6.7 KiB
import os |
|
import uuid |
|
from flask import current_app |
|
from web_utils.task import ProcessTask |
|
from web_utils.business_exception import BusinessException |
|
import shutil |
|
from unicodedata import normalize |
|
from urllib.parse import quote as url_encode |
|
from urllib.parse import unquote as url_decode |
|
import string |
|
|
|
def os_path_separators():
    """Return the characters that must be treated as path separators.

    The list always starts with '/' and '\\'; the running platform's
    ``os.path.sep`` and ``os.path.altsep`` are appended when they are set,
    so the same character may appear more than once.
    """
    platform_specific = [
        candidate
        for candidate in (os.path.sep, os.path.altsep)
        if candidate
    ]
    return ['/', '\\'] + platform_specific
|
|
|
def sanitize_name(initial_name, slashReplacement = '_', removeUnknownCharacters = True):
    """Reduce an arbitrary string to a name that is safe to embed in a path.

    Parameters:
        initial_name: the text to sanitize.
        slashReplacement: string substituted for every path separator
            returned by os_path_separators().
        removeUnknownCharacters: when true, every character other than
            ASCII letters, digits, '-', '_', '.' and the characters of
            slashReplacement is dropped.

    Returns:
        The sanitized name.

    Raises:
        BusinessException: if the result is empty, contains '..', or is
            exactly '.'.
    """
    # NFKD splits accented characters into base letter + combining mark;
    # encoding to ASCII with 'ignore' then drops whatever is not plain ASCII.
    name = normalize('NFKD', initial_name).encode('ascii', 'ignore').decode('ascii')

    # Replace path separators
    for sep in os_path_separators():
        name = name.replace(sep, slashReplacement)

    # Ensure only valid characters
    if removeUnknownCharacters:
        valid_chars = set("-_." + string.ascii_letters + string.digits + slashReplacement)
        name = "".join(ch for ch in name if ch in valid_chars)

    # '.' on its own is refused alongside '..': used as a path component it
    # refers to the containing directory and would collapse one level of the
    # path built from this name.
    if not name or '..' in name or name == '.':
        raise BusinessException("Invalid name: " + initial_name)

    return name
|
|
|
class Document:
    """A documentation project cloned from a git repository and built with Sphinx.

    Each document is stored under
    ``<document root>/<encoded origin>/<sanitized doc name>/<sanitized branch>``.
    That directory contains the git working copy (``repo``), the build
    output (``build``) and a file holding the document's API key (``apikey``).
    """

    def __init__(self, origin, doc_name, branch = 'master', allow_invalid = False):
        """Bind to a document that is expected to exist on disk.

        Parameters:
            origin: the document origin, either plain or in the encoded form
                produced by encode_origin() (recognised by the presence of '!').
            doc_name: name of the document.
            branch: git branch of the document.
            allow_invalid: when true, a missing document yields an instance
                with ``valid`` set to False instead of an exception.

        Raises:
            BusinessException: if the document has no git working copy and
                allow_invalid is false, or if its path is invalid.
        """
        # get_origin() refuses '!' in plain origins, so its presence is used
        # here as the marker of an encoded origin.
        self.origin = Document.decode_origin(origin) if '!' in origin else origin
        self.encoded_origin = Document.encode_origin(self.origin)
        self.doc_name = doc_name
        self.branch = branch

        doc_path = Document.make_doc_path(self.origin, doc_name, branch)
        if not os.path.isdir(doc_path + "/repo/.git"):
            if not allow_invalid:
                raise BusinessException("This document does not exist: "+self.origin+'/'+doc_name+"@"+branch)
            # NOTE: doc_path is not set on invalid documents; check `valid`
            # before using methods that read it.
            self.valid = False
            return

        self.doc_path = doc_path
        self.valid = True

    def build(self):
        """Start a task that refreshes the sources and builds the HTML and PDF output.

        The commands are handed to a ProcessTask whose working directory is
        the git working copy (presumably executed in the order listed).

        Returns:
            The started ProcessTask.
        """
        source_dir = self.doc_path + '/repo/source'
        build_dir = self.doc_path + '/build'
        pdf_dir = self.doc_path + '/build/weasyprint'

        cmd = []

        # update source files from git
        cmd.append(['git', 'pull'])

        # build the HTML version
        cmd.append(['sphinx-build', '-M', 'html', source_dir, build_dir])

        # build the PDF version
        cmd.append(['sphinx-build', '-M', 'weasyprint', source_dir, build_dir])
        cmd.append(['weasyprint', pdf_dir + '/index.html', pdf_dir + '/index.pdf', '-s', source_dir + '/css/print-theme.css'])
        # generate a LaTeX table of contents from the PDF
        cmd.append(['sh', '-c', current_app.config['DATA_ROOT_DIR'] + '/../pdftoc-to-latex "' + pdf_dir + '/index.pdf" > "' + pdf_dir + '/toc.tex"'])
        # '|| echo OK' forces a successful exit status even when pdflatex reports errors
        cmd.append(['sh', '-c', 'pdflatex -interaction nonstopmode -output-directory="' + pdf_dir + '" "' + pdf_dir + '/toc.tex" || echo OK'])
        # final PDF = first page of index.pdf, then toc.pdf, then the remaining pages
        cmd.append(['pdftk', 'A=' + pdf_dir + '/index.pdf', 'B=' + pdf_dir + '/toc.pdf', 'cat', 'A1', 'B', 'A2-end', 'output', pdf_dir + '/vheliotech.pdf'])

        # Copy the generated PDF file to the HTML directory, so that it is accessible for download by users
        cmd.append(['cp', pdf_dir + '/vheliotech.pdf', build_dir + '/html/vheliotech.pdf'])

        task = ProcessTask(cmd, cwd = self.doc_path + "/repo")
        task.start()

        return task

    def delete(self):
        """Delete this document from disk.

        Raises:
            Exception: if the instance was created for a document that does
                not exist (``valid`` is False).
        """
        if not self.valid:
            raise Exception("Internal error")
        self.delete_folder()

    def delete_folder(self):
        """Remove the document directory, then any parent directory left empty."""
        doc_path = Document.make_doc_path(self.origin, self.doc_name, self.branch)
        shutil.rmtree(doc_path)

        # The origin directory can only be empty once the document-name
        # directory has been removed, hence the nesting.
        doc_root = os.path.dirname(doc_path)
        if not os.listdir(doc_root):
            os.rmdir(doc_root)
            origin_root = os.path.dirname(doc_root)
            if not os.listdir(origin_root):
                os.rmdir(origin_root)

    def get_url(self):
        """Return the URL path of the document's HTML index page."""
        return "/doc/" + self.encoded_origin + "/" + sanitize_name(self.doc_name)+'/'+sanitize_name(self.branch) + "/index.html"

    def get_api_key(self):
        """Return the API key stored in the document directory, without newlines."""
        with open(self.doc_path + "/apikey") as f:
            return f.read().replace('\n', '')

    @staticmethod
    def encode_origin(origin):
        """Percent-encode every reserved character of origin, then use '!' in place of '%'."""
        return url_encode(origin, safe='').replace('%', '!')

    @staticmethod
    def decode_origin(origin):
        """Reverse encode_origin(): turn '!' back into '%' and percent-decode."""
        return url_decode(origin.replace('!', '%'))

    @staticmethod
    def make_doc_path(origin, doc_name, branch):
        """Return the resolved directory of a document, guaranteed to be inside the document root.

        Raises:
            BusinessException: if a path component is empty, '.' or '..', if
                the resolved path is not located under the document root, or
                if doc_name / branch are rejected by sanitize_name().
        """
        parts = [Document.encode_origin(origin), sanitize_name(doc_name), sanitize_name(branch)]

        # Resolve the root as well: comparing a resolved path against an
        # unresolved root fails as soon as the root is relative or a symlink.
        root = os.path.realpath(get_document_root())
        doc_path = os.path.realpath(os.path.join(root, *parts))

        # Compare whole path components, not characters: a plain string
        # prefix test would accept '/data/docs-evil' for root '/data/docs'.
        try:
            contained = os.path.commonpath([root, doc_path]) == root
        except ValueError:
            # e.g. paths on different drives
            contained = False

        # '.', '..' and '' are not altered by URL-encoding and would collapse
        # or climb one level of the three-level layout.
        if not contained or any(part in ('', '.', '..') for part in parts):
            raise BusinessException("Invalid document path for "+origin+"/"+doc_name+"@"+branch)
        return doc_path

    @staticmethod
    def get_origin(repo):
        """Derive the origin of a repository URL: its parent path with 'https://' removed.

        Raises:
            BusinessException: if the origin contains '!' (reserved as the
                escape character of encoded origins) or is rejected by
                sanitize_name().
        """
        result = sanitize_name(os.path.dirname(repo).replace('https://', ''), '/', False)
        if '!' in result:
            raise BusinessException("Invalid character: !")
        return result

    @staticmethod
    def clone(repo, branch, doc_name, source_dir):
        """Create a new document by sparse-cloning source_dir from a git repository.

        Parameters:
            repo: HTTPS URL of the git repository.
            branch: branch to check out.
            doc_name: name of the new document.
            source_dir: directory of the repository to check out; must
                already be a sanitized name.

        Returns:
            The started ProcessTask running the git commands. If the task
            fails, the document directory is removed.

        Raises:
            BusinessException: if the document already exists or one of the
                arguments is not acceptable.
        """
        # check the document does not already exist
        origin = Document.get_origin(repo)
        doc_path = Document.make_doc_path(origin, doc_name, branch)
        if os.path.isdir(doc_path):
            raise BusinessException("This document already exists: "+origin+"/"+doc_name+"@"+branch)

        if source_dir != sanitize_name(source_dir):
            raise BusinessException("Invalid source directory name: " + source_dir)

        # we have potentially serious security issues related to cloning anything. For example cloning from SSH may use a pre-configured server identity, etc.
        if not repo.startswith("https://"):
            raise BusinessException("Only HTTPS repositories are allowed in current implementation")

        # The branch is passed verbatim on git command lines; a leading dash
        # would be parsed as an option. Git itself forbids such branch names.
        if branch.startswith('-'):
            raise BusinessException("Invalid branch name: " + branch)

        # Generate an API key (kept out of stdout so that it does not end up in logs)
        apikey = str(uuid.uuid4())

        target_dir = doc_path + "/repo"
        os.makedirs(target_dir, exist_ok = True)
        with open(doc_path + "/apikey", "w") as apikey_file:
            apikey_file.write(apikey)

        cmd = [
            ['git', 'init', '--initial-branch=' + branch],
            ['git', 'remote', 'add', '-f', 'origin', repo],
            ['git', 'sparse-checkout', 'init'],
            ['git', 'sparse-checkout', 'set', source_dir],
            ['git', 'pull', 'origin', branch],
            ['git', 'branch', '--set-upstream-to=origin/' + branch, branch],
        ]

        task = ProcessTask(cmd, cwd = target_dir)
        task.on_fail(lambda : shutil.rmtree(doc_path, ignore_errors = True))
        task.start()

        return task

    @staticmethod
    def list():
        """Return a Document for every origin/name/branch directory under the document root.

        Entries without a git working copy are included with ``valid`` set
        to False.
        """
        root = get_document_root()
        result = []
        for origin in os.listdir(root):
            origin_dir = root + "/" + origin
            for doc_name in os.listdir(origin_dir):
                for branch in os.listdir(origin_dir + "/" + doc_name):
                    result.append(Document(origin, doc_name, branch, allow_invalid = True))
        return result
|
|
|
def get_document_root():
    """Return the ``DOCUMENT_ROOT_DIR`` entry of the current Flask application's configuration."""
    app_config = current_app.config
    return app_config['DOCUMENT_ROOT_DIR']
|
|
|
|