aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDevendra2014-04-02 02:44:29 +0530
committerDevendra2014-04-02 02:44:29 +0530
commit765ee5db6fc39d77e55dcf4fe97fb96da2f46d30 (patch)
treeec6e9e2d102e866ae1b54a43d805607f0c62b8c2
parent9ac3ccf6283772b404a0c80945e3cdf3406ac5bf (diff)
downloadpubnub-python-765ee5db6fc39d77e55dcf4fe97fb96da2f46d30.tar.bz2
multiplexing support
-rw-r--r--common/PubnubBase.py12
-rw-r--r--common/PubnubCore.py4
-rw-r--r--common/PubnubCoreAsync.py171
-rw-r--r--python-tornado/Pubnub.py329
-rw-r--r--python-tornado/unassembled/Platform.py28
-rw-r--r--python-twisted/Pubnub.py329
-rw-r--r--python-twisted/unassembled/Platform.py28
-rw-r--r--python/Makefile2
-rw-r--r--python/Pubnub.py326
-rw-r--r--python/unassembled/Platform.py82
10 files changed, 1002 insertions, 309 deletions
diff --git a/common/PubnubBase.py b/common/PubnubBase.py
index 4c5b422..d287be3 100644
--- a/common/PubnubBase.py
+++ b/common/PubnubBase.py
@@ -90,6 +90,14 @@ class PubnubBase(object):
return message
+ def _return_wrapped_callback(self, callback=None):
+ def _new_format_callback(response):
+ if 'payload' in response:
+ if (callback != None): callback({'message' : response['message'], 'payload' : response['payload']})
+ else:
+ if (callback != None):callback(response)
+ if (callback != None): return _new_format_callback
+
def publish( self, args ) :
"""
@@ -139,7 +147,7 @@ class PubnubBase(object):
channel,
'0',
message
- ]}, callback)
+ ]'urlparams' : {'auth' : self.auth_key}}, self._return_wrapped_callback(callback))
def presence( self, args ) :
"""
@@ -179,7 +187,7 @@ class PubnubBase(object):
callback = args['callback']
subscribe_key = args.get('subscribe_key') or self.subscribe_key
- return self.subscribe({'channel': channel+'-pnpres', 'subscribe_key':subscribe_key, 'callback': callback})
+ return self.subscribe({'channel': channel+'-pnpres', 'subscribe_key':subscribe_key, 'callback': self._return_wrapped_callback(callback)})
def here_now( self, args ) :
diff --git a/common/PubnubCore.py b/common/PubnubCore.py
index dcfd319..3ed3a68 100644
--- a/common/PubnubCore.py
+++ b/common/PubnubCore.py
@@ -1,4 +1,4 @@
-class PubnubCore(PubnubBase):
+class PubnubCore(PubnubCoreAsync):
def __init__(
self,
publish_key,
@@ -44,7 +44,7 @@ class PubnubCore(PubnubBase):
- def subscribe( self, args ) :
+ def subscribe_sync( self, args ) :
"""
#**
#* Subscribe
diff --git a/common/PubnubCoreAsync.py b/common/PubnubCoreAsync.py
index a7fbb7d..0038243 100644
--- a/common/PubnubCoreAsync.py
+++ b/common/PubnubCoreAsync.py
@@ -10,8 +10,6 @@ class PubnubCoreAsync(PubnubBase):
def start(self): pass
def stop(self): pass
- def timeout( self, delay, callback ):
- pass
def __init__(
self,
@@ -54,8 +52,23 @@ class PubnubCoreAsync(PubnubBase):
self.timetoken = 0
self.version = '3.3.4'
self.accept_encoding = 'gzip'
-
- def subscribe( self, args ) :
+ self.SUB_RECEIVER = None
+ self._connect = None
+
+ def get_channel_list(self, channels):
+ channel = ''
+ first = True
+ for ch in channels:
+ if not channels[ch]['subscribed']:
+ continue
+ if not first:
+ channel += ','
+ else:
+ first = False
+ channel += ch
+ return channel
+
+ def subscribe( self, args=None, sync=False ) :
"""
#**
#* Subscribe
@@ -87,94 +100,140 @@ class PubnubCoreAsync(PubnubBase):
})
"""
- ## Fail if missing channel
- if not 'channel' in args :
- return 'Missing Channel.'
- ## Fail if missing callback
- if not 'callback' in args :
- return 'Missing Callback.'
+ if sync is True and self.susbcribe_sync is not None:
+ self.susbcribe_sync(args)
+ return
+
+ def _invoke(func,msg=None):
+ if func is not None:
+ if msg is not None:
+ func(msg)
+ else:
+ func()
+
+ def _invoke_connect():
+ for ch in self.subscriptions:
+ chobj = self.subscriptions[ch]
+ if chobj['connected'] is False:
+ chobj['connected'] = True
+ _invoke(chobj['connect'])
+
+ def _invoke_error(err=None):
+ for ch in self.subscriptions:
+ chobj = self.subscriptions[ch]
+ _invoke(chobj.error,err)
+
- ## Capture User Input
- channel = str(args['channel'])
- callback = args['callback']
- connectcb = args['connect']
+ if callback is None:
+ _invoke(error, "Callback Missing")
+ return
+
+ if channel is None:
+ _invoke(error, "Channel Missing")
+ return
+
+ def _get_channel():
+ for ch in self.subscriptions:
+ chobj = self.subscriptions[ch]
+ if chobj['subscribed'] is True:
+ return chobj
- if 'errorback' in args:
- errorback = args['errorback']
- else:
- errorback = lambda x: x
## New Channel?
- if not (channel in self.subscriptions) :
+ if not channel in self.subscriptions:
self.subscriptions[channel] = {
- 'first' : False,
- 'connected' : False,
+ 'name' : channel,
+ 'first' : False,
+ 'connected' : False,
+ 'subscribed' : True,
+ 'callback' : callback,
+ 'connect' : connect,
+ 'disconnect' : disconnect,
+ 'reconnect' : reconnect
}
- ## Ensure Single Connection
+ ## return if already connected to channel
if self.subscriptions[channel]['connected'] :
- return "Already Connected"
+ _invoke(error, "Already Connected")
+ return
+
- self.subscriptions[channel]['connected'] = 1
## SUBSCRIPTION RECURSION
- def _subscribe():
- ## STOP CONNECTION?
- if not self.subscriptions[channel]['connected']:
- return
+ def _connect():
+ self._reset_offline()
+
def sub_callback(response):
- if not self.subscriptions[channel]['first'] :
- self.subscriptions[channel]['first'] = True
- connectcb()
+ print response
+ ## ERROR ?
+ if not response or error in response:
+ _invoke_error()
- ## STOP CONNECTION?
- if not self.subscriptions[channel]['connected']:
- return
+ _invoke_connect()
+ self.timetoken = response[1]
- ## PROBLEM?
- if not response:
- def time_callback(_time):
- if not _time:
- self.timeout( 1, _subscribe )
- return errorback("Lost Network Connection")
- else:
- self.timeout( 1, _subscribe)
+ if len(response) > 2:
+ channel_list = response[2].split(',')
+ response_list = response[0]
+ for ch in enumerate(channel_list):
+ if ch[1] in self.subscriptions:
+ chobj = self.subscriptions[ch[1]]
+ _invoke(chobj['callback'],self.decrypt(response_list[ch[0]]))
+ else:
+ response_list = response[0]
+ chobj = _get_channel()
+ for r in response_list:
+ if chobj:
+ _invoke(chobj['callback'], self.decrypt(r))
- ## ENSURE CONNECTED (Call Time Function)
- return self.time({ 'callback' : time_callback })
- self.timetoken = response[1]
- _subscribe()
+ _connect()
+
- pc = PubnubCrypto()
- out = []
- for message in response[0]:
- callback(self.decrypt(message))
+ channel_list = self.get_channel_list(self.subscriptions)
+ print channel_list
## CONNECT TO PUBNUB SUBSCRIBE SERVERS
try:
- self._request( { "urlcomponents" : [
+ self.SUB_RECEIVER = self._request( { "urlcomponents" : [
'subscribe',
self.subscribe_key,
- channel,
+ channel_list,
'0',
str(self.timetoken)
- ], "urlparams" : {"uuid":self.uuid} }, sub_callback )
- except :
- self.timeout( 1, _subscribe)
+ ], "urlparams" : {"uuid":self.uuid} }, sub_callback, single=True )
+ except Exception as e:
+ self.timeout( 1, _connect)
return
+ self._connect = _connect
+
+
## BEGIN SUBSCRIPTION (LISTEN FOR MESSAGES)
- _subscribe()
+ _connect()
+
+ def _reset_offline(self):
+ if self.SUB_RECEIVER is not None:
+ self.SUB_RECEIVER()
+ self.SUB_RECEIVER = None
+
+ def CONNECT(self):
+ self._reset_offline()
+ self._connect()
+
+
def unsubscribe( self, args ):
+ #print(args['channel'])
channel = str(args['channel'])
if not (channel in self.subscriptions):
return False
## DISCONNECT
self.subscriptions[channel]['connected'] = 0
+ self.subscriptions[channel]['subscribed'] = False
self.subscriptions[channel]['timetoken'] = 0
self.subscriptions[channel]['first'] = False
+ self.CONNECT()
diff --git a/python-tornado/Pubnub.py b/python-tornado/Pubnub.py
index 89c0d97..ee66619 100644
--- a/python-tornado/Pubnub.py
+++ b/python-tornado/Pubnub.py
@@ -16,7 +16,7 @@ from base64 import encodestring, decodestring
import hashlib
import hmac
-class PubnubCrypto() :
+class PubnubCrypto2() :
"""
#**
#* PubnubCrypto
@@ -93,13 +93,89 @@ class PubnubCrypto() :
return self.depad((cipher.decrypt(decodestring(msg))))
+class PubnubCrypto3() :
+ """
+ #**
+ #* PubnubCrypto
+ #*
+ #**
+
+ ## Initiate Class
+ pc = PubnubCrypto
+
+ """
+
+ def pad( self, msg, block_size=16 ):
+ """
+ #**
+ #* pad
+ #*
+ #* pad the text to be encrypted
+ #* appends a padding character to the end of the String
+ #* until the string has block_size length
+ #* @return msg with padding.
+ #**
+ """
+ padding = block_size - (len(msg) % block_size)
+ return msg + (chr(padding)*padding).encode('utf-8')
+
+ def depad( self, msg ):
+ """
+ #**
+ #* depad
+ #*
+ #* depad the decryptet message"
+ #* @return msg without padding.
+ #**
+ """
+ return msg[0:-ord(msg[-1])]
+
+ def getSecret( self, key ):
+ """
+ #**
+ #* getSecret
+ #*
+ #* hases the key to MD5
+ #* @return key in MD5 format
+ #**
+ """
+ return hashlib.sha256(key.encode("utf-8")).hexdigest()
+
+ def encrypt( self, key, msg ):
+ """
+ #**
+ #* encrypt
+ #*
+ #* encrypts the message
+ #* @return message in encrypted format
+ #**
+ """
+ secret = self.getSecret(key)
+ Initial16bytes='0123456789012345'
+ cipher = AES.new(secret[0:32],AES.MODE_CBC,Initial16bytes)
+ return encodestring(cipher.encrypt(self.pad(msg.encode('utf-8')))).decode('utf-8')
+ def decrypt( self, key, msg ):
+ """
+ #**
+ #* decrypt
+ #*
+ #* decrypts the message
+ #* @return message in decryped format
+ #**
+ """
+ secret = self.getSecret(key)
+ Initial16bytes='0123456789012345'
+ cipher = AES.new(secret[0:32],AES.MODE_CBC,Initial16bytes)
+ return (cipher.decrypt(decodestring(msg.encode('utf-8')))).decode('utf-8')
+
+
try: import json
except ImportError: import simplejson as json
import time
import hashlib
-import urllib2
-import uuid
+import uuid
+import sys
class PubnubBase(object):
def __init__(
@@ -137,7 +213,7 @@ class PubnubBase(object):
self.secret_key = secret_key
self.cipher_key = cipher_key
self.ssl = ssl_on
- self.pc = PubnubCrypto()
+
if self.ssl :
self.origin = 'https://' + self.origin
@@ -145,8 +221,16 @@ class PubnubBase(object):
self.origin = 'http://' + self.origin
self.uuid = UUID or str(uuid.uuid4())
+
+ if type(sys.version_info) is tuple:
+ self.python_version = 2
+ self.pc = PubnubCrypto2()
+ else:
+ self.python_version = 3
+ self.pc = PubnubCrypto3()
+
- if not isinstance(self.uuid, basestring):
+ if not isinstance(self.uuid, str):
raise AttributeError("pres_uuid must be a string")
def sign(self, channel, message):
@@ -177,6 +261,14 @@ class PubnubBase(object):
return message
+ def _return_wrapped_callback(self, callback=None):
+ def _new_format_callback(response):
+ if 'payload' in response:
+ if (callback != None): callback({'message' : response['message'], 'payload' : response['payload']})
+ else:
+ if (callback != None):callback(response)
+ if (callback != None): return _new_format_callback
+
def publish( self, args ) :
"""
@@ -207,7 +299,7 @@ class PubnubBase(object):
channel = str(args['channel'])
## Capture Callback
- if args.has_key('callback') :
+ if 'callback' in args :
callback = args['callback']
else :
callback = None
@@ -226,7 +318,7 @@ class PubnubBase(object):
channel,
'0',
message
- ]}, callback)
+ ]'urlparams' : {'auth' : self.auth_key}}, self._return_wrapped_callback(callback))
def presence( self, args ) :
"""
@@ -266,7 +358,7 @@ class PubnubBase(object):
callback = args['callback']
subscribe_key = args.get('subscribe_key') or self.subscribe_key
- return self.subscribe({'channel': channel+'-pnpres', 'subscribe_key':subscribe_key, 'callback': callback})
+ return self.subscribe({'channel': channel+'-pnpres', 'subscribe_key':subscribe_key, 'callback': self._return_wrapped_callback(callback)})
def here_now( self, args ) :
@@ -291,7 +383,7 @@ class PubnubBase(object):
channel = str(args['channel'])
## Capture Callback
- if args.has_key('callback') :
+ if 'callback' in args :
callback = args['callback']
else :
callback = None
@@ -329,7 +421,7 @@ class PubnubBase(object):
"""
## Capture User Input
- limit = args.has_key('limit') and int(args['limit']) or 10
+ limit = 'limit' in args and int(args['limit']) or 10
channel = str(args['channel'])
## Fail if bad input.
@@ -338,7 +430,7 @@ class PubnubBase(object):
return False
## Capture Callback
- if args.has_key('callback') :
+ if 'callback' in args :
callback = args['callback']
else :
callback = None
@@ -377,18 +469,18 @@ class PubnubBase(object):
params = dict()
count = 100
- if args.has_key('count'):
+ if 'count' in args:
count = int(args['count'])
params['count'] = str(count)
- if args.has_key('reverse'):
+ if 'reverse' in args:
params['reverse'] = str(args['reverse']).lower()
- if args.has_key('start'):
+ if 'start' in args:
params['start'] = str(args['start'])
- if args.has_key('end'):
+ if 'end' in args:
params['end'] = str(args['end'])
## Fail if bad input.
@@ -397,7 +489,7 @@ class PubnubBase(object):
return False
## Capture Callback
- if args.has_key('callback') :
+ if 'callback' in args :
callback = args['callback']
else :
callback = None
@@ -428,7 +520,7 @@ class PubnubBase(object):
"""
## Capture Callback
- if args and args.has_key('callback') :
+ if args and 'callback' in args :
callback = args['callback']
else :
callback = None
@@ -454,8 +546,8 @@ class PubnubBase(object):
hex(ord(ch)).replace( '0x', '%' ).upper() or
ch for ch in list(bit)
]) for bit in request["urlcomponents"]])
- if (request.has_key("urlparams")):
- url = url + '?' + "&".join([ x + "=" + y for x,y in request["urlparams"].iteritems()])
+ if ("urlparams" in request):
+ url = url + '?' + "&".join([ x + "=" + y for x,y in request["urlparams"].items()])
return url
@@ -471,8 +563,6 @@ class PubnubCoreAsync(PubnubBase):
def start(self): pass
def stop(self): pass
- def timeout( self, delay, callback ):
- pass
def __init__(
self,
@@ -515,8 +605,23 @@ class PubnubCoreAsync(PubnubBase):
self.timetoken = 0
self.version = '3.3.4'
self.accept_encoding = 'gzip'
-
- def subscribe( self, args ) :
+ self.SUB_RECEIVER = None
+ self._connect = None
+
+ def get_channel_list(self, channels):
+ channel = ''
+ first = True
+ for ch in channels:
+ if not channels[ch]['subscribed']:
+ continue
+ if not first:
+ channel += ','
+ else:
+ first = False
+ channel += ch
+ return channel
+
+ def subscribe( self, args=None, sync=False ) :
"""
#**
#* Subscribe
@@ -548,97 +653,143 @@ class PubnubCoreAsync(PubnubBase):
})
"""
- ## Fail if missing channel
- if not 'channel' in args :
- return 'Missing Channel.'
- ## Fail if missing callback
- if not 'callback' in args :
- return 'Missing Callback.'
+ if sync is True and self.susbcribe_sync is not None:
+ self.susbcribe_sync(args)
+ return
- ## Capture User Input
- channel = str(args['channel'])
- callback = args['callback']
- connectcb = args['connect']
+ def _invoke(func,msg=None):
+ if func is not None:
+ if msg is not None:
+ func(msg)
+ else:
+ func()
+
+ def _invoke_connect():
+ for ch in self.subscriptions:
+ chobj = self.subscriptions[ch]
+ if chobj['connected'] is False:
+ chobj['connected'] = True
+ _invoke(chobj['connect'])
+
+ def _invoke_error(err=None):
+ for ch in self.subscriptions:
+ chobj = self.subscriptions[ch]
+ _invoke(chobj.error,err)
+
+
+ if callback is None:
+ _invoke(error, "Callback Missing")
+ return
+
+ if channel is None:
+ _invoke(error, "Channel Missing")
+ return
+
+ def _get_channel():
+ for ch in self.subscriptions:
+ chobj = self.subscriptions[ch]
+ if chobj['subscribed'] is True:
+ return chobj
- if 'errorback' in args:
- errorback = args['errorback']
- else:
- errorback = lambda x: x
## New Channel?
- if not (channel in self.subscriptions) :
+ if not channel in self.subscriptions:
self.subscriptions[channel] = {
- 'first' : False,
- 'connected' : False,
+ 'name' : channel,
+ 'first' : False,
+ 'connected' : False,
+ 'subscribed' : True,
+ 'callback' : callback,
+ 'connect' : connect,
+ 'disconnect' : disconnect,
+ 'reconnect' : reconnect
}
- ## Ensure Single Connection
+ ## return if already connected to channel
if self.subscriptions[channel]['connected'] :
- return "Already Connected"
+ _invoke(error, "Already Connected")
+ return
+
- self.subscriptions[channel]['connected'] = 1
## SUBSCRIPTION RECURSION
- def _subscribe():
- ## STOP CONNECTION?
- if not self.subscriptions[channel]['connected']:
- return
+ def _connect():
+ self._reset_offline()
+
def sub_callback(response):
- if not self.subscriptions[channel]['first'] :
- self.subscriptions[channel]['first'] = True
- connectcb()
+ print response
+ ## ERROR ?
+ if not response or error in response:
+ _invoke_error()
- ## STOP CONNECTION?
- if not self.subscriptions[channel]['connected']:
- return
+ _invoke_connect()
+ self.timetoken = response[1]
- ## PROBLEM?
- if not response:
- def time_callback(_time):
- if not _time:
- self.timeout( 1, _subscribe )
- return errorback("Lost Network Connection")
- else:
- self.timeout( 1, _subscribe)
+ if len(response) > 2:
+ channel_list = response[2].split(',')
+ response_list = response[0]
+ for ch in enumerate(channel_list):
+ if ch[1] in self.subscriptions:
+ chobj = self.subscriptions[ch[1]]
+ _invoke(chobj['callback'],self.decrypt(response_list[ch[0]]))
+ else:
+ response_list = response[0]
+ chobj = _get_channel()
+ for r in response_list:
+ if chobj:
+ _invoke(chobj['callback'], self.decrypt(r))
- ## ENSURE CONNECTED (Call Time Function)
- return self.time({ 'callback' : time_callback })
- self.timetoken = response[1]
- _subscribe()
+ _connect()
+
- pc = PubnubCrypto()
- out = []
- for message in response[0]:
- callback(self.decrypt(message))
+ channel_list = self.get_channel_list(self.subscriptions)
+ print channel_list
## CONNECT TO PUBNUB SUBSCRIBE SERVERS
try:
- self._request( { "urlcomponents" : [
+ self.SUB_RECEIVER = self._request( { "urlcomponents" : [
'subscribe',
self.subscribe_key,
- channel,
+ channel_list,
'0',
str(self.timetoken)
- ], "urlparams" : {"uuid":self.uuid} }, sub_callback )
- except :
- self.timeout( 1, _subscribe)
+ ], "urlparams" : {"uuid":self.uuid} }, sub_callback, single=True )
+ except Exception as e:
+ self.timeout( 1, _connect)
return
+ self._connect = _connect
+
+
## BEGIN SUBSCRIPTION (LISTEN FOR MESSAGES)
- _subscribe()
+ _connect()
+
+ def _reset_offline(self):
+ if self.SUB_RECEIVER is not None:
+ self.SUB_RECEIVER()
+ self.SUB_RECEIVER = None
+
+ def CONNECT(self):
+ self._reset_offline()
+ self._connect()
+
+
def unsubscribe( self, args ):
+ #print(args['channel'])
channel = str(args['channel'])
if not (channel in self.subscriptions):
return False
## DISCONNECT
self.subscriptions[channel]['connected'] = 0
+ self.subscriptions[channel]['subscribed'] = False
self.subscriptions[channel]['timetoken'] = 0
self.subscriptions[channel]['first'] = False
+ self.CONNECT()
import tornado.httpclient
@@ -685,15 +836,24 @@ class Pubnub(PubnubCoreAsync):
self.headers['Accept-Encoding'] = self.accept_encoding
self.headers['V'] = self.version
self.http = tornado.httpclient.AsyncHTTPClient(max_clients=1000)
+ self.id = None
- def _request( self, request, callback ) :
+ def _request( self, request, callback, single=False ) :
url = self.getUrl(request)
- ## Send Request Expecting JSON Response
- #print self.headers
-
request = tornado.httpclient.HTTPRequest( url, 'GET', self.headers, connect_timeout=10, request_timeout=310 )
+ if single is True:
+ id = time.time()
+ self.id = id
def responseCallback(response):
+ if single is True:
+ if not id == self.id:
+ return None
+
+ body = response._get_body()
+ if body is None:
+ return
+
def handle_exc(*args):
return True
if response.error is not None:
@@ -701,9 +861,14 @@ class Pubnub(PubnubCoreAsync):
response.rethrow()
elif callback:
callback(eval(response._get_body()))
-
+
self.http.fetch(
- request,
- callback=responseCallback,
+ request=request,
+ callback=responseCallback
)
+ def abort():
+ pass
+
+ return abort
+
diff --git a/python-tornado/unassembled/Platform.py b/python-tornado/unassembled/Platform.py
index 62d3a26..f98befb 100644
--- a/python-tornado/unassembled/Platform.py
+++ b/python-tornado/unassembled/Platform.py
@@ -42,15 +42,24 @@ class Pubnub(PubnubCoreAsync):
self.headers['Accept-Encoding'] = self.accept_encoding
self.headers['V'] = self.version
self.http = tornado.httpclient.AsyncHTTPClient(max_clients=1000)
+ self.id = None
- def _request( self, request, callback ) :
+ def _request( self, request, callback, single=False ) :
url = self.getUrl(request)
- ## Send Request Expecting JSON Response
- #print self.headers
-
request = tornado.httpclient.HTTPRequest( url, 'GET', self.headers, connect_timeout=10, request_timeout=310 )
+ if single is True:
+ id = time.time()
+ self.id = id
def responseCallback(response):
+ if single is True:
+ if not id == self.id:
+ return None
+
+ body = response._get_body()
+ if body is None:
+ return
+
def handle_exc(*args):
return True
if response.error is not None:
@@ -58,9 +67,14 @@ class Pubnub(PubnubCoreAsync):
response.rethrow()
elif callback:
callback(eval(response._get_body()))
-
+
self.http.fetch(
- request,
- callback=responseCallback,
+ request=request,
+ callback=responseCallback
)
+ def abort():
+ pass
+
+ return abort
+
diff --git a/python-twisted/Pubnub.py b/python-twisted/Pubnub.py
index 66534b5..3bc2d35 100644
--- a/python-twisted/Pubnub.py
+++ b/python-twisted/Pubnub.py
@@ -16,7 +16,7 @@ from base64 import encodestring, decodestring
import hashlib
import hmac
-class PubnubCrypto() :
+class PubnubCrypto2() :
"""
#**
#* PubnubCrypto
@@ -93,13 +93,89 @@ class PubnubCrypto() :
return self.depad((cipher.decrypt(decodestring(msg))))
+class PubnubCrypto3() :
+ """
+ #**
+ #* PubnubCrypto
+ #*
+ #**
+
+ ## Initiate Class
+ pc = PubnubCrypto
+
+ """
+
+ def pad( self, msg, block_size=16 ):
+ """
+ #**
+ #* pad
+ #*
+ #* pad the text to be encrypted
+ #* appends a padding character to the end of the String
+ #* until the string has block_size length
+ #* @return msg with padding.
+ #**
+ """
+ padding = block_size - (len(msg) % block_size)
+ return msg + (chr(padding)*padding).encode('utf-8')
+
+ def depad( self, msg ):
+ """
+ #**
+ #* depad
+ #*
+ #* depad the decryptet message"
+ #* @return msg without padding.
+ #**
+ """
+ return msg[0:-ord(msg[-1])]
+
+ def getSecret( self, key ):
+ """
+ #**
+ #* getSecret
+ #*
+ #* hases the key to MD5
+ #* @return key in MD5 format
+ #**
+ """
+ return hashlib.sha256(key.encode("utf-8")).hexdigest()
+
+ def encrypt( self, key, msg ):
+ """
+ #**
+ #* encrypt
+ #*
+ #* encrypts the message
+ #* @return message in encrypted format
+ #**
+ """
+ secret = self.getSecret(key)
+ Initial16bytes='0123456789012345'
+ cipher = AES.new(secret[0:32],AES.MODE_CBC,Initial16bytes)
+ return encodestring(cipher.encrypt(self.pad(msg.encode('utf-8')))).decode('utf-8')
+ def decrypt( self, key, msg ):
+ """
+ #**
+ #* decrypt
+ #*
+ #* decrypts the message
+ #* @return message in decryped format
+ #**
+ """
+ secret = self.getSecret(key)
+ Initial16bytes='0123456789012345'
+ cipher = AES.new(secret[0:32],AES.MODE_CBC,Initial16bytes)
+ return (cipher.decrypt(decodestring(msg.encode('utf-8')))).decode('utf-8')
+
+
try: import json
except ImportError: import simplejson as json
import time
import hashlib
-import urllib2
-import uuid
+import uuid
+import sys
class PubnubBase(object):
def __init__(
@@ -137,7 +213,7 @@ class PubnubBase(object):
self.secret_key = secret_key
self.cipher_key = cipher_key
self.ssl = ssl_on
- self.pc = PubnubCrypto()
+
if self.ssl :
self.origin = 'https://' + self.origin
@@ -145,8 +221,16 @@ class PubnubBase(object):
self.origin = 'http://' + self.origin
self.uuid = UUID or str(uuid.uuid4())
+
+ if type(sys.version_info) is tuple:
+ self.python_version = 2
+ self.pc = PubnubCrypto2()
+ else:
+ self.python_version = 3
+ self.pc = PubnubCrypto3()
+
- if not isinstance(self.uuid, basestring):
+ if not isinstance(self.uuid, str):
raise AttributeError("pres_uuid must be a string")
def sign(self, channel, message):
@@ -177,6 +261,14 @@ class PubnubBase(object):
return message
+ def _return_wrapped_callback(self, callback=None):
+ def _new_format_callback(response):
+ if 'payload' in response:
+ if (callback != None): callback({'message' : response['message'], 'payload' : response['payload']})
+ else:
+ if (callback != None):callback(response)
+ if (callback != None): return _new_format_callback
+
def publish( self, args ) :
"""
@@ -207,7 +299,7 @@ class PubnubBase(object):
channel = str(args['channel'])
## Capture Callback
- if args.has_key('callback') :
+ if 'callback' in args :
callback = args['callback']
else :
callback = None
@@ -226,7 +318,7 @@ class PubnubBase(object):
channel,
'0',
message
- ]}, callback)
+ ]'urlparams' : {'auth' : self.auth_key}}, self._return_wrapped_callback(callback))
def presence( self, args ) :
"""
@@ -266,7 +358,7 @@ class PubnubBase(object):
callback = args['callback']
subscribe_key = args.get('subscribe_key') or self.subscribe_key
- return self.subscribe({'channel': channel+'-pnpres', 'subscribe_key':subscribe_key, 'callback': callback})
+ return self.subscribe({'channel': channel+'-pnpres', 'subscribe_key':subscribe_key, 'callback': self._return_wrapped_callback(callback)})
def here_now( self, args ) :
@@ -291,7 +383,7 @@ class PubnubBase(object):
channel = str(args['channel'])
## Capture Callback
- if args.has_key('callback') :
+ if 'callback' in args :
callback = args['callback']
else :
callback = None
@@ -329,7 +421,7 @@ class PubnubBase(object):
"""
## Capture User Input
- limit = args.has_key('limit') and int(args['limit']) or 10
+ limit = 'limit' in args and int(args['limit']) or 10
channel = str(args['channel'])
## Fail if bad input.
@@ -338,7 +430,7 @@ class PubnubBase(object):
return False
## Capture Callback
- if args.has_key('callback') :
+ if 'callback' in args :
callback = args['callback']
else :
callback = None
@@ -377,18 +469,18 @@ class PubnubBase(object):
params = dict()
count = 100
- if args.has_key('count'):
+ if 'count' in args:
count = int(args['count'])
params['count'] = str(count)
- if args.has_key('reverse'):
+ if 'reverse' in args:
params['reverse'] = str(args['reverse']).lower()
- if args.has_key('start'):
+ if 'start' in args:
params['start'] = str(args['start'])
- if args.has_key('end'):
+ if 'end' in args:
params['end'] = str(args['end'])
## Fail if bad input.
@@ -397,7 +489,7 @@ class PubnubBase(object):
return False
## Capture Callback
- if args.has_key('callback') :
+ if 'callback' in args :
callback = args['callback']
else :
callback = None
@@ -428,7 +520,7 @@ class PubnubBase(object):
"""
## Capture Callback
- if args and args.has_key('callback') :
+ if args and 'callback' in args :
callback = args['callback']
else :
callback = None
@@ -454,8 +546,8 @@ class PubnubBase(object):
hex(ord(ch)).replace( '0x', '%' ).upper() or
ch for ch in list(bit)
]) for bit in request["urlcomponents"]])
- if (request.has_key("urlparams")):
- url = url + '?' + "&".join([ x + "=" + y for x,y in request["urlparams"].iteritems()])
+ if ("urlparams" in request):
+ url = url + '?' + "&".join([ x + "=" + y for x,y in request["urlparams"].items()])
return url
@@ -471,8 +563,6 @@ class PubnubCoreAsync(PubnubBase):
def start(self): pass
def stop(self): pass
- def timeout( self, delay, callback ):
- pass
def __init__(
self,
@@ -515,8 +605,23 @@ class PubnubCoreAsync(PubnubBase):
self.timetoken = 0
self.version = '3.3.4'
self.accept_encoding = 'gzip'
-
- def subscribe( self, args ) :
+ self.SUB_RECEIVER = None
+ self._connect = None
+
+ def get_channel_list(self, channels):
+ channel = ''
+ first = True
+ for ch in channels:
+ if not channels[ch]['subscribed']:
+ continue
+ if not first:
+ channel += ','
+ else:
+ first = False
+ channel += ch
+ return channel
+
+ def subscribe( self, args=None, sync=False ) :
"""
#**
#* Subscribe
@@ -548,97 +653,143 @@ class PubnubCoreAsync(PubnubBase):
})
"""
- ## Fail if missing channel
- if not 'channel' in args :
- return 'Missing Channel.'
- ## Fail if missing callback
- if not 'callback' in args :
- return 'Missing Callback.'
+ if sync is True and self.susbcribe_sync is not None:
+ self.susbcribe_sync(args)
+ return
- ## Capture User Input
- channel = str(args['channel'])
- callback = args['callback']
- connectcb = args['connect']
+ def _invoke(func,msg=None):
+ if func is not None:
+ if msg is not None:
+ func(msg)
+ else:
+ func()
+
+ def _invoke_connect():
+ for ch in self.subscriptions:
+ chobj = self.subscriptions[ch]
+ if chobj['connected'] is False:
+ chobj['connected'] = True
+ _invoke(chobj['connect'])
+
+ def _invoke_error(err=None):
+ for ch in self.subscriptions:
+ chobj = self.subscriptions[ch]
+ _invoke(chobj.error,err)
+
+
+ if callback is None:
+ _invoke(error, "Callback Missing")
+ return
+
+ if channel is None:
+ _invoke(error, "Channel Missing")
+ return
+
+ def _get_channel():
+ for ch in self.subscriptions:
+ chobj = self.subscriptions[ch]
+ if chobj['subscribed'] is True:
+ return chobj
- if 'errorback' in args:
- errorback = args['errorback']
- else:
- errorback = lambda x: x
## New Channel?
- if not (channel in self.subscriptions) :
+ if not channel in self.subscriptions:
self.subscriptions[channel] = {
- 'first' : False,
- 'connected' : False,
+ 'name' : channel,
+ 'first' : False,
+ 'connected' : False,
+ 'subscribed' : True,
+ 'callback' : callback,
+ 'connect' : connect,
+ 'disconnect' : disconnect,
+ 'reconnect' : reconnect
}
- ## Ensure Single Connection
+ ## return if already connected to channel
if self.subscriptions[channel]['connected'] :
- return "Already Connected"
+ _invoke(error, "Already Connected")
+ return
+
- self.subscriptions[channel]['connected'] = 1
## SUBSCRIPTION RECURSION
- def _subscribe():
- ## STOP CONNECTION?
- if not self.subscriptions[channel]['connected']:
- return
+ def _connect():
+ self._reset_offline()
+
def sub_callback(response):
- if not self.subscriptions[channel]['first'] :
- self.subscriptions[channel]['first'] = True
- connectcb()
+ print response
+ ## ERROR ?
+ if not response or error in response:
+ _invoke_error()
- ## STOP CONNECTION?
- if not self.subscriptions[channel]['connected']:
- return
+ _invoke_connect()
+ self.timetoken = response[1]
- ## PROBLEM?
- if not response:
- def time_callback(_time):
- if not _time:
- self.timeout( 1, _subscribe )
- return errorback("Lost Network Connection")
- else:
- self.timeout( 1, _subscribe)
+ if len(response) > 2:
+ channel_list = response[2].split(',')
+ response_list = response[0]
+ for ch in enumerate(channel_list):
+ if ch[1] in self.subscriptions:
+ chobj = self.subscriptions[ch[1]]
+ _invoke(chobj['callback'],self.decrypt(response_list[ch[0]]))
+ else:
+ response_list = response[0]
+ chobj = _get_channel()
+ for r in response_list:
+ if chobj:
+ _invoke(chobj['callback'], self.decrypt(r))
- ## ENSURE CONNECTED (Call Time Function)
- return self.time({ 'callback' : time_callback })
- self.timetoken = response[1]
- _subscribe()
+ _connect()
+
- pc = PubnubCrypto()
- out = []
- for message in response[0]:
- callback(self.decrypt(message))
+ channel_list = self.get_channel_list(self.subscriptions)
+ print channel_list
## CONNECT TO PUBNUB SUBSCRIBE SERVERS
try:
- self._request( { "urlcomponents" : [
+ self.SUB_RECEIVER = self._request( { "urlcomponents" : [
'subscribe',
self.subscribe_key,
- channel,
+ channel_list,
'0',
str(self.timetoken)
- ], "urlparams" : {"uuid":self.uuid} }, sub_callback )
- except :
- self.timeout( 1, _subscribe)
+ ], "urlparams" : {"uuid":self.uuid} }, sub_callback, single=True )
+ except Exception as e:
+ self.timeout( 1, _connect)
return
+ self._connect = _connect
+
+
## BEGIN SUBSCRIPTION (LISTEN FOR MESSAGES)
- _subscribe()
+ _connect()
+
+ def _reset_offline(self):
+ if self.SUB_RECEIVER is not None:
+ self.SUB_RECEIVER()
+ self.SUB_RECEIVER = None
+
+ def CONNECT(self):
+ self._reset_offline()
+ self._connect()
+
+
def unsubscribe( self, args ):
+ #print(args['channel'])
channel = str(args['channel'])
if not (channel in self.subscriptions):
return False
## DISCONNECT
self.subscriptions[channel]['connected'] = 0
+ self.subscriptions[channel]['subscribed'] = False
self.subscriptions[channel]['timetoken'] = 0
self.subscriptions[channel]['first'] = False
+ self.CONNECT()
from twisted.web.client import getPage
@@ -650,10 +801,14 @@ from twisted.web.client import HTTPConnectionPool
from twisted.web.http_headers import Headers
from twisted.internet.ssl import ClientContextFactory
from twisted.internet.task import LoopingCall
+import twisted
+from hashlib import sha256
+import time
pnconn_pool = HTTPConnectionPool(reactor, persistent=True)
-pnconn_pool.maxPersistentPerHost = 100
+pnconn_pool.maxPersistentPerHost = 100000
pnconn_pool.cachedConnectionTimeout = 310
+pnconn_pool.retryAutomatically = True
class Pubnub(PubnubCoreAsync):
@@ -684,7 +839,7 @@ class Pubnub(PubnubCoreAsync):
#self.headers['Accept-Encoding'] = [self.accept_encoding]
self.headers['V'] = [self.version]
- def _request( self, request, callback ) :
+ def _request( self, request, callback, single=False ) :
global pnconn_pool
## Build URL
@@ -696,24 +851,42 @@ class Pubnub(PubnubCoreAsync):
]) for bit in request])
'''
url = self.getUrl(request)
+
agent = ContentDecoderAgent(RedirectAgent(Agent(
reactor,
contextFactory = WebClientContextFactory(),
pool = self.ssl and None or pnconn_pool
)), [('gzip', GzipDecoder)])
+
request = agent.request( 'GET', url, Headers(self.headers), None )
+ if single is True:
+ id = time.time()
+ self.id = id
+
def received(response):
finished = Deferred()
response.deliverBody(PubNubResponse(finished))
return finished
def complete(data):
- callback(eval(data))
+ if single is True:
+ if not id == self.id:
+ return None
+ try:
+ callback(eval(data))
+ except Exception as e:
+ pass
+ #need error handling here
+
+ def abort():
+ pass
request.addCallback(received)
request.addBoth(complete)
+ return abort
+
class WebClientContextFactory(ClientContextFactory):
def getContext(self, hostname, port):
return ClientContextFactory.getContext(self)
diff --git a/python-twisted/unassembled/Platform.py b/python-twisted/unassembled/Platform.py
index 7318703..3b84b30 100644
--- a/python-twisted/unassembled/Platform.py
+++ b/python-twisted/unassembled/Platform.py
@@ -7,10 +7,14 @@ from twisted.web.client import HTTPConnectionPool
from twisted.web.http_headers import Headers
from twisted.internet.ssl import ClientContextFactory
from twisted.internet.task import LoopingCall
+import twisted
+from hashlib import sha256
+import time
pnconn_pool = HTTPConnectionPool(reactor, persistent=True)
-pnconn_pool.maxPersistentPerHost = 100
+pnconn_pool.maxPersistentPerHost = 100000
pnconn_pool.cachedConnectionTimeout = 310
+pnconn_pool.retryAutomatically = True
class Pubnub(PubnubCoreAsync):
@@ -41,7 +45,7 @@ class Pubnub(PubnubCoreAsync):
#self.headers['Accept-Encoding'] = [self.accept_encoding]
self.headers['V'] = [self.version]
- def _request( self, request, callback ) :
+ def _request( self, request, callback, single=False ) :
global pnconn_pool
## Build URL
@@ -53,24 +57,42 @@ class Pubnub(PubnubCoreAsync):
]) for bit in request])
'''
url = self.getUrl(request)
+
agent = ContentDecoderAgent(RedirectAgent(Agent(
reactor,
contextFactory = WebClientContextFactory(),
pool = self.ssl and None or pnconn_pool
)), [('gzip', GzipDecoder)])
+
request = agent.request( 'GET', url, Headers(self.headers), None )
+ if single is True:
+ id = time.time()
+ self.id = id
+
def received(response):
finished = Deferred()
response.deliverBody(PubNubResponse(finished))
return finished
def complete(data):
- callback(eval(data))
+ if single is True:
+ if not id == self.id:
+ return None
+ try:
+ callback(eval(data))
+ except Exception as e:
+ pass
+ #need error handling here
+
+ def abort():
+ pass
request.addCallback(received)
request.addBoth(complete)
+ return abort
+
class WebClientContextFactory(ClientContextFactory):
def getContext(self, hostname, port):
return ClientContextFactory.getContext(self)
diff --git a/python/Makefile b/python/Makefile
index 5eb9e2f..b693cf8 100644
--- a/python/Makefile
+++ b/python/Makefile
@@ -12,7 +12,7 @@ build:
echo "\n" >> ./Pubnub.py
cat ../common/PubnubBase.py >> ./Pubnub.py
echo "\n" >> ./Pubnub.py
- cat ../common/PubnubCore.py >> ./Pubnub.py
+ cat ../common/PubnubCoreAsync.py >> ./Pubnub.py
echo "\n" >> ./Pubnub.py
cat ./unassembled/Platform.py >> ./Pubnub.py
find -name "Pubnub*py" | xargs sed -i "s/PubNub\ [0-9]\.[0-9]\.[0-9]/PubNub\ $(VERSION)/g"
diff --git a/python/Pubnub.py b/python/Pubnub.py
index 91f67ad..a449c2d 100644
--- a/python/Pubnub.py
+++ b/python/Pubnub.py
@@ -206,13 +206,13 @@ class PubnubBase(object):
pubnub = Pubnub( 'PUBLISH-KEY', 'SUBSCRIBE-KEY', 'SECRET-KEY', False )
"""
- self.origin = origin
- self.limit = 1800
- self.publish_key = publish_key
- self.subscribe_key = subscribe_key
- self.secret_key = secret_key
- self.cipher_key = cipher_key
- self.ssl = ssl_on
+ self.origin = origin
+ self.limit = 1800
+ self.publish_key = publish_key
+ self.subscribe_key = subscribe_key
+ self.secret_key = secret_key
+ self.cipher_key = cipher_key
+ self.ssl = ssl_on
if self.ssl :
@@ -261,6 +261,14 @@ class PubnubBase(object):
return message
+ def _return_wrapped_callback(self, callback=None):
+ def _new_format_callback(response):
+ if 'payload' in response:
+ if (callback != None): callback({'message' : response['message'], 'payload' : response['payload']})
+ else:
+ if (callback != None):callback(response)
+ if (callback != None): return _new_format_callback
+
def publish( self, args ) :
"""
@@ -310,7 +318,7 @@ class PubnubBase(object):
channel,
'0',
message
- ]}, callback)
+ ]'urlparams' : {'auth' : self.auth_key}}, self._return_wrapped_callback(callback))
def presence( self, args ) :
"""
@@ -350,7 +358,7 @@ class PubnubBase(object):
callback = args['callback']
subscribe_key = args.get('subscribe_key') or self.subscribe_key
- return self.subscribe({'channel': channel+'-pnpres', 'subscribe_key':subscribe_key, 'callback': callback})
+ return self.subscribe({'channel': channel+'-pnpres', 'subscribe_key':subscribe_key, 'callback': self._return_wrapped_callback(callback)})
def here_now( self, args ) :
@@ -543,7 +551,19 @@ class PubnubBase(object):
return url
-class PubnubCore(PubnubBase):
+try:
+ from hashlib import sha256
+ digestmod = sha256
+except ImportError:
+ import Crypto.Hash.SHA256 as digestmod
+ sha256 = digestmod.new
+import hmac
+
+class PubnubCoreAsync(PubnubBase):
+
+ def start(self): pass
+ def stop(self): pass
+
def __init__(
self,
publish_key,
@@ -562,17 +582,16 @@ class PubnubCore(PubnubBase):
#*
#* @param string publish_key required key to send messages.
#* @param string subscribe_key required key to receive messages.
- #* @param string secret_key optional key to sign messages.
+ #* @param string secret_key required key to sign messages.
#* @param boolean ssl required for 2048 bit encrypted messages.
#* @param string origin PUBNUB Server Origin.
- #* @param string pres_uuid optional identifier for presence (auto-generated if not supplied)
#**
## Initiat Class
pubnub = Pubnub( 'PUBLISH-KEY', 'SUBSCRIBE-KEY', 'SECRET-KEY', False )
"""
- super(PubnubCore, self).__init__(
+ super(PubnubCoreAsync, self).__init__(
publish_key=publish_key,
subscribe_key=subscribe_key,
secret_key=secret_key,
@@ -584,20 +603,33 @@ class PubnubCore(PubnubBase):
self.subscriptions = {}
self.timetoken = 0
- self.version = '3.4'
+ self.version = '3.3.4'
self.accept_encoding = 'gzip'
+ self.SUB_RECEIVER = None
+ self._connect = None
+ def get_channel_list(self, channels):
+ channel = ''
+ first = True
+ for ch in channels:
+ if not channels[ch]['subscribed']:
+ continue
+ if not first:
+ channel += ','
+ else:
+ first = False
+ channel += ch
+ return channel
-
- def subscribe( self, args ) :
+ def subscribe( self, args=None, sync=False ) :
"""
#**
#* Subscribe
#*
- #* This is BLOCKING.
+ #* This is NON-BLOCKING.
#* Listen for a message on a channel.
#*
- #* @param array args with channel and callback.
+ #* @param array args with channel and message.
#* @return false on fail, array on success.
#**
@@ -606,58 +638,158 @@ class PubnubCore(PubnubBase):
print(message)
return True
+ ## On Connect Callback
+ def connected() :
+ pubnub.publish({
+ 'channel' : 'hello_world',
+ 'message' : { 'some_var' : 'text' }
+ })
+
+ ## Subscribe
pubnub.subscribe({
'channel' : 'hello_world',
- 'callback' : receive
+ 'connect' : connected,
+ 'callback' : receive
})
"""
- ## Fail if missing channel
- if not 'channel' in args :
- raise Exception('Missing Channel.')
- return False
+ if sync is True and self.susbcribe_sync is not None:
+ self.susbcribe_sync(args)
+ return
+
+ def _invoke(func,msg=None):
+ if func is not None:
+ if msg is not None:
+ func(msg)
+ else:
+ func()
+
+ def _invoke_connect():
+ for ch in self.subscriptions:
+ chobj = self.subscriptions[ch]
+ if chobj['connected'] is False:
+ chobj['connected'] = True
+ _invoke(chobj['connect'])
+
+ def _invoke_error(err=None):
+ for ch in self.subscriptions:
+ chobj = self.subscriptions[ch]
+ _invoke(chobj.error,err)
+
+
+ if callback is None:
+ _invoke(error, "Callback Missing")
+ return
+
+ if channel is None:
+ _invoke(error, "Channel Missing")
+ return
+
+ def _get_channel():
+ for ch in self.subscriptions:
+ chobj = self.subscriptions[ch]
+ if chobj['subscribed'] is True:
+ return chobj
+
+
+ ## New Channel?
+ if not channel in self.subscriptions:
+ self.subscriptions[channel] = {
+ 'name' : channel,
+ 'first' : False,
+ 'connected' : False,
+ 'subscribed' : True,
+ 'callback' : callback,
+ 'connect' : connect,
+ 'disconnect' : disconnect,
+ 'reconnect' : reconnect
+ }
- ## Fail if missing callback
- if not 'callback' in args :
- raise Exception('Missing Callback.')
- return False
+ ## return if already connected to channel
+ if self.subscriptions[channel]['connected'] :
+ _invoke(error, "Already Connected")
+ return
+
- ## Capture User Input
- channel = str(args['channel'])
- callback = args['callback']
- subscribe_key = args.get('subscribe_key') or self.subscribe_key
+ ## SUBSCRIPTION RECURSION
+ def _connect():
+
+ self._reset_offline()
+
+ def sub_callback(response):
+ print response
+ ## ERROR ?
+ if not response or error in response:
+ _invoke_error()
+
+ _invoke_connect()
+
+
+ self.timetoken = response[1]
- ## Begin Subscribe
- while True :
+ if len(response) > 2:
+ channel_list = response[2].split(',')
+ response_list = response[0]
+ for ch in enumerate(channel_list):
+ if ch[1] in self.subscriptions:
+ chobj = self.subscriptions[ch[1]]
+ _invoke(chobj['callback'],self.decrypt(response_list[ch[0]]))
+ else:
+ response_list = response[0]
+ chobj = _get_channel()
+ for r in response_list:
+ if chobj:
+ _invoke(chobj['callback'], self.decrypt(r))
- timetoken = 'timetoken' in args and args['timetoken'] or 0
- try :
- ## Wait for Message
- response = self._request({"urlcomponents" : [
+
+ _connect()
+
+
+
+ channel_list = self.get_channel_list(self.subscriptions)
+ print channel_list
+ ## CONNECT TO PUBNUB SUBSCRIBE SERVERS
+ try:
+ self.SUB_RECEIVER = self._request( { "urlcomponents" : [
'subscribe',
- subscribe_key,
- channel,
+ self.subscribe_key,
+ channel_list,
'0',
- str(timetoken)
- ],"urlparams" : {"uuid" : self.uuid }})
+ str(self.timetoken)
+ ], "urlparams" : {"uuid":self.uuid} }, sub_callback, single=True )
+ except Exception as e:
+ self.timeout( 1, _connect)
+ return
+
+ self._connect = _connect
- messages = response[0]
- args['timetoken'] = response[1]
- ## If it was a timeout
- if not len(messages) :
- continue
+ ## BEGIN SUBSCRIPTION (LISTEN FOR MESSAGES)
+ _connect()
- ## Run user Callback and Reconnect if user permits.
- for message in messages :
- if not callback(self.decrypt(message)) :
- return
+ def _reset_offline(self):
+ if self.SUB_RECEIVER is not None:
+ self.SUB_RECEIVER()
+ self.SUB_RECEIVER = None
- except Exception:
- time.sleep(1)
+ def CONNECT(self):
+ self._reset_offline()
+ self._connect()
+
+
+ def unsubscribe( self, args ):
+ #print(args['channel'])
+ channel = str(args['channel'])
+ if not (channel in self.subscriptions):
+ return False
- return True
+ ## DISCONNECT
+ self.subscriptions[channel]['connected'] = 0
+ self.subscriptions[channel]['subscribed'] = False
+ self.subscriptions[channel]['timetoken'] = 0
+ self.subscriptions[channel]['first'] = False
+ self.CONNECT()
try:
@@ -665,6 +797,34 @@ try:
except:
import urllib2
+import threading
+import json
+import time
+
+current_req_id = -1
+
+class HTTPClient:
+ def __init__(self, url, callback, id=None):
+ self.url = url
+ self.id = id
+ self.callback = callback
+ self.stop = False
+
+ def cancel(self):
+ self.stop = True
+ self.callback = None
+
+ def run(self):
+ global current_req_id
+ data = urllib2.urlopen(self.url, timeout=310).read()
+ if self.stop is True:
+ return
+ if self.id is not None and current_req_id != self.id:
+ return
+ if self.callback is not None:
+ self.callback(json.loads(data))
+
+
class Pubnub(PubnubCore):
def __init__(
self,
@@ -690,7 +850,33 @@ class Pubnub(PubnubCore):
else:
self._request = self._request3
- def _request2( self, request, callback = None ) :
+ def timeout(self, interval, func):
+ def cb():
+ time.sleep(interval)
+ func()
+ thread = threading.Thread(target=cb)
+ thread.start()
+
+ def _request2_async( self, request, callback, single=False ) :
+ global current_req_id
+ ## Build URL
+ url = self.getUrl(request)
+ if single is True:
+ id = time.time()
+ client = HTTPClient(url, callback, id)
+ current_req_id = id
+ else:
+ client = HTTPClient(url, callback)
+
+ thread = threading.Thread(target=client.run)
+ thread.start()
+ def abort():
+ client.cancel();
+ return abort
+
+
+ def _request2_sync( self, request) :
+
## Build URL
url = self.getUrl(request)
@@ -704,13 +890,18 @@ class Pubnub(PubnubCore):
except:
return None
- if (callback):
- callback(resp_json)
- else:
return resp_json
- def _request3( self, request, callback = None ) :
+ def _request2(self, request, callback=None, single=False):
+ if callback is None:
+ return self._request2_sync(request,single=single)
+ else:
+ self._request2_async(request, callback, single=single)
+
+
+
+ def _request3_sync( self, request) :
## Build URL
url = self.getUrl(request)
## Send Request Expecting JSONP Response
@@ -718,18 +909,15 @@ class Pubnub(PubnubCore):
response = urllib.request.urlopen(url,timeout=310)
resp_json = json.loads(response.read().decode("utf-8"))
except Exception as e:
- print(e)
return None
- if (callback):
- callback(resp_json)
- else:
- return resp_json
+ return resp_json
- '''
- def _request(self, request, callback = None):
- if self.python_version == 2:
- return self._request2(request,callback)
+ def _request3_async( self, request, callback, single=False ) :
+ pass
+
+ def _request3(self, request, callback=None, single=False):
+ if callback is None:
+ return self._request3_sync(request,single=single)
else:
- return self._request3(request, callback)
- '''
+ self._request3_async(request, callback, single=single)
diff --git a/python/unassembled/Platform.py b/python/unassembled/Platform.py
index c60690f..f0f9327 100644
--- a/python/unassembled/Platform.py
+++ b/python/unassembled/Platform.py
@@ -3,6 +3,34 @@ try:
except:
import urllib2
+import threading
+import json
+import time
+
+current_req_id = -1
+
+class HTTPClient:
+ def __init__(self, url, callback, id=None):
+ self.url = url
+ self.id = id
+ self.callback = callback
+ self.stop = False
+
+ def cancel(self):
+ self.stop = True
+ self.callback = None
+
+ def run(self):
+ global current_req_id
+ data = urllib2.urlopen(self.url, timeout=310).read()
+ if self.stop is True:
+ return
+ if self.id is not None and current_req_id != self.id:
+ return
+ if self.callback is not None:
+ self.callback(json.loads(data))
+
+
class Pubnub(PubnubCore):
def __init__(
self,
@@ -28,7 +56,33 @@ class Pubnub(PubnubCore):
else:
self._request = self._request3
- def _request2( self, request, callback = None ) :
+ def timeout(self, interval, func):
+ def cb():
+ time.sleep(interval)
+ func()
+ thread = threading.Thread(target=cb)
+ thread.start()
+
+ def _request2_async( self, request, callback, single=False ) :
+ global current_req_id
+ ## Build URL
+ url = self.getUrl(request)
+ if single is True:
+ id = time.time()
+ client = HTTPClient(url, callback, id)
+ current_req_id = id
+ else:
+ client = HTTPClient(url, callback)
+
+ thread = threading.Thread(target=client.run)
+ thread.start()
+ def abort():
+ client.cancel();
+ return abort
+
+
+ def _request2_sync( self, request) :
+
## Build URL
url = self.getUrl(request)
@@ -42,13 +96,18 @@ class Pubnub(PubnubCore):
except:
return None
- if (callback):
- callback(resp_json)
- else:
return resp_json
- def _request3( self, request, callback = None ) :
+ def _request2(self, request, callback=None, single=False):
+ if callback is None:
+ return self._request2_sync(request,single=single)
+ else:
+ self._request2_async(request, callback, single=single)
+
+
+
+ def _request3_sync( self, request) :
## Build URL
url = self.getUrl(request)
## Send Request Expecting JSONP Response
@@ -56,10 +115,15 @@ class Pubnub(PubnubCore):
response = urllib.request.urlopen(url,timeout=310)
resp_json = json.loads(response.read().decode("utf-8"))
except Exception as e:
- print(e)
return None
- if (callback):
- callback(resp_json)
+ return resp_json
+
+ def _request3_async( self, request, callback, single=False ) :
+ pass
+
+ def _request3(self, request, callback=None, single=False):
+ if callback is None:
+ return self._request3_sync(request,single=single)
else:
- return resp_json
+ self._request3_async(request, callback, single=single)