// Majordomo broker in C

//
// Majordomo Protocol broker
// A minimal C implementation of the Majordomo Protocol as defined in
// http://rfc.zeromq.org/spec:7 and http://rfc.zeromq.org/spec:8.
//
#include "czmq.h"
#include "mdp.h"

// We'd normally pull these from config data

#define HEARTBEAT_LIVENESS  3       //  3-5 is reasonable
#define HEARTBEAT_INTERVAL  2500    //  msecs
//  How long (msecs) a worker may stay silent before it is treated as
//  expired. Parenthesized so the product stays intact inside larger
//  expressions such as `x / HEARTBEAT_EXPIRY`.
#define HEARTBEAT_EXPIRY    (HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS)

// The broker class defines a single broker instance:

typedef struct {
    zctx_t *ctx;                //  Our context
    void *socket;               //  Socket for clients & workers
    int verbose;                //  Print activity to stdout
    char *endpoint;             //  Broker binds to this endpoint
                                //  NOTE(review): not assigned anywhere in
                                //  this file; s_broker_bind does not store it
    zhash_t *services;          //  Hash of known services
    zhash_t *workers;           //  Hash of known workers
    zlist_t *waiting;           //  List of waiting workers
    int64_t heartbeat_at;       //  When to send HEARTBEAT; signed like
                                //  zclock_time() and worker_t.expiry so the
                                //  comparison in main does not mix signedness
} broker_t;

static broker_t *
s_broker_new (int verbose);
static void
s_broker_destroy (broker_t **self_p);
static void
s_broker_bind (broker_t *self, char *endpoint);
static void
s_broker_worker_msg (broker_t *self, zframe_t *sender, zmsg_t *msg);
static void
s_broker_client_msg (broker_t *self, zframe_t *sender, zmsg_t *msg);
static void
s_broker_purge (broker_t *self);

// A service groups the requests queued for one service name together with
// the idle workers currently able to serve it.

typedef struct {
    broker_t *broker;           //  Broker that owns this service
    char *name;                 //  Service name (heap string, owned)
    zlist_t *requests;          //  Client requests not yet handed out
    zlist_t *waiting;           //  Idle workers registered for this service
    size_t workers;             //  Count of workers registered for it
} service_t;

static service_t *s_service_require (broker_t *self, zframe_t *service_frame);
static void s_service_destroy (void *argument);
static void s_service_dispatch (service_t *self, zmsg_t *msg);

// A worker is one connected peer that speaks the worker side of MDP; it may
// be idle (on the waiting lists) or busy with a request.

typedef struct {
    broker_t *broker;           //  Broker that owns this worker
    char *identity;             //  Hex form of the address; hash key
    zframe_t *address;          //  Routing frame used to reach the worker
    service_t *service;         //  Service it registered for; NULL until READY
    int64_t expiry;             //  zclock_time() after which it is expired
} worker_t;

static worker_t *s_worker_require (broker_t *self, zframe_t *address);
static void s_worker_delete (worker_t *self, int disconnect);
static void s_worker_destroy (void *argument);
static void s_worker_send (worker_t *self, char *command, char *option,
                           zmsg_t *msg);
static void s_worker_waiting (worker_t *self);

// Here are the constructor and destructor for the broker:

static broker_t *
s_broker_new (int verbose)
{
    broker_t *self = (broker_t *) zmalloc (sizeof *self);

    //  One context and a single ROUTER socket shared by clients and workers
    self->ctx = zctx_new ();
    self->socket = zsocket_new (self->ctx, ZMQ_ROUTER);
    self->verbose = verbose;

    //  Registries start empty and are filled lazily as peers talk to us
    self->services = zhash_new ();
    self->workers = zhash_new ();
    self->waiting = zlist_new ();

    //  First heartbeat round is due one interval from now
    self->heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;
    return self;
}

static void
s_broker_destroy (broker_t **self_p)
{
    assert (self_p);
    broker_t *self = *self_p;
    if (self == NULL)
        return;                 //  Already destroyed, nothing to do

    //  self->socket was created with zsocket_new on this context and is
    //  not closed separately here
    zctx_destroy (&self->ctx);

    //  The hashes run s_service_destroy / s_worker_destroy on their items
    zhash_destroy (&self->services);
    zhash_destroy (&self->workers);
    zlist_destroy (&self->waiting);

    free (self);
    *self_p = NULL;
}

// The bind method binds the broker instance to an endpoint. We can call
// this multiple times. Note that MDP uses a single socket for both clients
// and workers.
//
// zsocket_bind() takes a printf-style format, so the endpoint is passed as
// a "%s" argument rather than as the format itself. A failed bind
// (zsocket_bind returns -1) is logged as an error instead of announcing an
// active broker.

static void
s_broker_bind (broker_t *self, char *endpoint)
{
    int rc = zsocket_bind (self->socket, "%s", endpoint);
    if (rc == -1)
        zclock_log ("E: MDP broker/0.2.0 failed to bind to %s", endpoint);
    else
        zclock_log ("I: MDP broker/0.2.0 is active at %s", endpoint);
}

// The worker_msg method processes one READY, REPLY, HEARTBEAT or
// DISCONNECT message sent to the broker by a worker. It takes ownership of
// msg (it is sent on or destroyed); sender is borrowed from the caller.
//
// A peer that is not yet a registered worker and sends anything other than
// a well-formed READY is disconnected and dropped. This means self->workers
// never keeps a worker without a service. Malformed input is logged and
// discarded rather than asserted on, since it arrives from the network.

static void
s_broker_worker_msg (broker_t *self, zframe_t *sender, zmsg_t *msg)
{
    zframe_t *command = zmsg_pop (msg);
    if (command == NULL) {              //  No command frame at all
        zclock_log ("E: invalid input message");
        zmsg_destroy (&msg);
        return;
    }
    char *identity = zframe_strhex (sender);
    int worker_ready = (zhash_lookup (self->workers, identity) != NULL);
    free (identity);
    worker_t *worker = s_worker_require (self, sender);

    if (zframe_streq (command, MDPW_READY)) {
        //  The frame after READY names the service. The "mmi." prefix is
        //  reserved for broker-internal services, so check the service
        //  name (not the worker's address) against it.
        zframe_t *service_frame = zmsg_pop (msg);
        int reserved = service_frame
            && zframe_size (service_frame) >= 4
            && memcmp (zframe_data (service_frame), "mmi.", 4) == 0;
        if (worker_ready                //  Not first command in session
        ||  service_frame == NULL       //  No service name supplied
        ||  reserved)                   //  Reserved service name
            s_worker_delete (worker, 1);
        else {
            //  Attach worker to service and mark as idle
            worker->service = s_service_require (self, service_frame);
            worker->service->workers++;
            s_worker_waiting (worker);
        }
        zframe_destroy (&service_frame);
    }
    else
    if (zframe_streq (command, MDPW_REPLY)) {
        //  Only a registered worker may reply, and the reply must still
        //  carry the client return envelope
        zframe_t *client = (worker_ready && worker->service)?
            zmsg_unwrap (msg): NULL;
        if (client) {
            //  Insert the protocol header and service name, then rewrap
            //  the saved client envelope. The service name was supplied by
            //  a worker, so push it as raw bytes. It is not passed through
            //  zmsg_pushstr, which some CZMQ versions treat as a format.
            const char *service_name = worker->service->name;
            zmsg_pushmem (msg, service_name, strlen (service_name));
            zmsg_pushstr (msg, MDPC_CLIENT);
            zmsg_wrap (msg, client);
            zmsg_send (&msg, self->socket);
            s_worker_waiting (worker);
        }
        else
            s_worker_delete (worker, 1);
    }
    else
    if (zframe_streq (command, MDPW_HEARTBEAT)) {
        if (worker_ready)
            worker->expiry = zclock_time () + HEARTBEAT_EXPIRY;
        else
            s_worker_delete (worker, 1);
    }
    else
    if (zframe_streq (command, MDPW_DISCONNECT))
        s_worker_delete (worker, 0);
    else {
        zclock_log ("E: invalid input message");
        zmsg_dump (msg);
        //  s_worker_require just registered this peer; don't leave a
        //  never-READY worker (with no service) behind in the hash
        if (!worker_ready)
            s_worker_delete (worker, 1);
    }
    zframe_destroy (&command);          //  A frame, not a plain allocation
    zmsg_destroy (&msg);                //  No-op if already sent
}

// Process a request coming from a client. We implement MMI requests
// directly here (at present, we implement only the mmi.service request).
// Takes ownership of msg; sender is borrowed and duplicated as the reply
// address. Requests for "mmi." names are answered here and never create an
// entry in self->services. Any other name is looked up (or created) and the
// request is queued on that service.

static void
s_broker_client_msg (broker_t *self, zframe_t *sender, zmsg_t *msg)
{
    //  Need service name + body. Input comes from the network, so drop a
    //  short message instead of aborting the broker.
    if (zmsg_size (msg) < 2) {
        zclock_log ("E: invalid client message");
        zmsg_destroy (&msg);
        return;
    }
    zframe_t *service_frame = zmsg_pop (msg);

    //  Set reply return address to client sender
    zmsg_wrap (msg, zframe_dup (sender));

    //  If we got a MMI service request, process that internally
    if (zframe_size (service_frame) >= 4
    &&  memcmp (zframe_data (service_frame), "mmi.", 4) == 0) {
        const char *return_code;
        if (zframe_streq (service_frame, "mmi.service")) {
            //  Body names the service being asked about
            char *name = zframe_strdup (zmsg_last (msg));
            service_t *requested =
                (service_t *) zhash_lookup (self->services, name);
            return_code = requested && requested->workers? "200": "404";
            free (name);
        }
        else
            return_code = "501";

        zframe_reset (zmsg_last (msg), return_code, strlen (return_code));

        //  Remove & save client return envelope and insert the
        //  protocol header and service name, then rewrap envelope.
        zframe_t *client = zmsg_unwrap (msg);
        zmsg_push (msg, zframe_dup (service_frame));
        zmsg_pushstr (msg, MDPC_CLIENT);
        zmsg_wrap (msg, client);
        zmsg_send (&msg, self->socket);
    }
    else
        //  Else dispatch to the requested service, creating it on first
        //  use so the request queues until a worker registers
        s_service_dispatch (s_service_require (self, service_frame), msg);
    zframe_destroy (&service_frame);
}

// The purge method walks broker->waiting from its head, deleting (without
// sending DISCONNECT) every idle worker whose expiry has passed. It stops at
// the first worker that is still alive. Workers are appended when they
// become idle, so the head is normally the one that has waited longest.
// Stopping early keeps this cheap, which matters because it runs on every
// dispatch as well as every heartbeat round.
//
// NOTE(review): a HEARTBEAT refreshes worker->expiry in place without moving
// the worker in the list. An expired worker queued behind a live head is
// therefore not purged until the head leaves the list.

static void
s_broker_purge (broker_t *self)
{
    worker_t *oldest;
    for (oldest = (worker_t *) zlist_first (self->waiting);
         oldest != NULL && zclock_time () >= oldest->expiry;
         oldest = (worker_t *) zlist_first (self->waiting)) {
        if (self->verbose)
            zclock_log ("I: deleting expired worker: %s",
                oldest->identity);
        s_worker_delete (oldest, 0);
    }
}

// Here is the implementation of the methods that work on a service:

// Lazy constructor: return the service registered under the name carried
// in service_frame. On first use, create an empty one and register it.
// service_frame is borrowed. The returned service is owned by
// self->services, which destroys it through s_service_destroy.

static service_t *
s_service_require (broker_t *self, zframe_t *service_frame)
{
    assert (service_frame);
    char *name = zframe_strdup (service_frame);

    service_t *existing = (service_t *) zhash_lookup (self->services, name);
    if (existing) {
        free (name);            //  Only needed for the lookup
        return existing;
    }

    service_t *fresh = (service_t *) zmalloc (sizeof *fresh);
    fresh->broker = self;
    fresh->name = name;         //  Service now owns the string
    fresh->requests = zlist_new ();
    fresh->waiting = zlist_new ();
    zhash_insert (self->services, name, fresh);
    zhash_freefn (self->services, name, s_service_destroy);
    if (self->verbose)
        zclock_log ("I: added service: %s", name);
    return fresh;
}

// Service destructor is called automatically whenever the service is
// removed from broker->services.

static void
s_service_destroy (void *argument)
{
service_t *service = (service_t *) argument;
while (zlist_size (service->requests)) {
zmsg_t *msg = zlist_pop (service->requests);
zmsg_destroy (&msg);
}
zlist_destroy (&service->requests);
zlist_destroy (&service->waiting);
free (service->name);
free (service);
}

// The dispatch method queues msg (if non-NULL) on this service and purges
// expired workers. It then hands queued requests to idle workers, both
// taken in FIFO order, until either list is empty. It takes ownership of
// msg; each request is destroyed once a copy has been sent.

static void
s_service_dispatch (service_t *self, zmsg_t *msg)
{
    assert (self);
    if (msg)                    //  NULL just means "try to drain the queue"
        zlist_append (self->requests, msg);

    broker_t *broker = self->broker;
    s_broker_purge (broker);
    for (;;) {
        if (zlist_size (self->waiting) == 0
        ||  zlist_size (self->requests) == 0)
            break;
        worker_t *idle = (worker_t *) zlist_pop (self->waiting);
        zlist_remove (broker->waiting, idle);
        zmsg_t *request = (zmsg_t *) zlist_pop (self->requests);
        s_worker_send (idle, MDPW_REQUEST, NULL, request);
        zmsg_destroy (&request);
    }
}

// Here is the implementation of the methods that work on a worker:

// Lazy constructor: return the worker whose address matches. If this
// address has not been seen, register a new worker with no service yet.
// The address frame is borrowed and duplicated. The returned worker is
// owned by self->workers, which destroys it through s_worker_destroy.

static worker_t *
s_worker_require (broker_t *self, zframe_t *address)
{
    assert (address);

    //  self->workers is keyed by the hex form of the address
    char *identity = zframe_strhex (address);
    worker_t *known = (worker_t *) zhash_lookup (self->workers, identity);
    if (known) {
        free (identity);
        return known;
    }

    worker_t *fresh = (worker_t *) zmalloc (sizeof *fresh);
    fresh->broker = self;
    fresh->identity = identity; //  Worker now owns the string
    fresh->address = zframe_dup (address);
    zhash_insert (self->workers, identity, fresh);
    zhash_freefn (self->workers, identity, s_worker_destroy);
    if (self->verbose)
        zclock_log ("I: registering new worker: %s", identity);
    return fresh;
}

// Remove this worker from every list that references it and from
// broker->workers. If disconnect is non-zero, a DISCONNECT command is sent
// to the worker first. Removal from the hash frees the worker via
// s_worker_destroy, so self must not be used after this call.

static void
s_worker_delete (worker_t *self, int disconnect)
{
    assert (self);
    broker_t *broker = self->broker;
    service_t *service = self->service;

    if (disconnect)
        s_worker_send (self, MDPW_DISCONNECT, NULL, NULL);

    if (service != NULL) {
        zlist_remove (service->waiting, self);
        service->workers--;
    }
    zlist_remove (broker->waiting, self);
    //  Must be last: the hash's free function destroys self
    zhash_delete (broker->workers, self->identity);
}

// Worker destructor, installed as the zhash free function so it runs
// whenever the worker is removed from broker->workers. It releases the
// address frame, the identity string and the worker itself.

static void
s_worker_destroy (void *argument)
{
    worker_t *worker = argument;
    zframe_destroy (&worker->address);
    free (worker->identity);
    free (worker);
}

// Format and send a command to a worker. If option is non-NULL, it becomes
// an extra frame right after the command. If msg is non-NULL, it is copied
// as the payload and NOT consumed; the caller still owns it. The whole
// message is wrapped in the worker's routing address before sending.

static void
s_worker_send (worker_t *self, char *command, char *option, zmsg_t *msg)
{
    zmsg_t *out = msg? zmsg_dup (msg): zmsg_new ();

    //  Protocol header, pushed in reverse so that it reads
    //  MDPW_WORKER, command, [option] from the front
    if (option)
        zmsg_pushstr (out, option);
    zmsg_pushstr (out, command);
    zmsg_pushstr (out, MDPW_WORKER);

    //  Routing envelope goes in front of everything else
    zmsg_wrap (out, zframe_dup (self->address));

    broker_t *broker = self->broker;
    if (broker->verbose) {
        //  First byte of the command indexes mdps_commands (assumes mdp.h
        //  defines commands as small byte codes -- confirm there)
        int index = (int) *command;
        zclock_log ("I: sending %s to worker", mdps_commands [index]);
        zmsg_dump (out);
    }
    zmsg_send (&out, broker->socket);
}

// Mark this worker idle. It is appended to both the broker-wide waiting
// list and its service's waiting list, and its expiry timer is restarted.
// The service is then given a chance to hand it a queued request
// immediately. Requires self->service to be set.

static void
s_worker_waiting (worker_t *self)
{
    assert (self->broker);
    broker_t *broker = self->broker;
    service_t *service = self->service;

    zlist_append (broker->waiting, self);
    zlist_append (service->waiting, self);
    self->expiry = zclock_time () + HEARTBEAT_EXPIRY;
    s_service_dispatch (service, NULL);
}

// Finally here is the main task. We create a new broker instance and
// then process messages on the broker socket until interrupted. Pass -v as
// the first argument for verbose logging.

int main (int argc, char *argv [])
{
    int verbose = (argc > 1 && streq (argv [1], "-v"));

    broker_t *self = s_broker_new (verbose);
    s_broker_bind (self, "tcp://*:5555");

    //  Get and process messages forever or until interrupted
    while (true) {
        zmq_pollitem_t items [] = {
            { self->socket, 0, ZMQ_POLLIN, 0 } };
        int rc = zmq_poll (items, 1, HEARTBEAT_INTERVAL * ZMQ_POLL_MSEC);
        if (rc == -1)
            break;              //  Interrupted

        //  Process next input message, if any
        if (items [0].revents & ZMQ_POLLIN) {
            zmsg_t *msg = zmsg_recv (self->socket);
            if (!msg)
                break;          //  Interrupted
            if (self->verbose) {
                zclock_log ("I: received message:");
                zmsg_dump (msg);
            }
            zframe_t *sender = zmsg_pop (msg);
            zframe_t *empty = zmsg_pop (msg);
            zframe_t *header = zmsg_pop (msg);

            //  A message too short to carry a protocol header yields a NULL
            //  header; route it to the invalid-message branch so that
            //  zframe_streq is never handed a NULL frame
            if (header && zframe_streq (header, MDPC_CLIENT))
                s_broker_client_msg (self, sender, msg);
            else
            if (header && zframe_streq (header, MDPW_WORKER))
                s_broker_worker_msg (self, sender, msg);
            else {
                zclock_log ("E: invalid message:");
                zmsg_dump (msg);
                zmsg_destroy (&msg);
            }
            zframe_destroy (&sender);
            zframe_destroy (&empty);
            zframe_destroy (&header);
        }
        //  Once per heartbeat interval: delete expired idle workers (no
        //  DISCONNECT is sent to them), then send HEARTBEAT to every
        //  remaining idle worker
        if (zclock_time () > self->heartbeat_at) {
            s_broker_purge (self);
            worker_t *worker = (worker_t *) zlist_first (self->waiting);
            while (worker) {
                s_worker_send (worker, MDPW_HEARTBEAT, NULL, NULL);
                worker = (worker_t *) zlist_next (self->waiting);
            }
            self->heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;
        }
    }
    if (zctx_interrupted)
        printf ("W: interrupt received, shutting down…\n");

    s_broker_destroy (&self);
    return 0;
}