diff options
Diffstat (limited to 'sugar_network/local/mountset.py')
-rw-r--r-- | sugar_network/local/mountset.py | 140 |
1 files changed, 28 insertions, 112 deletions
diff --git a/sugar_network/local/mountset.py b/sugar_network/local/mountset.py index ae622a3..e9e081e 100644 --- a/sugar_network/local/mountset.py +++ b/sugar_network/local/mountset.py @@ -13,7 +13,6 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. -import os import locale import socket import logging @@ -21,48 +20,35 @@ from os.path import join, exists import active_document as ad -from sugar_network.toolkit.inotify import Inotify, \ - IN_DELETE_SELF, IN_CREATE, IN_DELETE, IN_MOVED_TO, IN_MOVED_FROM from sugar_network import local, node -from sugar_network.toolkit import zeroconf, netlink, network -from sugar_network.toolkit.collection import MutableStack -from sugar_network.toolkit.files_sync import Leechers +from sugar_network.toolkit import zeroconf, netlink, network, mounts_monitor from sugar_network.local.mounts import LocalMount, NodeMount from sugar_network.node.subscribe_socket import SubscribeSocket from sugar_network.node.commands import NodeCommands +from sugar_network.node.sync_node import SyncCommands from sugar_network.node.router import Router from sugar_network.resources.volume import Volume from active_toolkit import util, coroutine, enforce _DB_DIRNAME = '.sugar-network' -_SYNC_DIRNAME = '.sugar-network-sync' - -_COMPLETE_MOUNT_TIMEOUT = 3 _logger = logging.getLogger('local.mountset') -class Mountset(dict, ad.CommandsProcessor): +class Mountset(dict, ad.CommandsProcessor, SyncCommands): - def __init__(self, home_volume, sync_dirs=None): + def __init__(self, home_volume): dict.__init__(self) ad.CommandsProcessor.__init__(self) + SyncCommands.__init__(self, local.path('sync')) self.opened = coroutine.Event() - self.home_volume = home_volume - if sync_dirs is None: - self._file_syncs = {} - else: - self._file_syncs = Leechers(sync_dirs, - join(home_volume.root, 'files')) self._subscriptions = {} self._locale = locale.getdefaultlocale()[0].replace('_', '-') self._jobs = coroutine.Pool() self._servers = coroutine.Pool() - self._sync_dirs = MutableStack() - self._sync = coroutine.Pool() def __getitem__(self, mountpoint): enforce(mountpoint in self, 'Unknown mountpoint %r', mountpoint) @@ -100,26 +86,6 @@ class Mountset(dict, ad.CommandsProcessor): mount.set_mounted(True) return mount.mounted - @ad.volume_command(method='POST', cmd='start_sync') - def start_sync(self, rewind=False, path=None): - if self._sync: - return - - enforce(path or self._sync_dirs, 'No mounts to synchronize with') - - for mount in self.values(): - if isinstance(mount, NodeMount): - if rewind: - self._sync_dirs.rewind() - self._sync.spawn(mount.sync_session, self._sync_dirs, path) - break - else: - raise RuntimeError('No mounted servers') - - @ad.volume_command(method='POST', cmd='break_sync') - def break_sync(self): - self._sync.kill() - @ad.volume_command(method='POST', cmd='publish') def republish(self, request): self.publish(request.content) @@ -128,28 +94,24 @@ class Mountset(dict, ad.CommandsProcessor): if response is None: response = ad.Response() request.accept_language = [self._locale] - mountpoint = None + mountpoint = request.get('mountpoint') - def process_call(): + try: try: - return ad.CommandsProcessor.call(self, request, response) + result = ad.CommandsProcessor.call(self, request, response) except ad.CommandNotFound: - mountpoint = request.pop('mountpoint') + request.pop('mountpoint') mount = self[mountpoint] if mountpoint == '/': mount.set_mounted(True) enforce(mount.mounted, '%r is not mounted', mountpoint) - return mount.call(request, response) - - try: - result = process_call() + result = mount.call(request, response) except Exception: - util.exception(_logger, - 'Failed to call %s on %r', request, mountpoint) + util.exception(_logger, 'Failed to call %s on %r', + request, mountpoint) raise - else: - _logger.debug('Called %s on %r: %r', request, mountpoint, result) + _logger.debug('Called %s on %r: %r', request, mountpoint, result) return result def connect(self, callback, condition=None, **kwargs): @@ -172,16 +134,8 @@ class Mountset(dict, ad.CommandsProcessor): def open(self): try: - mounts_root = local.mounts_root.value - if mounts_root: - for filename in os.listdir(mounts_root): - self._found_mount(join(mounts_root, filename)) - # In case if sync mounts processed before server mounts - # TODO More obvious code - for filename in os.listdir(mounts_root): - self._found_mount(join(mounts_root, filename)) - self._jobs.spawn(self._mounts_monitor) - + mounts_monitor.connect(_DB_DIRNAME, + self._found_mount, self._lost_mount) if '/' in self: if local.api_url.value: crawler = self._wait_for_master @@ -217,61 +171,23 @@ class Mountset(dict, ad.CommandsProcessor): # Otherwise, `socket.gethostbyname()` will return stale resolve network.res_init() - def _mounts_monitor(self): - root = local.mounts_root.value - _logger.info('Start monitoring %r for mounts', root) - - with Inotify() as monitor: - monitor.add_watch(root, IN_DELETE_SELF | IN_CREATE | - IN_DELETE | IN_MOVED_TO | IN_MOVED_FROM) - while not monitor.closed: - coroutine.select([monitor.fileno()], [], []) - for filename, event, __ in monitor.read(): - path = join(root, filename) - try: - if event & IN_DELETE_SELF: - _logger.warning('Lost %r, cannot monitor anymore', - root) - monitor.close() - break - elif event & (IN_DELETE | IN_MOVED_FROM): - self._lost_mount(path) - elif event & (IN_CREATE | IN_MOVED_TO): - # Right after moutning, access to directory - # might be restricted; let system enough time - # to complete mounting - coroutine.sleep(_COMPLETE_MOUNT_TIMEOUT) - self._found_mount(path) - except Exception: - util.exception(_logger, 'Mount %r failed', path) - def _found_mount(self, path): - if exists(join(path, _DB_DIRNAME)) and path not in self: - _logger.debug('Found %r server mount', path) - volume, server_mode = self._mount_volume(path) - if server_mode: - self[path] = NodeMount(volume, self.home_volume, - self._file_syncs) - else: - self[path] = LocalMount(volume) - - if exists(join(path, _SYNC_DIRNAME)): - self._sync_dirs.add(path) - if self._servers: - _logger.debug('Found %r sync mount', path) - self.start_sync() - else: - _logger.debug('Found %r sync mount but no servers', path) + volume, server_mode = self._mount_volume(path) + if server_mode: + _logger.debug('Mount %r in node mode', path) + self[path] = self.node_mount = NodeMount(volume, self.home_volume) + else: + _logger.debug('Mount %r in node-less mode', path) + self[path] = LocalMount(volume) def _lost_mount(self, path): + mount = self.get(path) + if mount is None: + return _logger.debug('Lost %r mount', path) - - self._sync_dirs.remove(join(path, _SYNC_DIRNAME)) - if not self._sync_dirs: - self.break_sync() - - if path in self: - del self[path] + if isinstance(mount, NodeMount): + self.node_mount = None + del self[path] def _mount_volume(self, path): lazy_open = local.lazy_open.value |