Source code for core.data

# -*- python -*-
# -*- coding: utf8 -*-
#
#       OpenAlea.Core
#
#       Copyright 2006-2014 INRIA - CIRAD - INRA
#
#       File author(s): Samuel Dufour-Kowalski <samuel.dufour@sophia.inria.fr>
#                       Julien Coste <julien.coste@inria.fr>
#                       Guillaume Baty <guillaume.baty@inria.fr>
#
#       Distributed under the Cecill-C License.
#       See accompanying file LICENSE.txt or copy at
#           http://www.cecill.info/licences/Licence_CeCILL-C_V1-en.html
#
#       OpenAlea WebSite : http://openalea.gforge.inria.fr
#
###############################################################################


"""Data management classes"""

__license__ = "Cecill-C"
__revision__ = " $Id$ "

from openalea.core.node import AbstractFactory, Node, NodeFactory
from openalea.core.interface import IData
from openalea.core.path import path as Path

import os
import string


[docs]class PackageData(object): """ String representing a package data """ # if __local__ is True, then PackageData point to a # local data (i.e in the currrent directory) __local__ = False def __init__(self, pkg_name, filename, package=None): """ pkg_name : package name name : data name package : Package object """ self.pkg_name = pkg_name self.name = filename if(not package): from openalea.core.pkgmanager import PackageManager path = PackageManager()[self.pkg_name].path else: path = package.path if(PackageData.__local__): self.repr = self.name else: self.repr = os.path.join(path, self.name) def __repr__(self): return "PackageData(%s, %s)" % (self.pkg_name, self.name) def __str__(self): return self.repr
[docs]class DataFactory(AbstractFactory): """ Data representation as factory """ #mimetype = "openalea/datafactory" def __init__(self, name, description='', editors=None, includes=None, **kargs): """ name : filename description : file description editors : dictionnary listing external command to execute includes : List of data files that are included in the file. """ AbstractFactory.__init__(self, name, description, category='data', **kargs) self.pkgdata_cache = None self.editors = editors self.includes = includes
[docs] def is_data(self): return True
[docs] def is_valid(self): """ Return True if the factory has associated data. Else raise an exception """ if(not os.path.exists(str(self.get_pkg_data()))): raise Exception("%s does'nt exists. Ignoring" % (str(self.get_pkg_data())))
[docs] def get_pkg_data(self): """ Return the associated PackageData object """ if(not self.pkgdata_cache): self.pkgdata_cache = \ PackageData(self.package.name, self.name, self.package) return self.pkgdata_cache
[docs] def instantiate(self, call_stack=[]): """ Return a node instance :param call_stack: the list of NodeFactory id already in call stack (in order to avoid infinite recursion) """ node = DataNode(self.get_pkg_data(), self.editors, self.includes) node.factory = self return node
[docs] def instantiate_widget(self, node=None, parent=None, edit=False, autonomous=False): """ Return the corresponding widget initialized with node """ if(node): editors = node.get_input(1) else: editors = self.editors # single command if(editors and isinstance(editors, str)): editors = {'edit': editors} # multiple command # Add systematically a text editor. if(not isinstance(editors, dict)): editors = {} # Add a text editor _edit = [x for x in editors if x.lower() == 'edit'] if not _edit: editors['edit'] = None from openalea.visualea.code_editor import EditorSelector return EditorSelector(parent, editors, (self.get_pkg_data(), ))
[docs] def get_writer(self): """ Return the writer class """ return PyDataFactoryWriter(self)
[docs] def clean_files(self): """ Remove files depending of factory """ os.remove(str(self.get_pkg_data()))
[docs]class DataNode(Node): """ Node representing a Data """ __color__ = (200, 200, 200) def __init__(self, packagedata, editors=None, includes=[]): """ @packagedata : A file contained in a defined package. @editors : A dictionary of commands which act on the file. @includes : Other files that are included in the data file. """ Node.__init__(self, inputs=(dict(name='data', interface=IData, value=packagedata), dict(name='editors', interface=None, value=editors), dict(name='includes', interface=None, value=includes)), outputs=(dict(name='data', interface=IData), ), ) self.caption = '%s' % (packagedata.name) self.watch = None self.monitor_file(str(packagedata)) if not includes: self.set_port_hidden(2, True) def __del__(self): del(self.watch) def __call__(self, args): return str(args[0]),
[docs] def monitor_file(self, filename): """ Enable file monitoring """ try: # TODO : Try to remove qt dependencie here from openalea.vpltk.qt import QtCore self.watch = QtCore.QFileSystemWatcher() QtCore.QCoreApplication.instance().connect( self.watch, QtCore.SIGNAL("fileChanged(const QString&)"), self.changed) self.watch.addPath(filename) except: print "File monitoring is not available"
[docs] def changed(self, path): """ Call listeners """ self.continuous_eval.notify_listeners(("node_modified", ))
[docs] def to_script(self): return "\n"
[docs]class PyDataFactoryWriter(object): """ DataFactory python Writer """ datafactory_template = """ $NAME = DataFactory(name=$PNAME, description=$DESCRIPTION, editors=$EDITORS, includes=$INCLUDES, ) """ def __init__(self, factory): self.factory = factory def __repr__(self): """ Return the python string representation """ f = self.factory fstr = string.Template(self.datafactory_template) result = fstr.safe_substitute(NAME=f.get_python_name(), PNAME=repr(f.name), DESCRIPTION=repr(f.description), EDITORS=repr(f.editors), INCLUDES=repr(f.includes), ) return result
[docs]class Data(object): mimetype = None default_name = 'Data' default_file_name = "filename.ext" pattern = "*.ext" extension = "ext" icon = "Crystal_Clear_app_kcmdf.png" def __init__(self, **kwargs): """ Classical use : *path* exists. Nothing is loaded in memory. Use :meth:`~Data.read` to get content """ # TODO: document args self.path = Path(kwargs.pop('path')) if 'path' in kwargs else None self._filename = Path(kwargs.pop('filename')).name if 'filename' in kwargs else None if self._filename is None and self.path is None: raise ValueError('path or filename required') if self._filename and self.path and self.path.name != self._filename: raise ValueError("path '%s' and filename '%s' are not compatible" % (self.path, self._filename)) self.dtype = kwargs.pop('dtype', None) self._content = kwargs.pop('content', None) self.mimetype = kwargs.pop('mimetype', None)
[docs] def get_documentation(self): return "No documentation for %s" % self.filename
[docs] def is_same_data(self, other): if self.exists() and other.exists(): return self.path == other.path elif not self.exists() and not other.exists(): return self._content == other._content else: return False
[docs] def save(self): if self.path is None: raise ValueError('You must specify a path to be able to save data') if self._content is not None: with open(self.path, 'wb') as f: f.write(self._content) self._content = None
[docs] def read(self): if self.exists(): with open(self.path, 'rb') as f: return f.read() else: return self._content
[docs] def rename(self, new): pnew = Path(new) if pnew.isabs() or pnew.name != new: raise ValueError('You must give filename only, not path') new_path = self.path.parent / new self.move(new_path)
[docs] def move(self, new_path): new_path = Path(new_path) if self._filename is not None: self._filename = new_path.name if self.path.isfile(): self.path.move(new_path) self.path = new_path
[docs] def exists(self): if self.path: return self.path.exists() else: return False
@property def filename(self): if self._filename is None: return self.path.name else: return self._filename @property
[docs] def name(self): return self.filename
@filename.setter
[docs] def filename(self, value): self._filename = value
def _set_content(self, content): self._content = content def _get_content(self): if self._content is None: return self.read() else: return self._content content = property(fget=_get_content, fset=_set_content) code = property()
[docs]class PythonFile(Data): mimetype = 'text/x-python' default_name = 'Python' default_file_name = "script.py" pattern = "*.py" extension = "py" icon = ":/images/resources/Python-logo.png"
[docs] def get_documentation(self): from openalea.oalab.model.parse import get_docstring return get_docstring(self.read())