New behavior, new tests, added wizzard and processor

Signed-off-by: Václav Valíček <valicek1994@gmail.com>
This commit is contained in:
2022-08-07 21:44:31 +02:00
parent f4ac509665
commit 1fef7bc404
8 changed files with 203 additions and 118 deletions

View File

@@ -6,12 +6,15 @@ import os
import pyinputplus as pyip
from typing import Optional, Callable
from pathlib import Path
import subprocess
from repo_cloner.process_repository_dir import clone_or_fetch
# base dir
base_dir: Optional[Path] = None
cloner_prefix: str = "cloner-"
data: dict = {}
initialized_folder = None
# determine starting user and devote UID/GID to unprivileged user - safety first :)
@@ -122,11 +125,12 @@ def input_default_str(query: str, default_value: str, validation: Optional[Calla
def input_default_int(query: str, default_value: int, validation: Optional[Callable[[int], None]] = None) -> int:
log.debug(f"Input query {query} with default {default_value}")
ret = default_value
while True:
new_query = f"{query} [{default_value}] "
ret = pyip.inputInt(new_query, blank = True, strip = True, min = 0)
if not ret:
if not ret and not ret == 0:
log.debug(f"Empty query answer => using previous/default value")
ret = default_value
try:
@@ -160,11 +164,14 @@ def input_default_bool(query: str, default_value: bool) -> bool:
def query_repo_info() -> bool:
log.debug(f"Querying base info")
# project name
data["cloner_project_name"] = input_default_str(
"Enter project name:",
data["cloner_project_name"],
check_project_name)
if not initialized_folder:
# project name
data["cloner_project_name"] = input_default_str(
"Enter project name:",
data["cloner_project_name"],
check_project_name)
else:
log.warning(f"Project folder was initialized -> unable to change")
# url
data["cloner_repo_url"] = input_default_str("Enter project url:", data["cloner_repo_url"], check_url)
@@ -183,6 +190,8 @@ def query_repo_info() -> bool:
data["detector"] = input_default_bool("Do you want to enable CI support? (detector) [y/n]", data["detector"])
data["detector_init"] = input_default_bool("Do you want to init CI detector caches? [y/n]", data["detector_init"])
def query_repo_info_recursive():
while True:
@@ -195,6 +204,75 @@ def query_repo_info_recursive():
break
def create_ssh_key(ssh_dir: Path):
log.info(f"Creating SSH key...")
keyfile = ssh_dir.joinpath("identity")
subprocess.run([
"/usr/bin/ssh-keygen",
"-f", keyfile.as_posix(),
"-t", "ed25519",
"-C", f"cloner-deploy-key-{data['cloner_project_name']}"
, "-N", ""
])
public = ssh_dir.joinpath("identity.pub").read_text().strip()
print("Public key is:")
print("-----------------------------------------------------")
print(public)
print("-----------------------------------------------------")
print("Please make sure that key is set up at your git hosting and press enter..")
pyip.inputStr("", blank = True)
def reuse_ssh_key(ssh_dir: Path):
log.info("Reusing any ssh key")
key_contents = ""
print("Paste password-less private key here. End with empty line:")
while True:
ret = pyip.inputStr("", blank = True)
if not len(ret):
break
key_contents += f"{ret}\n"
# public keyfile
keyfile_pub = ssh_dir.joinpath("identity.pub")
if keyfile_pub.exists():
log.debug(f"Removing old public key")
keyfile_pub.unlink()
# private keyfile
keyfile = ssh_dir.joinpath("identity")
log.info(f"Writing key")
keyfile.write_text(key_contents)
keyfile.chmod(0o600)
log.info(f"Checking supplied key for validity...")
rc = subprocess.run(["/usr/bin/ssh-keygen", "-y", "-f", keyfile.as_posix(), "-P", ""], stdout = subprocess.PIPE)
if not rc.returncode == 0:
log.critical(f"Supplied key is invalid.")
if input_default_bool("Do you want to try again? [y/n]", True):
reuse_ssh_key(ssh_dir)
else:
print("Public key is:")
print("-----------------------------------------------------")
public = rc.stdout.decode().strip() + f" cloner-deploy-key-{data['cloner_project_name']}"
print(public)
print("-----------------------------------------------------")
log.info(f"Writing new public key...")
keyfile_pub.write_text(public)
def solve_authorization(conf_dir: Path):
decision: str = pyip.inputMenu(
["Create ssh key", "Reuse ssh key", "Keep unchanged / unauthorized / usernames access only"],
numbered = True)
log.info(f"Preparing auth file..")
ssh_dir = conf_dir.joinpath("auth", "ssh")
if not ssh_dir.exists():
ssh_dir.mkdir(mode = 0o700, parents = True)
if decision.startswith("Create"):
create_ssh_key(ssh_dir)
elif decision.startswith("Reuse"):
reuse_ssh_key(ssh_dir)
def main() -> int:
check_privileges()
parse_args()
@@ -211,18 +289,61 @@ def main() -> int:
data["cloner_submodules"] = False
data["cloner_submodule_depth"] = 0
data["detector"] = True
data["detector_init"] = True
query_repo_info_recursive()
edit_config = True
# endless loop until finished or canceled
while True:
# query params from user?
if edit_config:
query_repo_info_recursive()
project_path = base_dir.joinpath(f"{cloner_prefix}{data['cloner_project_name']}")
project_path.mkdir()
# create dir
project_path = base_dir.joinpath(f"{cloner_prefix}{data['cloner_project_name']}")
project_path.joinpath("repos").mkdir()
project_path.joinpath("cache").mkdir()
global initialized_folder
initialized_folder = True
config_dir = project_path.joinpath("config")
config_dir.mkdir()
project_path.mkdir(exist_ok = True)
project_path.joinpath("repos").mkdir(exist_ok = True)
project_path.joinpath("cache").mkdir(exist_ok = True)
gen_config_file(config_dir, **data)
config_dir = project_path.joinpath("config")
config_dir.mkdir(exist_ok = True)
return 0
gen_config_file(config_dir, **data)
if edit_config:
solve_authorization(config_dir)
# try initializing repos
x = clone_or_fetch(project_path.as_posix(), clone_init = True, detector_init = data["detector_init"])
if x == 0:
return 0
# determine if to continue
log.critical(f"Something has failed. Please see log above and decide what to do")
menu_config = "Edit config and start over"
menu_reclone = "Try cloning again, including base repo"
menu_quit_clean = "Quit & clean"
decision: str = pyip.inputMenu(
[menu_config, menu_reclone, menu_quit_clean],
numbered = True)
# other stuff
if decision == menu_quit_clean:
log.warning(f"Removing old stuff and quitting")
subprocess.run(["/usr/bin/rm", "-Rvf", project_path.as_posix()])
return 0
if decision == menu_config:
edit_config = True
else:
edit_config = False
# remove caches & repos
log.info(f"Cleaning unwanted stuff")
subprocess.run(["/usr/bin/rm", "-Rvf", project_path.joinpath("repos").as_posix()])
subprocess.run(["/usr/bin/rm", "-Rvf", project_path.joinpath("cache").as_posix()])
log.info(f"Starting over.. ")

View File

@@ -66,7 +66,6 @@ class Cloner:
self._repo = RepoTool(repo_path)
return self.__opened
@property
def __opened(self) -> bool:
if not self._repo:
@@ -221,3 +220,7 @@ class Cloner:
if detector.check_fingerprint():
log.debug(f"Starting detector discovery")
detector.run(callback)
def detector_init(self):
detector = Detector(Path(self.main_repo_path), Path(self._dirs.cache_dir), self._config.cloner_project_name)
detector.initialize_caches()

View File

@@ -97,8 +97,8 @@ class ClonerConfigParser:
self.__invalid_lines.append((line, None))
continue
eq = line.find("=")
key: str = line[0:eq]
val: str = line[eq + 1:]
key: str = line[0:eq].strip()
val: str = line[eq + 1:].strip()
l.debug(f"Found config pair: {key} => '{val}'")
try:
@@ -118,4 +118,3 @@ class ClonerConfigParser:
@property
def invalid_lines(self):
return self.__invalid_lines

View File

@@ -48,23 +48,22 @@ def config_try_override(config_writer: GitConfigParser, section: str, option: st
config_writer.set(section, option, value)
def prepare_git_auth(repo: str, config_dir):
log.debug(f"CFG: Opening repo {repo}")
repo = Repo(repo)
path: str = repo._get_config_path("user")
def prepare_git_auth(config_dir: str):
# create mockup repo
git_config = Path(config_dir).joinpath("git")
config_file = git_config.joinpath("config")
path: str = config_file.as_posix()
log.debug(f"CFG config path: {path}")
path = os.path.dirname(path)
log.debug(f"CFG parent path: {path}")
if not os.path.isdir(path):
if not git_config.is_dir():
log.debug(f"CFG Creating config dir")
os.mkdir(path)
git_config.mkdir()
cred_store: str = os.path.join(path, "git-credentials")
ssh_identity: str = os.path.join(config_dir, "ssh", "identity")
cred_store: str = os.path.join(config_dir, "auth", "git-credentials")
ssh_identity: str = os.path.join(config_dir, "auth", "ssh", "identity")
with repo.config_writer("user") as cfgw:
with GitConfigParser(path, read_only = False) as cfgw:
# github personal token
# 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
if token:
config_try_override(
cfgw,

View File

@@ -31,18 +31,9 @@ def detector_executor(commit: DetectedCommit):
subprocess.run(arg_list)
def main() -> int:
# parse input arguments
parser = argparse.ArgumentParser(description = "repo-cloner entering script")
parser.add_argument('--base-dir', help = 'path to directory containing whole cloner structure', required = True,
default = None, type = str)
parser.add_argument('--debug', '-d', help = "enable debug output", action = 'store_true')
args = parser.parse_args()
if args.debug:
log.setLevel(logging.DEBUG)
log.info(f"Started processing git group in folder: {args.base_dir}")
dirs = RepoDirStructure(args.base_dir)
def clone_or_fetch(base_dir: str, clone_init: bool = False, detector_init: bool = False):
log.info(f"Started processing git group in folder: {base_dir}")
dirs = RepoDirStructure(base_dir)
log.debug(f"Patching XDG_CONFIG_HOME to mock up git config")
os.environ['XDG_CONFIG_HOME'] = dirs.conf_dir
@@ -73,7 +64,17 @@ def main() -> int:
log.warning("Config directive cloner_project_name should not be omitted!")
cloner = Cloner(dirs)
prepare_git_auth(cloner.main_repo_path, dirs.conf_dir)
prepare_git_auth(dirs.conf_dir)
if clone_init:
log.info(f"Initial cloning of repositories")
if not cloner.clone():
return 1
if detector_init:
cloner.detector_init()
return 0
# regular run
if not cloner.sync():
log.warning(f"Repo sync did not succeed")
@@ -83,5 +84,18 @@ def main() -> int:
return 0
def main() -> int:
# parse input arguments
parser = argparse.ArgumentParser(description = "repo-cloner entering script")
parser.add_argument('--base-dir', help = 'path to directory containing whole cloner structure', required = True,
default = None, type = str)
parser.add_argument('--debug', '-d', help = "enable debug output", action = 'store_true')
args = parser.parse_args()
if args.debug:
log.setLevel(logging.DEBUG)
return clone_or_fetch(args.base_dir)
if __name__ == "__main__":
exit(main())