diff options
| author | Devendra | 2014-12-18 02:37:25 +0530 |
|---|---|---|
| committer | Devendra | 2014-12-18 02:37:25 +0530 |
| commit | 3cdf70cb5021dc0cf95b4a37cf2a7f2273b60764 (patch) | |
| tree | d90b6f1b9941b3e396412bac4c1425fccd3af804 | |
| parent | 02766fc23b910f2f191a2e670052f31f0f0d0503 (diff) | |
| download | pubnub-python-3cdf70cb5021dc0cf95b4a37cf2a7f2273b60764.tar.bz2 | |
cg subscribe support v1
| -rw-r--r-- | Pubnub.py | 191 | ||||
| -rw-r--r-- | python-twisted/examples/subscribe_group.py | 56 | ||||
| -rw-r--r-- | python/examples/subscribe_group.py | 54 |
3 files changed, 266 insertions, 35 deletions
@@ -897,6 +897,7 @@ class PubnubBase(object): url = url + '?' + "&".join([x + "=" + str(y) for x, y in request[ "urlparams"].items() if y is not None]) + print url return url def _channel_registry(self, url=None, params=None, callback=None, error=None): @@ -1010,7 +1011,8 @@ class PubnubCoreAsync(PubnubBase): origin='pubsub.pubnub.com', uuid=None, _tt_lock=empty_lock, - _channel_list_lock=empty_lock + _channel_list_lock=empty_lock, + _channel_group_list_lock=empty_lock ): super(PubnubCoreAsync, self).__init__( @@ -1025,6 +1027,7 @@ class PubnubCoreAsync(PubnubBase): ) self.subscriptions = {} + self.subscription_groups = {} self.timetoken = 0 self.last_timetoken = 0 self.accept_encoding = 'gzip' @@ -1032,6 +1035,7 @@ class PubnubCoreAsync(PubnubBase): self._connect = None self._tt_lock = _tt_lock self._channel_list_lock = _channel_list_lock + self._channel_group_list_lock = _channel_group_list_lock self._connect = lambda: None self.u = None @@ -1049,6 +1053,21 @@ class PubnubCoreAsync(PubnubBase): channel += ch return channel + def get_channel_group_list(self, channel_groups): + channel_group = '' + first = True + with self._channel_group_list_lock: + for ch in channel_groups: + if not channel_groups[ch]['subscribed']: + continue + if not first: + channel_group += ',' + else: + first = False + channel_group += ch + return channel_group + + def get_channel_array(self): """Get List of currently subscribed channels @@ -1067,6 +1086,24 @@ class PubnubCoreAsync(PubnubBase): channel.append(ch) return channel + def get_channel_group_array(self): + """Get List of currently subscribed channel groups + + Returns: + Returns a list containing names of channel groups subscribed + + Sample return value: + ["a","b","c] + """ + channel_groups = self.subscription_groups + channel_group = [] + with self._channel_group_list_lock: + for ch in channel_groups: + if not channel_groups[ch]['subscribed']: + continue + channel_group.append(ch) + return channel_group + def each(l, func): if func is None: return @@ -1075,6 +1112,16 @@ class PubnubCoreAsync(PubnubBase): def subscribe(self, channels, callback, error=None, connect=None, disconnect=None, reconnect=None, sync=False): + return self._subscribe(channels=channels, callback=callback, error=error, + connect=connect, disconnect=disconnect, reconnect=reconnect, sync=sync) + + def subscribe_group(self, channel_groups, callback, error=None, + connect=None, disconnect=None, reconnect=None, sync=False): + return self._subscribe(channel_groups=channel_groups, callback=callback, error=error, + connect=connect, disconnect=disconnect, reconnect=reconnect, sync=sync) + + def _subscribe(self, channels=None, channel_groups=None, callback=None, error=None, + connect=None, disconnect=None, reconnect=None, sync=False): """Subscribe to data on a channel. This function causes the client to create an open TCP socket to the @@ -1140,6 +1187,20 @@ class PubnubCoreAsync(PubnubBase): chobj['disconnected'] = False _invoke(chobj['reconnect'], chobj['name']) + if self._channel_group_list_lock: + with self._channel_group_list_lock: + for ch in self.subscription_groups: + chobj = self.subscription_groups[ch] + if chobj['connected'] is False: + chobj['connected'] = True + chobj['disconnected'] = False + _invoke(chobj['connect'], chobj['name']) + else: + if chobj['disconnected'] is True: + chobj['disconnected'] = False + _invoke(chobj['reconnect'], chobj['name']) + + def _invoke_disconnect(): if self._channel_list_lock: with self._channel_list_lock: @@ -1149,42 +1210,77 @@ class PubnubCoreAsync(PubnubBase): if chobj['disconnected'] is False: chobj['disconnected'] = True _invoke(chobj['disconnect'], chobj['name']) + if self._channel_group_list_lock: + with self._channel_group_list_lock: + for ch in self.subscription_groups: + chobj = self.subscription_groups[ch] + if chobj['connected'] is True: + if chobj['disconnected'] is False: + chobj['disconnected'] = True + _invoke(chobj['disconnect'], chobj['name']) + - def _invoke_error(channel_list=None, err=None): + def _invoke_error(channel_list=None, error=None): if channel_list is None: for ch in self.subscriptions: chobj = self.subscriptions[ch] - _invoke(chobj['error'], err) + _invoke(chobj['error'], error) else: for ch in channel_list: chobj = self.subscriptions[ch] - _invoke(chobj['error'], err) + _invoke(chobj['error'], error) def _get_channel(): for ch in self.subscriptions: chobj = self.subscriptions[ch] if chobj['subscribed'] is True: return chobj - channels = channels if isinstance( - channels, list) else channels.split(",") - for channel in channels: - ## New Channel? - if len(channel) > 0 and \ - (not channel in self.subscriptions or - self.subscriptions[channel]['subscribed'] is False): - with self._channel_list_lock: - self.subscriptions[channel] = { - 'name': channel, - 'first': False, - 'connected': False, - 'disconnected': True, - 'subscribed': True, - 'callback': callback, - 'connect': connect, - 'disconnect': disconnect, - 'reconnect': reconnect, - 'error': error - } + + if channels is not None: + channels = channels if isinstance( + channels, list) else channels.split(",") + for channel in channels: + ## New Channel? + if len(channel) > 0 and \ + (not channel in self.subscriptions or + self.subscriptions[channel]['subscribed'] is False): + with self._channel_list_lock: + self.subscriptions[channel] = { + 'name': channel, + 'first': False, + 'connected': False, + 'disconnected': True, + 'subscribed': True, + 'callback': callback, + 'connect': connect, + 'disconnect': disconnect, + 'reconnect': reconnect, + 'error': error + } + + if channel_groups is not None: + channel_groups = channel_groups if isinstance( + channel_groups, list) else channel_groups.split(",") + + for channel_group in channel_groups: + ## New Channel? + if len(channel_group) > 0 and \ + (not channel_group in self.subscription_groups or + self.subscription_groups[channel_group]['subscribed'] is False): + with self._channel_group_list_lock: + self.subscription_groups[channel_group] = { + 'name': channel_group, + 'first': False, + 'connected': False, + 'disconnected': True, + 'subscribed': True, + 'callback': callback, + 'connect': connect, + 'disconnect': disconnect, + 'reconnect': reconnect, + 'error': error + } + ''' ## return if already connected to channel if channel in self.subscriptions and \ @@ -1203,24 +1299,25 @@ class PubnubCoreAsync(PubnubBase): if not response or \ ('message' in response and response['message'] == 'Forbidden'): - _invoke_error(response['payload'][ - 'channels'], response['message']) + _invoke_error(channel_list=response['payload'][ + 'channels'], error=response['message']) self.timeout(1, _connect) return if 'message' in response: - _invoke_error(err=response['message']) + _invoke_error(error=response['message']) else: _invoke_disconnect() self.timetoken = 0 self.timeout(1, _connect) def sub_callback(response): + print response ## ERROR ? if not response or \ ('message' in response and response['message'] == 'Forbidden'): - _invoke_error(response['payload'][ - 'channels'], response['message']) + _invoke_error(channel_list=response['payload'][ + 'channels'], error=response['message']) _connect() return @@ -1230,7 +1327,21 @@ class PubnubCoreAsync(PubnubBase): self.timetoken = \ self.last_timetoken if self.timetoken == 0 and \ self.last_timetoken != 0 else response[1] - if len(response) > 2: + + if len(response) > 3: + channel_list = response[2].split(',') + channel_list_2 = response[3].split(',') + response_list = response[0] + for ch in enumerate(channel_list): + if ch[1] in self.subscription_groups or ch[1] in self.subscriptions: + try: + chobj = self.subscription_groups[ch[1]] + except KeyError as k: + chobj = self.subscriptions[ch[1]] + _invoke(chobj['callback'], + self.decrypt(response_list[ch[0]]), + channel_list_2[ch[0]]) + elif len(response) > 2: channel_list = response[2].split(',') response_list = response[0] for ch in enumerate(channel_list): @@ -1250,9 +1361,14 @@ class PubnubCoreAsync(PubnubBase): _connect() channel_list = self.get_channel_list(self.subscriptions) - if len(channel_list) <= 0: + channel_group_list = self.get_channel_group_list(self.subscription_groups) + + if len(channel_list) <= 0 and len(channel_group_list) <= 0: return + if len(channel_list) <= 0: + channel_list = ',' + ## CONNECT TO PUBNUB SUBSCRIBE SERVERS #try: self.SUB_RECEIVER = self._request({"urlcomponents": [ @@ -1261,7 +1377,8 @@ class PubnubCoreAsync(PubnubBase): channel_list, '0', str(self.timetoken) - ], "urlparams": {"uuid": self.uuid, "auth": self.auth_key, 'pnsdk' : self.pnsdk}}, + ], "urlparams": {"uuid": self.uuid, "auth": self.auth_key, + 'pnsdk' : self.pnsdk, 'channel-group' : channel_group_list}}, sub_callback, error_callback, single=True, timeout=320) @@ -1328,7 +1445,9 @@ class PubnubCore(PubnubCoreAsync): origin='pubsub.pubnub.com', uuid=None, _tt_lock=None, - _channel_list_lock=None + _channel_list_lock=None, + _channel_group_list_lock=None + ): super(PubnubCore, self).__init__( publish_key=publish_key, @@ -1340,7 +1459,8 @@ class PubnubCore(PubnubCoreAsync): origin=origin, uuid=uuid, _tt_lock=_tt_lock, - _channel_list_lock=_channel_list_lock + _channel_list_lock=_channel_list_lock, + _channel_group_list_lock=_channel_group_list_lock ) self.subscriptions = {} @@ -1542,7 +1662,8 @@ class Pubnub(PubnubCore): origin=origin, uuid=uuid or pres_uuid, _tt_lock=threading.RLock(), - _channel_list_lock=threading.RLock() + _channel_list_lock=threading.RLock(), + _channel_group_list_lock=threading.RLock() ) global _urllib_request if self.python_version == 2: diff --git a/python-twisted/examples/subscribe_group.py b/python-twisted/examples/subscribe_group.py new file mode 100644 index 0000000..67dbac5 --- /dev/null +++ b/python-twisted/examples/subscribe_group.py @@ -0,0 +1,56 @@ +## www.pubnub.com - PubNub Real-time push service in the cloud. +# coding=utf8 + +## PubNub Real-time Push APIs and Notifications Framework +## Copyright (c) 2010 Stephen Blum +## http://www.pubnub.com/ + + +import sys +from Pubnub import Pubnub + +publish_key = len(sys.argv) > 1 and sys.argv[1] or 'demo' +subscribe_key = len(sys.argv) > 2 and sys.argv[2] or 'demo' +secret_key = len(sys.argv) > 3 and sys.argv[3] or 'demo' +cipher_key = len(sys.argv) > 4 and sys.argv[4] or 'abcd' +ssl_on = len(sys.argv) > 5 and bool(sys.argv[5]) or False + +## ----------------------------------------------------------------------- +## Initiate Pubnub State +## ----------------------------------------------------------------------- +pubnub = Pubnub(publish_key=publish_key, subscribe_key=subscribe_key, + secret_key=secret_key, cipher_key=cipher_key, ssl_on=ssl_on, daemon=False) + +channel = 'ab' + + +# Asynchronous usage +def callback(message, channel): + print(str(message) + ' , ' + channel) + + + +def error(message): + print("ERROR : " + str(message)) + + +def connect(message): + print("CONNECTED " + str(message)) + + +def reconnect(message): + print("RECONNECTED " + str(message)) + + +def disconnect(message): + print("DISCONNECTED " + str(message)) + +print pubnub.channel_group_add_channel(channel_group='abc', channel="a") + +pubnub.subscribe_group(channel_groups='abc', callback=callback, error=callback, + connect=connect, reconnect=reconnect, disconnect=disconnect) + +#pubnub.subscribe(channels='d', callback=callback, error=callback, +# connect=connect, reconnect=reconnect, disconnect=disconnect) + +pubnub.start() diff --git a/python/examples/subscribe_group.py b/python/examples/subscribe_group.py new file mode 100644 index 0000000..3cebcd9 --- /dev/null +++ b/python/examples/subscribe_group.py @@ -0,0 +1,54 @@ +## www.pubnub.com - PubNub Real-time push service in the cloud. +# coding=utf8 + +## PubNub Real-time Push APIs and Notifications Framework +## Copyright (c) 2010 Stephen Blum +## http://www.pubnub.com/ + + +import sys +from Pubnub import Pubnub + +publish_key = len(sys.argv) > 1 and sys.argv[1] or 'demo' +subscribe_key = len(sys.argv) > 2 and sys.argv[2] or 'demo' +secret_key = len(sys.argv) > 3 and sys.argv[3] or 'demo' +cipher_key = len(sys.argv) > 4 and sys.argv[4] or 'abcd' +ssl_on = len(sys.argv) > 5 and bool(sys.argv[5]) or False + +## ----------------------------------------------------------------------- +## Initiate Pubnub State +## ----------------------------------------------------------------------- +pubnub = Pubnub(publish_key=publish_key, subscribe_key=subscribe_key, + secret_key=secret_key, cipher_key=cipher_key, ssl_on=ssl_on, daemon=False) + +channel = 'ab' + + +# Asynchronous usage +def callback(message, channel): + print(str(message) + ' , ' + channel) + + + +def error(message): + print("ERROR : " + str(message)) + + +def connect(message): + print("CONNECTED " + str(message)) + + +def reconnect(message): + print("RECONNECTED " + str(message)) + + +def disconnect(message): + print("DISCONNECTED " + str(message)) + +print pubnub.channel_group_add_channel(channel_group='abc', channel="b") + +pubnub.subscribe_group(channel_groups='abc', callback=callback, error=callback, + connect=connect, reconnect=reconnect, disconnect=disconnect) + +pubnub.subscribe(channels='d', callback=callback, error=callback, + connect=connect, reconnect=reconnect, disconnect=disconnect) |
