# -*- python -*-
#
# OpenAlea.Core
#
# Copyright 2006-2009 INRIA - CIRAD - INRA
#
# File author(): Jerome Chopard <jerome.chopard@sophia.inria.fr>
# Samuel Dufour-Kowalski <samuel.dufour@sophia.inria.fr>
#
# Distributed under the Cecill-C License.
# See accompanying file LICENSE.txt or copy at
# http://www.cecill.info/licences/Licence_CeCILL-C_V1-en.html
#
# OpenAlea WebSite: http://openalea.gforge.inria.fr
#
###############################################################################
"""This module provide an algorithm to evaluate a dataflow"""
__license__ = "Cecill-C"
__revision__ = " $Id$ "
import sys
from time import clock
import traceback as tb
from openalea.core import ScriptLibrary
from openalea.core.dataflow import SubDataflow
from openalea.core.interface import IFunction
PROVENANCE = False
# Implement provenance in OpenAlea
db_conn = None
import sqlite3
from openalea.core.path import path
from openalea.core import settings
[docs]def db_create(cursor):
cur = cursor
#-prospective provenance-#
#User table creation
cur.execute("CREATE TABLE IF NOT EXISTS User (userid INTEGER,createtime DATETIME,name varchar (25), firstname varchar (25), email varchar (25), password varchar (25),PRIMARY KEY(userid))")
# CompositeNode table creation
cur.execute("CREATE TABLE IF NOT EXISTS CompositeNode (CompositeNodeid INTEGER, creatime DATETIME, name varchar (25), description varchar (25),userid INTEGER,PRIMARY KEY(CompositeNodeid),FOREIGN KEY(userid) references User)")
#Cr?ation de la table Node
cur.execute("CREATE TABLE IF NOT EXISTS Node (Nodeid INTEGER, createtime DATETIME, name varchar (25), NodeFactory varchar (25),CompositeNodeid INTEGER,PRIMARY KEY(Nodeid),FOREIGN KEY(CompositeNodeid) references CompsiteNode)")
#Cr?ation de la table Input
cur.execute("CREATE TABLE IF NOT EXISTS Input (Inputid INTEGER, createtime DATETIME, name varchar (25), typedata varchar (25), InputPort INTEGER,PRIMARY KEY (Inputid))")
#Cr?ation de la table Output
cur.execute("CREATE TABLE IF NOT EXISTS Output (Outputid INTEGER, createtime DATETIME, name varchar (25), typedata varchar (25), OutputPort INTEGER,PRIMARY KEY (Outputid))")
#Cr?ation de la table elt_connection
cur.execute("CREATE TABLE IF NOT EXISTS elt_connection (elt_connectionid INTEGER, createtime DATETIME,srcNodeid INTEGER, srcNodeOutputPortid INTEGER, targetNodeid INTEGER, targetNodeInputPortid INTEGER ,PRIMARY KEY (elt_connectionid))")
#- retrospective provenance -#
#- CompositeNodeExec table creation
cur.execute("CREATE TABLE IF NOT EXISTS CompositeNodeExec (CompositeNodeExecid INTEGER, createtime DATETIME, endtime DATETIME,userid INTEGER,CompositeNodeid INTEGER,PRIMARY KEY(CompositeNodeExecid),FOREIGN KEY(CompositeNodeid) references CompositeNode,FOREIGN KEY(userid) references User)")
#- NodeExec
cur.execute("CREATE TABLE IF NOT EXISTS NodeExec (NodeExecid INTEGER, createtime DATETIME, endtime DATETIME,Nodeid INTEGER,CompositeNodeExecid INTEGER,dataid INTEGER,PRIMARY KEY(NodeExecid),FOREIGN KEY(Nodeid) references Node, FOREIGN KEY (CompositeNodeExecid) references CompositeNodeExec, FOREIGN KEY (dataid) references Data)")
#- History
cur.execute("CREATE TABLE IF NOT EXISTS Histoire (Histoireid INTEGER, createtime DATETIME, name varchar (25), description varchar (25),userid INTEGER,CompositeNodeExecid INTEGER,PRIMARY KEY (Histoireid), FOREIGN KEY(Userid) references User, FOREIGN KEY(CompositeNodeExecid) references CompositeNodeExec)")
#- Data
cur.execute("CREATE TABLE IF NOT EXISTS Data (dataid INTEGER, createtime DATETIME,NodeExecid INTEGER, PRIMARY KEY(dataid),FOREIGN KEY(NodeExecid) references NodeExec)")
#- Tag
cur.execute("CREATE TABLE IF NOT EXISTS Tag (CompositeNodeExecid INTEGER, createtime DATETIME, name varchar(25),userid INTEGER,PRIMARY KEY(CompositeNodeExecid),FOREIGN KEY(userid) references User)")
return cur
[docs]def get_database_name():
db_fn = path(settings.get_openalea_home_dir())/'provenance.sq3'
return db_fn
[docs]def db_connexion():
""" Return a curso on the database.
If the database does not exists, create it.
"""
global db_conn
if db_conn is None:
db_fn = get_database_name()
if not db_fn.exists():
db_conn=sqlite3.connect(db_fn)
cur = db_conn.cursor()
cur = db_create(cur)
return cur
else:
cur = db_conn.cursor()
return cur
[docs]class Provenance(object):
def __init__(self, workflow):
self.clear()
self.workflow = workflow
[docs] def edges(self):
cn = self.workflow
edges= list(cn.edges())
sources=map(cn.source,edges)
targets = map(cn.target,edges)
source_ports=[cn.local_id(cn.source_port(eid)) for eid in edges]
target_ports=[cn.local_id(cn.target_port(eid)) for eid in edges]
_edges = dict(zip(edges,zip(sources,source_ports,targets, target_ports)))
return _edges
[docs] def clear(self):
self.nodes = []
[docs] def start_time(self):
pass
[docs] def end_time(self):
pass
[docs] def workflow_exec(self, *args):
pass
[docs] def node_exec(self, vid, node, start_time, end_time, *args):
pass
[docs] def write(self):
""" Write the provenance in db """
[docs]class PrintProvenance(Provenance):
[docs] def workflow_exec(self, *args):
print 'Workflow execution ', self.workflow.factory.name
[docs] def node_exec(self, vid, node, start_time, end_time, *args):
provenance(vid, node, start_time, end_time)
[docs]def provenance(vid, node, start_time, end_time):
#from service import db
#conn = db.connect()
if PROVENANCE:
cur = db_connexion()
pname = node.factory.package.name
name = node.factory.name
print "Provenance Process:"
print "instance ID ", vid, "Package Name: ",pname, "Name: ", name
print "start time :", start_time, "end_time: ", end_time, "duration : ", end_time-start_time
print 'Inputs : ', node.inputs
print 'outputs : ', node.outputs
# print the evaluation time
# This variable has to be retrieve by the settings
quantify = False
__evaluators__ = []
[docs]class EvaluationException(Exception):
def __init__(self, vid, node, exception, exc_info):
Exception.__init__(self)
self.vid = vid
self.node = node
self.exception = exception
self.exc_info = exc_info
# Sort functions
# order function sort by priority
[docs]def cmp_priority(x, y):
"""todo"""
(xvid, xactor) = x
(yvid, yactor) = y
px = xactor.internal_data.get('priority', 0)
py = yactor.internal_data.get('priority', 0)
# reverse order
return cmp(py, px)
# order function to sort by pos x
[docs]def cmp_posx(x, y):
"""todo"""
(xpid, xvid, xactor) = x
(ypid, yvid, yactor) = y
#px = xactor.internal_data.get('posx', 0)
#py = yactor.internal_data.get('posx', 0)
px = xactor.get_ad_hoc_dict().get_metadata('position')[0]
py = yactor.get_ad_hoc_dict().get_metadata('position')[0]
ret = cmp(px, py)
if (not ret):
ret = cmp(xpid, ypid)
# reverse order
return ret
# Evaluation Algoithm
""" Abstract evaluation algorithm """
[docs]class AbstractEvaluation(object):
def __init__(self, dataflow):
"""
:param dataflow: to be done
"""
self._dataflow = dataflow
if PROVENANCE:
self.provenance = PrintProvenance(dataflow)
[docs] def eval(self, *args):
"""todo"""
raise NotImplementedError()
[docs] def is_stopped(self, vid, actor):
""" Return True if evaluation must be stop at this vertex. """
return actor.block
[docs] def eval_vertex_code(self, vid):
"""
Evaluate the vertex vid.
Can raise an exception if evaluation failed.
"""
node = self._dataflow.actor(vid)
try:
t0 = clock()
ret = node.eval()
t1 = clock()
if PROVENANCE:
self.provenance.node_exec(vid, node, t0,t1)
#provenance(vid, node, t0,t1)
# When an exception is raised, a flag is set.
# So we remove it when evaluation is ok.
node.raise_exception = False
# if hasattr(node, 'raise_exception'):
# del node.raise_exception
node.notify_listeners(('data_modified', None, None))
return ret
except EvaluationException, e:
e.vid = vid
e.node = node
# When an exception is raised, a flag is set.
node.raise_exception = True
node.notify_listeners(('data_modified', None, None))
raise e
except Exception, e:
# When an exception is raised, a flag is set.
node.raise_exception = True
node.notify_listeners(('data_modified', None, None))
raise EvaluationException(vid, node, e, \
tb.format_tb(sys.exc_info()[2]))
[docs] def get_parent_nodes(self, pid):
"""
Return the list of parent node connected to pid
The list contains tuples (port_pid, node_pid, actor)
This list is sorted by the x value of the node
"""
df = self._dataflow
# For each connected node
npids = [(npid, df.vertex(npid), df.actor(df.vertex(npid))) \
for npid in df.connected_ports(pid)]
npids.sort(cmp=cmp_posx)
return npids
[docs] def set_provenance(self, provenance):
self.provenance = provenance
[docs]class BrutEvaluation(AbstractEvaluation):
""" Basic evaluation algorithm """
__evaluators__.append("BrutEvaluation")
def __init__(self, dataflow):
AbstractEvaluation.__init__(self, dataflow)
# a property to specify if the node has already been evaluated
self._evaluated = set()
[docs] def is_stopped(self, vid, actor):
""" Return True if evaluation must be stop at this vertex """
if vid in self._evaluated:
return True
try:
if actor.block:
status = True
n = actor.get_nb_output()
outputs = [i for i in range(n) if actor.get_output(i) is not None ]
if not outputs:
status = False
return status
except:
pass
return False
[docs] def eval_vertex(self, vid, *args):
""" Evaluate the vertex vid """
df = self._dataflow
actor = df.actor(vid)
self._evaluated.add(vid)
# For each inputs
for pid in df.in_ports(vid):
inputs = []
cpt = 0
# For each connected node
for npid, nvid, nactor in self.get_parent_nodes(pid):
if not self.is_stopped(nvid, nactor):
self.eval_vertex(nvid)
inputs.append(nactor.get_output(df.local_id(npid)))
cpt += 1
# set input as a list or a simple value
if (cpt == 1):
inputs = inputs[0]
if (cpt > 0):
actor.set_input(df.local_id(pid), inputs)
# Eval the node
self.eval_vertex_code(vid)
[docs] def eval(self, *args):
""" Evaluate the whole dataflow starting from leaves"""
t0 = clock()
df = self._dataflow
# Unvalidate all the nodes
self._evaluated.clear()
# Eval from the leaf
for vid in (vid for vid in df.vertices() if df.nb_out_edges(vid)==0):
self.eval_vertex(vid)
t1 = clock()
if quantify:
print "Evaluation time: %s"%(t1-t0)
[docs]class PriorityEvaluation(BrutEvaluation):
""" Support priority between nodes and selective"""
__evaluators__.append("PriorityEvaluation")
[docs] def eval(self, vtx_id=None, *args, **kwds):
"""todo"""
t0 = clock()
is_subdataflow = False if not kwds else kwds.get('is_subdataflow', False)
df = self._dataflow
# Unvalidate all the nodes
if is_subdataflow:
self._evaluated -= self._resolution_node
else:
self._evaluated.clear()
if (vtx_id is not None):
return self.eval_vertex(vtx_id, *args)
# Select the leaves (list of (vid, actor))
leaves = [(vid, df.actor(vid))
for vid in df.vertices() if df.nb_out_edges(vid)==0]
leaves.sort(cmp_priority)
# Excecute
for vid, actor in leaves:
self.eval_vertex(vid, *args)
t1 = clock()
if quantify:
print "Evaluation time: %s"%(t1-t0)
[docs]class GeneratorEvaluation(AbstractEvaluation):
""" Evaluation algorithm with generator / priority and selection"""
__evaluators__.append("GeneratorEvaluation")
def __init__(self, dataflow):
AbstractEvaluation.__init__(self, dataflow)
# a property to specify if the node has already been evaluated
self._evaluated = set()
self.reeval = False # Flag to force reevaluation (for generator)
[docs] def is_stopped(self, vid, actor):
""" Return True if evaluation must be stop at this vertex """
stopped = False
try:
stopped = actor.block or vid in self._evaluated
except:
pass
return stopped
[docs] def clear(self):
""" Clear evaluation variable """
self._evaluated.clear()
self.reeval = False
[docs] def eval_vertex(self, vid):
""" Evaluate the vertex vid """
df = self._dataflow
actor = df.actor(vid)
self._evaluated.add(vid)
# For each inputs
for pid in df.in_ports(vid):
inputs = []
cpt = 0
# For each connected node
for npid, nvid, nactor in self.get_parent_nodes(pid):
# Do no reevaluate the same node
if not self.is_stopped(nvid, nactor):
self.eval_vertex(nvid)
inputs.append(nactor.get_output(df.local_id(npid)))
cpt += 1
# set input as a list or a simple value
if (cpt == 1):
inputs = inputs[0]
if (cpt > 0):
actor.set_input(df.local_id(pid), inputs)
# Eval the node
ret = self.eval_vertex_code(vid)
# Reevaluation flag
if (ret):
self.reeval = ret
[docs] def eval(self, vtx_id=None, step=False):
t0 = clock()
df = self._dataflow
if (vtx_id is not None):
leafs = [(vtx_id, df.actor(vtx_id))]
else:
# Select the leafs (list of (vid, actor))
leafs = [(vid, df.actor(vid))
for vid in df.vertices() if df.nb_out_edges(vid)==0]
leafs.sort(cmp_priority)
# Execute
for vid, actor in leafs:
if not self.is_stopped(vid, actor):
self.reeval = True
while(self.reeval):
self.clear()
self.eval_vertex(vid)
t1 = clock()
if quantify:
print "Evaluation time: %s"%(t1-t0)
return False
[docs]class LambdaEvaluation(PriorityEvaluation):
""" Evaluation algorithm with support of lambda / priority and selection"""
__evaluators__.append("LambdaEvaluation")
def __init__(self, dataflow):
PriorityEvaluation.__init__(self, dataflow)
self.lambda_value = {} # lambda resolution dictionary
self._resolution_node = set()
[docs] def eval_vertex(self, vid, context, lambda_value, *args):
"""
Evaluate the vertex vid
This function is called both by the user (eval a node and its parents)
and by the SubDataFlow evaluation.
First the graph is traversed by the algorithm in a bottom-up way.
The SubDataflow is stored in the inputs.
:param context: is a list a value to assign to lambdas
"""
df = self._dataflow
actor = df.actor(vid)
# Do not evaluate a node which is blocked
if self.is_stopped(vid, actor):
return
self._evaluated.add(vid)
use_lambda = False
# For each inputs
for pid in df.in_ports(vid):
input_index = df.local_id(pid)
inputs = []
# Get input interface
interface = actor.input_desc[input_index].get('interface', None)
# Determine if the context must be transmitted
# If interface is IFunction it means that the node is a consumer
# We do not propagate the context
if (interface is IFunction):
transmit_cxt = None
transmit_lambda = None
else:
transmit_cxt = context
transmit_lambda = lambda_value
cpt = 0 # parent counter
# For each connected node
for npid, nvid, nactor in self.get_parent_nodes(pid):
# Do no reevaluate the same node
if not self.is_stopped(nvid, nactor):
self.eval_vertex(nvid, transmit_cxt, transmit_lambda)
outval = nactor.get_output(df.local_id(npid))
# Lambda
# We must consider 3 cases
# 1) Lambda detection (receive a SubDataflow and
# interface != IFunction)
#
# 2) Resolution mode (context is not None): we
# replace the lambda with value.
if (isinstance(outval, SubDataflow)
and interface is not IFunction):
if (not context and not lambda_value):
# we are not in resolution mode
use_lambda = True
self._resolution_node.add(vid)
else:
# We set the context value for later use.
# We set resolved values into the dataflow.
# outval is a SubDataFlow. For this object, we have
# now a value.
# E.g. f(x=3). We replace x subdf by 3.
# If x is used elsewhere (f(x,x)), we referenced it
# in a dict.
if (not lambda_value.has_key(outval)):
try:
lambda_value[outval] = context.pop()
except Exception:
raise Exception("The number of lambda variables is insuffisant")
# We replace the value with a context value
outval = lambda_value[outval]
inputs.append(outval)
cpt += 1
# set input as a list or a simple value
if (cpt == 1):
inputs = inputs[0]
if (cpt > 0):
actor.set_input(input_index, inputs)
# Eval the node
if (not use_lambda):
ret = self.eval_vertex_code(vid)
else:
# set the node output with subdataflow
for i in xrange(actor.get_nb_output()):
actor.set_output(i, SubDataflow(df, self, vid, i))
[docs] def eval(self, vtx_id=None, context=None, is_subdataflow=False, step=False):
"""
Eval the dataflow from vtx_id with a particular context
:param vtx_id: vertex id to start the evaluation
:param context: list a value to assign to lambda variables
"""
t0 = clock()
if PROVENANCE and (not is_subdataflow):
self.provenance.workflow_exec()
self.provenance.start_time()
self.lambda_value.clear()
if (context):
# The evaluate, due to the recursion, is done fisrt in last out.
# thus, we have to reverse the arguments to evaluate the function (FIFO).
context.reverse()
PriorityEvaluation.eval(self, vtx_id, context, self.lambda_value, is_subdataflow=is_subdataflow)
self.lambda_value.clear() # do not keep context in memory
if PROVENANCE:
self.provenance.end_time()
t1 = clock()
if quantify:
print "Evaluation time: %s"%(t1-t0)
if not is_subdataflow:
self._resolution_node.clear()
DefaultEvaluation = LambdaEvaluation
#DefaultEvaluation = GeneratorEvaluation
# from collections import deque
# class LambdaEvaluation (PriorityEvaluation):
# """ Evaluation algorithm with support of lambda / priority and selection"""
# def __init__ (self, dataflow):
# PriorityEvaluation.__init__(self, dataflow)
# def scan_graph(self, vid, context):
# """ Return the list of vextex id in the correct process order
# starting from vid
# @param vid: starting vertex id
# @param context: variable context
# """
# df = self._dataflow
# process_list = deque()
# scan_list = deque([(vid, context)])
# while(scan_list):
# (vid, context) = scan_list.popleft()
# process_list.appendleft((vid, context) )
# actor = df.actor(vid)
# # For each inputs
# for pid in df.in_ports(vid):
# # Determine if the context must be transmitted
# # If interface is IFunction it means that the node is a consumer
# # We do not propagate the context
# input_index = df.local_id(pid)
# interface = actor.input_desc[input_index].get('interface', None)
# transmit_cxt = None if (interface is IFunction) else context
# # For each connected node
# for npid in df.connected_ports(pid):
# nvid = df.vertex(npid)
# scan_list.append((nvid, transmit_cxt))
# return process_list
# def eval_vertex (self, vid, context, *args):
# """ Evaluate the graph starting at the vertex vid
# @param vid: starting vertex id
# @param context list of values to assign to variables
# """
# lambda_value = {}
# # Get the node order
# process_list = self.scan_graph(vid, context)
# # Eval each node
# scanned = set()
# for vid, context in process_list:
# if (vid not in scanned):
# self.eval_one_vertex(vid, context, lambda_value)
# scanned.add(vid)
# def get_output_value(self, nvid, nactor, npid):
# """ Return the value of a node output """
# return nactor.get_output(self._dataflow.local_id(npid))
# def eval_one_vertex (self, vid, context, lambda_value):
# """ Evaluate only one vertex
# @param vid: id of vertex to evalaute
# @param context: list of values to assign to variables
# @param lambda_value: dictionary of previous assigned values
# """
# df = self._dataflow
# actor = df.actor(vid)
# use_lambda = False
# # Get inputs
# for pid in df.in_ports(vid):
# inputs = []
# cpt = 0 # parent counter
# # Get input interface
# input_index = df.local_id(pid)
# interface = actor.input_desc[input_index].get('interface', None)
# # For each connected node
# for npid, nvid, nactor in self.get_parent_nodes(pid):
# outval = self.get_output_value(nvid, nactor, npid)
# # Lambda
# # We must consider 2 cases
# # 1) Lambda detection (receive a SubDataflow and interface != IFunction)
# #
# # 2) Resolution mode (context is not None): we
# # replace the lambda with value
# if (isinstance(outval, SubDataflow)
# and interface is not IFunction):
# if (not context and not lambda_value):
# # we are not in resolution mode
# use_lambda = True
# else:
# # We set the context value for later use
# if (not lambda_value.has_key(outval)):
# try:
# lambda_value[outval] = context.pop()
# except Exception,e:
# print e, context, lambda_value
# raise Exception("The number of lambda variables is insuffisant")
# # We replace the value with a context value
# outval = lambda_value[outval]
# inputs.append(outval)
# cpt += 1
# # set input as a list or a simple value
# if (cpt == 1): inputs = inputs[0]
# if (cpt > 0): actor.set_input(input_index, inputs)
# # Eval the node
# if (not use_lambda):
# ret = self.eval_vertex_code(vid)
# else:
# # tranmit a SubDataflow to following node
# for i in xrange(actor.get_nb_output()):
# actor.outputs[i] = SubDataflow(df, self, vid, i)
# def eval (self, vtx_id=None, context=None):
# """
# Eval the dataflow from vtx_id with a particular context
# @param vtx_id: vertex id to start the evaluation
# @param context: list a value to assign to lambda variables
# """
# PriorityEvaluation.eval(self, vtx_id, context)
# from openalea.core.threadmanager import ThreadManager
# import thread
# class ParallelEvaluation(LambdaEvaluation):
# """ Parallel execution of a dataflow """
# def get_output_value(self, nvid, nactor, npid):
# """ Return the value of a node output """
# l = self.locks[nvid]
# l.acquire()
# v = nactor.get_output(self._dataflow.local_id(npid))
# l.release()
# return v
# def eval_one_vertex (self, vid, context, lambda_value):
# """ Evaluate only one vertex
# @param vid: id of vertex to evalaute
# @param context: list of values to assign to variables
# @param lambda_value: dictionary of previous assigned values
# """
# LambdaEvaluation.eval_one_vertex(self, vid, context, lambda_value)
# self.locks[vid].release()
# def eval_vertex (self, vid, context, *args):
# """ Evaluate the graph starting at the vertex vid
# @param vid: starting vertex id
# @param context list of values to assign to variables
# """
# lambda_value = {}
# tm = ThreadManager()
# tm.clear()
# # Get the node order
# process_list = self.scan_graph(vid, context)
# # Synchronisation locks
# self.locks = {}
# for vid, context in process_list:
# if (self.locks.has_key(vid)): continue
# l = thread.allocate_lock()
# l.acquire()
# self.locks[vid] = l
# # Eval each node
# scanned = set()
# for vid, context in process_list:
# if (vid not in scanned):
# tm.add_task( self.eval_one_vertex, (vid, context, lambda_value))
# scanned.add(vid)
# DefaultEvaluation = ParallelEvaluation
# #DefaultEvaluation = LambdaEvaluation
[docs]class ToScriptEvaluation(AbstractEvaluation):
""" Basic transformation into script algorithm """
__evaluators__.append("ToScriptEvaluation")
def __init__(self, dataflow):
AbstractEvaluation.__init__(self, dataflow)
# a property to specify if the node has already been evaluated
self._evaluated = set()
[docs] def is_stopped(self, vid, actor):
""" Return True if evaluation must be stop at this vertex """
return actor.block or vid in self._evaluated
[docs] def eval_vertex(self, vid, *args):
""" Evaluate the vertex vid """
df = self._dataflow
actor = df.actor(vid)
self._evaluated.add(vid)
script = ""
# For each inputs
for pid in df.in_ports(vid):
# For each connected node
for npid, nvid, nactor in self.get_parent_nodes(pid):
if not self.is_stopped(nvid, nactor):
script += self.eval_vertex(nvid)
# Eval the node
script += actor.to_script()
return script
[docs] def eval(self, *args, **kwds):
""" Evaluate the whole dataflow starting from leaves"""
df = self._dataflow
# Unvalidate all the nodes
self._evaluated.clear()
ScriptLibrary().clear()
# Eval from the leaf
script = ""
for vid in (vid for vid in df.vertices() if df.nb_out_edges(vid)==0):
script += self.eval_vertex(vid)
return script
############################################################################
# Evaluation with scheduling
# The objective is to take
[docs]class DiscreteTimeEvaluation(AbstractEvaluation):
""" Evaluation algorithm with generator / priority and selection"""
__evaluators__.append("DiscreteTimeEvaluation")
def __init__(self, dataflow):
AbstractEvaluation.__init__(self, dataflow)
# a property to specify if the node has already been evaluated
self._evaluated = set()
self.reeval = False # Flag to force reevaluation (for generator)
# CPL
# At each evaluation of the dataflow, increase the current cycle of
# one unit.
self._current_cycle = 0
# timed nodes are a dict with vid, time >0
# when time is 0, remove the node from the dict
self._timed_nodes = dict()
self._stop = False
self._nodes_to_reset = []
[docs] def is_stopped(self, vid, actor):
""" Return True if evaluation must be stop at this vertex """
stopped = False
try:
if hasattr(actor,'block'):
stopped = actor.block
stopped = stopped or vid in self._evaluated
except:
pass
return stopped
[docs] def clear(self):
""" Clear evaluation variable """
self._evaluated.clear()
self.reeval = False
self._stop = False
self._nodes_to_reset = []
[docs] def next_step(self):
""" Update the scheduler of one step. """
self._current_cycle += 1
for vid in self._timed_nodes:
self._timed_nodes[vid] -= 1
[docs] def eval_vertex(self, vid):
""" Evaluate the vertex vid """
#print "Step ", self._current_cycle
df = self._dataflow
actor = df.actor(vid)
self._evaluated.add(vid)
# For each inputs
# Compute the inputs of the node
for pid in df.in_ports(vid):
inputs = []
cpt = 0
# For each connected node
for npid, nvid, nactor in self.get_parent_nodes(pid):
# Do no reevaluate the same node
if not self.is_stopped(nvid, nactor):
self.eval_vertex(nvid)
inputs.append(nactor.get_output(df.local_id(npid)))
cpt += 1
# set input as a list or a simple value
if (cpt == 1):
inputs = inputs[0]
if (cpt > 0):
actor.set_input(df.local_id(pid), inputs)
# Eval the node
delay = 0
# When a node return no delay, we stopped the simulation
stop_when_finished = False
if vid in self._timed_nodes:
delay = self._timed_nodes[vid]
if delay == 0:
del self._timed_nodes[vid]
stop_when_finished = True
if delay == 0:
delay = self.eval_vertex_code(vid)
# Reevaluation flag
# TODO: Add the node to the scheduler rather to execute
if (delay):
self._timed_nodes[vid] = int(delay)
self.reeval = delay
elif stop_when_finished:
self._stop = True
self._nodes_to_reset.append(vid)
elif self._current_cycle > 1000:
self._stop = True
[docs] def eval(self, vtx_id=None, step=False):
t0 = clock()
self.clear()
df = self._dataflow
if (vtx_id is not None):
leafs = [(vtx_id, df.actor(vtx_id))]
else:
# Select the leafs (list of (vid, actor))
leafs = [(vid, df.actor(vid))
for vid in df.vertices() if df.nb_out_edges(vid)==0]
leafs.sort(cmp_priority)
# Execute
for vid, actor in leafs:
if not self.is_stopped(vid, actor):
self.reeval = True
if not step:
while(self.reeval and not self._stop):
self.clear()
self.eval_vertex(vid)
self.next_step()
elif (self.reeval and not self._stop):
self.clear()
self.eval_vertex(vid)
self.next_step()
if self._stop:
self._nodes_to_reset.extend(self._timed_nodes)
for vid in self._nodes_to_reset:
df.actor(vid).reset()
#print 'Run %d times the dataflow'%(self._current_cycle,)
# Reset the state
if not step:
self.clear()
self._current_cycle = 0
t1 = clock()
if quantify:
print "Evaluation time: %s"%(t1-t0)
return False
###############################################################################
[docs]class SciFlowareEvaluation(AbstractEvaluation):
""" Distributed Evaluation algorithm with SciFloware backend"""
__evaluators__.append("SciFlowareEvaluation")
def __init__(self, dataflow):
AbstractEvaluation.__init__(self, dataflow)
# a property to specify if the node has already been evaluated
self._evaluated = set()
self._scifloware_actors = set()
@staticmethod
[docs] def is_operator(actor):
from openalea.scifloware.operator import algebra
factory = actor.factory
if factory is None:
return False
if 'SciFloware' not in factory.package.name:
return False
elif factory.name in algebra:
return True
else:
return False
[docs] def scifloware_actors(self):
""" Compute the scifloware actors.
Only those actors will be evaluated.
"""
df = self._dataflow
self._scifloware_actors.clear()
for vid in df.vertices():
actor = df.actor(vid)
if self.is_operator(actor):
self._scifloware_actors.add(vid)
[docs] def eval_vertex(self, vid):
""" Evaluate the vertex vid
This evaluation is both a kind of compilation and real evaluation.
Algorithm
---------
For each vertex which is a SciFloware operator (e.g. map, reduce, ...),
- select the vertices connected to each input port
- if the name of the port is Dataflow:
- get its name and send it as input to the operator
- else
- normal evaluation
"""
#print "Step ", self._current_cycle
df = self._dataflow
actor = df.actor(vid)
is_op = vid in self._scifloware_actors
self._evaluated.add(vid)
#assert self.is_operator(actor)
# For each inputs
# Compute the nodes
for pid in df.in_ports(vid):
inputs = []
is_dataflow = False
if is_op:
name = actor.get_input_port(df.local_id(pid))['name']
if name.lower() == 'dataflow':
is_dataflow = True
if is_dataflow:
out_ports = list(df.connected_ports(pid))
nb_out = len(out_ports)
if nb_out > 1:
raise Exception('Too many nodes connected to the SciFloware operator.')
elif nb_out == 1:
out_actor = df.actor(df.vertex(out_ports[0]))
dataflow_name = out_actor.factory.package.name+':'+out_actor.factory.name
actor.set_input(df.local_id(pid), dataflow_name)
else:
cpt = 0
# For each connected node
for npid, nvid, nactor in self.get_parent_nodes(pid):
# Do no reevaluate the same node
if not self.is_stopped(nvid, nactor):
self.eval_vertex(nvid)
inputs.append(nactor.get_output(df.local_id(npid)))
cpt += 1
# set input as a list or a simple value
if (cpt == 1):
inputs = inputs[0]
if (cpt > 0):
actor.set_input(df.local_id(pid), inputs)
self.eval_vertex_code(vid)
[docs] def eval(self, vtx_id=None, **kwds):
t0 = clock()
df = self._dataflow
self.scifloware_actors()
if (vtx_id is not None):
leafs = [(vtx_id, df.actor(vtx_id))]
else:
# Select the leafs (list of (vid, actor))
leafs = [(vid, df.actor(vid))
for vid in df.vertices() if df.nb_out_edges(vid)==0]
leafs.sort(cmp_priority)
# Execute
for vid, actor in leafs:
self.eval_vertex(vid)
t1 = clock()
if quantify:
print "Evaluation time: %s"%(t1-t0)
return False