Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
path: root/sugar/presence/Service.py
blob: 4e1d72f6ca64c043a00ee28fd89ea7d83348d9cc (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
import avahi
from sugar import util
import string

def new_group_service(group_name, resource):
	"""Create a new service suitable for defining a new group."""
	if type(group_name) != type("") or not len(group_name):
		raise ValueError("group name must be a valid string.")
	if type(resource) != type("") or not len(resource):
		raise ValueError("group resource must be a valid string.")

	# Create a randomized service type
	data = "%s%s" % (group_name, resource)
	stype = "_%s_group_olpc._udp" % sugar.util.unique_id(data)

	properties = {__GROUP_NAME_TAG: group_name, __GROUP_RESOURCE_TAG: resource }
	owner_nick = ""
	port = random.randint(5000, 65000)
	# Use random currently unassigned multicast address
	address = "232.%d.%d.%d" % (random.randint(0, 254), random.randint(1, 254),
			random.randint(1, 254))
	service = Service.Service(owner_nick, stype, "local", address=address,
			port=port, properties=properties)
	return service


def _txt_to_dict(txt):
	"""Convert an avahi-returned TXT record formatted
	as nested arrays of integers (from dbus) into a dict
	of key/value string pairs."""
	prop_dict = {}
	props = avahi.txt_array_to_string_array(txt)
	for item in props:
		key = value = None
		if '=' not in item:
			# No = means a boolean value of true
			key = item
			value = True
		else:
			(key, value) = item.split('=')
		prop_dict[key] = value
	return prop_dict

def compose_service_type(stype, activity_uid):
	return "_%s_%s" % (activity_uid, stype)

def _decompose_service_type(stype):
	"""Break a service type into the UID and real service type, if we can."""
	if len(stype) < util.ACTIVITY_UID_LEN + 5:
		return (None, stype)
	if stype[0] != "_":
		return (None, stype)
	start = 1
	end = start + util.ACTIVITY_UID_LEN
	if stype[end] != "_":
		return (None, stype)
	uid = stype[start:end]
	if not util.validate_activity_uid(uid):
		return (None, stype)
	return (uid, stype[end+1:])

def is_multicast_address(address):
	"""Simple numerical check for whether an IP4 address
	is in the range for multicast addresses or not."""
	if not address:
		return False
	if address[3] != '.':
		return False
	first = int(address[:3])
	if first >= 224 and first <= 239:
		return True
	return False


_ACTIVITY_UID_TAG = "ActivityUID"

class Service(object):
	"""Encapsulates information about a specific ZeroConf/mDNS
	service as advertised on the network."""
	def __init__(self, name, stype, domain, address=None, port=-1, properties=None):
		# Validate immutable options
		if not name or (type(name) != type("") and type(name) != type(u"")) or not len(name):
			raise ValueError("must specify a valid service name.")

		if not stype or (type(stype) != type("") and type(stype) != type(u"")) or not len(stype):
			raise ValueError("must specify a service type.")
		if not stype.endswith("._tcp") and not stype.endswith("._udp"):
			raise ValueError("must specify a TCP or UDP service type.")

		if type(domain) != type("") and type(domain) != type(u""):
			raise ValueError("must specify a domain.")
		if len(domain) and domain != "local" and domain != u"local":
			raise ValueError("must use the 'local' domain (for now).")

		if type(stype) == type(u""):
			stype = stype.encode()
		(uid, real_stype) = _decompose_service_type(stype)
		if uid and not util.validate_activity_uid(uid):
			raise ValueError("service type activity uid not a valid activity UID.")

		if type(name) == type(u""):
			name = name.encode()
		self._name = name
		self._stype = stype
		self._activity_stype = real_stype
		if type(domain) == type(u""):
			domain = domain.encode()
		self._domain = domain
		self._address = None
		self.set_address(address)
		self._port = -1
		self.set_port(port)
		self._properties = {}
		self.set_properties(properties)

		# Ensure that an ActivityUID tag, if given, matches
		# what we expect from the service type
		if self._properties.has_key(_ACTIVITY_UID_TAG):
			prop_uid = self._properties[_ACTIVITY_UID_TAG]
			if (prop_uid and not uid) or (prop_uid != uid):
				raise ValueError("ActivityUID property specified, but the service type's activity UID didn't match it.")
		self._activity_uid = uid
		if uid and not self._properties.has_key(_ACTIVITY_UID_TAG):
			self._properties[_ACTIVITY_UID_TAG] = uid

	def get_name(self):
		"""Return the service's name, usually that of the
		buddy who provides it."""
		return self._name

	def is_multicast_service(self):
		"""Return True if the service's address is a multicast address,
		False if it is not."""
		return is_multicast_address(self._address)

	def get_one_property(self, key):
		"""Return one property of the service, or None
		if the property was not found.  Cannot distinguish
		between lack of a property, and a property value that
		actually is None."""
		if key in self._properties.keys():
			return self._properties[key]
		return None

	def get_properties(self):
		"""Return a python dictionary of all the service's
		properties."""
		return self._properties

	def set_properties(self, properties):
		"""Set the service's properties from either an Avahi
		TXT record (a list of lists of integers), or a
		python dictionary."""
		self._properties = {}
		if type(properties) == type([]):
			self._properties = _txt_to_dict(properties)
		elif type(properties) == type({}):
			self._properties = properties
			for key, value in self._properties.items():
				if type(self._properties[key]) == type(u""):
					self._properties[key] = self._properties[key].encode()

	def get_type(self):
		"""Return the service's service type."""
		return self._stype

	def get_port(self):
		return self._port

	def set_port(self, port):
		if type(port) != type(1):
			raise ValueError("must specify a valid port number.")
		self._port = port

	def get_address(self):
		return self._address

	def set_address(self, address):
		if address is not None:
			if type(address) != type("") and type(address) != type(u""):
				raise ValueError("must specify a valid address.")
			if not len(address):
				raise ValueError("must specify a valid address.")
		if address and type(address) == type(u""):
			address = address.encode()
		self._address = address

	def get_domain(self):
		"""Return the ZeroConf/mDNS domain the service was found in."""
		return self._domain

	def get_activity_uid(self):
		"""Return the activity UID this service is associated with, if any."""
		return (self._activity_uid, self._activity_stype)


#################################################################
# Tests
#################################################################

import unittest

class ServiceTestCase(unittest.TestCase):
	_DEF_NAME = "foobar"
	_DEF_STYPE = "_foo._bar._tcp"
	_DEF_DOMAIN = "local"
	_DEF_ADDRESS = "1.1.1.1"
	_DEF_PORT = 1234
	_DEF_PROPS = {'foobar': 'baz'}
	
	_STR_TEST_ARGS = [None, 0, [], {}]

	def _test_init_fail(self, name, stype, domain, address, port, properties, fail_msg):
		"""Test something we expect to fail."""
		try:
			service = Service(name, stype, domain, address, port, properties)
		except ValueError, exc:
			pass
		else:
			self.fail("expected a ValueError for %s." % fail_msg)

	def testName(self):
		for item in self._STR_TEST_ARGS:
			self._test_init_fail(item, self._DEF_STYPE, self._DEF_DOMAIN, self._DEF_ADDRESS,
					self._DEF_PORT, self._DEF_PROPS, "invalid name")

	def testType(self):
		for item in self._STR_TEST_ARGS:
			self._test_init_fail(self._DEF_NAME, item, self._DEF_DOMAIN, self._DEF_ADDRESS,
					self._DEF_PORT, self._DEF_PROPS, "invalid service type")
		self._test_init_fail(self._DEF_NAME, "_bork._foobar", self._DEF_DOMAIN, self._DEF_ADDRESS,
				self._DEF_PORT, self._DEF_PROPS, "invalid service type")

	def testDomain(self):
		for item in self._STR_TEST_ARGS:
			self._test_init_fail(self._DEF_NAME, self._DEF_STYPE, item, self._DEF_ADDRESS,
					self._DEF_PORT, self._DEF_PROPS, "invalid domain")
		# Only accept local for now
		self._test_init_fail(self._DEF_NAME, self._DEF_STYPE, "foobar", self._DEF_ADDRESS,
				self._DEF_PORT, self._DEF_PROPS, "invalid domain")
		# Make sure "" works
		service = Service(self._DEF_NAME, self._DEF_STYPE, "", self._DEF_ADDRESS,
				self._DEF_PORT, self._DEF_PROPS)
		assert service, "Empty domain was not accepted!"

	def testAddress(self):
		self._test_init_fail(self._DEF_NAME, self._DEF_STYPE, self._DEF_DOMAIN, [],
				self._DEF_PORT, self._DEF_PROPS, "invalid address")
		self._test_init_fail(self._DEF_NAME, self._DEF_STYPE, self._DEF_DOMAIN, {},
				self._DEF_PORT, self._DEF_PROPS, "invalid address")
		self._test_init_fail(self._DEF_NAME, self._DEF_STYPE, self._DEF_DOMAIN, 1234,
				self._DEF_PORT, self._DEF_PROPS, "invalid address")

	def testPort(self):
		self._test_init_fail(self._DEF_NAME, self._DEF_STYPE, self._DEF_DOMAIN, self._DEF_ADDRESS,
				[], self._DEF_PROPS, "invalid port")
		self._test_init_fail(self._DEF_NAME, self._DEF_STYPE, self._DEF_DOMAIN, self._DEF_ADDRESS,
				{}, self._DEF_PROPS, "invalid port")
		self._test_init_fail(self._DEF_NAME, self._DEF_STYPE, self._DEF_DOMAIN, self._DEF_ADDRESS,
				"adf", self._DEF_PROPS, "invalid port")

	def testGoodInit(self):
		service = Service(self._DEF_NAME, self._DEF_STYPE, self._DEF_DOMAIN, self._DEF_ADDRESS,
				self._DEF_PORT, self._DEF_PROPS)
		assert service.get_name() == self._DEF_NAME, "service name wasn't correct after init."
		assert service.get_type() == self._DEF_STYPE, "service type wasn't correct after init."
		assert service.get_domain() == "local", "service domain wasn't correct after init."
		assert service.get_address() == self._DEF_ADDRESS, "service address wasn't correct after init."
		assert service.get_port() == self._DEF_PORT, "service port wasn't correct after init."
		value = service.get_one_property('foobar')
		assert value and value == 'baz', "service property wasn't correct after init."

	def testAvahiProperties(self):
		props = [[111, 114, 103, 46, 102, 114, 101, 101, 100, 101, 115, 107, 116, 111, 112, 46, 65, 118, 97, 104, 105, 46, 99, 111, 111, 107, 105, 101, 61, 50, 54, 48, 49, 53, 52, 51, 57, 53, 50]]
		key = "org.freedesktop.Avahi.cookie"
		expected_value = "2601543952"
		service = Service(self._DEF_NAME, self._DEF_STYPE, self._DEF_DOMAIN, self._DEF_ADDRESS,
				self._DEF_PORT, props)
		value = service.get_one_property(key)
		assert value and value == expected_value, "service properties weren't correct after init."
		value = service.get_one_property('bork')
		assert not value, "service properties weren't correct after init."

	def testBoolProperty(self):
		props = [[111, 114, 103, 46, 102, 114, 101, 101, 100, 101, 115, 107, 116, 111, 112, 46, 65, 118, 97, 104, 105, 46, 99, 111, 111, 107, 105, 101]]
		key = "org.freedesktop.Avahi.cookie"
		expected_value = True
		service = Service(self._DEF_NAME, self._DEF_STYPE, self._DEF_DOMAIN, self._DEF_ADDRESS,
				self._DEF_PORT, props)
		value = service.get_one_property(key)
		assert value is not None and value == expected_value, "service properties weren't correct after init."

	def testGroupService(self):
		# Valid group service type, non-multicast address
		group_stype = "_af5e5a7c998e89b9a_group_olpc._udp"
		self._test_init_fail(self._DEF_NAME, group_stype, self._DEF_DOMAIN, self._DEF_ADDRESS,
				self._DEF_PORT, self._DEF_PROPS, "group service type, but non-multicast address")

		# Valid group service type, None address
		service = Service(self._DEF_NAME, group_stype, self._DEF_DOMAIN, None,
				self._DEF_PORT, self._DEF_PROPS)
		assert service.get_address() == None, "address was not None as expected!"
		# Set address to invalid multicast address
		try:
			service.set_address(self._DEF_ADDRESS)
		except ValueError, exc:
			pass
		else:
			self.fail("expected a ValueError for invalid address.")

		# Valid group service type and multicast address, ensure it works
		mc_addr = "224.0.0.34"
		service = Service(self._DEF_NAME, group_stype, self._DEF_DOMAIN, mc_addr,
				self._DEF_PORT, self._DEF_PROPS)
		assert service.get_address() == mc_addr, "address was not expected address!"

	def addToSuite(suite):
		suite.addTest(ServiceTestCase("testName"))
		suite.addTest(ServiceTestCase("testType"))
		suite.addTest(ServiceTestCase("testDomain"))
		suite.addTest(ServiceTestCase("testAddress"))
		suite.addTest(ServiceTestCase("testPort"))
		suite.addTest(ServiceTestCase("testGoodInit"))
		suite.addTest(ServiceTestCase("testAvahiProperties"))
		suite.addTest(ServiceTestCase("testBoolProperty"))
		suite.addTest(ServiceTestCase("testGroupService"))
	addToSuite = staticmethod(addToSuite)


def main():
	suite = unittest.TestSuite()
	ServiceTestCase.addToSuite(suite)
	runner = unittest.TextTestRunner()
	runner.run(suite)

if __name__ == "__main__":
	main()