//
// Paranoid Pirate worker
//
#include "czmq.h"
#define HEARTBEAT_LIVENESS 3 // 3-5 is reasonable
#define HEARTBEAT_INTERVAL 1000 // msecs
#define INTERVAL_INIT 1000 // Initial reconnect
#define INTERVAL_MAX 32000 // After exponential backoff
// Paranoid Pirate Protocol constants
#define PPP_READY "\001" // Signals worker is ready
#define PPP_HEARTBEAT "\002" // Signals worker heartbeat
// Helper function that returns a new configured socket
// connected to the Paranoid Pirate queue
static void *
s_worker_socket (zctx_t *ctx) {
void *worker = zsocket_new (ctx, ZMQ_DEALER);
zsocket_connect (worker, "tcp://localhost:5556");
// Tell queue we're ready for work
printf ("I: worker ready\n");
zframe_t *frame = zframe_new (PPP_READY, 1);
zframe_send (&frame, worker, 0);
return worker;
}
// We have a single task, which implements the worker side of the
// Paranoid Pirate Protocol (PPP). The interesting parts here are
// the heartbeating, which lets the worker detect if the queue has
// died, and vice-versa:
int main (void)
{
zctx_t *ctx = zctx_new ();
void *worker = s_worker_socket (ctx);
// If liveness hits zero, queue is considered disconnected
size_t liveness = HEARTBEAT_LIVENESS;
size_t interval = INTERVAL_INIT;
// Send out heartbeats at regular intervals
uint64_t heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;
srandom ((unsigned) time (NULL));
int cycles = 0;
while (true) {
zmq_pollitem_t items [] = { { worker, 0, ZMQ_POLLIN, 0 } };
int rc = zmq_poll (items, 1, HEARTBEAT_INTERVAL * ZMQ_POLL_MSEC);
if (rc == -1)
break; // Interrupted
if (items [0].revents & ZMQ_POLLIN) {
// Get message
// - 3-part envelope + content -> request
// - 1-part HEARTBEAT -> heartbeat
zmsg_t *msg = zmsg_recv (worker);
if (!msg)
break; // Interrupted
// To test the robustness of the queue implementation we //
// simulate various typical problems, such as the worker
// crashing, or running very slowly. We do this after a few
// cycles so that the architecture can get up and running
// first:
if (zmsg_size (msg) == 3) {
cycles++;
if (cycles > 3 && randof (5) == 0) {
printf ("I: simulating a crash\n");
zmsg_destroy (&msg);
break;
}
else
if (cycles > 3 && randof (5) == 0) {
printf ("I: simulating CPU overload\n");
sleep (3);
if (zctx_interrupted)
break;
}
printf ("I: normal reply\n");
zmsg_send (&msg, worker);
liveness = HEARTBEAT_LIVENESS;
sleep (1); // Do some heavy work
if (zctx_interrupted)
break;
}
else
// When we get a heartbeat message from the queue, it means the
// queue was (recently) alive, so reset our liveness indicator:
if (zmsg_size (msg) == 1) {
zframe_t *frame = zmsg_first (msg);
if (memcmp (zframe_data (frame), PPP_HEARTBEAT, 1) == 0)
liveness = HEARTBEAT_LIVENESS;
else {
printf ("E: invalid message\n");
zmsg_dump (msg);
}
zmsg_destroy (&msg);
}
else {
printf ("E: invalid message\n");
zmsg_dump (msg);
}
interval = INTERVAL_INIT;
}
else
// If the queue hasn't sent us heartbeats in a while, destroy the
// socket and reconnect. This is the simplest most brutal way of
// discarding any messages we might have sent in the meantime://
if (--liveness == 0) {
printf ("W: heartbeat failure, can't reach queue\n");
printf ("W: reconnecting in %zd msec…\n", interval);
zclock_sleep (interval);
if (interval < INTERVAL_MAX)
interval *= 2;
zsocket_destroy (ctx, worker);
worker = s_worker_socket (ctx);
liveness = HEARTBEAT_LIVENESS;
}
// Send heartbeat to queue if it's time
if (zclock_time () > heartbeat_at) {
heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;
printf ("I: worker heartbeat\n");
zframe_t *frame = zframe_new (PPP_HEARTBEAT, 1);
zframe_send (&frame, worker, 0);
}
}
zctx_destroy (&ctx);
return 0;
}