"""
integration pattern classes for message processing
(c) integrationLabs 1996- 2004
coded by: thanos and friends
integrationLabs opensource
$RCSfile: __init__.py,v $
$Date: 2004/07/16 17:31:19 $
$Revision: 1.3 $
"""
from types import ListType, TupleType
from ilabs.core import Node, RoutingStrategy, RouteTable, Router
class NodeList(Node):
    """
    A Node that also behaves like a mutable list of items.

    The items are kept in ``self.list``. After every operation that alters
    the list the optional ``onChange`` hook is called as
    ``onChange(method, *arguments)`` where ``method`` is the bound method
    that made the change and ``arguments`` are the arguments it received.
    Subclasses override ``onChange`` to react; the default of ``None``
    switches notification off. Read-only operations (``count``, ``index``,
    comparisons, item access) do not notify.
    """
    deferChange = False  # not read by any code visible in this module
    onChange = None      # change hook, see the class docstring

    def __init__(self, name='', *args):
        """ create an empty list node; extra args are only handed to onChange """
        Node.__init__(self, name)
        self.list = []
        self._changed(self.__init__, *args)

    def _changed(self, method, *args):
        """ call the onChange hook, if one is set, on behalf of method """
        if self.onChange:
            self.onChange(method, *args)

    def _unwrap(self, other):
        """ return the plain list behind another NodeList, anything else as is """
        if isinstance(other, NodeList):
            return other.list
        return other

    def __setitem__(self, i, item):
        self.list[i] = item
        self._changed(self.__setitem__, i, item)

    def __delitem__(self, i):
        del self.list[i]
        self._changed(self.__delitem__, i)

    def __setslice__(self, i, j, other):
        self.list[i:j] = other
        self._changed(self.__setslice__, i, j, other)

    def __delslice__(self, i, j):
        del self.list[i:j]
        self._changed(self.__delslice__, i, j)

    def append(self, value):
        """ add value at the end of the list """
        self.list.append(value)
        self._changed(self.append, value)

    def insert(self, where, value):
        """ insert value before position where """
        self.list.insert(where, value)
        self._changed(self.insert, where, value)

    def pop(self, i=-1):
        """ remove the item at position i (default: the last) and return it """
        result = self.list.pop(i)
        self._changed(self.pop, i)
        return result

    def remove(self, item):
        """ remove the first occurrence of item; ValueError if it is absent """
        self.list.remove(item)
        self._changed(self.remove, item)

    def count(self, item):
        """ return how many times item occurs; the list is not altered """
        return self.list.count(item)

    def index(self, item, *args):
        """ return the position of item (args as for list.index); the list is not altered """
        return self.list.index(item, *args)

    def reverse(self):
        """ reverse the list in place """
        self.list.reverse()
        self._changed(self.reverse)

    def sort(self, *args):
        """ sort the list in place; args are passed on to list.sort """
        self.list.sort(*args)
        self._changed(self.sort, *args)

    def extend(self, other):
        """ append every item of other """
        self.list.extend(other)
        self._changed(self.extend, other)

    def __repr__(self): return repr(self.list)

    # Comparisons work on the underlying lists; the other operand may be a
    # NodeList or any plain value (e.g. None), which is compared as it is.
    def __lt__(self, other): return self.list < self._unwrap(other)
    def __le__(self, other): return self.list <= self._unwrap(other)
    def __eq__(self, other): return self.list == self._unwrap(other)
    def __ne__(self, other): return self.list != self._unwrap(other)
    def __gt__(self, other): return self.list > self._unwrap(other)
    def __ge__(self, other): return self.list >= self._unwrap(other)
    def __cmp__(self, other): return cmp(self.list, self._unwrap(other))

    def __contains__(self, item): return item in self.list
    def __len__(self): return len(self.list)
    def __getitem__(self, i): return self.list[i]

    def __getslice__(self, i, j):
        # clamp negative bounds to 0 before slicing the underlying list
        i = max(i, 0); j = max(j, 0)
        return self.list[i:j]
class PipeLine(NodeList):
    """
    Behaves as a chained list of Nodes. Received messages are passed to the first node, and the
    last node in turn sends the message to the destination.
    The PipeLine's process is called before any of the contained nodes.
    Use list operations (append, insert, del, setitem, etc) to set the nodes in the pipeline.
    When Nodes are added, inserted or removed they are relinked to maintain the pipeline.

    PipeLine = [Node1, Node2.. NodeN]
    NodeA -> PipeLine -> NodeB

    Expands to:
    NodeA -> PipeLine -> Node1 -> Node2 -> ... -> NodeN -> NodeB
    """

    def onChange(self, *args):
        """ called every time the list is altered to relink the nodes """
        self.link()

    def send(self, agent):
        """ pass the agent to the 1st node, or send it on directly when the pipeline is empty """
        if len(self):
            self[0].receive(agent)
        else:
            Node.send(self, agent)

    def connect(self, receiver, *args):
        """
        set receiver as the pipeline's destination and connect the last
        node of the chain to it; returns the pipeline itself
        """
        Node.connect(self, receiver, *args)
        if len(self):
            self[-1].connect(receiver)
        return self

    def link(self):
        """ used to relink the nodes: each node to its successor, the last to the destination """
        count = len(self)
        if not count:
            return
        for i in range(count - 1):
            self[i].connect(self[i + 1])
        # Identity test on purpose: a destination that is itself an empty
        # NodeList has length 0 and would be skipped by a truthiness test.
        # NOTE(review): assumes Node leaves destination as None until
        # connect() has been called - confirm against ilabs.core.Node.
        if self.destination is not None:
            self[-1].connect(self.destination)
class NodeBank(NodeList):
    """
    Behaves as a "parallel" bank of Nodes.
    Received messages are passed to all the nodes in turn, the resulting messages are
    then sent on to the destination.
    As with PipeLine this can be treated as a list.
    Use list operations (append, insert, extend, setitem, etc) to set the nodes in the bank;
    nodes added that way are connected to the bank's destination when one is set.
    """

    def connect(self, receiver, *args):
        """ set receiver as the bank's destination and connect every node in the bank to it """
        # identity test: an empty NodeList is a valid receiver but is falsy
        assert receiver is not None, "receiver must not be None"
        Node.connect(self, receiver, *args)
        # explicit loop: the connections are side effects, not a result to collect
        for node in self:
            node.connect(receiver, *args)

    def receive(self, agent):
        """ hand the agent to every node, or process it as a plain Node when the bank is empty """
        if len(self):
            for node in self:
                node.receive(agent)
        else:
            Node.receive(self, agent)

    def append(self, node):
        """ add node to the bank and connect it to the destination """
        NodeList.append(self, node)
        self.connectNode(node)

    def insert(self, where, node):
        """ insert node before position where and connect it to the destination """
        NodeList.insert(self, where, node)
        self.connectNode(node)

    def extend(self, other):
        """ add every node of other and connect each to the destination """
        nodes = list(other)  # other may be a one-shot iterable; it is walked twice
        NodeList.extend(self, nodes)
        for node in nodes:
            self.connectNode(node)

    def __setitem__(self, index, node):
        NodeList.__setitem__(self, index, node)
        self.connectNode(node)

    def connectNode(self, node):
        """ connect node to the bank's destination, if there is one """
        # NOTE(review): assumes Node leaves destination as None until
        # connect() has been called - confirm against ilabs.core.Node.
        if self.destination is not None:
            node.connect(self.destination)

    def onChange(self, *args):
        "do nothing"
        pass
class RouteList(RouteTable):
    """
    A RouteTable in which a route added without any criteria
    is registered with itself as its only criterion.
    """

    def addRoute(self, route, *criteria):
        """ add route under criteria, or under route itself when none are given """
        keys = criteria or (route,)
        return RouteTable.addRoute(self, route, *keys)
class AllRoutes(RoutingStrategy):
    """
    Routing strategy that ignores the agent and selects
    whatever the table reports as all of its routes.
    """

    def getRoutesUsingAgent(self, agent, table):
        """ return table.allRoutes(); agent is not consulted """
        everyRoute = table.allRoutes()
        return everyRoute
class Distributor(Router):
    """
    Will send messages to a list of connected subscribers
    """
    # Router configured with the AllRoutes strategy and a RouteList table.
    # NOTE(review): presumably Router instantiates these two classes itself;
    # confirm against ilabs.core.Router.
    strategyClass = AllRoutes
    tableClass = RouteList
class SubscriptionTable(RouteTable):
    """
    Implements a router table that adds a subscribing route (node)
    to the appropriate distributor.
    """

    def addRoute(self, node, *criteria):
        """
        connect node to the Distributor kept under each criterion,
        creating that Distributor the first time the criterion is seen
        """
        table = self.table
        for key in criteria:
            if key in table:
                distributor = table[key]
            else:
                distributor = table[key] = Distributor()
            distributor.connect(node)
class RecipientList(Router):
    """
    Implements a router that sends a message to each route
    on the appropriate subscription list.
    """
    # routes are kept in a SubscriptionTable
    tableClass = SubscriptionTable
class MessageProcessor(Node):
    """
    A Node that hands every message it sends to a router and gathers the
    output of the registered processors on a single concentrator node.

    routerClass and concentratorClass name the classes instantiated when
    no router / concentrator is passed to the constructor.
    """
    routerClass = RecipientList
    concentratorClass = Node

    def __init__(self, name='', router=None, concentrator=None):
        """
        name         - passed to Node.__init__
        router       - node receiving every sent message; default routerClass()
        concentrator - node the processors are connected to; default concentratorClass()
        """
        Node.__init__(self, name)
        # identity tests: a supplied object that happens to be falsy
        # (e.g. an empty NodeList) must not be replaced by the default
        if router is None:
            router = self.routerClass()
        self.router = router
        if concentrator is None:
            concentrator = self.concentratorClass()
        self.destination = concentrator
        # processors already connected to the concentrator, keyed by id();
        # the processor is the value so that its id stays reserved
        self._registered = {}

    def register(self, processor, *criteria):
        """
        connect processor to the router under criteria; the first time a
        processor is registered it is also connected to the concentrator
        """
        # keyed by identity because nodes may define value equality
        key = id(processor)
        if key not in self._registered:
            processor.connect(self.destination)
            self._registered[key] = processor
        self.router.connect(processor, *criteria)

    def send(self, envelope):
        """ hand the envelope to the router """
        self.router.receive(envelope)

    def connect(self, recipient, *args):
        """ connect the concentrator, not this node itself, to recipient """
        self.destination.connect(recipient, *args)
class StateMachine(Node):
    """
    Implements a table driven state machine.

    A subclass sets the class attribute ``states`` to a sequence of
    entries. When an instance is created buildTable() compiles the entries
    into a dictionary and stores it on the instance as ``self.states``.

    entry      := (current, transition+)
    transition := (message, next)
                  (message, test, resultset)
    current    := a State
                  StateMachine.INIT   the state every machine starts in
                  StateMachine.ANY    used when the current state has no
                                      transition for the message
    message    := Message
                  list or tuple of Message
    next       := State
                  StateMachine.CURRENT   stay in the current state
    test       := python expression in a string; it is evaluated with the
                  extra arguments given to process() as its globals/locals
    resultset  := (result, next)+
                  the pair whose result equals the value of test is used;
                  (StateMachine.DEFAULT, next) catches every other value
                  and is taken to be None when it is left out

    A message with no transition for the current state, nor for ANY, puts
    the machine back to INIT, unless UNDEF is set to StateMachine.CURRENT
    in which case the state is kept.

    a simple example:

    states = (
        (DrawerClosed, (Eject, DrawerOpen)),
        (DrawerOpen,   (Eject, "hasCd()", (True, CDStopped), (False, DrawerClosed))),
        (CDStopped,    (Play, CDPlaying), (Eject, DrawerOpen)),
        (CDPlaying,    (Pause, CDPaused), (Stop, CDStopped), (Eject, DrawerOpen),
                       (NextTrack, "isLastTrack()", (False, StateMachine.CURRENT), (True, CDStopped))),
        (CDPaused,     ((Pause, Play), CDPlaying), (Stop, CDStopped)),
    )

    for more, including hierarchical machines, see the test code
    """

    class State(Node):
        """ a state of the machine; a Node, so it can process the triggering message """

        def __repr__(self): return self.name or self.__class__.__name__

        def process(self, node, message):
            """ process message as a plain Node; node is the calling machine and is not used """
            Node.process(self, message)

    INIT = State('INIT')        # state of a freshly created machine
    ANY = State('ANY')          # wildcard for the current state in the table
    LAST = State('LAST')        # not referenced by the code in this class
    CURRENT = State('CURRENT')  # as a next state: stay where we are
    UNDEF = INIT                # policy for messages without a transition
    DEFAULT = object()          # resultset key used when no result matches
    debug = False               # not referenced by the code in this class

    def __init__(self, name, *args):
        """ create the machine in state INIT and compile its transition table """
        Node.__init__(self, name, *args)
        self.state = self.INIT
        self.buildTable()

    def buildTable(self):
        """
        compile the ``states`` definition (see the class docstring) into
        the dictionary {key: (test, next)} and store it as self.states

        test is None for an unconditional transition, otherwise a code
        object, and next is then a dictionary {result: next state}.
        Raises ValueError for a transition with fewer than two items.
        """
        definition = getattr(self, 'states', ())
        if isinstance(definition, dict):
            return  # already compiled, nothing left to do
        table = {}
        for entry in definition:
            currentState, transitions = entry[0], entry[1:]
            for transition in transitions:
                size = len(transition)
                if size == 2:
                    messageList, nextStates = transition
                    test = None
                elif size >= 3:
                    messageList = transition[0]
                    test = compile(transition[1], '<string>', 'eval')
                    nextStates = dict(transition[2:])
                    if self.DEFAULT not in nextStates:
                        nextStates[self.DEFAULT] = None
                else:
                    raise ValueError('malformed transition %r for state %r'
                                     % (transition, currentState))
                # exact type test: only a real list or tuple is a collection
                # of messages, anything else is one single message
                if type(messageList) not in (list, tuple):
                    messageList = [messageList]
                for message in messageList:
                    table[self.getInitKey(currentState, message)] = test, nextStates
        self.states = table

    def getKey(self, currentState, message):
        """ return the table key for message arriving in currentState """
        # every caller passes the state first and the message second; the
        # key is (state, message), the order ofInterest() unpacks
        return (currentState, message)

    getInitKey = getKey  # key used while the table is built

    def getNextState(self, message, *varspace):
        """
        return the state the machine moves to when it gets message

        varspace is passed to eval() as the globals/locals of a conditional
        transition's test. The machine itself is not changed.
        """
        entry = self.states.get(self.getKey(self.state, message))
        if entry is None:
            # nothing for the current state, try the wildcard
            entry = self.states.get(self.getKey(StateMachine.ANY, message))
        if entry is None:
            if self.UNDEF == StateMachine.CURRENT:
                return self.state
            return self.INIT
        test, resultStates = entry
        if test:
            # SECURITY: test comes from the ``states`` definition and is
            # evaluated as code; never build it from untrusted input
            res = eval(test, *varspace)
            try:
                newState = resultStates[res]
            except KeyError:
                newState = resultStates[self.DEFAULT]
        else:
            newState = resultStates
        if newState is self.CURRENT:
            newState = self.state
        return newState

    def process(self, message, *varspace):
        """
        move to the next state for message and, when the state changed,
        return the result of the new state's process(machine, message)

        Returns None when the state did not change or the new state has
        no process method.
        """
        state = self.getNextState(message, *varspace)
        # NOTE(review): the new state's process() is run only when the state
        # actually changes; confirm that a message which leaves the state
        # unchanged is not meant to reach the current state's process().
        if state != self.state:
            self.state = state
            if hasattr(state, 'process'):
                return state.process(self, message)

    def ofInterest(self):
        """ return the messages that have a transition keyed on the current state (ANY is not included) """
        return [message for state, message in self.states if state == self.state]