Source code for core.plugin.instance

import weakref

from openalea.core import logger
from openalea.core.plugin.plugin import plugin_name


def enhanced_error(error, **kwds):
    """
[docs] Add plugin information to given exception. By default, if a plugin fails, for example because a dependency cannot be imported, user get error message "ImportError: No module named mydep". This message is useless because we don't know which plugin has failed. Once enhanced, error message become: "MyLab (mypackage.lab.mylab): ImportError: No module named mydep" kwds: - plugin: plugin instance - plugin_class: plugin class """ plugin = kwds.pop('plugin', None) plugin_class = kwds.pop('plugin_class', plugin) if plugin: plugin_class = kwds.pop('plugin_class', plugin.__class__) path = '%s.%s' % (plugin_class.__module__, plugin_class.__name__) message = '%s (%s): %s' % (plugin.name, path, error.message) return error.__class__(message) elif plugin_class: path = '%s.%s' % (plugin_class.__module__, plugin_class.__name__) message = '%s: %s' % (path, error.message) return error.__class__(message) else: return error class PluginInstanceManager(object):
[docs] def __init__(self, manager, plugins=None, proxy_class=None): self._plugin_instances = {} self._debug = [] self.set_manager(manager) def set_manager(self, manager): self.pm = manager
[docs] def clear(self): self._plugin_instances = {}
[docs] @property def debug(self):
return self._debug @debug.setter def debug(self, value): if value in (True, 'all', ['all']):
[docs] self._debug = [True] elif value is False: self._debug = [] else: self._debug = value def _debug_mode(self, group): return group in self.debug or True in self.debug or 'all' in self.debug
def __call__(self, group, func=None, **kwds): """ Use this method to launch a function in debug mode. If debug is enabled for this group, errors are raised, else debug is disable, errors pass silently. """ func_args = kwds.pop('func_args', []) func_kwds = kwds.pop('func_kwds', {}) callback = kwds.pop('callback', None) if self._debug_mode(group): return func(*func_args, **func_kwds) else: try: return func(*func_args, **func_kwds) except: logger.error('%s: error calling %s ' % (group, func)) if callback: callback() def register(self, group, name, instance): """
[docs] Add a weakref to instance in dict group -> name -> [list of instances] """ self._plugin_instances.setdefault(group, {}).setdefault(name, []).append(weakref.ref(instance)) def unregister(self, group, name, instance): """
[docs] Unregistered instances won't be list by "instances" method """ try: self._plugin_instances[group][name].remove(instance) except KeyError: # No instances have been registered for this plugin or this group pass except ValueError: # Passed instance is not registered for this plugin pass def _function(self, group, name): plugin = self.pm.item(name, group)
try: function = plugin.implementation except TypeError as e: raise enhanced_error(e, plugin=plugin, plugin_class=plugin.__class__) return function def _new(self, group, name, class_args=None, class_kwds=None): plugin = self.pm.item(name, group) if class_args is None: class_args = [] if class_kwds is None: class_kwds = {} try: klass = plugin.implementation except TypeError as e: raise enhanced_error(e, plugin=plugin, plugin_class=plugin.__class__) instance = klass(*class_args, **class_kwds) try: instance = klass(*class_args, **class_kwds) except TypeError as e: raise enhanced_error(e, plugin=plugin, plugin_class=klass) self.register(group, name, instance) return instance def function(self, group, name): if self._debug_mode(group):
[docs] return self._function(group, name) else: try: return self._function(group, name) except: return None def new(self, group, name, class_args=None, class_kwds=None): """
[docs] Create a new instance and register it. You can get all created instances with instances method. """ return self._new(group, name, class_args, class_kwds) if self._debug_mode(group): return self._new(group, name, class_args, class_kwds) else: try: return self._new(group, name, class_args, class_kwds) except: return None def has_instance(self, group, name): return name in self._plugin_instances.get(group, {})
[docs] def instance(self, group, name, class_args=None, class_kwds=None): """
[docs] Use this function if you always want the same instance: If plugin has never been called, create a new instance else return first created one. """ if name in self._plugin_instances.get(group, {}): instances = self._plugin_instances[group][name] if instances: obj = instances[0]() else: obj = None if obj: return obj # return actual value instead of weakref else: # Object is no more reachable, remove it and generate new one del self._plugin_instances[group][name] return self.instance(group, name, class_args, class_kwds) else: instance = self.new(group, name, class_args, class_kwds) if instance is None: return self._plugin_instances.setdefault(group, {})[name] = [weakref.ref(instance)] return instance def instances(self, group, name=None, class_args=None, class_kwds=None): """
[docs] Return all existing instances corresponding to this plugin """ valid_instances = [] if name is None: for plugin_name in self._plugin_instances.get(group, []): instances = list(self._plugin_instances[group][plugin_name]) for weakref in instances: obj = weakref() if obj is None: self._plugin_instances[group][plugin_name].remove(weakref) else: valid_instances.append(obj) else: try: # return actual value instead of weakref valid_instances = [] for weakref in self._plugin_instances[group][name]: obj = weakref() if obj is None: self._plugin_instances[group][name].remove(weakref) else: valid_instances.append(obj) except KeyError: pass return valid_instances def implementations(self, interface, **kwds): """
[docs] Return all instances implementing this interface """ raise NotImplementedError