# Source code for cioprocessor.lib.step

# pylint: disable = too-many-lines
"""Collection of processor steps."""

from sys import path as sys_path
from importlib import import_module
from os import makedirs, walk
from os.path import join, exists, basename, dirname, normpath, splitext
from os.path import relpath, isdir, isfile
from shutil import copy2
from re import compile as re_compile, error as re_error
from zipfile import ZIP_DEFLATED, BadZipfile, ZipFile
from getpass import getuser
from pwd import getpwnam
from datetime import datetime
from collections import OrderedDict

from lxml import etree

from chrysalio.lib.utils import tostr, execute, copy_content_re
from chrysalio.lib.utils import load_guessing_encoding
from chrysalio.lib.xml import load_xml, validate_xml
from .i18n import _
from .utils import clean_directory, select_files
from .iniscript import IniScript
from .epubize import EPubize


# Name of the external executable launched by the ePub check step.
EPUBCHECK = 'epubcheck'
# XML namespace used to look up the ``rootfile`` element of the
# ``META-INF/container.xml`` file of an OCF container.
CONTAINER_NS = 'urn:oasis:names:tc:opendocument:xmlns:container'


# =============================================================================
class Step():
    """This class manages processor steps.

    Each public step method receives the current processor build object and
    the dictionary describing the step. It returns ``None`` when the step
    succeeds (or is skipped) and an error (a translation string, a plain
    string or the caught exception) otherwise.
    """

    # Step type => name of the class method implementing it. Method names
    # (rather than functions) are stored so that a subclass overriding a
    # step is still honoured by :meth:`execute`.
    _STEP_METHODS = {
        'check': 'check',
        'makedir': 'makedir',
        'copy': 'copy',
        'copy-resource': 'copy_resource',
        'copy-template': 'copy_template',
        'extract-ocf': 'extract_ocf',
        'regex-transform': 'regex',
        'xsl-transform': 'xsl',
        'py-transform': 'python',
        'unzip': 'unzip',
        'zip': 'zipize',
        'epubize': 'epubize',
        'remove': 'remove',
        'set-icon': 'set_icon',
        'stop': 'stop',
    }

    # Files left out of an archive by the ``zip`` step when the step has no
    # ``exclude`` pattern: ZIP files and names ending with ``~`` possibly
    # followed by a short extension (e.g. ``name~.txt``).
    _ZIP_EXCLUDE = re_compile(r'(\.zip|~(\.\w{1,4})?)$')

    # -------------------------------------------------------------------------
    @classmethod
    def execute(cls, pbuild, step):
        """Execute a step on a pbuild.

        :type  pbuild: .lib.pbuild.PBuild
        :param pbuild:
            Current processor build object.
        :param dict step:
            Dictionary defining the current step.
        :rtype: :class:`pyramid.i18n.TranslationString` or ``None``
        :return:
            An error message, ``'STOP'`` for an activated break point or
            ``None``.
        """
        # A ``stop`` step evaluates its own condition inside ``stop()``.
        if step['type'] != 'stop' and not cls._active(pbuild, step):
            return None

        method = cls._STEP_METHODS.get(step['type'])
        if method is None:
            return _('Unknown step: ${s}', {'s': step['type']})
        return getattr(cls, method)(pbuild, step)

    # -------------------------------------------------------------------------
    @classmethod
    def check(cls, pbuild, step):
        """Execute a check of the current file.

        :type  pbuild: .lib.pbuild.PBuild
        :param pbuild:
            Current processor build object.
        :param dict step:
            Dictionary defining the current step.
        :rtype: pyramid.i18n.TranslationString
        :return:
            An error message or ``None``.
        """
        # The ``__no_check__`` value disables checks unless the step forces
        # its own execution.
        if pbuild.current['values'].get('__no_check__') \
                and step.get('force') != 'true':
            cls._save_trace(pbuild, step, _('No check'))
            return None

        # XML
        if step['mode'] == 'xml':
            return cls._check_xml(pbuild, step)

        # ePub
        if step['mode'] == 'epub':
            return cls._check_epub(pbuild, step)

        return _('Unknown check mode: ${m}', {'m': step['mode']})

    # -------------------------------------------------------------------------
    @classmethod
    def _check_xml(cls, pbuild, step):
        """Execute a check of the current XML file.

        The check applies, in this order of priority, to the files selected
        by the step, to the Data Under Process when it is an XML tree, or to
        the File/Data Under Process loaded as XML.

        :type  pbuild: .lib.pbuild.PBuild
        :param pbuild:
            Current processor build object.
        :param dict step:
            Dictionary defining the current step.
        :rtype: pyramid.i18n.TranslationString
        :return:
            An error message or ``None``.
        """
        # pylint: disable = too-many-return-statements, protected-access
        cls._announce(pbuild, step, _('XML validation'))

        # Apply to files
        if 'select' in step:
            files = []
            for filename in select_files(pbuild, step):
                tree = load_xml(filename, relaxngs=pbuild.relaxngs)
                # ``load_xml`` returns an error instead of a tree on failure
                if not isinstance(tree, etree._ElementTree):
                    return _('${f}: ${e}', {
                        'f': basename(filename),
                        'e': pbuild.translate(tree)})
                files.append(relpath(filename, pbuild.directories['ongoing']))
                if pbuild.aborted():
                    break
            cls._save_trace(pbuild, step, _('Valid XML for:\n\n${l}', {
                'l': '\n'.join(sorted(files))}))
            return None

        # Apply to Data Under Process as an XML tree
        if isinstance(pbuild.current['dup'], etree._ElementTree):
            error = validate_xml(pbuild.current['dup'], pbuild.relaxngs)
            # A validation error on a ``cioset`` root is not reported
            if error is not None \
                    and pbuild.current['dup'].getroot().tag != 'cioset':
                return _('${f}: ${e}', {
                    'f': pbuild.current['file_id'],
                    'e': pbuild.translate(error)})
            cls._save_trace(pbuild, step, _('Valid XML'))
            return None

        # Out of scope
        if not pbuild.current['fup'] and not pbuild.current['dup']:
            cls._save_trace(pbuild, step, _('Empty XML'))
            return None

        # Apply to File or Data Under Process loaded as XML
        filename = pbuild.current['fup'] \
            or '{0}.xml'.format(pbuild.current['file_id'])
        tree = load_xml(
            filename, relaxngs=pbuild.relaxngs, data=pbuild.current['dup'])
        if not isinstance(tree, etree._ElementTree):
            return _('${f}: ${e}', {
                'f': basename(filename), 'e': pbuild.translate(tree)})
        cls._save_trace(pbuild, step, _('Valid XML'))
        return None

    # -------------------------------------------------------------------------
    @classmethod
    def _check_epub(cls, pbuild, step):
        """Execute a check of the current ePub file.

        :type  pbuild: .lib.pbuild.PBuild
        :param pbuild:
            Current processor build object.
        :param dict step:
            Dictionary defining the current step.
        :rtype: pyramid.i18n.TranslationString
        :return:
            An error message or ``None``.
        """
        if pbuild.current['fup'] is None:
            return _('epubcheck has no file to check')

        cls._announce(pbuild, step, _('ePub check'))

        result = execute(['nice', EPUBCHECK, pbuild.current['fup']])
        if result[1]:
            # Hide the absolute ongoing directory and flatten the report
            report = result[0] or result[1]
            return report.replace(
                pbuild.directories['ongoing'], '').replace('\n', ' ')

        cls._save_trace(pbuild, step, _('epubcheck OK'))
        return None

    # -------------------------------------------------------------------------
    @classmethod
    def makedir(cls, pbuild, step):
        """Make a directory.

        :type  pbuild: .lib.pbuild.PBuild
        :param pbuild:
            Current processor build object.
        :param dict step:
            Dictionary defining the current step.
        :rtype: pyramid.i18n.TranslationString
        :return:
            An error message or ``None``.
        """
        directory = normpath(join(
            pbuild.current['workdir'],
            cls._expand(pbuild, step['directory'])))
        if not cls._is_inside(directory, pbuild.directories['ongoing']):
            return _('Make directory out of ongoing directory')
        if exists(directory):
            return None

        cls._announce(pbuild, step, _('Directory creation'))

        makedirs(directory)
        cls._save_trace(
            pbuild, step, _('Making directory "${d}"', {
                'd': relpath(directory, pbuild.current['workdir'])}))
        return None

    # -------------------------------------------------------------------------
    @classmethod
    def copy(cls, pbuild, step):
        """Copy files from processor directory to current path.

        :type  pbuild: .lib.pbuild.PBuild
        :param pbuild:
            Current processor build object.
        :param dict step:
            Dictionary defining the current step.
        :rtype: pyramid.i18n.TranslationString
        :return:
            An error message or ``None``.
        """
        # Source
        src_file = normpath(join(
            pbuild.directories['processor'],
            cls._expand(pbuild, step['file'])))
        if not exists(src_file):
            # A missing source is an error only if the step asks for it
            return None if not step.get('failed') else _(
                'File "${f}" does not exists.', {'f': basename(src_file)})
        if not cls._is_inside(src_file, pbuild.directories['processor']):
            return _('Retrieving files out of the processor directory')

        # Destination
        dst_file = normpath(join(
            pbuild.current['workdir'], cls._expand(pbuild, step['to'])))
        if not cls._is_inside(dst_file, pbuild.directories['ongoing']):
            return _('Copying files out of the ongoing directory')

        cls._announce(pbuild, step, _('File copy'))

        # The source is a directory
        if isdir(src_file):
            copy_content_re(src_file, dst_file, cls._exclude_regex(step))
        # The source is a file
        else:
            if not exists(dirname(dst_file)):
                makedirs(dirname(dst_file))
            copy2(src_file, dst_file)

        cls._save_trace(pbuild, step, _('Copy "${f}" to "${t}"', {
            'f': relpath(src_file, pbuild.directories['processor']),
            't': relpath(dst_file, pbuild.current['workdir'])}))
        return None

    # -------------------------------------------------------------------------
    @classmethod
    def copy_resource(cls, pbuild, step):
        """Copy files from resources to current path.

        :type  pbuild: .lib.pbuild.PBuild
        :param pbuild:
            Current processor build object.
        :param dict step:
            Dictionary defining the current step.
        :rtype: pyramid.i18n.TranslationString
        :return:
            An error message or ``None``.
        """
        # Sources: resources matching the ``from`` regular expression
        regex = re_compile(cls._expand(pbuild, step['from']))
        file_path = dirname(pbuild.current['input_file'] or '')
        src_files = [
            k.format(fpath=file_path) for k in pbuild.current['resources']
            if regex.search(k) is not None]
        if not src_files:
            return None if not step.get('failed') else _(
                'No copy from resources.')
        # Remove duplicates while keeping the order
        src_files = tuple(OrderedDict.fromkeys(src_files))

        # Destination
        dst_file = normpath(join(
            pbuild.current['workdir'], cls._expand(pbuild, step['to'])))
        if not cls._is_inside(dst_file, pbuild.directories['ongoing']):
            return _('Copying resource out of the ongoing directory')

        cls._announce(pbuild, step, _('Resource copy'))

        failed = None
        for src_file in src_files:
            # The source is a directory
            if isdir(src_file):
                copy_content_re(src_file, dst_file, cls._exclude_regex(step))
            # The source is a file
            elif exists(src_file):
                if not exists(dirname(dst_file)):
                    makedirs(dirname(dst_file))
                copy2(src_file, dst_file)
            # The source does not exists
            elif step.get('failed'):
                failed = _('Resource "${r}" does not exists.', {
                    'r': basename(src_file)})
            if pbuild.aborted():
                break

        cls._save_trace(pbuild, step, _('Resource(s) "${f}" to "${t}"', {
            'f': ', '.join([basename(k) for k in src_files]),
            't': relpath(dst_file, pbuild.current['workdir'])}))
        return failed

    # -------------------------------------------------------------------------
    @classmethod
    def copy_template(cls, pbuild, step):
        """Copy files from template directory to current path.

        :type  pbuild: .lib.pbuild.PBuild
        :param pbuild:
            Current processor build object.
        :param dict step:
            Dictionary defining the current step.
        :rtype: pyramid.i18n.TranslationString
        :return:
            An error message or ``None``.
        """
        # Source
        abs_template = pbuild.current.get('abs_template')
        if not abs_template:
            return None
        src_file = join(abs_template, cls._expand(pbuild, step['from']))
        if not exists(src_file):
            return None if not step.get('failed') else _(
                'Template "${t}" does not exists.',
                {'t': basename(src_file)})

        # Destination
        dst_file = normpath(join(
            pbuild.current['workdir'], cls._expand(pbuild, step['to'])))
        if not cls._is_inside(dst_file, pbuild.directories['ongoing']):
            return _('Copying template out of the ongoing directory')

        cls._announce(pbuild, step, _('Template copy'))

        # The source is a directory
        if isdir(src_file):
            copy_content_re(src_file, dst_file, cls._exclude_regex(step))
        # The source is a ZIP file to extract into the destination
        elif step.get('unzip'):
            if not exists(dirname(dst_file)):
                makedirs(dirname(dst_file))
            try:
                with ZipFile(src_file, 'r') as zip_file:
                    zip_file.extractall(dst_file)
            except (OSError, BadZipfile) as error:
                return error
        # The source is a file
        else:
            if not exists(dirname(dst_file)):
                makedirs(dirname(dst_file))
            copy2(src_file, dst_file)

        cls._save_trace(pbuild, step, _('Template "${f}" to "${t}"', {
            'f': relpath(src_file, abs_template),
            't': relpath(dst_file, pbuild.current['workdir'])}))
        return None

    # -------------------------------------------------------------------------
    @classmethod
    def extract_ocf(cls, pbuild, step):
        """Extract an OCF ZIP container and replace the current file by the
        first root file.

        :type  pbuild: .lib.pbuild.PBuild
        :param pbuild:
            Current processor build object.
        :param dict step:
            Dictionary defining the current step.
        :return:
            An error message or ``None``.
        """
        # pylint: disable = too-many-return-statements
        if not pbuild.current['fup']:
            return None

        cls._announce(pbuild, step, _('OCF extraction'))

        # Destination
        dst_dir = normpath(join(
            pbuild.current['workdir'], cls._expand(pbuild, step['to'])))
        if not cls._is_inside(dst_dir, pbuild.directories['ongoing']):
            return _('Extracting OCF out of the ongoing directory')

        # Open OCF archive
        try:
            with ZipFile(pbuild.current['fup'], 'r') as zip_file:
                zip_file.extractall(dst_dir)
        except (OSError, BadZipfile) as error:
            return error

        # Find the root file: a missing or malformed container.xml is an
        # error, not a crash
        try:
            tree = etree.parse(  # nosec
                join(dst_dir, 'META-INF', 'container.xml'))
        except (OSError, etree.XMLSyntaxError) as error:
            return error
        rootfile = tree.find(f'.//{{{CONTAINER_NS}}}rootfile')
        name = rootfile.get('full-path') if rootfile is not None else None
        if name is None:
            return _('Incorrect OCF ZIP Container')
        # The root file must be one of the extracted files
        name = normpath(join(dst_dir, name))
        if not cls._is_inside(name, dst_dir) or not exists(name):
            return _('Incorrect OCF ZIP Container')

        # Replace the File Under Process
        pbuild.current['fup'] = name

        cls._save_trace(pbuild, step, _('OCF extraction "${f}" to "${t}"', {
            'f': relpath(pbuild.current['fup'], pbuild.directories['ongoing']),
            't': relpath(dst_dir, pbuild.current['workdir'])}))
        return None

    # -------------------------------------------------------------------------
    @classmethod
    def regex(cls, pbuild, step):
        """Execute a transformation with a regular expression.

        ``step['regex']`` is a sequence of ``(compiled pattern,
        replacement)`` pairs applied in order.

        :type  pbuild: .lib.pbuild.PBuild
        :param pbuild:
            Current processor build object.
        :param dict step:
            Dictionary defining the current step.
        :rtype: pyramid.i18n.TranslationString
        :return:
            An error message or ``None``.
        """
        cls._announce(pbuild, step, _('Regular expressions'))

        # Apply to selected files
        if 'select' in step:
            files = []
            for path in select_files(pbuild, step):
                dup = load_guessing_encoding(path)
                for regex in step['regex']:
                    try:
                        dup = regex[0].sub(regex[1], dup)
                    except re_error as error:
                        return _('${p}: ${e}', {
                            'p': regex[0].pattern, 'e': error})
                    if pbuild.aborted():
                        break
                dup = tostr(dup)
                with open(path, 'w', encoding='utf8') as hdl:
                    hdl.write(dup)
                files.append(relpath(path, pbuild.directories['ongoing']))
                if pbuild.aborted():
                    break
            cls._save_trace(
                pbuild, step, _('Regular expressions on:\n\n${l}', {
                    'l': '\n'.join(sorted(files))}))

        # Apply to Data Under Process
        else:
            # Convert DUP into a string
            pbuild.dup2unicode()
            if not pbuild.current['dup']:
                return None

            # Apply regular expressions
            for regex in step['regex']:
                try:
                    pbuild.current['dup'] = regex[0].sub(
                        regex[1], pbuild.current['dup'])
                except re_error as error:
                    return _('${p}: ${e}', {'p': regex[0].pattern, 'e': error})
                if pbuild.aborted():
                    break
            cls._save(pbuild, step)
            cls._save_trace(pbuild, step, _('Regular expressions'))

        return None

    # -------------------------------------------------------------------------
    @classmethod
    def xsl(cls, pbuild, step):
        """Execute a transformation with a XSL file.

        :type  pbuild: .lib.pbuild.PBuild
        :param pbuild:
            Current processor build object.
        :param dict step:
            Dictionary defining the current step.
        :rtype: pyramid.i18n.TranslationString
        :return:
            An error message or ``None``.
        """
        # Create params dictionary: strings are passed as quoted literals
        params = {
            'language': '"{0}"'.format(pbuild.lang),
            'pid': '"{0}"'.format(pbuild.environment['processor_id']),
            'fid': '"{0}"'.format(pbuild.current['file_id']),
            'fpath': '"{0}/"'.format(dirname(pbuild.current['input_relpath'])),
            'warehouse': '"{0}"'.format(pbuild.current['input_base_id']),
            'processor': '"{0}/"'.format(pbuild.directories['processor']),
            'ongoing': '"{0}/"'.format(pbuild.directories['ongoing']),
            'workdir': '"{0}/"'.format(pbuild.current['workdir'])}
        for name, value in pbuild.current['values'].items():
            # ``bool`` is tested first because it is a subclass of ``int``
            if isinstance(value, bool):
                params[name] = str(int(value))
            elif isinstance(value, int):
                params[name] = str(value)
            else:
                params[name] = '"{0}"'.format(value or '')

        # Make directory
        if not exists(pbuild.current['workdir']):
            makedirs(pbuild.current['workdir'])

        # Transform
        if 'select' in step:
            return cls._xsl_select_files(pbuild, step, params)
        return cls._xsl_dup(pbuild, step, params)

    # -------------------------------------------------------------------------
    @classmethod
    def _xsl_select_files(cls, pbuild, step, params):
        """Execute a transformation with a XSL file on a selection of files.

        Each selected file is overwritten by the result of its
        transformation.

        :type  pbuild: .lib.pbuild.PBuild
        :param pbuild:
            Current processor build object.
        :param dict step:
            Dictionary defining the current step.
        :param dict params:
            Parameters for the XSL.
        :rtype: pyramid.i18n.TranslationString
        :return:
            An error message or ``None``.
        """
        files = []
        for path in select_files(pbuild, step):
            tree = load_xml(path)
            # pylint: disable = protected-access
            if not isinstance(tree, etree._ElementTree):
                return tree
            # pylint: enable = protected-access

            # Optional assembly of the file before its transformation
            if step.get('assembly') in ('true', '1'):
                cls._announce(
                    pbuild, step, _('Assembly of ${f}', {'f': basename(path)}))
                # The selected file temporarily becomes the input file
                input_file = pbuild.current['input_file']
                pbuild.current['input_file'] = path
                error = pbuild.assembly_xml(
                    getpwnam(getuser()).pw_uid, pbuild.current, tree,
                    step['num'])
                pbuild.current['input_file'] = input_file
                if error:
                    return error
                tree = pbuild.current['dup']
            if pbuild.aborted():
                break

            cls._announce(
                pbuild, step,
                _('XSL transformation of ${f}', {'f': basename(path)}))
            try:
                dup = step['xslt'](tree, **params)
            except etree.XSLTApplyError as error:
                return error
            if pbuild.aborted():
                break

            # XML result
            if dup.getroot() is not None:
                etree.ElementTree(dup.getroot()).write(
                    path, pretty_print=True, encoding='utf-8',
                    xml_declaration='<?xml ' in str(dup))
            # Text result: an empty one leaves the file untouched
            else:
                dup = str(dup).strip()
                if dup:
                    with open(path, 'w', encoding='utf8') as hdl:
                        hdl.write(dup)
            files.append(relpath(path, pbuild.directories['ongoing']))

            err = IniScript().execute(pbuild, step)
            if err:
                return err
            if pbuild.aborted():
                break

        cls._save_trace(pbuild, step, _('XSL transformation on:\n\n${l}', {
            'l': '\n'.join(sorted(files))}))
        return None

    # -------------------------------------------------------------------------
    @classmethod
    def _xsl_dup(cls, pbuild, step, params):
        """Execute a transformation with a XSL file on the DUP.

        :type  pbuild: .lib.pbuild.PBuild
        :param pbuild:
            Current processor build object.
        :param dict step:
            Dictionary defining the current step.
        :param dict params:
            Parameters for the XSL.
        :rtype: pyramid.i18n.TranslationString
        :return:
            An error message or ``None``.
        """
        cls._announce(pbuild, step, _('XSL transformation'))

        error = pbuild.dup2xml()
        if error:
            return error

        try:
            dup = step['xslt'](pbuild.current['dup'], **params)
        except etree.XSLTApplyError as error:
            return error
        if pbuild.aborted():
            return None

        pbuild.current['xml_declaration'] = '<?xml ' in str(dup)
        # The result is kept as a tree when it has a root, as text otherwise
        if dup.getroot() is not None:
            pbuild.current['dup'] = etree.ElementTree(dup.getroot())
        else:
            pbuild.current['dup'] = str(dup)

        cls._save(pbuild, step)
        cls._save_trace(pbuild, step, _('XSL transformation'))

        return IniScript().execute(pbuild, step)

    # -------------------------------------------------------------------------
    @classmethod
    def python(cls, pbuild, step):
        """Execute a transformation with a python script.

        The script must define a ``PyTransform`` class built with
        ``(pbuild, step)`` and exposing a ``run()`` method which returns an
        error or a false value.

        :type  pbuild: .lib.pbuild.PBuild
        :param pbuild:
            Current processor build object.
        :param dict step:
            Dictionary defining the current step.
        :rtype: pyramid.i18n.TranslationString
        :return:
            An error message or ``None``.
        """
        cls._announce(pbuild, step, _('Python execution'))

        # Make the directory of the script importable
        path = dirname(join(pbuild.directories['processor'], step['file']))
        if path not in sys_path:
            sys_path.append(path)

        try:
            module = import_module(basename(splitext(step['file'])[0]))
        except ImportError as error:
            return _('Unable to load module "${m}" [${e}]', {
                'm': step['file'], 'e': error})

        try:
            err = module.PyTransform(pbuild, step).run()
        except AttributeError as error:
            return _('Module "${m}" error: ${e}', {
                'm': step['file'], 'e': error})
        if err:
            return err

        cls._save_trace(pbuild, step, _('Script "${s}"', {'s': step['file']}))
        cls._save(pbuild, step)
        return None

    # -------------------------------------------------------------------------
    @classmethod
    def unzip(cls, pbuild, step):
        """Extract ZIP file and possibly replace the current file by the
        entry point.

        :type  pbuild: .lib.pbuild.PBuild
        :param pbuild:
            Current processor build object.
        :param dict step:
            Dictionary defining the current step.
        :return:
            An error message or ``None``.
        """
        if not pbuild.current['fup']:
            return None

        cls._announce(pbuild, step, _('ZIP extraction'))

        # Destination
        dst_dir = normpath(join(
            pbuild.current['workdir'], cls._expand(pbuild, step['to'])))
        if not cls._is_inside(dst_dir, pbuild.directories['ongoing']):
            return _('Extracting ZIP out of the ongoing directory')

        # Open ZIP archive
        try:
            with ZipFile(pbuild.current['fup'], 'r') as zip_file:
                zip_file.extractall(dst_dir)
        except (OSError, BadZipfile) as error:
            return error

        # Find entry point
        if step.get('entry'):
            entry = join(dst_dir, step['entry'])
            if not exists(entry):
                return _('"${f}" does not exists in the ZIP.', {
                    'f': step['entry']})
            pbuild.current['fup'] = entry

        cls._save_trace(pbuild, step, _('Unzip "${f}" to "${t}"${e}', {
            'f': relpath(pbuild.current['fup'], pbuild.directories['ongoing']),
            't': relpath(dst_dir, pbuild.current['workdir']),
            'e': ' ({})'.format(step['entry']) if step.get('entry') else ''}))
        return None

    # -------------------------------------------------------------------------
    @classmethod
    def zipize(cls, pbuild, step):
        """Create a ZIP from a directory or a file.

        The created archive becomes the File Under Process and the Data
        Under Process is reset.

        :type  pbuild: .lib.pbuild.PBuild
        :param pbuild:
            Current processor build object.
        :param dict step:
            Dictionary defining the current step.
        :rtype: pyramid.i18n.TranslationString
        :return:
            An error message or ``None``.
        """
        cls._announce(pbuild, step, _('ZIP'))

        # ZIP source
        zip_source = normpath(join(
            pbuild.current['workdir'], cls._expand(pbuild, step['from'])))
        if not cls._is_inside(zip_source, pbuild.directories['ongoing']):
            return _('ZIP source out of the ongoing directory')

        # ZIP target
        zip_target = normpath(join(
            pbuild.current['workdir'], cls._expand(pbuild, step['to'])))
        if not cls._is_inside(zip_target, pbuild.directories['ongoing']):
            return _('ZIP creation out of the ongoing directory')
        if not exists(dirname(zip_target)):
            makedirs(dirname(zip_target))

        # Exclusion pattern, kept in the step as the original did
        exclude = cls._exclude_regex(step)
        if exclude is None:
            exclude = step['exclude'] = cls._ZIP_EXCLUDE

        # ZIP creation
        with ZipFile(zip_target, 'w', ZIP_DEFLATED) as zip_file:
            if isfile(zip_source):
                zip_file.write(zip_source, basename(zip_source))
            else:
                # With ``dirinzip``, the source directory itself is the
                # top level entry of the archive
                source_for_zip = dirname(zip_source) \
                    if step.get('dirinzip') in ('true', 'True', '1') \
                    else zip_source
                for root, ignored_, files in walk(zip_source):
                    for name in files:
                        filename = join(root, name)
                        # Never store the archive being written in itself
                        if filename != zip_target \
                                and exclude.search(name) is None:
                            zip_file.write(
                                filename, relpath(filename, source_for_zip))
                        if pbuild.aborted():
                            break
                    if pbuild.aborted():
                        break

        pbuild.current['fup'] = zip_target
        pbuild.current['dup'] = None
        cls._save_trace(
            pbuild, step, _('ZIP "${e}" created', {'e': basename(zip_target)}))
        return None

    # -------------------------------------------------------------------------
    @classmethod
    def epubize(cls, pbuild, step):
        """Create an ePub.

        The created ePub becomes the File Under Process and the Data Under
        Process is reset.

        :type  pbuild: .lib.pbuild.PBuild
        :param pbuild:
            Current processor build object.
        :param dict step:
            Dictionary defining the current step.
        :rtype: pyramid.i18n.TranslationString
        :return:
            An error message or ``None``.
        """
        cls._announce(pbuild, step, _('ePub creation'))

        # EPub directory
        epub_dir = normpath(join(
            pbuild.current['workdir'], cls._expand(pbuild, step['from'])))
        if not cls._is_inside(epub_dir, pbuild.directories['ongoing']):
            return _('EPub source out of the ongoing directory')

        # EPub file
        epub_file = normpath(join(
            pbuild.current['workdir'], cls._expand(pbuild, step['to'])))
        if not cls._is_inside(epub_file, pbuild.directories['ongoing']):
            return _('EPub creation out of the ongoing directory')

        # Convert
        error = EPubize().convert(step, epub_dir, epub_file)
        if error is not None:
            return error

        pbuild.current['fup'] = epub_file
        pbuild.current['dup'] = None
        cls._save_trace(
            pbuild, step, _('ePub "${e}" created', {
                'e': basename(epub_file)}))
        return None

    # -------------------------------------------------------------------------
    @classmethod
    def remove(cls, pbuild, step):
        """Remove files in current path or one of its sub-directories
        according to a regular expression.

        :type  pbuild: .lib.pbuild.PBuild
        :param pbuild:
            Current processor build object.
        :param dict step:
            Dictionary defining the current step.
        :rtype: pyramid.i18n.TranslationString
        :return:
            An error message or ``None``.
        """
        # The ``__no_remove__`` value disables removals unless the step
        # forces its own execution.
        if pbuild.current['values'].get('__no_remove__') \
                and step.get('force') != 'true':
            cls._save_trace(pbuild, step, _('No removal'))
            return None

        cls._announce(pbuild, step, _('File removal'))

        subworkdir = pbuild.current['workdir']
        if 'directory' in step:
            subworkdir = normpath(join(
                subworkdir, cls._expand(pbuild, step['directory'])))
            if not cls._is_inside(subworkdir, pbuild.directories['ongoing']):
                return _('Removing files out of the ongoing directory')

        regex = re_compile(cls._expand(pbuild, step['select']))
        cls._save_trace(pbuild, step, _('Removal of "${p}" in "${d}/"', {
            'p': regex.pattern,
            'd': relpath(subworkdir, pbuild.directories['ongoing'])}))
        clean_directory(subworkdir, regex)
        return None

    # -------------------------------------------------------------------------
    @classmethod
    def set_icon(cls, pbuild, step):
        """Copy an icon file into the ongoing directory.

        :type  pbuild: .lib.pbuild.PBuild
        :param pbuild:
            Current processor build object.
        :param dict step:
            Dictionary defining the current step.
        :rtype: pyramid.i18n.TranslationString
        :return:
            An error message or ``None``.
        """
        src_file = normpath(join(
            pbuild.directories['processor'],
            cls._expand(pbuild, step['file'])))
        if not isfile(src_file):
            return None if not step.get('failed') else _(
                'File "${f}" does not exists.', {'f': basename(src_file)})
        if not cls._is_inside(src_file, pbuild.directories['processor']):
            return _('Retrieving files out of the processor directory')

        cls._announce(pbuild, step, _('Set icon'))

        makedirs(pbuild.directories['ongoing'], exist_ok=True)
        copy2(src_file, pbuild.directories['ongoing'])

        cls._save_trace(pbuild, step, _('Set icon "${f}"', {
            'f': relpath(src_file, pbuild.directories['processor'])}))
        return None

    # -------------------------------------------------------------------------
    @classmethod
    def stop(cls, pbuild, step):
        """Stop the process if corresponding variable is set to ``True``.

        :type  pbuild: .lib.pbuild.PBuild
        :param pbuild:
            Current processor build object.
        :param dict step:
            Dictionary defining the current step.
        :return:
            A STOP message or ``None``.
        """
        if not cls._active(pbuild, step):
            return None
        pbuild.warning(_(
            'Break point "${b}" activated', {'b': step['if']}))
        return 'STOP'

    # -------------------------------------------------------------------------
    @classmethod
    def _active(cls, pbuild, step):
        """Check if the possibly `if` attribute of the step is True.

        The condition has one of the forms ``name==value``,
        ``name!=value``, ``!name`` or ``name`` where ``name`` is a key of
        the current values. A step without condition is always active.

        :type  pbuild: .lib.pbuild.PBuild
        :param pbuild:
            Current processor build object.
        :param dict step:
            Dictionary defining the current step.
        :rtype: bool
        """
        condition = step.get('if')
        if not condition:
            return True

        values = pbuild.current['values']
        if '==' in condition:
            variable, value = condition.partition('==')[0::2]
            return values.get(variable) == value
        if '!=' in condition:
            variable, value = condition.partition('!=')[0::2]
            return values.get(variable) != value
        if condition[0] == '!':
            return not bool(values.get(condition[1:]))
        return bool(values.get(condition))

    # -------------------------------------------------------------------------
    @classmethod
    def _save_trace(cls, pbuild, step, message):
        """Save a trace of step ``step``.

        The trace is written in the working directory, and only when the
        ``__no_remove__`` value is set.

        :type  pbuild: .lib.pbuild.PBuild
        :param pbuild:
            Current processor build object.
        :param dict step:
            Dictionary defining the current step.
        :type  message: pyramid.i18n.TranslationString
        :param message:
            A message.
        """
        if not pbuild.current['values'].get('__no_remove__'):
            return

        filename = '{fid}.{num:0>2}{step}~.txt'.format(
            fid=pbuild.current['file_id'], num=step['num'],
            step=step['type'].partition('-')[0])
        if not exists(pbuild.current['workdir']):
            makedirs(pbuild.current['workdir'])
        with open(join(pbuild.current['workdir'], filename), 'w',
                  encoding='utf8') as hdl:
            hdl.write(datetime.now().isoformat(' '))
            hdl.write('\n')
            hdl.write(tostr(pbuild.translate(message)))

    # -------------------------------------------------------------------------
    @classmethod
    def _save(cls, pbuild, step):
        """Save current state of Data Under Process into a file.

        Nothing is saved unless the ``__no_remove__`` value is set.

        :type  pbuild: .lib.pbuild.PBuild
        :param pbuild:
            Current processor build object.
        :param dict step:
            Dictionary defining the current step.
        """
        if not pbuild.current['values'].get('__no_remove__'):
            return

        # The extension comes from the File Under Process if any, from the
        # nature of the Data Under Process otherwise.
        # pylint: disable = protected-access
        if pbuild.current['fup'] is not None:
            extension = splitext(pbuild.current['fup'])[1]
        else:
            extension = '.xml' \
                if isinstance(pbuild.current['dup'], etree._ElementTree) \
                else '.txt'
        # pylint: enable = protected-access

        filename = '{fid}.{num:0>2}{step}~{ext}'.format(
            fid=pbuild.current['file_id'], num=step['num'],
            step=step['type'].partition('-')[0], ext=extension)
        pbuild.save_dup(filename)

    # -------------------------------------------------------------------------
    @classmethod
    def _announce(cls, pbuild, step, title):
        """Trace the numbered title of a step and save the progress.

        :type  pbuild: .lib.pbuild.PBuild
        :param pbuild:
            Current processor build object.
        :param dict step:
            Dictionary defining the current step.
        :type  title: pyramid.i18n.TranslationString
        :param title:
            Title of the operation.
        """
        pbuild.trace('**{0}) {1}**'.format(
            step['num'], pbuild.translate(title)))
        pbuild.progress_save()

    # -------------------------------------------------------------------------
    @staticmethod
    def _expand(pbuild, template):
        """Fill a step attribute with the file ID (``{fid}``) and the
        current values.

        :type  pbuild: .lib.pbuild.PBuild
        :param pbuild:
            Current processor build object.
        :param str template:
            Attribute value using the ``str.format`` syntax.
        :rtype: str
        """
        return template.format(
            fid=pbuild.current['file_id'], **pbuild.current['values'])

    # -------------------------------------------------------------------------
    @staticmethod
    def _exclude_regex(step):
        """Return the compiled ``exclude`` pattern of a step.

        A pattern given as a string is compiled once and stored back in the
        step.

        :param dict step:
            Dictionary defining the current step.
        :return:
            A compiled regular expression or ``None``.
        """
        exclude = step.get('exclude')
        if isinstance(exclude, str):
            exclude = step['exclude'] = re_compile(exclude)
        return exclude

    # -------------------------------------------------------------------------
    @staticmethod
    def _is_inside(path, root):
        """Check if a normalized path is a directory or inside it.

        Unlike a bare ``startswith``, the comparison stops on a path
        separator so that ``/x/ongoing-old`` is not seen as being inside
        ``/x/ongoing``.

        :param str path:
            Normalized path to check.
        :param str root:
            Directory which must contain the path.
        :rtype: bool
        """
        root = normpath(root)
        # ``join(root, '')`` appends the trailing separator
        return path == root or path.startswith(join(root, ''))