Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAleksey Lim <alsroot@sugarlabs.org>2012-08-09 16:59:50 (GMT)
committer Aleksey Lim <alsroot@sugarlabs.org>2012-08-09 17:30:56 (GMT)
commitf81fee8335345b025610fd8789ee19c03d9e15f4 (patch)
tree6d0a0fb5013ee204494e1c34fcf6efb6cbede4be
parentc8ce1a074715d809dd3ec435978113e5b0c30ed7 (diff)
Process stats in node sync
-rwxr-xr-xsugar-network-service2
-rw-r--r--sugar_network/client/bus.py3
-rw-r--r--sugar_network/node/stats.py69
-rw-r--r--sugar_network/node/sync_master.py14
-rw-r--r--sugar_network/node/sync_node.py30
-rw-r--r--sugar_network/resources/user.py18
-rw-r--r--sugar_network/toolkit/collection.py5
-rw-r--r--sugar_network/toolkit/files_sync.py4
-rw-r--r--sugar_network/toolkit/rrd.py15
-rw-r--r--tests/__init__.py5
-rwxr-xr-xtests/integration/sync.py22
-rwxr-xr-xtests/units/sync_master.py52
-rwxr-xr-xtests/units/sync_node.py100
13 files changed, 301 insertions, 38 deletions
diff --git a/sugar-network-service b/sugar-network-service
index 25bcb55..e525ab8 100755
--- a/sugar-network-service
+++ b/sugar-network-service
@@ -33,6 +33,7 @@ from sugar_network.local.dbus_network import Network
from sugar_network.local.bus import IPCServer
from sugar_network.local.mounts import HomeMount, RemoteMount
from sugar_network.local.mountset import Mountset
+from sugar_network.node import stats
from sugar_network.resources.volume import Volume
from active_toolkit.options import Option
from active_toolkit import util, printf, application, coroutine, enforce
@@ -259,6 +260,7 @@ Option.seek('main', [application.debug])
Option.seek('webui', webui)
Option.seek('local', local)
Option.seek('node', [node.port, node.subscribe_port, node.sync_dirs])
+Option.seek('stats', stats)
Option.seek('active-document', ad)
application = Application(
diff --git a/sugar_network/client/bus.py b/sugar_network/client/bus.py
index f273a52..2fb4b68 100644
--- a/sugar_network/client/bus.py
+++ b/sugar_network/client/bus.py
@@ -61,7 +61,8 @@ class Client(object):
cls._connection = None
@classmethod
- def call(cls, method, cmd=None, content=None, content_type=None, **kwargs):
+ def call(cls, method, cmd=None, content=None,
+ content_type='application/json', **kwargs):
request = ad.Request(kwargs)
request.access_level = ad.ACCESS_LOCAL
request.principal = sugar.uid()
diff --git a/sugar_network/node/stats.py b/sugar_network/node/stats.py
index 0f6a049..817def5 100644
--- a/sugar_network/node/stats.py
+++ b/sugar_network/node/stats.py
@@ -13,7 +13,15 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
+import os
+import logging
+from os.path import join, exists, isdir
+
+from pylru import lrucache
+
from active_toolkit.options import Option
+from sugar_network.toolkit.rrd import Rrd, ReadOnlyRrd
+from sugar_network.toolkit.collection import Sequence, PersistentSequence
stats = Option(
@@ -32,3 +40,64 @@ stats_rras = Option(
'space separated list of RRAs for RRD databases',
default=['RRA:AVERAGE:0.5:1:4320', 'RRA:AVERAGE:0.5:5:2016'],
type_cast=Option.list_cast, type_repr=Option.list_repr)
+
+
+_logger = logging.getLogger('node.stats')
+_cache = lrucache(32)
+
+
+def get_rrd(user):
+ if user in _cache:
+ return _cache[user]
+ else:
+ rrd = _cache[user] = Rrd(join(stats_root.value, user[:2], user),
+ stats_step.value, stats_rras.value)
+ return rrd
+
+
+def pull(in_seq, packet):
+ for user, rrd in _walk_rrd():
+ in_seq.setdefault(user, {})
+
+ for db, db_start, db_end in rrd.dbs:
+ seq = in_seq[user].get(db)
+ if seq is None:
+ seq = in_seq[user][db] = PersistentSequence(
+ join(rrd.root, db + '.push'), [1, None])
+ elif seq is not dict:
+ seq = in_seq[user][db] = Sequence(seq)
+ out_seq = Sequence()
+
+ def dump():
+ for start, end in seq:
+ for timestamp, values in \
+ rrd.get(db, max(start, db_start), end or db_end):
+ yield {'timestamp': timestamp, 'values': values}
+ seq.exclude(start, timestamp)
+ out_seq.include(start, timestamp)
+ start = timestamp
+
+ packet.push(dump(), arcname=join('stats', user, db),
+ cmd='stats_push', user=user, db=db,
+ sequence=out_seq)
+
+
+def commit(sequences):
+ for user, dbs in sequences.items():
+ for db, merged in dbs.items():
+ seq = PersistentSequence(
+ join(stats_root.value, user[:2], user, db + '.push'),
+ [1, None])
+ seq.exclude(merged)
+ seq.commit()
+
+
+def _walk_rrd():
+ if not exists(stats_root.value):
+ return
+ for users_dirname in os.listdir(stats_root.value):
+ users_dir = join(stats_root.value, users_dirname)
+ if not isdir(users_dir):
+ continue
+ for user in os.listdir(users_dir):
+ yield user, ReadOnlyRrd(join(users_dir, user))
diff --git a/sugar_network/node/sync_master.py b/sugar_network/node/sync_master.py
index c1016a0..86e86a0 100644
--- a/sugar_network/node/sync_master.py
+++ b/sugar_network/node/sync_master.py
@@ -29,6 +29,7 @@ from sugar_network.toolkit.sneakernet import InPacket, OutBufferPacket, \
OutPacket, DiskFull
from sugar_network.toolkit.collection import Sequence
from sugar_network.toolkit.files_sync import Seeders
+from sugar_network.node import stats
from active_toolkit import coroutine, util, enforce
@@ -64,6 +65,7 @@ class SyncCommands(object):
pushed = Sequence()
merged = Sequence()
cookie = _Cookie()
+ stats_pushed = {}
for record in in_packet.records(dst=self._guid):
cmd = record.get('cmd')
@@ -77,11 +79,23 @@ class SyncCommands(object):
cookie['sn_pull'].include(record['sequence'])
elif cmd == 'files_pull':
cookie[record['directory']].include(record['sequence'])
+ elif cmd == 'stats_push':
+ db = record['db']
+ user = record['user']
+
+ rrd = stats.get_rrd(user)
+ rrd.put(db, record['values'], record['timestamp'])
+
+ user_seq = stats_pushed.setdefault(user, {})
+ db_seq = user_seq.setdefault(db, Sequence())
+ db_seq.include(record['sequence'])
enforce(not merged or pushed,
'"sn_push" record without "sn_commit"')
if pushed:
out_packet.push(cmd='sn_ack', sequence=pushed, merged=merged)
+ if stats_pushed:
+ out_packet.push(cmd='stats_ack', sequence=stats_pushed)
cookie['sn_pull'].exclude(merged)
# Read passed cookie only after excluding `merged`.
diff --git a/sugar_network/node/sync_node.py b/sugar_network/node/sync_node.py
index c887ccd..b19a0db 100644
--- a/sugar_network/node/sync_node.py
+++ b/sugar_network/node/sync_node.py
@@ -22,11 +22,11 @@ from gettext import gettext as _
import active_document as ad
from sugar_network import node, local
-from sugar_network.toolkit import mounts_monitor, sneakernet
+from sugar_network.toolkit import mounts_monitor, sneakernet, files_sync
from sugar_network.toolkit.collection import MutableStack
-from sugar_network.toolkit.files_sync import Leechers
from sugar_network.toolkit.collection import Sequence, PersistentSequence
from sugar_network.toolkit.sneakernet import OutFilePacket, DiskFull
+from sugar_network.node import stats
from active_toolkit import coroutine, util, enforce
@@ -40,7 +40,8 @@ class SyncCommands(object):
def __init__(self, sequences_path):
self._sync = coroutine.Pool()
self._sync_mounts = MutableStack()
- self._file_syncs = Leechers(node.sync_dirs.value, sequences_path)
+ self._file_syncs = \
+ files_sync.Leechers(node.sync_dirs.value, sequences_path)
self._sync_session = None
self._push_seq = PersistentSequence(
join(sequences_path, 'push'), [1, None])
@@ -82,14 +83,18 @@ class SyncCommands(object):
def break_sync(self):
self._sync.kill()
- def sync(self, path, accept_length=None, push_sequence=None, session=None):
+ def sync(self, path, accept_length=None, diff_sequence=None,
+ stats_sequence=None, session=None):
enforce(self._mount is not None, 'No server to sync')
to_push_seq = Sequence(empty_value=[1, None])
- if push_sequence is None:
+ if diff_sequence is None:
to_push_seq.include(self._push_seq)
else:
- to_push_seq = Sequence(push_sequence)
+ to_push_seq = Sequence(diff_sequence)
+
+ if stats_sequence is None:
+ stats_sequence = {}
if session is None:
session_is_new = True
@@ -132,8 +137,12 @@ class SyncCommands(object):
try:
self._mount.volume.diff(to_push_seq, packet)
+ stats.pull(stats_sequence, packet)
except DiskFull:
- return {'push_sequence': to_push_seq, 'session': session}
+ return {'diff_sequence': to_push_seq,
+ 'stats_sequence': stats_sequence,
+ 'session': session,
+ }
else:
break
@@ -195,6 +204,11 @@ class SyncCommands(object):
to_push_seq.exclude(record['sequence'])
self._mount.volume.seqno.next()
self._mount.volume.seqno.commit()
+ elif cmd == 'stats_ack' and \
+ record['dst'] == self._mount.node_guid:
+ _logger.debug('Processing %r stats ACK from %r',
+ record, packet)
+ stats.commit(record['sequence'])
elif record.get('directory') in self._file_syncs:
self._file_syncs[record['directory']].push(record)
@@ -207,6 +221,6 @@ class SyncCommands(object):
_logger.debug('Found %r sync mount but no servers', path)
def __lost_mount_cb(self, path):
- self._sync_mounts.remove(join(path, _SYNC_DIRNAME))
+ self._sync_mounts.remove(path)
if not self._sync_mounts:
self.break_sync()
diff --git a/sugar_network/resources/user.py b/sugar_network/resources/user.py
index 591f4bd..a120832 100644
--- a/sugar_network/resources/user.py
+++ b/sugar_network/resources/user.py
@@ -13,17 +13,12 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-from os.path import join
-
import active_document as ad
from sugar_network.node import stats
-from sugar_network.toolkit.rrd import Rrd
class User(ad.Document):
- _rrd = {}
-
@ad.active_property(slot=1, prefix='N', full_text=True)
def name(self, value):
return value
@@ -60,7 +55,7 @@ class User(ad.Document):
permissions=ad.ACCESS_AUTHOR)
def _stats_info(self):
status = {}
- rrd = User._get_rrd(self.guid)
+ rrd = stats.get_rrd(self.guid)
for name, __, last_update in rrd.dbs:
status[name] = last_update + stats.stats_step.value
# TODO Process client configuration in more general manner
@@ -75,15 +70,6 @@ class User(ad.Document):
def _stats_upload(self, request):
name = request.content['name']
values = request.content['values']
- rrd = User._get_rrd(self.guid)
+ rrd = stats.get_rrd(self.guid)
for timestamp, values in values:
rrd.put(name, values, timestamp)
-
- @classmethod
- def _get_rrd(cls, guid):
- rrd = cls._rrd.get(guid)
- if rrd is None:
- rrd = cls._rrd[guid] = Rrd(
- join(stats.stats_root.value, guid[:2], guid),
- stats.stats_step.value, stats.stats_rras.value)
- return rrd
diff --git a/sugar_network/toolkit/collection.py b/sugar_network/toolkit/collection.py
index 1413ffc..f71b5cf 100644
--- a/sugar_network/toolkit/collection.py
+++ b/sugar_network/toolkit/collection.py
@@ -16,7 +16,7 @@
import os
import json
import collections
-from os.path import exists
+from os.path import exists, dirname
from active_toolkit import util, enforce
@@ -201,6 +201,9 @@ class PersistentSequence(Sequence):
self[:] = json.load(f)
def commit(self):
+ dir_path = dirname(self._path)
+ if dir_path and not exists(dir_path):
+ os.makedirs(dir_path)
with util.new_file(self._path) as f:
json.dump(self, f)
f.flush()
diff --git a/sugar_network/toolkit/files_sync.py b/sugar_network/toolkit/files_sync.py
index 088eb08..636f067 100644
--- a/sugar_network/toolkit/files_sync.py
+++ b/sugar_network/toolkit/files_sync.py
@@ -37,8 +37,6 @@ class Seeder(object):
self._seqno = seqno
self._index = []
self._stamp = 0
- # Below calls will mutate `self._index` and trigger coroutine switches.
- # Thus, avoid chnaing `self._index` by different coroutines.
self._mutex = coroutine.Lock()
if exists(self._index_path):
@@ -49,6 +47,8 @@ class Seeder(object):
os.makedirs(self._files_path)
def pull(self, in_seq, packet):
+ # Below calls will mutate `self._index` and trigger coroutine switches.
+ # Thus, avoid changing `self._index` by different coroutines.
with self._mutex:
self._sync()
orig_seq = Sequence(in_seq)
diff --git a/sugar_network/toolkit/rrd.py b/sugar_network/toolkit/rrd.py
index 87b3c40..5fe9f44 100644
--- a/sugar_network/toolkit/rrd.py
+++ b/sugar_network/toolkit/rrd.py
@@ -17,7 +17,7 @@
$Repo: git://git.sugarlabs.org/alsroot/codelets.git$
$File: src/rrd.py$
-$Date: 2012-07-12$
+$Date: 2012-08-09$
"""
@@ -59,6 +59,10 @@ class Rrd(object):
self._dbset(name).load(filename, int(revision or 0))
@property
+ def root(self):
+ return self._root
+
+ @property
def step(self):
return self._step
@@ -83,6 +87,15 @@ class Rrd(object):
return db
+class ReadOnlyRrd(Rrd):
+
+ def __init__(self, root):
+ Rrd.__init__(self, root, 1, [])
+
+ def put(self, name, values, timestamp=None):
+ raise RuntimeError('Write access is denied')
+
+
class _DbSet(object):
def __init__(self, root, name, step, rras):
diff --git a/tests/__init__.py b/tests/__init__.py
index f5dfad5..0fdc6c3 100644
--- a/tests/__init__.py
+++ b/tests/__init__.py
@@ -28,6 +28,7 @@ from sugar_network.resources.context import Context
from sugar_network.node.router import Router
from sugar_network.node.commands import NodeCommands
from sugar_network.node.subscribe_socket import SubscribeSocket
+from sugar_network.node import stats
from sugar_network.resources.volume import Volume
@@ -83,6 +84,10 @@ class Test(unittest.TestCase):
local.mounts_root.value = None
mounts_monitor.stop()
mounts_monitor._COMPLETE_MOUNT_TIMEOUT = .1
+ stats.stats_root.value = tmpdir + '/stats'
+ stats.stats_step.value = 1
+ stats.stats_rras.value = ['RRA:AVERAGE:0.5:1:100']
+ stats._cache.clear()
Volume.RESOURCES = [
'sugar_network.resources.user',
diff --git a/tests/integration/sync.py b/tests/integration/sync.py
index 896bc4c..50e3502 100755
--- a/tests/integration/sync.py
+++ b/tests/integration/sync.py
@@ -2,18 +2,22 @@
# sugar-lint: disable
import os
+import time
import shutil
import signal
from cStringIO import StringIO
from contextlib import contextmanager
from os.path import exists
+import rrdtool
+
from __init__ import tests
import active_document as ad
from sugar_network import Client
from sugar_network.local import local_root
+from sugar_network import sugar
from sugar_network.toolkit.sneakernet import InPacket, OutPacket
from active_toolkit import util, coroutine
@@ -32,7 +36,10 @@ class SyncTest(tests.Test):
'--data-root=master/db', '--index-flush-threshold=1024',
'--index-flush-timeout=3', '--only-commit-events',
'--tmpdir=tmp', '--sync-dirs=master/files/1:master/files/2',
- '--pull-timeout=1', '-DDDF', 'start',
+ '--pull-timeout=1',
+ '--stats', '--stats-root=master/stats', '--stats-step=1',
+ '--stats-rras=RRA:AVERAGE:0.5:1:100',
+ '-DDDF', 'start',
])
self.node_pid = self.popen([
'sugar-network-service', '--port=8200', '--subscribe-port=8201',
@@ -40,6 +47,8 @@ class SyncTest(tests.Test):
'--mounts-root=mnt', '--server-mode', '--tmpdir=tmp',
'--api-url=http://localhost:8100',
'--sync-dirs=node/files/1:node/files/2',
+ '--stats', '--stats-root=node/stats', '--stats-step=1',
+ '--stats-rras=RRA:AVERAGE:0.5:1:100',
'-DDD', 'debug',
])
@@ -97,6 +106,12 @@ class SyncTest(tests.Test):
self.touch(('preview_4', 'preview_4'))
context.upload_blob('preview', 'preview_4')
+ stats_timestamp = int(time.time())
+ client.call('POST', 'stats-upload', mountpoint=mountpoint, document='user', guid=sugar.uid(), content={
+ 'name': 'db',
+ 'values': [(stats_timestamp + 1, {'f': 1})],
+ })
+
# Create node push packets with newly create data
self.touch('mnt_2/.sugar-network-sync')
os.rename('mnt_2', 'mnt/mnt_2')
@@ -134,6 +149,11 @@ class SyncTest(tests.Test):
self.assertEqual('1', file('node/files/1/1').read())
self.assertEqual('2', file('node/files/2/2').read())
+ master_stats = 'master/stats/%s/%s/db.rrd' % (sugar.uid()[:2], sugar.uid())
+ assert exists(master_stats)
+ __, __, values = rrdtool.fetch(master_stats, 'AVERAGE', '-s', str(stats_timestamp - 1), '-e', str(stats_timestamp + 1))
+ self.assertEqual([(None,), (1,), (None,)], values)
+
def wait_for_events(self, *events):
events = list(events)
connected = coroutine.Event()
diff --git a/tests/units/sync_master.py b/tests/units/sync_master.py
index a1d9295..ea2e6fb 100755
--- a/tests/units/sync_master.py
+++ b/tests/units/sync_master.py
@@ -2,11 +2,14 @@
# sugar-lint: disable
import os
+import time
import json
import base64
import hashlib
from os.path import join, exists
+import rrdtool
+
from __init__ import tests
import active_document as ad
@@ -228,6 +231,55 @@ class SyncMasterTest(tests.Test):
],
response.get('Set-Cookie'))
+ def test_push_ProcessStatsPushes(self):
+ master = MasterCommands('master')
+ request = Request()
+ response = ad.Response()
+
+ ts = int(time.time()) - 1000
+ packet = OutBufferPacket(src='node', dst='master')
+ packet.push(cmd='stats_push', user='user1', db='db1', sequence=[[1, ts + 2]], data=[
+ {'timestamp': ts + 1, 'values': {'f': 1}},
+ {'timestamp': ts + 2, 'values': {'f': 2}},
+ ])
+ packet.push(cmd='stats_push', user='user1', db='db2', sequence=[[2, ts + 3]], data=[
+ {'timestamp': ts + 3, 'values': {'f': 3}},
+ ])
+ packet.push(cmd='stats_push', user='user2', db='db3', sequence=[[ts + 4, ts + 4]], data=[
+ {'timestamp': ts + 4, 'values': {'f': 4}},
+ ])
+ request.content_stream = packet.pop()
+ request.content_length = len(request.content_stream.getvalue())
+
+ reply = master.push(request, response)
+ self.assertEqual([
+ 'sugar_network_sync=unset_sugar_network_sync; Max-Age=0; HttpOnly',
+ 'sugar_network_delay=unset_sugar_network_delay; Max-Age=0; HttpOnly',
+ ],
+ response.get('Set-Cookie'))
+
+ packet = InPacket(stream=reply)
+ self.assertEqual('master', packet.header['src'])
+ self.assertEqual('node', packet.header.get('dst'))
+ self.assertEqual([
+ {'filename': 'ack.node.packet', 'src': 'master', 'dst': 'node', 'cmd': 'stats_ack',
+ 'sequence': {
+ 'user1': {'db1': [[1, ts + 2]], 'db2': [[2, ts + 3]]},
+ 'user2': {'db3': [[ts + 4, ts + 4]]},
+ },
+ }
+ ],
+ [i for i in packet])
+
+ __, __, values = rrdtool.fetch('stats/us/user1/db1.rrd', 'AVERAGE', '-s', str(ts), '-e', str(ts + 2))
+ self.assertEqual([(1,), (2,), (None,)], values)
+
+ __, __, values = rrdtool.fetch('stats/us/user1/db2.rrd', 'AVERAGE', '-s', str(ts), '-e', str(ts + 3))
+ self.assertEqual([(None,), (None,), (3,), (None,)], values)
+
+ __, __, values = rrdtool.fetch('stats/us/user2/db3.rrd', 'AVERAGE', '-s', str(ts), '-e', str(ts + 4))
+ self.assertEqual([(None,), (None,), (None,), (4,), (None,)], values)
+
def test_pull_ProcessPulls(self):
master = MasterCommands('master')
request = Request()
diff --git a/tests/units/sync_node.py b/tests/units/sync_node.py
index d943066..1e56158 100755
--- a/tests/units/sync_node.py
+++ b/tests/units/sync_node.py
@@ -2,9 +2,12 @@
# sugar-lint: disable
import os
+import time
import json
from os.path import exists, join
+import rrdtool
+
from __init__ import tests
import active_document as ad
@@ -87,15 +90,15 @@ class SyncNodeTest(tests.Test):
kwargs = node.sync('mnt', accept_length=1024, session=0)
self.assertEqual(0, kwargs['session'])
- self.assertEqual([[1, None]], kwargs['push_sequence'])
+ self.assertEqual([[1, None]], kwargs['diff_sequence'])
self.assertEqual([], self.read_packets('mnt'))
- kwargs = node.sync('mnt', accept_length=1024, push_sequence=kwargs['push_sequence'], session=0)
- self.assertEqual([[1, None]], kwargs['push_sequence'])
+ kwargs = node.sync('mnt', accept_length=1024, diff_sequence=kwargs['diff_sequence'], session=0)
+ self.assertEqual([[1, None]], kwargs['diff_sequence'])
self.assertEqual([], self.read_packets('mnt'))
- kwargs = node.sync('mnt', accept_length=1024 * 2, push_sequence=kwargs['push_sequence'], session=1)
- self.assertEqual([[2, None]], kwargs['push_sequence'])
+ kwargs = node.sync('mnt', accept_length=1024 * 2, diff_sequence=kwargs['diff_sequence'], session=1)
+ self.assertEqual([[2, None]], kwargs['diff_sequence'])
self.assertEqual([
{'api_url': api_url.value, 'filename': 'node-6.packet', 'content_type': 'records', 'cmd': 'sn_push', 'src': 'node', 'dst': 'master', 'document': 'document', 'guid': '1', 'session': 1, 'diff': {
'user': {'value': [], 'mtime': os.stat('db/document/1/1/user').st_mtime},
@@ -109,8 +112,8 @@ class SyncNodeTest(tests.Test):
],
self.read_packets('mnt'))
- kwargs = node.sync('mnt', accept_length=1024 * 3, push_sequence=kwargs['push_sequence'], session=2)
- self.assertEqual([[4, None]], kwargs['push_sequence'])
+ kwargs = node.sync('mnt', accept_length=1024 * 3, diff_sequence=kwargs['diff_sequence'], session=2)
+ self.assertEqual([[4, None]], kwargs['diff_sequence'])
self.assertEqual([
{'api_url': api_url.value, 'filename': 'node-6.packet', 'content_type': 'records', 'cmd': 'sn_push', 'src': 'node', 'dst': 'master', 'document': 'document', 'guid': '2', 'session': 2, 'diff': {
'user': {'value': [], 'mtime': os.stat('db/document/2/2/user').st_mtime},
@@ -132,7 +135,7 @@ class SyncNodeTest(tests.Test):
],
self.read_packets('mnt'))
- kwargs = node.sync('mnt', push_sequence=kwargs['push_sequence'], session=3)
+ kwargs = node.sync('mnt', diff_sequence=kwargs['diff_sequence'], session=3)
self.assertEqual(None, kwargs)
self.assertEqual([
{'api_url': api_url.value, 'filename': 'node-6.packet', 'content_type': 'records', 'cmd': 'sn_push', 'src': 'node', 'dst': 'master', 'document': 'document', 'guid': '4', 'session': 3, 'diff': {
@@ -256,6 +259,30 @@ class SyncNodeTest(tests.Test):
['new one'],
[i.header['session'] for i in sneakernet.walk('mnt')])
+ def test_Import_StatsAcks(self):
+ node = SyncCommands('node', 'master')
+
+ master_packet = OutFilePacket('mnt', src='master')
+ master_packet.push(data=[
+ {'cmd': 'stats_ack', 'dst': 'node', 'sequence': {
+ 'user1': {
+ 'db1': [[1, 2]],
+ 'db2': [[3, 4]],
+ },
+ 'user2': {
+ 'db3': [[5, 6]],
+ },
+ }},
+ ])
+ master_packet.close()
+
+ node.sync('mnt', session=0)
+ assert exists(master_packet.path)
+
+ self.assertEqual([[3, None]], json.load(file('stats/us/user1/db1.push')))
+ self.assertEqual([[1, 2], [5, None]], json.load(file('stats/us/user1/db2.push')))
+ self.assertEqual([[1, 4], [7, None]], json.load(file('stats/us/user2/db3.push')))
+
def test_sync_session(self):
node = SyncCommands('node', 'master')
node.volume['document'].create(guid='1', prop='*' * 1024)
@@ -313,6 +340,63 @@ class SyncNodeTest(tests.Test):
],
self.read_packets('mnt'))
+ def test_ExportStats(self):
+ node = SyncCommands('node', 'master')
+
+ ts = int(time.time())
+ os.makedirs('stats/1/1')
+ rrdtool.create('stats/1/1/db1.rrd', '--start', str(ts), '-s', '1', 'DS:f:GAUGE:1:U:U', 'RRA:AVERAGE:0.5:1:100')
+ rrdtool.update('stats/1/1/db1.rrd', '%s:1' % (ts + 1), '%s:2' % (ts + 2))
+ rrdtool.create('stats/1/1/db2.rrd', '--start', str(ts + 2), '-s', '1', 'DS:f:GAUGE:1:U:U', 'RRA:AVERAGE:0.5:1:100')
+ rrdtool.update('stats/1/1/db2.rrd', '%s:3' % (ts + 3), '%s:4' % (ts + 4))
+ os.makedirs('stats/2/2')
+ rrdtool.create('stats/2/2/db3.rrd', '--start', str(ts + 4), '-s', '1', 'DS:f:GAUGE:1:U:U', 'RRA:AVERAGE:0.5:1:100')
+ rrdtool.update('stats/2/2/db3.rrd', '%s:5' % (ts + 5))
+
+ node.sync('mnt', session=0)
+
+ self.assertEqual([
+ {'api_url': api_url.value, 'filename': 'node-0.packet', 'content_type': 'records', 'src': 'node', 'dst': 'master', 'session': 0,
+ 'cmd': 'stats_push', 'user': '1', 'db': 'db1', 'sequence': [[1, ts + 2]], 'timestamp': ts + 1, 'values': {'f': 1},
+ },
+ {'api_url': api_url.value, 'filename': 'node-0.packet', 'content_type': 'records', 'src': 'node', 'dst': 'master', 'session': 0,
+ 'cmd': 'stats_push', 'user': '1', 'db': 'db1', 'sequence': [[1, ts + 2]], 'timestamp': ts + 2, 'values': {'f': 2},
+ },
+ {'api_url': api_url.value, 'filename': 'node-0.packet', 'content_type': 'records', 'src': 'node', 'dst': 'master', 'session': 0,
+ 'cmd': 'stats_push', 'user': '1', 'db': 'db2', 'sequence': [[1, ts + 4]], 'timestamp': ts + 3, 'values': {'f': 3},
+ },
+ {'api_url': api_url.value, 'filename': 'node-0.packet', 'content_type': 'records', 'src': 'node', 'dst': 'master', 'session': 0,
+ 'cmd': 'stats_push', 'user': '1', 'db': 'db2', 'sequence': [[1, ts + 4]], 'timestamp': ts + 4, 'values': {'f': 4},
+ },
+ {'api_url': api_url.value, 'filename': 'node-0.packet', 'content_type': 'records', 'src': 'node', 'dst': 'master', 'session': 0,
+ 'cmd': 'stats_push', 'user': '2', 'db': 'db3', 'sequence': [[1, ts + 5]], 'timestamp': ts + 5, 'values': {'f': 5},
+ },
+ ],
+ self.read_packets('mnt'))
+
+ def test_LimittedExportStats(self):
+ node = SyncCommands('node', 'master')
+
+ ts = int(time.time())
+ os.makedirs('stats/us/user')
+ rrdtool.create('stats/us/user/db.rrd', '--start', str(ts), '-s', '1', 'DS:f:GAUGE:1:U:U', 'RRA:AVERAGE:0.5:1:100')
+ rrdtool.update('stats/us/user/db.rrd', '%s:1' % (ts + 1), '%s:2' % (ts + 2))
+
+ node.volume['document'].create(guid='1', prop='*' * 1024)
+
+ kwargs = node.sync('mnt1', accept_length=1024 + 512, session=0)
+ self.assertEqual({'user': {'db': [[1, None]]}}, kwargs['stats_sequence'])
+ self.assertEqual([], self.read_packets('mnt1')[3:])
+
+ kwargs = node.sync('mnt2', stats_sequence={'user': {'db': [[ts + 2, None]]}}, session=1)
+ self.assertEqual(None, kwargs)
+ self.assertEqual([
+ {'api_url': api_url.value, 'filename': 'node-1.packet', 'content_type': 'records', 'src': 'node', 'dst': 'master', 'session': 1,
+ 'cmd': 'stats_push', 'user': 'user', 'db': 'db', 'sequence': [[ts + 2, ts + 2]], 'timestamp': ts + 2, 'values': {'f': 2},
+ },
+ ],
+ self.read_packets('mnt2')[2:])
+
def read_packets(self, path):
result = []
for filename in sorted(os.listdir(path)):