Source code for openalea.oalab.shell.streamredirection
# -*- python -*-
#
# OpenAlea.Visualea: OpenAlea graphical user interface
#
# Copyright 2006 - 2007 - 2008 INRIA - CIRAD - INRA
#
# File author(s): Samuel Dufour-Kowalski <samuel.dufour@sophia.inria.fr>
# Christophe Pradal <christophe.prada@cirad.fr>
#
# Distributed under the CeCILL v2 License.
# See accompanying file LICENSE.txt or copy at
# http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
#
# OpenAlea WebSite : http://openalea.gforge.inria.fr
#
################################################################################
import sys
from openalea.vpltk import qt
RedirectionEventId = qt.QtCore.QEvent.User + 100
sys_stderr = None
sys_stdout = None
sys_stdin = None
py2exe_release = False
[docs]class MultipleRedirection(object):
""" Dummy file which redirects stream to multiple file """
def __init__(self, *streams):
""" The stream is redirect to the file list 'files' """
self.streams = streams
[docs] def write(self, str):
""" Emulate write function """
for stream in self.streams:
stream.write(str)
[docs]class NoneOutput(object):
""" Dummy file which redirects stream to nothing """
def __init__(self):
""" The stream is redirect to nothing """
pass
[docs] def write(self, str):
""" Emulate write function """
pass
[docs]class ThreadedRedirection(object):
""" Dummy file which redirects stream to threaded gui output """
def __init__(self, guistream):
""" The stream is redirect to the file list 'files' """
self.guistream = guistream
[docs] def write(self, str):
""" Emulate write function """
if self.guistream.thread() != qt.QtCore.QThread.currentThread():
e = qt.QtCore.QEvent(qt.QtCore.QEvent.Type(RedirectionEventId))
e.txt = str
qt.QtGui.QApplication.postEvent(self.guistream, e)
pass
else:
self.guistream.write(str)
[docs]class GraphicalStreamRedirection(object):
""" Redirection of a stream as graphic output """
def __init__(self):
""" capture all interactive input/output """
global sys_stdout, sys_stderr, sys_stdin
if sys_stdout is None:
sys_stdout = sys.stdout
if sys_stderr is None:
sys_stderr = sys.stderr
if sys_stdin is None:
sys_stdin = sys.stdin
sys.stdout = ThreadedRedirection(self)
if py2exe_release:
sys.stderr = ThreadedRedirection(self)
else:
sys.stderr = MultipleRedirection(sys_stderr, ThreadedRedirection(self))
sys.stdin = self
# self.multipleStdOutRedirection()
def __del__(self):
sys.stdout = sys_stdout
sys.stderr = sys_stderr
sys.stdin = sys_stdin
[docs] def customEvent(self, event):
""" custom event processing. Redirection to write """
if event.type() == RedirectionEventId:
self.write(event.txt)
[docs] def multipleStdOutRedirection(self, enabled=True):
""" make multiple (sys.stdout/pyconsole) or single (pyconsole) redirection of stdout """
if enabled:
sys.stdout = MultipleRedirection(sys_stdout, ThreadedRedirection(self))
else:
sys.stdout = ThreadedRedirection(self)
[docs] def selfAsStdOutRedirection(self):
sys.stdout = ThreadedRedirection(self)
[docs] def sysAsStdOutRedirection(self):
sys.stdout = sys_stdout
[docs] def noneAsStdOutRedirection(self):
sys.stdout = NoneOutput()
[docs] def hasMultipleStdOutRedirection(self):
return isinstance(sys.stdout, MultipleRedirection)
[docs] def isSelfStdOutRedirection(self):
return isinstance(sys.stdout, ThreadedRedirection)
[docs] def isSysStdOutRedirection(self):
return sys.stdout == sys_stdout
[docs] def isNoneAsStdOutRedirection(self):
return isinstance(sys.stdout, NoneOutput)
[docs] def multipleStdErrRedirection(self, enabled=True):
""" make multiple (sys.stderr/pyconsole) or single (pyconsole) redirection of stderr """
if enabled:
sys.stderr = MultipleRedirection(sys_stderr, ThreadedRedirection(self))
else:
sys.stderr = ThreadedRedirection(self)
[docs] def selfAsStdErrRedirection(self):
sys.stderr = ThreadedRedirection(self)
[docs] def sysAsStdErrRedirection(self):
sys.stderr = sys_stderr
[docs] def noneAsStdErrRedirection(self):
sys.stderr = NoneOutput()
[docs] def hasMultipleStdErrRedirection(self):
return isinstance(sys.stderr, MultipleRedirection)
[docs] def isSelfStdErrRedirection(self):
return isinstance(sys.stderr, ThreadedRedirection)
[docs] def isSysStdErrRedirection(self):
return sys.stderr == sys_stderr
[docs] def isNoneAsStdErrRedirection(self):
return isinstance(sys.stderr, NoneOutput)
[docs] def setOutputRedirection(self, selfoutput=True, sysoutput=True, outanderr=3):
if selfoutput:
if sysoutput:
if outanderr & 1:
self.multipleStdOutRedirection(True)
if outanderr & 2:
self.multipleStdErrRedirection(True)
else:
if outanderr & 1:
self.selfAsStdOutRedirection()
if outanderr & 2:
self.selfAsStdErrRedirection()
elif sysoutput:
if outanderr & 1:
self.sysAsStdOutRedirection()
if outanderr & 2:
self.sysAsStdErrRedirection()
else:
if outanderr & 1:
self.noneAsStdOutRedirection()
if outanderr & 2:
self.noneAsStdErrRedirection()