133
repo_cloner/lib/repo_tool.py
Normal file
133
repo_cloner/lib/repo_tool.py
Normal file
@@ -0,0 +1,133 @@
|
||||
from git import Repo
|
||||
from git.exc import NoSuchPathError, InvalidGitRepositoryError
|
||||
from git import RemoteProgress
|
||||
import logging
|
||||
import time
|
||||
|
||||
log = logging.getLogger("rc.repo")
|
||||
|
||||
|
||||
class GitRemoteProgress(RemoteProgress):
|
||||
OP_CODES = [
|
||||
"BEGIN",
|
||||
"CHECKING_OUT",
|
||||
"COMPRESSING",
|
||||
"COUNTING",
|
||||
"END",
|
||||
"FINDING_SOURCES",
|
||||
"RECEIVING",
|
||||
"RESOLVING",
|
||||
"WRITING",
|
||||
]
|
||||
OP_CODE_MAP = {
|
||||
getattr(RemoteProgress, _op_code): _op_code for _op_code in OP_CODES
|
||||
}
|
||||
|
||||
last_step_time = time.time()
|
||||
time_thr = 0.5
|
||||
|
||||
cur_task: str = ""
|
||||
cur_task_max: int = 0
|
||||
|
||||
def __init__(self) -> None:
|
||||
super().__init__()
|
||||
self.last_step_time = time.time() - self.time_thr
|
||||
self.cur_task_max = 0
|
||||
self.cur_task = ""
|
||||
|
||||
def __del__(self) -> None:
|
||||
self.finish()
|
||||
|
||||
@classmethod
|
||||
def get_curr_op(cls, op_code: int) -> str:
|
||||
"""Get OP name from OP code."""
|
||||
# Remove BEGIN- and END-flag and get op name
|
||||
op_code_masked = op_code & cls.OP_MASK
|
||||
return cls.OP_CODE_MAP.get(op_code_masked, "?").title()
|
||||
|
||||
def finish(self):
|
||||
log.info(f"GIT {self.cur_task}: 100.00% ({self.cur_task_max})")
|
||||
|
||||
def update(
|
||||
self,
|
||||
op_code: int,
|
||||
cur_count: str | float,
|
||||
max_count: str | float | None = None,
|
||||
message: str | None = "",
|
||||
) -> None:
|
||||
# Do i need to update?
|
||||
# -> begin : YES
|
||||
# -> end : YES
|
||||
# -> timer: YES
|
||||
|
||||
# so check timer
|
||||
if (self.last_step_time + self.time_thr) > time.time():
|
||||
# timer not passed yet
|
||||
if not ((op_code & self.BEGIN) or (op_code & self.BEGIN)):
|
||||
# skip -> no begin or end
|
||||
return
|
||||
# update timer
|
||||
self.last_step_time = time.time()
|
||||
|
||||
# Start new bar on each BEGIN-flag
|
||||
if op_code & self.BEGIN:
|
||||
self.cur_task = self.get_curr_op(op_code).upper()
|
||||
try:
|
||||
self.cur_task_max = int(max_count)
|
||||
except ValueError:
|
||||
self.cur_task_max = 100
|
||||
|
||||
log.info(f"GIT {self.cur_task} started")
|
||||
|
||||
percent = round(100 * (cur_count / self.cur_task_max), 2)
|
||||
|
||||
# End progress monitoring on each END-flag
|
||||
if op_code & self.END:
|
||||
# logger.info("Done: %s", self.curr_op)
|
||||
percent = 100
|
||||
|
||||
log.info(f"GIT {self.cur_task}: {percent}% ({cur_count}; {message})")
|
||||
|
||||
|
||||
class RepoTool:
|
||||
_repo: Repo = None
|
||||
_initialized: bool = False
|
||||
_bare: bool = False
|
||||
_path: str = ""
|
||||
|
||||
def __init__(self, path: str):
|
||||
log.info(f"Initializing repository at {path}")
|
||||
self._path = str(path)
|
||||
try:
|
||||
self._repo = Repo(path, expand_vars = False)
|
||||
self._initialized = True
|
||||
self._bare = self._repo.bare
|
||||
|
||||
except (NoSuchPathError, InvalidGitRepositoryError) as e:
|
||||
log.warning(f"Init failed: {str(e)}, continuing with uninitialized repo")
|
||||
self._initialized = False
|
||||
self._bare = False
|
||||
|
||||
@property
|
||||
def initialized(self) -> bool:
|
||||
return self._initialized
|
||||
|
||||
@property
|
||||
def bare(self) -> bool:
|
||||
return self._bare
|
||||
|
||||
@property
|
||||
def path(self) -> str:
|
||||
return self._path
|
||||
|
||||
def clone(self, url: str) -> bool:
|
||||
if self._initialized:
|
||||
log.warning(f"Trying to clone to initialized repository!")
|
||||
return False
|
||||
|
||||
log.info(f"Cloning repository from url: {url}")
|
||||
self._repo = Repo.clone_from(url, to_path = self._path, progress = GitRemoteProgress(), bare = True)
|
||||
self._initialized = True
|
||||
self._bare = self._repo.bare
|
||||
|
||||
return True
|
||||
Reference in New Issue
Block a user