From 150ae1566d813acbb773839e919db2c0f467931c Mon Sep 17 00:00:00 2001 From: Devendra Date: Wed, 16 Apr 2014 00:00:40 +0530 Subject: adding code to support async and pam client capabilities with python v2 and v3 --- python-tornado/Pubnub.py | 286 +++++++++++++---------------- python-tornado/examples/publish-example.py | 35 ++-- python-tornado/tests/subscribe-test.py | 19 +- python-tornado/unassembled/Platform.py | 28 ++- 4 files changed, 180 insertions(+), 188 deletions(-) (limited to 'python-tornado') diff --git a/python-tornado/Pubnub.py b/python-tornado/Pubnub.py index 61f7c3d..ccff021 100644 --- a/python-tornado/Pubnub.py +++ b/python-tornado/Pubnub.py @@ -176,12 +176,13 @@ import time import hashlib import uuid import sys -from urllib import quote + +try: from urllib.parse import quote +except: from urllib2 import quote from base64 import urlsafe_b64encode from hashlib import sha256 -from urllib import quote -from urllib import urlopen + import hmac @@ -233,12 +234,11 @@ class PubnubBase(object): self.uuid = UUID or str(uuid.uuid4()) if type(sys.version_info) is tuple: - self.python_version = 2 - self.pc = PubnubCrypto2() + self.python_version = 2 + self.pc = PubnubCrypto2() else: self.python_version = 3 self.pc = PubnubCrypto3() - if not isinstance(self.uuid, str): raise AttributeError("pres_uuid must be a string") @@ -357,7 +357,10 @@ class PubnubBase(object): if (callback != None): callback({'message' : response['message'], 'payload' : response['payload']}) else: if (callback != None):callback(response) - if (callback != None): return _new_format_callback + if (callback != None): + return _new_format_callback + else: + return None def publish( self, args ) : @@ -392,23 +395,28 @@ class PubnubBase(object): if 'callback' in args : callback = args['callback'] else : - callback = None + callback = None + + if 'error' in args : + error = args['error'] + else : + error = None - #message = json.dumps(args['message'], separators=(',',':')) message = self.encrypt(args['message']) - signature = self.sign(channel, message) + #signature = self.sign(channel, message) ## Send Message return self._request({"urlcomponents": [ 'publish', self.publish_key, self.subscribe_key, - signature, + '0', channel, '0', message - ], 'urlparams' : {'auth' : self.auth_key}}, self._return_wrapped_callback(callback)) + ], 'urlparams' : {'auth' : self.auth_key}}, callback=self._return_wrapped_callback(callback), + error=self._return_wrapped_callback(error)) def presence( self, args ) : """ @@ -472,12 +480,10 @@ class PubnubBase(object): """ channel = str(args['channel']) - ## Capture Callback - if 'callback' in args : - callback = args['callback'] - else : - callback = None - + + callback = args['callback'] if 'callback' in args else None + error = args['error'] if 'error' in args else None + ## Fail if bad input. if not channel : raise Exception('Missing Channel') @@ -488,59 +494,16 @@ class PubnubBase(object): 'v2','presence', 'sub_key', self.subscribe_key, 'channel', channel - ]}, callback); - - - def history( self, args ) : + ], 'urlparams' : {'auth' : self.auth_key}}, callback=self._return_wrapped_callback(callback), + error=self._return_wrapped_callback(error)) + + def history(self, args) : """ #** #* History #* #* Load history from a channel. #* - #* @param array args with 'channel' and 'limit'. - #* @return mixed false on fail, array on success. - #* - - ## History Example - history = pubnub.history({ - 'channel' : 'hello_world', - 'limit' : 1 - }) - print(history) - - """ - ## Capture User Input - limit = 'limit' in args and int(args['limit']) or 10 - channel = str(args['channel']) - - ## Fail if bad input. - if not channel : - raise Exception('Missing Channel') - return False - - ## Capture Callback - if 'callback' in args : - callback = args['callback'] - else : - callback = None - - ## Get History - return self._request({ "urlcomponents" : [ - 'history', - self.subscribe_key, - channel, - '0', - str(limit) - ] }, callback); - - def detailedHistory(self, args) : - """ - #** - #* Detailed History - #* - #* Load Detailed history from a channel. - #* #* @param array args with 'channel', optional: 'start', 'end', 'reverse', 'count' #* @return mixed false on fail, array on success. #* @@ -556,34 +519,21 @@ class PubnubBase(object): ## Capture User Input channel = str(args['channel']) - params = dict() - count = 100 - - if 'count' in args: - count = int(args['count']) - - params['count'] = str(count) - - if 'reverse' in args: - params['reverse'] = str(args['reverse']).lower() + callback = args['callback'] if 'callback' in args else None + error = args['error'] if 'error' in args else None - if 'start' in args: - params['start'] = str(args['start']) + params = dict() - if 'end' in args: - params['end'] = str(args['end']) + params['count'] = str(args['count']) if 'count' in args else 100 + params['reverse'] = str(args['reverse']).lower() if 'reverse' in args else 'false' + params['start'] = str(args['start']) if 'start' in args else None + params['end'] = str(args['end']) if 'end' in args else None ## Fail if bad input. if not channel : raise Exception('Missing Channel') return False - ## Capture Callback - if 'callback' in args : - callback = args['callback'] - else : - callback = None - ## Get History return self._request({ 'urlcomponents' : [ 'v2', @@ -592,7 +542,8 @@ class PubnubBase(object): self.subscribe_key, 'channel', channel, - ],'urlparams' : params }, callback=callback); + ], 'urlparams' : {'auth' : self.auth_key}}, callback=self._return_wrapped_callback(callback), + error=self._return_wrapped_callback(error)) def time(self, args = None) : """ @@ -610,10 +561,9 @@ class PubnubBase(object): """ ## Capture Callback - if args and 'callback' in args: - callback = args['callback'] - else : - callback = None + + callback = callback if args and 'callback' in args else None + time = self._request({'urlcomponents' : [ 'time', '0' @@ -637,7 +587,8 @@ class PubnubBase(object): ch for ch in list(bit) ]) for bit in request["urlcomponents"]]) if ("urlparams" in request): - url = url + '?' + "&".join([ x + "=" + str(y) for x,y in request["urlparams"].items()]) + url = url + '?' + "&".join([ x + "=" + str(y) for x,y in request["urlparams"].items() if y is not None]) + #print(url) return url @@ -648,9 +599,14 @@ except ImportError: import Crypto.Hash.SHA256 as digestmod sha256 = digestmod.new import hmac -import threading -from threading import current_thread -import threading + +class EmptyLock(): + def __enter__(self): + pass + def __exit__(self,a,b,c): + pass + +empty_lock = EmptyLock() class PubnubCoreAsync(PubnubBase): @@ -666,7 +622,9 @@ class PubnubCoreAsync(PubnubBase): auth_key = None, ssl_on = False, origin = 'pubsub.pubnub.com', - uuid = None + uuid = None, + _tt_lock=empty_lock, + _channel_list_lock=empty_lock ) : """ #** @@ -696,29 +654,20 @@ class PubnubCoreAsync(PubnubBase): UUID=uuid ) - self.subscriptions = {} - self.timetoken = 0 - self.last_timetoken = 0 - self.version = '3.3.4' - self.accept_encoding = 'gzip' - self.SUB_RECEIVER = None - self._connect = None - self._tt_lock = threading.RLock() + self.subscriptions = {} + self.timetoken = 0 + self.last_timetoken = 0 + self.version = '3.3.4' + self.accept_encoding = 'gzip' + self.SUB_RECEIVER = None + self._connect = None + self._tt_lock = _tt_lock + self._channel_list_lock = _channel_list_lock def get_channel_list(self, channels): channel = '' first = True - if self._channel_list_lock: - with self._channel_list_lock: - for ch in channels: - if not channels[ch]['subscribed']: - continue - if not first: - channel += ',' - else: - first = False - channel += ch - else: + with self._channel_list_lock: for ch in channels: if not channels[ch]['subscribed']: continue @@ -727,9 +676,15 @@ class PubnubCoreAsync(PubnubBase): else: first = False channel += ch - return channel + + def each(l, func): + if func is None: + return + for i in l: + func(i) + def subscribe( self, args=None, sync=False ) : """ #** @@ -765,12 +720,12 @@ class PubnubCoreAsync(PubnubBase): if args is None: _invoke(error, "Arguments Missing") return - channel = args['channel'] if 'channel' in args else None - callback = args['callback'] if 'callback' in args else None - connect = args['connect'] if 'connect' in args else None - disconnect = args['disconnect'] if 'disconnect' in args else None - reconnect = args['reconnect'] if 'reconnect' in args else None - error = args['error'] if 'error' in args else None + channel = args['channel'] if 'channel' in args else None + callback = args['callback'] if 'callback' in args else None + connect = args['connect'] if 'connect' in args else None + disconnect = args['disconnect'] if 'disconnect' in args else None + reconnect = args['reconnect'] if 'reconnect' in args else None + error = args['error'] if 'error' in args else None with self._tt_lock: self.last_timetoken = self.timetoken if self.timetoken != 0 else self.last_timetoken @@ -803,10 +758,15 @@ class PubnubCoreAsync(PubnubBase): chobj['connected'] = True _invoke(chobj['connect'],chobj['name']) - def _invoke_error(err=None): - for ch in self.subscriptions: - chobj = self.subscriptions[ch] - _invoke(chobj.error,err) + def _invoke_error(channel_list=None, err=None): + if channel_list is None: + for ch in self.subscriptions: + chobj = self.subscriptions[ch] + _invoke(chobj['error'],err) + else: + for ch in channel_list: + chobj = self.subscriptions[ch] + _invoke(chobj['error'],err) ''' if callback is None: @@ -827,19 +787,7 @@ class PubnubCoreAsync(PubnubBase): ## New Channel? if not channel in self.subscriptions: - if self._channel_list_lock: - with self._channel_list_lock: - self.subscriptions[channel] = { - 'name' : channel, - 'first' : False, - 'connected' : False, - 'subscribed' : True, - 'callback' : callback, - 'connect' : connect, - 'disconnect' : disconnect, - 'reconnect' : reconnect - } - else: + with self._channel_list_lock: self.subscriptions[channel] = { 'name' : channel, 'first' : False, @@ -848,9 +796,11 @@ class PubnubCoreAsync(PubnubBase): 'callback' : callback, 'connect' : connect, 'disconnect' : disconnect, - 'reconnect' : reconnect + 'reconnect' : reconnect, + 'error' : error } + ## return if already connected to channel if channel in self.subscriptions and 'connected' in self.subscriptions[channel] and self.subscriptions[channel]['connected'] is True: _invoke(error, "Already Connected") @@ -865,8 +815,11 @@ class PubnubCoreAsync(PubnubBase): def sub_callback(response): ## ERROR ? - if not response or error in response: - _invoke_error() + #print response + if not response or ('message' in response and response['message'] == 'Forbidden'): + _invoke_error(response['payload']['channels'], response['message']) + _connect() + return _invoke_connect() @@ -893,7 +846,6 @@ class PubnubCoreAsync(PubnubBase): _connect() - channel_list = self.get_channel_list(self.subscriptions) ## CONNECT TO PUBNUB SUBSCRIBE SERVERS try: @@ -903,9 +855,9 @@ class PubnubCoreAsync(PubnubBase): channel_list, '0', str(self.timetoken) - ], "urlparams" : {"uuid":self.uuid} }, sub_callback, single=True ) + ], "urlparams" : {"uuid":self.uuid, "auth" : self.auth_key} }, sub_callback, sub_callback, single=True ) except Exception as e: - print e + print(e) self.timeout( 1, _connect) return @@ -926,16 +878,20 @@ class PubnubCoreAsync(PubnubBase): def unsubscribe( self, args ): - #print(args['channel']) - channel = str(args['channel']) - if not (channel in self.subscriptions): + + if 'channel' in self.subscriptions is False: return False + channel = str(args['channel']) + + ## DISCONNECT - self.subscriptions[channel]['connected'] = 0 - self.subscriptions[channel]['subscribed'] = False - self.subscriptions[channel]['timetoken'] = 0 - self.subscriptions[channel]['first'] = False + with self._channel_list_lock: + if channel in self.subscriptions: + self.subscriptions[channel]['connected'] = 0 + self.subscriptions[channel]['subscribed'] = False + self.subscriptions[channel]['timetoken'] = 0 + self.subscriptions[channel]['first'] = False self.CONNECT() @@ -984,9 +940,13 @@ class Pubnub(PubnubCoreAsync): self.headers['V'] = self.version self.http = tornado.httpclient.AsyncHTTPClient(max_clients=1000) self.id = None - self._channel_list_lock = None + + def _request( self, request, callback=None, error=None, single=False ) : + + def _invoke(func, data): + if func is not None: + func(data) - def _request( self, request, callback, single=False ) : url = self.getUrl(request) request = tornado.httpclient.HTTPRequest( url, 'GET', self.headers, connect_timeout=10, request_timeout=310 ) if single is True: @@ -997,18 +957,30 @@ class Pubnub(PubnubCoreAsync): if single is True: if not id == self.id: return None - + body = response._get_body() + if body is None: return - + #print(body) def handle_exc(*args): return True if response.error is not None: with ExceptionStackContext(handle_exc): response.rethrow() - elif callback: - callback(eval(response._get_body())) + return + try: + data = json.loads(body) + except TypeError as e: + try: + data = json.loads(body.decode("utf-8")) + except: + _invoke(error, {'error' : 'json decode error'}) + + if 'error' in data and 'status' in data and 'status' != 200: + _invoke(error, data) + else: + _invoke(callback, data) self.http.fetch( request=request, diff --git a/python-tornado/examples/publish-example.py b/python-tornado/examples/publish-example.py index b9eaa15..bb8b199 100644 --- a/python-tornado/examples/publish-example.py +++ b/python-tornado/examples/publish-example.py @@ -10,54 +10,59 @@ ## ----------------------------------- import sys -import tornado +from twisted.internet import reactor sys.path.append('../') -sys.path.append('../..') -sys.path.append('../../common') +sys.path.append('../../') from Pubnub import Pubnub publish_key = len(sys.argv) > 1 and sys.argv[1] or 'demo' subscribe_key = len(sys.argv) > 2 and sys.argv[2] or 'demo' secret_key = len(sys.argv) > 3 and sys.argv[3] or 'demo' -cipher_key = len(sys.argv) > 4 and sys.argv[4] or 'demo' ##(Cipher key is Optional) +cipher_key = len(sys.argv) > 4 and sys.argv[4] or '' ##(Cipher key is Optional) ssl_on = len(sys.argv) > 5 and bool(sys.argv[5]) or False ## ----------------------------------------------------------------------- ## Initiate Pubnub State ## ----------------------------------------------------------------------- -pubnub = Pubnub( publish_key=publish_key, subscribe_key=subscribe_key, secret_key=secret_key,cipher_key=cipher_key, ssl_on=ssl_on ) -#pubnub = Pubnub( publish_key, subscribe_key, secret_key, ssl_on ) +pubnub = Pubnub( publish_key, subscribe_key, secret_key, cipher_key, ssl_on ) crazy = 'hello_world' +## ----------------------------------------------------------------------- +## Publish Example +## ----------------------------------------------------------------------- def publish_complete(info): print(info) +def publish_error(info): + print('ERROR : ' + str(info)) + ## Publish string pubnub.publish({ 'channel' : crazy, 'message' : 'Hello World!', - 'callback' : publish_complete + 'callback' : publish_complete, + 'error' : publish_error }) ## Publish list li = ['Sunday', 'Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday'] + pubnub.publish({ 'channel' : crazy, 'message' : li, - 'callback' : publish_complete + 'callback' : publish_complete, + 'error' : publish_error }) def done_cb(info): publish_complete(info) - tornado.ioloop.IOLoop.instance().stop() -## Publish Dictionary Object pubnub.publish({ 'channel' : crazy, 'message' : { 'some_key' : 'some_val' }, - 'callback' : done_cb + 'callback' : done_cb, + 'error' : publish_error }) -## ----------------------------------------------------------------------- -## IO Event Loop -## ----------------------------------------------------------------------- -tornado.ioloop.IOLoop.instance().start() + + +pubnub.start() diff --git a/python-tornado/tests/subscribe-test.py b/python-tornado/tests/subscribe-test.py index 0d4c65e..be4a416 100755 --- a/python-tornado/tests/subscribe-test.py +++ b/python-tornado/tests/subscribe-test.py @@ -38,31 +38,31 @@ received = 0 ## Subscribe Example ## ----------------------------------------------------------------------- def message_received(message): - print message + print(message) def check_received(message): global current global errors global received - print message - print current + print(message) + print(current) if message <= current: - print 'ERROR' + print('ERROR') #sys.exit() errors += 1 else: received += 1 - print 'active thread count : ', threading.activeCount() - print 'errors = ' , errors - print current_thread().getName(), ' , ', 'received = ', received + print('active thread count : ' + str( threading.activeCount())) + print('errors = ' + str(errors)) + print(current_thread().getName() + ' , ' + 'received = ' + str(received)) if received != message: - print '********** MISSED **************** ', message - received + print('********** MISSED **************** ' + str( message - received )) current = message def connected_test(ch) : - print 'Connected' , ch + print('Connected ' + ch) def connected(ch) : pass @@ -103,7 +103,6 @@ def subscribe(channel): }) -print threading.activeCount() pubnub.timeout(15,cb1) diff --git a/python-tornado/unassembled/Platform.py b/python-tornado/unassembled/Platform.py index 501993e..5200136 100644 --- a/python-tornado/unassembled/Platform.py +++ b/python-tornado/unassembled/Platform.py @@ -43,9 +43,13 @@ class Pubnub(PubnubCoreAsync): self.headers['V'] = self.version self.http = tornado.httpclient.AsyncHTTPClient(max_clients=1000) self.id = None - self._channel_list_lock = None + + def _request( self, request, callback=None, error=None, single=False ) : + + def _invoke(func, data): + if func is not None: + func(data) - def _request( self, request, callback, single=False ) : url = self.getUrl(request) request = tornado.httpclient.HTTPRequest( url, 'GET', self.headers, connect_timeout=10, request_timeout=310 ) if single is True: @@ -56,18 +60,30 @@ class Pubnub(PubnubCoreAsync): if single is True: if not id == self.id: return None - + body = response._get_body() + if body is None: return - + #print(body) def handle_exc(*args): return True if response.error is not None: with ExceptionStackContext(handle_exc): response.rethrow() - elif callback: - callback(eval(response._get_body())) + return + try: + data = json.loads(body) + except TypeError as e: + try: + data = json.loads(body.decode("utf-8")) + except: + _invoke(error, {'error' : 'json decode error'}) + + if 'error' in data and 'status' in data and 'status' != 200: + _invoke(error, data) + else: + _invoke(callback, data) self.http.fetch( request=request, -- cgit v1.2.3