"""Some various utilities."""
from os import remove, rmdir, walk, scandir
from os.path import join, normpath, basename, isdir, isfile, dirname
from os.path import expanduser
from shutil import rmtree
from re import compile as re_compile
from chrysalio.lib.utils import load_guessing_encoding
from cioservice.lib.utils import location_path2abs_path
from ..relaxng import RELAXNG_CIOSET
from .i18n import _
# Name of the directory whose presence marks a home directory (see guess_home).
LOCAL_DIR = '.local'
# =============================================================================
[docs]
def cioset_path2full_path(file_elt):
    """Find the full path for the file of a cioset pointed by ``file_elt``,
    possibly according to <root> tags.

    The path is read from the ``<path>`` child of ``file_elt`` or, when there
    is no such child, from the text of ``file_elt`` itself. A path containing
    a colon is returned directly, with a leading slash added if missing.
    Otherwise, the ancestors of ``file_elt`` are scanned in the order given by
    ``iterancestors()`` and the path is joined to the first ``<root>`` value
    found. Without any ``<root>``, the path is returned unchanged.

    :type  file_elt: lxml.etree.Element
    :param file_elt:
        Cioset XML file element for the current file.
    :rtype: `str`
    """
    namespace = RELAXNG_CIOSET['namespace']
    path_elt = file_elt.find('{{{0}}}path'.format(namespace))
    path = (file_elt if path_elt is None else path_elt).text.strip()
    if ':' in path:
        return path if path[0] == '/' else f'/{path}'  # CioPath compatibility

    for elt in file_elt.iterancestors():
        root = elt.findtext('{{{0}}}root'.format(namespace))
        if root is None:
            continue
        # Strip BEFORE looking at the first character: with surrounding
        # white spaces, ``root[0]`` would be a blank and a spurious '/' would
        # be prepended in front of it.
        root = root.strip()
        if ':' in root and root[0] != '/':
            root = f'/{root}'  # CioPath compatibility
        return normpath(
            join(root, path).replace(':./', ':').replace(':/', ':'))

    return path
# =============================================================================
[docs]
def cioset_path2location_path(base_id, cioset_relpath, file_elt):
    """Compute the location path of the file referenced by ``file_elt`` in a
    cioset file, taking <root> tags into account.

    When the full path computed by :func:`cioset_path2full_path` contains a
    colon, it is returned unchanged. Otherwise, it is resolved against the
    directory of the cioset file and prefixed with ``/<base_id>:``.

    :param str base_id:
        ID of the base directory of the cioset file.
    :param str cioset_relpath:
        Relative path in the warehouse to the cioset file.
    :type  file_elt: lxml.etree.Element
    :param file_elt:
        Cioset XML file element for the current file.
    :rtype: str
    """
    full_path = cioset_path2full_path(file_elt)
    if ':' not in full_path:
        relpath = normpath(join(dirname(cioset_relpath), full_path))
        return f'/{base_id}:{relpath}'
    return full_path
# =============================================================================
[docs]
def cioset_path2abs_path(locations, cioset_abspath, file_elt):
    """Compute the absolute path of the file referenced by ``file_elt`` in a
    cioset file, taking <root> tags into account.

    When the full path computed by :func:`cioset_path2full_path` contains a
    colon, the conversion is delegated to ``location_path2abs_path()`` and
    its result is returned as is. Otherwise, the path is resolved against the
    directory of the cioset file.

    :param dict locations:
        Dictionary of locations. It can be updated during the operation.
    :param str cioset_abspath:
        Absolute path to the cioset file.
    :type  file_elt: lxml.etree.Element
    :param file_elt:
        Cioset XML file element for the current file.
    :rtype: :class:`str` or ``None``
    """
    full_path = cioset_path2full_path(file_elt)
    if ':' not in full_path:
        return normpath(join(dirname(cioset_abspath), full_path))
    return location_path2abs_path(locations, full_path)
# =============================================================================
[docs]
def guess_home(abs_file, settings=None, locations=None):
    """Try to guess a base directory for this file and an ID for this
    base directory. In the case of warehouses, it is the root of the warehouse
    containing the file and its ID.

    Without an ``'input.home.path'`` setting, the ancestors of ``abs_file``
    are climbed until one of them contains a ``LOCAL_DIR`` directory. The
    search is abandoned when it reaches the home directory of the user or the
    top of the file system.

    :param str abs_file:
        Absolute path to an existing file.
    :param dict settings: (optional)
        Settings with possibly ``'input.home.path'`` and ``'input.home.id'``
        keys.
    :param dict locations: (optional)
        Dictionary of locations. It can be updated during the operation.
    :rtype: tuple
    :return:
        A tuple such as ``(home_path, home_id)`` or ``(None, None)`` if no
        home has been found.
    """
    # Absolute path
    home_path = settings.get('input.home.path') \
        if settings is not None else None
    if home_path is None:
        user_home = expanduser('~')
        path = dirname(abs_file)
        while path != user_home and not isdir(join(path, LOCAL_DIR)):
            parent = dirname(path)
            if parent == path:
                # Top of the tree reached (``dirname('/') == '/'``): without
                # this guard, a file outside the user home would loop forever.
                return None, None
            path = parent
        if path == user_home:
            return None, None
        home_path = path

    # ID
    home_id = settings.get('input.home.id') \
        if settings is not None else None
    if home_id is None:
        home_id = basename(home_path)

    # Update of location
    # NOTE(review): the membership test uses the bare ID whereas the entry is
    # stored under '/<ID>' -- kept as is, confirm which key is intended.
    if locations is not None and home_id not in locations:
        locations[f'/{home_id}'] = home_path

    return home_path, home_id
# =============================================================================
[docs]
def remove_empty_directories(root):
    """Recursively remove empty directories.

    The tree is walked bottom-up so that a directory which becomes empty
    once its own empty subdirectories have been removed is removed too. The
    ``root`` directory itself is never removed.

    :param str root:
        Absolute path to the root directory to clean.
    """
    for path, dirs, unused_ in walk(root, topdown=False):
        for name in dirs:
            # ``rmdir`` only succeeds on an empty directory: any failure (not
            # empty, symbolic link, permission...) means "leave it in place".
            try:
                rmdir(join(path, name))
            except OSError:
                pass
# =============================================================================
[docs]
def clean_directory(directory, regex):
    """Clean up a directory according to a regular expression.

    Every subdirectory whose name matches ``regex`` is removed with its
    content and every file whose name matches ``regex`` is removed. Errors
    during removal are ignored.

    :param str directory:
        Absolute path to the directory to clean.
    :param regex:
        Compiled regular expression to select files and directories to clean.
    """
    # Top-down walk: removed directories are pruned from ``dirs`` so that
    # ``walk`` does not visit a subtree which has just been deleted. (Pruning
    # has no effect on a bottom-up walk.)
    for path, dirs, files in walk(directory):
        kept = []
        for name in sorted(dirs):
            if regex.search(name):
                rmtree(join(path, name), ignore_errors=True)
            else:
                kept.append(name)
        dirs[:] = kept

        for name in files:
            if regex.search(name):
                try:
                    remove(join(path, name))
                except OSError:
                    pass
# =============================================================================
[docs]
def select_files(pbuild, step):
    """Return files, possibly in a subdirectory, according to a regular
    expression.

    The ``'select'`` and ``'directory'`` values of ``step`` are templates
    filled with ``fid`` (the current file ID) and the current values before
    use. Nothing is yielded if ``step`` has no ``'select'`` key, if the
    directory is out of the ongoing directory or if it does not exist.

    :type  pbuild: .lib.pbuild.PBuild
    :param pbuild:
        Current processor build object.
    :param dict step:
        Dictionary defining the current step.
    :rtype: generator
    :return:
        Absolute paths of the files whose name matches the regular expression.
    """
    if 'select' not in step:
        return

    subworkdir = pbuild.current['workdir']
    if 'directory' in step:
        subworkdir = normpath(join(subworkdir, step['directory'].format(
            fid=pbuild.current['file_id'], **pbuild.current['values'])))
        # Compare on a path component boundary: a raw ``startswith`` would
        # accept a sibling such as '<ongoing>2/...'. ``join(x, '')`` appends
        # the path separator.
        ongoing = normpath(pbuild.directories['ongoing'])
        if subworkdir != ongoing \
                and not subworkdir.startswith(join(ongoing, '')):
            pbuild.warning(_('Files out of the ongoing directory'))
            return
    if not isdir(subworkdir):
        return

    regex = re_compile(step['select'].format(
        fid=pbuild.current['file_id'], **pbuild.current['values']))
    for path, unused_, files in walk(subworkdir):
        for name in files:
            abs_path = join(path, name)
            if isfile(abs_path) and regex.search(name) is not None:
                yield abs_path
# =============================================================================
[docs]
def select_file(processor, abs_file, files=None, is_dir=None):
    """Tell whether ``abs_file`` meets the input criteria of the processor
    and, if so, possibly append it to ``files``.

    The criteria are the optional ``'file_regex'`` and ``'content_regex'``
    entries of ``processor.environment['input']``. A directory is only
    selected when a ``'file_regex'`` exists and matches its path; its content
    is never examined.

    :type  processor: .lib.processor.Processor
    :param processor:
        Processor object.
    :param str abs_file:
        Absolute path to the file to analyse.
    :param list files: (optional)
        List to complete with ``abs_file`` when it is selected.
    :param bool is_dir: (optional)
        ``True`` if ``abs_file`` is a directory. If ``None``, the file system
        is queried.
    :rtype: bool
    """
    input_env = processor.environment['input']

    # Regex on file name
    name_pattern = input_env.get('file_regex')
    has_name_pattern = name_pattern is not None
    if has_name_pattern and name_pattern.search(abs_file) is None:
        return False

    # Directory?
    directory = isdir(abs_file) if is_dir is None else bool(is_dir)
    if directory:
        if not has_name_pattern:
            return False
        if files is not None:
            files.append(abs_file)
        return True

    # Regex on content
    content_pattern = input_env.get('content_regex')
    if content_pattern is not None:
        text = load_guessing_encoding(abs_file)
        if content_pattern.search(text) is None:
            return False

    if files is not None:
        files.append(abs_file)
    return True
# =============================================================================
[docs]
def abs_output(pbuild):
    """Return an absolute path to the output directory.

    When the output environment defines a ``'directory'`` template, it is
    filled with the current values and joined to ``pbuild.output``. If an
    ``'output.home.path'`` setting exists and the result falls out of it,
    only the last component of the directory is kept under ``pbuild.output``.

    :type  pbuild: .lib.pbuild.PBuild
    :param pbuild:
        Current processor build object.
    :rtype: str
    """
    output = pbuild.output
    if 'directory' not in pbuild.environment['output']:
        return output

    directory = pbuild.environment['output']['directory'].format(
        **pbuild.current['values'])
    output = normpath(join(output, directory))

    output_home = pbuild.settings.get('output.home.path')
    if output_home:
        # Compare on a path component boundary: a raw ``startswith`` would
        # accept a sibling such as '<home>-other/...'. ``join(x, '')``
        # appends the path separator.
        home = normpath(output_home)
        if output != home and not output.startswith(join(home, '')):
            output = normpath(join(pbuild.output, basename(directory)))

    return output