Source code for core.project.project

# -*- python -*-
#
#       OpenAlea.OALab: Multi-Paradigm GUI
#
#       Copyright 2014 INRIA - CIRAD - INRA
#
#       File author(s): Julien Coste <julien.coste@inria.fr>, Guillaume Baty <guillaume.baty@inria.fr>
#
#       File contributor(s):
#
#       Distributed under the Cecill-C License.
#       See accompanying file LICENSE.txt or copy at
#           http://www.cecill.info/licences/Licence_CeCILL-C_V1-en.html
#
#       OpenAlea WebSite : http://openalea.gforge.inria.fr
#
###############################################################################
"""
The Project is a structure which permits to manage different objects.

It stores **metadata** (alias, author, description, version, license, ...) and **data** (src, models, images, ...).

You have here the default architecture of the project saved in directory "project".


    /project
        oaproject.cfg        (Configuration file)
        /model          (Files sources, Script Python, LPy...)
        /control       (Control, like color map or curve)
        /world          (scene, scene 3D)
        /cache          (Intermediary saved objects)
        /data           (Data files like images, .dat, ...)
        /lib            (Contains python modules and packages)
        /startup          (Preprocessing scripts)
            *.py            (Preprocessing scripts)
            *import*.py     (Libs and packages to import in preprocessing)

:use:
    .. code-block:: python

        project = Project(path="/path/to/proj/project")
        project.add(category="model", filename="hello.py", content="print 'Hello World'")
        project.author = "John Doe"
        project.description = "This project is used to said hello to everyone"
        project.save()
"""

__all__ = [
    "Project",
    "ErrorItemExistsInProject",
]

from collections import OrderedDict
import copy
import os

from openalea.core.control import Control
from openalea.core.data import Data
from openalea.core.observer import Observed
from openalea.core.path import path as Path
from openalea.core.project.configobj import ConfigObj
from openalea.core.service.data import DataFactory
from openalea.core.service.interface import interface_name
from openalea.core.service.model import to_model, ModelFactory
from openalea.core.customexception import CustomException, ErrorInvalidItem, ErrorInvalidItemName


[docs]class ErrorItemExistsInProject(CustomException): title = u'Error: item exists in project yet.' message = u'Item %(name)s is in project yet' desc = u"As item is in project yet, you cannot add it again. Use replacement instead" def _kargs(self): return dict( project=self._args[0], category=self._args[1], name=self._args[2], )
def _normpath(path): """ Replace all symlink in path with real path and return its absolute path For example, if given path is "local/bin/python" and "local" is a symbolic link to "/usr/local", returned path will be "/usr/local/bin/python" """ if hasattr(os, 'readlink'): parts = Path(path).splitall() _path = Path('') for p in parts: _path = _path / p if _path.islink(): # readlink return an absolute path or relative path depending on symlink. # If symlink is a relative link, parent path is used to generate an absolute path # Default path behaviour when concatenating two absolute paths is to keep only second one: # path('/a/1')/path('/b/2') -> path('/b/2') # So, if symlink is absolute, all is ok _path = _path.parent / _path.readlink() return _path.abspath() else: return path.abspath() class MetaData(Control): pass class CategoryInfo(dict): pass
[docs]class Project(Observed): DEFAULT_METADATA = OrderedDict([ ("alias", MetaData('alias', 'IStr', 'MyProject')), ("icon", MetaData('icon', 'IFileStr', '')), ("authors", MetaData('author', 'ISequence', [])), ("description", MetaData('description', 'IStr')), ("long_description", MetaData('long_description', 'IStr')), ("citation", MetaData('citation', 'IStr')), ("url", MetaData('url', 'IStr')), ("dependencies", MetaData('dependencies', 'ISequence')), ("license", MetaData('license', 'IStr')), ("version", MetaData('version', 'IStr', '0.1')), ]) DEFAULT_CATEGORIES = OrderedDict([ ("cache", CategoryInfo(title='Temporary Data')), ("data", CategoryInfo(title='Data')), ("model", CategoryInfo(title='Model')), ("world", CategoryInfo(title='World')), ("startup", CategoryInfo(title='Startup')), ("doc", CategoryInfo(title='Documentation')), ("lib", CategoryInfo(title='Libraries')), ]) config_filename = "oaproject.cfg" MODE_COPY = 'copy' MODE_LINK = 'link' def __init__(self, path, **kwargs): self.categories = kwargs.get('categories', self.DEFAULT_CATEGORIES) Observed.__init__(self) self.metadata = OrderedDict() # listeners = kwargs['listeners'] if 'listeners' in kwargs else [] # for listener in listeners: # self.register_listener(listener) self.started = False self._path = _normpath(path) # Fill metadata for k, v in self.DEFAULT_METADATA.iteritems(): self.metadata[k] = kwargs.get(k, v.value) # Allocate category dictionaries for k in self.categories: self.__dict__[k] = {} if self._path.exists(): self._load() # self.notify_listeners(('project_loaded', (self, self.path))) # else: # self.notify_listeners(('project_created', (self, self.path))) # self.notify_listeners(('project_changed', self)) self.ns = {} def __setattr__(self, key, value): if key == "categories": return super(Project, self).__setattr__(key, value) elif key in self.DEFAULT_METADATA: old_value = self.metadata[key] if old_value != value: self.metadata[key] = value self.notify_listeners(('metadata_changed', (self, key, old_value, value))) self.notify_listeners(('project_changed', self)) elif key in self.categories: raise NameError("cannot change '%s' attribute" % key) else: return super(Project, self).__setattr__(key, value) def __getattr__(self, key): if key in self.DEFAULT_METADATA: return super(Project, self).__getattribute__('metadata')[key] else: return super(Project, self).__getattribute__(key) def __repr__(self): return "%s(%r)" % (self.__class__.__name__, str(self.path)) def _repr_html_(self): import openalea.core import base64 from openalea.deploy.shared_data import shared_data from openalea.core.project.formatting.html import html_metainfo_summary, html_item_summary from openalea.core.formatting.util import obj_icon_path from IPython.display import Image stylesheet_path = shared_data(openalea.core, 'stylesheet.css') if stylesheet_path and stylesheet_path.isfile(): with open(stylesheet_path) as f: stylesheet = f.read() html = '<style>%s</style>' % stylesheet else: html = '' icon = obj_icon_path(self, paths=[self.path]) if icon: data = base64.b64encode(Image(filename=icon)._repr_png_()).decode('ascii') image = '<image width="64px" style="vertical-align:middle;" src="data:image/png;base64,%s">' % data else: image = '' args = dict(image=image, title=self.title) html += '<div class="summary"><p class="title">%(image)s%(title)s</p>' % args html += '\n<hr>' html += html_metainfo_summary(self) html += html_item_summary(self) html += '</div>' return html @property
[docs] def path(self): return self._path
@property
[docs] def projectdir(self): return self._path.parent
@property
[docs] def name(self): return self._path.name
@property
[docs] def icon_path(self): """ :return: the complete path of the icon. To modify it, you have to modify the path of project, the name of project and/or the self.icon. """ icon_name = None if self.icon: if not self.icon.startswith(':'): # local icon icon_name = self.path / self.icon return icon_name
[docs] def start(self, *args, **kwargs): """ Load controls if available, execute all files in startup. """ self._load_controls() self.started = True self.ns.clear() loading_code = [ 'import sys', 'sys.path.insert(0, %r)' % str(self.path / 'lib') ] loading_code = '\n'.join(loading_code) loading = ModelFactory(mimetype='text/x-python') ns = loading.run_code(loading_code, self.ns) self.ns.update(ns) for startup in self.startup.values(): model = to_model(startup) ns = model.run_code(startup.read(), self.ns) self.ns.update(ns) interpreter = kwargs.get('shell') if interpreter: #interpreter.shell.user_ns.clear() interpreter.shell.init_user_ns() interpreter.shell.user_ns.update(self.ns)
[docs] def stop(self, *args, **kwargs): self.started = False self.ns.clear() from openalea.core.control.manager import ControlManager cm = ControlManager() cm.clear()
[docs] def run(self, filename, *args, **kwargs): model = self.get_runnable_model(filename) return self.run_model(model)
[docs] def run_model(self, model, *args, **kwargs): ns = {} ns.update(self.ns) ns.update(kwargs.pop('namespace', {})) ns["Model"] = self.get_runnable_model return model.run(*args, namespace=ns, **kwargs)
[docs] def add(self, category, obj=None, **kwargs): return self.add_item(category, obj, **kwargs)
[docs] def get(self, category, name, **kwargs): return self.get_item(category, name)
[docs] def remove(self, category, obj=None, **kwargs): return self.remove_item(category, obj=obj, **kwargs)
def _add_item(self, category, obj=None, **kwargs): mode = kwargs.pop('mode', self.MODE_COPY) if obj and isinstance(obj, Data): # TODO: Check obj follow Data or Model interface ?? new_path = self.path / category / obj.path.name if obj.path != new_path and mode == self.MODE_COPY: # TODO: use Data.copy instead return self._add_item(category, path=obj.path, **kwargs) category_dict = getattr(self, category) if obj.filename not in category_dict: category_dict[str(obj.filename)] = obj else: raise ValueError("data '%s' already exists in project '%s'" % (obj.filename, self.alias)) elif obj: category_dict = getattr(self, category) if obj.name not in category_dict: category_dict[str(obj.name)] = obj else: raise ValueError("data '%s' already exists in project '%s'" % (obj.name, self.alias)) else: filename = Path(kwargs.pop('filename')) if 'filename' in kwargs else None content = kwargs.pop('content', None) dtype = kwargs.pop('dtype', None) mimetype = kwargs.pop('mimetype', None) path = Path(kwargs.pop('path')) if 'path' in kwargs else None # If project path exists, ie project exists on disk, # Create category dir if necessary category_path = self.path / category if self.path.exists() and not category_path.exists(): category_path.makedirs() if filename: new_path = self.path / category / filename.name elif path: if not path.exists(): raise ErrorInvalidItem("path '%s' doesn't exists" % path) filename = path.name new_path = self.path / category / filename else: raise ValueError("path or filename required") if path is None: path = new_path # If data was outside project, we try to fix it. # If mode is "prefer copy", we try to copy file inside project # If copy fails, we get original content and pass it to new data # If mode is "prefer link", we just keep original path (keep outside project) # TODO: Move to Data.copy data_obj = None if new_path.abspath() != path.abspath() and mode == self.MODE_COPY: try: path.copyfile(new_path) except IOError: data_obj = DataFactory(path, mimetype, dtype=dtype, default_content=content) content = data_obj.read() else: content = None elif new_path.abspath() != path.abspath() and mode == self.MODE_LINK: new_path = path else: pass # Nothing to do, data is yet in the right place data_obj = DataFactory(new_path, mimetype, dtype=dtype, default_content=content) obj = self._add_item(category, data_obj, **kwargs) obj.package = self return obj def _remove_item(self, category, obj=None, **kwargs): category_dict = getattr(self, category) filename = kwargs['filename'] if 'filename' in kwargs else None if obj is None and filename is None: raise ValueError('You must specify a data object or a filename') if obj is not None: filename = obj.filename if self.get(category, filename): del category_dict[filename] def _rename_item(self, category, old, new): pold = Path(old) pnew = Path(new) if pold.isabs() or pnew.isabs() or pnew.name != new or pold.name != old: raise ValueError('You must give filename only, not path') new_path = self.path / category / new data = self.get_item(category, old) data.move(new_path) self._remove_item(category, filename=old) self._add_item(category, data) def _project_changed(self): self.notify_listeners(('project_changed', self)) self._save_manifest()
[docs] def valid_item_name(self, category, name): if not name: return ErrorInvalidItemName(self, category, name) data = self.get_item(category, name) if data: return ErrorItemExistsInProject(self, category, name) path = self.path / category / name if data is None and path.exists(): return Warning("Data yet exists on disk. Just add it.")
[docs] def add_item(self, category, obj=None, **kwargs): data = self._add_item(category, obj, **kwargs) self.notify_listeners(('data_added', (self, category, data))) self._project_changed() return data
[docs] def remove_item(self, category, obj=None, **kwargs): if obj: filename = obj.filename elif 'filename' in kwargs: filename = kwargs['filename'] self._remove_item(category, obj=obj, **kwargs) self.notify_listeners(('data_removed', (self, category, filename))) self._project_changed()
[docs] def rename_item(self, category, old, new): if old == new: return self._rename_item(category, old, new) self.notify_listeners(('data_renamed', (self, category, old, new))) self._project_changed()
[docs] def delete(self): raise NotImplementedError
[docs] def rename(self, new): dest = self.path.parent / new self.move(dest)
[docs] def move(self, dest): src = self.path dest = Path(dest).abspath() if src == dest: return if src.exists(): # Update item paths for category in self.categories: for item in self.items(category).values(): if hasattr(item, 'path'): item.path = dest / category / item.path.name # Move all files src.move(dest) self._path = dest self.notify_listeners(('project_moved', (self, src, dest))) self._project_changed()
[docs] def items(self, category, **kwds): return getattr(self, category)
[docs] def get_item(self, category, filename): files = getattr(self, category) return files.get(filename)
[docs] def get_model(self, filename): model = self.get_item('model', filename) if model is not None: return model else: found_models = [] for modelname in self.model: if filename == Path(modelname).namebase: found_models.append(self.get_item('model', modelname)) nmodels = len(found_models) if nmodels == 0: return None elif nmodels == 1: return found_models[0] else: dic = dict( NUM=nmodels, BASENAME=str(Path(modelname).namebase), LST=', '.join([repr(str(_model.filename)) for _model in found_models]) ) raise ValueError('%(NUM)d model have basename %(BASENAME)r: %(LST)s' % dic)
[docs] def get_runnable_model(self, name): data = self.get_model(name) if data: model = to_model(data) if model: return copy.copy(model)
def _load(self): """ *Partially load* a project from a manifest file. 1. Read manifest file (oaproject.cfg). 2. Load metadata inside project from manifest. 3. Create Data objects for each file inside project. .. warning:: **Doesn't** load data content ! If you want to load data, please use :meth:`Data.read<openalea.oalab.model.model.Data.read>`. """ from .serialization import ProjectLoader loader = ProjectLoader() loader.update(self, self.path, mode='lazy') def _save_manifest(self): from .serialization import ProjectSaver saver = ProjectSaver() saver.save(self, self.path, config_filename=self.config_filename, mode='metadata')
[docs] def save_metadata(self): self._save_manifest()
def _load_controls(self): from openalea.core.control.serialization import ControlLoader from openalea.core.service.control import register_control control_path = self.path / 'control.py' loader = ControlLoader() controls = loader.load(control_path) for control in controls: register_control(control)
[docs] def save(self): """ Save a manifest file on disk. It name is defined by config_filename. It contains **list of files** that are inside project (*manifest*) and **metadata** (author, version, ...). """ from .serialization import ProjectSaver saver = ProjectSaver() saver.save(self, self.path, config_filename=self.config_filename) self.notify_listeners(('project_saved', self))