From 150ae1566d813acbb773839e919db2c0f467931c Mon Sep 17 00:00:00 2001 From: Devendra Date: Wed, 16 Apr 2014 00:00:40 +0530 Subject: adding code to support async and pam client capabilities with python v2 and v3 --- python/Pubnub.py | 376 +++++++++++++++++++------------------ python/examples/publish-example.py | 87 ++++++--- python/tests/subscribe-test.py | 19 +- python/unassembled/Platform.py | 120 +++++++++--- 4 files changed, 355 insertions(+), 247 deletions(-) (limited to 'python') diff --git a/python/Pubnub.py b/python/Pubnub.py index f3c518c..95eafd0 100644 --- a/python/Pubnub.py +++ b/python/Pubnub.py @@ -176,12 +176,13 @@ import time import hashlib import uuid import sys -from urllib import quote + +try: from urllib.parse import quote +except: from urllib2 import quote from base64 import urlsafe_b64encode from hashlib import sha256 -from urllib import quote -from urllib import urlopen + import hmac @@ -233,12 +234,11 @@ class PubnubBase(object): self.uuid = UUID or str(uuid.uuid4()) if type(sys.version_info) is tuple: - self.python_version = 2 - self.pc = PubnubCrypto2() + self.python_version = 2 + self.pc = PubnubCrypto2() else: self.python_version = 3 self.pc = PubnubCrypto3() - if not isinstance(self.uuid, str): raise AttributeError("pres_uuid must be a string") @@ -357,7 +357,10 @@ class PubnubBase(object): if (callback != None): callback({'message' : response['message'], 'payload' : response['payload']}) else: if (callback != None):callback(response) - if (callback != None): return _new_format_callback + if (callback != None): + return _new_format_callback + else: + return None def publish( self, args ) : @@ -392,23 +395,28 @@ class PubnubBase(object): if 'callback' in args : callback = args['callback'] else : - callback = None + callback = None + + if 'error' in args : + error = args['error'] + else : + error = None - #message = json.dumps(args['message'], separators=(',',':')) message = self.encrypt(args['message']) - signature = self.sign(channel, message) + #signature = self.sign(channel, message) ## Send Message return self._request({"urlcomponents": [ 'publish', self.publish_key, self.subscribe_key, - signature, + '0', channel, '0', message - ], 'urlparams' : {'auth' : self.auth_key}}, self._return_wrapped_callback(callback)) + ], 'urlparams' : {'auth' : self.auth_key}}, callback=self._return_wrapped_callback(callback), + error=self._return_wrapped_callback(error)) def presence( self, args ) : """ @@ -472,12 +480,10 @@ class PubnubBase(object): """ channel = str(args['channel']) - ## Capture Callback - if 'callback' in args : - callback = args['callback'] - else : - callback = None - + + callback = args['callback'] if 'callback' in args else None + error = args['error'] if 'error' in args else None + ## Fail if bad input. if not channel : raise Exception('Missing Channel') @@ -488,59 +494,16 @@ class PubnubBase(object): 'v2','presence', 'sub_key', self.subscribe_key, 'channel', channel - ]}, callback); - - - def history( self, args ) : + ], 'urlparams' : {'auth' : self.auth_key}}, callback=self._return_wrapped_callback(callback), + error=self._return_wrapped_callback(error)) + + def history(self, args) : """ #** #* History #* #* Load history from a channel. #* - #* @param array args with 'channel' and 'limit'. - #* @return mixed false on fail, array on success. - #* - - ## History Example - history = pubnub.history({ - 'channel' : 'hello_world', - 'limit' : 1 - }) - print(history) - - """ - ## Capture User Input - limit = 'limit' in args and int(args['limit']) or 10 - channel = str(args['channel']) - - ## Fail if bad input. - if not channel : - raise Exception('Missing Channel') - return False - - ## Capture Callback - if 'callback' in args : - callback = args['callback'] - else : - callback = None - - ## Get History - return self._request({ "urlcomponents" : [ - 'history', - self.subscribe_key, - channel, - '0', - str(limit) - ] }, callback); - - def detailedHistory(self, args) : - """ - #** - #* Detailed History - #* - #* Load Detailed history from a channel. - #* #* @param array args with 'channel', optional: 'start', 'end', 'reverse', 'count' #* @return mixed false on fail, array on success. #* @@ -556,34 +519,21 @@ class PubnubBase(object): ## Capture User Input channel = str(args['channel']) - params = dict() - count = 100 - - if 'count' in args: - count = int(args['count']) + callback = args['callback'] if 'callback' in args else None + error = args['error'] if 'error' in args else None - params['count'] = str(count) - - if 'reverse' in args: - params['reverse'] = str(args['reverse']).lower() - - if 'start' in args: - params['start'] = str(args['start']) + params = dict() - if 'end' in args: - params['end'] = str(args['end']) + params['count'] = str(args['count']) if 'count' in args else 100 + params['reverse'] = str(args['reverse']).lower() if 'reverse' in args else 'false' + params['start'] = str(args['start']) if 'start' in args else None + params['end'] = str(args['end']) if 'end' in args else None ## Fail if bad input. if not channel : raise Exception('Missing Channel') return False - ## Capture Callback - if 'callback' in args : - callback = args['callback'] - else : - callback = None - ## Get History return self._request({ 'urlcomponents' : [ 'v2', @@ -592,7 +542,8 @@ class PubnubBase(object): self.subscribe_key, 'channel', channel, - ],'urlparams' : params }, callback=callback); + ], 'urlparams' : {'auth' : self.auth_key}}, callback=self._return_wrapped_callback(callback), + error=self._return_wrapped_callback(error)) def time(self, args = None) : """ @@ -610,10 +561,9 @@ class PubnubBase(object): """ ## Capture Callback - if args and 'callback' in args: - callback = args['callback'] - else : - callback = None + + callback = callback if args and 'callback' in args else None + time = self._request({'urlcomponents' : [ 'time', '0' @@ -637,7 +587,8 @@ class PubnubBase(object): ch for ch in list(bit) ]) for bit in request["urlcomponents"]]) if ("urlparams" in request): - url = url + '?' + "&".join([ x + "=" + str(y) for x,y in request["urlparams"].items()]) + url = url + '?' + "&".join([ x + "=" + str(y) for x,y in request["urlparams"].items() if y is not None]) + #print(url) return url @@ -648,9 +599,14 @@ except ImportError: import Crypto.Hash.SHA256 as digestmod sha256 = digestmod.new import hmac -import threading -from threading import current_thread -import threading + +class EmptyLock(): + def __enter__(self): + pass + def __exit__(self,a,b,c): + pass + +empty_lock = EmptyLock() class PubnubCoreAsync(PubnubBase): @@ -666,7 +622,9 @@ class PubnubCoreAsync(PubnubBase): auth_key = None, ssl_on = False, origin = 'pubsub.pubnub.com', - uuid = None + uuid = None, + _tt_lock=empty_lock, + _channel_list_lock=empty_lock ) : """ #** @@ -696,29 +654,20 @@ class PubnubCoreAsync(PubnubBase): UUID=uuid ) - self.subscriptions = {} - self.timetoken = 0 - self.last_timetoken = 0 - self.version = '3.3.4' - self.accept_encoding = 'gzip' - self.SUB_RECEIVER = None - self._connect = None - self._tt_lock = threading.RLock() + self.subscriptions = {} + self.timetoken = 0 + self.last_timetoken = 0 + self.version = '3.3.4' + self.accept_encoding = 'gzip' + self.SUB_RECEIVER = None + self._connect = None + self._tt_lock = _tt_lock + self._channel_list_lock = _channel_list_lock def get_channel_list(self, channels): channel = '' first = True - if self._channel_list_lock: - with self._channel_list_lock: - for ch in channels: - if not channels[ch]['subscribed']: - continue - if not first: - channel += ',' - else: - first = False - channel += ch - else: + with self._channel_list_lock: for ch in channels: if not channels[ch]['subscribed']: continue @@ -727,9 +676,15 @@ class PubnubCoreAsync(PubnubBase): else: first = False channel += ch - return channel + + def each(l, func): + if func is None: + return + for i in l: + func(i) + def subscribe( self, args=None, sync=False ) : """ #** @@ -765,12 +720,12 @@ class PubnubCoreAsync(PubnubBase): if args is None: _invoke(error, "Arguments Missing") return - channel = args['channel'] if 'channel' in args else None - callback = args['callback'] if 'callback' in args else None - connect = args['connect'] if 'connect' in args else None - disconnect = args['disconnect'] if 'disconnect' in args else None - reconnect = args['reconnect'] if 'reconnect' in args else None - error = args['error'] if 'error' in args else None + channel = args['channel'] if 'channel' in args else None + callback = args['callback'] if 'callback' in args else None + connect = args['connect'] if 'connect' in args else None + disconnect = args['disconnect'] if 'disconnect' in args else None + reconnect = args['reconnect'] if 'reconnect' in args else None + error = args['error'] if 'error' in args else None with self._tt_lock: self.last_timetoken = self.timetoken if self.timetoken != 0 else self.last_timetoken @@ -803,10 +758,15 @@ class PubnubCoreAsync(PubnubBase): chobj['connected'] = True _invoke(chobj['connect'],chobj['name']) - def _invoke_error(err=None): - for ch in self.subscriptions: - chobj = self.subscriptions[ch] - _invoke(chobj.error,err) + def _invoke_error(channel_list=None, err=None): + if channel_list is None: + for ch in self.subscriptions: + chobj = self.subscriptions[ch] + _invoke(chobj['error'],err) + else: + for ch in channel_list: + chobj = self.subscriptions[ch] + _invoke(chobj['error'],err) ''' if callback is None: @@ -827,19 +787,7 @@ class PubnubCoreAsync(PubnubBase): ## New Channel? if not channel in self.subscriptions: - if self._channel_list_lock: - with self._channel_list_lock: - self.subscriptions[channel] = { - 'name' : channel, - 'first' : False, - 'connected' : False, - 'subscribed' : True, - 'callback' : callback, - 'connect' : connect, - 'disconnect' : disconnect, - 'reconnect' : reconnect - } - else: + with self._channel_list_lock: self.subscriptions[channel] = { 'name' : channel, 'first' : False, @@ -848,9 +796,11 @@ class PubnubCoreAsync(PubnubBase): 'callback' : callback, 'connect' : connect, 'disconnect' : disconnect, - 'reconnect' : reconnect + 'reconnect' : reconnect, + 'error' : error } + ## return if already connected to channel if channel in self.subscriptions and 'connected' in self.subscriptions[channel] and self.subscriptions[channel]['connected'] is True: _invoke(error, "Already Connected") @@ -865,8 +815,11 @@ class PubnubCoreAsync(PubnubBase): def sub_callback(response): ## ERROR ? - if not response or error in response: - _invoke_error() + #print response + if not response or ('message' in response and response['message'] == 'Forbidden'): + _invoke_error(response['payload']['channels'], response['message']) + _connect() + return _invoke_connect() @@ -893,7 +846,6 @@ class PubnubCoreAsync(PubnubBase): _connect() - channel_list = self.get_channel_list(self.subscriptions) ## CONNECT TO PUBNUB SUBSCRIBE SERVERS try: @@ -903,9 +855,9 @@ class PubnubCoreAsync(PubnubBase): channel_list, '0', str(self.timetoken) - ], "urlparams" : {"uuid":self.uuid} }, sub_callback, single=True ) + ], "urlparams" : {"uuid":self.uuid, "auth" : self.auth_key} }, sub_callback, sub_callback, single=True ) except Exception as e: - print e + print(e) self.timeout( 1, _connect) return @@ -926,16 +878,20 @@ class PubnubCoreAsync(PubnubBase): def unsubscribe( self, args ): - #print(args['channel']) - channel = str(args['channel']) - if not (channel in self.subscriptions): + + if 'channel' in self.subscriptions is False: return False + channel = str(args['channel']) + + ## DISCONNECT - self.subscriptions[channel]['connected'] = 0 - self.subscriptions[channel]['subscribed'] = False - self.subscriptions[channel]['timetoken'] = 0 - self.subscriptions[channel]['first'] = False + with self._channel_list_lock: + if channel in self.subscriptions: + self.subscriptions[channel]['connected'] = 0 + self.subscriptions[channel]['subscribed'] = False + self.subscriptions[channel]['timetoken'] = 0 + self.subscriptions[channel]['first'] = False self.CONNECT() @@ -953,19 +909,43 @@ from threading import current_thread latest_sub_callback_lock = threading.RLock() latest_sub_callback = {'id' : None, 'callback' : None} + + + class HTTPClient: - def __init__(self, url, callback, id=None): + def __init__(self, url, urllib_func=None, callback=None, error=None, id=None): self.url = url self.id = id self.callback = callback + self.error = error self.stop = False + self._urllib_func = urllib_func def cancel(self): self.stop = True self.callback = None + self.error = None + def run(self): - data = urllib2.urlopen(self.url, timeout=310).read() + + def _invoke(func, data): + if func is not None: + func(data) + + if self._urllib_func is None: + return + + ''' + try: + resp = urllib2.urlopen(self.url, timeout=320) + except urllib2.HTTPError as http_error: + resp = http_error + ''' + resp = self._urllib_func(self.url, timeout=320) + data = resp[0] + code = resp[1] + if self.stop is True: return if self.callback is None: @@ -975,14 +955,49 @@ class HTTPClient: if latest_sub_callback['id'] != self.id: return else: - print(data) if latest_sub_callback['callback'] is not None: latest_sub_callback['id'] = 0 - latest_sub_callback['callback'](json.loads(data)) + try: + data = json.loads(data) + except: + _invoke(latest_sub_callback['error'], {'error' : 'json decoding error'}) + return + if code != 200: + _invoke(latest_sub_callback['error'],data) + else: + _invoke(latest_sub_callback['callback'],data) else: - self.callback(json.loads(data)) + try: + data = json.loads(data) + except: + _invoke(self.error, {'error' : 'json decoding error'}) + return + + if code != 200: + _invoke(self.error,data) + else: + _invoke(self.callback,data) +def _urllib_request_2(url, timeout=320): + try: + resp = urllib2.urlopen(url,timeout=timeout) + except urllib2.HTTPError as http_error: + resp = http_error + return (resp.read(),resp.code) + +def _urllib_request_3(url, timeout=320): + #print(url) + try: + resp = urllib.request.urlopen(url,timeout=timeout) + except urllib.request.HTTPError as http_error: + resp = http_error + r = resp.read().decode("utf-8") + #print(r) + return (r,resp.code) + +_urllib_request = None + class Pubnub(PubnubCoreAsync): def __init__( self, @@ -1003,13 +1018,15 @@ class Pubnub(PubnubCoreAsync): auth_key = auth_key, ssl_on = ssl_on, origin = origin, - uuid = pres_uuid + uuid = pres_uuid, + _tt_lock=threading.RLock(), + _channel_list_lock=threading.RLock() ) + global _urllib_request if self.python_version == 2: - self._request = self._request2 + _urllib_request = _urllib_request_2 else: - self._request = self._request3 - self._channel_list_lock = threading.RLock() + _urllib_request = _urllib_request_3 def timeout(self, interval, func): def cb(): @@ -1018,17 +1035,20 @@ class Pubnub(PubnubCoreAsync): thread = threading.Thread(target=cb) thread.start() - def _request2_async( self, request, callback, single=False ) : + + def _request_async( self, request, callback=None, error=None, single=False ) : + global _urllib_request ## Build URL url = self.getUrl(request) if single is True: id = time.time() - client = HTTPClient(url, None, id) + client = HTTPClient(url=url, urllib_func=_urllib_request, callback=None, error=None, id=id) with latest_sub_callback_lock: latest_sub_callback['id'] = id latest_sub_callback['callback'] = callback + latest_sub_callback['error'] = error else: - client = HTTPClient(url, callback) + client = HTTPClient(url=url, urllib_func=_urllib_request, callback=callback, error=error) thread = threading.Thread(target=client.run) thread.start() @@ -1037,31 +1057,30 @@ class Pubnub(PubnubCoreAsync): return abort - def _request2_sync( self, request) : - + def _request_sync( self, request) : + global _urllib_request ## Build URL url = self.getUrl(request) ## Send Request Expecting JSONP Response + response = _urllib_request(url, timeout=320) try: - try: usock = urllib2.urlopen( url, None, 310 ) - except TypeError: usock = urllib2.urlopen( url, None ) - response = usock.read() - usock.close() - resp_json = json.loads(response) - except Exception as e: - print e - return None - + resp_json = json.loads(response[0]) + except: + return [0,"JSON Error"] + + if response[1] != 200 and 'status' in resp_json: + return {'message' : resp_json['message'], 'payload' : resp_json['payload']} + return resp_json - def _request2(self, request, callback=None, single=False): + def _request(self, request, callback=None, error=None, single=False): if callback is None: - return self._request2_sync(request) + return self._request_sync(request) else: - self._request2_async(request, callback, single=single) - + self._request_async(request, callback, error, single=single) +''' def _request3_sync( self, request) : ## Build URL @@ -1083,3 +1102,4 @@ class Pubnub(PubnubCoreAsync): return self._request3_sync(request,single=single) else: self._request3_async(request, callback, single=single) + ''' diff --git a/python/examples/publish-example.py b/python/examples/publish-example.py index 31ae198..bb8b199 100755 --- a/python/examples/publish-example.py +++ b/python/examples/publish-example.py @@ -1,43 +1,68 @@ +## www.pubnub.com - PubNub Real-time push service in the cloud. +# coding=utf8 + +## PubNub Real-time Push APIs and Notifications Framework +## Copyright (c) 2010 Stephen Blum +## http://www.pubnub.com/ + +## ----------------------------------- +## PubNub 3.1 Real-time Push Cloud API +## ----------------------------------- + import sys -sys.path.append('.') -sys.path.append('..') +from twisted.internet import reactor +sys.path.append('../') +sys.path.append('../../') from Pubnub import Pubnub -## Initiate Class -pubnub = Pubnub( publish_key='demo', subscribe_key='demo', cipher_key='enigma', ssl_on=False ) -#pubnub = Pubnub( publish_key='demo', subscribe_key='demo', ssl_on=False ) +publish_key = len(sys.argv) > 1 and sys.argv[1] or 'demo' +subscribe_key = len(sys.argv) > 2 and sys.argv[2] or 'demo' +secret_key = len(sys.argv) > 3 and sys.argv[3] or 'demo' +cipher_key = len(sys.argv) > 4 and sys.argv[4] or '' ##(Cipher key is Optional) +ssl_on = len(sys.argv) > 5 and bool(sys.argv[5]) or False +## ----------------------------------------------------------------------- +## Initiate Pubnub State +## ----------------------------------------------------------------------- +pubnub = Pubnub( publish_key, subscribe_key, secret_key, cipher_key, ssl_on ) +crazy = 'hello_world' + +## ----------------------------------------------------------------------- ## Publish Example -info = pubnub.publish({ - 'channel' : 'abcd', - 'message' : { - 'iam' : 'object' - } -}) -print(info) +## ----------------------------------------------------------------------- +def publish_complete(info): + print(info) -info = pubnub.publish({ - 'channel' : 'abcd', - 'message' : "hi I am string" -}) -print(info) +def publish_error(info): + print('ERROR : ' + str(info)) -info = pubnub.publish({ - 'channel' : 'abcd', - 'message' : 1234 +## Publish string +pubnub.publish({ + 'channel' : crazy, + 'message' : 'Hello World!', + 'callback' : publish_complete, + 'error' : publish_error }) -print(info) -info = pubnub.publish({ - 'channel' : 'abcd', - 'message' : "1234" +## Publish list +li = ['Sunday', 'Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday'] + +pubnub.publish({ + 'channel' : crazy, + 'message' : li, + 'callback' : publish_complete, + 'error' : publish_error }) -print(info) -info = pubnub.publish({ - 'channel' : 'abcd', - 'message' : [ - 'i' , 'am', 'array' - ] +def done_cb(info): + publish_complete(info) + +pubnub.publish({ + 'channel' : crazy, + 'message' : { 'some_key' : 'some_val' }, + 'callback' : done_cb, + 'error' : publish_error }) -print(info) + + +pubnub.start() diff --git a/python/tests/subscribe-test.py b/python/tests/subscribe-test.py index 0d4c65e..be4a416 100755 --- a/python/tests/subscribe-test.py +++ b/python/tests/subscribe-test.py @@ -38,31 +38,31 @@ received = 0 ## Subscribe Example ## ----------------------------------------------------------------------- def message_received(message): - print message + print(message) def check_received(message): global current global errors global received - print message - print current + print(message) + print(current) if message <= current: - print 'ERROR' + print('ERROR') #sys.exit() errors += 1 else: received += 1 - print 'active thread count : ', threading.activeCount() - print 'errors = ' , errors - print current_thread().getName(), ' , ', 'received = ', received + print('active thread count : ' + str( threading.activeCount())) + print('errors = ' + str(errors)) + print(current_thread().getName() + ' , ' + 'received = ' + str(received)) if received != message: - print '********** MISSED **************** ', message - received + print('********** MISSED **************** ' + str( message - received )) current = message def connected_test(ch) : - print 'Connected' , ch + print('Connected ' + ch) def connected(ch) : pass @@ -103,7 +103,6 @@ def subscribe(channel): }) -print threading.activeCount() pubnub.timeout(15,cb1) diff --git a/python/unassembled/Platform.py b/python/unassembled/Platform.py index 22893f8..0ffccbb 100644 --- a/python/unassembled/Platform.py +++ b/python/unassembled/Platform.py @@ -12,19 +12,43 @@ from threading import current_thread latest_sub_callback_lock = threading.RLock() latest_sub_callback = {'id' : None, 'callback' : None} + + + class HTTPClient: - def __init__(self, url, callback, id=None): + def __init__(self, url, urllib_func=None, callback=None, error=None, id=None): self.url = url self.id = id self.callback = callback + self.error = error self.stop = False + self._urllib_func = urllib_func def cancel(self): self.stop = True self.callback = None + self.error = None + def run(self): - data = urllib2.urlopen(self.url, timeout=310).read() + + def _invoke(func, data): + if func is not None: + func(data) + + if self._urllib_func is None: + return + + ''' + try: + resp = urllib2.urlopen(self.url, timeout=320) + except urllib2.HTTPError as http_error: + resp = http_error + ''' + resp = self._urllib_func(self.url, timeout=320) + data = resp[0] + code = resp[1] + if self.stop is True: return if self.callback is None: @@ -34,13 +58,48 @@ class HTTPClient: if latest_sub_callback['id'] != self.id: return else: - print(data) if latest_sub_callback['callback'] is not None: latest_sub_callback['id'] = 0 - latest_sub_callback['callback'](json.loads(data)) + try: + data = json.loads(data) + except: + _invoke(latest_sub_callback['error'], {'error' : 'json decoding error'}) + return + if code != 200: + _invoke(latest_sub_callback['error'],data) + else: + _invoke(latest_sub_callback['callback'],data) else: - self.callback(json.loads(data)) - + try: + data = json.loads(data) + except: + _invoke(self.error, {'error' : 'json decoding error'}) + return + + if code != 200: + _invoke(self.error,data) + else: + _invoke(self.callback,data) + + +def _urllib_request_2(url, timeout=320): + try: + resp = urllib2.urlopen(url,timeout=timeout) + except urllib2.HTTPError as http_error: + resp = http_error + return (resp.read(),resp.code) + +def _urllib_request_3(url, timeout=320): + #print(url) + try: + resp = urllib.request.urlopen(url,timeout=timeout) + except urllib.request.HTTPError as http_error: + resp = http_error + r = resp.read().decode("utf-8") + #print(r) + return (r,resp.code) + +_urllib_request = None class Pubnub(PubnubCoreAsync): def __init__( @@ -62,13 +121,15 @@ class Pubnub(PubnubCoreAsync): auth_key = auth_key, ssl_on = ssl_on, origin = origin, - uuid = pres_uuid + uuid = pres_uuid, + _tt_lock=threading.RLock(), + _channel_list_lock=threading.RLock() ) + global _urllib_request if self.python_version == 2: - self._request = self._request2 + _urllib_request = _urllib_request_2 else: - self._request = self._request3 - self._channel_list_lock = threading.RLock() + _urllib_request = _urllib_request_3 def timeout(self, interval, func): def cb(): @@ -77,17 +138,20 @@ class Pubnub(PubnubCoreAsync): thread = threading.Thread(target=cb) thread.start() - def _request2_async( self, request, callback, single=False ) : + + def _request_async( self, request, callback=None, error=None, single=False ) : + global _urllib_request ## Build URL url = self.getUrl(request) if single is True: id = time.time() - client = HTTPClient(url, None, id) + client = HTTPClient(url=url, urllib_func=_urllib_request, callback=None, error=None, id=id) with latest_sub_callback_lock: latest_sub_callback['id'] = id latest_sub_callback['callback'] = callback + latest_sub_callback['error'] = error else: - client = HTTPClient(url, callback) + client = HTTPClient(url=url, urllib_func=_urllib_request, callback=callback, error=error) thread = threading.Thread(target=client.run) thread.start() @@ -96,31 +160,30 @@ class Pubnub(PubnubCoreAsync): return abort - def _request2_sync( self, request) : - + def _request_sync( self, request) : + global _urllib_request ## Build URL url = self.getUrl(request) ## Send Request Expecting JSONP Response + response = _urllib_request(url, timeout=320) try: - try: usock = urllib2.urlopen( url, None, 310 ) - except TypeError: usock = urllib2.urlopen( url, None ) - response = usock.read() - usock.close() - resp_json = json.loads(response) - except Exception as e: - print e - return None - + resp_json = json.loads(response[0]) + except: + return [0,"JSON Error"] + + if response[1] != 200 and 'status' in resp_json: + return {'message' : resp_json['message'], 'payload' : resp_json['payload']} + return resp_json - def _request2(self, request, callback=None, single=False): + def _request(self, request, callback=None, error=None, single=False): if callback is None: - return self._request2_sync(request) + return self._request_sync(request) else: - self._request2_async(request, callback, single=single) - + self._request_async(request, callback, error, single=single) +''' def _request3_sync( self, request) : ## Build URL @@ -142,3 +205,4 @@ class Pubnub(PubnubCoreAsync): return self._request3_sync(request,single=single) else: self._request3_async(request, callback, single=single) + ''' -- cgit v1.2.3