#!/usr/bin/python3
"""Check out a working tree from a repo-cloner mirror directory.

The mirror layout used below is ``<source>/config/cloner.cfg`` for the
configuration and ``<source>/repos/<hashed-name>`` for the mirrored
repositories.
"""
import logging
from repo_cloner.lib.logger_setup import log
import argparse
from pathlib import Path
from typing import Optional
import git
from git import Repo
from git.exc import BadName, CheckoutError, GitCommandError
from repo_cloner.lib import ClonerConfigParser, gen_repo_hashed_name
from repo_cloner.lib.repo_tool import GitRemoteProgress

# Deepest allowed level of nested submodule cloning before giving up.
_MAX_RECURSION_DEPTH = 10


def clone_checkout(
        source: Path,
        target: Path,
        reference: Optional[str] = None,
        recursive: bool = False,
        recursion_counter: int = 0) -> bool:
    """Clone ``source`` into ``target`` and optionally check out ``reference``.

    When ``recursive`` is set, every submodule of the cloned repository is
    cloned the same way from a sibling directory of ``source`` whose name is
    ``gen_repo_hashed_name(submodule.url)``, pinned to the commit recorded in
    the parent tree.

    Args:
        source: Path of the repository to clone from.
        target: Path where the working tree is created.
        reference: Commit-ish to check out as branch ``test-head``. If it
            cannot be resolved, HEAD is used instead. When empty, the clone
            is left as created.
        recursive: Also clone submodules.
        recursion_counter: Current nesting depth; callers normally leave the
            default.

    Returns:
        ``False`` when the recursion limit was hit here or in any nested
        submodule clone, ``True`` otherwise.
    """
    # Guard against runaway (e.g. cyclic) submodule nesting.
    log.info(f"Clone recursion level is {recursion_counter}")
    if recursion_counter > _MAX_RECURSION_DEPTH:
        log.critical("Recursion limit reached! breaking")
        return False

    repo = Repo.clone_from(
        source.as_posix(),
        to_path=target.as_posix(),
        progress=GitRemoteProgress(),
        bare=False,
    )

    if reference:
        log.debug(f"Finding reference: {reference}")
        try:
            ref = repo.commit(reference)
        except BadName as e:
            log.critical(f"Reference not found: {e}")
            log.warning("Continuing with HEAD")
            ref = repo.commit("HEAD")

        log.info(f"Checking out commit {ref.hexsha} as branch *test-head*, this may take some time.")
        try:
            repo.git.checkout(ref, '-b', 'test-head', force=True)
        except (CheckoutError, GitCommandError) as e:
            # repo.git.checkout runs the git command line, whose failures are
            # reported as GitCommandError; CheckoutError alone never matched.
            log.critical(f"Checkout raised error: {e}")
            log.warning("Continuing with partially checked out shit...")
        else:
            log.info("Checkout succeed")

    success = True
    if recursive:
        log.debug("Recursive checkout is enabled, continuing...")
        log.debug("Listing submodules (if any)")
        for submodule in repo.submodules:
            new_url = source.parent.joinpath(gen_repo_hashed_name(submodule.url))
            new_path = target.joinpath(submodule.path)

            # Pin the submodule to the commit recorded in the parent's tree;
            # fall back to the submodule's HEAD when no entry is found.
            checkout_submodule: Optional[str] = None
            try:
                checkout_submodule = repo.tree().join(submodule.path).hexsha
            except KeyError:
                log.warning(f"Failed to find commit id for submodule {submodule.path}")
                log.warning("Continuing with HEAD for submodule")

            log.info(f"Submodule {submodule.path} with replacement path {new_url} found")
            # Keep going through the remaining submodules even if one fails,
            # but remember the failure for the caller.
            if not clone_checkout(
                    new_url, new_path, checkout_submodule, recursive,
                    recursion_counter + 1):
                success = False

    return success


def prepare_tree(source: Path, target: Path, checkout: Optional[str] = None) -> int:
    """Validate paths, read the cloner config and clone the main repository.

    Args:
        source: Directory containing the cloner structure.
        target: Destination path; must not exist yet.
        checkout: Optional reference to check out after cloning.

    Returns:
        ``0`` on success, ``1`` when a path check fails or the clone reports
        failure.
    """
    # check paths
    if not source.is_dir():
        log.critical(f"Source dir: {source.as_posix()} is not dir!")
        return 1
    if target.exists():
        log.critical(f"Target dir: {target.as_posix()} exists!")
        return 1

    # config
    log.debug("Parsing config file")
    config_file = source.joinpath("config/cloner.cfg")
    config = ClonerConfigParser(config_file.as_posix()).config

    # main repo
    main_repo = source.joinpath("repos", gen_repo_hashed_name(config.cloner_repo_url))
    log.debug(f"Opening main repo from path: {main_repo.as_posix()}")
    cloned = clone_checkout(main_repo, target, checkout, config.cloner_submodules)
    # Translate the boolean outcome into a process-style exit status.
    return 0 if cloned else 1


def main() -> int:
    """Parse command-line arguments and run :func:`prepare_tree`.

    Returns:
        The status code produced by :func:`prepare_tree`.
    """
    # parse input arguments
    parser = argparse.ArgumentParser(description="clone repository from repo-cloner mirror")
    parser.add_argument("source", help="path to directory containing whole cloner structure")
    parser.add_argument("target", help="path where to checkout")
    parser.add_argument("--checkout", help="checkout this reference, otherwise default HEAD will be left")
    parser.add_argument('--debug', '-d', help="enable debug output", action='store_true')
    # NOTE(review): --colored has no action, so it expects a value, and the
    # parsed value is not read anywhere in this file - confirm intended use.
    parser.add_argument('--colored', help="enable colored log output even tty() is not detected")
    args = parser.parse_args()

    if args.debug:
        log.info("Setting loglevel to debug")
        log.setLevel(logging.DEBUG)

    log.debug(f"Source directory: {args.source}")
    log.debug(f"Target directory: {args.target}")
    log.debug(f"Checkout reference: {args.checkout}")

    return prepare_tree(Path(args.source), Path(args.target), args.checkout)