Simulating synchronous programming with Python generators
Robey’s recent article on naggati reminded me of something I’d been idly pondering for a while. Having recently written an SSH-based host discovery scanner on top of the Twisted asynchronous programming library, I too yearned for a way to write sequences of commands in plain-old imperative code, hiding the callback complexities of event-driven code from users.
Continuations fit the bill nicely. These are functions from which you can return multiple times, resuming right where you left off. With continuations, you could write a sequence of functions that might make asynchronous calls, but the framework would call your continuation back where it left off.
Python does not have first-class continuations, but it does have generators, and these behave almost identically (for my purposes, at least). A generator is a function that can yield multiple values. Well, actually, it returns an iterator, which then can be used to fetch multiple values from the generator. An example will probably make it clear:
>>> def finite_generator():
...     yield 'apple'
...     yield 'orange'
...     yield 'pear'
...
>>> iterator = finite_generator()
>>> for fruit in iterator:
...     print fruit
...
apple
orange
pear
Generators can also run forever:
>>> def infinite_generator():
...     i = 0
...     while True:
...         yield i
...         i += 1
...
>>> iterator = infinite_generator()
>>> for i in iterator:
...     print i
...
0
1
2
3
4
5
... and on and on forever
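The property that matters for the rest of this post is that a generator pauses at each yield and picks up from that exact point the next time a value is requested. Here is a minimal sketch of stepping one by hand. It is written for Python 3, where the built-in next(iterator) replaces iterator.next(), and the names chatty_generator and log are made up for illustration:

```python
def chatty_generator(log):
    # Each append below runs only when the caller asks for the next value.
    log.append('started')
    yield 'apple'
    log.append('resumed after apple')
    yield 'orange'
    log.append('resumed after orange')


log = []
iterator = chatty_generator(log)    # nothing in the body has run yet
log_before_first_next = list(log)   # still empty at this point

first = next(iterator)              # runs up to the first yield, then pauses
second = next(iterator)             # resumes right after the first yield

try:
    next(iterator)                  # runs the tail of the body; no yields remain
    exhausted = False
except StopIteration:
    exhausted = True
```

The body only advances when something calls next(), which is what lets outside code decide when the next step of the function should run.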
I had been using iterators in my asynchronous host scanner whenever I needed to run asynchronous commands within a loop. The asynchronous programming model prevents you from writing something like:
for foo in bar:
    async_method(foo)
Instead, you would do something like this:
def callback(response, iterator):
    do_something_with_response(response)
    schedule_next_task(iterator)

def schedule_next_task(iterator):
    try:
        foo = iterator.next()
        deferred = async_method(foo)
        deferred.addCallback(callback, iterator)
    except StopIteration:
        pass

iterator = iter(bar)
schedule_next_task(iterator)
It works like this:
- We get an iterator for our list, bar — this could just as well be a generator function
- We fetch the first value from the iterator and pass it to the asynchronous method
- That method presumably makes some type of I/O request, and responds immediately with a Deferred instance
- We add a callback function to the Deferred and request that our iterator instance be passed to it when it is called
- Control returns to the event loop, which might be busy scheduling other I/O requests
- When the I/O completes, the event loop calls our callback function with the response and our iterator instance
- The callback processes the response, and then goes back to the second step, fetching the next item from the iterator
- When the iterator is exhausted, the cycle stops
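The cycle above can be tried out without Twisted. The following is only a sketch under stated assumptions: ToyDeferred, pending and run_event_loop are hypothetical stand-ins invented here for Twisted's Deferred and reactor, not their real implementations, and async_method simply upper-cases its argument to fake an I/O response. It is written for Python 3, so it uses next(iterator) rather than iterator.next().

```python
from collections import deque


class ToyDeferred:
    """Hypothetical stand-in for a Deferred: holds one callback until fired."""

    def __init__(self):
        self._callback = None

    def addCallback(self, fn, *args):
        self._callback = (fn, args)

    def fire(self, result):
        fn, args = self._callback
        fn(result, *args)


pending = deque()   # stands in for the event loop's outstanding I/O requests
responses = []


def async_method(foo):
    # Pretend to start an I/O request and return immediately with a deferred.
    deferred = ToyDeferred()
    pending.append((deferred, foo.upper()))
    return deferred


def callback(response, iterator):
    responses.append(response)
    schedule_next_task(iterator)


def schedule_next_task(iterator):
    try:
        foo = next(iterator)
    except StopIteration:
        return   # iterator exhausted: the cycle stops
    deferred = async_method(foo)
    deferred.addCallback(callback, iterator)


def run_event_loop():
    # "Complete" each pending request in turn by firing its deferred.
    while pending:
        deferred, result = pending.popleft()
        deferred.fire(result)


schedule_next_task(iter(['a', 'b', 'c']))
run_event_loop()
```

Only one request is in flight at a time: the next item is not fetched from the iterator until the previous response has been delivered to the callback.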
It occurred to me that I might be able to extend this concept to use generators as a sort of continuation to emulate synchronous code. What if, instead of returning strings or numbers from a generator, you returned functions? Some wrapper code could initialize the iterator, and then loop over it using the technique above, calling each function returned from the generator.
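Stripped of any real framework, the idea might be sketched like this. Everything here is an assumption made for illustration: event_queue is a toy event loop, fake_async builds pretend asynchronous functions that take a completion callback (rather than returning a Deferred), and the values it "discovers" are invented. It is written for Python 3.

```python
from collections import deque

event_queue = deque()   # hypothetical event loop: callables to run later


def fake_async(key, value):
    """Build a function that 'starts I/O' and reports {key: value} later."""
    def start(on_done):
        # The result is delivered on a later turn of the loop, not right now.
        event_queue.append(lambda: on_done({key: value}))
    return start


class SequenceRunner:
    """Drives a generator that yields asynchronous functions."""

    def __init__(self, make_sequence):
        self.context = {}       # scratch space shared with the generator
        self.finished = False
        self.iterator = make_sequence(self.context)

    def schedule_next_task(self):
        try:
            function = next(self.iterator)
        except StopIteration:
            self.finished = True
            return
        function(self.callback)

    def callback(self, response):
        self.context.update(response)
        self.schedule_next_task()


def sequence(context):
    # Reads like synchronous code, but each step is yielded, not called.
    yield fake_async('uname', 'Linux examplehost')   # invented value
    os_name = context['uname'].split()[0]
    if os_name == 'Linux':
        yield fake_async('distro', 'example-distro')  # invented value
    yield fake_async('hostid', '0000')                # invented value


runner = SequenceRunner(sequence)
runner.schedule_next_task()
while event_queue:
    event_queue.popleft()()
```

Because the generator is only resumed from the callback, the line that reads context['uname'] does not run until the first result has arrived, which is what makes the sequence look synchronous.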
Tonight I decided to give this a try. After forking off an experimental branch and making a few modifications to the underlying fido host discovery routines, I crafted the following plesiochronous host scanner:
#!/usr/bin/env python
#
# Use a generator to simulate synchronous execution on an asynchronous framework
#
from fido.common.command import RemoteCommandExecutor
from fido.common.host.unix import UnixHost
from fido.common.ssh import SSHCredentials
from contrib.host.software.sun.host import SolarisHost
from contrib.host.software.linux.host import LinuxHost
from twisted.internet import reactor
import pprint

class PlesiochronousHostScanner(object):
    """
    Scans a host over SSH, building a list of host attributes. Built on the Twisted asynchronous
    library, but uses a Python generator function to emulate garden variety synchronous code.
    """
    def __init__(self, address, credentials):
        """
        address: the IP address to scan
        credentials: a hash like: { 'username': '...' , 'password': '...', 'public_key': '<optional>' }
        """
        self.address = address
        self.credentials = credentials
        self.host = UnixHost(RemoteCommandExecutor(address, credentials))
        self.pp = pprint.PrettyPrinter()
        # create some scratch space for the discovery methods
        self.context = { }
        # get an iterator from the generator
        self.iterator = self.scanning_sequence()

    def scanning_sequence(self):
        """
        A typical nugget of synchronous code, with one important exception: asynchronous
        functions must be yielded instead of being called directly.
        """
        yield self.host.uname
        os = self.context['uname'].split()[0]
        if os == 'SunOS':
            self.host = SolarisHost.from_host(self.host)
            yield self.host.zonename
            yield self.host.zones
        elif os == 'Linux':
            self.host = LinuxHost.from_host(self.host)
        else:
            print "Unable to scan host type: %s" % os
            return
        yield self.host.hostid
        yield self.host.device
        yield self.host.bios
        yield self.host.installed_memory_in_MB
        yield self.host.interfaces

    def callback(self, response):
        self.context.update(response)
        self.schedule_next_task()

    def errback(self, error):
        print "scanning error: %s" % error

    def schedule_next_task(self):
        try:
            function = self.iterator.next()
            deferred = function()
            deferred.addCallbacks(self.callback, self.errback)
        except StopIteration:
            self.scan_complete()

    def start_scan(self):
        self.schedule_next_task()

    def scan_complete(self):
        print "Scan of %s is complete" % self.address
        self.pp.pprint(self.context)
        # In this contrived example, we'll stop the reactor when we've finished scanning a host
        reactor.stop()

if __name__ == '__main__':
    import sys
    from optparse import OptionParser
    parser = OptionParser()
    parser.add_option("-u", "--username", dest="username")
    parser.add_option("-p", "--password", dest="password")
    (options, args) = parser.parse_args()
    address = args.pop(0)
    credentials = iter([SSHCredentials(options.username, options.password, None)])
    scanner = PlesiochronousHostScanner(address, credentials)
    reactor.callWhenRunning(scanner.start_scan)
    reactor.run()
It works:
satellite:~ clay$ python pleisio.py -u username -p password 10.20.30.40
Scan of 10.20.30.40 is complete
{'bios': {'bios_date': '11/15/2007',
          'bios_vendor': 'Sun Microsystems',
          'bios_version': 'S39_3B25'},
 'device': {'system_product': 'Sun Fire X2200 M2',
            'system_serial': '0805QAT0EA',
            'system_uuid': 'bd6529dc-fc79-0010-9e1b-001b245c1d4f',
            'system_vendor': 'Sun Microsystems',
            'system_version': 'Rev 50'},
 'hostid': '0ec2daa6',
 'installed_memory_in_MB': 32768,
 'interfaces': {'bge0': {'ipv4_addresses': [10.20.30.40],
                         'ipv6_addresses': [],
                         'mac_address': 00:1B:24:5C:18:B5,
                         'zone': None},
                'lo0': {'ipv4_addresses': [],
                        'ipv6_addresses': [],
                        'mac_address': None,
                        'zone': None}},
 'uname': 'SunOS myhost.mydomain.com 5.10 Generic_127112-11 i86pc i386 i86pc',
 'zonename': 'global',
 'zones': {'myzone': {'brand': 'native',
                      'ip_mode': 'shared',
                      'root': '/zones/myzone',
                      'state': 'running',
                      'uuid': '09fbf9ba-c0c5-408f-c9e9-820471983f25',
                      'zonename': 'myzone'}}}
The beauty of this approach is that the PlesiochronousHostScanner.scanning_sequence method is pretty straightforward, and could actually be written by end users who are familiar with Python but not with asynchronous programming. It also makes the discovery logic much easier to understand than it was in the state-machine-based asynchronous discovery engine I had previously built.
Having just concocted this tonight, I’m not sure whether this is something I’ll pursue, but it has been a fun experiment. I’m curious what other asynchronous programmers think of this approach.


