Semi-functional prepare-git-tree + process_repository_dir now uses which

Signed-off-by: Václav Valíček <valicek1994@gmail.com>
Václav Valíček 2022-08-08 11:54:57 +02:00
parent 94275014c5
commit 188543a624
Signed by: valicek
GPG Key ID: FF05BDCA0C73BB31
2 changed files with 111 additions and 9 deletions


@@ -1,13 +1,98 @@
 #!/usr/bin/python3
-import sys
+import logging
+from repo_cloner.lib.logger_setup import log
+import argparse
+from pathlib import Path
+from typing import Optional
+import git
+from git import Repo
+from git.exc import BadName, CheckoutError
+from repo_cloner.lib import ClonerConfigParser, gen_repo_hashed_name
+from repo_cloner.lib.repo_tool import GitRemoteProgress
+
+
+def clone_checkout(
+        source: Path, target: Path,
+        reference: Optional[str] = None,
+        recursive: bool = False, recursion_counter: int = 0):
+    # solve recursion
+    log.info(f"Clone recursion level is {recursion_counter}")
+    if recursion_counter > 10:
+        log.critical(f"Recursion limit reached! breaking")
+        return False
+
+    repo = Repo().clone_from(source.as_posix(), to_path = target.as_posix(), progress = GitRemoteProgress(), bare = False)
+
+    if reference:
+        log.debug(f"Finding reference: {reference}")
+        try:
+            ref = repo.commit(reference)
+        except BadName as e:
+            log.critical(f"Reference not found: {e.__str__()}")
+            log.warning(f"Continuing with HEAD")
+            ref = repo.commit("HEAD")
+
+        log.info(f"Checking out commit {ref.hexsha} as branch *test-head*, this may take some time.")
+        try:
+            repo.git.checkout(ref, '-b', 'test-head', force = True)
+            log.info(f"Checkout succeed")
+        except CheckoutError as e:
+            log.critical(f"Checkout raised error: {e.__str__()}")
+            log.warning(f"Continuing with partially checked out shit...")
+
+    if recursive:
+        log.debug(f"Recursive checkout is enabled, continuing...")
+        log.debug(f"Listing submodules (if any)")
+        for submodule in repo.submodules:
+            new_url = source.parent.joinpath(gen_repo_hashed_name(submodule.url))
+            new_path = target.joinpath(submodule.path)
+            checkout_submodule: Optional[str] = None
+            try:
+                module = repo.tree().join(submodule.path)
+                checkout_submodule = module.hexsha
+            except KeyError as e:
+                log.warning(f"Failed to find commit id for submodule {submodule.path}")
+                log.warning(f"Continuing with HEAD for submodule")
+            log.info(f"Submodule {submodule.path} with replacement path {new_url} found")
+            clone_checkout(new_url, new_path, checkout_submodule, recursive, recursion_counter + 1)
+
+
+def prepare_tree(source: Path, target: Path, checkout: Optional[str] = None) -> int:
+    # check paths
+    if not source.is_dir():
+        log.critical(f"Source dir: {source.as_posix()} is not dir!")
+        return 1
+    if target.exists():
+        log.critical(f"Target dir: {target.as_posix()} exists!")
+        return 1
+
+    # config
+    log.debug(f"Parsing config file")
+    config_file = source.joinpath("config/cloner.cfg")
+    config = ClonerConfigParser(config_file.as_posix())
+    config = config.config
+
+    # main repo
+    main_repo = source.joinpath("repos", gen_repo_hashed_name(config.cloner_repo_url))
+    log.debug(f"Opening main repo from path: {main_repo.as_posix()}")
+    x = clone_checkout(main_repo, target, checkout, config.cloner_submodules)
 
 
 def main() -> int:
-    print("IT WORKS!")
-    return 0
+    # parse input arguments
+    parser = argparse.ArgumentParser(description = "clone repository from repo-cloner mirror")
+    parser.add_argument("source", help = "path to directory containing whole cloner structure")
+    parser.add_argument("target", help = "path where to checkout")
+    parser.add_argument("--checkout", help = "checkout this reference, otherwise default HEAD will be left")
+    parser.add_argument('--debug', '-d', help = "enable debug output", action = 'store_true')
+    parser.add_argument('--colored', help = "enable colored log output even tty() is not detected")
+    args = parser.parse_args()
+
+    if args.debug:
+        log.info(f"Setting loglevel to debug")
+        log.setLevel(logging.DEBUG)
+
+    log.debug(f"Source directory: {args.source}")
+    log.debug(f"Target directory: {args.target}")
+    log.debug(f"Checkout reference: {args.checkout}")
+    return prepare_tree(Path(args.source), Path(args.target), args.checkout)
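
Note: a minimal sketch of how the new prepare_tree() entry point is meant to be driven, assuming the module is importable as repo_cloner.prepare_git_tree and that /data/cloner follows the config/ + repos/ layout the code expects; the import path and both filesystem paths are illustrative, not taken from this commit.

# Hypothetical usage of the new module; import path and paths below are
# assumptions made for illustration only.
import sys
from pathlib import Path

from repo_cloner.prepare_git_tree import prepare_tree  # assumed module name

# source must contain config/cloner.cfg and the hashed mirrors under repos/;
# target must not exist yet - prepare_tree() refuses to reuse an existing dir.
rc = prepare_tree(
    source = Path("/data/cloner"),    # illustrative cloner structure
    target = Path("/tmp/checkout"),   # illustrative checkout destination
    checkout = "v1.0.0",              # any reference repo.commit() resolves
)
sys.exit(rc or 0)  # prepare_tree() currently returns None on success

The argparse front end above wires the same call from the command line: SOURCE TARGET --checkout REF [--debug] [--colored].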


@@ -8,6 +8,20 @@ import base64
 from repo_cloner.lib.repo_dir_structure import RepoDirStructure
 from repo_cloner.lib import Cloner, DetectedCommit, prepare_git_auth, init_gh_token
 
+laminar_binary: str = ""
+
+
+def determine_binary() -> str:
+    global laminar_binary
+    log.debug(f"Looking for laminar binary")
+    rc = subprocess.run(["/usr/bin/which", "laminarc", "echo"], stdout = subprocess.PIPE, stderr = subprocess.DEVNULL)
+    binary = rc.stdout.decode().splitlines()[0].strip()
+    if rc.returncode == 0:
+        log.debug(f"Laminar binary found in {binary}")
+    else:
+        log.warning(f"laminarc binary was not found - using {binary} as replacement for debugging")
+    return binary
+
 
 def detector_executor(commit: DetectedCommit):
     message = base64.b64encode(commit.log.encode()).decode()
@@ -23,7 +37,7 @@ def detector_executor(commit: DetectedCommit):
         "log": message,
     }
 
-    arg_list = ["/bin/echo", "laminarc", "queue", commit.project]
+    arg_list = [laminar_binary, "queue", commit.project]
 
     for key, val in env.items():
         arg_list.append(f"COMMIT_{key.upper()}={val}")
@@ -32,6 +46,7 @@ def detector_executor(commit: DetectedCommit):
 
 
 def clone_or_fetch(base_dir: str, clone_init: bool = False, detector_init: bool = False):
+    global laminar_binary
     log.info(f"Started processing git group in folder: {base_dir}")
     dirs = RepoDirStructure(base_dir)
     log.debug(f"Patching XDG_CONFIG_HOME to mock up git config")
@@ -78,6 +93,8 @@ def clone_or_fetch(base_dir: str, clone_init: bool = False, detector_init: bool
     if not cloner.sync():
         log.warning(f"Repo sync did not succeed")
 
+    laminar_binary = determine_binary()
+
     if cloner.detector_enabled:
         cloner.detector_run(detector_executor)
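
Note: the which-based lookup replaces the previously hard-coded /bin/echo prefix; when laminarc is missing, the first path returned for the pair laminarc/echo is echo, so queue submissions degrade into printed command lines for debugging. A standalone sketch of that fallback, using only the binaries hard-coded above:

# Standalone illustration of the lookup performed by determine_binary():
# ask which(1) for laminarc and echo in a single call and take the first
# path it prints - laminarc when installed, otherwise the path to echo.
import subprocess

rc = subprocess.run(
    ["/usr/bin/which", "laminarc", "echo"],
    stdout = subprocess.PIPE, stderr = subprocess.DEVNULL,
)
binary = rc.stdout.decode().splitlines()[0].strip()

if rc.returncode == 0:
    print(f"laminarc found at {binary} - jobs will really be queued")
else:
    print(f"laminarc missing, using {binary} - queue calls only print their arguments")

# detector_executor() then builds [binary, "queue", <project>, COMMIT_*=...],
# so with the echo fallback every submission becomes a harmless log line.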