Titanic broker example in Haxe

package ;
import haxe.Stack;
import neko.Lib;
import neko.Sys;
import haxe.io.Input;
import neko.FileSystem;
import neko.io.File;
import neko.io.FileInput;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
import org.zeromq.ZMQPoller;
import org.zeromq.ZMQSocket;
import org.zeromq.ZMsg;
import org.zeromq.ZThread;
import org.zeromq.ZMQException;

/**
* Titanic service
* Implements server side of http://rfc.zeromq.org/spec:9
* @author Richard Smith
*/

class Titanic
{

/** Connection string to broker */
private var broker:String;

/** Print activity to stdout */
private var verbose:Bool;

/** Logger function used in verbose mode */
private var log:Dynamic->Void;

private static inline var UID = "0123456789ABCDEF";

private static inline var TITANIC_DIR = ".titanic";

/**
* Main method
*/

public static function main() {
Lib.println("** Titanic (see: http://zguide.zeromq.org/page:all#Disconnected-Reliability-Titanic-Pattern)");

var argArr = Sys.args();
var verbose = (argArr.length > 1 && argArr[argArr.length - 1] == "-v");
var log = Lib.println;

var ctx = new ZContext();

// Create Titanic worker class
var titanic = new Titanic("tcp://localhost:5555", verbose);

// Create MDP client session with short timeout
var client = new MDCliAPI("tcp://localhost:5555", verbose);
client.timeout = 1000; // 1 sec
client.retries = 1; // Only 1 retry

var requestPipe = ZThread.attach(ctx, titanic.titanicRequest,"tcp://localhost:5555");
ZThread.detach(titanic.titanicReply, "tcp://localhost:5555");
ZThread.detach(titanic.titanicClose, "tcp://localhost:5555");

var poller = new ZMQPoller();
poller.registerSocket(requestPipe, ZMQ.ZMQ_POLLIN());

// Main dispatcher loop
while (true) {
// We'll dispatch once per second, if there's no activity
try {
var res = poller.poll(1000 * 1000); // 1 sec
} catch (e:ZMQException) {
if (!ZMQ.isInterrupted()) {
trace("ZMQException #:" + e.errNo + ", str:" + e.str());
trace (Stack.toString(Stack.exceptionStack()));
} else
log("W: interrupt received, sinking the titanic…");
ctx.destroy();
client.destroy();
return;
}
if (poller.pollin(1)) {
// Ensure message directory exists
if (!FileSystem.exists(TITANIC_DIR))
FileSystem.createDirectory(TITANIC_DIR);
// Append UUID to queue, prefixed with "-" for pending
var msg = ZMsg.recvMsg(requestPipe);
if (msg == null)
break; // Interrupted
var file = File.append(TITANIC_DIR + "/queue", false);
var uuid = msg.pop().toString();
file.writeString("-" + uuid);
file.flush();
file.close();
}
// Brute-force dispatcher
if (FileSystem.exists(TITANIC_DIR + "/queue")) {
try {
var filec = File.getContent(TITANIC_DIR + "/queue");
FileSystem.deleteFile(TITANIC_DIR + "/queue");
var fileh = File.write(TITANIC_DIR + "/queue", false);
var index = 0;
while (index+33 <= filec.length) {
var str = filec.substr(index, 33);
var prefix = "-";
// UUID is prefixed with '-' if still waiting
if (str.charAt(0) == "-") {
if (verbose)
log("I: processing request " + str.substr(1));
if (titanic.serviceSuccess(client, str.substr(1))) {
// Mark queue entry as processed
prefix = "+";
}
}
fileh.writeString(prefix + str.substr(1));
index += 33;
}
fileh.flush();
fileh.close();
} catch (e:Dynamic) {
log("E: error reading queue file " +e);
}
}
}
client.destroy();
ctx.destroy();
}

/**
* Constructor
* @param broker
* @param ?verbose
* @param ?logger
*/

public function new(broker:String, ?verbose:Bool, ?logger:Dynamic->Void) {
this.broker = broker;
this.verbose = verbose;
if (logger != null)
log = logger;
else
log = neko.Lib.println;
}

/**
* Returns a new UUID as a printable String
* @param ?size
* @return
*/

private function generateUUID(?size:Int):String {
if (size == null) size = 32;
var nchars = UID.length;
var uid = new StringBuf();
for (i in 0 size) {
uid.add(UID.charAt(ZHelpers.randof(nchars-1)));
}
return uid.toString();
}

/**
* Returns request filename for given UUID
* @param uuid
* @return
*/

private function requestFilename(uuid:String):String {
return TITANIC_DIR + "/" + uuid + ".req";
}

/**
* Returns reply filename for given UUID
* @param uuid
* @return
*/

private function replyFilename(uuid:String):String {
return TITANIC_DIR + "/" + uuid + ".rep";
}

/**
* Implements Titanic request service "titanic.request"
* @param ctx
* @param pipe
*/

public function titanicRequest(ctx:ZContext, pipe:ZMQSocket, broker:String) {
var worker = new MDWrkAPI(broker, "titanic.request", verbose);
var reply:ZMsg = null;

while (true) {
if (reply != null) trace("reply object:" + reply.toString());
// Send reply if it's not null
// and then get next request from broker
var request = worker.recv(reply);
if (request == null)
break; // Interrupted, exit

// Ensure message directory exists
if (!FileSystem.exists(TITANIC_DIR))
FileSystem.createDirectory(TITANIC_DIR);

// Generate UUID and save message to disk
var uuid = generateUUID();
var filename = requestFilename(uuid);
var file = File.write(filename, false);
ZMsg.save(request, file);
file.close();
request.destroy();
// Send UUID through to message queue
reply = new ZMsg();
reply.addString(uuid);
reply.send(pipe);

// Now send UUID back to client
// Done by the worker.recv() call at the top of the loop
reply = new ZMsg();
reply.addString("200");
reply.addString(uuid);
}
worker.destroy();
}

/**
* Implements titanic reply service "titanic.reply"
*/

public function titanicReply(broker:String) {
var worker = new MDWrkAPI(broker, "titanic.reply", verbose);
var reply:ZMsg = null;

while (true) {
// Send reply if it's not null
// and then get next request from broker
var request = worker.recv(reply);
if (request == null)
break; // Interrupted, exit

// Ensure message directory exists
if (!FileSystem.exists(TITANIC_DIR))
FileSystem.createDirectory(TITANIC_DIR);

// Generate UUID and save message to disk
var uuid = request.popString();
var reqfilename = requestFilename(uuid);
var repfilename = replyFilename(uuid);
if (FileSystem.exists(repfilename)) {
var file = File.read(repfilename, false);
reply = ZMsg.load(file);
reply.pushString("200");
file.close();
} else {
reply = new ZMsg();
if (FileSystem.exists(reqfilename))
reply.pushString("300"); // Pending
else
reply.pushString("400");
request.destroy();
}
}
worker.destroy();
}

/**
* Implements titanic close service "titanic.close"
* @param broker
*/

public function titanicClose(broker:String) {
var worker = new MDWrkAPI(broker, "titanic.close", verbose);
var reply:ZMsg = null;

while (true) {
// Send reply if it's not null
// and then get next request from broker
var request = worker.recv(reply);
if (request == null)
break; // Interrupted, exit

// Ensure message directory exists
if (!FileSystem.exists(TITANIC_DIR))
FileSystem.createDirectory(TITANIC_DIR);

// Generate UUID and save message to disk
var uuid = request.popString();
var reqfilename = requestFilename(uuid);
var repfilename = replyFilename(uuid);
FileSystem.deleteFile(reqfilename);
FileSystem.deleteFile(repfilename);

request.destroy();
reply = new ZMsg();
reply.addString("200");
}
worker.destroy();
}

/**
* Attempt to process a single service request message, return true if successful
* @param client
* @param uuid
* @return
*/

public function serviceSuccess(client:MDCliAPI, uuid:String):Bool {
// Load request message, service will be first frame
var filename = requestFilename(uuid);
var file = File.read(filename, false);
var request:ZMsg = null;
try {
request = ZMsg.load(file);
file.close();
} catch (e:Dynamic) {
log("E: Error loading file:" + filename + ", details:" + e);
return false;
}
var service = request.pop();
var serviceName = service.toString();

// Use MMI protocol to check if service is available
var mmiRequest = new ZMsg();
mmiRequest.add(service);
var mmiReply = client.send("mmi.service", mmiRequest);
var serviceOK = (mmiReply != null && mmiReply.first().streq("200"));

if (serviceOK) {
// Now call requested service and store reply from service
var reply = client.send(serviceName, request);
if (reply != null) {
filename = replyFilename(uuid);
try {
var file = File.write(filename, false);
ZMsg.save(reply, file);
file.close();
return true;
} catch (e:Dynamic) {
log("E: Error writing file:" + filename + ", details:" + e);
return false;
}
}
reply.destroy();
} else
request.destroy();
return false;
}

}