# pylint: disable = too-many-lines
"""Collection of processor steps."""
from sys import path as sys_path
from importlib import import_module
from os import makedirs, walk
from os.path import join, exists, basename, dirname, normpath, splitext
from os.path import relpath, isdir, isfile
from shutil import copy2
from re import compile as re_compile, error as re_error
from zipfile import ZIP_DEFLATED, BadZipfile, ZipFile
from getpass import getuser
from pwd import getpwnam
from datetime import datetime
from collections import OrderedDict
from lxml import etree
from chrysalio.lib.utils import tostr, execute, copy_content_re
from chrysalio.lib.utils import load_guessing_encoding
from chrysalio.lib.xml import load_xml, validate_xml
from .i18n import _
from .utils import clean_directory, select_files
from .iniscript import IniScript
from .epubize import EPubize
EPUBCHECK = 'epubcheck'
CONTAINER_NS = 'urn:oasis:names:tc:opendocument:xmlns:container'
# =============================================================================
# (stray "[docs]" documentation-viewer marker removed)
class Step():
    """This class manages processor steps.

    Each public step method receives the current build object and the
    dictionary describing the step. It returns ``None`` on success or an
    error (usually a translation string) on failure.
    """

    # Files never added to an archive by default: ZIP files and backup or
    # trace files whose name ends with ``~`` or ``~.<ext>``.
    _ZIP_EXCLUDE = re_compile(r'(\.zip|~(\.\w{1,4})?)$')

    # Step type -> name of the class method which implements it.
    _HANDLERS = {
        'check': 'check',
        'makedir': 'makedir',
        'copy': 'copy',
        'copy-resource': 'copy_resource',
        'copy-template': 'copy_template',
        'extract-ocf': 'extract_ocf',
        'regex-transform': 'regex',
        'xsl-transform': 'xsl',
        'py-transform': 'python',
        'unzip': 'unzip',
        'zip': 'zipize',
        'epubize': 'epubize',
        'remove': 'remove',
        'set-icon': 'set_icon',
        'stop': 'stop',
    }

    # -------------------------------------------------------------------------
    @classmethod
    def execute(cls, pbuild, step):
        """Execute a step on a pbuild.

        A step whose ``if`` condition is not fulfilled is skipped, except
        for ``stop`` steps which evaluate their condition themselves.

        :type  pbuild: .lib.pbuild.PBuild
        :param pbuild:
            Current processor build object.
        :param dict step:
            Dictionary defining the current step.
        :rtype: :class:`pyramid.i18n.TranslationString` or ``None``
        :return:
            An error message or ``None``. A step type which is unknown or
            whose handler is not available on this class produces an
            ``Unknown step`` message.
        """
        if step['type'] != 'stop' and not cls._active(pbuild, step):
            return None

        handler = getattr(cls, cls._HANDLERS.get(step['type'], ''), None)
        if handler is None:
            return _('Unknown step: ${s}', {'s': step['type']})
        return handler(pbuild, step)

    # -------------------------------------------------------------------------
    @classmethod
    def check(cls, pbuild, step):
        """Execute a check of the current file.

        The check is skipped when the ``__no_check__`` value is set, unless
        the step has its ``force`` attribute set to ``'true'``.

        :type  pbuild: .lib.pbuild.PBuild
        :param pbuild:
            Current processor build object.
        :param dict step:
            Dictionary defining the current step.
        :rtype: pyramid.i18n.TranslationString
        :return:
            An error message or ``None``.
        """
        if pbuild.current['values'].get('__no_check__') \
                and step.get('force') != 'true':
            cls._save_trace(pbuild, step, _('No check'))
            return None

        # XML
        if step['mode'] == 'xml':
            return cls._check_xml(pbuild, step)

        # ePub
        if step['mode'] == 'epub':
            return cls._check_epub(pbuild, step)

        return _('Unknown check mode: ${m}', {'m': step['mode']})

    # -------------------------------------------------------------------------
    @classmethod
    def _check_xml(cls, pbuild, step):
        """Execute a check of the current XML file.

        Depending on the step and on the build state, the validation is
        applied to the selected files, to the Data Under Process when it is
        already an XML tree, or to the current file / raw data.

        :type  pbuild: .lib.pbuild.PBuild
        :param pbuild:
            Current processor build object.
        :param dict step:
            Dictionary defining the current step.
        :rtype: pyramid.i18n.TranslationString
        :return:
            An error message or ``None``.
        """
        # pylint: disable = too-many-return-statements
        cls._announce(pbuild, step, _('XML validation'))

        # Apply to files
        if 'select' in step:
            files = []
            for filename in select_files(pbuild, step):
                tree = load_xml(filename, relaxngs=pbuild.relaxngs)
                # pylint: disable = protected-access
                if not isinstance(tree, etree._ElementTree):
                    return _('${f}: ${e}', {
                        'f': basename(filename),
                        'e': pbuild.translate(tree)})
                # pylint: enable = protected-access
                files.append(relpath(filename, pbuild.directories['ongoing']))
                if pbuild.aborted():
                    break
            cls._save_trace(pbuild, step, _('Valid XML for:\n\n${l}', {
                'l': '\n'.join(sorted(files))}))
            return None

        # Apply to Data Under Process as XML
        # pylint: disable = protected-access
        if isinstance(pbuild.current['dup'], etree._ElementTree):
            error = validate_xml(pbuild.current['dup'], pbuild.relaxngs)
            # A validation error is ignored when the root is <cioset>
            if error is not None \
                    and pbuild.current['dup'].getroot().tag != 'cioset':
                return _('${f}: ${e}', {
                    'f': pbuild.current['file_id'],
                    'e': pbuild.translate(error)})
            cls._save_trace(pbuild, step, _('Valid XML'))
            return None

        # Out of scope
        if not pbuild.current['fup'] and not pbuild.current['dup']:
            cls._save_trace(pbuild, step, _('Empty XML'))
            return None

        # Apply to the current file or to the raw Data Under Process
        filename = pbuild.current['fup'] \
            or '{0}.xml'.format(pbuild.current['file_id'])
        tree = load_xml(
            filename, relaxngs=pbuild.relaxngs, data=pbuild.current['dup'])
        if not isinstance(tree, etree._ElementTree):
            return _('${f}: ${e}', {
                'f': basename(filename), 'e': pbuild.translate(tree)})
        # pylint: enable = protected-access
        cls._save_trace(pbuild, step, _('Valid XML'))
        return None

    # -------------------------------------------------------------------------
    @classmethod
    def _check_epub(cls, pbuild, step):
        """Execute a check of the current ePub file with ``epubcheck``.

        :type  pbuild: .lib.pbuild.PBuild
        :param pbuild:
            Current processor build object.
        :param dict step:
            Dictionary defining the current step.
        :rtype: pyramid.i18n.TranslationString
        :return:
            An error message or ``None``.
        """
        if pbuild.current['fup'] is None:
            return _('epubcheck has no file to check')

        cls._announce(pbuild, step, _('ePub check'))

        # ``execute`` is the module-level command runner, not Step.execute
        result = execute(['nice', EPUBCHECK, pbuild.current['fup']])
        if result[1]:
            message = result[0] or result[1]
            return message.replace(
                pbuild.directories['ongoing'], '').replace('\n', ' ')

        cls._save_trace(pbuild, step, _('epubcheck OK'))
        return None

    # -------------------------------------------------------------------------
    @classmethod
    def makedir(cls, pbuild, step):
        """Make a directory inside the ongoing directory.

        :type  pbuild: .lib.pbuild.PBuild
        :param pbuild:
            Current processor build object.
        :param dict step:
            Dictionary defining the current step.
        :rtype: pyramid.i18n.TranslationString
        :return:
            An error message or ``None``.
        """
        directory = normpath(join(
            pbuild.current['workdir'],
            cls._expand(pbuild, step['directory'])))
        if not cls._inside(directory, pbuild.directories['ongoing']):
            return _('Make directory out of ongoing directory')
        if exists(directory):
            return None

        cls._announce(pbuild, step, _('Directory creation'))
        makedirs(directory)
        cls._save_trace(
            pbuild, step, _('Making directory "${d}"', {
                'd': relpath(directory, pbuild.current['workdir'])}))
        return None

    # -------------------------------------------------------------------------
    @classmethod
    def copy(cls, pbuild, step):
        """Copy files from processor directory to current path.

        :type  pbuild: .lib.pbuild.PBuild
        :param pbuild:
            Current processor build object.
        :param dict step:
            Dictionary defining the current step.
        :rtype: pyramid.i18n.TranslationString
        :return:
            An error message or ``None``.
        """
        # Source: containment is verified before touching the file system
        src_file = normpath(join(
            pbuild.directories['processor'],
            cls._expand(pbuild, step['file'])))
        if not cls._inside(src_file, pbuild.directories['processor']):
            return _('Retrieving files out of the processor directory')
        if not exists(src_file):
            return None if not step.get('failed') else _(
                'File "${f}" does not exists.', {'f': basename(src_file)})

        # Destination
        dst_file = normpath(join(
            pbuild.current['workdir'], cls._expand(pbuild, step['to'])))
        if not cls._inside(dst_file, pbuild.directories['ongoing']):
            return _('Copying files out of the ongoing directory')

        cls._announce(pbuild, step, _('File copy'))

        # The source is a directory
        if isdir(src_file):
            # The compiled pattern is cached in the step for next calls
            if 'exclude' in step and isinstance(step['exclude'], str):
                step['exclude'] = re_compile(step['exclude'])
            copy_content_re(src_file, dst_file, step.get('exclude'))
        # The source is a file
        else:
            makedirs(dirname(dst_file), exist_ok=True)
            copy2(src_file, dst_file)

        cls._save_trace(pbuild, step, _('Copy "${f}" to "${t}"', {
            'f': relpath(src_file, pbuild.directories['processor']),
            't': relpath(dst_file, pbuild.current['workdir'])}))
        return None

    # -------------------------------------------------------------------------
    @classmethod
    def copy_resource(cls, pbuild, step):
        """Copy files from resources to current path.

        The resources whose name matches the ``from`` regular expression
        are copied to the ``to`` destination.

        :type  pbuild: .lib.pbuild.PBuild
        :param pbuild:
            Current processor build object.
        :param dict step:
            Dictionary defining the current step.
        :rtype: pyramid.i18n.TranslationString
        :return:
            An error message or ``None``.
        """
        # Sources
        regex = re_compile(cls._expand(pbuild, step['from']))
        file_path = dirname(pbuild.current['input_file'] or '')
        src_files = [
            k.format(fpath=file_path) for k in pbuild.current['resources']
            if regex.search(k) is not None]
        if not src_files:
            return None if not step.get('failed') else _(
                'No copy from resources.')
        # Remove duplicates while keeping the order
        src_files = tuple(OrderedDict.fromkeys(src_files))

        # Destination
        dst_file = normpath(join(
            pbuild.current['workdir'], cls._expand(pbuild, step['to'])))
        if not cls._inside(dst_file, pbuild.directories['ongoing']):
            return _('Copying resource out of the ongoing directory')

        cls._announce(pbuild, step, _('Resource copy'))

        failed = None
        for src_file in src_files:
            # The source is a directory
            if isdir(src_file):
                if 'exclude' in step and isinstance(step['exclude'], str):
                    step['exclude'] = re_compile(step['exclude'])
                copy_content_re(src_file, dst_file, step.get('exclude'))
            # The source is a file
            elif exists(src_file):
                makedirs(dirname(dst_file), exist_ok=True)
                copy2(src_file, dst_file)
            # The source does not exist
            elif step.get('failed'):
                failed = _('Resource "${r}" does not exists.', {
                    'r': basename(src_file)})
            if pbuild.aborted():
                break

        cls._save_trace(pbuild, step, _('Resource(s) "${f}" to "${t}"', {
            'f': ', '.join([basename(k) for k in src_files]),
            't': relpath(dst_file, pbuild.current['workdir'])}))
        return failed

    # -------------------------------------------------------------------------
    @classmethod
    def copy_template(cls, pbuild, step):
        """Copy files from template directory to current path.

        Nothing is done when the build has no ``abs_template``.

        :type  pbuild: .lib.pbuild.PBuild
        :param pbuild:
            Current processor build object.
        :param dict step:
            Dictionary defining the current step.
        :rtype: pyramid.i18n.TranslationString
        :return:
            An error message or ``None``.
        """
        # Source
        abs_template = pbuild.current.get('abs_template')
        if not abs_template:
            return None
        src_file = join(abs_template, cls._expand(pbuild, step['from']))
        if not exists(src_file):
            return None if not step.get('failed') else _(
                'Template "${t}" does not exists.', {'t': basename(src_file)})

        # Destination
        dst_file = normpath(join(
            pbuild.current['workdir'], cls._expand(pbuild, step['to'])))
        if not cls._inside(dst_file, pbuild.directories['ongoing']):
            return _('Copying template out of the ongoing directory')

        cls._announce(pbuild, step, _('Template copy'))

        # The source is a directory
        if isdir(src_file):
            if 'exclude' in step and isinstance(step['exclude'], str):
                step['exclude'] = re_compile(step['exclude'])
            copy_content_re(src_file, dst_file, step.get('exclude'))
        # The source is a ZIP file to extract into the destination
        elif step.get('unzip'):
            makedirs(dirname(dst_file), exist_ok=True)
            try:
                with ZipFile(src_file, 'r') as zip_file:
                    zip_file.extractall(dst_file)
            except BadZipfile as error:
                return error
        # The source is a file
        else:
            makedirs(dirname(dst_file), exist_ok=True)
            copy2(src_file, dst_file)

        cls._save_trace(pbuild, step, _('Template "${f}" to "${t}"', {
            'f': relpath(src_file, abs_template),
            't': relpath(dst_file, pbuild.current['workdir'])}))
        return None

    # -------------------------------------------------------------------------
    @classmethod
    def regex(cls, pbuild, step):
        """Execute a transformation with a regular expression.

        ``step['regex']`` is a sequence of ``(compiled pattern,
        replacement)`` pairs applied in order, either to each selected file
        or to the Data Under Process.

        :type  pbuild: .lib.pbuild.PBuild
        :param pbuild:
            Current processor build object.
        :param dict step:
            Dictionary defining the current step.
        :rtype: pyramid.i18n.TranslationString
        :return:
            An error message or ``None``.
        """
        cls._announce(pbuild, step, _('Regular expressions'))

        # Apply to selected files
        if 'select' in step:
            files = []
            for path in select_files(pbuild, step):
                dup = load_guessing_encoding(path)
                for regex in step['regex']:
                    try:
                        dup = regex[0].sub(regex[1], dup)
                    except re_error as error:
                        return _('${p}: ${e}', {
                            'p': regex[0].pattern, 'e': error})
                    if pbuild.aborted():
                        break
                dup = tostr(dup)
                with open(path, 'w', encoding='utf8') as hdl:
                    hdl.write(dup)
                files.append(relpath(path, pbuild.directories['ongoing']))
                if pbuild.aborted():
                    break
            cls._save_trace(
                pbuild, step, _('Regular expressions on:\n\n${l}', {
                    'l': '\n'.join(sorted(files))}))
            return None

        # Apply to Data Under Process, converted into a string
        pbuild.dup2unicode()
        if not pbuild.current['dup']:
            return None
        for regex in step['regex']:
            try:
                pbuild.current['dup'] = regex[0].sub(
                    regex[1], pbuild.current['dup'])
            except re_error as error:
                return _('${p}: ${e}', {'p': regex[0].pattern, 'e': error})
            if pbuild.aborted():
                break
        cls._save(pbuild, step)
        cls._save_trace(pbuild, step, _('Regular expressions'))
        return None

    # -------------------------------------------------------------------------
    @classmethod
    def xsl(cls, pbuild, step):
        """Execute a transformation with a XSL file.

        :type  pbuild: .lib.pbuild.PBuild
        :param pbuild:
            Current processor build object.
        :param dict step:
            Dictionary defining the current step.
        :rtype: pyramid.i18n.TranslationString
        :return:
            An error message or ``None``.
        """
        # Create params dictionary. Strings go through ``strparam`` so that
        # a value containing quotes cannot break the XPath expression.
        strparam = etree.XSLT.strparam
        params = {
            'language': strparam('{0}'.format(pbuild.lang)),
            'pid': strparam('{0}'.format(pbuild.environment['processor_id'])),
            'fid': strparam('{0}'.format(pbuild.current['file_id'])),
            'fpath': strparam(
                '{0}/'.format(dirname(pbuild.current['input_relpath']))),
            'warehouse': strparam(
                '{0}'.format(pbuild.current['input_base_id'])),
            'processor': strparam(
                '{0}/'.format(pbuild.directories['processor'])),
            'ongoing': strparam('{0}/'.format(pbuild.directories['ongoing'])),
            'workdir': strparam('{0}/'.format(pbuild.current['workdir']))}
        for name, value in pbuild.current['values'].items():
            # ``bool`` is tested first because it is a subclass of ``int``
            if isinstance(value, bool):
                params[name] = str(int(value))
            elif isinstance(value, int):
                params[name] = str(value)
            else:
                params[name] = strparam('{0}'.format(value or ''))

        # Make directory
        makedirs(pbuild.current['workdir'], exist_ok=True)

        # Transform
        if 'select' in step:
            return cls._xsl_select_files(pbuild, step, params)
        return cls._xsl_dup(pbuild, step, params)

    # -------------------------------------------------------------------------
    @classmethod
    def _xsl_select_files(cls, pbuild, step, params):
        """Execute a transformation with a XSL file on a selection of files.

        Each selected file is overwritten by the result of its
        transformation, then the INI script of the step is executed.

        :type  pbuild: .lib.pbuild.PBuild
        :param pbuild:
            Current processor build object.
        :param dict step:
            Dictionary defining the current step.
        :param dict params:
            Parameters for the XSL.
        :rtype: pyramid.i18n.TranslationString
        :return:
            An error message or ``None``.
        """
        files = []
        for path in select_files(pbuild, step):
            tree = load_xml(path)
            # pylint: disable = protected-access
            if not isinstance(tree, etree._ElementTree):
                return tree
            # pylint: enable = protected-access

            if step.get('assembly') in ('true', '1'):
                cls._announce(
                    pbuild, step,
                    _('Assembly of ${f}', {'f': basename(path)}))
                # The assembly works on the selected file: the real input
                # file is restored afterwards, even if an exception occurs.
                input_file = pbuild.current['input_file']
                pbuild.current['input_file'] = path
                try:
                    error = pbuild.assembly_xml(
                        getpwnam(getuser()).pw_uid, pbuild.current, tree,
                        step['num'])
                finally:
                    pbuild.current['input_file'] = input_file
                if error:
                    return error
                tree = pbuild.current['dup']
                if pbuild.aborted():
                    break

            cls._announce(
                pbuild, step,
                _('XSL transformation of ${f}', {'f': basename(path)}))
            try:
                dup = step['xslt'](tree, **params)
            except etree.XSLTApplyError as error:
                return error
            if pbuild.aborted():
                break

            if dup.getroot() is not None:
                etree.ElementTree(dup.getroot()).write(
                    path, pretty_print=True, encoding='utf-8',
                    xml_declaration='<?xml ' in str(dup))
            else:
                # Textual result: an empty result leaves the file untouched
                text = str(dup).strip()
                if text:
                    with open(path, 'w', encoding='utf8') as hdl:
                        hdl.write(text)
            files.append(relpath(path, pbuild.directories['ongoing']))

            err = IniScript().execute(pbuild, step)
            if err:
                return err
            if pbuild.aborted():
                break

        cls._save_trace(pbuild, step, _('XSL transformation on:\n\n${l}', {
            'l': '\n'.join(sorted(files))}))
        return None

    # -------------------------------------------------------------------------
    @classmethod
    def _xsl_dup(cls, pbuild, step, params):
        """Execute a transformation with a XSL file on the DUP.

        :type  pbuild: .lib.pbuild.PBuild
        :param pbuild:
            Current processor build object.
        :param dict step:
            Dictionary defining the current step.
        :param dict params:
            Parameters for the XSL.
        :rtype: pyramid.i18n.TranslationString
        :return:
            An error message or ``None``.
        """
        cls._announce(pbuild, step, _('XSL transformation'))

        error = pbuild.dup2xml()
        if error:
            return error
        try:
            dup = step['xslt'](pbuild.current['dup'], **params)
        except etree.XSLTApplyError as error:
            return error
        if pbuild.aborted():
            return None

        pbuild.current['xml_declaration'] = '<?xml ' in str(dup)
        if dup.getroot() is not None:
            pbuild.current['dup'] = etree.ElementTree(dup.getroot())
        else:
            pbuild.current['dup'] = str(dup)

        cls._save(pbuild, step)
        cls._save_trace(pbuild, step, _('XSL transformation'))
        return IniScript().execute(pbuild, step)

    # -------------------------------------------------------------------------
    @classmethod
    def python(cls, pbuild, step):
        """Execute a transformation with a python script.

        The module ``step['file']`` of the processor directory is imported
        and its ``PyTransform(pbuild, step).run()`` is called.

        :type  pbuild: .lib.pbuild.PBuild
        :param pbuild:
            Current processor build object.
        :param dict step:
            Dictionary defining the current step.
        :rtype: pyramid.i18n.TranslationString
        :return:
            An error message or ``None``.
        """
        cls._announce(pbuild, step, _('Python execution'))

        # Make the directory of the script importable
        path = dirname(join(pbuild.directories['processor'], step['file']))
        if path not in sys_path:
            sys_path.append(path)

        try:
            module = import_module(basename(splitext(step['file'])[0]))
        except ImportError as error:
            return _('Unable to load module "${m}" [${e}]', {
                'm': step['file'], 'e': error})

        try:
            err = module.PyTransform(pbuild, step).run()
        except AttributeError as error:
            return _('Module "${m}" error: ${e}', {
                'm': step['file'], 'e': error})
        if err:
            return err

        cls._save_trace(pbuild, step, _('Script "${s}"', {'s': step['file']}))
        cls._save(pbuild, step)
        return None

    # -------------------------------------------------------------------------
    @classmethod
    def unzip(cls, pbuild, step):
        """Extract ZIP file and possibly replace the current file by the
        entry point.

        :type  pbuild: .lib.pbuild.PBuild
        :param pbuild:
            Current processor build object.
        :param dict step:
            Dictionary defining the current step.
        :return:
            An error message or ``None``.
        """
        if not pbuild.current['fup']:
            return None

        cls._announce(pbuild, step, _('ZIP extraction'))

        # Destination
        dst_dir = normpath(join(
            pbuild.current['workdir'], cls._expand(pbuild, step['to'])))
        if not cls._inside(dst_dir, pbuild.directories['ongoing']):
            return _('Extracting ZIP out of the ongoing directory')

        # Open ZIP archive
        archive = pbuild.current['fup']
        try:
            with ZipFile(archive, 'r') as zip_file:
                zip_file.extractall(dst_dir)
        except (OSError, BadZipfile) as error:
            return error

        # Find entry point
        if step.get('entry'):
            entry = join(dst_dir, step['entry'])
            if not exists(entry):
                return _('"${f}" does not exists in the ZIP.', {
                    'f': step['entry']})
            pbuild.current['fup'] = entry

        # The trace names the archive, not the entry point which may have
        # replaced the current file just above
        cls._save_trace(pbuild, step, _('Unzip "${f}" to "${t}"${e}', {
            'f': relpath(archive, pbuild.directories['ongoing']),
            't': relpath(dst_dir, pbuild.current['workdir']),
            'e': ' ({})'.format(step['entry']) if step.get('entry') else ''}))
        return None

    # -------------------------------------------------------------------------
    @classmethod
    def zipize(cls, pbuild, step):
        """Create a ZIP from a directory or a file.

        The created archive becomes the current file and the Data Under
        Process is reset.

        :type  pbuild: .lib.pbuild.PBuild
        :param pbuild:
            Current processor build object.
        :param dict step:
            Dictionary defining the current step.
        :rtype: pyramid.i18n.TranslationString
        :return:
            An error message or ``None``.
        """
        cls._announce(pbuild, step, _('ZIP'))

        # ZIP source
        zip_source = normpath(join(
            pbuild.current['workdir'], cls._expand(pbuild, step['from'])))
        if not cls._inside(zip_source, pbuild.directories['ongoing']):
            return _('ZIP source out of the ongoing directory')

        # ZIP target
        zip_target = normpath(join(
            pbuild.current['workdir'], cls._expand(pbuild, step['to'])))
        if not cls._inside(zip_target, pbuild.directories['ongoing']):
            return _('ZIP creation out of the ongoing directory')
        makedirs(dirname(zip_target), exist_ok=True)

        # Exclusion pattern: the one of the step (compiled once and cached
        # in the step) or the default one
        exclude = step.get('exclude')
        if isinstance(exclude, str):
            exclude = step['exclude'] = re_compile(exclude)
        if exclude is None:
            exclude = cls._ZIP_EXCLUDE

        # ZIP creation
        with ZipFile(zip_target, 'w', ZIP_DEFLATED) as zip_file:
            if isfile(zip_source):
                zip_file.write(zip_source, basename(zip_source))
            else:
                # With ``dirinzip``, the source directory itself is the
                # root of the archive
                source_for_zip = dirname(zip_source) \
                    if step.get('dirinzip') in ('true', 'True', '1') \
                    else zip_source
                for root, ignored_, files in walk(zip_source):
                    for name in files:
                        if exclude.search(name) is None:
                            zip_file.write(
                                join(root, name),
                                relpath(join(root, name), source_for_zip))
                        if pbuild.aborted():
                            break
                    if pbuild.aborted():
                        break

        pbuild.current['fup'] = zip_target
        pbuild.current['dup'] = None
        cls._save_trace(
            pbuild, step,
            _('ZIP "${e}" created', {'e': basename(zip_target)}))
        return None

    # -------------------------------------------------------------------------
    @classmethod
    def epubize(cls, pbuild, step):
        """Create an ePub.

        The created ePub becomes the current file and the Data Under
        Process is reset.

        :type  pbuild: .lib.pbuild.PBuild
        :param pbuild:
            Current processor build object.
        :param dict step:
            Dictionary defining the current step.
        :rtype: pyramid.i18n.TranslationString
        :return:
            An error message or ``None``.
        """
        cls._announce(pbuild, step, _('ePub creation'))

        # EPub directory
        epub_dir = normpath(join(
            pbuild.current['workdir'], cls._expand(pbuild, step['from'])))
        if not cls._inside(epub_dir, pbuild.directories['ongoing']):
            return _('EPub source out of the ongoing directory')

        # EPub file
        epub_file = normpath(join(
            pbuild.current['workdir'], cls._expand(pbuild, step['to'])))
        if not cls._inside(epub_file, pbuild.directories['ongoing']):
            return _('EPub creation out of the ongoing directory')

        # Convert
        error = EPubize().convert(step, epub_dir, epub_file)
        if error is not None:
            return error

        pbuild.current['fup'] = epub_file
        pbuild.current['dup'] = None
        cls._save_trace(
            pbuild, step, _('ePub "${e}" created', {
                'e': basename(epub_file)}))
        return None

    # -------------------------------------------------------------------------
    @classmethod
    def remove(cls, pbuild, step):
        """Remove files in current path or one of its sub-directories according
        to a regular expression.

        The removal is skipped when the ``__no_remove__`` value is set,
        unless the step has its ``force`` attribute set to ``'true'``.

        :type  pbuild: .lib.pbuild.PBuild
        :param pbuild:
            Current processor build object.
        :param dict step:
            Dictionary defining the current step.
        :rtype: pyramid.i18n.TranslationString
        :return:
            An error message or ``None``.
        """
        if pbuild.current['values'].get('__no_remove__') \
                and step.get('force') != 'true':
            cls._save_trace(pbuild, step, _('No removal'))
            return None

        cls._announce(pbuild, step, _('File removal'))

        subworkdir = pbuild.current['workdir']
        if 'directory' in step:
            subworkdir = normpath(join(
                subworkdir, cls._expand(pbuild, step['directory'])))
        if not cls._inside(subworkdir, pbuild.directories['ongoing']):
            return _('Removing files out of the ongoing directory')

        regex = re_compile(cls._expand(pbuild, step['select']))
        cls._save_trace(pbuild, step, _('Removal of "${p}" in "${d}/"', {
            'p': regex.pattern,
            'd': relpath(subworkdir, pbuild.directories['ongoing'])}))
        clean_directory(subworkdir, regex)
        return None

    # -------------------------------------------------------------------------
    @classmethod
    def set_icon(cls, pbuild, step):
        """Copy an icon file into the ongoing directory.

        :type  pbuild: .lib.pbuild.PBuild
        :param pbuild:
            Current processor build object.
        :param dict step:
            Dictionary defining the current step.
        :rtype: pyramid.i18n.TranslationString
        :return:
            An error message or ``None``.
        """
        # Containment is verified before touching the file system
        src_file = normpath(join(
            pbuild.directories['processor'],
            cls._expand(pbuild, step['file'])))
        if not cls._inside(src_file, pbuild.directories['processor']):
            return _('Retrieving files out of the processor directory')
        if not isfile(src_file):
            return None if not step.get('failed') else _(
                'File "${f}" does not exists.', {'f': basename(src_file)})

        cls._announce(pbuild, step, _('Set icon'))

        makedirs(pbuild.directories['ongoing'], exist_ok=True)
        copy2(src_file, pbuild.directories['ongoing'])
        cls._save_trace(pbuild, step, _('Set icon "${f}"', {
            'f': relpath(src_file, pbuild.directories['processor'])}))
        return None

    # -------------------------------------------------------------------------
    @classmethod
    def stop(cls, pbuild, step):
        """Stop the process if corresponding variable is set to ``True``.

        :type  pbuild: .lib.pbuild.PBuild
        :param pbuild:
            Current processor build object.
        :param dict step:
            Dictionary defining the current step.
        :return:
            A STOP message or ``None``.
        """
        if cls._active(pbuild, step):
            pbuild.warning(_(
                'Break point "${b}" activated', {'b': step['if']}))
            return 'STOP'
        return None

    # -------------------------------------------------------------------------
    @classmethod
    def _active(cls, pbuild, step):
        """Check if the possibly `if` attribute of the step is True.

        Supported forms are ``var==value``, ``var!=value``, ``!var`` and
        ``var``. A step without condition is always active.

        :type  pbuild: .lib.pbuild.PBuild
        :param pbuild:
            Current processor build object.
        :param dict step:
            Dictionary defining the current step.
        :rtype: bool
        """
        condition = step.get('if')
        if not condition:
            return True

        values = pbuild.current['values']
        if '==' in condition:
            variable, value = condition.partition('==')[0::2]
            return values.get(variable) == value
        if '!=' in condition:
            variable, value = condition.partition('!=')[0::2]
            return values.get(variable) != value
        if condition[0] == '!':
            return not bool(values.get(condition[1:]))
        return bool(values.get(condition))

    # -------------------------------------------------------------------------
    @staticmethod
    def _inside(path, root):
        """Tell whether ``path`` is ``root`` or is located under it.

        Both paths are normalized and the comparison is made on whole path
        components, so that ``/a/bc`` is not seen as being inside ``/a/b``.

        :param str path:
            Path to test.
        :param str root:
            Directory which must contain the path.
        :rtype: bool
        """
        path = normpath(path)
        root = normpath(root)
        # ``join(root, '')`` appends the path separator to the root
        return path == root or path.startswith(join(root, ''))

    # -------------------------------------------------------------------------
    @staticmethod
    def _expand(pbuild, template):
        """Replace ``{fid}`` and the ``{<value name>}`` fields of a template
        by the file ID and the values of the current build.

        :type  pbuild: .lib.pbuild.PBuild
        :param pbuild:
            Current processor build object.
        :param str template:
            String with ``str.format`` fields.
        :rtype: str
        """
        return template.format(
            fid=pbuild.current['file_id'], **pbuild.current['values'])

    # -------------------------------------------------------------------------
    @staticmethod
    def _announce(pbuild, step, title):
        """Trace the numbered title of the step and save the progress.

        :type  pbuild: .lib.pbuild.PBuild
        :param pbuild:
            Current processor build object.
        :param dict step:
            Dictionary defining the current step.
        :type  title: pyramid.i18n.TranslationString
        :param title:
            Title of the step.
        """
        pbuild.trace('**{0}) {1}**'.format(
            step['num'], pbuild.translate(title)))
        pbuild.progress_save()

    # -------------------------------------------------------------------------
    @classmethod
    def _save_trace(cls, pbuild, step, message):
        """Save a trace of step ``step``.

        The trace file is only written when the ``__no_remove__`` value is
        set.

        :type  pbuild: .lib.pbuild.PBuild
        :param pbuild:
            Current processor build object.
        :param dict step:
            Dictionary defining the current step.
        :type  message: pyramid.i18n.TranslationString
        :param message:
            A message.
        """
        if not pbuild.current['values'].get('__no_remove__'):
            return

        filename = '{fid}.{num:0>2}{step}~.txt'.format(
            fid=pbuild.current['file_id'], num=step['num'],
            step=step['type'].partition('-')[0])
        makedirs(pbuild.current['workdir'], exist_ok=True)
        with open(join(pbuild.current['workdir'], filename),
                  'w', encoding='utf8') as hdl:
            hdl.write(datetime.now().isoformat(' '))
            hdl.write('\n')
            hdl.write(tostr(pbuild.translate(message)))

    # -------------------------------------------------------------------------
    @classmethod
    def _save(cls, pbuild, step):
        """Save current state of Data Under Process into a file.

        Nothing is saved unless the ``__no_remove__`` value is set.

        :type  pbuild: .lib.pbuild.PBuild
        :param pbuild:
            Current processor build object.
        :param dict step:
            Dictionary defining the current step.
        """
        if not pbuild.current['values'].get('__no_remove__'):
            return

        # pylint: disable = protected-access
        if pbuild.current['fup'] is not None:
            extension = splitext(pbuild.current['fup'])[1]
        elif isinstance(pbuild.current['dup'], etree._ElementTree):
            extension = '.xml'
        else:
            extension = '.txt'
        # pylint: enable = protected-access
        filename = '{fid}.{num:0>2}{step}~{ext}'.format(
            fid=pbuild.current['file_id'], num=step['num'],
            step=step['type'].partition('-')[0], ext=extension)
        pbuild.save_dup(filename)