//
// Suicidal Snail
//
#include "czmq.h"
// This is our subscriber. It connects to the publisher and subscribes to
// everything. It sleeps for a short time between messages to simulate doing
// too much work. If a message is more than 1 second late, it croaks:
#define MAX_ALLOWED_DELAY 1000 // msecs
static void
subscriber (void *args, zctx_t *ctx, void *pipe)
{
// Subscribe to everything
void *subscriber = zsocket_new (ctx, ZMQ_SUB);
zsockopt_set_subscribe (subscriber, "");
zsocket_connect (subscriber, "tcp://localhost:5556");
// Get and process messages
while (true) {
char *string = zstr_recv (subscriber);
printf("%s\n", string);
int64_t clock;
int terms = sscanf (string, "%" PRId64, &clock);
assert (terms == 1);
free (string);
// Suicide snail logic
if (zclock_time () - clock > MAX_ALLOWED_DELAY) {
fprintf (stderr, "E: subscriber cannot keep up, aborting\n");
break;
}
// Work for 1 msec plus some random additional time
zclock_sleep (1 + randof (2));
}
zstr_send (pipe, "gone and died");
}
// This is our publisher task. It publishes a time-stamped message to its
// PUB socket every 1 msec:
static void
publisher (void *args, zctx_t *ctx, void *pipe)
{
// Prepare publisher
void *publisher = zsocket_new (ctx, ZMQ_PUB);
zsocket_bind (publisher, "tcp://*:5556");
while (true) {
// Send current clock (msecs) to subscribers
char string [20];
sprintf (string, "%" PRId64, zclock_time ());
zstr_send (publisher, string);
char *signal = zstr_recv_nowait (pipe);
if (signal) {
free (signal);
break;
}
zclock_sleep (1); // 1msec wait
}
}
// The main task simply starts a client, and a server, and then
// waits for the client to signal that it has died:
int main (void)
{
zctx_t *ctx = zctx_new ();
void *pubpipe = zthread_fork (ctx, publisher, NULL);
void *subpipe = zthread_fork (ctx, subscriber, NULL);
free (zstr_recv (subpipe));
zstr_send (pubpipe, "break");
zclock_sleep (100);
zctx_destroy (&ctx);
return 0;
}