Update, repo tool cloner: recursive clones

Signed-off-by: Václav Valíček <valicek1994@gmail.com>
This commit is contained in:
2022-07-29 17:03:15 +02:00
parent 8a150c63c5
commit 6463a6bb95
6 changed files with 142 additions and 65 deletions

View File

@@ -1,3 +1,5 @@
import os.path
import git
from git import Repo
from git.exc import NoSuchPathError, InvalidGitRepositoryError
@@ -5,6 +7,7 @@ from git import RemoteProgress
import logging
import time
from typing import Union, Optional
from repo_cloner.lib.checksum import gen_repo_hashed_name
log = logging.getLogger("rc.repo")
@@ -64,7 +67,7 @@ class GitRemoteProgress(RemoteProgress):
# so check timer
if (self.last_step_time + self.time_thr) > time.time():
# timer not passed yet
# timer not passed yet repo.head.reset(commit = 'origin/master', index = True, working_tree = True)
if not ((op_code & self.BEGIN) or (op_code & self.BEGIN)):
# skip -> no begin or end
return
@@ -97,10 +100,16 @@ class RepoTool:
_bare: bool = False
_path: str = ""
_last_fetch_data = []
_recursive_discovery_urls: set = set()
_recursive_discovery_cloned: set = set()
_submodule_discovery_history: list = []
def __init__(self, path: str):
log.info(f"Initializing repository at {path}")
self._path = str(path)
self._recursive_discovery_cloned = set()
self._recursive_discovery_urls = set()
self._submodule_discovery_history: list = []
try:
self._repo = Repo(path, expand_vars = False)
self._initialized = True
@@ -123,6 +132,14 @@ class RepoTool:
def path(self) -> str:
return self._path
@property
def cloned_submodules_url_list(self) -> list:
    """Return a new list holding the submodule URLs recorded as cloned.

    The URLs are read from ``self._recursive_discovery_cloned``; the returned
    list is a copy, so mutating it does not touch the internal set.  The
    order of the items follows set iteration order and is not guaranteed.
    """
    return [*self._recursive_discovery_cloned]
@property
def discovered_submodules_commits(self) -> list:
    """Return the list of commit hashes already scanned for submodules.

    This is the internal ``self._submodule_discovery_history`` list itself
    (not a copy), so changes made by the caller are visible to this object.
    """
    history = self._submodule_discovery_history
    return history
def __check_initialized(self):
def inner(*args):
fake_self: RepoTool = args[0]
@@ -183,7 +200,8 @@ class RepoTool:
log.debug(f"Repo fingerprint is {x}")
return x
def list_submodules(self, commit: str = "HEAD") -> list:
@__check_initialized
def list_submodules(self, commit: str = "HEAD") -> Union[list, bool]:
commit = self._repo.commit(commit)
submodules = []
@@ -198,9 +216,13 @@ class RepoTool:
if parser.has_option(section, "url"):
submodules.append(parser.get_value(section, "url"))
if commit.hexsha not in self._submodule_discovery_history:
self._submodule_discovery_history.append(commit.hexsha)
return submodules
def list_submodules_history(self, limit_of_commits: Optional[int] = None):
@__check_initialized
def list_submodules_history(self, limit_of_commits: Optional[int] = None) -> Union[list, bool]:
log.info(f"Listing repository submodule history")
iterator = self._repo.iter_commits(all = True, max_count = limit_of_commits)
@@ -216,3 +238,55 @@ class RepoTool:
log.info(f"Submodule discovery: {counter} commits finished, {len(submodules)} discovered")
last_status = time.time()
return list(submodules)
def clone_recursive(
        self,
        main_url: str,
        scan_cache_dir: Optional[str] = None,
        scan_depth: Optional[int] = None
) -> bool:
    """Clone ``main_url`` and, transitively, every submodule URL found in history.

    After the main repository is cloned, its commit history is scanned for
    submodule URLs.  Each discovered URL is cloned into a sibling directory of
    this repository (named by ``gen_repo_hashed_name(url)``) and scanned in
    turn, until no URL is left that has not been attempted.

    :param main_url: URL of the main repository to clone.
    :param scan_cache_dir: accepted for interface compatibility; this method
        does not use it.
    :param scan_depth: passed to ``list_submodules_history`` as the limit of
        commits to scan; ``None`` is passed through unchanged.
    :return: ``False`` when the main clone fails, or when any submodule clone
        or submodule discovery fails; ``True`` otherwise.
    """
    log.info(f"Started recursive clone of {main_url} with recursive discovery limited to {scan_depth} commits")

    # clone main repo - nothing else can be done without it
    if not self.clone(main_url):
        log.critical("Clone of main repository failed!")
        return False

    # discover submodules for the main repository
    submodules = self.list_submodules_history(scan_depth)
    if submodules:
        self._recursive_discovery_urls.update(submodules)

    everything_succeed: bool = True
    # submodules are cloned next to the main repository
    directory = os.path.dirname(self.path)

    # Work on a snapshot of the not-yet-attempted urls: the discovery set grows
    # while repositories are scanned, and a set must not change during iteration.
    pending = self._recursive_discovery_urls - self._recursive_discovery_cloned
    while pending:
        for url in pending:
            submodule_cloner = RepoTool(os.path.join(directory, gen_repo_hashed_name(url)))
            cloned: bool = submodule_cloner.clone(url)
            # mark as cloned even when the clone failed - otherwise the url
            # would stay pending forever and the loop would never finish
            self._recursive_discovery_cloned.add(url)
            if not cloned:
                log.critical(f"Clone of submodule: {url} failed")
                everything_succeed = False
                continue

            # scan the freshly cloned submodule for nested submodules
            submodules = submodule_cloner.list_submodules_history(scan_depth)
            if submodules is False:
                log.critical(f"Submodule discovery for {url} failed!")
                everything_succeed = False
                continue

            self._recursive_discovery_urls.update(submodules)

        pending = self._recursive_discovery_urls - self._recursive_discovery_cloned

    return everything_succeed