diff options
Diffstat (limited to 'src/pygame/threads')
-rw-r--r-- | src/pygame/threads/.gitignore | 1 | ||||
-rw-r--r-- | src/pygame/threads/Py25Queue.py | 216 | ||||
-rw-r--r-- | src/pygame/threads/__init__.py | 310 | ||||
-rw-r--r-- | src/pygame/threads/__init__.pyo | bin | 0 -> 6340 bytes |
4 files changed, 527 insertions, 0 deletions
diff --git a/src/pygame/threads/.gitignore b/src/pygame/threads/.gitignore new file mode 100644 index 0000000..a74b07a --- /dev/null +++ b/src/pygame/threads/.gitignore @@ -0,0 +1 @@ +/*.pyc diff --git a/src/pygame/threads/Py25Queue.py b/src/pygame/threads/Py25Queue.py new file mode 100644 index 0000000..603c1bd --- /dev/null +++ b/src/pygame/threads/Py25Queue.py @@ -0,0 +1,216 @@ +"""A multi-producer, multi-consumer queue.""" + +from time import time as _time + +from collections import deque + +__all__ = ['Empty', 'Full', 'Queue'] + +class Empty(Exception): + "Exception raised by Queue.get(block=0)/get_nowait()." + pass + +class Full(Exception): + "Exception raised by Queue.put(block=0)/put_nowait()." + pass + +class Queue: + """Create a queue object with a given maximum size. + + If maxsize is <= 0, the queue size is infinite. + """ + def __init__(self, maxsize=0): + try: + import threading + except ImportError: + import dummy_threading as threading + self._init(maxsize) + # mutex must be held whenever the queue is mutating. All methods + # that acquire mutex must release it before returning. mutex + # is shared between the three conditions, so acquiring and + # releasing the conditions also acquires and releases mutex. + self.mutex = threading.Lock() + # Notify not_empty whenever an item is added to the queue; a + # thread waiting to get is notified then. + self.not_empty = threading.Condition(self.mutex) + # Notify not_full whenever an item is removed from the queue; + # a thread waiting to put is notified then. + self.not_full = threading.Condition(self.mutex) + # Notify all_tasks_done whenever the number of unfinished tasks + # drops to zero; thread waiting to join() is notified to resume + self.all_tasks_done = threading.Condition(self.mutex) + self.unfinished_tasks = 0 + + def task_done(self): + """Indicate that a formerly enqueued task is complete. + + Used by Queue consumer threads. For each get() used to fetch a task, + a subsequent call to task_done() tells the queue that the processing + on the task is complete. + + If a join() is currently blocking, it will resume when all items + have been processed (meaning that a task_done() call was received + for every item that had been put() into the queue). + + Raises a ValueError if called more times than there were items + placed in the queue. + """ + self.all_tasks_done.acquire() + try: + unfinished = self.unfinished_tasks - 1 + if unfinished <= 0: + if unfinished < 0: + raise ValueError('task_done() called too many times') + self.all_tasks_done.notifyAll() + self.unfinished_tasks = unfinished + finally: + self.all_tasks_done.release() + + def join(self): + """Blocks until all items in the Queue have been gotten and processed. + + The count of unfinished tasks goes up whenever an item is added to the + queue. The count goes down whenever a consumer thread calls task_done() + to indicate the item was retrieved and all work on it is complete. + + When the count of unfinished tasks drops to zero, join() unblocks. + """ + self.all_tasks_done.acquire() + try: + while self.unfinished_tasks: + self.all_tasks_done.wait() + finally: + self.all_tasks_done.release() + + def qsize(self): + """Return the approximate size of the queue (not reliable!).""" + self.mutex.acquire() + n = self._qsize() + self.mutex.release() + return n + + def empty(self): + """Return True if the queue is empty, False otherwise (not reliable!).""" + self.mutex.acquire() + n = self._empty() + self.mutex.release() + return n + + def full(self): + """Return True if the queue is full, False otherwise (not reliable!).""" + self.mutex.acquire() + n = self._full() + self.mutex.release() + return n + + def put(self, item, block=True, timeout=None): + """Put an item into the queue. + + If optional args 'block' is true and 'timeout' is None (the default), + block if necessary until a free slot is available. If 'timeout' is + a positive number, it blocks at most 'timeout' seconds and raises + the Full exception if no free slot was available within that time. + Otherwise ('block' is false), put an item on the queue if a free slot + is immediately available, else raise the Full exception ('timeout' + is ignored in that case). + """ + self.not_full.acquire() + try: + if not block: + if self._full(): + raise Full + elif timeout is None: + while self._full(): + self.not_full.wait() + else: + if timeout < 0: + raise ValueError("'timeout' must be a positive number") + endtime = _time() + timeout + while self._full(): + remaining = endtime - _time() + if remaining <= 0.0: + raise Full + self.not_full.wait(remaining) + self._put(item) + self.unfinished_tasks += 1 + self.not_empty.notify() + finally: + self.not_full.release() + + def put_nowait(self, item): + """Put an item into the queue without blocking. + + Only enqueue the item if a free slot is immediately available. + Otherwise raise the Full exception. + """ + return self.put(item, False) + + def get(self, block=True, timeout=None): + """Remove and return an item from the queue. + + If optional args 'block' is true and 'timeout' is None (the default), + block if necessary until an item is available. If 'timeout' is + a positive number, it blocks at most 'timeout' seconds and raises + the Empty exception if no item was available within that time. + Otherwise ('block' is false), return an item if one is immediately + available, else raise the Empty exception ('timeout' is ignored + in that case). + """ + self.not_empty.acquire() + try: + if not block: + if self._empty(): + raise Empty + elif timeout is None: + while self._empty(): + self.not_empty.wait() + else: + if timeout < 0: + raise ValueError("'timeout' must be a positive number") + endtime = _time() + timeout + while self._empty(): + remaining = endtime - _time() + if remaining <= 0.0: + raise Empty + self.not_empty.wait(remaining) + item = self._get() + self.not_full.notify() + return item + finally: + self.not_empty.release() + + def get_nowait(self): + """Remove and return an item from the queue without blocking. + + Only get an item if one is immediately available. Otherwise + raise the Empty exception. + """ + return self.get(False) + + # Override these methods to implement other queue organizations + # (e.g. stack or priority queue). + # These will only be called with appropriate locks held + + # Initialize the queue representation + def _init(self, maxsize): + self.maxsize = maxsize + self.queue = deque() + + def _qsize(self): + return len(self.queue) + + # Check whether the queue is empty + def _empty(self): + return not self.queue + + # Check whether the queue is full + def _full(self): + return self.maxsize > 0 and len(self.queue) == self.maxsize + + # Put a new item in the queue + def _put(self, item): + self.queue.append(item) + + # Get an item from the queue + def _get(self): + return self.queue.popleft() diff --git a/src/pygame/threads/__init__.py b/src/pygame/threads/__init__.py new file mode 100644 index 0000000..4931865 --- /dev/null +++ b/src/pygame/threads/__init__.py @@ -0,0 +1,310 @@ +""" +* Experimental * + +Like the map function, but can use a pool of threads. + +Really easy to use threads. eg. tmap(f, alist) + +If you know how to use the map function, you can use threads. +""" + +__author__ = "Rene Dudfield" +__version__ = "0.3.0" +__license__ = 'Python license' + +import traceback, sys + +from pygame.compat import geterror + +if sys.version_info[0] == 3: + from multiprocessing import JoinableQueue as Queue + from queue import Empty +elif (sys.version_info[0] == 2 and sys.version_info[1] < 5): + from Py25Queue import Queue + from Py25Queue import Empty +else: + # use up to date version + from Queue import Queue + from Queue import Empty + +import threading +Thread = threading.Thread + +STOP = object() +FINISH = object() + +# DONE_ONE = object() +# DONE_TWO = object() + +# a default worker queue. +_wq = None + +# if we are using threads or not. This is the number of workers. +_use_workers = 0 + +# Set this to the maximum for the amount of Cores/CPUs +# Note, that the tests early out. +# So it should only test the best number of workers +2 +MAX_WORKERS_TO_TEST = 64 + + + +def init(number_of_workers = 0): + """ Does a little test to see if threading is worth it. + Sets up a global worker queue if it's worth it. + + Calling init() is not required, but is generally better to do. + """ + global _wq, _use_workers + + if number_of_workers: + _use_workers = number_of_workers + else: + _use_workers = benchmark_workers() + + # if it is best to use zero workers, then use that. + _wq = WorkerQueue(_use_workers) + + + + +def quit(): + """ cleans up everything. + """ + global _wq, _use_workers + _wq.stop() + _wq = None + _use_workers = False + + +def benchmark_workers(a_bench_func = None, the_data = None): + """ does a little test to see if workers are at all faster. + Returns the number of workers which works best. + Takes a little bit of time to run, so you should only really call + it once. + You can pass in benchmark data, and functions if you want. + a_bench_func - f(data) + the_data - data to work on. + """ + global _use_workers + + #TODO: try and make this scale better with slower/faster cpus. + # first find some variables so that using 0 workers takes about 1.0 seconds. + # then go from there. + + + # note, this will only work with pygame 1.8rc3+ + # replace the doit() and the_data with something that releases the GIL + + + import pygame + import pygame.transform + import time + + if not a_bench_func: + def doit(x): + return pygame.transform.scale(x, (544, 576)) + else: + doit = a_bench_func + + if not the_data: + thedata = [] + for x in range(10): + thedata.append(pygame.Surface((155,155), 0, 32)) + else: + thedata = the_data + + best = time.time() + 100000000 + best_number = 0 + last_best = -1 + + for num_workers in range(0, MAX_WORKERS_TO_TEST): + + wq = WorkerQueue(num_workers) + t1 = time.time() + for xx in range(20): + print ("active count:%s" % threading.activeCount()) + results = tmap(doit, thedata, worker_queue = wq) + t2 = time.time() + + wq.stop() + + + total_time = t2 - t1 + print ("total time num_workers:%s: time:%s:" % (num_workers, total_time)) + + if total_time < best: + last_best = best_number + best_number =num_workers + best = total_time + + if num_workers - best_number > 1: + # We tried to add more, but it didn't like it. + # so we stop with testing at this number. + break + + + return best_number + + + + +class WorkerQueue(object): + + def __init__(self, num_workers = 20): + self.queue = Queue() + self.pool = [] + self._setup_workers(num_workers) + + def _setup_workers(self, num_workers): + """ Sets up the worker threads + NOTE: undefined behaviour if you call this again. + """ + self.pool = [] + + for _ in range(num_workers): + self.pool.append(Thread(target=self.threadloop)) + + for a_thread in self.pool: + a_thread.setDaemon(True) + a_thread.start() + + + def do(self, f, *args, **kwArgs): + """ puts a function on a queue for running later. + """ + self.queue.put((f, args, kwArgs)) + + + def stop(self): + """ Stops the WorkerQueue, waits for all of the threads to finish up. + """ + self.queue.put(STOP) + for thread in self.pool: + thread.join() + + + def threadloop(self): #, finish = False): + """ Loops until all of the tasks are finished. + """ + while True: + args = self.queue.get() + if args is STOP: + self.queue.put(STOP) + self.queue.task_done() + break + else: + try: + args[0](*args[1], **args[2]) + finally: + # clean up the queue, raise the exception. + self.queue.task_done() + #raise + + + def wait(self): + """ waits until all tasks are complete. + """ + self.queue.join() + +class FuncResult: + """ Used for wrapping up a function call so that the results are stored + inside the instances result attribute. + """ + def __init__(self, f, callback = None, errback = None): + """ f - is the function we that we call + callback(result) - this is called when the function(f) returns + errback(exception) - this is called when the function(f) raises + an exception. + """ + self.f = f + self.exception = None + self.callback = callback + self.errback = errback + + def __call__(self, *args, **kwargs): + + #we try to call the function here. If it fails we store the exception. + try: + self.result = self.f(*args, **kwargs) + if self.callback: + self.callback(self.result) + except Exception: + self.exception = geterror() + if self.errback: + self.errback(self.exception) + + +def tmap(f, seq_args, num_workers = 20, worker_queue = None, wait = True, stop_on_error = True): + """ like map, but uses a thread pool to execute. + num_workers - the number of worker threads that will be used. If pool + is passed in, then the num_workers arg is ignored. + worker_queue - you can optionally pass in an existing WorkerQueue. + wait - True means that the results are returned when everything is finished. + False means that we return the [worker_queue, results] right away instead. + results, is returned as a list of FuncResult instances. + stop_on_error - + """ + + if worker_queue: + wq = worker_queue + else: + # see if we have a global queue to work with. + if _wq: + wq = _wq + else: + if num_workers == 0: + return map(f, seq_args) + + wq = WorkerQueue(num_workers) + + # we short cut it here if the number of workers is 0. + # normal map should be faster in this case. + if len(wq.pool) == 0: + return map(f, seq_args) + + #print ("queue size:%s" % wq.queue.qsize()) + + + #TODO: divide the data (seq_args) into even chunks and + # then pass each thread a map(f, equal_part(seq_args)) + # That way there should be less locking, and overhead. + + + + results = [] + for sa in seq_args: + results.append(FuncResult(f)) + wq.do(results[-1], sa) + + + #wq.stop() + + if wait: + #print ("wait") + wq.wait() + #print ("after wait") + #print ("queue size:%s" % wq.queue.qsize()) + if wq.queue.qsize(): + raise Exception("buggy threadmap") + # if we created a worker queue, we need to stop it. + if not worker_queue and not _wq: + #print ("stoping") + wq.stop() + if wq.queue.qsize(): + um = wq.queue.get() + if not um is STOP: + raise Exception("buggy threadmap") + + + # see if there were any errors. If so raise the first one. This matches map behaviour. + # TODO: the traceback doesn't show up nicely. + # NOTE: TODO: we might want to return the results anyway? This should be an option. + if stop_on_error: + error_ones = filter(lambda x:x.exception, results) + if error_ones: + raise error_ones[0].exception + + return map(lambda x:x.result, results) + else: + return [wq, results] diff --git a/src/pygame/threads/__init__.pyo b/src/pygame/threads/__init__.pyo Binary files differnew file mode 100644 index 0000000..6d335ba --- /dev/null +++ b/src/pygame/threads/__init__.pyo |