//
// Broker peering simulation (part 2)
// Prototypes the request-reply flow
//
#include "czmq.h"
#define NBR_CLIENTS 10
#define NBR_WORKERS 3
#define LRU_READY "\001" // Signals worker is ready
// Our own name; in practice this would be configured per node
static char *self;
// The client task does a request-reply dialog using a standard
// synchronous REQ socket:
static void *
client_task (void *args)
{
zctx_t *ctx = zctx_new ();
void *client = zsocket_new (ctx, ZMQ_REQ);
zsocket_connect (client, "ipc://%s-localfe.ipc", self);
while (true) {
// Send request, get reply
zstr_send (client, "HELLO");
char *reply = zstr_recv (client);
if (!reply)
break; // Interrupted
printf ("Client: %s\n", reply);
free (reply);
sleep (1);
}
zctx_destroy (&ctx);
return NULL;
}
// The worker task plugs into the LRU routing dialog using a REQ
// socket:
static void *
worker_task (void *args)
{
zctx_t *ctx = zctx_new ();
void *worker = zsocket_new (ctx, ZMQ_REQ);
zsocket_connect (worker, "ipc://%s-localbe.ipc", self);
// Tell broker we're ready for work
zframe_t *frame = zframe_new (LRU_READY, 1);
zframe_send (&frame, worker, 0);
// Process messages as they arrive
while (true) {
zmsg_t *msg = zmsg_recv (worker);
if (!msg)
break; // Interrupted
zframe_print (zmsg_last (msg), "Worker: ");
zframe_reset (zmsg_last (msg), "OK", 2);
zmsg_send (&msg, worker);
}
zctx_destroy (&ctx);
return NULL;
}
// The main task begins by setting-up its frontend and backend sockets
// and then starting its client and worker tasks:
int main (int argc, char *argv [])
{
// First argument is this broker's name
// Other arguments are our peers' names
//
if (argc < 2) {
printf ("syntax: peering2 me {you}…\n");
exit (EXIT_FAILURE);
}
self = argv [1];
printf ("I: preparing broker at %s…\n", self);
srandom ((unsigned) time (NULL));
zctx_t *ctx = zctx_new ();
// Bind cloud frontend to endpoint
void *cloudfe = zsocket_new (ctx, ZMQ_ROUTER);
zsockopt_set_identity (cloudfe, self);
zsocket_bind (cloudfe, "ipc://%s-cloud.ipc", self);
// Connect cloud backend to all peers
void *cloudbe = zsocket_new (ctx, ZMQ_ROUTER);
zsockopt_set_identity (cloudbe, self);
int argn;
for (argn = 2; argn < argc; argn++) {
char *peer = argv [argn];
printf ("I: connecting to cloud frontend at '%s'\n", peer);
zsocket_connect (cloudbe, "ipc://%s-cloud.ipc", peer);
}
// Prepare local frontend and backend
void *localfe = zsocket_new (ctx, ZMQ_ROUTER);
zsocket_bind (localfe, "ipc://%s-localfe.ipc", self);
void *localbe = zsocket_new (ctx, ZMQ_ROUTER);
zsocket_bind (localbe, "ipc://%s-localbe.ipc", self);
// Get user to tell us when we can start…
printf ("Press Enter when all brokers are started: ");
getchar ();
// Start local workers
int worker_nbr;
for (worker_nbr = 0; worker_nbr < NBR_WORKERS; worker_nbr++)
zthread_new (worker_task, NULL);
// Start local clients
int client_nbr;
for (client_nbr = 0; client_nbr < NBR_CLIENTS; client_nbr++)
zthread_new (client_task, NULL);
// Here we handle the request-reply flow. We're using the LRU approach
// to poll workers at all times, and clients only when there are one or
// more workers available.
// Least recently used queue of available workers
int capacity = 0;
zlist_t *workers = zlist_new ();
while (true) {
// First, route any waiting replies from workers
zmq_pollitem_t backends [] = {
{ localbe, 0, ZMQ_POLLIN, 0 },
{ cloudbe, 0, ZMQ_POLLIN, 0 }
};
// If we have no workers anyhow, wait indefinitely
int rc = zmq_poll (backends, 2,
capacity? 1000 * ZMQ_POLL_MSEC: -1);
if (rc == -1)
break; // Interrupted
// Handle reply from local worker
zmsg_t *msg = NULL;
if (backends [0].revents & ZMQ_POLLIN) {
msg = zmsg_recv (localbe);
if (!msg)
break; // Interrupted
zframe_t *address = zmsg_unwrap (msg);
zlist_append (workers, address);
capacity++;
// If it's READY, don't route the message any further
zframe_t *frame = zmsg_first (msg);
if (memcmp (zframe_data (frame), LRU_READY, 1) == 0)
zmsg_destroy (&msg);
}
// Or handle reply from peer broker
else
if (backends [1].revents & ZMQ_POLLIN) {
msg = zmsg_recv (cloudbe);
if (!msg)
break; // Interrupted
// We don't use peer broker address for anything
zframe_t *address = zmsg_unwrap (msg);
zframe_destroy (&address);
}
// Route reply to cloud if it's addressed to a broker
for (argn = 2; msg && argn < argc; argn++) {
char *data = (char *) zframe_data (zmsg_first (msg));
size_t size = zframe_size (zmsg_first (msg));
if (size == strlen (argv [argn])
&& memcmp (data, argv [argn], size) == 0)
zmsg_send (&msg, cloudfe);
}
// Route reply to client if we still need to
if (msg)
zmsg_send (&msg, localfe);
// Now we route as many client requests as we have worker capacity
// for. We may reroute requests from our local frontend, but not from //
// the cloud frontend. We reroute randomly now, just to test things
// out. In the next version we'll do this properly by calculating
// cloud capacity://
while (capacity) {
zmq_pollitem_t frontends [] = {
{ localfe, 0, ZMQ_POLLIN, 0 },
{ cloudfe, 0, ZMQ_POLLIN, 0 }
};
rc = zmq_poll (frontends, 2, 0);
assert (rc >= 0);
int reroutable = 0;
// We'll do peer brokers first, to prevent starvation
if (frontends [1].revents & ZMQ_POLLIN) {
msg = zmsg_recv (cloudfe);
reroutable = 0;
}
else
if (frontends [0].revents & ZMQ_POLLIN) {
msg = zmsg_recv (localfe);
reroutable = 1;
}
else
break; // No work, go back to backends
// If reroutable, send to cloud 20% of the time
// Here we'd normally use cloud status information
//
if (reroutable && argc > 2 && randof (5) == 0) {
// Route to random broker peer
int random_peer = randof (argc - 2) + 2;
zmsg_pushmem (msg, argv [random_peer], strlen (argv [random_peer]));
zmsg_send (&msg, cloudbe);
}
else {
zframe_t *frame = (zframe_t *) zlist_pop (workers);
zmsg_wrap (msg, frame);
zmsg_send (&msg, localbe);
capacity--;
}
}
}
// When we're done, clean up properly
while (zlist_size (workers)) {
zframe_t *frame = (zframe_t *) zlist_pop (workers);
zframe_destroy (&frame);
}
zlist_destroy (&workers);
zctx_destroy (&ctx);
return EXIT_SUCCESS;
}