Infrastructures.Org | ISconf.Org TerraLuna Projects

root/trunk/lib/python/isconf/Kernel.py

Revision 221 (checked in by stevegt, 6 years ago)

fix python 2.4 socket select() incompatibility, add 2.4 labtest run

Line 
1 # vim:set expandtab:
2 # vim:set foldmethod=indent:
3 # vim:set shiftwidth=4:
4 # vim:set tabstop=4:
5
6 from __future__ import generators
7 import copy
8 import os
9 import sys
10 import time
11 import traceback
12
13 from isconf.Globals import *
14
class Deadlock(Exception):
    """Raised by Bus.tx() when maxlen is set and a reader queue is full."""
class Restart(Exception):
    """Raised by Kernel.abort() when a failing task has no errpin."""
17
class Bus:
    """
    Broadcast message bus between kernel tasks.

    Each subscribed reader task owns a private queue in readq; tx()
    appends a message to every queue, and a task receives its messages
    by yielding the tuple returned from rx() to the kernel.

    >>> def mygen(name,inpin,outpin):
    ...     while True:
    ...         mlist = []
    ...         yield inpin.rx(mlist)
    ...         for msg in mlist:
    ...             msg = "%s got (%s)" % (name, msg)
    ...             while not outpin.tx(msg):
    ...                 print "waiting for readers"
    ...                 yield None
    >>>
    >>> def printer(inpin):
    ...     while True:
    ...         l = []
    ...         yield inpin.rx(l)
    ...         for j in l:
    ...             print j
    >>>
    >>> bus1 = Bus()
    >>> bus2 = Bus()
    >>> bus3 = Bus(minreaders=2)
    >>> assert not bus1.tx('never')
    >>> t = kernel.spawn(bus1.writer('apple'))
    >>> a = kernel.spawn(mygen('alice',inpin=bus1,outpin=bus2))
    >>> b = kernel.spawn(mygen('bob',inpin=bus2,outpin=bus3))
    >>> r = kernel.spawn(bus3.reader(),itermode=True)
    >>> p = kernel.spawn(printer(inpin=bus3))
    >>> kernel.run(steps=100)
    bob got (alice got (apple))
    >>> bus1.tx('pear')
    1
    >>> kernel.run(steps=100)
    bob got (alice got (pear))
    >>> r.next()
    'EAGAIN'
    >>> kernel.run(steps=100)
    >>> r.next()
    'bob got (alice got (apple))'
    >>> kernel.run(steps=100)
    >>> r.next()
    'bob got (alice got (pear))'
    >>> kernel.run(steps=100)
    >>> r.next()
    'EAGAIN'

    """

    def __init__(self,maxlen=None,minreaders=1,name=None):
        """
        maxlen     -- per-reader queue limit; tx() raises Deadlock
                      rather than exceed it (None or 0: no limit)
        minreaders -- tx() refuses to send until at least this many
                      readers are subscribed
        name       -- optional label, reported in Deadlock errors
        """
        self.maxlen = maxlen
        # all reader queues, indexed by task id
        self.readq = {}
        self.minreaders = minreaders
        self.name = name
        # 'up' until close() is called, then 'down'
        self.state = 'up'

    def busy(self):
        """Return True if any live reader still has queued messages."""
        self.clean()
        for queue in self.readq.values():
            if queue:
                return True
        return False

    def clean(self):
        """Drop the queues of readers whose task is no longer running."""
        # walk a snapshot of the keys, since entries are deleted as we go
        for tid in list(self.readq.keys()):
            if not kernel.isrunning(tid):
                del self.readq[tid]

    def close(self):
        """Mark the bus down: tx() returns 0 and drained readers get EOF."""
        self.state = 'down'

    def subscribe(self,tid):
        """Create a readers queue for tid.  Called by kernel."""
        self.readq.setdefault(tid,[])

    def tx(self,msg):
        """
        Broadcast msg to every subscribed reader.

        Returns 0 if the bus is down, False if fewer than minreaders
        tasks are subscribed, otherwise the number of readers the
        message was queued for.

        Raises Deadlock if maxlen is set and any reader's queue is
        already full.  All queues are checked before anything is
        appended, so a Deadlock never leaves the message delivered to
        only some of the readers.
        """
        if self.state == 'down':
            return 0
        self.clean()
        i = len(self.readq) # number of subscribed readers
        if i < self.minreaders:
            return False
        queues = list(self.readq.items())
        if self.maxlen:
            for (tid,queue) in queues:
                if len(queue) + 1 > self.maxlen:
                    raise Deadlock(
                        "bus %r: queue for reader %r is full (maxlen %r)"
                        % (self.name, tid, self.maxlen))
        for (tid,queue) in queues:
            queue.append(msg)
        return i

    def reader(self):
        """convenience generator -- read bus while not in a task"""
        while True:
            mlist = []
            yield self.rx(mlist)
            for msg in mlist:
                yield msg

    def writer(self,msg):
        """convenience generator -- reliable tx"""
        while not self.tx(msg):
            yield None

    def ready(self,tid,buf,expires,count):
        """
        Poll on behalf of reader tid; the kernel installs this as the
        task's 'until' condition after the task yields rx().

        Returns True when the task may resume: up to count queued
        messages were moved into buf, or kernel.eof was appended
        because the bus is down, or kernel.eagain was appended because
        expires (an absolute time.time() value, or None for no
        timeout) has passed.  Returns False to keep the task waiting.
        """
        queue = self.readq[tid]
        if queue:
            # extend in place: the waiting task holds a reference to buf
            buf.extend(queue[:count])
            self.readq[tid] = queue[count:]
            return True
        if self.state == 'down':
            buf.append(kernel.eof)
            return True
        if (expires is not None) and time.time() > expires:
            buf.append(kernel.eagain)
            return True
        return False

    def rx(self,buf,timeout=None,count=999999):
        """
        Build the signal tuple a task yields in order to receive up to
        count messages into buf, optionally giving up after timeout
        seconds.

        >>> def cgen(inpin,count=99999):
        ...     while True:
        ...         mlist = []
        ...         yield inpin.rx(mlist,count=count)
        ...         print count, mlist
        ...
        >>> bus1 = Bus()
        >>> a = kernel.spawn(cgen(inpin=bus1))
        >>> b = kernel.spawn(cgen(inpin=bus1,count=1))
        >>> kernel.run(steps=1000)
        >>> bus1.tx(1)
        2
        >>> bus1.tx(2)
        2
        >>> bus1.tx(3)
        2
        >>> kernel.run(steps=1000)
        99999 [1, 2, 3]
        1 [1]
        1 [2]
        1 [3]

        """
        expires = None
        if timeout is not None:
            expires = time.time() + timeout
        # kernel will call subscribe()
        return kernel.sigrx, self, buf, expires, count
165
class Kernel:
    """

    Contains a scheduler, shared objects, messaging, logging, and
    related bits.

    This is a "weightless threads" scheduler, using Python generator
    objects as if they were threads -- we will call them 'tasks' here
    to avoid confusion.  Google for 'python weightless threads
    generators' for more information, and for more background google
    for 'continuations' and for 'coroutines'.  You can safely ignore
    mentions of stackless python microthreads -- they are
    something else entirely and require a special version of
    python.

    At http://www.chiark.greenend.org.uk/~sgtatham/coroutines.html,
    Simon Tatham shows a really good example of why protocol stacks
    are a pain to write when you don't have something like threads,
    coroutines, continuations, or at least generators available.

    Here we aren't quite emulating full-fledged coroutines or even
    continuations, but instead sticking with simple message-passing
    tasks because it's a familiar concept which translates directly
    from UNIX processes and SysV IPC, making the control flow easier
    to understand and debug.

    Generator-based tasks are also more portable than using native
    Python threads, since Python threads aren't available on all
    versions of UNIX yet.  Of course this is less portable to other
    languages; if you're porting ISconf to Perl, for instance, you
    might use POE.  If you're porting to C, then you're probably stuck
    with native threads, or simulating coroutines with setjmp/longjmp,
    or worse yet, something like Simon's coroutine macro hacks
    described at the above URL.

    When writing this version of ISconf, I was tempted to use
    asyncore, which is the basis of both Zope and Twisted, and also
    tried hand-crafting my own event queue and select() loop, but kept
    running into the same issues Simon describes so succinctly.  I
    kept throwing away code until I finally gave in to the Force and
    started using generators instead -- *muuuch* better.

    """

    # Signals a task may yield (as the first element of a tuple) to
    # talk to the scheduler; see step() for how each is handled.
    # XXX yield should return object rather than string
    sigrx='rx'
    sigbusy='busy'
    signice='nice'
    sigret='ret'
    sigsleep='sleep'
    sigspawn='spawn'
    siguntil='until'
    # Sentinel results: no value available yet / bus closed.
    eagain = 'EAGAIN'
    eof = 'EOF'

    def __init__(self):
        # live tasks, indexed by task id
        self._tasks = {}
        self._nextid = 1
        # ticks per second; raised on activity, decayed by run()
        self.HZ = 1000
        self._shutdown = False

    def isdone(self,tid):
        """Return True if no task with id tid is registered."""
        return not self.isrunning(tid)

    def isrunning(self,tid):
        """Return the Task registered under tid, or False if none."""
        return self._tasks.get(tid,False)

    def kill(self,tid):
        """Remove task tid from the scheduler; unknown ids are ignored."""
        debug("killing", tid)
        self._tasks.pop(tid,None)

    def abort(self,task,e):
        """
        Report the exception currently being handled for task.

        The report is the traceback formatted from sys.exc_info(); e
        is accepted from the caller but not used.  If the task has an
        errpin the text is sent there and the task is killed.
        Otherwise it is logged with error(), written to stderr, and
        Restart is raised -- in that case the task is not removed.
        """
        tid = task.tid
        exc_type, exc_val, tb = sys.exc_info()[:3]
        out = ''.join(traceback.format_exception(exc_type, exc_val, tb))
        out = out.strip() + "\n"
        if task.errpin:
            task.errpin.tx(out)
        else:
            msg = "kernel: %s" % out
            error(msg)
            sys.stderr.write(msg + "\n")
            # XXX only restart task instead
            raise Restart
        self.kill(tid)

    def killall(self):
        """Kill every registered task."""
        for tid in list(self._tasks.keys()):
            self.kill(tid)
        self._tasks = {}

    def shutdown(self):
        """Ask run() to exit the process at the top of its next tick."""
        debug("shutting down")
        self._shutdown = True

    def ps(self):
        """Return the task table (dict of tid -> Task)."""
        # XXX the original intent appears to be a printable listing,
        # one str(task) per line -- TODO confirm before changing
        return self._tasks

    def wait(self,genobj):
        """Spawn a task and wait for it to finish.  For example, if
        you do:

            yield kernel.wait(sometask())

        ...the yield will not return until sometask() completes.

        """
        return self.siguntil, self.spawn(genobj).isdone

    # XXX add respawn flag, only raise Restart if not set
    def spawn(self,genobj,itermode=False,name=None):
        """
        Let the kernel manage an ordinary generator object by wrapping
        it in a Task -- extremely powerful, because this means a yield
        in an ordinary generator will allow unrelated tasks to run.
        It also means ordinary generators can yield sig* values to
        talk to the kernel e.g. sigsleep.

        If itermode=False, then run freely, returning values only to the
        kernel, which is going to interpret them as control signals or
        throw them away if unrecognized.

        If itermode=True, then run in single-step mode and return each
        value to the caller like a normal generator.  The only unusual
        thing the caller needs to know is that if there is no result
        ready, then you will get a kernel.eagain result instead.

        >>> def mygen():
        ...     i = 0
        ...     while True:
        ...         yield i
        ...         if i == 3: yield kernel.sigsleep,1
        ...         i += 1
        ...
        >>> obj = kernel.spawn(mygen(),itermode=True)
        >>> kernel.run(steps=10)
        >>> obj.next()
        0
        >>> kernel.run(steps=10)
        >>> obj.next()
        1
        >>> kernel.run(steps=10)
        >>> obj.next()
        2
        >>> kernel.run(steps=10)
        >>> obj.next()
        3
        >>> kernel.run(steps=10)
        >>> obj.next()
        'EAGAIN'
        >>> kernel.run(steps=10)
        >>> obj.next()
        'EAGAIN'
        >>> while obj.next() != 4:
        ...         kernel.run(steps=10)
        ...
        >>> kernel.run(steps=100)
        >>> obj.next()
        5
        >>> kernel.run(steps=100)
        >>> obj.next()
        6
        >>> obj.itermode=False
        >>> kernel.run(steps=100)
        >>> assert obj.next() > 10

        """

        task = Task(genobj,tid=self._nextid,name=name)
        tid = task.tid
        assert tid == self._nextid
        self._nextid += 1
        self._tasks[tid] = task
        task.itermode = itermode
        # immediately advance to the first yield; allows message bus
        # readers to subscribe right away; assumes bus.rx() is their
        # first yield
        self.step(task)
        return task

    def run(self, initobj=None, steps=None):
        """
        runs for {steps} or until init task is done

        initobj is spawned as the init task unless task 1 already
        exists.  Calls sys.exit(0) once shutdown() has been requested.

        """
        assert initobj or steps
        if initobj and not self.isrunning(1):
            self.spawn(initobj)
        ticks = 0
        while True:
            if self._shutdown:
                sys.exit(0)
            if initobj and not self.isrunning(1):
                break
            if steps and steps <= ticks:
                break
            ticks += 1
            # NOTE(review): under Python 2 this is integer division
            # while HZ is an int, i.e. sleep(0) for any int HZ > 1 --
            # confirm whether 1.0/self.HZ was intended
            time.sleep(1/self.HZ)
            if not steps:
                # decay the tick rate while idle; step() and
                # Task.next() raise it again on activity
                self.HZ *= .99
            self.HZ = max(self.HZ,1)
            self.HZ = min(self.HZ,999999)
            for tid in list(self._tasks.keys()):
                task = self._tasks.get(tid)
                if task is None:
                    # killed by a task that ran earlier in this tick
                    continue
                task.priority = min(task.priority, 10)
                # wait for N ticks if delay is set
                if task.delay > 1:
                    task.delay -= 1
                    continue
                task.delay += task.priority
                if task.sleep and task.sleepDone > time.time():
                    # slow down so we don't beat up time()
                    # XXX we can do a better job here -- use HZ and
                    # sleepDone to calculate delay more accurately
                    task.priority += 1
                    continue
                task.sleep = None
                # wait until condition is met
                if task.until:
                    done = False
                    try:
                        if isinstance(task.untilArgs,(list,tuple)):
                            done = task.until(*task.untilArgs)
                        elif task.untilArgs:
                            done = task.until(task.untilArgs)
                        else:
                            done = task.until()
                    except Exception:
                        # XXX add traceback
                        self.abort(task,sys.exc_info()[1])
                        continue
                    if not done:
                        # slow down so we don't beat up until()
                        task.priority += 1
                        continue
                task.until = None
                if task.itermode and task.resultReady:
                    # we're waiting for Task.next() to pick up our
                    # previous result
                    task.priority += 1
                    continue
                self.step(task)

    def step(self,task):
        """
        Advance task to its next yield and act on what it yielded.

        A yielded tuple whose first element is one of the sig* values
        is handled as a request to the kernel; anything else is stored
        as the task's result for itermode callers.  A finished
        generator is killed; any other exception goes to abort().
        """
        obj = task.obj
        tid = task.tid
        # debug("stepping task", tid)
        try:
            argv = obj.next()
        except StopIteration:
            self.kill(tid)
            return
        except ValueError:
            e = sys.exc_info()[1]
            # compare the message text: an exception instance never
            # equals a string
            if str(e) == 'generator already executing':
                # kernel.run() is nested -- that's okay to do
                return
            raise
        except Exception:
            # XXX add traceback
            self.abort(task,sys.exc_info()[1])
            return
        # figure out why task yielded and what it wants
        targv = argv
        if not isinstance(targv,tuple):
            targv = (targv,)
        # an empty tuple carries no signal; treat it as a plain value
        why = None
        if targv:
            why = targv[0]
        if why != self.sigsleep and why is not None:
            # the task did something other than idle: speed up
            self.HZ *= 10
        sigargs = None
        if len(targv) > 1:
            sigargs = targv[1:]
        # XXX these should all be 'is' rather than '=='
        if why == self.sigbusy:
            task.nice -= 1
            if task.nice < 0:
                task.nice = 0
        elif why == self.signice:
            task.nice = sigargs[0]
        elif why == self.sigsleep:
            task.sleep = sigargs[0]
            task.sleepDone = time.time() + task.sleep
        elif why == self.sigspawn:
            self.spawn(sigargs[0])
        elif why == self.siguntil:
            task.until = sigargs[0]
            if len(sigargs) > 1:
                task.untilArgs = sigargs[1:]
            else:
                task.untilArgs = None
        elif why == self.sigrx:
            bus = sigargs[0]
            buf = sigargs[1]
            args = sigargs[2:]
            task.until = bus.ready
            task.untilArgs = [tid,buf] + list(args)
            bus.subscribe(tid)
        elif why == self.sigret:
            # NOTE(review): Task.__init__ never sets 'context' --
            # confirm who is expected to provide it
            task.context.ret()
        else:
            # we got an ordinary value back -- save it for itermode
            task.result = argv
            task.resultReady = True
        task.priority = (task.priority + task.nice) / 2
488
class Task:
    """
    A "weightless thread": the kernel's bookkeeping record for one
    generator object.

    XXX stop passing in parent -- we can tell who the parent task
    is, even if a few stack frames away, e.g.:

    s = inspect.stack()
    s[1][0]
    <frame object at 0x82ba4a4>
    genobj.gi_frame
    <frame object at 0x82ba4a4>

    """

    def __init__(self,genobj,tid=None,parent=None,name=None):
        """Wrap genobj; niceness and parent id come from parent if given."""
        if parent:
            inherited = parent.nice
            parent_tid = parent.tid
        else:
            inherited = 0
            parent_tid = None
        self.obj = genobj
        self.ptid = parent_tid
        self.delay = inherited
        self.errpin = None
        self.name = name
        self.nice = inherited
        self.priority = inherited
        # starts out "ready" with EAGAIN as the pending result
        self.result = kernel.eagain
        self.resultReady = True
        self.sleep = 0
        self.sleepDone = 0
        self.itermode = False
        self.tid = tid
        self.time = time.time()
        self.until = None
        self.untilArgs = None

    def __repr__(self):
        """Show the task as its attribute dictionary."""
        return str(self.__dict__)

    # syntactic sugar to let us iterate on the task object as
    # if it were the generator object, while still allowing the
    # kernel to do the iteration and capture the results
    def XXX__iter__(self):
        # disabled variant (XXX prefix), kept as found
        self.wrapper=Wrapper(self).wrapper()
        return self.wrapper

    def XXXnext(self):
        # disabled variant (XXX prefix), kept as found
        if not hasattr(self,'wrapper'):
            self.wrapper=Wrapper(self).wrapper()
        return self.wrapper.next()

    def __iter__(self):
        """Return self; only allowed for tasks spawned with itermode."""
        if self.itermode:
            return self
        raise Exception("set itermode=True if you want to iterate on this task")

    def next(self):
        """
        Return the result the kernel last captured for this task, or
        kernel.eagain if none is pending.  Raises StopIteration once
        the task is no longer running.
        """
        if not kernel.isrunning(self.tid):
            raise StopIteration
        if self.resultReady:
            value = self.result
            debug("task.resultReady",self.resultReady)
            # hand the slot back so the kernel will step the task again
            self.resultReady = False
            kernel.HZ *= 10
            return value
        return kernel.eagain

    def isdone(self):
        """Return True once the kernel no longer lists this task."""
        return bool(kernel.isdone(self.tid))
558
559
# Module-level Kernel instance; Bus and Task above refer to it through
# the global name 'kernel'.
kernel = Kernel()
Note: See TracBrowser for help on using the browser.