# Source code for core.manager
import inspect
from openalea.core.observer import Observed, AbstractListener
class UnknownItemError(Exception):
    """Raised when a requested item cannot be found in a manager group."""
class GenericManager(Observed, AbstractListener):
    """
    Base class for managers that register items organised by group.

    Items are kept in a two-level dictionary ``group -> identifier -> item``.
    Subclasses must implement :meth:`generate_item_id`, :meth:`load_items`,
    :meth:`discover` and :meth:`instantiate`.
    """

    def __init__(self, items=None, item_proxy=None, autoload=('entry_points',)):
        """
        :param items: items to add manually; either a mapping
            ``group -> item`` or an iterable of items (added to the default
            group). See :meth:`add_items`.
        :param item_proxy: proxy class, stored as ``self.item_proxy``
        :param autoload: stored as ``self._autoload``; not used by this base
            class itself
        """
        Observed.__init__(self)
        AbstractListener.__init__(self)

        # Copy list/tuple values so that instances never share (and cannot
        # accidentally mutate) the default or a caller-owned sequence.
        if isinstance(autoload, (list, tuple)):
            autoload = list(autoload)
        self._autoload = autoload
        self.default_group = 'default'

        self._item = {}  # dict group -> item name -> item class or item proxy
        self._item_loaded = {}  # also reset by clear()
        self._item_proxy = {}  # dict group -> proxy class, see set_proxy()

        self.debug = False
        self._proxies = {}

        self.item_proxy = item_proxy

        if items is not None:
            self.add_items(items)

    # API TO IMPLEMENT

    def generate_item_id(self, item):
        """
        Return the identifier to assign to ``item``.

        Called by :meth:`patch_item` for items without an ``identifier``
        attribute. Must be implemented by subclasses.
        """
        raise NotImplementedError

    def load_items(self, group=None):
        """
        Load the items of ``group``. Must be implemented by subclasses.

        This base class never calls it.
        """
        raise NotImplementedError

    def discover(self, group=None):
        """
        Discover the items of ``group``. Must be implemented by subclasses.

        Called by :meth:`items` the first time an unknown group is requested;
        :meth:`items` reads ``self._item[group]`` right after this call.
        """
        raise NotImplementedError

    def instantiate(self, item):
        """
        Return the object to register for ``item``.

        Called by :meth:`add`; the returned object is the one that gets
        patched and stored. Must be implemented by subclasses.
        """
        raise NotImplementedError

    # API COMMON TO ALL MANAGERS

    def generate_item_name(self, item):
        """
        Return a name for ``item``.

        Uses ``item.name`` when available, otherwise the name of the item's
        class, otherwise the string representation of that class.
        """
        try:
            return item.name
        except AttributeError:
            pass
        try:
            return item.__class__.__name__
        except AttributeError:
            return str(item.__class__)

    def clear(self):
        """
        Forget all registered items and proxies.

        Proxy classes registered with :meth:`set_proxy` are kept.
        """
        self._item = {}  # dict group -> item name -> item class or item proxy
        self._item_loaded = {}
        self._proxies = {}

    def add(self, item, group, item_proxy=None, **kwds):
        """
        Register ``item`` in ``group`` and return the registered object.

        :param item: object to register
        :param group: name of the group to register the item in
        :param item_proxy: callable used to wrap ``item``; when None, the
            proxy registered for ``group`` with :meth:`set_proxy` is used,
            if there is one
        :param kwds: accepted for compatibility, unused here
        :return: the instantiated and patched item
        """
        if item_proxy is None and group in self._item_proxy:
            item_proxy = self._item_proxy[group]

        if item_proxy:
            item = item_proxy(item)
        # NOTE(review): instantiate() is applied to proxied items as well --
        # confirm this is the intended behaviour.
        item = self.instantiate(item)
        self.patch_item(item)
        self._item.setdefault(group, {})[item.identifier] = item
        return item

    def add_items(self, items, group=None):
        """
        Add several items at once.

        :param items: either a mapping ``group -> item`` (each item is added
            to the group given by its key and ``group`` is ignored), or any
            other iterable of items, which are all added to ``group``
        :param group: target group for non-mapping ``items``; defaults to
            ``self.default_group``
        """
        if hasattr(items, 'keys'):
            # ``items()`` exists on both Python 2 and Python 3 mappings,
            # unlike ``iteritems()``.
            for item_group, item in items.items():
                self.add(item, item_group)
            return

        if group is None:
            group = self.default_group
        for item in items:
            self.add(item, group)

    def item(self, identifier, group=None):
        """
        Return the item registered in ``group`` under ``identifier``.

        If no item has this identifier, the first item of the group whose
        ``name`` equals ``identifier`` is returned instead.

        :param identifier: identifier (or name) of the item to look up
        :param group: group to search; defaults to ``self.default_group``
        :raises UnknownItemError: if no item matches
        """
        if group is None:
            group = self.default_group

        # items() guarantees that self._item[group] exists, running the
        # discovery step first when the group is not known yet.
        candidates = self.items(group)
        registered = self._item[group]
        if identifier in registered:
            return registered[identifier]

        for candidate in candidates:
            if candidate.name == identifier:
                return candidate

        args = dict(identifier=identifier, group=group)
        raise UnknownItemError("Item %(identifier)s not found in %(group)s" % args)

    def items(self, group=None, tags=None, criteria=None, **kwds):
        """
        Return the list of items of ``group`` matching the given filters.

        :param group: group to list; defaults to ``self.default_group``.
            An unknown group is created and :meth:`discover` is called for it.
        :param tags: if given, keep only items whose ``tags`` contain every
            listed tag
        :param criteria: mapping ``attribute name -> expected value``; keep
            only items having all these attributes with equal values
        :param kwds: accepted for compatibility, unused here
        :return: list of matching items
        """
        if group is None:
            group = self.default_group

        try:
            items = self._item[group].values()
        except KeyError:
            self._item.setdefault(group, {})
            self.discover(group)
            items = self._item[group].values()

        if criteria is None:
            criteria = {}

        valid_items = []
        for pl in items:
            # Check tags. If one tag does not match, ignore this item.
            if tags is not None and not all(tag in pl.tags for tag in tags):
                continue

            # Check all criteria. If one criterion does not match, ignore item.
            if not all(hasattr(pl, criterion)
                       and getattr(pl, criterion) == criteria[criterion]
                       for criterion in criteria):
                continue

            valid_items.append(pl)
        return valid_items

    def patch_item(self, item):
        """
        Give ``item`` the attributes the manager relies on.

        Missing ``identifier``, ``name``, ``label``, ``tags`` and ``criteria``
        attributes are filled in; existing ones are left untouched. Items
        that already carry ``__patched__`` are skipped.
        """
        if hasattr(item, '__patched__'):
            return
        item.__patched__ = True

        if not hasattr(item, "identifier"):
            item.identifier = self.generate_item_id(item)
        if not hasattr(item, "name"):
            item.name = self.generate_item_name(item)
        if not hasattr(item, "label"):
            # Human readable label derived from the name: "my_item" -> "My item"
            item.label = item.name.replace('_', ' ').capitalize()
        if not hasattr(item, "tags"):
            item.tags = []
        if not hasattr(item, "criteria"):
            item.criteria = {}

    def set_proxy(self, group, item_proxy):
        """
        Register ``item_proxy`` as the proxy for ``group``.

        :meth:`add` wraps items in this proxy when it is called for ``group``
        without an explicit ``item_proxy``.
        """
        self._item_proxy[group] = item_proxy

    def _sorted_items(self, items):
        """
        Return ``items`` sorted by ``name``.

        When several items share a name, only the last one is kept.
        """
        item_dict = {}
        for item in items:
            item_dict[item.name] = item
        return [item_dict[name] for name in sorted(item_dict)]