Source code for cioprocessor.lib.utils

"""Some various utilities."""

from os import remove, rmdir, walk, scandir
from os.path import join, normpath, basename, isdir, isfile, dirname
from os.path import expanduser
from shutil import rmtree
from re import compile as re_compile

from chrysalio.lib.utils import load_guessing_encoding
from cioservice.lib.utils import location_path2abs_path
from ..relaxng import RELAXNG_CIOSET
from .i18n import _

LOCAL_DIR = '.local'


# =============================================================================
def cioset_path2full_path(file_elt):
    """Find the full path of the file of a cioset pointed by ``file_elt``,
    possibly according to <root> tags.

    The path is read from the ``<path>`` child of ``file_elt`` or, when there
    is no such child, from the text of ``file_elt`` itself. A path containing
    ``':'`` is returned as is, with a leading ``'/'`` added if it is missing.
    Otherwise, the elements given by ``file_elt.iterancestors()`` are
    inspected and the first ``<root>`` value found is joined with the path.
    Without any ``<root>``, the path is returned unchanged.

    :type  file_elt: lxml.etree.Element
    :param file_elt:
        Cioset XML file element for the current file.
    :rtype: str
    """
    namespace = RELAXNG_CIOSET['namespace']
    path_elt = file_elt.find('{{{0}}}path'.format(namespace))
    path = (file_elt if path_elt is None else path_elt).text.strip()
    if ':' in path:
        return path if path[0] == '/' else f'/{path}'  # CioPath compatibility

    root_tag = '{{{0}}}root'.format(namespace)
    for elt in file_elt.iterancestors():
        root = elt.findtext(root_tag)
        if root is None:
            continue
        # Strip before looking at the first character: otherwise surrounding
        # whitespace hides an existing leading '/' and a second one is added.
        root = root.strip()
        if ':' in root and root[0] != '/':
            root = f'/{root}'  # CioPath compatibility
        # join() puts a separator right after a root ending with ':' (or
        # ':.'); the replacements remove it before normalisation.
        return normpath(
            join(root, path).replace(':./', ':').replace(':/', ':'))
    return path
# =============================================================================
def cioset_path2location_path(base_id, cioset_relpath, file_elt):
    """Compute the location path of the file pointed in a cioset file by
    ``file_elt``, possibly according to <root> tags.

    When the full path computed by :func:`cioset_path2full_path` contains
    ``':'``, it is returned as is. Otherwise, it is resolved against the
    directory of the cioset file and prefixed with ``/<base_id>:``.

    :param str base_id:
        ID of the base directory of the cioset file.
    :param str cioset_relpath:
        Relative path in the warehouse to the cioset file.
    :type  file_elt: lxml.etree.Element
    :param file_elt:
        Cioset XML file element for the current file.
    :rtype: str
    """
    full_path = cioset_path2full_path(file_elt)
    if ':' not in full_path:
        # Plain path: it is relative to the directory of the cioset file
        cioset_dir = dirname(cioset_relpath)
        relative = normpath(join(cioset_dir, full_path))
        full_path = '/{0}:{1}'.format(base_id, relative)
    return full_path
# =============================================================================
def cioset_path2abs_path(locations, cioset_abspath, file_elt):
    """Compute the absolute path of the file pointed in a cioset file by
    ``file_elt``, possibly according to <root> tags.

    When the full path computed by :func:`cioset_path2full_path` contains
    ``':'``, the result is whatever ``location_path2abs_path`` returns for
    it. Otherwise, the path is resolved against the directory of the cioset
    file.

    :param dict locations:
        Dictionary of locations, handed over to ``location_path2abs_path``.
    :param str cioset_abspath:
        Absolute path to the cioset file.
    :type  file_elt: lxml.etree.Element
    :param file_elt:
        Cioset XML file element for the current file.
    :rtype: :class:`str` or ``None``
    :return:
        NOTE(review): ``None`` can only come from ``location_path2abs_path``
        -- to be confirmed against that helper.
    """
    full_path = cioset_path2full_path(file_elt)
    if ':' not in full_path:
        cioset_dir = dirname(cioset_abspath)
        return normpath(join(cioset_dir, full_path))
    return location_path2abs_path(locations, full_path)
# =============================================================================
def guess_home(abs_file, settings=None, locations=None):
    """Try to guess a base directory for this file and an ID for this base
    directory.

    The base directory is ``settings['input.home.path']`` when available.
    Otherwise, it is the closest ancestor directory of ``abs_file`` which
    contains a ``LOCAL_DIR`` directory. The search gives up when it reaches
    the home directory of the user or the top of the file system.

    The ID is ``settings['input.home.id']`` when available, otherwise the
    base name of the base directory.

    :param str abs_file:
        Absolute path to an existing file.
    :param dict settings: (optional)
        Settings with possibly ``'input.home.path'`` and ``'input.home.id'``
        keys.
    :param dict locations: (optional)
        Dictionary of locations. The entry ``'/<home_id>'`` is added when
        neither ``'/<home_id>'`` nor ``'<home_id>'`` is already a key.
    :rtype: tuple
    :return:
        A tuple such as ``(home_path, home_id)`` or ``(None, None)`` if no
        base directory is found.
    """
    # Absolute path
    home_path = None if settings is None else settings.get('input.home.path')
    if home_path is None:
        user_home = expanduser('~')
        path = dirname(abs_file)
        # The user home is never a candidate, even if it holds a LOCAL_DIR
        while path != user_home:
            if isdir(join(path, LOCAL_DIR)):
                home_path = path
                break
            parent = dirname(path)
            if parent == path:
                # Top of the tree (dirname() no longer changes the path):
                # stop here instead of looping forever.
                break
            path = parent
    if home_path is None:
        return None, None

    # ID
    home_id = None if settings is None else settings.get('input.home.id')
    if home_id is None:
        home_id = basename(home_path)

    # Update of location
    if locations is not None:
        location_key = f'/{home_id}'
        # Test the key which is actually written so that an existing location
        # is not overwritten; the bare ID is still honoured as before.
        if location_key not in locations and home_id not in locations:
            locations[location_key] = home_path

    return home_path, home_id
# =============================================================================
def remove_empty_directories(root):
    """Recursively remove empty directories.

    The tree is browsed bottom-up so that a directory which only contains
    empty directories is removed too. ``root`` itself is never removed.

    :param str root:
        Absolute path to the root directory to clean.
    """
    for path, dirs, unused_ in walk(root, topdown=False):
        for name in dirs:
            try:
                # rmdir() only succeeds on an empty directory
                rmdir(join(path, name))
            except OSError:
                # Not empty or not removable: best effort, leave it in place
                pass
# =============================================================================
def clean_directory(directory, regex):
    """Clean up a directory according to a regular expression.

    Every directory whose name matches ``regex`` is removed with its content
    and every file whose name matches ``regex`` is removed. Errors during
    removal are ignored.

    :param str directory:
        Absolute path to the directory to clean.
    :param regex:
        Compiled regular expression to select the names to clean.
    """
    # Top-down walk: removing a name from ``dirs`` is what keeps walk() from
    # descending into a directory which has just been deleted.
    for path, dirs, files in walk(directory):
        for name in sorted(dirs):
            if regex.search(name):
                rmtree(join(path, name), ignore_errors=True)
                dirs.remove(name)
        for name in files:
            if regex.search(name):
                try:
                    remove(join(path, name))
                except OSError:
                    # Best effort: a file which cannot be removed is skipped
                    pass
# =============================================================================
def select_files(pbuild, step):
    """Return files, possibly in a subdirectory, according to a regular
    expression.

    Nothing is yielded if ``step`` has no ``'select'`` key, if the directory
    to browse does not exist or if ``step['directory']`` leads out of the
    ongoing directory (a warning is then emitted on ``pbuild``).

    ``step['select']`` and ``step['directory']`` are templates formatted with
    ``fid`` (the current file ID) and the current values. The resulting
    regular expression is searched in file names, not in full paths.

    :type  pbuild: .lib.pbuild.PBuild
    :param pbuild:
        Current processor build object.
    :param dict step:
        Dictionary defining the current step.
    :rtype: generator
    :return:
        Paths of the selected files, built from the browsed directory.
    """
    if 'select' not in step:
        return

    current = pbuild.current
    subworkdir = current['workdir']
    if 'directory' in step:
        subworkdir = normpath(join(subworkdir, step['directory'].format(
            fid=current['file_id'], **current['values'])))
        # Compare whole path components: a bare startswith() would accept a
        # sibling such as '<ongoing>-other'.
        ongoing = normpath(pbuild.directories['ongoing'])
        if subworkdir != ongoing \
           and not subworkdir.startswith(join(ongoing, '')):
            pbuild.warning(_('Files out of the ongoing directory'))
            return
    if not isdir(subworkdir):
        return

    regex = re_compile(step['select'].format(
        fid=current['file_id'], **current['values']))
    for path, unused_, files in walk(subworkdir):
        for name in files:
            abs_path = join(path, name)
            if isfile(abs_path) and regex.search(name) is not None:
                yield abs_path
# =============================================================================
def select_file(processor, abs_file, files=None, is_dir=None):
    """Tell whether ``abs_file`` meets the expected criteria and, possibly,
    append it to the ``files`` list.

    The criteria come from ``processor.environment['input']``:

    * ``'file_regex'``: if set, it must be found in ``abs_file``;
    * ``'content_regex'``: if set, it must be found in the content of the
      file. It is not used for directories.

    A directory is only selected when ``'file_regex'`` is set.

    :type  processor: .lib.processor.Processor
    :param processor:
        Processor object.
    :param str abs_file:
        Absolute path to the file to analyse.
    :param list files: (optional)
        List to complete with ``abs_file`` when it is selected.
    :param bool is_dir: (optional)
        ``True`` if ``abs_file`` is a directory. If ``None``, the file system
        is asked.
    :rtype: bool
    """
    input_env = processor.environment['input']

    # Regex on file name
    name_pattern = input_env.get('file_regex')
    has_name_pattern = name_pattern is not None
    if has_name_pattern and name_pattern.search(abs_file) is None:
        return False

    # Directory?
    directory = isdir(abs_file) if is_dir is None else bool(is_dir)
    if directory:
        if has_name_pattern and files is not None:
            files.append(abs_file)
        return has_name_pattern

    # Regex on content
    content_pattern = input_env.get('content_regex')
    if content_pattern is not None:
        text = load_guessing_encoding(abs_file)
        if content_pattern.search(text) is None:
            return False

    if files is not None:
        files.append(abs_file)
    return True
# =============================================================================
def abs_output(pbuild):
    """Return an absolute path to the output directory.

    Without ``'directory'`` in ``pbuild.environment['output']``, the result
    is ``pbuild.output``. Otherwise, the directory template is formatted with
    the current values and joined to ``pbuild.output``. If the setting
    ``'output.home.path'`` is defined and the result is not inside it, only
    the last component of the directory is kept under ``pbuild.output``.

    :type  pbuild: .lib.pbuild.PBuild
    :param pbuild:
        Current processor build object.
    :rtype: str
    """
    output = pbuild.output
    output_env = pbuild.environment['output']
    if 'directory' not in output_env:
        return output

    directory = output_env['directory'].format(**pbuild.current['values'])
    output = normpath(join(output, directory))
    output_home = pbuild.settings.get('output.home.path')
    if output_home:
        # Compare whole path components: a bare startswith() would accept a
        # sibling such as '<output_home>-other'.
        home = normpath(output_home)
        if output != home and not output.startswith(join(home, '')):
            leaf = basename(directory)
            if leaf == '..':
                # A trailing '..' would climb out of pbuild.output again
                leaf = ''
            output = normpath(join(pbuild.output, leaf))
    return output