Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
path: root/restful_document/subscribe_socket.py
blob: b73ce78214c72c5c2b1d0e7dfefa5e7f718c80f7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# Copyright (C) 2012, Aleksey Lim
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import os
import socket
import logging
from gettext import gettext as _

from restful_document import env
from restful_document.sockets import SocketFile, BUFFER_SIZE
from active_document import greenlet, util, enforce


_logger = logging.getLogger('restful_document.subscribe_socket')


class SubscribeSocket(object):

    def __init__(self, volume):
        self._server = None
        self._tickets = set()
        self._subscribers = set()

        volume.connect(self.__signal_cb)

    def subscribe(self):
        ticket = os.urandom(16).encode('hex')
        self._tickets.add(ticket)

        return {'host': env.host.value,
                'port': env.subscribe_port.value,
                'ticket': ticket,
                }

    def serve_forever(self):
        _logger.info(_('Listening for subscriptions on %s port'),
                env.subscribe_port.value)

        conn = greenlet.socket(socket.AF_INET, socket.SOCK_STREAM)
        # pylint: disable-msg=E1101
        conn.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        conn.bind((env.host.value, env.subscribe_port.value))
        conn.listen(5)

        self._server = greenlet.Server(conn, self._serve_client)
        try:
            env.subscriber = self
            self._server.serve_forever()
        finally:
            self._server.stop()
            self._server = None
            env.subscriber = None

    def stop(self):
        if self._server is not None:
            self._server.stop()

    def _serve_client(self, conn, host):
        _logger.debug('Got request from %r, making a handshake', host)

        try:
            handshake = SocketFile(conn).read_message()
            ticket = handshake.get('ticket')
            enforce(ticket and ticket in self._tickets, _('Unknown request'))
            self._tickets.remove(ticket)
        except Exception, error:
            _logger.warning(_('Handshake failed, discard the request: %s'),
                    error)
            return

        _logger.debug('Accepted %r subscriber', host)
        self._subscribers.add(conn)
        try:
            data = conn.recv(BUFFER_SIZE)
            enforce(not data, _('Subscriber misused connection ' \
                    'by sending %s bytes, discard it'), len(data))
        except Exception:
            util.exception(_('Failed to handle subscription from %r'), host)
        finally:
            _logger.debug('Close subscription from %r', host)
            self._subscribers.remove(conn)

    def __signal_cb(self, event):
        if env.only_sync_notification.value:
            if event['event'] != 'commit':
                # Even "sync" event can be ignored,
                # passing only "commit" is enough
                return
            event['event'] = 'sync'
        else:
            if event['event'] == 'commit':
                # Subscribers already got update notifications enough
                return

        for conn in self._subscribers:
            SocketFile(conn).write_message(event)