//
// Espresso Pattern
// This shows how to capture data using a pub-sub proxy
//
#include "czmq.h"
// Subscriber task: connects a SUB socket to the proxy's outgoing
// endpoint, asks for messages whose topic starts with "A" or "B",
// and tallies every message it receives. The tally is printed once
// the receive call fails. The args and pipe parameters are unused.
static void
subscriber_thread (void *args, zctx_t *ctx, void *pipe)
{
    void *sub_socket = zsocket_new (ctx, ZMQ_SUB);
    zsocket_connect (sub_socket, "tcp://localhost:6001");

    // Only the "A" and "B" prefixes are requested
    zsockopt_set_subscribe (sub_socket, "A");
    zsockopt_set_subscribe (sub_socket, "B");

    int received = 0;
    char *message;
    // A NULL message means the receive was interrupted
    while ((message = zstr_recv (sub_socket)) != NULL) {
        free (message);
        received++;
    }
    printf ("Subscriber received %d messages\n", received);
}
// Publisher task: binds a PUB socket on port 6000 and, every 100
// msec, sends one message of the form "<letter>-<5-digit number>",
// where the letter is chosen at random starting from 'A' (A-J).
// The task ends when a send fails, or immediately if the endpoint
// cannot be bound. The args and pipe parameters are unused.
static void
publisher_thread (void *args, zctx_t *ctx, void *pipe)
{
    void *publisher = zsocket_new (ctx, ZMQ_PUB);
    // A fixed port may already be in use; without this check the loop
    // below would publish into a socket nobody can reach.
    if (zsocket_bind (publisher, "tcp://*:6000") == -1) {
        fprintf (stderr, "E: publisher cannot bind to tcp://*:6000\n");
        return;
    }
    while (true) {
        // "X-NNNNN" is 7 characters plus the terminator; snprintf keeps
        // the write inside the buffer even if a value prints wider.
        char string [10];
        snprintf (string, sizeof string, "%c-%05d",
                  randof (10) + 'A', randof (100000));
        if (zstr_send (publisher, string) == -1)
            break;              // Interrupted
        zclock_sleep (100);     // Wait for 1/10th second
    }
}
// Listener task: reads whatever the proxy copies onto this thread's
// pipe and echoes it to standard output. In CZMQ the pipe is one end
// of a ZMQ_PAIR socket pair linking an attached child thread to its
// parent; other language bindings may expose this differently.
// The args and ctx parameters are unused.
static void
listener_thread (void *args, zctx_t *ctx, void *pipe)
{
    for (;;) {
        char *captured = zstr_recv (pipe);
        if (captured == NULL)
            return;             // Interrupted
        // Print, then release, each captured message
        puts (captured);
        free (captured);
    }
}
// The main task starts the publisher and subscriber as child threads,
// then acts as a listening proxy: an XSUB socket connected to the
// publisher (port 6000) is bridged to an XPUB socket bound on port
// 6001, and the listener child thread's pipe is handed to zmq_proxy
// as the capture socket.
// Returns 0 after the proxy stops, or 1 if the XPUB endpoint could not
// be bound.
int main (void)
{
    int status = 1;             // Failure until the proxy has actually run

    // Start child threads
    zctx_t *ctx = zctx_new ();
    zthread_fork (ctx, publisher_thread, NULL);
    zthread_fork (ctx, subscriber_thread, NULL);

    void *subscriber = zsocket_new (ctx, ZMQ_XSUB);
    zsocket_connect (subscriber, "tcp://localhost:6000");

    void *publisher = zsocket_new (ctx, ZMQ_XPUB);
    // Without a bound XPUB endpoint the proxy would run with no
    // reachable backend, so report the failure and shut down instead.
    if (zsocket_bind (publisher, "tcp://*:6001") == -1) {
        fprintf (stderr, "E: proxy cannot bind to tcp://*:6001\n");
        goto cleanup;
    }
    void *listener = zthread_fork (ctx, listener_thread, NULL);
    zmq_proxy (subscriber, publisher, listener);
    puts (" interrupted");
    status = 0;

cleanup:
    // Tell attached threads to exit
    zctx_destroy (&ctx);
    return status;
}