import os.path import git from git import Repo from git.exc import NoSuchPathError, InvalidGitRepositoryError, GitCommandError from git import RemoteProgress import logging import time from typing import Union, Optional from repo_cloner.lib.checksum import gen_repo_hashed_name from io import BytesIO log = logging.getLogger("rc.repo") class GitRemoteProgress(RemoteProgress): OP_CODES = [ "BEGIN", "CHECKING_OUT", "COMPRESSING", "COUNTING", "END", "FINDING_SOURCES", "RECEIVING", "RESOLVING", "WRITING", ] OP_CODE_MAP = { getattr(RemoteProgress, _op_code): _op_code for _op_code in OP_CODES } last_step_time = time.time() time_thr = 0.5 cur_task: str = "" cur_task_max: int = 0 def __init__(self) -> None: super().__init__() self.last_step_time = time.time() - self.time_thr self.cur_task_max = 0 self.cur_task = "" def __del__(self) -> None: self.finish() @classmethod def get_curr_op(cls, op_code: int) -> str: """Get OP name from OP code.""" # Remove BEGIN- and END-flag and get op name op_code_masked = op_code & cls.OP_MASK return cls.OP_CODE_MAP.get(op_code_masked, "?").title() def finish(self): log.info(f"GIT {self.cur_task}: 100.00% ({self.cur_task_max})") def update( self, op_code: int, cur_count: Union[float, str], max_count: Union[float, str, None] = None, message: Optional[str] = "", ) -> None: # Do i need to update? # -> begin : YES # -> end : YES # -> timer: YES # so check timer if (self.last_step_time + self.time_thr) > time.time(): # timer not passed yet repo.head.reset(commit = 'origin/master', index = True, working_tree = True) if not ((op_code & self.BEGIN) or (op_code & self.BEGIN)): # skip -> no begin or end return # update timer self.last_step_time = time.time() # Start new bar on each BEGIN-flag if op_code & self.BEGIN: self.cur_task = self.get_curr_op(op_code).upper() try: self.cur_task_max = int(max_count) except ValueError: self.cur_task_max = 100 log.info(f"GIT {self.cur_task} started") percent = round(100 * (cur_count / self.cur_task_max), 2) # End progress monitoring on each END-flag if op_code & self.END: # logger.info("Done: %s", self.curr_op) percent = 100 log.info(f"GIT {self.cur_task}: {percent}% ({cur_count}; {message})") class RepoTool: _repo: Repo = None _initialized: bool = False _bare: bool = False _path: str = "" _last_fetch_data = [] _recursive_discovery_urls: set = set() _recursive_discovery_cloned: set = set() _submodule_discovery_history: list = [] def __init__(self, path: str): log.info(f"Initializing repository at {path}") self._path = str(path) self._recursive_discovery_cloned = set() self._recursive_discovery_urls = set() self._submodule_discovery_history: list = [] try: self._repo = Repo(path, expand_vars = False) self._initialized = True self._bare = self._repo.bare except (NoSuchPathError, InvalidGitRepositoryError) as e: log.warning(f"Init failed: {str(e)}, continuing with uninitialized repo") self._initialized = False self._bare = False @property def initialized(self) -> bool: return self._initialized @property def bare(self) -> bool: return self._bare @property def path(self) -> str: return self._path @property def cloned_submodules_url_list(self) -> list: return list(self._recursive_discovery_cloned) @property def discovered_submodules_commits(self) -> list: return self._submodule_discovery_history def _persist_submodule_commits(self, path: str): with open(path, "w") as f: for commit in self.discovered_submodules_commits: line = f"{commit}\n" f.write(line) def __check_initialized(self): def inner(*args): fake_self: RepoTool = args[0] if not fake_self._initialized: log.critical(f"Repo {fake_self.path} is not initialized!") return False return self(*args) return inner def clone(self, url: str) -> bool: if self._initialized: log.warning(f"Trying to clone to initialized repository!") return False log.info(f"Cloning repository from url: {url}") try: self._repo = Repo.clone_from( url, to_path = self._path, progress = GitRemoteProgress(), bare = True, mirror = True ) except GitCommandError as e: log.critical(f"Clone of {url} failed!") log.critical(f"Exception: {e}") self._initialized = False return False self._initialized = True self._bare = self._repo.bare return True @__check_initialized def fetch(self) -> bool: log.info("Fetching repo :)") if not len(self._repo.remotes): log.warning(f"Repo: {self._path} does not contain any remotes!") return False # fetch all remotes remote = self._repo.remotes[0] log.debug(f"Fetching remote: {remote.name} url: {next(remote.urls)}") self._last_fetch_data = remote.fetch( ["+refs/heads/*:refs/heads/*", "+refs/tags/*:refs/tags/*"], progress = GitRemoteProgress(), kill_after_timeout = 60, prune = True ) log.debug("Fetch finished!") return True @__check_initialized def repo_fingerprint(self) -> Union[str, bool]: log.debug("Getting repo fingerprint") # reference count ref_count = self._repo.git.rev_list(count = True, all = True) tags = [f"{tag.name}/{tag.commit}" for tag in self._repo.tags] branches = [f"{branch.name}/{branch.commit}" for branch in self._repo.branches] log.debug(f"{ref_count} references, {len(tags)} tags, {len(branches)} branches") cumulative = f"{ref_count} {'.'.join(tags)} {' '.join(branches)}".encode() import hashlib x = hashlib.sha256(cumulative).hexdigest() log.debug(f"Repo fingerprint is {x}") return x @__check_initialized def list_commits(self, max_depth: Optional[int] = None): return self._repo.iter_commits(all = True, max_count = max_depth, reverse = True) @__check_initialized def list_branches(self) -> Union[dict, bool]: log.debug(f"Listing branches of repo") branches = {} for branch in self._repo.branches: branches[branch.name] = branch.commit.hexsha log.debug(f"Found {len(branches)}") return branches @__check_initialized def list_tags(self): log.debug(f"Listing tags of repo") tags = {} for tag in self._repo.tags: tags[tag.name] = tag.commit.hexsha log.debug(f"Found {len(tags)} tags") return tags @__check_initialized def list_submodules(self, commit: str = "HEAD") -> Union[list, bool]: commit = self._repo.commit(commit) submodules = [] if '.gitmodules' in commit.tree: git_modules: git.Blob = commit.tree['.gitmodules'] x = BytesIO(git_modules.data_stream.read()) x.name = git_modules.name try: parser = git.GitConfigParser(x, read_only = True) for section in parser.sections(): if parser.has_option(section, "url"): submodules.append(parser.get_value(section, "url")) except BaseException: return False if commit.hexsha not in self._submodule_discovery_history: self._submodule_discovery_history.append(commit.hexsha) return submodules @__check_initialized def list_submodules_history(self, limit_of_commits: Optional[int] = None) -> Union[list, bool]: log.info(f"Listing repository submodule history") iterator = self.list_commits(limit_of_commits) submodules = set() counter: int = 0 last_status = time.time() status_offset = 0.5 for commit in iterator: counter += 1 commit_submodules = self.list_submodules(commit) if not type(commit_submodules) == bool: submodules.update(commit_submodules) if time.time() >= last_status + status_offset: log.info(f"Submodule discovery: {counter} commits finished, {len(submodules)} discovered") last_status = time.time() return list(submodules) def clone_recursive( self, main_url: str, scan_cache_dir: Optional[str] = None, scan_depth: Optional[int] = None ) -> bool: log.info(f"Started recursive clone of {main_url} with recursive discovery limited to {scan_depth} commits") # clone main repo if not self.clone(main_url): log.critical(f"Clone of main repository failed!") return False # discover submodules for repository submodules = self.list_submodules_history(scan_depth) if submodules: for submodule in submodules: self._recursive_discovery_urls.add(submodule) everything_succeed: bool = True everything_cloned: bool = False while not everything_cloned: # recursively scan and clone repositories everything_cloned = True # for every url in list # list() is needed - Runtime Error for set() changed during iteration for url in list(self._recursive_discovery_urls): if url not in self._recursive_discovery_cloned: everything_cloned = False # generate new path directory = os.path.dirname(self.path) submodule_cloner = RepoTool(os.path.join(directory, gen_repo_hashed_name(url))) # clone cloned: bool = submodule_cloner.clone(url) # mark cloned even if failed afterwards - while loop stuck solution self._recursive_discovery_cloned.add(url) if not cloned: log.critical(f"Clone of submodule: {url} failed") everything_succeed = False continue # scan for submodules submodules = submodule_cloner.list_submodules_history(scan_depth) if type(submodules) == bool and not submodules: log.critical(f"Submodule discovery for {url} failed!") everything_succeed = False continue # persistor if scan_cache_dir: cache_file = os.path.join(scan_cache_dir, gen_repo_hashed_name(url)) log.debug( f"Saving {len(submodule_cloner.discovered_submodules_commits)} commits into {cache_file}") submodule_cloner._persist_submodule_commits(cache_file) for submodule in submodules: self._recursive_discovery_urls.add(submodule) if scan_cache_dir: # persist main repo commits cache_file = os.path.basename(self.path) cache_file = os.path.join(scan_cache_dir, cache_file) self._persist_submodule_commits(cache_file) # persist discovered submodule urls with open(os.path.join(scan_cache_dir, "submodules.cache"), "w") as f: f.write("\n".join(self.cloned_submodules_url_list)) return everything_succeed