from git import Repo from git.exc import NoSuchPathError, InvalidGitRepositoryError from git import RemoteProgress import logging import time from typing import Union, Optional log = logging.getLogger("rc.repo") class GitRemoteProgress(RemoteProgress): OP_CODES = [ "BEGIN", "CHECKING_OUT", "COMPRESSING", "COUNTING", "END", "FINDING_SOURCES", "RECEIVING", "RESOLVING", "WRITING", ] OP_CODE_MAP = { getattr(RemoteProgress, _op_code): _op_code for _op_code in OP_CODES } last_step_time = time.time() time_thr = 0.5 cur_task: str = "" cur_task_max: int = 0 def __init__(self) -> None: super().__init__() self.last_step_time = time.time() - self.time_thr self.cur_task_max = 0 self.cur_task = "" def __del__(self) -> None: self.finish() @classmethod def get_curr_op(cls, op_code: int) -> str: """Get OP name from OP code.""" # Remove BEGIN- and END-flag and get op name op_code_masked = op_code & cls.OP_MASK return cls.OP_CODE_MAP.get(op_code_masked, "?").title() def finish(self): log.info(f"GIT {self.cur_task}: 100.00% ({self.cur_task_max})") def update( self, op_code: int, cur_count: Union[float, str], max_count: Union[float, str, None] = None, message: Optional[str] = "", ) -> None: # Do i need to update? # -> begin : YES # -> end : YES # -> timer: YES # so check timer if (self.last_step_time + self.time_thr) > time.time(): # timer not passed yet if not ((op_code & self.BEGIN) or (op_code & self.BEGIN)): # skip -> no begin or end return # update timer self.last_step_time = time.time() # Start new bar on each BEGIN-flag if op_code & self.BEGIN: self.cur_task = self.get_curr_op(op_code).upper() try: self.cur_task_max = int(max_count) except ValueError: self.cur_task_max = 100 log.info(f"GIT {self.cur_task} started") percent = round(100 * (cur_count / self.cur_task_max), 2) # End progress monitoring on each END-flag if op_code & self.END: # logger.info("Done: %s", self.curr_op) percent = 100 log.info(f"GIT {self.cur_task}: {percent}% ({cur_count}; {message})") class RepoTool: _repo: Repo = None _initialized: bool = False _bare: bool = False _path: str = "" _last_fetch_data = [] def __init__(self, path: str): log.info(f"Initializing repository at {path}") self._path = str(path) try: self._repo = Repo(path, expand_vars = False) self._initialized = True self._bare = self._repo.bare except (NoSuchPathError, InvalidGitRepositoryError) as e: log.warning(f"Init failed: {str(e)}, continuing with uninitialized repo") self._initialized = False self._bare = False @property def initialized(self) -> bool: return self._initialized @property def bare(self) -> bool: return self._bare @property def path(self) -> str: return self._path def __check_initialized(self): def inner(*args): fake_self: RepoTool = args[0] if not fake_self._initialized: log.critical(f"Repo {fake_self.path} is not initialized!") return False return self(*args) return inner def clone(self, url: str) -> bool: if self._initialized: log.warning(f"Trying to clone to initialized repository!") return False log.info(f"Cloning repository from url: {url}") self._repo = Repo.clone_from( url, to_path = self._path, progress = GitRemoteProgress(), bare = True, mirror = True ) self._initialized = True self._bare = self._repo.bare return True @__check_initialized def fetch(self) -> bool: log.info("Fetching repo state") if not len(self._repo.remotes): log.warning(f"Repo: {self._path} does not contain any remotes!") return False # fetch all remotes remote = self._repo.remotes[0] log.debug(f"Fetching remote: {remote.name} url: {next(remote.urls)}") self._last_fetch_data = remote.fetch( ["+refs/heads/*:refs/heads/*", "+refs/tags/*:refs/tags/*"], progress = GitRemoteProgress(), kill_after_timeout = 60, prune = True ) log.debug("Fetch finished!") return True @__check_initialized def repo_fingerprint(self) -> Union[str, bool]: log.debug("Getting repo fingerprint") # reference count ref_count = self._repo.git.rev_list(count = True, all = True) tags = [f"{tag.name}/{tag.commit}" for tag in self._repo.tags] branches = [f"{branch.name}/{branch.commit}" for branch in self._repo.branches] log.debug(f"{ref_count} references, {len(tags)} tags, {len(branches)} branches") cumulative = f"{ref_count} {'.'.join(tags)} {' '.join(branches)}".encode() import hashlib x = hashlib.sha256(cumulative).hexdigest() log.debug(f"Repo fingerprint is {x}") return x