File transfer test, model 3 in C

// File Transfer model #3
// In which the client requests each chunk individually, using
// command pipelining to give us a credit-based flow control.

#include <czmq.h>
#define CHUNK_SIZE 250000
#define PIPELINE 10

static void
client_thread (void *args, zctx_t *ctx, void *pipe)
void *dealer = zsocket_new (ctx, ZMQ_DEALER);
zsocket_set_hwm (dealer, PIPELINE);
zsocket_connect (dealer, "tcp://");

// We'll allow up to five chunks in transit at once
size_t credit = PIPELINE;

size_t total = 0; // Total bytes received
size_t chunks = 0; // Total chunks received
size_t offset = 0; // Offset of next chunk request

while (true) {
while (credit) {
// Ask for next chunk
zstr_sendfm (dealer, "fetch");
zstr_sendfm (dealer, "%ld", offset);
zstr_sendf (dealer, "%ld", CHUNK_SIZE);
offset += CHUNK_SIZE;
zframe_t *chunk = zframe_recv (dealer);
if (!chunk)
break; // Shutting down, quit
size_t size = zframe_size (chunk);
zframe_destroy (&chunk);
total += size;
if (size < CHUNK_SIZE)
break; // Last chunk received; exit
printf ("%zd chunks received, %zd bytes\n", chunks, total);
zstr_send (pipe, "OK");

// The rest of the code is exactly the same as in model 2, except
// that we set the HWM on the server's ROUTER socket to PIPELINE
// to act as a sanity check.

static void
free_chunk (void *data, void *arg)
free (data);

// The server thread waits for a chunk request from a client,
// reads that chunk and sends it back to the client:

static void
server_thread (void *args, zctx_t *ctx, void *pipe)
FILE *file = fopen ("testdata", "r");
assert (file);

void *router = zsocket_new (ctx, ZMQ_ROUTER);
zsocket_set_hwm (router, PIPELINE);
zsocket_bind (router, "tcp://*:6000");
while (true) {
// First frame in each message is the sender identity
zframe_t *identity = zframe_recv (router);
if (!identity)
break; // Shutting down, quit

// Second frame is "fetch" command
char *command = zstr_recv (router);
assert (streq (command, "fetch"));
free (command);

// Third frame is chunk offset in file
char *offset_str = zstr_recv (router);
size_t offset = atoi (offset_str);
free (offset_str);

// Fourth frame is maximum chunk size
char *chunksz_str = zstr_recv (router);
size_t chunksz = atoi (chunksz_str);
free (chunksz_str);

// Read chunk of data from file
fseek (file, offset, SEEK_SET);
byte *data = malloc (chunksz);
assert (data);

// Send resulting chunk to client
size_t size = fread (data, 1, chunksz, file);
zframe_t *chunk = zframe_new_zero_copy (data, size, free_chunk, NULL);
zframe_send (&identity, router, ZFRAME_MORE);
zframe_send (&chunk, router, 0);
fclose (file);

// The main task starts the client and server threads; it's easier
// to test this as a single process with threads, than as multiple
// processes:

int main (void)
// Start child threads
zctx_t *ctx = zctx_new ();
zthread_fork (ctx, server_thread, NULL);
void *client =
zthread_fork (ctx, client_thread, NULL);
// Loop until client tells us it's done
char *string = zstr_recv (client);
free (string);
// Kill server thread
zctx_destroy (&ctx);
return 0;