Freelance client API in Python

"""
flcliapi - Freelance Pattern agent class
Model 3: uses ROUTER socket to address specific services

Author: Min RK <moc.liamg|krnimajneb#moc.liamg|krnimajneb>
"""

import threading
import time

import zmq

from zhelpers import zpipe

# If no server replies within this time, abandon request
GLOBAL_TIMEOUT = 3000 # msecs
# PING interval for servers we think are alivecp //
PING_INTERVAL = 2000
# msecs
# Server considered dead if silent for this long
SERVER_TTL = 6000
# msecs//

def flciapi_agent(peer):
"""This is the thread that handles our real flcliapi class
"""

pass

# =====================================================================
# Synchronous part, works in our application thread

class FreelanceClient(object):
ctx = None # Our Context
pipe = None # Pipe through to flciapi agent
agent = None # agent in a thread

def __init__(self):
self.ctx = zmq.Context()
self.pipe, peer = zpipe(self.ctx)
self.agent = threading.Thread(target=agent_task, args=(self.ctx,peer))
self.agent.daemon = True
self.agent.start()


def connect(self, endpoint):
"""Connect to new server endpoint
Sends [CONNECT][endpoint] to the agent
"""

self.pipe.send_multipart(["CONNECT", endpoint])
time.sleep(0.1) # Allow connection to come up

def request(self, msg):
"Send request, get reply"
request = ["REQUEST"] + msg
self.pipe.send_multipart(request)
reply = self.pipe.recv_multipart()
status = reply.pop(0)
if status != "FAILED":
return reply

# =====================================================================
# Asynchronous part, works in the background

# ---------------------------------------------------------------------
# Simple class for one server we talk to

class FreelanceServer(object):
endpoint = None # Server identity/endpoint
alive = True # 1 if known to be alive
ping_at = 0 # Next ping at this time
expires = 0 # Expires at this time

def __init__(self, endpoint):
self.endpoint = endpoint
self.alive = True
self.ping_at = time.time() + 1e-3*PING_INTERVAL
self.expires = time.time() + 1e-3*SERVER_TTL

def ping(self, socket):
if time.time() > self.ping_at:
socket.send_multipart([self.endpoint, 'PING'])
self.ping_at = time.time() + 1e-3*PING_INTERVAL

def tickless(self, tickless):
if tickless > self.ping_at:
tickless = self.ping_at
return tickless

# ---------------------------------------------------------------------
# Simple class for one background agent

class FreelanceAgent(object):
ctx = None # Own context
pipe = None # Socket to talk back to application
router = None # Socket to talk to servers
servers = None # Servers we've connected to
actives = None # Servers we know are alive
sequence = 0 # Number of requests ever sent
request = None # Current request if any
reply = None # Current reply if any
expires = 0 # Timeout for request/reply

def __init__(self, ctx, pipe):
self.ctx = ctx
self.pipe = pipe
self.router = ctx.socket(zmq.ROUTER)
self.servers = {}
self.actives = []

def control_message (self):
msg = self.pipe.recv_multipart()
command = msg.pop(0)

if command == "CONNECT":
endpoint = msg.pop(0)
print "I: connecting to %s\n" % endpoint,
self.router.connect(endpoint)
server = FreelanceServer(endpoint)
self.servers[endpoint] = server
self.actives.append(server)
# these are in the C case, but seem redundant:
server.ping_at = time.time() + 1e-3*PING_INTERVAL
server.expires = time.time() + 1e-3*SERVER_TTL
elif command == "REQUEST":
assert not self.request # Strict request-reply cycle
# Prefix request with sequence number and empty envelope
self.request = [str(self.sequence), ''] + msg

# Request expires after global timeout
self.expires = time.time() + 1e-3*GLOBAL_TIMEOUT

def router_message (self):
reply = self.router.recv_multipart()
# Frame 0 is server that replied
endpoint = reply.pop(0)
server = self.servers[endpoint]
if not server.alive:
self.actives.append(server)
server.alive = 1

server.ping_at = time.time() + 1e-3*PING_INTERVAL
server.expires = time.time() + 1e-3*SERVER_TTL;

# Frame 1 may be sequence number for reply
sequence = reply.pop(0)
if int(sequence) == self.sequence:
self.sequence += 1
reply = ["OK"] + reply
self.pipe.send_multipart(reply)
self.request = None

# ---------------------------------------------------------------------
# Asynchronous agent manages server pool and handles request/reply
# dialog when the application asks for it.

def agent_task(ctx, pipe):
agent = FreelanceAgent(ctx, pipe)
poller = zmq.Poller()
poller.register(agent.pipe, zmq.POLLIN)
poller.register(agent.router, zmq.POLLIN)

while True:
# Calculate tickless timer, up to 1 hour
tickless = time.time() + 3600
if (agent.request and tickless > agent.expires):
tickless = agent.expires
for server in agent.servers.values():
tickless = server.tickless(tickless)
try:
items = dict(poller.poll(1000 * (tickless - time.time())))
except:
break # Context has been shut down

if agent.pipe in items:
agent.control_message()

if agent.router in items:
agent.router_message()

# If we're processing a request, dispatch to next server
if (agent.request):
if (time.time() >= agent.expires):
# Request expired, kill it
agent.pipe.send("FAILED")
agent.request = None
else:
# Find server to talk to, remove any expired ones
while agent.actives:
server = agent.actives[0]
if time.time() >= server.expires:
server.alive = 0
agent.actives.pop(0)
else:
request = [server.endpoint] + agent.request
agent.router.send_multipart(request)
break

# Disconnect and delete any expired servers
# Send heartbeats to idle servers if needed
for server in agent.servers.values():
server.ping(agent.router)