1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
|
from __future__ import generators
import gobject
import time
import traceback
class GIdleThread(object):
"""This is a pseudo-"thread" for use with the GTK+ main loop.
This class does act a bit like a thread, all code is executed in
the callers thread though. The provided function should be a generator
(or iterator).
It can be started with start(). While the "thread" is running is_alive()
can be called to see if it's alive. wait([timeout]) will wait till the
generator is finished, or timeout seconds.
If an exception is raised from within the generator, it is stored in
the error property. Execution of the generator is finished.
Note that this routine runs in the current thread, so there is no need
for nasty locking schemes.
Example (runs a counter through the GLib main loop routine):
>>> def counter(max): for x in xrange(max): yield x
>>> t = GIdleThread(counter(123))
>>> t.start()
>>> while gen.is_alive():
... main.iteration(False)
"""
def __init__(self, generator, queue=None):
assert hasattr(generator, 'next'), 'The generator should be an iterator'
self._generator = generator
self._queue = queue
self._idle_id = 0
self._error = None
def start(self, priority=gobject.PRIORITY_LOW):
"""Start the generator. Default priority is low, so screen updates
will be allowed to happen.
"""
idle_id = gobject.idle_add(self.__generator_executer,
priority=priority)
self._idle_id = idle_id
return idle_id
def wait(self, timeout=0):
"""Wait until the corouine is finished or return after timeout seconds.
This is achieved by running the GTK+ main loop.
"""
clock = time.clock
start_time = clock()
main = gobject.main_context_default()
while self.is_alive():
main.iteration(False)
if timeout and (clock() - start_time >= timeout):
return
def interrupt(self):
"""Force the generator to stop running.
"""
if self.is_alive():
gobject.source_remove(self._idle_id)
self._idle_id = 0
def is_alive(self):
"""Returns True if the generator is still running.
"""
return self._idle_id != 0
error = property(lambda self: self._error,
doc="Return a possible exception that had occured "\
"during execution of the generator")
def __generator_executer(self):
try:
result = self._generator.next()
if self._queue:
try:
self._queue.put(result)
except QueueFull:
self.wait(0.5)
# If this doesn't work...
self._queue.put(result)
return True
except StopIteration:
self._idle_id = 0
return False
except Exception, e:
self._error = e
traceback.print_exc()
self._idle_id = 0
return False
class QueueEmpty(Exception):
"""Exception raised whenever the queue is empty and someone tries to fetch
a value.
"""
pass
class QueueFull(Exception):
"""Exception raised when the queue is full and the oldest item may not be
disposed.
"""
pass
class Queue(object):
"""A FIFO queue. If the queue has a max size, the oldest item on the
queue is dropped if that size id exceeded.
"""
def __init__(self, size=0, dispose_oldest=True):
self._queue = []
self._size = size
self._dispose_oldest = dispose_oldest
def put(self, item):
"""Put item on the queue. If the queue size is limited ...
"""
if self._size > 0 and len(self._queue) >= self._size:
if self._dispose_oldest:
self.get()
else:
raise QueueFull
self._queue.insert(0, item)
def get(self):
"""Get the oldest item off the queue.
QueueEmpty is raised if no items are left on the queue.
"""
try:
return self._queue.pop()
except IndexError:
raise QueueEmpty
if __name__ == '__main__':
def counter(max):
for i in range(max):
yield i
def shower(queue):
# Never stop reading the queue:
while True:
try:
cnt = queue.get()
print 'cnt =', cnt
except QueueEmpty:
pass
yield None
print 'Test 1: (should print range 0..22)'
queue = Queue()
c = GIdleThread(counter(23), queue)
s = GIdleThread(shower(queue))
main = gobject.main_context_default()
c.start()
s.start()
s.wait(2)
print 'Test 2: (should only print 22)'
queue = Queue(size=1)
c = GIdleThread(counter(23), queue)
s = GIdleThread(shower(queue))
main = gobject.main_context_default()
c.start(priority=gobject.PRIORITY_DEFAULT)
s.start()
s.wait(3)
|