Add config parser functionality
Signed-off-by: Václav Valíček <valicek1994@gmail.com>
This commit is contained in:
parent
b31b2e199b
commit
8e92bc70a7
|
@ -1,4 +1,10 @@
|
|||
import os
|
||||
|
||||
from repo_cloner.lib.default_cloner_config import DefaultClonerConfig
|
||||
from repo_cloner.lib.config_file_not_found_error import ConfigFileNotFoundError
|
||||
import logging
|
||||
|
||||
l = logging.getLogger("rc.cfg")
|
||||
|
||||
|
||||
class ClonerConfig(DefaultClonerConfig):
|
||||
|
@ -37,6 +43,79 @@ class ClonerConfig(DefaultClonerConfig):
|
|||
def _set_property(self, key: str, value):
|
||||
if not self.has_property(key):
|
||||
raise KeyError(f"{key} is not recognized config option")
|
||||
if not isinstance(value, type(self.__getattribute__(key))):
|
||||
raise ValueError(f"Invalid value for key {key}: type is {type(value)}")
|
||||
|
||||
requested_type = type(self.__getattribute__(key));
|
||||
if not isinstance(value, requested_type):
|
||||
l.debug(f"Trying conversion of {key} from {type(value)} to {requested_type}")
|
||||
try:
|
||||
if requested_type == bool:
|
||||
value = True if value == "1" else False
|
||||
else:
|
||||
value = requested_type(value)
|
||||
l.debug(f"Conversion result: {type(value)}: {value}")
|
||||
except Exception as e:
|
||||
l.critical(f"Conversion failed! ({str(e)}")
|
||||
raise ValueError(
|
||||
f"Invalid value for key {key}: type is {type(value)} instead of {requested_type}, conversion failed")
|
||||
self.__values[key] = value
|
||||
|
||||
|
||||
class ClonerConfigParser:
|
||||
__file: str = ""
|
||||
__invalid_lines: list = []
|
||||
|
||||
def __init__(self, config_file: str):
|
||||
self.__file = config_file
|
||||
self.__config = ClonerConfig()
|
||||
self.__invalid_lines = []
|
||||
l.info(f"Parsing cloner config file: {self.__file}")
|
||||
|
||||
if not os.path.exists(self.__file):
|
||||
l.critical(f"Config file: {self.__file} does not exist!")
|
||||
raise ConfigFileNotFoundError(self.__file)
|
||||
|
||||
# parse config file
|
||||
raw_lines = []
|
||||
with open(self.__file, "r") as f:
|
||||
raw_lines = f.readlines()
|
||||
l.debug(f"Readen {len(raw_lines)} lines")
|
||||
lines = []
|
||||
|
||||
# strip lines, remove comments
|
||||
line: str = ""
|
||||
for line in raw_lines:
|
||||
if not line.startswith('#'):
|
||||
line = line.strip()
|
||||
# empty line? skip
|
||||
if len(line) > 0:
|
||||
lines.append(line)
|
||||
l.debug(f"Found {len(lines)} config lines")
|
||||
|
||||
for line in lines:
|
||||
if not "=" in line:
|
||||
l.warning(f"Line '{line}' has invalid format!")
|
||||
self.__invalid_lines.append((line, None))
|
||||
continue
|
||||
eq = line.find("=")
|
||||
key: str = line[0:eq]
|
||||
val: str = line[eq + 1:]
|
||||
l.debug(f"Found config pair: {key} => '{val}'")
|
||||
|
||||
try:
|
||||
self.__config._set_property(key, val)
|
||||
except BaseException as e:
|
||||
self.__invalid_lines.append((key, val))
|
||||
l.critical(str(e))
|
||||
|
||||
@property
|
||||
def config(self) -> ClonerConfig:
|
||||
return self.__config
|
||||
|
||||
@property
|
||||
def invalid(self) -> bool:
|
||||
return len(self.__invalid_lines) > 0
|
||||
|
||||
@property
|
||||
def invalid_lines(self):
|
||||
return self.__invalid_lines
|
||||
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
import os.path
|
||||
from repo_cloner.lib.dir_not_found_error import DirNotFoundError
|
||||
from repo_cloner.lib.config_file_not_found_error import ConfigFileNotFoundError
|
||||
from repo_cloner.lib.cloner_config import ClonerConfig, ClonerConfigParser
|
||||
|
||||
|
||||
class RepoDirStructure():
|
||||
|
@ -8,12 +9,14 @@ class RepoDirStructure():
|
|||
_conf_dir: str = ""
|
||||
_cache_dir: str = ""
|
||||
_repos_dir: str = ""
|
||||
_config = None
|
||||
|
||||
def __init__(self, base_dir: str):
|
||||
self._base_dir = base_dir
|
||||
self._conf_dir = os.path.join(self._base_dir, "config")
|
||||
self._cache_dir = os.path.join(self._base_dir, "cache")
|
||||
self._repos_dir = os.path.join(self._base_dir, "repos")
|
||||
self._config = None
|
||||
|
||||
@property
|
||||
def __config_filename(self) -> str:
|
||||
|
@ -75,3 +78,15 @@ class RepoDirStructure():
|
|||
if not os.path.exists(self.__config_filename):
|
||||
raise ConfigFileNotFoundError(self.__config_filename)
|
||||
return True
|
||||
|
||||
@property
|
||||
def config_file(self):
|
||||
return self.__config_filename
|
||||
|
||||
@property
|
||||
def config(self) -> ClonerConfig:
|
||||
self.has_config
|
||||
if not self._config:
|
||||
parser = ClonerConfigParser(self.__config_filename)
|
||||
self._config = parser.config
|
||||
return self._config
|
||||
|
|
|
@ -1,24 +1,37 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
import argparse
|
||||
import logging
|
||||
import os
|
||||
|
||||
import logging as l
|
||||
|
||||
#l.basicConfig(level = 0)
|
||||
|
||||
# create console handler with a higher log level
|
||||
console_logger = l.StreamHandler()
|
||||
console_formatter = l.Formatter(
|
||||
"%(asctime)-15s :: [%(levelname)8s] :: %(name)-15s :: %(message)s (%(filename)s:%(lineno)s)",
|
||||
"%Y-%m-%d %H:%M:%S")
|
||||
# setup logger
|
||||
console_logger.setFormatter(console_formatter)
|
||||
log = l.getLogger("rc")
|
||||
log.addHandler(console_logger)
|
||||
log.setLevel(logging.DEBUG)
|
||||
|
||||
from repo_cloner.lib.repo_dir_structure import RepoDirStructure
|
||||
from git.config import GitConfigParser
|
||||
import logging as l
|
||||
|
||||
l.basicConfig(level = 0)
|
||||
|
||||
|
||||
def config_try_override(config_writer: GitConfigParser, section: str, option: str, value: str):
|
||||
if not section in config_writer.sections():
|
||||
l.debug(f"CFG Creating section: {section}")
|
||||
log.debug(f"CFG Creating section: {section}")
|
||||
config_writer.add_section(section)
|
||||
if not config_writer.has_option(section, option):
|
||||
l.debug(f"CFG Creating option: {option}")
|
||||
log.debug(f"CFG Creating option: {option}")
|
||||
config_writer.add_value(section, option, "")
|
||||
|
||||
l.debug(f"Setting {section}.{option} = {value}")
|
||||
log.debug(f"Setting {section}.{option} = {value}")
|
||||
config_writer.set(section, option, value)
|
||||
|
||||
|
||||
|
@ -50,7 +63,9 @@ def main():
|
|||
config_try_override(cfgw, "core", "sshcommand",
|
||||
f"ssh -i {ssh_identity} -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null -o BatchMode=yes -q")
|
||||
|
||||
from repo_cloner.lib.cloner_config import ClonerConfigParser
|
||||
ClonerConfigParser(os.path.join(dirs.conf_dir, "cloner.cfg"))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
main()
|
||||
|
|
1
tests/_support_data/config/empty.cfg
Normal file
1
tests/_support_data/config/empty.cfg
Normal file
|
@ -0,0 +1 @@
|
|||
# empty file with no directives
|
20
tests/_support_data/config/invalid.cfg
Normal file
20
tests/_support_data/config/invalid.cfg
Normal file
|
@ -0,0 +1,20 @@
|
|||
# cloner.cfg
|
||||
# main config
|
||||
# created at 2018-03-23 20:24:57
|
||||
invalid_option1
|
||||
invalid_option=2
|
||||
|
||||
# main url - url of main repo - just to clone
|
||||
cloner_repo_url=git@git.somerandomserver.test:/path
|
||||
|
||||
# project name (names of volumes are derrived from this
|
||||
cloner_project_name=some-random-project
|
||||
|
||||
# cloner interval (in minutes, default=0 - run always)
|
||||
cloner_interval=666
|
||||
|
||||
# do you need submodules support? (1/0)
|
||||
cloner_submodules=1
|
||||
|
||||
# max depth of submodule scan (default = unlimited, uncomment to use)
|
||||
cloner_submodule_depth=500000
|
18
tests/_support_data/config/valid.cfg
Normal file
18
tests/_support_data/config/valid.cfg
Normal file
|
@ -0,0 +1,18 @@
|
|||
# cloner.cfg
|
||||
# main config
|
||||
# created at 2018-03-23 20:24:57
|
||||
|
||||
# main url - url of main repo - just to clone
|
||||
cloner_repo_url=git@git.somerandomserver.test:/path
|
||||
|
||||
# project name (names of volumes are derrived from this
|
||||
cloner_project_name=some-random-project
|
||||
|
||||
# cloner interval (in minutes, default=0 - run always)
|
||||
cloner_interval=666
|
||||
|
||||
# do you need submodules support? (1/0)
|
||||
cloner_submodules=1
|
||||
|
||||
# max depth of submodule scan (default = unlimited, uncomment to use)
|
||||
cloner_submodule_depth=500000
|
|
@ -66,7 +66,7 @@ def test_set_property():
|
|||
# invalid type
|
||||
with pytest.raises(ValueError) as exc:
|
||||
x._set_property("cloner_interval", "č")
|
||||
assert exc.exconly() == "ValueError: Invalid value for key cloner_interval: type is <class 'str'>"
|
||||
assert exc.exconly() == "ValueError: Invalid value for key cloner_interval: type is <class 'str'> instead of <class 'int'>, conversion failed"
|
||||
|
||||
x._set_property("cloner_project_name", "č")
|
||||
|
||||
|
@ -74,3 +74,15 @@ def test_set_property():
|
|||
with pytest.raises(KeyError) as exc:
|
||||
x._set_property("nonexistent_key", 888)
|
||||
assert exc.exconly() == "KeyError: 'nonexistent_key is not recognized config option'"
|
||||
|
||||
# boolean conversion
|
||||
x._set_property("cloner_submodules", "1")
|
||||
assert x.cloner_submodules == True
|
||||
|
||||
x._set_property("cloner_submodules", "0")
|
||||
assert x.cloner_submodules == False
|
||||
|
||||
# integer conversion
|
||||
x._set_property("cloner_interval", 60)
|
||||
assert x.cloner_interval == 60
|
||||
|
||||
|
|
72
tests/lib/test_cloner_config_parser.py
Normal file
72
tests/lib/test_cloner_config_parser.py
Normal file
|
@ -0,0 +1,72 @@
|
|||
from repo_cloner.lib.cloner_config import ClonerConfigParser
|
||||
import pytest
|
||||
from repo_cloner.lib.config_file_not_found_error import ConfigFileNotFoundError
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def config_path() -> Path:
|
||||
path = Path(__file__).parent.parent
|
||||
return path.joinpath("_support_data", "config")
|
||||
|
||||
|
||||
def test_no_config_file():
|
||||
with pytest.raises(ConfigFileNotFoundError) as e:
|
||||
x = ClonerConfigParser("/tmp/path/that/does/not/exist")
|
||||
assert e.__str__() == "<ExceptionInfo ConfigFileNotFoundError('/tmp/path/that/does/not/exist') tblen=2>"
|
||||
|
||||
|
||||
def test_valid_config(config_path: Path):
|
||||
cfg = config_path.joinpath("valid.cfg")
|
||||
x = ClonerConfigParser(cfg)
|
||||
assert x.invalid == False
|
||||
assert x.invalid_lines == []
|
||||
c = x.config
|
||||
assert c.cloner_project_name == "some-random-project"
|
||||
assert c.cloner_repo_url == "git@git.somerandomserver.test:/path"
|
||||
assert c.cloner_interval == 666
|
||||
assert c.cloner_submodules == True
|
||||
assert c.cloner_submodule_depth == 500000
|
||||
|
||||
|
||||
def test_empty_config(config_path: Path):
|
||||
cfg = config_path.joinpath("empty.cfg")
|
||||
x = ClonerConfigParser(cfg)
|
||||
assert x.invalid == False
|
||||
assert x.invalid_lines == []
|
||||
c = x.config
|
||||
assert c.cloner_project_name == ""
|
||||
assert c.cloner_repo_url == ""
|
||||
assert c.cloner_interval == 5
|
||||
assert c.cloner_submodules == True
|
||||
assert c.cloner_submodule_depth == 50000
|
||||
|
||||
|
||||
def test_invalid_config(config_path: Path, caplog):
|
||||
cfg = config_path.joinpath("invalid.cfg")
|
||||
with caplog.at_level(30):
|
||||
x = ClonerConfigParser(cfg)
|
||||
|
||||
# two invalid lines
|
||||
assert x.invalid == True
|
||||
assert x.invalid_lines == [
|
||||
('invalid_option1', None),
|
||||
('invalid_option', "2"),
|
||||
]
|
||||
# test logging
|
||||
assert any(
|
||||
(r.levelname == "CRITICAL" and r.message == "'invalid_option is not recognized config option'")
|
||||
for r in caplog.records
|
||||
)
|
||||
|
||||
assert any(
|
||||
(r.levelname == "WARNING" and r.message == "Line 'invalid_option1' has invalid format!")
|
||||
for r in caplog.records
|
||||
)
|
||||
|
||||
c = x.config
|
||||
assert c.cloner_project_name == "some-random-project"
|
||||
assert c.cloner_repo_url == "git@git.somerandomserver.test:/path"
|
||||
assert c.cloner_interval == 666
|
||||
assert c.cloner_submodules == True
|
||||
assert c.cloner_submodule_depth == 500000
|
|
@ -102,3 +102,25 @@ def test_has_config(tmp_path: PosixPath):
|
|||
|
||||
tmp_path.joinpath("config", "cloner.cfg").touch()
|
||||
assert X.has_config
|
||||
|
||||
|
||||
def test_config_file(tmp_path: PosixPath):
|
||||
X = RepoDirStructure(tmp_path)
|
||||
assert X.config_file == tmp_path.joinpath("config", "cloner.cfg").__str__()
|
||||
|
||||
|
||||
def test_get_config(tmp_path: PosixPath):
|
||||
tmp_path.joinpath("config").mkdir()
|
||||
tmp_path.joinpath("cache").mkdir()
|
||||
tmp_path.joinpath("repos").mkdir()
|
||||
|
||||
X = RepoDirStructure(tmp_path)
|
||||
# no file provided
|
||||
with pytest.raises(ConfigFileNotFoundError) as excinfo:
|
||||
X.config
|
||||
|
||||
# create config file
|
||||
tmp_path.joinpath("config", "cloner.cfg").touch()
|
||||
X = RepoDirStructure(tmp_path)
|
||||
assert X.has_config == True
|
||||
assert 5 == X.config.cloner_interval
|
||||
|
|
Loading…
Reference in New Issue
Block a user