460 lines
15 KiB
Python
460 lines
15 KiB
Python
import git
|
|
import pytest
|
|
|
|
import repo_cloner.lib.cloner
|
|
from repo_cloner.lib import gen_repo_hashed_name, DirNotFoundError, Cloner
|
|
from cloner_test_fixtures import *
|
|
from pathlib import Path
|
|
import logging
|
|
from unittest.mock import patch, PropertyMock, MagicMock, call
|
|
|
|
some_commits = [
|
|
"d766d9c302463257695a4d53d857a2cecd024414",
|
|
"a2e308bb9c5e59e7a0c319dca0adf3966f3f3a60",
|
|
"36021088abb6ebcb2202897f3f27b26b21c25068",
|
|
"e811211f0895e9e792f569b1b38f8452e0efe42f",
|
|
"6854c793fd2cbd848650a94b9fed4924e5e428b2",
|
|
"2f08263b5aff30ce776bb0350503be69d45647da",
|
|
"fc53c9d8bc9e01c63632376df51abb67ca344a89",
|
|
"c95a862af3d13623da3d7a6b73ff79d68ff2d7ad",
|
|
"b21c1e9e329bcdef1980462cf62c09a9e807dafd",
|
|
"dec7ff27d658412f0088c4c3220990f1f1dc98f1",
|
|
"ad66f820c9dfa06c3767f81cb42b1398fa61ff05",
|
|
"b9af12b9f80b372c645b5f64c5a8f8b6a148ccfc",
|
|
"1fda6025ed303ecf03b9afa7cce882aaddc058f9",
|
|
"d33585f0a72d9de6c481891787e4d956b769ccb5",
|
|
"7f685afca71731cb9276f937fc8cceca6b47b1f2",
|
|
"af112317aa2459e88db9cb10533ea6f29818afee",
|
|
"ddc24a34983ff43b61e1bdf27b7ec2357932e7e0",
|
|
"7ccca22ba367433eedb226310373fd250a2af725",
|
|
"e4843c6d2391d0b45beea81e0dd34078484b474f",
|
|
"fe942d46c33fee0e68d2e0ed64dfa6515dbf89c4",
|
|
"1d80638229e0a7ef62b3b9cdab9ad9f63acf6d38",
|
|
"f968ea00a3007034ddf196285d7b60fec5e7fcf0",
|
|
"490859dbebe02af6bd643a89b6897961674f90fd",
|
|
"066de679c906a78f7f817ee9a48d3021fd8b6b7c",
|
|
"2a8277687fb6dee742e6e0193ea95fcd2264fbc4",
|
|
"8808c649a8b0a5279a2d9a46b730254cad649205",
|
|
"ad5db175fab144a8851b712195733c4a9c00d699",
|
|
"36d45a4c1e5ad8c47a14881c5427045c3de095d0",
|
|
"6b0b9affedb3e3daa4df00cd54678d78ea1c1d94",
|
|
"aa5056610ff57f73bae9633a985c6a8e41f3bc23",
|
|
]
|
|
|
|
|
|
def mock_time() -> float:
|
|
return 1658702099.4258687
|
|
|
|
|
|
class MockConfig:
|
|
def __init__(self):
|
|
self.cloner_repo_url = ""
|
|
self.cloner_project_name = "Mocked Project"
|
|
self.cloner_interval = 0
|
|
self.cloner_submodules = False
|
|
self.cloner_submodule_depth = 100
|
|
|
|
|
|
class MockDirStruct:
|
|
raise_cache_exists = False
|
|
|
|
def __init__(self, tmp: Path):
|
|
self.config = MockConfig()
|
|
self.cache_dir = tmp.joinpath("cache")
|
|
self.config_dir = tmp.joinpath("config")
|
|
self.repos_dir = tmp.joinpath("repos")
|
|
self.raise_cache_exists = False
|
|
|
|
@property
|
|
def cache_dir_exists(self):
|
|
if self.raise_cache_exists:
|
|
raise DirNotFoundError("mock_dir")
|
|
return True
|
|
|
|
|
|
class MockRepoTool:
|
|
path = None
|
|
initialized = False
|
|
|
|
def __init__(self, path: str):
|
|
self.path = path
|
|
self.initialized = False
|
|
|
|
|
|
def test_init_invalid_config(tmp_path, caplog):
|
|
caplog.set_level(logging.INFO)
|
|
mock = MockDirStruct(tmp_path)
|
|
with pytest.raises(KeyError) as e:
|
|
Cloner(mock)
|
|
assert "KeyError: 'cloner_repo_url not defined in config!'" == e.exconly()
|
|
assert "CRITICAL root:cloner.py:" in caplog.text
|
|
assert "Undefined repo cloner URL in config!\n" in caplog.text
|
|
|
|
# valid config
|
|
caplog.clear()
|
|
# set mocks
|
|
mock.config.cloner_repo_url = "git@mocked:/path"
|
|
cache_dir = tmp_path.joinpath("cache")
|
|
# validate dir does not exist
|
|
assert not cache_dir.exists()
|
|
# set mock to raise error
|
|
mock.raise_cache_exists = True
|
|
# create new cloner class
|
|
Cloner(mock)
|
|
# dir should exist now
|
|
assert cache_dir.exists()
|
|
assert "Cache dir for project Mocked Project not found -> creating" in caplog.text
|
|
|
|
|
|
def test_init_create_dirs(cloner_dir_with_config):
|
|
dirs = MockDirStruct(cloner_dir_with_config)
|
|
dirs.config.cloner_repo_url = "https://repo"
|
|
Cloner(dirs)
|
|
assert cloner_dir_with_config.joinpath("cache", "submodules").exists()
|
|
|
|
|
|
def test_check_interval(cloner_dir_struct: Path, monkeypatch):
|
|
mock = MockDirStruct(cloner_dir_struct)
|
|
mock.config.cloner_repo_url = "git@mocked:/path"
|
|
timestamp_file = cloner_dir_struct.joinpath("cache", "last-check-time")
|
|
|
|
with monkeypatch.context() as m:
|
|
m.setattr("repo_cloner.lib.cloner.time", mock_time)
|
|
cloner = Cloner(mock)
|
|
# no timestamp file exists -> timer has to run
|
|
assert cloner.check_interval()
|
|
|
|
# timestamp file in future -> no run
|
|
timestamp_file.touch()
|
|
timestamp_file.write_text(str(int(mock_time() + 1)))
|
|
assert not cloner.check_interval()
|
|
|
|
# timestamp in history, but run everytime is set >> RUN
|
|
timestamp_file.write_text(str(int(mock_time() - 1)))
|
|
assert cloner.check_interval()
|
|
|
|
# second ago, but with 5 minute interval
|
|
mock.config.cloner_interval = 5
|
|
assert not cloner.check_interval()
|
|
|
|
# 6 minutes ago, with 5 min interval -> run!
|
|
timestamp_file.write_text(str(int(mock_time() - 360)))
|
|
assert cloner.check_interval()
|
|
|
|
# corrupted file
|
|
timestamp_file.write_text("1234WTF")
|
|
assert cloner.check_interval()
|
|
|
|
|
|
def test__opened(cloner_dir_with_config, monkeypatch):
|
|
ds = MockDirStruct(cloner_dir_with_config)
|
|
ds.config.cloner_repo_url = cloner_dir_with_config.as_uri()
|
|
c = Cloner(ds)
|
|
# no repo defined
|
|
assert not c._Cloner__opened
|
|
# with mocked repo
|
|
monkeypatch.setattr(c, "_repo", MockRepoTool(cloner_dir_with_config.as_posix()))
|
|
c._repo.initialized = True
|
|
assert c._Cloner__opened
|
|
c._repo.initialized = False
|
|
assert not c._Cloner__opened
|
|
|
|
|
|
def test_repo_path_by_url(cloner_dir_struct: Path, path_repo_base: Path):
|
|
ds = MockDirStruct(cloner_dir_struct)
|
|
ds.config.cloner_repo_url = path_repo_base.as_uri()
|
|
c = Cloner(ds)
|
|
x = c._repo_path_by_url("git@server:namespace/repo.git")
|
|
assert x == ds.repos_dir.joinpath("namespace_repo_3375634822.git").as_posix()
|
|
|
|
|
|
def test__main_repo_path(cloner_dir_struct: Path, path_repo_base: Path):
|
|
ds = MockDirStruct(cloner_dir_struct)
|
|
ds.config.cloner_repo_url = path_repo_base.as_uri()
|
|
hashed = gen_repo_hashed_name(path_repo_base.as_uri())
|
|
c = Cloner(ds)
|
|
x = c._Cloner__main_repo_path
|
|
assert x == ds.repos_dir.joinpath(hashed).as_posix()
|
|
|
|
|
|
def test_check_submodules_repo(tmp_path):
|
|
from collections import namedtuple
|
|
from repo_cloner.lib import DiskStoredList
|
|
|
|
def submodule_scanner_mock(commit = None):
|
|
if commit.hexsha in ["f968ea00a3007034ddf196285d7b60fec5e7fcf0", "36d45a4c1e5ad8c47a14881c5427045c3de095d0"]:
|
|
return ["https://git.hosting:namespace/repo2.git"]
|
|
if commit.hexsha in ["6b0b9affedb3e3daa4df00cd54678d78ea1c1d94", "aa5056610ff57f73bae9633a985c6a8e41f3bc23"]:
|
|
return ["https://git.hosting:namespace/repo2.git", "https://git.hosting:namespace/repo3.git"]
|
|
return False
|
|
|
|
repo_path = tmp_path.joinpath("repo.git").as_posix()
|
|
cache_path = tmp_path.joinpath("cache.list")
|
|
|
|
submodule_cache = tmp_path.joinpath("submodules").as_posix()
|
|
submodules = DiskStoredList(submodule_cache)
|
|
submodules.append("https://git.hosting:namespace/repo1.git")
|
|
|
|
Commit = namedtuple("Commit", ["hexsha"])
|
|
# 30 commits
|
|
mocked_commits = [Commit(commit) for commit in some_commits]
|
|
|
|
# write cache file with collected commits
|
|
with open(cache_path.as_posix(), "w") as f:
|
|
f.writelines([f"{commit}\n" for commit in some_commits[0:20]])
|
|
|
|
mocks = {
|
|
'initialized': PropertyMock(return_value = True),
|
|
"path": PropertyMock(return_value = repo_path),
|
|
"list_commits": MagicMock(return_value = mocked_commits),
|
|
"list_submodules": MagicMock(side_effect = submodule_scanner_mock),
|
|
}
|
|
|
|
# prepare mocks for RepoTool
|
|
with patch.multiple("repo_cloner.lib.repo_tool.RepoTool", **mocks):
|
|
import repo_cloner.lib.repo_tool
|
|
rt = repo_cloner.lib.repo_tool.RepoTool(tmp_path.as_posix())
|
|
|
|
Cloner.check_submodules_repo(rt, cache_path.as_posix(), submodules, 100)
|
|
|
|
assert rt.list_commits.call_args == call(100)
|
|
assert rt.list_submodules.call_count == 10
|
|
|
|
returned_submodules = [x for x in submodules]
|
|
assert returned_submodules == [
|
|
'https://git.hosting:namespace/repo1.git', 'https://git.hosting:namespace/repo2.git',
|
|
'https://git.hosting:namespace/repo3.git'
|
|
]
|
|
|
|
|
|
def test_sync(cloner_dir_struct, tmp_path, monkeypatch):
|
|
called: bool = False
|
|
|
|
def fetch():
|
|
nonlocal called
|
|
called = True
|
|
return True
|
|
|
|
class PatchCloner(Cloner):
|
|
open = False
|
|
|
|
@property
|
|
def __opened(self) -> bool:
|
|
return self.open
|
|
|
|
ds = MockDirStruct(cloner_dir_struct)
|
|
ds.config.cloner_repo_url = "file://wut!"
|
|
repo = tmp_path.joinpath("repo.git")
|
|
c = PatchCloner(ds)
|
|
# invalid repo, closed
|
|
c.open = False
|
|
assert not c.sync()
|
|
assert not called
|
|
# mocked valid repo, opened
|
|
c._repo = MockRepoTool(repo.as_posix())
|
|
c._repo.initialized = True
|
|
c._repo.fetch = fetch
|
|
assert c.sync()
|
|
assert called
|
|
|
|
called = False
|
|
# mocked repo, invalid, closed
|
|
c._repo.initialized = False
|
|
assert not called
|
|
assert not c.sync()
|
|
|
|
|
|
def test_sync_submodules_failed(cloner_dir_with_config, caplog):
|
|
ds = MockDirStruct(cloner_dir_with_config)
|
|
# mock config for submodules
|
|
ds.config.cloner_submodules = True
|
|
ds.config.cloner_repo_url = "asdhlk"
|
|
|
|
# mock repo_tool.fetch to return false
|
|
mocks = {
|
|
'initialized': PropertyMock(return_value = True),
|
|
'fetch': MagicMock(return_value = False)
|
|
}
|
|
|
|
with patch.multiple("repo_cloner.lib.cloner.RepoTool", **mocks) as mock:
|
|
cl = Cloner(ds)
|
|
assert not cl.sync()
|
|
|
|
assert caplog.records[1].levelname == "CRITICAL"
|
|
assert caplog.records[1].message == "Repo fetch failed for Mocked Project"
|
|
|
|
|
|
def my_check_submodules_repo(
|
|
repo_tool, cache_file: str,
|
|
submodule_list: list,
|
|
scan_depth = None):
|
|
if "namespace_whatever_" in cache_file:
|
|
submodule_list.append("https://git.hosting:namespace/submodule1.git")
|
|
submodule_list.append("https://git.hosting:namespace/submodule2.git")
|
|
if "/namespace_submodule1_" in cache_file:
|
|
submodule_list.append("https://git.hosting:namespace/submodule3.git")
|
|
return submodule_list
|
|
|
|
|
|
def test_submodules_sync_succeed(cloner_dir_with_config, caplog):
|
|
cloner_dir_with_config.joinpath("cache", "submodules").mkdir(parents = True)
|
|
cloner_dir_with_config.joinpath("cache", "submodules", "submodules.cache") \
|
|
.write_text("https://git.hosting:previous/submodule.git")
|
|
ds = MockDirStruct(cloner_dir_with_config)
|
|
ds.config.cloner_repo_url = "https://git.hosting:namespace/whatever.git"
|
|
ds.config.cloner_submodules = True
|
|
|
|
# mock almost everything
|
|
mocks = {
|
|
'initialized': PropertyMock(side_effect = [True, False, True, True, True]),
|
|
'fetch': MagicMock(return_value = True),
|
|
'clone': MagicMock(return_value = True),
|
|
}
|
|
|
|
with patch.multiple("repo_cloner.lib.cloner.RepoTool", **mocks):
|
|
Cloner.check_submodules_repo = my_check_submodules_repo
|
|
cl = Cloner(ds)
|
|
assert cl.sync()
|
|
|
|
assert repo_cloner.lib.cloner.RepoTool.fetch.call_count == 4
|
|
assert repo_cloner.lib.cloner.RepoTool.clone.call_count == 1
|
|
|
|
|
|
def test_submodules_sync_one_fail(cloner_dir_with_config, caplog):
|
|
cloner_dir_with_config.joinpath("cache", "submodules").mkdir(parents = True)
|
|
cloner_dir_with_config.joinpath("cache", "submodules", "submodules.cache") \
|
|
.write_text("https://git.hosting:previous/submodule.git")
|
|
ds = MockDirStruct(cloner_dir_with_config)
|
|
ds.config.cloner_repo_url = "https://git.hosting:namespace/whatever.git"
|
|
ds.config.cloner_submodules = True
|
|
# just to make coverage nicer
|
|
ds.config.cloner_submodule_depth = 0
|
|
|
|
# mock almost everything
|
|
mocks = {
|
|
'initialized': PropertyMock(return_value = True),
|
|
'fetch': MagicMock(side_effect = [True, True, True, False, True]),
|
|
'clone': MagicMock(return_value = True),
|
|
}
|
|
|
|
with patch.multiple("repo_cloner.lib.cloner.RepoTool", **mocks):
|
|
Cloner.check_submodules_repo = my_check_submodules_repo
|
|
cl = Cloner(ds)
|
|
assert not cl.sync()
|
|
|
|
assert repo_cloner.lib.cloner.RepoTool.fetch.call_count == 5
|
|
assert repo_cloner.lib.cloner.RepoTool.clone.call_count == 0
|
|
|
|
assert "Clone/fetch of submodule: https://git.hosting:namespace/submodule2.git failed" in caplog.text
|
|
|
|
|
|
def test_perform_check(cloner_dir_with_config, monkeypatch, caplog):
|
|
call_counter: int = 0
|
|
|
|
def ret_true():
|
|
return True
|
|
|
|
def ret_false():
|
|
return False
|
|
|
|
def call_counter_inc():
|
|
nonlocal call_counter
|
|
call_counter += 1
|
|
|
|
# set logger
|
|
caplog.set_level(logging.INFO)
|
|
|
|
mock = MockDirStruct(cloner_dir_with_config)
|
|
mock.config.cloner_repo_url = "git://repo"
|
|
cloner = Cloner(mock)
|
|
# timer did not pass
|
|
monkeypatch.setattr(cloner, "check_interval", ret_false)
|
|
monkeypatch.setattr(cloner, "sync", call_counter_inc)
|
|
assert call_counter == 0
|
|
cloner.perform_check()
|
|
assert call_counter == 0
|
|
# timer passed
|
|
monkeypatch.setattr(cloner, "check_interval", ret_true)
|
|
cloner.perform_check()
|
|
assert call_counter == 1
|
|
# right logs
|
|
assert 2 == sum(
|
|
r.message == "Started check for Mocked Project, url: git://repo"
|
|
and r.levelname == "INFO"
|
|
for r in caplog.records
|
|
)
|
|
assert 2 == sum(
|
|
r.message == "Check finished"
|
|
and r.levelname == "INFO"
|
|
for r in caplog.records
|
|
)
|
|
|
|
|
|
def test_open_uninitialized(cloner_dir_with_config, path_repo_base):
|
|
url = path_repo_base.as_uri()
|
|
mock = MockDirStruct(cloner_dir_with_config)
|
|
mock.config.cloner_repo_url = path_repo_base.as_uri()
|
|
c = Cloner(mock)
|
|
assert not c.open(mock.config.cloner_repo_url)
|
|
assert c._repo_url == path_repo_base.as_uri()
|
|
|
|
|
|
def test_open_initialized(cloner_dir_with_config, path_repo_base, caplog):
|
|
mock = MockDirStruct(cloner_dir_with_config)
|
|
hashed = gen_repo_hashed_name(path_repo_base.as_uri())
|
|
path = cloner_dir_with_config.joinpath("repos", hashed).as_posix()
|
|
mock.config.cloner_repo_url = path_repo_base.as_uri()
|
|
r = git.Repo().clone_from(path_repo_base.as_uri(), path, bare = True)
|
|
commit = r.head.commit.hexsha
|
|
assert r.bare
|
|
c = Cloner(mock)
|
|
assert c.open(path_repo_base.as_uri())
|
|
assert c._repo._repo.head.commit.hexsha == commit
|
|
|
|
|
|
def test_clone(tmp_path, path_repo_base):
|
|
mock = MockDirStruct(tmp_path)
|
|
mock.config.cloner_repo_url = "invalid"
|
|
c = Cloner(mock)
|
|
assert c.clone(path_repo_base.as_uri())
|
|
assert "e0c7e2a72579e24657c05e875201011d2b48bf94" == c._repo._repo.head.commit.hexsha
|
|
|
|
|
|
def test_clone_initialized(tmp_path, path_repo_base, caplog):
|
|
mock = MockDirStruct(tmp_path)
|
|
hashed = gen_repo_hashed_name(path_repo_base.as_uri())
|
|
path = tmp_path.joinpath("repos", hashed).as_posix()
|
|
mock.config.cloner_repo_url = path_repo_base.as_uri()
|
|
git.Repo().init(path)
|
|
c = Cloner(mock)
|
|
assert not c.clone(path_repo_base.as_uri())
|
|
assert caplog.records[0].levelname == "CRITICAL"
|
|
assert caplog.records[0].message == f"Repo path {path} is initialized... Refusing clone!"
|
|
|
|
|
|
def test_clone_recursive(tmp_path, path_repo_base, caplog):
|
|
mock = MockDirStruct(tmp_path)
|
|
mock.config.cloner_repo_url = "https://repo"
|
|
mock.config.cloner_submodules = True
|
|
Path(mock.repos_dir).mkdir()
|
|
|
|
with patch(
|
|
"repo_cloner.lib.cloner.RepoTool.initialized",
|
|
new_callable = PropertyMock,
|
|
return_value = False) as patch_initialized, \
|
|
patch(
|
|
"repo_cloner.lib.cloner.RepoTool.clone_recursive",
|
|
return_value = True) as patch_clone_recursive:
|
|
cloner = Cloner(mock)
|
|
assert cloner.clone()
|
|
|
|
assert patch_initialized.call_count == 1
|
|
assert patch_clone_recursive.call_count == 1
|
|
patch_clone_recursive.assert_called_with(
|
|
'https://repo', tmp_path.joinpath('cache', 'submodules').as_posix(), scan_depth = 100)
|
|
|
|
mock.config.cloner_submodule_depth = 0
|
|
assert cloner.clone()
|
|
patch_clone_recursive.assert_called_with(
|
|
'https://repo', tmp_path.joinpath('cache', 'submodules').as_posix(), scan_depth = None)
|