Alexandre Bourget

geek joy

New and hot, part 4: Pyramid, Socket.IO and Gevent

March 17, 2011 at 12:10 PM

This is part 4 of my March 9th 2011 Confoo presentation. Refer to the Table of Contents for the other parts.

Gevent

In this section, we will switch the HTTP server from Paster's to Gevent's. We will also implement Socket.IO in our Pyramid application and have the client communicate with the server in full duplex. We will then graph the CPU usage (on a Linux machine) directly in the web browser.

Gevent is a micro-threading library (à la Stackless Python), and its HTTP server supports a large number of concurrent connections, each of which is woken up by an event when new data is available. It is based on libevent and Python's greenlets.
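Greenlets switch implicitly whenever they would block on I/O, which is hard to show without Gevent itself. As a rough stdlib-only analogy (plain Python generators standing in for greenlets, with no I/O and no event loop), this sketch runs two micro-threads round-robin, each one voluntarily handing control back to the scheduler after every step:

```python
from collections import deque


def worker(name, steps, log):
    """A micro-thread: does a little work, then yields control back."""
    for i in range(steps):
        log.append((name, i))
        yield  # like a greenlet waiting on I/O: let the scheduler run others


def run(tasks):
    """Round-robin scheduler: resume each task in turn until all finish."""
    queue = deque(tasks)
    while queue:
        task = queue.popleft()
        try:
            next(task)
        except StopIteration:
            continue  # this task is done; drop it
        queue.append(task)


log = []
run([worker("a", 2, log), worker("b", 2, log)])
print(log)  # [('a', 0), ('b', 0), ('a', 1), ('b', 1)]
```

The interleaved log is the point: neither worker runs to completion before the other starts. Gevent gets the same effect, but the switch happens inside the (patched) I/O calls rather than at an explicit yield.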

gevent Show the install instructions for gevent and pyramid_socketio.

Requirements for Gevent:

(env)$ sudo apt-get install libevent-dev
(env)$ pip install gevent gevent-websocket gevent-socketio

This will also install greenlet, on which Gevent is based.

Pyramid's Socket.IO integration layer

We'll install the pyramid_socketio package, which ties in all the Socket.IO support and allows us to write beautiful stateful classes for each client.

(env)$ pip install pyramid_socketio

This package will bring in its dependencies: gevent-websocket, gevent-socketio, gevent and greenlet.

gevent-websocket gives us WebSocket support by implementing the WebSocket protocol on top of Gevent. gevent-socketio uses it for the WebSocket transport; Socket.IO's other fallback transports are handled by gevent-socketio directly.

Switching from Paster to Gevent's server

Thankfully, pyramid_socketio provides simple scripts to replace our calls to paster serve --reload file.ini and paster serve file.ini: socketio-serve-reload file.ini and socketio-serve file.ini, respectively.

(env)$ paster serve --reload development.ini
...
^C^C caught in monitor process
(Killed)
(env)$ socketio-serve-reload development.ini
...

The Socket.IO server initializes the logging library, sets up a file watcher if you ask for code reloading, and reads host/port from your .ini file, just like paster did. If you want to use the Flash WebSocket fallback, it will also attempt to listen on port 843 to run the Flash Socket Policy server (this requires running as root). Otherwise, it's a drop-in replacement for paster serve.
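For the curious, the Flash socket policy protocol itself is tiny: the Flash player connects to port 843, sends <policy-file-request/> followed by a NUL byte, and expects an XML policy document, also NUL-terminated. Below is a minimal Python 3 sketch using the stdlib socketserver module. It is not the server socketio-serve actually runs; the permissive domain="*" policy and the helper names (make_policy_server, start_in_background, fetch_policy) are assumptions for illustration:

```python
import socket
import socketserver
import threading

# Permissive policy for illustration only; restrict domain/to-ports in production.
POLICY = (b'<?xml version="1.0"?>'
          b'<cross-domain-policy>'
          b'<allow-access-from domain="*" to-ports="*"/>'
          b'</cross-domain-policy>\0')


class PolicyHandler(socketserver.BaseRequestHandler):
    def handle(self):
        # Flash sends "<policy-file-request/>\0". One recv() is enough for
        # this short request in a sketch; a hardened server would buffer.
        data = self.request.recv(1024)
        if data.startswith(b"<policy-file-request/>"):
            self.request.sendall(POLICY)


def make_policy_server(host="0.0.0.0", port=843):
    # Ports below 1024 need root, hence the note above.
    return socketserver.TCPServer((host, port), PolicyHandler)


def start_in_background(server):
    thread = threading.Thread(target=server.serve_forever, daemon=True)
    thread.start()
    return thread


def fetch_policy(host, port):
    """Client side, as Flash would do it: send the request, read up to NUL."""
    with socket.create_connection((host, port), timeout=5) as sock:
        sock.sendall(b"<policy-file-request/>\0")
        reply = b""
        while not reply.endswith(b"\0"):
            chunk = sock.recv(1024)
            if not chunk:
                break
            reply += chunk
    return reply
```

To try it without root, bind to port 0 (any free port) on 127.0.0.1, start it with start_in_background(), and call fetch_policy() against server.server_address.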

When using the socketio-serve server, Gevent is automatically initialized and monkey patches several modules (like socket and threading) so that they yield control to Gevent instead of blocking on I/O. From that point on, any code that starts a new thread will, without knowing it, launch a new greenlet instead, using the same APIs.
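To make "monkey patching" concrete: Gevent's real entry point is gevent.monkey.patch_all(), which swaps blocking stdlib functions for cooperative ones. The stdlib-only sketch below does the same trick by hand on a single function, time.sleep; cooperative_sleep is a hypothetical stand-in that only records the call instead of yielding to an event loop:

```python
import time


def library_code():
    # Code written against the plain stdlib; it looks up time.sleep at
    # call time, so it picks up whatever function is installed there.
    time.sleep(0.5)
    return "done"


sleep_calls = []


def cooperative_sleep(seconds):
    # Hypothetical stand-in: a real patch would park the current greenlet
    # and let the event loop run others until the timeout expires.
    sleep_calls.append(seconds)


original_sleep = time.sleep
time.sleep = cooperative_sleep   # the "monkey patch"
try:
    result = library_code()      # calls cooperative_sleep, returns at once
finally:
    time.sleep = original_sleep  # undo the patch
```

This also shows why patching has to happen early: a module that ran "from time import sleep" before the patch keeps its own reference to the original, blocking function.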

Also, if you want subprocess support in your application (which we will need), get this version, written by someone who ported the stdlib subprocess module to Gevent's event loop so that it never blocks on I/O.

subprocess Copy the vendor/subprocess.py file from the web location above into the ~/Foo/foo directory, for use within views.py.

Quick look at the code

Let's have a quick look at how Gevent works, using a stripped-down version of the socketio-serve script:

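import glob

import gevent
import paste.deploy
import socketio   # from gevent-socketio

# get host/port
# init logging
# grab --watch argv parameter, assign do_reload

def socketio_serve():
    cfgfile = "file.ini"

    def main():
        # Load the Pyramid WSGI app from the .ini file, as paster would
        app = paste.deploy.loadapp('config:%s' % cfgfile, relative_to='.')
        # Serve it, handling the Socket.IO endpoint under /socket.io
        server = socketio.SocketIOServer((host, port), app,
                                         resource="socket.io")

        print "Serving on %s:%d (http://127.0.0.1:%d) ..." % (host, port, port)
        server.serve_forever()

    def reloader():
        # paste's reloader exits the process with code 3 when a watched
        # file (Python modules, the .ini, compiled translations) changes
        from paste import reloader
        reloader.install()
        reloader.watch_file(cfgfile)
        for lang in glob.glob('*/locale/*/LC_MESSAGES/*.mo'):
            reloader.watch_file(lang)

    # Run the server (and, optionally, the reloader) as concurrent greenlets
    jobs = [gevent.spawn(main)]
    if do_reload:
        jobs.append(gevent.spawn(reloader))
    gevent.joinall(jobs)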

This shows how Gevent handles concurrent jobs: you spawn greenlets with gevent.spawn() and wait for them to terminate with gevent.joinall(). The reloader() function borrows the reloader code from Paste (the same one used when running paster serve --reload), which exits the whole program with exit code 3 when a watched file changes. The socketio-serve-reload script wraps around this program, catches that exit code, and restarts the server.
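The restart-on-exit-code-3 idea can be sketched with the stdlib subprocess module. This is not the actual socketio-serve-reload code, just an illustration under the assumption that the child signals "please reload" by exiting with code 3 and that any other exit code means stop; serve_with_reload and max_restarts are hypothetical names:

```python
import subprocess
import sys

RELOAD_EXIT_CODE = 3  # exit code paste's reloader uses to request a restart


def serve_with_reload(cmd, max_restarts=None):
    """Run cmd as a child process and restart it each time it exits with
    RELOAD_EXIT_CODE. Any other exit code is returned to the caller.

    Returns (exit_code, number_of_restarts)."""
    restarts = 0
    while True:
        code = subprocess.call(cmd)
        if code != RELOAD_EXIT_CODE:
            return code, restarts
        if max_restarts is not None and restarts >= max_restarts:
            return code, restarts  # give up; handy for tests
        restarts += 1


if __name__ == "__main__":
    # Demo: a child that always asks for a reload, capped at 2 restarts.
    always_reload = [sys.executable, "-c", "import sys; sys.exit(3)"]
    print(serve_with_reload(always_reload, max_restarts=2))  # (3, 2)
```

Keeping the supervising loop in a separate parent process means each restart starts a fresh interpreter, so every module (not just the changed one) is reloaded from scratch.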

Our Socket.IO-aware application, client-side

Now, let's write some basic Socket.IO client code, directly in our index.html:

socketio YASnippet to copy the boilerplate
socketio Event to copy socket.io.js to ~/Foo/foo/static/js/socket.io.js, fitting with the boilerplate.
  <script src="http://cdn.socket.io/stable/socket.io.js"></script>

  <script>
    var socket = null;
    $(document).ready(function() {
      socket = new io.Socket(null, {});

      socket.on('connect', function() {
        console.log("Connected");
        socket.send({type: "connect", userid: 123});
      });
      socket.on('message', function(obj) {
        console.log("Message", JSON.stringify(obj));
        if (obj.type == "some") {
          console.log("do some");
        }
      });
      socket.on('error', function(obj) {
        console.log("Error", JSON.stringify(obj));
      });
      socket.on('disconnect', function() {
        console.log("Disconnected");
      });

      console.log("Connecting...");
      socket.connect();
    });
  </script>

When the socket gets connected, we immediately send a message, with the type connect. This will be mapped on the server side (by pyramid_socketio) to the msg_connect method in the SocketIOContext provided to socketio_manage.

The null value and empty object passed to new io.Socket() mean that we connect to the same host and port as the current page. The URL will be /socket.io/... with some extra path information, like the transport being used and the session ID (used by Socket.IO to keep the channel open).

The server-side socket.io handler

Configuring socket.io in our Pyramid app goes like this:

def main(...):
    ...
    #config.add_static_view('socket.io/lib', 'foo:static')
    config.add_route('socket_io', 'socket.io/*remaining')
    ...

If you want Flash fallback support as an alternative WebSocket implementation, uncomment the add_static_view call so that the WebSocketMain.swf file gets served. Setting this up is slightly more complicated: it requires Flash installed on the client side, a Flash Policy Server on the server side, and an additional JavaScript library that you can get at https://github.com/gimite/web-socket-js. Clone this repository into foo/static:

(env)$ ## Optional, for Flash websockets fallback support
(env)$ cd foo/static
(env)$ git clone https://github.com/gimite/web-socket-js.git
(env)$ cd ../..

The WebSocketMain.swf file must be served from the same domain as your page; otherwise, you'll have to use the insecure version and change its location in your HTML output to something like:

<script>WEB_SOCKET_SWF_LOCATION = '/path/to/WebSocketMainInsecure.swf';</script>

If you want the Flash support, don't forget to add the script tag in your HTML file. See the documentation of web-socket-js for more info about these things.

You can turn off the Flash fallback altogether by passing io.Socket() a transports list that leaves out 'flashsocket': io.Socket(null, {transports: ['websocket', 'htmlfile', 'xhr-multipart', 'xhr-polling', 'jsonp-polling']});.

Back to Pyramid. This is the basic setup to handle messages using the pyramid_socketio helpers:

manage Copy and paste the Socket.IO server-side boilerplate.
### In views.py:

from pyramid.response import Response
from pyramid.view import view_config
from pyramid_socketio.io import SocketIOContext, socketio_manage
import gevent

class ConnectIOContext(SocketIOContext):
    # self.io is the Socket.IO socket
    # self.request is the request
    def msg_connect(self, msg):
        print "Connect message received", msg
        self.msg("connected", hello="world")

# Socket.IO implementation
@view_config(route_name="socket_io")
def socketio_service(request):
    print "Socket.IO request running"
    retval = socketio_manage(ConnectIOContext(request))
    return Response(retval)

The first section is a subclass of SocketIOContext, which is provided by the pyramid_socketio package. It is a simple object that maps incoming messages from the socket to class methods. It also provides convenience methods like spawn(), msg() and error() to, respectively, spawn a new greenlet, send a new packet (or message), and send an error message (in a pre-defined format). The Socket.IO object itself, representing the socket, is available via self.io (read gevent-socketio's documentation for more information on that object), and the original request for the socket is held in self.request. If you send a message like {type: "connect", userid: 123} from the web application, it will run the msg_connect() method, passing a dict representing your JavaScript object as its second parameter.

The second section is the Pyramid view. Once gevent-socketio has done its job (dealing with the abstraction of the transports), it passes the request on to the normal WSGI application, where it arrives at one of our views, just like a normal GET request would. This is where we hand control over to the pyramid_socketio manager, which listens for incoming packets and dispatches them to the SocketIOContext we've provided.
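To make the dispatching concrete, here is a toy model of what the context and the manager do together. It is a hypothetical sketch, not pyramid_socketio's implementation: SketchContext, manage() and the unhandled list are invented for illustration, and a plain list of dicts (ending with None for a disconnect) stands in for the packets arriving on self.io:

```python
class SketchContext(object):
    """Hypothetical stand-in for SocketIOContext: routes each incoming
    message dict to a msg_<type>() method on the subclass."""

    def __init__(self):
        self.sent = []       # outgoing packets (a real context writes to self.io)
        self.unhandled = []  # messages with no matching msg_<type>() method

    def msg(self, msg_type, **kwargs):
        self.sent.append(dict(kwargs, type=msg_type))

    def dispatch(self, message):
        handler = getattr(self, "msg_%s" % message.get("type"), None)
        if handler is None:
            self.unhandled.append(message)
        else:
            handler(message)


def manage(context, incoming):
    """Hypothetical manager loop: dispatch every received message until the
    transport reports a disconnect, modelled here as None."""
    for message in incoming:
        if message is None:
            break
        context.dispatch(message)


class ConnectContext(SketchContext):
    def msg_connect(self, msg):
        self.msg("connected", hello="world")


ctx = ConnectContext()
manage(ctx, [{"type": "connect", "userid": 123}, {"type": "bogus"},
             None, {"type": "connect"}])
print(ctx.sent)  # [{'hello': 'world', 'type': 'connected'}]
```

The getattr() lookup on "msg_" plus the message type is what lets you add a new message type by simply adding a method, with no routing table to maintain.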

Using Flot.js to load dynamic stats from the server

What we want to do here is graph some values coming from the server, pushed to the client. This paradigm will irrevocably change the way we consume and build web applications in the near future.

flot Open the Flot website with an example.

Flot.js is a nice graphing library that works entirely on the client side. See its website for more examples and details.

Start by getting jquery.flot.js into your project:

(env)$ cd ~/Foo/foo/static
(env)$ mkdir js
(env)$ cd js
(env)$ wget http://people.iola.dk/olau/flot/jquery.flot.js

Then load it in our HTML template, after the script tag that loads jQuery itself:

  <script src="${request.static_url('foo:static/js/jquery.flot.js')}"></script>

Then add a placeholder for the graph somewhere in your page. Flot needs the placeholder to have an explicit width and height:

  <div id="graph" style="width: 600px; height: 300px;"></div>

Then, here is some code to display basic static data (put it below the Socket.IO script):

data YASnippet to add the values for d1, and d2
  <script>
    var d1 = [[1, 2], [2, 4], [3, 0], [4, 5]];
    var d2 = [[1, 4], [2, 6], [3, 7], [4, 2]];
    $.plot($('#graph'), [d1, d2], {});
  </script>

If we change [d1, d2] to [{label: "Hello", data: d1}, d2], we get a legend label for d1 for free.

Now we want the server to feed data to the client in real time. Let's add a handler for messages of type showdata on the client side, in our socket.on('message', ...) handler:

      socket.on('message', function(obj) {
        ...
        if (obj.type == "showdata") {
          d1.push([d1.length + 1, obj.point]);  // next x after the existing points
          $.plot($('#graph'), [{label: "Bob", data: d1}]);
        }
      });

To send those values from the server, we'll slightly modify our server-side code:

sendcpu YASnippet to add the 'sendcpu' stub.
class ConnectIOContext(SocketIOContext):
    ...
    def msg_connect(self, msg):
        ...
        def sendcpu():
            """Calculate CPU utilization"""
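            prev = None
            while self.io.connected():
                # Aggregate line of /proc/stat: "cpu  user nice system idle ..."
                with open('/proc/stat') as f:
                    cpu_line = [x for x in f if x.startswith('cpu ')][0]
                vals = [int(v) for v in cpu_line.split()[1:5]]
                if prev:
                    # busy (user + nice + system) ticks over total ticks
                    percent = (100.0 * (sum(vals[:3]) - sum(prev[:3])) /
                               (sum(vals) - sum(prev)))
                    self.msg("showdata", point=percent)
                prev = vals
                gevent.sleep(0.5)  # yield to other greenlets between samples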
        self.spawn(sendcpu)
    ...
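The CPU arithmetic in sendcpu() is easier to check as a pure function. This sketch (parse_cpu_line and cpu_percent are our own hypothetical names) reads the same four fields of the aggregate "cpu " line of /proc/stat (user, nice, system and idle, in clock ticks) and applies the same formula: busy ticks elapsed divided by total ticks elapsed between two samples. The sample lines are made-up numbers, not real measurements:

```python
def parse_cpu_line(line):
    """Return [user, nice, system, idle] ticks from the aggregate 'cpu ' line."""
    fields = line.split()
    if fields[0] != "cpu":
        raise ValueError("not the aggregate cpu line: %r" % line)
    return [int(v) for v in fields[1:5]]


def cpu_percent(prev, cur):
    """Share of the elapsed ticks spent busy (user + nice + system)."""
    busy = sum(cur[:3]) - sum(prev[:3])
    total = sum(cur) - sum(prev)
    return 100.0 * busy / total if total else 0.0


prev = parse_cpu_line("cpu  100 0 50 850 0 0 0")
cur = parse_cpu_line("cpu  130 0 70 900 10 0 0")
print(cpu_percent(prev, cur))  # 50.0
```

Between the two samples, 50 of the 100 elapsed ticks were busy, hence 50%. The guard on total avoids a division by zero if two samples are taken within the same tick.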

Apart from the CPU usage calculation, there are two relevant parts here: the call to self.spawn() and the call to self.msg().

The spawn() method spawns a new greenlet and attaches it to the SocketIOContext, so that when the Socket.IO session is killed, all the greenlets related to it are killed too. This helps prevent memory leaks. It's a thin wrapper around Gevent's spawn() function that keeps a reference in the SocketIOContext.
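The bookkeeping behind spawn() can be illustrated without Gevent. This Python 3 sketch swaps in asyncio tasks for greenlets (a different concurrency library, chosen only because it ships with Python); TaskGroupSketch, ticker() and demo() are hypothetical names. It remembers every task spawned for a session and cancels them all at once; with Gevent, the cancelling step would be something like gevent.killall(jobs):

```python
import asyncio


class TaskGroupSketch(object):
    """asyncio analogue of SocketIOContext.spawn(): remember every task
    started for a session so they can all be cancelled together."""

    def __init__(self):
        self.jobs = []

    def spawn(self, coro_fn, *args):
        task = asyncio.create_task(coro_fn(*args))
        self.jobs.append(task)  # keep a reference, like the context does
        return task

    async def kill(self):
        for task in self.jobs:
            task.cancel()
        # Wait until cancellation completes; collect the CancelledErrors.
        await asyncio.gather(*self.jobs, return_exceptions=True)


async def ticker(out):
    # Stand-in for sendcpu(): loops forever until cancelled.
    while True:
        out.append("tick")
        await asyncio.sleep(0.01)


async def demo():
    ticks = []
    session = TaskGroupSketch()
    session.spawn(ticker, ticks)
    await asyncio.sleep(0.05)
    await session.kill()
    count = len(ticks)
    await asyncio.sleep(0.05)  # no new ticks should appear after kill()
    return count == len(ticks), all(t.cancelled() for t in session.jobs)


print(asyncio.run(demo()))  # (True, True)
```

Without the jobs list, the ticker would keep running (and keep a reference to the session's data) after the client went away, which is exactly the leak spawn() is meant to prevent.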

self.msg() is also provided by the pyramid_socketio package. It takes the type of the message as its first argument, and any keyword arguments are used to build the JSON object that will be transmitted. You can pass lists, dicts, etc.
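A minimal sketch of how a type plus keyword arguments can become the JSON object the client receives. This is an illustration, not pyramid_socketio's code (encode_msg is a hypothetical name), and the Socket.IO transport adds its own framing around the JSON, which is not shown:

```python
import json


def encode_msg(msg_type, **kwargs):
    """Build the JSON text for a message: 'type' plus the keyword arguments."""
    packet = dict(kwargs, type=msg_type)
    return json.dumps(packet, sort_keys=True)


print(encode_msg("showdata", point=42.5))
# {"point": 42.5, "type": "showdata"}
print(encode_msg("stats", values=[1, 2], meta={"host": "a"}))
# {"meta": {"host": "a"}, "type": "stats", "values": [1, 2]}
```

On the client, this is what makes obj.type == "showdata" and obj.point work in the socket.on('message', ...) handler: each keyword argument becomes a property of the received object.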

What if...

Remember in the last post, we were dealing with some FFmpeg video encoding, and we displayed the result in a <video> tag? What if we could receive a message when the video is done transcoding? Wouldn't it be cool to have such a simple implementation:

      socket.on('message', function(obj) {
        console.log("Message", obj);
        if (obj.type == "video") {
          go(obj.url);
        }
      });

with a slight tweak to the go() function so that it takes an argument, the URL of the video:

        function go(url) {
          $('#video').html('<div>Video ready: ' + url + '</div><video controls preload src="' + url + '" />');
          ...
        }

We'll do just that in the next episode.

If you would like us to implement anything you've seen here in a real-life project, or to kick-start some of your projects, don't hesitate to send an e-mail to contact@savoirfairelinux.com mentioning this blog. We'll be glad to help.
