# pylint: disable = too-many-lines
"""Processor build (pbuild) management."""
from __future__ import annotations
from sys import version_info
from os import makedirs, remove, stat
from os.path import join, exists, basename, dirname, normpath, splitext
from os.path import getmtime, relpath, isdir, isabs
from shutil import rmtree, copy2
from getpass import getuser
from pwd import getpwnam
from time import time
from glob import glob
from re import MULTILINE, UNICODE, compile as re_compile, sub as re_sub
from io import open as io_open
from tempfile import gettempdir
from copy import deepcopy
from collections import OrderedDict
from threading import Event
from lxml import etree
from pyramid.asset import abspath_from_asset_spec
from chrysalio.lib.utils import EXCLUDED_FILES, copy_content, tounicode, tostr
from chrysalio.lib.utils import load_guessing_encoding, make_id, convert_value
from chrysalio.lib.utils import copy_content_re
from chrysalio.lib.xml import load_xml
from cioservice.lib.utils import location_path2abs_path
from cioservice.lib.build import Build
from cioservice.lib.build_manager import BuildManager
from ..relaxng import RELAXNG_CIOSET
from .i18n import _, translate
from .utils import cioset_path2full_path, cioset_path2abs_path, guess_home
# Names of the sub-directories of the root directory of a build.
PROCESSOR_DIR = 'Processor'
# Name of the description file read when a processor is installed.
PROCESSOR_XML = 'processor.xml'
ONGOING_DIR = 'Ongoing'
RESULT_DIR = 'Result'
ZIPPED_DIR = 'Zipped'
# NOTE(review): not used in this part of the file; presumably a cleaning
# period in seconds and the name of its marker file -- confirm.
CLEAN_PERIOD = 3600
CLEAN_FILE = 'clean'
# NOTE(review): not used in this part of the file; presumably the time to live
# of an installation, in seconds -- confirm.
INSTALL_TTL = 86400
# Names of the marker files written in the lock directory of a build.
INSTALL_FILE = 'install'
LOCK_FILE = 'lock'
# Suffix identifying a cioset input file.
CIOSET_SUFFIX = '.cioset'
# ``CIOSET_UNIT`` is compared with the value of the ``assembly_if`` variable
# to assemble each file of a cioset separately.
# NOTE(review): ``CIOSET_FULL`` is presumably the "assemble everything at
# once" counterpart -- confirm.
CIOSET_FULL = '1'
CIOSET_UNIT = '2'
# =============================================================================
[docs]
class PBuild(Build):
    """This class manages a processor build (pbuild).

    :type processor: .lib.processor.Processor
    :param processor:
        Processor object.
    :type manager: cioservice.lib.build_manager
    :param manager:
        Manager for this build.
    :param str pbuild_id:
        Processor build ID.
    :param dict params:
        Dictionary defining the parameters of the processor build.
    :type stopping: threading.Event
    :param stopping:
        Flag to stop the processing.

    In addition to the :class:`~cioservice.lib.build.Build` attributes, a
    processor build has the following attributes:

    * ``'environment'``: processor environment
    * ``'directories'``: a dictionary of absolute paths to directories
    * ``'relaxngs'``: a dictionary of Relax NG objects
    * ``'steps'``: list of steps for this build
    * ``'rel_files'``: dictionary of relative paths in recursive mode

    The ``pbuild.directories`` dictionary has the following keys:

    * ``'root'``: absolute path to the root directory of the build
    * ``'processor'``: absolute path to the processor directory
    * ``'ongoing'``: absolute path to the global ongoing directory
    * ``'result'``: absolute path to the result directory
    * ``'zipped'``: absolute path to the zipped directory

    The ``pbuild.current`` dictionary has the following keys:

    * ``'input_file'``: absolute path to the original input file
    * ``'input_base'``: possibly absolute path to the base (warehouse)
    * ``'input_base_id'``: possibly ID of the base
    * ``'input_relpath'``: relative path of the input with respect to the base
    * ``'values'``: common defines and values overridden by local values
    * ``'step_delta'``: the delta of step progress for this file
    * ``'file_id'``: ID of the current file
    * ``'workdir'``: absolute path to the directory where work takes place
    * ``'fup'``: absolute path to the File Under Processing
    * ``'dup'``: Data Under Process in a native format (DOM, plain...)
    * ``'resources'``: common resources possibly completed by local resources
    * ``'template'``: relative path to the used processor template
    * ``'abs_template'``: absolute path to the used processor template
    * ``'xml_declaration'``: ``True`` if we have to keep <?xml version='1.0'?>
    """
# pylint: disable = too-many-public-methods, too-many-instance-attributes
# -------------------------------------------------------------------------
def __init__(
self, processor, manager: BuildManager, pbuild_id: str,
params: dict, stopping: Event | None):
"""Constructor method.

Take the lock on the build, install the processor in the build directory,
compile its Relax NG, regular expression and XSL files and reset the
ongoing and zipped directories.

If ``processor`` is ``None`` or if the lock cannot be taken,
``self.result['no_execution']`` is set and only a minimal ``self.current``
is prepared. If the installation of the processor or the compilation
fails, the root directory is removed and the constructor returns early.
"""
# Without processor, the root falls back to a ``Chrysalio`` directory in the
# system temporary directory.
root_dir = join(processor.environment['build_root'], pbuild_id) \
if processor is not None else join(gettempdir(), 'Chrysalio')
super().__init__(root_dir, manager, pbuild_id, params, stopping)
self.rel_files = params.get('rel_files', {})
self._processor = processor
self.environment = processor.environment \
if processor is not None else None
self.directories = {'root': root_dir}
# Check lock and set installation time
if self.environment is None or not self.lock():
# Nothing can be executed: the current environment only exposes the
# values and resources of the build.
self.result['no_execution'] = True
self.current = {
'values': self.values,
'input_file': None,
'step_delta': 1,
'file_id': None,
'workdir': None,
'fup': None,
'dup': None,
'resources': self.resources
}
return
# Current environment
self.current = self.current_initialize(None)
# Prepare processor directory
path = join(root_dir, PROCESSOR_DIR)
# In develop mode, an existing processor directory is always discarded.
if self.environment['develop'] and exists(path):
rmtree(path)
# NOTE(review): presumably the installation below only runs when the
# directory has just been created -- confirm against the indented source.
if not exists(path):
makedirs(path)
if not self._install_processor( # yapf: disable
self.environment['processor_id'], path):
rmtree(root_dir, ignore_errors=True)
return
self.directories['processor'] = path
# Compile Relax NG, regex steps and XSL steps
self.relaxngs: dict[str, etree.RelaxNG] = {}
# The steps are copied because the compilation stores objects in them.
self.steps = deepcopy(self.environment.get('steps', ()))
if not self._compile():
rmtree(root_dir)
return
# Prepare ongoing directory
path = join(root_dir, ONGOING_DIR)
if exists(path):
rmtree(path, ignore_errors=True)
self.directories['ongoing'] = path
# Prepare result directory
self.directories['result'] = join(root_dir, RESULT_DIR)
self.directories['zipped'] = join(root_dir, ZIPPED_DIR)
if exists(self.directories['zipped']):
rmtree(self.directories['zipped'], ignore_errors=True)
# -------------------------------------------------------------------------
def __repr__(self) -> str:
    """Return a printable representation of the pbuild made of its UID and
    its values."""
    uid, values = self.uid, self.values
    return f'<PBuild {uid}, values={values}>'
# -------------------------------------------------------------------------
[docs]
def lock(self) -> bool:
    """Try to take the lock of the processor build.

    The lock is refused while a lock file modified less than ``self.ttl``
    seconds ago is present. Otherwise, the lock and installation marker
    files are (re)created and the deadline of the build is pushed
    ``self.ttl`` seconds ahead.

    :rtype: bool
    :return:
        ``True`` if the lock has been taken.
    """
    lock_path = join(self._lock_dir, LOCK_FILE)
    still_locked = exists(lock_path) \
        and getmtime(lock_path) + self.ttl > time()
    if still_locked:
        return False

    if not exists(self._lock_dir):
        makedirs(self._lock_dir)
    # Opening in write mode creates the marker or refreshes its date
    for marker in (lock_path, join(self._lock_dir, INSTALL_FILE)):
        with open(marker, 'w', encoding='utf8'):
            pass
    self.deadline = time() + self.ttl
    return True
# -------------------------------------------------------------------------
[docs]
def unlock(self):
    """Release the lock of the processor build.

    The installation marker is refreshed if the lock directory exists, then
    the lock file is removed if it is present.
    """
    lock_dir = self._lock_dir
    if exists(lock_dir):
        with open(join(lock_dir, INSTALL_FILE), 'w', encoding='utf8'):
            pass
    lock_path = join(lock_dir, LOCK_FILE)
    if exists(lock_path):
        remove(lock_path)
# -------------------------------------------------------------------------
[docs]
def relock(self):
    """Refresh the lock of the build.

    Both the lock and the installation marker files are rewritten and the
    deadline of the build is pushed ``self.ttl`` seconds ahead.
    """
    for marker in (LOCK_FILE, INSTALL_FILE):
        with open(join(self._lock_dir, marker), 'w', encoding='utf8'):
            pass
    self.deadline = time() + self.ttl
# -------------------------------------------------------------------------
[docs]
def current_reset(self) -> bool:
    """Reset the current environment and refresh the lock for the next file.

    The lock is only refreshed when the build has not been aborted.

    :rtype: bool
    :return:
        ``True`` if we can continue.
    """
    self.current = self.current_initialize(None)
    if self.aborted():
        return False
    self.relock()
    return True
# -------------------------------------------------------------------------
[docs]
def current_initialize(self, input_file: str | None) -> dict:
"""Return a current environment initialized according to the input
file.
:param str input_file:
Absolute path to the input file.
:rtype: dict
"""
environment = self.environment if self.environment is not None else {}
current = {
'values': dict(environment['defines'], **self.values),
'input_file': input_file,
'step_delta': 1,
'file_id': self.make_fid(input_file) if input_file else None,
'fup': input_file,
'dup': None,
'resources': self.resources
}
self._workdir(current)
if input_file:
current['input_base'], current['input_base_id'] = \
guess_home(input_file, self.settings, self.locations)
current['input_relpath'] = \
relpath(input_file, current['input_base']) \
if current['input_base'] is not None else None
return current
# -------------------------------------------------------------------------
[docs]
def currents(self, input_file: str):
"""Iterator to retrieve the current environments for the file
``input_file``.

A file which is not a cioset, or a cioset processed with an ``unaltered``
assembly, yields a single current environment. Otherwise, the cioset is
loaded, its values, resources and copies are applied and:

* if an assembly is active, one environment is yielded per assembly (one
  per file when the ``assembly_if`` variable is equal to ``CIOSET_UNIT``);
* otherwise, one environment is yielded per distinct file of the cioset.

The same ``current`` dictionary is updated and yielded at each iteration.

:param str input_file:
Absolute path to the input file.
"""
# pylint: disable = too-many-branches
# Initialization
current: dict | None = self.current_initialize(input_file)
# Give up if a processor template has been installed but the compilation
# then fails.
if current is None or self._install_processor_template(
current, current['values'].get('__template__')) \
and not self._compile():
return
# Normal File Under Process
environment = self.environment if self.environment is not None else {}
if not input_file.endswith(CIOSET_SUFFIX) \
or environment['input'].get('assembly') == 'unaltered':
yield current
return
# Cioset load
# pylint: disable = protected-access
cioset = load_xml(input_file, relaxngs=self.relaxngs)
# Anything which is not a tree is reported as an error.
if not isinstance(cioset, etree._ElementTree):
self.error(cioset)
return
# pylint: enable = protected-access
# Updated values
self._update_values(current, cioset.getroot())
# Updated processor
environment = self.environment if self.environment is not None else {}
# The values of the cioset may select another template than the installed
# one: the processor is then installed again with this template.
# NOTE(review): presumably the compilation check below belongs to this
# condition -- confirm against the indented source.
if environment.get('template') and \
current.get('template') != current['values'].get('__template__'):
self._install_processor(
environment['processor_id'], self.directories['processor'])
self._install_processor_template(
current, current['values'].get('__template__'))
if not self._compile():
return
# Updated resources
user_uid = getpwnam(getuser()).pw_uid
self._update_resources(user_uid, current, cioset.getroot())
# Copies
self._make_copies(user_uid, current, cioset.getroot())
namespaces = {'set': RELAXNG_CIOSET['namespace']}
# Cioset: assembly
# The assembly is done when no ``assembly_if`` variable is defined or when
# this variable has a true value.
if 'assembly' in environment['input'] and (
environment['input']['assembly_if'] is None
or current['values'].get(
environment['input']['assembly_if'])):
if current['values'].get(
environment['input']['assembly_if']) == CIOSET_UNIT:
# One assembly per file: each one works on a copy of the cioset in
# which only the file of the same rank is kept.
for count1, file1_elt in enumerate(cioset.xpath(
"set:files//set:file", namespaces=namespaces)):
tmp_cioset = deepcopy(cioset)
for count2, file2_elt in enumerate(tmp_cioset.xpath(
"set:files//set:file", namespaces=namespaces)):
if count1 != count2:
file2_elt.getparent().remove(file2_elt)
else:
path_file = file1_elt.xpath(
"normalize-space(set:path)",
namespaces=namespaces)
current['file_id'] = self.make_fid(path_file)
current = self.current_assembly(
user_uid, current, tmp_cioset)
if current is None:
return
yield current
else:
# A single assembly for the whole cioset
current = self.current_assembly(user_uid, current, cioset)
if current is None:
return
yield current
else:
# Cioset: loop over files
paths: list | tuple = [
self._cioset_path2abs_path(user_uid, current, elt)
for elt in cioset.xpath('//set:file', namespaces=namespaces)
]
# Unknown files are dropped and duplicates are removed, keeping the order.
paths = tuple(OrderedDict.fromkeys( # yapf: disable
[path for path in paths if path is not None]))
current['step_delta'] = 1 / len(paths) if paths else 1
for path in paths:
current['file_id'] = self.make_fid(path)
current['fup'] = path
current['dup'] = None
self._workdir(current)
yield current
# -------------------------------------------------------------------------
[docs]
def current_assembly(
        self, user_uid: int, current: dict,
        cioset: etree.ElementTree) -> dict | None:
    """Select the assembly to do and execute it.

    Only the ``xml`` assembly is processed; any other one returns ``None``.

    :param int user_uid:
        Current user UID.
    :param dict current:
        Dictionary representing the current object to process.
    :type cioset: lxml.etree.ElementTree
    :param cioset:
        Root element of the cioset.
    :rtype: :class:`dict` or ``None``
    :return:
        ``None`` or current.
    """
    self._workdir(current)
    environment = self.environment if self.environment is not None else {}
    if environment['input']['assembly'] != 'xml':
        return None

    self.trace('**{0}**'.format(self.translate(_('Assembly'))))
    self.progress_save()
    error = self.assembly_xml(user_uid, current, cioset)
    if not error:
        return current
    self.error(error)
    return None
# -------------------------------------------------------------------------
[docs]
def current_abs_resources(self) -> list | tuple:
"""Return a list of absolute paths to the current resources.
:rtype: list
"""
if not self.current['resources']:
return ()
file_path = dirname(self.current['input_file'])
resources = [
normpath(resource.format(fpath=file_path))
for resource in self.current['resources']
]
return list(OrderedDict.fromkeys(resources))
# -------------------------------------------------------------------------
[docs]
def assembly_xml(
        self,
        user_uid: int,
        current: dict,
        cioset: etree.ElementTree,
        step_num: int = 0) -> str | None:
    """Assemble all XML files found in the cioset input file into one XML
    document.

    For each ``file`` element whose last ``assembly`` element found among
    its ancestors is in ``xml`` mode, the elements selected in the file by
    the ``xpath`` attribute of this assembly (``/*`` by default) are
    appended to a new ``ciocontent`` child. The serialized cioset then
    becomes ``current['dup']`` and ``current['fup']`` is reset.

    Unless the ``namespace`` attribute of the assembly is ``keep``, the
    declaration of the cioset namespace is removed. If it is ``remove``,
    element prefixes and prefixed namespace declarations are removed too.

    :param int user_uid:
        Current user UID.
    :param dict current:
        Dictionary representing the current object to process.
    :type cioset: lxml.etree.ElementTree
    :param cioset:
        Root element of the cioset.
    :param int step_num: (default=0)
        Number of current step.
    :rtype: class:`pyramid.i18n.TranslationString`, :class:`str` or ``None``
    :return:
        Error message or ``None``.
    """
    # Assembly
    namespaces = {'set': RELAXNG_CIOSET['namespace']}
    # Must be defined even if the cioset does not contain any file
    assembly_elt = None
    for file_elt in cioset.xpath('//set:file', namespaces=namespaces):
        input_file = self._cioset_path2abs_path(
            user_uid, current, file_elt)
        assemblies = file_elt.xpath(
            'ancestor-or-self::*/set:assembly', namespaces=namespaces)
        assembly_elt = assemblies[-1] if assemblies else None
        if input_file is None or assembly_elt is None \
                or assembly_elt.get('mode') != 'xml':
            continue

        tree = load_xml(input_file, relaxngs=self.relaxngs)
        # pylint: disable = protected-access
        if not isinstance(tree, etree._ElementTree):
            return _('${f}: ${e}', {  # yapf: disable
                'f': basename(input_file),
                'e': translate(tree, self.lang)})
        # pylint: enable = protected-access

        elements = tree.xpath(assembly_elt.get('xpath', '/*'))
        if elements:
            content_elt = etree.SubElement(file_elt, 'ciocontent')
            for elt in elements:
                content_elt.append(elt)

    # Data under process
    # The namespace policy is the one of the assembly of the last file
    namespace_mode = \
        assembly_elt.get('namespace') if assembly_elt is not None else None
    current['fup'] = None
    current['dup'] = etree.tostring(
        cioset, encoding='utf-8').decode('utf8')
    if assembly_elt is not None and namespace_mode != 'keep':
        current['dup'] = current['dup'].replace(
            ' xmlns="{0}"'.format(RELAXNG_CIOSET['namespace']), '')
    if assembly_elt is not None and namespace_mode == 'remove':
        current['dup'] = re_sub(
            r'<(/?)[a-zA-Z0-9_-]+:', r'<\1', current['dup'])
        current['dup'] = re_sub(
            r' xmlns:[a-zA-Z0-9_-]+="[^"]+"', '', current['dup'])
    try:
        current['dup'] = etree.ElementTree(etree.XML(current['dup']))
    except etree.XMLSyntaxError as error:
        # Callers expect a message, not an exception object
        return str(error)

    # Trace
    if current['values'].get('__no_remove__'):
        self.save_dup(
            '{fid}.{num:0>2}cioset~.xml'.format(
                fid=current['file_id'], num=step_num),
            current=current)
    return None
# -------------------------------------------------------------------------
[docs]
def save_dup(
self,
filename: str,
force: bool = False,
current: dict | None = None) -> str | None:
"""Save current Data Under Process into a file.

When the current environment has a File Under Processing, nothing is
serialized: the file is at most copied. Otherwise, the Data Under Process
is written, after serialization if it is an XML tree.

:param str filename:
Relative path to the current path of the file to save.
:param bool force: (default=False)
If ``True``, copy the file even if it exists with another name.
:param dict current: (optional)
Forced current environment.
:rtype: str
:return:
Path of the saved file relative to the ongoing directory or ``None``.
"""
# Check file name
current = current or self.current
abs_path = normpath(join(current['workdir'], filename))
# NOTE(review): this is a plain prefix test, so a sibling directory whose
# name starts with the name of the ongoing directory is accepted too.
if not abs_path.startswith(self.directories['ongoing']):
self.error(_('Saving file out of the ongoing directory'))
return None
# Copy FUP
# NOTE(review): presumably the ``return`` below belongs to the outer
# condition, so that the path is returned even when nothing is copied --
# confirm against the indented source.
if current['fup'] is not None:
if current['fup'] != abs_path and force:
if not exists(dirname(abs_path)):
makedirs(dirname(abs_path))
copy2(current['fup'], abs_path)
return relpath(abs_path, self.directories['ongoing'])
# Save DUP
# pylint: disable = protected-access
if isinstance(current['dup'], etree._ElementTree):
try:
dup = etree.tostring(
current['dup'],
encoding='utf-8',
pretty_print=True,
xml_declaration=current.get('xml_declaration', False))
except (ValueError, AssertionError, AttributeError) as error:
self.error(str(error))
return None
dup = dup.strip() if dup is not None else ''
# ``version_info > (3, 0)`` is always true with Python 3
dup = tostr(dup) if version_info > (3, 0) else dup
elif current['dup']:
dup = tostr(current['dup']) if version_info > (3, 0) \
else current['dup'].encode('utf8')
else:
# Neither a file nor data to save
return None
# pylint: enable = protected-access
if not exists(dirname(abs_path)):
makedirs(dirname(abs_path))
with open(abs_path, 'w', encoding='utf8') as hdl:
hdl.write(dup)
return relpath(abs_path, self.directories['ongoing'])
# -------------------------------------------------------------------------
[docs]
def dup2unicode(self):
    """Convert the Data Under Process into a string.

    If there is no data but a File Under Processing, the data come from
    this file, which is then forgotten. Empty data become an empty string
    and an XML tree is serialized.
    """
    # pylint: disable = protected-access
    current = self.current
    dup = current['dup']
    if dup is None and current['fup']:
        current['dup'] = load_guessing_encoding(current['fup'])
        current['fup'] = None
    elif not dup:
        current['dup'] = ''
        current['fup'] = None
    elif isinstance(dup, etree._ElementTree):
        serialized = etree.tostring(
            dup,
            encoding='utf-8',
            pretty_print=True,
            xml_declaration=current.get('xml_declaration', False))
        current['dup'] = serialized.decode('utf8')
    else:
        current['dup'] = tounicode(dup)
# -------------------------------------------------------------------------
[docs]
def dup2xml(self) -> str | None:
    """Convert the Data Under Process into an XML DOM tree.

    Without data, the File Under Processing is loaded. Data which are
    already a tree are left untouched.

    :rtype: pyramid.i18n.TranslationString
    :return:
        An error message or ``None``.
    """
    # pylint: disable = protected-access
    current = self.current
    dup = current['dup']
    if isinstance(dup, etree._ElementTree):
        return None

    if dup is None:
        tree = load_xml(current['fup'])
    else:
        tree = load_xml(
            '{0}.xml'.format(current['file_id']), data=tostr(dup))
    # Anything which is not a tree is an error message
    if not isinstance(tree, etree._ElementTree):
        return tree
    # pylint: enable = protected-access
    current['fup'] = None
    current['dup'] = tree
    return None
# -------------------------------------------------------------------------
# -------------------------------------------------------------------------
[docs]
def make_fid(self, input_file: str) -> str | None:
"""Create a file ID from ``input_file``.
:param str input_file:
Absolute path to the input file.
:rtype: str
"""
environment = self.environment if self.environment is not None else {}
if 'input' not in environment:
return None
mode = environment['input']['make_fid']['mode']
fid = make_id(
splitext(basename(input_file))[0],
None if mode == 'none' else mode)
if 'pattern' in environment['input']['make_fid']:
fid = re_sub(
environment['input']['make_fid']['pattern'],
environment['input']['make_fid']['replace'], fid)
return fid
# -------------------------------------------------------------------------
[docs]
def info(self, text: str):
    """Add an information message in the result and in the trace.

    The message is prefixed by the ID of the current file, if any, and
    translated.

    :param str text:
        Information.
    """
    infos = self.result.setdefault('infos', [])
    file_id = self.current['file_id']
    if file_id is not None:
        text = _(
            '[${fid}] ${text}', {
                'fid': file_id,
                'text': self.translate(text)
            })
    text = self.translate(text)
    infos.append(text)
    self.trace(text, 'I')
# -------------------------------------------------------------------------
[docs]
def warning(self, text: str):
    """Add a warning message in the result and in the trace.

    The message is prefixed by the ID of the current file, if any, and
    translated.

    :param str text:
        Warning.
    """
    warnings = self.result.setdefault('warnings', [])
    file_id = self.current['file_id']
    if file_id is not None:
        text = _(
            '[${fid}] ${text}', {
                'fid': file_id,
                'text': self.translate(text)
            })
    text = self.translate(text)
    warnings.append(text)
    self.trace(text, 'W')
# -------------------------------------------------------------------------
[docs]
def error(self, text: str):
    """Add an error message in the result and in the trace.

    The message is prefixed by the ID of the current file, if any, and
    translated.

    :param str text:
        Error.
    """
    errors = self.result.setdefault('errors', [])
    file_id = self.current['file_id']
    if file_id is not None:
        text = _(
            '[${fid}] ${text}', {
                'fid': file_id,
                'text': self.translate(text)
            })
    text = self.translate(text)
    errors.append(text)
    self.trace(text, 'E')
# -------------------------------------------------------------------------
[docs]
def translate(self, text: str) -> str:
    """Return ``text`` translated into the language of the build.

    :param str text:
        Text to translate.
    :rtype: str
    """
    lang = self.lang
    return translate(text, lang)
# -------------------------------------------------------------------------
def _install_processor(self, processor_id: str, directory: str) -> bool:
    """Install processor, possibly with inheritance, in build directory.

    The ancestors declared in the ``processor.xml`` file are installed
    first, then the extra files it imports and finally the processor
    itself, so that a processor overrides what it inherits or imports.

    :param str processor_id:
        ID of the processor to install.
    :param str directory:
        Absolute path to the installation directory.
    :rtype: bool
    :return:
        ``True`` if it succeeds.
    """
    # Find configuration
    environment = self.environment if self.environment is not None else {}
    processors = environment.get('processors', {})
    if processor_id not in processors:
        self.error(_('Unknown processor "${p}"', {'p': processor_id}))
        return False
    processor_dir = processors[processor_id]

    # Read processor.xml file to check if other processors are needed
    try:
        tree = etree.parse(join(processor_dir, PROCESSOR_XML))
    except (OSError, etree.XMLSyntaxError) as error:
        # A missing file and a malformed one are reported the same way
        self.error(str(error))
        return False
    namespaces = {'proc': tree.getroot().nsmap[None]}
    for ancestor_id in tree.xpath(
            '//proc:ancestor/text()', namespaces=namespaces):
        if not self._install_processor(ancestor_id, directory):
            return False

    # Import extra files
    for elt in tree.xpath('//proc:import', namespaces=namespaces):
        # An empty import element designates nothing to copy
        if not elt.text or not elt.text.strip():
            continue
        path = elt.text.strip().format(**self.values)
        if ':' in path:
            path = abspath_from_asset_spec(path)
        if not isabs(path):
            path = join(processor_dir, path)
        target = normpath(join(  # yapf: disable
            directory,
            elt.get('to', basename(path)).format(**self.values)))
        # Never write outside the installation directory
        if not target.startswith(directory):
            continue
        if isdir(path):
            copy_content(path, target, exclude=EXCLUDED_FILES, force=True)
        elif exists(path):
            if not exists(dirname(target)):
                makedirs(dirname(target))
            copy2(path, target)

    # Install processor
    copy_content(
        processor_dir,
        directory,
        exclude=EXCLUDED_FILES + ('Variables', 'Scss'),
        force=True)
    return True
# -------------------------------------------------------------------------
def _install_processor_template(
        self, current: dict, template: str) -> bool:
    """Install the processor part of the template in the processor
    directory.

    ``current['template']`` and ``current['abs_template']`` are updated as
    soon as the environment accepts templates.

    :param dict current:
        Dictionary representing the current object to process.
    :param str template:
        Relative path to the directory containing the processor template
        directory.
    :rtype: bool
    :return:
        ``True`` if some template content has been installed.
    """
    environment = self.environment if self.environment is not None else {}
    template_conf = environment.get('template')
    if not template_conf:
        return False

    current['template'] = template
    abs_template = location_path2abs_path(self.locations, template)
    current['abs_template'] = abs_template
    if abs_template is None:
        return False

    source = join(abs_template, template_conf['directory'])
    if not exists(source):
        return False
    copy_content_re(
        source,
        self.directories['processor'],
        exclude=template_conf['exclude'])
    return True
# -------------------------------------------------------------------------
def _compile(self) -> bool:
    """Compile Relax NG, regular expressions and XSL.

    ``self.relaxngs`` is rebuilt from the ``relaxngs`` section of the
    environment and each regex or XSL step is loaded.

    :rtype: bool
    :return:
        ``False`` as soon as a file cannot be compiled or loaded.
    """
    # Compile Relax NG
    self.relaxngs = {}
    environment = self.environment if self.environment is not None else {}
    for pattern in environment.get('relaxngs', ''):
        filename = join(
            self.directories['processor'],
            environment['relaxngs'][pattern])
        try:
            self.relaxngs[pattern] = etree.RelaxNG(etree.parse(filename))
        except IOError as error:
            self.error(str(error))
            return False
        except (etree.XMLSyntaxError, etree.RelaxNGParseError) as error:
            self.error(_('${f}: ${e}', {'f': filename, 'e': error}))
            return False

    # Load regular expressions and XSL for steps
    for step in self.steps:
        if step['type'] == 'regex-transform':
            loaded = self._load_regex_file(step)
        elif step['type'] == 'xsl-transform':
            loaded = self._load_xsl(step)
        else:
            loaded = True
        if not loaded:
            return False
    return True
# -------------------------------------------------------------------------
def _load_regex_file(self, step: dict) -> bool:
    """Load a list of regular expressions from a file.

    Each useful line has the form ``pattern = replacement``; both parts may
    be quoted. Lines beginning with ``#`` or ``[Regex]`` are ignored. A
    replacement beginning with ``lambda`` is evaluated as a Python
    expression. The list of ``(compiled pattern, replacement)`` tuples is
    stored in ``step['regex']``.

    :param dict step:
        Dictionary defining the current step.
    :rtype: bool
    :return:
        ``False`` if the file does not exist or contains an invalid line.
    """
    # pylint: disable = import-outside-toplevel
    from re import error as re_error

    regex_file = join(self.directories['processor'], step['file'])
    if not exists(regex_file):
        self.error(
            _(
                'Regular expression file "${f}" does not exist',
                {'f': step['file']}))
        return False

    regex = []
    with io_open(regex_file, 'r', encoding='utf8') as lines:
        for line in lines:
            if not line or line[0] == '#' or line[0:7] == '[Regex]':
                continue
            pattern, replace = line.partition(' =')[::2]
            pattern = pattern.strip()
            if not pattern:
                continue
            if pattern[0] in '\'"' and pattern[-1] in '\'"':
                pattern = pattern[1:-1]
            replace = replace.strip()
            if replace and replace[0] in '\'"' and \
                    replace[-1] in '\'"':
                replace = replace[1:-1]
            try:
                # SECURITY: ``eval`` executes code read from the regex
                # file; this file must only come from a trusted processor.
                # pylint: disable = eval-used
                if replace.startswith('lambda'):
                    replace = eval(replace)
                compiled = re_compile(pattern, MULTILINE | UNICODE)
            except (re_error, SyntaxError) as error:
                # Report an invalid line instead of breaking the build
                self.error(f"{step['file']}: {error}")
                return False
            regex.append((compiled, replace))
    step['regex'] = regex
    return True
# -------------------------------------------------------------------------
def _load_xsl(self, step: dict) -> bool:
    """Load the XSL file of a step and store the transformation in
    ``step['xslt']``.

    :param dict step:
        Dictionary defining the current step.
    :rtype: bool
    :return:
        ``False`` if the file does not exist or cannot be compiled.
    """
    xsl_file = join(self.directories['processor'], step['file'])
    if exists(xsl_file):
        try:
            step['xslt'] = etree.XSLT(etree.parse(xsl_file))
        except (IOError, etree.XSLTParseError,
                etree.XMLSyntaxError) as error:
            self.error(str(error))
            return False
        return True

    self.error(_('XSL file "${f}" does not exist', {'f': step['file']}))
    return False
# -------------------------------------------------------------------------
def _update_values(self, current: dict, cioset_elt: etree.Element):
    """Update the current values with those found in the cioset.

    Only the values aimed at every processor or at this one, and naming a
    variable known by the environment, are taken into account.

    :param dict current:
        Dictionary representing the current object to process.
    :type cioset_elt: lxml.etree.Element
    :param cioset_elt:
        Root element of the cioset.
    """
    environment = self.environment if self.environment is not None else {}
    if 'variables' not in environment:
        return

    variables = environment['variables']
    namespaces = {'set': RELAXNG_CIOSET['namespace']}
    xpath = 'set:values[not(@for) or @for="{0}"]/set:value'.format(
        environment['processor_id'])
    for elt in cioset_elt.xpath(xpath, namespaces=namespaces):
        name = elt.get('variable')
        if name not in variables:
            continue
        if elt.text:
            # Spaces are masked so that only the other white spaces are
            # stripped at both ends
            text = elt.text.replace(' ', '‧').strip().replace('‧', ' ')
        else:
            text = ''
        current['values'][name] = convert_value(
            variables[name]['type'], text)
# -------------------------------------------------------------------------
def _update_resources(
        self, user_uid: int, current: dict, cioset_elt: etree.Element):
    """Update the current resources with those found in the cioset.

    The existing resources of the cioset owned by the user are placed, in
    their original order, before the resources of the build and the
    duplicates are removed.

    :param int user_uid:
        Current user UID.
    :param dict current:
        Dictionary representing the current object to process.
    :type cioset_elt: lxml.etree.Element
    :param cioset_elt:
        Root element of the cioset.
    """
    namespaces = {'set': RELAXNG_CIOSET['namespace']}
    environment = self.environment if self.environment is not None else {}
    xpath = 'set:resources[not(@for) or @for="{0}"]/set:resource'.format(
        environment['processor_id'])
    resource_elts = cioset_elt.xpath(xpath, namespaces=namespaces)
    if not resource_elts:
        return

    resources = list(self.resources)
    current['resources'] = resources
    # Inserting at the head in reverse order keeps the cioset order
    for resource_elt in reversed(resource_elts):
        path = self._cioset_path2abs_path(user_uid, current, resource_elt)
        if path and exists(path) and stat(path).st_uid == user_uid:
            resources.insert(0, path)
    current['resources'] = list(OrderedDict.fromkeys(resources))
# -------------------------------------------------------------------------
def _make_copies(
        self, user_uid: int, current: dict, cioset_elt: etree.Element):
    """Make the copies requested by the cioset for every processor or for
    this one.

    :param int user_uid:
        Current user UID.
    :param dict current:
        Dictionary representing the current object to process.
    :type cioset_elt: lxml.etree.Element
    :param cioset_elt:
        Root element of the cioset.
    """
    namespaces = {'set': RELAXNG_CIOSET['namespace']}
    # Sources are relative to the directory of the cioset
    root = dirname(current['input_file'])
    environment = self.environment if self.environment is not None else {}
    xpath = 'set:copies[not(@for) or @for="{0}"]/set:copy'.format(
        environment['processor_id'])
    copy_elts = cioset_elt.xpath(xpath, namespaces=namespaces)
    for copy_elt in copy_elts:
        self._make_copy(root, user_uid, current, copy_elt)
# -------------------------------------------------------------------------
def _make_copy(
        self, root: str, user_uid: int, current: dict,
        copy_elt: etree.Element):
    """Make one copy requested by the cioset.

    The copy is silently skipped if the element has no text or no ``to``
    attribute, if the source does not exist or is not owned by the user, or
    if the destination is out of the ongoing directory.

    :param str root:
        Absolute path to the directory of the cioset file.
    :param int user_uid:
        File system user ID.
    :param dict current:
        Dictionary representing the current object to process.
    :type copy_elt: lxml.etree.Element
    :param copy_elt:
        XML copy element.
    """
    # pylint: disable = import-outside-toplevel
    from os import sep

    # An incomplete element designates nothing to copy
    source, destination = copy_elt.text, copy_elt.get('to')
    if source is None or destination is None:
        return

    # Source
    src_file = normpath(join(root, source.strip()))
    if not exists(src_file) or stat(src_file).st_uid != user_uid:
        return

    # Destination
    dst_file = normpath(join(current['workdir'], destination.strip()))
    ongoing = normpath(self.directories['ongoing'])
    # A plain prefix test would accept a sibling such as ``Ongoing2``
    if dst_file != ongoing and not dst_file.startswith(ongoing + sep):
        return

    # The source is a directory
    if isdir(src_file):
        exclude = copy_elt.get('exclude')
        if exclude is not None:
            exclude = re_compile(exclude)
        copy_content_re(src_file, dst_file, exclude)
    # The source is a file
    else:
        if not exists(dirname(dst_file)):
            makedirs(dirname(dst_file))
        copy2(src_file, dst_file)
# -------------------------------------------------------------------------
def _workdir(self, current: dict):
    """Update the path of the working directory in the ``current``
    dictionary.

    The working directory is the ongoing directory, possibly completed by
    the relative path of the input file (``__unflattened__`` value) and by
    a sub-directory (``__subdir__`` value) in which ``{fid}``, ``{parent}``
    and the current values are replaced. It is ``None`` if there is no
    ongoing directory.

    :param dict current:
        Dictionary representing the current object to process.
    """
    values = current['values']
    if values.get('__unflattened__'):
        rel_path = self.rel_files.get(current['input_file'], '.')
    else:
        rel_path = '.'

    if values.get('__subdir__') and current['file_id']:
        ongoing = self.directories['ongoing']
        subdir = values['__subdir__'].format(
            fid=current['file_id'],
            parent=basename(dirname(current['input_file'])),
            **values)
        workdir = normpath(join(ongoing, rel_path, subdir))
        # A sub-directory leading out of the ongoing directory is ignored
        if not workdir.startswith(ongoing):
            workdir = normpath(join(ongoing, rel_path))
        current['workdir'] = workdir
    elif 'ongoing' in self.directories:
        current['workdir'] = normpath(
            join(self.directories['ongoing'], rel_path))
    else:
        current['workdir'] = None
# -------------------------------------------------------------------------
def _cioset_path2abs_path(
        self, user_uid: int, current: dict,
        file_elt: etree.Element) -> str | None:
    """Return the absolute path to the file pointed by ``file_elt``.

    A warning is emitted if the file cannot be found or is not owned by the
    user.

    :param int user_uid:
        Current user UID.
    :param dict current:
        Dictionary representing the current object to process.
    :type file_elt: lxml.etree.Element
    :param file_elt:
        Cioset XML file element for the current file.
    :rtype: :class:`str` or ``None``
    """
    path = cioset_path2abs_path(
        self.locations, current['input_file'], file_elt)
    if path and exists(path) and stat(path).st_uid == user_uid:
        return path

    self.warning(_('Unknown file: ${f}', {  # yapf: disable
        'f': basename(cioset_path2full_path(file_elt))}))
    return None