aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--common/PubnubBase.py1
-rw-r--r--common/PubnubCoreAsync.py20
-rw-r--r--python-tornado/Pubnub.py23
-rw-r--r--python-tornado/unassembled/Platform.py2
-rw-r--r--python-twisted/Pubnub.py21
-rw-r--r--python/Pubnub.py21
-rwxr-xr-xpython/examples/dev-console.py49
7 files changed, 96 insertions, 41 deletions
diff --git a/common/PubnubBase.py b/common/PubnubBase.py
index 585be70..522c69f 100644
--- a/common/PubnubBase.py
+++ b/common/PubnubBase.py
@@ -421,5 +421,4 @@ class PubnubBase(object):
]) for bit in request["urlcomponents"]])
if ("urlparams" in request):
url = url + '?' + "&".join([ x + "=" + str(y) for x,y in request["urlparams"].items() if y is not None])
- #print(url)
return url
diff --git a/common/PubnubCoreAsync.py b/common/PubnubCoreAsync.py
index f7b57cc..30d337c 100644
--- a/common/PubnubCoreAsync.py
+++ b/common/PubnubCoreAsync.py
@@ -84,6 +84,15 @@ class PubnubCoreAsync(PubnubBase):
channel += ch
return channel
+ def get_channel_array(self):
+ channels = self.subscriptions
+ channel = []
+ with self._channel_list_lock:
+ for ch in channels:
+ if not channels[ch]['subscribed']:
+ continue
+ channel.append(ch)
+ return channel
def each(l, func):
if func is None:
@@ -192,7 +201,7 @@ class PubnubCoreAsync(PubnubBase):
## New Channel?
- if not channel in self.subscriptions:
+ if not channel in self.subscriptions or self.subscriptions[channel]['subscribed'] is False:
with self._channel_list_lock:
self.subscriptions[channel] = {
'name' : channel,
@@ -221,7 +230,6 @@ class PubnubCoreAsync(PubnubBase):
def sub_callback(response):
## ERROR ?
- #print response
if not response or ('message' in response and response['message'] == 'Forbidden'):
_invoke_error(response['payload']['channels'], response['message'])
_connect()
@@ -230,9 +238,7 @@ class PubnubCoreAsync(PubnubBase):
_invoke_connect()
with self._tt_lock:
- #print 'A tt : ', self.timetoken , ' last tt : ' , self.last_timetoken
self.timetoken = self.last_timetoken if self.timetoken == 0 and self.last_timetoken != 0 else response[1]
- #print 'B tt : ', self.timetoken , ' last tt : ' , self.last_timetoken
if len(response) > 2:
channel_list = response[2].split(',')
response_list = response[0]
@@ -247,12 +253,13 @@ class PubnubCoreAsync(PubnubBase):
if chobj:
_invoke(chobj['callback'], self.decrypt(r))
- #with self._tt_lock:
- # self.timetoken = self.last_timetoken if self.timetoken == 0 and self.last_timetoken != 0 else response[1]
_connect()
channel_list = self.get_channel_list(self.subscriptions)
+ if len(channel_list) <= 0:
+ return
+
## CONNECT TO PUBNUB SUBSCRIBE SERVERS
try:
self.SUB_RECEIVER = self._request( { "urlcomponents" : [
@@ -269,7 +276,6 @@ class PubnubCoreAsync(PubnubBase):
self._connect = _connect
-
## BEGIN SUBSCRIPTION (LISTEN FOR MESSAGES)
_connect()
diff --git a/python-tornado/Pubnub.py b/python-tornado/Pubnub.py
index 2c6c98f..73cb62d 100644
--- a/python-tornado/Pubnub.py
+++ b/python-tornado/Pubnub.py
@@ -592,7 +592,6 @@ class PubnubBase(object):
]) for bit in request["urlcomponents"]])
if ("urlparams" in request):
url = url + '?' + "&".join([ x + "=" + str(y) for x,y in request["urlparams"].items() if y is not None])
- #print(url)
return url
@@ -682,6 +681,15 @@ class PubnubCoreAsync(PubnubBase):
channel += ch
return channel
+ def get_channel_array(self):
+ channels = self.subscriptions
+ channel = []
+ with self._channel_list_lock:
+ for ch in channels:
+ if not channels[ch]['subscribed']:
+ continue
+ channel.append(ch)
+ return channel
def each(l, func):
if func is None:
@@ -790,7 +798,7 @@ class PubnubCoreAsync(PubnubBase):
## New Channel?
- if not channel in self.subscriptions:
+ if not channel in self.subscriptions or self.subscriptions[channel]['subscribed'] is False:
with self._channel_list_lock:
self.subscriptions[channel] = {
'name' : channel,
@@ -819,7 +827,6 @@ class PubnubCoreAsync(PubnubBase):
def sub_callback(response):
## ERROR ?
- #print response
if not response or ('message' in response and response['message'] == 'Forbidden'):
_invoke_error(response['payload']['channels'], response['message'])
_connect()
@@ -828,9 +835,7 @@ class PubnubCoreAsync(PubnubBase):
_invoke_connect()
with self._tt_lock:
- #print 'A tt : ', self.timetoken , ' last tt : ' , self.last_timetoken
self.timetoken = self.last_timetoken if self.timetoken == 0 and self.last_timetoken != 0 else response[1]
- #print 'B tt : ', self.timetoken , ' last tt : ' , self.last_timetoken
if len(response) > 2:
channel_list = response[2].split(',')
response_list = response[0]
@@ -845,12 +850,13 @@ class PubnubCoreAsync(PubnubBase):
if chobj:
_invoke(chobj['callback'], self.decrypt(r))
- #with self._tt_lock:
- # self.timetoken = self.last_timetoken if self.timetoken == 0 and self.last_timetoken != 0 else response[1]
_connect()
channel_list = self.get_channel_list(self.subscriptions)
+ if len(channel_list) <= 0:
+ return
+
## CONNECT TO PUBNUB SUBSCRIBE SERVERS
try:
self.SUB_RECEIVER = self._request( { "urlcomponents" : [
@@ -867,7 +873,6 @@ class PubnubCoreAsync(PubnubBase):
self._connect = _connect
-
## BEGIN SUBSCRIPTION (LISTEN FOR MESSAGES)
_connect()
@@ -927,6 +932,7 @@ class Pubnub(PubnubCoreAsync):
subscribe_key,
secret_key = False,
cipher_key = False,
+ auth_key = False,
ssl_on = False,
origin = 'pubsub.pubnub.com'
) :
@@ -935,6 +941,7 @@ class Pubnub(PubnubCoreAsync):
subscribe_key=subscribe_key,
secret_key=secret_key,
cipher_key=cipher_key,
+ auth_key=auth_key,
ssl_on=ssl_on,
origin=origin,
)
diff --git a/python-tornado/unassembled/Platform.py b/python-tornado/unassembled/Platform.py
index 5200136..02c374d 100644
--- a/python-tornado/unassembled/Platform.py
+++ b/python-tornado/unassembled/Platform.py
@@ -26,6 +26,7 @@ class Pubnub(PubnubCoreAsync):
subscribe_key,
secret_key = False,
cipher_key = False,
+ auth_key = False,
ssl_on = False,
origin = 'pubsub.pubnub.com'
) :
@@ -34,6 +35,7 @@ class Pubnub(PubnubCoreAsync):
subscribe_key=subscribe_key,
secret_key=secret_key,
cipher_key=cipher_key,
+ auth_key=auth_key,
ssl_on=ssl_on,
origin=origin,
)
diff --git a/python-twisted/Pubnub.py b/python-twisted/Pubnub.py
index 99eda47..22d647b 100644
--- a/python-twisted/Pubnub.py
+++ b/python-twisted/Pubnub.py
@@ -592,7 +592,6 @@ class PubnubBase(object):
]) for bit in request["urlcomponents"]])
if ("urlparams" in request):
url = url + '?' + "&".join([ x + "=" + str(y) for x,y in request["urlparams"].items() if y is not None])
- #print(url)
return url
@@ -682,6 +681,15 @@ class PubnubCoreAsync(PubnubBase):
channel += ch
return channel
+ def get_channel_array(self):
+ channels = self.subscriptions
+ channel = []
+ with self._channel_list_lock:
+ for ch in channels:
+ if not channels[ch]['subscribed']:
+ continue
+ channel.append(ch)
+ return channel
def each(l, func):
if func is None:
@@ -790,7 +798,7 @@ class PubnubCoreAsync(PubnubBase):
## New Channel?
- if not channel in self.subscriptions:
+ if not channel in self.subscriptions or self.subscriptions[channel]['subscribed'] is False:
with self._channel_list_lock:
self.subscriptions[channel] = {
'name' : channel,
@@ -819,7 +827,6 @@ class PubnubCoreAsync(PubnubBase):
def sub_callback(response):
## ERROR ?
- #print response
if not response or ('message' in response and response['message'] == 'Forbidden'):
_invoke_error(response['payload']['channels'], response['message'])
_connect()
@@ -828,9 +835,7 @@ class PubnubCoreAsync(PubnubBase):
_invoke_connect()
with self._tt_lock:
- #print 'A tt : ', self.timetoken , ' last tt : ' , self.last_timetoken
self.timetoken = self.last_timetoken if self.timetoken == 0 and self.last_timetoken != 0 else response[1]
- #print 'B tt : ', self.timetoken , ' last tt : ' , self.last_timetoken
if len(response) > 2:
channel_list = response[2].split(',')
response_list = response[0]
@@ -845,12 +850,13 @@ class PubnubCoreAsync(PubnubBase):
if chobj:
_invoke(chobj['callback'], self.decrypt(r))
- #with self._tt_lock:
- # self.timetoken = self.last_timetoken if self.timetoken == 0 and self.last_timetoken != 0 else response[1]
_connect()
channel_list = self.get_channel_list(self.subscriptions)
+ if len(channel_list) <= 0:
+ return
+
## CONNECT TO PUBNUB SUBSCRIBE SERVERS
try:
self.SUB_RECEIVER = self._request( { "urlcomponents" : [
@@ -867,7 +873,6 @@ class PubnubCoreAsync(PubnubBase):
self._connect = _connect
-
## BEGIN SUBSCRIPTION (LISTEN FOR MESSAGES)
_connect()
diff --git a/python/Pubnub.py b/python/Pubnub.py
index 7af2e92..bca3ee5 100644
--- a/python/Pubnub.py
+++ b/python/Pubnub.py
@@ -592,7 +592,6 @@ class PubnubBase(object):
]) for bit in request["urlcomponents"]])
if ("urlparams" in request):
url = url + '?' + "&".join([ x + "=" + str(y) for x,y in request["urlparams"].items() if y is not None])
- #print(url)
return url
@@ -682,6 +681,15 @@ class PubnubCoreAsync(PubnubBase):
channel += ch
return channel
+ def get_channel_array(self):
+ channels = self.subscriptions
+ channel = []
+ with self._channel_list_lock:
+ for ch in channels:
+ if not channels[ch]['subscribed']:
+ continue
+ channel.append(ch)
+ return channel
def each(l, func):
if func is None:
@@ -790,7 +798,7 @@ class PubnubCoreAsync(PubnubBase):
## New Channel?
- if not channel in self.subscriptions:
+ if not channel in self.subscriptions or self.subscriptions[channel]['subscribed'] is False:
with self._channel_list_lock:
self.subscriptions[channel] = {
'name' : channel,
@@ -819,7 +827,6 @@ class PubnubCoreAsync(PubnubBase):
def sub_callback(response):
## ERROR ?
- #print response
if not response or ('message' in response and response['message'] == 'Forbidden'):
_invoke_error(response['payload']['channels'], response['message'])
_connect()
@@ -828,9 +835,7 @@ class PubnubCoreAsync(PubnubBase):
_invoke_connect()
with self._tt_lock:
- #print 'A tt : ', self.timetoken , ' last tt : ' , self.last_timetoken
self.timetoken = self.last_timetoken if self.timetoken == 0 and self.last_timetoken != 0 else response[1]
- #print 'B tt : ', self.timetoken , ' last tt : ' , self.last_timetoken
if len(response) > 2:
channel_list = response[2].split(',')
response_list = response[0]
@@ -845,12 +850,13 @@ class PubnubCoreAsync(PubnubBase):
if chobj:
_invoke(chobj['callback'], self.decrypt(r))
- #with self._tt_lock:
- # self.timetoken = self.last_timetoken if self.timetoken == 0 and self.last_timetoken != 0 else response[1]
_connect()
channel_list = self.get_channel_list(self.subscriptions)
+ if len(channel_list) <= 0:
+ return
+
## CONNECT TO PUBNUB SUBSCRIBE SERVERS
try:
self.SUB_RECEIVER = self._request( { "urlcomponents" : [
@@ -867,7 +873,6 @@ class PubnubCoreAsync(PubnubBase):
self._connect = _connect
-
## BEGIN SUBSCRIPTION (LISTEN FOR MESSAGES)
_connect()
diff --git a/python/examples/dev-console.py b/python/examples/dev-console.py
index 08e08d9..532edbe 100755
--- a/python/examples/dev-console.py
+++ b/python/examples/dev-console.py
@@ -47,11 +47,15 @@ parser.add_option("--ssl-on",
action="store_false", dest="ssl", default=False,
help="SSL")
+parser.add_option("--uuid",
+ dest="uuid", default=None,
+ help="UUID")
+
(options, args) = parser.parse_args()
print options
-pubnub = Pubnub(options.publish_key, options.subscribe_key, options.secret_key, options.cipher_key, options.auth_key, options.ssl)
+pubnub = Pubnub(options.publish_key, options.subscribe_key, options.secret_key, options.cipher_key, options.auth_key, options.ssl, options.origin, options.uuid)
class color:
@@ -67,11 +71,19 @@ class color:
END = '\033[0m'
-def print_ok(msg):
- print(color.GREEN + str(msg) + color.END)
+def print_ok(msg, channel=None):
+ chstr = " [Channel : " + channel + "] " if channel is not None else ""
+ try:
+ print(color.GREEN + chstr + str(msg) + color.END)
+ except:
+ print(msg)
-def print_error(msg):
- print(color.RED + color.BOLD + str(msg) + color.END)
+def print_error(msg, channel=None):
+ chstr = " [Channel : " + channel + "] " if channel is not None else ""
+ try:
+ print(color.RED + color.BOLD + chstr + str(msg) + color.END)
+ except:
+ print(msg)
def get_input(message, t=None):
while True:
@@ -115,15 +127,27 @@ def _publish_command_handler():
def _subscribe_command_handler():
channel = get_input("[SUBSCRIBE] Enter Channel Name ", str)
def _callback(r):
- print_ok(r)
+ print_ok(r, channel)
def _error(r):
- print_error(r)
+ print_error(r, channel)
pubnub.subscribe({
'channel' : channel,
'callback' : _callback,
'error' : _error
})
+def _unsubscribe_command_handler():
+ channel = get_input("[UNSUBSCRIBE] Enter Channel Name ", str)
+ def _callback(r):
+ print_ok(r)
+ def _error(r):
+ print_error(r)
+ pubnub.unsubscribe({
+ 'channel' : channel,
+ 'callback' : _callback,
+ 'error' : _error
+ })
+
def _grant_command_handler():
def _callback(r):
print_ok(r)
@@ -201,6 +225,7 @@ def kill_all_threads():
commands = []
commands.append({"command" : "publish", "handler" : _publish_command_handler})
commands.append({"command" : "subscribe", "handler" : _subscribe_command_handler})
+commands.append({"command" : "unsubscribe", "handler" : _unsubscribe_command_handler})
commands.append({"command" : "here_now", "handler" : _here_now_command_handler})
commands.append({"command" : "history", "handler" : _history_command_handler})
commands.append({"command" : "grant", "handler" : _grant_command_handler})
@@ -212,14 +237,20 @@ commands.append({"command" : "QUIT"})
def get_help():
help = ""
+ help += "Channels currently subscribed to : "
+ help += str(pubnub.get_channel_array())
+ help += "\n"
for i,v in enumerate(commands):
help += "Enter " + str(i) + " for " + v['command'] + "\n"
return help
while True:
- command = get_input(color.BLUE + get_help(), int)
-
+ try:
+ command = get_input(color.BLUE + get_help(), int)
+ except KeyboardInterrupt:
+ kill_all_threads()
+ break
if command == len(commands) - 1:
kill_all_threads()
break