from repo_cloner.lib.cloner_config import ClonerConfig
from repo_cloner.lib.repo_dir_structure import RepoDirStructure
from repo_cloner.lib.dir_not_found_error import DirNotFoundError
from repo_cloner.lib.repo_tool import RepoTool
from repo_cloner.lib.checksum import gen_repo_hashed_name
from pathlib import Path
from typing import Optional
from time import time
import os
import logging

log = logging.getLogger("rc.cloner")


class Cloner:
    _dirs: RepoDirStructure = None
    _config: ClonerConfig = None
    _interval_file: str = "last-check-time"
    __submodule_cache: str = None
    _repo: RepoTool = None
    _repo_url: str = ""

    def __init__(self, dir_structure: RepoDirStructure):
        self._dirs = dir_structure
        self._config = self._dirs.config
        if len(self._config.cloner_repo_url) == 0:
            log.critical("Undefined repo cloner URL in config!")
            raise KeyError("cloner_repo_url not defined in config!")
        # create cache dir, if missing (cache_dir_exists raises DirNotFoundError)
        try:
            assert self._dirs.cache_dir_exists
        except DirNotFoundError:
            log.info(f"Cache dir for project {self._config.cloner_project_name} not found -> creating")
            Path(self._dirs.cache_dir).mkdir()
            log.debug("Cache dir created")
        # submodule cache
        self.__submodule_cache = os.path.join(self._dirs.cache_dir, "submodules")
        if not os.path.exists(self.__submodule_cache):
            log.info("Submodule cache dir does not exist! -> creating")
            Path(self.__submodule_cache).mkdir(parents=True)

    def check_interval(self) -> bool:
        log.debug(f"Checking interval for {self._config.cloner_project_name}")
        # configured interval (in minutes)
        interval = self._config.cloner_interval
        # read the last-check timestamp, if the interval file exists
        interval_file: Path = Path(self._dirs.cache_dir).joinpath(self._interval_file)
        log.debug(f"Interval file: {interval_file}")
        file_stamp: int = 0
        if interval_file.exists():
            str_val = interval_file.read_text()
            try:
                file_stamp = int(str_val)
            except ValueError:
                log.warning("Interval file is corrupted, keeping value as if nothing happened")
        # check whether the interval has elapsed
        if time() > file_stamp + interval * 60:
            return True
        return False

    def open(self, url: str) -> bool:
        log.debug(f"Opening repo with url: {url}")
        repo_path = self._repo_path_by_url(url)
        self._repo_url = url
        self._repo = RepoTool(repo_path)
        return self.__opened

    @property
    def __opened(self) -> bool:
        if not self._repo:
            return False
        return self._repo.initialized

    def _repo_path_by_url(self, url: str) -> str:
        hashed_name: str = gen_repo_hashed_name(url)
        log.debug(f"Repo hashed name for {url} is {hashed_name}")
        return os.path.join(self._dirs.repos_dir, hashed_name)

    @property
    def __main_repo_path(self) -> str:
        return self._repo_path_by_url(self._config.cloner_repo_url)

    def sync(self) -> bool:
        if not self.__opened:
            self._repo = RepoTool(self.__main_repo_path)
        if not self._repo.initialized:
            return False
        return self._repo.fetch()

    def perform_check(self):
        log.info(f"Started check for {self._config.cloner_project_name}, url: {self._config.cloner_repo_url}")
        if self.check_interval():
            self.sync()
        log.info("Check finished")

    def clone(self, url: Optional[str] = None) -> bool:
        # optional parameter - otherwise use the URL from config
        if not url:
            url = self._config.cloner_repo_url
        # generate path
        path = self._repo_path_by_url(url)
        self._repo_url = url
        self._repo = RepoTool(path)
        # refuse to clone over an already initialized repo
        if self._repo.initialized:
            log.critical(f"Repo path {path} is initialized... Refusing clone!")
            return False
        # recursive or standard?
        if not self._config.cloner_submodules:
            return self._repo.clone(url)
        else:
            scan_depth_limit = self._config.cloner_submodule_depth
            # handle depth limit for submodule discovery: 0 means unlimited
            if scan_depth_limit == 0:
                scan_depth_limit = None
            # deeper levels are handled internally by repo-tool as non-recursive clones and discoveries
            return self._repo.clone_recursive(url, self.__submodule_cache, scan_depth=scan_depth_limit)
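
# --- Illustrative usage sketch (an assumption, not part of the original module) ---
# A minimal sketch of how this Cloner might be driven, using only the methods
# defined above. How RepoDirStructure is constructed is hypothetical here; the
# real project may build it from its own configuration layer.
if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG)

    dirs = RepoDirStructure("/tmp/rc-demo")  # hypothetical constructor call
    cloner = Cloner(dirs)

    # First run: clone the configured URL (open() returns False while the
    # repo path is still uninitialized). Later runs: interval-gated fetch.
    if not cloner.open(dirs.config.cloner_repo_url):
        cloner.clone()
    cloner.perform_check()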