diff options
Diffstat (limited to 'lib/pygame/threads/__init__.py')
-rwxr-xr-x | lib/pygame/threads/__init__.py | 310 |
1 files changed, 0 insertions, 310 deletions
diff --git a/lib/pygame/threads/__init__.py b/lib/pygame/threads/__init__.py deleted file mode 100755 index 4931865..0000000 --- a/lib/pygame/threads/__init__.py +++ /dev/null @@ -1,310 +0,0 @@ -""" -* Experimental * - -Like the map function, but can use a pool of threads. - -Really easy to use threads. eg. tmap(f, alist) - -If you know how to use the map function, you can use threads. -""" - -__author__ = "Rene Dudfield" -__version__ = "0.3.0" -__license__ = 'Python license' - -import traceback, sys - -from pygame.compat import geterror - -if sys.version_info[0] == 3: - from multiprocessing import JoinableQueue as Queue - from queue import Empty -elif (sys.version_info[0] == 2 and sys.version_info[1] < 5): - from Py25Queue import Queue - from Py25Queue import Empty -else: - # use up to date version - from Queue import Queue - from Queue import Empty - -import threading -Thread = threading.Thread - -STOP = object() -FINISH = object() - -# DONE_ONE = object() -# DONE_TWO = object() - -# a default worker queue. -_wq = None - -# if we are using threads or not. This is the number of workers. -_use_workers = 0 - -# Set this to the maximum for the amount of Cores/CPUs -# Note, that the tests early out. -# So it should only test the best number of workers +2 -MAX_WORKERS_TO_TEST = 64 - - - -def init(number_of_workers = 0): - """ Does a little test to see if threading is worth it. - Sets up a global worker queue if it's worth it. - - Calling init() is not required, but is generally better to do. - """ - global _wq, _use_workers - - if number_of_workers: - _use_workers = number_of_workers - else: - _use_workers = benchmark_workers() - - # if it is best to use zero workers, then use that. - _wq = WorkerQueue(_use_workers) - - - - -def quit(): - """ cleans up everything. - """ - global _wq, _use_workers - _wq.stop() - _wq = None - _use_workers = False - - -def benchmark_workers(a_bench_func = None, the_data = None): - """ does a little test to see if workers are at all faster. - Returns the number of workers which works best. - Takes a little bit of time to run, so you should only really call - it once. - You can pass in benchmark data, and functions if you want. - a_bench_func - f(data) - the_data - data to work on. - """ - global _use_workers - - #TODO: try and make this scale better with slower/faster cpus. - # first find some variables so that using 0 workers takes about 1.0 seconds. - # then go from there. - - - # note, this will only work with pygame 1.8rc3+ - # replace the doit() and the_data with something that releases the GIL - - - import pygame - import pygame.transform - import time - - if not a_bench_func: - def doit(x): - return pygame.transform.scale(x, (544, 576)) - else: - doit = a_bench_func - - if not the_data: - thedata = [] - for x in range(10): - thedata.append(pygame.Surface((155,155), 0, 32)) - else: - thedata = the_data - - best = time.time() + 100000000 - best_number = 0 - last_best = -1 - - for num_workers in range(0, MAX_WORKERS_TO_TEST): - - wq = WorkerQueue(num_workers) - t1 = time.time() - for xx in range(20): - print ("active count:%s" % threading.activeCount()) - results = tmap(doit, thedata, worker_queue = wq) - t2 = time.time() - - wq.stop() - - - total_time = t2 - t1 - print ("total time num_workers:%s: time:%s:" % (num_workers, total_time)) - - if total_time < best: - last_best = best_number - best_number =num_workers - best = total_time - - if num_workers - best_number > 1: - # We tried to add more, but it didn't like it. - # so we stop with testing at this number. - break - - - return best_number - - - - -class WorkerQueue(object): - - def __init__(self, num_workers = 20): - self.queue = Queue() - self.pool = [] - self._setup_workers(num_workers) - - def _setup_workers(self, num_workers): - """ Sets up the worker threads - NOTE: undefined behaviour if you call this again. - """ - self.pool = [] - - for _ in range(num_workers): - self.pool.append(Thread(target=self.threadloop)) - - for a_thread in self.pool: - a_thread.setDaemon(True) - a_thread.start() - - - def do(self, f, *args, **kwArgs): - """ puts a function on a queue for running later. - """ - self.queue.put((f, args, kwArgs)) - - - def stop(self): - """ Stops the WorkerQueue, waits for all of the threads to finish up. - """ - self.queue.put(STOP) - for thread in self.pool: - thread.join() - - - def threadloop(self): #, finish = False): - """ Loops until all of the tasks are finished. - """ - while True: - args = self.queue.get() - if args is STOP: - self.queue.put(STOP) - self.queue.task_done() - break - else: - try: - args[0](*args[1], **args[2]) - finally: - # clean up the queue, raise the exception. - self.queue.task_done() - #raise - - - def wait(self): - """ waits until all tasks are complete. - """ - self.queue.join() - -class FuncResult: - """ Used for wrapping up a function call so that the results are stored - inside the instances result attribute. - """ - def __init__(self, f, callback = None, errback = None): - """ f - is the function we that we call - callback(result) - this is called when the function(f) returns - errback(exception) - this is called when the function(f) raises - an exception. - """ - self.f = f - self.exception = None - self.callback = callback - self.errback = errback - - def __call__(self, *args, **kwargs): - - #we try to call the function here. If it fails we store the exception. - try: - self.result = self.f(*args, **kwargs) - if self.callback: - self.callback(self.result) - except Exception: - self.exception = geterror() - if self.errback: - self.errback(self.exception) - - -def tmap(f, seq_args, num_workers = 20, worker_queue = None, wait = True, stop_on_error = True): - """ like map, but uses a thread pool to execute. - num_workers - the number of worker threads that will be used. If pool - is passed in, then the num_workers arg is ignored. - worker_queue - you can optionally pass in an existing WorkerQueue. - wait - True means that the results are returned when everything is finished. - False means that we return the [worker_queue, results] right away instead. - results, is returned as a list of FuncResult instances. - stop_on_error - - """ - - if worker_queue: - wq = worker_queue - else: - # see if we have a global queue to work with. - if _wq: - wq = _wq - else: - if num_workers == 0: - return map(f, seq_args) - - wq = WorkerQueue(num_workers) - - # we short cut it here if the number of workers is 0. - # normal map should be faster in this case. - if len(wq.pool) == 0: - return map(f, seq_args) - - #print ("queue size:%s" % wq.queue.qsize()) - - - #TODO: divide the data (seq_args) into even chunks and - # then pass each thread a map(f, equal_part(seq_args)) - # That way there should be less locking, and overhead. - - - - results = [] - for sa in seq_args: - results.append(FuncResult(f)) - wq.do(results[-1], sa) - - - #wq.stop() - - if wait: - #print ("wait") - wq.wait() - #print ("after wait") - #print ("queue size:%s" % wq.queue.qsize()) - if wq.queue.qsize(): - raise Exception("buggy threadmap") - # if we created a worker queue, we need to stop it. - if not worker_queue and not _wq: - #print ("stoping") - wq.stop() - if wq.queue.qsize(): - um = wq.queue.get() - if not um is STOP: - raise Exception("buggy threadmap") - - - # see if there were any errors. If so raise the first one. This matches map behaviour. - # TODO: the traceback doesn't show up nicely. - # NOTE: TODO: we might want to return the results anyway? This should be an option. - if stop_on_error: - error_ones = filter(lambda x:x.exception, results) - if error_ones: - raise error_ones[0].exception - - return map(lambda x:x.result, results) - else: - return [wq, results] |