Clustering Web Sockets with Socket.IO and Express 3

Node.js gets a lot of well-deserved press for its impressive performance. The event loop can handle pretty impressive loads with a single process. However, most servers have multiple processors, and I, for one, would like to take advantage of them. Node’s cluster api can help.

While cluster is a core api in node.js, I’d like to incorporate it with Express 3 and Socket.io.

Final source code available on github

The Node cluster docs give us the following example.

# Minimal cluster example: the master process forks one worker per CPU,
# and every worker runs its own HTTP server listening on port 8000.
cluster = require("cluster")
http = require("http")
numCPUs = require("os").cpus().length

if cluster.isMaster
  # Master: spawn one worker per CPU reported by the os module.
  i = 0
  while i < numCPUs
    cluster.fork()
    i++

  # Log whenever a worker process exits.
  cluster.on "exit", (worker, code, signal) ->
    console.log "worker " + worker.process.pid + " died"
else
  # Worker: answer every request with a fixed plain response.
  http.createServer((req, res) ->
    res.writeHead 200
    res.end "hello world\n"
  ).listen 8000

The code compiles and runs, but I have no confirmation that things are actually working. I’d like to add a little logging to confirm that we actually have multiple workers going. Let’s add these lines right before the ‘exit’ listener.

  # Report each worker's cluster-assigned id as soon as it has been forked.
  cluster.on 'fork', (forkedWorker) ->
    console.log 'forked worker ' + forkedWorker.id

On my machine, we get this output:
coffee server
forked worker 1
forked worker 2
forked worker 3
forked worker 4
forked worker 5
forked worker 6
forked worker 7
forked worker 8

So far, so good. Let’s add Express to the mix.

# Cluster + Express: the master forks one worker per CPU and logs worker
# lifecycle events; every worker serves an Express app on port 8000.
cluster = require("cluster")
http = require("http")
numCPUs = require("os").cpus().length

if cluster.isMaster
  # Master: spawn one worker per CPU reported by the os module.
  i = 0
  while i < numCPUs
    cluster.fork()
    i++

  # Lifecycle logging, keyed by the worker's process id.
  cluster.on 'fork', (worker) ->
    console.log 'forked worker ' + worker.process.pid
  cluster.on "listening", (worker, address) ->
    console.log "worker " + worker.process.pid + " is now connected to " + address.address + ":" + address.port
  cluster.on "exit", (worker, code, signal) ->
    console.log "worker " + worker.process.pid + " died"
else
  # Worker: wrap the Express app in an HTTP server, reusing the http
  # module required at the top instead of requiring it a second time.
  app = require("express")()
  server = http.createServer(app)
  server.listen 8000

  # Log the handling pid so the spread of requests across workers is visible.
  app.get "/", (req, res) ->
    console.log 'request handled by worker with pid ' + process.pid
    res.writeHead 200
    res.end "hello world\n"

At this point, I’d like to throw a few http requests against the setup to ensure that I’m really utilizing all my processors.
Running (curl -XGET "http://localhost:8000") 6 times makes the node process go:

request handled by worker with pid 85229
request handled by worker with pid 85231
request handled by worker with pid 85231
request handled by worker with pid 85231
request handled by worker with pid 85227
request handled by worker with pid 85229

Alright, the last step is getting socket.io involved. It takes just a couple of extra lines for the socket; however, we’ll need to add a basic index.html file to actually make the socket calls.

# Cluster + Express + Socket.IO: the master forks one worker per CPU;
# every worker serves index.html and accepts socket connections on port 8000.
cluster = require("cluster")
http = require("http")
numCPUs = require("os").cpus().length

if cluster.isMaster
  # Master: spawn one worker per CPU reported by the os module.
  i = 0
  while i < numCPUs
    cluster.fork()
    i++

  # Lifecycle logging, keyed by the worker's process id.
  cluster.on 'fork', (worker) ->
    console.log 'forked worker ' + worker.process.pid
  cluster.on "listening", (worker, address) ->
    console.log "worker " + worker.process.pid + " is now connected to " + address.address + ":" + address.port
  cluster.on "exit", (worker, code, signal) ->
    console.log "worker " + worker.process.pid + " died"
else
  # Worker: attach Socket.IO to the same HTTP server that hosts the
  # Express app, reusing the http module required at the top.
  app = require("express")()
  server = http.createServer(app)
  io = require("socket.io").listen(server)
  server.listen 8000

  # Serve the page that opens the socket connection.
  app.get "/", (req, res) ->
    res.sendfile(__dirname + '/index.html')

  # Greet every new socket and log which worker accepted it.
  io.sockets.on "connection", (socket) ->
    console.log 'socket call handled by worker with pid ' + process.pid
    socket.emit "news",
      hello: "world"

 

<!-- index.html -->
<script type="text/javascript" src="/socket.io/socket.io.js"></script>
<script type="text/javascript">
 var socket = io.connect('http://localhost');
 socket.on('news', function (data) {
 console.log(data);
 socket.emit('my other event', { my: 'data' });
 });
</script>

When I run this code, problems start to appear. Specifically, the following message shows up in my output

warn – client not handshaken client should reconnect

Not surprisingly, we have issues with sockets appearing disconnected. Socket.io defaults to storing its open sockets in an in-memory store. As a result, sockets in other processes have no access to the information. We can easily fix the problem by using the redis store for socket.io. The docs we need are here.

With the redis store in place, it looks like this:

# Cluster + Express + Socket.IO backed by the redis store, so socket state
# is no longer held in the per-process in-memory store.
cluster = require("cluster")
http = require("http")
numCPUs = require("os").cpus().length
RedisStore = require("socket.io/lib/stores/redis")
redis = require("socket.io/node_modules/redis")

if cluster.isMaster
  # Master: spawn one worker per CPU reported by the os module.
  i = 0
  while i < numCPUs
    cluster.fork()
    i++

  # Lifecycle logging, keyed by the worker's process id.
  cluster.on 'fork', (worker) ->
    console.log 'forked worker ' + worker.process.pid
  cluster.on "listening", (worker, address) ->
    console.log "worker " + worker.process.pid + " is now connected to " + address.address + ":" + address.port
  cluster.on "exit", (worker, code, signal) ->
    console.log "worker " + worker.process.pid + " died"
else
  # The redis clients are created here rather than at module level so the
  # master, which never touches the store, does not open unused connections.
  pub = redis.createClient()
  sub = redis.createClient()
  client = redis.createClient()

  # Worker: attach Socket.IO to the same HTTP server that hosts the
  # Express app, reusing the http module required at the top.
  app = require("express")()
  server = http.createServer(app)
  io = require("socket.io").listen(server)

  # Swap Socket.IO's store for the redis-backed one.
  io.set "store", new RedisStore(
    redisPub: pub
    redisSub: sub
    redisClient: client
  )
  server.listen 8000

  # Serve the page that opens the socket connection.
  app.get "/", (req, res) ->
    res.sendfile(__dirname + '/index.html')

  # Greet every new socket and log which worker accepted it.
  io.sockets.on "connection", (socket) ->
    console.log 'socket call handled by worker with pid ' + process.pid
    socket.emit "news",
      hello: "world"
Advertisements
Tagged , , , ,

5 thoughts on “Clustering Web Sockets with Socket.IO and Express 3

  1. Steven says:

    Great article. One question on clustering of socket.io.
    Previously in most examples, the sockets were assigned to a hashtable, for quick access, for example

    var clients = {}; //hashtable
    //on connect we assign the socket to the hashtable with an identifier
    clients[identifier]=socket;
    //when accessing the socket
    var s = clients[identifier];
    if(s)
    {
    s.emit("somehandler");
    }

    My question would be how do i access a specific socket based on an identifier on a cluster and redis setup?

    Thanks in advance.

    • Ranadeep says:

      I have the exact same problem . I guess its easy to store them in a hash in all the workers , but deleting them in one worker and somehow propagating that rest of the workers is still troubling me

  2. Alternatively, one might consider ActionHero.js, which natively supports Node clustering.

  3. jonpress says:

    For scalability, there is also SocketCluster http://socketcluster.io/ – It uses engine.io just like Socket.io.

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s

%d bloggers like this: