aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDevendra2014-12-18 02:37:25 +0530
committerDevendra2014-12-18 02:37:25 +0530
commit3cdf70cb5021dc0cf95b4a37cf2a7f2273b60764 (patch)
treed90b6f1b9941b3e396412bac4c1425fccd3af804
parent02766fc23b910f2f191a2e670052f31f0f0d0503 (diff)
downloadpubnub-python-3cdf70cb5021dc0cf95b4a37cf2a7f2273b60764.tar.bz2
cg subscribe support v1
-rw-r--r--Pubnub.py191
-rw-r--r--python-twisted/examples/subscribe_group.py56
-rw-r--r--python/examples/subscribe_group.py54
3 files changed, 266 insertions, 35 deletions
diff --git a/Pubnub.py b/Pubnub.py
index 9d69fa0..ddacb1c 100644
--- a/Pubnub.py
+++ b/Pubnub.py
@@ -897,6 +897,7 @@ class PubnubBase(object):
url = url + '?' + "&".join([x + "=" + str(y) for x, y in request[
"urlparams"].items() if y is not None])
+ print url
return url
def _channel_registry(self, url=None, params=None, callback=None, error=None):
@@ -1010,7 +1011,8 @@ class PubnubCoreAsync(PubnubBase):
origin='pubsub.pubnub.com',
uuid=None,
_tt_lock=empty_lock,
- _channel_list_lock=empty_lock
+ _channel_list_lock=empty_lock,
+ _channel_group_list_lock=empty_lock
):
super(PubnubCoreAsync, self).__init__(
@@ -1025,6 +1027,7 @@ class PubnubCoreAsync(PubnubBase):
)
self.subscriptions = {}
+ self.subscription_groups = {}
self.timetoken = 0
self.last_timetoken = 0
self.accept_encoding = 'gzip'
@@ -1032,6 +1035,7 @@ class PubnubCoreAsync(PubnubBase):
self._connect = None
self._tt_lock = _tt_lock
self._channel_list_lock = _channel_list_lock
+ self._channel_group_list_lock = _channel_group_list_lock
self._connect = lambda: None
self.u = None
@@ -1049,6 +1053,21 @@ class PubnubCoreAsync(PubnubBase):
channel += ch
return channel
+ def get_channel_group_list(self, channel_groups):
+ channel_group = ''
+ first = True
+ with self._channel_group_list_lock:
+ for ch in channel_groups:
+ if not channel_groups[ch]['subscribed']:
+ continue
+ if not first:
+ channel_group += ','
+ else:
+ first = False
+ channel_group += ch
+ return channel_group
+
+
def get_channel_array(self):
"""Get List of currently subscribed channels
@@ -1067,6 +1086,24 @@ class PubnubCoreAsync(PubnubBase):
channel.append(ch)
return channel
+ def get_channel_group_array(self):
+ """Get List of currently subscribed channel groups
+
+ Returns:
+ Returns a list containing names of channel groups subscribed
+
+ Sample return value:
+ ["a","b","c]
+ """
+ channel_groups = self.subscription_groups
+ channel_group = []
+ with self._channel_group_list_lock:
+ for ch in channel_groups:
+ if not channel_groups[ch]['subscribed']:
+ continue
+ channel_group.append(ch)
+ return channel_group
+
def each(l, func):
if func is None:
return
@@ -1075,6 +1112,16 @@ class PubnubCoreAsync(PubnubBase):
def subscribe(self, channels, callback, error=None,
connect=None, disconnect=None, reconnect=None, sync=False):
+ return self._subscribe(channels=channels, callback=callback, error=error,
+ connect=connect, disconnect=disconnect, reconnect=reconnect, sync=sync)
+
+ def subscribe_group(self, channel_groups, callback, error=None,
+ connect=None, disconnect=None, reconnect=None, sync=False):
+ return self._subscribe(channel_groups=channel_groups, callback=callback, error=error,
+ connect=connect, disconnect=disconnect, reconnect=reconnect, sync=sync)
+
+ def _subscribe(self, channels=None, channel_groups=None, callback=None, error=None,
+ connect=None, disconnect=None, reconnect=None, sync=False):
"""Subscribe to data on a channel.
This function causes the client to create an open TCP socket to the
@@ -1140,6 +1187,20 @@ class PubnubCoreAsync(PubnubBase):
chobj['disconnected'] = False
_invoke(chobj['reconnect'], chobj['name'])
+ if self._channel_group_list_lock:
+ with self._channel_group_list_lock:
+ for ch in self.subscription_groups:
+ chobj = self.subscription_groups[ch]
+ if chobj['connected'] is False:
+ chobj['connected'] = True
+ chobj['disconnected'] = False
+ _invoke(chobj['connect'], chobj['name'])
+ else:
+ if chobj['disconnected'] is True:
+ chobj['disconnected'] = False
+ _invoke(chobj['reconnect'], chobj['name'])
+
+
def _invoke_disconnect():
if self._channel_list_lock:
with self._channel_list_lock:
@@ -1149,42 +1210,77 @@ class PubnubCoreAsync(PubnubBase):
if chobj['disconnected'] is False:
chobj['disconnected'] = True
_invoke(chobj['disconnect'], chobj['name'])
+ if self._channel_group_list_lock:
+ with self._channel_group_list_lock:
+ for ch in self.subscription_groups:
+ chobj = self.subscription_groups[ch]
+ if chobj['connected'] is True:
+ if chobj['disconnected'] is False:
+ chobj['disconnected'] = True
+ _invoke(chobj['disconnect'], chobj['name'])
+
- def _invoke_error(channel_list=None, err=None):
+ def _invoke_error(channel_list=None, error=None):
if channel_list is None:
for ch in self.subscriptions:
chobj = self.subscriptions[ch]
- _invoke(chobj['error'], err)
+ _invoke(chobj['error'], error)
else:
for ch in channel_list:
chobj = self.subscriptions[ch]
- _invoke(chobj['error'], err)
+ _invoke(chobj['error'], error)
def _get_channel():
for ch in self.subscriptions:
chobj = self.subscriptions[ch]
if chobj['subscribed'] is True:
return chobj
- channels = channels if isinstance(
- channels, list) else channels.split(",")
- for channel in channels:
- ## New Channel?
- if len(channel) > 0 and \
- (not channel in self.subscriptions or
- self.subscriptions[channel]['subscribed'] is False):
- with self._channel_list_lock:
- self.subscriptions[channel] = {
- 'name': channel,
- 'first': False,
- 'connected': False,
- 'disconnected': True,
- 'subscribed': True,
- 'callback': callback,
- 'connect': connect,
- 'disconnect': disconnect,
- 'reconnect': reconnect,
- 'error': error
- }
+
+ if channels is not None:
+ channels = channels if isinstance(
+ channels, list) else channels.split(",")
+ for channel in channels:
+ ## New Channel?
+ if len(channel) > 0 and \
+ (not channel in self.subscriptions or
+ self.subscriptions[channel]['subscribed'] is False):
+ with self._channel_list_lock:
+ self.subscriptions[channel] = {
+ 'name': channel,
+ 'first': False,
+ 'connected': False,
+ 'disconnected': True,
+ 'subscribed': True,
+ 'callback': callback,
+ 'connect': connect,
+ 'disconnect': disconnect,
+ 'reconnect': reconnect,
+ 'error': error
+ }
+
+ if channel_groups is not None:
+ channel_groups = channel_groups if isinstance(
+ channel_groups, list) else channel_groups.split(",")
+
+ for channel_group in channel_groups:
+ ## New Channel?
+ if len(channel_group) > 0 and \
+ (not channel_group in self.subscription_groups or
+ self.subscription_groups[channel_group]['subscribed'] is False):
+ with self._channel_group_list_lock:
+ self.subscription_groups[channel_group] = {
+ 'name': channel_group,
+ 'first': False,
+ 'connected': False,
+ 'disconnected': True,
+ 'subscribed': True,
+ 'callback': callback,
+ 'connect': connect,
+ 'disconnect': disconnect,
+ 'reconnect': reconnect,
+ 'error': error
+ }
+
'''
## return if already connected to channel
if channel in self.subscriptions and \
@@ -1203,24 +1299,25 @@ class PubnubCoreAsync(PubnubBase):
if not response or \
('message' in response and
response['message'] == 'Forbidden'):
- _invoke_error(response['payload'][
- 'channels'], response['message'])
+ _invoke_error(channel_list=response['payload'][
+ 'channels'], error=response['message'])
self.timeout(1, _connect)
return
if 'message' in response:
- _invoke_error(err=response['message'])
+ _invoke_error(error=response['message'])
else:
_invoke_disconnect()
self.timetoken = 0
self.timeout(1, _connect)
def sub_callback(response):
+ print response
## ERROR ?
if not response or \
('message' in response and
response['message'] == 'Forbidden'):
- _invoke_error(response['payload'][
- 'channels'], response['message'])
+ _invoke_error(channel_list=response['payload'][
+ 'channels'], error=response['message'])
_connect()
return
@@ -1230,7 +1327,21 @@ class PubnubCoreAsync(PubnubBase):
self.timetoken = \
self.last_timetoken if self.timetoken == 0 and \
self.last_timetoken != 0 else response[1]
- if len(response) > 2:
+
+ if len(response) > 3:
+ channel_list = response[2].split(',')
+ channel_list_2 = response[3].split(',')
+ response_list = response[0]
+ for ch in enumerate(channel_list):
+ if ch[1] in self.subscription_groups or ch[1] in self.subscriptions:
+ try:
+ chobj = self.subscription_groups[ch[1]]
+ except KeyError as k:
+ chobj = self.subscriptions[ch[1]]
+ _invoke(chobj['callback'],
+ self.decrypt(response_list[ch[0]]),
+ channel_list_2[ch[0]])
+ elif len(response) > 2:
channel_list = response[2].split(',')
response_list = response[0]
for ch in enumerate(channel_list):
@@ -1250,9 +1361,14 @@ class PubnubCoreAsync(PubnubBase):
_connect()
channel_list = self.get_channel_list(self.subscriptions)
- if len(channel_list) <= 0:
+ channel_group_list = self.get_channel_group_list(self.subscription_groups)
+
+ if len(channel_list) <= 0 and len(channel_group_list) <= 0:
return
+ if len(channel_list) <= 0:
+ channel_list = ','
+
## CONNECT TO PUBNUB SUBSCRIBE SERVERS
#try:
self.SUB_RECEIVER = self._request({"urlcomponents": [
@@ -1261,7 +1377,8 @@ class PubnubCoreAsync(PubnubBase):
channel_list,
'0',
str(self.timetoken)
- ], "urlparams": {"uuid": self.uuid, "auth": self.auth_key, 'pnsdk' : self.pnsdk}},
+ ], "urlparams": {"uuid": self.uuid, "auth": self.auth_key,
+ 'pnsdk' : self.pnsdk, 'channel-group' : channel_group_list}},
sub_callback,
error_callback,
single=True, timeout=320)
@@ -1328,7 +1445,9 @@ class PubnubCore(PubnubCoreAsync):
origin='pubsub.pubnub.com',
uuid=None,
_tt_lock=None,
- _channel_list_lock=None
+ _channel_list_lock=None,
+ _channel_group_list_lock=None
+
):
super(PubnubCore, self).__init__(
publish_key=publish_key,
@@ -1340,7 +1459,8 @@ class PubnubCore(PubnubCoreAsync):
origin=origin,
uuid=uuid,
_tt_lock=_tt_lock,
- _channel_list_lock=_channel_list_lock
+ _channel_list_lock=_channel_list_lock,
+ _channel_group_list_lock=_channel_group_list_lock
)
self.subscriptions = {}
@@ -1542,7 +1662,8 @@ class Pubnub(PubnubCore):
origin=origin,
uuid=uuid or pres_uuid,
_tt_lock=threading.RLock(),
- _channel_list_lock=threading.RLock()
+ _channel_list_lock=threading.RLock(),
+ _channel_group_list_lock=threading.RLock()
)
global _urllib_request
if self.python_version == 2:
diff --git a/python-twisted/examples/subscribe_group.py b/python-twisted/examples/subscribe_group.py
new file mode 100644
index 0000000..67dbac5
--- /dev/null
+++ b/python-twisted/examples/subscribe_group.py
@@ -0,0 +1,56 @@
+## www.pubnub.com - PubNub Real-time push service in the cloud.
+# coding=utf8
+
+## PubNub Real-time Push APIs and Notifications Framework
+## Copyright (c) 2010 Stephen Blum
+## http://www.pubnub.com/
+
+
+import sys
+from Pubnub import Pubnub
+
+publish_key = len(sys.argv) > 1 and sys.argv[1] or 'demo'
+subscribe_key = len(sys.argv) > 2 and sys.argv[2] or 'demo'
+secret_key = len(sys.argv) > 3 and sys.argv[3] or 'demo'
+cipher_key = len(sys.argv) > 4 and sys.argv[4] or 'abcd'
+ssl_on = len(sys.argv) > 5 and bool(sys.argv[5]) or False
+
+## -----------------------------------------------------------------------
+## Initiate Pubnub State
+## -----------------------------------------------------------------------
+pubnub = Pubnub(publish_key=publish_key, subscribe_key=subscribe_key,
+ secret_key=secret_key, cipher_key=cipher_key, ssl_on=ssl_on, daemon=False)
+
+channel = 'ab'
+
+
+# Asynchronous usage
+def callback(message, channel):
+ print(str(message) + ' , ' + channel)
+
+
+
+def error(message):
+ print("ERROR : " + str(message))
+
+
+def connect(message):
+ print("CONNECTED " + str(message))
+
+
+def reconnect(message):
+ print("RECONNECTED " + str(message))
+
+
+def disconnect(message):
+ print("DISCONNECTED " + str(message))
+
+print pubnub.channel_group_add_channel(channel_group='abc', channel="a")
+
+pubnub.subscribe_group(channel_groups='abc', callback=callback, error=callback,
+ connect=connect, reconnect=reconnect, disconnect=disconnect)
+
+#pubnub.subscribe(channels='d', callback=callback, error=callback,
+# connect=connect, reconnect=reconnect, disconnect=disconnect)
+
+pubnub.start()
diff --git a/python/examples/subscribe_group.py b/python/examples/subscribe_group.py
new file mode 100644
index 0000000..3cebcd9
--- /dev/null
+++ b/python/examples/subscribe_group.py
@@ -0,0 +1,54 @@
+## www.pubnub.com - PubNub Real-time push service in the cloud.
+# coding=utf8
+
+## PubNub Real-time Push APIs and Notifications Framework
+## Copyright (c) 2010 Stephen Blum
+## http://www.pubnub.com/
+
+
+import sys
+from Pubnub import Pubnub
+
+publish_key = len(sys.argv) > 1 and sys.argv[1] or 'demo'
+subscribe_key = len(sys.argv) > 2 and sys.argv[2] or 'demo'
+secret_key = len(sys.argv) > 3 and sys.argv[3] or 'demo'
+cipher_key = len(sys.argv) > 4 and sys.argv[4] or 'abcd'
+ssl_on = len(sys.argv) > 5 and bool(sys.argv[5]) or False
+
+## -----------------------------------------------------------------------
+## Initiate Pubnub State
+## -----------------------------------------------------------------------
+pubnub = Pubnub(publish_key=publish_key, subscribe_key=subscribe_key,
+ secret_key=secret_key, cipher_key=cipher_key, ssl_on=ssl_on, daemon=False)
+
+channel = 'ab'
+
+
+# Asynchronous usage
+def callback(message, channel):
+ print(str(message) + ' , ' + channel)
+
+
+
+def error(message):
+ print("ERROR : " + str(message))
+
+
+def connect(message):
+ print("CONNECTED " + str(message))
+
+
+def reconnect(message):
+ print("RECONNECTED " + str(message))
+
+
+def disconnect(message):
+ print("DISCONNECTED " + str(message))
+
+print pubnub.channel_group_add_channel(channel_group='abc', channel="b")
+
+pubnub.subscribe_group(channel_groups='abc', callback=callback, error=callback,
+ connect=connect, reconnect=reconnect, disconnect=disconnect)
+
+pubnub.subscribe(channels='d', callback=callback, error=callback,
+ connect=connect, reconnect=reconnect, disconnect=disconnect)