diff options
| -rw-r--r-- | Pubnub.py | 191 | ||||
| -rw-r--r-- | python-twisted/examples/subscribe_group.py | 56 | ||||
| -rw-r--r-- | python/examples/subscribe_group.py | 54 | 
3 files changed, 266 insertions, 35 deletions
| @@ -897,6 +897,7 @@ class PubnubBase(object):              url = url + '?' + "&".join([x + "=" + str(y) for x, y in request[                  "urlparams"].items() if y is not None]) +        print url          return url      def _channel_registry(self, url=None, params=None, callback=None, error=None): @@ -1010,7 +1011,8 @@ class PubnubCoreAsync(PubnubBase):          origin='pubsub.pubnub.com',          uuid=None,          _tt_lock=empty_lock, -        _channel_list_lock=empty_lock +        _channel_list_lock=empty_lock, +        _channel_group_list_lock=empty_lock      ):          super(PubnubCoreAsync, self).__init__( @@ -1025,6 +1027,7 @@ class PubnubCoreAsync(PubnubBase):          )          self.subscriptions = {} +        self.subscription_groups = {}          self.timetoken = 0          self.last_timetoken = 0          self.accept_encoding = 'gzip' @@ -1032,6 +1035,7 @@ class PubnubCoreAsync(PubnubBase):          self._connect = None          self._tt_lock = _tt_lock          self._channel_list_lock = _channel_list_lock +        self._channel_group_list_lock = _channel_group_list_lock          self._connect = lambda: None          self.u = None @@ -1049,6 +1053,21 @@ class PubnubCoreAsync(PubnubBase):                  channel += ch          return channel +    def get_channel_group_list(self, channel_groups): +        channel_group = '' +        first = True +        with self._channel_group_list_lock: +            for ch in channel_groups: +                if not channel_groups[ch]['subscribed']: +                    continue +                if not first: +                    channel_group += ',' +                else: +                    first = False +                channel_group += ch +        return channel_group + +      def get_channel_array(self):          """Get List of currently subscribed channels @@ -1067,6 +1086,24 @@ class PubnubCoreAsync(PubnubBase):                  channel.append(ch)          return channel +    def get_channel_group_array(self): +        """Get List of currently subscribed channel groups + +        Returns: +            Returns a list containing names of channel groups subscribed + +            Sample return value: +                ["a","b","c] +        """ +        channel_groups = self.subscription_groups +        channel_group = [] +        with self._channel_group_list_lock: +            for ch in channel_groups: +                if not channel_groups[ch]['subscribed']: +                    continue +                channel_group.append(ch) +        return channel_group +      def each(l, func):          if func is None:              return @@ -1075,6 +1112,16 @@ class PubnubCoreAsync(PubnubBase):      def subscribe(self, channels, callback, error=None,                    connect=None, disconnect=None, reconnect=None, sync=False): +        return self._subscribe(channels=channels, callback=callback, error=error, +            connect=connect, disconnect=disconnect, reconnect=reconnect, sync=sync) + +    def subscribe_group(self, channel_groups, callback, error=None, +                  connect=None, disconnect=None, reconnect=None, sync=False): +        return self._subscribe(channel_groups=channel_groups, callback=callback, error=error, +            connect=connect, disconnect=disconnect, reconnect=reconnect, sync=sync) + +    def _subscribe(self, channels=None, channel_groups=None, callback=None, error=None, +                  connect=None, disconnect=None, reconnect=None, sync=False):          """Subscribe to data on a channel.          This function causes the client to create an open TCP socket to the @@ -1140,6 +1187,20 @@ class PubnubCoreAsync(PubnubBase):                                  chobj['disconnected'] = False                                  _invoke(chobj['reconnect'], chobj['name']) +            if self._channel_group_list_lock: +                with self._channel_group_list_lock: +                    for ch in self.subscription_groups: +                        chobj = self.subscription_groups[ch] +                        if chobj['connected'] is False: +                            chobj['connected'] = True +                            chobj['disconnected'] = False +                            _invoke(chobj['connect'], chobj['name']) +                        else: +                            if chobj['disconnected'] is True: +                                chobj['disconnected'] = False +                                _invoke(chobj['reconnect'], chobj['name']) + +          def _invoke_disconnect():              if self._channel_list_lock:                  with self._channel_list_lock: @@ -1149,42 +1210,77 @@ class PubnubCoreAsync(PubnubBase):                              if chobj['disconnected'] is False:                                  chobj['disconnected'] = True                                  _invoke(chobj['disconnect'], chobj['name']) +            if self._channel_group_list_lock: +                with self._channel_group_list_lock: +                    for ch in self.subscription_groups: +                        chobj = self.subscription_groups[ch] +                        if chobj['connected'] is True: +                            if chobj['disconnected'] is False: +                                chobj['disconnected'] = True +                                _invoke(chobj['disconnect'], chobj['name']) + -        def _invoke_error(channel_list=None, err=None): +        def _invoke_error(channel_list=None, error=None):              if channel_list is None:                  for ch in self.subscriptions:                      chobj = self.subscriptions[ch] -                    _invoke(chobj['error'], err) +                    _invoke(chobj['error'], error)              else:                  for ch in channel_list:                      chobj = self.subscriptions[ch] -                    _invoke(chobj['error'], err) +                    _invoke(chobj['error'], error)          def _get_channel():              for ch in self.subscriptions:                  chobj = self.subscriptions[ch]                  if chobj['subscribed'] is True:                      return chobj -        channels = channels if isinstance( -            channels, list) else channels.split(",") -        for channel in channels: -            ## New Channel? -            if len(channel) > 0 and \ -                    (not channel in self.subscriptions or -                     self.subscriptions[channel]['subscribed'] is False): -                    with self._channel_list_lock: -                        self.subscriptions[channel] = { -                            'name': channel, -                            'first': False, -                            'connected': False, -                            'disconnected': True, -                            'subscribed': True, -                            'callback': callback, -                            'connect': connect, -                            'disconnect': disconnect, -                            'reconnect': reconnect, -                            'error': error -                        } + +        if channels is not None: +            channels = channels if isinstance( +                channels, list) else channels.split(",") +            for channel in channels: +                ## New Channel? +                if len(channel) > 0 and \ +                        (not channel in self.subscriptions or +                         self.subscriptions[channel]['subscribed'] is False): +                        with self._channel_list_lock: +                            self.subscriptions[channel] = { +                                'name': channel, +                                'first': False, +                                'connected': False, +                                'disconnected': True, +                                'subscribed': True, +                                'callback': callback, +                                'connect': connect, +                                'disconnect': disconnect, +                                'reconnect': reconnect, +                                'error': error +                            } +         +        if channel_groups is not None: +            channel_groups = channel_groups if isinstance( +                channel_groups, list) else channel_groups.split(",") + +            for channel_group in channel_groups: +                ## New Channel? +                if len(channel_group) > 0 and \ +                        (not channel_group in self.subscription_groups or +                         self.subscription_groups[channel_group]['subscribed'] is False): +                        with self._channel_group_list_lock: +                            self.subscription_groups[channel_group] = { +                                'name': channel_group, +                                'first': False, +                                'connected': False, +                                'disconnected': True, +                                'subscribed': True, +                                'callback': callback, +                                'connect': connect, +                                'disconnect': disconnect, +                                'reconnect': reconnect, +                                'error': error +                            } +          '''          ## return if already connected to channel          if channel in self.subscriptions and \ @@ -1203,24 +1299,25 @@ class PubnubCoreAsync(PubnubBase):                  if not response or \                      ('message' in response and                          response['message'] == 'Forbidden'): -                            _invoke_error(response['payload'][ -                                'channels'], response['message']) +                            _invoke_error(channel_list=response['payload'][ +                                'channels'], error=response['message'])                              self.timeout(1, _connect)                              return                  if 'message' in response: -                    _invoke_error(err=response['message']) +                    _invoke_error(error=response['message'])                  else:                      _invoke_disconnect()                      self.timetoken = 0                      self.timeout(1, _connect)              def sub_callback(response): +                print response                  ## ERROR ?                  if not response or \                      ('message' in response and                          response['message'] == 'Forbidden'): -                            _invoke_error(response['payload'][ -                                'channels'], response['message']) +                            _invoke_error(channel_list=response['payload'][ +                                'channels'], error=response['message'])                              _connect()                              return @@ -1230,7 +1327,21 @@ class PubnubCoreAsync(PubnubBase):                      self.timetoken = \                          self.last_timetoken if self.timetoken == 0 and \                          self.last_timetoken != 0 else response[1] -                    if len(response) > 2: + +                    if len(response) > 3: +                        channel_list = response[2].split(',') +                        channel_list_2 = response[3].split(',') +                        response_list = response[0] +                        for ch in enumerate(channel_list): +                            if ch[1] in self.subscription_groups or ch[1] in self.subscriptions: +                                try: +                                    chobj = self.subscription_groups[ch[1]] +                                except KeyError as k: +                                    chobj = self.subscriptions[ch[1]] +                                _invoke(chobj['callback'], +                                        self.decrypt(response_list[ch[0]]), +                                        channel_list_2[ch[0]])                     +                    elif len(response) > 2:                          channel_list = response[2].split(',')                          response_list = response[0]                          for ch in enumerate(channel_list): @@ -1250,9 +1361,14 @@ class PubnubCoreAsync(PubnubBase):                      _connect()              channel_list = self.get_channel_list(self.subscriptions) -            if len(channel_list) <= 0: +            channel_group_list = self.get_channel_group_list(self.subscription_groups) + +            if len(channel_list) <= 0 and len(channel_group_list) <= 0:                  return +            if len(channel_list) <= 0: +                channel_list = ',' +              ## CONNECT TO PUBNUB SUBSCRIBE SERVERS              #try:              self.SUB_RECEIVER = self._request({"urlcomponents": [ @@ -1261,7 +1377,8 @@ class PubnubCoreAsync(PubnubBase):                  channel_list,                  '0',                  str(self.timetoken) -            ], "urlparams": {"uuid": self.uuid, "auth": self.auth_key, 'pnsdk' : self.pnsdk}}, +            ], "urlparams": {"uuid": self.uuid, "auth": self.auth_key, +            'pnsdk' : self.pnsdk, 'channel-group' : channel_group_list}},                  sub_callback,                  error_callback,                  single=True, timeout=320) @@ -1328,7 +1445,9 @@ class PubnubCore(PubnubCoreAsync):          origin='pubsub.pubnub.com',          uuid=None,          _tt_lock=None, -        _channel_list_lock=None +        _channel_list_lock=None, +        _channel_group_list_lock=None +      ):          super(PubnubCore, self).__init__(              publish_key=publish_key, @@ -1340,7 +1459,8 @@ class PubnubCore(PubnubCoreAsync):              origin=origin,              uuid=uuid,              _tt_lock=_tt_lock, -            _channel_list_lock=_channel_list_lock +            _channel_list_lock=_channel_list_lock, +            _channel_group_list_lock=_channel_group_list_lock          )          self.subscriptions = {} @@ -1542,7 +1662,8 @@ class Pubnub(PubnubCore):              origin=origin,              uuid=uuid or pres_uuid,              _tt_lock=threading.RLock(), -            _channel_list_lock=threading.RLock() +            _channel_list_lock=threading.RLock(), +            _channel_group_list_lock=threading.RLock()          )          global _urllib_request          if self.python_version == 2: diff --git a/python-twisted/examples/subscribe_group.py b/python-twisted/examples/subscribe_group.py new file mode 100644 index 0000000..67dbac5 --- /dev/null +++ b/python-twisted/examples/subscribe_group.py @@ -0,0 +1,56 @@ +## www.pubnub.com - PubNub Real-time push service in the cloud. +# coding=utf8 + +## PubNub Real-time Push APIs and Notifications Framework +## Copyright (c) 2010 Stephen Blum +## http://www.pubnub.com/ + + +import sys +from Pubnub import Pubnub + +publish_key = len(sys.argv) > 1 and sys.argv[1] or 'demo' +subscribe_key = len(sys.argv) > 2 and sys.argv[2] or 'demo' +secret_key = len(sys.argv) > 3 and sys.argv[3] or 'demo' +cipher_key = len(sys.argv) > 4 and sys.argv[4] or 'abcd' +ssl_on = len(sys.argv) > 5 and bool(sys.argv[5]) or False + +## ----------------------------------------------------------------------- +## Initiate Pubnub State +## ----------------------------------------------------------------------- +pubnub = Pubnub(publish_key=publish_key, subscribe_key=subscribe_key, +                secret_key=secret_key, cipher_key=cipher_key, ssl_on=ssl_on, daemon=False) + +channel = 'ab' + + +# Asynchronous usage +def callback(message, channel): +    print(str(message) + ' , ' + channel) + + + +def error(message): +    print("ERROR : " + str(message)) + + +def connect(message): +    print("CONNECTED " + str(message)) + + +def reconnect(message): +    print("RECONNECTED " + str(message)) + + +def disconnect(message): +    print("DISCONNECTED " + str(message)) + +print pubnub.channel_group_add_channel(channel_group='abc', channel="a") + +pubnub.subscribe_group(channel_groups='abc', callback=callback, error=callback, +                 connect=connect, reconnect=reconnect, disconnect=disconnect) + +#pubnub.subscribe(channels='d', callback=callback, error=callback, +#                 connect=connect, reconnect=reconnect, disconnect=disconnect) + +pubnub.start() diff --git a/python/examples/subscribe_group.py b/python/examples/subscribe_group.py new file mode 100644 index 0000000..3cebcd9 --- /dev/null +++ b/python/examples/subscribe_group.py @@ -0,0 +1,54 @@ +## www.pubnub.com - PubNub Real-time push service in the cloud. +# coding=utf8 + +## PubNub Real-time Push APIs and Notifications Framework +## Copyright (c) 2010 Stephen Blum +## http://www.pubnub.com/ + + +import sys +from Pubnub import Pubnub + +publish_key = len(sys.argv) > 1 and sys.argv[1] or 'demo' +subscribe_key = len(sys.argv) > 2 and sys.argv[2] or 'demo' +secret_key = len(sys.argv) > 3 and sys.argv[3] or 'demo' +cipher_key = len(sys.argv) > 4 and sys.argv[4] or 'abcd' +ssl_on = len(sys.argv) > 5 and bool(sys.argv[5]) or False + +## ----------------------------------------------------------------------- +## Initiate Pubnub State +## ----------------------------------------------------------------------- +pubnub = Pubnub(publish_key=publish_key, subscribe_key=subscribe_key, +                secret_key=secret_key, cipher_key=cipher_key, ssl_on=ssl_on, daemon=False) + +channel = 'ab' + + +# Asynchronous usage +def callback(message, channel): +    print(str(message) + ' , ' + channel) + + + +def error(message): +    print("ERROR : " + str(message)) + + +def connect(message): +    print("CONNECTED " + str(message)) + + +def reconnect(message): +    print("RECONNECTED " + str(message)) + + +def disconnect(message): +    print("DISCONNECTED " + str(message)) + +print pubnub.channel_group_add_channel(channel_group='abc', channel="b") + +pubnub.subscribe_group(channel_groups='abc', callback=callback, error=callback, +                 connect=connect, reconnect=reconnect, disconnect=disconnect) + +pubnub.subscribe(channels='d', callback=callback, error=callback, +                 connect=connect, reconnect=reconnect, disconnect=disconnect) | 
