path: root/lib/pygame/threads/__init__.py
Diffstat (limited to 'lib/pygame/threads/__init__.py')
-rwxr-xr-x  lib/pygame/threads/__init__.py  310
1 files changed, 0 insertions, 310 deletions
diff --git a/lib/pygame/threads/__init__.py b/lib/pygame/threads/__init__.py
deleted file mode 100755
index 4931865..0000000
--- a/lib/pygame/threads/__init__.py
+++ /dev/null
@@ -1,310 +0,0 @@
-"""
-* Experimental *
-
-Like the map function, but can use a pool of threads.
-
-Makes threads really easy to use, e.g. tmap(f, alist)
-
-If you know how to use the map function, you can use threads.
-"""
-
-__author__ = "Rene Dudfield"
-__version__ = "0.3.0"
-__license__ = 'Python license'
-
-import traceback, sys
-
-from pygame.compat import geterror
-
-if sys.version_info[0] == 3:
-    from multiprocessing import JoinableQueue as Queue
-    from queue import Empty
-elif (sys.version_info[0] == 2 and sys.version_info[1] < 5):
-    from Py25Queue import Queue
-    from Py25Queue import Empty
-else:
-    # use up to date version
-    from Queue import Queue
-    from Queue import Empty
-
-import threading
-Thread = threading.Thread
-
-STOP = object()
-FINISH = object()
-
-# DONE_ONE = object()
-# DONE_TWO = object()
-
-# a default worker queue.
-_wq = None
-
-# the number of workers to use; 0 means worker threads are not used.
-_use_workers = 0
-
-# Set this to the maximum number of Cores/CPUs to consider.
-# Note that the benchmark exits early,
-# so it should only test up to the best number of workers + 2.
-MAX_WORKERS_TO_TEST = 64
-
-
-
-def init(number_of_workers = 0):
- """ Does a little test to see if threading is worth it.
- Sets up a global worker queue if it's worth it.
-
- Calling init() is not required, but is generally better to do.
- """
- global _wq, _use_workers
-
- if number_of_workers:
- _use_workers = number_of_workers
- else:
- _use_workers = benchmark_workers()
-
- # if it is best to use zero workers, then use that.
- _wq = WorkerQueue(_use_workers)
-
-
-
-
-def quit():
- """ cleans up everything.
- """
- global _wq, _use_workers
- _wq.stop()
- _wq = None
- _use_workers = False
-
-
-def benchmark_workers(a_bench_func = None, the_data = None):
- """ does a little test to see if workers are at all faster.
- Returns the number of workers which works best.
- Takes a little bit of time to run, so you should only really call
- it once.
- You can pass in benchmark data, and functions if you want.
- a_bench_func - f(data)
- the_data - data to work on.
- """
-    global _use_workers
-
-    #TODO: try and make this scale better with slower/faster cpus.
-    #      first find some variables so that using 0 workers takes about 1.0 seconds.
-    #      then go from there.
-
-
-    # note, this will only work with pygame 1.8rc3+
-    # replace the doit() and the_data with something that releases the GIL
-
-
-    import pygame
-    import pygame.transform
-    import time
-
-    if not a_bench_func:
-        def doit(x):
-            return pygame.transform.scale(x, (544, 576))
-    else:
-        doit = a_bench_func
-
-    if not the_data:
-        thedata = []
-        for x in range(10):
-            thedata.append(pygame.Surface((155,155), 0, 32))
-    else:
-        thedata = the_data
-
-    best = time.time() + 100000000
-    best_number = 0
-    last_best = -1
-
-    for num_workers in range(0, MAX_WORKERS_TO_TEST):
-
-        wq = WorkerQueue(num_workers)
-        t1 = time.time()
-        for xx in range(20):
-            print ("active count:%s" % threading.activeCount())
-            results = tmap(doit, thedata, worker_queue = wq)
-        t2 = time.time()
-
-        wq.stop()
-
-
-        total_time = t2 - t1
-        print ("total time num_workers:%s: time:%s:" % (num_workers, total_time))
-
-        if total_time < best:
-            last_best = best_number
-            best_number = num_workers
-            best = total_time
-
-        if num_workers - best_number > 1:
-            # adding more workers did not help,
-            # so stop testing at this number.
-            break
-
-
-    return best_number
-
-
-
-
-class WorkerQueue(object):
-
-    def __init__(self, num_workers = 20):
-        self.queue = Queue()
-        self.pool = []
-        self._setup_workers(num_workers)
-
-    def _setup_workers(self, num_workers):
-        """ Sets up the worker threads.
-            NOTE: undefined behaviour if you call this again.
-        """
-        self.pool = []
-
-        for _ in range(num_workers):
-            self.pool.append(Thread(target=self.threadloop))
-
-        for a_thread in self.pool:
-            a_thread.setDaemon(True)
-            a_thread.start()
-
-
-    def do(self, f, *args, **kwArgs):
-        """ puts a function on a queue for running later.
-        """
-        self.queue.put((f, args, kwArgs))
-
-
-    def stop(self):
-        """ Stops the WorkerQueue, waits for all of the threads to finish up.
-        """
-        self.queue.put(STOP)
-        for thread in self.pool:
-            thread.join()
-
-
-    def threadloop(self): #, finish = False):
-        """ Runs queued jobs until the STOP sentinel is seen.
-        """
-        while True:
-            args = self.queue.get()
-            if args is STOP:
-                self.queue.put(STOP)
-                self.queue.task_done()
-                break
-            else:
-                try:
-                    args[0](*args[1], **args[2])
-                finally:
-                    # mark the task done even if the call raised; the exception propagates.
-                    self.queue.task_done()
-                    #raise
-
-
-    def wait(self):
-        """ waits until all tasks are complete.
-        """
-        self.queue.join()
-
-class FuncResult:
- """ Used for wrapping up a function call so that the results are stored
- inside the instances result attribute.
- """
- def __init__(self, f, callback = None, errback = None):
- """ f - is the function we that we call
- callback(result) - this is called when the function(f) returns
- errback(exception) - this is called when the function(f) raises
- an exception.
- """
- self.f = f
- self.exception = None
- self.callback = callback
- self.errback = errback
-
- def __call__(self, *args, **kwargs):
-
- #we try to call the function here. If it fails we store the exception.
- try:
- self.result = self.f(*args, **kwargs)
- if self.callback:
- self.callback(self.result)
- except Exception:
- self.exception = geterror()
- if self.errback:
- self.errback(self.exception)
-
-
-def tmap(f, seq_args, num_workers = 20, worker_queue = None, wait = True, stop_on_error = True):
- """ like map, but uses a thread pool to execute.
- num_workers - the number of worker threads that will be used. If pool
- is passed in, then the num_workers arg is ignored.
- worker_queue - you can optionally pass in an existing WorkerQueue.
- wait - True means that the results are returned when everything is finished.
- False means that we return the [worker_queue, results] right away instead.
- results, is returned as a list of FuncResult instances.
- stop_on_error -
- """
-
- if worker_queue:
- wq = worker_queue
- else:
- # see if we have a global queue to work with.
- if _wq:
- wq = _wq
- else:
- if num_workers == 0:
- return map(f, seq_args)
-
- wq = WorkerQueue(num_workers)
-
- # we short cut it here if the number of workers is 0.
- # normal map should be faster in this case.
- if len(wq.pool) == 0:
- return map(f, seq_args)
-
- #print ("queue size:%s" % wq.queue.qsize())
-
-
- #TODO: divide the data (seq_args) into even chunks and
- # then pass each thread a map(f, equal_part(seq_args))
- # That way there should be less locking, and overhead.
-
-
-
- results = []
- for sa in seq_args:
- results.append(FuncResult(f))
- wq.do(results[-1], sa)
-
-
- #wq.stop()
-
- if wait:
- #print ("wait")
- wq.wait()
- #print ("after wait")
- #print ("queue size:%s" % wq.queue.qsize())
- if wq.queue.qsize():
- raise Exception("buggy threadmap")
- # if we created a worker queue, we need to stop it.
- if not worker_queue and not _wq:
- #print ("stoping")
- wq.stop()
- if wq.queue.qsize():
- um = wq.queue.get()
- if not um is STOP:
- raise Exception("buggy threadmap")
-
-
- # see if there were any errors. If so raise the first one. This matches map behaviour.
- # TODO: the traceback doesn't show up nicely.
- # NOTE: TODO: we might want to return the results anyway? This should be an option.
- if stop_on_error:
- error_ones = filter(lambda x:x.exception, results)
- if error_ones:
- raise error_ones[0].exception
-
- return map(lambda x:x.result, results)
- else:
- return [wq, results]
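
For context, here is a minimal usage sketch of the module removed by this commit, reconstructed only from the docstrings and code in the diff above; the helper name scale_up, the surface sizes, and the worker counts are illustrative and not part of the original file.

    # Sketch: assumes pygame (1.8rc3+) and the pygame.threads module shown above are importable.
    import pygame
    import pygame.transform
    import pygame.threads as threads

    def scale_up(surf):
        # any CPU-bound callable works; pygame.transform releases the GIL in 1.8rc3+
        return pygame.transform.scale(surf, (544, 576))

    surfaces = [pygame.Surface((155, 155), 0, 32) for _ in range(10)]

    # 1) simplest form: init() sets up a global worker queue, tmap() then uses it.
    threads.init(4)                      # or threads.init() to let benchmark_workers() decide
    scaled = list(threads.tmap(scale_up, surfaces))
    threads.quit()

    # 2) explicit WorkerQueue with wait=False: results come back as FuncResult objects.
    wq = threads.WorkerQueue(num_workers=4)
    _, results = threads.tmap(scale_up, surfaces, worker_queue=wq, wait=False)
    wq.wait()                             # block until every queued call has run
    scaled = [r.result for r in results]  # FuncResult.result holds each return value
    wq.stop()

tmap(f, alist) mirrors map(f, alist) but runs f on the pool's worker threads; with wait=False it returns [worker_queue, results] immediately so the caller can wait and collect the results later.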