//
// Freelance client - Model 2
// Uses DEALER socket to blast one or more services
//
#include "czmq.h"
// We design our client API as a class, using the CZMQ style
#ifdef __cplusplus
extern "C" {
#endif
typedef struct _flclient_t flclient_t;
flclient_t *flclient_new (void);
void flclient_destroy (flclient_t **self_p);
void flclient_connect (flclient_t *self, char *endpoint);
zmsg_t *flclient_request (flclient_t *self, zmsg_t **request_p);
#ifdef __cplusplus
}
#endif
// If not a single service replies within this time, give up
#define GLOBAL_TIMEOUT 2500
int main (int argc, char *argv [])
{
if (argc == 1) {
printf ("I: syntax: %s <endpoint> …\n", argv [0]);
exit (EXIT_SUCCESS);
}
// Create new freelance client object
flclient_t *client = flclient_new ();
// Connect to each endpoint
int argn;
for (argn = 1; argn < argc; argn++)
flclient_connect (client, argv [argn]);
// Send a bunch of name resolution 'requests', measure time
int requests = 10000;
uint64_t start = zclock_time ();
while (requests--) {
zmsg_t *request = zmsg_new ();
zmsg_addstr (request, "random name");
zmsg_t *reply = flclient_request (client, &request);
if (!reply) {
printf ("E: name service not available, aborting\n");
break;
}
zmsg_destroy (&reply);
}
printf ("Average round trip cost: %d usec\n",
(int) (zclock_time () - start) / 10);
flclient_destroy (&client);
return 0;
}
// Here is the flclient class implementation. Each instance has a context,
// a DEALER socket it uses to talk to the servers, a counter of how many
// servers it's connected to, and a request sequence number:
struct _flclient_t {
zctx_t *ctx; // Our context wrapper
void *socket; // DEALER socket talking to servers
size_t servers; // How many servers we have connected to
uint sequence; // Number of requests ever sent
};
// --------------------------------------------------------------------
// Constructor
flclient_t *
flclient_new (void)
{
flclient_t
*self;
self = (flclient_t *) zmalloc (sizeof (flclient_t));
self->ctx = zctx_new ();
self->socket = zsocket_new (self->ctx, ZMQ_DEALER);
return self;
}
// --------------------------------------------------------------------
// Destructor
void
flclient_destroy (flclient_t **self_p)
{
assert (self_p);
if (*self_p) {
flclient_t *self = *self_p;
zctx_destroy (&self->ctx);
free (self);
*self_p = NULL;
}
}
// --------------------------------------------------------------------
// Connect to new server endpoint
void
flclient_connect (flclient_t *self, char *endpoint)
{
assert (self);
zsocket_connect (self->socket, endpoint);
self->servers++;
}
// The request method does the hard work. It sends a request to all
// connected servers in parallel (for this to work, all connections
// have to be successful and completed by this time). It then waits
// for a single successful reply, and returns that to the caller.
// Any other replies are just dropped:
zmsg_t *
flclient_request (flclient_t *self, zmsg_t **request_p)
{
assert (self);
assert (*request_p);
zmsg_t *request = *request_p;
// Prefix request with sequence number and empty envelope
char sequence_text [10];
sprintf (sequence_text, "%u", ++self->sequence);
zmsg_pushstr (request, sequence_text);
zmsg_pushstr (request, "");
// Blast the request to all connected servers
int server;
for (server = 0; server < self->servers; server++) {
zmsg_t *msg = zmsg_dup (request);
zmsg_send (&msg, self->socket);
}
// Wait for a matching reply to arrive from anywhere
// Since we can poll several times, calculate each one
zmsg_t *reply = NULL;
uint64_t endtime = zclock_time () + GLOBAL_TIMEOUT;
while (zclock_time () < endtime) {
zmq_pollitem_t items [] = { { self->socket, 0, ZMQ_POLLIN, 0 } };
zmq_poll (items, 1, (endtime - zclock_time ()) * ZMQ_POLL_MSEC);
if (items [0].revents & ZMQ_POLLIN) {
// Reply is [empty][sequence][OK]
reply = zmsg_recv (self->socket);
assert (zmsg_size (reply) == 3);
free (zmsg_popstr (reply));
char *sequence = zmsg_popstr (reply);
int sequence_nbr = atoi (sequence);
free (sequence);
if (sequence_nbr == self->sequence)
break;
}
}
zmsg_destroy (request_p);
return reply;
}