# Freelance client API in Tcl

# =====================================================================

#  flcliapi - Freelance Pattern agent class
#  Model 3: uses ROUTER socket to address specific services
#
#  ---------------------------------------------------------------------
#  Copyright (c) 1991-2011 iMatix Corporation <www.imatix.com>
#  Copyright other contributors as noted in the AUTHORS file.
#
#  Tcl port by Jos Decoster <jos.decoster@gmail.com>
#
#  This file is part of the ZeroMQ Guide: http://zguide.zeromq.org
#
#  This is free software; you can redistribute it and/or modify it under
#  the terms of the GNU Lesser General Public License as published by
#  the Free Software Foundation; either version 3 of the License, or (at
#  your option) any later version.
#
#  This software is distributed in the hope that it will be useful, but
#  WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
#  Lesser General Public License for more details.
#
#  You should have received a copy of the GNU Lesser General Public
#  License along with this program. If not, see
#  <http://www.gnu.org/licenses/>.
#  =====================================================================


package require TclOO
package require zmq
package require mdp

package provide FLClient 1.0

# If no server replies within this time, abandon request.
# Read by FLClient_agent when it accepts a REQUEST, to compute the
# absolute expiry time of that request.
set GLOBAL_TIMEOUT 3000 ;# msecs

# PING interval for servers we think are alive.
# Read by FLClient_server to schedule its next ping.
set PING_INTERVAL 2000 ;# msecs

# Server considered dead if silent for this long.
# Read by FLClient_server to compute its expiry time.
set SERVER_TTL 6000 ;# msecs
# FLClient - application-facing front end of the Freelance client.
#
# Creates a ZeroMQ context and a PAIR socket ("pipe") connected to an
# ipc:// address, then creates an FLClient_agent on the same context and
# address and starts it. All commands to the agent and all replies from
# it travel over the pipe.
oo::class create FLClient {

    variable ctx pipe pipe_address readable agent

    # Set up context, pipe and agent, and start the agent.
    constructor {} {
        set readable 0
        set ctx [zmq context flcli_context_[::mdp::contextid]]
        set pipe [zmq socket flcli_pipe_[::mdp::socketid] $ctx PAIR]
        set pipe_address "ipc://flclientpipe_[::mdp::socketid].ipc"
        $pipe connect $pipe_address
        set agent [FLClient_agent new $ctx $pipe_address]
        $agent process
    }

    # Tear down in creation-reverse order: agent, pipe, context.
    destructor {
        $agent destroy
        $pipe setsockopt LINGER 0
        $pipe close
        $ctx term
    }

    # Sends [CONNECT][endpoint] to the agent, then pauses briefly so the
    # connection has time to come up.
    method connect {endpoint} {
        zmsg send $pipe [zmsg add [zmsg add {} "CONNECT"] $endpoint]
        after 100 ;# Allow connection to come up
    }

    # Readable-callback for the pipe: writes the 'readable' variable,
    # which releases the vwait in 'request'.
    method is_readable {} {
        set readable 1
    }

    # Send request, get reply.
    #
    # Pushes a "REQUEST" frame in front of the given message, sends it
    # over the pipe and waits in the event loop until the pipe becomes
    # readable. The first frame of the answer is a status; the remaining
    # frames are returned, or an empty message if the status is "FAILED".
    method request {request} {
        zmsg send $pipe [zmsg push $request "REQUEST"]

        # Run the event loop until is_readable fires.
        $pipe readable [list [self] is_readable]
        vwait [my varname readable]
        $pipe readable {}

        set answer [zmsg recv $pipe]
        if {![llength $answer]} {
            return $answer
        }
        if {[zmsg pop answer] eq "FAILED"} {
            return {}
        }
        return $answer
    }
}

# FLClient_server - bookkeeping record for one server endpoint.
#
# Tracks the endpoint string, an 'alive' flag, the time at which the
# next PING is due (ping_at) and the time at which the server is
# considered expired (expires). Times are absolute [clock milliseconds]
# values.
oo::class create FLClient_server {

    variable endpoint alive ping_at expires

    # Record the endpoint; the server starts out not alive, with both
    # deadlines counted from now.
    constructor {iendpoint} {
        set endpoint $iendpoint
        set alive 0
        set ping_at [my Deadline $::PING_INTERVAL]
        set expires [my Deadline $::SERVER_TTL]
    }

    destructor {
    }

    # Private: absolute time 'msecs' milliseconds from now.
    method Deadline {msecs} {
        return [expr {[clock milliseconds] + $msecs}]
    }

    # If a ping is due, send [endpoint][PING] on the given socket and
    # schedule the next one; otherwise do nothing.
    method ping {socket} {
        if {[clock milliseconds] < $ping_at} {
            return
        }
        puts "ping [self]"
        zmsg send $socket [zmsg add [zmsg add {} $endpoint] "PING"]
        set ping_at [my Deadline $::PING_INTERVAL]
    }

    # Time at which the next ping is due.
    method ping_at {} {
        return $ping_at
    }

    # Mark the server alive and restart both deadlines.
    # Returns the previous value of the alive flag (0 or 1).
    method activate {} {
        set was_alive $alive
        set alive 1
        set ping_at [my Deadline $::PING_INTERVAL]
        set expires [my Deadline $::SERVER_TTL]
        return $was_alive
    }

    # Time at which the server is considered expired.
    method expires {} {
        return $expires
    }

    # Endpoint string given at construction.
    method endpoint {} {
        return $endpoint
    }

    # Clear the alive flag.
    method no_longer_alive {} {
        set alive 0
    }
}

# FLClient_agent - asynchronous agent that manages the server pool and
# handles the request/reply dialog when the application asks for it.
#
# The agent binds a PAIR socket ("pipe") on which it receives commands
# from the front end, and owns a ROUTER socket through which it talks to
# servers. It is driven entirely by the Tcl event loop: readable
# callbacks on both sockets plus one "tickless" timer.
#
# Invariant maintained by this class: while a handler runs, the readable
# callbacks and the timer are disarmed (Disarm); every handler ends by
# re-arming them (Arm, via process_request), even if processing failed.
oo::class create FLClient_agent {

    variable ctx pipe router servers actives sequence request reply expires afterid

    # context      - ZeroMQ context to create the sockets in
    # pipe_address - endpoint on which the agent binds its pipe
    constructor {context pipe_address} {
        set ctx $context
        set pipe [zmq socket flcli_agent_pipe_[::mdp::socketid] $ctx PAIR]
        $pipe bind $pipe_address
        set router [zmq socket flcli_router_[::mdp::socketid] $ctx ROUTER]
        array set servers {} ;# endpoint -> FLClient_server object
        set actives [list]   ;# server objects believed alive, in order
        set sequence 0       ;# number of the most recent request
        set request {}       ;# pending request message, empty if none
        set reply {}
        set expires 0        ;# absolute expiry time of pending request
        set afterid {}       ;# id of the pending poll timer, empty if none
        after 100 ;# Allow connection to come up
    }

    # Stop all event sources first so no callback can fire on a dead
    # object, release the server records, then close the sockets.
    destructor {
        catch {my Disarm}
        foreach {endpoint server} [array get servers] {
            catch {$server destroy}
        }
        $pipe setsockopt LINGER 0
        $pipe close
        $router setsockopt LINGER 0
        $router close
    }

    # Private: cancel the pending poll timer and remove both readable
    # callbacks.
    method Disarm {} {
        # Test the variable itself, not a variable named after its value.
        if {[info exists afterid] && $afterid ne ""} {
            after cancel $afterid
        }
        set afterid {}
        $pipe readable {}
        $router readable {}
    }

    # Private: install both readable callbacks and schedule the next
    # poll. Returns the timeout (msecs) used for the timer.
    method Arm {} {
        $pipe readable [list [self] control_message]
        $router readable [list [self] router_message]
        set t [my get_timeout]
        set afterid [after $t [list [self] poll]]
        return $t
    }

    # Readable-callback for the pipe: handle one command from the front
    # end. [CONNECT][endpoint] registers a server; [REQUEST][frames...]
    # becomes the pending request.
    method control_message {} {
        puts "control message"
        my Disarm
        if {[catch {
            set msg [zmsg recv $pipe]
            puts [join [zmsg dump $msg] \n]
            set command [zmsg pop msg]

            if {$command eq "CONNECT"} {
                set endpoint [zmsg pop msg]
                puts "I: connecting to $endpoint…"
                $router connect $endpoint
                set server [FLClient_server new $endpoint]
                set servers($endpoint) $server
                lappend actives $server
            } elseif {$command eq "REQUEST"} {
                # Prefix request with sequence number
                set request [zmsg push $msg [incr sequence]]
                set expires [expr {[clock milliseconds] + $::GLOBAL_TIMEOUT}]
            }
        } emsg]} {
            puts stderr "E: control message failed: $emsg"
        }
        # Always continue, so the agent is re-armed even after an error.
        my process_request
    }

    # Readable-callback for the router: handle one message from a
    # server. Frame 0 names the server, frame 1 may be the sequence
    # number of the request being answered.
    method router_message {} {
        puts "router message"
        my Disarm
        if {[catch {
            set reply [zmsg recv $router]
            puts "Reply"
            puts [join [zmsg dump $reply] \n]

            # Frame 0 is server that replied
            set endpoint [zmsg pop reply]
            if {![info exists servers($endpoint)]} {
                error "server for endpoint '$endpoint' not found"
            }
            # Any traffic proves the server is alive; put it back in
            # the active list if it had been dropped.
            if {[$servers($endpoint) activate] == 0} {
                lappend actives $servers($endpoint)
            }

            # Frame 1 may be sequence number for reply
            set nsequence [zmsg pop reply]
            puts "$sequence == $nsequence"
            # Forward only while a request is outstanding: a late or
            # duplicate reply carrying the current sequence number must
            # not be delivered as the answer to a later request.
            if {[llength $request] && $sequence == $nsequence} {
                set reply [zmsg push $reply "OK"]
                zmsg send $pipe $reply
                set request {}
            }
        } emsg]} {
            puts stderr "E: router message failed: $emsg"
        }
        # Always continue, so the agent is re-armed even after an error.
        my process_request
    }

    # Advance the pending request (if any), ping servers that are due,
    # then re-arm the callbacks and the poll timer.
    method process_request {} {
        puts "process request"
        if {[catch {
            if {[llength $request]} {
                puts [join [zmsg dump $request] \n]
                if {[clock milliseconds] >= $expires} {
                    # Request expired, kill it
                    $pipe send "FAILED"
                    set request {}
                } else {
                    # Find server to talk to, remove any expired ones
                    while {[llength $actives]} {
                        set server [lindex $actives 0]
                        if {[clock milliseconds] >= [$server expires]} {
                            $server no_longer_alive
                            set actives [lrange $actives 1 end]
                        } else {
                            # Address a copy of the request to this
                            # server; the original stays pending.
                            set nrequest [zmsg push $request [$server endpoint]]
                            zmsg send $router $nrequest
                            break
                        }
                    }
                }
            }

            foreach {endpoint server} [array get servers] {
                $server ping $router
            }
        } emsg]} {
            puts stderr "E: process request failed: $emsg"
        }
        # Re-arm outside the catch so a send failure cannot leave the
        # agent without callbacks or timer.
        puts "after [my Arm]"
        return
    }

    # Timer callback: nothing arrived before the tickless timeout.
    method poll {} {
        puts "poll"
        my Disarm
        my process_request
    }

    # Returns the number of milliseconds until the earliest of: one hour
    # from now, the expiry of the pending request (if any), and the next
    # due ping of any registered server. Never negative.
    method get_timeout {} {
        # Calculate tickless timer, up to 1 hour
        set tickless [expr {[clock milliseconds] + 1000 * 3600}]
        if {[llength $request] && $tickless > $expires} {
            set tickless $expires
        }
        foreach {endpoint server} [array get servers] {
            set ping_at [$server ping_at]
            if {$tickless > $ping_at} {
                set tickless $ping_at
            }
        }
        set timeout [expr {$tickless - [clock milliseconds]}]
        if {$timeout < 0} {
            set timeout 0
        }
        return $timeout
    }

    # Start the agent: install the readable callbacks and the first poll
    # timer. Any previously armed timer is cancelled first, so calling
    # this twice does not create a second timer chain.
    # Returns the id of the scheduled timer.
    method process {} {
        my Disarm
        my Arm
        return $afterid
    }
}