# Recovered from a cgit patch view (cgit v0.9.1):
#   commit  f891b3a18c952b88967f7408bb942ef3c0cd111f
#   author  Rodrigo Perez Fulloni, Mon, 17 Dec 2012 22:24:44 +0000
#   subject "Final version 1.0"
#   adds    src/pygame/threads/__init__.py (blob 4931865)
"""
* Experimental *

Like the map function, but can use a pool of threads.

Really easy to use threads.  eg.  tmap(f, alist)

If you know how to use the map function, you can use threads.
"""

__author__ = "Rene Dudfield"
__version__ = "0.3.0"
__license__ = 'Python license'

import sys
import traceback

try:
    from pygame.compat import geterror
except ImportError:
    # Keep the module importable when pygame.compat is not available.
    def geterror():
        """Return the exception instance currently being handled."""
        return sys.exc_info()[1]

if sys.version_info[0] >= 3:
    # The workers are threads sharing one address space, so the plain
    # thread-safe queue is required here.  A multiprocessing queue pickles
    # every item: workers would receive *copies* of the FuncResult objects
    # and of the STOP sentinel, so results would never reach the caller and
    # the identity test against STOP would never succeed.
    from queue import Queue
    from queue import Empty
elif (sys.version_info[0] == 2 and sys.version_info[1] < 5):
    from Py25Queue import Queue
    from Py25Queue import Empty
else:
    # use up to date version
    from Queue import Queue
    from Queue import Empty

import threading
Thread = threading.Thread

# Sentinel put on a WorkerQueue's queue to tell its worker threads to exit.
STOP = object()
# Not referenced anywhere in this module; kept because it is a public name.
FINISH = object()

# a default worker queue.
_wq = None

# if we are using threads or not.  This is the number of workers.
_use_workers = 0

# Set this to the maximum for the amount of Cores/CPUs
#    Note, that the tests early out.
#    So it should only test the best number of workers +2
MAX_WORKERS_TO_TEST = 64


def init(number_of_workers = 0):
    """ Sets up the global worker queue used by tmap().

    number_of_workers - size of the pool.  When it is 0 (the default),
        benchmark_workers() is run to pick a size; the benchmark may decide
        that zero workers is best, in which case tmap() falls back to a
        plain map.

    Calling init() is not required, but is generally better to do.
    Calling it again replaces the global queue and stops the previous one.
    """
    global _wq, _use_workers

    if number_of_workers:
        workers = number_of_workers
    else:
        workers = benchmark_workers()

    previous = _wq
    _use_workers = workers
    # if it is best to use zero workers, then use that.
    _wq = WorkerQueue(workers)

    # Stop the old pool only after the new one is installed, so a failing
    # benchmark above leaves the existing queue untouched.
    if previous is not None:
        previous.stop()


def quit():
    """ cleans up everything.

    Stops the global worker queue, if init() created one, and resets the
    module to its un-initialised state.  Safe to call without init().
    """
    global _wq, _use_workers
    if _wq is not None:
        _wq.stop()
    _wq = None
    _use_workers = 0


def benchmark_workers(a_bench_func = None, the_data = None):
    """ does a little test to see if workers are at all faster.

    Returns the number of workers which works best.
    Takes a little bit of time to run, so you should only really call
      it once.
    You can pass in benchmark data, and functions if you want.

    a_bench_func - f(data); defaults to scaling a pygame Surface.
    the_data - sequence of items to work on; defaults to ten Surfaces.

    Timing progress is printed to stdout.  pygame is imported here, inside
    the function, so it is only needed when the benchmark actually runs.
    """
    #TODO: try and make this scale better with slower/faster cpus.
    #  first find some variables so that using 0 workers takes about 1.0 seconds.
    #  then go from there.

    # note, this will only work with pygame 1.8rc3+
    # replace the doit() and the_data with something that releases the GIL

    import pygame
    import pygame.transform
    import time

    if not a_bench_func:
        def doit(x):
            return pygame.transform.scale(x, (544, 576))
    else:
        doit = a_bench_func

    if not the_data:
        thedata = []
        for x in range(10):
            thedata.append(pygame.Surface((155,155), 0, 32))
    else:
        thedata = the_data

    # Start with a time no real run can reach, so the first run always wins.
    best = time.time() + 100000000
    best_number = 0

    for num_workers in range(0, MAX_WORKERS_TO_TEST):

        wq = WorkerQueue(num_workers)
        try:
            t1 = time.time()
            for xx in range(20):
                print ("active count:%s" % threading.active_count())
                tmap(doit, thedata, worker_queue = wq)
            t2 = time.time()
        finally:
            # Always release the pool, even if the benchmark function raised.
            wq.stop()

        total_time = t2 - t1
        print ("total time num_workers:%s: time:%s:" % (num_workers, total_time))

        if total_time < best:
            best_number = num_workers
            best = total_time

        if num_workers - best_number > 1:
            # We tried to add more, but it didn't like it.
            #   so we stop with testing at this number.
            break

    return best_number


class WorkerQueue(object):
    """ A pool of daemon threads that run callables taken from a queue.

    queue - the Queue of pending (f, args, kwArgs) tuples.
    pool - the list of worker Thread objects.
    """

    def __init__(self, num_workers = 20):
        self.queue = Queue()
        self.pool = []
        self._setup_workers(num_workers)

    def _setup_workers(self, num_workers):
        """ Sets up the worker threads
              NOTE: undefined behaviour if you call this again.
        """
        self.pool = []

        for _ in range(num_workers):
            self.pool.append(Thread(target=self.threadloop))

        for a_thread in self.pool:
            # Daemon threads do not keep the interpreter alive at exit.
            a_thread.daemon = True
            a_thread.start()

    def do(self, f, *args, **kwArgs):
        """ puts a function on a queue for running later.
        """
        self.queue.put((f, args, kwArgs))

    def stop(self):
        """ Stops the WorkerQueue, waits for all of the threads to finish up.

        Tasks queued before the call are still run first.  One STOP
        sentinel is left on the queue afterwards (every exiting worker
        re-queues it for the next one), so the queue should not be waited
        on or reused after stop().
        """
        self.queue.put(STOP)
        for thread in self.pool:
            thread.join()

    def threadloop(self): #, finish = False):
        """ Worker body: runs queued tasks until the STOP sentinel arrives.
        """
        while True:
            args = self.queue.get()
            if args is STOP:
                # Hand the sentinel on so the remaining workers stop too.
                self.queue.put(STOP)
                self.queue.task_done()
                break

            try:
                args[0](*args[1], **args[2])
            except Exception:
                # Report the failure but keep this worker alive; otherwise
                #   one bad task would permanently shrink the pool.
                traceback.print_exc()
            finally:
                # Always mark the task done so wait() cannot hang on it.
                self.queue.task_done()

    def wait(self):
        """ waits until all tasks are complete.
        """
        self.queue.join()


class FuncResult(object):
    """ Used for wrapping up a function call so that the results are stored
         inside the instances result attribute.

    result - the return value of f, or None if f has not returned.
    exception - the exception raised by the call, or None.
    """
    def __init__(self, f, callback = None, errback = None):
        """ f - is the function we that we call
            callback(result) - this is called when the function(f) returns
            errback(exception) - this is called when the function(f) raises
                                   an exception.
        """
        self.f = f
        # Defined up front so it can always be read, even after a failure.
        self.result = None
        self.exception = None
        self.callback = callback
        self.errback = errback

    def __call__(self, *args, **kwargs):
        """ Calls f(*args, **kwargs) and records its result or exception.

        An exception raised by the callback is recorded the same way as one
        raised by f, because the callback runs inside the same try block.
        """
        #we try to call the function here.  If it fails we store the exception.
        try:
            self.result = self.f(*args, **kwargs)
            if self.callback:
                self.callback(self.result)
        except Exception:
            self.exception = geterror()
            if self.errback:
                self.errback(self.exception)


def tmap(f, seq_args, num_workers = 20, worker_queue = None, wait = True, stop_on_error = True):
    """ like map, but uses a thread pool to execute.

    num_workers - the number of worker threads that will be used.  If pool
                    is passed in, then the num_workers arg is ignored.
    worker_queue - you can optionally pass in an existing WorkerQueue.
    wait - True means that the results are returned when everything is finished.
           False means that we return the [worker_queue, results] right away instead.
           results, is returned as a list of FuncResult instances.
    stop_on_error - when True, the first recorded exception (in input
           order) is raised after all calls have finished.  When False,
           failed calls contribute None to the returned list.

    Returns a list of results in input order.  The queue used is, in order
    of preference: worker_queue, the global queue made by init(), or a
    temporary one created for this call.  With zero workers the work is
    done by a plain map in the calling thread and the list of results is
    returned directly, even when wait is False.  With wait False a
    temporary queue is not stopped here; the caller gets it back and has
    to stop it.
    """
    owns_queue = False

    if worker_queue:
        wq = worker_queue
    elif _wq:
        # see if we have a global queue to work with.
        wq = _wq
    else:
        if num_workers == 0:
            return list(map(f, seq_args))

        wq = WorkerQueue(num_workers)
        owns_queue = True

    # we short cut it here if the number of workers is 0.
    # normal map should be faster in this case.
    if len(wq.pool) == 0:
        return list(map(f, seq_args))

    #TODO: divide the data (seq_args) into even chunks and
    #       then pass each thread a map(f, equal_part(seq_args))
    #      That way there should be less locking, and overhead.

    results = []
    for sa in seq_args:
        results.append(FuncResult(f))
        wq.do(results[-1], sa)

    if not wait:
        return [wq, results]

    wq.wait()
    if wq.queue.qsize():
        raise Exception("buggy threadmap")

    # if we created a worker queue, we need to stop it.
    if owns_queue:
        wq.stop()
        # stop() leaves exactly one STOP sentinel behind; anything else
        #   means a task was left unprocessed.
        if wq.queue.qsize():
            um = wq.queue.get()
            if not um is STOP:
                raise Exception("buggy threadmap")

    # see if there were any errors.  If so raise the first one.  This matches map behaviour.
    # TODO: the traceback doesn't show up nicely.
    # NOTE: TODO: we might want to return the results anyway?  This should be an option.
    if stop_on_error:
        for a_result in results:
            if a_result.exception is not None:
                raise a_result.exception

    return [a_result.result for a_result in results]