Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
path: root/sugar_network/local/mountset.py
diff options
context:
space:
mode:
Diffstat (limited to 'sugar_network/local/mountset.py')
-rw-r--r--sugar_network/local/mountset.py140
1 files changed, 28 insertions, 112 deletions
diff --git a/sugar_network/local/mountset.py b/sugar_network/local/mountset.py
index ae622a3..e9e081e 100644
--- a/sugar_network/local/mountset.py
+++ b/sugar_network/local/mountset.py
@@ -13,7 +13,6 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import os
import locale
import socket
import logging
@@ -21,48 +20,35 @@ from os.path import join, exists
import active_document as ad
-from sugar_network.toolkit.inotify import Inotify, \
- IN_DELETE_SELF, IN_CREATE, IN_DELETE, IN_MOVED_TO, IN_MOVED_FROM
from sugar_network import local, node
-from sugar_network.toolkit import zeroconf, netlink, network
-from sugar_network.toolkit.collection import MutableStack
-from sugar_network.toolkit.files_sync import Leechers
+from sugar_network.toolkit import zeroconf, netlink, network, mounts_monitor
from sugar_network.local.mounts import LocalMount, NodeMount
from sugar_network.node.subscribe_socket import SubscribeSocket
from sugar_network.node.commands import NodeCommands
+from sugar_network.node.sync_node import SyncCommands
from sugar_network.node.router import Router
from sugar_network.resources.volume import Volume
from active_toolkit import util, coroutine, enforce
_DB_DIRNAME = '.sugar-network'
-_SYNC_DIRNAME = '.sugar-network-sync'
-
-_COMPLETE_MOUNT_TIMEOUT = 3
_logger = logging.getLogger('local.mountset')
-class Mountset(dict, ad.CommandsProcessor):
+class Mountset(dict, ad.CommandsProcessor, SyncCommands):
- def __init__(self, home_volume, sync_dirs=None):
+ def __init__(self, home_volume):
dict.__init__(self)
ad.CommandsProcessor.__init__(self)
+ SyncCommands.__init__(self, local.path('sync'))
self.opened = coroutine.Event()
-
self.home_volume = home_volume
- if sync_dirs is None:
- self._file_syncs = {}
- else:
- self._file_syncs = Leechers(sync_dirs,
- join(home_volume.root, 'files'))
self._subscriptions = {}
self._locale = locale.getdefaultlocale()[0].replace('_', '-')
self._jobs = coroutine.Pool()
self._servers = coroutine.Pool()
- self._sync_dirs = MutableStack()
- self._sync = coroutine.Pool()
def __getitem__(self, mountpoint):
enforce(mountpoint in self, 'Unknown mountpoint %r', mountpoint)
@@ -100,26 +86,6 @@ class Mountset(dict, ad.CommandsProcessor):
mount.set_mounted(True)
return mount.mounted
- @ad.volume_command(method='POST', cmd='start_sync')
- def start_sync(self, rewind=False, path=None):
- if self._sync:
- return
-
- enforce(path or self._sync_dirs, 'No mounts to synchronize with')
-
- for mount in self.values():
- if isinstance(mount, NodeMount):
- if rewind:
- self._sync_dirs.rewind()
- self._sync.spawn(mount.sync_session, self._sync_dirs, path)
- break
- else:
- raise RuntimeError('No mounted servers')
-
- @ad.volume_command(method='POST', cmd='break_sync')
- def break_sync(self):
- self._sync.kill()
-
@ad.volume_command(method='POST', cmd='publish')
def republish(self, request):
self.publish(request.content)
@@ -128,28 +94,24 @@ class Mountset(dict, ad.CommandsProcessor):
if response is None:
response = ad.Response()
request.accept_language = [self._locale]
- mountpoint = None
+ mountpoint = request.get('mountpoint')
- def process_call():
+ try:
try:
- return ad.CommandsProcessor.call(self, request, response)
+ result = ad.CommandsProcessor.call(self, request, response)
except ad.CommandNotFound:
- mountpoint = request.pop('mountpoint')
+ request.pop('mountpoint')
mount = self[mountpoint]
if mountpoint == '/':
mount.set_mounted(True)
enforce(mount.mounted, '%r is not mounted', mountpoint)
- return mount.call(request, response)
-
- try:
- result = process_call()
+ result = mount.call(request, response)
except Exception:
- util.exception(_logger,
- 'Failed to call %s on %r', request, mountpoint)
+ util.exception(_logger, 'Failed to call %s on %r',
+ request, mountpoint)
raise
- else:
- _logger.debug('Called %s on %r: %r', request, mountpoint, result)
+ _logger.debug('Called %s on %r: %r', request, mountpoint, result)
return result
def connect(self, callback, condition=None, **kwargs):
@@ -172,16 +134,8 @@ class Mountset(dict, ad.CommandsProcessor):
def open(self):
try:
- mounts_root = local.mounts_root.value
- if mounts_root:
- for filename in os.listdir(mounts_root):
- self._found_mount(join(mounts_root, filename))
- # In case if sync mounts processed before server mounts
- # TODO More obvious code
- for filename in os.listdir(mounts_root):
- self._found_mount(join(mounts_root, filename))
- self._jobs.spawn(self._mounts_monitor)
-
+ mounts_monitor.connect(_DB_DIRNAME,
+ self._found_mount, self._lost_mount)
if '/' in self:
if local.api_url.value:
crawler = self._wait_for_master
@@ -217,61 +171,23 @@ class Mountset(dict, ad.CommandsProcessor):
# Otherwise, `socket.gethostbyname()` will return stale resolve
network.res_init()
- def _mounts_monitor(self):
- root = local.mounts_root.value
- _logger.info('Start monitoring %r for mounts', root)
-
- with Inotify() as monitor:
- monitor.add_watch(root, IN_DELETE_SELF | IN_CREATE |
- IN_DELETE | IN_MOVED_TO | IN_MOVED_FROM)
- while not monitor.closed:
- coroutine.select([monitor.fileno()], [], [])
- for filename, event, __ in monitor.read():
- path = join(root, filename)
- try:
- if event & IN_DELETE_SELF:
- _logger.warning('Lost %r, cannot monitor anymore',
- root)
- monitor.close()
- break
- elif event & (IN_DELETE | IN_MOVED_FROM):
- self._lost_mount(path)
- elif event & (IN_CREATE | IN_MOVED_TO):
- # Right after moutning, access to directory
- # might be restricted; let system enough time
- # to complete mounting
- coroutine.sleep(_COMPLETE_MOUNT_TIMEOUT)
- self._found_mount(path)
- except Exception:
- util.exception(_logger, 'Mount %r failed', path)
-
def _found_mount(self, path):
- if exists(join(path, _DB_DIRNAME)) and path not in self:
- _logger.debug('Found %r server mount', path)
- volume, server_mode = self._mount_volume(path)
- if server_mode:
- self[path] = NodeMount(volume, self.home_volume,
- self._file_syncs)
- else:
- self[path] = LocalMount(volume)
-
- if exists(join(path, _SYNC_DIRNAME)):
- self._sync_dirs.add(path)
- if self._servers:
- _logger.debug('Found %r sync mount', path)
- self.start_sync()
- else:
- _logger.debug('Found %r sync mount but no servers', path)
+ volume, server_mode = self._mount_volume(path)
+ if server_mode:
+ _logger.debug('Mount %r in node mode', path)
+ self[path] = self.node_mount = NodeMount(volume, self.home_volume)
+ else:
+ _logger.debug('Mount %r in node-less mode', path)
+ self[path] = LocalMount(volume)
def _lost_mount(self, path):
+ mount = self.get(path)
+ if mount is None:
+ return
_logger.debug('Lost %r mount', path)
-
- self._sync_dirs.remove(join(path, _SYNC_DIRNAME))
- if not self._sync_dirs:
- self.break_sync()
-
- if path in self:
- del self[path]
+ if isinstance(mount, NodeMount):
+ self.node_mount = None
+ del self[path]
def _mount_volume(self, path):
lazy_open = local.lazy_open.value