repo-cloner/repo_cloner/lib/detector.py
Václav Valíček 94275014c5
Fix bug in RepoTool.repo_fingerprint
Signed-off-by: Václav Valíček <valicek1994@gmail.com>
2022-08-08 09:33:45 +02:00

251 lines
7.3 KiB
Python

from repo_cloner.lib import DiskStoredList, DiskStoredRefs, RepoTool
from pathlib import Path
import logging
import json
from typing import Callable
from datetime import datetime
# Module-level logger shared by everything in this file (logger name "rc.detector").
log = logging.getLogger("rc.detector")
class DetectedCommit:
    """Read-only view of a single commit handed to the detector callback.

    The object is built from an ``env`` dictionary: every key ``k`` found in
    ``env`` is stored on the instance as attribute ``_k`` and exposed through
    the property of the same name. Keys that are missing from ``env`` fall
    back to the defaults declared below.
    """

    # Immutable fallbacks may safely live on the class.
    _commit: str = None
    _abbrev: str = None
    _author: str = None
    _date: int = 0
    _is_tag: bool = False
    _is_branch: bool = False
    _log: str = ""
    # Mutable fallbacks are created per instance in __init__, so that two
    # instances never share (and accidentally mutate) the same list.
    _tags: list
    _branches: list
    _dict: dict

    def __init__(self, env: dict, project_name: str):
        """Populate the commit from ``env`` and remember the project name.

        :param env: mapping of field name -> value; each key ``k`` becomes
            attribute ``_k`` on the instance
        :param project_name: stored as the public attribute ``project``
        """
        self._tags = []
        self._branches = []
        for key, value in env.items():
            setattr(self, f"_{key}", value)
        # Assigned after the loop so that ``dict`` always returns ``env``
        # itself, even if ``env`` happens to contain a "dict" key.
        self._dict = env
        self.project = project_name

    @property
    def commit(self) -> str:
        """Value of the ``commit`` field."""
        return self._commit

    @property
    def abbrev(self) -> str:
        """Value of the ``abbrev`` field."""
        return self._abbrev

    @property
    def author(self) -> str:
        """Value of the ``author`` field."""
        return self._author

    @property
    def date(self) -> str:
        """The ``date`` timestamp formatted as ``DD-MM-YYYY, HH:MM:SS``.

        ``datetime.fromtimestamp`` is called without a timezone, so the
        result is expressed in the local time of the running process.
        """
        moment = datetime.fromtimestamp(self._date)
        return moment.strftime("%d-%m-%Y, %H:%M:%S")

    @property
    def is_tag(self) -> bool:
        """Value of the ``is_tag`` field."""
        return self._is_tag

    @property
    def tags(self) -> str:
        """All entries of the ``tags`` field joined by ``", "``."""
        return ", ".join(self._tags)

    @property
    def is_branch(self) -> bool:
        """Value of the ``is_branch`` field."""
        return self._is_branch

    @property
    def branches(self) -> str:
        """All entries of the ``branches`` field joined by ``", "``."""
        return ", ".join(self._branches)

    @property
    def log(self) -> str:
        """Value of the ``log`` field."""
        return self._log

    @property
    def dict(self) -> dict:
        """The ``env`` mapping this object was created from."""
        return self._dict
class Detector:
    """Find commits, branches and tags that were not reported yet.

    State is kept below ``cache_dir``:

    * ``detector/detectorExecuted`` - commits that were already reported,
    * ``detector/branches`` and ``detector/tags`` - refs seen by the last run,
    * ``detectorSum`` - repository fingerprint written by the last run.
    """

    _repo: RepoTool = None
    _repo_path: Path = None
    _detector_dir: Path = None
    _detector_sum: Path = None
    _executed: DiskStoredList = None
    _branches: DiskStoredRefs = None
    _tags: DiskStoredRefs = None
    _project: str = None

    def __init__(self, repo_path: Path, cache_dir: Path, project: str):
        """Open the repository and load (or create) the detector cache.

        :param repo_path: path of the repository handed to ``RepoTool``
        :param cache_dir: directory holding ``detector/`` and ``detectorSum``
        :param project: project name attached to every detected commit
        """
        log.debug("Initializing detector...")
        log.debug(f"Repo: {repo_path}")
        self._repo_path = Path(repo_path)
        self._repo = RepoTool(repo_path)
        self._detector_dir = Path(cache_dir).joinpath("detector")
        self._detector_sum = Path(cache_dir).joinpath("detectorSum")
        if not self._detector_dir.exists():
            log.debug("Creating detector dir")
            self._detector_dir.mkdir()
        self._project = project
        log.debug(f"Detector cache: {self._detector_dir}")

        executed_file = self._detector_dir.joinpath("detectorExecuted")
        self._executed = DiskStoredList(executed_file.as_posix())
        log.debug(f"Detector executed: {len(self._executed)} commits")

        # Legacy ref directories must be converted to files before they are
        # opened as DiskStoredRefs below.
        self.check_legacy_config()

        self._branches = DiskStoredRefs(self._detector_dir.joinpath("branches"))
        self._tags = DiskStoredRefs(self._detector_dir.joinpath("tags"))
        log.info(f"Loaded {self._branches.count()} branches and {self._tags.count()} tags")

    @classmethod
    def _ref_dir_to_json(cls, dir: Path):
        """Replace a legacy reference directory by a JSON file of the same path.

        Every regular file directly inside ``dir`` is read as one reference
        (file name -> stripped file content). The directory is then removed
        recursively and a file with the same path is written, containing the
        name-sorted list of ``[name, content]`` pairs as JSON.
        Sub-directories are not read; they are only deleted.

        :param dir: legacy reference directory to convert
        """

        def remove_tree(directory):
            # Depth-first removal: a directory must be empty before rmdir().
            directory = Path(directory)
            for entry in directory.iterdir():
                if entry.is_dir():
                    remove_tree(entry)
                else:
                    entry.unlink()
            directory.rmdir()

        ref_dict = {}
        for item in dir.iterdir():
            if item.is_file():
                content = item.read_text().strip()
                log.debug(f"Found reference {item.name} -> {content}")
                ref_dict[item.name] = content

        # Serialize before anything is deleted, so a serialization problem
        # can never leave the cache without both directory and file.
        payload = json.dumps(sorted(ref_dict.items()))
        remove_tree(dir)
        dir.write_text(payload)

    def check_legacy_config(self):
        """Convert legacy ``branches``/``tags`` directories to JSON files."""
        for kind, name in (("branch", "branches"), ("tag", "tags")):
            legacy_dir = self._detector_dir.joinpath(name)
            # is_dir() is False for a missing path, no exists() check needed
            if legacy_dir.is_dir():
                log.info(f"Found legacy {kind} dir: {legacy_dir} - converting now")
                Detector._ref_dir_to_json(legacy_dir)

    def initialize_caches(self):
        """Mark the current repository state as already processed.

        All commits listed by the repository are recorded as executed and
        the stored branches and tags are replaced by the current ones.
        """
        log.info("Initializing detector cache")
        for commit in self._repo.list_commits():
            # The list stores hex SHAs, so membership is tested on the SHA
            # (same as in run()), not on the commit object.
            if commit.hexsha not in self._executed:
                self._executed.append(commit.hexsha)

        # cleanup old refs; iterate over a snapshot because entries are
        # removed from the very container that produced the keys
        for branch in list(self._branches.keys()):
            self._branches.remove(branch)
        for tag in list(self._tags.keys()):
            self._tags.remove(tag)

        # persist new ones
        for branch, commit in self._repo.list_branches().items():
            self._branches.update(branch, commit)
        for tag, commit in self._repo.list_tags().items():
            self._tags.update(tag, commit)

    def check_fingerprint(self) -> bool:
        """Return True if a detector run needs to be done.

        A run is needed when no fingerprint file exists yet or when the
        stored fingerprint differs from the current repository fingerprint.
        """
        log.info("Checking repo-detector fingerprint")
        if not self._detector_sum.exists():
            log.debug("Fingerprint file does not exist - run needed")
            return True
        old_fingerprint = self._detector_sum.read_text().strip()
        # persist_fingerprint() stores str(fingerprint); normalise the
        # current value the same way before comparing.
        current_fingerprint = str(self._repo.repo_fingerprint()).strip()
        return old_fingerprint != current_fingerprint

    def persist_fingerprint(self):
        """Write the current repository fingerprint to the fingerprint file."""
        log.debug("Persisting detector fingerprint")
        self._detector_sum.write_text(str(self._repo.repo_fingerprint()))

    def run(self, callback: Callable[[DetectedCommit], None]) -> int:
        """Report every new commit and every commit a new/moved ref points to.

        Refs that no longer exist in the repository are dropped from the
        cache first. Then each listed commit that was not executed yet, or
        that is the target of a branch/tag whose stored value differs, is
        wrapped in a ``DetectedCommit`` and passed to ``callback``. After the
        callback returns, the commit and its refs are recorded in the cache.
        The repository fingerprint is persisted at the end.

        :param callback: called once per detected commit
        :return: number of times ``callback`` was called
        """
        log.info("Running commit detector")
        new_branches = self._repo.list_branches()
        new_tags = self._repo.list_tags()

        # remove removed (snapshot the keys, entries are removed in the loop)
        for branch in list(self._branches.keys()):
            if branch not in new_branches.keys():
                log.info(f"Branch {branch} removed in source, removing from detector")
                self._branches.remove(branch)
        for tag in list(self._tags.keys()):
            if tag not in new_tags.keys():
                log.info(f"Tag {tag} removed in source, removing from detector")
                self._tags.remove(tag)

        # Commits that must be reported even if already executed, because a
        # branch or tag now points to them. A set keeps the per-commit
        # membership test below cheap.
        keep_in_mind_commits: set = set()
        for key, value in new_branches.items():
            if not value == self._branches.get(key):
                log.debug(f"Keep in mind branch: {key} -> {value}")
                keep_in_mind_commits.add(value)
        for key, value in new_tags.items():
            if not value == self._tags.get(key):
                log.debug(f"Keep in mind tag: {key} -> {value}")
                keep_in_mind_commits.add(value)

        # list commits
        executed_count: int = 0
        for commit in self._repo.list_commits():
            sha = commit.hexsha
            already_executed: bool = sha in self._executed
            special_commit: bool = sha in keep_in_mind_commits
            if already_executed and not special_commit:
                continue

            special_branch: list = []
            special_tag: list = []
            if special_commit:
                log.debug(f"Found keep-in-mind commit {sha}")
                # collect the refs pointing here whose stored value differs
                for branch, commit_candidate in new_branches.items():
                    if sha == commit_candidate and not self._branches.get(branch) == sha:
                        log.debug(f"Found branch {branch} for commit {commit_candidate}")
                        special_branch.append(branch)
                for tag, commit_candidate in new_tags.items():
                    if sha == commit_candidate and not self._tags.get(tag) == sha:
                        log.debug(f"Found tag {tag} for commit {commit_candidate}")
                        special_tag.append(tag)

            detected = DetectedCommit(
                {
                    'commit': sha,
                    'abbrev': sha[0:7],
                    'author': str(commit.author),
                    'date': commit.authored_date,
                    'is_tag': len(special_tag) > 0,
                    'tags': special_tag,
                    'is_branch': len(special_branch) > 0,
                    'branches': special_branch,
                    'log': commit.message,
                },
                self._project,
            )
            log.info(
                f"Executing CI: commit {self._project}: {detected.commit} "
                f"@ {detected.date} by {detected.author}"
            )
            if detected.is_tag or detected.is_branch:
                log.info(f"Additional info: branch: {detected.branches}; tag(s) {detected.tags}")

            executed_count += 1
            callback(detected)

            # mark commit done; a keep-in-mind commit may already be stored,
            # do not record it a second time
            if not already_executed:
                self._executed.append(sha)
            for item in special_branch:
                self._branches.update(item, sha)
            for item in special_tag:
                self._tags.update(item, sha)

        self.persist_fingerprint()
        return executed_count