Key-value message class in C

/* =====================================================================
* kvsimple - simple key-value message class for example applications
* ===================================================================== */


#include "kvsimple.h"
#include "zlist.h"

// Keys are short strings
#define KVMSG_KEY_MAX 255

// Message is formatted on wire as 4 frames:
// frame 0: key (0MQ string)
// frame 1: sequence (8 bytes, network order)
// frame 2: body (blob)
#define FRAME_KEY 0
#define FRAME_SEQ 1
#define FRAME_BODY 2
#define KVMSG_FRAMES 3

// The kvmsg class holds a single key-value message consisting of a
// list of 0 or more frames:

struct _kvmsg {
// Presence indicators for each frame
int present [KVMSG_FRAMES];
// Corresponding 0MQ message frames, if any
zmq_msg_t frame [KVMSG_FRAMES];
// Key, copied into safe C string
char key [KVMSG_KEY_MAX + 1];
};

// Here are the constructor and destructor for the class:

// Constructor, takes a sequence number for the new kvmsg instance:
kvmsg_t *
kvmsg_new (int64_t sequence)
{
kvmsg_t
*self;

self = (kvmsg_t *) zmalloc (sizeof (kvmsg_t));
kvmsg_set_sequence (self, sequence);
return self;
}

// zhash_free_fn callback helper that does the low level destruction:
void
kvmsg_free (void *ptr)
{
if (ptr) {
kvmsg_t *self = (kvmsg_t *) ptr;
// Destroy message frames if any
int frame_nbr;
for (frame_nbr = 0; frame_nbr < KVMSG_FRAMES; frame_nbr++)
if (self->present [frame_nbr])
zmq_msg_close (&self->frame [frame_nbr]);

// Free object itself
free (self);
}
}

// Destructor
void
kvmsg_destroy (kvmsg_t **self_p)
{
assert (self_p);
if (*self_p) {
kvmsg_free (*self_p);
*self_p = NULL;
}
}

// The recv method reads a key-value message from socket, and returns a new
// kvmsg instance:

kvmsg_t *
kvmsg_recv (void *socket)
{
assert (socket);
kvmsg_t *self = kvmsg_new (0);

// Read all frames off the wire, reject if bogus
int frame_nbr;
for (frame_nbr = 0; frame_nbr < KVMSG_FRAMES; frame_nbr++) {
if (self->present [frame_nbr])
zmq_msg_close (&self->frame [frame_nbr]);
zmq_msg_init (&self->frame [frame_nbr]);
self->present [frame_nbr] = 1;
if (zmq_msg_recv (&self->frame [frame_nbr], socket, 0) == -1) {
kvmsg_destroy (&self);
break;
}
// Verify multipart framing
int rcvmore = (frame_nbr < KVMSG_FRAMES - 1)? 1: 0;
if (zsockopt_rcvmore (socket) != rcvmore) {
kvmsg_destroy (&self);
break;
}
}
return self;
}

// The send method sends a multi-frame key-value message to a socket:

void
kvmsg_send (kvmsg_t *self, void *socket)
{
assert (self);
assert (socket);

int frame_nbr;
for (frame_nbr = 0; frame_nbr < KVMSG_FRAMES; frame_nbr++) {
zmq_msg_t copy;
zmq_msg_init (&copy);
if (self->present [frame_nbr])
zmq_msg_copy (&copy, &self->frame [frame_nbr]);
zmq_msg_send (&copy, socket,
(frame_nbr < KVMSG_FRAMES - 1)? ZMQ_SNDMORE: 0);
zmq_msg_close (&copy);
}
}

// These methods let the caller get and set the message key, as a
// fixed string and as a printf formatted string:

char *
kvmsg_key (kvmsg_t *self)
{
assert (self);
if (self->present [FRAME_KEY]) {
if (!*self->key) {
size_t size = zmq_msg_size (&self->frame [FRAME_KEY]);
if (size > KVMSG_KEY_MAX)
size = KVMSG_KEY_MAX;
memcpy (self->key,
zmq_msg_data (&self->frame [FRAME_KEY]), size);
self->key [size] = 0;
}
return self->key;
}
else
return NULL;
}

void
kvmsg_set_key (kvmsg_t *self, char *key)
{
assert (self);
zmq_msg_t *msg = &self->frame [FRAME_KEY];
if (self->present [FRAME_KEY])
zmq_msg_close (msg);
zmq_msg_init_size (msg, strlen (key));
memcpy (zmq_msg_data (msg), key, strlen (key));
self->present [FRAME_KEY] = 1;
}

void
kvmsg_fmt_key (kvmsg_t *self, char *format, …)
{
char value [KVMSG_KEY_MAX + 1];
va_list args;

assert (self);
va_start (args, format);
vsnprintf (value, KVMSG_KEY_MAX, format, args);
va_end (args);
kvmsg_set_key (self, value);
}

// These two methods let the caller get and set the message sequence number:

int64_t
kvmsg_sequence (kvmsg_t *self)
{
assert (self);
if (self->present [FRAME_SEQ]) {
assert (zmq_msg_size (&self->frame [FRAME_SEQ]) == 8);
byte *source = zmq_msg_data (&self->frame [FRAME_SEQ]);
int64_t sequence = ((int64_t) (source [0]) << 56)
+ ((int64_t) (source [1]) << 48)
+ ((int64_t) (source [2]) << 40)
+ ((int64_t) (source [3]) << 32)
+ ((int64_t) (source [4]) << 24)
+ ((int64_t) (source [5]) << 16)
+ ((int64_t) (source [6]) << 8)
+ (int64_t) (source [7]);
return sequence;
}
else
return 0;
}

void
kvmsg_set_sequence (kvmsg_t *self, int64_t sequence)
{
assert (self);
zmq_msg_t *msg = &self->frame [FRAME_SEQ];
if (self->present [FRAME_SEQ])
zmq_msg_close (msg);
zmq_msg_init_size (msg, 8);

byte *source = zmq_msg_data (msg);
source [0] = (byte) ((sequence >> 56) & 255);
source [1] = (byte) ((sequence >> 48) & 255);
source [2] = (byte) ((sequence >> 40) & 255);
source [3] = (byte) ((sequence >> 32) & 255);
source [4] = (byte) ((sequence >> 24) & 255);
source [5] = (byte) ((sequence >> 16) & 255);
source [6] = (byte) ((sequence >> 8) & 255);
source [7] = (byte) ((sequence) & 255);

self->present [FRAME_SEQ] = 1;
}

// These methods let the caller get and set the message body, as a
// fixed string and as a printf formatted string:

byte *
kvmsg_body (kvmsg_t *self)
{
assert (self);
if (self->present [FRAME_BODY])
return (byte *) zmq_msg_data (&self->frame [FRAME_BODY]);
else
return NULL;
}

void
kvmsg_set_body (kvmsg_t *self, byte *body, size_t size)
{
assert (self);
zmq_msg_t *msg = &self->frame [FRAME_BODY];
if (self->present [FRAME_BODY])
zmq_msg_close (msg);
self->present [FRAME_BODY] = 1;
zmq_msg_init_size (msg, size);
memcpy (zmq_msg_data (msg), body, size);
}

void
kvmsg_fmt_body (kvmsg_t *self, char *format, …)
{
char value [255 + 1];
va_list args;

assert (self);
va_start (args, format);
vsnprintf (value, 255, format, args);
va_end (args);
kvmsg_set_body (self, (byte *) value, strlen (value));
}

// The size method returns the body size of the last-read message, if any:

size_t
kvmsg_size (kvmsg_t *self)
{
assert (self);
if (self->present [FRAME_BODY])
return zmq_msg_size (&self->frame [FRAME_BODY]);
else
return 0;
}

// The store method stores the key-value message into a hash map, unless
// the key and value are both null. It nullifies the kvmsg reference so
// that the object is owned by the hash map, not the caller:

void
kvmsg_store (kvmsg_t **self_p, zhash_t *hash)
{
assert (self_p);
if (*self_p) {
kvmsg_t *self = *self_p;
assert (self);
if (self->present [FRAME_KEY]
&& self->present [FRAME_BODY]) {
zhash_update (hash, kvmsg_key (self), self);
zhash_freefn (hash, kvmsg_key (self), kvmsg_free);
}
*self_p = NULL;
}
}

// The dump method prints the key-value message to stderr,
// for debugging and tracing:

void
kvmsg_dump (kvmsg_t *self)
{
if (self) {
if (!self) {
fprintf (stderr, "NULL");
return;
}
size_t size = kvmsg_size (self);
byte *body = kvmsg_body (self);
fprintf (stderr, "[seq:%" PRId64 "]", kvmsg_sequence (self));
fprintf (stderr, "[key:%s]", kvmsg_key (self));
fprintf (stderr, "[size:%zd] ", size);
int char_nbr;
for (char_nbr = 0; char_nbr < size; char_nbr++)
fprintf (stderr, "%02X", body [char_nbr]);
fprintf (stderr, "\n");
}
else
fprintf (stderr, "NULL message\n");
}

// It's good practice to have a self-test method that tests the class; this
// also shows how it's used in applications:

int
kvmsg_test (int verbose)
{
kvmsg_t
*kvmsg;

printf (" * kvmsg: ");

// Prepare our context and sockets
zctx_t *ctx = zctx_new ();
void *output = zsocket_new (ctx, ZMQ_DEALER);
int rc = zmq_bind (output, "ipc://kvmsg_selftest.ipc");
assert (rc == 0);
void *input = zsocket_new (ctx, ZMQ_DEALER);
rc = zmq_connect (input, "ipc://kvmsg_selftest.ipc");
assert (rc == 0);

zhash_t *kvmap = zhash_new ();

// Test send and receive of simple message
kvmsg = kvmsg_new (1);
kvmsg_set_key (kvmsg, "key");
kvmsg_set_body (kvmsg, (byte *) "body", 4);
if (verbose)
kvmsg_dump (kvmsg);
kvmsg_send (kvmsg, output);
kvmsg_store (&kvmsg, kvmap);

kvmsg = kvmsg_recv (input);
if (verbose)
kvmsg_dump (kvmsg);
assert (streq (kvmsg_key (kvmsg), "key"));
kvmsg_store (&kvmsg, kvmap);

// Shutdown and destroy all objects
zhash_destroy (&kvmap);
zctx_destroy (&ctx);

printf ("OK\n");
return 0;
}