# -*- coding: utf-8 -*-
#
#       Copyright 2011 Liftoff Software Corporation (http://liftoffsoftware.com)
#
# NOTE:  Commercial licenses for this software are available!
#

# TODO: See if we can spin off termio.py into its own little program that sits
#       between Gate One and ssh_connect.py.  That way we can take advantage
#       of multiple cores/processors (for terminal-to-HTML processing).
#       There's no reason why we can't write something that does what dtach
#       does.  Just need to redirect the fd of self.cmd to a unix domain
#       socket and os.setsid() somewhere after forking (twice maybe?).
# TODO: Make the environment variables used before launching self.cmd configurable

# Meta
__version__ = '0.9'
__license__ = "AGPLv3 or Proprietary (see LICENSE.txt)"
__version_info__ = (0, 9)
__author__ = 'Dan McDougall <daniel.mcdougall@liftoffsoftware.com>'

__doc__ = """\
About termio
============
This module provides a Multiplex class that can perform the following:

 * Fork a child process that opens a given terminal program.
 * Read and write data to and from the child process.
 * Log the output of the child process to a file and/or syslog.

The Multiplex class is meant to be used in conjunction with a running Tornado
IOLoop instance.  It can be instantiated from within your Tornado application
like so::

    multiplexer = termio.Multiplex(
        'nethack',
        log_path='/var/log/myapp',
        user='bsmith@CORP',
        term_id=1,
        syslog=True
    )

Then *multiplexer* can create and launch a new controlling terminal (tty)
running the given command (e.g. 'nethack')::

    env = {
        'PATH': os.environ['PATH'],
        'MYVAR': 'foo'
    }
    fd = multiplexer.spawn(80, 24, env=env)
    # The fd is returned from spawn() in case you want more low-level control.

Input and output from the controlled program is asynchronous and gets handled
via IOLoop.  It will automatically write all output from the terminal program to
an instance of self.terminal_emulator (which defaults to Gate One's
terminal.Terminal).  So if you want to perform an action whenever the running
terminal application has output (like, say, sending a message to a client)
you'll need to attach a callback::

    def screen_update():
        'Called when new output is ready to send to the client'
        scrollback, screen = multiplexer.dump_html()
        socket_or_something.write(screen)
    multiplexer.add_callback(multiplexer.CALLBACK_UPDATE, screen_update)

In this example, screen_update() will write() the latest screen from
multiplexer.dump_html() to *socket_or_something* whenever the terminal program
has some sort of output.  You can also make calls directly to the terminal
emulator (if you're using a custom one)::

    def screen_update():
        output = multiplexer.term.my_custom_func()
        whatever.write(output)

Writing characters to the controlled terminal application is pretty
straightforward::

    multiplexer.write('some text')

Typically you'd pass in keystrokes or commands from your application to the
underlying program this way and the screen/terminal emulator would get updated
automatically.  If using Gate One's Terminal() you can also attach callbacks
to perform further actions when more specific situations are encountered (e.g.
when the window title is set via that respective escape sequence)::

    def set_title():
        'Hypothetical title-setting function'
        print("Window title was just set to: %s" % multiplexer.term.title)
    multiplexer.term.callbacks[multiplexer.term.CALLBACK_TITLE] = set_title
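
If you need to react to specific strings in the program's output you can also
register patterns (regular expressions) with callbacks via
:py:meth:`BaseMultiplex.expect`.  A minimal sketch (see the expect() docstring
below for the full story; send_password() is a hypothetical callback)::

    def send_password(m_instance, matched):
        m_instance.writeline('somepassword')
    multiplexer.expect('(?i)password:', send_password)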

Module Functions and Classes
============================
"""

# Stdlib imports
import signal, threading, os, sys, time, struct, io, gzip, re
from datetime import timedelta, datetime
from functools import partial
from itertools import izip
import logging

# Import our own stuff
from utils import get_translation, human_readable_bytes, noop, which
from utils import get_or_update_metadata, json_encode

_ = get_translation()

# Globals
SEPARATOR = u"\U000f0f0f" # The character used to separate frames in the log
# NOTE: That unicode character was carefully selected from only the finest
# of the PUA.  I hereby dub thee, "U+F0F0F, The Separator."
CALLBACK_THREAD = None # Used by add_callback()
POSIX = 'posix' in sys.builtin_module_names
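
# Example of the log frame format produced by term_write() below (a sketch;
# the timestamp is milliseconds since the epoch, <SEPARATOR> stands for the
# SEPARATOR character above, and the first frame is JSON metadata):
#
#   1317253502000:{"version": "1.0", "rows": 24, "cols": 80, ...}<SEPARATOR>
#   1317253502100:Welcome to someserver...<SEPARATOR>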

# Helper functions
def debug_expect(m_instance, match):
    """
    This function is used by :py:meth:`BaseMultiplex.expect` if
    :py:attr:`BaseMultiplex.debug` is True.  It facilitates easy debugging of
    regular expressions by printing out precisely what was matched and where.

    .. note:: This function only works with post-process patterns.
    """
    print("%s was matched..." % repr(match))
    out = ""
    for line in m_instance.dump():
        regex = re.escape(match)
        match_obj = re.match(regex, line)
        if match_obj:
            out += "--->%s\n" % repr(line)
            break
        else:
            out += "    %s\n" % repr(line)
    print(out)

# Exceptions
class Timeout(Exception):
    """
    Used by :py:meth:`BaseMultiplex.expect` and :py:meth:`BaseMultiplex.await`;
    raised when a timeout is reached.
    """
    pass

# Classes
class Pattern(object):
    """
    Used by :py:meth:`BaseMultiplex.expect`, an object to store patterns
    (regular expressions) and their associated properties.

    .. note::

        The variable *m_instance* is used below to mean the current instance
        of BaseMultiplex (or a subclass thereof).

    .. py:attribute:: pattern

        A regular expression or iterable of regular expressions that will be
        checked against the output stream.

    .. py:attribute:: callback

        A function that will be called when the pattern is matched.  Callbacks
        are called like so:

        >>> callback(m_instance, matched_string)

    .. py:attribute:: optional

        Indicates that this pattern is optional, meaning that it isn't
        required to match before the next pattern in
        :py:attr:`BaseMultiplex._patterns` is checked.

    .. py:attribute:: sticky

        Indicates that the pattern will not time out and won't be
        automatically removed from self._patterns when it is matched.

    .. py:attribute:: errorback

        A function to call in the event of a timeout or if an exception is
        encountered.  Errorback functions are called like so:

        >>> errorback(m_instance)

    .. py:attribute:: preprocess

        Indicates that this pattern is to be checked against the incoming
        stream before it is processed by the terminal emulator.  Useful if you
        need to match non-printable characters like control codes and escape
        sequences.

    .. py:attribute:: timeout

        A :py:obj:`datetime.timedelta` object indicating how long we should
        wait before calling :py:attr:`errorback`.

    .. py:attribute:: created

        A :py:obj:`datetime.datetime` object that gets set when the Pattern is
        instantiated by :py:meth:`BaseMultiplex.expect`.  It is used to
        determine if and when a timeout has been reached.
    """
    def __init__(self, pattern, callback, optional=False, sticky=False,
            errorback=None, preprocess=False, timeout=30):
        self.pattern = pattern
        self.callback = callback
        self.errorback = errorback
        self.optional = optional
        self.sticky = sticky
        self.preprocess = preprocess
        self.timeout = timeout
        self.created = datetime.now()

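# Example (sketch): expect() builds and queues Pattern objects roughly like
# this ('send_password' being a hypothetical callback):
#
#   pattern_obj = Pattern(
#       re.compile('(?i)password:', re.MULTILINE),
#       send_password,
#       timeout=timedelta(seconds=15))
#   self._patterns.append(pattern_obj)
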
class BaseMultiplex(object):
    """
    A base class that all Multiplex types will inherit from.
    """
    CALLBACK_UPDATE = 1 # Screen update
    CALLBACK_EXIT = 2 # When the underlying program exits
    def __init__(self,
            cmd,
            terminal_emulator=None, # Defaults to Gate One's terminal.Terminal
            log_path=None,
            user=None, # Only used by log output (to differentiate who's who)
            term_id=None, # Also only for syslog output for the same reason
            syslog=False,
            syslog_host=None,
            syslog_facility=None,
            debug=False):
        self.debug = debug
        self.lock = threading.Lock()
        self.cmd = cmd
        if not terminal_emulator:
            # Why do this?  So you could use/write your own specialty
            # emulator.  Whatever you use just has to accept 'rows' and
            # 'cols' as keyword arguments in __init__()
            from terminal import Terminal # Dynamic import to cut down on waste
            self.terminal_emulator = Terminal
        else:
            self.terminal_emulator = terminal_emulator
        self.log_path = log_path # Logs of the terminal output wind up here
        self.syslog = syslog # See "if self.syslog:" below
        self._alive = False
        self.ratelimiter_engaged = False
        self.rows = 24
        self.cols = 80
        self.pid = -1 # Means "no pid yet"
        self.started = "Never"
        self._patterns = []
        self.timeout_thread = None
        # Setup our callbacks
        self.callbacks = { # Defaults do nothing which saves some conditionals
            self.CALLBACK_UPDATE: {},
            self.CALLBACK_EXIT: {},
        }
        # Configure syslog logging
        self.user = user
        self.term_id = term_id
        self.syslog_buffer = ''
        self.syslog_host = syslog_host
        if self.syslog and not self.syslog_host:
            try:
                import syslog
            except ImportError:
                logging.error(_(
                    "The syslog module is required to log terminal sessions "
                    "to syslog if no syslog_host is set.  The syslog module "
                    "is not required if you want to send syslog messages to "
                    "a remote syslog server but for this to work you must "
                    "set the syslog_host variable either via the "
                    "command-line switch or in your server.conf."))
                sys.exit(1)
            if not syslog_facility:
                syslog_facility = syslog.LOG_DAEMON
            self.syslog_facility = syslog_facility
            # Sets up syslog messages to show up like this:
            #     Sep 28 19:45:02 <hostname> gateone: <log message>
            syslog.openlog('gateone', 0, syslog_facility)

    def __del__(self):
        """
        Makes sure that the underlying terminal program is terminated so we
        don't leave things hanging around.
        """
        logging.debug("Calling Multiplex.__del__()")
        if self.isalive():
            self.terminate()

    def __repr__(self):
        """
        Returns self.__str__()
        """
        return "<%s>" % self.__str__()

    def __str__(self):
        """
        Returns a string representation of this Multiplex instance and the
        current state of things.
        """
        started = self.started
        if started != "Never":
            started = self.started.isoformat()
        out = (
            "%s.%s: "
            "alive: %s, "
            "command: %s, "
            "started: %s" % (
                self.__module__,
                self.__class__.__name__,
                self._alive,
                repr(self.cmd),
                started
            )
        )
        return out

    def add_callback(self, event, callback, identifier=None):
        """
        Attaches the given *callback* to the given *event*.  If given,
        *identifier* can be used to reference this callback later (e.g. when
        you want to remove it).  Otherwise an identifier will be generated
        automatically.  If the given *identifier* is already attached to a
        callback at the given event, that callback will be replaced with
        *callback*.

        *event* - The numeric ID of the event you're attaching *callback* to
        (e.g. Multiplex.CALLBACK_UPDATE).

        *callback* - The function you're attaching to the *event*.

        *identifier* - A string or number to be used as a reference point
        should you wish to remove or update this callback later.

        Returns the identifier of the callback.

        Example:

        >>> m = Multiplex()
        >>> def somefunc(): pass
        >>> id = "myref"
        >>> ref = m.add_callback(m.CALLBACK_UPDATE, somefunc, id)

        .. note::

            This allows the controlling program to have multiple callbacks for
            the same event.
        """
        if not identifier:
            identifier = callback.__hash__()
        self.callbacks[event][identifier] = callback
        return identifier

    def remove_callback(self, event, identifier):
        """
        Removes the callback referenced by *identifier* that is attached to
        the given *event*.

        Example:

        >>> m.remove_callback(m.CALLBACK_BELL, "myref")
        """
        try:
            del self.callbacks[event][identifier]
        except KeyError:
            pass # Doesn't exist anymore--nothing to do

    def remove_all_callbacks(self, identifier):
        """
        Removes all callbacks associated with *identifier*.
        """
        for event, identifiers in self.callbacks.items():
            try:
                del self.callbacks[event][identifier]
            except KeyError:
                pass # Doesn't exist--nothing to worry about

    def _call_callback(self, callback):
        """
        This method is here in the event that subclasses of BaseMultiplex need
        to call callbacks in an implementation-specific way.  It just calls
        *callback*.
        """
        callback()

    def spawn(self, rows=24, cols=80, env=None):
        raise NotImplementedError(_(
            "spawn() *must* be overridden by subclasses."))

    def isalive(self):
        raise NotImplementedError(_(
            "isalive() *must* be overridden by subclasses."))

    def term_write(self, stream):
        """
        Writes *stream* to :py:attr:`term` (the terminal emulator) and also
        takes care of logging to :py:attr:`log_path` (if set) and/or syslog
        (if :py:attr:`syslog` is `True`).  When complete, calls any callbacks
        registered in :py:obj:`CALLBACK_UPDATE`.

        :stream: A string or bytes containing the incoming output stream from
            the underlying terminal program.

        .. note::

            This kind of logging doesn't capture user keystrokes.  This is
            intentional as we don't want passwords winding up in the logs.
        """
        # Write to the log (if configured)
        if self.log_path:
            now = int(round(time.time() * 1000))
            if not os.path.exists(self.log_path):
                # Write the first frame as metadata
                metadata = {
                    'version': '1.0', # Log format version
                    'rows': self.rows,
                    'cols': self.cols,
                    'start_date': now
                    # NOTE: end_date should be added later when the log is
                    # read for the first time by either the logviewer or the
                    # logging plugin.
                }
                # The hope is that we can use the first-frame-metadata
                # paradigm to store all sorts of useful information about a
                # log.
                output = unicode(json_encode(metadata))
                output = u"%s:%s\U000f0f0f" % (now, output)
                log = gzip.open(self.log_path, mode='a')
                log.write(output.encode("utf-8"))
                log.close()
            # NOTE: I'm using an obscure unicode symbol in order to avoid
            # conflicts.  We need to do our best to ensure that we can
            # differentiate between terminal output and our log format...
            # This should do the trick because it is highly unlikely that
            # someone would be displaying this obscure unicode symbol on an
            # actual terminal unless they were using Gate One to view a
            # Gate One log file in vim or something =)
            # \U000f0f0f == U+F0F0F (Private Use Symbol)
            output = unicode(stream.decode('utf-8', "ignore"))
            output = u"%s:%s\U000f0f0f" % (now, output)
            log = gzip.open(self.log_path, mode='a')
            log.write(output.encode("utf-8"))
            log.close()
        # NOTE: Gate One's log format is special in that it can be used for
        # both playing back recorded sessions *or* generating syslog-like
        # output.
        if self.syslog:
            # The import in __init__() was local to that method's scope:
            import syslog
            # Try and keep it as line-by-line as possible so we don't end up
            # with a log line per character.
            if '\n' in stream:
                for line in stream.splitlines():
                    if self.syslog_buffer:
                        line = self.syslog_buffer + line
                        self.syslog_buffer = ''
                    # Syslog really doesn't like any fancy encodings
                    line = line.encode('ascii', 'xmlcharrefreplace')
                    syslog.syslog("%s %s: %s" % (
                        self.user, self.term_id, line))
            else:
                self.syslog_buffer += stream
        # Handle preprocess patterns (for expect())
        if self._patterns:
            self.preprocess(stream)
        self.term.write(stream)
        # Handle post-process patterns (for expect())
        if self._patterns:
            self.postprocess()
        for callback in self.callbacks[self.CALLBACK_UPDATE].values():
            self._call_callback(callback)

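    # Example (sketch): reading frames back out of a log written by
    # term_write().  Assumes a log produced with log_path set; 'some.golog'
    # is a hypothetical file name:
    #
    #   import gzip, json
    #   raw = gzip.open('some.golog').read().decode('utf-8')
    #   frames = raw.split(SEPARATOR)[:-1] # Trailing element is empty
    #   metadata = json.loads(frames[0].split(':', 1)[1])
    #   for frame in frames[1:]:
    #       timestamp, screen_output = frame.split(':', 1)
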
    def preprocess(self, stream):
        """
        Handles preprocess patterns registered by :py:meth:`expect`--that is,
        those patterns which have been marked with `preprocess = True`.
        Patterns marked in this way get handled *before* the terminal emulator
        processes *stream*.

        :stream: A string or bytes containing the incoming output stream from
            the underlying terminal program.
        """
        preprocess_patterns = (a for a in self._patterns if a.preprocess)
        finished_non_sticky = False
        # If there aren't any preprocess patterns this won't do anything:
        for pattern_obj in preprocess_patterns:
            if finished_non_sticky and not pattern_obj.sticky:
                # We only want sticky patterns if we've already matched once
                continue
            match = pattern_obj.pattern.search(stream)
            if match:
                callback = partial(
                    pattern_obj.callback, self, match.group())
                self._call_callback(callback)
                if not pattern_obj.sticky:
                    self.unexpect(hash(pattern_obj)) # Remove it
                if not pattern_obj.optional:
                    # We only match the first non-optional pattern
                    finished_non_sticky = True
        # TODO: Figure out why we're not catching our ready_string

    def postprocess(self):
        """
        Handles post-process patterns registered by :py:meth:`expect`.
        """
        # Check the terminal emulator screen for any matching patterns.
        post_patterns = (a for a in self._patterns if not a.preprocess)
        finished_non_sticky = False
        for pattern_obj in post_patterns:
            # For post-processing matches we search the terminal emulator's
            # screen as a single string.  This allows for full-screen screen
            # scraping in addition to typical 'expect-like' functionality.
            # The big difference is that with traditional expect (and pexpect)
            # you don't get to examine the program's output as it would be
            # rendered in an actual terminal.
            # By post-processing the text after it has been handled by a
            # terminal emulator we don't have to worry about hidden characters
            # and escape sequences that we may not be aware of or that could
            # make our regular expressions much more complicated than they
            # should be.
            if finished_non_sticky and not pattern_obj.sticky:
                continue # We only want sticky patterns at this point
            # For convenience, trailing whitespace is removed from the lines
            # output by the terminal emulator.  This is so we don't have to
            # put '\s*' before every '$' to match the end of a line.
            term_lines = "\n".join(
                [a.rstrip() for a in self.term.dump()]).rstrip()
            if isinstance(pattern_obj.pattern, (list, tuple)):
                for pat in pattern_obj.pattern:
                    match = pat.search(term_lines)
                    if match:
                        if self._handle_match(pattern_obj, match):
                            finished_non_sticky = True
            else:
                match = pattern_obj.pattern.search(term_lines)
                if match:
                    if self._handle_match(pattern_obj, match):
                        finished_non_sticky = True

    def _handle_match(self, pattern_obj, match):
        """
        Handles a matched regex detected by :py:meth:`postprocess`.  It calls
        :py:obj:`Pattern.callback` and takes care of removing *pattern_obj*
        from :py:attr:`_patterns` (if it isn't sticky).  Returns True if the
        matched pattern was non-optional and non-sticky (so the caller knows
        to stop checking subsequent patterns).
        """
        callback = partial(pattern_obj.callback, self, match.group())
        self._call_callback(callback)
        if self.debug:
            debug_callback = partial(debug_expect, self, match.group())
            self._call_callback(debug_callback)
        if not pattern_obj.sticky:
            self.unexpect(hash(pattern_obj)) # Remove it
        if not pattern_obj.optional and not pattern_obj.sticky:
            # We only match the first non-optional pattern
            return True
        return False

    def writeline(self, line=''):
        """
        Just like :py:meth:`write` but it writes a newline after writing
        *line*.  If no *line* is given, just a newline will be written.
        """
        self.write(line + u'\n')

    def writelines(self, lines):
        """
        Writes *lines* (a list of strings) to the underlying program,
        appending a newline after each line.
        """
        if getattr(lines, '__iter__', False):
            for line in lines:
                self.write(line + u'\n')
        else:
            raise TypeError(_(
                "%s is not iterable (strings don't count :)" % type(lines)))

    def dump_html(self, full=False, client_id='0'):
        """
        Returns the difference of terminal lines (a list of lines, to be
        specific) and its scrollback buffer (which is also a list of lines) as
        a tuple, (scrollback, text).  If a line hasn't changed since the last
        dump, said line will be replaced with an empty string in the output.

        If *full*, will return the entire screen (not just the diff).

        If *client_id* is given (string), it will be used as a unique client
        identifier for keeping track of screen differences (so you can have
        multiple clients getting their own unique diff output for the same
        Multiplex instance).
        """
        if client_id not in self.prev_output:
            self.prev_output[client_id] = [None for a in xrange(self.rows-1)]
        try:
            scrollback, html = ([], [])
            if self.term:
                try:
                    result = self.term.dump_html()
                    if result:
                        scrollback, html = result
                        # Make a copy so we can save it to prev_output later
                        preserved_html = html[:]
                except IOError as e:
                    logging.debug(
                        _("IOError attempting self.term.dump_html()"))
                    logging.debug("%s" % e)
            if html:
                if not full:
                    count = 0
                    for line1, line2 in izip(
                            self.prev_output[client_id], html):
                        if line1 != line2:
                            html[count] = line2 # I love updates-in-place
                        else:
                            html[count] = ''
                        count += 1
                # Otherwise a full dump will take place
                self.prev_output.update({client_id: preserved_html})
            return (scrollback, html)
        except ValueError as e:
            # This would be special...
            logging.error(_("ValueError in dump_html(): %s" % e))
            return ([], [])
        except (IOError, TypeError) as e:
            logging.error(_("Unhandled exception in dump_html(): %s" % e))
            if self.ratelimiter_engaged:
                # Caused by the program being out of control
                return ([], [
                    _("<b>Program output too noisy. Sending Ctrl-c...</b>")])
            else:
                import traceback
                traceback.print_exc(file=sys.stdout)
            return ([], [])

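    # Example (sketch): two clients each get their own diff of the same
    # terminal ('client_a' and 'client_b' are hypothetical identifiers):
    #
    #   scrollback, html = m.dump_html(client_id='client_a')
    #   scrollback, html = m.dump_html(client_id='client_b')
    #   # A second call for the same client yields mostly empty strings until
    #   # the screen changes again:
    #   scrollback, html = m.dump_html(client_id='client_a')
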
    def dump(self):
        """
        Dumps whatever is currently on the screen of the terminal emulator as
        a list of plain strings (so they'll be escaped and look nice in an
        interactive Python interpreter).
        """
        return self.term.dump()

    def timeout_check(self, timeout_now=False):
        """
        Iterates over :py:attr:`_patterns` checking each to determine if it
        has timed out.  If a timeout has occurred for a Pattern and said
        Pattern has an *errorback* function, that function will be called.

        Returns True if there are still non-sticky patterns remaining.  False
        otherwise.

        If *timeout_now* is True, the first Pattern's errorback will be
        called immediately and self._patterns will be emptied.
        """
        remaining_patterns = False
        for pattern_obj in self._patterns:
            if timeout_now:
                if pattern_obj.errorback:
                    errorback = partial(pattern_obj.errorback, self)
                    self._call_callback(errorback)
                self.unexpect()
                return False
            elapsed = datetime.now() - pattern_obj.created
            if elapsed > pattern_obj.timeout:
                if not pattern_obj.sticky:
                    self.unexpect(hash(pattern_obj))
                    if pattern_obj.errorback:
                        errorback = partial(pattern_obj.errorback, self)
                        self._call_callback(errorback)
            elif not pattern_obj.sticky:
                remaining_patterns = True
        return remaining_patterns

    def expect(self, patterns, callback, optional=False, sticky=False,
            errorback=None, timeout=15, position=None, preprocess=False):
        """
        Watches the stream of output coming from the underlying terminal
        program for *patterns* and if there's a match *callback* will be
        called like so::

            callback(multiplex_instance, matched_string)

        *patterns* can be a string, an :py:obj:`re.SRE_Pattern` (as created by
        :py:func:`re.compile`), or an iterable of either/or.

        Returns a reference object that can be used to remove the registered
        pattern/callback at any time using the :py:meth:`unexpect` method (see
        below).

        .. note:: This function is non-blocking!

        .. warning::

            The *timeout* value gets compared against the time
            :py:meth:`expect` was called to create it.  So don't wait too long
            if you're planning on using :py:meth:`await`!

        Here's a simple example that changes a user's password::

            >>> def write_password(m_instance, matched):
            ...     print("Sending Password... %s patterns remaining."
            ...         % len(m_instance._patterns))
            ...     m_instance.writeline('somepassword')
            >>> m = Multiplex('passwd someuser') # Assumes running as root :)
            >>> m.expect('(?i)password:', write_password) # Step 1
            >>> m.expect('(?i)password:', write_password) # Step 2
            >>> print(len(m._patterns)) # To show there's two in the queue
            2
            >>> m.spawn() # Execute the command
            >>> m.await(10) # Block up to 10 seconds waiting for self._patterns to be empty (not counting optional patterns)
            Sending Password... 1 patterns remaining.
            Sending Password... 0 patterns remaining.
            >>> m.isalive()
            False
            >>> # All done!

        .. tip::

            The :py:meth:`await` method will automatically call
            :py:meth:`spawn` if not :py:meth:`isalive`.

        This would result in the password of 'someuser' being changed to
        'somepassword'.  How is the order determined?  Every time
        :py:meth:`expect` is called it creates a new :py:class:`Pattern` using
        the given parameters and appends it to :py:attr:`_patterns` (which is
        a list).  As each :py:class:`Pattern` is matched its *callback* gets
        called and the :py:class:`Pattern` is removed from
        :py:attr:`_patterns` (unless *sticky* is True).  So even though the
        patterns and callbacks listed above were identical they will get
        executed and removed in the order they were created as each respective
        :py:class:`Pattern` is matched.

        .. note::

            Only the first pattern (plus any patterns marked as *sticky*) is
            checked against the incoming stream.  If the first non-sticky
            pattern is marked *optional* then the next pattern will also be
            checked (and so on).  All other patterns will sit in
            self._patterns until their predecessors are matched/removed.

        Patterns can be removed from self._patterns as needed by calling
        unexpect(<reference>).  Here's an example::

            >>> def handle_accepting_ssh_key(m_instance, matched):
            ...     m_instance.writeline(u'yes')
            >>> m = Multiplex('ssh someuser@somehost')
            >>> ref1 = m.expect('(?i)Are you sure.*\(yes/no\)\?',
            ...     handle_accepting_ssh_key, optional=True)
            >>> def send_password(m_instance, matched):
            ...     m_instance.unexpect(ref1)
            ...     m_instance.writeline('somepassword')
            >>> ref2 = m.expect('(?i)password:', send_password)
            >>> # spawn() and/or await() and do stuff...

        The example above would send 'yes' if asked by the SSH program to
        accept the host's public key (which would result in that pattern being
        automatically removed from self._patterns).  However, if this
        condition isn't met before send_password() is called, send_password()
        will use the reference object to remove it directly.  This ensures
        that the pattern won't be accidentally matched later on in the
        program's execution.

        .. note::

            Even if we didn't match the "Are you sure..." pattern it would
            still get auto-removed after its timeout was reached.

        **About pattern ordering:** The position at which the given pattern
        will be inserted in self._patterns can be specified via the *position*
        argument.  The default is to simply append, which should be
        appropriate in most cases.

        **About timeouts:** The *timeout* value passed to expect() will be
        used to determine how long to wait before the pattern is removed from
        self._patterns.  When this occurs, *errorback* will be called with the
        current Multiplex instance as the only argument.  If *errorback* is
        None (the default) the pattern will simply be discarded with no action
        taken.

        .. note:: If *sticky* is True the *timeout* value will be ignored.

        **Notes about the length of what will be matched:** The entire
        terminal 'screen' will be searched every time new output is read from
        the incoming stream.  This means that the number of rows and columns
        of the terminal determines the size of the search.  So if your pattern
        needs to look for something inside of 50 lines of text you need to
        make sure that when you call spawn() you specify at least rows=50.
        Example::

            >>> def handle_long_search(m_instance, matched):
            ...     do_stuff(matched)
            >>> m = Multiplex('someCommandWithLotsOfOutput.sh')
            >>> # 'begin', at least one non-newline char, 50 newlines, at
            >>> # least one char, then 'end':
            >>> my_regex = re.compile('begin.+[\\n]{50}.+end', re.MULTILINE)
            >>> ref = m.expect(my_regex, handle_long_search)
            >>> m.spawn(rows=51, cols=150)
            >>> # Call m.read() or just let an event loop (e.g. Tornado's
            >>> # IOLoop) take care of things...

        **About non-printable characters:** By default patterns are checked
        against the current screen as output by the terminal emulator
        (post-processing).  This means that things like control codes and
        escape sequences will be handled and discarded by the terminal
        emulator and as such won't be available for patterns to be checked
        against.  To get around this limitation you can set *preprocess* to
        True and the pattern will be checked against the incoming stream
        before it is processed by the terminal emulator.  Example::

            >>> def handle_xterm_title(m_instance, matched):
            ...     print("Caught title: %s" % matched)
            >>> m = Multiplex('echo -e "\\033]0;Some Title\\007"')
            >>> title_seq_regex = re.compile(
            ...     r'\\x1b\\][0-2]\;(.*?)(\\x07|\\x1b\\\\)')
            >>> m.expect(title_seq_regex, handle_xterm_title, preprocess=True)
            >>> m.await()
            Caught title: Some Title
            >>>

        **Notes about debugging:** Instead of using await() to wait for all of
        your patterns to be matched at once you can make individual calls to
        read() to determine if your patterns are being matched in the way that
        you want.  For example::

            >>> def do_stuff(m_instance, matched):
            ...     print("Debug: do_stuff() got %s" % repr(matched))
            ...     # Do stuff here
            >>> m = Multiplex('someLongComplicatedOutput.sh')
            >>> m.expect('some pattern', do_stuff)
            >>> m.expect('some other pattern', do_stuff)
            >>> m.spawn()
            >>> # Instead of calling await() just call one read() at a time...
            >>> print(repr(m.read()))
            ''
            >>> print(repr(m.read())) # Oops, called read() too soon. Again:
            'some other pattern'
            >>> # Doh!  'some other pattern' comes first.  Let's start over...
            >>> m.unexpect() # With no arguments, it empties m._patterns
            >>> m.terminate() # Tip: This calls unexpect() too so the line
            >>> # above really isn't necessary
            >>> m.expect('some other pattern', do_stuff) # Now this comes first
            >>> m.expect('some pattern', do_stuff)
            >>> m.spawn()
            >>> print(repr(m.read())) # This time I waited a moment :)
            'Debug: do_stuff() got "some other pattern"'
            'some other pattern'
            >>> # Huzzah!  Now let's see if 'some pattern' matches...
            >>> print(repr(m.read()))
            'Debug: do_stuff() got "some pattern"'
            'some pattern'
            >>> # As you can see, calling read() at-will in an interactive
            >>> # interpreter can be very handy.

        **About asynchronous use:** This mechanism is non-blocking (with the
        exception of await()) and is meant to be used asynchronously.  This
        means that if the running program has no output, m.read() won't result
        in any patterns being matched.  So you must be careful about timing
        *or* you need to ensure that read() gets called either automatically
        when there's data to be read (IOLoop, epoll, select, etc) or at
        regular intervals via a loop.  Also, if you're not calling read() at
        an interval (i.e. you're using a mechanism to detect when there's
        output to be read before calling it, e.g. IOLoop) you need to ensure
        that timeout_check() is called regularly anyway or timeouts won't get
        detected when there's no output from the underlying program.  See the
        MultiplexPOSIXIOLoop.read() override for an example of what this means
        and how to do it.
        """
        # Create the Pattern object before we do anything else
        if isinstance(patterns, (str, unicode)):
            # Convert to a compiled regex (assume MULTILINE for the sanity of
            # the ignorant)
            patterns = re.compile(patterns, re.MULTILINE)
        if isinstance(patterns, (tuple, list)):
            # Ensure that all patterns are RegexObjects
            pattern_list = []
            for pattern in patterns:
                if isinstance(pattern, str):
                    pattern = re.compile(pattern)
                    pattern_list.append(pattern)
                else:
                    pattern_list.append(pattern)
            patterns = tuple(pattern_list) # No reason to keep it as a list
        # Convert timeout to a timedelta if necessary
        if isinstance(timeout, (str, int, float)):
            timeout = timedelta(seconds=float(timeout))
        elif not isinstance(timeout, timedelta):
            raise TypeError(_(
                "The timeout value must be a string, integer, float, or a "
                "timedelta object"))
        pattern_obj = Pattern(patterns, callback,
            optional=optional, sticky=sticky, errorback=errorback,
            preprocess=preprocess, timeout=timeout)
        if not position:
            self._patterns.append(pattern_obj)
        else:
            self._patterns.insert(position, pattern_obj)
        return hash(pattern_obj)

    def unexpect(self, ref=None):
        """
        Removes *ref* from self._patterns so it will no longer be checked
        against the incoming stream.  If *ref* is None (the default),
        self._patterns will be emptied.
        """
        if not ref:
            self._patterns = [] # Reset
            return
        for i, item in enumerate(self._patterns):
            if hash(item) == ref:
                self._patterns.pop(i)
                break # Hashes are unique; no reason to keep looking

    def await(self, timeout=15, rows=24, cols=80, env=None):
        """
        Blocks until all non-optional patterns inside self._patterns have been
        removed *or* until the given *timeout* is reached.  *timeout* may be
        an integer (in seconds) or a datetime.timedelta object.

        Returns True if all non-optional, non-sticky patterns were handled
        successfully.

        .. warning::

            The timeouts attached to Patterns are set when they are created.
            Not when you call :py:meth:`await`!

        As a convenience, if :py:meth:`isalive` resolves to False,
        :py:meth:`spawn` will be called automatically with *rows*, *cols*, and
        *env* given as arguments.

        await
            To wait with expectation.
        """
        if not self.isalive():
            self.spawn(rows=rows, cols=cols, env=env)
        start = datetime.now()
        # Convert timeout to a timedelta if necessary
        if isinstance(timeout, (str, int, float)):
            timeout = timedelta(seconds=float(timeout))
        elif not isinstance(timeout, timedelta):
            raise TypeError(_(
                "The timeout value must be a string, integer, float, or a "
                "timedelta object"))
        remaining_patterns = True
        while remaining_patterns:
            # First we need to discount optional patterns
            remaining_patterns = False
            for pattern in self._patterns:
                if not pattern.optional and not pattern.sticky:
                    remaining_patterns = True
                    break
            # Now check if we've timed out
            if (datetime.now() - start) > timeout:
                raise Timeout("Lingered longer than %s" % timeout.seconds)
            # Lastly we perform a read() to ensure the output is processed
            self.read() # Remember: read() is non-blocking
            time.sleep(0.1) # So we don't eat up all the CPU
        return True

    def terminate(self):
        raise NotImplementedError(_(
            "terminate() *must* be overridden by subclasses."))

    def _read(self, bytes=-1):
        """
        This method must be overridden by subclasses of
        :py:class:`BaseMultiplex`.  It is expected to read the output from
        the running terminal program in a non-blocking way, pass the result
        into :py:meth:`term_write`, and then return the result.
        """
        raise NotImplementedError(_(
            "_read() *must* be overridden by subclasses."))

    def read(self, bytes=-1):
        """
        Calls :py:meth:`_read` and checks if any timeouts have been reached
        in :py:attr:`_patterns`.  Returns the result of :py:meth:`_read`.
        """
        result = self._read(bytes)
        # Perform checks for timeouts in self._patterns (used by expect())
        self.timeout_check()
        return result

    def write(self, chars):
        raise NotImplementedError(_(
            "write() *must* be overridden by subclasses."))

class MultiplexPOSIXIOLoop(BaseMultiplex):
    """
    The Multiplex class (which this class provides on POSIX platforms) takes
    care of executing a child process and keeping track of its state via a
    terminal emulator (terminal.Terminal by default).  If there's a started
    instance of tornado.ioloop, handlers will be added to it that
    automatically keep the terminal emulator synchronized with the output of
    the child process.

    If there's no IOLoop (or it just isn't started), terminal applications
    can be interacted with by calling Multiplex.read() (to write any pending
    output to the terminal emulator) and Multiplex.write() (which writes
    directly to stdin of the child).

    .. note:: Multiplex.read() is non-blocking.
    """
    def __init__(self, *args, **kwargs):
        super(MultiplexPOSIXIOLoop, self).__init__(*args, **kwargs)
        from tornado import ioloop
        self.terminating = False
        self.io_loop = ioloop.IOLoop.instance() # Monitors child for activity
        self.io_loop.set_blocking_signal_threshold(
            5, self._blocked_io_handler)
        interval = 100 # 0.1 seconds
        self.scheduler = ioloop.PeriodicCallback(
            self._timeout_checker, interval, io_loop=self.io_loop)

    def _call_callback(self, callback):
        """
        If the IOLoop is started, adds *callback* via IOLoop.add_callback()
        to ensure it gets called at the next IOLoop iteration (which is
        thread safe).  If the IOLoop isn't started, *callback* will get
        called immediately and directly.
        """
        try:
            if self.io_loop.running():
                self.io_loop.add_callback(callback)
            else:
                callback()
        finally:
            del callback

    def _reenable_output(self):
        """
        Restarts capturing output from the underlying terminal program by
        disengaging the rate limiter and re-instantiating the terminal
        emulator.
        """
        self.ratelimiter_engaged = False
        # Empty the output queue.
        import termios
        termios.tcflush(self.fd, termios.TCOFLUSH)
        with self.lock:
            with io.open(self.fd, 'rb', closefd=False) as reader:
                updated = reader.read() # Clear it out
                del updated
        # Create a new terminal emulator instance to free up any memory that
        # was consumed by the runaway process buffering up too much stuff.
        del self.term
        self.term = self.terminal_emulator(rows=self.rows, cols=self.cols)
        # TODO: Consider restoring the mode/state of the terminal emulator.
        for i in self.prev_output.keys():
            self.prev_output.update({i: [None for a in xrange(self.rows-1)]})

    def _blocked_io_handler(self, signum=None, frame=None):
        """
        Handles the situation where a terminal is blocking IO with too much
        output.  Normally this gets called automatically by IOLoop's signal
        threshold mechanism (IOLoop.set_blocking_signal_threshold()).
        """
        logging.warning(
            "Noisy process kicked off rate limiter. Sending Ctrl-c.")
        #os.kill(self.pid, signal.SIGINT) # Doesn't work right with dtach
        # Sending Ctrl-c via write() seems to work better:
        with io.open(self.fd, 'wb', closefd=False) as writer:
            writer.write("\x03\n") # Just pray it works!
            writer.write(_("# Process was auto-killed.\n"))
        # This doesn't seem to work (would be nice if it did though!):
        #os.write(self.fd, "\x13") # Ctrl-S to the bad process
        self.ratelimiter_engaged = True
        for callback in self.callbacks[self.CALLBACK_UPDATE].values():
            self._call_callback(callback)
        self.io_loop.add_timeout(
            timedelta(seconds=5), self._reenable_output)

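    # Example (sketch): a CALLBACK_UPDATE handler can check the rate limiter
    # flag to tell clients what happened ('notify_client' is hypothetical):
    #
    #   def screen_update():
    #       if m.ratelimiter_engaged:
    #           notify_client("Output was too noisy; sent Ctrl-c.")
    #   m.add_callback(m.CALLBACK_UPDATE, screen_update)
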
    def spawn(self, rows=24, cols=80, env=None):
        """
        Creates a new virtual terminal (tty) and executes self.cmd within it.
        Also attaches :py:meth:`_ioloop_read_handler` to the IOLoop so that
        the terminal emulator will automatically stay in sync with the output
        of the child process.

        *rows*
            The number of rows to emulate on the virtual terminal (height).
        *cols*
            The number of columns to emulate (width).
        *env*
            A dictionary of environment variables to set when executing
            self.cmd.
        """
        self.started = datetime.now()
        signal.signal(signal.SIGCHLD, signal.SIG_IGN) # No zombies allowed
        logging.debug(
            "spawn(rows=%s, cols=%s, env=%s)" % (rows, cols, repr(env)))
        import pty
        pid, fd = pty.fork()
        if pid == 0: # We're inside the child process
            # Close all file descriptors other than stdin, stdout, and stderr
            # (0, 1, 2)
            try:
                # This ensures that the child doesn't get the parent's FDs
                os.closerange(3, 256)
            except OSError:
                pass
            if not env:
                env = {}
            env["COLUMNS"] = str(cols)
            env["LINES"] = str(rows)
            env["TERM"] = "xterm" # TODO: Needs to be configurable on-the-fly
            env["PATH"] = os.environ['PATH']
            # The sleep statement below ensures we capture all output from
            # the fd before it is closed...  It turns out that IOLoop's
            # response to changes in the fd is so fast that it can result in
            # the fd being closed the very moment the Python interpreter is
            # reading from it.
            cmd = ['/bin/sh', '-c', self.cmd + '; sleep .1']
            os.dup2(2, 1) # Make stdout a copy of stderr (i.e. 1>&2)
            os.execvpe(cmd[0], cmd, env)
            os._exit(0)
        else: # We're inside the parent process
            self._alive = True
            self.fd = fd
            self.pid = pid
            self.time = time.time()
            self.term = self.terminal_emulator(rows=rows, cols=cols)
            # Tell our IOLoop instance to start watching the child
            self.io_loop.add_handler(
                fd, self._ioloop_read_handler, self.io_loop.READ)
            self.prev_output = {}
            # Set non-blocking so we don't wait forever for a read()
            import fcntl
            fl = fcntl.fcntl(self.fd, fcntl.F_GETFL)
            fcntl.fcntl(self.fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
            # Set the size of the terminal
            self.resize(rows, cols, ctrl_l=False)
            return fd

    def isalive(self):
        """
        Checks the underlying process to see if it is alive and sets
        self._alive appropriately.

        .. note:: Scans /proc directly, so this check is Linux-specific.
        """
        if self._alive: # Re-check it
            for f in os.listdir('/proc'):
                pid_dir = os.path.join('/proc', f)
                if os.path.isdir(pid_dir):
                    try:
                        pid = int(f)
                    except ValueError:
                        continue # Not a PID
                    if pid == self.pid:
                        self._alive = True
                        return True
            self._alive = False
        return False

    def resize(self, rows, cols, ctrl_l=True):
        """
        Resizes the child process's terminal window to *rows* and *cols* by
        first sending it a TIOCSWINSZ event and then sending ctrl-l.  The
        sending of ctrl-l can be disabled by setting *ctrl_l* to False.
        """
        logging.debug("Resizing term %s to rows: %s, cols: %s" % (
            self.term_id, rows, cols))
        self.rows = rows
        self.cols = cols
        self.term.resize(rows, cols)
        # Sometimes the resize doesn't actually apply (for whatever reason)
        # so to get around this we have to send a different value than the
        # actual value we want then send our actual value.  It's a bug
        # outside of Gate One that I have no idea how to isolate but this has
        # proven to be an effective workaround.
        import fcntl, termios
        s = struct.pack("HHHH", rows, cols, 0, 0)
        fcntl.ioctl(self.fd, termios.TIOCSWINSZ, s)
        if ctrl_l:
            self.write(u'\x0c') # ctrl-l
        # SIGWINCH has been disabled since it can screw things up
        #os.kill(self.pid, signal.SIGWINCH) # Send the resize signal

    def terminate(self):
        """
        Kills the child process associated with this instance (self.fd).

        .. note:: If dtach is being used this only kills the dtach process.
        """
        if not self.terminating:
            self.terminating = True
        else:
            return # Something else already called it
        logging.debug("terminate() self.pid: %s" % self.pid)
        if self._patterns:
            self.timeout_check(timeout_now=True)
        if self.scheduler._running:
            self.scheduler.stop()
        # NOTE: Without this 'del' we end up with a memory leak every time a
        # new instance of Multiplex is created.  Apparently the references
        # inside of PeriodicCallback pointing to self prevent proper garbage
        # collection.
        del self.scheduler
        try:
            self.io_loop.remove_handler(self.fd)
            os.close(self.fd)
        except (KeyError, IOError, OSError):
            # This can happen when the fd is removed by the underlying
            # process before the next cycle of the IOLoop.  Not really a
            # problem.
            pass
        try:
            # TODO: Make this walk the series from SIGINT to SIGKILL
            #os.kill(self.pid, signal.SIGINT)
            os.kill(self.pid, signal.SIGTERM)
            #os.kill(self.pid, signal.SIGKILL)
            os.waitpid(-1, os.WNOHANG)
        except OSError:
            pass # The process is already dead--great.
        # Unset our blocked IO handler so there's no references to self
        # hanging around preventing us from freeing up memory
        try:
            self.io_loop.set_blocking_signal_threshold(5, None)
        except ValueError:
            pass # Can happen if this instance winds up in a thread
        # Kick off a process that finalizes the log (updates metadata and
        # recompresses everything to save disk space)
        pid = os.fork() # Multiprocessing doesn't get much simpler than this!
        if pid == 0: # We're inside the child process
            os.setsid() # This prevents defunct processes (zombies)
            pid = os.fork()
            if pid == 0: # We're inside the sub-child process
                # Wait just a moment for the main thread to finish writing:
                time.sleep(5)
                try:
                    get_or_update_metadata(self.log_path, self.user)
                except Exception:
                    # Whatever--the metadata will get fixed when enumerated
                    pass
                os._exit(0)
            else:
                os._exit(0)

    def _ioloop_read_handler(self, fd, event):
        """
        Reads the output of the process associated with *fd* and writes it to
        self.term.

        This method also keeps an eye on the output rate of the underlying
        terminal application.  If it gets too high (which would gobble up
        CPU) it will engage a rate limiter.  So if someone thinks it would be
        funny to run 'top' with a refresh rate of 0.01 they'll really only be
        getting updates every ~2 seconds (and it won't bog down the server =).

        .. note::

            This method is not meant to be called directly...  The IOLoop
            should be the one calling it when it detects an io_loop.READ
            event.
        """
        if event == self.io_loop.READ:
            self._call_callback(self.read)
        else: # Child died
            logging.debug(_(
                "Apparently fd %s just died (event: %s)" % (self.fd, event)))
            self.terminate()
            for callback in self.callbacks[self.CALLBACK_EXIT].values():
                self._call_callback(callback)

    def _read(self, bytes=-1):
        """
        Reads at most *bytes* from the incoming stream, writes the result to
        the terminal emulator using self.term_write(), and returns what was
        read.  If *bytes* is -1 (the default), self.fd will be read until
        there's no more output.  Returns the result of all the reads.

        .. note:: Non-blocking.
        """
        result = ""
        try:
            with self.lock:
                with io.open(self.fd, 'rb', closefd=False) as reader:
                    if bytes == -1:
                        while True:
                            updated = reader.read(bytes)
                            if not updated:
                                break
                            if self.ratelimiter_engaged:
                                # Don't write when the rate limiter is engaged
                                break
                            result += updated
                            self.term_write(updated)
                    elif bytes:
                        result = reader.read(bytes)
                        self.term_write(result)
        except IOError:
            # IOErrors can happen when self.fd is closed before we finish
            # reading from it.  Not a big deal.
            pass
        except OSError as e:
            logging.error("Got exception in read: %s" % repr(e))
        except Exception as e:
            import traceback
            logging.error(
                "Got unhandled exception in read (???): %s" % repr(e))
            traceback.print_exc(file=sys.stdout)
            if self.isalive():
                self.terminate()
        finally:
            return result

    def _timeout_checker(self):
        """
        Runs :py:meth:`timeout_check` and if there are no more non-sticky
        patterns in :py:attr:`_patterns`, stops :py:attr:`scheduler`.
        """
        remaining_patterns = self.timeout_check()
        if not remaining_patterns:
            # No reason to keep the PeriodicCallback going
            logging.debug("Stopping self.scheduler")
            try:
                self.scheduler.stop()
            except AttributeError:
                # Now this is a neat trick: Thanks to the way IOLoop's
                # stack_context works, self.scheduler doesn't actually end up
                # attached to this MultiplexPOSIXIOLoop instance inside of
                # _timeout_checker() *except* inside the main thread.  It is
                # absolutely wacky but it works and works well :)
                pass

    def read(self, bytes=-1):
        """
        .. note::

            This is an override of BaseMultiplex.read() that takes advantage
            of the IOLoop to ensure that expect() patterns time out properly.

        Calls :py:meth:`_read` and checks if any timeouts have been reached
        in :py:attr:`_patterns`.  Returns the result of :py:meth:`_read`.  If
        any non-sticky patterns remain, starts :py:attr:`scheduler` (a
        `PeriodicCallback` that executes :py:meth:`timeout_check` at a
        regular interval and cancels itself once no non-sticky patterns
        remain).
        """
        result = self._read(bytes)
        remaining_patterns = self.timeout_check()
        if remaining_patterns and not self.scheduler._running:
            # Start 'er up in case we don't get any more output
            logging.debug("Starting self.scheduler to check for timeouts")
            self.scheduler.start()
        return result

    def _write(self, chars):
        """
        Writes *chars* to self.fd (pretty straightforward).

        If an IOError or OSError exception is encountered, self.terminate()
        will be called (if the process is still alive).  All other exceptions
        are logged but no action will be taken.
        """
        try:
            with io.open(
                self.fd, 'wt', newline="", encoding='UTF-8', closefd=False
            ) as writer:
                writer.write(chars)
        except (IOError, OSError):
            if self.isalive():
                self.terminate()
        except Exception as e:
            logging.error("write() exception: %s" % e)

    def write(self, chars):
        """
        Calls self._write(*chars*) via self._call_callback() to ensure thread
        safety.
        """
        write = partial(self._write, chars)
        self._call_callback(write)

def spawn(cmd, rows=24, cols=80, env=None, *args, **kwargs):
    """
    A shortcut to::

        >>> m = Multiplex(cmd, *args, **kwargs)
        >>> m.spawn(rows, cols, env)
        >>> return m
    """
    m = Multiplex(cmd, *args, **kwargs)
    m.spawn(rows, cols, env)
    return m

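# Example usage of the spawn() shortcut (a sketch; 'top' is just an
# illustrative command):
#
#   m = termio.spawn('top', rows=24, cols=80)
#   m.read() # Process any pending output through the terminal emulator
#   print("\n".join(m.dump()))
#   m.terminate()
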
if POSIX:
    Multiplex = MultiplexPOSIXIOLoop
else:
    raise NotImplementedError(_(
        "termio currently only works on Unix platforms."))