Source code for cioprocessor.lib.iniscript

"""INI scripts management."""

from os import makedirs, rename, walk
from os.path import join, exists, dirname, basename, normpath, splitext, isfile
from fnmatch import filter as fnfilter
from configparser import ConfigParser
from shlex import split

from chrysalio.lib.utils import EXCLUDED_FILES, tounicode, execute
from chrysalio.lib.config import config_get, config_get_list
from chrysalio.lib.config import config_get_namespace
from .i18n import _


MEDIA_EXT = {
    'image': ('svg', 'png', 'webp', 'tif', 'tiff', 'jpg', 'jpeg', 'pdf', 'eps',
              'gif', 'psd', 'ai'),
    'audio': ('wav', 'ogg', 'mp3', 'm4a', 'aac', 'ac3'),
    'video': ('mp4', 'mkv', 'webm', 'mpg', 'avi', 'mov', 'ogv', 'dv', 'flv'),
    'latex': ('tex',)}


# =============================================================================
[docs] class IniScript(): """Class for INI script managment.""" # ------------------------------------------------------------------------- def __init__(self): """Constructor method.""" self._ini_files = [] # -------------------------------------------------------------------------
[docs] def execute(self, pbuild, step): """Interprete INI file generated by XSL transformation. :type pbuild: .lib.pbuild.PBuild :param pbuild: Current processor build object. :param dict step: Dictionary defining the current step. :rtype: pyramid.i18n.TranslationString :return: An error message or ``None``. """ # Retrieve INI files pbuild.trace('**{0}) {1}**'.format( step['num'], pbuild.translate(_('Media conversion')))) pbuild.progress_save() if not self._load_ini_files(pbuild) or pbuild.aborted(): return None # Known sources todo = len(self._ini_files) - self._known_sources(pbuild, step) # Sources in resources for resource in pbuild.current_abs_resources(): if not todo or pbuild.aborted(): break # A file if isfile(resource): todo -= self._try_source_file(pbuild, step, resource) continue # A directory done = set() for path, dirs, ignored_ in walk(resource): for name in tuple(dirs): if name in EXCLUDED_FILES or '~' in name: dirs.remove(name) if path in done: continue done.add(path) todo -= self._try_source_directory(pbuild, step, path) if not todo or pbuild.aborted(): break return self._check_conversion(pbuild, step)
# ------------------------------------------------------------------------- def _load_ini_files(self, pbuild): """Load and check all INI files. :type pbuild: .lib.pbuild.PBuild :param pbuild: Current processor build object. :rtype: bool The attribute ``_ini_files`` is a list of dictionaries with keys ``'config'``, ``'id'``, ``'extensions'``, ``'source'``, ``'target'``. """ self._ini_files = [] for path, ignored_, files in walk(pbuild.current['workdir']): for name in sorted(fnfilter( files, '{0}.--*~.ini'.format(pbuild.current['file_id']))): # Load the INI file config = ConfigParser({ 'ini_file': join(path, name), 'here': path, 'processor': pbuild.directories['processor'], 'lang': pbuild.lang or '', 'id': '{id}', 'ext': '{ext}', 'srcext': '{srcext}', 'source': '{source}', 'sourcepath': '{sourcepath}', 'target': '{target}', 'targetpath': '{targetpath}'}) config.read(tounicode(join(path, name)), encoding='utf8') # Check INI content media_type = config_get(config, 'Source', 'type') media_id = config_get(config, 'Source', 'id') target = normpath(config_get(config, 'Target', 'file', '')) if media_type not in ('image', 'audio', 'video', 'latex') or \ not media_id or not target: pbuild.warning(_('"${f}" is not a media INI script.', { 'f': name})) continue if not target.startswith(pbuild.directories['ongoing']): pbuild.error(_('Target out of the ongoing directory')) return False # Store informations media_exts = tuple(config_get_list( config, 'Source', 'ext_list') or MEDIA_EXT[media_type]) ext = splitext(media_id)[1] if ext and ext[1:] in media_exts: media_exts = (ext[1:],) media_id = splitext(media_id)[0] self._ini_files.append({ 'config': config, 'id': media_id, 'extensions': media_exts, 'source': config_get(config, 'Source', 'file'), 'target': target}) return True # ------------------------------------------------------------------------- def _known_sources(self, pbuild, step): """Use a known source to convert a media. :type pbuild: .lib.pbuild.PBuild :param pbuild: Current processor build object. :param dict step: Dictionary defining the current step. :rtype: int :return: Number of ini_file done. """ done = 0 for ini_file in tuple(self._ini_files): if ini_file['source'] is not None: self._media_convert(pbuild, step, ini_file, ini_file['source']) done += 1 if pbuild.aborted(): return done return done # ------------------------------------------------------------------------- def _try_source_file(self, pbuild, step, abs_file): """Try to use a file to convert medias. :type pbuild: .lib.pbuild.PBuild :param pbuild: Current processor build object. :param dict step: Dictionary defining the current step. :param str abs_file: Absolute path to the source file. :rtype: int :return: Number of ini_file done. """ done = 0 source_id, source_ext = splitext(basename(abs_file)) for ini_file in tuple(self._ini_files): if 'done' in ini_file: continue if ini_file['id'] == source_id and \ source_ext[1:] in ini_file['extensions']: self._media_convert(pbuild, step, ini_file, abs_file) done += 1 if pbuild.aborted(): return done return done # ------------------------------------------------------------------------- def _try_source_directory(self, pbuild, step, abs_dir): """Try to use a source to convert medias. :type pbuild: .lib.pbuild.PBuild :param pbuild: Current processor build object. :param dict step: Dictionary defining the current step. :param str abs_dir: Absolute path to the source directory. :rtype: int :return: Number of ini_file done. """ done = 0 for ini_file in tuple(self._ini_files): if 'done' in ini_file: continue for ext in ini_file['extensions']: abs_file = join(abs_dir, '{0}.{1}'.format(ini_file['id'], ext)) if isfile(abs_file): self._media_convert(pbuild, step, ini_file, abs_file) done += 1 break if pbuild.aborted(): return done return done # ------------------------------------------------------------------------- def _media_convert(self, pbuild, step, ini_file, abs_source): """Convert a media pointed by ``ini_file`` with the source ``abs_source``. :type pbuild: .lib.pbuild.PBuild :param pbuild: Current processor build object. :param dict step: Dictionary defining the current step. :param tuple ini_file: Tuple describing the INI file under procress. :param str abs_source: Absolute path to the right source file. If the conversion succeeds, ``ini_file`` is removed from the list. Otherwise, the key ``done`` is set to ``True``. """ abs_target = ini_file['target'].format( srcext=splitext(abs_source)[1][1:]) # Try to retrieve from previous result directory if not pbuild.current['values'].get('__no_cache__'): dependencies = [ k.format( id=ini_file['id'], source=abs_source, sourcepath=dirname(abs_source), target=abs_target, targetpath=dirname(abs_target)) for k in config_get_list( ini_file['config'], 'Target', 'dependencies', '')] if pbuild.media_cache(abs_target, dependencies): self._rename_ini_script(step, ini_file) self._ini_files.remove(ini_file) pbuild.trace('→ {0}'.format(basename(abs_target))) pbuild.progress_save() return # Transform media pbuild.trace('{0}{1}'.format( basename(abs_source), basename(abs_target))) pbuild.progress_save() if self._media_steps(pbuild, ini_file, abs_source, abs_target): self._rename_ini_script(step, ini_file) self._ini_files.remove(ini_file) else: self._ini_files[self._ini_files.index(ini_file)]['done'] = True # ------------------------------------------------------------------------- @classmethod def _media_steps(cls, pbuild, ini_file, abs_source, abs_target): """Execute steps to transformation a media. :type pbuild: .lib.pbuild.PBuild :param pbuild: Current processor build object. :param tuple ini_file: Tuple describing the INI file under procress. :param str abs_source: Full path to source file. :param str abs_target: Full path to target file. :rtype: bool """ # Find section for transformation section = 'Transformation:{0}'.format(splitext(abs_source)[1][1:]) if not ini_file['config'].has_section(section): section = 'Transformation' steps = config_get_namespace(ini_file['config'], section, 'step') if not steps: pbuild.error(_('No step')) return False # Excute the steps srcext = splitext(abs_source)[1][1:] cwd = ini_file['config'].get('DEFAULT', 'here') for step in sorted(steps): target_dir = dirname(abs_target) if not exists(target_dir): makedirs(target_dir) cmd = config_get( ini_file['config'], section, 'step.{0}'.format(step)).format( id=ini_file['id'], source=abs_source, sourcepath=dirname(abs_source), srcext=srcext, target=abs_target, targetpath=target_dir) error = execute(split(tounicode(cmd)), cwd) if error[1]: pbuild.error(_('${f}: ${e}', { 'f': ini_file['id'], 'e': pbuild.translate(error[1])})) return False if pbuild.aborted(): break # Check result if not exists(abs_target): pbuild.error(_('Unable to make "${i}"', {'i': ini_file['id']})) return False return True # ------------------------------------------------------------------------- def _check_conversion(self, pbuild, step): """Checks if the conversion was completed without errors . :type pbuild: .lib.pbuild.PBuild :param pbuild: Current processor build object. :param dict step: Dictionary defining the current step. :param int todo: Number of remaining files. :rtype: pyramid.i18n.TranslationString :return: An error message or ``None``. """ success = True for ini_file in self._ini_files: if config_get( ini_file['config'], 'Source', 'if_missing') == 'warning': pbuild.warning( _('No source for "${i}"', {'i': ini_file['id']})) else: success = False pbuild.error(_('No source for "${i}"', {'i': ini_file['id']})) self._rename_ini_script(step, ini_file) return None if success else _('At least one media is missing.') # ------------------------------------------------------------------------- @classmethod def _rename_ini_script(cls, step, ini_file): """Rename the INI script file to disable it for other processing. :param dict step: Dictionary defining the current step. :param tuple ini_file: Tuple describing the INI file under procress. """ abs_ini_file = ini_file['config'].get('DEFAULT', 'ini_file') try: rename(abs_ini_file, abs_ini_file.replace( '.--', '.{num:0>2}'.format(num=step['num']))) except IOError: # pragma: nocover pass