/* =====================================================================
* mdcliapi.c - Majordomo Protocol Client API
* Implements the MDP/Worker spec at http://rfc.zeromq.org/spec:7.
* ===================================================================== */
#include "mdcliapi.h"
// Structure of our class
// We access these properties only via class methods
struct _mdcli_t {
zctx_t *ctx; // Our context
char *broker;
void *client; // Socket to broker
int verbose; // Print activity to stdout
int timeout; // Request timeout
int retries; // Request retries
};
// ---------------------------------------------------------------------
// Connect or reconnect to broker
void s_mdcli_connect_to_broker (mdcli_t *self)
{
if (self->client)
zsocket_destroy (self->ctx, self->client);
self->client = zsocket_new (self->ctx, ZMQ_REQ);
zmq_connect (self->client, self->broker);
if (self->verbose)
zclock_log ("I: connecting to broker at %s…", self->broker);
}
// Here we have the constructor and destructor for our mdcli class:
// ---------------------------------------------------------------------
// Constructor
mdcli_t *
mdcli_new (char *broker, int verbose)
{
assert (broker);
mdcli_t *self = (mdcli_t *) zmalloc (sizeof (mdcli_t));
self->ctx = zctx_new ();
self->broker = strdup (broker);
self->verbose = verbose;
self->timeout = 2500; // msecs
self->retries = 3; // Before we abandon
s_mdcli_connect_to_broker (self);
return self;
}
// ---------------------------------------------------------------------
// Destructor
void
mdcli_destroy (mdcli_t **self_p)
{
assert (self_p);
if (*self_p) {
mdcli_t *self = *self_p;
zctx_destroy (&self->ctx);
free (self->broker);
free (self);
*self_p = NULL;
}
}
// These are the class methods. We can set the request timeout and number
// of retry attempts, before sending requests:
// ---------------------------------------------------------------------
// Set request timeout
void
mdcli_set_timeout (mdcli_t *self, int timeout)
{
assert (self);
self->timeout = timeout;
}
// ---------------------------------------------------------------------
// Set request retries
void
mdcli_set_retries (mdcli_t *self, int retries)
{
assert (self);
self->retries = retries;
}
// Here is the send method. It sends a request to the broker and gets a
// reply even if it has to retry several times. It takes ownership of the
// request message, and destroys it when sent. It returns the reply
// message, or NULL if there was no reply after multiple attempts:
zmsg_t *
mdcli_send (mdcli_t *self, char *service, zmsg_t **request_p)
{
assert (self);
assert (request_p);
zmsg_t *request = *request_p;
// Prefix request with protocol frames
// Frame 1: "MDPCxy" (six bytes, MDP/Client x.y)
// Frame 2: Service name (printable string)
zmsg_pushstr (request, service);
zmsg_pushstr (request, MDPC_CLIENT);
if (self->verbose) {
zclock_log ("I: send request to '%s' service:", service);
zmsg_dump (request);
}
int retries_left = self->retries;
while (retries_left && !zctx_interrupted) {
zmsg_t *msg = zmsg_dup (request);
zmsg_send (&msg, self->client);
zmq_pollitem_t items [] = {
{ self->client, 0, ZMQ_POLLIN, 0 }
};
// On any blocking call, libzmq will return -1 if there was
// an error; we could in theory check for different error codes
// but in practice it's OK to assume it was EINTR (Ctrl-C):
int rc = zmq_poll (items, 1, self->timeout * ZMQ_POLL_MSEC);
if (rc == -1)
break; // Interrupted
// If we got a reply, process it
if (items [0].revents & ZMQ_POLLIN) {
zmsg_t *msg = zmsg_recv (self->client);
if (self->verbose) {
zclock_log ("I: received reply:");
zmsg_dump (msg);
}
// We would handle malformed replies better in real code
assert (zmsg_size (msg) >= 3);
zframe_t *header = zmsg_pop (msg);
assert (zframe_streq (header, MDPC_CLIENT));
zframe_destroy (&header);
zframe_t *reply_service = zmsg_pop (msg);
assert (zframe_streq (reply_service, service));
zframe_destroy (&reply_service);
zmsg_destroy (&request);
return msg; // Success
}
else
if (--retries_left) {
if (self->verbose)
zclock_log ("W: no reply, reconnecting…");
s_mdcli_connect_to_broker (self);
}
else {
if (self->verbose)
zclock_log ("W: permanent error, abandoning");
break; // Give up
}
}
if (zctx_interrupted)
printf ("W: interrupt received, killing client…\n");
zmsg_destroy (&request);
return NULL;
}