diff options
author | Aleksey Lim <alsroot@sugarlabs.org> | 2013-06-11 06:34:27 (GMT) |
---|---|---|
committer | Aleksey Lim <alsroot@sugarlabs.org> | 2013-06-11 06:34:27 (GMT) |
commit | e5254ac6d179b13c2aba66a89410ce8d0d95566d (patch) | |
tree | f2d6dcc37d802df3558c20ddfec4f58da74a8f56 | |
parent | e036cec6f898d4150d385ce0a6a2c33fed3b6662 (diff) |
Initial implementation for recycling cached implementations
-rwxr-xr-x | sugar-network-client | 14 | ||||
-rw-r--r-- | sugar_network/client/__init__.py | 14 | ||||
-rw-r--r-- | sugar_network/client/cache.py | 115 | ||||
-rw-r--r-- | sugar_network/client/injector.py | 6 | ||||
-rw-r--r-- | sugar_network/client/solver.py | 8 | ||||
-rw-r--r-- | sugar_network/node/commands.py | 10 | ||||
-rw-r--r-- | sugar_network/resources/implementation.py | 12 | ||||
-rw-r--r-- | sugar_network/toolkit/http.py | 9 | ||||
-rw-r--r-- | tests/__init__.py | 15 | ||||
-rw-r--r-- | tests/units/client/__main__.py | 1 | ||||
-rwxr-xr-x | tests/units/client/cache.py | 145 | ||||
-rwxr-xr-x | tests/units/client/injector.py | 4 | ||||
-rwxr-xr-x | tests/units/node/node.py | 6 | ||||
-rwxr-xr-x | tests/units/resources/implementation.py | 20 |
14 files changed, 327 insertions, 52 deletions
diff --git a/sugar-network-client b/sugar-network-client index eba2494..0efd7dc 100755 --- a/sugar-network-client +++ b/sugar-network-client @@ -1,6 +1,6 @@ #!/usr/bin/env python -# Copyright (C) 2012 Aleksey Lim +# Copyright (C) 2012-2013 Aleksey Lim # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -26,7 +26,7 @@ from gevent import monkey import sugar_network_webui as webui from sugar_network import db, toolkit, client, node -from sugar_network.client import IPCRouter, clones +from sugar_network.client import IPCRouter, clones, cache from sugar_network.client.commands import CachedClientCommands from sugar_network.node import stats_node, stats_user from sugar_network.resources.volume import Volume @@ -135,6 +135,9 @@ class Application(application.Daemon): os.makedirs(mounts_root) self.jobs.spawn(mountpoints.monitor, mounts_root) + if client.cache_timeout.value: + self.jobs.spawn(self._recycle_cache) + def delayed_start(event=None): for __ in commands.subscribe(event='delayed-start'): break @@ -157,6 +160,13 @@ class Application(application.Daemon): def shutdown(self): self.jobs.kill() + def _recycle_cache(self): + while True: + logging.debug('Start cache recycling in %d seconds', + client.cache_timeout.value) + coroutine.sleep(client.cache_timeout.value) + cache.recycle() + def __SIGCHLD_cb(self): while True: try: diff --git a/sugar_network/client/__init__.py b/sugar_network/client/__init__.py index 62d9128..c3d70ab 100644 --- a/sugar_network/client/__init__.py +++ b/sugar_network/client/__init__.py @@ -128,6 +128,20 @@ accept_language = Option( default=[], type_cast=Option.list_cast, type_repr=Option.list_repr, name='accept-language', short_option='-l') +cache_limit = Option( + 'the minimal disk free space, in percents, to preserve ' + 'while recycling disk cache', + default=10, type_cast=int, name='cache-limit') + +cache_lifetime = Option( + 'the number of days to keep unused objects on disk cache ' + 'before recycling', + default=7, type_cast=int, name='cache-lifetime') + +cache_timeout = Option( + 'check disk cache for recycling in specified delay in seconds', + default=3600, type_cast=int, name='cache-timeout') + def path(*args): """Calculate a path from the root. diff --git a/sugar_network/client/cache.py b/sugar_network/client/cache.py index d3930c8..e0d8c94 100644 --- a/sugar_network/client/cache.py +++ b/sugar_network/client/cache.py @@ -1,4 +1,4 @@ -# Copyright (C) 2012 Aleksey Lim +# Copyright (C) 2012-2013 Aleksey Lim # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -14,34 +14,86 @@ # along with this program. If not, see <http://www.gnu.org/licenses/>. import os +import json +import time import shutil -from os.path import exists, join +import logging +from os.path import exists, join, isdir from sugar_network.client import IPCClient, local_root +from sugar_network.client import cache_limit, cache_lifetime from sugar_network.toolkit.bundle import Bundle -from sugar_network.toolkit import pipe, util +from sugar_network.toolkit import pipe, util, enforce -def get(guid): +_logger = logging.getLogger('cache') + + +def recycle(): + stat = os.statvfs(local_root.value) + total = stat.f_blocks * stat.f_frsize + free = stat.f_bfree * stat.f_frsize + to_free = cache_limit.value * total / 100 - free + ts = time.time() + + __, items = _list() + for mtime, neg_size, path in items: + if to_free > 0: + shutil.rmtree(path, ignore_errors=True) + _logger.debug('Recycled %r to save %s bytes', path, -neg_size) + to_free += neg_size + elif mtime == 0: + shutil.rmtree(path, ignore_errors=True) + _logger.debug('Recycled malformed cache item %r', path) + elif cache_lifetime.value and \ + cache_lifetime.value < (ts - mtime) / 86400.0: + shutil.rmtree(path, ignore_errors=True) + _logger.debug('Recycled stale %r to get %s bytes', path, -neg_size) + else: + break + + +def ensure(requested_size=0, temp_size=0): + stat = os.statvfs(local_root.value) + total = stat.f_blocks * stat.f_frsize + free = stat.f_bfree * stat.f_frsize + + to_free = max(cache_limit.value * total / 100, temp_size) - \ + (free - requested_size) + if to_free <= 0: + return + + _logger.debug('Recycle %s bytes free=%d requested_size=%d temp_size=%d', + to_free, free, requested_size, temp_size) + + cached_total, items = _list() + enforce(cached_total >= to_free, 'No free disk space') + + for __, neg_size, path in items: + shutil.rmtree(path, ignore_errors=True) + _logger.debug('Recycled %r to save %s bytes', path, -neg_size) + to_free += neg_size + if to_free <= 0: + break + + +def get(guid, hints=None): path = join(local_root.value, 'cache', 'implementation', guid) if exists(path): pipe.trace('Reuse cached %s implementation from %r', guid, path) + ts = time.time() + os.utime(path, (ts, ts)) return path pipe.trace('Download %s implementation', guid) # TODO Per download progress pipe.feedback('download') - with util.NamedTemporaryFile() as tmp_file: - IPCClient().download(['implementation', guid, 'data'], tmp_file) - tmp_file.flush() - os.makedirs(path) - try: - with Bundle(tmp_file.name, 'application/zip') as bundle: - bundle.extractall(path) - except Exception: - shutil.rmtree(path, ignore_errors=True) - raise + ensure(hints.get('unpack_size') or 0, hints.get('bundle_size') or 0) + blob = IPCClient().download(['implementation', guid, 'data']) + _unpack_stream(blob, path) + with util.new_file(join(path, '.unpack_size')) as f: + json.dump(hints.get('unpack_size') or 0, f) topdir = os.listdir(path)[-1:] if topdir: @@ -53,3 +105,38 @@ def get(guid): os.chmod(join(bin_path, filename), 0755) return path + + +def _list(): + total = 0 + result = [] + root = join(local_root.value, 'cache', 'implementation') + for filename in os.listdir(root): + path = join(root, filename) + if not isdir(path): + continue + try: + with file(join(path, '.unpack_size')) as f: + unpack_size = json.load(f) + total += unpack_size + # Negative `unpack_size` to process large impls at first + result.append((os.stat(path).st_mtime, -unpack_size, path)) + except Exception: + util.exception('Cannot list %r cached implementation', path) + result.append((0, 0, path)) + return total, sorted(result) + + +def _unpack_stream(stream, dst): + with util.NamedTemporaryFile() as tmp_file: + for chunk in stream: + tmp_file.write(chunk) + tmp_file.flush() + if not exists(dst): + os.makedirs(dst) + try: + with Bundle(tmp_file.name, 'application/zip') as bundle: + bundle.extractall(dst) + except Exception: + shutil.rmtree(dst, ignore_errors=True) + raise diff --git a/sugar_network/client/injector.py b/sugar_network/client/injector.py index ec1c1e6..c180c05 100644 --- a/sugar_network/client/injector.py +++ b/sugar_network/client/injector.py @@ -104,9 +104,7 @@ def _make(context): for impl in solution: if 'path' in impl or impl['stability'] == 'packaged': continue - - # TODO Process different mountpoints - impl_path = cache.get(impl['id']) + impl_path = cache.get(impl['id'], impl) if 'prefix' in impl: impl_path = join(impl_path, impl['prefix']) impl['path'] = impl_path @@ -163,7 +161,7 @@ def _clone_impl(context_guid, params): impl = impls[0] spec = impl['spec']['*-*'] - src_path = cache.get(impl['guid']) + src_path = cache.get(impl['guid'], impl) if 'extract' in spec: src_path = join(src_path, spec['extract']) dst_path = util.unique_filename( diff --git a/sugar_network/client/solver.py b/sugar_network/client/solver.py index 1155fe8..1892aef 100644 --- a/sugar_network/client/solver.py +++ b/sugar_network/client/solver.py @@ -161,6 +161,12 @@ def _impl_new(config, iface, sel): 'name': feed.title, 'stability': sel.impl.upstream_stability.name, } + if sel.impl.hints: + for key in ('mime_type', 'blob_size', 'unpack_size'): + value = sel.impl.hints.get(key) + if value is not None: + impl[key] = value + if isabs(sel.id): impl['spec'] = join(sel.id, 'activity', 'activity.info') if sel.local_path: @@ -265,6 +271,7 @@ class _Feed(model.ZeroInstallFeed): impl.arch = release['arch'] impl.upstream_stability = model.stability_levels[release['stability']] impl.requires.extend(_read_requires(release.get('requires'))) + impl.hints = release if isabs(impl_id): impl.local_path = impl_id @@ -295,6 +302,7 @@ class _Feed(model.ZeroInstallFeed): class _Implementation(model.ZeroInstallImplementation): to_install = None + hints = None class _Dependency(model.InterfaceDependency): diff --git a/sugar_network/node/commands.py b/sugar_network/node/commands.py index a71fad6..7678d25 100644 --- a/sugar_network/node/commands.py +++ b/sugar_network/node/commands.py @@ -243,9 +243,10 @@ class NodeCommands(db.VolumeCommands, Commands): def feed(self, document, guid, layer, distro, request): enforce(document == 'context') context = self.volume['context'].get(guid) - + implementations = self.volume['implementation'] versions = [] - impls, __ = self.volume['implementation'].find(limit=db.MAX_LIMIT, + + impls, __ = implementations.find(limit=db.MAX_LIMIT, context=context.guid, layer=layer) for impl in impls: for arch, spec in impl['spec'].items(): @@ -257,6 +258,11 @@ class NodeCommands(db.VolumeCommands, Commands): requires = spec.setdefault('requires', {}) for i in context['dependencies']: requires.setdefault(i, {}) + blob = implementations.get(impl.guid).meta('data') + if blob: + spec['mime_type'] = blob.get('mime_type') + spec['blob_size'] = blob.get('blob_size') + spec['unpack_size'] = blob.get('unpack_size') versions.append(spec) result = { diff --git a/sugar_network/resources/implementation.py b/sugar_network/resources/implementation.py index d9774e7..e038377 100644 --- a/sugar_network/resources/implementation.py +++ b/sugar_network/resources/implementation.py @@ -100,20 +100,20 @@ class Implementation(Resource): if 'activity' not in context['type']: return value - def calc_uncompressed_size(path): - uncompressed_size = 0 + def calc_unpack_size(path): + unpack_size = 0 with Bundle(path, mime_type='application/zip') as bundle: for arcname in bundle.get_names(): - uncompressed_size += bundle.getmember(arcname).size - value['uncompressed_size'] = uncompressed_size + unpack_size += bundle.getmember(arcname).size + value['unpack_size'] = unpack_size if 'blob' in value: - calc_uncompressed_size(value['blob']) + calc_unpack_size(value['blob']) elif 'url' in value: with util.NamedTemporaryFile() as f: http.download(value['url'], f.name) value['blob_size'] = os.stat(f.name).st_size - calc_uncompressed_size(f.name) + calc_unpack_size(f.name) value['mime_type'] = 'application/vnd.olpc-sugar' return value diff --git a/sugar_network/toolkit/http.py b/sugar_network/toolkit/http.py index 691d13a..9e66e23 100644 --- a/sugar_network/toolkit/http.py +++ b/sugar_network/toolkit/http.py @@ -89,7 +89,7 @@ class ServiceUnavailable(Status): status_code = 503 -def download(url, dst_path): +def download(url, dst_path=None): # TODO (?) Reuse HTTP session return Client().download(url, dst_path) @@ -146,13 +146,18 @@ class Client(object): response = self.request('DELETE', path_, params=kwargs) return self._decode_reply(response) - def download(self, path, dst): + def download(self, path, dst=None): response = self.request('GET', path, allow_redirects=True) + content_length = response.headers.get('Content-Length') if content_length: chunk_size = min(int(content_length), BUFFER_SIZE) else: chunk_size = BUFFER_SIZE + + if dst is None: + return response.iter_content(chunk_size=chunk_size) + f = file(dst, 'wb') if isinstance(dst, basestring) else dst try: for chunk in response.iter_content(chunk_size=chunk_size): diff --git a/tests/__init__.py b/tests/__init__.py index 928aa69..ebc53a0 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -7,6 +7,7 @@ import signal import shutil import hashlib import logging +import zipfile import unittest import tempfile import subprocess @@ -15,7 +16,7 @@ from os.path import dirname, join, exists, abspath, isfile from M2Crypto import DSA from gevent import monkey -from sugar_network.toolkit import coroutine, http, mountpoints, util, Option +from sugar_network.toolkit import coroutine, http, mountpoints, util, Option, pipe from sugar_network.db.router import Router from sugar_network.client import journal, IPCRouter, commands from sugar_network.client.commands import ClientCommands @@ -93,6 +94,8 @@ class Test(unittest.TestCase): client.mounts_root.value = None client.ipc_port.value = 5555 client.layers.value = None + client.cache_limit.value = 0 + client.cache_lifetime.value = 0 commands._RECONNECT_TIMEOUT = 0 mountpoints._connects.clear() mountpoints._found.clear() @@ -113,6 +116,8 @@ class Test(unittest.TestCase): journal._ds_root = tmpdir + '/datastore' solver.nodeps = False downloads._POOL_SIZE = 256 + pipe._pipe = None + pipe._trace = None Volume.RESOURCES = [ 'sugar_network.resources.user', @@ -218,6 +223,14 @@ class Test(unittest.TestCase): for i in files: os.utime(join(root, i), (ts, ts)) + def zips(self, *items): + with util.NamedTemporaryFile() as f: + bundle = zipfile.ZipFile(f.name, 'w') + for arcname, data in items: + bundle.writestr(arcname, data) + bundle.close() + return file(f.name, 'rb').read() + def fork(self, cb, *args): pid = os.fork() if pid: diff --git a/tests/units/client/__main__.py b/tests/units/client/__main__.py index 5034221..fd37288 100644 --- a/tests/units/client/__main__.py +++ b/tests/units/client/__main__.py @@ -10,6 +10,7 @@ from offline_commands import * from online_commands import * from server_commands import * from solver import * +from cache import * if __name__ == '__main__': tests.main() diff --git a/tests/units/client/cache.py b/tests/units/client/cache.py new file mode 100755 index 0000000..221572a --- /dev/null +++ b/tests/units/client/cache.py @@ -0,0 +1,145 @@ +#!/usr/bin/env python +# sugar-lint: disable + +import os +import time +import json +from cStringIO import StringIO +from os.path import exists + +from __init__ import tests + +from sugar_network.client import cache, cache_limit, cache_lifetime +from sugar_network.toolkit import http + + +class CacheTest(tests.Test): + + def setUp(self): + tests.Test.setUp(self) + + class statvfs(object): + f_blocks = 100 + f_bfree = 100 + f_frsize = 1 + + self.statvfs = statvfs + self.override(os, 'statvfs', lambda *args: statvfs()) + cache_limit.value = 0 + + def test_get(self): + self.override(http.Client, 'download', lambda self_, path: StringIO(self.zips(('topdir/probe', '/'.join(path))))) + cache.get('impl', {'unpack_size': 100}) + self.assertEqual(100, json.load(file('cache/implementation/impl/.unpack_size'))) + self.assertEqual('implementation/impl/data', file('cache/implementation/impl/topdir/probe').read()) + + def test_ensure(self): + self.touch(('cache/implementation/1/.unpack_size', '1', 1)) + self.touch(('cache/implementation/2/.unpack_size', '1', 2)) + self.touch(('cache/implementation/3/.unpack_size', '1', 3)) + cache_limit.value = 10 + + self.statvfs.f_bfree = 11 + cache.ensure(1, 0) + assert exists('cache/implementation/1') + assert exists('cache/implementation/2') + assert exists('cache/implementation/3') + + self.statvfs.f_bfree = 10 + cache.ensure(1, 0) + assert not exists('cache/implementation/1') + assert exists('cache/implementation/2') + assert exists('cache/implementation/3') + + self.statvfs.f_bfree = 11 + cache.ensure(3, 0) + assert not exists('cache/implementation/1') + assert not exists('cache/implementation/2') + assert not exists('cache/implementation/3') + + self.statvfs.f_bfree = 10 + self.assertRaises(RuntimeError, cache.ensure, 1, 0) + + def test_ensure_FailRightAway(self): + self.touch(('cache/implementation/1/.unpack_size', '1', 1)) + cache_limit.value = 10 + self.statvfs.f_bfree = 10 + + self.assertRaises(RuntimeError, cache.ensure, 2, 0) + assert exists('cache/implementation/1') + + cache.ensure(1, 0) + assert not exists('cache/implementation/1') + + def test_ensure_ConsiderTmpSize(self): + self.touch(('cache/implementation/1/.unpack_size', '1', 1)) + cache_limit.value = 10 + self.statvfs.f_bfree = 10 + + self.assertRaises(RuntimeError, cache.ensure, 2, 0) + assert exists('cache/implementation/1') + + cache.ensure(1, 0) + assert not exists('cache/implementation/1') + + def test_recycle(self): + ts = time.time() + self.touch(('cache/implementation/1/.unpack_size', '1')) + os.utime('cache/implementation/1', (ts - 1.5 * 86400, ts - 1.5 * 86400)) + self.touch(('cache/implementation/2/.unpack_size', '1')) + os.utime('cache/implementation/2', (ts - 2.5 * 86400, ts - 2.5 * 86400)) + self.touch(('cache/implementation/3/.unpack_size', '1')) + os.utime('cache/implementation/3', (ts - 3.5 * 86400, ts - 3.5 * 86400)) + + cache_lifetime.value = 4 + cache.recycle() + assert exists('cache/implementation/1') + assert exists('cache/implementation/2') + assert exists('cache/implementation/3') + + cache_lifetime.value = 3 + cache.recycle() + assert exists('cache/implementation/1') + assert exists('cache/implementation/2') + assert not exists('cache/implementation/3') + + cache_lifetime.value = 1 + cache.recycle() + assert not exists('cache/implementation/1') + assert not exists('cache/implementation/2') + assert not exists('cache/implementation/3') + + def test_recycle_CallEnsure(self): + self.touch(('cache/implementation/1/.unpack_size', '1', 100)) + cache_limit.value = 10 + cache_lifetime.value = 0 + + self.statvfs.f_bfree = 100 + cache.recycle() + assert exists('cache/implementation/1') + + self.statvfs.f_bfree = 0 + cache.recycle() + assert not exists('cache/implementation/1') + + def test_RecycleBadDirs(self): + cache_limit.value = 10 + self.statvfs.f_bfree = 10 + self.touch('cache/implementation/1/foo') + self.touch('cache/implementation/2/bar') + self.touch(('cache/implementation/3/.unpack_size', '1')) + cache.ensure(1, 0) + assert not exists('cache/implementation/1') + assert not exists('cache/implementation/2') + assert not exists('cache/implementation/3') + + self.statvfs.f_bfree = 100 + self.touch('cache/implementation/1/foo') + self.touch('cache/implementation/2/bar') + cache.recycle() + assert not exists('cache/implementation/1') + assert not exists('cache/implementation/2') + + +if __name__ == '__main__': + tests.main() diff --git a/tests/units/client/injector.py b/tests/units/client/injector.py index 56dee84..11e0067 100755 --- a/tests/units/client/injector.py +++ b/tests/units/client/injector.py @@ -85,8 +85,8 @@ Can't find all required implementations: 'error_type': 'NotFound', 'solution': [{'name': 'title', 'prefix': 'topdir', 'version': '1', 'command': ['echo'], 'context': context, 'id': impl, 'stability': 'stable'}], }, - ], - [i for i in pipe]) + ][-1], + [i for i in pipe][-1]) assert not exists('cache/implementation/%s' % impl) blob_path = 'master/implementation/%s/%s/data' % (impl[:2], impl) diff --git a/tests/units/node/node.py b/tests/units/node/node.py index 93ae517..5e9eb33 100755 --- a/tests/units/node/node.py +++ b/tests/units/node/node.py @@ -4,7 +4,6 @@ import os import time import json -import zipfile from email.utils import formatdate, parsedate from os.path import exists @@ -435,10 +434,7 @@ class NodeTest(tests.Test): 'notes': '', 'requires': ['foo', 'bar'], }) - bundle = zipfile.ZipFile('blob', 'w') - bundle.writestr('topdir/probe', 'probe') - bundle.close() - blob = file('blob', 'rb').read() + blob = self.zips(('topdir/probe', 'probe')) client.request('PUT', ['implementation', impl, 'data'], blob) self.assertEqual(blob, client.get(['context', context], cmd='clone', version='1', stability='stable', requires=['foo', 'bar'])) diff --git a/tests/units/resources/implementation.py b/tests/units/resources/implementation.py index 967b588..b50344d 100755 --- a/tests/units/resources/implementation.py +++ b/tests/units/resources/implementation.py @@ -2,7 +2,6 @@ # sugar-lint: disable import os -import zipfile import xapian @@ -88,23 +87,16 @@ class ImplementationTest(tests.Test): self.assertEqual('image/png', self.node_volume['implementation'].get(impl).meta('data')['mime_type']) client.put(['context', context, 'type'], 'activity') - bundle = zipfile.ZipFile('blob', 'w') - bundle.writestr('topdir/probe', 'probe') - bundle.close() - client.request('PUT', ['implementation', impl, 'data'], file('blob', 'rb').read()) + client.request('PUT', ['implementation', impl, 'data'], self.zips(('topdir/probe', 'probe'))) data = self.node_volume['implementation'].get(impl).meta('data') self.assertEqual('application/vnd.olpc-sugar', data['mime_type']) self.assertNotEqual(5, data['blob_size']) - self.assertEqual(5, data.get('uncompressed_size')) + self.assertEqual(5, data.get('unpack_size')) def test_ActivityUrls(self): - bundle = zipfile.ZipFile('blob', 'w') - bundle.writestr('topdir/probe', 'probe') - bundle.close() - bundle = file('blob', 'rb').read() - bundle_size = os.stat('blob').st_size - uncompressed_size = 5 + bundle = self.zips(('topdir/probe', 'probe')) + unpack_size = len('probe') class Files(db.CommandsProcessor): @@ -135,8 +127,8 @@ class ImplementationTest(tests.Test): data = self.node_volume['implementation'].get(impl).meta('data') self.assertEqual('application/vnd.olpc-sugar', data['mime_type']) - self.assertEqual(bundle_size, data['blob_size']) - self.assertEqual(uncompressed_size, data.get('uncompressed_size')) + self.assertEqual(len(bundle), data['blob_size']) + self.assertEqual(unpack_size, data.get('unpack_size')) self.assertEqual('http://127.0.0.1:9999/bundle', data['url']) assert 'blob' not in data |