aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDevendra2014-04-11 14:49:43 +0530
committerDevendra2014-04-11 14:49:43 +0530
commit99096b8c11b9a541f6350639e8735495cf90091c (patch)
tree446e63037f76cb98d7e3cc0f93316a8bce96e19e
parent765ee5db6fc39d77e55dcf4fe97fb96da2f46d30 (diff)
downloadpubnub-python-99096b8c11b9a541f6350639e8735495cf90091c.tar.bz2
v1 MX and async code for python, twisted, tornado
-rw-r--r--common/PubnubBase.py96
-rw-r--r--common/PubnubCore.py2
-rw-r--r--common/PubnubCoreAsync.py149
-rw-r--r--python-tornado/Pubnub.py246
-rwxr-xr-xpython-tornado/tests/subscribe-test.py148
-rw-r--r--python-tornado/unassembled/Platform.py1
-rw-r--r--python-twisted/Pubnub.py246
-rwxr-xr-xpython-twisted/tests/subscribe-test.py148
-rw-r--r--python-twisted/unassembled/Platform.py1
-rw-r--r--python/Pubnub.py286
-rwxr-xr-xpython/tests/subscribe-test.py148
-rw-r--r--python/unassembled/Platform.py41
12 files changed, 1290 insertions, 222 deletions
diff --git a/common/PubnubBase.py b/common/PubnubBase.py
index d287be3..ac41e0e 100644
--- a/common/PubnubBase.py
+++ b/common/PubnubBase.py
@@ -5,6 +5,14 @@ import time
import hashlib
import uuid
import sys
+from urllib import quote
+
+from base64 import urlsafe_b64encode
+from hashlib import sha256
+from urllib import quote
+from urllib import urlopen
+
+import hmac
class PubnubBase(object):
def __init__(
@@ -13,6 +21,7 @@ class PubnubBase(object):
subscribe_key,
secret_key = False,
cipher_key = False,
+ auth_key = None,
ssl_on = False,
origin = 'pubsub.pubnub.com',
UUID = None
@@ -42,6 +51,7 @@ class PubnubBase(object):
self.secret_key = secret_key
self.cipher_key = cipher_key
self.ssl = ssl_on
+ self.auth_key = auth_key
if self.ssl :
@@ -76,6 +86,86 @@ class PubnubBase(object):
signature = '0'
return signature
+ def _pam_sign( self, msg ):
+ """Calculate a signature by secret key and message."""
+
+ return urlsafe_b64encode(hmac.new(
+ self.secret_key.encode("utf-8"),
+ msg.encode("utf-8"),
+ sha256
+ ).digest())
+
+ def _pam_auth( self, query , apicode=0, callback=None):
+ """Issue an authenticated request."""
+
+ if 'timestamp' not in query:
+ query['timestamp'] = int(time.time())
+
+ ## Global Grant?
+ if 'auth' in query and not query['auth']:
+ del query['auth']
+
+ if 'channel' in query and not query['channel']:
+ del query['channel']
+
+ params = "&".join([
+ x + "=" + quote(
+ str(query[x]), safe=""
+ ) for x in sorted(query)
+ ])
+ sign_input = "{subkey}\n{pubkey}\n{apitype}\n{params}".format(
+ subkey=self.subscribe_key,
+ pubkey=self.publish_key,
+ apitype="audit" if (apicode) else "grant",
+ params=params
+ )
+
+ signature = self._pam_sign(sign_input)
+
+ '''
+ url = ("https://pubsub.pubnub.com/v1/auth/{apitype}/sub-key/".format(apitype="audit" if (apicode) else "grant") +
+ self.subscribe_key + "?" +
+ params + "&signature=" +
+ quote(signature, safe=""))
+ '''
+
+ return self._request({"urlcomponents": [
+ 'v1', 'auth', "audit" if (apicode) else "grant" ,
+ 'sub-key',
+ self.subscribe_key
+ ], 'urlparams' : {'auth' : self.auth_key, 'signature' : signature}},
+ self._return_wrapped_callback(callback))
+
+ def grant( self, channel, authkey=False, read=True, write=True, ttl=5, callback=None):
+ """Grant Access on a Channel."""
+
+ return self._pam_auth({
+ "channel" : channel,
+ "auth" : authkey,
+ "r" : read and 1 or 0,
+ "w" : write and 1 or 0,
+ "ttl" : ttl
+ }, callback=callback)
+
+ def revoke( self, channel, authkey=False, read=False, write=False, ttl=1, callback=None):
+ """Revoke Access on a Channel."""
+
+ return self._pam_auth({
+ "channel" : channel,
+ "auth" : authkey,
+ "r" : read and 1 or 0,
+ "w" : write and 1 or 0,
+ "ttl" : ttl
+ }, callback=callback)
+
+ def audit(self, channel=False, authkey=False, callback=None):
+ return self._pam_auth({
+ "channel" : channel,
+ "auth" : authkey
+ },1, callback=callback)
+
+
+
def encrypt(self, message):
if self.cipher_key:
message = json.dumps(self.pc.encrypt(self.cipher_key, json.dumps(message)).replace('\n',''))
@@ -147,7 +237,7 @@ class PubnubBase(object):
channel,
'0',
message
- ]'urlparams' : {'auth' : self.auth_key}}, self._return_wrapped_callback(callback))
+ ], 'urlparams' : {'auth' : self.auth_key}}, self._return_wrapped_callback(callback))
def presence( self, args ) :
"""
@@ -349,7 +439,7 @@ class PubnubBase(object):
"""
## Capture Callback
- if args and 'callback' in args :
+ if args and 'callback' in args:
callback = args['callback']
else :
callback = None
@@ -376,5 +466,5 @@ class PubnubBase(object):
ch for ch in list(bit)
]) for bit in request["urlcomponents"]])
if ("urlparams" in request):
- url = url + '?' + "&".join([ x + "=" + y for x,y in request["urlparams"].items()])
+ url = url + '?' + "&".join([ x + "=" + str(y) for x,y in request["urlparams"].items()])
return url
diff --git a/common/PubnubCore.py b/common/PubnubCore.py
index 3ed3a68..7fb67d6 100644
--- a/common/PubnubCore.py
+++ b/common/PubnubCore.py
@@ -5,6 +5,7 @@ class PubnubCore(PubnubCoreAsync):
subscribe_key,
secret_key = False,
cipher_key = False,
+ auth_key = None,
ssl_on = False,
origin = 'pubsub.pubnub.com',
uuid = None
@@ -32,6 +33,7 @@ class PubnubCore(PubnubCoreAsync):
subscribe_key=subscribe_key,
secret_key=secret_key,
cipher_key=cipher_key,
+ auth_key=auth_key,
ssl_on=ssl_on,
origin=origin,
UUID=uuid
diff --git a/common/PubnubCoreAsync.py b/common/PubnubCoreAsync.py
index 0038243..4251d47 100644
--- a/common/PubnubCoreAsync.py
+++ b/common/PubnubCoreAsync.py
@@ -5,6 +5,9 @@ except ImportError:
import Crypto.Hash.SHA256 as digestmod
sha256 = digestmod.new
import hmac
+import threading
+from threading import current_thread
+import threading
class PubnubCoreAsync(PubnubBase):
@@ -17,6 +20,7 @@ class PubnubCoreAsync(PubnubBase):
subscribe_key,
secret_key = False,
cipher_key = False,
+ auth_key = None,
ssl_on = False,
origin = 'pubsub.pubnub.com',
uuid = None
@@ -43,6 +47,7 @@ class PubnubCoreAsync(PubnubBase):
subscribe_key=subscribe_key,
secret_key=secret_key,
cipher_key=cipher_key,
+ auth_key=auth_key,
ssl_on=ssl_on,
origin=origin,
UUID=uuid
@@ -50,22 +55,36 @@ class PubnubCoreAsync(PubnubBase):
self.subscriptions = {}
self.timetoken = 0
+ self.last_timetoken = 0
self.version = '3.3.4'
self.accept_encoding = 'gzip'
self.SUB_RECEIVER = None
self._connect = None
+ self._tt_lock = threading.RLock()
def get_channel_list(self, channels):
channel = ''
first = True
- for ch in channels:
- if not channels[ch]['subscribed']:
- continue
- if not first:
- channel += ','
- else:
- first = False
- channel += ch
+ if self._channel_list_lock:
+ with self._channel_list_lock:
+ for ch in channels:
+ if not channels[ch]['subscribed']:
+ continue
+ if not first:
+ channel += ','
+ else:
+ first = False
+ channel += ch
+ else:
+ for ch in channels:
+ if not channels[ch]['subscribed']:
+ continue
+ if not first:
+ channel += ','
+ else:
+ first = False
+ channel += ch
+
return channel
def subscribe( self, args=None, sync=False ) :
@@ -100,6 +119,26 @@ class PubnubCoreAsync(PubnubBase):
})
"""
+ if args is None:
+ _invoke(error, "Arguments Missing")
+ return
+ channel = args['channel'] if 'channel' in args else None
+ callback = args['callback'] if 'callback' in args else None
+ connect = args['connect'] if 'connect' in args else None
+ disconnect = args['disconnect'] if 'disconnect' in args else None
+ reconnect = args['reconnect'] if 'reconnect' in args else None
+ error = args['error'] if 'error' in args else None
+
+ with self._tt_lock:
+ self.last_timetoken = self.timetoken if self.timetoken != 0 else self.last_timetoken
+ self.timetoken = 0
+
+ if channel is None:
+ _invoke(error, "Channel Missing")
+ return
+ if callback is None:
+ _invoke(error, "Callback Missing")
+ return
if sync is True and self.susbcribe_sync is not None:
self.susbcribe_sync(args)
@@ -113,18 +152,20 @@ class PubnubCoreAsync(PubnubBase):
func()
def _invoke_connect():
- for ch in self.subscriptions:
- chobj = self.subscriptions[ch]
- if chobj['connected'] is False:
- chobj['connected'] = True
- _invoke(chobj['connect'])
+ if self._channel_list_lock:
+ with self._channel_list_lock:
+ for ch in self.subscriptions:
+ chobj = self.subscriptions[ch]
+ if chobj['connected'] is False:
+ chobj['connected'] = True
+ _invoke(chobj['connect'],chobj['name'])
def _invoke_error(err=None):
for ch in self.subscriptions:
chobj = self.subscriptions[ch]
_invoke(chobj.error,err)
-
+ '''
if callback is None:
_invoke(error, "Callback Missing")
return
@@ -132,6 +173,7 @@ class PubnubCoreAsync(PubnubBase):
if channel is None:
_invoke(error, "Channel Missing")
return
+ '''
def _get_channel():
for ch in self.subscriptions:
@@ -142,22 +184,36 @@ class PubnubCoreAsync(PubnubBase):
## New Channel?
if not channel in self.subscriptions:
- self.subscriptions[channel] = {
- 'name' : channel,
- 'first' : False,
- 'connected' : False,
- 'subscribed' : True,
- 'callback' : callback,
- 'connect' : connect,
- 'disconnect' : disconnect,
- 'reconnect' : reconnect
- }
+ if self._channel_list_lock:
+ with self._channel_list_lock:
+ self.subscriptions[channel] = {
+ 'name' : channel,
+ 'first' : False,
+ 'connected' : False,
+ 'subscribed' : True,
+ 'callback' : callback,
+ 'connect' : connect,
+ 'disconnect' : disconnect,
+ 'reconnect' : reconnect
+ }
+ else:
+ self.subscriptions[channel] = {
+ 'name' : channel,
+ 'first' : False,
+ 'connected' : False,
+ 'subscribed' : True,
+ 'callback' : callback,
+ 'connect' : connect,
+ 'disconnect' : disconnect,
+ 'reconnect' : reconnect
+ }
## return if already connected to channel
- if self.subscriptions[channel]['connected'] :
+ if channel in self.subscriptions and 'connected' in self.subscriptions[channel] and self.subscriptions[channel]['connected'] is True:
_invoke(error, "Already Connected")
return
+
## SUBSCRIPTION RECURSION
def _connect():
@@ -165,37 +221,37 @@ class PubnubCoreAsync(PubnubBase):
self._reset_offline()
def sub_callback(response):
- print response
## ERROR ?
if not response or error in response:
_invoke_error()
_invoke_connect()
-
- self.timetoken = response[1]
-
- if len(response) > 2:
- channel_list = response[2].split(',')
- response_list = response[0]
- for ch in enumerate(channel_list):
- if ch[1] in self.subscriptions:
- chobj = self.subscriptions[ch[1]]
- _invoke(chobj['callback'],self.decrypt(response_list[ch[0]]))
- else:
- response_list = response[0]
- chobj = _get_channel()
- for r in response_list:
- if chobj:
- _invoke(chobj['callback'], self.decrypt(r))
-
-
- _connect()
+ with self._tt_lock:
+ #print 'A tt : ', self.timetoken , ' last tt : ' , self.last_timetoken
+ self.timetoken = self.last_timetoken if self.timetoken == 0 and self.last_timetoken != 0 else response[1]
+ #print 'B tt : ', self.timetoken , ' last tt : ' , self.last_timetoken
+ if len(response) > 2:
+ channel_list = response[2].split(',')
+ response_list = response[0]
+ for ch in enumerate(channel_list):
+ if ch[1] in self.subscriptions:
+ chobj = self.subscriptions[ch[1]]
+ _invoke(chobj['callback'],self.decrypt(response_list[ch[0]]))
+ else:
+ response_list = response[0]
+ chobj = _get_channel()
+ for r in response_list:
+ if chobj:
+ _invoke(chobj['callback'], self.decrypt(r))
+
+ #with self._tt_lock:
+ # self.timetoken = self.last_timetoken if self.timetoken == 0 and self.last_timetoken != 0 else response[1]
+ _connect()
channel_list = self.get_channel_list(self.subscriptions)
- print channel_list
## CONNECT TO PUBNUB SUBSCRIBE SERVERS
try:
self.SUB_RECEIVER = self._request( { "urlcomponents" : [
@@ -206,6 +262,7 @@ class PubnubCoreAsync(PubnubBase):
str(self.timetoken)
], "urlparams" : {"uuid":self.uuid} }, sub_callback, single=True )
except Exception as e:
+ print e
self.timeout( 1, _connect)
return
diff --git a/python-tornado/Pubnub.py b/python-tornado/Pubnub.py
index ee66619..61f7c3d 100644
--- a/python-tornado/Pubnub.py
+++ b/python-tornado/Pubnub.py
@@ -176,6 +176,14 @@ import time
import hashlib
import uuid
import sys
+from urllib import quote
+
+from base64 import urlsafe_b64encode
+from hashlib import sha256
+from urllib import quote
+from urllib import urlopen
+
+import hmac
class PubnubBase(object):
def __init__(
@@ -184,6 +192,7 @@ class PubnubBase(object):
subscribe_key,
secret_key = False,
cipher_key = False,
+ auth_key = None,
ssl_on = False,
origin = 'pubsub.pubnub.com',
UUID = None
@@ -213,6 +222,7 @@ class PubnubBase(object):
self.secret_key = secret_key
self.cipher_key = cipher_key
self.ssl = ssl_on
+ self.auth_key = auth_key
if self.ssl :
@@ -247,6 +257,86 @@ class PubnubBase(object):
signature = '0'
return signature
+ def _pam_sign( self, msg ):
+ """Calculate a signature by secret key and message."""
+
+ return urlsafe_b64encode(hmac.new(
+ self.secret_key.encode("utf-8"),
+ msg.encode("utf-8"),
+ sha256
+ ).digest())
+
+ def _pam_auth( self, query , apicode=0, callback=None):
+ """Issue an authenticated request."""
+
+ if 'timestamp' not in query:
+ query['timestamp'] = int(time.time())
+
+ ## Global Grant?
+ if 'auth' in query and not query['auth']:
+ del query['auth']
+
+ if 'channel' in query and not query['channel']:
+ del query['channel']
+
+ params = "&".join([
+ x + "=" + quote(
+ str(query[x]), safe=""
+ ) for x in sorted(query)
+ ])
+ sign_input = "{subkey}\n{pubkey}\n{apitype}\n{params}".format(
+ subkey=self.subscribe_key,
+ pubkey=self.publish_key,
+ apitype="audit" if (apicode) else "grant",
+ params=params
+ )
+
+ signature = self._pam_sign(sign_input)
+
+ '''
+ url = ("https://pubsub.pubnub.com/v1/auth/{apitype}/sub-key/".format(apitype="audit" if (apicode) else "grant") +
+ self.subscribe_key + "?" +
+ params + "&signature=" +
+ quote(signature, safe=""))
+ '''
+
+ return self._request({"urlcomponents": [
+ 'v1', 'auth', "audit" if (apicode) else "grant" ,
+ 'sub-key',
+ self.subscribe_key
+ ], 'urlparams' : {'auth' : self.auth_key, 'signature' : signature}},
+ self._return_wrapped_callback(callback))
+
+ def grant( self, channel, authkey=False, read=True, write=True, ttl=5, callback=None):
+ """Grant Access on a Channel."""
+
+ return self._pam_auth({
+ "channel" : channel,
+ "auth" : authkey,
+ "r" : read and 1 or 0,
+ "w" : write and 1 or 0,
+ "ttl" : ttl
+ }, callback=callback)
+
+ def revoke( self, channel, authkey=False, read=False, write=False, ttl=1, callback=None):
+ """Revoke Access on a Channel."""
+
+ return self._pam_auth({
+ "channel" : channel,
+ "auth" : authkey,
+ "r" : read and 1 or 0,
+ "w" : write and 1 or 0,
+ "ttl" : ttl
+ }, callback=callback)
+
+ def audit(self, channel=False, authkey=False, callback=None):
+ return self._pam_auth({
+ "channel" : channel,
+ "auth" : authkey
+ },1, callback=callback)
+
+
+
def encrypt(self, message):
if self.cipher_key:
message = json.dumps(self.pc.encrypt(self.cipher_key, json.dumps(message)).replace('\n',''))
@@ -318,7 +408,7 @@ class PubnubBase(object):
channel,
'0',
message
- ]'urlparams' : {'auth' : self.auth_key}}, self._return_wrapped_callback(callback))
+ ], 'urlparams' : {'auth' : self.auth_key}}, self._return_wrapped_callback(callback))
def presence( self, args ) :
"""
@@ -520,7 +610,7 @@ class PubnubBase(object):
"""
## Capture Callback
- if args and 'callback' in args :
+ if args and 'callback' in args:
callback = args['callback']
else :
callback = None
@@ -547,7 +637,7 @@ class PubnubBase(object):
ch for ch in list(bit)
]) for bit in request["urlcomponents"]])
if ("urlparams" in request):
- url = url + '?' + "&".join([ x + "=" + y for x,y in request["urlparams"].items()])
+ url = url + '?' + "&".join([ x + "=" + str(y) for x,y in request["urlparams"].items()])
return url
@@ -558,6 +648,9 @@ except ImportError:
import Crypto.Hash.SHA256 as digestmod
sha256 = digestmod.new
import hmac
+import threading
+from threading import current_thread
+import threading
class PubnubCoreAsync(PubnubBase):
@@ -570,6 +663,7 @@ class PubnubCoreAsync(PubnubBase):
subscribe_key,
secret_key = False,
cipher_key = False,
+ auth_key = None,
ssl_on = False,
origin = 'pubsub.pubnub.com',
uuid = None
@@ -596,6 +690,7 @@ class PubnubCoreAsync(PubnubBase):
subscribe_key=subscribe_key,
secret_key=secret_key,
cipher_key=cipher_key,
+ auth_key=auth_key,
ssl_on=ssl_on,
origin=origin,
UUID=uuid
@@ -603,22 +698,36 @@ class PubnubCoreAsync(PubnubBase):
self.subscriptions = {}
self.timetoken = 0
+ self.last_timetoken = 0
self.version = '3.3.4'
self.accept_encoding = 'gzip'
self.SUB_RECEIVER = None
self._connect = None
+ self._tt_lock = threading.RLock()
def get_channel_list(self, channels):
channel = ''
first = True
- for ch in channels:
- if not channels[ch]['subscribed']:
- continue
- if not first:
- channel += ','
- else:
- first = False
- channel += ch
+ if self._channel_list_lock:
+ with self._channel_list_lock:
+ for ch in channels:
+ if not channels[ch]['subscribed']:
+ continue
+ if not first:
+ channel += ','
+ else:
+ first = False
+ channel += ch
+ else:
+ for ch in channels:
+ if not channels[ch]['subscribed']:
+ continue
+ if not first:
+ channel += ','
+ else:
+ first = False
+ channel += ch
+
return channel
def subscribe( self, args=None, sync=False ) :
@@ -653,6 +762,26 @@ class PubnubCoreAsync(PubnubBase):
})
"""
+ if args is None:
+ _invoke(error, "Arguments Missing")
+ return
+ channel = args['channel'] if 'channel' in args else None
+ callback = args['callback'] if 'callback' in args else None
+ connect = args['connect'] if 'connect' in args else None
+ disconnect = args['disconnect'] if 'disconnect' in args else None
+ reconnect = args['reconnect'] if 'reconnect' in args else None
+ error = args['error'] if 'error' in args else None
+
+ with self._tt_lock:
+ self.last_timetoken = self.timetoken if self.timetoken != 0 else self.last_timetoken
+ self.timetoken = 0
+
+ if channel is None:
+ _invoke(error, "Channel Missing")
+ return
+ if callback is None:
+ _invoke(error, "Callback Missing")
+ return
if sync is True and self.susbcribe_sync is not None:
self.susbcribe_sync(args)
@@ -666,18 +795,20 @@ class PubnubCoreAsync(PubnubBase):
func()
def _invoke_connect():
- for ch in self.subscriptions:
- chobj = self.subscriptions[ch]
- if chobj['connected'] is False:
- chobj['connected'] = True
- _invoke(chobj['connect'])
+ if self._channel_list_lock:
+ with self._channel_list_lock:
+ for ch in self.subscriptions:
+ chobj = self.subscriptions[ch]
+ if chobj['connected'] is False:
+ chobj['connected'] = True
+ _invoke(chobj['connect'],chobj['name'])
def _invoke_error(err=None):
for ch in self.subscriptions:
chobj = self.subscriptions[ch]
_invoke(chobj.error,err)
-
+ '''
if callback is None:
_invoke(error, "Callback Missing")
return
@@ -685,6 +816,7 @@ class PubnubCoreAsync(PubnubBase):
if channel is None:
_invoke(error, "Channel Missing")
return
+ '''
def _get_channel():
for ch in self.subscriptions:
@@ -695,22 +827,36 @@ class PubnubCoreAsync(PubnubBase):
## New Channel?
if not channel in self.subscriptions:
- self.subscriptions[channel] = {
- 'name' : channel,
- 'first' : False,
- 'connected' : False,
- 'subscribed' : True,
- 'callback' : callback,
- 'connect' : connect,
- 'disconnect' : disconnect,
- 'reconnect' : reconnect
- }
+ if self._channel_list_lock:
+ with self._channel_list_lock:
+ self.subscriptions[channel] = {
+ 'name' : channel,
+ 'first' : False,
+ 'connected' : False,
+ 'subscribed' : True,
+ 'callback' : callback,
+ 'connect' : connect,
+ 'disconnect' : disconnect,
+ 'reconnect' : reconnect
+ }
+ else:
+ self.subscriptions[channel] = {
+ 'name' : channel,
+ 'first' : False,
+ 'connected' : False,
+ 'subscribed' : True,
+ 'callback' : callback,
+ 'connect' : connect,
+ 'disconnect' : disconnect,
+ 'reconnect' : reconnect
+ }
## return if already connected to channel
- if self.subscriptions[channel]['connected'] :
+ if channel in self.subscriptions and 'connected' in self.subscriptions[channel] and self.subscriptions[channel]['connected'] is True:
_invoke(error, "Already Connected")
return
+
## SUBSCRIPTION RECURSION
def _connect():
@@ -718,37 +864,37 @@ class PubnubCoreAsync(PubnubBase):
self._reset_offline()
def sub_callback(response):
- print response
## ERROR ?
if not response or error in response:
_invoke_error()
_invoke_connect()
-
- self.timetoken = response[1]
-
- if len(response) > 2:
- channel_list = response[2].split(',')
- response_list = response[0]
- for ch in enumerate(channel_list):
- if ch[1] in self.subscriptions:
- chobj = self.subscriptions[ch[1]]
- _invoke(chobj['callback'],self.decrypt(response_list[ch[0]]))
- else:
- response_list = response[0]
- chobj = _get_channel()
- for r in response_list:
- if chobj:
- _invoke(chobj['callback'], self.decrypt(r))
-
-
- _connect()
+ with self._tt_lock:
+ #print 'A tt : ', self.timetoken , ' last tt : ' , self.last_timetoken
+ self.timetoken = self.last_timetoken if self.timetoken == 0 and self.last_timetoken != 0 else response[1]
+ #print 'B tt : ', self.timetoken , ' last tt : ' , self.last_timetoken
+ if len(response) > 2:
+ channel_list = response[2].split(',')
+ response_list = response[0]
+ for ch in enumerate(channel_list):
+ if ch[1] in self.subscriptions:
+ chobj = self.subscriptions[ch[1]]
+ _invoke(chobj['callback'],self.decrypt(response_list[ch[0]]))
+ else:
+ response_list = response[0]
+ chobj = _get_channel()
+ for r in response_list:
+ if chobj:
+ _invoke(chobj['callback'], self.decrypt(r))
+
+ #with self._tt_lock:
+ # self.timetoken = self.last_timetoken if self.timetoken == 0 and self.last_timetoken != 0 else response[1]
+ _connect()
channel_list = self.get_channel_list(self.subscriptions)
- print channel_list
## CONNECT TO PUBNUB SUBSCRIBE SERVERS
try:
self.SUB_RECEIVER = self._request( { "urlcomponents" : [
@@ -759,6 +905,7 @@ class PubnubCoreAsync(PubnubBase):
str(self.timetoken)
], "urlparams" : {"uuid":self.uuid} }, sub_callback, single=True )
except Exception as e:
+ print e
self.timeout( 1, _connect)
return
@@ -837,6 +984,7 @@ class Pubnub(PubnubCoreAsync):
self.headers['V'] = self.version
self.http = tornado.httpclient.AsyncHTTPClient(max_clients=1000)
self.id = None
+ self._channel_list_lock = None
def _request( self, request, callback, single=False ) :
url = self.getUrl(request)
diff --git a/python-tornado/tests/subscribe-test.py b/python-tornado/tests/subscribe-test.py
new file mode 100755
index 0000000..0d4c65e
--- /dev/null
+++ b/python-tornado/tests/subscribe-test.py
@@ -0,0 +1,148 @@
+## www.pubnub.com - PubNub Real-time push service in the cloud.
+# coding=utf8
+
+## PubNub Real-time Push APIs and Notifications Framework
+## Copyright (c) 2010 Stephen Blum
+## http://www.pubnub.com/
+
+## -----------------------------------
+## PubNub 3.1 Real-time Push Cloud API
+## -----------------------------------
+
+import sys
+sys.path.append('../')
+import datetime
+from Pubnub import Pubnub
+from functools import partial
+from threading import current_thread
+import threading
+publish_key = len(sys.argv) > 1 and sys.argv[1] or 'demo'
+subscribe_key = len(sys.argv) > 2 and sys.argv[2] or 'demo'
+secret_key = len(sys.argv) > 3 and sys.argv[3] or 'demo'
+cipher_key = len(sys.argv) > 4 and sys.argv[4] or None
+ssl_on = len(sys.argv) > 5 and bool(sys.argv[5]) or False
+
+## -----------------------------------------------------------------------
+## Initiate Pubnub State
+## -----------------------------------------------------------------------
+#pubnub = Pubnub( publish_key, subscribe_key, secret_key, cipher_key, ssl_on )
+pubnub = Pubnub( publish_key, subscribe_key, secret_key, ssl_on )
+crazy = 'hello_world'
+
+current = -1
+
+errors = 0
+received = 0
+
+## -----------------------------------------------------------------------
+## Subscribe Example
+## -----------------------------------------------------------------------
+def message_received(message):
+ print message
+
+def check_received(message):
+ global current
+ global errors
+ global received
+ print message
+ print current
+ if message <= current:
+ print 'ERROR'
+ #sys.exit()
+ errors += 1
+ else:
+ received += 1
+ print 'active thread count : ', threading.activeCount()
+ print 'errors = ' , errors
+ print current_thread().getName(), ' , ', 'received = ', received
+
+ if received != message:
+ print '********** MISSED **************** ', message - received
+ current = message
+
+
+def connected_test(ch) :
+ print 'Connected' , ch
+
+def connected(ch) :
+ pass
+
+
+'''
+pubnub.subscribe({
+ 'channel' : 'abcd1',
+ 'connect' : connected,
+ 'callback' : message_received
+})
+'''
+def cb1():
+ pubnub.subscribe({
+ 'channel' : 'efgh1',
+ 'connect' : connected,
+ 'callback' : message_received
+ })
+
+def cb2():
+ pubnub.subscribe({
+ 'channel' : 'dsm-test',
+ 'connect' : connected_test,
+ 'callback' : check_received
+ })
+
+def cb3():
+ pubnub.unsubscribe({'channel' : 'efgh1'})
+
+def cb4():
+ pubnub.unsubscribe({'channel' : 'abcd1'})
+
+def subscribe(channel):
+ pubnub.subscribe({
+ 'channel' : channel,
+ 'connect' : connected,
+ 'callback' : message_received
+ })
+
+
+print threading.activeCount()
+
+
+pubnub.timeout(15,cb1)
+
+pubnub.timeout(30,cb2)
+
+
+pubnub.timeout(45,cb3)
+
+pubnub.timeout(60,cb4)
+
+#'''
+for x in range(1,1000):
+ #print x
+ def y(t):
+ subscribe('channel-' + str(t))
+
+ def z(t):
+ pubnub.unsubscribe({'channel' : 'channel-' + str(t)})
+
+ pubnub.timeout(x + 5, partial(y,x))
+ pubnub.timeout(x + 25, partial(z, x))
+ x += 10
+#'''
+
+'''
+for x in range(1,1000):
+ def cb(r): print r , ' : ', threading.activeCount()
+ def y(t):
+ pubnub.publish({
+ 'message' : t,
+ 'callback' : cb,
+ 'channel' : 'dsm-test'
+ })
+
+
+ pubnub.timeout(x + 1, partial(y,x))
+ x += 1
+'''
+
+
+pubnub.start()
diff --git a/python-tornado/unassembled/Platform.py b/python-tornado/unassembled/Platform.py
index f98befb..501993e 100644
--- a/python-tornado/unassembled/Platform.py
+++ b/python-tornado/unassembled/Platform.py
@@ -43,6 +43,7 @@ class Pubnub(PubnubCoreAsync):
self.headers['V'] = self.version
self.http = tornado.httpclient.AsyncHTTPClient(max_clients=1000)
self.id = None
+ self._channel_list_lock = None
def _request( self, request, callback, single=False ) :
url = self.getUrl(request)
diff --git a/python-twisted/Pubnub.py b/python-twisted/Pubnub.py
index 3bc2d35..7171efe 100644
--- a/python-twisted/Pubnub.py
+++ b/python-twisted/Pubnub.py
@@ -176,6 +176,14 @@ import time
import hashlib
import uuid
import sys
+from urllib import quote
+
+from base64 import urlsafe_b64encode
+from hashlib import sha256
+from urllib import quote
+from urllib import urlopen
+
+import hmac
class PubnubBase(object):
def __init__(
@@ -184,6 +192,7 @@ class PubnubBase(object):
subscribe_key,
secret_key = False,
cipher_key = False,
+ auth_key = None,
ssl_on = False,
origin = 'pubsub.pubnub.com',
UUID = None
@@ -213,6 +222,7 @@ class PubnubBase(object):
self.secret_key = secret_key
self.cipher_key = cipher_key
self.ssl = ssl_on
+ self.auth_key = auth_key
if self.ssl :
@@ -247,6 +257,86 @@ class PubnubBase(object):
signature = '0'
return signature
+ def _pam_sign( self, msg ):
+ """Calculate a signature by secret key and message."""
+
+ return urlsafe_b64encode(hmac.new(
+ self.secret_key.encode("utf-8"),
+ msg.encode("utf-8"),
+ sha256
+ ).digest())
+
+ def _pam_auth( self, query , apicode=0, callback=None):
+ """Issue an authenticated request."""
+
+ if 'timestamp' not in query:
+ query['timestamp'] = int(time.time())
+
+ ## Global Grant?
+ if 'auth' in query and not query['auth']:
+ del query['auth']
+
+ if 'channel' in query and not query['channel']:
+ del query['channel']
+
+ params = "&".join([
+ x + "=" + quote(
+ str(query[x]), safe=""
+ ) for x in sorted(query)
+ ])
+ sign_input = "{subkey}\n{pubkey}\n{apitype}\n{params}".format(
+ subkey=self.subscribe_key,
+ pubkey=self.publish_key,
+ apitype="audit" if (apicode) else "grant",
+ params=params
+ )
+
+ signature = self._pam_sign(sign_input)
+
+ '''
+ url = ("https://pubsub.pubnub.com/v1/auth/{apitype}/sub-key/".format(apitype="audit" if (apicode) else "grant") +
+ self.subscribe_key + "?" +
+ params + "&signature=" +
+ quote(signature, safe=""))
+ '''
+
+ return self._request({"urlcomponents": [
+ 'v1', 'auth', "audit" if (apicode) else "grant" ,
+ 'sub-key',
+ self.subscribe_key
+ ], 'urlparams' : {'auth' : self.auth_key, 'signature' : signature}},
+ self._return_wrapped_callback(callback))
+
+ def grant( self, channel, authkey=False, read=True, write=True, ttl=5, callback=None):
+ """Grant Access on a Channel."""
+
+ return self._pam_auth({
+ "channel" : channel,
+ "auth" : authkey,
+ "r" : read and 1 or 0,
+ "w" : write and 1 or 0,
+ "ttl" : ttl
+ }, callback=callback)
+
+ def revoke( self, channel, authkey=False, read=False, write=False, ttl=1, callback=None):
+ """Revoke Access on a Channel."""
+
+ return self._pam_auth({
+ "channel" : channel,
+ "auth" : authkey,
+ "r" : read and 1 or 0,
+ "w" : write and 1 or 0,
+ "ttl" : ttl
+ }, callback=callback)
+
+ def audit(self, channel=False, authkey=False, callback=None):
+ return self._pam_auth({
+ "channel" : channel,
+ "auth" : authkey
+ },1, callback=callback)
+
+
+
def encrypt(self, message):
if self.cipher_key:
message = json.dumps(self.pc.encrypt(self.cipher_key, json.dumps(message)).replace('\n',''))
@@ -318,7 +408,7 @@ class PubnubBase(object):
channel,
'0',
message
- ]'urlparams' : {'auth' : self.auth_key}}, self._return_wrapped_callback(callback))
+ ], 'urlparams' : {'auth' : self.auth_key}}, self._return_wrapped_callback(callback))
def presence( self, args ) :
"""
@@ -520,7 +610,7 @@ class PubnubBase(object):
"""
## Capture Callback
- if args and 'callback' in args :
+ if args and 'callback' in args:
callback = args['callback']
else :
callback = None
@@ -547,7 +637,7 @@ class PubnubBase(object):
ch for ch in list(bit)
]) for bit in request["urlcomponents"]])
if ("urlparams" in request):
- url = url + '?' + "&".join([ x + "=" + y for x,y in request["urlparams"].items()])
+ url = url + '?' + "&".join([ x + "=" + str(y) for x,y in request["urlparams"].items()])
return url
@@ -558,6 +648,9 @@ except ImportError:
import Crypto.Hash.SHA256 as digestmod
sha256 = digestmod.new
import hmac
+import threading
+from threading import current_thread
+import threading
class PubnubCoreAsync(PubnubBase):
@@ -570,6 +663,7 @@ class PubnubCoreAsync(PubnubBase):
subscribe_key,
secret_key = False,
cipher_key = False,
+ auth_key = None,
ssl_on = False,
origin = 'pubsub.pubnub.com',
uuid = None
@@ -596,6 +690,7 @@ class PubnubCoreAsync(PubnubBase):
subscribe_key=subscribe_key,
secret_key=secret_key,
cipher_key=cipher_key,
+ auth_key=auth_key,
ssl_on=ssl_on,
origin=origin,
UUID=uuid
@@ -603,22 +698,36 @@ class PubnubCoreAsync(PubnubBase):
self.subscriptions = {}
self.timetoken = 0
+ self.last_timetoken = 0
self.version = '3.3.4'
self.accept_encoding = 'gzip'
self.SUB_RECEIVER = None
self._connect = None
+ self._tt_lock = threading.RLock()
def get_channel_list(self, channels):
channel = ''
first = True
- for ch in channels:
- if not channels[ch]['subscribed']:
- continue
- if not first:
- channel += ','
- else:
- first = False
- channel += ch
+ if self._channel_list_lock:
+ with self._channel_list_lock:
+ for ch in channels:
+ if not channels[ch]['subscribed']:
+ continue
+ if not first:
+ channel += ','
+ else:
+ first = False
+ channel += ch
+ else:
+ for ch in channels:
+ if not channels[ch]['subscribed']:
+ continue
+ if not first:
+ channel += ','
+ else:
+ first = False
+ channel += ch
+
return channel
def subscribe( self, args=None, sync=False ) :
@@ -653,6 +762,26 @@ class PubnubCoreAsync(PubnubBase):
})
"""
+ if args is None:
+ _invoke(error, "Arguments Missing")
+ return
+ channel = args['channel'] if 'channel' in args else None
+ callback = args['callback'] if 'callback' in args else None
+ connect = args['connect'] if 'connect' in args else None
+ disconnect = args['disconnect'] if 'disconnect' in args else None
+ reconnect = args['reconnect'] if 'reconnect' in args else None
+ error = args['error'] if 'error' in args else None
+
+ with self._tt_lock:
+ self.last_timetoken = self.timetoken if self.timetoken != 0 else self.last_timetoken
+ self.timetoken = 0
+
+ if channel is None:
+ _invoke(error, "Channel Missing")
+ return
+ if callback is None:
+ _invoke(error, "Callback Missing")
+ return
if sync is True and self.susbcribe_sync is not None:
self.susbcribe_sync(args)
@@ -666,18 +795,20 @@ class PubnubCoreAsync(PubnubBase):
func()
def _invoke_connect():
- for ch in self.subscriptions:
- chobj = self.subscriptions[ch]
- if chobj['connected'] is False:
- chobj['connected'] = True
- _invoke(chobj['connect'])
+ if self._channel_list_lock:
+ with self._channel_list_lock:
+ for ch in self.subscriptions:
+ chobj = self.subscriptions[ch]
+ if chobj['connected'] is False:
+ chobj['connected'] = True
+ _invoke(chobj['connect'],chobj['name'])
def _invoke_error(err=None):
for ch in self.subscriptions:
chobj = self.subscriptions[ch]
_invoke(chobj.error,err)
-
+ '''
if callback is None:
_invoke(error, "Callback Missing")
return
@@ -685,6 +816,7 @@ class PubnubCoreAsync(PubnubBase):
if channel is None:
_invoke(error, "Channel Missing")
return
+ '''
def _get_channel():
for ch in self.subscriptions:
@@ -695,22 +827,36 @@ class PubnubCoreAsync(PubnubBase):
## New Channel?
if not channel in self.subscriptions:
- self.subscriptions[channel] = {
- 'name' : channel,
- 'first' : False,
- 'connected' : False,
- 'subscribed' : True,
- 'callback' : callback,
- 'connect' : connect,
- 'disconnect' : disconnect,
- 'reconnect' : reconnect
- }
+ if self._channel_list_lock:
+ with self._channel_list_lock:
+ self.subscriptions[channel] = {
+ 'name' : channel,
+ 'first' : False,
+ 'connected' : False,
+ 'subscribed' : True,
+ 'callback' : callback,
+ 'connect' : connect,
+ 'disconnect' : disconnect,
+ 'reconnect' : reconnect
+ }
+ else:
+ self.subscriptions[channel] = {
+ 'name' : channel,
+ 'first' : False,
+ 'connected' : False,
+ 'subscribed' : True,
+ 'callback' : callback,
+ 'connect' : connect,
+ 'disconnect' : disconnect,
+ 'reconnect' : reconnect
+ }
## return if already connected to channel
- if self.subscriptions[channel]['connected'] :
+ if channel in self.subscriptions and 'connected' in self.subscriptions[channel] and self.subscriptions[channel]['connected'] is True:
_invoke(error, "Already Connected")
return
+
## SUBSCRIPTION RECURSION
def _connect():
@@ -718,37 +864,37 @@ class PubnubCoreAsync(PubnubBase):
self._reset_offline()
def sub_callback(response):
- print response
## ERROR ?
if not response or error in response:
_invoke_error()
_invoke_connect()
-
- self.timetoken = response[1]
-
- if len(response) > 2:
- channel_list = response[2].split(',')
- response_list = response[0]
- for ch in enumerate(channel_list):
- if ch[1] in self.subscriptions:
- chobj = self.subscriptions[ch[1]]
- _invoke(chobj['callback'],self.decrypt(response_list[ch[0]]))
- else:
- response_list = response[0]
- chobj = _get_channel()
- for r in response_list:
- if chobj:
- _invoke(chobj['callback'], self.decrypt(r))
-
-
- _connect()
+ with self._tt_lock:
+ #print 'A tt : ', self.timetoken , ' last tt : ' , self.last_timetoken
+ self.timetoken = self.last_timetoken if self.timetoken == 0 and self.last_timetoken != 0 else response[1]
+ #print 'B tt : ', self.timetoken , ' last tt : ' , self.last_timetoken
+ if len(response) > 2:
+ channel_list = response[2].split(',')
+ response_list = response[0]
+ for ch in enumerate(channel_list):
+ if ch[1] in self.subscriptions:
+ chobj = self.subscriptions[ch[1]]
+ _invoke(chobj['callback'],self.decrypt(response_list[ch[0]]))
+ else:
+ response_list = response[0]
+ chobj = _get_channel()
+ for r in response_list:
+ if chobj:
+ _invoke(chobj['callback'], self.decrypt(r))
+
+ #with self._tt_lock:
+ # self.timetoken = self.last_timetoken if self.timetoken == 0 and self.last_timetoken != 0 else response[1]
+ _connect()
channel_list = self.get_channel_list(self.subscriptions)
- print channel_list
## CONNECT TO PUBNUB SUBSCRIBE SERVERS
try:
self.SUB_RECEIVER = self._request( { "urlcomponents" : [
@@ -759,6 +905,7 @@ class PubnubCoreAsync(PubnubBase):
str(self.timetoken)
], "urlparams" : {"uuid":self.uuid} }, sub_callback, single=True )
except Exception as e:
+ print e
self.timeout( 1, _connect)
return
@@ -838,6 +985,7 @@ class Pubnub(PubnubCoreAsync):
self.headers['User-Agent'] = ['Python-Twisted']
#self.headers['Accept-Encoding'] = [self.accept_encoding]
self.headers['V'] = [self.version]
+ self._channel_list_lock = None
def _request( self, request, callback, single=False ) :
global pnconn_pool
diff --git a/python-twisted/tests/subscribe-test.py b/python-twisted/tests/subscribe-test.py
new file mode 100755
index 0000000..0d4c65e
--- /dev/null
+++ b/python-twisted/tests/subscribe-test.py
@@ -0,0 +1,148 @@
+## www.pubnub.com - PubNub Real-time push service in the cloud.
+# coding=utf8
+
+## PubNub Real-time Push APIs and Notifications Framework
+## Copyright (c) 2010 Stephen Blum
+## http://www.pubnub.com/
+
+## -----------------------------------
+## PubNub 3.1 Real-time Push Cloud API
+## -----------------------------------
+
+import sys
+sys.path.append('../')
+import datetime
+from Pubnub import Pubnub
+from functools import partial
+from threading import current_thread
+import threading
+publish_key = len(sys.argv) > 1 and sys.argv[1] or 'demo'
+subscribe_key = len(sys.argv) > 2 and sys.argv[2] or 'demo'
+secret_key = len(sys.argv) > 3 and sys.argv[3] or 'demo'
+cipher_key = len(sys.argv) > 4 and sys.argv[4] or None
+ssl_on = len(sys.argv) > 5 and bool(sys.argv[5]) or False
+
+## -----------------------------------------------------------------------
+## Initiate Pubnub State
+## -----------------------------------------------------------------------
+#pubnub = Pubnub( publish_key, subscribe_key, secret_key, cipher_key, ssl_on )
+pubnub = Pubnub( publish_key, subscribe_key, secret_key, ssl_on )
+crazy = 'hello_world'
+
+current = -1
+
+errors = 0
+received = 0
+
+## -----------------------------------------------------------------------
+## Subscribe Example
+## -----------------------------------------------------------------------
+def message_received(message):
+ print message
+
+def check_received(message):
+ global current
+ global errors
+ global received
+ print message
+ print current
+ if message <= current:
+ print 'ERROR'
+ #sys.exit()
+ errors += 1
+ else:
+ received += 1
+ print 'active thread count : ', threading.activeCount()
+ print 'errors = ' , errors
+ print current_thread().getName(), ' , ', 'received = ', received
+
+ if received != message:
+ print '********** MISSED **************** ', message - received
+ current = message
+
+
+def connected_test(ch) :
+ print 'Connected' , ch
+
+def connected(ch) :
+ pass
+
+
+'''
+pubnub.subscribe({
+ 'channel' : 'abcd1',
+ 'connect' : connected,
+ 'callback' : message_received
+})
+'''
+def cb1():
+ pubnub.subscribe({
+ 'channel' : 'efgh1',
+ 'connect' : connected,
+ 'callback' : message_received
+ })
+
+def cb2():
+ pubnub.subscribe({
+ 'channel' : 'dsm-test',
+ 'connect' : connected_test,
+ 'callback' : check_received
+ })
+
+def cb3():
+ pubnub.unsubscribe({'channel' : 'efgh1'})
+
+def cb4():
+ pubnub.unsubscribe({'channel' : 'abcd1'})
+
+def subscribe(channel):
+ pubnub.subscribe({
+ 'channel' : channel,
+ 'connect' : connected,
+ 'callback' : message_received
+ })
+
+
+print threading.activeCount()
+
+
+pubnub.timeout(15,cb1)
+
+pubnub.timeout(30,cb2)
+
+
+pubnub.timeout(45,cb3)
+
+pubnub.timeout(60,cb4)
+
+#'''
+for x in range(1,1000):
+ #print x
+ def y(t):
+ subscribe('channel-' + str(t))
+
+ def z(t):
+ pubnub.unsubscribe({'channel' : 'channel-' + str(t)})
+
+ pubnub.timeout(x + 5, partial(y,x))
+ pubnub.timeout(x + 25, partial(z, x))
+ x += 10
+#'''
+
+'''
+for x in range(1,1000):
+ def cb(r): print r , ' : ', threading.activeCount()
+ def y(t):
+ pubnub.publish({
+ 'message' : t,
+ 'callback' : cb,
+ 'channel' : 'dsm-test'
+ })
+
+
+ pubnub.timeout(x + 1, partial(y,x))
+ x += 1
+'''
+
+
+pubnub.start()
diff --git a/python-twisted/unassembled/Platform.py b/python-twisted/unassembled/Platform.py
index 3b84b30..5268446 100644
--- a/python-twisted/unassembled/Platform.py
+++ b/python-twisted/unassembled/Platform.py
@@ -44,6 +44,7 @@ class Pubnub(PubnubCoreAsync):
self.headers['User-Agent'] = ['Python-Twisted']
#self.headers['Accept-Encoding'] = [self.accept_encoding]
self.headers['V'] = [self.version]
+ self._channel_list_lock = None
def _request( self, request, callback, single=False ) :
global pnconn_pool
diff --git a/python/Pubnub.py b/python/Pubnub.py
index a449c2d..f3c518c 100644
--- a/python/Pubnub.py
+++ b/python/Pubnub.py
@@ -176,6 +176,14 @@ import time
import hashlib
import uuid
import sys
+from urllib import quote
+
+from base64 import urlsafe_b64encode
+from hashlib import sha256
+from urllib import quote
+from urllib import urlopen
+
+import hmac
class PubnubBase(object):
def __init__(
@@ -184,6 +192,7 @@ class PubnubBase(object):
subscribe_key,
secret_key = False,
cipher_key = False,
+ auth_key = None,
ssl_on = False,
origin = 'pubsub.pubnub.com',
UUID = None
@@ -213,6 +222,7 @@ class PubnubBase(object):
self.secret_key = secret_key
self.cipher_key = cipher_key
self.ssl = ssl_on
+ self.auth_key = auth_key
if self.ssl :
@@ -247,6 +257,86 @@ class PubnubBase(object):
signature = '0'
return signature
+ def _pam_sign( self, msg ):
+ """Calculate a signature by secret key and message."""
+
+ return urlsafe_b64encode(hmac.new(
+ self.secret_key.encode("utf-8"),
+ msg.encode("utf-8"),
+ sha256
+ ).digest())
+
+ def _pam_auth( self, query , apicode=0, callback=None):
+ """Issue an authenticated request."""
+
+ if 'timestamp' not in query:
+ query['timestamp'] = int(time.time())
+
+ ## Global Grant?
+ if 'auth' in query and not query['auth']:
+ del query['auth']
+
+ if 'channel' in query and not query['channel']:
+ del query['channel']
+
+ params = "&".join([
+ x + "=" + quote(
+ str(query[x]), safe=""
+ ) for x in sorted(query)
+ ])
+ sign_input = "{subkey}\n{pubkey}\n{apitype}\n{params}".format(
+ subkey=self.subscribe_key,
+ pubkey=self.publish_key,
+ apitype="audit" if (apicode) else "grant",
+ params=params
+ )
+
+ signature = self._pam_sign(sign_input)
+
+ '''
+ url = ("https://pubsub.pubnub.com/v1/auth/{apitype}/sub-key/".format(apitype="audit" if (apicode) else "grant") +
+ self.subscribe_key + "?" +
+ params + "&signature=" +
+ quote(signature, safe=""))
+ '''
+
+ return self._request({"urlcomponents": [
+ 'v1', 'auth', "audit" if (apicode) else "grant" ,
+ 'sub-key',
+ self.subscribe_key
+ ], 'urlparams' : {'auth' : self.auth_key, 'signature' : signature}},
+ self._return_wrapped_callback(callback))
+
+ def grant( self, channel, authkey=False, read=True, write=True, ttl=5, callback=None):
+ """Grant Access on a Channel."""
+
+ return self._pam_auth({
+ "channel" : channel,
+ "auth" : authkey,
+ "r" : read and 1 or 0,
+ "w" : write and 1 or 0,
+ "ttl" : ttl
+ }, callback=callback)
+
+ def revoke( self, channel, authkey=False, read=False, write=False, ttl=1, callback=None):
+ """Revoke Access on a Channel."""
+
+ return self._pam_auth({
+ "channel" : channel,
+ "auth" : authkey,
+ "r" : read and 1 or 0,
+ "w" : write and 1 or 0,
+ "ttl" : ttl
+ }, callback=callback)
+
+ def audit(self, channel=False, authkey=False, callback=None):
+ return self._pam_auth({
+ "channel" : channel,
+ "auth" : authkey
+ },1, callback=callback)
+
+
+
def encrypt(self, message):
if self.cipher_key:
message = json.dumps(self.pc.encrypt(self.cipher_key, json.dumps(message)).replace('\n',''))
@@ -318,7 +408,7 @@ class PubnubBase(object):
channel,
'0',
message
- ]'urlparams' : {'auth' : self.auth_key}}, self._return_wrapped_callback(callback))
+ ], 'urlparams' : {'auth' : self.auth_key}}, self._return_wrapped_callback(callback))
def presence( self, args ) :
"""
@@ -520,7 +610,7 @@ class PubnubBase(object):
"""
## Capture Callback
- if args and 'callback' in args :
+ if args and 'callback' in args:
callback = args['callback']
else :
callback = None
@@ -547,7 +637,7 @@ class PubnubBase(object):
ch for ch in list(bit)
]) for bit in request["urlcomponents"]])
if ("urlparams" in request):
- url = url + '?' + "&".join([ x + "=" + y for x,y in request["urlparams"].items()])
+ url = url + '?' + "&".join([ x + "=" + str(y) for x,y in request["urlparams"].items()])
return url
@@ -558,6 +648,9 @@ except ImportError:
import Crypto.Hash.SHA256 as digestmod
sha256 = digestmod.new
import hmac
+import threading
+from threading import current_thread
+import threading
class PubnubCoreAsync(PubnubBase):
@@ -570,6 +663,7 @@ class PubnubCoreAsync(PubnubBase):
subscribe_key,
secret_key = False,
cipher_key = False,
+ auth_key = None,
ssl_on = False,
origin = 'pubsub.pubnub.com',
uuid = None
@@ -596,6 +690,7 @@ class PubnubCoreAsync(PubnubBase):
subscribe_key=subscribe_key,
secret_key=secret_key,
cipher_key=cipher_key,
+ auth_key=auth_key,
ssl_on=ssl_on,
origin=origin,
UUID=uuid
@@ -603,22 +698,36 @@ class PubnubCoreAsync(PubnubBase):
self.subscriptions = {}
self.timetoken = 0
+ self.last_timetoken = 0
self.version = '3.3.4'
self.accept_encoding = 'gzip'
self.SUB_RECEIVER = None
self._connect = None
+ self._tt_lock = threading.RLock()
def get_channel_list(self, channels):
channel = ''
first = True
- for ch in channels:
- if not channels[ch]['subscribed']:
- continue
- if not first:
- channel += ','
- else:
- first = False
- channel += ch
+ if self._channel_list_lock:
+ with self._channel_list_lock:
+ for ch in channels:
+ if not channels[ch]['subscribed']:
+ continue
+ if not first:
+ channel += ','
+ else:
+ first = False
+ channel += ch
+ else:
+ for ch in channels:
+ if not channels[ch]['subscribed']:
+ continue
+ if not first:
+ channel += ','
+ else:
+ first = False
+ channel += ch
+
return channel
def subscribe( self, args=None, sync=False ) :
@@ -653,6 +762,26 @@ class PubnubCoreAsync(PubnubBase):
})
"""
+ if args is None:
+ _invoke(error, "Arguments Missing")
+ return
+ channel = args['channel'] if 'channel' in args else None
+ callback = args['callback'] if 'callback' in args else None
+ connect = args['connect'] if 'connect' in args else None
+ disconnect = args['disconnect'] if 'disconnect' in args else None
+ reconnect = args['reconnect'] if 'reconnect' in args else None
+ error = args['error'] if 'error' in args else None
+
+ with self._tt_lock:
+ self.last_timetoken = self.timetoken if self.timetoken != 0 else self.last_timetoken
+ self.timetoken = 0
+
+ if channel is None:
+ _invoke(error, "Channel Missing")
+ return
+ if callback is None:
+ _invoke(error, "Callback Missing")
+ return
if sync is True and self.susbcribe_sync is not None:
self.susbcribe_sync(args)
@@ -666,18 +795,20 @@ class PubnubCoreAsync(PubnubBase):
func()
def _invoke_connect():
- for ch in self.subscriptions:
- chobj = self.subscriptions[ch]
- if chobj['connected'] is False:
- chobj['connected'] = True
- _invoke(chobj['connect'])
+ if self._channel_list_lock:
+ with self._channel_list_lock:
+ for ch in self.subscriptions:
+ chobj = self.subscriptions[ch]
+ if chobj['connected'] is False:
+ chobj['connected'] = True
+ _invoke(chobj['connect'],chobj['name'])
def _invoke_error(err=None):
for ch in self.subscriptions:
chobj = self.subscriptions[ch]
_invoke(chobj.error,err)
-
+ '''
if callback is None:
_invoke(error, "Callback Missing")
return
@@ -685,6 +816,7 @@ class PubnubCoreAsync(PubnubBase):
if channel is None:
_invoke(error, "Channel Missing")
return
+ '''
def _get_channel():
for ch in self.subscriptions:
@@ -695,22 +827,36 @@ class PubnubCoreAsync(PubnubBase):
## New Channel?
if not channel in self.subscriptions:
- self.subscriptions[channel] = {
- 'name' : channel,
- 'first' : False,
- 'connected' : False,
- 'subscribed' : True,
- 'callback' : callback,
- 'connect' : connect,
- 'disconnect' : disconnect,
- 'reconnect' : reconnect
- }
+ if self._channel_list_lock:
+ with self._channel_list_lock:
+ self.subscriptions[channel] = {
+ 'name' : channel,
+ 'first' : False,
+ 'connected' : False,
+ 'subscribed' : True,
+ 'callback' : callback,
+ 'connect' : connect,
+ 'disconnect' : disconnect,
+ 'reconnect' : reconnect
+ }
+ else:
+ self.subscriptions[channel] = {
+ 'name' : channel,
+ 'first' : False,
+ 'connected' : False,
+ 'subscribed' : True,
+ 'callback' : callback,
+ 'connect' : connect,
+ 'disconnect' : disconnect,
+ 'reconnect' : reconnect
+ }
## return if already connected to channel
- if self.subscriptions[channel]['connected'] :
+ if channel in self.subscriptions and 'connected' in self.subscriptions[channel] and self.subscriptions[channel]['connected'] is True:
_invoke(error, "Already Connected")
return
+
## SUBSCRIPTION RECURSION
def _connect():
@@ -718,37 +864,37 @@ class PubnubCoreAsync(PubnubBase):
self._reset_offline()
def sub_callback(response):
- print response
## ERROR ?
if not response or error in response:
_invoke_error()
_invoke_connect()
-
- self.timetoken = response[1]
-
- if len(response) > 2:
- channel_list = response[2].split(',')
- response_list = response[0]
- for ch in enumerate(channel_list):
- if ch[1] in self.subscriptions:
- chobj = self.subscriptions[ch[1]]
- _invoke(chobj['callback'],self.decrypt(response_list[ch[0]]))
- else:
- response_list = response[0]
- chobj = _get_channel()
- for r in response_list:
- if chobj:
- _invoke(chobj['callback'], self.decrypt(r))
-
-
- _connect()
+ with self._tt_lock:
+ #print 'A tt : ', self.timetoken , ' last tt : ' , self.last_timetoken
+ self.timetoken = self.last_timetoken if self.timetoken == 0 and self.last_timetoken != 0 else response[1]
+ #print 'B tt : ', self.timetoken , ' last tt : ' , self.last_timetoken
+ if len(response) > 2:
+ channel_list = response[2].split(',')
+ response_list = response[0]
+ for ch in enumerate(channel_list):
+ if ch[1] in self.subscriptions:
+ chobj = self.subscriptions[ch[1]]
+ _invoke(chobj['callback'],self.decrypt(response_list[ch[0]]))
+ else:
+ response_list = response[0]
+ chobj = _get_channel()
+ for r in response_list:
+ if chobj:
+ _invoke(chobj['callback'], self.decrypt(r))
+
+ #with self._tt_lock:
+ # self.timetoken = self.last_timetoken if self.timetoken == 0 and self.last_timetoken != 0 else response[1]
+ _connect()
channel_list = self.get_channel_list(self.subscriptions)
- print channel_list
## CONNECT TO PUBNUB SUBSCRIBE SERVERS
try:
self.SUB_RECEIVER = self._request( { "urlcomponents" : [
@@ -759,6 +905,7 @@ class PubnubCoreAsync(PubnubBase):
str(self.timetoken)
], "urlparams" : {"uuid":self.uuid} }, sub_callback, single=True )
except Exception as e:
+ print e
self.timeout( 1, _connect)
return
@@ -800,8 +947,11 @@ except:
import threading
import json
import time
+import threading
+from threading import current_thread
-current_req_id = -1
+latest_sub_callback_lock = threading.RLock()
+latest_sub_callback = {'id' : None, 'callback' : None}
class HTTPClient:
def __init__(self, url, callback, id=None):
@@ -815,23 +965,32 @@ class HTTPClient:
self.callback = None
def run(self):
- global current_req_id
data = urllib2.urlopen(self.url, timeout=310).read()
if self.stop is True:
return
- if self.id is not None and current_req_id != self.id:
- return
- if self.callback is not None:
+ if self.callback is None:
+ global latest_sub_callback
+ global latest_sub_callback_lock
+ with latest_sub_callback_lock:
+ if latest_sub_callback['id'] != self.id:
+ return
+ else:
+ print(data)
+ if latest_sub_callback['callback'] is not None:
+ latest_sub_callback['id'] = 0
+ latest_sub_callback['callback'](json.loads(data))
+ else:
self.callback(json.loads(data))
-class Pubnub(PubnubCore):
+class Pubnub(PubnubCoreAsync):
def __init__(
self,
publish_key,
subscribe_key,
secret_key = False,
cipher_key = False,
+ auth_key = None,
ssl_on = False,
origin = 'pubsub.pubnub.com',
pres_uuid = None
@@ -841,6 +1000,7 @@ class Pubnub(PubnubCore):
subscribe_key = subscribe_key,
secret_key = secret_key,
cipher_key = cipher_key,
+ auth_key = auth_key,
ssl_on = ssl_on,
origin = origin,
uuid = pres_uuid
@@ -849,6 +1009,7 @@ class Pubnub(PubnubCore):
self._request = self._request2
else:
self._request = self._request3
+ self._channel_list_lock = threading.RLock()
def timeout(self, interval, func):
def cb():
@@ -858,13 +1019,14 @@ class Pubnub(PubnubCore):
thread.start()
def _request2_async( self, request, callback, single=False ) :
- global current_req_id
## Build URL
url = self.getUrl(request)
if single is True:
id = time.time()
- client = HTTPClient(url, callback, id)
- current_req_id = id
+ client = HTTPClient(url, None, id)
+ with latest_sub_callback_lock:
+ latest_sub_callback['id'] = id
+ latest_sub_callback['callback'] = callback
else:
client = HTTPClient(url, callback)
@@ -879,7 +1041,6 @@ class Pubnub(PubnubCore):
## Build URL
url = self.getUrl(request)
-
## Send Request Expecting JSONP Response
try:
try: usock = urllib2.urlopen( url, None, 310 )
@@ -887,15 +1048,16 @@ class Pubnub(PubnubCore):
response = usock.read()
usock.close()
resp_json = json.loads(response)
- except:
+ except Exception as e:
+ print e
return None
- return resp_json
+ return resp_json
def _request2(self, request, callback=None, single=False):
if callback is None:
- return self._request2_sync(request,single=single)
+ return self._request2_sync(request)
else:
self._request2_async(request, callback, single=single)
diff --git a/python/tests/subscribe-test.py b/python/tests/subscribe-test.py
new file mode 100755
index 0000000..0d4c65e
--- /dev/null
+++ b/python/tests/subscribe-test.py
@@ -0,0 +1,148 @@
+## www.pubnub.com - PubNub Real-time push service in the cloud.
+# coding=utf8
+
+## PubNub Real-time Push APIs and Notifications Framework
+## Copyright (c) 2010 Stephen Blum
+## http://www.pubnub.com/
+
+## -----------------------------------
+## PubNub 3.1 Real-time Push Cloud API
+## -----------------------------------
+
+import sys
+sys.path.append('../')
+import datetime
+from Pubnub import Pubnub
+from functools import partial
+from threading import current_thread
+import threading
+publish_key = len(sys.argv) > 1 and sys.argv[1] or 'demo'
+subscribe_key = len(sys.argv) > 2 and sys.argv[2] or 'demo'
+secret_key = len(sys.argv) > 3 and sys.argv[3] or 'demo'
+cipher_key = len(sys.argv) > 4 and sys.argv[4] or None
+ssl_on = len(sys.argv) > 5 and bool(sys.argv[5]) or False
+
+## -----------------------------------------------------------------------
+## Initiate Pubnub State
+## -----------------------------------------------------------------------
+#pubnub = Pubnub( publish_key, subscribe_key, secret_key, cipher_key, ssl_on )
+pubnub = Pubnub( publish_key, subscribe_key, secret_key, ssl_on )
+crazy = 'hello_world'
+
+current = -1
+
+errors = 0
+received = 0
+
+## -----------------------------------------------------------------------
+## Subscribe Example
+## -----------------------------------------------------------------------
+def message_received(message):
+ print message
+
+def check_received(message):
+ global current
+ global errors
+ global received
+ print message
+ print current
+ if message <= current:
+ print 'ERROR'
+ #sys.exit()
+ errors += 1
+ else:
+ received += 1
+ print 'active thread count : ', threading.activeCount()
+ print 'errors = ' , errors
+ print current_thread().getName(), ' , ', 'received = ', received
+
+ if received != message:
+ print '********** MISSED **************** ', message - received
+ current = message
+
+
+def connected_test(ch) :
+ print 'Connected' , ch
+
+def connected(ch) :
+ pass
+
+
+'''
+pubnub.subscribe({
+ 'channel' : 'abcd1',
+ 'connect' : connected,
+ 'callback' : message_received
+})
+'''
+def cb1():
+ pubnub.subscribe({
+ 'channel' : 'efgh1',
+ 'connect' : connected,
+ 'callback' : message_received
+ })
+
+def cb2():
+ pubnub.subscribe({
+ 'channel' : 'dsm-test',
+ 'connect' : connected_test,
+ 'callback' : check_received
+ })
+
+def cb3():
+ pubnub.unsubscribe({'channel' : 'efgh1'})
+
+def cb4():
+ pubnub.unsubscribe({'channel' : 'abcd1'})
+
+def subscribe(channel):
+ pubnub.subscribe({
+ 'channel' : channel,
+ 'connect' : connected,
+ 'callback' : message_received
+ })
+
+
+print threading.activeCount()
+
+
+pubnub.timeout(15,cb1)
+
+pubnub.timeout(30,cb2)
+
+
+pubnub.timeout(45,cb3)
+
+pubnub.timeout(60,cb4)
+
+#'''
+for x in range(1,1000):
+ #print x
+ def y(t):
+ subscribe('channel-' + str(t))
+
+ def z(t):
+ pubnub.unsubscribe({'channel' : 'channel-' + str(t)})
+
+ pubnub.timeout(x + 5, partial(y,x))
+ pubnub.timeout(x + 25, partial(z, x))
+ x += 10
+#'''
+
+'''
+for x in range(1,1000):
+ def cb(r): print r , ' : ', threading.activeCount()
+ def y(t):
+ pubnub.publish({
+ 'message' : t,
+ 'callback' : cb,
+ 'channel' : 'dsm-test'
+ })
+
+
+ pubnub.timeout(x + 1, partial(y,x))
+ x += 1
+'''
+
+
+pubnub.start()
diff --git a/python/unassembled/Platform.py b/python/unassembled/Platform.py
index f0f9327..22893f8 100644
--- a/python/unassembled/Platform.py
+++ b/python/unassembled/Platform.py
@@ -6,8 +6,11 @@ except:
import threading
import json
import time
+import threading
+from threading import current_thread
-current_req_id = -1
+latest_sub_callback_lock = threading.RLock()
+latest_sub_callback = {'id' : None, 'callback' : None}
class HTTPClient:
def __init__(self, url, callback, id=None):
@@ -21,23 +24,32 @@ class HTTPClient:
self.callback = None
def run(self):
- global current_req_id
data = urllib2.urlopen(self.url, timeout=310).read()
if self.stop is True:
return
- if self.id is not None and current_req_id != self.id:
- return
- if self.callback is not None:
+ if self.callback is None:
+ global latest_sub_callback
+ global latest_sub_callback_lock
+ with latest_sub_callback_lock:
+ if latest_sub_callback['id'] != self.id:
+ return
+ else:
+ print(data)
+ if latest_sub_callback['callback'] is not None:
+ latest_sub_callback['id'] = 0
+ latest_sub_callback['callback'](json.loads(data))
+ else:
self.callback(json.loads(data))
-class Pubnub(PubnubCore):
+class Pubnub(PubnubCoreAsync):
def __init__(
self,
publish_key,
subscribe_key,
secret_key = False,
cipher_key = False,
+ auth_key = None,
ssl_on = False,
origin = 'pubsub.pubnub.com',
pres_uuid = None
@@ -47,6 +59,7 @@ class Pubnub(PubnubCore):
subscribe_key = subscribe_key,
secret_key = secret_key,
cipher_key = cipher_key,
+ auth_key = auth_key,
ssl_on = ssl_on,
origin = origin,
uuid = pres_uuid
@@ -55,6 +68,7 @@ class Pubnub(PubnubCore):
self._request = self._request2
else:
self._request = self._request3
+ self._channel_list_lock = threading.RLock()
def timeout(self, interval, func):
def cb():
@@ -64,13 +78,14 @@ class Pubnub(PubnubCore):
thread.start()
def _request2_async( self, request, callback, single=False ) :
- global current_req_id
## Build URL
url = self.getUrl(request)
if single is True:
id = time.time()
- client = HTTPClient(url, callback, id)
- current_req_id = id
+ client = HTTPClient(url, None, id)
+ with latest_sub_callback_lock:
+ latest_sub_callback['id'] = id
+ latest_sub_callback['callback'] = callback
else:
client = HTTPClient(url, callback)
@@ -85,7 +100,6 @@ class Pubnub(PubnubCore):
## Build URL
url = self.getUrl(request)
-
## Send Request Expecting JSONP Response
try:
try: usock = urllib2.urlopen( url, None, 310 )
@@ -93,15 +107,16 @@ class Pubnub(PubnubCore):
response = usock.read()
usock.close()
resp_json = json.loads(response)
- except:
+ except Exception as e:
+ print e
return None
- return resp_json
+ return resp_json
def _request2(self, request, callback=None, single=False):
if callback is None:
- return self._request2_sync(request,single=single)
+ return self._request2_sync(request)
else:
self._request2_async(request, callback, single=single)