# -*- python -*-
#
# OpenAlea.Core
#
# Copyright 2006-2009 INRIA - CIRAD - INRA
#
# File author(s): Jerome Chopard <jerome.chopard@sophia.inria.fr>
# Fred Theveny <frederic.theveny@cirad.fr>
#
# Distributed under the Cecill-C License.
# See accompanying file LICENSE.txt or copy at
# http://www.cecill.info/licences/Licence_CeCILL-C_V1-en.html
#
# OpenAlea WebSite: http://openalea.gforge.inria.fr
#
###############################################################################
"""This module provide an implementation of a dataflow"""
__license__ = "Cecill-C"
__revision__ = " $Id$ "
from openalea.core.graph.property_graph import PropertyGraph, InvalidVertex
from openalea.core.graph.property_graph import InvalidEdge
from openalea.core.graph.id_generator import IdGenerator
from collections import deque
[docs]class PortError (Exception):
pass
[docs]class Port (object):
"""
simple structure to maintain some port property
a port is an entry point to a vertex
"""
def __init__(self, vid, local_pid, is_out_port):
#internal data to access from dataflow
self._vid = vid
self._local_pid = local_pid
self._is_out_port = is_out_port
[docs]class DataFlow(PropertyGraph):
"""
Directed graph with connections between in_ports
of vertices and out_port of vertices
ports are typed
"""
def __init__(self):
PropertyGraph.__init__(self)
self._ports = {}
self._pid_generator = IdGenerator()
self.add_edge_property("_source_port")
self.add_edge_property("_target_port")
self.add_vertex_property("_ports")
self.add_vertex_property("_actor")
####################################################
#
# edge port view
#
####################################################
[docs] def source_port(self, eid):
"""
out port of the source vertex
of the edge
:param eid: todo
:rtype: pid
"""
return self.edge_property("_source_port")[eid]
[docs] def target_port(self, eid):
"""
in port of the target vertex
of the edge
:param eid: todo
:rtype: pid
"""
return self.edge_property("_target_port")[eid]
####################################################
#
# vertex port view
#
####################################################
[docs] def out_ports(self, vid=None):
"""
iter on all out ports of a given vertex
iter on all out ports of the dataflow
if vid is None
:param vid: todo
:rtype: iter of pid
"""
for pid in self.ports(vid):
if self.is_out_port(pid):
yield pid
[docs] def in_ports(self, vid=None):
"""
iter on all in ports of a given vertex
iter on all in ports of the dataflow
if vid is None
:rtype: iter of pid
"""
for pid in self.ports(vid):
if self.is_in_port(pid):
yield pid
[docs] def ports(self, vid=None):
"""
iter on all ports of a given vertex
iter on all ports of the dataflow
if vid is None
:rtype: iter of pid
"""
if vid is None:
return iter(self._ports)
else:
return iter(self.vertex_property("_ports")[vid])
####################################################
#
# port view
#
####################################################
[docs] def is_in_port(self, pid):
"""
test whether port refered by pid
is an in port of its vertex
:rtype: bool
"""
return not self._ports[pid]._is_out_port
[docs] def is_out_port(self, pid):
"""
test whether port refered by pid
is an out port of its vertex
:rtype: bool
"""
return self._ports[pid]._is_out_port
[docs] def vertex(self, pid):
"""
return the id of the vertex which own the port
:rtype: vid
"""
return self._ports[pid]._vid
[docs] def connected_ports(self, pid):
"""
iterate on all ports connected
to this port
:rtype: iter of pid
"""
if self.is_out_port(pid):
for eid in self.connected_edges(pid):
yield self.target_port(eid)
else:
for eid in self.connected_edges(pid):
yield self.source_port(eid)
[docs] def connected_edges(self, pid):
"""
iterate on all edges connected
to this port
:rtype: iter of eid
"""
vid = self.vertex(pid)
if self.is_out_port(pid):
for eid in self.out_edges(vid):
if self.source_port(eid)==pid:
yield eid
else:
for eid in self.in_edges(vid):
if self.target_port(eid)==pid:
yield eid
[docs] def nb_connections(self, pid):
""" Compute number of edges connected to a given port.
args:
- pid (pid): id of port
return:
- int
"""
return len(tuple(self.connected_edges(pid)))
####################################################
#
# local port concept
#
####################################################
[docs] def port(self, pid):
"""
port object specified by its global pid
"""
try:
return self._ports[pid]
except KeyError:
raise PortError("port %s don't exist" % str(pid))
[docs] def local_id(self, pid):
"""
local port identifier of a given port
specified by its global pid
"""
try:
return self._ports[pid]._local_pid
except KeyError:
raise PortError("port %s don't exist" % str(pid))
[docs] def out_port(self, vid, local_pid):
"""
global port id of a given port
:rtype: pid
"""
for pid in self.out_ports(vid):
if self._ports[pid]._local_pid == local_pid:
return pid
raise PortError("Local pid '%s' does not exist" % str(local_pid))
[docs] def in_port(self, vid, local_pid):
"""
global port id of a given port
:rtype: pid
"""
for pid in self.in_ports(vid):
if self._ports[pid]._local_pid == local_pid:
return pid
raise PortError("local pid '%s' does not exist for vertex %d" % (str(local_pid),vid) )
#####################################################
#
# associated actor
#
#####################################################
[docs] def set_actor(self, vid, actor):
"""
associate an actor to a given vertex
"""
try : actor.set_id(vid)
except Exception, e: print e
self.vertex_property("_actor")[vid] = actor
[docs] def actor(self, vid):
"""
return actor associated to a given vertex
"""
return self.vertex_property("_actor")[vid]
[docs] def add_actor(self, actor, vid=None):
"""
create a vertex and the corresponding ports
and associate it with the given actor
return: vid
"""
vid = self.add_vertex(vid)
for key, interface in actor.inputs():
self.add_in_port(vid, key)
for key, interface in actor.outputs():
self.add_out_port(vid, key)
self.set_actor(vid, actor)
return vid
#####################################################
#
# mutable concept
#
#####################################################
[docs] def add_in_port(self, vid, local_pid, pid=None):
"""
add a new in port to vertex pid using local_pid
use pid as global port id if specified or
create a new one if None
raise an error if pid is already used
:returns: pid used
:rtype: pid
"""
pid = self._pid_generator.get_id(pid)
self._ports[pid] = Port(vid, local_pid, False)
self.vertex_property("_ports")[vid].add(pid)
return pid
[docs] def add_out_port(self, vid, local_pid, pid=None):
"""
add a new out port to vertex pid using local_pid
use pid as global port id if specified or
create a new one if None
raise an error if pid is already used
:returns: pid used
:rtype: pid
"""
pid = self._pid_generator.get_id(pid)
self._ports[pid] = Port(vid, local_pid, True)
self.vertex_property("_ports")[vid].add(pid)
return pid
[docs] def remove_port(self, pid):
"""
remove the specified port
and all connections to this port
"""
for eid in list(self.connected_edges(pid)):
self.remove_edge(eid)
self.vertex_property("_ports")[self.vertex(pid)].remove(pid)
self._pid_generator.release_id(pid)
del self._ports[pid]
[docs] def connect(self, source_pid, target_pid, eid=None):
"""
connect the out port source_pid with
the in_port target_pid
use eid if not None or create a new one
raise an error if eid is already used
:returns: eid used
:rtype: eid
"""
if not self.is_out_port(source_pid):
raise PortError("source_pid %s is not an output port" % \
str(source_pid))
if not self.is_in_port(target_pid):
raise PortError("target_pid %s is not an input port" % \
str(target_pid))
eid = self.add_edge((self.vertex(source_pid), \
self.vertex(target_pid)), eid)
self.edge_property("_source_port")[eid] = source_pid
self.edge_property("_target_port")[eid] = target_pid
return eid
[docs] def add_vertex(self, vid=None):
"""todo"""
vid = PropertyGraph.add_vertex(self, vid)
self.vertex_property("_ports")[vid] = set()
return vid
add_vertex.__doc__ = PropertyGraph.add_vertex.__doc__
[docs] def remove_vertex(self, vid):
"""todo"""
for pid in list(self.ports(vid)):
try:
self.remove_port(pid)
except:
pass
PropertyGraph.remove_vertex(self, vid)
remove_vertex.__doc__ = PropertyGraph.remove_vertex.__doc__
[docs] def clear(self):
"""todo"""
self._ports.clear()
self._pid_generator = IdGenerator()
PropertyGraph.clear(self)
clear.__doc__ = PropertyGraph.clear.__doc__
[docs] def get_all_parent_nodes(self, vid):
""" Return an iterator of vextex id corresponding to all the
parent node of vid"""
input_vid = vid
scan_list = deque([vid])
processed = set()
while(scan_list):
vid = scan_list.popleft()
#process_list.appendleft(vid)
if(input_vid != vid):
yield vid
processed.add(vid)
actor = self.actor(vid)
# For each inputs
for pid in self.in_ports(vid):
# For each connected node
for npid in self.connected_ports(pid):
nvid = self.vertex(npid)
if(not nvid in processed):
scan_list.append(nvid)
[docs]class SubDataflow(object):
""" Represents a part of a dataflow for a partial evaluation
A SubDataflow is a callable and absracts a part of a dataflow as a funtion
"""
def __init__(self, dataflow, algo, node_id, port_index):
""" Constructor
:param dataflow: todo
:param algo: algorithm for evaluation.
:param node_id: todo
:param port_index: output port index in node_id
"""
self.dataflow = dataflow
self.algo = algo
self.node_id = node_id
self.port_index = port_index
def __call__(self, *args):
""" Consider the Subdataflow as a function """
if(not self.dataflow):
return args[0]
# Identity function
#if(len(args)==1): return args[0]
#else: return args
self.algo.eval(self.node_id, list(args),is_subdataflow=True )
ret = self.dataflow.actor(self.node_id).get_output(self.port_index)
return ret