#!/usr/bin/env python3 import logging from repo_cloner.lib.logger_setup import log import os import pyinputplus as pyip from typing import Optional, Callable from pathlib import Path # base dir base_dir: Optional[Path] = None cloner_prefix: str = "cloner-" data: dict = {} # determine starting user and devote UID/GID to unprivileged user - safety first :) def check_privileges(): if os.getuid() == 0: log.info(f"Running as root, downgrading permissions") os.setgid(1000) os.setuid(1000) os.setegid(1000) os.seteuid(1000) log.info(f"New UID:GID: {os.getuid()}:{os.getgid()}") def parse_args(): global base_dir import argparse # parse input arguments parser = argparse.ArgumentParser(description = "repo-cloner initialization wizzard") parser.add_argument('--base-dir', help = 'path to directory containing whole cloner structure', required = True, default = None, type = str) args = parser.parse_args() base_dir = Path(args.base_dir) def gen_config_file( conf_dir: Path, cloner_repo_url, cloner_project_name, cloner_interval, cloner_submodules, cloner_submodule_depth, detector, **kwargs ): from datetime import datetime log.info(f"Creating config files in {conf_dir.as_posix()}") if not conf_dir.exists(): log.info(f"Creating config dir") conf_dir.mkdir(parents = True) log.info("Creating cloner.cfg") conf_file = conf_dir.joinpath("cloner.cfg") conf_file.write_text( f"# cloner.cfg\n" "# main config\n" f"# created at {datetime.now().strftime('%Y-%m-%d %X')}\n\n" "# main url - url of main repo - just to clone\n" f"cloner_repo_url = {cloner_repo_url}\n\n" "# project name (names of volumes are derrived from this\n" f"cloner_project_name = {cloner_project_name}\n\n" "# cloner interval (in minutes, default=0 - run always)\n" f"cloner_interval = {cloner_interval}\n\n" "# do you need submodules support? (1/0)\n" f"cloner_submodules = {cloner_submodules}\n\n" "# max depth of submodule scan (default = unlimited, uncomment to use)\n" f"{'' if cloner_submodules else '# '}cloner_submodule_depth = {cloner_submodule_depth}\n\n" ) if detector: log.info("Creating detector.cfg") conf_file = conf_dir.joinpath("detector.cfg") conf_file.write_text( "# this file is config for detector\n"" "# now, it is empty - to disable detector, just delete it!\n" ) def check_project_name(name: str): if not len(name): raise Exception("Empty input is invalid input!") target = os.path.join(base_dir, f"{cloner_prefix}{name}") log.debug(f"Validating project name - path {target}") if os.path.exists(target): log.warning(f"Project name occupied: {target}") raise Exception(f"Project name {name} is occupied by another project") def check_url(name: str): if not len(name): raise Exception("You must input URL") def input_default_str(query: str, default_value: str, validation: Optional[Callable[[str], None]] = None) -> str: log.debug(f"Input query {query} with default {default_value}") while True: new_query = query if len(default_value): new_query += f" [{default_value}] " else: new_query += " " ret = pyip.inputStr(new_query, blank = True, strip = True) if ret == "": log.debug(f"Empty query answer => using previous/default value") ret = default_value try: if validation: validation(ret) except Exception as e: log.warning(e.__str__()) default_value = ret continue log.debug("Query finished") break return ret def input_default_int(query: str, default_value: int, validation: Optional[Callable[[int], None]] = None) -> int: log.debug(f"Input query {query} with default {default_value}") while True: new_query = f"{query} [{default_value}] " ret = pyip.inputInt(new_query, blank = True, strip = True, min = 0) if not ret: log.debug(f"Empty query answer => using previous/default value") ret = default_value try: if validation: validation(ret) except Exception as e: log.warning(e.__str__()) default_value = ret continue log.debug("Query finished") break return ret def input_default_bool(query: str, default_value: bool) -> bool: log.debug(f"Input query {query} with default {default_value}") new_query = f"{query} [{'Y' if default_value else 'N'}] " ret = pyip.inputYesNo(new_query, blank = True, strip = True) if not len(ret): log.debug(f"Empty query answer => using previous/default value") ret = default_value if ret == "yes": ret = True if ret == "no": ret = False log.debug("Query finished") return ret def query_repo_info() -> bool: log.debug(f"Querying base info") # project name data["cloner_project_name"] = input_default_str( "Enter project name:", data["cloner_project_name"], check_project_name) # url data["cloner_repo_url"] = input_default_str("Enter project url:", data["cloner_repo_url"], check_url) # interval data["cloner_interval"] = input_default_int("Enter sync interval:", data["cloner_interval"]) # submodule support data["cloner_submodules"] = input_default_bool("Mirror including submodules? [y/n]:", data["cloner_submodules"]) if data["cloner_submodules"]: data["cloner_submodule_depth"] = input_default_int( "Limit for submodule discovery [/0]:", data["cloner_submodule_depth"] ) data["detector"] = input_default_bool("Do you want to enable CI support? (detector) [y/n]", data["detector"]) def query_repo_info_recursive(): while True: query_repo_info() print("Actual settings:") for key, value in data.items(): print(f"{key: <30} : {value}") if not input_default_bool("Do you want to edit config? [y/n]", False): break def main() -> int: check_privileges() parse_args() # determine debug debug = pyip.inputYesNo("Enable verbose logging? [y/N]", default = "no", blank = True) if debug == "yes": log.info("Setting verbose logging") log.setLevel(logging.DEBUG) # defaults data["cloner_project_name"] = "" data["cloner_repo_url"] = "" data["cloner_interval"] = 5 data["cloner_submodules"] = False data["cloner_submodule_depth"] = 0 data["detector"] = True query_repo_info_recursive() project_path = base_dir.joinpath(f"{cloner_prefix}{data['cloner_project_name']}") project_path.mkdir() project_path.joinpath("repos").mkdir() project_path.joinpath("cache").mkdir() config_dir = project_path.joinpath("config") config_dir.mkdir() gen_config_file(config_dir, **data) return 0