Paranoid Pirate queue in C

//
// Paranoid Pirate queue
//
#include "czmq.h"

#define HEARTBEAT_LIVENESS 3 // 3-5 is reasonable
#define HEARTBEAT_INTERVAL 1000 // msecs

// Paranoid Pirate Protocol constants
#define PPP_READY "\001"
// Signals worker is ready
#define PPP_HEARTBEAT "\002" // Signals worker heartbeat

// Here we define the worker class; a structure and a set of functions that
// as constructor, destructor, and methods on worker objects:

typedef struct {
zframe_t *address; // Address of worker
char *identity; // Printable identity
int64_t expiry; // Expires at this time
} worker_t;

// Construct new worker
static worker_t *
s_worker_new (zframe_t *address)
{
worker_t *self = (worker_t *) zmalloc (sizeof (worker_t));
self->address = address;
self->identity = zframe_strdup (address);
self->expiry = zclock_time () + HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS;
return self;
}

// Destroy specified worker object, including identity frame.
static void
s_worker_destroy (worker_t **self_p)
{
assert (self_p);
if (*self_p) {
worker_t *self = *self_p;
zframe_destroy (&self->address);
free (self->identity);
free (self);
*self_p = NULL;
}
}

// The ready method puts a worker to the end of the ready list:

static void
s_worker_ready (worker_t *self, zlist_t *workers)
{
worker_t *worker = (worker_t *) zlist_first (workers);
while (worker) {
if (streq (self->identity, worker->identity)) {
zlist_remove (workers, worker);
s_worker_destroy (&worker);
break;
}
worker = (worker_t *) zlist_next (workers);
}
zlist_append (workers, self);
}

// The next method returns the next available worker address:

static zframe_t *
s_workers_next (zlist_t *workers)
{
worker_t *worker = zlist_pop (workers);
assert (worker);
zframe_t *frame = worker->address;
worker->address = NULL;
s_worker_destroy (&worker);
return frame;
}

// The purge method looks for and kills expired workers. We hold workers
// from oldest to most recent, so we stop at the first alive worker:

static void
s_workers_purge (zlist_t *workers)
{
worker_t *worker = (worker_t *) zlist_first (workers);
while (worker) {
if (zclock_time () < worker->expiry)
break; // Worker is alive, we're done here

zlist_remove (workers, worker);
s_worker_destroy (&worker);
worker = (worker_t *) zlist_first (workers);
}
}

// The main task is an LRU queue with heartbeating on workers so we can
// detect crashed or blocked worker tasks:

int main (void)
{
zctx_t *ctx = zctx_new ();
void *frontend = zsocket_new (ctx, ZMQ_ROUTER);
void *backend = zsocket_new (ctx, ZMQ_ROUTER);
zsocket_bind (frontend, "tcp://*:5555"); // For clients
zsocket_bind (backend, "tcp://*:5556"); // For workers

// List of available workers
zlist_t *workers = zlist_new ();

// Send out heartbeats at regular intervals
uint64_t heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;

while (true) {
zmq_pollitem_t items [] = {
{ backend, 0, ZMQ_POLLIN, 0 },
{ frontend, 0, ZMQ_POLLIN, 0 }
};
// Poll frontend only if we have available workers
int rc = zmq_poll (items, zlist_size (workers)? 2: 1,
HEARTBEAT_INTERVAL * ZMQ_POLL_MSEC);
if (rc == -1)
break; // Interrupted

// Handle worker activity on backend
if (items [0].revents & ZMQ_POLLIN) {
// Use worker address for LRU routing
zmsg_t *msg = zmsg_recv (backend);
if (!msg)
break; // Interrupted

// Any sign of life from worker means it's ready
zframe_t *address = zmsg_unwrap (msg);
worker_t *worker = s_worker_new (address);
s_worker_ready (worker, workers);

// Validate control message, or return reply to client
if (zmsg_size (msg) == 1) {
zframe_t *frame = zmsg_first (msg);
if (memcmp (zframe_data (frame), PPP_READY, 1)
&& memcmp (zframe_data (frame), PPP_HEARTBEAT, 1)) {
printf ("E: invalid message from worker");
zmsg_dump (msg);
}
zmsg_destroy (&msg);
}
else
zmsg_send (&msg, frontend);
}
if (items [1].revents & ZMQ_POLLIN) {
// Now get next client request, route to next worker
zmsg_t *msg = zmsg_recv (frontend);
if (!msg)
break; // Interrupted
zmsg_push (msg, s_workers_next (workers));
zmsg_send (&msg, backend);
}

// We handle heartbeating after any socket activity. First we send
// heartbeats to any idle workers if it's time. Then we purge any
// dead workers:

if (zclock_time () >= heartbeat_at) {
worker_t *worker = (worker_t *) zlist_first (workers);
while (worker) {
zframe_send (&worker->address, backend,
ZFRAME_REUSE + ZFRAME_MORE);
zframe_t *frame = zframe_new (PPP_HEARTBEAT, 1);
zframe_send (&frame, backend, 0);
worker = (worker_t *) zlist_next (workers);
}
heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;
}
s_workers_purge (workers);
}

// When we're done, clean up properly
while (zlist_size (workers)) {
worker_t *worker = (worker_t *) zlist_pop (workers);
s_worker_destroy (&worker);
}
zlist_destroy (&workers);
zctx_destroy (&ctx);
return 0;
}