diff options
author | Aleksey Lim <alsroot@sugarlabs.org> | 2012-08-09 16:59:50 (GMT) |
---|---|---|
committer | Aleksey Lim <alsroot@sugarlabs.org> | 2012-08-09 17:30:56 (GMT) |
commit | f81fee8335345b025610fd8789ee19c03d9e15f4 (patch) | |
tree | 6d0a0fb5013ee204494e1c34fcf6efb6cbede4be | |
parent | c8ce1a074715d809dd3ec435978113e5b0c30ed7 (diff) |
Process stats in node sync
-rwxr-xr-x | sugar-network-service | 2 | ||||
-rw-r--r-- | sugar_network/client/bus.py | 3 | ||||
-rw-r--r-- | sugar_network/node/stats.py | 69 | ||||
-rw-r--r-- | sugar_network/node/sync_master.py | 14 | ||||
-rw-r--r-- | sugar_network/node/sync_node.py | 30 | ||||
-rw-r--r-- | sugar_network/resources/user.py | 18 | ||||
-rw-r--r-- | sugar_network/toolkit/collection.py | 5 | ||||
-rw-r--r-- | sugar_network/toolkit/files_sync.py | 4 | ||||
-rw-r--r-- | sugar_network/toolkit/rrd.py | 15 | ||||
-rw-r--r-- | tests/__init__.py | 5 | ||||
-rwxr-xr-x | tests/integration/sync.py | 22 | ||||
-rwxr-xr-x | tests/units/sync_master.py | 52 | ||||
-rwxr-xr-x | tests/units/sync_node.py | 100 |
13 files changed, 301 insertions, 38 deletions
diff --git a/sugar-network-service b/sugar-network-service index 25bcb55..e525ab8 100755 --- a/sugar-network-service +++ b/sugar-network-service @@ -33,6 +33,7 @@ from sugar_network.local.dbus_network import Network from sugar_network.local.bus import IPCServer from sugar_network.local.mounts import HomeMount, RemoteMount from sugar_network.local.mountset import Mountset +from sugar_network.node import stats from sugar_network.resources.volume import Volume from active_toolkit.options import Option from active_toolkit import util, printf, application, coroutine, enforce @@ -259,6 +260,7 @@ Option.seek('main', [application.debug]) Option.seek('webui', webui) Option.seek('local', local) Option.seek('node', [node.port, node.subscribe_port, node.sync_dirs]) +Option.seek('stats', stats) Option.seek('active-document', ad) application = Application( diff --git a/sugar_network/client/bus.py b/sugar_network/client/bus.py index f273a52..2fb4b68 100644 --- a/sugar_network/client/bus.py +++ b/sugar_network/client/bus.py @@ -61,7 +61,8 @@ class Client(object): cls._connection = None @classmethod - def call(cls, method, cmd=None, content=None, content_type=None, **kwargs): + def call(cls, method, cmd=None, content=None, + content_type='application/json', **kwargs): request = ad.Request(kwargs) request.access_level = ad.ACCESS_LOCAL request.principal = sugar.uid() diff --git a/sugar_network/node/stats.py b/sugar_network/node/stats.py index 0f6a049..817def5 100644 --- a/sugar_network/node/stats.py +++ b/sugar_network/node/stats.py @@ -13,7 +13,15 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. +import os +import logging +from os.path import join, exists, isdir + +from pylru import lrucache + from active_toolkit.options import Option +from sugar_network.toolkit.rrd import Rrd, ReadOnlyRrd +from sugar_network.toolkit.collection import Sequence, PersistentSequence stats = Option( @@ -32,3 +40,64 @@ stats_rras = Option( 'space separated list of RRAs for RRD databases', default=['RRA:AVERAGE:0.5:1:4320', 'RRA:AVERAGE:0.5:5:2016'], type_cast=Option.list_cast, type_repr=Option.list_repr) + + +_logger = logging.getLogger('node.stats') +_cache = lrucache(32) + + +def get_rrd(user): + if user in _cache: + return _cache[user] + else: + rrd = _cache[user] = Rrd(join(stats_root.value, user[:2], user), + stats_step.value, stats_rras.value) + return rrd + + +def pull(in_seq, packet): + for user, rrd in _walk_rrd(): + in_seq.setdefault(user, {}) + + for db, db_start, db_end in rrd.dbs: + seq = in_seq[user].get(db) + if seq is None: + seq = in_seq[user][db] = PersistentSequence( + join(rrd.root, db + '.push'), [1, None]) + elif seq is not dict: + seq = in_seq[user][db] = Sequence(seq) + out_seq = Sequence() + + def dump(): + for start, end in seq: + for timestamp, values in \ + rrd.get(db, max(start, db_start), end or db_end): + yield {'timestamp': timestamp, 'values': values} + seq.exclude(start, timestamp) + out_seq.include(start, timestamp) + start = timestamp + + packet.push(dump(), arcname=join('stats', user, db), + cmd='stats_push', user=user, db=db, + sequence=out_seq) + + +def commit(sequences): + for user, dbs in sequences.items(): + for db, merged in dbs.items(): + seq = PersistentSequence( + join(stats_root.value, user[:2], user, db + '.push'), + [1, None]) + seq.exclude(merged) + seq.commit() + + +def _walk_rrd(): + if not exists(stats_root.value): + return + for users_dirname in os.listdir(stats_root.value): + users_dir = join(stats_root.value, users_dirname) + if not isdir(users_dir): + continue + for user in os.listdir(users_dir): + yield user, ReadOnlyRrd(join(users_dir, user)) diff --git a/sugar_network/node/sync_master.py b/sugar_network/node/sync_master.py index c1016a0..86e86a0 100644 --- a/sugar_network/node/sync_master.py +++ b/sugar_network/node/sync_master.py @@ -29,6 +29,7 @@ from sugar_network.toolkit.sneakernet import InPacket, OutBufferPacket, \ OutPacket, DiskFull from sugar_network.toolkit.collection import Sequence from sugar_network.toolkit.files_sync import Seeders +from sugar_network.node import stats from active_toolkit import coroutine, util, enforce @@ -64,6 +65,7 @@ class SyncCommands(object): pushed = Sequence() merged = Sequence() cookie = _Cookie() + stats_pushed = {} for record in in_packet.records(dst=self._guid): cmd = record.get('cmd') @@ -77,11 +79,23 @@ class SyncCommands(object): cookie['sn_pull'].include(record['sequence']) elif cmd == 'files_pull': cookie[record['directory']].include(record['sequence']) + elif cmd == 'stats_push': + db = record['db'] + user = record['user'] + + rrd = stats.get_rrd(user) + rrd.put(db, record['values'], record['timestamp']) + + user_seq = stats_pushed.setdefault(user, {}) + db_seq = user_seq.setdefault(db, Sequence()) + db_seq.include(record['sequence']) enforce(not merged or pushed, '"sn_push" record without "sn_commit"') if pushed: out_packet.push(cmd='sn_ack', sequence=pushed, merged=merged) + if stats_pushed: + out_packet.push(cmd='stats_ack', sequence=stats_pushed) cookie['sn_pull'].exclude(merged) # Read passed cookie only after excluding `merged`. diff --git a/sugar_network/node/sync_node.py b/sugar_network/node/sync_node.py index c887ccd..b19a0db 100644 --- a/sugar_network/node/sync_node.py +++ b/sugar_network/node/sync_node.py @@ -22,11 +22,11 @@ from gettext import gettext as _ import active_document as ad from sugar_network import node, local -from sugar_network.toolkit import mounts_monitor, sneakernet +from sugar_network.toolkit import mounts_monitor, sneakernet, files_sync from sugar_network.toolkit.collection import MutableStack -from sugar_network.toolkit.files_sync import Leechers from sugar_network.toolkit.collection import Sequence, PersistentSequence from sugar_network.toolkit.sneakernet import OutFilePacket, DiskFull +from sugar_network.node import stats from active_toolkit import coroutine, util, enforce @@ -40,7 +40,8 @@ class SyncCommands(object): def __init__(self, sequences_path): self._sync = coroutine.Pool() self._sync_mounts = MutableStack() - self._file_syncs = Leechers(node.sync_dirs.value, sequences_path) + self._file_syncs = \ + files_sync.Leechers(node.sync_dirs.value, sequences_path) self._sync_session = None self._push_seq = PersistentSequence( join(sequences_path, 'push'), [1, None]) @@ -82,14 +83,18 @@ class SyncCommands(object): def break_sync(self): self._sync.kill() - def sync(self, path, accept_length=None, push_sequence=None, session=None): + def sync(self, path, accept_length=None, diff_sequence=None, + stats_sequence=None, session=None): enforce(self._mount is not None, 'No server to sync') to_push_seq = Sequence(empty_value=[1, None]) - if push_sequence is None: + if diff_sequence is None: to_push_seq.include(self._push_seq) else: - to_push_seq = Sequence(push_sequence) + to_push_seq = Sequence(diff_sequence) + + if stats_sequence is None: + stats_sequence = {} if session is None: session_is_new = True @@ -132,8 +137,12 @@ class SyncCommands(object): try: self._mount.volume.diff(to_push_seq, packet) + stats.pull(stats_sequence, packet) except DiskFull: - return {'push_sequence': to_push_seq, 'session': session} + return {'diff_sequence': to_push_seq, + 'stats_sequence': stats_sequence, + 'session': session, + } else: break @@ -195,6 +204,11 @@ class SyncCommands(object): to_push_seq.exclude(record['sequence']) self._mount.volume.seqno.next() self._mount.volume.seqno.commit() + elif cmd == 'stats_ack' and \ + record['dst'] == self._mount.node_guid: + _logger.debug('Processing %r stats ACK from %r', + record, packet) + stats.commit(record['sequence']) elif record.get('directory') in self._file_syncs: self._file_syncs[record['directory']].push(record) @@ -207,6 +221,6 @@ class SyncCommands(object): _logger.debug('Found %r sync mount but no servers', path) def __lost_mount_cb(self, path): - self._sync_mounts.remove(join(path, _SYNC_DIRNAME)) + self._sync_mounts.remove(path) if not self._sync_mounts: self.break_sync() diff --git a/sugar_network/resources/user.py b/sugar_network/resources/user.py index 591f4bd..a120832 100644 --- a/sugar_network/resources/user.py +++ b/sugar_network/resources/user.py @@ -13,17 +13,12 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. -from os.path import join - import active_document as ad from sugar_network.node import stats -from sugar_network.toolkit.rrd import Rrd class User(ad.Document): - _rrd = {} - @ad.active_property(slot=1, prefix='N', full_text=True) def name(self, value): return value @@ -60,7 +55,7 @@ class User(ad.Document): permissions=ad.ACCESS_AUTHOR) def _stats_info(self): status = {} - rrd = User._get_rrd(self.guid) + rrd = stats.get_rrd(self.guid) for name, __, last_update in rrd.dbs: status[name] = last_update + stats.stats_step.value # TODO Process client configuration in more general manner @@ -75,15 +70,6 @@ class User(ad.Document): def _stats_upload(self, request): name = request.content['name'] values = request.content['values'] - rrd = User._get_rrd(self.guid) + rrd = stats.get_rrd(self.guid) for timestamp, values in values: rrd.put(name, values, timestamp) - - @classmethod - def _get_rrd(cls, guid): - rrd = cls._rrd.get(guid) - if rrd is None: - rrd = cls._rrd[guid] = Rrd( - join(stats.stats_root.value, guid[:2], guid), - stats.stats_step.value, stats.stats_rras.value) - return rrd diff --git a/sugar_network/toolkit/collection.py b/sugar_network/toolkit/collection.py index 1413ffc..f71b5cf 100644 --- a/sugar_network/toolkit/collection.py +++ b/sugar_network/toolkit/collection.py @@ -16,7 +16,7 @@ import os import json import collections -from os.path import exists +from os.path import exists, dirname from active_toolkit import util, enforce @@ -201,6 +201,9 @@ class PersistentSequence(Sequence): self[:] = json.load(f) def commit(self): + dir_path = dirname(self._path) + if dir_path and not exists(dir_path): + os.makedirs(dir_path) with util.new_file(self._path) as f: json.dump(self, f) f.flush() diff --git a/sugar_network/toolkit/files_sync.py b/sugar_network/toolkit/files_sync.py index 088eb08..636f067 100644 --- a/sugar_network/toolkit/files_sync.py +++ b/sugar_network/toolkit/files_sync.py @@ -37,8 +37,6 @@ class Seeder(object): self._seqno = seqno self._index = [] self._stamp = 0 - # Below calls will mutate `self._index` and trigger coroutine switches. - # Thus, avoid chnaing `self._index` by different coroutines. self._mutex = coroutine.Lock() if exists(self._index_path): @@ -49,6 +47,8 @@ class Seeder(object): os.makedirs(self._files_path) def pull(self, in_seq, packet): + # Below calls will mutate `self._index` and trigger coroutine switches. + # Thus, avoid changing `self._index` by different coroutines. with self._mutex: self._sync() orig_seq = Sequence(in_seq) diff --git a/sugar_network/toolkit/rrd.py b/sugar_network/toolkit/rrd.py index 87b3c40..5fe9f44 100644 --- a/sugar_network/toolkit/rrd.py +++ b/sugar_network/toolkit/rrd.py @@ -17,7 +17,7 @@ $Repo: git://git.sugarlabs.org/alsroot/codelets.git$ $File: src/rrd.py$ -$Date: 2012-07-12$ +$Date: 2012-08-09$ """ @@ -59,6 +59,10 @@ class Rrd(object): self._dbset(name).load(filename, int(revision or 0)) @property + def root(self): + return self._root + + @property def step(self): return self._step @@ -83,6 +87,15 @@ class Rrd(object): return db +class ReadOnlyRrd(Rrd): + + def __init__(self, root): + Rrd.__init__(self, root, 1, []) + + def put(self, name, values, timestamp=None): + raise RuntimeError('Write access is denied') + + class _DbSet(object): def __init__(self, root, name, step, rras): diff --git a/tests/__init__.py b/tests/__init__.py index f5dfad5..0fdc6c3 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -28,6 +28,7 @@ from sugar_network.resources.context import Context from sugar_network.node.router import Router from sugar_network.node.commands import NodeCommands from sugar_network.node.subscribe_socket import SubscribeSocket +from sugar_network.node import stats from sugar_network.resources.volume import Volume @@ -83,6 +84,10 @@ class Test(unittest.TestCase): local.mounts_root.value = None mounts_monitor.stop() mounts_monitor._COMPLETE_MOUNT_TIMEOUT = .1 + stats.stats_root.value = tmpdir + '/stats' + stats.stats_step.value = 1 + stats.stats_rras.value = ['RRA:AVERAGE:0.5:1:100'] + stats._cache.clear() Volume.RESOURCES = [ 'sugar_network.resources.user', diff --git a/tests/integration/sync.py b/tests/integration/sync.py index 896bc4c..50e3502 100755 --- a/tests/integration/sync.py +++ b/tests/integration/sync.py @@ -2,18 +2,22 @@ # sugar-lint: disable import os +import time import shutil import signal from cStringIO import StringIO from contextlib import contextmanager from os.path import exists +import rrdtool + from __init__ import tests import active_document as ad from sugar_network import Client from sugar_network.local import local_root +from sugar_network import sugar from sugar_network.toolkit.sneakernet import InPacket, OutPacket from active_toolkit import util, coroutine @@ -32,7 +36,10 @@ class SyncTest(tests.Test): '--data-root=master/db', '--index-flush-threshold=1024', '--index-flush-timeout=3', '--only-commit-events', '--tmpdir=tmp', '--sync-dirs=master/files/1:master/files/2', - '--pull-timeout=1', '-DDDF', 'start', + '--pull-timeout=1', + '--stats', '--stats-root=master/stats', '--stats-step=1', + '--stats-rras=RRA:AVERAGE:0.5:1:100', + '-DDDF', 'start', ]) self.node_pid = self.popen([ 'sugar-network-service', '--port=8200', '--subscribe-port=8201', @@ -40,6 +47,8 @@ class SyncTest(tests.Test): '--mounts-root=mnt', '--server-mode', '--tmpdir=tmp', '--api-url=http://localhost:8100', '--sync-dirs=node/files/1:node/files/2', + '--stats', '--stats-root=node/stats', '--stats-step=1', + '--stats-rras=RRA:AVERAGE:0.5:1:100', '-DDD', 'debug', ]) @@ -97,6 +106,12 @@ class SyncTest(tests.Test): self.touch(('preview_4', 'preview_4')) context.upload_blob('preview', 'preview_4') + stats_timestamp = int(time.time()) + client.call('POST', 'stats-upload', mountpoint=mountpoint, document='user', guid=sugar.uid(), content={ + 'name': 'db', + 'values': [(stats_timestamp + 1, {'f': 1})], + }) + # Create node push packets with newly create data self.touch('mnt_2/.sugar-network-sync') os.rename('mnt_2', 'mnt/mnt_2') @@ -134,6 +149,11 @@ class SyncTest(tests.Test): self.assertEqual('1', file('node/files/1/1').read()) self.assertEqual('2', file('node/files/2/2').read()) + master_stats = 'master/stats/%s/%s/db.rrd' % (sugar.uid()[:2], sugar.uid()) + assert exists(master_stats) + __, __, values = rrdtool.fetch(master_stats, 'AVERAGE', '-s', str(stats_timestamp - 1), '-e', str(stats_timestamp + 1)) + self.assertEqual([(None,), (1,), (None,)], values) + def wait_for_events(self, *events): events = list(events) connected = coroutine.Event() diff --git a/tests/units/sync_master.py b/tests/units/sync_master.py index a1d9295..ea2e6fb 100755 --- a/tests/units/sync_master.py +++ b/tests/units/sync_master.py @@ -2,11 +2,14 @@ # sugar-lint: disable import os +import time import json import base64 import hashlib from os.path import join, exists +import rrdtool + from __init__ import tests import active_document as ad @@ -228,6 +231,55 @@ class SyncMasterTest(tests.Test): ], response.get('Set-Cookie')) + def test_push_ProcessStatsPushes(self): + master = MasterCommands('master') + request = Request() + response = ad.Response() + + ts = int(time.time()) - 1000 + packet = OutBufferPacket(src='node', dst='master') + packet.push(cmd='stats_push', user='user1', db='db1', sequence=[[1, ts + 2]], data=[ + {'timestamp': ts + 1, 'values': {'f': 1}}, + {'timestamp': ts + 2, 'values': {'f': 2}}, + ]) + packet.push(cmd='stats_push', user='user1', db='db2', sequence=[[2, ts + 3]], data=[ + {'timestamp': ts + 3, 'values': {'f': 3}}, + ]) + packet.push(cmd='stats_push', user='user2', db='db3', sequence=[[ts + 4, ts + 4]], data=[ + {'timestamp': ts + 4, 'values': {'f': 4}}, + ]) + request.content_stream = packet.pop() + request.content_length = len(request.content_stream.getvalue()) + + reply = master.push(request, response) + self.assertEqual([ + 'sugar_network_sync=unset_sugar_network_sync; Max-Age=0; HttpOnly', + 'sugar_network_delay=unset_sugar_network_delay; Max-Age=0; HttpOnly', + ], + response.get('Set-Cookie')) + + packet = InPacket(stream=reply) + self.assertEqual('master', packet.header['src']) + self.assertEqual('node', packet.header.get('dst')) + self.assertEqual([ + {'filename': 'ack.node.packet', 'src': 'master', 'dst': 'node', 'cmd': 'stats_ack', + 'sequence': { + 'user1': {'db1': [[1, ts + 2]], 'db2': [[2, ts + 3]]}, + 'user2': {'db3': [[ts + 4, ts + 4]]}, + }, + } + ], + [i for i in packet]) + + __, __, values = rrdtool.fetch('stats/us/user1/db1.rrd', 'AVERAGE', '-s', str(ts), '-e', str(ts + 2)) + self.assertEqual([(1,), (2,), (None,)], values) + + __, __, values = rrdtool.fetch('stats/us/user1/db2.rrd', 'AVERAGE', '-s', str(ts), '-e', str(ts + 3)) + self.assertEqual([(None,), (None,), (3,), (None,)], values) + + __, __, values = rrdtool.fetch('stats/us/user2/db3.rrd', 'AVERAGE', '-s', str(ts), '-e', str(ts + 4)) + self.assertEqual([(None,), (None,), (None,), (4,), (None,)], values) + def test_pull_ProcessPulls(self): master = MasterCommands('master') request = Request() diff --git a/tests/units/sync_node.py b/tests/units/sync_node.py index d943066..1e56158 100755 --- a/tests/units/sync_node.py +++ b/tests/units/sync_node.py @@ -2,9 +2,12 @@ # sugar-lint: disable import os +import time import json from os.path import exists, join +import rrdtool + from __init__ import tests import active_document as ad @@ -87,15 +90,15 @@ class SyncNodeTest(tests.Test): kwargs = node.sync('mnt', accept_length=1024, session=0) self.assertEqual(0, kwargs['session']) - self.assertEqual([[1, None]], kwargs['push_sequence']) + self.assertEqual([[1, None]], kwargs['diff_sequence']) self.assertEqual([], self.read_packets('mnt')) - kwargs = node.sync('mnt', accept_length=1024, push_sequence=kwargs['push_sequence'], session=0) - self.assertEqual([[1, None]], kwargs['push_sequence']) + kwargs = node.sync('mnt', accept_length=1024, diff_sequence=kwargs['diff_sequence'], session=0) + self.assertEqual([[1, None]], kwargs['diff_sequence']) self.assertEqual([], self.read_packets('mnt')) - kwargs = node.sync('mnt', accept_length=1024 * 2, push_sequence=kwargs['push_sequence'], session=1) - self.assertEqual([[2, None]], kwargs['push_sequence']) + kwargs = node.sync('mnt', accept_length=1024 * 2, diff_sequence=kwargs['diff_sequence'], session=1) + self.assertEqual([[2, None]], kwargs['diff_sequence']) self.assertEqual([ {'api_url': api_url.value, 'filename': 'node-6.packet', 'content_type': 'records', 'cmd': 'sn_push', 'src': 'node', 'dst': 'master', 'document': 'document', 'guid': '1', 'session': 1, 'diff': { 'user': {'value': [], 'mtime': os.stat('db/document/1/1/user').st_mtime}, @@ -109,8 +112,8 @@ class SyncNodeTest(tests.Test): ], self.read_packets('mnt')) - kwargs = node.sync('mnt', accept_length=1024 * 3, push_sequence=kwargs['push_sequence'], session=2) - self.assertEqual([[4, None]], kwargs['push_sequence']) + kwargs = node.sync('mnt', accept_length=1024 * 3, diff_sequence=kwargs['diff_sequence'], session=2) + self.assertEqual([[4, None]], kwargs['diff_sequence']) self.assertEqual([ {'api_url': api_url.value, 'filename': 'node-6.packet', 'content_type': 'records', 'cmd': 'sn_push', 'src': 'node', 'dst': 'master', 'document': 'document', 'guid': '2', 'session': 2, 'diff': { 'user': {'value': [], 'mtime': os.stat('db/document/2/2/user').st_mtime}, @@ -132,7 +135,7 @@ class SyncNodeTest(tests.Test): ], self.read_packets('mnt')) - kwargs = node.sync('mnt', push_sequence=kwargs['push_sequence'], session=3) + kwargs = node.sync('mnt', diff_sequence=kwargs['diff_sequence'], session=3) self.assertEqual(None, kwargs) self.assertEqual([ {'api_url': api_url.value, 'filename': 'node-6.packet', 'content_type': 'records', 'cmd': 'sn_push', 'src': 'node', 'dst': 'master', 'document': 'document', 'guid': '4', 'session': 3, 'diff': { @@ -256,6 +259,30 @@ class SyncNodeTest(tests.Test): ['new one'], [i.header['session'] for i in sneakernet.walk('mnt')]) + def test_Import_StatsAcks(self): + node = SyncCommands('node', 'master') + + master_packet = OutFilePacket('mnt', src='master') + master_packet.push(data=[ + {'cmd': 'stats_ack', 'dst': 'node', 'sequence': { + 'user1': { + 'db1': [[1, 2]], + 'db2': [[3, 4]], + }, + 'user2': { + 'db3': [[5, 6]], + }, + }}, + ]) + master_packet.close() + + node.sync('mnt', session=0) + assert exists(master_packet.path) + + self.assertEqual([[3, None]], json.load(file('stats/us/user1/db1.push'))) + self.assertEqual([[1, 2], [5, None]], json.load(file('stats/us/user1/db2.push'))) + self.assertEqual([[1, 4], [7, None]], json.load(file('stats/us/user2/db3.push'))) + def test_sync_session(self): node = SyncCommands('node', 'master') node.volume['document'].create(guid='1', prop='*' * 1024) @@ -313,6 +340,63 @@ class SyncNodeTest(tests.Test): ], self.read_packets('mnt')) + def test_ExportStats(self): + node = SyncCommands('node', 'master') + + ts = int(time.time()) + os.makedirs('stats/1/1') + rrdtool.create('stats/1/1/db1.rrd', '--start', str(ts), '-s', '1', 'DS:f:GAUGE:1:U:U', 'RRA:AVERAGE:0.5:1:100') + rrdtool.update('stats/1/1/db1.rrd', '%s:1' % (ts + 1), '%s:2' % (ts + 2)) + rrdtool.create('stats/1/1/db2.rrd', '--start', str(ts + 2), '-s', '1', 'DS:f:GAUGE:1:U:U', 'RRA:AVERAGE:0.5:1:100') + rrdtool.update('stats/1/1/db2.rrd', '%s:3' % (ts + 3), '%s:4' % (ts + 4)) + os.makedirs('stats/2/2') + rrdtool.create('stats/2/2/db3.rrd', '--start', str(ts + 4), '-s', '1', 'DS:f:GAUGE:1:U:U', 'RRA:AVERAGE:0.5:1:100') + rrdtool.update('stats/2/2/db3.rrd', '%s:5' % (ts + 5)) + + node.sync('mnt', session=0) + + self.assertEqual([ + {'api_url': api_url.value, 'filename': 'node-0.packet', 'content_type': 'records', 'src': 'node', 'dst': 'master', 'session': 0, + 'cmd': 'stats_push', 'user': '1', 'db': 'db1', 'sequence': [[1, ts + 2]], 'timestamp': ts + 1, 'values': {'f': 1}, + }, + {'api_url': api_url.value, 'filename': 'node-0.packet', 'content_type': 'records', 'src': 'node', 'dst': 'master', 'session': 0, + 'cmd': 'stats_push', 'user': '1', 'db': 'db1', 'sequence': [[1, ts + 2]], 'timestamp': ts + 2, 'values': {'f': 2}, + }, + {'api_url': api_url.value, 'filename': 'node-0.packet', 'content_type': 'records', 'src': 'node', 'dst': 'master', 'session': 0, + 'cmd': 'stats_push', 'user': '1', 'db': 'db2', 'sequence': [[1, ts + 4]], 'timestamp': ts + 3, 'values': {'f': 3}, + }, + {'api_url': api_url.value, 'filename': 'node-0.packet', 'content_type': 'records', 'src': 'node', 'dst': 'master', 'session': 0, + 'cmd': 'stats_push', 'user': '1', 'db': 'db2', 'sequence': [[1, ts + 4]], 'timestamp': ts + 4, 'values': {'f': 4}, + }, + {'api_url': api_url.value, 'filename': 'node-0.packet', 'content_type': 'records', 'src': 'node', 'dst': 'master', 'session': 0, + 'cmd': 'stats_push', 'user': '2', 'db': 'db3', 'sequence': [[1, ts + 5]], 'timestamp': ts + 5, 'values': {'f': 5}, + }, + ], + self.read_packets('mnt')) + + def test_LimittedExportStats(self): + node = SyncCommands('node', 'master') + + ts = int(time.time()) + os.makedirs('stats/us/user') + rrdtool.create('stats/us/user/db.rrd', '--start', str(ts), '-s', '1', 'DS:f:GAUGE:1:U:U', 'RRA:AVERAGE:0.5:1:100') + rrdtool.update('stats/us/user/db.rrd', '%s:1' % (ts + 1), '%s:2' % (ts + 2)) + + node.volume['document'].create(guid='1', prop='*' * 1024) + + kwargs = node.sync('mnt1', accept_length=1024 + 512, session=0) + self.assertEqual({'user': {'db': [[1, None]]}}, kwargs['stats_sequence']) + self.assertEqual([], self.read_packets('mnt1')[3:]) + + kwargs = node.sync('mnt2', stats_sequence={'user': {'db': [[ts + 2, None]]}}, session=1) + self.assertEqual(None, kwargs) + self.assertEqual([ + {'api_url': api_url.value, 'filename': 'node-1.packet', 'content_type': 'records', 'src': 'node', 'dst': 'master', 'session': 1, + 'cmd': 'stats_push', 'user': 'user', 'db': 'db', 'sequence': [[ts + 2, ts + 2]], 'timestamp': ts + 2, 'values': {'f': 2}, + }, + ], + self.read_packets('mnt2')[2:]) + def read_packets(self, path): result = [] for filename in sorted(os.listdir(path)): |