"""INI scripts management."""
from os import makedirs, rename, walk
from os.path import join, exists, dirname, basename, normpath, splitext, isfile
from fnmatch import filter as fnfilter
from configparser import ConfigParser
from shlex import split
from chrysalio.lib.utils import EXCLUDED_FILES, tounicode, execute
from chrysalio.lib.config import config_get, config_get_list
from chrysalio.lib.config import config_get_namespace
from .i18n import _
# Source file extensions (without the leading dot) accepted for each media
# type. The tuple order is the order in which extensions are tried.
MEDIA_EXT = dict(
    image=(
        'svg', 'png', 'webp', 'tif', 'tiff', 'jpg', 'jpeg', 'pdf', 'eps',
        'gif', 'psd', 'ai',
    ),
    audio=('wav', 'ogg', 'mp3', 'm4a', 'aac', 'ac3'),
    video=('mp4', 'mkv', 'webm', 'mpg', 'avi', 'mov', 'ogv', 'dv', 'flv'),
    latex=('tex',),
)
# =============================================================================
class IniScript:
    """Class for INI script management.

    An INI script is a file named ``<file_id>.--<something>~.ini`` found in
    the working directory of the current build.  It describes one media to
    produce: where the source may come from (section ``[Source]``), which
    file must be created (section ``[Target]``) and which commands create it
    (section ``[Transformation]`` or ``[Transformation:<srcext>]``).
    """

    # -------------------------------------------------------------------------
    def __init__(self):
        """Constructor method."""
        # List of dictionaries, one per INI script still to process (see
        # :meth:`_load_ini_files` for the keys).
        self._ini_files = []

    # -------------------------------------------------------------------------
    def execute(self, pbuild, step):
        """Interpret INI files generated by XSL transformation.

        :type  pbuild: .lib.pbuild.PBuild
        :param pbuild:
            Current processor build object.
        :param dict step:
            Dictionary defining the current step.
        :rtype: pyramid.i18n.TranslationString
        :return:
            An error message or ``None``.
        """
        # Retrieve INI files
        pbuild.trace('**{0}) {1}**'.format(
            step['num'], pbuild.translate(_('Media conversion'))))
        pbuild.progress_save()
        if not self._load_ini_files(pbuild) or pbuild.aborted():
            return None

        # Known sources: the count is taken *before* the conversion because
        # a successful conversion removes the script from the list.
        total = len(self._ini_files)
        todo = total - self._known_sources(pbuild, step)

        # Sources in resources
        for resource in pbuild.current_abs_resources():
            if not todo or pbuild.aborted():
                break

            # A file
            if isfile(resource):
                todo -= self._try_source_file(pbuild, step, resource)
                continue

            # A directory
            visited = set()
            for path, dirs, ignored_ in walk(resource):
                # Prune in place so that ``walk`` does not descend into
                # excluded or temporary ('~') directories.
                dirs[:] = [
                    name for name in dirs
                    if name not in EXCLUDED_FILES and '~' not in name]
                if path in visited:
                    continue
                visited.add(path)
                todo -= self._try_source_directory(pbuild, step, path)
                if not todo or pbuild.aborted():
                    break

        return self._check_conversion(pbuild, step)

    # -------------------------------------------------------------------------
    def _load_ini_files(self, pbuild):
        """Load and check all INI files.

        :type  pbuild: .lib.pbuild.PBuild
        :param pbuild:
            Current processor build object.
        :rtype: bool
        :return:
            ``False`` if a target lies outside the ongoing directory,
            ``True`` otherwise (even when no INI script was found).

        The attribute ``_ini_files`` is a list of dictionaries with keys
        ``'config'``, ``'id'``, ``'extensions'``, ``'source'``, ``'target'``.
        """
        self._ini_files = []
        pattern = '{0}.--*~.ini'.format(pbuild.current['file_id'])
        for path, ignored_, files in walk(pbuild.current['workdir']):
            for name in sorted(fnfilter(files, pattern)):
                # Load the INI file. The '{...}' defaults let a script write
                # '%(source)s' and obtain a '{source}' placeholder which is
                # filled in later with ``str.format``.
                ini_path = join(path, name)
                config = ConfigParser({
                    'ini_file': ini_path,
                    'here': path,
                    'processor': pbuild.directories['processor'],
                    'lang': pbuild.lang or '',
                    'id': '{id}',
                    'ext': '{ext}',
                    'srcext': '{srcext}',
                    'source': '{source}',
                    'sourcepath': '{sourcepath}',
                    'target': '{target}',
                    'targetpath': '{targetpath}'})
                config.read(tounicode(ini_path), encoding='utf8')

                # Check INI content. The emptiness of the target must be
                # tested *before* normalisation: ``normpath('')`` is '.',
                # which is truthy.
                media_type = config_get(config, 'Source', 'type')
                media_id = config_get(config, 'Source', 'id')
                target = config_get(config, 'Target', 'file', '')
                if media_type not in MEDIA_EXT or not media_id or not target:
                    pbuild.warning(_('"${f}" is not a media INI script.', {
                        'f': name}))
                    continue
                target = normpath(target)
                if not self._is_inside_directory(
                        target, pbuild.directories['ongoing']):
                    pbuild.error(_('Target out of the ongoing directory'))
                    return False

                # Store information. When the ID already carries one of the
                # accepted extensions, only that extension is searched for.
                media_exts = tuple(config_get_list(
                    config, 'Source', 'ext_list') or MEDIA_EXT[media_type])
                stem, ext = splitext(media_id)
                if ext and ext[1:] in media_exts:
                    media_exts = (ext[1:],)
                    media_id = stem
                self._ini_files.append({
                    'config': config,
                    'id': media_id,
                    'extensions': media_exts,
                    'source': config_get(config, 'Source', 'file'),
                    'target': target})

        return True

    # -------------------------------------------------------------------------
    @staticmethod
    def _is_inside_directory(path, directory):
        """Tell whether ``path`` is located strictly under ``directory``.

        :param str path:
            Path to check.
        :param str directory:
            Directory which must contain ``path``.
        :rtype: bool

        Both paths are normalised and the comparison is made against the
        directory followed by a path separator, so that a sibling such as
        ``ongoing_old`` is not mistaken for ``ongoing``.
        """
        # ``join(x, '')`` appends a trailing separator to ``x`` if needed.
        return normpath(path).startswith(join(normpath(directory), ''))

    # -------------------------------------------------------------------------
    def _known_sources(self, pbuild, step):
        """Use a known source to convert a media.

        :type  pbuild: .lib.pbuild.PBuild
        :param pbuild:
            Current processor build object.
        :param dict step:
            Dictionary defining the current step.
        :rtype: int
        :return:
            Number of ini_file done.

        Only INI scripts whose ``'source'`` entry is not ``None`` are
        processed here. A script is counted as done whether its conversion
        succeeded or not.
        """
        done = 0
        # Iterate over a copy: ``_media_convert`` may remove items.
        for ini_file in tuple(self._ini_files):
            if ini_file['source'] is not None:
                self._media_convert(pbuild, step, ini_file, ini_file['source'])
                done += 1
                if pbuild.aborted():
                    return done
        return done

    # -------------------------------------------------------------------------
    def _try_source_file(self, pbuild, step, abs_file):
        """Try to use a file to convert medias.

        :type  pbuild: .lib.pbuild.PBuild
        :param pbuild:
            Current processor build object.
        :param dict step:
            Dictionary defining the current step.
        :param str abs_file:
            Absolute path to the source file.
        :rtype: int
        :return:
            Number of ini_file done.

        The file is used for every pending INI script whose ID equals the
        file name without extension and whose accepted extensions contain
        the extension of the file.
        """
        done = 0
        source_id, source_ext = splitext(basename(abs_file))
        source_ext = source_ext[1:]
        # Iterate over a copy: ``_media_convert`` may remove items.
        for ini_file in tuple(self._ini_files):
            # Skip scripts whose conversion has already failed
            if 'done' in ini_file:
                continue
            if ini_file['id'] == source_id \
                    and source_ext in ini_file['extensions']:
                self._media_convert(pbuild, step, ini_file, abs_file)
                done += 1
                if pbuild.aborted():
                    return done
        return done

    # -------------------------------------------------------------------------
    def _try_source_directory(self, pbuild, step, abs_dir):
        """Try to use a directory to find sources and convert medias.

        :type  pbuild: .lib.pbuild.PBuild
        :param pbuild:
            Current processor build object.
        :param dict step:
            Dictionary defining the current step.
        :param str abs_dir:
            Absolute path to the source directory.
        :rtype: int
        :return:
            Number of ini_file done.

        For each pending INI script, the first existing file
        ``<abs_dir>/<id>.<ext>``, with ``ext`` taken in the order of the
        accepted extensions, is used as source.
        """
        done = 0
        # Iterate over a copy: ``_media_convert`` may remove items.
        for ini_file in tuple(self._ini_files):
            # Skip scripts whose conversion has already failed
            if 'done' in ini_file:
                continue
            for ext in ini_file['extensions']:
                abs_file = join(abs_dir, '{0}.{1}'.format(ini_file['id'], ext))
                if isfile(abs_file):
                    self._media_convert(pbuild, step, ini_file, abs_file)
                    done += 1
                    break
            if pbuild.aborted():
                return done
        return done

    # -------------------------------------------------------------------------
    def _media_convert(self, pbuild, step, ini_file, abs_source):
        """Convert a media pointed by ``ini_file`` with the source
        ``abs_source``.

        :type  pbuild: .lib.pbuild.PBuild
        :param pbuild:
            Current processor build object.
        :param dict step:
            Dictionary defining the current step.
        :param dict ini_file:
            Dictionary describing the INI file under process.
        :param str abs_source:
            Absolute path to the right source file.

        If the media is retrieved from the cache or if the conversion
        succeeds, the INI script is renamed and ``ini_file`` is removed from
        the list. Otherwise, the key ``done`` is set to ``True``.
        """
        srcext = splitext(abs_source)[1][1:]
        abs_target = ini_file['target'].format(srcext=srcext)

        # Try to retrieve from previous result directory
        if not pbuild.current['values'].get('__no_cache__'):
            # ``srcext`` is supplied like for the target and the steps so
            # that a dependency may also use the '{srcext}' placeholder.
            dependencies = [
                k.format(
                    id=ini_file['id'], source=abs_source,
                    sourcepath=dirname(abs_source), srcext=srcext,
                    target=abs_target, targetpath=dirname(abs_target))
                for k in config_get_list(
                    ini_file['config'], 'Target', 'dependencies', '')]
            if pbuild.media_cache(abs_target, dependencies):
                self._rename_ini_script(step, ini_file)
                self._ini_files.remove(ini_file)
                pbuild.trace('→ {0}'.format(basename(abs_target)))
                pbuild.progress_save()
                return

        # Transform media
        pbuild.trace('{0} → {1}'.format(
            basename(abs_source), basename(abs_target)))
        pbuild.progress_save()
        if self._media_steps(pbuild, ini_file, abs_source, abs_target):
            self._rename_ini_script(step, ini_file)
            self._ini_files.remove(ini_file)
        else:
            # Keep the script in the list (it is reported by
            # ``_check_conversion``) but never try another source for it.
            ini_file['done'] = True

    # -------------------------------------------------------------------------
    @classmethod
    def _media_steps(cls, pbuild, ini_file, abs_source, abs_target):
        """Execute the steps which transform a media.

        :type  pbuild: .lib.pbuild.PBuild
        :param pbuild:
            Current processor build object.
        :param dict ini_file:
            Dictionary describing the INI file under process.
        :param str abs_source:
            Full path to source file.
        :param str abs_target:
            Full path to target file.
        :rtype: bool
        :return:
            ``True`` if every step ran without error and the target file
            exists at the end.
        """
        config = ini_file['config']
        srcext = splitext(abs_source)[1][1:]

        # Find section for transformation: a section dedicated to the
        # extension of the source takes precedence over the generic one.
        section = 'Transformation:{0}'.format(srcext)
        if not config.has_section(section):
            section = 'Transformation'
        steps = config_get_namespace(config, section, 'step')
        if not steps:
            pbuild.error(_('No step'))
            return False

        # Execute the steps
        cwd = config.get('DEFAULT', 'here')
        target_dir = dirname(abs_target)
        for step in sorted(steps):
            # Checked before each step, as in the original code, in case a
            # previous command removed the directory.
            if not exists(target_dir):
                makedirs(target_dir)
            cmd = config_get(config, section, 'step.{0}'.format(step)).format(
                id=ini_file['id'], source=abs_source,
                sourcepath=dirname(abs_source),
                srcext=srcext, target=abs_target, targetpath=target_dir)
            error = execute(split(tounicode(cmd)), cwd)
            if error[1]:
                pbuild.error(_('${f}: ${e}', {
                    'f': ini_file['id'], 'e': pbuild.translate(error[1])}))
                return False
            if pbuild.aborted():
                break

        # Check result
        if not exists(abs_target):
            pbuild.error(_('Unable to make "${i}"', {'i': ini_file['id']}))
            return False
        return True

    # -------------------------------------------------------------------------
    def _check_conversion(self, pbuild, step):
        """Check if the conversion was completed without errors.

        :type  pbuild: .lib.pbuild.PBuild
        :param pbuild:
            Current processor build object.
        :param dict step:
            Dictionary defining the current step.
        :rtype: pyramid.i18n.TranslationString
        :return:
            An error message or ``None``.

        Every INI script still in the list is reported, as a warning when
        its option ``if_missing`` of section ``[Source]`` is ``'warning'``
        and as an error otherwise, and is then renamed.
        """
        success = True
        for ini_file in self._ini_files:
            message = _('No source for "${i}"', {'i': ini_file['id']})
            if config_get(
                    ini_file['config'], 'Source', 'if_missing') == 'warning':
                pbuild.warning(message)
            else:
                success = False
                pbuild.error(message)
            self._rename_ini_script(step, ini_file)
        return None if success else _('At least one media is missing.')

    # -------------------------------------------------------------------------
    @classmethod
    def _rename_ini_script(cls, step, ini_file):
        """Rename the INI script file to disable it for other processing.

        :param dict step:
            Dictionary defining the current step.
        :param dict ini_file:
            Dictionary describing the INI file under process.

        The marker ``.--`` of the file name is replaced by a dot followed by
        the step number padded to two digits. Only the file name is altered:
        a ``.--`` occurring in a parent directory is left untouched.
        """
        abs_ini_file = ini_file['config'].get('DEFAULT', 'ini_file')
        new_name = basename(abs_ini_file).replace(
            '.--', '.{num:0>2}'.format(num=step['num']))
        try:
            rename(abs_ini_file, join(dirname(abs_ini_file), new_name))
        except OSError:  # pragma: nocover
            # Best effort: a script which cannot be renamed is left as is.
            pass