Source code for core.service.data

from openalea.core.path import path as Path
from openalea.core.service.plugin import plugins
from openalea.core.data import Data

import mimetypes

__all__ = ["DataFactory", "DataClass", "MimeType", "DataType"]

REGISTERY_MIME_CLASS = {}
for pl in plugins('openalea.core', criteria=dict(implement='IData')):
    REGISTERY_MIME_CLASS[pl.mimetype] = pl
# for ModelClass in iter_plugins('oalab.model'):
#     REGISTERY_MIME_CLASS[ModelClass.mimetype] = ModelClass

REGISTERY_NAME_MIME = {}
for pl in plugins('openalea.core', criteria=dict(implement='IData')):
    REGISTERY_NAME_MIME[pl.default_name.lower()] = pl.mimetype
    REGISTERY_NAME_MIME[pl.extension.lower()] = pl.mimetype

# for ModelClass in iter_plugins('oalab.model'):
#     REGISTERY_NAME_MIME[ModelClass.default_name.lower()] = ModelClass.mimetype
#     REGISTERY_NAME_MIME[ModelClass.extension.lower()] = ModelClass.mimetype


def MimeType(path=None, name=None):
[docs] """ Return mimetype for path. First, try to find extension in registery filled by models. If datatype is not found, use builtin module "mimetypes". If it cannot guess, returns False. Search in module allows to specify """ if path: name = Path(path).ext[1:].lower() if name in REGISTERY_NAME_MIME: return REGISTERY_NAME_MIME[name] else: mtype, encoding = mimetypes.guess_type(path) return mtype else: name = name.lower() if name in REGISTERY_NAME_MIME: return REGISTERY_NAME_MIME[name] else: return False def DataType(path=None, name=None, mimetype=None):
[docs] if path: name = Path(path).ext[1:].lower() return name elif name: return Path(name).ext[1:].lower() elif mimetype: # for ModelClass in iter_plugins('oalab.model'): # if ModelClass.mimetype == mimetype: # return ModelClass.default_name for DataClass in plugins('oalab.dataclass', criteria=dict(implement='IData')): if DataClass.mimetype == mimetype: return DataClass.default_name else: return None def DataClass(dtype=None):
[docs] """ Return class wich match dtype. For example, for 'python' dtype it return PythonModel class. Matching can be extended with plugins. if dtype is None, returns all available DataClasses """ if dtype in REGISTERY_MIME_CLASS: return REGISTERY_MIME_CLASS[dtype].implementation else: return Data def arrange_data_args(path, mimetype, dtype):
if mimetype is None: if dtype: return path, MimeType(name=dtype) elif path: return path, MimeType(path=path) else: return path, None else: if dtype: new_mimetype = MimeType(name=dtype) if mimetype != new_mimetype: raise ValueError('dtype %r (%s) and mimetype %r are not compatible' % ( dtype, new_mimetype, mimetype)) return path, mimetype def DataFactory(path, mimetype=None, **kwargs):
[docs] path = Path(path) default_content = kwargs['default_content'] if 'default_content' in kwargs else None dtype = kwargs.pop('dtype', None) if path.isfile(): if default_content is not None: raise ValueError( "got multiple values for content (parameter and '%s')" % path.name) else: path, mimetype = arrange_data_args(path, mimetype, dtype) klass = DataClass(mimetype) return klass(path=path, mimetype=mimetype) elif path.exists(): raise ValueError("'%s' exists but is not a file" % path) elif not path.exists(): if default_content is None: default_content = b'' try: f = path.open('wb') except IOError: content = default_content else: f.write(default_content) f.close() content = None path, mimetype = arrange_data_args(path, mimetype, dtype) klass = DataClass(mimetype) return klass(path=path, mimetype=mimetype, content=content) def to_data(model, mimetype):
# TODO: check filename/filepath/path argument # TODO: must be extended using plugins klass = DataClass(DataType(mimetype=mimetype)) kwds = {} if 'filename' not in kwds: kwds['filename'] = model.name + '.' + klass.extension kwds['dtype'] = model.dtype kwds['mimetype'] = klass.mimetype data = klass(**kwds) data.content = model.repr_code()