;;; -*- Mode:Lisp; Syntax:ANSI-Common-Lisp; -*-
;;;
;;; Least-recently used (LRU) queue device in Common Lisp
;;; Clients and workers are shown here in-process
;;;
;;; Kamil Shakirov <moc.liamg|08slimak#moc.liamg|08slimak>
;;;
(defpackage #:zguide.lruqueue
(:nicknames #:lruqueue)
(:use #:cl #:zhelpers)
(:shadow #:message)
(:export #:main))
(in-package :zguide.lruqueue)
(defun message (fmt &rest args)
(let ((new-fmt (format nil "[~A] ~A"
(bt:thread-name (bt:current-thread)) fmt)))
(apply #'zhelpers:message new-fmt args)))
(defparameter *number-clients* 10)
(defparameter *number-workers* 3)
;; Basic request-reply client using REQ socket
(defun client-thread (context)
(zmq:with-socket (client context zmq:req)
(set-socket-id client) ; Makes tracing easier
(zmq:connect client "ipc://frontend.ipc")
;; Send request, get reply
(send-text client "HELLO")
(let ((reply (recv-text client)))
(message "Client: ~A~%" reply))))
;; Worker using REQ socket to do LRU routing
(defun worker-thread (context)
(zmq:with-socket (worker context zmq:req)
(set-socket-id worker) ; Makes tracing easier
(zmq:connect worker "ipc://backend.ipc")
;; Tell broker we're ready for work
(send-text worker "READY")
;; Ignore errors and exit when the context gets terminated
(ignore-errors
(loop
;; Read and save all frames until we get an empty frame
;; In this example there is only 1 but it could be more
(let ((address (recv-text worker)))
(recv-text worker) ; empty
;; Get request, send reply
(let ((request (recv-text worker)))
(message "Worker: ~A~%" request)
(send-more-text worker address)
(send-more-text worker "")
(send-text worker "OK")))))))
(defun main ()
;; Prepare our context and sockets
(zmq:with-context (context 1)
(zmq:with-socket (frontend context zmq:router)
(zmq:with-socket (backend context zmq:router)
(zmq:bind frontend "ipc://frontend.ipc")
(zmq:bind backend "ipc://backend.ipc")
(dotimes (i *number-clients*)
(bt:make-thread (lambda () (client-thread context))
:name (format nil "client-thread-~D" i)))
(dotimes (i *number-workers*)
(bt:make-thread (lambda () (worker-thread context))
:name (format nil "worker-thread-~D" i)))
;; Logic of LRU loop
;; - Poll backend always, frontend only if 1+ worker ready
;; - If worker replies, queue worker as ready and forward reply
;; to client if necessary
;; - If client requests, pop next worker and send request to it
;; Queue of available workers
(let ((number-clients *number-clients*)
(available-workers 0)
(worker-queue (make-queue)))
(loop
;; Initialize poll set
(zmq:with-polls
((items2 .
;; Always poll for worker activity on backend
((backend . zmq:pollin)
(frontend . zmq:pollin)))
(items1 .
;; Poll front-end only if we have available workers
((backend . zmq:pollin))))
(let ((revents
(if (zerop available-workers)
(zmq:poll items1)
(zmq:poll items2))))
;; Handle worker activity on backend
(when (= (first revents) zmq:pollin)
;; Queue worker address for LRU routing
(let ((worker-addr (recv-text backend)))
(assert (< available-workers *number-workers*))
(enqueue worker-queue worker-addr)
(incf available-workers))
;; Second frame is empty
(recv-text backend) ; empty
;; Third frame is READY or else a client reply address
(let ((client-addr (recv-text backend)))
(when (string/= client-addr "READY")
(recv-text backend) ; empty
(let ((reply (recv-text backend)))
(send-more-text frontend client-addr)
(send-more-text frontend "")
(send-text frontend reply))
(when (zerop (decf number-clients))
(return)))))
(when (and (cdr revents)
(= (second revents) zmq:pollin))
;; Now get next client request, route to LRU worker
;; Client request is [address][empty][request]
(let ((client-addr (recv-text frontend)))
(recv-text frontend) ; empty
(let ((request (recv-text frontend)))
(send-more-text backend (dequeue worker-queue))
(send-more-text backend "")
(send-more-text backend client-addr)
(send-more-text backend "")
(send-text backend request))
(decf available-workers)))))))))
(sleep 2))
(cleanup))