Detector: almost complete
Signed-off-by: Václav Valíček <valicek1994@gmail.com>
This commit is contained in:
parent
52c3d03e2f
commit
5ca24960c3
|
@ -4,6 +4,8 @@ from .config_file_not_found_error import ConfigFileNotFoundError
|
|||
from .default_cloner_config import DefaultClonerConfig
|
||||
from .dir_not_found_error import DirNotFoundError
|
||||
from .disk_stored_list import DiskStoredList
|
||||
from .disk_stored_refs import DiskStoredRefs
|
||||
from .repo_tool import RepoTool
|
||||
from .repo_dir_structure import RepoDirStructure
|
||||
from .detector import Detector, DetectedCommit
|
||||
from .cloner import Cloner
|
||||
|
|
210
repo_cloner/lib/detector.py
Normal file
210
repo_cloner/lib/detector.py
Normal file
|
@ -0,0 +1,210 @@
|
|||
from repo_cloner.lib import DiskStoredList, DiskStoredRefs, RepoTool
|
||||
from pathlib import Path
|
||||
import logging
|
||||
import json
|
||||
from typing import Callable
|
||||
|
||||
log = logging.getLogger("rc.detector")
|
||||
|
||||
|
||||
class DetectedCommit:
|
||||
_commit: str = None
|
||||
_abbrev: str = None
|
||||
_author: str = None
|
||||
_date: int = 0
|
||||
_is_tag: bool = False
|
||||
_tags: list = []
|
||||
_is_branch: bool = False
|
||||
_branches: list = []
|
||||
_log: str = ""
|
||||
_dict = {}
|
||||
|
||||
def __init__(self, env: dict):
|
||||
for key in env.keys():
|
||||
self.__setattr__(f"_{key}", env[key])
|
||||
self._dict = env
|
||||
|
||||
@property
|
||||
def commit(self) -> str:
|
||||
return self._commit
|
||||
|
||||
@property
|
||||
def abbrev(self) -> str:
|
||||
return self._abbrev
|
||||
|
||||
@property
|
||||
def author(self) -> str:
|
||||
return self._author
|
||||
|
||||
@property
|
||||
def date(self) -> int:
|
||||
return self._date
|
||||
|
||||
@property
|
||||
def is_tag(self) -> bool:
|
||||
return self._is_tag
|
||||
|
||||
@property
|
||||
def tags(self) -> str:
|
||||
return ", ".join(self._tags)
|
||||
|
||||
@property
|
||||
def is_branch(self) -> bool:
|
||||
return self._is_branch
|
||||
|
||||
@property
|
||||
def branches(self) -> str:
|
||||
return ", ".join(self._branches)
|
||||
|
||||
@property
|
||||
def log(self) -> str:
|
||||
return self._log
|
||||
|
||||
@property
|
||||
def dict(self) -> dict:
|
||||
return self._dict
|
||||
|
||||
|
||||
class Detector:
|
||||
_repo: RepoTool = None
|
||||
_repo_path: Path = None
|
||||
_detector_dir: Path = None
|
||||
_executed: DiskStoredList = None
|
||||
_branches: DiskStoredRefs = None
|
||||
_tags: DiskStoredRefs = None
|
||||
|
||||
def __init__(self, repo_path: Path, cache_dir: Path, project: str):
|
||||
log.debug(f"Initializing detector...")
|
||||
log.debug(f"Repo: {repo_path}")
|
||||
self._repo_path = Path(repo_path)
|
||||
self._repo = RepoTool(repo_path)
|
||||
self._detector_dir = Path(cache_dir).joinpath("detector")
|
||||
if not self._detector_dir.exists():
|
||||
log.debug(f"Creating detector dir")
|
||||
self._detector_dir.mkdir()
|
||||
|
||||
log.debug(f"Detector cache: {self._detector_dir}")
|
||||
self._executed = DiskStoredList(self._detector_dir.joinpath("detectorExecuted").as_posix())
|
||||
log.debug(f"Detector executed: {len(self._executed)} commits")
|
||||
|
||||
# modify branches and tags to new standards
|
||||
self.check_legacy_config()
|
||||
# parse json files afterwards
|
||||
self._branches = DiskStoredRefs(self._detector_dir.joinpath("branches"))
|
||||
self._tags = DiskStoredRefs(self._detector_dir.joinpath("tags"))
|
||||
log.info(f"Loaded {self._branches.count()} branches and {self._tags.count()} tags")
|
||||
|
||||
@classmethod
|
||||
def _ref_dir_to_json(cls, dir: Path):
|
||||
def rmdir(directory):
|
||||
directory = Path(directory)
|
||||
for item in directory.iterdir():
|
||||
if item.is_dir():
|
||||
rmdir(item)
|
||||
else:
|
||||
item.unlink()
|
||||
directory.rmdir()
|
||||
|
||||
ref_dict = {}
|
||||
for item in dir.iterdir():
|
||||
if item.is_file():
|
||||
content = item.read_text().strip()
|
||||
name = item.name
|
||||
log.debug(f"Found reference {name} -> {content}")
|
||||
ref_dict[name] = content
|
||||
rmdir(dir)
|
||||
dir.touch()
|
||||
dir.write_text(json.dumps(sorted(ref_dict.items())))
|
||||
|
||||
def check_legacy_config(self):
|
||||
branch_dir = self._detector_dir.joinpath("branches")
|
||||
tag_dir = self._detector_dir.joinpath("tags")
|
||||
|
||||
if branch_dir.exists() and branch_dir.is_dir():
|
||||
log.info(f"Found legacy branch dir: {branch_dir} - converting now")
|
||||
Detector._ref_dir_to_json(branch_dir)
|
||||
|
||||
if tag_dir.exists() and tag_dir.is_dir():
|
||||
log.info(f"Found legacy tag dir: {tag_dir} - converting now")
|
||||
Detector._ref_dir_to_json(tag_dir)
|
||||
|
||||
def initialize_caches(self):
|
||||
# initialize caches
|
||||
log.info(f"Initializing detector cache")
|
||||
for commit in self._repo.list_commits():
|
||||
if commit not in self._executed:
|
||||
self._executed.append(commit.hexsha)
|
||||
# cleanup old branches
|
||||
for branch in self._branches.keys():
|
||||
self._branches.remove(branch)
|
||||
for tag in self._tags.keys():
|
||||
self._tags.remove(tag)
|
||||
|
||||
# persist new ones
|
||||
for branch, commit in self._repo.list_branches().items():
|
||||
self._branches.update(branch, commit)
|
||||
for tag, commit in self._repo.list_tags().items():
|
||||
self._tags.update(tag, commit)
|
||||
|
||||
def run(self, callback: Callable[[DetectedCommit], None]) -> int:
|
||||
log.info(f"Running commit detector")
|
||||
new_branches = self._repo.list_branches()
|
||||
new_tags = self._repo.list_tags()
|
||||
|
||||
# remove removed
|
||||
old_keys = self._branches.keys()
|
||||
for branch in old_keys:
|
||||
if branch not in new_branches.keys():
|
||||
log.info(f"Branch {branch} removed in source, removing from detector")
|
||||
self._branches.remove(branch)
|
||||
old_keys = self._tags.keys()
|
||||
for tag in old_keys:
|
||||
if tag not in new_tags.keys():
|
||||
log.info(f"Tag {tag} removed in source, removing from detector")
|
||||
self._tags.remove(tag)
|
||||
|
||||
keep_in_mind_commits: list = []
|
||||
for commit in list(new_tags.values()) + list(new_branches.values()):
|
||||
if commit not in keep_in_mind_commits:
|
||||
keep_in_mind_commits.append(commit)
|
||||
|
||||
# list commits
|
||||
executed_count: int = 0
|
||||
for commit in self._repo.list_commits():
|
||||
if commit.hexsha in self._executed and commit.hexsha not in keep_in_mind_commits:
|
||||
continue
|
||||
special_commit: bool = False
|
||||
if commit.hexsha in keep_in_mind_commits:
|
||||
log.debug(f"Found keep-in-mind commit {commit.hexsha}")
|
||||
special_commit = True
|
||||
|
||||
special_branch: list = []
|
||||
special_tag: list = []
|
||||
|
||||
if special_commit:
|
||||
for branch, commit_candidate in new_branches.items():
|
||||
if commit.hexsha == commit_candidate:
|
||||
if not self._branches.get(branch) == commit.hexsha:
|
||||
log.debug(f"Found branch {branch} for commit {commit_candidate}")
|
||||
special_branch.append(branch)
|
||||
for tag, commit_candidate in new_tags.items():
|
||||
if commit.hexsha == commit_candidate:
|
||||
if not self._tags.get(tag) == commit.hexsha:
|
||||
log.debug(f"Found tag {tag} for commit {commit_candidate}")
|
||||
special_tag.append(tag)
|
||||
|
||||
env = {
|
||||
'commit': commit.hexsha,
|
||||
'abbrev': commit.hexsha[0:7],
|
||||
'author': str(commit.author),
|
||||
'date': commit.authored_date,
|
||||
'is_tag': len(special_tag) > 0,
|
||||
'tags': special_tag,
|
||||
'is_branch': len(special_branch) > 0,
|
||||
'branches': special_branch,
|
||||
'log': commit.message,
|
||||
}
|
||||
env = DetectedCommit(env)
|
||||
executed_count += 1
|
||||
callback(env)
|
||||
return executed_count
|
52
repo_cloner/lib/disk_stored_refs.py
Normal file
52
repo_cloner/lib/disk_stored_refs.py
Normal file
|
@ -0,0 +1,52 @@
|
|||
import json
|
||||
from pathlib import Path
|
||||
import logging
|
||||
|
||||
log = logging.getLogger("rc.refstor")
|
||||
|
||||
|
||||
class DiskStoredRefs:
|
||||
__file: Path = None
|
||||
__refs: dict = {}
|
||||
|
||||
def __init__(self, file: Path):
|
||||
log.debug(f"Initializing disk stored refs: {file.as_posix()}")
|
||||
self.__file = file
|
||||
self.__refs = {}
|
||||
|
||||
if self.__file.is_file():
|
||||
log.debug(f"Loading from file")
|
||||
self.__refs = dict(json.loads(self.__file.read_text()))
|
||||
|
||||
def __persist(self):
|
||||
|
||||
self.__file.write_text(json.dumps(sorted(self.__refs.items())))
|
||||
|
||||
def keys(self):
|
||||
return list(self.__refs.keys())
|
||||
|
||||
def count(self):
|
||||
log.debug(f"DiskStoredRefs: {len(self.__refs)} items")
|
||||
return len(self.__refs)
|
||||
|
||||
def __len__(self):
|
||||
log.debug(f"DiskStoredRefs: {len(self.__refs)} items")
|
||||
return len(self.__refs)
|
||||
|
||||
def get(self, key):
|
||||
log.debug(f"DiskStoredRefs: getting key {key}")
|
||||
return self.__refs.get(key)
|
||||
|
||||
def remove(self, key) -> bool:
|
||||
if key not in self.__refs:
|
||||
log.debug(f"DiskStoredRefs: {key} not found for deletion")
|
||||
return False
|
||||
log.debug(f"DiskStoredRefs: Deleting key {key}")
|
||||
del self.__refs[key]
|
||||
self.__persist()
|
||||
return True
|
||||
|
||||
def update(self, key: str, value: str):
|
||||
log.debug(f"DiskStoredRefs: update {key} => {value}")
|
||||
self.__refs[key] = value
|
||||
self.__persist()
|
|
@ -218,6 +218,24 @@ class RepoTool:
|
|||
def list_commits(self, max_depth: Optional[int] = None):
|
||||
return self._repo.iter_commits(all = True, max_count = max_depth, reverse = True)
|
||||
|
||||
@__check_initialized
|
||||
def list_branches(self) -> Union[dict, bool]:
|
||||
log.debug(f"Listing branches of repo")
|
||||
branches = {}
|
||||
for branch in self._repo.branches:
|
||||
branches[branch.name] = branch.commit.hexsha
|
||||
log.debug(f"Found {len(branches)}")
|
||||
return branches
|
||||
|
||||
@__check_initialized
|
||||
def list_tags(self):
|
||||
log.debug(f"Listing tags of repo")
|
||||
tags = {}
|
||||
for tag in self._repo.tags:
|
||||
tags[tag.name] = tag.commit.hexsha
|
||||
log.debug(f"Found {len(tags)} tags")
|
||||
return tags
|
||||
|
||||
@__check_initialized
|
||||
def list_submodules(self, commit: str = "HEAD") -> Union[list, bool]:
|
||||
commit = self._repo.commit(commit)
|
||||
|
|
330
tests/lib/test_detector.py
Normal file
330
tests/lib/test_detector.py
Normal file
|
@ -0,0 +1,330 @@
|
|||
import logging
|
||||
|
||||
from repo_cloner.lib import Detector, DetectedCommit
|
||||
import pytest
|
||||
from unittest.mock import patch, MagicMock
|
||||
from git import Actor
|
||||
from cloner_test_fixtures import cloner_dir_struct
|
||||
from collections import namedtuple
|
||||
import json
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def detector_obj(cloner_dir_struct):
|
||||
return Detector(
|
||||
cloner_dir_struct.joinpath("repos", "repo.git"),
|
||||
cloner_dir_struct.joinpath("cache"),
|
||||
"Mocked Project"
|
||||
)
|
||||
|
||||
|
||||
some_commits = [
|
||||
"d766d9c302463257695a4d53d857a2cecd024414",
|
||||
"a2e308bb9c5e59e7a0c319dca0adf3966f3f3a60",
|
||||
"36021088abb6ebcb2202897f3f27b26b21c25068",
|
||||
"e811211f0895e9e792f569b1b38f8452e0efe42f",
|
||||
"6854c793fd2cbd848650a94b9fed4924e5e428b2",
|
||||
"2f08263b5aff30ce776bb0350503be69d45647da",
|
||||
"fc53c9d8bc9e01c63632376df51abb67ca344a89",
|
||||
"c95a862af3d13623da3d7a6b73ff79d68ff2d7ad",
|
||||
"b21c1e9e329bcdef1980462cf62c09a9e807dafd",
|
||||
"dec7ff27d658412f0088c4c3220990f1f1dc98f1",
|
||||
"ad66f820c9dfa06c3767f81cb42b1398fa61ff05",
|
||||
"b9af12b9f80b372c645b5f64c5a8f8b6a148ccfc",
|
||||
"1fda6025ed303ecf03b9afa7cce882aaddc058f9",
|
||||
"d33585f0a72d9de6c481891787e4d956b769ccb5",
|
||||
"7f685afca71731cb9276f937fc8cceca6b47b1f2",
|
||||
"af112317aa2459e88db9cb10533ea6f29818afee",
|
||||
"ddc24a34983ff43b61e1bdf27b7ec2357932e7e0",
|
||||
"7ccca22ba367433eedb226310373fd250a2af725",
|
||||
"e4843c6d2391d0b45beea81e0dd34078484b474f",
|
||||
"fe942d46c33fee0e68d2e0ed64dfa6515dbf89c4",
|
||||
"1d80638229e0a7ef62b3b9cdab9ad9f63acf6d38",
|
||||
"f968ea00a3007034ddf196285d7b60fec5e7fcf0",
|
||||
"490859dbebe02af6bd643a89b6897961674f90fd",
|
||||
"066de679c906a78f7f817ee9a48d3021fd8b6b7c",
|
||||
"2a8277687fb6dee742e6e0193ea95fcd2264fbc4",
|
||||
"8808c649a8b0a5279a2d9a46b730254cad649205",
|
||||
"ad5db175fab144a8851b712195733c4a9c00d699",
|
||||
"36d45a4c1e5ad8c47a14881c5427045c3de095d0",
|
||||
"6b0b9affedb3e3daa4df00cd54678d78ea1c1d94",
|
||||
"aa5056610ff57f73bae9633a985c6a8e41f3bc23",
|
||||
]
|
||||
|
||||
branch_payload = [
|
||||
("doctrine", "fb96c1445ac20f224bbc6c32fdce145fb91af33d"),
|
||||
("gitlab", "26538d26f9c377b2e81cd001205bf85a8241b8be"),
|
||||
("limonade", "6c77dc4fdff0438b6badc7344e45cd0f03f5c872"),
|
||||
("master", "aa5056610ff57f73bae9633a985c6a8e41f3bc23"),
|
||||
("xlsshop", "5f59a880b4db820109f9f0420e66f7047322b12b"),
|
||||
]
|
||||
|
||||
tag_payload = [
|
||||
("v582", "0915ef3cb1890450c66ea4104c6b4ee021e1a0d3"),
|
||||
("v584", "70508264e064bd1130a32ad8b66095f75d565b3c"),
|
||||
("v585", "70508264e064bd1130a32ad8b66095f75d565b3c"),
|
||||
("v587", "a7bdbe986402921b67443a4748f3b0f56f4aa19d"),
|
||||
("v588", "6ed749cf30f25b2b08f94970dcb598af273ca0cb"),
|
||||
("v589", "78d810634e0925e29c33ac203fa4997baeabf533"),
|
||||
("v590", "01abecc6a90f2257878f6929ec495720164d9e3d"),
|
||||
("v595", "7c7db8c441f1e4864cc7396cf633dc45ef4b2894"),
|
||||
("v597", "21a829cf4479c0482fe5ef815a2d3833cc39bc76"),
|
||||
("v601", "dd470e1072ebc79a5c4b3f1faa89885e38c0134f"),
|
||||
("test-testu", "729bbbccdb7a92abb4c9aa36b8a6ba18dc25d9b3"),
|
||||
("v1001", "02ea4a101a6fc0558ddceca03ff1241abcf3c338"),
|
||||
("v1003", "50c64c32e65a54db8403362b5bcd3a336990bc57"),
|
||||
("v1004", "e9e87bb8cb1805835f7622ef53b07909ba6ac02c"),
|
||||
("v1005", "c1ea58e1773a9e7c81a335e89f4b59739b07b672"),
|
||||
("v1006", "fde748fdad0df54d027e3243f4ae504cbdb137f0"),
|
||||
("v1007", "739737e5fd2cb8d3e2973fb6077dc97ca4e08483"),
|
||||
]
|
||||
|
||||
|
||||
def test_init(cloner_dir_struct):
|
||||
with patch("repo_cloner.lib.Detector.check_legacy_config", autospec = True) as p:
|
||||
det = Detector(
|
||||
cloner_dir_struct.joinpath("repos", "repo.git").as_posix(),
|
||||
cloner_dir_struct.joinpath("cache").as_posix(),
|
||||
"Mocked Project"
|
||||
)
|
||||
assert p.called
|
||||
|
||||
assert cloner_dir_struct.joinpath("cache", "detector").exists()
|
||||
assert cloner_dir_struct.joinpath("cache", "detector").is_dir()
|
||||
assert len(det._executed) == 0
|
||||
|
||||
# test loading of content
|
||||
|
||||
cloner_dir_struct.joinpath("cache", "detector", "detectorExecuted").write_text(
|
||||
"commit1\ncommit2\ncommit3\n")
|
||||
|
||||
det = Detector(
|
||||
cloner_dir_struct.joinpath("repos", "repo.git").as_posix(),
|
||||
cloner_dir_struct.joinpath("cache").as_posix(),
|
||||
"Mocked Project"
|
||||
)
|
||||
assert 3 == len(det._executed)
|
||||
assert not det._repo.initialized
|
||||
assert det._repo_path == cloner_dir_struct.joinpath("repos", "repo.git")
|
||||
|
||||
assert det._branches.count() == 0
|
||||
assert det._tags.count() == 0
|
||||
|
||||
|
||||
def test_ref_dir_to_json(cloner_dir_struct, caplog):
|
||||
caplog.set_level(0)
|
||||
with patch("repo_cloner.lib.Detector.check_legacy_config", autospec = True) as p:
|
||||
det = Detector(
|
||||
cloner_dir_struct.joinpath("repos", "repo.git").as_posix(),
|
||||
cloner_dir_struct.joinpath("cache").as_posix(),
|
||||
"Mocked Project"
|
||||
)
|
||||
assert p.called
|
||||
|
||||
dir = cloner_dir_struct.joinpath("tst")
|
||||
dir.mkdir()
|
||||
dir.joinpath("messed-up-dir").mkdir(parents = True)
|
||||
for branch, commit in branch_payload:
|
||||
dir.joinpath(branch).write_text(f"{commit}\n")
|
||||
|
||||
Detector._ref_dir_to_json(dir)
|
||||
|
||||
assert dir.is_file()
|
||||
|
||||
assert dir.read_text() == \
|
||||
'[["doctrine", "fb96c1445ac20f224bbc6c32fdce145fb91af33d"], ["gitlab", ' \
|
||||
'"26538d26f9c377b2e81cd001205bf85a8241b8be"], ["limonade", ' \
|
||||
'"6c77dc4fdff0438b6badc7344e45cd0f03f5c872"], ["master", ' \
|
||||
'"aa5056610ff57f73bae9633a985c6a8e41f3bc23"], ["xlsshop", ' \
|
||||
'"5f59a880b4db820109f9f0420e66f7047322b12b"]]'
|
||||
|
||||
|
||||
def test_check_legacy_config(detector_obj):
|
||||
branch_dir = detector_obj._detector_dir.joinpath("branches")
|
||||
tag_dir = detector_obj._detector_dir.joinpath("tags")
|
||||
|
||||
branch_dir.mkdir()
|
||||
tag_dir.mkdir()
|
||||
|
||||
for branch, commit in branch_payload:
|
||||
branch_dir.joinpath(branch).write_text(f"{commit}\n")
|
||||
for tag, commit in tag_payload:
|
||||
tag_dir.joinpath(tag).write_text(f"{commit}\n")
|
||||
|
||||
detector_obj.check_legacy_config()
|
||||
|
||||
assert branch_dir.is_file()
|
||||
assert tag_dir.is_file()
|
||||
|
||||
branches = json.loads(branch_dir.read_text())
|
||||
tags = json.loads(tag_dir.read_text())
|
||||
|
||||
assert dict(branches) == dict(branch_payload)
|
||||
assert dict(tags) == dict(tag_payload)
|
||||
|
||||
|
||||
def test_initialize_caches(tmp_path):
|
||||
commits = [commit for commit in some_commits]
|
||||
[commits.append(commit) for _, commit in branch_payload]
|
||||
[commits.append(commit) for _, commit in tag_payload]
|
||||
|
||||
Commit = namedtuple("Commit", ["hexsha"])
|
||||
commits.sort()
|
||||
commits_named = [Commit(commit) for commit in commits]
|
||||
|
||||
branches = {}
|
||||
tags = {}
|
||||
|
||||
for key, value in branch_payload:
|
||||
branches[key] = value
|
||||
for key, value in tag_payload:
|
||||
tags[key] = value
|
||||
|
||||
branches = dict(sorted(branches.items()))
|
||||
tags = dict(sorted(tags.items()))
|
||||
|
||||
mocks = {
|
||||
'list_commits': MagicMock(return_value = commits_named),
|
||||
'list_branches': MagicMock(return_value = branches),
|
||||
'list_tags': MagicMock(return_value = tags),
|
||||
}
|
||||
|
||||
repo = tmp_path.joinpath("repo.git")
|
||||
cache_dir = tmp_path.joinpath("cache")
|
||||
cache_dir.mkdir()
|
||||
|
||||
with patch.multiple("repo_cloner.lib.RepoTool", **mocks):
|
||||
from repo_cloner.lib import RepoTool
|
||||
det = Detector(repo, cache_dir, "Mocked Project")
|
||||
det._tags._DiskStoredRefs__refs = {"tag1": "commit"}
|
||||
det._branches._DiskStoredRefs__refs = {"branch1": "commit"}
|
||||
det.initialize_caches()
|
||||
|
||||
assert RepoTool.list_commits.called
|
||||
assert RepoTool.list_branches.called
|
||||
assert RepoTool.list_tags.called
|
||||
|
||||
# load files
|
||||
cache_dir = cache_dir.joinpath("detector")
|
||||
assert cache_dir.joinpath("detectorExecuted").read_text() == "\n".join(commits) + "\n"
|
||||
assert dict(json.loads(cache_dir.joinpath("branches").read_text())) == branches
|
||||
assert dict(json.loads(cache_dir.joinpath("tags").read_text())) == tags
|
||||
|
||||
|
||||
def test_run(tmp_path, caplog):
|
||||
stamp = 1659533160
|
||||
executed = []
|
||||
|
||||
def time():
|
||||
nonlocal stamp
|
||||
stamp += 1
|
||||
return stamp
|
||||
|
||||
def exec(env: DetectedCommit):
|
||||
nonlocal executed
|
||||
executed.append(env)
|
||||
|
||||
# commits - new version would have 10 more commits
|
||||
commits_old = [commit for commit in some_commits[0:25]]
|
||||
commits_new = [commit for commit in some_commits]
|
||||
|
||||
# branches
|
||||
# - removed branch doctrine
|
||||
# - master targets 36d45a4c1e5ad8c47a14881c5427045c3de095d0 (not cached)
|
||||
# - new branch dupla targeting 7f685afca71731cb9276f937fc8cceca6b47b1f2
|
||||
branches_old = {}
|
||||
branches_new = {}
|
||||
for key, value in branch_payload:
|
||||
branches_old[key] = value
|
||||
if key == "doctrine":
|
||||
continue
|
||||
if key == "master":
|
||||
branches_new[key] = "36d45a4c1e5ad8c47a14881c5427045c3de095d0"
|
||||
continue
|
||||
branches_new[key] = value
|
||||
branches_new['dupla'] = "7f685afca71731cb9276f937fc8cceca6b47b1f2"
|
||||
|
||||
# tags
|
||||
# - removed test testu
|
||||
# - v1001 and v1003 point to different commit
|
||||
# - new tag super pointing at 7f685afca71731cb9276f937fc8cceca6b47b1f2
|
||||
tags_old = {}
|
||||
tags_new = {}
|
||||
for key, value in tag_payload:
|
||||
tags_old[key] = value
|
||||
if key == "test-testu":
|
||||
continue
|
||||
if key == "v1001":
|
||||
tags_new[key] = "d766d9c302463257695a4d53d857a2cecd024414"
|
||||
continue
|
||||
if key == "v1003":
|
||||
tags_new[key] = "a2e308bb9c5e59e7a0c319dca0adf3966f3f3a60"
|
||||
continue
|
||||
tags_new[key] = value
|
||||
tags_new['super'] = "7f685afca71731cb9276f937fc8cceca6b47b1f2"
|
||||
tags_new['v1003.3'] = "a2e308bb9c5e59e7a0c319dca0adf3966f3f3a60"
|
||||
|
||||
# new commits
|
||||
for _, commit in branches_new.items():
|
||||
if commit not in commits_new:
|
||||
commits_new.append(commit)
|
||||
for _, commit in tags_new.items():
|
||||
if commit not in commits_new:
|
||||
commits_new.append(commit)
|
||||
|
||||
# storage directory
|
||||
cache_dir = tmp_path.joinpath("cache")
|
||||
detector_dir = cache_dir.joinpath("detector")
|
||||
cache_dir.mkdir()
|
||||
detector_dir.mkdir()
|
||||
|
||||
# sort
|
||||
commits_old.sort()
|
||||
commits_new.sort()
|
||||
|
||||
# prepare files
|
||||
# detectorExecuted
|
||||
detector_dir.joinpath("detectorExecuted").write_text("\n".join(commits_old) + "\n")
|
||||
# branches
|
||||
detector_dir.joinpath("branches").write_text(json.dumps(dict(sorted(branches_old.items()))))
|
||||
# tags
|
||||
detector_dir.joinpath("tags").write_text(json.dumps(dict(sorted(tags_old.items()))))
|
||||
|
||||
Commit = namedtuple("Commit", ["hexsha", "author", "authored_date", "message"])
|
||||
|
||||
author = Actor("Tester", "tester@email.me")
|
||||
|
||||
commits_named_new = [Commit(commit, author, authored_date = time(), message = "Test commit")
|
||||
for commit in commits_new]
|
||||
|
||||
mocks = {
|
||||
'list_commits': MagicMock(return_value = commits_named_new),
|
||||
'list_branches': MagicMock(return_value = branches_new),
|
||||
'list_tags': MagicMock(return_value = tags_new),
|
||||
}
|
||||
|
||||
repo = tmp_path.joinpath("repo.git")
|
||||
|
||||
caplog.set_level(logging.DEBUG)
|
||||
|
||||
with patch.multiple("repo_cloner.lib.RepoTool", **mocks):
|
||||
from repo_cloner.lib import RepoTool
|
||||
det = Detector(repo, cache_dir, "Mocked Project")
|
||||
|
||||
det.run(exec)
|
||||
|
||||
assert RepoTool.list_commits.called
|
||||
assert RepoTool.list_branches.called
|
||||
assert RepoTool.list_tags.called
|
||||
|
||||
assert "Branch doctrine removed in source, removing from detector" in caplog.text
|
||||
assert "Tag test-testu removed in source, removing from detector" in caplog.text
|
||||
|
||||
assert len(executed) == 24
|
||||
assert sum(ex.is_tag for ex in executed) == 3
|
||||
assert sum(ex.is_branch for ex in executed) == 2
|
||||
assert any(ex.branches == 'dupla' and ex.tags == 'super' for ex in executed)
|
||||
assert any(ex.tags == 'v1003, v1003.3' for ex in executed)
|
||||
assert any(ex.tags == 'v1001' for ex in executed)
|
||||
assert any(ex.branches == 'master' for ex in executed)
|
||||
assert all(ex.commit not in commits_old or ex.is_tag for ex in executed)
|
85
tests/lib/test_disk_stored_refs.py
Normal file
85
tests/lib/test_disk_stored_refs.py
Normal file
|
@ -0,0 +1,85 @@
|
|||
import pytest
|
||||
from repo_cloner.lib import DiskStoredRefs
|
||||
|
||||
|
||||
def test_init(tmp_path):
|
||||
file = tmp_path.joinpath("refs")
|
||||
# nonexistent file
|
||||
refs = DiskStoredRefs(file)
|
||||
assert refs._DiskStoredRefs__file == file
|
||||
assert refs._DiskStoredRefs__refs == {}
|
||||
|
||||
file.write_text('[["a", "b"], ["c", "d"]]')
|
||||
refs = DiskStoredRefs(file)
|
||||
assert refs._DiskStoredRefs__refs == {'a': 'b', 'c': 'd'}
|
||||
|
||||
|
||||
def test__persist(tmp_path):
|
||||
file = tmp_path.joinpath("refs")
|
||||
refs = DiskStoredRefs(file)
|
||||
refs._DiskStoredRefs__refs = {"key": "value"}
|
||||
refs._DiskStoredRefs__persist()
|
||||
|
||||
assert file.read_text() == '[["key", "value"]]'
|
||||
|
||||
|
||||
def test_keys(tmp_path):
|
||||
file = tmp_path.joinpath("refs")
|
||||
refs = DiskStoredRefs(file)
|
||||
refs._DiskStoredRefs__refs = {"key": "value", "key2": "value"}
|
||||
|
||||
assert refs.keys() == ["key", "key2"]
|
||||
|
||||
|
||||
def test_count(tmp_path):
|
||||
file = tmp_path.joinpath("refs")
|
||||
refs = DiskStoredRefs(file)
|
||||
assert refs.count() == 0
|
||||
refs._DiskStoredRefs__refs = {"key": "value", "key2": "value"}
|
||||
|
||||
assert refs.count() == 2
|
||||
|
||||
|
||||
def test__len__(tmp_path):
|
||||
file = tmp_path.joinpath("refs")
|
||||
refs = DiskStoredRefs(file)
|
||||
assert len(refs) == 0
|
||||
refs._DiskStoredRefs__refs = {"key": "value", "key2": "value"}
|
||||
|
||||
assert len(refs) == 2
|
||||
|
||||
|
||||
def test_get(tmp_path):
|
||||
file = tmp_path.joinpath("refs")
|
||||
refs = DiskStoredRefs(file)
|
||||
refs._DiskStoredRefs__refs = {"key": "value", "key2": "value2"}
|
||||
|
||||
assert refs.get("key") == "value"
|
||||
assert refs.get("key2") == "value2"
|
||||
|
||||
assert refs.get("wut") is None
|
||||
|
||||
|
||||
def test_remove(tmp_path):
|
||||
file = tmp_path.joinpath("refs")
|
||||
refs = DiskStoredRefs(file)
|
||||
refs._DiskStoredRefs__refs = {"key": "value", "key2": "value2"}
|
||||
|
||||
assert refs.remove("key")
|
||||
assert refs._DiskStoredRefs__refs == {"key2": "value2"}
|
||||
|
||||
assert not refs.remove("key33")
|
||||
|
||||
assert file.read_text() == '[["key2", "value2"]]'
|
||||
|
||||
|
||||
def test_update(tmp_path):
|
||||
file = tmp_path.joinpath("refs")
|
||||
refs = DiskStoredRefs(file)
|
||||
refs._DiskStoredRefs__refs = {"key": "value", "key2": "value2"}
|
||||
|
||||
refs.update("key", "val")
|
||||
assert refs._DiskStoredRefs__refs == {"key": "val", "key2": "value2"}
|
||||
refs.update("key3", "valvalval")
|
||||
|
||||
assert file.read_text() == '[["key", "val"], ["key2", "value2"], ["key3", "valvalval"]]'
|
|
@ -492,6 +492,24 @@ def test_list_commits(cloned_base_repo_obj):
|
|||
assert 339 == sum(1 for commit in cloned_base_repo_obj.list_commits())
|
||||
|
||||
|
||||
def test_list_branches(cloned_base_repo_obj):
|
||||
branches = cloned_base_repo_obj.list_branches()
|
||||
branches = dict(sorted(branches.items()))
|
||||
compare = {}
|
||||
for branch, commit in base_repo_branches:
|
||||
compare[branch] = commit
|
||||
|
||||
assert branches == compare
|
||||
|
||||
def test_list_tags(cloned_base_repo_obj):
|
||||
tags = cloned_base_repo_obj.list_tags()
|
||||
tags = dict(sorted(tags.items()))
|
||||
compare = {}
|
||||
for tag, commit in base_repo_tags:
|
||||
compare[tag] = commit
|
||||
assert tags == compare
|
||||
|
||||
|
||||
def test_list_submodules_no_submodules(cloned_base_repo_obj):
|
||||
assert cloned_base_repo_obj.list_submodules() == []
|
||||
assert cloned_base_repo_obj.discovered_submodules_commits == ["e0c7e2a72579e24657c05e875201011d2b48bf94"]
|
||||
|
|
Loading…
Reference in New Issue
Block a user