98 lines
3.0 KiB
Python
98 lines
3.0 KiB
Python
|
from repo_cloner.lib.cloner_config import ClonerConfig
|
||
|
from repo_cloner.lib.repo_dir_structure import RepoDirStructure
|
||
|
from repo_cloner.lib.dir_not_found_error import DirNotFoundError
|
||
|
from repo_cloner.lib.repo_tool import RepoTool
|
||
|
from repo_cloner.lib.checksum import gen_repo_hashed_name
|
||
|
from pathlib import Path
|
||
|
from time import time
|
||
|
import os
|
||
|
import logging
|
||
|
|
||
|
log = logging.getLogger("rc.cloner")
|
||
|
|
||
|
|
||
|
class Cloner:
|
||
|
_dirs: RepoDirStructure = None
|
||
|
_config: ClonerConfig = None
|
||
|
_interval_file: str = "last-check-time"
|
||
|
_repo: RepoTool = None
|
||
|
_repo_url: str = ""
|
||
|
|
||
|
def __init__(self, dir_structure: RepoDirStructure):
|
||
|
self._dirs = dir_structure
|
||
|
self._config = self._dirs.config
|
||
|
if len(self._config.cloner_repo_url) == 0:
|
||
|
logging.critical(f"Undefined repo cloner URL in config!")
|
||
|
raise KeyError(f"cloner_repo_url not defined in config!")
|
||
|
|
||
|
# create cache dir, if missing
|
||
|
try:
|
||
|
assert self._dirs.cache_dir_exists
|
||
|
except DirNotFoundError:
|
||
|
log.info(f"Cache dir for project {self._config.cloner_project_name} not found -> creating")
|
||
|
Path(self._dirs.cache_dir).mkdir()
|
||
|
log.debug(f"Cache dir created")
|
||
|
|
||
|
def check_interval(self):
|
||
|
log.debug(f"Checking interval for {self._config.cloner_project_name}")
|
||
|
# get interval
|
||
|
interval = self._config.cloner_interval
|
||
|
# interval file?
|
||
|
interval_file: Path = Path(self._dirs.cache_dir).joinpath(self._interval_file)
|
||
|
log.debug(f"Interval file: {interval_file}")
|
||
|
file_stamp: int = 0
|
||
|
if interval_file.exists():
|
||
|
str_val = interval_file.read_text()
|
||
|
try:
|
||
|
file_stamp = int(str_val)
|
||
|
except ValueError:
|
||
|
log.warning(f"Interval file file is corrupted, keeping value as nothing happened")
|
||
|
# check time
|
||
|
if time() > file_stamp + interval * 60:
|
||
|
return True
|
||
|
return False
|
||
|
|
||
|
def open(self, url: str) -> bool:
|
||
|
log.debug(f"Opening repo with url: {url}")
|
||
|
repo_path = self._repo_path_by_url(url)
|
||
|
self._repo_url = url
|
||
|
self._repo = RepoTool(repo_path)
|
||
|
return self.__opened
|
||
|
|
||
|
@property
|
||
|
def __opened(self) -> bool:
|
||
|
if not self._repo:
|
||
|
return False
|
||
|
return self._repo.initialized
|
||
|
|
||
|
def _repo_path_by_url(self, url: str) -> str:
|
||
|
hashed_name: str = gen_repo_hashed_name(url)
|
||
|
log.debug(f"Repo hashed name for {url} is {hashed_name}")
|
||
|
return os.path.join(self._dirs.repos_dir, hashed_name)
|
||
|
|
||
|
@property
|
||
|
def __main_repo_path(self) -> str:
|
||
|
return self._repo_path_by_url(self._config.cloner_repo_url)
|
||
|
|
||
|
def sync(self) -> bool:
|
||
|
if not self.__opened:
|
||
|
self._repo = RepoTool(self.__main_repo_path)
|
||
|
if not self._repo.initialized:
|
||
|
return False
|
||
|
return self._repo.fetch()
|
||
|
|
||
|
def perform_check(self):
|
||
|
log.info(f"Started check for {self._config.cloner_project_name}, url: {self._config.cloner_repo_url}")
|
||
|
if self.check_interval():
|
||
|
self.sync()
|
||
|
log.info(f"Check finished")
|
||
|
|
||
|
def clone_from_url(self, url: str) -> bool:
|
||
|
path = self._repo_path_by_url(url)
|
||
|
self._repo_url = url
|
||
|
self._repo = RepoTool(path)
|
||
|
if self._repo.initialized:
|
||
|
log.critical(f"Repo path {path} is initialized... Refusing clone!")
|
||
|
return False
|
||
|
return self._repo.clone(url)
|