"""
Core classes for message processing
(c) integrationLabs 1996- 2016
coded by: thanos vassilakis
syntazo opensource
$RCSfile: core.py,v $
$Date: 2004/07/16 17:35:39 $
$Revision: 1.2 $
"""
try:
class ilabsObject(object):
pass
except:
[docs] class ilabsObject:
pass
[docs]class DeliveryAgent(ilabsObject):
"""
Implements a form of Visitor pattern.
Its responsibility is to hold a message and deliver it to the
correct processor method of the current node.
Use this class to encapsule a business message
and control what method processes it at a node.
Was called Envelope - renamed to avoid confusion.
"""
wayslip = None
def __init__(self, wayslip=None, message=None):
"""
@param wayslip: this is by default None but can be used for routing information.
@type wayslip: Wayslip object.
@param message: message to be delivered
@type message: any object.
"""
if wayslip: self.wayslip = wayslip
self.message = message
[docs] def processNode(self, node):
"""
invokes Node.processAgent
@param node: The node the message will be delivered to.
@type node: Node object
@return: a message instance.
"""
return node.processAgent(self)
[docs]class Node(ilabsObject):
"""
abstract node - don't use - always subclass
implentation of Node - with a single destination
"""
destination = None
name = ''
def __init__(self, name=''):
if name:
self.name = name
[docs] def receive(self, agent):
"""
if you override invoke the agent's
process method and pass result with send
"""
assert agent, "Got to receive an agent"
agent = agent.processNode(self)
if agent:
self.send(agent)
[docs] def send(self, agent):
"""
Must be overridden.
Should normally implement sending the agent onto the next node in the net work.
override and always return self
@param agent: the devilery agent
@type agent: DeliveryAgent
@return: self
"""
if self.destination:
self.destination.receive(agent)
return self
[docs] def connect(self, receiver, *args):
"""
Must be overridden.
- override and always return self
@param receiver: the node to receive the processed message
@type receiver: Node subclass
@param args: a list of args to facilitate the connection.
@return: self
"""
assert receiver, "receiver must not be None"
self.destination = receiver
return self
[docs] def disconnect(self, receiver=None, *args):
"""
disconnects a node to this pipe.
@param reciever: the node to be connected.
@type receiver: node to be disconnected.
@param *args: list of args to facilitate disconnection.
@return: must return self
@rtype: Node
"""
self.destination = None
return self
[docs] def processAgent(self, agent):
"""
Override is you need access to the delivery agent
@param agent: the delivery agent
@type agent: DeliveryAgent
@return: the delivery agent
@rtype: DeliveryAgent
"""
agent.message = self.process(agent.message)
return agent
[docs] def process(self, message):
"""
Override to process message
@param message: the sage to process
@return: the message
"""
return message
[docs]class Proxy(Node):
"""
This is a node place holder or socket.
You use this class when you want to be able to replace nodes without rewiring them.
a Proxy pattern.
"""
UNWIRED = True
WIREDTHRU = False
def __init__(self, name='', node=None, whenEmpty=UNWIRED):
Node.__init__(self, name=name)
self.node = None
self.connections = []
self.whenEmpty = whenEmpty
[docs] def send(self, agent):
""" send the agent and sends it to 1st node """
if self.node:
self.node.receive(agent)
elif self.whenEmpty == self.WIREDTHRU:
for node, args in self.connections:
node.receive(agent)
[docs] def connect(self, receiver, *args):
"""
connects 1st as the pipelines destination and last node of chain to the actual destination
@param reciever: the node to be connected.
@type receiver: node
@return: must return self
@rtype: Node
"""
assert receiver, "receiver must not be None"
if self.node:
self.node.connect(receiver, *args)
self.connections.append((receiver, args))
return self
[docs] def plugin(self, node):
"""
Plugs node into socket of Proxy.
Will unplug and existing plugged-in node.
@param reciever: the node to be connected.
@type receiver: node
@return: must return self
@rtype: Node
"""
assert node, "plugin must take a node"
self.unplug()
self.node = node
for node, args in self.connections:
self.node.connect(node, *args)
return self
[docs] def unplug(self):
"""
Un plugs current node from socket of Proxy.
Disconects
@param reciever: the node to be connected.
@type receiver: node
@return: must return self
@rtype: Node
"""
if self.node:
self.node.disconnect()
self.node = None
return self
[docs]class RoutingStrategy(ilabsObject):
"""
Abstarct routing strategy.
"""
[docs] def getRoutesUsingAgent(self, agent, table):
result = self.getRoutesUsingMessage(agent.message, table)
return result
[docs] def getRoutesUsingMessage(self, message, table):
# print
# print "PY message:", message
# print
criteria = self.criteriaFromMessage(message, table)
result = table.routes(*criteria)
return result
[docs] def criteriaFromMessage(self, message, table):
"""
extracts routing citeria from an agents's message.
"""
"should implement routing stategy"
raise NotImplementedError()
[docs]class RouteTable(ilabsObject):
"""
Implements a default route table as a dictionary.
"""
NoRoutes = []
tableClass = dict
cache = {}
def __init__(self):
self.table = self.tableClass()
[docs] def allRoutes(self):
return self.table.values()
[docs] def addRoute(self, route, *criteria):
"""
inserts a route for each criterion.
@param route: Node to add to table keyed by criteria
@type route: Node
@param criteria: criteria by which router chooses the node to route Agent.
@type criteria: list of any hashable parameters.
@return: self
"""
for criterion in criteria:
self.table[criterion] = route
return self
[docs] def removeRoute(self, route, *criteria):
for criterion in criteria:
del self.table[criterion]
return self
[docs] def routes(self, *criteria):
"""
returns a list of routes given a set of criterion.
@param criteria: criteria by which router chooses the node to route Agent.
@type criteria: list of any hashable parameters.
@return: list of routes
"""
if not criteria:
return self.NoRoutes
return [self.table[criterion] for criterion in criteria if criterion in self.table]
try:
routes = self.cache[criteria]
except KeyError:
routes = [self.table[criterion] for criterion in criteria if criterion in self.table]
self.cache[criteria] = routes
return routes
def __repr__(self):
return "%s routes: %s" % (self.__class__, str(self.table.items()))
[docs]class Router(Node):
"""
Abstarct message router
tableClass = route table class
strategyClass = routing strategy
"""
tableClass = RouteTable
strategyClass = RoutingStrategy
def __init__(self, name='', table=None, strategy=None):
"""
table - the route table instance
strategy - the routing table strategy
"""
Node.__init__(self, name=name)
if table is None:
table = self.tableClass()
self.table = table
if strategy is None:
strategy = self.strategyClass()
self.strategy = strategy
self.default = []
Node.__init__(self, name=name)
[docs] def connect(self, route, *criteria):
"""
use to added a route with a given criteria
@param route: Node to add to table keyed by criteria
@type route: Node
@param criteria: criteria by which router chooses the node to route Agent.
@type criteria: list of any hashable parameters.
@return: self
"""
assert route, "receiver must not be None"
self.table.addRoute(route, *criteria)
return self
[docs] def setDefault(self, *routes):
self.default = routes
[docs] def disconnect(self, *criteria):
self.table.removeRoute(route, *criteria)
return self
[docs] def send(self, agent):
routes = self.strategy.getRoutesUsingAgent(agent, self.table)
if not routes:
routes = self.default
for route in routes:
message = agent.message
route.receive(agent)
agent.message = message
def __repr__(self):
return "%s table: %s" % (self.__class__, self.table)
[docs]class System(ilabsObject):
def __init__(self, name='', configurator=None):
self.setup(configurator)
[docs] def setup(self, configurator):
configurator.configure(self)