"""Processor as a service."""
from __future__ import annotations
from os import scandir, walk
from os.path import exists, isdir, join, relpath, dirname, commonpath
from threading import Event
from pyramid.request import Request
from chrysalio.lib.i18n import translate_field
from cioservice.lib.service import Service
from cioservice.lib.build import Build
from ..lib.i18n import _, translate
from ..lib.utils import select_file
from .pbuild import PBuild
# =============================================================================
[docs]
class PService(Service):
"""Class to manage a processor as a service. It uses a
:class:`~.lib.pbuild.PBuild` instead of a
:class:`~chrysalio.lib.build.Build`."""
uid = 'cioprocessor'
label = _('Processor as a service')
_need_files = True
# -------------------------------------------------------------------------
[docs]
def need_write_permission(self, context: str) -> bool:
"""Return ``True`` if this service needs input files.
See: :meth:`chrysalio.lib.service.Service.need_write_permission`
"""
processor = self._get_processor(context)[0]
return processor is not None and 'output' in processor.environment
# -------------------------------------------------------------------------
[docs]
def select_files(self, params: dict):
"""Select in the build parameters dictionary files which are candidate
for the service.
See: :meth:`cioservice.lib.service.Service.select_files`
"""
# pylint: disable = too-many-branches
if not params.get('files'):
return
# Retrieve processor
processor = self._get_processor(params.get('context'))[0]
if processor is None:
return
# Select approriate files
enter_directories = processor.environment['input'].get(
'enter_directories')
abs_files: list[str] = []
rel_files = {}
recursive = enter_directories['recursive'] \
if enter_directories else False
for abs_file in params['files']:
is_dir = isdir(abs_file)
if not exists(abs_file) or \
select_file(processor, abs_file, abs_files, is_dir) or \
not enter_directories or not is_dir:
continue
if recursive:
for path, dirs, files in walk(abs_file):
for name in dirs:
name = join(path, name)
if select_file(processor, name, abs_files, True):
rel_files[name] = relpath(path, dirname(abs_file))
for name in files:
name = join(path, name)
if select_file(processor, name, abs_files, False):
rel_files[name] = relpath(path, dirname(abs_file))
else:
for entry in scandir(abs_file):
select_file(processor, entry.path, abs_files, entry.is_dir)
params['files'] = sorted(abs_files)
if rel_files:
root = commonpath(rel_files.values())
params['rel_files'] = {
k: relpath(v, root) for k, v in rel_files.items()}
# -------------------------------------------------------------------------
[docs]
def select_files_message(self, params: dict) -> str:
"""Return a message specifying files to be selected.
See: :meth:`cioservice.lib.service.Service.select_files_message`
"""
processor = self._get_processor(params.get('context'))[0]
if processor is None or \
'messages' not in processor.environment['input']:
return self._select_files_message
return processor.environment['input']['messages'].get(
params.get('lang', 'en'), self._select_files_message)
# -------------------------------------------------------------------------
[docs]
def make_build(
self,
build_id: str,
params: dict,
stopping: Event | None = None) -> Build:
"""Create a pbuild object.
See: :meth:`cioservice.lib.service.Service.make_build`
"""
# Retrieve build manager
build_manager = None
if 'modules' in self._registry:
if 'cioservice' in self._registry['modules']:
build_manager = self._registry['modules'][
'cioservice'].build_manager
params['locations'] = self._registry['modules'][
'cioservice'].locations
if 'cioprocessor' in self._registry['modules']:
params['locations'] = self._registry['modules'][
'cioprocessor'].locations
# Get processor
processor, error = self._get_processor(params.get('context'))
if processor is None:
pbuild = PBuild(None, build_manager, build_id, params, stopping)
pbuild.error(translate(error, params.get('lang')))
self.write_traces(pbuild)
return pbuild
# Create the pbuild and lock it
pbuild = PBuild(processor, build_manager, build_id, params, stopping)
processor.clean_pbuilds()
if 'no_execution' in pbuild.result:
if not pbuild.settings.get('cron'):
pbuild.warning(
translate(_('Job already in progress.'), pbuild.lang))
self.write_traces(pbuild)
return pbuild
# Check input
if self.need_files(pbuild.context) and not pbuild.files:
if not pbuild.settings.get('cron'):
pbuild.warning(translate(_('Nothing to do.'), pbuild.lang))
self.write_traces(pbuild)
pbuild.result['no_execution'] = True
pbuild.unlock()
return pbuild
return pbuild
# -------------------------------------------------------------------------
def _run(self, build: Build):
"""Execute the processor on the **pbuild** ``build``.
See: :meth:`cioservice.lib.service.Service._run`
"""
# Check PBuild
pbuild = build
if 'ongoing' not in pbuild.directories:
pbuild.aborted_message()
self.write_traces(pbuild)
return
# Launch execution
processor = self._get_processor(pbuild.context)[0]
processor.initialize(pbuild)
processor.run(pbuild)
processor.finalize(pbuild)
# Write messages
if 'output' in pbuild.result and pbuild.output:
for name in pbuild.result.get('files', ''):
pbuild.info(_('Successfully generated: **${f}**', {'f': name}))
pbuild.output_info()
pbuild.output_unrefreshed()
pbuild.aborted_message()
self.write_traces(pbuild)
# -------------------------------------------------------------------------
[docs]
def variables(
self, context: str | None, request: Request | None = None) -> dict:
"""Return an ordered dictionary of variables.
See: :meth:`cioservice.lib.service.Service.variables`
"""
# Retrieve processor
processor = self._get_processor(context)[0]
if processor is None:
return {}
# Used by a script
if request is None:
return processor.variables()
# Update translations
variables = processor.variables().copy()
for name in variables:
variables[name]['label'] = translate_field(
request, variables[name]['i18n_label'])
variables[name]['hint'] = translate_field(
request, variables[name]['i18n_hint'])
if variables[name].get('i18n_options'):
variables[name]['options'] = {}
for option in variables[name]['i18n_options']:
variables[name]['options'][option] = translate_field(
request, variables[name]['i18n_options'][option])
return variables
# -------------------------------------------------------------------------
[docs]
def variables_groups(self, context: str, request: Request) -> dict:
"""Return a dictionary of groups of variables.
See: :meth:`cioservice.lib.service.Service.variables_groups`
"""
# Retrieve processor
processor = self._get_processor(context)[0]
if processor is None:
return {}
# Update translations
groups = dict(processor.variables_groups())
for name in groups:
groups[name] = translate_field(request, groups[name])
return groups
# -------------------------------------------------------------------------
def _get_processor(self, context: str | None) -> tuple:
"""Return the processor.
:param str context:
Context name = processor ID.
:rtype: tuple
:return:
A tuple such as ``(processor, error)``.
"""
if context is None or 'cioprocessor' not in self._registry['modules']:
return None, _('CioProcessor is not loaded')
return self._registry['modules']['cioprocessor'].processor(context)
# -------------------------------------------------------------------------
@classmethod
def _translate(cls, build: Build, text: str) -> str:
"""Return ``text`` translated.
:type build: .lib.pbuild.PBuild
:param build:
Current pbuild object
:param str text:
Text to translate.
:rtype: str
"""
return translate(text, build.lang)