//
// Custom routing Router to Mama (ROUTER to REQ)
//
#include "zhelpers.h"
#include <pthread.h>
#define NBR_WORKERS 10
static void *
worker_task (void *args)
{
void *context = zmq_ctx_new ();
void *worker = zmq_socket (context, ZMQ_REQ);
// We use a string identity for ease here
s_set_id (worker);
zmq_connect (worker, "ipc://routing.ipc");
int total = 0;
while (true) {
// Tell the router we're ready for work
s_send (worker, "ready");
// Get workload from router, until finished
char *workload = s_recv (worker);
int finished = (strcmp (workload, "END") == 0);
free (workload);
if (finished) {
printf ("Processed: %d tasks\n", total);
break;
}
total++;
// Do some random work
s_sleep (randof (1000) + 1);
}
zmq_close (worker);
zmq_ctx_destroy (context);
return NULL;
}
// While this example runs in a single process, that is just to make
// it easier to start and stop the example. Each thread has its own
// context and conceptually acts as a separate process.
int main (void)
{
void *context = zmq_ctx_new ();
void *client = zmq_socket (context, ZMQ_ROUTER);
zmq_bind (client, "ipc://routing.ipc");
srandom ((unsigned) time (NULL));
int worker_nbr;
for (worker_nbr = 0; worker_nbr < NBR_WORKERS; worker_nbr++) {
pthread_t worker;
pthread_create (&worker, NULL, worker_task, NULL);
}
int task_nbr;
for (task_nbr = 0; task_nbr < NBR_WORKERS * 10; task_nbr++) {
// LRU worker is next waiting in queue
char *address = s_recv (client);
char *empty = s_recv (client);
free (empty);
char *ready = s_recv (client);
free (ready);
s_sendmore (client, address);
s_sendmore (client, "");
s_send (client, "This is the workload");
free (address);
}
// Now ask mamas to shut down and report their results
for (worker_nbr = 0; worker_nbr < NBR_WORKERS; worker_nbr++) {
char *address = s_recv (client);
char *empty = s_recv (client);
free (empty);
char *ready = s_recv (client);
free (ready);
s_sendmore (client, address);
s_sendmore (client, "");
s_send (client, "END");
free (address);
}
zmq_close (client);
zmq_ctx_destroy (context);
return 0;
}