diff options
| author | Devendra | 2014-04-16 00:00:40 +0530 |
|---|---|---|
| committer | Devendra | 2014-04-16 00:00:40 +0530 |
| commit | 150ae1566d813acbb773839e919db2c0f467931c (patch) | |
| tree | 6f74d6dcdcb0ecff6d8a51988d8a461b6f9a4668 /common/PubnubCoreAsync.py | |
| parent | 99096b8c11b9a541f6350639e8735495cf90091c (diff) | |
| download | pubnub-python-150ae1566d813acbb773839e919db2c0f467931c.tar.bz2 | |
adding code to support async and pam client capabilities with python v2 and v3
Diffstat (limited to 'common/PubnubCoreAsync.py')
| -rw-r--r-- | common/PubnubCoreAsync.py | 125 |
1 files changed, 65 insertions, 60 deletions
diff --git a/common/PubnubCoreAsync.py b/common/PubnubCoreAsync.py index 4251d47..f7b57cc 100644 --- a/common/PubnubCoreAsync.py +++ b/common/PubnubCoreAsync.py @@ -5,9 +5,14 @@ except ImportError: import Crypto.Hash.SHA256 as digestmod sha256 = digestmod.new import hmac -import threading -from threading import current_thread -import threading + +class EmptyLock(): + def __enter__(self): + pass + def __exit__(self,a,b,c): + pass + +empty_lock = EmptyLock() class PubnubCoreAsync(PubnubBase): @@ -23,7 +28,9 @@ class PubnubCoreAsync(PubnubBase): auth_key = None, ssl_on = False, origin = 'pubsub.pubnub.com', - uuid = None + uuid = None, + _tt_lock=empty_lock, + _channel_list_lock=empty_lock ) : """ #** @@ -53,29 +60,20 @@ class PubnubCoreAsync(PubnubBase): UUID=uuid ) - self.subscriptions = {} - self.timetoken = 0 - self.last_timetoken = 0 - self.version = '3.3.4' - self.accept_encoding = 'gzip' - self.SUB_RECEIVER = None - self._connect = None - self._tt_lock = threading.RLock() + self.subscriptions = {} + self.timetoken = 0 + self.last_timetoken = 0 + self.version = '3.3.4' + self.accept_encoding = 'gzip' + self.SUB_RECEIVER = None + self._connect = None + self._tt_lock = _tt_lock + self._channel_list_lock = _channel_list_lock def get_channel_list(self, channels): channel = '' first = True - if self._channel_list_lock: - with self._channel_list_lock: - for ch in channels: - if not channels[ch]['subscribed']: - continue - if not first: - channel += ',' - else: - first = False - channel += ch - else: + with self._channel_list_lock: for ch in channels: if not channels[ch]['subscribed']: continue @@ -84,9 +82,15 @@ class PubnubCoreAsync(PubnubBase): else: first = False channel += ch - return channel + + def each(l, func): + if func is None: + return + for i in l: + func(i) + def subscribe( self, args=None, sync=False ) : """ #** @@ -122,12 +126,12 @@ class PubnubCoreAsync(PubnubBase): if args is None: _invoke(error, "Arguments Missing") return - channel = args['channel'] if 'channel' in args else None - callback = args['callback'] if 'callback' in args else None - connect = args['connect'] if 'connect' in args else None - disconnect = args['disconnect'] if 'disconnect' in args else None - reconnect = args['reconnect'] if 'reconnect' in args else None - error = args['error'] if 'error' in args else None + channel = args['channel'] if 'channel' in args else None + callback = args['callback'] if 'callback' in args else None + connect = args['connect'] if 'connect' in args else None + disconnect = args['disconnect'] if 'disconnect' in args else None + reconnect = args['reconnect'] if 'reconnect' in args else None + error = args['error'] if 'error' in args else None with self._tt_lock: self.last_timetoken = self.timetoken if self.timetoken != 0 else self.last_timetoken @@ -160,10 +164,15 @@ class PubnubCoreAsync(PubnubBase): chobj['connected'] = True _invoke(chobj['connect'],chobj['name']) - def _invoke_error(err=None): - for ch in self.subscriptions: - chobj = self.subscriptions[ch] - _invoke(chobj.error,err) + def _invoke_error(channel_list=None, err=None): + if channel_list is None: + for ch in self.subscriptions: + chobj = self.subscriptions[ch] + _invoke(chobj['error'],err) + else: + for ch in channel_list: + chobj = self.subscriptions[ch] + _invoke(chobj['error'],err) ''' if callback is None: @@ -184,19 +193,7 @@ class PubnubCoreAsync(PubnubBase): ## New Channel? if not channel in self.subscriptions: - if self._channel_list_lock: - with self._channel_list_lock: - self.subscriptions[channel] = { - 'name' : channel, - 'first' : False, - 'connected' : False, - 'subscribed' : True, - 'callback' : callback, - 'connect' : connect, - 'disconnect' : disconnect, - 'reconnect' : reconnect - } - else: + with self._channel_list_lock: self.subscriptions[channel] = { 'name' : channel, 'first' : False, @@ -205,9 +202,11 @@ class PubnubCoreAsync(PubnubBase): 'callback' : callback, 'connect' : connect, 'disconnect' : disconnect, - 'reconnect' : reconnect + 'reconnect' : reconnect, + 'error' : error } + ## return if already connected to channel if channel in self.subscriptions and 'connected' in self.subscriptions[channel] and self.subscriptions[channel]['connected'] is True: _invoke(error, "Already Connected") @@ -222,8 +221,11 @@ class PubnubCoreAsync(PubnubBase): def sub_callback(response): ## ERROR ? - if not response or error in response: - _invoke_error() + #print response + if not response or ('message' in response and response['message'] == 'Forbidden'): + _invoke_error(response['payload']['channels'], response['message']) + _connect() + return _invoke_connect() @@ -250,7 +252,6 @@ class PubnubCoreAsync(PubnubBase): _connect() - channel_list = self.get_channel_list(self.subscriptions) ## CONNECT TO PUBNUB SUBSCRIBE SERVERS try: @@ -260,9 +261,9 @@ class PubnubCoreAsync(PubnubBase): channel_list, '0', str(self.timetoken) - ], "urlparams" : {"uuid":self.uuid} }, sub_callback, single=True ) + ], "urlparams" : {"uuid":self.uuid, "auth" : self.auth_key} }, sub_callback, sub_callback, single=True ) except Exception as e: - print e + print(e) self.timeout( 1, _connect) return @@ -283,14 +284,18 @@ class PubnubCoreAsync(PubnubBase): def unsubscribe( self, args ): - #print(args['channel']) - channel = str(args['channel']) - if not (channel in self.subscriptions): + + if 'channel' in self.subscriptions is False: return False + channel = str(args['channel']) + + ## DISCONNECT - self.subscriptions[channel]['connected'] = 0 - self.subscriptions[channel]['subscribed'] = False - self.subscriptions[channel]['timetoken'] = 0 - self.subscriptions[channel]['first'] = False + with self._channel_list_lock: + if channel in self.subscriptions: + self.subscriptions[channel]['connected'] = 0 + self.subscriptions[channel]['subscribed'] = False + self.subscriptions[channel]['timetoken'] = 0 + self.subscriptions[channel]['first'] = False self.CONNECT() |
