repo-cloner/repo_cloner/lib/repo_tool.py
Václav Valíček 6463a6bb95
Update, repo tool cloner: recursive clones
Signed-off-by: Václav Valíček <valicek1994@gmail.com>
2022-07-29 17:03:15 +02:00

293 lines
8.3 KiB
Python

import os.path
import git
from git import Repo
from git.exc import NoSuchPathError, InvalidGitRepositoryError
from git import RemoteProgress
import logging
import time
from typing import Union, Optional
from repo_cloner.lib.checksum import gen_repo_hashed_name
log = logging.getLogger("rc.repo")
class GitRemoteProgress(RemoteProgress):
    """Log-based progress reporter for GitPython remote operations.

    Translates RemoteProgress callbacks into `log` messages, rate-limited
    to one line per `time_thr` seconds. BEGIN/END events are always
    reported regardless of the throttle.
    """

    # Names of the RemoteProgress stage flags we can translate to text.
    OP_CODES = [
        "BEGIN",
        "CHECKING_OUT",
        "COMPRESSING",
        "COUNTING",
        "END",
        "FINDING_SOURCES",
        "RECEIVING",
        "RESOLVING",
        "WRITING",
    ]
    # Map numeric flag -> name, e.g. RemoteProgress.COUNTING -> "COUNTING".
    OP_CODE_MAP = {
        getattr(RemoteProgress, _op_code): _op_code for _op_code in OP_CODES
    }

    # Class-level defaults; real values are (re)assigned per instance.
    last_step_time = time.time()
    time_thr = 0.5          # minimum seconds between throttled log lines
    cur_task: str = ""      # name of the stage currently in progress
    cur_task_max: int = 0   # expected total count for the current stage

    def __init__(self) -> None:
        super().__init__()
        # Start "expired" so the very first update() is logged immediately.
        self.last_step_time = time.time() - self.time_thr
        self.cur_task_max = 0
        self.cur_task = ""

    def __del__(self) -> None:
        # Best effort: emit the final 100% line when the reporter dies.
        self.finish()

    @classmethod
    def get_curr_op(cls, op_code: int) -> str:
        """Get OP name from OP code."""
        # Remove BEGIN- and END-flag and get op name
        op_code_masked = op_code & cls.OP_MASK
        return cls.OP_CODE_MAP.get(op_code_masked, "?").title()

    def finish(self):
        """Emit a final 100% line for the last running stage."""
        log.info(f"GIT {self.cur_task}: 100.00% ({self.cur_task_max})")

    def update(
            self,
            op_code: int,
            cur_count: Union[float, str],
            max_count: Union[float, str, None] = None,
            message: Optional[str] = "",
    ) -> None:
        """RemoteProgress callback: log progress, throttled by `time_thr`.

        Always logs BEGIN and END events; intermediate updates are only
        logged when at least `time_thr` seconds passed since the last line.
        """
        # Throttle window still open?
        if (self.last_step_time + self.time_thr) > time.time():
            # Fix: the original tested BEGIN twice, so END events arriving
            # inside the throttle window were silently dropped.
            if not ((op_code & self.BEGIN) or (op_code & self.END)):
                # skip -> no begin or end
                return
        # update timer
        self.last_step_time = time.time()
        # Start new bar on each BEGIN-flag
        if op_code & self.BEGIN:
            self.cur_task = self.get_curr_op(op_code).upper()
            try:
                # max_count may be None (TypeError) or a non-numeric
                # string (ValueError); fall back to 100 in both cases so
                # the percentage below stays well-defined.
                self.cur_task_max = int(max_count)
            except (ValueError, TypeError):
                self.cur_task_max = 100
            log.info(f"GIT {self.cur_task} started")
        # Guard against a reported maximum of 0 (ZeroDivisionError).
        percent = round(100 * (float(cur_count) / (self.cur_task_max or 100)), 2)
        # End progress monitoring on each END-flag
        if op_code & self.END:
            percent = 100
        log.info(f"GIT {self.cur_task}: {percent}% ({cur_count}; {message})")
class RepoTool:
    """Wrapper around GitPython providing mirror clones, fetches, repo
    fingerprinting and recursive submodule discovery/cloning.
    """

    # Class-level defaults; mutable containers are re-created per instance
    # in __init__ so they are not shared between RepoTool objects.
    _repo: Repo = None
    _initialized: bool = False
    _bare: bool = False
    _path: str = ""
    _last_fetch_data = []
    _recursive_discovery_urls: set = set()
    _recursive_discovery_cloned: set = set()
    _submodule_discovery_history: list = []

    def __init__(self, path: str):
        """Try to open an existing repository at `path`.

        On failure the instance stays uninitialized; it can later be
        populated via clone() / clone_recursive().
        """
        log.info(f"Initializing repository at {path}")
        self._path = str(path)
        # fresh mutable containers per instance
        self._recursive_discovery_cloned = set()
        self._recursive_discovery_urls = set()
        self._submodule_discovery_history: list = []
        try:
            self._repo = Repo(path, expand_vars = False)
            self._initialized = True
            self._bare = self._repo.bare
        except (NoSuchPathError, InvalidGitRepositoryError) as e:
            log.warning(f"Init failed: {str(e)}, continuing with uninitialized repo")
            self._initialized = False
            self._bare = False

    @property
    def initialized(self) -> bool:
        # True when a valid repository is opened or cloned
        return self._initialized

    @property
    def bare(self) -> bool:
        # True when the underlying repository is bare (mirror clones are)
        return self._bare

    @property
    def path(self) -> str:
        # filesystem path of the repository
        return self._path

    @property
    def cloned_submodules_url_list(self) -> list:
        # URLs of submodules for which a clone was already attempted
        return list(self._recursive_discovery_cloned)

    @property
    def discovered_submodules_commits(self) -> list:
        # hexshas of commits already scanned for .gitmodules entries
        return self._submodule_discovery_history

    def __check_initialized(self):
        # Decorator used below. NOTE: `self` here is the *wrapped
        # function*, not a RepoTool instance — the real instance arrives
        # as args[0] at call time. Returns False instead of calling the
        # wrapped method when the repo is not initialized.
        def inner(*args):
            fake_self: RepoTool = args[0]
            if not fake_self._initialized:
                log.critical(f"Repo {fake_self.path} is not initialized!")
                return False
            return self(*args)

        return inner

    def clone(self, url: str) -> bool:
        """Mirror-clone `url` into self.path. Returns True on success."""
        if self._initialized:
            log.warning(f"Trying to clone to initialized repository!")
            return False
        log.info(f"Cloning repository from url: {url}")
        self._repo = Repo.clone_from(
            url,
            to_path = self._path,
            progress = GitRemoteProgress(),
            bare = True,
            mirror = True
        )
        self._initialized = True
        self._bare = self._repo.bare
        return True

    @__check_initialized
    def fetch(self) -> bool:
        """Fetch heads and tags from the first remote, pruning stale refs.

        Returns False when the repo has no remotes (or is uninitialized).
        """
        log.info("Fetching repo state")
        if not len(self._repo.remotes):
            log.warning(f"Repo: {self._path} does not contain any remotes!")
            return False
        # fetch first remote only (mirror clones have exactly one)
        remote = self._repo.remotes[0]
        log.debug(f"Fetching remote: {remote.name} url: {next(remote.urls)}")
        self._last_fetch_data = remote.fetch(
            ["+refs/heads/*:refs/heads/*", "+refs/tags/*:refs/tags/*"],
            progress = GitRemoteProgress(),
            kill_after_timeout = 60,
            prune = True
        )
        log.debug("Fetch finished!")
        return True

    @__check_initialized
    def repo_fingerprint(self) -> Union[str, bool]:
        """Return a sha256 hex digest summarizing refs, tags and branches.

        Changes whenever the repository content changes; False when the
        repo is uninitialized (via the decorator).
        """
        log.debug("Getting repo fingerprint")
        # total reachable commit count across all refs
        ref_count = self._repo.git.rev_list(count = True, all = True)
        tags = [f"{tag.name}/{tag.commit}" for tag in self._repo.tags]
        branches = [f"{branch.name}/{branch.commit}" for branch in self._repo.branches]
        log.debug(f"{ref_count} references, {len(tags)} tags, {len(branches)} branches")
        cumulative = f"{ref_count} {'.'.join(tags)} {' '.join(branches)}".encode()
        import hashlib
        x = hashlib.sha256(cumulative).hexdigest()
        log.debug(f"Repo fingerprint is {x}")
        return x

    @__check_initialized
    def list_submodules(self, commit: str = "HEAD") -> Union[list, bool]:
        """Return submodule URLs referenced by `commit`'s .gitmodules.

        `commit` may be anything Repo.commit() accepts (rev string or
        Commit object). Scanned commits are recorded in
        _submodule_discovery_history. Returns False when uninitialized.
        """
        commit = self._repo.commit(commit)
        submodules = []
        if '.gitmodules' in commit.tree:
            git_modules: git.Blob = commit.tree['.gitmodules']
            from io import BytesIO
            # GitConfigParser needs a named file-like object
            x = BytesIO(git_modules.data_stream.read())
            x.name = git_modules.name
            parser = git.GitConfigParser(x, read_only = True)
            for section in parser.sections():
                if parser.has_option(section, "url"):
                    submodules.append(parser.get_value(section, "url"))
        if commit.hexsha not in self._submodule_discovery_history:
            self._submodule_discovery_history.append(commit.hexsha)
        return submodules

    @__check_initialized
    def list_submodules_history(self, limit_of_commits: Optional[int] = None) -> Union[list, bool]:
        """Collect submodule URLs across (up to `limit_of_commits`) commits.

        Iterates all refs; returns a deduplicated list of URLs, or False
        when the repo is uninitialized.
        """
        log.info(f"Listing repository submodule history")
        iterator = self._repo.iter_commits(all = True, max_count = limit_of_commits)
        submodules = set()
        counter: int = 0
        # periodic progress logging, at most every status_offset seconds
        last_status = time.time()
        status_offset = 0.5
        for commit in iterator:
            counter += 1
            commit_submodules = self.list_submodules(commit)
            submodules.update(commit_submodules)
            if time.time() >= last_status + status_offset:
                log.info(f"Submodule discovery: {counter} commits finished, {len(submodules)} discovered")
                last_status = time.time()
        return list(submodules)

    def clone_recursive(
            self,
            main_url: str,
            scan_cache_dir: Optional[str] = None,
            scan_depth: Optional[int] = None
    ) -> bool:
        """Clone `main_url` and, transitively, every discovered submodule.

        Fix: the original declared `scan_cache_dir: Optional[str] == None`
        (a comparison used as annotation), leaving the parameter without a
        default; it now defaults to None. The parameter is currently
        unused — reserved for a future scan cache. `scan_depth` limits how
        many commits per repo are scanned for submodules (None = all).
        Returns True only when every clone and scan succeeded.
        """
        log.info(f"Started recursive clone of {main_url} with recursive discovery limited to {scan_depth} commits")
        # clone main repo
        if not self.clone(main_url):
            log.critical(f"Clone of main repository failed!")
            return False
        # discover submodules for repository
        submodules = self.list_submodules_history(scan_depth)
        if submodules:
            for submodule in submodules:
                self._recursive_discovery_urls.add(submodule)

        everything_succeed: bool = True
        everything_cloned: bool = False
        while not everything_cloned:
            # recursively scan and clone repositories
            everything_cloned = True
            # for every url in list
            # list() is needed - Runtime Error for set() changed during iteration
            for url in list(self._recursive_discovery_urls):
                if url not in self._recursive_discovery_cloned:
                    everything_cloned = False
                    # submodule mirrors live next to the main repo, under a
                    # hashed directory name derived from the url
                    directory = os.path.dirname(self.path)
                    submodule_cloner = RepoTool(os.path.join(directory, gen_repo_hashed_name(url)))
                    # clone
                    cloned: bool = submodule_cloner.clone(url)
                    # mark cloned even if failed afterwards - while loop stuck solution
                    self._recursive_discovery_cloned.add(url)
                    if not cloned:
                        log.critical(f"Clone of submodule: {url} failed")
                        everything_succeed = False
                        continue
                    # scan for submodules; False (not an empty list) means
                    # the scan itself failed
                    submodules = submodule_cloner.list_submodules_history(scan_depth)
                    if submodules is False:
                        log.critical(f"Submodule discovery for {url} failed!")
                        everything_succeed = False
                        continue
                    for submodule in submodules:
                        self._recursive_discovery_urls.add(submodule)
        return everything_succeed