# Source code for cioprocessor.lib.pservice

"""Processor as a service."""

from __future__ import annotations
from os import scandir, walk
from os.path import exists, isdir, join, relpath, dirname, commonpath
from threading import Event

from pyramid.request import Request

from chrysalio.lib.i18n import translate_field
from cioservice.lib.service import Service
from cioservice.lib.build import Build
from ..lib.i18n import _, translate
from ..lib.utils import select_file
from .pbuild import PBuild


# =============================================================================
class PService(Service):
    """Class to manage a processor as a service.

    It uses a :class:`~.lib.pbuild.PBuild` instead of a
    :class:`~chrysalio.lib.build.Build`.

    The *context* handled by every method is the ID of the processor, which
    is looked up in the ``cioprocessor`` module of the registry.
    """

    uid = 'cioprocessor'
    label = _('Processor as a service')
    _need_files = True

    # -------------------------------------------------------------------------
    def need_write_permission(self, context: str) -> bool:
        """Return ``True`` if the processor of ``context`` declares an output.

        See: :meth:`chrysalio.lib.service.Service.need_write_permission`

        :param str context:
            Context name = processor ID.
        :rtype: bool
        :return:
            ``True`` when the processor exists and its environment contains
            an ``'output'`` entry, ``False`` otherwise.
        """
        processor = self._get_processor(context)[0]
        return processor is not None and 'output' in processor.environment

    # -------------------------------------------------------------------------
    def select_files(self, params: dict):
        """Select in the build parameters dictionary files which are
        candidate for the service.

        See: :meth:`cioservice.lib.service.Service.select_files`

        :param dict params:
            Build parameters. ``params['files']`` is replaced by the sorted
            list of selected files and, when directories have been entered
            recursively, ``params['rel_files']`` maps each selected path to
            the relative path of the directory it was found in.
        """
        # pylint: disable = too-many-branches
        if not params.get('files'):
            return

        # Retrieve processor
        processor = self._get_processor(params.get('context'))[0]
        if processor is None:
            return

        # Select appropriate files
        enter_directories = processor.environment['input'].get(
            'enter_directories')
        abs_files: list[str] = []
        rel_files: dict[str, str] = {}
        recursive = enter_directories['recursive'] \
            if enter_directories else False
        for abs_file in params['files']:
            is_dir = isdir(abs_file)
            # Nothing more to do if the path does not exist, if it has been
            # selected as is, or if it is not a directory we may enter.
            # (select_file() presumably appends to abs_files on success.)
            if not exists(abs_file) or \
               select_file(processor, abs_file, abs_files, is_dir) or \
               not enter_directories or not is_dir:
                continue

            if recursive:
                parent = dirname(abs_file)
                for path, dirs, files in walk(abs_file):
                    # Directories first, then files, at each level
                    for names, name_is_dir in ((dirs, True), (files, False)):
                        for name in names:
                            name = join(path, name)
                            if select_file(
                                    processor, name, abs_files, name_is_dir):
                                rel_files[name] = relpath(path, parent)
            else:
                with scandir(abs_file) as entries:
                    for entry in entries:
                        # DirEntry.is_dir is a method: it must be called,
                        # otherwise the bound method is always truthy.
                        select_file(
                            processor, entry.path, abs_files, entry.is_dir())

        params['files'] = sorted(abs_files)
        if rel_files:
            # Make relative paths start from their common root
            root = commonpath(rel_files.values())
            params['rel_files'] = {
                k: relpath(v, root) for k, v in rel_files.items()}

    # -------------------------------------------------------------------------
    def select_files_message(self, params: dict) -> str:
        """Return a message specifying files to be selected.

        See: :meth:`cioservice.lib.service.Service.select_files_message`

        :param dict params:
            Build parameters; ``context`` and ``lang`` (default ``'en'``) are
            read.
        :rtype: str
        :return:
            The message of the processor input for the language or the
            default message of the service.
        """
        processor = self._get_processor(params.get('context'))[0]
        if processor is None or \
           'messages' not in processor.environment['input']:
            return self._select_files_message
        return processor.environment['input']['messages'].get(
            params.get('lang', 'en'), self._select_files_message)

    # -------------------------------------------------------------------------
    def make_build(
            self,
            build_id: str,
            params: dict,
            stopping: Event | None = None) -> Build:
        """Create a pbuild object.

        See: :meth:`cioservice.lib.service.Service.make_build`

        :param str build_id:
            ID of the build.
        :param dict params:
            Build parameters. ``params['locations']`` is filled from the
            registry modules when they are available.
        :type  stopping: threading.Event
        :param stopping: (optional)
            Event passed to the pbuild.
        :rtype: .lib.pbuild.PBuild
        :return:
            A pbuild, possibly carrying an error or a ``no_execution``
            result.
        """
        # Retrieve build manager
        build_manager = None
        if 'modules' in self._registry:
            modules = self._registry['modules']
            if 'cioservice' in modules:
                build_manager = modules['cioservice'].build_manager
                params['locations'] = modules['cioservice'].locations
            # Locations of cioprocessor take precedence over cioservice ones
            if 'cioprocessor' in modules:
                params['locations'] = modules['cioprocessor'].locations

        # Get processor
        processor, error = self._get_processor(params.get('context'))
        if processor is None:
            pbuild = PBuild(None, build_manager, build_id, params, stopping)
            pbuild.error(translate(error, params.get('lang')))
            self.write_traces(pbuild)
            return pbuild

        # Create the pbuild and lock it
        pbuild = PBuild(processor, build_manager, build_id, params, stopping)
        processor.clean_pbuilds()
        if 'no_execution' in pbuild.result:
            if not pbuild.settings.get('cron'):
                pbuild.warning(
                    translate(_('Job already in progress.'), pbuild.lang))
                self.write_traces(pbuild)
            return pbuild

        # Check input
        if self.need_files(pbuild.context) and not pbuild.files:
            if not pbuild.settings.get('cron'):
                pbuild.warning(translate(_('Nothing to do.'), pbuild.lang))
                self.write_traces(pbuild)
            pbuild.result['no_execution'] = True
            pbuild.unlock()
            return pbuild

        return pbuild

    # -------------------------------------------------------------------------
    def _run(self, build: Build):
        """Execute the processor on the **pbuild** ``build``.

        See: :meth:`cioservice.lib.service.Service._run`

        :type  build: .lib.pbuild.PBuild
        :param build:
            Current pbuild object.
        """
        # Check PBuild
        pbuild = build
        if 'ongoing' not in pbuild.directories:
            pbuild.aborted_message()
            self.write_traces(pbuild)
            return

        # Launch execution
        # NOTE(review): the processor is assumed to exist here; presumably a
        # pbuild without processor never has an 'ongoing' directory -- confirm.
        processor = self._get_processor(pbuild.context)[0]
        processor.initialize(pbuild)
        processor.run(pbuild)
        processor.finalize(pbuild)

        # Write messages
        if 'output' in pbuild.result and pbuild.output:
            for name in pbuild.result.get('files', ()):
                pbuild.info(_('Successfully generated: **${f}**', {'f': name}))
            pbuild.output_info()
            pbuild.output_unrefreshed()
        pbuild.aborted_message()
        self.write_traces(pbuild)

    # -------------------------------------------------------------------------
    def variables(
            self,
            context: str | None,
            request: Request | None = None) -> dict:
        """Return an ordered dictionary of variables.

        See: :meth:`cioservice.lib.service.Service.variables`

        :param str context:
            Context name = processor ID.
        :type  request: pyramid.request.Request
        :param request: (optional)
            Current request. When ``None``, variables are returned without
            translation.
        :rtype: dict
        :return:
            Variables of the processor or an empty dictionary if the
            processor is not found.
        """
        # Retrieve processor
        processor = self._get_processor(context)[0]
        if processor is None:
            return {}

        # Used by a script
        if request is None:
            return processor.variables()

        # Update translations
        # NOTE(review): copy() is shallow, so the per-variable dictionaries
        # updated below are still the ones held by the processor -- confirm
        # this sharing is intended.
        variables = processor.variables().copy()
        for name in variables:
            variable = variables[name]
            variable['label'] = translate_field(
                request, variable['i18n_label'])
            variable['hint'] = translate_field(
                request, variable['i18n_hint'])
            if variable.get('i18n_options'):
                variable['options'] = {}
                for option in variable['i18n_options']:
                    variable['options'][option] = translate_field(
                        request, variable['i18n_options'][option])

        return variables

    # -------------------------------------------------------------------------
    def variables_groups(self, context: str, request: Request) -> dict:
        """Return a dictionary of groups of variables.

        See: :meth:`cioservice.lib.service.Service.variables_groups`

        :param str context:
            Context name = processor ID.
        :type  request: pyramid.request.Request
        :param request:
            Current request, used to translate group labels.
        :rtype: dict
        :return:
            Translated groups or an empty dictionary if the processor is not
            found.
        """
        # Retrieve processor
        processor = self._get_processor(context)[0]
        if processor is None:
            return {}

        # Update translations (on a copy, groups of the processor are kept)
        groups = dict(processor.variables_groups())
        for name in groups:
            groups[name] = translate_field(request, groups[name])
        return groups

    # -------------------------------------------------------------------------
    def _get_processor(self, context: str | None) -> tuple:
        """Return the processor.

        :param str context:
            Context name = processor ID.
        :rtype: tuple
        :return:
            A tuple such as ``(processor, error)``.
        """
        # The registry may have no 'modules' entry at all (make_build()
        # checks for it too): report it as a not loaded module instead of
        # raising a KeyError.
        if context is None or 'modules' not in self._registry \
           or 'cioprocessor' not in self._registry['modules']:
            return None, _('CioProcessor is not loaded')
        return self._registry['modules']['cioprocessor'].processor(context)

    # -------------------------------------------------------------------------
    @classmethod
    def _translate(cls, build: Build, text: str) -> str:
        """Return ``text`` translated.

        :type  build: .lib.pbuild.PBuild
        :param build:
            Current pbuild object
        :param str text:
            Text to translate.
        :rtype: str
        """
        return translate(text, build.lang)