import os.path
import git
from git import Repo
from git.exc import NoSuchPathError, InvalidGitRepositoryError, GitCommandError
from git import RemoteProgress
import hashlib
import logging
import time
from typing import Union, Optional
from repo_cloner.lib.checksum import gen_repo_hashed_name
from io import BytesIO

log = logging.getLogger("rc.repo")


class GitRemoteProgress(RemoteProgress):
    OP_CODES = [
        "BEGIN",
        "CHECKING_OUT",
        "COMPRESSING",
        "COUNTING",
        "END",
        "FINDING_SOURCES",
        "RECEIVING",
        "RESOLVING",
        "WRITING",
    ]
    OP_CODE_MAP = {
        getattr(RemoteProgress, _op_code): _op_code for _op_code in OP_CODES
    }
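    # e.g. OP_CODE_MAP[RemoteProgress.COUNTING] == "COUNTING"; get_curr_op()
    # below uses this mapping to turn a masked op code back into its name.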

    last_step_time = time.time()
    time_thr = 0.5

    cur_task: str = ""
    cur_task_max: int = 0

    def __init__(self) -> None:
        super().__init__()
        # start with an expired timer so the very first update() gets logged
        self.last_step_time = time.time() - self.time_thr
        self.cur_task_max = 0
        self.cur_task = ""

    def __del__(self) -> None:
        self.finish()

    @classmethod
    def get_curr_op(cls, op_code: int) -> str:
        """Get OP name from OP code."""
        # Remove BEGIN- and END-flag and get op name
        op_code_masked = op_code & cls.OP_MASK
        return cls.OP_CODE_MAP.get(op_code_masked, "?").title()

    def finish(self):
        log.info(f"GIT {self.cur_task}: 100.00% ({self.cur_task_max})")

    def update(
        self,
        op_code: int,
        cur_count: Union[float, str],
        max_count: Union[float, str, None] = None,
        message: Optional[str] = "",
    ) -> None:
        # Do I need to update?
        # -> begin: YES
        # -> end:   YES
        # -> timer elapsed: YES

        # so check the timer first
        if (self.last_step_time + self.time_thr) > time.time():
            # timer has not elapsed yet
            if not ((op_code & self.BEGIN) or (op_code & self.END)):
                # skip -> neither begin nor end
                return
        # update timer
        self.last_step_time = time.time()

        # Start a new task on each BEGIN-flag
        if op_code & self.BEGIN:
            self.cur_task = self.get_curr_op(op_code).upper()
            try:
                self.cur_task_max = int(max_count)
            except (ValueError, TypeError):
                # max_count may be None or a non-numeric string
                self.cur_task_max = 100

            log.info(f"GIT {self.cur_task} started")

        percent = round(100 * (float(cur_count) / self.cur_task_max), 2)

        # End progress monitoring on each END-flag
        if op_code & self.END:
            percent = 100

        log.info(f"GIT {self.cur_task}: {percent}% ({cur_count}; {message})")
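
# Illustrative sketch of how GitPython drives GitRemoteProgress (the URL and
# path below are placeholders, not values used by this module):
#
#     Repo.clone_from("https://example.com/repo.git", "/tmp/mirror.git",
#                     progress = GitRemoteProgress(), bare = True)
#
# GitPython calls update() repeatedly while the transfer runs; the BEGIN/END
# flags open and close a task, and time_thr rate-limits the lines in between.
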
class RepoTool:
    _repo: Optional[Repo] = None
    _initialized: bool = False
    _bare: bool = False
    _path: str = ""
    _last_fetch_data = []
    _recursive_discovery_urls: set = set()
    _recursive_discovery_cloned: set = set()
    _submodule_discovery_history: list = []
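    # NOTE: these class-level values are only fallbacks; __init__ re-creates
    # the mutable containers per instance so they are not shared between
    # RepoTool objects.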

    def __init__(self, path: str):
        log.info(f"Initializing repository at {path}")
        self._path = str(path)
        self._last_fetch_data = []
        self._recursive_discovery_cloned = set()
        self._recursive_discovery_urls = set()
        self._submodule_discovery_history = []
        try:
            self._repo = Repo(path, expand_vars = False)
            self._initialized = True
            self._bare = self._repo.bare

        except (NoSuchPathError, InvalidGitRepositoryError) as e:
            log.warning(f"Init failed: {e}, continuing with uninitialized repo")
            self._initialized = False
            self._bare = False

    @property
    def initialized(self) -> bool:
        return self._initialized

    @property
    def bare(self) -> bool:
        return self._bare

    @property
    def path(self) -> str:
        return self._path

    @property
    def cloned_submodules_url_list(self) -> list:
        return list(self._recursive_discovery_cloned)

    @property
    def discovered_submodules_commits(self) -> list:
        return self._submodule_discovery_history

    def _persist_submodule_commits(self, path: str):
        # one commit hash per line
        with open(path, "w") as f:
            for commit in self.discovered_submodules_commits:
                f.write(f"{commit}\n")

    # Plain function used as a decorator inside the class body: at the point
    # the methods below are defined, __check_initialized is still an ordinary
    # function, so @__check_initialized wraps them with an init guard.
    def __check_initialized(func):
        def inner(*args):
            fake_self: RepoTool = args[0]
            if not fake_self._initialized:
                log.critical(f"Repo {fake_self.path} is not initialized!")
                return False
            return func(*args)

        return inner

    def clone(self, url: str) -> bool:
        if self._initialized:
            log.warning("Trying to clone into an already initialized repository!")
            return False

        log.info(f"Cloning repository from url: {url}")
        try:
            self._repo = Repo.clone_from(
                url,
                to_path = self._path,
                progress = GitRemoteProgress(),
                bare = True,
                mirror = True
            )
        except GitCommandError as e:
            log.critical(f"Clone of {url} failed!")
            log.critical(f"Exception: {e}")
            self._initialized = False
            return False

        self._initialized = True
        self._bare = self._repo.bare

        return True
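
    # Design note: mirror = True implies a bare clone and maps every ref on
    # the remote (+refs/*:refs/*), which is what keeps the local copy an
    # exact mirror that fetch() below can refresh.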

    @__check_initialized
    def fetch(self) -> bool:
        log.info("Fetching repo state")
        if not len(self._repo.remotes):
            log.warning(f"Repo: {self._path} does not contain any remotes!")
            return False
        # fetch the first (and for a mirror usually the only) remote
        remote = self._repo.remotes[0]
        log.debug(f"Fetching remote: {remote.name} url: {next(remote.urls)}")
        self._last_fetch_data = remote.fetch(
            ["+refs/heads/*:refs/heads/*", "+refs/tags/*:refs/tags/*"],
            progress = GitRemoteProgress(),
            kill_after_timeout = 60,
            prune = True
        )
        log.debug("Fetch finished!")
        return True
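
    # The leading "+" in the refspecs force-updates local heads and tags even
    # on non-fast-forward changes, and prune = True drops refs deleted on the
    # remote, so the mirror tracks the source exactly.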

    @__check_initialized
    def repo_fingerprint(self) -> Union[str, bool]:
        log.debug("Getting repo fingerprint")
        # commit count across all refs
        ref_count = self._repo.git.rev_list(count = True, all = True)
        tags = [f"{tag.name}/{tag.commit}" for tag in self._repo.tags]
        branches = [f"{branch.name}/{branch.commit}" for branch in self._repo.branches]
        log.debug(f"{ref_count} commits, {len(tags)} tags, {len(branches)} branches")
        cumulative = f"{ref_count} {'.'.join(tags)} {' '.join(branches)}".encode()
        x = hashlib.sha256(cumulative).hexdigest()
        log.debug(f"Repo fingerprint is {x}")
        return x
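
    # Usage sketch (hypothetical caller): the fingerprint only changes when
    # refs or commits change, so it can cheaply detect updates:
    #
    #     before = tool.repo_fingerprint()
    #     tool.fetch()
    #     changed = tool.repo_fingerprint() != before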

    @__check_initialized
    def list_submodules(self, commit: Union[str, git.Commit] = "HEAD") -> Union[list, bool]:
        commit = self._repo.commit(commit)
        submodules = []

        if '.gitmodules' in commit.tree:
            git_modules: git.Blob = commit.tree['.gitmodules']
            # GitConfigParser needs a named, seekable stream
            x = BytesIO(git_modules.data_stream.read())
            x.name = git_modules.name

            try:
                parser = git.GitConfigParser(x, read_only = True)
                for section in parser.sections():
                    if parser.has_option(section, "url"):
                        submodules.append(parser.get_value(section, "url"))
            except Exception:
                return False

        if commit.hexsha not in self._submodule_discovery_history:
            self._submodule_discovery_history.append(commit.hexsha)

        return submodules
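
    # For reference, a .gitmodules blob parsed above has this shape; only the
    # url option of each section is collected:
    #
    #     [submodule "libfoo"]
    #         path = vendor/libfoo
    #         url = https://example.com/libfoo.git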

    @__check_initialized
    def list_submodules_history(self, limit_of_commits: Optional[int] = None) -> Union[list, bool]:
        log.info("Listing repository submodule history")
        # walk every commit reachable from any ref, newest first
        iterator = self._repo.iter_commits(all = True, max_count = limit_of_commits)
        submodules = set()
        counter: int = 0
        last_status = time.time()
        status_offset = 0.5
        for commit in iterator:
            counter += 1
            commit_submodules = self.list_submodules(commit)
            # list_submodules() returns False when .gitmodules failed to parse
            if not isinstance(commit_submodules, bool):
                submodules.update(commit_submodules)
            if time.time() >= last_status + status_offset:
                log.info(f"Submodule discovery: {counter} commits finished, {len(submodules)} discovered")
                last_status = time.time()
        return list(submodules)
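
    # Cost note: this reads the .gitmodules blob of every scanned commit, so
    # the scan is O(number of commits); limit_of_commits bounds it for large
    # repositories.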

    def clone_recursive(
        self,
        main_url: str,
        scan_cache_dir: Optional[str] = None,
        scan_depth: Optional[int] = None
    ) -> bool:
        log.info(f"Started recursive clone of {main_url} with recursive discovery limited to {scan_depth} commits")
        # clone main repo
        if not self.clone(main_url):
            log.critical("Clone of main repository failed!")
            return False

        # discover submodules of the main repository
        submodules = self.list_submodules_history(scan_depth)
        if submodules:
            for submodule in submodules:
                self._recursive_discovery_urls.add(submodule)

        everything_succeed: bool = True
        everything_cloned: bool = False
        while not everything_cloned:
            # recursively scan and clone repositories until no new url shows up
            everything_cloned = True
            # for every url in list
            # list() is needed - a set raises RuntimeError when changed during iteration
            for url in list(self._recursive_discovery_urls):
                if url not in self._recursive_discovery_cloned:
                    everything_cloned = False
                    # generate new path beside this repository
                    directory = os.path.dirname(self.path)
                    submodule_cloner = RepoTool(os.path.join(directory, gen_repo_hashed_name(url)))
                    # clone; mark the url as handled even if the clone fails,
                    # otherwise the while loop would retry it forever
                    cloned: bool = submodule_cloner.clone(url)
                    self._recursive_discovery_cloned.add(url)
                    if not cloned:
                        log.critical(f"Clone of submodule: {url} failed")
                        everything_succeed = False
                        continue
                    # scan the submodule itself for nested submodules
                    submodules = submodule_cloner.list_submodules_history(scan_depth)
                    if isinstance(submodules, bool) and not submodules:
                        log.critical(f"Submodule discovery for {url} failed!")
                        everything_succeed = False
                        continue

                    # persist scanned commits of this submodule
                    if scan_cache_dir:
                        cache_file = os.path.join(scan_cache_dir, gen_repo_hashed_name(url))
                        log.debug(
                            f"Saving {len(submodule_cloner.discovered_submodules_commits)} commits into {cache_file}")
                        submodule_cloner._persist_submodule_commits(cache_file)

                    for submodule in submodules:
                        self._recursive_discovery_urls.add(submodule)

        if scan_cache_dir:
            # persist main repo commits
            cache_file = os.path.basename(self.path)
            cache_file = os.path.join(scan_cache_dir, cache_file)
            self._persist_submodule_commits(cache_file)

            # persist discovered submodule urls
            with open(os.path.join(scan_cache_dir, "submodules.cache"), "w") as f:
                f.write("\n".join(self.cloned_submodules_url_list))

        return everything_succeed
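
# Hedged usage sketch (not part of the original module; the URL and path are
# placeholders): mirrors a repository and all recursively discovered
# submodules next to it.
if __name__ == "__main__":
    logging.basicConfig(level = logging.INFO)

    tool = RepoTool("/tmp/mirrors/example.git")  # placeholder mirror path
    if tool.initialized:
        # mirror already exists: just refresh it
        tool.fetch()
    else:
        # fresh mirror: clone it together with every discovered submodule,
        # scanning at most the 500 newest commits of each repo for submodules
        tool.clone_recursive("https://example.com/project.git",  # placeholder
                             scan_depth = 500)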