Titanic broker example in Ruby

# Titanic service
#
# Implements server side of http:#rfc.zeromq.org/spec:9
#
# Author: Tom van Leeuwen <ue.newueelv|mot#ue.newueelv|mot>
# Based on Python example by Min RK

require './mdcliapi2.rb'
require './mdwrkapi.rb'
require 'pathname'
require 'securerandom'
require 'json'
require 'thread'
require 'awesome_print'

TITANIC_DIR = Pathname(Dir.pwd).join('.titanic')

def request_filename uuid
TITANIC_DIR.join("#{uuid}.req")
end

def reply_filename uuid
TITANIC_DIR.join("#{uuid}.rep")
end

def titanic_request pipe
worker = MajorDomoWorker.new('tcp://localhost:5555', 'titanic.request')

# Ensure message directory exists
Dir.mkdir(TITANIC_DIR) unless Dir.exist?(TITANIC_DIR)

reply = nil

loop do
request = worker.recv reply

# Generate UUID and save message to disk
uuid = SecureRandom.uuid
filename = request_filename uuid
File.open(filename, 'w') { |fh| fh.write request.to_json }

# Send UUID through to message queue
pipe.send_string uuid

# Now send UUID back to client
# Done by the worker.recv at the top of the loop
reply = ["200", uuid]
end
end

def titanic_reply
worker = MajorDomoWorker.new('tcp://localhost:5555', 'titanic.reply')
reply = nil
loop do
request = worker.recv reply
uuid = request.shift

if File.exist?(reply_filename(uuid))
reply = %w[200]
reply.concat JSON.parse(File.read(reply_filename(uuid)))
elsif File.exist?(request_filename(uuid))
reply = %w[300] # pending
else
reply = %w[400] # unknown
end
end
end

def titanic_close
worker = MajorDomoWorker.new('tcp://localhost:5555', 'titanic.close')
reply = nil
loop do
request = worker.recv reply
uuid = request.shift

File.unlink(request_filename(uuid)) if File.exist?(request_filename(uuid))
File.unlink(reply_filename(uuid)) if File.exist?(reply_filename(uuid))

reply = %w[200]
end
end

def service_success client, uuid
# Attempt to process a single request, return True if successful
return true unless File.exist?(request_filename(uuid))

request = JSON.parse(File.read(request_filename(uuid)))
service = request.shift

# Use MMI protocol to check if service is available
mmi_request = [service]
client.send('mmi.service', mmi_request)
mmi_reply = client.recv

if mmi_reply and mmi_reply.first == "200"
client.send service, request
reply = client.recv
if reply
File.open(reply_filename(uuid), 'w') { |fh| fh.write reply.to_json }
return true
end
end

false
end

context = ZMQ::Context.new

# Create MDP client session with short timeout
client = MajorDomoClient.new("tcp://localhost:5555")
client.timeout = 1000 # 1 sec
# client.retries = 1 # only 1 retry

pipe = context.socket(ZMQ::PAIR)
pipe.setsockopt ZMQ::LINGER, 0
pipe.bind("inproc://titanic")
peer = context.socket(ZMQ::PAIR)
peer.setsockopt ZMQ::LINGER, 0
peer.connect("inproc://titanic")

Thread.start do
begin
titanic_request peer
rescue Exception => e
puts e ; puts e.backtrace.join("\n")
end
end
Thread.start do
begin
titanic_reply
rescue Exception => e
puts e ; puts e.backtrace.join("\n")
end
end
Thread.start do
begin
titanic_close
rescue Exception => e
puts e ; puts e.backtrace.join("\n")
end
end

poller = ZMQ::Poller.new
poller.register(pipe, ZMQ::POLLIN)

# Ensure message directory exists
Dir.mkdir(TITANIC_DIR) unless Dir.exist?(TITANIC_DIR)

# Main dispatcher loop
queue = TITANIC_DIR.join('queue')

# Ensure queue file exists and is empty
File.open(queue, 'w') { |fh| fh.write '' }

loop do
items = poller.poll(1000)

if items > 0
uuid = ""
pipe.recv_string uuid
File.open(queue, 'a') { |fh| fh.write "-#{uuid}\n" }
end

# Brute-force dispatcher
# yeah yeah… ugly
new = []
lines = File.read(queue).split("\n")
lines.each do |line|
if line =~ /^-(.*)$/
uuid = $1
puts "I: processing request #{uuid}"

if service_success client, uuid
# mark queue entry as processed
new << uuid
else
new << line
end
else
new << line
end
end

File.open(queue, 'w') { |fh| fh.write new.join("\n") + "\n" }
end