root/trac/hacks/marketplugin/0.9/tracmarket/mq.py

Revision 117 (checked in by stevegt, 6 years ago)

checkpoint after mss tutorial, before moving order validity checks until after order entry into db

Line 
1 from cPickle import loads, dumps
2 import random
3 import os
4 import re
5 import sys
6 import types
7 import time
8
9 from trac.core import *
10
11 # from trac.db import SQLiteConnection
12 # from tracmarket.db import *
13
14 '''
15 See the "Enterprise Integration Patterns" book by Hohpe and Woolf for
16 much of the terminology and concepts used in this module. 
17
18 Additional terms we use here:
19
20 - group: a named publish/subscribe channel; clients subscribe to
21   groups; clients send messages to groups; groups can be named using
22   any string, but a good convention is "foo.bar.baz" or "foo/bar/baz"
23
24 - reader: the callback function which the bus calls to transfer a
25   message to a client
26
27 '''
28
class IMessageBusSubscriber(Interface):
    '''
    Extension point for components that want the bus to ask them for
    their subscriptions.

    Implementing this interface is optional: any code holding a bus
    can call bus.subscribe() directly.  The only purpose of the
    interface is to give a component a solicit() hook which the bus
    invokes at startup.

    '''

    def solicit(bus):
        '''Called by the bus to request subscriptions.  The subscriber
        answers by calling bus.subscribe() on the bus it was handed.'''
42
class MemoryBus(object):
    '''
    A message bus which routes and delivers messages using a
    publish/subscribe model.  This is intended to be used as a
    per-query, in-memory message bus, with no persistent message store.
    The goals are simplicity, speed, and no need to isolate queries
    from each other by using any correlation id or special reply
    channel names.  Keeping everything in memory should also avoid
    most of the need for a transactional interface to the bus,
    assuming each http query is handled within a single db
    transaction.

    A hypothetical DbBus might subclass MemoryBus to create a bus with
    transactions and persistent message storage; that might be worth
    doing if the DbBus was running in its own daemon, with MemoryBus
    clients attaching to it.

    Debugging aids, read from the environment when the bus is created:

    - MQGREP: a regex; every send() whose group matches it prints one
      "group: sender -> destination" line per target queue
    - MQDEBUG: if set (and MQGREP matched), send() also drops into pdb

    '''

    # Type of object produced by dumps().  send() stores pickled
    # messages in the queues and recv() uses this type to tell them
    # apart from messages that were queued by reference (cp=False).
    _pickled_type = type(dumps(None))

    def __init__(self, subscribers=None, deadletter_id=None, ctx=None,
            delimiter='.'):
        '''Create a bus.

        subscribers   -- optional iterable of objects providing
                         solicit(bus); each is solicited once, after
                         the bus is fully set up
        deadletter_id -- subscription id that receives messages which
                         match no subscription; it must be subscribed
                         on this bus before the first undeliverable
                         send().  If not given, the bus subscribes its
                         own error() method to a group named
                         'deadletter' and uses that subscription.
        ctx           -- arbitrary caller context, stored as self.ctx
        delimiter     -- separator between parts of hierarchical group
                         names; a group string ending in it subscribes
                         to everything below that prefix
        '''
        self.pattern_type = type(re.compile(''))
        self.grep = os.environ.get('MQGREP', None)
        self.debug = os.environ.get('MQDEBUG', None)
        # self.queues[subscription_id] = Queue()
        # self.subscriptions[subscription_id] = dict(**subscribe_args)
        # self.patterns[pattern][subscription_id] = True
        # self.query_cache[group] = sendto_queues
        self.queues = {}
        self.subscriptions = {}
        self.patterns = {}
        self.query_cache = {}
        # delimiter between parts of hierarchical group names
        self.delimiter = delimiter
        # bulletin board space for storage of miscellaneous local
        # context bits (db handles, etc.)
        self.ctx = ctx
        # caller is supposed to create a dead letter group and reader
        # and then give us the subscription id; if they didn't do
        # this, then create a group named 'deadletter', and subscribe
        # ourselves to it
        if not deadletter_id:
            self.deadletter_id = self.subscribe('deadletter', reader=self.error)
        else:
            self.deadletter_id = deadletter_id
        # wake up subscribers (this needs to be *last*)
        if subscribers:
            for s in subscribers:
                s.solicit(self)

    def error(self, bus, group, msg):
        '''Default dead letter reader: report the undeliverable
        message's group on stderr.'''
        sys.stderr.write("dead letter: no readers for %s\n" % (msg.group,))

    def subscribe(self, group, reader=None, id=None):
        '''Called by client to attach to bus; returns subscription id.
        Can be called multiple times by same client using different
        group.  Group is a string or compiled regex.  A string ending
        in the bus delimiter matches every group name starting with
        that string; any other string matches only itself.  Bus will
        enqueue for client any message addressed to any group matching
        the group string or regex.  Reader is a callback function.
        Messages will be delivered to client using reader(bus=bus,
        group=group, msg=msg).  If reader is None, then client will
        need to call recv() to pick up messages.  Client can assign
        any globally unique string as subscription id; if id is None,
        then a random id will be assigned.  Client can re-subscribe
        (to change the reader, for instance) by re-using the same id;
        messages already queued for that id are kept, and are
        delivered immediately if the new subscription has a reader.'''
        # any new subscription can change routing, so drop cached routes
        self.query_cache = {}
        # convert to regex
        if isinstance(group, self.pattern_type):
            pattern = group
        elif group.endswith(self.delimiter):
            pattern = re.compile("^%s.*$" % re.escape(group))
        else:
            pattern = re.compile("^%s$" % re.escape(group))
        # assign id
        if not id:
            id = pattern.pattern + "." + str(time.time()) + str(random.random())
        id = str(id)
        # label used in MQGREP trace output: the reader, or (for
        # polling clients) the name of the function that subscribed
        if reader:
            dst = str(reader)
        else:
            dst = sys._getframe(1).f_code.co_name
        # add subscription to db and indexes; an existing queue is kept
        # (it may hold undelivered messages) but relabelled so traces
        # name the current destination.  Compare against None: an
        # empty Queue is falsy.
        queue = self.queues.get(id, None)
        if queue is None:
            self.queues[id] = Queue(dst=dst)
        else:
            queue.dst = dst
        self.subscriptions[id] = dict(reader=reader, group=group,
                pattern=pattern, id=id)
        self.patterns.setdefault(pattern, {})
        self.patterns[pattern][id] = True
        # catch up on messages in case we just now added reader
        self._notify(id)
        return id

    def _notify(self, id):
        '''Drain the queue of subscription id into its reader, if it
        has one; subscriptions without a reader keep their messages
        until the client calls recv().'''
        reader = self.subscriptions[id]['reader']
        if not reader:
            return
        queue = self.queues[id]
        while len(queue):
            msg = self.recv(id)
            reader(bus=self, group=msg.group, msg=msg)

    def send(self, group, msg):
        '''Called by anyone to deliver a message to all subscribers of
        group.  Senders need not be subscribed.  Group is a string,
        msg is a Msg object which has not been sent before (its group
        attribute must still be empty).  If no subscription matches,
        the message goes to the dead letter subscription instead.
        Raises ValueError if msg.cp is set and msg cannot be
        pickled.'''
        grep_hit = self.grep and re.match(self.grep, group)
        # check cache
        sendto = self.query_cache.get(group, None)
        if sendto is None:
            sendto = {}
            # cache miss -- slog through all subscriptions
            for pattern, id_dict in self.patterns.items():
                if pattern.match(group):
                    for id in id_dict:
                        sendto[id] = self.queues[id]
            # populate cache
            self.query_cache[group] = sendto
        if not sendto:
            # no readers found -- route to dead letter queue.  Build a
            # separate dict so the cached route for this group stays
            # empty instead of claiming the dead letter queue is a
            # subscriber.
            sendto = {self.deadletter_id: self.queues[self.deadletter_id]}
        # XXX deprecate
        assert not msg.group
        msg.group = group
        if not msg.sender:
            # default sender label: the code object of our caller
            msg.sender = str(sys._getframe(1).f_code)
        sender = msg.sender
        if msg.cp:
            # use pickle instead of copy, because a dump into queue on
            # send and a load into reader on recv is usually faster than
            # multiple deep copies
            try:
                msg = dumps(msg)
            except Exception:
                # sys.exc_info() rather than "except ... as e" keeps
                # this valid on every Python 2 release as well
                e = sys.exc_info()[1]
                raise ValueError("object not picklable: %s: %s: %s" % (
                    type(msg.data), str(e), str(msg.data)))
        # enqueue message
        for queue in sendto.values():
            if grep_hit:
                sys.stdout.write("%s: %s -> %s\n" % (group, sender, queue.dst))
                if self.debug:
                    import pdb
                    pdb.set_trace()
            queue.send(msg)
        # notify readers (iterate over a snapshot of the ids; readers
        # run arbitrary code, including further bus calls)
        for id in list(sendto):
            self._notify(id)

    def recv(self, id, i=0, peek=False):
        '''Called by client to receive the next message in queue for
        subscription id.  Id must be the same as that returned by
        subscribe().  Returns exactly one Msg object, or None if the
        id is unknown or no message is queued at position i.  With
        peek set the message is left in the queue.'''
        queue = self.queues.get(id, None)
        if not queue:
            # unknown id, or known id with an empty queue
            return None
        msg = queue.recv(i, peek=peek)
        if not msg:
            return None
        # messages sent with cp=True were pickled by send(); every
        # receiver unpickles its own private copy.  NOTE: anything of
        # this type placed in a queue by other means would be
        # unpickled too -- only send() should write to the queues.
        if type(msg) is self._pickled_type:
            msg = loads(msg)
        return msg

    def peek(self, id, i=0):
        '''Return the message at position i of subscription id's queue
        without removing it, or None.'''
        return self.recv(id, i, peek=True)

    # XXX replace with include/exclude or allow/deny
    # NOTE(review): unimplemented placeholder; the signature has no
    # self parameter, left untouched until the method is designed.
    def XXXshutdown(reader, group, func):
        '''Called by client to stop queueing messages into one queue.
        Bus will continue to serve messages from queue in response to
        recv(), but will not enqueue any new ones.  Queue will be
        destroyed after last message is dequeued.'''

    # NOTE(review): unimplemented placeholder, same caveat as above.
    def XXXdestroy(reader, group, func):
        '''Called by client to stop queueing messages into queue and
        remove queue immediately.  Any messages currently in queue
        will be destroyed.'''

    # XXX implement request/response pattern
    # def reader(self, bus, group, msg):
    #     foo = bus.rpc('get', Msg('foo')).data
    #     # or
    #     foo = bus.rpc.get('foo')

    # XXX enable bus.foo.bar.baz(msg)
    # XXX -- this is a bad way to do it; use a msg factory instead:
    # XXX factory.foo.bar.baz(data) generates message and calls bus.send()
    class Path(object):
        pass
    def XXX__call__(self, *args, **kwargs):
        pass
    def XXX__getattr__(self, name):
        val = self[name]
        return val
    # NOTE(review): disabled experiment; it refers to a Bus class and a
    # _parts() helper that are not defined in this class, so it cannot
    # run as written.
    def XXX__getitem__(self, name):
        """enable self['foo.bar']"""
        name = str(name)
        item, child = self._parts(name)
        val = super(Bus, self).get(item, None)
        if val is None:
            sys.stdout.write("%s\n" % (item,))
            val = self[item] = Bus(name=item)
        if child:
            return val[child]
        return val
250
class Queue(list):
    '''A publish/subscribe message channel for one reader (or a group
    of load-balancing readers).  Messages are kept in arrival order;
    dst is a free-form label naming the destination, used by the bus
    in its trace output.

    >>> g = Queue()
    >>> g.send('a')
    >>> g.send('b')
    >>> g.peek()
    'a'
    >>> g.peek(1)
    'b'
    >>> g.recv()
    'a'
    >>> g.recv()
    'b'
    >>> g.peek()

    '''

    def __init__(self, dst=None):
        '''Create an empty queue.  dst is optional so that a bare
        Queue() -- as used in the example above -- works.'''
        list.__init__(self)
        self.dst = dst

    def send(self, msg):
        '''Append msg to the end of the queue.'''
        self.append(msg)

    def recv(self, i=0, peek=False):
        '''Return the message at position i (default: the oldest), or
        None if there is no message at that position.  The message is
        removed from the queue unless peek is true.  Negative i counts
        from the newest message.'''
        # explicit bounds check covers both directions, so an empty or
        # too-short queue yields None for negative positions as well
        if not -len(self) <= i < len(self):
            return None
        if peek:
            return self[i]
        return self.pop(i)

    def peek(self, i=0):
        '''Return the message at position i without removing it, or
        None if there is none.'''
        return self.recv(i, peek=True)
286
287
class Msg(object):
    '''A basic message: a payload plus envelope attributes.'''

    def __init__(self, data, sender=None, ttl=None, cp=True, **kwargs):
        '''Build a message around payload data, which must be a
        pickleable python object.  Everything else is envelope
        information: sender is a unique string naming the originator;
        ttl, when given and non-zero, is added to the creation time
        to set an expires attribute (no expires attribute exists
        otherwise); cp asks the bus to copy the message on send and
        receive -- when it is False, all parties share one object and
        must take care not to modify it.  Any further keyword
        arguments are stored as attributes and may override the ones
        set here.'''
        self.data, self.sender, self.cp = data, sender, cp
        created = time.time()
        self.time = created
        if ttl:
            self.expires = created + ttl
        # destination group (channel) name; filled in when sent
        self.group = None
        # group to post replies to
        self.reply_to = None
        # caller-supplied extras go in last so they win
        vars(self).update(kwargs)

    def __getitem__(self, name):
        '''Allow msg['attr'] as an alternative to msg.attr.'''
        return getattr(self, name)
313
314 """
315 class ListMsg(object):
316     '''A message with attributes useful for updating a list.'''
317
318     def __init__(self, data, **kwargs):
319         Msg.__init__(self, data, index, **kwargs)
320         
321         self.index = index
322
323
324
325
326
327
328         # series type:
329         #   list    payload is an element of a sequence
330         #   dict    payload is an element of a dict
331         #   None    payload is a standalone scalar value
332         self.type = None
333         # series operation:
334         #   reset   start a new data series; message payload supersedes
335         #           all prior messages posted to this group
336         self.op = None
337         # series key:
338         #   x       add/replace element; use x as list index or dict key
339         #   None    append data to existing sequence, incrementing
340         #           previous index value; message payload is a
341         #           continuation of prior message(s)
342         self.key = None
343         # string describing message sender
344         self.__dict__.update(kwargs)
345
346     def __getitem__(self, name):
347         return getattr(self, name)
348
349 """
Note: See TracBrowser for help on using the browser.