diff options
Diffstat (limited to 'common/PubnubCoreAsync.py')
| -rw-r--r-- | common/PubnubCoreAsync.py | 171 |
1 files changed, 115 insertions, 56 deletions
diff --git a/common/PubnubCoreAsync.py b/common/PubnubCoreAsync.py index a7fbb7d..0038243 100644 --- a/common/PubnubCoreAsync.py +++ b/common/PubnubCoreAsync.py @@ -10,8 +10,6 @@ class PubnubCoreAsync(PubnubBase): def start(self): pass def stop(self): pass - def timeout( self, delay, callback ): - pass def __init__( self, @@ -54,8 +52,23 @@ class PubnubCoreAsync(PubnubBase): self.timetoken = 0 self.version = '3.3.4' self.accept_encoding = 'gzip' - - def subscribe( self, args ) : + self.SUB_RECEIVER = None + self._connect = None + + def get_channel_list(self, channels): + channel = '' + first = True + for ch in channels: + if not channels[ch]['subscribed']: + continue + if not first: + channel += ',' + else: + first = False + channel += ch + return channel + + def subscribe( self, args=None, sync=False ) : """ #** #* Subscribe @@ -87,94 +100,140 @@ class PubnubCoreAsync(PubnubBase): }) """ - ## Fail if missing channel - if not 'channel' in args : - return 'Missing Channel.' - ## Fail if missing callback - if not 'callback' in args : - return 'Missing Callback.' + if sync is True and self.susbcribe_sync is not None: + self.susbcribe_sync(args) + return + + def _invoke(func,msg=None): + if func is not None: + if msg is not None: + func(msg) + else: + func() + + def _invoke_connect(): + for ch in self.subscriptions: + chobj = self.subscriptions[ch] + if chobj['connected'] is False: + chobj['connected'] = True + _invoke(chobj['connect']) + + def _invoke_error(err=None): + for ch in self.subscriptions: + chobj = self.subscriptions[ch] + _invoke(chobj.error,err) + - ## Capture User Input - channel = str(args['channel']) - callback = args['callback'] - connectcb = args['connect'] + if callback is None: + _invoke(error, "Callback Missing") + return + + if channel is None: + _invoke(error, "Channel Missing") + return + + def _get_channel(): + for ch in self.subscriptions: + chobj = self.subscriptions[ch] + if chobj['subscribed'] is True: + return chobj - if 'errorback' in args: - errorback = args['errorback'] - else: - errorback = lambda x: x ## New Channel? - if not (channel in self.subscriptions) : + if not channel in self.subscriptions: self.subscriptions[channel] = { - 'first' : False, - 'connected' : False, + 'name' : channel, + 'first' : False, + 'connected' : False, + 'subscribed' : True, + 'callback' : callback, + 'connect' : connect, + 'disconnect' : disconnect, + 'reconnect' : reconnect } - ## Ensure Single Connection + ## return if already connected to channel if self.subscriptions[channel]['connected'] : - return "Already Connected" + _invoke(error, "Already Connected") + return + - self.subscriptions[channel]['connected'] = 1 ## SUBSCRIPTION RECURSION - def _subscribe(): - ## STOP CONNECTION? - if not self.subscriptions[channel]['connected']: - return + def _connect(): + self._reset_offline() + def sub_callback(response): - if not self.subscriptions[channel]['first'] : - self.subscriptions[channel]['first'] = True - connectcb() + print response + ## ERROR ? + if not response or error in response: + _invoke_error() - ## STOP CONNECTION? - if not self.subscriptions[channel]['connected']: - return + _invoke_connect() + self.timetoken = response[1] - ## PROBLEM? - if not response: - def time_callback(_time): - if not _time: - self.timeout( 1, _subscribe ) - return errorback("Lost Network Connection") - else: - self.timeout( 1, _subscribe) + if len(response) > 2: + channel_list = response[2].split(',') + response_list = response[0] + for ch in enumerate(channel_list): + if ch[1] in self.subscriptions: + chobj = self.subscriptions[ch[1]] + _invoke(chobj['callback'],self.decrypt(response_list[ch[0]])) + else: + response_list = response[0] + chobj = _get_channel() + for r in response_list: + if chobj: + _invoke(chobj['callback'], self.decrypt(r)) - ## ENSURE CONNECTED (Call Time Function) - return self.time({ 'callback' : time_callback }) - self.timetoken = response[1] - _subscribe() + _connect() + - pc = PubnubCrypto() - out = [] - for message in response[0]: - callback(self.decrypt(message)) + channel_list = self.get_channel_list(self.subscriptions) + print channel_list ## CONNECT TO PUBNUB SUBSCRIBE SERVERS try: - self._request( { "urlcomponents" : [ + self.SUB_RECEIVER = self._request( { "urlcomponents" : [ 'subscribe', self.subscribe_key, - channel, + channel_list, '0', str(self.timetoken) - ], "urlparams" : {"uuid":self.uuid} }, sub_callback ) - except : - self.timeout( 1, _subscribe) + ], "urlparams" : {"uuid":self.uuid} }, sub_callback, single=True ) + except Exception as e: + self.timeout( 1, _connect) return + self._connect = _connect + + ## BEGIN SUBSCRIPTION (LISTEN FOR MESSAGES) - _subscribe() + _connect() + + def _reset_offline(self): + if self.SUB_RECEIVER is not None: + self.SUB_RECEIVER() + self.SUB_RECEIVER = None + + def CONNECT(self): + self._reset_offline() + self._connect() + + def unsubscribe( self, args ): + #print(args['channel']) channel = str(args['channel']) if not (channel in self.subscriptions): return False ## DISCONNECT self.subscriptions[channel]['connected'] = 0 + self.subscriptions[channel]['subscribed'] = False self.subscriptions[channel]['timetoken'] = 0 self.subscriptions[channel]['first'] = False + self.CONNECT() |
