Source code for cioprocessor.lib.pbuild

# pylint: disable = too-many-lines
"""Processor build (pbuild) management."""

from __future__ import annotations
from sys import version_info
from os import makedirs, remove, stat
from os.path import join, exists, basename, dirname, normpath, splitext
from os.path import getmtime, relpath, isdir, isabs
from shutil import rmtree, copy2
from getpass import getuser
from pwd import getpwnam
from time import time
from glob import glob
from re import MULTILINE, UNICODE, compile as re_compile, sub as re_sub
from io import open as io_open
from tempfile import gettempdir
from copy import deepcopy
from collections import OrderedDict
from threading import Event

from lxml import etree

from pyramid.asset import abspath_from_asset_spec

from chrysalio.lib.utils import EXCLUDED_FILES, copy_content, tounicode, tostr
from chrysalio.lib.utils import load_guessing_encoding, make_id, convert_value
from chrysalio.lib.utils import copy_content_re
from chrysalio.lib.xml import load_xml
from cioservice.lib.utils import location_path2abs_path
from cioservice.lib.build import Build
from cioservice.lib.build_manager import BuildManager
from ..relaxng import RELAXNG_CIOSET
from .i18n import _, translate
from .utils import cioset_path2full_path, cioset_path2abs_path, guess_home

PROCESSOR_DIR = 'Processor'
PROCESSOR_XML = 'processor.xml'
ONGOING_DIR = 'Ongoing'
RESULT_DIR = 'Result'
ZIPPED_DIR = 'Zipped'
CLEAN_PERIOD = 3600
CLEAN_FILE = 'clean'
INSTALL_TTL = 86400
INSTALL_FILE = 'install'
LOCK_FILE = 'lock'
CIOSET_SUFFIX = '.cioset'
CIOSET_FULL = '1'
CIOSET_UNIT = '2'


# =============================================================================
[docs] class PBuild(Build): """This class manages a processor build (pbuild). :type processor: .lib.processor.Processor :param processor: Processor object. :type manager: cioservice.lib.build_manager :param manager: Manager for this build. :param str pbuild_id: Processor build ID. :param dict params: Dictionary defining the parameters of the processor build. :type stopping: threading.Event :param stopping: Flag to stop the processing. In addition to the :class:`~cioservice.lib.build.Build` attributes, a processor build has the following attributes: * ``'environment'``: processor environment * ``'directories'``: a dictionary of absolute pathes to directories * ``'relaxngs'``: a dictionary of Relax NG objects * ``'steps'``: list of steps for this build * ``'rel_files'``: dictionary of relative paths in recursive mode The ``pbuild.directories`` dictionary have the following keys: * ``'root'``: absolute path to the root directory of the build * ``'processor'``: absolute path to the processor directory * ``'ongoing'``: absolute path to the global ongoing directory * ``'result'``: absolute path to the result directory The ``pbuild.current`` dictionary have the following keys: * ``'input_file'``: absolute path to the original input file * ``'input_base'``: possibly absolute path to the base (warehouse) * ``'input_base_id'``: possibly ID of the base * ``'input_relpath'``: relative path of the input with respect to the base * ``'values'``: common defines and values overrided by local values * ``'step_delta'``: the delta of step progress for this file * ``'file_id'``: ID of the current file * ``'workdir'``: absolute path to the directory where work takes place * ``'fup'``: absolute path to the File Under Processing * ``'dup'``: Data Under Process in a native format (DOM, plain...) * ``'resources'``: common resources possibly completed by local resources * ``'template'``: relative path to the used processor template * ``'abs_template'``: absolute path to the used processor template * ``'xml_declaration'``: ``True`` if we have to keep <?xml version='1.0'?> """ # pylint: disable = too-many-public-methods, too-many-instance-attributes # ------------------------------------------------------------------------- def __init__( self, processor, manager: BuildManager, pbuild_id: str, params: dict, stopping: Event | None): """Constructor method.""" root_dir = join(processor.environment['build_root'], pbuild_id) \ if processor is not None else join(gettempdir(), 'Chrysalio') super().__init__(root_dir, manager, pbuild_id, params, stopping) self.rel_files = params.get('rel_files', {}) self._processor = processor self.environment = processor.environment \ if processor is not None else None self.directories = {'root': root_dir} # Check lock and set installation time if self.environment is None or not self.lock(): self.result['no_execution'] = True self.current = { 'values': self.values, 'input_file': None, 'step_delta': 1, 'file_id': None, 'workdir': None, 'fup': None, 'dup': None, 'resources': self.resources } return # Current environment self.current = self.current_initialize(None) # Prepare processor directory path = join(root_dir, PROCESSOR_DIR) if self.environment['develop'] and exists(path): rmtree(path) if not exists(path): makedirs(path) if not self._install_processor( # yapf: disable self.environment['processor_id'], path): rmtree(root_dir, ignore_errors=True) return self.directories['processor'] = path # Compile Relax NG, regex steps and XSL steps self.relaxngs: dict[str, etree.RelaxNG] = {} self.steps = deepcopy(self.environment.get('steps', ())) if not self._compile(): rmtree(root_dir) return # Prepare ongoing directory path = join(root_dir, ONGOING_DIR) if exists(path): rmtree(path, ignore_errors=True) self.directories['ongoing'] = path # Prepare result directory self.directories['result'] = join(root_dir, RESULT_DIR) self.directories['zipped'] = join(root_dir, ZIPPED_DIR) if exists(self.directories['zipped']): rmtree(self.directories['zipped'], ignore_errors=True) # ------------------------------------------------------------------------- def __repr__(self) -> str: """Return a string containing a printable representation of a pbuild.""" return f'<PBuild {self.uid}, values={self.values}>' # -------------------------------------------------------------------------
[docs] def lock(self) -> bool: """Lock the processor build. :rtype: bool """ lock_file = join(self._lock_dir, LOCK_FILE) if exists(lock_file) and getmtime(lock_file) + self.ttl > time(): return False if not exists(self._lock_dir): makedirs(self._lock_dir) with open(lock_file, 'w', encoding='utf8'): pass with open(join(self._lock_dir, INSTALL_FILE), 'w', encoding='utf8'): pass self.deadline = time() + self.ttl return True
# -------------------------------------------------------------------------
[docs] def unlock(self): """Unlock the processor build.""" if exists(self._lock_dir): with open(join(self._lock_dir, INSTALL_FILE), 'w', encoding='utf8'): pass if exists(join(self._lock_dir, LOCK_FILE)): remove(join(self._lock_dir, LOCK_FILE))
# -------------------------------------------------------------------------
[docs] def relock(self): """Refresh the lock of the build.""" with open(join(self._lock_dir, LOCK_FILE), 'w', encoding='utf8'): pass with open(join(self._lock_dir, INSTALL_FILE), 'w', encoding='utf8'): pass self.deadline = time() + self.ttl
# -------------------------------------------------------------------------
[docs] def current_reset(self) -> bool: """Reset current environment and lock for the next file. :rtype: bool :return: ``True`` if we can continue. """ self.current = self.current_initialize(None) if not self.aborted(): self.relock() return True return False
# -------------------------------------------------------------------------
[docs] def current_initialize(self, input_file: str | None) -> dict: """Return a current environment initialized according to the input file. :param str input_file: Absolute path to the input file. :rtype: dict """ environment = self.environment if self.environment is not None else {} current = { 'values': dict(environment['defines'], **self.values), 'input_file': input_file, 'step_delta': 1, 'file_id': self.make_fid(input_file) if input_file else None, 'fup': input_file, 'dup': None, 'resources': self.resources } self._workdir(current) if input_file: current['input_base'], current['input_base_id'] = \ guess_home(input_file, self.settings, self.locations) current['input_relpath'] = \ relpath(input_file, current['input_base']) \ if current['input_base'] is not None else None return current
# -------------------------------------------------------------------------
[docs] def currents(self, input_file: str): """Iterator to retrieve the current environments for the file ``input_file``. :param str input_file: Absolute path to the input file. """ # pylint: disable = too-many-branches # Initialization current: dict | None = self.current_initialize(input_file) if current is None or self._install_processor_template( current, current['values'].get('__template__')) \ and not self._compile(): return # Normal File Under Process environment = self.environment if self.environment is not None else {} if not input_file.endswith(CIOSET_SUFFIX) \ or environment['input'].get('assembly') == 'unaltered': yield current return # Cioset load # pylint: disable = protected-access cioset = load_xml(input_file, relaxngs=self.relaxngs) if not isinstance(cioset, etree._ElementTree): self.error(cioset) return # pylint: enable = protected-access # Updated values self._update_values(current, cioset.getroot()) # Updated processor environment = self.environment if self.environment is not None else {} if environment.get('template') and \ current.get('template') != current['values'].get('__template__'): self._install_processor( environment['processor_id'], self.directories['processor']) self._install_processor_template( current, current['values'].get('__template__')) if not self._compile(): return # Updated resources user_uid = getpwnam(getuser()).pw_uid self._update_resources(user_uid, current, cioset.getroot()) # Copies self._make_copies(user_uid, current, cioset.getroot()) namespaces = {'set': RELAXNG_CIOSET['namespace']} # Cioset: assembly if 'assembly' in environment['input'] and ( environment['input']['assembly_if'] is None or current['values'].get( environment['input']['assembly_if'])): if current['values'].get( environment['input']['assembly_if']) == CIOSET_UNIT: for count1, file1_elt in enumerate(cioset.xpath( "set:files//set:file", namespaces=namespaces)): tmp_cioset = deepcopy(cioset) for count2, file2_elt in enumerate(tmp_cioset.xpath( "set:files//set:file", namespaces=namespaces)): if count1 != count2: file2_elt.getparent().remove(file2_elt) else: path_file = file1_elt.xpath( "normalize-space(set:path)", namespaces=namespaces) current['file_id'] = self.make_fid(path_file) current = self.current_assembly( user_uid, current, tmp_cioset) if current is None: return yield current else: current = self.current_assembly(user_uid, current, cioset) if current is None: return yield current else: # Cioset: loop over files paths: list | tuple = [ self._cioset_path2abs_path(user_uid, current, elt) for elt in cioset.xpath('//set:file', namespaces=namespaces) ] paths = tuple(OrderedDict.fromkeys( # yapf: disable [path for path in paths if path is not None])) current['step_delta'] = 1 / len(paths) if paths else 1 for path in paths: current['file_id'] = self.make_fid(path) current['fup'] = path current['dup'] = None self._workdir(current) yield current
# -------------------------------------------------------------------------
[docs] def current_assembly( self, user_uid: int, current: dict, cioset: etree.ElementTree) -> dict | None: """Select assembly to do. :param int user_uid: Current user UID. :param dict current: Dictionary representing the current object to process. :rtype: bool :type cioset: lxml.etree.ElementTree :param cioset: Root element of the cioset. :return: ``None`` or current. """ self._workdir(current) environment = self.environment if self.environment is not None else {} if environment['input']['assembly'] == 'xml': self.trace('**{0}**'.format(self.translate(_('Assembly')))) self.progress_save() error = self.assembly_xml(user_uid, current, cioset) if error: self.error(error) return None return current return None
# -------------------------------------------------------------------------
[docs] def current_abs_resources(self) -> list | tuple: """Return a list of absolute paths to the current resources. :rtype: list """ if not self.current['resources']: return () file_path = dirname(self.current['input_file']) resources = [ normpath(resource.format(fpath=file_path)) for resource in self.current['resources'] ] return list(OrderedDict.fromkeys(resources))
# -------------------------------------------------------------------------
[docs] def assembly_xml( self, user_uid: int, current: dict, cioset: etree.ElementTree, step_num: int = 0) -> str | None: """Assemble all XML file found in the cioset input file into one XML file. :param int user_uid: Current user UID. :param dict current: Dictionary representing the current object to process. :rtype: bool :type cioset: lxml.etree.ElementTree :param cioset: Root element of the cioset. :param int step_num: (default=0) Number of current step. :rtype: class:`pyramid.i18n.TranslationString` or ``None`` :return: Error message or ``None``. """ # Assembly namespaces = {'set': RELAXNG_CIOSET['namespace']} for file_elt in cioset.xpath('//set:file', namespaces=namespaces): input_file = self._cioset_path2abs_path( user_uid, current, file_elt) assembly_elt = file_elt.xpath( 'ancestor-or-self::*/set:assembly', namespaces=namespaces) assembly_elt = assembly_elt[-1] if assembly_elt else None if input_file is None or assembly_elt is None \ or assembly_elt.get('mode') != 'xml': continue tree = load_xml(input_file, relaxngs=self.relaxngs) # pylint: disable = protected-access if not isinstance(tree, etree._ElementTree): return _('${f}: ${e}', { # yapf: disable 'f': basename(input_file), 'e': translate(tree, self.lang)}) # pylint: enable = protected-access elements = tree.xpath(assembly_elt.get('xpath', '/*')) if elements: content_elt = etree.SubElement(file_elt, 'ciocontent') for elt in elements: content_elt.append(elt) # Data under process current['fup'] = None current['dup'] = etree.tostring( cioset, encoding='utf-8').decode('utf8') if assembly_elt is not None and \ assembly_elt.get('namespace') != 'keep': current['dup'] = current['dup'].replace( ' xmlns="{0}"'.format(RELAXNG_CIOSET['namespace']), '') if assembly_elt is not None and \ assembly_elt.get('namespace') == 'remove': current['dup'] = re_sub( '<(/?)[a-zA-Z0-9_-]+:', '<\\1', current['dup']) current['dup'] = re_sub( ' xmlns:[a-zA-Z0-9_-]+="[^"]+"', '', current['dup']) try: current['dup'] = etree.ElementTree(etree.XML(current['dup'])) except etree.XMLSyntaxError as error: return error # Trace if current['values'].get('__no_remove__'): self.save_dup( '{fid}.{num:0>2}cioset~.xml'.format( fid=current['file_id'], num=step_num), current=current) return None
# -------------------------------------------------------------------------
[docs] def save_dup( self, filename: str, force: bool = False, current: dict | None = None) -> str | None: """Save current Data Under Process into a file. :param str filename: Relatve path to the current path of the file to save. :param bool force: (default=False) If ``True``, copy the file even if exists with an another name. :param dict current: (optional) Forced current environment. :rtype: str :return: Relative path of saved file or ``None``. """ # Check file name current = current or self.current abs_path = normpath(join(current['workdir'], filename)) if not abs_path.startswith(self.directories['ongoing']): self.error(_('Saving file out of the ongoing directory')) return None # Copy FUP if current['fup'] is not None: if current['fup'] != abs_path and force: if not exists(dirname(abs_path)): makedirs(dirname(abs_path)) copy2(current['fup'], abs_path) return relpath(abs_path, self.directories['ongoing']) # Save DUP # pylint: disable = protected-access if isinstance(current['dup'], etree._ElementTree): try: dup = etree.tostring( current['dup'], encoding='utf-8', pretty_print=True, xml_declaration=current.get('xml_declaration', False)) except (ValueError, AssertionError, AttributeError) as error: self.error(str(error)) return None dup = dup.strip() if dup is not None else '' dup = tostr(dup) if version_info > (3, 0) else dup elif current['dup']: dup = tostr(current['dup']) if version_info > (3, 0) \ else current['dup'].encode('utf8') else: return None # pylint: enable = protected-access if not exists(dirname(abs_path)): makedirs(dirname(abs_path)) with open(abs_path, 'w', encoding='utf8') as hdl: hdl.write(dup) return relpath(abs_path, self.directories['ongoing'])
# -------------------------------------------------------------------------
[docs] def dup2unicode(self): """Convert Data Under Process into a string.""" # pylint: disable = protected-access if self.current['dup'] is None and self.current['fup']: self.current['dup'] = load_guessing_encoding(self.current['fup']) self.current['fup'] = None elif not self.current['dup']: self.current['dup'] = '' self.current['fup'] = None elif isinstance(self.current['dup'], etree._ElementTree): self.current['dup'] = etree.tostring( self.current['dup'], encoding='utf-8', pretty_print=True, xml_declaration=self.current.get('xml_declaration', False)) self.current['dup'] = self.current['dup'].decode('utf8') else: self.current['dup'] = tounicode(self.current['dup'])
# -------------------------------------------------------------------------
[docs] def dup2xml(self) -> str | None: """Convert Data Under Process into a XML DOM tree. :rtype: pyramid.i18n.TranslationString :return: An error message or ``None``. """ # pylint: disable = protected-access if self.current['dup'] is None: tree = load_xml(self.current['fup']) if not isinstance(tree, etree._ElementTree): return tree self.current['fup'] = None self.current['dup'] = tree elif not isinstance(self.current['dup'], etree._ElementTree): tree = load_xml( '{0}.xml'.format(self.current['file_id']), data=tostr(self.current['dup'])) if not isinstance(tree, etree._ElementTree): return tree self.current['fup'] = None self.current['dup'] = tree # pylint: enable = protected-access return None
# -------------------------------------------------------------------------
[docs] def media_cache(self, target: str, dependencies: list) -> bool: """Try to retrieve the media from the previous result. :param str target: Full path to target file. :param list dependencies: List of files to compare with to known if it is necessary to process. :rtype: bool """ # Get file in previous result archive = join( self.directories['result'], relpath(target, self.directories['ongoing'])) if not exists(archive): return False # Get the more recent date in dependencies last_one = 0.0 for pattern in dependencies: if '*' not in pattern and '?' not in pattern \ and not exists(pattern): return False for filename in glob(pattern): file_time = getmtime(filename) last_one = max(last_one, file_time) # Nothing to do if not last_one or getmtime(archive) < last_one: return False # Copy previous result into ongoing directory if not exists(dirname(target)): makedirs(dirname(target)) copy2(archive, target) return True
# -------------------------------------------------------------------------
[docs] def make_fid(self, input_file: str) -> str | None: """Create a file ID from ``input_file``. :param str input_file: Absolute path to the input file. :rtype: str """ environment = self.environment if self.environment is not None else {} if 'input' not in environment: return None mode = environment['input']['make_fid']['mode'] fid = make_id( splitext(basename(input_file))[0], None if mode == 'none' else mode) if 'pattern' in environment['input']['make_fid']: fid = re_sub( environment['input']['make_fid']['pattern'], environment['input']['make_fid']['replace'], fid) return fid
# -------------------------------------------------------------------------
[docs] def info(self, text: str): """Add an information message in the result. :param str text: Information. """ if 'infos' not in self.result: self.result['infos'] = [] if self.current['file_id'] is not None: text = _( '[${fid}] ${text}', { 'fid': self.current['file_id'], 'text': self.translate(text) }) text = self.translate(text) self.result['infos'].append(text) self.trace(text, 'I')
# -------------------------------------------------------------------------
[docs] def warning(self, text: str): """Add a warning message in the result. :param str text: Warning. """ if 'warnings' not in self.result: self.result['warnings'] = [] if self.current['file_id'] is not None: text = _( '[${fid}] ${text}', { 'fid': self.current['file_id'], 'text': self.translate(text) }) text = self.translate(text) self.result['warnings'].append(text) self.trace(text, 'W')
# -------------------------------------------------------------------------
[docs] def error(self, text: str): """Add an error message in the result. :param str text: Error. """ if 'errors' not in self.result: self.result['errors'] = [] if self.current['file_id'] is not None: text = _( '[${fid}] ${text}', { 'fid': self.current['file_id'], 'text': self.translate(text) }) text = self.translate(text) self.result['errors'].append(text) self.trace(text, 'E')
# -------------------------------------------------------------------------
[docs] def translate(self, text: str) -> str: """Return ``text`` translated. :param str text: Text to translate. :rtype: str """ return translate(text, self.lang)
# ------------------------------------------------------------------------- def _install_processor(self, processor_id: str, directory: str) -> bool: """Install processor, possibly with inheritance, in build directory. :param str processor_id: ID of the processor to return. :param str directory: Absolute path to the installation directory. :rtype: bool :return: ``True`` if it succeeds. """ # Find configuration environment = self.environment if self.environment is not None else {} if processor_id not in environment['processors']: self.error(_('Unknown processor "${p}"', {'p': processor_id})) return False # Read processor.xml file to check if other processors are needed path = join(environment['processors'][processor_id], PROCESSOR_XML) try: tree = etree.parse(path) except OSError as error: self.error(str(error)) return False for ancestor_id in tree.xpath( '//proc:ancestor/text()', namespaces={ # yapf: disable 'proc': tree.getroot().nsmap[None] }): if not self._install_processor(ancestor_id, directory): return False # Import extra files for elt in tree.xpath('//proc:import', namespaces={'proc': tree.getroot().nsmap[None]}): path = elt.text.strip().format(**self.values) if ':' in path: path = abspath_from_asset_spec(path) if not isabs(path): path = join(environment['processors'][processor_id], path) target = normpath(join( # yapf: disable directory, elt.get('to', basename(path)).format(**self.values))) if not target.startswith(directory): continue if isdir(path): copy_content(path, target, exclude=EXCLUDED_FILES, force=True) elif exists(path): if not exists(dirname(target)): makedirs(dirname(target)) copy2(path, target) # Install processor copy_content( environment['processors'][processor_id], directory, exclude=EXCLUDED_FILES + ('Variables', 'Scss'), force=True) return True # ------------------------------------------------------------------------- def _install_processor_template( self, current: dict, template: str) -> bool: """Install processor part of the template. :param dict current: Dictionary representing the current object to process. :param str template: Relative path to the directory containing the processor template directory. :rtype: bool """ environment = self.environment if self.environment is not None else {} if not environment.get('template'): return False current['template'] = template current['abs_template'] = location_path2abs_path( self.locations, template) if current['abs_template'] is None: return False processor_path = join( current['abs_template'], environment['template']['directory']) if not exists(processor_path): return False copy_content_re( processor_path, self.directories['processor'], exclude=environment['template']['exclude']) return True # ------------------------------------------------------------------------- def _compile(self) -> bool: """Compile Relax NG, regular expressions and XSL. :rtype: bool """ # Compile Relax NG self.relaxngs = {} environment = self.environment if self.environment is not None else {} for pattern in environment.get('relaxngs', ''): filename = join( self.directories['processor'], environment['relaxngs'][pattern]) try: self.relaxngs[pattern] = etree.RelaxNG(etree.parse(filename)) except IOError as error: self.error(str(error)) return False except (etree.XMLSyntaxError, etree.RelaxNGParseError) as error: self.error(_('${f}: ${e}', {'f': filename, 'e': error})) return False # Load regular expressions and XSL for steps for step in self.steps: if step['type'] == 'regex-transform': if not self._load_regex_file(step): return False elif step['type'] == 'xsl-transform': if not self._load_xsl(step): return False return True # ------------------------------------------------------------------------- def _load_regex_file(self, step: dict) -> bool: """Load a list of regular expressions from a file. :param dict step: Dictionary defining the current step. :rtype: bool """ regex_file = join(self.directories['processor'], step['file']) if not exists(regex_file): self.error( _( 'Regular expression file "${f}" does not exist', {'f': step['file']})) return False regex = [] with io_open(regex_file, 'r', encoding='utf8') as lines: for line in lines: if line and line[0] != '#' and line[0:7] != '[Regex]': pattern, replace = line.partition(' =')[::2] pattern = pattern.strip() if not pattern: continue if pattern[0] in '\'"' and pattern[-1] in '\'"': pattern = pattern[1:-1] replace = replace.strip() if replace and replace[0] in '\'"' and \ replace[-1] in '\'"': replace = replace[1:-1] # pylint: disable = eval-used if replace.startswith('lambda'): replace = eval(replace) regex.append( (re_compile(pattern, MULTILINE | UNICODE), replace)) step['regex'] = regex return True # ------------------------------------------------------------------------- def _load_xsl(self, step: dict) -> bool: """Load a XSL file. :param dict step: Dictionary defining the current step. :rtype: bool """ xsl_file = join(self.directories['processor'], step['file']) if not exists(xsl_file): self.error( _('XSL file "${f}" does not exist', {'f': step['file']})) return False try: step['xslt'] = etree.XSLT(etree.parse(xsl_file)) except (IOError, etree.XSLTParseError, etree.XMLSyntaxError) as error: self.error(str(error)) return False return True # ------------------------------------------------------------------------- def _update_values(self, current: dict, cioset_elt: etree.Element): """Update current environment with values found in the cioset. :param dict current: Dictionary representing the current object to process. :type cioset_elt: lxml.etree.Element :param cioset_elt: Root element of the cioset. """ environment = self.environment if self.environment is not None else {} if 'variables' not in environment: return namespaces = {'set': RELAXNG_CIOSET['namespace']} for elt in cioset_elt.xpath( 'set:values[not(@for) or @for="{0}"]/set:value'.format( environment['processor_id']), namespaces=namespaces): name = elt.get('variable') if name in environment['variables']: current['values'][name] = convert_value( environment['variables'][name]['type'], elt.text.replace(' ', '‧').strip().replace('‧', ' ') if elt.text else '') # ------------------------------------------------------------------------- def _update_resources( self, user_uid: int, current: dict, cioset_elt: etree.Element): """Update current environment with resources found in the cioset. :param int user_uid: Current user UID. :param dict current: Dictionary representing the current object to process. :type cioset_elt: lxml.etree.Element :param cioset_elt: Root element of the cioset. """ namespaces = {'set': RELAXNG_CIOSET['namespace']} environment = self.environment if self.environment is not None else {} resource_elts = cioset_elt.xpath( 'set:resources[not(@for) or @for="{0}"]/set:resource'.format( environment['processor_id']), namespaces=namespaces) if not resource_elts: return current['resources'] = list(self.resources) resource_elts.reverse() for resource_elt in resource_elts: path = self._cioset_path2abs_path(user_uid, current, resource_elt) if path and exists(path) and stat(path).st_uid == user_uid: current['resources'].insert(0, path) current['resources'] = list(OrderedDict.fromkeys(current['resources'])) # ------------------------------------------------------------------------- def _make_copies( self, user_uid: int, current: dict, cioset_elt: etree.Element): """Make the copies requested by the cioset. :param int user_uid: Current user UID. :param dict current: Dictionary representing the current object to process. :type cioset_elt: lxml.etree.Element :param cioset_elt: Root element of the cioset. """ namespaces = {'set': RELAXNG_CIOSET['namespace']} root = dirname(current['input_file']) environment = self.environment if self.environment is not None else {} for copy_elt in cioset_elt.xpath( 'set:copies[not(@for) or @for="{0}"]/set:copy'.format( environment['processor_id']), namespaces=namespaces): self._make_copy(root, user_uid, current, copy_elt) # ------------------------------------------------------------------------- def _make_copy( self, root: str, user_uid: int, current: dict, copy_elt: etree.Element): """Make one copy requested by the cioset. :param str root: Absolute path to the cioset file. :param int user_uid: File system user ID. :param dict current: Dictionary representing the current object to process. :type copy_elt: lxml.etree.Element :param copy_elt: XML copy element. """ # Source src_file = normpath(join(root, copy_elt.text.strip())) if not exists(src_file) or stat(src_file).st_uid != user_uid: return # Destination dst_file = normpath( join(current['workdir'], copy_elt.get('to').strip())) if not dst_file.startswith(self.directories['ongoing']): return # The source is a directory if isdir(src_file): exclude = copy_elt.get('exclude') if exclude is not None: exclude = re_compile(exclude) copy_content_re(src_file, dst_file, exclude) # The source is a file else: if not exists(dirname(dst_file)): makedirs(dirname(dst_file)) copy2(src_file, dst_file) # ------------------------------------------------------------------------- def _workdir(self, current: dict): """Update the path of the working directory in the ``current`` dictionary. :param dict current: Dictionary representing the current object to process. """ rel_path = '.' if not current['values'].get('__unflattened__') else \ self.rel_files.get(current['input_file'], '.') if current['values'].get('__subdir__') and current['file_id']: current['workdir'] = join( self.directories['ongoing'], rel_path, current['values']['__subdir__'].format( fid=current['file_id'], parent=basename(dirname(current['input_file'])), **current['values'])) current['workdir'] = normpath(current['workdir']) if not current['workdir'].startswith(self.directories['ongoing']): current['workdir'] = normpath( join(self.directories['ongoing'], rel_path)) elif 'ongoing' in self.directories: current['workdir'] = normpath( join(self.directories['ongoing'], rel_path)) else: current['workdir'] = None # ------------------------------------------------------------------------- def _cioset_path2abs_path( self, user_uid: int, current: dict, file_elt: etree.Element) -> str | None: """Absolute path for the file pointed by ``file_elt``. :param int user_uid: Current user UID. :param dict current: Dictionary representing the current object to process. :type file_elt: lxml.etree.Element :param file_elt: Cioset XML file element for the current file. :rtype: :class:`str` or ``None`` """ path = cioset_path2abs_path( self.locations, current['input_file'], file_elt) if not path or not exists(path) or stat(path).st_uid != user_uid: self.warning(_('Unknown file: ${f}', { # yapf: disable 'f': basename(cioset_path2full_path(file_elt))})) return None return path