diff options
Diffstat (limited to 'common')
| -rw-r--r-- | common/PubnubBase.py | 133 | ||||
| -rw-r--r-- | common/PubnubCoreAsync.py | 125 |
2 files changed, 107 insertions, 151 deletions
diff --git a/common/PubnubBase.py b/common/PubnubBase.py index ac41e0e..98c68eb 100644 --- a/common/PubnubBase.py +++ b/common/PubnubBase.py @@ -5,12 +5,13 @@ import time import hashlib import uuid import sys -from urllib import quote + +try: from urllib.parse import quote +except: from urllib2 import quote from base64 import urlsafe_b64encode from hashlib import sha256 -from urllib import quote -from urllib import urlopen + import hmac @@ -62,12 +63,11 @@ class PubnubBase(object): self.uuid = UUID or str(uuid.uuid4()) if type(sys.version_info) is tuple: - self.python_version = 2 - self.pc = PubnubCrypto2() + self.python_version = 2 + self.pc = PubnubCrypto2() else: self.python_version = 3 self.pc = PubnubCrypto3() - if not isinstance(self.uuid, str): raise AttributeError("pres_uuid must be a string") @@ -186,7 +186,10 @@ class PubnubBase(object): if (callback != None): callback({'message' : response['message'], 'payload' : response['payload']}) else: if (callback != None):callback(response) - if (callback != None): return _new_format_callback + if (callback != None): + return _new_format_callback + else: + return None def publish( self, args ) : @@ -221,23 +224,28 @@ class PubnubBase(object): if 'callback' in args : callback = args['callback'] else : - callback = None + callback = None + + if 'error' in args : + error = args['error'] + else : + error = None - #message = json.dumps(args['message'], separators=(',',':')) message = self.encrypt(args['message']) - signature = self.sign(channel, message) + #signature = self.sign(channel, message) ## Send Message return self._request({"urlcomponents": [ 'publish', self.publish_key, self.subscribe_key, - signature, + '0', channel, '0', message - ], 'urlparams' : {'auth' : self.auth_key}}, self._return_wrapped_callback(callback)) + ], 'urlparams' : {'auth' : self.auth_key}}, callback=self._return_wrapped_callback(callback), + error=self._return_wrapped_callback(error)) def presence( self, args ) : """ @@ -301,12 +309,10 @@ class PubnubBase(object): """ channel = str(args['channel']) - ## Capture Callback - if 'callback' in args : - callback = args['callback'] - else : - callback = None - + + callback = args['callback'] if 'callback' in args else None + error = args['error'] if 'error' in args else None + ## Fail if bad input. if not channel : raise Exception('Missing Channel') @@ -317,59 +323,16 @@ class PubnubBase(object): 'v2','presence', 'sub_key', self.subscribe_key, 'channel', channel - ]}, callback); - - - def history( self, args ) : + ], 'urlparams' : {'auth' : self.auth_key}}, callback=self._return_wrapped_callback(callback), + error=self._return_wrapped_callback(error)) + + def history(self, args) : """ #** #* History #* #* Load history from a channel. #* - #* @param array args with 'channel' and 'limit'. - #* @return mixed false on fail, array on success. - #* - - ## History Example - history = pubnub.history({ - 'channel' : 'hello_world', - 'limit' : 1 - }) - print(history) - - """ - ## Capture User Input - limit = 'limit' in args and int(args['limit']) or 10 - channel = str(args['channel']) - - ## Fail if bad input. - if not channel : - raise Exception('Missing Channel') - return False - - ## Capture Callback - if 'callback' in args : - callback = args['callback'] - else : - callback = None - - ## Get History - return self._request({ "urlcomponents" : [ - 'history', - self.subscribe_key, - channel, - '0', - str(limit) - ] }, callback); - - def detailedHistory(self, args) : - """ - #** - #* Detailed History - #* - #* Load Detailed history from a channel. - #* #* @param array args with 'channel', optional: 'start', 'end', 'reverse', 'count' #* @return mixed false on fail, array on success. #* @@ -385,34 +348,21 @@ class PubnubBase(object): ## Capture User Input channel = str(args['channel']) - params = dict() - count = 100 - - if 'count' in args: - count = int(args['count']) - - params['count'] = str(count) - - if 'reverse' in args: - params['reverse'] = str(args['reverse']).lower() + callback = args['callback'] if 'callback' in args else None + error = args['error'] if 'error' in args else None - if 'start' in args: - params['start'] = str(args['start']) + params = dict() - if 'end' in args: - params['end'] = str(args['end']) + params['count'] = str(args['count']) if 'count' in args else 100 + params['reverse'] = str(args['reverse']).lower() if 'reverse' in args else 'false' + params['start'] = str(args['start']) if 'start' in args else None + params['end'] = str(args['end']) if 'end' in args else None ## Fail if bad input. if not channel : raise Exception('Missing Channel') return False - ## Capture Callback - if 'callback' in args : - callback = args['callback'] - else : - callback = None - ## Get History return self._request({ 'urlcomponents' : [ 'v2', @@ -421,7 +371,8 @@ class PubnubBase(object): self.subscribe_key, 'channel', channel, - ],'urlparams' : params }, callback=callback); + ], 'urlparams' : {'auth' : self.auth_key}}, callback=self._return_wrapped_callback(callback), + error=self._return_wrapped_callback(error)) def time(self, args = None) : """ @@ -439,10 +390,9 @@ class PubnubBase(object): """ ## Capture Callback - if args and 'callback' in args: - callback = args['callback'] - else : - callback = None + + callback = callback if args and 'callback' in args else None + time = self._request({'urlcomponents' : [ 'time', '0' @@ -466,5 +416,6 @@ class PubnubBase(object): ch for ch in list(bit) ]) for bit in request["urlcomponents"]]) if ("urlparams" in request): - url = url + '?' + "&".join([ x + "=" + str(y) for x,y in request["urlparams"].items()]) + url = url + '?' + "&".join([ x + "=" + str(y) for x,y in request["urlparams"].items() if y is not None]) + #print(url) return url diff --git a/common/PubnubCoreAsync.py b/common/PubnubCoreAsync.py index 4251d47..f7b57cc 100644 --- a/common/PubnubCoreAsync.py +++ b/common/PubnubCoreAsync.py @@ -5,9 +5,14 @@ except ImportError: import Crypto.Hash.SHA256 as digestmod sha256 = digestmod.new import hmac -import threading -from threading import current_thread -import threading + +class EmptyLock(): + def __enter__(self): + pass + def __exit__(self,a,b,c): + pass + +empty_lock = EmptyLock() class PubnubCoreAsync(PubnubBase): @@ -23,7 +28,9 @@ class PubnubCoreAsync(PubnubBase): auth_key = None, ssl_on = False, origin = 'pubsub.pubnub.com', - uuid = None + uuid = None, + _tt_lock=empty_lock, + _channel_list_lock=empty_lock ) : """ #** @@ -53,29 +60,20 @@ class PubnubCoreAsync(PubnubBase): UUID=uuid ) - self.subscriptions = {} - self.timetoken = 0 - self.last_timetoken = 0 - self.version = '3.3.4' - self.accept_encoding = 'gzip' - self.SUB_RECEIVER = None - self._connect = None - self._tt_lock = threading.RLock() + self.subscriptions = {} + self.timetoken = 0 + self.last_timetoken = 0 + self.version = '3.3.4' + self.accept_encoding = 'gzip' + self.SUB_RECEIVER = None + self._connect = None + self._tt_lock = _tt_lock + self._channel_list_lock = _channel_list_lock def get_channel_list(self, channels): channel = '' first = True - if self._channel_list_lock: - with self._channel_list_lock: - for ch in channels: - if not channels[ch]['subscribed']: - continue - if not first: - channel += ',' - else: - first = False - channel += ch - else: + with self._channel_list_lock: for ch in channels: if not channels[ch]['subscribed']: continue @@ -84,9 +82,15 @@ class PubnubCoreAsync(PubnubBase): else: first = False channel += ch - return channel + + def each(l, func): + if func is None: + return + for i in l: + func(i) + def subscribe( self, args=None, sync=False ) : """ #** @@ -122,12 +126,12 @@ class PubnubCoreAsync(PubnubBase): if args is None: _invoke(error, "Arguments Missing") return - channel = args['channel'] if 'channel' in args else None - callback = args['callback'] if 'callback' in args else None - connect = args['connect'] if 'connect' in args else None - disconnect = args['disconnect'] if 'disconnect' in args else None - reconnect = args['reconnect'] if 'reconnect' in args else None - error = args['error'] if 'error' in args else None + channel = args['channel'] if 'channel' in args else None + callback = args['callback'] if 'callback' in args else None + connect = args['connect'] if 'connect' in args else None + disconnect = args['disconnect'] if 'disconnect' in args else None + reconnect = args['reconnect'] if 'reconnect' in args else None + error = args['error'] if 'error' in args else None with self._tt_lock: self.last_timetoken = self.timetoken if self.timetoken != 0 else self.last_timetoken @@ -160,10 +164,15 @@ class PubnubCoreAsync(PubnubBase): chobj['connected'] = True _invoke(chobj['connect'],chobj['name']) - def _invoke_error(err=None): - for ch in self.subscriptions: - chobj = self.subscriptions[ch] - _invoke(chobj.error,err) + def _invoke_error(channel_list=None, err=None): + if channel_list is None: + for ch in self.subscriptions: + chobj = self.subscriptions[ch] + _invoke(chobj['error'],err) + else: + for ch in channel_list: + chobj = self.subscriptions[ch] + _invoke(chobj['error'],err) ''' if callback is None: @@ -184,19 +193,7 @@ class PubnubCoreAsync(PubnubBase): ## New Channel? if not channel in self.subscriptions: - if self._channel_list_lock: - with self._channel_list_lock: - self.subscriptions[channel] = { - 'name' : channel, - 'first' : False, - 'connected' : False, - 'subscribed' : True, - 'callback' : callback, - 'connect' : connect, - 'disconnect' : disconnect, - 'reconnect' : reconnect - } - else: + with self._channel_list_lock: self.subscriptions[channel] = { 'name' : channel, 'first' : False, @@ -205,9 +202,11 @@ class PubnubCoreAsync(PubnubBase): 'callback' : callback, 'connect' : connect, 'disconnect' : disconnect, - 'reconnect' : reconnect + 'reconnect' : reconnect, + 'error' : error } + ## return if already connected to channel if channel in self.subscriptions and 'connected' in self.subscriptions[channel] and self.subscriptions[channel]['connected'] is True: _invoke(error, "Already Connected") @@ -222,8 +221,11 @@ class PubnubCoreAsync(PubnubBase): def sub_callback(response): ## ERROR ? - if not response or error in response: - _invoke_error() + #print response + if not response or ('message' in response and response['message'] == 'Forbidden'): + _invoke_error(response['payload']['channels'], response['message']) + _connect() + return _invoke_connect() @@ -250,7 +252,6 @@ class PubnubCoreAsync(PubnubBase): _connect() - channel_list = self.get_channel_list(self.subscriptions) ## CONNECT TO PUBNUB SUBSCRIBE SERVERS try: @@ -260,9 +261,9 @@ class PubnubCoreAsync(PubnubBase): channel_list, '0', str(self.timetoken) - ], "urlparams" : {"uuid":self.uuid} }, sub_callback, single=True ) + ], "urlparams" : {"uuid":self.uuid, "auth" : self.auth_key} }, sub_callback, sub_callback, single=True ) except Exception as e: - print e + print(e) self.timeout( 1, _connect) return @@ -283,14 +284,18 @@ class PubnubCoreAsync(PubnubBase): def unsubscribe( self, args ): - #print(args['channel']) - channel = str(args['channel']) - if not (channel in self.subscriptions): + + if 'channel' in self.subscriptions is False: return False + channel = str(args['channel']) + + ## DISCONNECT - self.subscriptions[channel]['connected'] = 0 - self.subscriptions[channel]['subscribed'] = False - self.subscriptions[channel]['timetoken'] = 0 - self.subscriptions[channel]['first'] = False + with self._channel_list_lock: + if channel in self.subscriptions: + self.subscriptions[channel]['connected'] = 0 + self.subscriptions[channel]['subscribed'] = False + self.subscriptions[channel]['timetoken'] = 0 + self.subscriptions[channel]['first'] = False self.CONNECT() |
