From 765ee5db6fc39d77e55dcf4fe97fb96da2f46d30 Mon Sep 17 00:00:00 2001 From: Devendra Date: Wed, 2 Apr 2014 02:44:29 +0530 Subject: multiplexing support --- common/PubnubBase.py | 12 +++- common/PubnubCore.py | 4 +- common/PubnubCoreAsync.py | 171 +++++++++++++++++++++++++++++++--------------- 3 files changed, 127 insertions(+), 60 deletions(-) (limited to 'common') diff --git a/common/PubnubBase.py b/common/PubnubBase.py index 4c5b422..d287be3 100644 --- a/common/PubnubBase.py +++ b/common/PubnubBase.py @@ -90,6 +90,14 @@ class PubnubBase(object): return message + def _return_wrapped_callback(self, callback=None): + def _new_format_callback(response): + if 'payload' in response: + if (callback != None): callback({'message' : response['message'], 'payload' : response['payload']}) + else: + if (callback != None):callback(response) + if (callback != None): return _new_format_callback + def publish( self, args ) : """ @@ -139,7 +147,7 @@ class PubnubBase(object): channel, '0', message - ]}, callback) + ]'urlparams' : {'auth' : self.auth_key}}, self._return_wrapped_callback(callback)) def presence( self, args ) : """ @@ -179,7 +187,7 @@ class PubnubBase(object): callback = args['callback'] subscribe_key = args.get('subscribe_key') or self.subscribe_key - return self.subscribe({'channel': channel+'-pnpres', 'subscribe_key':subscribe_key, 'callback': callback}) + return self.subscribe({'channel': channel+'-pnpres', 'subscribe_key':subscribe_key, 'callback': self._return_wrapped_callback(callback)}) def here_now( self, args ) : diff --git a/common/PubnubCore.py b/common/PubnubCore.py index dcfd319..3ed3a68 100644 --- a/common/PubnubCore.py +++ b/common/PubnubCore.py @@ -1,4 +1,4 @@ -class PubnubCore(PubnubBase): +class PubnubCore(PubnubCoreAsync): def __init__( self, publish_key, @@ -44,7 +44,7 @@ class PubnubCore(PubnubBase): - def subscribe( self, args ) : + def subscribe_sync( self, args ) : """ #** #* Subscribe diff --git a/common/PubnubCoreAsync.py b/common/PubnubCoreAsync.py index a7fbb7d..0038243 100644 --- a/common/PubnubCoreAsync.py +++ b/common/PubnubCoreAsync.py @@ -10,8 +10,6 @@ class PubnubCoreAsync(PubnubBase): def start(self): pass def stop(self): pass - def timeout( self, delay, callback ): - pass def __init__( self, @@ -54,8 +52,23 @@ class PubnubCoreAsync(PubnubBase): self.timetoken = 0 self.version = '3.3.4' self.accept_encoding = 'gzip' - - def subscribe( self, args ) : + self.SUB_RECEIVER = None + self._connect = None + + def get_channel_list(self, channels): + channel = '' + first = True + for ch in channels: + if not channels[ch]['subscribed']: + continue + if not first: + channel += ',' + else: + first = False + channel += ch + return channel + + def subscribe( self, args=None, sync=False ) : """ #** #* Subscribe @@ -87,94 +100,140 @@ class PubnubCoreAsync(PubnubBase): }) """ - ## Fail if missing channel - if not 'channel' in args : - return 'Missing Channel.' - ## Fail if missing callback - if not 'callback' in args : - return 'Missing Callback.' + if sync is True and self.susbcribe_sync is not None: + self.susbcribe_sync(args) + return + + def _invoke(func,msg=None): + if func is not None: + if msg is not None: + func(msg) + else: + func() + + def _invoke_connect(): + for ch in self.subscriptions: + chobj = self.subscriptions[ch] + if chobj['connected'] is False: + chobj['connected'] = True + _invoke(chobj['connect']) + + def _invoke_error(err=None): + for ch in self.subscriptions: + chobj = self.subscriptions[ch] + _invoke(chobj.error,err) + - ## Capture User Input - channel = str(args['channel']) - callback = args['callback'] - connectcb = args['connect'] + if callback is None: + _invoke(error, "Callback Missing") + return + + if channel is None: + _invoke(error, "Channel Missing") + return + + def _get_channel(): + for ch in self.subscriptions: + chobj = self.subscriptions[ch] + if chobj['subscribed'] is True: + return chobj - if 'errorback' in args: - errorback = args['errorback'] - else: - errorback = lambda x: x ## New Channel? - if not (channel in self.subscriptions) : + if not channel in self.subscriptions: self.subscriptions[channel] = { - 'first' : False, - 'connected' : False, + 'name' : channel, + 'first' : False, + 'connected' : False, + 'subscribed' : True, + 'callback' : callback, + 'connect' : connect, + 'disconnect' : disconnect, + 'reconnect' : reconnect } - ## Ensure Single Connection + ## return if already connected to channel if self.subscriptions[channel]['connected'] : - return "Already Connected" + _invoke(error, "Already Connected") + return + - self.subscriptions[channel]['connected'] = 1 ## SUBSCRIPTION RECURSION - def _subscribe(): - ## STOP CONNECTION? - if not self.subscriptions[channel]['connected']: - return + def _connect(): + self._reset_offline() + def sub_callback(response): - if not self.subscriptions[channel]['first'] : - self.subscriptions[channel]['first'] = True - connectcb() + print response + ## ERROR ? + if not response or error in response: + _invoke_error() - ## STOP CONNECTION? - if not self.subscriptions[channel]['connected']: - return + _invoke_connect() + self.timetoken = response[1] - ## PROBLEM? - if not response: - def time_callback(_time): - if not _time: - self.timeout( 1, _subscribe ) - return errorback("Lost Network Connection") - else: - self.timeout( 1, _subscribe) + if len(response) > 2: + channel_list = response[2].split(',') + response_list = response[0] + for ch in enumerate(channel_list): + if ch[1] in self.subscriptions: + chobj = self.subscriptions[ch[1]] + _invoke(chobj['callback'],self.decrypt(response_list[ch[0]])) + else: + response_list = response[0] + chobj = _get_channel() + for r in response_list: + if chobj: + _invoke(chobj['callback'], self.decrypt(r)) - ## ENSURE CONNECTED (Call Time Function) - return self.time({ 'callback' : time_callback }) - self.timetoken = response[1] - _subscribe() + _connect() + - pc = PubnubCrypto() - out = [] - for message in response[0]: - callback(self.decrypt(message)) + channel_list = self.get_channel_list(self.subscriptions) + print channel_list ## CONNECT TO PUBNUB SUBSCRIBE SERVERS try: - self._request( { "urlcomponents" : [ + self.SUB_RECEIVER = self._request( { "urlcomponents" : [ 'subscribe', self.subscribe_key, - channel, + channel_list, '0', str(self.timetoken) - ], "urlparams" : {"uuid":self.uuid} }, sub_callback ) - except : - self.timeout( 1, _subscribe) + ], "urlparams" : {"uuid":self.uuid} }, sub_callback, single=True ) + except Exception as e: + self.timeout( 1, _connect) return + self._connect = _connect + + ## BEGIN SUBSCRIPTION (LISTEN FOR MESSAGES) - _subscribe() + _connect() + + def _reset_offline(self): + if self.SUB_RECEIVER is not None: + self.SUB_RECEIVER() + self.SUB_RECEIVER = None + + def CONNECT(self): + self._reset_offline() + self._connect() + + def unsubscribe( self, args ): + #print(args['channel']) channel = str(args['channel']) if not (channel in self.subscriptions): return False ## DISCONNECT self.subscriptions[channel]['connected'] = 0 + self.subscriptions[channel]['subscribed'] = False self.subscriptions[channel]['timetoken'] = 0 self.subscriptions[channel]['first'] = False + self.CONNECT() -- cgit v1.2.3