//
// Round-trip demonstrator
//
// While this example runs in a single process, that is just to make
// it easier to start and stop the example. The client task signals to
// main when it's ready.
//
#include "czmq.h"
static void
client_task (void *args, zctx_t *ctx, void *pipe)
{
void *client = zsocket_new (ctx, ZMQ_DEALER);
zsocket_connect (client, "tcp://localhost:5555");
printf ("Setting up test…\n");
zclock_sleep (100);
int requests;
int64_t start;
printf ("Synchronous round-trip test…\n");
start = zclock_time ();
for (requests = 0; requests < 10000; requests++) {
zstr_send (client, "hello");
char *reply = zstr_recv (client);
free (reply);
}
printf (" %d calls/second\n",
(1000 * 10000) / (int) (zclock_time () - start));
printf ("Asynchronous round-trip test…\n");
start = zclock_time ();
for (requests = 0; requests < 100000; requests++)
zstr_send (client, "hello");
for (requests = 0; requests < 100000; requests++) {
char *reply = zstr_recv (client);
free (reply);
}
printf (" %d calls/second\n",
(1000 * 100000) / (int) (zclock_time () - start));
zstr_send (pipe, "done");
}
// Here is the worker task. All it does is receive a message, and
// bounce it back the way it came:
static void *
worker_task (void *args)
{
zctx_t *ctx = zctx_new ();
void *worker = zsocket_new (ctx, ZMQ_DEALER);
zsocket_connect (worker, "tcp://localhost:5556");
while (true) {
zmsg_t *msg = zmsg_recv (worker);
zmsg_send (&msg, worker);
}
zctx_destroy (&ctx);
return NULL;
}
// Here is the broker task. It uses the zmq_proxy function to switch
// messages between frontend and backend:
static void *
broker_task (void *args)
{
// Prepare our context and sockets
zctx_t *ctx = zctx_new ();
void *frontend = zsocket_new (ctx, ZMQ_DEALER);
zsocket_bind (frontend, "tcp://*:5555");
void *backend = zsocket_new (ctx, ZMQ_DEALER);
zsocket_bind (backend, "tcp://*:5556");
zmq_proxy (frontend, backend, NULL);
zctx_destroy (&ctx);
return NULL;
}
// Finally, here's the main task, which starts the client, worker, and
// broker, and then runs until the client signals it to stop:
int main (void)
{
// Create threads
zctx_t *ctx = zctx_new ();
void *client = zthread_fork (ctx, client_task, NULL);
zthread_new (worker_task, NULL);
zthread_new (broker_task, NULL);
// Wait for signal on client pipe
char *signal = zstr_recv (client);
free (signal);
zctx_destroy (&ctx);
return 0;
}