Skip to content
Permalink
29ea396dd3
Switch branches/tags

Name already in use

A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?
Go to file
 
 
Cannot retrieve contributors at this time
executable file 262 lines (231 sloc) 7.39 KB
#!/usr/bin/env python3
from pathlib import Path
from configparser import ConfigParser
from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
from time import sleep
from subprocess import check_call, check_output, PIPE, STDOUT, Popen
import shlex
from os import environ
import logging
# these can be overwritten by cmd line args:
REPO_DIR = Path ( "." )
STORE_DIR = REPO_DIR / "dependencies"
DEP_DIR = REPO_DIR / "dependencies"
CONFIG_FILE = REPO_DIR / "dependencies.conf"
repo_keys = ('uri', 'hash', 'init')
def read_config(
config_file
):
parser = ConfigParser()
parser.read( config_file )
for dep in parser.sections():
for key in ('uri', 'hash'):
if key not in parser[dep]:
raise( Exception( f"error reading config file: dependency '{dep}': missing key '{key}'" ) )
for key in parser[dep]:
if key not in repo_keys:
raise( Exception( f"error reading config file: dependency '{dep}': unknown field '{key}'" ) )
return parser
def get_dep(
dest_dir,
log_dir,
name,
uri,
hash,
init_script = None
):
logging.info( f"dependency: {name}" )
# clone, if necessary:
repo_dir = (dest_dir / name)
log_file = (log_dir / name) . with_suffix( ".log" )
if not repo_dir . is_dir():
exec_command(
f'git clone "{uri}" "{repo_dir}"'
)
exec_command(
f'git checkout "{hash}"',
cwd = repo_dir
)
# run script, if any:
if init_script is not None:
logging.debug( f"calling init script '{init_script}'..." )
env = environ.copy()
env['PYTHONUNBUFFERED'] = '1'
exec_command(
init_script,
cwd = repo_dir,
env = env,
output_to = ToFile( log_file ),
)
# if wrong version, error!
current_version = \
check_output(
["git", "rev-parse", "HEAD"],
cwd = repo_dir,
universal_newlines = True
).strip()
if current_version != hash:
logging.error( f"{name}: version conflict. Needed: '{hash}', found: '{current_version}'" )
logging.info( f"remove the repo and try again!" )
exit(1)
def init_logging(
log_level, # log level in the terminal
log_file,
log_level_file = logging.DEBUG
):
######################
# Set up logging #
######################
log_dir = log_file.parent
if not (log_dir.exists() and log_dir.is_dir()):
log_dir . mkdir( parents=True, exist_ok=True )
sleep( 1 )
# always log to file:
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(levelname)s - %(message)s',
filename = log_file,
filemode = "w"
)
rootLogger = logging.getLogger()
# set up logging to terminal:
terminal_formatter = \
logging.Formatter(
"%(levelname)s - %(message)s"
)
consoleHandler = logging.StreamHandler()
consoleHandler.setFormatter(terminal_formatter)
consoleHandler.setLevel( log_level )
rootLogger.addHandler(consoleHandler)
class ToFile:
def __init__( self, filename ):
self.filename = filename
class ToLog:
pass
def exec_command(
command,
error_msg = "ERROR while running {command}",
cwd = None,
env = None,
output_to = ToLog(),
log_level = "INFO",
# ignore_fail = False
exit_code_ok = lambda x: x == 0,
):
logging.log(
getattr(logging,log_level),
f"executing '{command}'",
)
arguments = shlex.split(command)
stdout_file = None
if isinstance( output_to, ToFile ):
log_file = Path( output_to.filename )
log_dir = log_file.parent
logging.info(
f"output: '{log_file}'",
)
if not (log_dir.exists() and log_dir.is_dir()):
os.makedirs( log_dir )
stdout_file = open( output_to.filename, "w", 1)
elif isinstance( output_to, ToLog ):
logging.info(
f"output:",
)
if env is not None:
env_arg = env
else:
env_arg = environ.copy()
with Popen(
arguments,
cwd = cwd,
env = env_arg,
stdout= PIPE,
stderr= STDOUT,
universal_newlines = True,
) as proc:
for line in proc.stdout:
if isinstance( output_to, ToFile ):
stdout_file.write( line )
elif isinstance( output_to, ToLog ):
logging.debug( "> " + line )
ret = proc.wait() # 0 means success
if stdout_file is not None:
stdout_file.close()
if (not exit_code_ok(ret)) and ret != 0:
logging.error( error_msg.format( command = command) )
raise( Exception( error_msg.format( command=command ) ) )
if __name__ == '__main__':
parser = ArgumentParser(
description="download dependencies to other git repositories",
formatter_class = ArgumentDefaultsHelpFormatter,
)
parser.add_argument(
"-c", "--config",
dest = "CONFIG_FILE",
default = CONFIG_FILE,
type = Path,
help = "config file specifying the current repositories dependencies"
)
parser.add_argument(
"-d", "--dep-dir",
dest = "DEP_DIR",
default = DEP_DIR,
type = Path,
help = "where to the current repository expects dependencies"
)
parser.add_argument(
"-s", "--store-dir",
dest = "STORE_DIR",
default = STORE_DIR,
type = Path,
help = "where to store dependencies"
)
parser.add_argument(
"-v", "--verbose",
default = 'INFO',
help = "verbosity level. can be one of DEBUG, INFO, WARNING, ERROR"
)
parser.add_argument(
"-l", "--log-dir",
default = DEP_DIR / 'log',
type = Path,
help = "config file specifying the current repositories dependencies"
)
args = parser.parse_args()
CONFIG_FILE = args.CONFIG_FILE
DEP_DIR = args.DEP_DIR
STORE_DIR = args.STORE_DIR
init_logging(
log_level = args.verbose,
log_file = args.log_dir / "git_deps_py.log"
);
for val in ('CONFIG_FILE', 'DEP_DIR', 'STORE_DIR'):
logging.debug(
f"{val} = {globals()[val]}"
)
config = read_config( CONFIG_FILE )
for dep in config.sections():
get_dep(
dest_dir = STORE_DIR,
log_dir = args.log_dir,
name = dep,
uri = config[dep]['uri'],
hash = config[dep]['hash'],
init_script =
config[dep]['init'] if 'init' in config[dep] else None
)
# if the deps are expected in DEP_DIR, but stored in STORE_DIR:
# make links DEP_DIR/* -> STORE_DIR/:
if DEP_DIR != STORE_DIR:
src = DEP_DIR / dep
if src.is_symlink():
src.unlink()
import os
dst = STORE_DIR / dep
if not dst.is_absolute():
dst = os.path.relpath(dst, start = src.parent )
logging.debug( f"creating link: {src} -> {dst}")
src . symlink_to(
dst,
target_is_directory = True
)