Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
path: root/services
diff options
authorSimon McVittie <simon.mcvittie@collabora.co.uk>2007-05-24 13:27:36 (GMT)
committer Simon McVittie <simon.mcvittie@collabora.co.uk>2007-05-24 17:58:58 (GMT)
commitf7ba2aa1e2d77e6a5fc6819f1402be0a0f9f03d8 (patch)
tree3ee043c24ff5f7a16b16c62428d32c942adff3f5 /services
parentf75747015d0ac2f47b8ea22154b3c86d5d250524 (diff)
services/presence/: separate test code into a separate module
Diffstat (limited to 'services')
4 files changed, 331 insertions, 322 deletions
diff --git a/services/presence/Makefile.am b/services/presence/Makefile.am
index 4e28624..569fec7 100644
--- a/services/presence/Makefile.am
+++ b/services/presence/Makefile.am
@@ -13,6 +13,7 @@ sugar_PYTHON = \
buddyiconcache.py \
linklocal_plugin.py \
presenceservice.py \
+ pstest.py \
psutils.py \
diff --git a/services/presence/buddy.py b/services/presence/buddy.py
index 90b000e..67c5eda 100644
--- a/services/presence/buddy.py
+++ b/services/presence/buddy.py
@@ -21,23 +21,15 @@ import gobject
import dbus
import dbus.service
from dbus.gobject_service import ExportedGObject
-from ConfigParser import ConfigParser, NoOptionError
import psutils
-from sugar import env, profile, util
+from sugar import env, profile
import logging
-import random
_BUDDY_PATH = "/org/laptop/Sugar/Presence/Buddies/"
_BUDDY_INTERFACE = "org.laptop.Sugar.Presence.Buddy"
_OWNER_INTERFACE = "org.laptop.Sugar.Presence.Buddy.Owner"
-class NotFoundError(dbus.DBusException):
- """Raised when a given actor is not found on the network"""
- def __init__(self):
- dbus.DBusException.__init__(self)
- self._dbus_error_name = _PRESENCE_INTERFACE + '.NotFound'
_PROP_NICK = "nick"
_PROP_KEY = "key"
_PROP_ICON = "icon"
@@ -75,14 +67,25 @@ class Buddy(ExportedGObject):
__gsignals__ = {
- 'validity-changed': (gobject.SIGNAL_RUN_FIRST, gobject.TYPE_NONE,
- ([gobject.TYPE_BOOLEAN])),
- 'property-changed': (gobject.SIGNAL_RUN_FIRST, gobject.TYPE_NONE,
- ([gobject.TYPE_PYOBJECT])),
- 'icon-changed': (gobject.SIGNAL_RUN_FIRST, gobject.TYPE_NONE,
- ([gobject.TYPE_PYOBJECT])),
- 'disappeared': (gobject.SIGNAL_RUN_FIRST, gobject.TYPE_NONE,
- ([])),
+ 'validity-changed':
+ # The buddy's validity changed.
+ # Validity starts off False, and becomes True when the buddy
+ # has a color, a nick and a key.
+ # * the new validity: bool
+ (gobject.SIGNAL_RUN_FIRST, None, [bool]),
+ 'property-changed':
+ # One of the buddy's properties has changed.
+ # * those properties that have changed:
+ # dict { str => object }
+ (gobject.SIGNAL_RUN_FIRST, None, [object]),
+ 'icon-changed':
+ # The buddy's icon changed.
+ # * the bytes of the icon: str
+ (gobject.SIGNAL_RUN_FIRST, None, [object]),
+ 'disappeared':
+ # The buddy is offline (has no Telepathy handles and is not the
+ # Owner)
+ (gobject.SIGNAL_RUN_FIRST, None, []),
__gproperties__ = {
@@ -482,6 +485,7 @@ class Buddy(ExportedGObject):
except AttributeError:
self._valid = False
class GenericOwner(Buddy):
"""Common functionality for Local User-like objects
@@ -560,6 +564,7 @@ class GenericOwner(Buddy):
"""Customisation point: handle the registration of the owner"""
raise RuntimeError("Subclasses must implement")
class ShellOwner(GenericOwner):
"""Representation of the local-machine owner using Sugar's Shell
@@ -667,300 +672,3 @@ class ShellOwner(GenericOwner):
activity_id = None
props = {_PROP_CURACT: activity_id}
-class TestOwner(GenericOwner):
- """Class representing the owner of the machine. This test owner
- changes random attributes periodically."""
- __gtype_name__ = "TestOwner"
- def __init__(self, ps, bus_name, object_id, test_num, randomize):
- self._cp = ConfigParser()
- self._section = "Info"
- self._test_activities = []
- self._test_cur_act = ""
- self._change_timeout = 0
- self._cfg_file = os.path.join(env.get_profile_path(), 'test-buddy-%d' % test_num)
- (pubkey, privkey, registered) = self._load_config()
- if not pubkey or not len(pubkey) or not privkey or not len(privkey):
- (pubkey, privkey) = _get_new_keypair(test_num)
- if not pubkey or not privkey:
- raise RuntimeError("Couldn't get or create test buddy keypair")
- self._save_config(pubkey, privkey, registered)
- privkey_hash = util.printable_hash(util._sha_data(privkey))
- nick = _get_random_name()
- from sugar.graphics import xocolor
- color = xocolor.XoColor().to_string()
- icon = _get_random_image()
- _logger.debug("pubkey is %s" % pubkey)
- GenericOwner.__init__(self, ps, bus_name, object_id, key=pubkey, nick=nick,
- color=color, icon=icon, registered=registered, key_hash=privkey_hash)
- # Only do the random stuff if randomize is true
- if randomize:
- self._ps.connect('connection-status', self._ps_connection_status_cb)
- def _share_reply_cb(self, actid, object_path):
- activity = self._ps.internal_get_activity(actid)
- if not activity or not object_path:
- _logger.debug("Couldn't find activity %s even though it was shared." % actid)
- return
- _logger.debug("Shared activity %s (%s)." % (actid, activity.props.name))
- self._test_activities.append(activity)
- def _share_error_cb(self, actid, err):
- _logger.debug("Error sharing activity %s: %s" % (actid, str(err)))
- def _ps_connection_status_cb(self, ps, connected):
- if not connected:
- return
- if not len(self._test_activities):
- # Share some activities
- actid = util.unique_id("Activity 1")
- callbacks = (lambda *args: self._share_reply_cb(actid, *args),
- lambda *args: self._share_error_cb(actid, *args))
- atype = "org.laptop.WebActivity"
- properties = {"foo": "bar"}
- self._ps._share_activity(actid, atype, "Wembley Stadium", properties, callbacks)
- actid2 = util.unique_id("Activity 2")
- callbacks = (lambda *args: self._share_reply_cb(actid2, *args),
- lambda *args: self._share_error_cb(actid2, *args))
- atype = "org.laptop.WebActivity"
- properties = {"baz": "bar"}
- self._ps._share_activity(actid2, atype, "Maine Road", properties, callbacks)
- # Change a random property ever 10 seconds
- if self._change_timeout == 0:
- self._change_timeout = gobject.timeout_add(10000, self._update_something)
- def set_registered(self, value):
- if value:
- self._registered = True
- def _load_config(self):
- if not os.path.exists(self._cfg_file):
- return (None, None, False)
- if not self._cp.read([self._cfg_file]):
- return (None, None, False)
- if not self._cp.has_section(self._section):
- return (None, None, False)
- try:
- pubkey = self._cp.get(self._section, "pubkey")
- privkey = self._cp.get(self._section, "privkey")
- registered = self._cp.get(self._section, "registered")
- return (pubkey, privkey, registered)
- except NoOptionError:
- pass
- return (None, None, False)
- def _save_config(self, pubkey, privkey, registered):
- # Save config again
- if not self._cp.has_section(self._section):
- self._cp.add_section(self._section)
- self._cp.set(self._section, "pubkey", pubkey)
- self._cp.set(self._section, "privkey", privkey)
- self._cp.set(self._section, "registered", registered)
- f = open(self._cfg_file, 'w')
- self._cp.write(f)
- f.close()
- def _update_something(self):
- it = random.randint(0, 10000) % 4
- if it == 0:
- self.props.icon = _get_random_image()
- elif it == 1:
- from sugar.graphics import xocolor
- props = {_PROP_COLOR: xocolor.XoColor().to_string()}
- self.set_properties(props)
- elif it == 2:
- props = {_PROP_NICK: _get_random_name()}
- self.set_properties(props)
- elif it == 3:
- actid = ""
- idx = random.randint(0, len(self._test_activities))
- # if idx == len(self._test_activites), it means no current
- # activity
- if idx < len(self._test_activities):
- activity = self._test_activities[idx]
- actid = activity.props.id
- props = {_PROP_CURACT: actid}
- self.set_properties(props)
- return True
-def _hash_private_key(self):
- """Unused method to has a private key, see profile"""
- self.privkey_hash = None
- key_path = os.path.join(env.get_profile_path(), 'owner.key')
- try:
- f = open(key_path, "r")
- lines = f.readlines()
- f.close()
- except IOError, e:
- _logger.error("Error reading private key: %s" % e)
- return
- key = ""
- for l in lines:
- l = l.strip()
- if l.startswith("-----BEGIN DSA PRIVATE KEY-----"):
- continue
- if l.startswith("-----END DSA PRIVATE KEY-----"):
- continue
- key += l
- if not len(key):
- _logger.error("Error parsing public key.")
- # hash it
- key_hash = util._sha_data(key)
- self.privkey_hash = util.printable_hash(key_hash)
-def _extract_public_key(keyfile):
- try:
- f = open(keyfile, "r")
- lines = f.readlines()
- f.close()
- except IOError, e:
- _logger.error("Error reading public key: %s" % e)
- return None
- # Extract the public key
- magic = "ssh-dss "
- key = ""
- for l in lines:
- l = l.strip()
- if not l.startswith(magic):
- continue
- key = l[len(magic):]
- break
- if not len(key):
- _logger.error("Error parsing public key.")
- return None
- return key
-def _extract_private_key(keyfile):
- """Get a private key from a private key file"""
- # Extract the private key
- try:
- f = open(keyfile, "r")
- lines = f.readlines()
- f.close()
- except IOError, e:
- _logger.error("Error reading private key: %s" % e)
- return None
- key = ""
- for l in lines:
- l = l.strip()
- if l.startswith("-----BEGIN DSA PRIVATE KEY-----"):
- continue
- if l.startswith("-----END DSA PRIVATE KEY-----"):
- continue
- key += l
- if not len(key):
- _logger.error("Error parsing private key.")
- return None
- return key
-def _get_new_keypair(num):
- """Retrieve a public/private key pair for testing"""
- # Generate keypair
- privkeyfile = os.path.join("/tmp", "test%d.key" % num)
- pubkeyfile = os.path.join("/tmp", 'test%d.key.pub' % num)
- # force-remove key files if they exist to ssh-keygen doesn't
- # start asking questions
- try:
- os.remove(pubkeyfile)
- os.remove(privkeyfile)
- except OSError:
- pass
- cmd = "ssh-keygen -q -t dsa -f %s -C '' -N ''" % privkeyfile
- import commands
- print "Generating new keypair..."
- (s, o) = commands.getstatusoutput(cmd)
- print "Done."
- pubkey = privkey = None
- if s != 0:
- _logger.error("Could not generate key pair: %d (%s)" % (s, o))
- else:
- pubkey = _extract_public_key(pubkeyfile)
- privkey = _extract_private_key(privkeyfile)
- try:
- os.remove(pubkeyfile)
- os.remove(privkeyfile)
- except OSError:
- pass
- return (pubkey, privkey)
-def _get_random_name():
- """Produce random names for testing"""
- names = ["Liam", "Noel", "Guigsy", "Whitey", "Bonehead"]
- return names[random.randint(0, len(names) - 1)]
-def _get_random_image():
- """Produce a random image for display"""
- import cairo, math, random, gtk
- def rand():
- return random.random()
- SIZE = 200
- s = cairo.ImageSurface(cairo.FORMAT_ARGB32, SIZE, SIZE)
- cr = cairo.Context(s)
- # background gradient
- cr.save()
- g = cairo.LinearGradient(0, 0, 1, 1)
- g.add_color_stop_rgba(1, rand(), rand(), rand(), rand())
- g.add_color_stop_rgba(0, rand(), rand(), rand(), rand())
- cr.set_source(g)
- cr.rectangle(0, 0, SIZE, SIZE);
- cr.fill()
- cr.restore()
- # random path
- cr.set_line_width(10 * rand() + 5)
- cr.move_to(SIZE * rand(), SIZE * rand())
- cr.line_to(SIZE * rand(), SIZE * rand())
- cr.rel_line_to(SIZE * rand() * -1, 0)
- cr.close_path()
- cr.stroke()
- # a circle
- cr.set_source_rgba(rand(), rand(), rand(), rand())
- cr.arc(SIZE * rand(), SIZE * rand(), 100 * rand() + 30, 0, 2 * math.pi)
- cr.fill()
- # another circle
- cr.set_source_rgba(rand(), rand(), rand(), rand())
- cr.arc(SIZE * rand(), SIZE * rand(), 100 * rand() + 30, 0, 2 * math.pi)
- cr.fill()
- def img_convert_func(buf, data):
- data[0] += buf
- return True
- data = [""]
- pixbuf = gtk.gdk.pixbuf_new_from_data(s.get_data(), gtk.gdk.COLORSPACE_RGB,
- True, 8, s.get_width(), s.get_height(), s.get_stride())
- pixbuf.save_to_callback(img_convert_func, "jpeg", {"quality": "90"}, data)
- del pixbuf
- return str(data[0])
diff --git a/services/presence/presenceservice.py b/services/presence/presenceservice.py
index 3ca2560..94b6f12 100644
--- a/services/presence/presenceservice.py
+++ b/services/presence/presenceservice.py
@@ -33,7 +33,7 @@ from server_plugin import ServerPlugin
from linklocal_plugin import LinkLocalPlugin
from sugar import util
-from buddy import Buddy, ShellOwner, TestOwner
+from buddy import Buddy, ShellOwner
from activity import Activity
_PRESENCE_SERVICE = "org.laptop.Sugar.Presence"
@@ -57,7 +57,11 @@ class PresenceService(ExportedGObject):
- def __init__(self, test_num=0, randomize=False):
+ def _create_owner(self):
+ # Overridden by TestPresenceService
+ return ShellOwner(self, self._bus_name, self._get_next_object_id())
+ def __init__(self):
self._next_object_id = 0
self._connected = False
@@ -72,11 +76,7 @@ class PresenceService(ExportedGObject):
# Create the Owner object
- objid = self._get_next_object_id()
- if test_num > 0:
- self._owner = TestOwner(self, self._bus_name, objid, test_num, randomize)
- else:
- self._owner = ShellOwner(self, self._bus_name, objid)
+ self._owner = self._create_owner()
self._buddies[self._owner.props.key] = self._owner
self._registry = ManagerRegistry()
@@ -427,7 +427,13 @@ class PresenceService(ExportedGObject):
def main(test_num=0, randomize=False):
loop = gobject.MainLoop()
- ps = PresenceService(test_num, randomize)
+ if test_num > 0:
+ from pstest import TestPresenceService
+ ps = TestPresenceService(test_num, randomize)
+ else:
+ ps = PresenceService()
except KeyboardInterrupt:
diff --git a/services/presence/pstest.py b/services/presence/pstest.py
new file mode 100644
index 0000000..74cad8c
--- /dev/null
+++ b/services/presence/pstest.py
@@ -0,0 +1,294 @@
+import logging
+import os
+import random
+from ConfigParser import ConfigParser, NoOptionError
+import gobject
+from sugar import env, util
+from buddy import GenericOwner, _PROP_NICK, _PROP_CURACT, _PROP_COLOR
+from presenceservice import PresenceService
+_logger = logging.getLogger('s-p-s.pstest')
+class TestOwner(GenericOwner):
+ """Class representing the owner of the machine. This test owner
+ changes random attributes periodically."""
+ __gtype_name__ = "TestOwner"
+ def __init__(self, ps, bus_name, object_id, test_num, randomize):
+ self._cp = ConfigParser()
+ self._section = "Info"
+ self._test_activities = []
+ self._test_cur_act = ""
+ self._change_timeout = 0
+ self._cfg_file = os.path.join(env.get_profile_path(), 'test-buddy-%d' % test_num)
+ (pubkey, privkey, registered) = self._load_config()
+ if not pubkey or not len(pubkey) or not privkey or not len(privkey):
+ (pubkey, privkey) = _get_new_keypair(test_num)
+ if not pubkey or not privkey:
+ raise RuntimeError("Couldn't get or create test buddy keypair")
+ self._save_config(pubkey, privkey, registered)
+ privkey_hash = util.printable_hash(util._sha_data(privkey))
+ nick = _get_random_name()
+ from sugar.graphics import xocolor
+ color = xocolor.XoColor().to_string()
+ icon = _get_random_image()
+ _logger.debug("pubkey is %s" % pubkey)
+ GenericOwner.__init__(self, ps, bus_name, object_id, key=pubkey, nick=nick,
+ color=color, icon=icon, registered=registered, key_hash=privkey_hash)
+ # Only do the random stuff if randomize is true
+ if randomize:
+ self._ps.connect('connection-status', self._ps_connection_status_cb)
+ def _share_reply_cb(self, actid, object_path):
+ activity = self._ps.internal_get_activity(actid)
+ if not activity or not object_path:
+ _logger.debug("Couldn't find activity %s even though it was shared." % actid)
+ return
+ _logger.debug("Shared activity %s (%s)." % (actid, activity.props.name))
+ self._test_activities.append(activity)
+ def _share_error_cb(self, actid, err):
+ _logger.debug("Error sharing activity %s: %s" % (actid, str(err)))
+ def _ps_connection_status_cb(self, ps, connected):
+ if not connected:
+ return
+ if not len(self._test_activities):
+ # Share some activities
+ actid = util.unique_id("Activity 1")
+ callbacks = (lambda *args: self._share_reply_cb(actid, *args),
+ lambda *args: self._share_error_cb(actid, *args))
+ atype = "org.laptop.WebActivity"
+ properties = {"foo": "bar"}
+ self._ps._share_activity(actid, atype, "Wembley Stadium", properties, callbacks)
+ actid2 = util.unique_id("Activity 2")
+ callbacks = (lambda *args: self._share_reply_cb(actid2, *args),
+ lambda *args: self._share_error_cb(actid2, *args))
+ atype = "org.laptop.WebActivity"
+ properties = {"baz": "bar"}
+ self._ps._share_activity(actid2, atype, "Maine Road", properties, callbacks)
+ # Change a random property ever 10 seconds
+ if self._change_timeout == 0:
+ self._change_timeout = gobject.timeout_add(10000, self._update_something)
+ def set_registered(self, value):
+ if value:
+ self._registered = True
+ def _load_config(self):
+ if not os.path.exists(self._cfg_file):
+ return (None, None, False)
+ if not self._cp.read([self._cfg_file]):
+ return (None, None, False)
+ if not self._cp.has_section(self._section):
+ return (None, None, False)
+ try:
+ pubkey = self._cp.get(self._section, "pubkey")
+ privkey = self._cp.get(self._section, "privkey")
+ registered = self._cp.get(self._section, "registered")
+ return (pubkey, privkey, registered)
+ except NoOptionError:
+ pass
+ return (None, None, False)
+ def _save_config(self, pubkey, privkey, registered):
+ # Save config again
+ if not self._cp.has_section(self._section):
+ self._cp.add_section(self._section)
+ self._cp.set(self._section, "pubkey", pubkey)
+ self._cp.set(self._section, "privkey", privkey)
+ self._cp.set(self._section, "registered", registered)
+ f = open(self._cfg_file, 'w')
+ self._cp.write(f)
+ f.close()
+ def _update_something(self):
+ it = random.randint(0, 10000) % 4
+ if it == 0:
+ self.props.icon = _get_random_image()
+ elif it == 1:
+ from sugar.graphics import xocolor
+ props = {_PROP_COLOR: xocolor.XoColor().to_string()}
+ self.set_properties(props)
+ elif it == 2:
+ props = {_PROP_NICK: _get_random_name()}
+ self.set_properties(props)
+ elif it == 3:
+ actid = ""
+ idx = random.randint(0, len(self._test_activities))
+ # if idx == len(self._test_activites), it means no current
+ # activity
+ if idx < len(self._test_activities):
+ activity = self._test_activities[idx]
+ actid = activity.props.id
+ props = {_PROP_CURACT: actid}
+ self.set_properties(props)
+ return True
+class TestPresenceService(PresenceService):
+ def __init__(self, test_num=0, randomize=False):
+ self.__test_num = test_num
+ self.__randomize = randomize
+ PresenceService.__init__(self)
+ def _create_owner(self):
+ return TestOwner(self, self._bus_name, self._get_next_object_id(),
+ self.__test_num, self.__randomize)
+def _extract_public_key(keyfile):
+ try:
+ f = open(keyfile, "r")
+ lines = f.readlines()
+ f.close()
+ except IOError, e:
+ _logger.error("Error reading public key: %s" % e)
+ return None
+ # Extract the public key
+ magic = "ssh-dss "
+ key = ""
+ for l in lines:
+ l = l.strip()
+ if not l.startswith(magic):
+ continue
+ key = l[len(magic):]
+ break
+ if not len(key):
+ _logger.error("Error parsing public key.")
+ return None
+ return key
+def _extract_private_key(keyfile):
+ """Get a private key from a private key file"""
+ # Extract the private key
+ try:
+ f = open(keyfile, "r")
+ lines = f.readlines()
+ f.close()
+ except IOError, e:
+ _logger.error("Error reading private key: %s" % e)
+ return None
+ key = ""
+ for l in lines:
+ l = l.strip()
+ if l.startswith("-----BEGIN DSA PRIVATE KEY-----"):
+ continue
+ if l.startswith("-----END DSA PRIVATE KEY-----"):
+ continue
+ key += l
+ if not len(key):
+ _logger.error("Error parsing private key.")
+ return None
+ return key
+def _get_new_keypair(num):
+ """Retrieve a public/private key pair for testing"""
+ # Generate keypair
+ privkeyfile = os.path.join("/tmp", "test%d.key" % num)
+ pubkeyfile = os.path.join("/tmp", 'test%d.key.pub' % num)
+ # force-remove key files if they exist to ssh-keygen doesn't
+ # start asking questions
+ try:
+ os.remove(pubkeyfile)
+ os.remove(privkeyfile)
+ except OSError:
+ pass
+ cmd = "ssh-keygen -q -t dsa -f %s -C '' -N ''" % privkeyfile
+ import commands
+ print "Generating new keypair..."
+ (s, o) = commands.getstatusoutput(cmd)
+ print "Done."
+ pubkey = privkey = None
+ if s != 0:
+ _logger.error("Could not generate key pair: %d (%s)" % (s, o))
+ else:
+ pubkey = _extract_public_key(pubkeyfile)
+ privkey = _extract_private_key(privkeyfile)
+ try:
+ os.remove(pubkeyfile)
+ os.remove(privkeyfile)
+ except OSError:
+ pass
+ return (pubkey, privkey)
+def _get_random_name():
+ """Produce random names for testing"""
+ names = ["Liam", "Noel", "Guigsy", "Whitey", "Bonehead"]
+ return names[random.randint(0, len(names) - 1)]
+def _get_random_image():
+ """Produce a random image for display"""
+ import cairo, math, gtk
+ def rand():
+ return random.random()
+ SIZE = 200
+ s = cairo.ImageSurface(cairo.FORMAT_ARGB32, SIZE, SIZE)
+ cr = cairo.Context(s)
+ # background gradient
+ cr.save()
+ g = cairo.LinearGradient(0, 0, 1, 1)
+ g.add_color_stop_rgba(1, rand(), rand(), rand(), rand())
+ g.add_color_stop_rgba(0, rand(), rand(), rand(), rand())
+ cr.set_source(g)
+ cr.rectangle(0, 0, SIZE, SIZE);
+ cr.fill()
+ cr.restore()
+ # random path
+ cr.set_line_width(10 * rand() + 5)
+ cr.move_to(SIZE * rand(), SIZE * rand())
+ cr.line_to(SIZE * rand(), SIZE * rand())
+ cr.rel_line_to(SIZE * rand() * -1, 0)
+ cr.close_path()
+ cr.stroke()
+ # a circle
+ cr.set_source_rgba(rand(), rand(), rand(), rand())
+ cr.arc(SIZE * rand(), SIZE * rand(), 100 * rand() + 30, 0, 2 * math.pi)
+ cr.fill()
+ # another circle
+ cr.set_source_rgba(rand(), rand(), rand(), rand())
+ cr.arc(SIZE * rand(), SIZE * rand(), 100 * rand() + 30, 0, 2 * math.pi)
+ cr.fill()
+ def img_convert_func(buf, data):
+ data[0] += buf
+ return True
+ data = [""]
+ pixbuf = gtk.gdk.pixbuf_new_from_data(s.get_data(), gtk.gdk.COLORSPACE_RGB,
+ True, 8, s.get_width(), s.get_height(), s.get_stride())
+ pixbuf.save_to_callback(img_convert_func, "jpeg", {"quality": "90"}, data)
+ del pixbuf
+ return str(data[0])