Key-value message class - full in Python


kvmsg - key-value message class for example applications

Author: Min RK <moc.liamg|krnimajneb#moc.liamg|krnimajneb>


import struct # for packing integers
import sys
from uuid import uuid4

import zmq
# zmq.jsonapi ensures bytes, instead of unicode:
import zmq.utils.jsonapi as json

class KVMsg(object):
Message is formatted on wire as 5 frames:
frame 0: key (0MQ string)
frame 1: sequence (8 bytes, network order)
frame 2: uuid (blob, 16 bytes)
frame 3: properties (0MQ string)
frame 4: body (blob)

key = None
sequence = 0
properties = None
body = None

def __init__(self, sequence, uuid=None, key=None, properties=None, body=None):
assert isinstance(sequence, int)
self.sequence = sequence
if uuid is None:
uuid = uuid4().bytes
self.uuid = uuid
self.key = key = {} if properties is None else properties
self.body = body

# dictionary access maps to properties:
def __getitem__(self, k):

def __setitem__(self, k, v):[k] = v

def get(self, k, default=None):
return, default)

def store(self, dikt):
"""Store me in a dict if I have anything to store"""
# this seems weird to check, but it's what the C example does
if self.key is not None and self.body is not None:
dikt[self.key] = self

def send(self, socket):
"""Send key-value message to socket; any empty frames are sent as such."""
key = '' if self.key is None else self.key
seq_s = struct.pack('!q', self.sequence)
body = '' if self.body is None else self.body
prop_s = json.dumps(
socket.send_multipart([ key, seq_s, self.uuid, prop_s, body ])

def recv(cls, socket):
"""Reads key-value message from socket, returns new kvmsg instance."""
return cls.from_msg(socket.recv_multipart())

def from_msg(cls, msg):
"""Construct key-value message from a multipart message"""
key, seq_s, uuid, prop_s, body = msg
key = key if key else None
seq = struct.unpack('!q',seq_s)[0]
body = body if body else None
prop = json.loads(prop_s)
return cls(seq, uuid=uuid, key=key, properties=prop, body=body)

def dump(self):
if self.body is None:
size = 0
size = len(self.body)
print >> sys.stderr, "[seq:{seq}][key:{key}][size:{size}] {props} {data}".format(
# uuid=hexlify(self.uuid),

# ---------------------------------------------------------------------
# Runs self test of class

def test_kvmsg (verbose):
print " * kvmsg: ",

# Prepare our context and sockets
ctx = zmq.Context()
output = ctx.socket(zmq.DEALER)
input = ctx.socket(zmq.DEALER)

kvmap = {}
# Test send and receive of simple message
kvmsg = KVMsg(1)
kvmsg.key = "key"
kvmsg.body = "body"
if verbose:

kvmsg2 = KVMsg.recv(input)
if verbose:
assert kvmsg2.key == "key"

assert len(kvmap) == 1 # shouldn't be different

# test send/recv with properties:
kvmsg = KVMsg(2, key="key", body="body")
kvmsg["prop1"] = "value1"
kvmsg["prop2"] = "value2"
kvmsg["prop3"] = "value3"
assert kvmsg["prop1"] == "value1"
if verbose:
kvmsg2 = KVMsg.recv(input)
if verbose:
# ensure properties were preserved
assert kvmsg2.key == kvmsg.key
assert kvmsg2.body == kvmsg.body
assert ==
assert kvmsg2["prop2"] == kvmsg["prop2"]

print "OK"

if __name__ == '__main__':
test_kvmsg('-v' in sys.argv)