"""expect module
"""
import re
import logging
import sys
import time
LOGGER = logging.getLogger(__name__)
class ExpectTimeoutError(Exception):
"""Exception raised when expect times out.
"""
pass
[docs]class Handler(object):
"""Class wrapping an io object.
"""
[docs] def __init__(self,
iostream,
eol='\n',
break_conditions=None,
print_input=True,
print_output=False,
split_pattern='\n'):
"""Initialize object with given parameters.
:param iostream: Io stream to read data from and write data
data to. The class of this object must
implement two functions, read(count) and
write(string). read() must return a string.
:param eol: 'end of line' string to send after the
'send string'.
:param break_conditions: expect() throws an exception if the returned
value from `iostream`.read() is in this
iterable.
:param print_input: Print input on stdout.
:param print_output: Print output on stdout.
:param split_pattern: Split read data using this regexp before sreaching
for a match.
"""
self.iostream = iostream
self.input_buffer = ''
self.eol = eol
if break_conditions is None:
break_conditions = ['', None]
self.break_conditions = break_conditions
self.print_input = print_input
self.print_output = print_output
if split_pattern:
self.re_split = re.compile(split_pattern)
else:
self.re_split = None
[docs] def expect(self, pattern, timeout=None, print_input=True):
"""Returns when regular expression `pattern` matches the data
read from the output stream.
:param pattern: Regular expression to match.
:returns: The matched string.
"""
start_time = time.time()
re_expect = re.compile(r'(' + pattern + r')')
while True:
mo = re_expect.search(self.input_buffer)
if mo:
LOGGER.debug("Found expected pattern '%s'.", pattern)
self.input_buffer = self.input_buffer[mo.end():]
return mo.group(1)
else:
char = self.iostream.read(1)
if char in self.break_conditions:
fmt = "break condition met: '{}' in '{}'."
raise RuntimeError(fmt.format(char, self.break_conditions))
if self.print_input and print_input == True:
sys.stdout.write(char)
self.input_buffer += char
if self.re_split:
self.input_buffer = self.re_split.split(self.input_buffer)[-1]
# timeout handling
if timeout:
if (time.time() - start_time) > timeout:
raise ExpectTimeoutError("Timed out waiting for '%s'", pattern)
[docs] def send(self, string, send_eol=True):
"""Writes a string to the iostream.
:param string: String to send.
:param send_eol: Send 'end of line' after ``string``.
:returns: Return value of iostream.write().
"""
if send_eol:
string += self.eol
if self.print_output:
sys.stdout.write(string)
return self.iostream.write(string)