Source code for openalea.oalab.shell.streamredirection

# -*- python -*-
#
#       OpenAlea.Visualea: OpenAlea graphical user interface
#
#       Copyright 2006 - 2007 - 2008 INRIA - CIRAD - INRA
#
#       File author(s): Samuel Dufour-Kowalski <samuel.dufour@sophia.inria.fr>
#                       Christophe Pradal <christophe.prada@cirad.fr>
#
#       Distributed under the CeCILL v2 License.
#       See accompanying file LICENSE.txt or copy at
#           http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
#
#       OpenAlea WebSite : http://openalea.gforge.inria.fr
#
################################################################################


import sys
from openalea.vpltk import qt

RedirectionEventId = qt.QtCore.QEvent.User + 100
sys_stderr = None
sys_stdout = None
sys_stdin = None

py2exe_release = False


[docs]class MultipleRedirection(object): """ Dummy file which redirects stream to multiple file """ def __init__(self, *streams): """ The stream is redirect to the file list 'files' """ self.streams = streams
[docs] def write(self, str): """ Emulate write function """ for stream in self.streams: stream.write(str)
[docs] def flush(self): pass
[docs]class NoneOutput(object): """ Dummy file which redirects stream to nothing """ def __init__(self): """ The stream is redirect to nothing """ pass
[docs] def write(self, str): """ Emulate write function """ pass
[docs]class ThreadedRedirection(object): """ Dummy file which redirects stream to threaded gui output """ def __init__(self, guistream): """ The stream is redirect to the file list 'files' """ self.guistream = guistream
[docs] def write(self, str): """ Emulate write function """ if self.guistream.thread() != qt.QtCore.QThread.currentThread(): e = qt.QtCore.QEvent(qt.QtCore.QEvent.Type(RedirectionEventId)) e.txt = str qt.QtGui.QApplication.postEvent(self.guistream, e) pass else: self.guistream.write(str)
[docs] def flush(self): pass
[docs]class GraphicalStreamRedirection(object): """ Redirection of a stream as graphic output """ def __init__(self): """ capture all interactive input/output """ global sys_stdout, sys_stderr, sys_stdin if sys_stdout is None: sys_stdout = sys.stdout if sys_stderr is None: sys_stderr = sys.stderr if sys_stdin is None: sys_stdin = sys.stdin sys.stdout = ThreadedRedirection(self) if py2exe_release: sys.stderr = ThreadedRedirection(self) else: sys.stderr = MultipleRedirection(sys_stderr, ThreadedRedirection(self)) sys.stdin = self # self.multipleStdOutRedirection() def __del__(self): sys.stdout = sys_stdout sys.stderr = sys_stderr sys.stdin = sys_stdin
[docs] def customEvent(self, event): """ custom event processing. Redirection to write """ if event.type() == RedirectionEventId: self.write(event.txt)
[docs] def multipleStdOutRedirection(self, enabled=True): """ make multiple (sys.stdout/pyconsole) or single (pyconsole) redirection of stdout """ if enabled: sys.stdout = MultipleRedirection(sys_stdout, ThreadedRedirection(self)) else: sys.stdout = ThreadedRedirection(self)
[docs] def selfAsStdOutRedirection(self): sys.stdout = ThreadedRedirection(self)
[docs] def sysAsStdOutRedirection(self): sys.stdout = sys_stdout
[docs] def noneAsStdOutRedirection(self): sys.stdout = NoneOutput()
[docs] def hasMultipleStdOutRedirection(self): return isinstance(sys.stdout, MultipleRedirection)
[docs] def isSelfStdOutRedirection(self): return isinstance(sys.stdout, ThreadedRedirection)
[docs] def isSysStdOutRedirection(self): return sys.stdout == sys_stdout
[docs] def isNoneAsStdOutRedirection(self): return isinstance(sys.stdout, NoneOutput)
[docs] def multipleStdErrRedirection(self, enabled=True): """ make multiple (sys.stderr/pyconsole) or single (pyconsole) redirection of stderr """ if enabled: sys.stderr = MultipleRedirection(sys_stderr, ThreadedRedirection(self)) else: sys.stderr = ThreadedRedirection(self)
[docs] def selfAsStdErrRedirection(self): sys.stderr = ThreadedRedirection(self)
[docs] def sysAsStdErrRedirection(self): sys.stderr = sys_stderr
[docs] def noneAsStdErrRedirection(self): sys.stderr = NoneOutput()
[docs] def hasMultipleStdErrRedirection(self): return isinstance(sys.stderr, MultipleRedirection)
[docs] def isSelfStdErrRedirection(self): return isinstance(sys.stderr, ThreadedRedirection)
[docs] def isSysStdErrRedirection(self): return sys.stderr == sys_stderr
[docs] def isNoneAsStdErrRedirection(self): return isinstance(sys.stderr, NoneOutput)
[docs] def setOutputRedirection(self, selfoutput=True, sysoutput=True, outanderr=3): if selfoutput: if sysoutput: if outanderr & 1: self.multipleStdOutRedirection(True) if outanderr & 2: self.multipleStdErrRedirection(True) else: if outanderr & 1: self.selfAsStdOutRedirection() if outanderr & 2: self.selfAsStdErrRedirection() elif sysoutput: if outanderr & 1: self.sysAsStdOutRedirection() if outanderr & 2: self.sysAsStdErrRedirection() else: if outanderr & 1: self.noneAsStdOutRedirection() if outanderr & 2: self.noneAsStdErrRedirection()
[docs] def flush(self): pass