diff options
Diffstat (limited to 'sugar_network/client/cache.py')
-rw-r--r-- | sugar_network/client/cache.py | 243 |
1 files changed, 125 insertions, 118 deletions
diff --git a/sugar_network/client/cache.py b/sugar_network/client/cache.py index 76c3dd0..09eb40a 100644 --- a/sugar_network/client/cache.py +++ b/sugar_network/client/cache.py @@ -16,135 +16,142 @@ import os import json import time -import shutil import logging -from os.path import exists, join, isdir +from os.path import exists -from sugar_network import toolkit, client -from sugar_network.toolkit.bundle import Bundle -from sugar_network.toolkit import pipe, enforce +from sugar_network import client +from sugar_network.toolkit import pylru, enforce -_logger = logging.getLogger('cache') - +_POOL_SIZE = 256 -def recycle(): - stat = os.statvfs(client.local_root.value) - total = stat.f_blocks * stat.f_frsize - free = stat.f_bfree * stat.f_frsize - to_free = client.cache_limit.value * total / 100 - free - ts = time.time() - - __, items = _list() - for mtime, neg_size, path in items: - if to_free > 0: - shutil.rmtree(path, ignore_errors=True) - _logger.debug('Recycled %r to save %s bytes', path, -neg_size) - to_free += neg_size - elif mtime == 0: - shutil.rmtree(path, ignore_errors=True) - _logger.debug('Recycled malformed cache item %r', path) - elif client.cache_lifetime.value and \ - client.cache_lifetime.value < (ts - mtime) / 86400.0: - shutil.rmtree(path, ignore_errors=True) - _logger.debug('Recycled stale %r to get %s bytes', path, -neg_size) - else: - break +_logger = logging.getLogger('cache') -def ensure(requested_size=0, temp_size=0): - stat = os.statvfs(client.local_root.value) - if stat.f_blocks == 0: - # TODO Sonds like a tmpfs or so - return - total = stat.f_blocks * stat.f_frsize - free = stat.f_bfree * stat.f_frsize +class Cache(object): - to_free = max(client.cache_limit.value * total / 100, temp_size) - \ - (free - requested_size) - if to_free <= 0: - return + def __init__(self, volume): + self._volume = volume + self._pool = None + self._du = 0 - _logger.debug('Recycle %s bytes free=%d requested_size=%d temp_size=%d', - to_free, free, requested_size, temp_size) + def __iter__(self): + self._ensure_open() + return iter(self._pool) - cached_total, items = _list() - enforce(cached_total >= to_free, 'No free disk space') + def ensure(self, requested_size, temp_size=0): + self._ensure_open() + stat = os.statvfs(client.local_root.value) + if stat.f_blocks == 0: + # TODO Sonds like a tmpfs or so + return + total = stat.f_blocks * stat.f_frsize + free = stat.f_bfree * stat.f_frsize - for __, neg_size, path in items: - shutil.rmtree(path, ignore_errors=True) - _logger.debug('Recycled %r to save %s bytes', path, -neg_size) - to_free += neg_size + to_free = max(client.cache_limit.value * total / 100, temp_size) - \ + (free - requested_size) if to_free <= 0: - break - - -def get(guid, hints=None): - path = join(client.local_root.value, 'cache', 'implementation', guid) - if exists(path): - pipe.trace('Reuse cached %s implementation from %r', guid, path) + return + + _logger.debug('Recycle %s byte free=%d requested_size=%d temp_size=%d', + to_free, free, requested_size, temp_size) + enforce(self._du >= to_free, 'No free disk space') + + for guid, size, mtime in self._reversed_iter(): + self._checkout(guid, (size, mtime)) + to_free -= size + if to_free <= 0: + break + + def checkin(self, guid, meta=None): + self._ensure_open() + if guid in self._pool: + self._pool.__getitem__(guid) + return + _logger.debug('Checkin %r', guid) + impls = self._volume['implementation'] + if meta is None: + meta = impls.get(guid).meta('data') + size = meta.get('unpack_size') or meta['blob_size'] + mtime = os.stat(impls.path(guid)).st_mtime + self._pool[guid] = (size, mtime) + + def checkout(self, guid): + self._ensure_open() + if guid not in self._pool: + return + _logger.debug('Checkout %r', guid) + size, __ = self._pool.peek(guid) + self._du -= size + del self._pool[guid] + + def recycle(self): + self._ensure_open() + stat = os.statvfs(client.local_root.value) + total = stat.f_blocks * stat.f_frsize + free = stat.f_bfree * stat.f_frsize + to_free = client.cache_limit.value * total / 100 - free ts = time.time() - os.utime(path, (ts, ts)) - return path - - pipe.trace('Download %s implementation', guid) - # TODO Per download progress - pipe.feedback('download') - - ensure(hints.get('unpack_size') or 0, hints.get('bundle_size') or 0) - blob = client.IPCConnection().download(['implementation', guid, 'data']) - _unpack_stream(blob, path) - with toolkit.new_file(join(path, '.unpack_size')) as f: - json.dump(hints.get('unpack_size') or 0, f) - - topdir = os.listdir(path)[-1:] - if topdir: - for exec_dir in ('bin', 'activity'): - bin_path = join(path, topdir[0], exec_dir) - if not exists(bin_path): + + for guid, size, mtime in self._reversed_iter(): + if to_free > 0: + self._checkout(guid, (size, mtime)) + to_free -= size + elif client.cache_lifetime.value and \ + client.cache_lifetime.value < (ts - mtime) / 86400.0: + self._checkout(guid, (size, None)) + else: + break + + def _ensure_open(self): + if self._pool is not None: + return + + _logger.debug('Open implementations pool') + + pool = [] + contexts = self._volume['context'] + impls = self._volume['implementation'] + for res in impls.find(not_layer=['local'])[0]: + meta = res.meta('data') + if not meta or 'blob_size' not in meta: continue - for filename in os.listdir(bin_path): - os.chmod(join(bin_path, filename), 0755) - - return path - - -def _list(): - total = 0 - result = [] - root = join(client.local_root.value, 'cache', 'implementation') - - if not exists(root): - os.makedirs(root) - return 0, [] - - for filename in os.listdir(root): - path = join(root, filename) - if not isdir(path): - continue - try: - with file(join(path, '.unpack_size')) as f: - unpack_size = json.load(f) - total += unpack_size - # Negative `unpack_size` to process large impls at first - result.append((os.stat(path).st_mtime, -unpack_size, path)) - except Exception: - toolkit.exception('Cannot list %r cached implementation', path) - result.append((0, 0, path)) - - return total, sorted(result) - - -def _unpack_stream(stream, dst): - with toolkit.NamedTemporaryFile() as tmp_file: - for chunk in stream: - tmp_file.write(chunk) - tmp_file.flush() - if not exists(dst): - os.makedirs(dst) - try: - with Bundle(tmp_file.name, 'application/zip') as bundle: - bundle.extractall(dst) - except Exception: - shutil.rmtree(dst, ignore_errors=True) - raise + clone = contexts.path(res['context'], 'clone') + if exists(clone): + with file(clone) as f: + if json.load(f) == res.guid: + continue + pool.append(( + os.stat(impls.path(res.guid)).st_mtime, + res.guid, + meta.get('unpack_size') or meta['blob_size'], + )) + + self._pool = pylru.lrucache(_POOL_SIZE, self._checkout) + for mtime, guid, size in sorted(pool): + self._pool[guid] = (size, mtime) + self._du += size + + def _reversed_iter(self): + i = self._pool.head.prev + while True: + while i.empty: + if i is self._pool.head: + return + i = i.prev + size, mtime = i.value + yield i.key, size, mtime + if i is self._pool.head: + break + i = i.next + + def _checkout(self, guid, value): + size, mtime = value + if mtime is None: + _logger.debug('Recycle stale %r to save %s bytes', guid, size) + else: + _logger.debug('Recycle %r to save %s bytes', guid, size) + self._volume['implementation'].delete(guid) + self._du -= size + if guid in self._pool: + del self._pool[guid] |