From 99096b8c11b9a541f6350639e8735495cf90091c Mon Sep 17 00:00:00 2001 From: Devendra Date: Fri, 11 Apr 2014 14:49:43 +0530 Subject: v1 MX and async code for python, twisted, tornado --- common/PubnubCoreAsync.py | 149 ++++++++++++++++++++++++++++++++-------------- 1 file changed, 103 insertions(+), 46 deletions(-) (limited to 'common/PubnubCoreAsync.py') diff --git a/common/PubnubCoreAsync.py b/common/PubnubCoreAsync.py index 0038243..4251d47 100644 --- a/common/PubnubCoreAsync.py +++ b/common/PubnubCoreAsync.py @@ -5,6 +5,9 @@ except ImportError: import Crypto.Hash.SHA256 as digestmod sha256 = digestmod.new import hmac +import threading +from threading import current_thread +import threading class PubnubCoreAsync(PubnubBase): @@ -17,6 +20,7 @@ class PubnubCoreAsync(PubnubBase): subscribe_key, secret_key = False, cipher_key = False, + auth_key = None, ssl_on = False, origin = 'pubsub.pubnub.com', uuid = None @@ -43,6 +47,7 @@ class PubnubCoreAsync(PubnubBase): subscribe_key=subscribe_key, secret_key=secret_key, cipher_key=cipher_key, + auth_key=auth_key, ssl_on=ssl_on, origin=origin, UUID=uuid @@ -50,22 +55,36 @@ class PubnubCoreAsync(PubnubBase): self.subscriptions = {} self.timetoken = 0 + self.last_timetoken = 0 self.version = '3.3.4' self.accept_encoding = 'gzip' self.SUB_RECEIVER = None self._connect = None + self._tt_lock = threading.RLock() def get_channel_list(self, channels): channel = '' first = True - for ch in channels: - if not channels[ch]['subscribed']: - continue - if not first: - channel += ',' - else: - first = False - channel += ch + if self._channel_list_lock: + with self._channel_list_lock: + for ch in channels: + if not channels[ch]['subscribed']: + continue + if not first: + channel += ',' + else: + first = False + channel += ch + else: + for ch in channels: + if not channels[ch]['subscribed']: + continue + if not first: + channel += ',' + else: + first = False + channel += ch + return channel def subscribe( self, args=None, sync=False ) : @@ -100,6 +119,26 @@ class PubnubCoreAsync(PubnubBase): }) """ + if args is None: + _invoke(error, "Arguments Missing") + return + channel = args['channel'] if 'channel' in args else None + callback = args['callback'] if 'callback' in args else None + connect = args['connect'] if 'connect' in args else None + disconnect = args['disconnect'] if 'disconnect' in args else None + reconnect = args['reconnect'] if 'reconnect' in args else None + error = args['error'] if 'error' in args else None + + with self._tt_lock: + self.last_timetoken = self.timetoken if self.timetoken != 0 else self.last_timetoken + self.timetoken = 0 + + if channel is None: + _invoke(error, "Channel Missing") + return + if callback is None: + _invoke(error, "Callback Missing") + return if sync is True and self.susbcribe_sync is not None: self.susbcribe_sync(args) @@ -113,18 +152,20 @@ class PubnubCoreAsync(PubnubBase): func() def _invoke_connect(): - for ch in self.subscriptions: - chobj = self.subscriptions[ch] - if chobj['connected'] is False: - chobj['connected'] = True - _invoke(chobj['connect']) + if self._channel_list_lock: + with self._channel_list_lock: + for ch in self.subscriptions: + chobj = self.subscriptions[ch] + if chobj['connected'] is False: + chobj['connected'] = True + _invoke(chobj['connect'],chobj['name']) def _invoke_error(err=None): for ch in self.subscriptions: chobj = self.subscriptions[ch] _invoke(chobj.error,err) - + ''' if callback is None: _invoke(error, "Callback Missing") return @@ -132,6 +173,7 @@ class PubnubCoreAsync(PubnubBase): if channel is None: _invoke(error, "Channel Missing") return + ''' def _get_channel(): for ch in self.subscriptions: @@ -142,22 +184,36 @@ class PubnubCoreAsync(PubnubBase): ## New Channel? if not channel in self.subscriptions: - self.subscriptions[channel] = { - 'name' : channel, - 'first' : False, - 'connected' : False, - 'subscribed' : True, - 'callback' : callback, - 'connect' : connect, - 'disconnect' : disconnect, - 'reconnect' : reconnect - } + if self._channel_list_lock: + with self._channel_list_lock: + self.subscriptions[channel] = { + 'name' : channel, + 'first' : False, + 'connected' : False, + 'subscribed' : True, + 'callback' : callback, + 'connect' : connect, + 'disconnect' : disconnect, + 'reconnect' : reconnect + } + else: + self.subscriptions[channel] = { + 'name' : channel, + 'first' : False, + 'connected' : False, + 'subscribed' : True, + 'callback' : callback, + 'connect' : connect, + 'disconnect' : disconnect, + 'reconnect' : reconnect + } ## return if already connected to channel - if self.subscriptions[channel]['connected'] : + if channel in self.subscriptions and 'connected' in self.subscriptions[channel] and self.subscriptions[channel]['connected'] is True: _invoke(error, "Already Connected") return + ## SUBSCRIPTION RECURSION def _connect(): @@ -165,37 +221,37 @@ class PubnubCoreAsync(PubnubBase): self._reset_offline() def sub_callback(response): - print response ## ERROR ? if not response or error in response: _invoke_error() _invoke_connect() - - self.timetoken = response[1] - - if len(response) > 2: - channel_list = response[2].split(',') - response_list = response[0] - for ch in enumerate(channel_list): - if ch[1] in self.subscriptions: - chobj = self.subscriptions[ch[1]] - _invoke(chobj['callback'],self.decrypt(response_list[ch[0]])) - else: - response_list = response[0] - chobj = _get_channel() - for r in response_list: - if chobj: - _invoke(chobj['callback'], self.decrypt(r)) - - - _connect() + with self._tt_lock: + #print 'A tt : ', self.timetoken , ' last tt : ' , self.last_timetoken + self.timetoken = self.last_timetoken if self.timetoken == 0 and self.last_timetoken != 0 else response[1] + #print 'B tt : ', self.timetoken , ' last tt : ' , self.last_timetoken + if len(response) > 2: + channel_list = response[2].split(',') + response_list = response[0] + for ch in enumerate(channel_list): + if ch[1] in self.subscriptions: + chobj = self.subscriptions[ch[1]] + _invoke(chobj['callback'],self.decrypt(response_list[ch[0]])) + else: + response_list = response[0] + chobj = _get_channel() + for r in response_list: + if chobj: + _invoke(chobj['callback'], self.decrypt(r)) + + #with self._tt_lock: + # self.timetoken = self.last_timetoken if self.timetoken == 0 and self.last_timetoken != 0 else response[1] + _connect() channel_list = self.get_channel_list(self.subscriptions) - print channel_list ## CONNECT TO PUBNUB SUBSCRIBE SERVERS try: self.SUB_RECEIVER = self._request( { "urlcomponents" : [ @@ -206,6 +262,7 @@ class PubnubCoreAsync(PubnubBase): str(self.timetoken) ], "urlparams" : {"uuid":self.uuid} }, sub_callback, single=True ) except Exception as e: + print e self.timeout( 1, _connect) return -- cgit v1.2.3