~clay

merely my musings

Simulating synchronous programming with Python generators

Robey’s recent article on naggati reminded me of something I’d been idly pondering for a while. Having recently written an SSH-based host discovery scanner on top of the Twisted asynchronous programming library, I too yearned for a way to write sequences of commands in plain-old imperative code, hiding the callback complexities of event-driven code from users.

Continuations fit the bill nicely. A continuation captures the rest of a computation at some point, so that it can be resumed later right where it left off. With continuations, you could write a plain sequence of calls, some of them asynchronous, and the framework would resume your code at the point where it paused once each result arrived.

Python does not have first-class continuations, but it does have generators, and these behave almost identically (for my purposes, at least). A generator is a function that can yield multiple values. Well, actually, calling it returns an iterator, which can then be used to fetch those values one at a time. An example will probably make it clear:

>>> def finite_generator():
...     yield 'apple'
...     yield 'orange'
...     yield 'pear'
...
>>> iterator = finite_generator()
>>> for fruit in iterator:
...     print fruit
...
apple
orange
pear
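
The "resuming right where you left off" part is easier to see when the iterator is stepped by hand. Here is a minimal sketch (the trace list and the traced_generator name are mine, purely for illustration; it uses the next() builtin so it runs on Python 2.6+ and Python 3):

```python
trace = []  # records the order in which pieces of the generator body run


def traced_generator():
    trace.append('started')
    yield 'apple'
    trace.append('resumed after apple')
    yield 'orange'
    trace.append('resumed after orange')


iterator = traced_generator()   # none of the body has run yet; trace is still empty
first = next(iterator)          # runs the body up to the first yield
second = next(iterator)         # resumes just after the first yield

try:
    next(iterator)              # resumes after the second yield and runs off the end
except StopIteration:
    trace.append('exhausted')

print(first)    # apple
print(second)   # orange
print(trace)    # ['started', 'resumed after apple', 'resumed after orange', 'exhausted']
```

Each call to next() picks the function body up exactly where the previous yield paused it, with local state intact.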

Generators can also run forever:

>>> def infinite_generator():
...     i = 0
...     while True:
...         yield i
...         i += 1
...
>>> iterator = infinite_generator()
>>> for i in iterator:
...     print i
...
0
1
2
3
4
5
... and on and on forever
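
An infinite generator is harmless as long as the consumer stops asking, because values are only produced on demand. A small sketch using itertools.islice from the standard library to take a finite prefix (the first_six name is just for illustration):

```python
from itertools import islice


def infinite_generator():
    i = 0
    while True:
        yield i
        i += 1


# islice stops requesting values after six items, so the loop
# inside the generator never gets a chance to run away
first_six = list(islice(infinite_generator(), 6))
print(first_six)  # [0, 1, 2, 3, 4, 5]
```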

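I had been using iterators in my asynchronous host scanner whenever I needed to run asynchronous commands within a loop. In the asynchronous programming model, you can't process items one after another by writing something like the following, because each call returns immediately, before its response is available: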

for foo in bar:
    async_method(foo)

Instead, you would do something like this:

def callback(response, iterator):
    do_something_with_response(response)
    schedule_next_task(iterator)

def schedule_next_task(iterator):
    try:
        foo = iterator.next()
        deferred = async_method(foo)
        deferred.addCallback(callback, iterator)
    except StopIteration:
        pass

iterator = iter(bar)
schedule_next_task(iterator)

It works like this:

  1. We get an iterator for our list, bar (this could just as well come from a generator function)
  2. We fetch the first value from the iterator and pass it to the asynchronous method
  3. That method presumably makes some type of I/O request, and responds immediately with a Deferred instance
  4. We add a callback function to the Deferred and request that our iterator instance be passed to it when it is called
  5. Control returns to the event loop, which might be busy scheduling other I/O requests
  6. When the I/O completes, the event loop calls our callback function with the response and our iterator instance
  7. The callback processes the response, and then goes back to step 2, fetching the next item from the iterator
  8. When the iterator is exhausted, the cycle stops
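
The steps above can be tried without Twisted installed. The sketch below swaps in two stand-ins that are entirely my own invention: FakeDeferred, which only mimics the addCallback(callback, *args) part of a real Deferred, and a deque named pending that plays the role of the event loop's queue of completed I/O. The async_method here just upper-cases its argument. None of this is the real Twisted machinery, only enough of its shape to watch the cycle run:

```python
from collections import deque

pending = deque()   # stand-in for the event loop's queue of completed I/O
results = []


class FakeDeferred(object):
    """Toy stand-in for a Deferred: remembers one callback and its extra args."""

    def __init__(self):
        self._callback = None

    def addCallback(self, function, *args):
        self._callback = (function, args)

    def fire(self, response):
        function, args = self._callback
        function(response, *args)


def async_method(foo):
    """Pretend to start an I/O request; the response arrives on a later loop turn."""
    deferred = FakeDeferred()
    pending.append((deferred, foo.upper()))
    return deferred                  # step 3: returned before any response exists


def callback(response, iterator):
    results.append(response)         # step 7: process the response...
    schedule_next_task(iterator)     # ...then go around again


def schedule_next_task(iterator):
    try:
        foo = next(iterator)         # step 2
    except StopIteration:
        return                       # step 8: iterator exhausted, cycle stops
    deferred = async_method(foo)
    deferred.addCallback(callback, iterator)   # step 4


def run_event_loop():
    """Steps 5 and 6: deliver each 'completed' response to its callback."""
    while pending:
        deferred, response = pending.popleft()
        deferred.fire(response)


bar = ['alpha', 'beta', 'gamma']
schedule_next_task(iter(bar))        # step 1
run_event_loop()
print(results)  # ['ALPHA', 'BETA', 'GAMMA']
```

Only one request is ever in flight: the next item is not fetched from the iterator until the previous response has been handled.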

It occurred to me that I might be able to extend this concept to use generators as a sort of continuation to emulate synchronous code. What if, instead of yielding strings or numbers from a generator, you yielded functions? Some wrapper code could initialize the iterator, and then loop over it using the technique above, calling each function yielded by the generator.
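
Stripped of Twisted, the idea might look something like the sketch below. Everything in it is a hypothetical stand-in: FakeDeferred and the pending queue imitate a Deferred and the event loop, fake_async manufactures pretend asynchronous functions, and the uname/hostid/zones values are made up. The point is only the shape: the generator yields functions without calling them, and a small driver calls each one and resumes the generator when its response arrives.

```python
from collections import deque

pending = deque()   # stand-in for the event loop's queue of completed I/O
context = {}        # scratch space shared between the driver and the generator


class FakeDeferred(object):
    """Toy stand-in for a Deferred that holds a single callback."""

    def addCallback(self, function):
        self.function = function


def fake_async(key, value):
    """Build a zero-argument 'asynchronous' function whose response is {key: value}."""
    def function():
        deferred = FakeDeferred()
        pending.append((deferred, {key: value}))
        return deferred
    return function


# Made-up commands; a real scanner would run these on a remote host
uname = fake_async('uname', 'Linux example 2.6')
hostid = fake_async('hostid', '0000abcd')
zones = fake_async('zones', {})


def sequence():
    yield uname                               # yielded, not called
    os_name = context['uname'].split()[0]     # the result is there once we resume
    if os_name == 'SunOS':
        yield zones
    yield hostid


def drive(iterator):
    try:
        function = next(iterator)
    except StopIteration:
        return                                # sequence finished
    deferred = function()

    def on_response(response):
        context.update(response)
        drive(iterator)                       # resume the generator

    deferred.addCallback(on_response)


drive(sequence())
while pending:                                # toy event loop
    deferred, response = pending.popleft()
    deferred.function(response)

print(sorted(context))  # ['hostid', 'uname']
```

Because the pretend host reports Linux, the zones step is skipped, which shows ordinary if/else branching working inside the "synchronous" sequence.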

Tonight I decided to give this a try. Forking off an experimental branch and making a few modifications to the underlying fido host discovery routines, I crafted the following plesiochronous host scanner:

#!/usr/bin/env python
#
# Use a generator to simulate synchronous execution on an asynchronous framework
#

from fido.common.command import RemoteCommandExecutor
from fido.common.host.unix import UnixHost
from fido.common.ssh import SSHCredentials

from contrib.host.software.sun.host import SolarisHost
from contrib.host.software.linux.host import LinuxHost

from twisted.internet import reactor

import pprint

class PlesiochronousHostScanner(object):
    """
    Scans a host over SSH, building a list of host attributes. Built on the Twisted asynchronous
    library, but uses a Python generator function to emulate garden variety synchronous code.
    """

    def __init__(self, address, credentials):
        """
        address: the IP address to scan
        credentials: a hash like: { 'username': '...' , 'password': '...', 'public_key': '<optional>' }
        """

        self.address = address
        self.credentials = credentials
        self.host = UnixHost(RemoteCommandExecutor(address, credentials))
        self.pp = pprint.PrettyPrinter()

        # create some scratch space for the discovery methods
        self.context = { }

        # get an iterator from the generator
        self.iterator = self.scanning_sequence()

    def scanning_sequence(self):
        """
        A typical nugget of synchronous code, with one important exception: asynchronous
        functions must be yielded instead of being called directly.
        """
        yield self.host.uname

        os = self.context['uname'].split()[0]

        if os == 'SunOS':
            self.host = SolarisHost.from_host(self.host)
            yield self.host.zonename
            yield self.host.zones
        elif os == 'Linux':
            self.host = LinuxHost.from_host(self.host)
        else:
            print "Unable to scan host type: %s" % os
            return

        yield self.host.hostid
        yield self.host.device
        yield self.host.bios
        yield self.host.installed_memory_in_MB
        yield self.host.interfaces

    def callback(self, response):
        self.context.update(response)
        self.schedule_next_task()

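    def errback(self, error):
        print "scanning error: %s" % error

        # Nothing further is scheduled after a failure, so stop the reactor here
        # too; otherwise this contrived example would keep running forever
        reactor.stop()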

    def schedule_next_task(self):
        try:
            function = self.iterator.next()
            deferred = function()
            deferred.addCallbacks(self.callback, self.errback)
        except StopIteration:
            self.scan_complete()

    def start_scan(self):
        self.schedule_next_task()

    def scan_complete(self):
        print "Scan of %s is complete" % self.address
        self.pp.pprint(self.context)

        # In this contrived example, we'll stop the reactor when we've finished scanning a host
        reactor.stop()

if __name__ == '__main__':
    import sys
    from optparse import OptionParser
    parser = OptionParser()
    parser.add_option("-u", "--username", dest="username")
    parser.add_option("-p", "--password", dest="password")

    (options, args) = parser.parse_args()

    address = args.pop(0)
    credentials = iter([SSHCredentials(options.username, options.password, None)])

    scanner = PlesiochronousHostScanner(address, credentials)

    reactor.callWhenRunning(scanner.start_scan)

    reactor.run()

It works:

satellite:~ clay$ python pleisio.py -u username -p password 10.20.30.40
Scan of 10.20.30.40 is complete
{'bios': {'bios_date': '11/15/2007',
          'bios_vendor': 'Sun Microsystems',
          'bios_version': 'S39_3B25'},
 'device': {'system_product': 'Sun Fire X2200 M2',
            'system_serial': '0805QAT0EA',
            'system_uuid': 'bd6529dc-fc79-0010-9e1b-001b245c1d4f',
            'system_vendor': 'Sun Microsystems',
            'system_version': 'Rev 50'},
 'hostid': '0ec2daa6',
 'installed_memory_in_MB': 32768,
 'interfaces': {'bge0': {'ipv4_addresses': [10.20.30.40],
                         'ipv6_addresses': [],
                         'mac_address': 00:1B:24:5C:18:B5,
                         'zone': None},
                'lo0': {'ipv4_addresses': [],
                        'ipv6_addresses': [],
                        'mac_address': None,
                        'zone': None}},
 'uname': 'SunOS myhost.mydomain.com 5.10 Generic_127112-11 i86pc i386 i86pc',
 'zonename': 'global',
 'zones': {'myzone': {'brand': 'native',
                      'ip_mode': 'shared',
                      'root': '/zones/myzone',
                      'state': 'running',
                      'uuid': '09fbf9ba-c0c5-408f-c9e9-820471983f25',
                      'zonename': 'myzone'}}}

The beauty of this approach is that the PlesiochronousHostScanner.scanning_sequence method reads like ordinary sequential code, and could actually be written by end users familiar with Python but not familiar with asynchronous programming. It also makes the discovery logic much easier to follow than in the state-machine-based asynchronous discovery engine I had previously built.
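
One possible variation, offered only as a sketch and not something the scanner above does: since Python 2.5, a caller can pass a value into a paused generator with send(), so each result could be delivered straight to the yield expression instead of going through a shared context dictionary. Twisted's own inlineCallbacks decorator is built on the same generator mechanism. As before, FakeDeferred, pending, fake_async and the sample values are hypothetical stand-ins.

```python
from collections import deque

pending = deque()   # stand-in for the event loop's queue of completed I/O
report = {}


class FakeDeferred(object):
    """Toy stand-in for a Deferred that holds a single callback."""

    def addCallback(self, function):
        self.function = function


def fake_async(value):
    """Build a zero-argument 'asynchronous' function that responds with value."""
    def function():
        deferred = FakeDeferred()
        pending.append((deferred, value))
        return deferred
    return function


uname = fake_async('SunOS example 5.10')   # made-up responses
zonename = fake_async('global')


def sequence():
    uname_output = yield uname             # the response comes back as the yield's value
    report['os'] = uname_output.split()[0]
    if report['os'] == 'SunOS':
        report['zonename'] = yield zonename


def drive(iterator, value=None):
    try:
        # send(None) on a fresh generator behaves like next()
        function = iterator.send(value)
    except StopIteration:
        return
    function().addCallback(lambda response: drive(iterator, response))


drive(sequence())
while pending:                             # toy event loop
    deferred, response = pending.popleft()
    deferred.function(response)

print(report)
```

The trade-off is that the sequence no longer needs to know which key a step stores its result under, at the cost of a slightly less obvious driver.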

Having just concocted this tonight, I’m not sure whether this is something I’ll pursue, but it has been a fun experiment. I’m curious what other asynchronous programmers think of this approach.

Written by clay

May 15th, 2009 at 11:41 pm