aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDevendra2014-04-16 00:00:40 +0530
committerDevendra2014-04-16 00:00:40 +0530
commit150ae1566d813acbb773839e919db2c0f467931c (patch)
tree6f74d6dcdcb0ecff6d8a51988d8a461b6f9a4668
parent99096b8c11b9a541f6350639e8735495cf90091c (diff)
downloadpubnub-python-150ae1566d813acbb773839e919db2c0f467931c.tar.bz2
adding code to support async and pam client capabilities with python v2 and v3
-rw-r--r--common/PubnubBase.py133
-rw-r--r--common/PubnubCoreAsync.py125
-rw-r--r--python-tornado/Pubnub.py286
-rw-r--r--python-tornado/examples/publish-example.py35
-rwxr-xr-xpython-tornado/tests/subscribe-test.py19
-rw-r--r--python-tornado/unassembled/Platform.py28
-rw-r--r--python-twisted/Pubnub.py325
-rw-r--r--python-twisted/examples/publish-example.py27
-rw-r--r--python-twisted/unassembled/Platform.py67
-rw-r--r--python/Pubnub.py376
-rwxr-xr-xpython/examples/publish-example.py87
-rwxr-xr-xpython/tests/subscribe-test.py19
-rw-r--r--python/unassembled/Platform.py120
13 files changed, 875 insertions, 772 deletions
diff --git a/common/PubnubBase.py b/common/PubnubBase.py
index ac41e0e..98c68eb 100644
--- a/common/PubnubBase.py
+++ b/common/PubnubBase.py
@@ -5,12 +5,13 @@ import time
import hashlib
import uuid
import sys
-from urllib import quote
+
+try: from urllib.parse import quote
+except: from urllib2 import quote
from base64 import urlsafe_b64encode
from hashlib import sha256
-from urllib import quote
-from urllib import urlopen
+
import hmac
@@ -62,12 +63,11 @@ class PubnubBase(object):
self.uuid = UUID or str(uuid.uuid4())
if type(sys.version_info) is tuple:
- self.python_version = 2
- self.pc = PubnubCrypto2()
+ self.python_version = 2
+ self.pc = PubnubCrypto2()
else:
self.python_version = 3
self.pc = PubnubCrypto3()
-
if not isinstance(self.uuid, str):
raise AttributeError("pres_uuid must be a string")
@@ -186,7 +186,10 @@ class PubnubBase(object):
if (callback != None): callback({'message' : response['message'], 'payload' : response['payload']})
else:
if (callback != None):callback(response)
- if (callback != None): return _new_format_callback
+ if (callback != None):
+ return _new_format_callback
+ else:
+ return None
def publish( self, args ) :
@@ -221,23 +224,28 @@ class PubnubBase(object):
if 'callback' in args :
callback = args['callback']
else :
- callback = None
+ callback = None
+
+ if 'error' in args :
+ error = args['error']
+ else :
+ error = None
- #message = json.dumps(args['message'], separators=(',',':'))
message = self.encrypt(args['message'])
- signature = self.sign(channel, message)
+ #signature = self.sign(channel, message)
## Send Message
return self._request({"urlcomponents": [
'publish',
self.publish_key,
self.subscribe_key,
- signature,
+ '0',
channel,
'0',
message
- ], 'urlparams' : {'auth' : self.auth_key}}, self._return_wrapped_callback(callback))
+ ], 'urlparams' : {'auth' : self.auth_key}}, callback=self._return_wrapped_callback(callback),
+ error=self._return_wrapped_callback(error))
def presence( self, args ) :
"""
@@ -301,12 +309,10 @@ class PubnubBase(object):
"""
channel = str(args['channel'])
- ## Capture Callback
- if 'callback' in args :
- callback = args['callback']
- else :
- callback = None
-
+
+ callback = args['callback'] if 'callback' in args else None
+ error = args['error'] if 'error' in args else None
+
## Fail if bad input.
if not channel :
raise Exception('Missing Channel')
@@ -317,59 +323,16 @@ class PubnubBase(object):
'v2','presence',
'sub_key', self.subscribe_key,
'channel', channel
- ]}, callback);
-
-
- def history( self, args ) :
+ ], 'urlparams' : {'auth' : self.auth_key}}, callback=self._return_wrapped_callback(callback),
+ error=self._return_wrapped_callback(error))
+
+ def history(self, args) :
"""
#**
#* History
#*
#* Load history from a channel.
#*
- #* @param array args with 'channel' and 'limit'.
- #* @return mixed false on fail, array on success.
- #*
-
- ## History Example
- history = pubnub.history({
- 'channel' : 'hello_world',
- 'limit' : 1
- })
- print(history)
-
- """
- ## Capture User Input
- limit = 'limit' in args and int(args['limit']) or 10
- channel = str(args['channel'])
-
- ## Fail if bad input.
- if not channel :
- raise Exception('Missing Channel')
- return False
-
- ## Capture Callback
- if 'callback' in args :
- callback = args['callback']
- else :
- callback = None
-
- ## Get History
- return self._request({ "urlcomponents" : [
- 'history',
- self.subscribe_key,
- channel,
- '0',
- str(limit)
- ] }, callback);
-
- def detailedHistory(self, args) :
- """
- #**
- #* Detailed History
- #*
- #* Load Detailed history from a channel.
- #*
#* @param array args with 'channel', optional: 'start', 'end', 'reverse', 'count'
#* @return mixed false on fail, array on success.
#*
@@ -385,34 +348,21 @@ class PubnubBase(object):
## Capture User Input
channel = str(args['channel'])
- params = dict()
- count = 100
-
- if 'count' in args:
- count = int(args['count'])
-
- params['count'] = str(count)
-
- if 'reverse' in args:
- params['reverse'] = str(args['reverse']).lower()
+ callback = args['callback'] if 'callback' in args else None
+ error = args['error'] if 'error' in args else None
- if 'start' in args:
- params['start'] = str(args['start'])
+ params = dict()
- if 'end' in args:
- params['end'] = str(args['end'])
+ params['count'] = str(args['count']) if 'count' in args else 100
+ params['reverse'] = str(args['reverse']).lower() if 'reverse' in args else 'false'
+ params['start'] = str(args['start']) if 'start' in args else None
+ params['end'] = str(args['end']) if 'end' in args else None
## Fail if bad input.
if not channel :
raise Exception('Missing Channel')
return False
- ## Capture Callback
- if 'callback' in args :
- callback = args['callback']
- else :
- callback = None
-
## Get History
return self._request({ 'urlcomponents' : [
'v2',
@@ -421,7 +371,8 @@ class PubnubBase(object):
self.subscribe_key,
'channel',
channel,
- ],'urlparams' : params }, callback=callback);
+ ], 'urlparams' : {'auth' : self.auth_key}}, callback=self._return_wrapped_callback(callback),
+ error=self._return_wrapped_callback(error))
def time(self, args = None) :
"""
@@ -439,10 +390,9 @@ class PubnubBase(object):
"""
## Capture Callback
- if args and 'callback' in args:
- callback = args['callback']
- else :
- callback = None
+
+ callback = callback if args and 'callback' in args else None
+
time = self._request({'urlcomponents' : [
'time',
'0'
@@ -466,5 +416,6 @@ class PubnubBase(object):
ch for ch in list(bit)
]) for bit in request["urlcomponents"]])
if ("urlparams" in request):
- url = url + '?' + "&".join([ x + "=" + str(y) for x,y in request["urlparams"].items()])
+ url = url + '?' + "&".join([ x + "=" + str(y) for x,y in request["urlparams"].items() if y is not None])
+ #print(url)
return url
diff --git a/common/PubnubCoreAsync.py b/common/PubnubCoreAsync.py
index 4251d47..f7b57cc 100644
--- a/common/PubnubCoreAsync.py
+++ b/common/PubnubCoreAsync.py
@@ -5,9 +5,14 @@ except ImportError:
import Crypto.Hash.SHA256 as digestmod
sha256 = digestmod.new
import hmac
-import threading
-from threading import current_thread
-import threading
+
+class EmptyLock():
+ def __enter__(self):
+ pass
+ def __exit__(self,a,b,c):
+ pass
+
+empty_lock = EmptyLock()
class PubnubCoreAsync(PubnubBase):
@@ -23,7 +28,9 @@ class PubnubCoreAsync(PubnubBase):
auth_key = None,
ssl_on = False,
origin = 'pubsub.pubnub.com',
- uuid = None
+ uuid = None,
+ _tt_lock=empty_lock,
+ _channel_list_lock=empty_lock
) :
"""
#**
@@ -53,29 +60,20 @@ class PubnubCoreAsync(PubnubBase):
UUID=uuid
)
- self.subscriptions = {}
- self.timetoken = 0
- self.last_timetoken = 0
- self.version = '3.3.4'
- self.accept_encoding = 'gzip'
- self.SUB_RECEIVER = None
- self._connect = None
- self._tt_lock = threading.RLock()
+ self.subscriptions = {}
+ self.timetoken = 0
+ self.last_timetoken = 0
+ self.version = '3.3.4'
+ self.accept_encoding = 'gzip'
+ self.SUB_RECEIVER = None
+ self._connect = None
+ self._tt_lock = _tt_lock
+ self._channel_list_lock = _channel_list_lock
def get_channel_list(self, channels):
channel = ''
first = True
- if self._channel_list_lock:
- with self._channel_list_lock:
- for ch in channels:
- if not channels[ch]['subscribed']:
- continue
- if not first:
- channel += ','
- else:
- first = False
- channel += ch
- else:
+ with self._channel_list_lock:
for ch in channels:
if not channels[ch]['subscribed']:
continue
@@ -84,9 +82,15 @@ class PubnubCoreAsync(PubnubBase):
else:
first = False
channel += ch
-
return channel
+
+ def each(l, func):
+ if func is None:
+ return
+ for i in l:
+ func(i)
+
def subscribe( self, args=None, sync=False ) :
"""
#**
@@ -122,12 +126,12 @@ class PubnubCoreAsync(PubnubBase):
if args is None:
_invoke(error, "Arguments Missing")
return
- channel = args['channel'] if 'channel' in args else None
- callback = args['callback'] if 'callback' in args else None
- connect = args['connect'] if 'connect' in args else None
- disconnect = args['disconnect'] if 'disconnect' in args else None
- reconnect = args['reconnect'] if 'reconnect' in args else None
- error = args['error'] if 'error' in args else None
+ channel = args['channel'] if 'channel' in args else None
+ callback = args['callback'] if 'callback' in args else None
+ connect = args['connect'] if 'connect' in args else None
+ disconnect = args['disconnect'] if 'disconnect' in args else None
+ reconnect = args['reconnect'] if 'reconnect' in args else None
+ error = args['error'] if 'error' in args else None
with self._tt_lock:
self.last_timetoken = self.timetoken if self.timetoken != 0 else self.last_timetoken
@@ -160,10 +164,15 @@ class PubnubCoreAsync(PubnubBase):
chobj['connected'] = True
_invoke(chobj['connect'],chobj['name'])
- def _invoke_error(err=None):
- for ch in self.subscriptions:
- chobj = self.subscriptions[ch]
- _invoke(chobj.error,err)
+ def _invoke_error(channel_list=None, err=None):
+ if channel_list is None:
+ for ch in self.subscriptions:
+ chobj = self.subscriptions[ch]
+ _invoke(chobj['error'],err)
+ else:
+ for ch in channel_list:
+ chobj = self.subscriptions[ch]
+ _invoke(chobj['error'],err)
'''
if callback is None:
@@ -184,19 +193,7 @@ class PubnubCoreAsync(PubnubBase):
## New Channel?
if not channel in self.subscriptions:
- if self._channel_list_lock:
- with self._channel_list_lock:
- self.subscriptions[channel] = {
- 'name' : channel,
- 'first' : False,
- 'connected' : False,
- 'subscribed' : True,
- 'callback' : callback,
- 'connect' : connect,
- 'disconnect' : disconnect,
- 'reconnect' : reconnect
- }
- else:
+ with self._channel_list_lock:
self.subscriptions[channel] = {
'name' : channel,
'first' : False,
@@ -205,9 +202,11 @@ class PubnubCoreAsync(PubnubBase):
'callback' : callback,
'connect' : connect,
'disconnect' : disconnect,
- 'reconnect' : reconnect
+ 'reconnect' : reconnect,
+ 'error' : error
}
+
## return if already connected to channel
if channel in self.subscriptions and 'connected' in self.subscriptions[channel] and self.subscriptions[channel]['connected'] is True:
_invoke(error, "Already Connected")
@@ -222,8 +221,11 @@ class PubnubCoreAsync(PubnubBase):
def sub_callback(response):
## ERROR ?
- if not response or error in response:
- _invoke_error()
+ #print response
+ if not response or ('message' in response and response['message'] == 'Forbidden'):
+ _invoke_error(response['payload']['channels'], response['message'])
+ _connect()
+ return
_invoke_connect()
@@ -250,7 +252,6 @@ class PubnubCoreAsync(PubnubBase):
_connect()
-
channel_list = self.get_channel_list(self.subscriptions)
## CONNECT TO PUBNUB SUBSCRIBE SERVERS
try:
@@ -260,9 +261,9 @@ class PubnubCoreAsync(PubnubBase):
channel_list,
'0',
str(self.timetoken)
- ], "urlparams" : {"uuid":self.uuid} }, sub_callback, single=True )
+ ], "urlparams" : {"uuid":self.uuid, "auth" : self.auth_key} }, sub_callback, sub_callback, single=True )
except Exception as e:
- print e
+ print(e)
self.timeout( 1, _connect)
return
@@ -283,14 +284,18 @@ class PubnubCoreAsync(PubnubBase):
def unsubscribe( self, args ):
- #print(args['channel'])
- channel = str(args['channel'])
- if not (channel in self.subscriptions):
+
+ if 'channel' in self.subscriptions is False:
return False
+ channel = str(args['channel'])
+
+
## DISCONNECT
- self.subscriptions[channel]['connected'] = 0
- self.subscriptions[channel]['subscribed'] = False
- self.subscriptions[channel]['timetoken'] = 0
- self.subscriptions[channel]['first'] = False
+ with self._channel_list_lock:
+ if channel in self.subscriptions:
+ self.subscriptions[channel]['connected'] = 0
+ self.subscriptions[channel]['subscribed'] = False
+ self.subscriptions[channel]['timetoken'] = 0
+ self.subscriptions[channel]['first'] = False
self.CONNECT()
diff --git a/python-tornado/Pubnub.py b/python-tornado/Pubnub.py
index 61f7c3d..ccff021 100644
--- a/python-tornado/Pubnub.py
+++ b/python-tornado/Pubnub.py
@@ -176,12 +176,13 @@ import time
import hashlib
import uuid
import sys
-from urllib import quote
+
+try: from urllib.parse import quote
+except: from urllib2 import quote
from base64 import urlsafe_b64encode
from hashlib import sha256
-from urllib import quote
-from urllib import urlopen
+
import hmac
@@ -233,12 +234,11 @@ class PubnubBase(object):
self.uuid = UUID or str(uuid.uuid4())
if type(sys.version_info) is tuple:
- self.python_version = 2
- self.pc = PubnubCrypto2()
+ self.python_version = 2
+ self.pc = PubnubCrypto2()
else:
self.python_version = 3
self.pc = PubnubCrypto3()
-
if not isinstance(self.uuid, str):
raise AttributeError("pres_uuid must be a string")
@@ -357,7 +357,10 @@ class PubnubBase(object):
if (callback != None): callback({'message' : response['message'], 'payload' : response['payload']})
else:
if (callback != None):callback(response)
- if (callback != None): return _new_format_callback
+ if (callback != None):
+ return _new_format_callback
+ else:
+ return None
def publish( self, args ) :
@@ -392,23 +395,28 @@ class PubnubBase(object):
if 'callback' in args :
callback = args['callback']
else :
- callback = None
+ callback = None
+
+ if 'error' in args :
+ error = args['error']
+ else :
+ error = None
- #message = json.dumps(args['message'], separators=(',',':'))
message = self.encrypt(args['message'])
- signature = self.sign(channel, message)
+ #signature = self.sign(channel, message)
## Send Message
return self._request({"urlcomponents": [
'publish',
self.publish_key,
self.subscribe_key,
- signature,
+ '0',
channel,
'0',
message
- ], 'urlparams' : {'auth' : self.auth_key}}, self._return_wrapped_callback(callback))
+ ], 'urlparams' : {'auth' : self.auth_key}}, callback=self._return_wrapped_callback(callback),
+ error=self._return_wrapped_callback(error))
def presence( self, args ) :
"""
@@ -472,12 +480,10 @@ class PubnubBase(object):
"""
channel = str(args['channel'])
- ## Capture Callback
- if 'callback' in args :
- callback = args['callback']
- else :
- callback = None
-
+
+ callback = args['callback'] if 'callback' in args else None
+ error = args['error'] if 'error' in args else None
+
## Fail if bad input.
if not channel :
raise Exception('Missing Channel')
@@ -488,59 +494,16 @@ class PubnubBase(object):
'v2','presence',
'sub_key', self.subscribe_key,
'channel', channel
- ]}, callback);
-
-
- def history( self, args ) :
+ ], 'urlparams' : {'auth' : self.auth_key}}, callback=self._return_wrapped_callback(callback),
+ error=self._return_wrapped_callback(error))
+
+ def history(self, args) :
"""
#**
#* History
#*
#* Load history from a channel.
#*
- #* @param array args with 'channel' and 'limit'.
- #* @return mixed false on fail, array on success.
- #*
-
- ## History Example
- history = pubnub.history({
- 'channel' : 'hello_world',
- 'limit' : 1
- })
- print(history)
-
- """
- ## Capture User Input
- limit = 'limit' in args and int(args['limit']) or 10
- channel = str(args['channel'])
-
- ## Fail if bad input.
- if not channel :
- raise Exception('Missing Channel')
- return False
-
- ## Capture Callback
- if 'callback' in args :
- callback = args['callback']
- else :
- callback = None
-
- ## Get History
- return self._request({ "urlcomponents" : [
- 'history',
- self.subscribe_key,
- channel,
- '0',
- str(limit)
- ] }, callback);
-
- def detailedHistory(self, args) :
- """
- #**
- #* Detailed History
- #*
- #* Load Detailed history from a channel.
- #*
#* @param array args with 'channel', optional: 'start', 'end', 'reverse', 'count'
#* @return mixed false on fail, array on success.
#*
@@ -556,34 +519,21 @@ class PubnubBase(object):
## Capture User Input
channel = str(args['channel'])
- params = dict()
- count = 100
-
- if 'count' in args:
- count = int(args['count'])
-
- params['count'] = str(count)
-
- if 'reverse' in args:
- params['reverse'] = str(args['reverse']).lower()
+ callback = args['callback'] if 'callback' in args else None
+ error = args['error'] if 'error' in args else None
- if 'start' in args:
- params['start'] = str(args['start'])
+ params = dict()
- if 'end' in args:
- params['end'] = str(args['end'])
+ params['count'] = str(args['count']) if 'count' in args else 100
+ params['reverse'] = str(args['reverse']).lower() if 'reverse' in args else 'false'
+ params['start'] = str(args['start']) if 'start' in args else None
+ params['end'] = str(args['end']) if 'end' in args else None
## Fail if bad input.
if not channel :
raise Exception('Missing Channel')
return False
- ## Capture Callback
- if 'callback' in args :
- callback = args['callback']
- else :
- callback = None
-
## Get History
return self._request({ 'urlcomponents' : [
'v2',
@@ -592,7 +542,8 @@ class PubnubBase(object):
self.subscribe_key,
'channel',
channel,
- ],'urlparams' : params }, callback=callback);
+ ], 'urlparams' : {'auth' : self.auth_key}}, callback=self._return_wrapped_callback(callback),
+ error=self._return_wrapped_callback(error))
def time(self, args = None) :
"""
@@ -610,10 +561,9 @@ class PubnubBase(object):
"""
## Capture Callback
- if args and 'callback' in args:
- callback = args['callback']
- else :
- callback = None
+
+ callback = callback if args and 'callback' in args else None
+
time = self._request({'urlcomponents' : [
'time',
'0'
@@ -637,7 +587,8 @@ class PubnubBase(object):
ch for ch in list(bit)
]) for bit in request["urlcomponents"]])
if ("urlparams" in request):
- url = url + '?' + "&".join([ x + "=" + str(y) for x,y in request["urlparams"].items()])
+ url = url + '?' + "&".join([ x + "=" + str(y) for x,y in request["urlparams"].items() if y is not None])
+ #print(url)
return url
@@ -648,9 +599,14 @@ except ImportError:
import Crypto.Hash.SHA256 as digestmod
sha256 = digestmod.new
import hmac
-import threading
-from threading import current_thread
-import threading
+
+class EmptyLock():
+ def __enter__(self):
+ pass
+ def __exit__(self,a,b,c):
+ pass
+
+empty_lock = EmptyLock()
class PubnubCoreAsync(PubnubBase):
@@ -666,7 +622,9 @@ class PubnubCoreAsync(PubnubBase):
auth_key = None,
ssl_on = False,
origin = 'pubsub.pubnub.com',
- uuid = None
+ uuid = None,
+ _tt_lock=empty_lock,
+ _channel_list_lock=empty_lock
) :
"""
#**
@@ -696,29 +654,20 @@ class PubnubCoreAsync(PubnubBase):
UUID=uuid
)
- self.subscriptions = {}
- self.timetoken = 0
- self.last_timetoken = 0
- self.version = '3.3.4'
- self.accept_encoding = 'gzip'
- self.SUB_RECEIVER = None
- self._connect = None
- self._tt_lock = threading.RLock()
+ self.subscriptions = {}
+ self.timetoken = 0
+ self.last_timetoken = 0
+ self.version = '3.3.4'
+ self.accept_encoding = 'gzip'
+ self.SUB_RECEIVER = None
+ self._connect = None
+ self._tt_lock = _tt_lock
+ self._channel_list_lock = _channel_list_lock
def get_channel_list(self, channels):
channel = ''
first = True
- if self._channel_list_lock:
- with self._channel_list_lock:
- for ch in channels:
- if not channels[ch]['subscribed']:
- continue
- if not first:
- channel += ','
- else:
- first = False
- channel += ch
- else:
+ with self._channel_list_lock:
for ch in channels:
if not channels[ch]['subscribed']:
continue
@@ -727,9 +676,15 @@ class PubnubCoreAsync(PubnubBase):
else:
first = False
channel += ch
-
return channel
+
+ def each(l, func):
+ if func is None:
+ return
+ for i in l:
+ func(i)
+
def subscribe( self, args=None, sync=False ) :
"""
#**
@@ -765,12 +720,12 @@ class PubnubCoreAsync(PubnubBase):
if args is None:
_invoke(error, "Arguments Missing")
return
- channel = args['channel'] if 'channel' in args else None
- callback = args['callback'] if 'callback' in args else None
- connect = args['connect'] if 'connect' in args else None
- disconnect = args['disconnect'] if 'disconnect' in args else None
- reconnect = args['reconnect'] if 'reconnect' in args else None
- error = args['error'] if 'error' in args else None
+ channel = args['channel'] if 'channel' in args else None
+ callback = args['callback'] if 'callback' in args else None
+ connect = args['connect'] if 'connect' in args else None
+ disconnect = args['disconnect'] if 'disconnect' in args else None
+ reconnect = args['reconnect'] if 'reconnect' in args else None
+ error = args['error'] if 'error' in args else None
with self._tt_lock:
self.last_timetoken = self.timetoken if self.timetoken != 0 else self.last_timetoken
@@ -803,10 +758,15 @@ class PubnubCoreAsync(PubnubBase):
chobj['connected'] = True
_invoke(chobj['connect'],chobj['name'])
- def _invoke_error(err=None):
- for ch in self.subscriptions:
- chobj = self.subscriptions[ch]
- _invoke(chobj.error,err)
+ def _invoke_error(channel_list=None, err=None):
+ if channel_list is None:
+ for ch in self.subscriptions:
+ chobj = self.subscriptions[ch]
+ _invoke(chobj['error'],err)
+ else:
+ for ch in channel_list:
+ chobj = self.subscriptions[ch]
+ _invoke(chobj['error'],err)
'''
if callback is None:
@@ -827,19 +787,7 @@ class PubnubCoreAsync(PubnubBase):
## New Channel?
if not channel in self.subscriptions:
- if self._channel_list_lock:
- with self._channel_list_lock:
- self.subscriptions[channel] = {
- 'name' : channel,
- 'first' : False,
- 'connected' : False,
- 'subscribed' : True,
- 'callback' : callback,
- 'connect' : connect,
- 'disconnect' : disconnect,
- 'reconnect' : reconnect
- }
- else:
+ with self._channel_list_lock:
self.subscriptions[channel] = {
'name' : channel,
'first' : False,
@@ -848,9 +796,11 @@ class PubnubCoreAsync(PubnubBase):
'callback' : callback,
'connect' : connect,
'disconnect' : disconnect,
- 'reconnect' : reconnect
+ 'reconnect' : reconnect,
+ 'error' : error
}
+
## return if already connected to channel
if channel in self.subscriptions and 'connected' in self.subscriptions[channel] and self.subscriptions[channel]['connected'] is True:
_invoke(error, "Already Connected")
@@ -865,8 +815,11 @@ class PubnubCoreAsync(PubnubBase):
def sub_callback(response):
## ERROR ?
- if not response or error in response:
- _invoke_error()
+ #print response
+ if not response or ('message' in response and response['message'] == 'Forbidden'):
+ _invoke_error(response['payload']['channels'], response['message'])
+ _connect()
+ return
_invoke_connect()
@@ -893,7 +846,6 @@ class PubnubCoreAsync(PubnubBase):
_connect()
-
channel_list = self.get_channel_list(self.subscriptions)
## CONNECT TO PUBNUB SUBSCRIBE SERVERS
try:
@@ -903,9 +855,9 @@ class PubnubCoreAsync(PubnubBase):
channel_list,
'0',
str(self.timetoken)
- ], "urlparams" : {"uuid":self.uuid} }, sub_callback, single=True )
+ ], "urlparams" : {"uuid":self.uuid, "auth" : self.auth_key} }, sub_callback, sub_callback, single=True )
except Exception as e:
- print e
+ print(e)
self.timeout( 1, _connect)
return
@@ -926,16 +878,20 @@ class PubnubCoreAsync(PubnubBase):
def unsubscribe( self, args ):
- #print(args['channel'])
- channel = str(args['channel'])
- if not (channel in self.subscriptions):
+
+ if 'channel' in self.subscriptions is False:
return False
+ channel = str(args['channel'])
+
+
## DISCONNECT
- self.subscriptions[channel]['connected'] = 0
- self.subscriptions[channel]['subscribed'] = False
- self.subscriptions[channel]['timetoken'] = 0
- self.subscriptions[channel]['first'] = False
+ with self._channel_list_lock:
+ if channel in self.subscriptions:
+ self.subscriptions[channel]['connected'] = 0
+ self.subscriptions[channel]['subscribed'] = False
+ self.subscriptions[channel]['timetoken'] = 0
+ self.subscriptions[channel]['first'] = False
self.CONNECT()
@@ -984,9 +940,13 @@ class Pubnub(PubnubCoreAsync):
self.headers['V'] = self.version
self.http = tornado.httpclient.AsyncHTTPClient(max_clients=1000)
self.id = None
- self._channel_list_lock = None
+
+ def _request( self, request, callback=None, error=None, single=False ) :
+
+ def _invoke(func, data):
+ if func is not None:
+ func(data)
- def _request( self, request, callback, single=False ) :
url = self.getUrl(request)
request = tornado.httpclient.HTTPRequest( url, 'GET', self.headers, connect_timeout=10, request_timeout=310 )
if single is True:
@@ -997,18 +957,30 @@ class Pubnub(PubnubCoreAsync):
if single is True:
if not id == self.id:
return None
-
+
body = response._get_body()
+
if body is None:
return
-
+ #print(body)
def handle_exc(*args):
return True
if response.error is not None:
with ExceptionStackContext(handle_exc):
response.rethrow()
- elif callback:
- callback(eval(response._get_body()))
+ return
+ try:
+ data = json.loads(body)
+ except TypeError as e:
+ try:
+ data = json.loads(body.decode("utf-8"))
+ except:
+ _invoke(error, {'error' : 'json decode error'})
+
+ if 'error' in data and 'status' in data and 'status' != 200:
+ _invoke(error, data)
+ else:
+ _invoke(callback, data)
self.http.fetch(
request=request,
diff --git a/python-tornado/examples/publish-example.py b/python-tornado/examples/publish-example.py
index b9eaa15..bb8b199 100644
--- a/python-tornado/examples/publish-example.py
+++ b/python-tornado/examples/publish-example.py
@@ -10,54 +10,59 @@
## -----------------------------------
import sys
-import tornado
+from twisted.internet import reactor
sys.path.append('../')
-sys.path.append('../..')
-sys.path.append('../../common')
+sys.path.append('../../')
from Pubnub import Pubnub
publish_key = len(sys.argv) > 1 and sys.argv[1] or 'demo'
subscribe_key = len(sys.argv) > 2 and sys.argv[2] or 'demo'
secret_key = len(sys.argv) > 3 and sys.argv[3] or 'demo'
-cipher_key = len(sys.argv) > 4 and sys.argv[4] or 'demo' ##(Cipher key is Optional)
+cipher_key = len(sys.argv) > 4 and sys.argv[4] or '' ##(Cipher key is Optional)
ssl_on = len(sys.argv) > 5 and bool(sys.argv[5]) or False
## -----------------------------------------------------------------------
## Initiate Pubnub State
## -----------------------------------------------------------------------
-pubnub = Pubnub( publish_key=publish_key, subscribe_key=subscribe_key, secret_key=secret_key,cipher_key=cipher_key, ssl_on=ssl_on )
-#pubnub = Pubnub( publish_key, subscribe_key, secret_key, ssl_on )
+pubnub = Pubnub( publish_key, subscribe_key, secret_key, cipher_key, ssl_on )
crazy = 'hello_world'
+## -----------------------------------------------------------------------
+## Publish Example
+## -----------------------------------------------------------------------
def publish_complete(info):
print(info)
+def publish_error(info):
+ print('ERROR : ' + str(info))
+
## Publish string
pubnub.publish({
'channel' : crazy,
'message' : 'Hello World!',
- 'callback' : publish_complete
+ 'callback' : publish_complete,
+ 'error' : publish_error
})
## Publish list
li = ['Sunday', 'Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday']
+
pubnub.publish({
'channel' : crazy,
'message' : li,
- 'callback' : publish_complete
+ 'callback' : publish_complete,
+ 'error' : publish_error
})
def done_cb(info):
publish_complete(info)
- tornado.ioloop.IOLoop.instance().stop()
-## Publish Dictionary Object
pubnub.publish({
'channel' : crazy,
'message' : { 'some_key' : 'some_val' },
- 'callback' : done_cb
+ 'callback' : done_cb,
+ 'error' : publish_error
})
-## -----------------------------------------------------------------------
-## IO Event Loop
-## -----------------------------------------------------------------------
-tornado.ioloop.IOLoop.instance().start()
+
+
+pubnub.start()
diff --git a/python-tornado/tests/subscribe-test.py b/python-tornado/tests/subscribe-test.py
index 0d4c65e..be4a416 100755
--- a/python-tornado/tests/subscribe-test.py
+++ b/python-tornado/tests/subscribe-test.py
@@ -38,31 +38,31 @@ received = 0
## Subscribe Example
## -----------------------------------------------------------------------
def message_received(message):
- print message
+ print(message)
def check_received(message):
global current
global errors
global received
- print message
- print current
+ print(message)
+ print(current)
if message <= current:
- print 'ERROR'
+ print('ERROR')
#sys.exit()
errors += 1
else:
received += 1
- print 'active thread count : ', threading.activeCount()
- print 'errors = ' , errors
- print current_thread().getName(), ' , ', 'received = ', received
+ print('active thread count : ' + str( threading.activeCount()))
+ print('errors = ' + str(errors))
+ print(current_thread().getName() + ' , ' + 'received = ' + str(received))
if received != message:
- print '********** MISSED **************** ', message - received
+ print('********** MISSED **************** ' + str( message - received ))
current = message
def connected_test(ch) :
- print 'Connected' , ch
+ print('Connected ' + ch)
def connected(ch) :
pass
@@ -103,7 +103,6 @@ def subscribe(channel):
})
-print threading.activeCount()
pubnub.timeout(15,cb1)
diff --git a/python-tornado/unassembled/Platform.py b/python-tornado/unassembled/Platform.py
index 501993e..5200136 100644
--- a/python-tornado/unassembled/Platform.py
+++ b/python-tornado/unassembled/Platform.py
@@ -43,9 +43,13 @@ class Pubnub(PubnubCoreAsync):
self.headers['V'] = self.version
self.http = tornado.httpclient.AsyncHTTPClient(max_clients=1000)
self.id = None
- self._channel_list_lock = None
+
+ def _request( self, request, callback=None, error=None, single=False ) :
+
+ def _invoke(func, data):
+ if func is not None:
+ func(data)
- def _request( self, request, callback, single=False ) :
url = self.getUrl(request)
request = tornado.httpclient.HTTPRequest( url, 'GET', self.headers, connect_timeout=10, request_timeout=310 )
if single is True:
@@ -56,18 +60,30 @@ class Pubnub(PubnubCoreAsync):
if single is True:
if not id == self.id:
return None
-
+
body = response._get_body()
+
if body is None:
return
-
+ #print(body)
def handle_exc(*args):
return True
if response.error is not None:
with ExceptionStackContext(handle_exc):
response.rethrow()
- elif callback:
- callback(eval(response._get_body()))
+ return
+ try:
+ data = json.loads(body)
+ except TypeError as e:
+ try:
+ data = json.loads(body.decode("utf-8"))
+ except:
+ _invoke(error, {'error' : 'json decode error'})
+
+ if 'error' in data and 'status' in data and 'status' != 200:
+ _invoke(error, data)
+ else:
+ _invoke(callback, data)
self.http.fetch(
request=request,
diff --git a/python-twisted/Pubnub.py b/python-twisted/Pubnub.py
index 7171efe..383ee6d 100644
--- a/python-twisted/Pubnub.py
+++ b/python-twisted/Pubnub.py
@@ -176,12 +176,13 @@ import time
import hashlib
import uuid
import sys
-from urllib import quote
+
+try: from urllib.parse import quote
+except: from urllib2 import quote
from base64 import urlsafe_b64encode
from hashlib import sha256
-from urllib import quote
-from urllib import urlopen
+
import hmac
@@ -233,12 +234,11 @@ class PubnubBase(object):
self.uuid = UUID or str(uuid.uuid4())
if type(sys.version_info) is tuple:
- self.python_version = 2
- self.pc = PubnubCrypto2()
+ self.python_version = 2
+ self.pc = PubnubCrypto2()
else:
self.python_version = 3
self.pc = PubnubCrypto3()
-
if not isinstance(self.uuid, str):
raise AttributeError("pres_uuid must be a string")
@@ -357,7 +357,10 @@ class PubnubBase(object):
if (callback != None): callback({'message' : response['message'], 'payload' : response['payload']})
else:
if (callback != None):callback(response)
- if (callback != None): return _new_format_callback
+ if (callback != None):
+ return _new_format_callback
+ else:
+ return None
def publish( self, args ) :
@@ -392,23 +395,28 @@ class PubnubBase(object):
if 'callback' in args :
callback = args['callback']
else :
- callback = None
+ callback = None
+
+ if 'error' in args :
+ error = args['error']
+ else :
+ error = None
- #message = json.dumps(args['message'], separators=(',',':'))
message = self.encrypt(args['message'])
- signature = self.sign(channel, message)
+ #signature = self.sign(channel, message)
## Send Message
return self._request({"urlcomponents": [
'publish',
self.publish_key,
self.subscribe_key,
- signature,
+ '0',
channel,
'0',
message
- ], 'urlparams' : {'auth' : self.auth_key}}, self._return_wrapped_callback(callback))
+ ], 'urlparams' : {'auth' : self.auth_key}}, callback=self._return_wrapped_callback(callback),
+ error=self._return_wrapped_callback(error))
def presence( self, args ) :
"""
@@ -472,12 +480,10 @@ class PubnubBase(object):
"""
channel = str(args['channel'])
- ## Capture Callback
- if 'callback' in args :
- callback = args['callback']
- else :
- callback = None
-
+
+ callback = args['callback'] if 'callback' in args else None
+ error = args['error'] if 'error' in args else None
+
## Fail if bad input.
if not channel :
raise Exception('Missing Channel')
@@ -488,59 +494,16 @@ class PubnubBase(object):
'v2','presence',
'sub_key', self.subscribe_key,
'channel', channel
- ]}, callback);
-
-
- def history( self, args ) :
+ ], 'urlparams' : {'auth' : self.auth_key}}, callback=self._return_wrapped_callback(callback),
+ error=self._return_wrapped_callback(error))
+
+ def history(self, args) :
"""
#**
#* History
#*
#* Load history from a channel.
#*
- #* @param array args with 'channel' and 'limit'.
- #* @return mixed false on fail, array on success.
- #*
-
- ## History Example
- history = pubnub.history({
- 'channel' : 'hello_world',
- 'limit' : 1
- })
- print(history)
-
- """
- ## Capture User Input
- limit = 'limit' in args and int(args['limit']) or 10
- channel = str(args['channel'])
-
- ## Fail if bad input.
- if not channel :
- raise Exception('Missing Channel')
- return False
-
- ## Capture Callback
- if 'callback' in args :
- callback = args['callback']
- else :
- callback = None
-
- ## Get History
- return self._request({ "urlcomponents" : [
- 'history',
- self.subscribe_key,
- channel,
- '0',
- str(limit)
- ] }, callback);
-
- def detailedHistory(self, args) :
- """
- #**
- #* Detailed History
- #*
- #* Load Detailed history from a channel.
- #*
#* @param array args with 'channel', optional: 'start', 'end', 'reverse', 'count'
#* @return mixed false on fail, array on success.
#*
@@ -556,34 +519,21 @@ class PubnubBase(object):
## Capture User Input
channel = str(args['channel'])
- params = dict()
- count = 100
-
- if 'count' in args:
- count = int(args['count'])
-
- params['count'] = str(count)
-
- if 'reverse' in args:
- params['reverse'] = str(args['reverse']).lower()
+ callback = args['callback'] if 'callback' in args else None
+ error = args['error'] if 'error' in args else None
- if 'start' in args:
- params['start'] = str(args['start'])
+ params = dict()
- if 'end' in args:
- params['end'] = str(args['end'])
+ params['count'] = str(args['count']) if 'count' in args else 100
+ params['reverse'] = str(args['reverse']).lower() if 'reverse' in args else 'false'
+ params['start'] = str(args['start']) if 'start' in args else None
+ params['end'] = str(args['end']) if 'end' in args else None
## Fail if bad input.
if not channel :
raise Exception('Missing Channel')
return False
- ## Capture Callback
- if 'callback' in args :
- callback = args['callback']
- else :
- callback = None
-
## Get History
return self._request({ 'urlcomponents' : [
'v2',
@@ -592,7 +542,8 @@ class PubnubBase(object):
self.subscribe_key,
'channel',
channel,
- ],'urlparams' : params }, callback=callback);
+ ], 'urlparams' : {'auth' : self.auth_key}}, callback=self._return_wrapped_callback(callback),
+ error=self._return_wrapped_callback(error))
def time(self, args = None) :
"""
@@ -610,10 +561,9 @@ class PubnubBase(object):
"""
## Capture Callback
- if args and 'callback' in args:
- callback = args['callback']
- else :
- callback = None
+
+ callback = callback if args and 'callback' in args else None
+
time = self._request({'urlcomponents' : [
'time',
'0'
@@ -637,7 +587,8 @@ class PubnubBase(object):
ch for ch in list(bit)
]) for bit in request["urlcomponents"]])
if ("urlparams" in request):
- url = url + '?' + "&".join([ x + "=" + str(y) for x,y in request["urlparams"].items()])
+ url = url + '?' + "&".join([ x + "=" + str(y) for x,y in request["urlparams"].items() if y is not None])
+ #print(url)
return url
@@ -648,9 +599,14 @@ except ImportError:
import Crypto.Hash.SHA256 as digestmod
sha256 = digestmod.new
import hmac
-import threading
-from threading import current_thread
-import threading
+
+class EmptyLock():
+ def __enter__(self):
+ pass
+ def __exit__(self,a,b,c):
+ pass
+
+empty_lock = EmptyLock()
class PubnubCoreAsync(PubnubBase):
@@ -666,7 +622,9 @@ class PubnubCoreAsync(PubnubBase):
auth_key = None,
ssl_on = False,
origin = 'pubsub.pubnub.com',
- uuid = None
+ uuid = None,
+ _tt_lock=empty_lock,
+ _channel_list_lock=empty_lock
) :
"""
#**
@@ -696,29 +654,20 @@ class PubnubCoreAsync(PubnubBase):
UUID=uuid
)
- self.subscriptions = {}
- self.timetoken = 0
- self.last_timetoken = 0
- self.version = '3.3.4'
- self.accept_encoding = 'gzip'
- self.SUB_RECEIVER = None
- self._connect = None
- self._tt_lock = threading.RLock()
+ self.subscriptions = {}
+ self.timetoken = 0
+ self.last_timetoken = 0
+ self.version = '3.3.4'
+ self.accept_encoding = 'gzip'
+ self.SUB_RECEIVER = None
+ self._connect = None
+ self._tt_lock = _tt_lock
+ self._channel_list_lock = _channel_list_lock
def get_channel_list(self, channels):
channel = ''
first = True
- if self._channel_list_lock:
- with self._channel_list_lock:
- for ch in channels:
- if not channels[ch]['subscribed']:
- continue
- if not first:
- channel += ','
- else:
- first = False
- channel += ch
- else:
+ with self._channel_list_lock:
for ch in channels:
if not channels[ch]['subscribed']:
continue
@@ -727,9 +676,15 @@ class PubnubCoreAsync(PubnubBase):
else:
first = False
channel += ch
-
return channel
+
+ def each(l, func):
+ if func is None:
+ return
+ for i in l:
+ func(i)
+
def subscribe( self, args=None, sync=False ) :
"""
#**
@@ -765,12 +720,12 @@ class PubnubCoreAsync(PubnubBase):
if args is None:
_invoke(error, "Arguments Missing")
return
- channel = args['channel'] if 'channel' in args else None
- callback = args['callback'] if 'callback' in args else None
- connect = args['connect'] if 'connect' in args else None
- disconnect = args['disconnect'] if 'disconnect' in args else None
- reconnect = args['reconnect'] if 'reconnect' in args else None
- error = args['error'] if 'error' in args else None
+ channel = args['channel'] if 'channel' in args else None
+ callback = args['callback'] if 'callback' in args else None
+ connect = args['connect'] if 'connect' in args else None
+ disconnect = args['disconnect'] if 'disconnect' in args else None
+ reconnect = args['reconnect'] if 'reconnect' in args else None
+ error = args['error'] if 'error' in args else None
with self._tt_lock:
self.last_timetoken = self.timetoken if self.timetoken != 0 else self.last_timetoken
@@ -803,10 +758,15 @@ class PubnubCoreAsync(PubnubBase):
chobj['connected'] = True
_invoke(chobj['connect'],chobj['name'])
- def _invoke_error(err=None):
- for ch in self.subscriptions:
- chobj = self.subscriptions[ch]
- _invoke(chobj.error,err)
+ def _invoke_error(channel_list=None, err=None):
+ if channel_list is None:
+ for ch in self.subscriptions:
+ chobj = self.subscriptions[ch]
+ _invoke(chobj['error'],err)
+ else:
+ for ch in channel_list:
+ chobj = self.subscriptions[ch]
+ _invoke(chobj['error'],err)
'''
if callback is None:
@@ -827,19 +787,7 @@ class PubnubCoreAsync(PubnubBase):
## New Channel?
if not channel in self.subscriptions:
- if self._channel_list_lock:
- with self._channel_list_lock:
- self.subscriptions[channel] = {
- 'name' : channel,
- 'first' : False,
- 'connected' : False,
- 'subscribed' : True,
- 'callback' : callback,
- 'connect' : connect,
- 'disconnect' : disconnect,
- 'reconnect' : reconnect
- }
- else:
+ with self._channel_list_lock:
self.subscriptions[channel] = {
'name' : channel,
'first' : False,
@@ -848,9 +796,11 @@ class PubnubCoreAsync(PubnubBase):
'callback' : callback,
'connect' : connect,
'disconnect' : disconnect,
- 'reconnect' : reconnect
+ 'reconnect' : reconnect,
+ 'error' : error
}
+
## return if already connected to channel
if channel in self.subscriptions and 'connected' in self.subscriptions[channel] and self.subscriptions[channel]['connected'] is True:
_invoke(error, "Already Connected")
@@ -865,8 +815,11 @@ class PubnubCoreAsync(PubnubBase):
def sub_callback(response):
## ERROR ?
- if not response or error in response:
- _invoke_error()
+ #print response
+ if not response or ('message' in response and response['message'] == 'Forbidden'):
+ _invoke_error(response['payload']['channels'], response['message'])
+ _connect()
+ return
_invoke_connect()
@@ -893,7 +846,6 @@ class PubnubCoreAsync(PubnubBase):
_connect()
-
channel_list = self.get_channel_list(self.subscriptions)
## CONNECT TO PUBNUB SUBSCRIBE SERVERS
try:
@@ -903,9 +855,9 @@ class PubnubCoreAsync(PubnubBase):
channel_list,
'0',
str(self.timetoken)
- ], "urlparams" : {"uuid":self.uuid} }, sub_callback, single=True )
+ ], "urlparams" : {"uuid":self.uuid, "auth" : self.auth_key} }, sub_callback, sub_callback, single=True )
except Exception as e:
- print e
+ print(e)
self.timeout( 1, _connect)
return
@@ -926,16 +878,20 @@ class PubnubCoreAsync(PubnubBase):
def unsubscribe( self, args ):
- #print(args['channel'])
- channel = str(args['channel'])
- if not (channel in self.subscriptions):
+
+ if 'channel' in self.subscriptions is False:
return False
+ channel = str(args['channel'])
+
+
## DISCONNECT
- self.subscriptions[channel]['connected'] = 0
- self.subscriptions[channel]['subscribed'] = False
- self.subscriptions[channel]['timetoken'] = 0
- self.subscriptions[channel]['first'] = False
+ with self._channel_list_lock:
+ if channel in self.subscriptions:
+ self.subscriptions[channel]['connected'] = 0
+ self.subscriptions[channel]['subscribed'] = False
+ self.subscriptions[channel]['timetoken'] = 0
+ self.subscriptions[channel]['first'] = False
self.CONNECT()
@@ -951,6 +907,9 @@ from twisted.internet.task import LoopingCall
import twisted
from hashlib import sha256
import time
+import json
+from twisted.python.compat import (
+ _PY3, unicode, intToBytes, networkString, nativeString)
pnconn_pool = HTTPConnectionPool(reactor, persistent=True)
pnconn_pool.maxPersistentPerHost = 100000
@@ -968,8 +927,9 @@ class Pubnub(PubnubCoreAsync):
self,
publish_key,
subscribe_key,
- secret_key = False,
- cipher_key = False,
+ secret_key=False,
+ cipher_key=False,
+ auth_key=None,
ssl_on = False,
origin = 'pubsub.pubnub.com'
) :
@@ -978,6 +938,7 @@ class Pubnub(PubnubCoreAsync):
subscribe_key=subscribe_key,
secret_key=secret_key,
cipher_key=cipher_key,
+ auth_key=auth_key,
ssl_on=ssl_on,
origin=origin,
)
@@ -985,11 +946,14 @@ class Pubnub(PubnubCoreAsync):
self.headers['User-Agent'] = ['Python-Twisted']
#self.headers['Accept-Encoding'] = [self.accept_encoding]
self.headers['V'] = [self.version]
- self._channel_list_lock = None
- def _request( self, request, callback, single=False ) :
+ def _request( self, request, callback=None, error=None, single=False ) :
global pnconn_pool
+ def _invoke(func, data):
+ if func is not None:
+ func(data)
+
## Build URL
'''
url = self.origin + '/' + "/".join([
@@ -1006,7 +970,12 @@ class Pubnub(PubnubCoreAsync):
pool = self.ssl and None or pnconn_pool
)), [('gzip', GzipDecoder)])
- request = agent.request( 'GET', url, Headers(self.headers), None )
+ try:
+ request = agent.request( 'GET', url, Headers(self.headers), None )
+ except TypeError as te:
+ print(url.encode())
+ request = agent.request( 'GET', url.encode(), Headers(self.headers), None )
+
if single is True:
id = time.time()
@@ -1014,35 +983,65 @@ class Pubnub(PubnubCoreAsync):
def received(response):
finished = Deferred()
- response.deliverBody(PubNubResponse(finished))
+ if response.code == 403:
+ response.deliverBody(PubNub403Response(finished))
+ else:
+ response.deliverBody(PubNubResponse(finished))
+
+ return finished
+
+ def error_handler(response):
+ finished = Deferred()
+ if response.code == 403:
+ response.deliverBody(PubNub403Response(finished))
+ else:
+ response.deliverBody(PubNubResponse(finished))
+
return finished
def complete(data):
if single is True:
- if not id == self.id:
+ if id != self.id:
return None
try:
- callback(eval(data))
+ data = json.loads(data)
except Exception as e:
- pass
- #need error handling here
+ try:
+ data = json.loads(data.decode("utf-8"))
+ except:
+ _invoke(error, {'error' : 'json decode error'})
+
+ if 'error' in data and 'status' in data and 'status' != 200:
+ _invoke(error, data)
+ else:
+ _invoke(callback, data)
def abort():
pass
request.addCallback(received)
- request.addBoth(complete)
+ request.addCallback(complete)
+ request.addErrback(error_handler)
return abort
class WebClientContextFactory(ClientContextFactory):
def getContext(self, hostname, port):
return ClientContextFactory.getContext(self)
+
+class PubNub403Response(Protocol):
+ def __init__( self, finished ):
+ self.finished = finished
+
+ def dataReceived( self, bytes ):
+ #print '403 resp ', bytes
+ self.finished.callback(bytes)
class PubNubResponse(Protocol):
def __init__( self, finished ):
self.finished = finished
def dataReceived( self, bytes ):
- self.finished.callback(bytes)
+ #print bytes
+ self.finished.callback(bytes)
diff --git a/python-twisted/examples/publish-example.py b/python-twisted/examples/publish-example.py
index d09ad8d..d15b21b 100644
--- a/python-twisted/examples/publish-example.py
+++ b/python-twisted/examples/publish-example.py
@@ -19,12 +19,13 @@ publish_key = len(sys.argv) > 1 and sys.argv[1] or 'demo'
subscribe_key = len(sys.argv) > 2 and sys.argv[2] or 'demo'
secret_key = len(sys.argv) > 3 and sys.argv[3] or 'demo'
cipher_key = len(sys.argv) > 4 and sys.argv[4] or '' ##(Cipher key is Optional)
-ssl_on = len(sys.argv) > 5 and bool(sys.argv[5]) or False
+auth_key = len(sys.argv) > 5 and sys.argv[5] or 'abcd' ##(Cipher key is Optional)
+ssl_on = len(sys.argv) > 6 and bool(sys.argv[5]) or False
## -----------------------------------------------------------------------
## Initiate Pubnub State
## -----------------------------------------------------------------------
-pubnub = Pubnub( publish_key, subscribe_key, secret_key, cipher_key, ssl_on )
+pubnub = Pubnub( publish_key, subscribe_key, secret_key, cipher_key, auth_key, ssl_on )
crazy = 'hello_world'
## -----------------------------------------------------------------------
@@ -33,32 +34,36 @@ crazy = 'hello_world'
def publish_complete(info):
print(info)
+def publish_error(info):
+ print('ERROR : ' + str(info))
+
## Publish string
pubnub.publish({
'channel' : crazy,
'message' : 'Hello World!',
- 'callback' : publish_complete
+ 'callback' : publish_complete,
+ 'error' : publish_error
})
## Publish list
li = ['Sunday', 'Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday']
+
pubnub.publish({
'channel' : crazy,
'message' : li,
- 'callback' : publish_complete
+ 'callback' : publish_complete,
+ 'error' : publish_error
})
def done_cb(info):
publish_complete(info)
- reactor.stop()
-## Publish Dictionary Object
+
pubnub.publish({
'channel' : crazy,
'message' : { 'some_key' : 'some_val' },
- 'callback' : done_cb
+ 'callback' : done_cb,
+ 'error' : publish_error
})
-## -----------------------------------------------------------------------
-## IO Event Loop
-## -----------------------------------------------------------------------
-reactor.run()
+
+pubnub.start()
diff --git a/python-twisted/unassembled/Platform.py b/python-twisted/unassembled/Platform.py
index 5268446..c7fe5cd 100644
--- a/python-twisted/unassembled/Platform.py
+++ b/python-twisted/unassembled/Platform.py
@@ -10,6 +10,9 @@ from twisted.internet.task import LoopingCall
import twisted
from hashlib import sha256
import time
+import json
+from twisted.python.compat import (
+ _PY3, unicode, intToBytes, networkString, nativeString)
pnconn_pool = HTTPConnectionPool(reactor, persistent=True)
pnconn_pool.maxPersistentPerHost = 100000
@@ -27,8 +30,9 @@ class Pubnub(PubnubCoreAsync):
self,
publish_key,
subscribe_key,
- secret_key = False,
- cipher_key = False,
+ secret_key=False,
+ cipher_key=False,
+ auth_key=None,
ssl_on = False,
origin = 'pubsub.pubnub.com'
) :
@@ -37,6 +41,7 @@ class Pubnub(PubnubCoreAsync):
subscribe_key=subscribe_key,
secret_key=secret_key,
cipher_key=cipher_key,
+ auth_key=auth_key,
ssl_on=ssl_on,
origin=origin,
)
@@ -44,11 +49,14 @@ class Pubnub(PubnubCoreAsync):
self.headers['User-Agent'] = ['Python-Twisted']
#self.headers['Accept-Encoding'] = [self.accept_encoding]
self.headers['V'] = [self.version]
- self._channel_list_lock = None
- def _request( self, request, callback, single=False ) :
+ def _request( self, request, callback=None, error=None, single=False ) :
global pnconn_pool
+ def _invoke(func, data):
+ if func is not None:
+ func(data)
+
## Build URL
'''
url = self.origin + '/' + "/".join([
@@ -65,7 +73,12 @@ class Pubnub(PubnubCoreAsync):
pool = self.ssl and None or pnconn_pool
)), [('gzip', GzipDecoder)])
- request = agent.request( 'GET', url, Headers(self.headers), None )
+ try:
+ request = agent.request( 'GET', url, Headers(self.headers), None )
+ except TypeError as te:
+ print(url.encode())
+ request = agent.request( 'GET', url.encode(), Headers(self.headers), None )
+
if single is True:
id = time.time()
@@ -73,35 +86,65 @@ class Pubnub(PubnubCoreAsync):
def received(response):
finished = Deferred()
- response.deliverBody(PubNubResponse(finished))
+ if response.code == 403:
+ response.deliverBody(PubNub403Response(finished))
+ else:
+ response.deliverBody(PubNubResponse(finished))
+
+ return finished
+
+ def error_handler(response):
+ finished = Deferred()
+ if response.code == 403:
+ response.deliverBody(PubNub403Response(finished))
+ else:
+ response.deliverBody(PubNubResponse(finished))
+
return finished
def complete(data):
if single is True:
- if not id == self.id:
+ if id != self.id:
return None
try:
- callback(eval(data))
+ data = json.loads(data)
except Exception as e:
- pass
- #need error handling here
+ try:
+ data = json.loads(data.decode("utf-8"))
+ except:
+ _invoke(error, {'error' : 'json decode error'})
+
+ if 'error' in data and 'status' in data and 'status' != 200:
+ _invoke(error, data)
+ else:
+ _invoke(callback, data)
def abort():
pass
request.addCallback(received)
- request.addBoth(complete)
+ request.addCallback(complete)
+ request.addErrback(error_handler)
return abort
class WebClientContextFactory(ClientContextFactory):
def getContext(self, hostname, port):
return ClientContextFactory.getContext(self)
+
+class PubNub403Response(Protocol):
+ def __init__( self, finished ):
+ self.finished = finished
+
+ def dataReceived( self, bytes ):
+ #print '403 resp ', bytes
+ self.finished.callback(bytes)
class PubNubResponse(Protocol):
def __init__( self, finished ):
self.finished = finished
def dataReceived( self, bytes ):
- self.finished.callback(bytes)
+ #print bytes
+ self.finished.callback(bytes)
diff --git a/python/Pubnub.py b/python/Pubnub.py
index f3c518c..95eafd0 100644
--- a/python/Pubnub.py
+++ b/python/Pubnub.py
@@ -176,12 +176,13 @@ import time
import hashlib
import uuid
import sys
-from urllib import quote
+
+try: from urllib.parse import quote
+except: from urllib2 import quote
from base64 import urlsafe_b64encode
from hashlib import sha256
-from urllib import quote
-from urllib import urlopen
+
import hmac
@@ -233,12 +234,11 @@ class PubnubBase(object):
self.uuid = UUID or str(uuid.uuid4())
if type(sys.version_info) is tuple:
- self.python_version = 2
- self.pc = PubnubCrypto2()
+ self.python_version = 2
+ self.pc = PubnubCrypto2()
else:
self.python_version = 3
self.pc = PubnubCrypto3()
-
if not isinstance(self.uuid, str):
raise AttributeError("pres_uuid must be a string")
@@ -357,7 +357,10 @@ class PubnubBase(object):
if (callback != None): callback({'message' : response['message'], 'payload' : response['payload']})
else:
if (callback != None):callback(response)
- if (callback != None): return _new_format_callback
+ if (callback != None):
+ return _new_format_callback
+ else:
+ return None
def publish( self, args ) :
@@ -392,23 +395,28 @@ class PubnubBase(object):
if 'callback' in args :
callback = args['callback']
else :
- callback = None
+ callback = None
+
+ if 'error' in args :
+ error = args['error']
+ else :
+ error = None
- #message = json.dumps(args['message'], separators=(',',':'))
message = self.encrypt(args['message'])
- signature = self.sign(channel, message)
+ #signature = self.sign(channel, message)
## Send Message
return self._request({"urlcomponents": [
'publish',
self.publish_key,
self.subscribe_key,
- signature,
+ '0',
channel,
'0',
message
- ], 'urlparams' : {'auth' : self.auth_key}}, self._return_wrapped_callback(callback))
+ ], 'urlparams' : {'auth' : self.auth_key}}, callback=self._return_wrapped_callback(callback),
+ error=self._return_wrapped_callback(error))
def presence( self, args ) :
"""
@@ -472,12 +480,10 @@ class PubnubBase(object):
"""
channel = str(args['channel'])
- ## Capture Callback
- if 'callback' in args :
- callback = args['callback']
- else :
- callback = None
-
+
+ callback = args['callback'] if 'callback' in args else None
+ error = args['error'] if 'error' in args else None
+
## Fail if bad input.
if not channel :
raise Exception('Missing Channel')
@@ -488,59 +494,16 @@ class PubnubBase(object):
'v2','presence',
'sub_key', self.subscribe_key,
'channel', channel
- ]}, callback);
-
-
- def history( self, args ) :
+ ], 'urlparams' : {'auth' : self.auth_key}}, callback=self._return_wrapped_callback(callback),
+ error=self._return_wrapped_callback(error))
+
+ def history(self, args) :
"""
#**
#* History
#*
#* Load history from a channel.
#*
- #* @param array args with 'channel' and 'limit'.
- #* @return mixed false on fail, array on success.
- #*
-
- ## History Example
- history = pubnub.history({
- 'channel' : 'hello_world',
- 'limit' : 1
- })
- print(history)
-
- """
- ## Capture User Input
- limit = 'limit' in args and int(args['limit']) or 10
- channel = str(args['channel'])
-
- ## Fail if bad input.
- if not channel :
- raise Exception('Missing Channel')
- return False
-
- ## Capture Callback
- if 'callback' in args :
- callback = args['callback']
- else :
- callback = None
-
- ## Get History
- return self._request({ "urlcomponents" : [
- 'history',
- self.subscribe_key,
- channel,
- '0',
- str(limit)
- ] }, callback);
-
- def detailedHistory(self, args) :
- """
- #**
- #* Detailed History
- #*
- #* Load Detailed history from a channel.
- #*
#* @param array args with 'channel', optional: 'start', 'end', 'reverse', 'count'
#* @return mixed false on fail, array on success.
#*
@@ -556,34 +519,21 @@ class PubnubBase(object):
## Capture User Input
channel = str(args['channel'])
- params = dict()
- count = 100
-
- if 'count' in args:
- count = int(args['count'])
+ callback = args['callback'] if 'callback' in args else None
+ error = args['error'] if 'error' in args else None
- params['count'] = str(count)
-
- if 'reverse' in args:
- params['reverse'] = str(args['reverse']).lower()
-
- if 'start' in args:
- params['start'] = str(args['start'])
+ params = dict()
- if 'end' in args:
- params['end'] = str(args['end'])
+ params['count'] = str(args['count']) if 'count' in args else 100
+ params['reverse'] = str(args['reverse']).lower() if 'reverse' in args else 'false'
+ params['start'] = str(args['start']) if 'start' in args else None
+ params['end'] = str(args['end']) if 'end' in args else None
## Fail if bad input.
if not channel :
raise Exception('Missing Channel')
return False
- ## Capture Callback
- if 'callback' in args :
- callback = args['callback']
- else :
- callback = None
-
## Get History
return self._request({ 'urlcomponents' : [
'v2',
@@ -592,7 +542,8 @@ class PubnubBase(object):
self.subscribe_key,
'channel',
channel,
- ],'urlparams' : params }, callback=callback);
+ ], 'urlparams' : {'auth' : self.auth_key}}, callback=self._return_wrapped_callback(callback),
+ error=self._return_wrapped_callback(error))
def time(self, args = None) :
"""
@@ -610,10 +561,9 @@ class PubnubBase(object):
"""
## Capture Callback
- if args and 'callback' in args:
- callback = args['callback']
- else :
- callback = None
+
+ callback = callback if args and 'callback' in args else None
+
time = self._request({'urlcomponents' : [
'time',
'0'
@@ -637,7 +587,8 @@ class PubnubBase(object):
ch for ch in list(bit)
]) for bit in request["urlcomponents"]])
if ("urlparams" in request):
- url = url + '?' + "&".join([ x + "=" + str(y) for x,y in request["urlparams"].items()])
+ url = url + '?' + "&".join([ x + "=" + str(y) for x,y in request["urlparams"].items() if y is not None])
+ #print(url)
return url
@@ -648,9 +599,14 @@ except ImportError:
import Crypto.Hash.SHA256 as digestmod
sha256 = digestmod.new
import hmac
-import threading
-from threading import current_thread
-import threading
+
+class EmptyLock():
+ def __enter__(self):
+ pass
+ def __exit__(self,a,b,c):
+ pass
+
+empty_lock = EmptyLock()
class PubnubCoreAsync(PubnubBase):
@@ -666,7 +622,9 @@ class PubnubCoreAsync(PubnubBase):
auth_key = None,
ssl_on = False,
origin = 'pubsub.pubnub.com',
- uuid = None
+ uuid = None,
+ _tt_lock=empty_lock,
+ _channel_list_lock=empty_lock
) :
"""
#**
@@ -696,29 +654,20 @@ class PubnubCoreAsync(PubnubBase):
UUID=uuid
)
- self.subscriptions = {}
- self.timetoken = 0
- self.last_timetoken = 0
- self.version = '3.3.4'
- self.accept_encoding = 'gzip'
- self.SUB_RECEIVER = None
- self._connect = None
- self._tt_lock = threading.RLock()
+ self.subscriptions = {}
+ self.timetoken = 0
+ self.last_timetoken = 0
+ self.version = '3.3.4'
+ self.accept_encoding = 'gzip'
+ self.SUB_RECEIVER = None
+ self._connect = None
+ self._tt_lock = _tt_lock
+ self._channel_list_lock = _channel_list_lock
def get_channel_list(self, channels):
channel = ''
first = True
- if self._channel_list_lock:
- with self._channel_list_lock:
- for ch in channels:
- if not channels[ch]['subscribed']:
- continue
- if not first:
- channel += ','
- else:
- first = False
- channel += ch
- else:
+ with self._channel_list_lock:
for ch in channels:
if not channels[ch]['subscribed']:
continue
@@ -727,9 +676,15 @@ class PubnubCoreAsync(PubnubBase):
else:
first = False
channel += ch
-
return channel
+
+ def each(l, func):
+ if func is None:
+ return
+ for i in l:
+ func(i)
+
def subscribe( self, args=None, sync=False ) :
"""
#**
@@ -765,12 +720,12 @@ class PubnubCoreAsync(PubnubBase):
if args is None:
_invoke(error, "Arguments Missing")
return
- channel = args['channel'] if 'channel' in args else None
- callback = args['callback'] if 'callback' in args else None
- connect = args['connect'] if 'connect' in args else None
- disconnect = args['disconnect'] if 'disconnect' in args else None
- reconnect = args['reconnect'] if 'reconnect' in args else None
- error = args['error'] if 'error' in args else None
+ channel = args['channel'] if 'channel' in args else None
+ callback = args['callback'] if 'callback' in args else None
+ connect = args['connect'] if 'connect' in args else None
+ disconnect = args['disconnect'] if 'disconnect' in args else None
+ reconnect = args['reconnect'] if 'reconnect' in args else None
+ error = args['error'] if 'error' in args else None
with self._tt_lock:
self.last_timetoken = self.timetoken if self.timetoken != 0 else self.last_timetoken
@@ -803,10 +758,15 @@ class PubnubCoreAsync(PubnubBase):
chobj['connected'] = True
_invoke(chobj['connect'],chobj['name'])
- def _invoke_error(err=None):
- for ch in self.subscriptions:
- chobj = self.subscriptions[ch]
- _invoke(chobj.error,err)
+ def _invoke_error(channel_list=None, err=None):
+ if channel_list is None:
+ for ch in self.subscriptions:
+ chobj = self.subscriptions[ch]
+ _invoke(chobj['error'],err)
+ else:
+ for ch in channel_list:
+ chobj = self.subscriptions[ch]
+ _invoke(chobj['error'],err)
'''
if callback is None:
@@ -827,19 +787,7 @@ class PubnubCoreAsync(PubnubBase):
## New Channel?
if not channel in self.subscriptions:
- if self._channel_list_lock:
- with self._channel_list_lock:
- self.subscriptions[channel] = {
- 'name' : channel,
- 'first' : False,
- 'connected' : False,
- 'subscribed' : True,
- 'callback' : callback,
- 'connect' : connect,
- 'disconnect' : disconnect,
- 'reconnect' : reconnect
- }
- else:
+ with self._channel_list_lock:
self.subscriptions[channel] = {
'name' : channel,
'first' : False,
@@ -848,9 +796,11 @@ class PubnubCoreAsync(PubnubBase):
'callback' : callback,
'connect' : connect,
'disconnect' : disconnect,
- 'reconnect' : reconnect
+ 'reconnect' : reconnect,
+ 'error' : error
}
+
## return if already connected to channel
if channel in self.subscriptions and 'connected' in self.subscriptions[channel] and self.subscriptions[channel]['connected'] is True:
_invoke(error, "Already Connected")
@@ -865,8 +815,11 @@ class PubnubCoreAsync(PubnubBase):
def sub_callback(response):
## ERROR ?
- if not response or error in response:
- _invoke_error()
+ #print response
+ if not response or ('message' in response and response['message'] == 'Forbidden'):
+ _invoke_error(response['payload']['channels'], response['message'])
+ _connect()
+ return
_invoke_connect()
@@ -893,7 +846,6 @@ class PubnubCoreAsync(PubnubBase):
_connect()
-
channel_list = self.get_channel_list(self.subscriptions)
## CONNECT TO PUBNUB SUBSCRIBE SERVERS
try:
@@ -903,9 +855,9 @@ class PubnubCoreAsync(PubnubBase):
channel_list,
'0',
str(self.timetoken)
- ], "urlparams" : {"uuid":self.uuid} }, sub_callback, single=True )
+ ], "urlparams" : {"uuid":self.uuid, "auth" : self.auth_key} }, sub_callback, sub_callback, single=True )
except Exception as e:
- print e
+ print(e)
self.timeout( 1, _connect)
return
@@ -926,16 +878,20 @@ class PubnubCoreAsync(PubnubBase):
def unsubscribe( self, args ):
- #print(args['channel'])
- channel = str(args['channel'])
- if not (channel in self.subscriptions):
+
+ if 'channel' in self.subscriptions is False:
return False
+ channel = str(args['channel'])
+
+
## DISCONNECT
- self.subscriptions[channel]['connected'] = 0
- self.subscriptions[channel]['subscribed'] = False
- self.subscriptions[channel]['timetoken'] = 0
- self.subscriptions[channel]['first'] = False
+ with self._channel_list_lock:
+ if channel in self.subscriptions:
+ self.subscriptions[channel]['connected'] = 0
+ self.subscriptions[channel]['subscribed'] = False
+ self.subscriptions[channel]['timetoken'] = 0
+ self.subscriptions[channel]['first'] = False
self.CONNECT()
@@ -953,19 +909,43 @@ from threading import current_thread
latest_sub_callback_lock = threading.RLock()
latest_sub_callback = {'id' : None, 'callback' : None}
+
+
+
class HTTPClient:
- def __init__(self, url, callback, id=None):
+ def __init__(self, url, urllib_func=None, callback=None, error=None, id=None):
self.url = url
self.id = id
self.callback = callback
+ self.error = error
self.stop = False
+ self._urllib_func = urllib_func
def cancel(self):
self.stop = True
self.callback = None
+ self.error = None
+
def run(self):
- data = urllib2.urlopen(self.url, timeout=310).read()
+
+ def _invoke(func, data):
+ if func is not None:
+ func(data)
+
+ if self._urllib_func is None:
+ return
+
+ '''
+ try:
+ resp = urllib2.urlopen(self.url, timeout=320)
+ except urllib2.HTTPError as http_error:
+ resp = http_error
+ '''
+ resp = self._urllib_func(self.url, timeout=320)
+ data = resp[0]
+ code = resp[1]
+
if self.stop is True:
return
if self.callback is None:
@@ -975,14 +955,49 @@ class HTTPClient:
if latest_sub_callback['id'] != self.id:
return
else:
- print(data)
if latest_sub_callback['callback'] is not None:
latest_sub_callback['id'] = 0
- latest_sub_callback['callback'](json.loads(data))
+ try:
+ data = json.loads(data)
+ except:
+ _invoke(latest_sub_callback['error'], {'error' : 'json decoding error'})
+ return
+ if code != 200:
+ _invoke(latest_sub_callback['error'],data)
+ else:
+ _invoke(latest_sub_callback['callback'],data)
else:
- self.callback(json.loads(data))
+ try:
+ data = json.loads(data)
+ except:
+ _invoke(self.error, {'error' : 'json decoding error'})
+ return
+
+ if code != 200:
+ _invoke(self.error,data)
+ else:
+ _invoke(self.callback,data)
+def _urllib_request_2(url, timeout=320):
+ try:
+ resp = urllib2.urlopen(url,timeout=timeout)
+ except urllib2.HTTPError as http_error:
+ resp = http_error
+ return (resp.read(),resp.code)
+
+def _urllib_request_3(url, timeout=320):
+ #print(url)
+ try:
+ resp = urllib.request.urlopen(url,timeout=timeout)
+ except urllib.request.HTTPError as http_error:
+ resp = http_error
+ r = resp.read().decode("utf-8")
+ #print(r)
+ return (r,resp.code)
+
+_urllib_request = None
+
class Pubnub(PubnubCoreAsync):
def __init__(
self,
@@ -1003,13 +1018,15 @@ class Pubnub(PubnubCoreAsync):
auth_key = auth_key,
ssl_on = ssl_on,
origin = origin,
- uuid = pres_uuid
+ uuid = pres_uuid,
+ _tt_lock=threading.RLock(),
+ _channel_list_lock=threading.RLock()
)
+ global _urllib_request
if self.python_version == 2:
- self._request = self._request2
+ _urllib_request = _urllib_request_2
else:
- self._request = self._request3
- self._channel_list_lock = threading.RLock()
+ _urllib_request = _urllib_request_3
def timeout(self, interval, func):
def cb():
@@ -1018,17 +1035,20 @@ class Pubnub(PubnubCoreAsync):
thread = threading.Thread(target=cb)
thread.start()
- def _request2_async( self, request, callback, single=False ) :
+
+ def _request_async( self, request, callback=None, error=None, single=False ) :
+ global _urllib_request
## Build URL
url = self.getUrl(request)
if single is True:
id = time.time()
- client = HTTPClient(url, None, id)
+ client = HTTPClient(url=url, urllib_func=_urllib_request, callback=None, error=None, id=id)
with latest_sub_callback_lock:
latest_sub_callback['id'] = id
latest_sub_callback['callback'] = callback
+ latest_sub_callback['error'] = error
else:
- client = HTTPClient(url, callback)
+ client = HTTPClient(url=url, urllib_func=_urllib_request, callback=callback, error=error)
thread = threading.Thread(target=client.run)
thread.start()
@@ -1037,31 +1057,30 @@ class Pubnub(PubnubCoreAsync):
return abort
- def _request2_sync( self, request) :
-
+ def _request_sync( self, request) :
+ global _urllib_request
## Build URL
url = self.getUrl(request)
## Send Request Expecting JSONP Response
+ response = _urllib_request(url, timeout=320)
try:
- try: usock = urllib2.urlopen( url, None, 310 )
- except TypeError: usock = urllib2.urlopen( url, None )
- response = usock.read()
- usock.close()
- resp_json = json.loads(response)
- except Exception as e:
- print e
- return None
-
+ resp_json = json.loads(response[0])
+ except:
+ return [0,"JSON Error"]
+
+ if response[1] != 200 and 'status' in resp_json:
+ return {'message' : resp_json['message'], 'payload' : resp_json['payload']}
+
return resp_json
- def _request2(self, request, callback=None, single=False):
+ def _request(self, request, callback=None, error=None, single=False):
if callback is None:
- return self._request2_sync(request)
+ return self._request_sync(request)
else:
- self._request2_async(request, callback, single=single)
-
+ self._request_async(request, callback, error, single=single)
+'''
def _request3_sync( self, request) :
## Build URL
@@ -1083,3 +1102,4 @@ class Pubnub(PubnubCoreAsync):
return self._request3_sync(request,single=single)
else:
self._request3_async(request, callback, single=single)
+ '''
diff --git a/python/examples/publish-example.py b/python/examples/publish-example.py
index 31ae198..bb8b199 100755
--- a/python/examples/publish-example.py
+++ b/python/examples/publish-example.py
@@ -1,43 +1,68 @@
+## www.pubnub.com - PubNub Real-time push service in the cloud.
+# coding=utf8
+
+## PubNub Real-time Push APIs and Notifications Framework
+## Copyright (c) 2010 Stephen Blum
+## http://www.pubnub.com/
+
+## -----------------------------------
+## PubNub 3.1 Real-time Push Cloud API
+## -----------------------------------
+
import sys
-sys.path.append('.')
-sys.path.append('..')
+from twisted.internet import reactor
+sys.path.append('../')
+sys.path.append('../../')
from Pubnub import Pubnub
-## Initiate Class
-pubnub = Pubnub( publish_key='demo', subscribe_key='demo', cipher_key='enigma', ssl_on=False )
-#pubnub = Pubnub( publish_key='demo', subscribe_key='demo', ssl_on=False )
+publish_key = len(sys.argv) > 1 and sys.argv[1] or 'demo'
+subscribe_key = len(sys.argv) > 2 and sys.argv[2] or 'demo'
+secret_key = len(sys.argv) > 3 and sys.argv[3] or 'demo'
+cipher_key = len(sys.argv) > 4 and sys.argv[4] or '' ##(Cipher key is Optional)
+ssl_on = len(sys.argv) > 5 and bool(sys.argv[5]) or False
+## -----------------------------------------------------------------------
+## Initiate Pubnub State
+## -----------------------------------------------------------------------
+pubnub = Pubnub( publish_key, subscribe_key, secret_key, cipher_key, ssl_on )
+crazy = 'hello_world'
+
+## -----------------------------------------------------------------------
## Publish Example
-info = pubnub.publish({
- 'channel' : 'abcd',
- 'message' : {
- 'iam' : 'object'
- }
-})
-print(info)
+## -----------------------------------------------------------------------
+def publish_complete(info):
+ print(info)
-info = pubnub.publish({
- 'channel' : 'abcd',
- 'message' : "hi I am string"
-})
-print(info)
+def publish_error(info):
+ print('ERROR : ' + str(info))
-info = pubnub.publish({
- 'channel' : 'abcd',
- 'message' : 1234
+## Publish string
+pubnub.publish({
+ 'channel' : crazy,
+ 'message' : 'Hello World!',
+ 'callback' : publish_complete,
+ 'error' : publish_error
})
-print(info)
-info = pubnub.publish({
- 'channel' : 'abcd',
- 'message' : "1234"
+## Publish list
+li = ['Sunday', 'Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday']
+
+pubnub.publish({
+ 'channel' : crazy,
+ 'message' : li,
+ 'callback' : publish_complete,
+ 'error' : publish_error
})
-print(info)
-info = pubnub.publish({
- 'channel' : 'abcd',
- 'message' : [
- 'i' , 'am', 'array'
- ]
+def done_cb(info):
+ publish_complete(info)
+
+pubnub.publish({
+ 'channel' : crazy,
+ 'message' : { 'some_key' : 'some_val' },
+ 'callback' : done_cb,
+ 'error' : publish_error
})
-print(info)
+
+
+pubnub.start()
diff --git a/python/tests/subscribe-test.py b/python/tests/subscribe-test.py
index 0d4c65e..be4a416 100755
--- a/python/tests/subscribe-test.py
+++ b/python/tests/subscribe-test.py
@@ -38,31 +38,31 @@ received = 0
## Subscribe Example
## -----------------------------------------------------------------------
def message_received(message):
- print message
+ print(message)
def check_received(message):
global current
global errors
global received
- print message
- print current
+ print(message)
+ print(current)
if message <= current:
- print 'ERROR'
+ print('ERROR')
#sys.exit()
errors += 1
else:
received += 1
- print 'active thread count : ', threading.activeCount()
- print 'errors = ' , errors
- print current_thread().getName(), ' , ', 'received = ', received
+ print('active thread count : ' + str( threading.activeCount()))
+ print('errors = ' + str(errors))
+ print(current_thread().getName() + ' , ' + 'received = ' + str(received))
if received != message:
- print '********** MISSED **************** ', message - received
+ print('********** MISSED **************** ' + str( message - received ))
current = message
def connected_test(ch) :
- print 'Connected' , ch
+ print('Connected ' + ch)
def connected(ch) :
pass
@@ -103,7 +103,6 @@ def subscribe(channel):
})
-print threading.activeCount()
pubnub.timeout(15,cb1)
diff --git a/python/unassembled/Platform.py b/python/unassembled/Platform.py
index 22893f8..0ffccbb 100644
--- a/python/unassembled/Platform.py
+++ b/python/unassembled/Platform.py
@@ -12,19 +12,43 @@ from threading import current_thread
latest_sub_callback_lock = threading.RLock()
latest_sub_callback = {'id' : None, 'callback' : None}
+
+
+
class HTTPClient:
- def __init__(self, url, callback, id=None):
+ def __init__(self, url, urllib_func=None, callback=None, error=None, id=None):
self.url = url
self.id = id
self.callback = callback
+ self.error = error
self.stop = False
+ self._urllib_func = urllib_func
def cancel(self):
self.stop = True
self.callback = None
+ self.error = None
+
def run(self):
- data = urllib2.urlopen(self.url, timeout=310).read()
+
+ def _invoke(func, data):
+ if func is not None:
+ func(data)
+
+ if self._urllib_func is None:
+ return
+
+ '''
+ try:
+ resp = urllib2.urlopen(self.url, timeout=320)
+ except urllib2.HTTPError as http_error:
+ resp = http_error
+ '''
+ resp = self._urllib_func(self.url, timeout=320)
+ data = resp[0]
+ code = resp[1]
+
if self.stop is True:
return
if self.callback is None:
@@ -34,13 +58,48 @@ class HTTPClient:
if latest_sub_callback['id'] != self.id:
return
else:
- print(data)
if latest_sub_callback['callback'] is not None:
latest_sub_callback['id'] = 0
- latest_sub_callback['callback'](json.loads(data))
+ try:
+ data = json.loads(data)
+ except:
+ _invoke(latest_sub_callback['error'], {'error' : 'json decoding error'})
+ return
+ if code != 200:
+ _invoke(latest_sub_callback['error'],data)
+ else:
+ _invoke(latest_sub_callback['callback'],data)
else:
- self.callback(json.loads(data))
-
+ try:
+ data = json.loads(data)
+ except:
+ _invoke(self.error, {'error' : 'json decoding error'})
+ return
+
+ if code != 200:
+ _invoke(self.error,data)
+ else:
+ _invoke(self.callback,data)
+
+
+def _urllib_request_2(url, timeout=320):
+ try:
+ resp = urllib2.urlopen(url,timeout=timeout)
+ except urllib2.HTTPError as http_error:
+ resp = http_error
+ return (resp.read(),resp.code)
+
+def _urllib_request_3(url, timeout=320):
+ #print(url)
+ try:
+ resp = urllib.request.urlopen(url,timeout=timeout)
+ except urllib.request.HTTPError as http_error:
+ resp = http_error
+ r = resp.read().decode("utf-8")
+ #print(r)
+ return (r,resp.code)
+
+_urllib_request = None
class Pubnub(PubnubCoreAsync):
def __init__(
@@ -62,13 +121,15 @@ class Pubnub(PubnubCoreAsync):
auth_key = auth_key,
ssl_on = ssl_on,
origin = origin,
- uuid = pres_uuid
+ uuid = pres_uuid,
+ _tt_lock=threading.RLock(),
+ _channel_list_lock=threading.RLock()
)
+ global _urllib_request
if self.python_version == 2:
- self._request = self._request2
+ _urllib_request = _urllib_request_2
else:
- self._request = self._request3
- self._channel_list_lock = threading.RLock()
+ _urllib_request = _urllib_request_3
def timeout(self, interval, func):
def cb():
@@ -77,17 +138,20 @@ class Pubnub(PubnubCoreAsync):
thread = threading.Thread(target=cb)
thread.start()
- def _request2_async( self, request, callback, single=False ) :
+
+ def _request_async( self, request, callback=None, error=None, single=False ) :
+ global _urllib_request
## Build URL
url = self.getUrl(request)
if single is True:
id = time.time()
- client = HTTPClient(url, None, id)
+ client = HTTPClient(url=url, urllib_func=_urllib_request, callback=None, error=None, id=id)
with latest_sub_callback_lock:
latest_sub_callback['id'] = id
latest_sub_callback['callback'] = callback
+ latest_sub_callback['error'] = error
else:
- client = HTTPClient(url, callback)
+ client = HTTPClient(url=url, urllib_func=_urllib_request, callback=callback, error=error)
thread = threading.Thread(target=client.run)
thread.start()
@@ -96,31 +160,30 @@ class Pubnub(PubnubCoreAsync):
return abort
- def _request2_sync( self, request) :
-
+ def _request_sync( self, request) :
+ global _urllib_request
## Build URL
url = self.getUrl(request)
## Send Request Expecting JSONP Response
+ response = _urllib_request(url, timeout=320)
try:
- try: usock = urllib2.urlopen( url, None, 310 )
- except TypeError: usock = urllib2.urlopen( url, None )
- response = usock.read()
- usock.close()
- resp_json = json.loads(response)
- except Exception as e:
- print e
- return None
-
+ resp_json = json.loads(response[0])
+ except:
+ return [0,"JSON Error"]
+
+ if response[1] != 200 and 'status' in resp_json:
+ return {'message' : resp_json['message'], 'payload' : resp_json['payload']}
+
return resp_json
- def _request2(self, request, callback=None, single=False):
+ def _request(self, request, callback=None, error=None, single=False):
if callback is None:
- return self._request2_sync(request)
+ return self._request_sync(request)
else:
- self._request2_async(request, callback, single=single)
-
+ self._request_async(request, callback, error, single=single)
+'''
def _request3_sync( self, request) :
## Build URL
@@ -142,3 +205,4 @@ class Pubnub(PubnubCoreAsync):
return self._request3_sync(request,single=single)
else:
self._request3_async(request, callback, single=single)
+ '''