Web   ·   Wiki   ·   Activities   ·   Blog   ·   Lists   ·   Chat   ·   Meeting   ·   Bugs   ·   Git   ·   Translate   ·   Archive   ·   People   ·   Donate
summaryrefslogtreecommitdiffstats
path: root/extensions/web/facebook/facebook.py
diff options
context:
space:
mode:
Diffstat (limited to 'extensions/web/facebook/facebook.py')
-rw-r--r--extensions/web/facebook/facebook.py293
1 files changed, 293 insertions, 0 deletions
diff --git a/extensions/web/facebook/facebook.py b/extensions/web/facebook/facebook.py
new file mode 100644
index 0000000..48d5921
--- /dev/null
+++ b/extensions/web/facebook/facebook.py
@@ -0,0 +1,293 @@
+#!/usr/bin/env python
+#
+# Copyright (c) 2012 Raul Gutierrez S. - rgs@itevenworks.net
+
+#Permission is hereby granted, free of charge, to any person obtaining a copy
+#of this software and associated documentation files (the "Software"), to deal
+#in the Software without restriction, including without limitation the rights
+#to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+#copies of the Software, and to permit persons to whom the Software is
+#furnished to do so, subject to the following conditions:
+
+#The above copyright notice and this permission notice shall be included in
+#all copies or substantial portions of the Software.
+
+#THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+#IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+#FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+#AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+#LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+#OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+#THE SOFTWARE.
+
+import json
+import logging
+import pycurl
+import time
+import urllib
+
+from gi.repository import GObject
+
+class FbAccount():
+ _access_token = ""
+
+ @classmethod
+ def set_access_token(cls, access_token):
+ cls._access_token = access_token
+
+ @classmethod
+ def access_token(cls):
+ return cls._access_token
+
+class FbObjectNotCreatedException(Exception):
+ pass
+
+class FbBadCall(Exception):
+ pass
+
+class FbPhoto(GObject.GObject):
+ PHOTOS_URL = "https://graph.facebook.com/me/photos?access_token=%s"
+ COMMENTS_URL = "https://graph.facebook.com/%s/comments"
+
+ __gsignals__ = {
+ 'photo-created': (GObject.SignalFlags.RUN_FIRST, None, ([str])),
+ 'photo-create-failed': (GObject.SignalFlags.RUN_FIRST, None, ([str])),
+ 'comment-added': (GObject.SignalFlags.RUN_FIRST, None, ([str])),
+ 'comment-add-failed': (GObject.SignalFlags.RUN_FIRST, None, ([str])),
+ 'comments-downloaded': (GObject.SignalFlags.RUN_FIRST, None, ([object])),
+ 'comments-download-failed': (GObject.SignalFlags.RUN_FIRST, None, ([str])),
+ 'likes-downloaded': (GObject.SignalFlags.RUN_FIRST, None, ([object])),
+ }
+
+ def __init__(self, fb_object_id=None):
+ GObject.GObject.__init__(self)
+ self.fb_object_id = fb_object_id
+
+ def create(self, image_path):
+ GObject.idle_add(self._create, image_path)
+
+ def add_comment(self, comment):
+ self.check_created('add_comment')
+ GObject.idle_add(self._add_comment, comment)
+
+ def refresh_comments(self):
+ """ raise an exception if no one is listening """
+ self.check_created('refresh_comments')
+ GObject.idle_add(self._refresh_comments)
+
+ def check_created(self, method_name):
+ if self.fb_object_id is None:
+ errmsg = "Need to call create before calling %s" % (method_name)
+ raise FbObjectNotCreatedException(errmsg)
+
+ def _add_comment(self, comment):
+ url = self.COMMENTS_URL % (self.fb_object_id)
+
+ response = []
+ def write_cb(buf):
+ response.append(buf)
+
+ res = self._http_call(url, [('message', comment)], write_cb, post=True)
+ if res == 200:
+ try:
+ comment_id = self._id_from_response("".join(response))
+ self.emit('comment-added', comment_id)
+ except FbBadCall as ex:
+ self.emit('comment-add-failed', str(ex))
+ else:
+ logging.debug("_add_comment failed, HTTP resp code: %d" % (res))
+ self.emit('comment-add-failed', "Add comment failed: %d" % (res))
+
+ def _create(self, image_path):
+ url = self.PHOTOS_URL % (FbAccount.access_token())
+ c = pycurl.Curl()
+ params = [('source', (c.FORM_FILE, image_path))]
+
+ response = []
+ def write_cb(buf):
+ response.append(buf)
+
+ result = self._http_call(url, params, write_cb, post=True)
+ if result == 200:
+ photo_id = self._id_from_response("".join(response))
+ self.fb_object_id = photo_id
+ self.emit('photo-created', photo_id)
+ else:
+ logging.debug("_create failed, HTTP resp code: %d" % result)
+
+ if result == 400:
+ failed_reason = "Expired access token."
+ elif result == 6:
+ failed_reason = "Network is down."
+ failed_reason += \
+ "Please connect to the network and try again."
+ else:
+ failed_reason = "Failed reason unknown: %s" % (str(result))
+
+ self.emit('photo-create-failed', failed_reason)
+
+ def _id_from_response(self, response_str):
+ response_object = json.loads(response_str)
+
+ if not "id" in response_object:
+ raise FbBadCall(response_str)
+
+ fb_object_id = response_object['id'].encode('ascii', 'replace')
+ return fb_object_id
+
+ def _refresh_comments(self):
+ """ this blocks """
+ url = self.COMMENTS_URL % (self.fb_object_id)
+
+ logging.debug("_refresh_comments fetching %s" % (url))
+
+ response_comments = []
+ def write_cb(buf):
+ response_comments.append(buf)
+
+ ret = self._http_call(url, [], write_cb, post=False)
+ if ret != 200:
+ logging.debug("_refresh_comments failed, HTTP resp code: %d" % ret)
+ self.emit('comments-download-failed',
+ "Comments download failed: %d" % (ret))
+ return
+
+ logging.debug("_refresh_comments: %s" % ("".join(response_comments)))
+
+ try:
+ response_data = json.loads("".join(response_comments))
+ if 'data' not in response_data:
+ logging.debug("No data inside the FB response")
+ self.emit('comments-download-failed',
+ "Comments download failed with no data")
+ return
+ except Exception as ex:
+ logging.debug("Couldn't parse FB response: %s" % str(ex))
+ self.emit('comments-download-failed',
+ "Comments download failed: %s" % (str(ex)))
+ return
+
+ comments = []
+ for c in response_data['data']:
+ comment = {} # this should be an Object
+ comment['from'] = c['from']['name']
+ comment['message'] = c['message']
+ comment['created_time'] = c['created_time']
+ comment['like_count'] = c['like_count']
+ comments.append(comment)
+
+ if len(comments) > 0:
+ self.emit('comments-downloaded', comments)
+ else:
+ self.emit('comments-download-failed', 'No comments found')
+
+ def _http_call(self, url, params, write_cb, post=False):
+ app_auth_params = [('access_token', FbAccount.access_token())]
+
+ c = pycurl.Curl()
+ c.setopt(c.WRITEFUNCTION, write_cb)
+
+ if post:
+ c.setopt(c.POST, 1)
+ c.setopt(c.HTTPPOST, app_auth_params + params)
+ else:
+ c.setopt(c.HTTPGET, 1)
+ params_str = urllib.urlencode(app_auth_params + params)
+ url = "%s?%s" % (url, params_str)
+
+ logging.debug("_http_call: %s" % (url))
+
+ c.setopt(c.URL, url)
+ c.perform()
+ result = c.getinfo(c.HTTP_CODE)
+ c.close()
+
+ return result
+
+
+if __name__ == '__main__':
+ import sys
+ if len(sys.argv) != 3:
+ print "Tests need access_token and an image path!"
+ exit(1)
+
+ access_token, photo_path = sys.argv[1:3]
+ FbAccount.set_access_token(access_token)
+
+
+def test_create_photo(loop):
+ def photo_created_cb(photo, photo_id, loop):
+ print "Photo created: %s" % (photo_id)
+ loop.quit()
+
+ photo = FbPhoto()
+ photo.connect('photo-created', photo_created_cb, loop)
+ photo.create(photo_path)
+
+def test_add_comment(loop):
+ def photo_created_cb(photo, photo_id, loop):
+ print "Photo created: %s" % (photo_id)
+
+ def comment_added_cb(photo, comment_id, loop):
+ print "Comment created: %s" % (comment_id)
+ loop.quit()
+ return False
+
+ photo = FbPhoto(photo_id)
+ photo.connect("comment-added", comment_added_cb, loop)
+ photo.add_comment("this is a test")
+ return False
+
+ photo = FbPhoto()
+ photo.connect('photo-created', photo_created_cb, loop)
+ photo.create(photo_path)
+
+def test_get_comments(loop):
+ def photo_created_cb(photo, photo_id, loop):
+ print "Photo created: %s" % (photo_id)
+
+ def comment_added_cb(photo, comment_id, loop):
+ print "Comment created: %s" % (comment_id)
+
+ def comments_downloaded_cb(photo, comments, loop):
+ print "%s comments for photo %s" % \
+ (len(comments), photo.fb_object_id)
+
+ for c in comments:
+ print "Comment from %s with message: %s" % \
+ (c["from"], c["message"])
+
+ loop.quit()
+
+ photo.connect('comments-downloaded',
+ comments_downloaded_cb,
+ loop)
+ photo.refresh_comments()
+ return False
+
+ photo = FbPhoto(photo_id)
+ photo.connect("comment-added", comment_added_cb, loop)
+ photo.add_comment("this is a test")
+ return False
+
+ photo = FbPhoto()
+ photo.connect('photo-created', photo_created_cb, loop)
+ photo.create(photo_path)
+
+
+def timeout_cb(test_name, loop):
+ print "%s timed out and failed" % (test_name)
+ loop.quit()
+ return False
+
+if __name__ == '__main__':
+ tests = [eval(t) for t in dir() if t.startswith('test_')]
+
+ for t in tests:
+ print "\n=== Starting %s (%s) ===" % (t.__name__, time.time())
+ loop = GObject.MainLoop()
+ tid = GObject.timeout_add(30000, timeout_cb, t.__name__, loop)
+ t(loop)
+ loop.run()
+ GObject.source_remove(tid)
+ print "=== Finished %s (%s) ===\n" % (t.__name__, time.time())