|
|
|
@ -5,6 +5,8 @@ from web_utils.task import ProcessTask
|
|
|
|
|
from web_utils.business_exception import BusinessException |
|
|
|
|
import shutil |
|
|
|
|
from unicodedata import normalize |
|
|
|
|
from urllib.parse import quote as url_encode |
|
|
|
|
from urllib.parse import unquote as url_decode |
|
|
|
|
import string |
|
|
|
|
|
|
|
|
|
def os_path_separators(): |
|
|
|
@ -14,32 +16,39 @@ def os_path_separators():
|
|
|
|
|
seps.append(sep) |
|
|
|
|
return seps |
|
|
|
|
|
|
|
|
|
def sanitize_name(initial_name): |
|
|
|
|
def sanitize_name(initial_name, slashReplacement = '_', removeUnknownCharacters = True): |
|
|
|
|
# Sort out unicode characters |
|
|
|
|
name = normalize('NFKD', initial_name).encode('ascii', 'ignore').decode('ascii') |
|
|
|
|
# Replace path separators with underscores |
|
|
|
|
name = name.replace('/', '_slash_') |
|
|
|
|
|
|
|
|
|
# Replace path separators |
|
|
|
|
for sep in os_path_separators(): |
|
|
|
|
name = name.replace(sep, '_') |
|
|
|
|
name = name.replace(sep, slashReplacement) |
|
|
|
|
|
|
|
|
|
# Ensure only valid characters |
|
|
|
|
valid_chars = "-_.(){0}{1}".format(string.ascii_letters, string.digits) |
|
|
|
|
if removeUnknownCharacters: |
|
|
|
|
valid_chars = "-_.{0}{1}{2}".format(string.ascii_letters, string.digits, slashReplacement) |
|
|
|
|
name = "".join(ch for ch in name if ch in valid_chars) |
|
|
|
|
|
|
|
|
|
if len(name) == 0 or '..' in name: |
|
|
|
|
raise BusinessException("Invalid name: " + initial_name) |
|
|
|
|
|
|
|
|
|
return name |
|
|
|
|
|
|
|
|
|
class Document: |
|
|
|
|
def __init__(self, doc_name, branch = 'master', allow_invalid = False): |
|
|
|
|
def __init__(self, origin, doc_name, branch = 'master', allow_invalid = False): |
|
|
|
|
self.origin = Document.decode_origin(origin) if '!' in origin else origin |
|
|
|
|
self.encoded_origin = Document.encode_origin(self.origin) |
|
|
|
|
self.doc_name = doc_name |
|
|
|
|
self.branch = branch |
|
|
|
|
|
|
|
|
|
doc_path = Document.make_doc_path(doc_name, branch) |
|
|
|
|
doc_path = Document.make_doc_path(self.origin, doc_name, branch) |
|
|
|
|
print(doc_path) |
|
|
|
|
if not os.path.isdir(doc_path + "/repo/.git"): |
|
|
|
|
if allow_invalid: |
|
|
|
|
self.valid = False |
|
|
|
|
return |
|
|
|
|
else: |
|
|
|
|
raise BusinessException("This document does not exist: "+doc_name+"@"+branch) |
|
|
|
|
raise BusinessException("This document does not exist: "+self.origin+'/'+doc_name+"@"+branch) |
|
|
|
|
|
|
|
|
|
self.doc_path = doc_path |
|
|
|
|
self.valid = True |
|
|
|
@ -57,32 +66,56 @@ class Document:
|
|
|
|
|
return task |
|
|
|
|
|
|
|
|
|
def delete(self): |
|
|
|
|
shutil.rmtree(self.doc_path) |
|
|
|
|
if not self.valid: |
|
|
|
|
raise Exception("Internal error") |
|
|
|
|
self.delete_folder() |
|
|
|
|
|
|
|
|
|
def delete_folder(self): |
|
|
|
|
doc_path = Document.make_doc_path(self.doc_name, self.branch) |
|
|
|
|
doc_path = Document.make_doc_path(self.origin, self.doc_name, self.branch) |
|
|
|
|
shutil.rmtree(doc_path) |
|
|
|
|
doc_root = os.path.dirname(doc_path) |
|
|
|
|
if len(os.listdir(doc_root)) == 0: |
|
|
|
|
os.rmdir(doc_root) |
|
|
|
|
origin_root = os.path.dirname(doc_root) |
|
|
|
|
if len(os.listdir(origin_root)) == 0: |
|
|
|
|
os.rmdir(origin_root) |
|
|
|
|
|
|
|
|
|
def get_url(self): |
|
|
|
|
return "/doc/" + sanitize_name(self.doc_name)+'/'+sanitize_name(self.branch) + "/index.html" |
|
|
|
|
return "/doc/" + self.encoded_origin + "/" + sanitize_name(self.doc_name)+'/'+sanitize_name(self.branch) + "/index.html" |
|
|
|
|
|
|
|
|
|
def get_api_key(self): |
|
|
|
|
with open(self.doc_path + "/apikey") as f: |
|
|
|
|
return f.read().replace('\n', '') |
|
|
|
|
|
|
|
|
|
@staticmethod |
|
|
|
|
def make_doc_path(doc_name, branch): |
|
|
|
|
doc_path = os.path.realpath(get_document_root()+'/'+sanitize_name(doc_name)+'/'+sanitize_name(branch)) |
|
|
|
|
def encode_origin(origin): |
|
|
|
|
return url_encode(origin, safe='').replace('%', '!') |
|
|
|
|
|
|
|
|
|
@staticmethod |
|
|
|
|
def decode_origin(origin): |
|
|
|
|
return url_decode(origin.replace('!', '%')) |
|
|
|
|
|
|
|
|
|
@staticmethod |
|
|
|
|
def make_doc_path(origin, doc_name, branch): |
|
|
|
|
doc_path = os.path.realpath(get_document_root()+'/'+Document.encode_origin(origin)+'/'+sanitize_name(doc_name)+'/'+sanitize_name(branch)) |
|
|
|
|
if not doc_path.startswith(get_document_root()): |
|
|
|
|
raise BusinessException("Invalid document path for "+doc_name+"@"+branch) |
|
|
|
|
raise BusinessException("Invalid document path for "+origin+"/"+doc_name+"@"+branch) |
|
|
|
|
return doc_path |
|
|
|
|
|
|
|
|
|
@staticmethod |
|
|
|
|
def get_origin(repo): |
|
|
|
|
result = sanitize_name(os.path.dirname(repo).replace('https://', ''), '/', False) |
|
|
|
|
if '!' in result: |
|
|
|
|
raise BusinessException("Invalid character: !") |
|
|
|
|
return result |
|
|
|
|
|
|
|
|
|
@staticmethod |
|
|
|
|
def clone(repo, branch, doc_name, source_dir): |
|
|
|
|
# check the document does not already exist |
|
|
|
|
doc_path = Document.make_doc_path(doc_name, branch) |
|
|
|
|
origin = Document.get_origin(repo) |
|
|
|
|
doc_path = Document.make_doc_path(origin, doc_name, branch) |
|
|
|
|
if os.path.isdir(doc_path): |
|
|
|
|
raise BusinessException("This document already exists: "+doc_name+"@"+branch) |
|
|
|
|
raise BusinessException("This document already exists: "+origin+"/"+doc_name+"@"+branch) |
|
|
|
|
|
|
|
|
|
if source_dir != sanitize_name(source_dir): |
|
|
|
|
raise BusinessException("Invalid source directory name: " + source_dir) |
|
|
|
@ -93,11 +126,12 @@ class Document:
|
|
|
|
|
|
|
|
|
|
# Generate an API key |
|
|
|
|
apikey = str(uuid.uuid4()) |
|
|
|
|
print("generated API key: " + apikey) |
|
|
|
|
|
|
|
|
|
target_dir = doc_path + "/repo" |
|
|
|
|
os.makedirs(target_dir, exist_ok = True) |
|
|
|
|
with open(doc_path + "/apikey", "wb") as apikey_file: |
|
|
|
|
apikey_file:write(apikey) |
|
|
|
|
with open(doc_path + "/apikey", "w") as apikey_file: |
|
|
|
|
apikey_file.write(apikey) |
|
|
|
|
|
|
|
|
|
cmd = [] |
|
|
|
|
cmd.append(['git', 'init', '--initial-branch=' + branch]) |
|
|
|
@ -116,9 +150,10 @@ class Document:
|
|
|
|
|
@staticmethod |
|
|
|
|
def list(): |
|
|
|
|
result = [] |
|
|
|
|
for doc_name in os.listdir(get_document_root()): |
|
|
|
|
for branch in os.listdir(get_document_root() + "/" + doc_name): |
|
|
|
|
doc = Document(doc_name, branch, allow_invalid = True) |
|
|
|
|
for origin in os.listdir(get_document_root()): |
|
|
|
|
for doc_name in os.listdir(get_document_root() + "/" + origin): |
|
|
|
|
for branch in os.listdir(get_document_root() + "/" + origin + "/" + doc_name): |
|
|
|
|
doc = Document(origin, doc_name, branch, allow_invalid = True) |
|
|
|
|
result.append(doc) |
|
|
|
|
return result |
|
|
|
|
|
|
|
|
|