diff options
| author | Devendra | 2014-04-16 00:00:40 +0530 |
|---|---|---|
| committer | Devendra | 2014-04-16 00:00:40 +0530 |
| commit | 150ae1566d813acbb773839e919db2c0f467931c (patch) | |
| tree | 6f74d6dcdcb0ecff6d8a51988d8a461b6f9a4668 /python-twisted/Pubnub.py | |
| parent | 99096b8c11b9a541f6350639e8735495cf90091c (diff) | |
| download | pubnub-python-150ae1566d813acbb773839e919db2c0f467931c.tar.bz2 | |
adding code to support async and pam client capabilities with python v2 and v3
Diffstat (limited to 'python-twisted/Pubnub.py')
| -rw-r--r-- | python-twisted/Pubnub.py | 325 |
1 files changed, 162 insertions, 163 deletions
diff --git a/python-twisted/Pubnub.py b/python-twisted/Pubnub.py index 7171efe..383ee6d 100644 --- a/python-twisted/Pubnub.py +++ b/python-twisted/Pubnub.py @@ -176,12 +176,13 @@ import time import hashlib import uuid import sys -from urllib import quote + +try: from urllib.parse import quote +except: from urllib2 import quote from base64 import urlsafe_b64encode from hashlib import sha256 -from urllib import quote -from urllib import urlopen + import hmac @@ -233,12 +234,11 @@ class PubnubBase(object): self.uuid = UUID or str(uuid.uuid4()) if type(sys.version_info) is tuple: - self.python_version = 2 - self.pc = PubnubCrypto2() + self.python_version = 2 + self.pc = PubnubCrypto2() else: self.python_version = 3 self.pc = PubnubCrypto3() - if not isinstance(self.uuid, str): raise AttributeError("pres_uuid must be a string") @@ -357,7 +357,10 @@ class PubnubBase(object): if (callback != None): callback({'message' : response['message'], 'payload' : response['payload']}) else: if (callback != None):callback(response) - if (callback != None): return _new_format_callback + if (callback != None): + return _new_format_callback + else: + return None def publish( self, args ) : @@ -392,23 +395,28 @@ class PubnubBase(object): if 'callback' in args : callback = args['callback'] else : - callback = None + callback = None + + if 'error' in args : + error = args['error'] + else : + error = None - #message = json.dumps(args['message'], separators=(',',':')) message = self.encrypt(args['message']) - signature = self.sign(channel, message) + #signature = self.sign(channel, message) ## Send Message return self._request({"urlcomponents": [ 'publish', self.publish_key, self.subscribe_key, - signature, + '0', channel, '0', message - ], 'urlparams' : {'auth' : self.auth_key}}, self._return_wrapped_callback(callback)) + ], 'urlparams' : {'auth' : self.auth_key}}, callback=self._return_wrapped_callback(callback), + error=self._return_wrapped_callback(error)) def presence( self, args ) : """ @@ -472,12 +480,10 @@ class PubnubBase(object): """ channel = str(args['channel']) - ## Capture Callback - if 'callback' in args : - callback = args['callback'] - else : - callback = None - + + callback = args['callback'] if 'callback' in args else None + error = args['error'] if 'error' in args else None + ## Fail if bad input. if not channel : raise Exception('Missing Channel') @@ -488,59 +494,16 @@ class PubnubBase(object): 'v2','presence', 'sub_key', self.subscribe_key, 'channel', channel - ]}, callback); - - - def history( self, args ) : + ], 'urlparams' : {'auth' : self.auth_key}}, callback=self._return_wrapped_callback(callback), + error=self._return_wrapped_callback(error)) + + def history(self, args) : """ #** #* History #* #* Load history from a channel. #* - #* @param array args with 'channel' and 'limit'. - #* @return mixed false on fail, array on success. - #* - - ## History Example - history = pubnub.history({ - 'channel' : 'hello_world', - 'limit' : 1 - }) - print(history) - - """ - ## Capture User Input - limit = 'limit' in args and int(args['limit']) or 10 - channel = str(args['channel']) - - ## Fail if bad input. - if not channel : - raise Exception('Missing Channel') - return False - - ## Capture Callback - if 'callback' in args : - callback = args['callback'] - else : - callback = None - - ## Get History - return self._request({ "urlcomponents" : [ - 'history', - self.subscribe_key, - channel, - '0', - str(limit) - ] }, callback); - - def detailedHistory(self, args) : - """ - #** - #* Detailed History - #* - #* Load Detailed history from a channel. - #* #* @param array args with 'channel', optional: 'start', 'end', 'reverse', 'count' #* @return mixed false on fail, array on success. #* @@ -556,34 +519,21 @@ class PubnubBase(object): ## Capture User Input channel = str(args['channel']) - params = dict() - count = 100 - - if 'count' in args: - count = int(args['count']) - - params['count'] = str(count) - - if 'reverse' in args: - params['reverse'] = str(args['reverse']).lower() + callback = args['callback'] if 'callback' in args else None + error = args['error'] if 'error' in args else None - if 'start' in args: - params['start'] = str(args['start']) + params = dict() - if 'end' in args: - params['end'] = str(args['end']) + params['count'] = str(args['count']) if 'count' in args else 100 + params['reverse'] = str(args['reverse']).lower() if 'reverse' in args else 'false' + params['start'] = str(args['start']) if 'start' in args else None + params['end'] = str(args['end']) if 'end' in args else None ## Fail if bad input. if not channel : raise Exception('Missing Channel') return False - ## Capture Callback - if 'callback' in args : - callback = args['callback'] - else : - callback = None - ## Get History return self._request({ 'urlcomponents' : [ 'v2', @@ -592,7 +542,8 @@ class PubnubBase(object): self.subscribe_key, 'channel', channel, - ],'urlparams' : params }, callback=callback); + ], 'urlparams' : {'auth' : self.auth_key}}, callback=self._return_wrapped_callback(callback), + error=self._return_wrapped_callback(error)) def time(self, args = None) : """ @@ -610,10 +561,9 @@ class PubnubBase(object): """ ## Capture Callback - if args and 'callback' in args: - callback = args['callback'] - else : - callback = None + + callback = callback if args and 'callback' in args else None + time = self._request({'urlcomponents' : [ 'time', '0' @@ -637,7 +587,8 @@ class PubnubBase(object): ch for ch in list(bit) ]) for bit in request["urlcomponents"]]) if ("urlparams" in request): - url = url + '?' + "&".join([ x + "=" + str(y) for x,y in request["urlparams"].items()]) + url = url + '?' + "&".join([ x + "=" + str(y) for x,y in request["urlparams"].items() if y is not None]) + #print(url) return url @@ -648,9 +599,14 @@ except ImportError: import Crypto.Hash.SHA256 as digestmod sha256 = digestmod.new import hmac -import threading -from threading import current_thread -import threading + +class EmptyLock(): + def __enter__(self): + pass + def __exit__(self,a,b,c): + pass + +empty_lock = EmptyLock() class PubnubCoreAsync(PubnubBase): @@ -666,7 +622,9 @@ class PubnubCoreAsync(PubnubBase): auth_key = None, ssl_on = False, origin = 'pubsub.pubnub.com', - uuid = None + uuid = None, + _tt_lock=empty_lock, + _channel_list_lock=empty_lock ) : """ #** @@ -696,29 +654,20 @@ class PubnubCoreAsync(PubnubBase): UUID=uuid ) - self.subscriptions = {} - self.timetoken = 0 - self.last_timetoken = 0 - self.version = '3.3.4' - self.accept_encoding = 'gzip' - self.SUB_RECEIVER = None - self._connect = None - self._tt_lock = threading.RLock() + self.subscriptions = {} + self.timetoken = 0 + self.last_timetoken = 0 + self.version = '3.3.4' + self.accept_encoding = 'gzip' + self.SUB_RECEIVER = None + self._connect = None + self._tt_lock = _tt_lock + self._channel_list_lock = _channel_list_lock def get_channel_list(self, channels): channel = '' first = True - if self._channel_list_lock: - with self._channel_list_lock: - for ch in channels: - if not channels[ch]['subscribed']: - continue - if not first: - channel += ',' - else: - first = False - channel += ch - else: + with self._channel_list_lock: for ch in channels: if not channels[ch]['subscribed']: continue @@ -727,9 +676,15 @@ class PubnubCoreAsync(PubnubBase): else: first = False channel += ch - return channel + + def each(l, func): + if func is None: + return + for i in l: + func(i) + def subscribe( self, args=None, sync=False ) : """ #** @@ -765,12 +720,12 @@ class PubnubCoreAsync(PubnubBase): if args is None: _invoke(error, "Arguments Missing") return - channel = args['channel'] if 'channel' in args else None - callback = args['callback'] if 'callback' in args else None - connect = args['connect'] if 'connect' in args else None - disconnect = args['disconnect'] if 'disconnect' in args else None - reconnect = args['reconnect'] if 'reconnect' in args else None - error = args['error'] if 'error' in args else None + channel = args['channel'] if 'channel' in args else None + callback = args['callback'] if 'callback' in args else None + connect = args['connect'] if 'connect' in args else None + disconnect = args['disconnect'] if 'disconnect' in args else None + reconnect = args['reconnect'] if 'reconnect' in args else None + error = args['error'] if 'error' in args else None with self._tt_lock: self.last_timetoken = self.timetoken if self.timetoken != 0 else self.last_timetoken @@ -803,10 +758,15 @@ class PubnubCoreAsync(PubnubBase): chobj['connected'] = True _invoke(chobj['connect'],chobj['name']) - def _invoke_error(err=None): - for ch in self.subscriptions: - chobj = self.subscriptions[ch] - _invoke(chobj.error,err) + def _invoke_error(channel_list=None, err=None): + if channel_list is None: + for ch in self.subscriptions: + chobj = self.subscriptions[ch] + _invoke(chobj['error'],err) + else: + for ch in channel_list: + chobj = self.subscriptions[ch] + _invoke(chobj['error'],err) ''' if callback is None: @@ -827,19 +787,7 @@ class PubnubCoreAsync(PubnubBase): ## New Channel? if not channel in self.subscriptions: - if self._channel_list_lock: - with self._channel_list_lock: - self.subscriptions[channel] = { - 'name' : channel, - 'first' : False, - 'connected' : False, - 'subscribed' : True, - 'callback' : callback, - 'connect' : connect, - 'disconnect' : disconnect, - 'reconnect' : reconnect - } - else: + with self._channel_list_lock: self.subscriptions[channel] = { 'name' : channel, 'first' : False, @@ -848,9 +796,11 @@ class PubnubCoreAsync(PubnubBase): 'callback' : callback, 'connect' : connect, 'disconnect' : disconnect, - 'reconnect' : reconnect + 'reconnect' : reconnect, + 'error' : error } + ## return if already connected to channel if channel in self.subscriptions and 'connected' in self.subscriptions[channel] and self.subscriptions[channel]['connected'] is True: _invoke(error, "Already Connected") @@ -865,8 +815,11 @@ class PubnubCoreAsync(PubnubBase): def sub_callback(response): ## ERROR ? - if not response or error in response: - _invoke_error() + #print response + if not response or ('message' in response and response['message'] == 'Forbidden'): + _invoke_error(response['payload']['channels'], response['message']) + _connect() + return _invoke_connect() @@ -893,7 +846,6 @@ class PubnubCoreAsync(PubnubBase): _connect() - channel_list = self.get_channel_list(self.subscriptions) ## CONNECT TO PUBNUB SUBSCRIBE SERVERS try: @@ -903,9 +855,9 @@ class PubnubCoreAsync(PubnubBase): channel_list, '0', str(self.timetoken) - ], "urlparams" : {"uuid":self.uuid} }, sub_callback, single=True ) + ], "urlparams" : {"uuid":self.uuid, "auth" : self.auth_key} }, sub_callback, sub_callback, single=True ) except Exception as e: - print e + print(e) self.timeout( 1, _connect) return @@ -926,16 +878,20 @@ class PubnubCoreAsync(PubnubBase): def unsubscribe( self, args ): - #print(args['channel']) - channel = str(args['channel']) - if not (channel in self.subscriptions): + + if 'channel' in self.subscriptions is False: return False + channel = str(args['channel']) + + ## DISCONNECT - self.subscriptions[channel]['connected'] = 0 - self.subscriptions[channel]['subscribed'] = False - self.subscriptions[channel]['timetoken'] = 0 - self.subscriptions[channel]['first'] = False + with self._channel_list_lock: + if channel in self.subscriptions: + self.subscriptions[channel]['connected'] = 0 + self.subscriptions[channel]['subscribed'] = False + self.subscriptions[channel]['timetoken'] = 0 + self.subscriptions[channel]['first'] = False self.CONNECT() @@ -951,6 +907,9 @@ from twisted.internet.task import LoopingCall import twisted from hashlib import sha256 import time +import json +from twisted.python.compat import ( + _PY3, unicode, intToBytes, networkString, nativeString) pnconn_pool = HTTPConnectionPool(reactor, persistent=True) pnconn_pool.maxPersistentPerHost = 100000 @@ -968,8 +927,9 @@ class Pubnub(PubnubCoreAsync): self, publish_key, subscribe_key, - secret_key = False, - cipher_key = False, + secret_key=False, + cipher_key=False, + auth_key=None, ssl_on = False, origin = 'pubsub.pubnub.com' ) : @@ -978,6 +938,7 @@ class Pubnub(PubnubCoreAsync): subscribe_key=subscribe_key, secret_key=secret_key, cipher_key=cipher_key, + auth_key=auth_key, ssl_on=ssl_on, origin=origin, ) @@ -985,11 +946,14 @@ class Pubnub(PubnubCoreAsync): self.headers['User-Agent'] = ['Python-Twisted'] #self.headers['Accept-Encoding'] = [self.accept_encoding] self.headers['V'] = [self.version] - self._channel_list_lock = None - def _request( self, request, callback, single=False ) : + def _request( self, request, callback=None, error=None, single=False ) : global pnconn_pool + def _invoke(func, data): + if func is not None: + func(data) + ## Build URL ''' url = self.origin + '/' + "/".join([ @@ -1006,7 +970,12 @@ class Pubnub(PubnubCoreAsync): pool = self.ssl and None or pnconn_pool )), [('gzip', GzipDecoder)]) - request = agent.request( 'GET', url, Headers(self.headers), None ) + try: + request = agent.request( 'GET', url, Headers(self.headers), None ) + except TypeError as te: + print(url.encode()) + request = agent.request( 'GET', url.encode(), Headers(self.headers), None ) + if single is True: id = time.time() @@ -1014,35 +983,65 @@ class Pubnub(PubnubCoreAsync): def received(response): finished = Deferred() - response.deliverBody(PubNubResponse(finished)) + if response.code == 403: + response.deliverBody(PubNub403Response(finished)) + else: + response.deliverBody(PubNubResponse(finished)) + + return finished + + def error_handler(response): + finished = Deferred() + if response.code == 403: + response.deliverBody(PubNub403Response(finished)) + else: + response.deliverBody(PubNubResponse(finished)) + return finished def complete(data): if single is True: - if not id == self.id: + if id != self.id: return None try: - callback(eval(data)) + data = json.loads(data) except Exception as e: - pass - #need error handling here + try: + data = json.loads(data.decode("utf-8")) + except: + _invoke(error, {'error' : 'json decode error'}) + + if 'error' in data and 'status' in data and 'status' != 200: + _invoke(error, data) + else: + _invoke(callback, data) def abort(): pass request.addCallback(received) - request.addBoth(complete) + request.addCallback(complete) + request.addErrback(error_handler) return abort class WebClientContextFactory(ClientContextFactory): def getContext(self, hostname, port): return ClientContextFactory.getContext(self) + +class PubNub403Response(Protocol): + def __init__( self, finished ): + self.finished = finished + + def dataReceived( self, bytes ): + #print '403 resp ', bytes + self.finished.callback(bytes) class PubNubResponse(Protocol): def __init__( self, finished ): self.finished = finished def dataReceived( self, bytes ): - self.finished.callback(bytes) + #print bytes + self.finished.callback(bytes) |
