diff options
| author | Devendra | 2014-04-02 02:44:29 +0530 | 
|---|---|---|
| committer | Devendra | 2014-04-02 02:44:29 +0530 | 
| commit | 765ee5db6fc39d77e55dcf4fe97fb96da2f46d30 (patch) | |
| tree | ec6e9e2d102e866ae1b54a43d805607f0c62b8c2 /common | |
| parent | 9ac3ccf6283772b404a0c80945e3cdf3406ac5bf (diff) | |
| download | pubnub-python-765ee5db6fc39d77e55dcf4fe97fb96da2f46d30.tar.bz2 | |
multiplexing support
Diffstat (limited to 'common')
| -rw-r--r-- | common/PubnubBase.py | 12 | ||||
| -rw-r--r-- | common/PubnubCore.py | 4 | ||||
| -rw-r--r-- | common/PubnubCoreAsync.py | 171 | 
3 files changed, 127 insertions, 60 deletions
| diff --git a/common/PubnubBase.py b/common/PubnubBase.py index 4c5b422..d287be3 100644 --- a/common/PubnubBase.py +++ b/common/PubnubBase.py @@ -90,6 +90,14 @@ class PubnubBase(object):          return message +    def _return_wrapped_callback(self, callback=None): +        def _new_format_callback(response): +            if 'payload' in response: +                if (callback != None): callback({'message' : response['message'], 'payload' : response['payload']}) +            else: +                if (callback != None):callback(response) +        if (callback != None): return _new_format_callback +      def publish( self, args ) :          """ @@ -139,7 +147,7 @@ class PubnubBase(object):              channel,              '0',              message -        ]}, callback) +        ]'urlparams' : {'auth' : self.auth_key}}, self._return_wrapped_callback(callback))      def presence( self, args ) :          """ @@ -179,7 +187,7 @@ class PubnubBase(object):          callback  = args['callback']          subscribe_key = args.get('subscribe_key') or self.subscribe_key -        return self.subscribe({'channel': channel+'-pnpres', 'subscribe_key':subscribe_key, 'callback': callback}) +        return self.subscribe({'channel': channel+'-pnpres', 'subscribe_key':subscribe_key, 'callback': self._return_wrapped_callback(callback)})      def here_now( self, args ) : diff --git a/common/PubnubCore.py b/common/PubnubCore.py index dcfd319..3ed3a68 100644 --- a/common/PubnubCore.py +++ b/common/PubnubCore.py @@ -1,4 +1,4 @@ -class PubnubCore(PubnubBase): +class PubnubCore(PubnubCoreAsync):      def __init__(          self,          publish_key, @@ -44,7 +44,7 @@ class PubnubCore(PubnubBase): -    def subscribe( self, args ) : +    def subscribe_sync( self, args ) :          """          #**          #* Subscribe diff --git a/common/PubnubCoreAsync.py b/common/PubnubCoreAsync.py index a7fbb7d..0038243 100644 --- a/common/PubnubCoreAsync.py +++ b/common/PubnubCoreAsync.py @@ -10,8 +10,6 @@ class PubnubCoreAsync(PubnubBase):      def start(self): pass       def stop(self):  pass -    def timeout( self, delay, callback ): -        pass      def __init__(          self, @@ -54,8 +52,23 @@ class PubnubCoreAsync(PubnubBase):          self.timetoken     = 0          self.version       = '3.3.4'          self.accept_encoding = 'gzip' - -    def subscribe( self, args ) : +        self.SUB_RECEIVER  = None +        self._connect    = None + +    def get_channel_list(self, channels): +        channel = '' +        first = True +        for ch in channels: +            if not channels[ch]['subscribed']: +                continue +            if not first: +                channel += ',' +            else: +                first = False +            channel += ch +        return channel + +    def subscribe( self, args=None, sync=False ) :          """          #**          #* Subscribe @@ -87,94 +100,140 @@ class PubnubCoreAsync(PubnubBase):          })          """ -        ## Fail if missing channel -        if not 'channel' in args : -            return 'Missing Channel.' -        ## Fail if missing callback -        if not 'callback' in args : -            return 'Missing Callback.' +        if sync is True and self.susbcribe_sync is not None: +            self.susbcribe_sync(args) +            return + +        def _invoke(func,msg=None): +            if func is not None: +                if msg is not None: +                    func(msg) +                else: +                    func() + +        def _invoke_connect(): +            for ch in self.subscriptions: +                chobj = self.subscriptions[ch] +                if chobj['connected'] is False: +                    chobj['connected'] = True +                    _invoke(chobj['connect']) + +        def _invoke_error(err=None): +            for ch in self.subscriptions: +                chobj = self.subscriptions[ch] +                _invoke(chobj.error,err) + -        ## Capture User Input -        channel   = str(args['channel']) -        callback  = args['callback'] -        connectcb = args['connect'] +        if callback is None: +            _invoke(error, "Callback Missing") +            return + +        if channel is None: +            _invoke(error, "Channel Missing") +            return + +        def _get_channel(): +            for ch in self.subscriptions: +                chobj = self.subscriptions[ch] +                if chobj['subscribed'] is True: +                    return chobj -        if 'errorback' in args: -            errorback = args['errorback'] -        else: -            errorback = lambda x: x          ## New Channel? -        if not (channel in self.subscriptions) : +        if not channel in self.subscriptions:              self.subscriptions[channel] = { -                'first'     : False, -                'connected' : False, +                'name'          : channel, +                'first'         : False, +                'connected'     : False, +                'subscribed'    : True, +                'callback'      : callback, +                'connect'       : connect, +                'disconnect'    : disconnect, +                'reconnect'     : reconnect              } -        ## Ensure Single Connection +        ## return if already connected to channel          if self.subscriptions[channel]['connected'] : -            return "Already Connected" +            _invoke(error, "Already Connected") +            return +             -        self.subscriptions[channel]['connected'] = 1          ## SUBSCRIPTION RECURSION  -        def _subscribe(): -            ## STOP CONNECTION? -            if not self.subscriptions[channel]['connected']: -                return +        def _connect(): +            self._reset_offline() +              def sub_callback(response): -                if not self.subscriptions[channel]['first'] : -                    self.subscriptions[channel]['first'] = True -                    connectcb() +                print response +                ## ERROR ? +                if not response or error in response: +                    _invoke_error() -                ## STOP CONNECTION? -                if not self.subscriptions[channel]['connected']: -                    return +                _invoke_connect() +                self.timetoken = response[1] -                ## PROBLEM? -                if not response: -                    def time_callback(_time): -                        if not _time: -                            self.timeout( 1, _subscribe ) -                            return errorback("Lost Network Connection") -                        else: -                            self.timeout( 1, _subscribe) +                if len(response) > 2: +                    channel_list = response[2].split(',') +                    response_list = response[0] +                    for ch in enumerate(channel_list): +                        if ch[1] in self.subscriptions: +                            chobj = self.subscriptions[ch[1]] +                            _invoke(chobj['callback'],self.decrypt(response_list[ch[0]])) +                else: +                    response_list = response[0] +                    chobj = _get_channel() +                    for r in response_list: +                        if chobj: +                            _invoke(chobj['callback'], self.decrypt(r)) -                    ## ENSURE CONNECTED (Call Time Function) -                    return self.time({ 'callback' : time_callback }) -                self.timetoken = response[1] -                _subscribe() +                _connect() + -                pc = PubnubCrypto() -                out = [] -                for message in response[0]: -                     callback(self.decrypt(message)) +            channel_list = self.get_channel_list(self.subscriptions) +            print channel_list              ## CONNECT TO PUBNUB SUBSCRIBE SERVERS              try: -                self._request( { "urlcomponents" : [ +                self.SUB_RECEIVER = self._request( { "urlcomponents" : [                      'subscribe',                      self.subscribe_key, -                    channel, +                    channel_list,                      '0',                      str(self.timetoken) -                ], "urlparams" : {"uuid":self.uuid} }, sub_callback ) -            except : -                self.timeout( 1, _subscribe) +                ], "urlparams" : {"uuid":self.uuid} }, sub_callback, single=True ) +            except Exception as e: +                self.timeout( 1, _connect)                  return +        self._connect = _connect + +          ## BEGIN SUBSCRIPTION (LISTEN FOR MESSAGES) -        _subscribe() +        _connect() + +    def _reset_offline(self): +        if self.SUB_RECEIVER is not None: +            self.SUB_RECEIVER() +        self.SUB_RECEIVER = None + +    def CONNECT(self): +        self._reset_offline() +        self._connect() + +      def unsubscribe( self, args ): +        #print(args['channel'])          channel = str(args['channel'])          if not (channel in self.subscriptions):              return False          ## DISCONNECT          self.subscriptions[channel]['connected'] = 0 +        self.subscriptions[channel]['subscribed'] = False          self.subscriptions[channel]['timetoken'] = 0          self.subscriptions[channel]['first']     = False +        self.CONNECT() | 
