Multithreaded service in Haxe

package ;

import haxe.io.Bytes;
import haxe.Stack;
import neko.Lib;
import neko.Sys;
#if !php
import neko.vm.Thread;
#end
import org.zeromq.ZMQ;
import org.zeromq.ZMQContext;
import org.zeromq.ZMQPoller;
import org.zeromq.ZMQSocket;
import org.zeromq.ZMQException;

/**
* Multithreaded Hello World Server
*
* See: http://zguide.zeromq.org/page:all#Multithreading-with-MQ
* Use with HelloWorldClient.hx
*
*/

class MTServer
{

    /**
     * Worker routine: answers every request received on a REP socket with "World".
     *
     * Connects to the "workers" endpoint (inproc on neko/cpp, ipc on php), then
     * loops: receive a request, trace it, sleep via Sys.sleep(1), send the reply.
     * The loop ends when a ZMQException is caught while ZMQ.isInterrupted()
     * reports true; any other ZMQException is traced and the loop carries on.
     * The socket is closed before returning.
     */
    static function worker() {
        var context:ZMQContext = ZMQContext.instance();

        // Socket to talk to dispatcher
        var responder:ZMQSocket = context.socket(ZMQ_REP);
#if (neko || cpp)
        responder.connect("inproc://workers");
#elseif php
        responder.connect("ipc://workers.ipc");
#end
        ZMQ.catchSignals();

        while (true) {

            try {
                // Wait for next request from client
                var request:Bytes = responder.recvMsg();

                trace ("Received request:" + request.toString());

                // Do some work
                Sys.sleep(1);

                // Send reply back to client
                responder.sendMsg(Bytes.ofString("World"));
            } catch (e:ZMQException) {
                if (ZMQ.isInterrupted()) {
                    break;
                }
                trace (e.toString());
            }
        }
        responder.close();
        return null;
    }

    /**
     * Implements a request/reply QUEUE broker device.
     *
     * Polls both sockets for input and forwards every message part read from
     * one socket to the other, passing SNDMORE while further parts follow.
     * Returns if poll is interrupted; any other ZMQException is traced
     * (error number, message and exception stack) and polling resumes.
     *
     * @param ctx       Context used to create the poller
     * @param frontend  Socket whose input is forwarded to backend
     * @param backend   Socket whose input is forwarded to frontend
     */
    static function queueDevice(ctx:ZMQContext, frontend:ZMQSocket, backend:ZMQSocket) {

        // Initialise pollset
        var poller:ZMQPoller = ctx.poller();
        poller.registerSocket(frontend, ZMQ.ZMQ_POLLIN());
        poller.registerSocket(backend, ZMQ.ZMQ_POLLIN());

        ZMQ.catchSignals();

        while (true) {
            try {
                poller.poll();
                // NOTE(review): pollin(1) / pollin(2) are presumably 1-based indices
                // in registration order (frontend, backend) - confirm against ZMQPoller.
                if (poller.pollin(1)) {
                    var more:Bool = true;
                    while (more) {
                        // Receive message
                        var msg = frontend.recvMsg();
                        more = frontend.hasReceiveMore();

                        // Broker it
                        backend.sendMsg(msg, { if (more) SNDMORE else null; } );
                    }
                }

                if (poller.pollin(2)) {
                    var more:Bool = true;
                    while (more) {
                        // Receive message
                        var msg = backend.recvMsg();
                        more = backend.hasReceiveMore();

                        // Broker it
                        frontend.sendMsg(msg, { if (more) SNDMORE else null; } );
                    }
                }
            } catch (e:ZMQException) {
                if (ZMQ.isInterrupted()) {
                    break;
                }
                // Handle other errors
                trace("ZMQException #:" + e.errNo + ", str:" + e.str());
                trace (Stack.toString(Stack.exceptionStack()));

            }

        }

    }

    /**
     * Entry point.
     *
     * Binds a ROUTER socket for clients on tcp://*:5556 and a DEALER socket for
     * workers, launches five workers (threads on neko/cpp, forked processes on
     * php), then runs queueDevice() between the two sockets. Once queueDevice()
     * returns, both sockets are closed and the context is terminated.
     */
    public static function main() {
        var context:ZMQContext = ZMQContext.instance();

        Lib.println ("** MTServer (see: http://zguide.zeromq.org/page:all#Multithreading-with-MQ)");

        // Socket to talk to clients
        var clients:ZMQSocket = context.socket(ZMQ_ROUTER);
        clients.bind ("tcp://*:5556");

        // Socket to talk to workers
        var workers:ZMQSocket = context.socket(ZMQ_DEALER);

#if (neko || cpp)
        workers.bind ("inproc://workers");

        // Launch worker thread pool
        var workerThreads:List<Thread> = new List<Thread>();
        // `0...5` is Haxe's integer range (0 inclusive to 5 exclusive): five workers
        for (thread_nbr in 0...5) {
            workerThreads.add(Thread.create(worker));
        }
#elseif php
        workers.bind ("ipc://workers.ipc");

        // Launch pool of worker processes, due to php's lack of thread support
        // See: https://github.com/imatix/zguide/blob/master/examples/PHP/mtserver.php
        // The string body below is kept flush-left so the emitted PHP text is unchanged.
        for (thread_nbr in 0...5) {
            untyped __php__('
$pid = pcntl_fork();
if ($pid == 0) {
// Running in child process
worker();
exit();
}'
            );
        }
#end
        // Invoke request / reply broker (aka QUEUE device) to connect clients to workers
        queueDevice(context, clients, workers);

        // Close up shop
        clients.close();
        workers.close();
        context.term();
    }
}