#!/usr/bin/env python3 import logging from repo_cloner.lib.logger_setup import log import os import pyinputplus as pyip from typing import Optional, Callable from pathlib import Path import subprocess from repo_cloner.process_repository_dir import clone_or_fetch # base dir base_dir: Optional[Path] = None cloner_prefix: str = "cloner-" data: dict = {} initialized_folder = None # determine starting user and devote UID/GID to unprivileged user - safety first :) def check_privileges(): if os.getuid() == 0: log.info(f"Running as root, downgrading permissions") os.setgid(1000) os.setuid(1000) os.setegid(1000) os.seteuid(1000) log.info(f"New UID:GID: {os.getuid()}:{os.getgid()}") def parse_args(): global base_dir import argparse # parse input arguments parser = argparse.ArgumentParser(description = "repo-cloner initialization wizzard") parser.add_argument('--base-dir', help = 'path to directory containing whole cloner structure', required = True, default = None, type = str) args = parser.parse_args() base_dir = Path(args.base_dir) def gen_config_file( conf_dir: Path, cloner_repo_url, cloner_project_name, cloner_interval, cloner_submodules, cloner_submodule_depth, detector, **kwargs ): from datetime import datetime log.info(f"Creating config files in {conf_dir.as_posix()}") if not conf_dir.exists(): log.info(f"Creating config dir") conf_dir.mkdir(parents = True) log.info("Creating cloner.cfg") conf_file = conf_dir.joinpath("cloner.cfg") conf_file.write_text( f"# cloner.cfg\n" "# main config\n" f"# created at {datetime.now().strftime('%Y-%m-%d %X')}\n\n" "# main url - url of main repo - just to clone\n" f"cloner_repo_url = {cloner_repo_url}\n\n" "# project name (names of volumes are derrived from this\n" f"cloner_project_name = {cloner_project_name}\n\n" "# cloner interval (in minutes, default=0 - run always)\n" f"cloner_interval = {cloner_interval}\n\n" "# do you need submodules support? (1/0)\n" f"cloner_submodules = {cloner_submodules}\n\n" "# max depth of submodule scan (default = unlimited, uncomment to use)\n" f"{'' if cloner_submodules else '# '}cloner_submodule_depth = {cloner_submodule_depth}\n\n" ) if detector: log.info("Creating detector.cfg") conf_file = conf_dir.joinpath("detector.cfg") conf_file.write_text( "# this file is config for detector\n" "# now, it is empty - to disable detector, just delete it!\n" ) def check_project_name(name: str): if not len(name): raise Exception("Empty input is invalid input!") target = os.path.join(base_dir, f"{cloner_prefix}{name}") log.debug(f"Validating project name - path {target}") if os.path.exists(target): log.warning(f"Project name occupied: {target}") raise Exception(f"Project name {name} is occupied by another project") def check_url(name: str): if not len(name): raise Exception("You must input URL") def input_default_str(query: str, default_value: str, validation: Optional[Callable[[str], None]] = None) -> str: log.debug(f"Input query {query} with default {default_value}") while True: new_query = query if len(default_value): new_query += f" [{default_value}] " else: new_query += " " ret = pyip.inputStr(new_query, blank = True, strip = True) if ret == "": log.debug(f"Empty query answer => using previous/default value") ret = default_value try: if validation: validation(ret) except Exception as e: log.warning(e.__str__()) default_value = ret continue log.debug("Query finished") break return ret def input_default_int(query: str, default_value: int, validation: Optional[Callable[[int], None]] = None) -> int: log.debug(f"Input query {query} with default {default_value}") ret = default_value while True: new_query = f"{query} [{default_value}] " ret = pyip.inputInt(new_query, blank = True, strip = True, min = 0) if not ret and not ret == 0: log.debug(f"Empty query answer => using previous/default value") ret = default_value try: if validation: validation(ret) except Exception as e: log.warning(e.__str__()) default_value = ret continue log.debug("Query finished") break return ret def input_default_bool(query: str, default_value: bool) -> bool: log.debug(f"Input query {query} with default {default_value}") new_query = f"{query} [{'Y' if default_value else 'N'}] " ret = pyip.inputYesNo(new_query, blank = True, strip = True) if not len(ret): log.debug(f"Empty query answer => using previous/default value") ret = default_value if ret == "yes": ret = True if ret == "no": ret = False log.debug("Query finished") return ret def query_repo_info() -> bool: log.debug(f"Querying base info") if not initialized_folder: # project name data["cloner_project_name"] = input_default_str( "Enter project name:", data["cloner_project_name"], check_project_name) else: log.warning(f"Project folder was initialized -> unable to change") # url data["cloner_repo_url"] = input_default_str("Enter project url:", data["cloner_repo_url"], check_url) # interval data["cloner_interval"] = input_default_int("Enter sync interval:", data["cloner_interval"]) # submodule support data["cloner_submodules"] = input_default_bool("Mirror including submodules? [y/n]:", data["cloner_submodules"]) if data["cloner_submodules"]: data["cloner_submodule_depth"] = input_default_int( "Limit for submodule discovery [/0]:", data["cloner_submodule_depth"] ) data["detector"] = input_default_bool("Do you want to enable CI support? (detector) [y/n]", data["detector"]) data["detector_init"] = input_default_bool("Do you want to init CI detector caches? [y/n]", data["detector_init"]) def query_repo_info_recursive(): while True: query_repo_info() print("Actual settings:") for key, value in data.items(): print(f"{key: <30} : {value}") if not input_default_bool("Do you want to edit config? [y/n]", False): break def create_ssh_key(ssh_dir: Path): log.info(f"Creating SSH key...") keyfile = ssh_dir.joinpath("identity") subprocess.run([ "/usr/bin/ssh-keygen", "-f", keyfile.as_posix(), "-t", "ed25519", "-C", f"cloner-deploy-key-{data['cloner_project_name']}" , "-N", "" ]) public = ssh_dir.joinpath("identity.pub").read_text().strip() print("Public key is:") print("-----------------------------------------------------") print(public) print("-----------------------------------------------------") print("Please make sure that key is set up at your git hosting and press enter..") pyip.inputStr("", blank = True) def reuse_ssh_key(ssh_dir: Path): log.info("Reusing any ssh key") key_contents = "" print("Paste password-less private key here. End with empty line:") while True: ret = pyip.inputStr("", blank = True) if not len(ret): break key_contents += f"{ret}\n" # public keyfile keyfile_pub = ssh_dir.joinpath("identity.pub") if keyfile_pub.exists(): log.debug(f"Removing old public key") keyfile_pub.unlink() # private keyfile keyfile = ssh_dir.joinpath("identity") log.info(f"Writing key") keyfile.write_text(key_contents) keyfile.chmod(0o600) log.info(f"Checking supplied key for validity...") rc = subprocess.run(["/usr/bin/ssh-keygen", "-y", "-f", keyfile.as_posix(), "-P", ""], stdout = subprocess.PIPE) if not rc.returncode == 0: log.critical(f"Supplied key is invalid.") if input_default_bool("Do you want to try again? [y/n]", True): reuse_ssh_key(ssh_dir) else: print("Public key is:") print("-----------------------------------------------------") public = rc.stdout.decode().strip() + f" cloner-deploy-key-{data['cloner_project_name']}" print(public) print("-----------------------------------------------------") log.info(f"Writing new public key...") keyfile_pub.write_text(public) def solve_authorization(conf_dir: Path): decision: str = pyip.inputMenu( ["Create ssh key", "Reuse ssh key", "Keep unchanged / unauthorized / usernames access only"], numbered = True) log.info(f"Preparing auth file..") ssh_dir = conf_dir.joinpath("auth", "ssh") if not ssh_dir.exists(): ssh_dir.mkdir(mode = 0o700, parents = True) if decision.startswith("Create"): create_ssh_key(ssh_dir) elif decision.startswith("Reuse"): reuse_ssh_key(ssh_dir) def main() -> int: check_privileges() parse_args() # determine debug debug = pyip.inputYesNo("Enable verbose logging? [y/N]", default = "no", blank = True) if debug == "yes": log.info("Setting verbose logging") log.setLevel(logging.DEBUG) # defaults data["cloner_project_name"] = "" data["cloner_repo_url"] = "" data["cloner_interval"] = 5 data["cloner_submodules"] = False data["cloner_submodule_depth"] = 0 data["detector"] = True data["detector_init"] = True edit_config = True # endless loop until finished or canceled while True: # query params from user? if edit_config: query_repo_info_recursive() # create dir project_path = base_dir.joinpath(f"{cloner_prefix}{data['cloner_project_name']}") global initialized_folder initialized_folder = True project_path.mkdir(exist_ok = True) project_path.joinpath("repos").mkdir(exist_ok = True) project_path.joinpath("cache").mkdir(exist_ok = True) config_dir = project_path.joinpath("config") config_dir.mkdir(exist_ok = True) gen_config_file(config_dir, **data) if edit_config: solve_authorization(config_dir) # try initializing repos x = clone_or_fetch(project_path.as_posix(), clone_init = True, detector_init = data["detector_init"]) if x == 0: # enable repository project_path.joinpath(".enabled").touch() return 0 # determine if we want to continue log.critical(f"Something has failed. Please see log above and decide what to do") menu_config = "Edit config and start over" menu_clone_again = "Try cloning again, including base repo" menu_quit_clean = "Quit & clean" decision: str = pyip.inputMenu( [menu_config, menu_clone_again, menu_quit_clean], numbered = True) # other stuff if decision == menu_quit_clean: log.warning(f"Removing old stuff and quitting") subprocess.run(["/usr/bin/rm", "-Rvf", project_path.as_posix()]) return 0 if decision == menu_config: edit_config = True else: edit_config = False # remove caches & repos log.info(f"Cleaning unwanted stuff") subprocess.run(["/usr/bin/rm", "-Rvf", project_path.joinpath("repos").as_posix()]) subprocess.run(["/usr/bin/rm", "-Rvf", project_path.joinpath("cache").as_posix()]) log.info(f"Starting over.. ")