<?php
/*
* Titanic service
*
* Implements server side of http://rfc.zeromq.org/spec:9
* @author Ian Barber <ian(dot)barber(at)gmail(dot)com>
*/
include_once "mdwrkapi.php";
include_once "mdcliapi.php";
/**
 * Return a new random UUID as a printable character string.
 *
 * The result is 32 lowercase hexadecimal digits with no hyphens, laid out as
 * an RFC 4122 version 4 UUID: the 13th digit is always "4" (version) and the
 * 17th digit is one of 8, 9, a or b (variant bits "10").
 *
 * NOTE(review): mt_rand() is not a cryptographically secure generator. If
 * UUIDs must be unguessable by other clients, switch to random_int() once
 * the minimum supported PHP version allows it.
 *
 * @return string 32-character hexadecimal UUID
 */
function s_generate_uuid() {
    return sprintf('%04x%04x%04x%04x%04x%04x%04x%04x',
        // 32 bits for "time_low"
        mt_rand(0, 0xffff), mt_rand(0, 0xffff),
        // 16 bits for "time_mid"
        mt_rand(0, 0xffff),
        // 16 bits for "time_hi_and_version": version 0100 above 12 random bits
        mt_rand(0, 0x0fff) | 0x4000,
        // 16 bits for "clk_seq_hi_res" and "clk_seq_low": top two bits are 10
        mt_rand(0, 0x3fff) | 0x8000,
        // 48 bits for "node"
        mt_rand(0, 0xffff), mt_rand(0, 0xffff), mt_rand(0, 0xffff)
    );
}
// Directory, relative to the current working directory, that holds the
// per-UUID message files
define("TITANIC_DIR", ".titanic");
/**
 * Build the path of the file that stores the request for a given UUID
 *
 * @param string $uuid
 * @return string path inside TITANIC_DIR ending in ".req"
 */
function s_request_filename($uuid) {
    $basename = $uuid . ".req";
    return TITANIC_DIR . "/" . $basename;
}
/**
 * Build the path of the file that stores the reply for a given UUID
 *
 * @param string $uuid
 * @return string path inside TITANIC_DIR ending in ".rep"
 */
function s_reply_filename($uuid) {
    $basename = $uuid . ".rep";
    return TITANIC_DIR . "/" . $basename;
}
/**
 * Titanic request service
 *
 * Runs forever as the "titanic.request" worker. Every request received from
 * the broker is saved to disk under a fresh UUID, the UUID is sent over
 * $pipe so it can be queued, and a reply carrying "200" and the UUID is
 * prepared for the client. If the request file cannot be opened, nothing is
 * queued and the client gets a "500" reply instead.
 *
 * @param ZMQSocket $pipe socket on which new UUIDs are announced
 */
function titanic_request($pipe) {
    $worker = new Mdwrk("tcp://localhost:5555", "titanic.request");
    $reply = null;

    while (true) {
        // Get next request from broker; the reply built during the previous
        // iteration (null on the first pass) is handed over in the same call
        $request = $worker->recv($reply);

        // Ensure message directory exists
        if (!is_dir(TITANIC_DIR)) {
            mkdir(TITANIC_DIR);
        }

        // Generate UUID and save message to disk
        $uuid = s_generate_uuid();
        $fh = fopen(s_request_filename($uuid), "w");
        if ($fh === false) {
            // The request was not stored, so do not queue its UUID or claim
            // success; report a server-side error to the client
            $reply = new Zmsg();
            $reply->push("500");
            continue;
        }
        $request->save($fh);
        fclose($fh);

        // Send UUID through to message queue
        $reply = new Zmsg($pipe);
        $reply->push($uuid);
        $reply->send();

        // Now send UUID back to client
        // - sent in the next loop iteration
        $reply = new Zmsg();
        $reply->push($uuid);
        $reply->push("200");
    }
}
/**
 * Titanic reply service
 *
 * Runs forever as the "titanic.reply" worker. Each request carries a UUID;
 * the status pushed onto the response is:
 *   "200" - a reply file exists; its contents are loaded into the response
 *   "300" - only the request file exists (still pending)
 *   "400" - neither file exists, or the UUID is not 32 hexadecimal digits
 */
function titanic_reply() {
    $worker = new Mdwrk("tcp://localhost:5555", "titanic.reply", false);
    $reply = null;

    while (true) {
        // The response built below is handed to recv() on the next iteration
        $request = $worker->recv($reply);
        $uuid = $request->pop();
        $reply = new Zmsg();

        // The UUID is supplied by the client and ends up inside a file path,
        // so accept only the 32-hex-digit form; anything else is unknown
        if (!is_string($uuid) || preg_match('/\A[0-9a-fA-F]{32}\z/', $uuid) !== 1) {
            $reply->push("400"); // Unknown
            continue;
        }

        $req_filename = s_request_filename($uuid);
        $rep_filename = s_reply_filename($uuid);

        // fopen() can still fail after the existence check (for example if
        // the file was removed in between); fall through to the status
        // checks below rather than loading from an invalid handle
        $fh = file_exists($rep_filename) ? fopen($rep_filename, "r") : false;
        if ($fh) {
            $reply->load($fh);
            $reply->push("200");
            fclose($fh);
        } elseif (file_exists($req_filename)) {
            $reply->push("300"); // Pending
        } else {
            $reply->push("400"); // Unknown
        }
    }
}
/**
 * Titanic close service
 *
 * Runs forever as the "titanic.close" worker. Each request carries a UUID;
 * the request and reply files stored for it are removed if present, and the
 * response is always "200".
 */
function titanic_close() {
    $worker = new Mdwrk("tcp://localhost:5555", "titanic.close", false);
    $reply = null;

    while (true) {
        // The response built below is handed to recv() on the next iteration
        $request = $worker->recv($reply);
        $uuid = $request->pop();

        // The UUID is supplied by the client and would otherwise flow straight
        // into unlink(); only the 32-hex-digit form can name a stored message
        if (is_string($uuid) && preg_match('/\A[0-9a-fA-F]{32}\z/', $uuid) === 1) {
            $filenames = array(s_request_filename($uuid), s_reply_filename($uuid));
            foreach ($filenames as $filename) {
                // Either file may legitimately be missing (no reply written
                // yet, or the request was already closed)
                if (file_exists($filename)) {
                    unlink($filename);
                }
            }
        }

        $reply = new Zmsg();
        $reply->push("200");
    }
}
/**
 * Attempt to process a single queued request
 *
 * Loads the stored request for $uuid, asks "mmi.service" whether the target
 * service is available, and if so forwards the request and stores the
 * answer in the reply file for $uuid.
 *
 * @param Mdcli $client
 * @param string $uuid
 * @return bool true if the request needs no further processing (a reply was
 *              stored, or the request file could not be opened), false if
 *              it should be tried again later
 */
function s_service_success($client, $uuid) {
    // Load request message, service will be first frame
    $filename = s_request_filename($uuid);

    // If the client already closed request, treat as successful.
    // Checking existence first keeps this expected case free of warnings.
    if (!file_exists($filename)) {
        return true;
    }
    $fh = fopen($filename, "r");
    if (!$fh) {
        return true;
    }
    $request = new Zmsg();
    $request->load($fh);
    fclose($fh);
    $service = $request->pop();

    // Use MMI protocol to check if service is available
    $mmi_request = new Zmsg();
    $mmi_request->push($service);
    $mmi_reply = $client->send("mmi.service", $mmi_request);
    if (!$mmi_reply || $mmi_reply->pop() != "200") {
        return false;
    }

    // send() can come back empty (the MMI call above is guarded for the
    // same reason); keep the request pending rather than calling save() on
    // a missing reply
    $reply = $client->send($service, $request);
    if (!$reply) {
        return false;
    }

    // Only create the reply file once there is something to store in it.
    // NOTE(review): if the file cannot be opened the request stays pending
    // and will be sent to the service again on a later pass.
    $fh = fopen(s_reply_filename($uuid), "w");
    if (!$fh) {
        return false;
    }
    $reply->save($fh);
    fclose($fh);
    return true;
}
// Verbose mode is requested by passing -v as the first argument
$verbose = ($_SERVER['argc'] > 1) ? ($_SERVER['argv'][1] == '-v') : false;

// Each Titanic service runs its own endless loop in a forked child; a
// child never gets past its exit() call, so only the parent continues on

// titanic.reply worker
if (($pid = pcntl_fork()) == 0) {
    titanic_reply();
    exit();
}

// titanic.close worker
if (($pid = pcntl_fork()) == 0) {
    titanic_close();
    exit();
}

// titanic.request worker, which announces new UUIDs over a PAIR socket
if (($pid = pcntl_fork()) == 0) {
    $pipe = new ZMQSocket(new ZMQContext(), ZMQ::SOCKET_PAIR);
    $endpoint = "ipc://" . sys_get_temp_dir() . "/titanicpipe";
    $pipe->connect($endpoint);
    titanic_request($pipe);
    exit();
}
// Create MDP client session with short timeout
$client = new Mdcli("tcp://localhost:5555", $verbose);
$client->set_timeout(1000); // 1 sec
$client->set_retries(1); // only 1 retry

// Bind the PAIR socket on which the request worker announces new UUIDs
$request_pipe = new ZMQSocket(new ZMQContext(), ZMQ::SOCKET_PAIR);
$request_endpoint = "ipc://" . sys_get_temp_dir() . "/titanicpipe";
$request_pipe->bind($request_endpoint);

// Arrays passed to the poll call in the dispatcher loop
$read = array();
$write = array();
// Main dispatcher loop
// The poll set never changes, so build it once rather than on every pass
$poll = new ZMQPoll();
$poll->add($request_pipe, ZMQ::POLL_IN);
$queue_filename = TITANIC_DIR . "/queue";

while (true) {
    // We'll dispatch once per second, if there's no activity
    $events = $poll->poll($read, $write, 1000);
    if ($events) {
        // Ensure message directory exists
        if (!is_dir(TITANIC_DIR)) {
            mkdir(TITANIC_DIR);
        }

        // Append UUID to queue, prefixed with '-' for pending
        $msg = new Zmsg($request_pipe);
        $msg->recv();
        $uuid = $msg->pop();
        $fh = fopen($queue_filename, "a");
        if ($fh) {
            fprintf($fh, "-%s\n", $uuid);
            fclose($fh);
        } else {
            // Report the problem and keep dispatching instead of writing
            // to an invalid handle
            printf("E: cannot append request %s to queue%s", $uuid, PHP_EOL);
        }
    }

    // Brute-force dispatcher: the whole queue file is rescanned on each pass
    if (!file_exists($queue_filename)) {
        continue;
    }
    $fh = fopen($queue_filename, "r+");
    if (!$fh) {
        continue;
    }

    // Each record is one status character followed by a 32-character UUID
    while ($entry = fread($fh, 33)) {
        // A short read can only be a truncated record at end of file;
        // seeking back 33 bytes from it would overwrite part of the
        // previous entry, so stop here
        if (strlen($entry) < 33) {
            break;
        }

        // UUID is prefixed with '-' if still waiting
        if ($entry[0] == "-") {
            $uuid = substr($entry, 1);
            if ($verbose) {
                printf ("I: processing request %s%s", $uuid, PHP_EOL);
            }
            if (s_service_success($client, $uuid)) {
                // Mark queue entry as processed: step back over the record,
                // overwrite its status character, then return to its end
                fseek($fh, -33, SEEK_CUR);
                fwrite($fh, "+");
                fseek($fh, 32, SEEK_CUR);
            }
        }

        // Skip end of line, LF or CRLF
        if (fgetc($fh) == "\r") {
            fgetc($fh);
        }
    }
    fclose($fh);
}