aboutsummaryrefslogtreecommitdiffstats
path: root/python-tornado
diff options
context:
space:
mode:
authorDevendra2014-04-02 02:44:29 +0530
committerDevendra2014-04-02 02:44:29 +0530
commit765ee5db6fc39d77e55dcf4fe97fb96da2f46d30 (patch)
treeec6e9e2d102e866ae1b54a43d805607f0c62b8c2 /python-tornado
parent9ac3ccf6283772b404a0c80945e3cdf3406ac5bf (diff)
downloadpubnub-python-765ee5db6fc39d77e55dcf4fe97fb96da2f46d30.tar.bz2
multiplexing support
Diffstat (limited to 'python-tornado')
-rw-r--r--python-tornado/Pubnub.py329
-rw-r--r--python-tornado/unassembled/Platform.py28
2 files changed, 268 insertions, 89 deletions
diff --git a/python-tornado/Pubnub.py b/python-tornado/Pubnub.py
index 89c0d97..ee66619 100644
--- a/python-tornado/Pubnub.py
+++ b/python-tornado/Pubnub.py
@@ -16,7 +16,7 @@ from base64 import encodestring, decodestring
import hashlib
import hmac
-class PubnubCrypto() :
+class PubnubCrypto2() :
"""
#**
#* PubnubCrypto
@@ -93,13 +93,89 @@ class PubnubCrypto() :
return self.depad((cipher.decrypt(decodestring(msg))))
+class PubnubCrypto3() :
+ """
+ #**
+ #* PubnubCrypto
+ #*
+ #**
+
+ ## Initiate Class
+ pc = PubnubCrypto
+
+ """
+
+ def pad( self, msg, block_size=16 ):
+ """
+ #**
+ #* pad
+ #*
+ #* pad the text to be encrypted
+ #* appends a padding character to the end of the String
+ #* until the string has block_size length
+ #* @return msg with padding.
+ #**
+ """
+ padding = block_size - (len(msg) % block_size)
+ return msg + (chr(padding)*padding).encode('utf-8')
+
+ def depad( self, msg ):
+ """
+ #**
+ #* depad
+ #*
+ #* depad the decryptet message"
+ #* @return msg without padding.
+ #**
+ """
+ return msg[0:-ord(msg[-1])]
+
+ def getSecret( self, key ):
+ """
+ #**
+ #* getSecret
+ #*
+ #* hases the key to MD5
+ #* @return key in MD5 format
+ #**
+ """
+ return hashlib.sha256(key.encode("utf-8")).hexdigest()
+
+ def encrypt( self, key, msg ):
+ """
+ #**
+ #* encrypt
+ #*
+ #* encrypts the message
+ #* @return message in encrypted format
+ #**
+ """
+ secret = self.getSecret(key)
+ Initial16bytes='0123456789012345'
+ cipher = AES.new(secret[0:32],AES.MODE_CBC,Initial16bytes)
+ return encodestring(cipher.encrypt(self.pad(msg.encode('utf-8')))).decode('utf-8')
+ def decrypt( self, key, msg ):
+ """
+ #**
+ #* decrypt
+ #*
+ #* decrypts the message
+ #* @return message in decryped format
+ #**
+ """
+ secret = self.getSecret(key)
+ Initial16bytes='0123456789012345'
+ cipher = AES.new(secret[0:32],AES.MODE_CBC,Initial16bytes)
+ return (cipher.decrypt(decodestring(msg.encode('utf-8')))).decode('utf-8')
+
+
try: import json
except ImportError: import simplejson as json
import time
import hashlib
-import urllib2
-import uuid
+import uuid
+import sys
class PubnubBase(object):
def __init__(
@@ -137,7 +213,7 @@ class PubnubBase(object):
self.secret_key = secret_key
self.cipher_key = cipher_key
self.ssl = ssl_on
- self.pc = PubnubCrypto()
+
if self.ssl :
self.origin = 'https://' + self.origin
@@ -145,8 +221,16 @@ class PubnubBase(object):
self.origin = 'http://' + self.origin
self.uuid = UUID or str(uuid.uuid4())
+
+ if type(sys.version_info) is tuple:
+ self.python_version = 2
+ self.pc = PubnubCrypto2()
+ else:
+ self.python_version = 3
+ self.pc = PubnubCrypto3()
+
- if not isinstance(self.uuid, basestring):
+ if not isinstance(self.uuid, str):
raise AttributeError("pres_uuid must be a string")
def sign(self, channel, message):
@@ -177,6 +261,14 @@ class PubnubBase(object):
return message
+ def _return_wrapped_callback(self, callback=None):
+ def _new_format_callback(response):
+ if 'payload' in response:
+ if (callback != None): callback({'message' : response['message'], 'payload' : response['payload']})
+ else:
+ if (callback != None):callback(response)
+ if (callback != None): return _new_format_callback
+
def publish( self, args ) :
"""
@@ -207,7 +299,7 @@ class PubnubBase(object):
channel = str(args['channel'])
## Capture Callback
- if args.has_key('callback') :
+ if 'callback' in args :
callback = args['callback']
else :
callback = None
@@ -226,7 +318,7 @@ class PubnubBase(object):
channel,
'0',
message
- ]}, callback)
+ ]'urlparams' : {'auth' : self.auth_key}}, self._return_wrapped_callback(callback))
def presence( self, args ) :
"""
@@ -266,7 +358,7 @@ class PubnubBase(object):
callback = args['callback']
subscribe_key = args.get('subscribe_key') or self.subscribe_key
- return self.subscribe({'channel': channel+'-pnpres', 'subscribe_key':subscribe_key, 'callback': callback})
+ return self.subscribe({'channel': channel+'-pnpres', 'subscribe_key':subscribe_key, 'callback': self._return_wrapped_callback(callback)})
def here_now( self, args ) :
@@ -291,7 +383,7 @@ class PubnubBase(object):
channel = str(args['channel'])
## Capture Callback
- if args.has_key('callback') :
+ if 'callback' in args :
callback = args['callback']
else :
callback = None
@@ -329,7 +421,7 @@ class PubnubBase(object):
"""
## Capture User Input
- limit = args.has_key('limit') and int(args['limit']) or 10
+ limit = 'limit' in args and int(args['limit']) or 10
channel = str(args['channel'])
## Fail if bad input.
@@ -338,7 +430,7 @@ class PubnubBase(object):
return False
## Capture Callback
- if args.has_key('callback') :
+ if 'callback' in args :
callback = args['callback']
else :
callback = None
@@ -377,18 +469,18 @@ class PubnubBase(object):
params = dict()
count = 100
- if args.has_key('count'):
+ if 'count' in args:
count = int(args['count'])
params['count'] = str(count)
- if args.has_key('reverse'):
+ if 'reverse' in args:
params['reverse'] = str(args['reverse']).lower()
- if args.has_key('start'):
+ if 'start' in args:
params['start'] = str(args['start'])
- if args.has_key('end'):
+ if 'end' in args:
params['end'] = str(args['end'])
## Fail if bad input.
@@ -397,7 +489,7 @@ class PubnubBase(object):
return False
## Capture Callback
- if args.has_key('callback') :
+ if 'callback' in args :
callback = args['callback']
else :
callback = None
@@ -428,7 +520,7 @@ class PubnubBase(object):
"""
## Capture Callback
- if args and args.has_key('callback') :
+ if args and 'callback' in args :
callback = args['callback']
else :
callback = None
@@ -454,8 +546,8 @@ class PubnubBase(object):
hex(ord(ch)).replace( '0x', '%' ).upper() or
ch for ch in list(bit)
]) for bit in request["urlcomponents"]])
- if (request.has_key("urlparams")):
- url = url + '?' + "&".join([ x + "=" + y for x,y in request["urlparams"].iteritems()])
+ if ("urlparams" in request):
+ url = url + '?' + "&".join([ x + "=" + y for x,y in request["urlparams"].items()])
return url
@@ -471,8 +563,6 @@ class PubnubCoreAsync(PubnubBase):
def start(self): pass
def stop(self): pass
- def timeout( self, delay, callback ):
- pass
def __init__(
self,
@@ -515,8 +605,23 @@ class PubnubCoreAsync(PubnubBase):
self.timetoken = 0
self.version = '3.3.4'
self.accept_encoding = 'gzip'
-
- def subscribe( self, args ) :
+ self.SUB_RECEIVER = None
+ self._connect = None
+
+ def get_channel_list(self, channels):
+ channel = ''
+ first = True
+ for ch in channels:
+ if not channels[ch]['subscribed']:
+ continue
+ if not first:
+ channel += ','
+ else:
+ first = False
+ channel += ch
+ return channel
+
+ def subscribe( self, args=None, sync=False ) :
"""
#**
#* Subscribe
@@ -548,97 +653,143 @@ class PubnubCoreAsync(PubnubBase):
})
"""
- ## Fail if missing channel
- if not 'channel' in args :
- return 'Missing Channel.'
- ## Fail if missing callback
- if not 'callback' in args :
- return 'Missing Callback.'
+ if sync is True and self.susbcribe_sync is not None:
+ self.susbcribe_sync(args)
+ return
- ## Capture User Input
- channel = str(args['channel'])
- callback = args['callback']
- connectcb = args['connect']
+ def _invoke(func,msg=None):
+ if func is not None:
+ if msg is not None:
+ func(msg)
+ else:
+ func()
+
+ def _invoke_connect():
+ for ch in self.subscriptions:
+ chobj = self.subscriptions[ch]
+ if chobj['connected'] is False:
+ chobj['connected'] = True
+ _invoke(chobj['connect'])
+
+ def _invoke_error(err=None):
+ for ch in self.subscriptions:
+ chobj = self.subscriptions[ch]
+ _invoke(chobj.error,err)
+
+
+ if callback is None:
+ _invoke(error, "Callback Missing")
+ return
+
+ if channel is None:
+ _invoke(error, "Channel Missing")
+ return
+
+ def _get_channel():
+ for ch in self.subscriptions:
+ chobj = self.subscriptions[ch]
+ if chobj['subscribed'] is True:
+ return chobj
- if 'errorback' in args:
- errorback = args['errorback']
- else:
- errorback = lambda x: x
## New Channel?
- if not (channel in self.subscriptions) :
+ if not channel in self.subscriptions:
self.subscriptions[channel] = {
- 'first' : False,
- 'connected' : False,
+ 'name' : channel,
+ 'first' : False,
+ 'connected' : False,
+ 'subscribed' : True,
+ 'callback' : callback,
+ 'connect' : connect,
+ 'disconnect' : disconnect,
+ 'reconnect' : reconnect
}
- ## Ensure Single Connection
+ ## return if already connected to channel
if self.subscriptions[channel]['connected'] :
- return "Already Connected"
+ _invoke(error, "Already Connected")
+ return
+
- self.subscriptions[channel]['connected'] = 1
## SUBSCRIPTION RECURSION
- def _subscribe():
- ## STOP CONNECTION?
- if not self.subscriptions[channel]['connected']:
- return
+ def _connect():
+ self._reset_offline()
+
def sub_callback(response):
- if not self.subscriptions[channel]['first'] :
- self.subscriptions[channel]['first'] = True
- connectcb()
+ print response
+ ## ERROR ?
+ if not response or error in response:
+ _invoke_error()
- ## STOP CONNECTION?
- if not self.subscriptions[channel]['connected']:
- return
+ _invoke_connect()
+ self.timetoken = response[1]
- ## PROBLEM?
- if not response:
- def time_callback(_time):
- if not _time:
- self.timeout( 1, _subscribe )
- return errorback("Lost Network Connection")
- else:
- self.timeout( 1, _subscribe)
+ if len(response) > 2:
+ channel_list = response[2].split(',')
+ response_list = response[0]
+ for ch in enumerate(channel_list):
+ if ch[1] in self.subscriptions:
+ chobj = self.subscriptions[ch[1]]
+ _invoke(chobj['callback'],self.decrypt(response_list[ch[0]]))
+ else:
+ response_list = response[0]
+ chobj = _get_channel()
+ for r in response_list:
+ if chobj:
+ _invoke(chobj['callback'], self.decrypt(r))
- ## ENSURE CONNECTED (Call Time Function)
- return self.time({ 'callback' : time_callback })
- self.timetoken = response[1]
- _subscribe()
+ _connect()
+
- pc = PubnubCrypto()
- out = []
- for message in response[0]:
- callback(self.decrypt(message))
+ channel_list = self.get_channel_list(self.subscriptions)
+ print channel_list
## CONNECT TO PUBNUB SUBSCRIBE SERVERS
try:
- self._request( { "urlcomponents" : [
+ self.SUB_RECEIVER = self._request( { "urlcomponents" : [
'subscribe',
self.subscribe_key,
- channel,
+ channel_list,
'0',
str(self.timetoken)
- ], "urlparams" : {"uuid":self.uuid} }, sub_callback )
- except :
- self.timeout( 1, _subscribe)
+ ], "urlparams" : {"uuid":self.uuid} }, sub_callback, single=True )
+ except Exception as e:
+ self.timeout( 1, _connect)
return
+ self._connect = _connect
+
+
## BEGIN SUBSCRIPTION (LISTEN FOR MESSAGES)
- _subscribe()
+ _connect()
+
+ def _reset_offline(self):
+ if self.SUB_RECEIVER is not None:
+ self.SUB_RECEIVER()
+ self.SUB_RECEIVER = None
+
+ def CONNECT(self):
+ self._reset_offline()
+ self._connect()
+
+
def unsubscribe( self, args ):
+ #print(args['channel'])
channel = str(args['channel'])
if not (channel in self.subscriptions):
return False
## DISCONNECT
self.subscriptions[channel]['connected'] = 0
+ self.subscriptions[channel]['subscribed'] = False
self.subscriptions[channel]['timetoken'] = 0
self.subscriptions[channel]['first'] = False
+ self.CONNECT()
import tornado.httpclient
@@ -685,15 +836,24 @@ class Pubnub(PubnubCoreAsync):
self.headers['Accept-Encoding'] = self.accept_encoding
self.headers['V'] = self.version
self.http = tornado.httpclient.AsyncHTTPClient(max_clients=1000)
+ self.id = None
- def _request( self, request, callback ) :
+ def _request( self, request, callback, single=False ) :
url = self.getUrl(request)
- ## Send Request Expecting JSON Response
- #print self.headers
-
request = tornado.httpclient.HTTPRequest( url, 'GET', self.headers, connect_timeout=10, request_timeout=310 )
+ if single is True:
+ id = time.time()
+ self.id = id
def responseCallback(response):
+ if single is True:
+ if not id == self.id:
+ return None
+
+ body = response._get_body()
+ if body is None:
+ return
+
def handle_exc(*args):
return True
if response.error is not None:
@@ -701,9 +861,14 @@ class Pubnub(PubnubCoreAsync):
response.rethrow()
elif callback:
callback(eval(response._get_body()))
-
+
self.http.fetch(
- request,
- callback=responseCallback,
+ request=request,
+ callback=responseCallback
)
+ def abort():
+ pass
+
+ return abort
+
diff --git a/python-tornado/unassembled/Platform.py b/python-tornado/unassembled/Platform.py
index 62d3a26..f98befb 100644
--- a/python-tornado/unassembled/Platform.py
+++ b/python-tornado/unassembled/Platform.py
@@ -42,15 +42,24 @@ class Pubnub(PubnubCoreAsync):
self.headers['Accept-Encoding'] = self.accept_encoding
self.headers['V'] = self.version
self.http = tornado.httpclient.AsyncHTTPClient(max_clients=1000)
+ self.id = None
- def _request( self, request, callback ) :
+ def _request( self, request, callback, single=False ) :
url = self.getUrl(request)
- ## Send Request Expecting JSON Response
- #print self.headers
-
request = tornado.httpclient.HTTPRequest( url, 'GET', self.headers, connect_timeout=10, request_timeout=310 )
+ if single is True:
+ id = time.time()
+ self.id = id
def responseCallback(response):
+ if single is True:
+ if not id == self.id:
+ return None
+
+ body = response._get_body()
+ if body is None:
+ return
+
def handle_exc(*args):
return True
if response.error is not None:
@@ -58,9 +67,14 @@ class Pubnub(PubnubCoreAsync):
response.rethrow()
elif callback:
callback(eval(response._get_body()))
-
+
self.http.fetch(
- request,
- callback=responseCallback,
+ request=request,
+ callback=responseCallback
)
+ def abort():
+ pass
+
+ return abort
+