# Copyright (C) 2012-2013 Aleksey Lim # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . import os import json import logging from bisect import bisect_left from shutil import copyfileobj from os.path import join, exists, relpath, lexists, dirname from sugar_network import toolkit from sugar_network.toolkit import coroutine _logger = logging.getLogger('node.sync_files') def merge(files_path, packet): files_path = files_path.rstrip(os.sep) if not exists(files_path): os.makedirs(files_path) commit_seq = None for record in packet: op = record.get('op') if op == 'update': path = join(files_path, record['path']) if not exists(dirname(path)): os.makedirs(dirname(path)) with toolkit.new_file(path) as f: copyfileobj(record['blob'], f) elif op == 'delete': path = join(files_path, record['path']) if lexists(path): os.unlink(path) elif op == 'commit': commit_seq = record['sequence'] return commit_seq class Index(object): def __init__(self, files_path, index_path, seqno): self._files_path = files_path.rstrip(os.sep) self._index_path = index_path self._seqno = seqno self._index = [] self._stamp = 0 self._mutex = coroutine.Lock() if exists(self._index_path): with file(self._index_path) as f: self._index, self._stamp = json.load(f) if not exists(self._files_path): os.makedirs(self._files_path) def sync(self): with self._mutex: return self._sync() def diff(self, in_seq, out_seq=None, **kwargs): if out_seq is None: out_seq = toolkit.Sequence([]) is_initial_diff = not out_seq # Below calls will trigger coroutine switches, thius, # avoid changing `self._index` by different coroutines. with self._mutex: self._sync() _logger.debug('Start sync: in_seq=%r', in_seq) files = 0 deleted = 0 pos = 0 try: for start, end in in_seq: pos = bisect_left(self._index, [start, None, None], pos) for pos, (seqno, path, mtime) in \ enumerate(self._index[pos:]): if end is not None and seqno > end: break coroutine.dispatch() if mtime < 0: yield {'op': 'delete', 'path': path} deleted += 1 else: blob_path = join(self._files_path, path) yield {'op': 'update', 'path': path, 'blob_size': os.stat(blob_path).st_size, 'blob': toolkit.iter_file(blob_path), } out_seq.include(start, seqno) start = seqno files += 1 except StopIteration: pass if is_initial_diff: # There is only one diff, so, we can stretch it to remove holes out_seq.stretch() yield {'op': 'commit', 'sequence': out_seq} _logger.debug('Stop sync: in_seq=%r out_seq=%r updates=%r deletes=%r', in_seq, out_seq, files, deleted) def _sync(self): if os.stat(self._files_path).st_mtime <= self._stamp: return False new_files = set() updates = 0 deletes = 0 # Populate list of new files at first for root, __, files in os.walk(self._files_path): coroutine.dispatch() rel_root = relpath(root, self._files_path) if rel_root == '.': rel_root = '' else: rel_root += os.sep for filename in files: coroutine.dispatch() path = join(root, filename) if os.lstat(path).st_mtime > self._stamp: new_files.add(rel_root + filename) # Check for updates for already tracked files tail = [] for pos, (__, rel_path, mtime) in enumerate(self._index[:]): coroutine.dispatch() path = join(self._files_path, rel_path) existing = lexists(path) if existing == (mtime >= 0) and \ (not existing or os.lstat(path).st_mtime == mtime): continue if existing: new_files.discard(rel_path) pos -= len(tail) self._index = self._index[:pos] + self._index[pos + 1:] tail.append([ self._seqno.next(), rel_path, int(os.lstat(path).st_mtime) if existing else -1, ]) if existing: updates += 1 else: deletes += 1 self._index.extend(tail) _logger.debug('Updated %r index: new=%r updates=%r deletes=%r', self._files_path, len(self._files_path), updates, deletes) # Finally, add new files for rel_path in sorted(new_files): coroutine.dispatch() mtime = os.lstat(join(self._files_path, rel_path)).st_mtime self._index.append([self._seqno.next(), rel_path, mtime]) self._stamp = os.stat(self._files_path).st_mtime if self._seqno.commit(): with toolkit.new_file(self._index_path) as f: json.dump((self._index, self._stamp), f) return True