import logging
import os
from pathlib import Path
from time import time

from repo_cloner.lib.cloner_config import ClonerConfig
from repo_cloner.lib.repo_dir_structure import RepoDirStructure
from repo_cloner.lib.dir_not_found_error import DirNotFoundError
from repo_cloner.lib.repo_tool import RepoTool
from repo_cloner.lib.checksum import gen_repo_hashed_name

log = logging.getLogger("rc.cloner")


class Cloner:
    """Drive clone/fetch operations for the repository named in a project config.

    The instance is built around a ``RepoDirStructure``; its ``config``
    attribute supplies the repo URL, project name and check interval, and its
    ``cache_dir`` / ``repos_dir`` attributes say where state and repositories
    live on disk.
    """

    # Directory layout helper handed to the constructor.
    _dirs: RepoDirStructure = None
    # Shortcut to ``_dirs.config``.
    _config: ClonerConfig = None
    # Name of the file (inside the cache dir) holding the last-check timestamp.
    _interval_file: str = "last-check-time"
    # Currently opened repository wrapper, if any.
    _repo: RepoTool = None
    # URL recorded by the last ``open`` / ``clone_from_url`` call.
    _repo_url: str = ""

    def __init__(self, dir_structure: RepoDirStructure):
        """Store the directory structure, validate the config and ensure the cache dir.

        Args:
            dir_structure: Provides ``config``, ``cache_dir``, ``repos_dir`` and
                the ``cache_dir_exists`` probe used below.

        Raises:
            KeyError: If ``cloner_repo_url`` in the config is empty or ``None``.
        """
        self._dirs = dir_structure
        self._config = self._dirs.config

        # Truthiness check also covers a URL of None, which len() would reject
        # with a TypeError instead of the intended KeyError.
        if not self._config.cloner_repo_url:
            # Use the module logger like every other call in this class.
            log.critical("Undefined repo cloner URL in config!")
            raise KeyError("cloner_repo_url not defined in config!")

        self._ensure_cache_dir()

    def _ensure_cache_dir(self) -> None:
        """Create the project's cache directory when it is reported missing."""
        # The probe is evaluated as a plain expression rather than inside an
        # `assert`: asserts are removed under `python -O`, which would skip the
        # probe entirely and leave the cache dir uncreated.
        try:
            cache_present = bool(self._dirs.cache_dir_exists)
        except DirNotFoundError:
            cache_present = False
        # NOTE(review): a falsy (non-raising) result is treated as "missing"
        # too; previously it surfaced as an AssertionError — confirm that
        # cache_dir_exists only ever raises or returns a truthy value.
        if cache_present:
            return

        log.info(f"Cache dir for project {self._config.cloner_project_name} not found -> creating")
        Path(self._dirs.cache_dir).mkdir()
        log.debug("Cache dir created")

    def check_interval(self) -> bool:
        """Tell whether the configured interval has elapsed since the stored stamp.

        The stamp is read as an integer from the interval file in the cache
        dir. A missing or unparsable file leaves the stamp at 0. The configured
        interval is multiplied by 60 before being compared against ``time()``
        (seconds), i.e. it is treated as minutes.

        Returns:
            True when ``time()`` is past ``stamp + interval * 60``.
        """
        log.debug(f"Checking interval for {self._config.cloner_project_name}")
        interval = self._config.cloner_interval

        interval_file: Path = Path(self._dirs.cache_dir).joinpath(self._interval_file)
        log.debug(f"Interval file: {interval_file}")

        # NOTE(review): nothing in the visible part of this class writes the
        # interval file — confirm the stamp is updated elsewhere.
        file_stamp: int = 0
        if interval_file.exists():
            str_val = interval_file.read_text()
            try:
                file_stamp = int(str_val)
            except ValueError:
                # Keep the default stamp of 0 when the content is not an integer.
                log.warning("Interval file file is corrupted, keeping value as nothing happened")

        return time() > file_stamp + interval * 60

    def open(self, url: str) -> bool:
        """Point the cloner at the repository directory derived from ``url``.

        Args:
            url: Repository URL; it is recorded and hashed into a directory name.

        Returns:
            Whether the resulting ``RepoTool`` reports itself as initialized.
        """
        log.debug(f"Opening repo with url: {url}")
        repo_path = self._repo_path_by_url(url)
        self._repo_url = url
        self._repo = RepoTool(repo_path)
        return self.__opened

    @property
    def __opened(self) -> bool:
        """True when a repo wrapper is set and it reports ``initialized``."""
        if not self._repo:
            return False
        return self._repo.initialized

    def _repo_path_by_url(self, url: str) -> str:
        """Return the path under ``repos_dir`` for the hashed name of ``url``."""
        hashed_name: str = gen_repo_hashed_name(url)
        log.debug(f"Repo hashed name for {url} is {hashed_name}")
        return os.path.join(self._dirs.repos_dir, hashed_name)

    @property
    def __main_repo_path(self) -> str:
        """Path of the repository configured as ``cloner_repo_url``."""
        return self._repo_path_by_url(self._config.cloner_repo_url)

    def sync(self) -> bool:
        """Fetch the opened repository, falling back to the main configured one.

        Returns:
            False when the repository is not initialized, otherwise the result
            of ``RepoTool.fetch()``.
        """
        if not self.__opened:
            # Nothing usable is open yet: fall back to the main repo path.
            self._repo = RepoTool(self.__main_repo_path)
        if not self._repo.initialized:
            return False
        return self._repo.fetch()

    def perform_check(self):
        """Run ``sync`` if ``check_interval`` says a check is due, logging start and end."""
        log.info(f"Started check for {self._config.cloner_project_name}, url: {self._config.cloner_repo_url}")
        if self.check_interval():
            self.sync()
        log.info("Check finished")

    def clone_from_url(self, url: str) -> bool:
        """Clone ``url`` into its hashed directory unless a repo is already there.

        Args:
            url: Repository URL to clone; it is also recorded on the instance.

        Returns:
            False when the target path already holds an initialized repository,
            otherwise the result of ``RepoTool.clone(url)``.
        """
        path = self._repo_path_by_url(url)
        self._repo_url = url
        self._repo = RepoTool(path)

        if self._repo.initialized:
            log.critical(f"Repo path {path} is initialized... Refusing clone!")
            return False

        return self._repo.clone(url)