aboutsummaryrefslogtreecommitdiffstats
path: root/python
diff options
context:
space:
mode:
authorDevendra2014-04-02 02:44:29 +0530
committerDevendra2014-04-02 02:44:29 +0530
commit765ee5db6fc39d77e55dcf4fe97fb96da2f46d30 (patch)
treeec6e9e2d102e866ae1b54a43d805607f0c62b8c2 /python
parent9ac3ccf6283772b404a0c80945e3cdf3406ac5bf (diff)
downloadpubnub-python-765ee5db6fc39d77e55dcf4fe97fb96da2f46d30.tar.bz2
multiplexing support
Diffstat (limited to 'python')
-rw-r--r--python/Makefile2
-rw-r--r--python/Pubnub.py326
-rw-r--r--python/unassembled/Platform.py82
3 files changed, 331 insertions, 79 deletions
diff --git a/python/Makefile b/python/Makefile
index 5eb9e2f..b693cf8 100644
--- a/python/Makefile
+++ b/python/Makefile
@@ -12,7 +12,7 @@ build:
echo "\n" >> ./Pubnub.py
cat ../common/PubnubBase.py >> ./Pubnub.py
echo "\n" >> ./Pubnub.py
- cat ../common/PubnubCore.py >> ./Pubnub.py
+ cat ../common/PubnubCoreAsync.py >> ./Pubnub.py
echo "\n" >> ./Pubnub.py
cat ./unassembled/Platform.py >> ./Pubnub.py
find -name "Pubnub*py" | xargs sed -i "s/PubNub\ [0-9]\.[0-9]\.[0-9]/PubNub\ $(VERSION)/g"
diff --git a/python/Pubnub.py b/python/Pubnub.py
index 91f67ad..a449c2d 100644
--- a/python/Pubnub.py
+++ b/python/Pubnub.py
@@ -206,13 +206,13 @@ class PubnubBase(object):
pubnub = Pubnub( 'PUBLISH-KEY', 'SUBSCRIBE-KEY', 'SECRET-KEY', False )
"""
- self.origin = origin
- self.limit = 1800
- self.publish_key = publish_key
- self.subscribe_key = subscribe_key
- self.secret_key = secret_key
- self.cipher_key = cipher_key
- self.ssl = ssl_on
+ self.origin = origin
+ self.limit = 1800
+ self.publish_key = publish_key
+ self.subscribe_key = subscribe_key
+ self.secret_key = secret_key
+ self.cipher_key = cipher_key
+ self.ssl = ssl_on
if self.ssl :
@@ -261,6 +261,14 @@ class PubnubBase(object):
return message
+ def _return_wrapped_callback(self, callback=None):
+ def _new_format_callback(response):
+ if 'payload' in response:
+ if (callback != None): callback({'message' : response['message'], 'payload' : response['payload']})
+ else:
+ if (callback != None):callback(response)
+ if (callback != None): return _new_format_callback
+
def publish( self, args ) :
"""
@@ -310,7 +318,7 @@ class PubnubBase(object):
channel,
'0',
message
- ]}, callback)
+ ]'urlparams' : {'auth' : self.auth_key}}, self._return_wrapped_callback(callback))
def presence( self, args ) :
"""
@@ -350,7 +358,7 @@ class PubnubBase(object):
callback = args['callback']
subscribe_key = args.get('subscribe_key') or self.subscribe_key
- return self.subscribe({'channel': channel+'-pnpres', 'subscribe_key':subscribe_key, 'callback': callback})
+ return self.subscribe({'channel': channel+'-pnpres', 'subscribe_key':subscribe_key, 'callback': self._return_wrapped_callback(callback)})
def here_now( self, args ) :
@@ -543,7 +551,19 @@ class PubnubBase(object):
return url
-class PubnubCore(PubnubBase):
+try:
+ from hashlib import sha256
+ digestmod = sha256
+except ImportError:
+ import Crypto.Hash.SHA256 as digestmod
+ sha256 = digestmod.new
+import hmac
+
+class PubnubCoreAsync(PubnubBase):
+
+ def start(self): pass
+ def stop(self): pass
+
def __init__(
self,
publish_key,
@@ -562,17 +582,16 @@ class PubnubCore(PubnubBase):
#*
#* @param string publish_key required key to send messages.
#* @param string subscribe_key required key to receive messages.
- #* @param string secret_key optional key to sign messages.
+ #* @param string secret_key required key to sign messages.
#* @param boolean ssl required for 2048 bit encrypted messages.
#* @param string origin PUBNUB Server Origin.
- #* @param string pres_uuid optional identifier for presence (auto-generated if not supplied)
#**
## Initiat Class
pubnub = Pubnub( 'PUBLISH-KEY', 'SUBSCRIBE-KEY', 'SECRET-KEY', False )
"""
- super(PubnubCore, self).__init__(
+ super(PubnubCoreAsync, self).__init__(
publish_key=publish_key,
subscribe_key=subscribe_key,
secret_key=secret_key,
@@ -584,20 +603,33 @@ class PubnubCore(PubnubBase):
self.subscriptions = {}
self.timetoken = 0
- self.version = '3.4'
+ self.version = '3.3.4'
self.accept_encoding = 'gzip'
+ self.SUB_RECEIVER = None
+ self._connect = None
+ def get_channel_list(self, channels):
+ channel = ''
+ first = True
+ for ch in channels:
+ if not channels[ch]['subscribed']:
+ continue
+ if not first:
+ channel += ','
+ else:
+ first = False
+ channel += ch
+ return channel
-
- def subscribe( self, args ) :
+ def subscribe( self, args=None, sync=False ) :
"""
#**
#* Subscribe
#*
- #* This is BLOCKING.
+ #* This is NON-BLOCKING.
#* Listen for a message on a channel.
#*
- #* @param array args with channel and callback.
+ #* @param array args with channel and message.
#* @return false on fail, array on success.
#**
@@ -606,58 +638,158 @@ class PubnubCore(PubnubBase):
print(message)
return True
+ ## On Connect Callback
+ def connected() :
+ pubnub.publish({
+ 'channel' : 'hello_world',
+ 'message' : { 'some_var' : 'text' }
+ })
+
+ ## Subscribe
pubnub.subscribe({
'channel' : 'hello_world',
- 'callback' : receive
+ 'connect' : connected,
+ 'callback' : receive
})
"""
- ## Fail if missing channel
- if not 'channel' in args :
- raise Exception('Missing Channel.')
- return False
+ if sync is True and self.susbcribe_sync is not None:
+ self.susbcribe_sync(args)
+ return
+
+ def _invoke(func,msg=None):
+ if func is not None:
+ if msg is not None:
+ func(msg)
+ else:
+ func()
+
+ def _invoke_connect():
+ for ch in self.subscriptions:
+ chobj = self.subscriptions[ch]
+ if chobj['connected'] is False:
+ chobj['connected'] = True
+ _invoke(chobj['connect'])
+
+ def _invoke_error(err=None):
+ for ch in self.subscriptions:
+ chobj = self.subscriptions[ch]
+ _invoke(chobj.error,err)
+
+
+ if callback is None:
+ _invoke(error, "Callback Missing")
+ return
+
+ if channel is None:
+ _invoke(error, "Channel Missing")
+ return
+
+ def _get_channel():
+ for ch in self.subscriptions:
+ chobj = self.subscriptions[ch]
+ if chobj['subscribed'] is True:
+ return chobj
+
+
+ ## New Channel?
+ if not channel in self.subscriptions:
+ self.subscriptions[channel] = {
+ 'name' : channel,
+ 'first' : False,
+ 'connected' : False,
+ 'subscribed' : True,
+ 'callback' : callback,
+ 'connect' : connect,
+ 'disconnect' : disconnect,
+ 'reconnect' : reconnect
+ }
- ## Fail if missing callback
- if not 'callback' in args :
- raise Exception('Missing Callback.')
- return False
+ ## return if already connected to channel
+ if self.subscriptions[channel]['connected'] :
+ _invoke(error, "Already Connected")
+ return
+
- ## Capture User Input
- channel = str(args['channel'])
- callback = args['callback']
- subscribe_key = args.get('subscribe_key') or self.subscribe_key
+ ## SUBSCRIPTION RECURSION
+ def _connect():
+
+ self._reset_offline()
+
+ def sub_callback(response):
+ print response
+ ## ERROR ?
+ if not response or error in response:
+ _invoke_error()
+
+ _invoke_connect()
+
+
+ self.timetoken = response[1]
- ## Begin Subscribe
- while True :
+ if len(response) > 2:
+ channel_list = response[2].split(',')
+ response_list = response[0]
+ for ch in enumerate(channel_list):
+ if ch[1] in self.subscriptions:
+ chobj = self.subscriptions[ch[1]]
+ _invoke(chobj['callback'],self.decrypt(response_list[ch[0]]))
+ else:
+ response_list = response[0]
+ chobj = _get_channel()
+ for r in response_list:
+ if chobj:
+ _invoke(chobj['callback'], self.decrypt(r))
- timetoken = 'timetoken' in args and args['timetoken'] or 0
- try :
- ## Wait for Message
- response = self._request({"urlcomponents" : [
+
+ _connect()
+
+
+
+ channel_list = self.get_channel_list(self.subscriptions)
+ print channel_list
+ ## CONNECT TO PUBNUB SUBSCRIBE SERVERS
+ try:
+ self.SUB_RECEIVER = self._request( { "urlcomponents" : [
'subscribe',
- subscribe_key,
- channel,
+ self.subscribe_key,
+ channel_list,
'0',
- str(timetoken)
- ],"urlparams" : {"uuid" : self.uuid }})
+ str(self.timetoken)
+ ], "urlparams" : {"uuid":self.uuid} }, sub_callback, single=True )
+ except Exception as e:
+ self.timeout( 1, _connect)
+ return
+
+ self._connect = _connect
- messages = response[0]
- args['timetoken'] = response[1]
- ## If it was a timeout
- if not len(messages) :
- continue
+ ## BEGIN SUBSCRIPTION (LISTEN FOR MESSAGES)
+ _connect()
- ## Run user Callback and Reconnect if user permits.
- for message in messages :
- if not callback(self.decrypt(message)) :
- return
+ def _reset_offline(self):
+ if self.SUB_RECEIVER is not None:
+ self.SUB_RECEIVER()
+ self.SUB_RECEIVER = None
- except Exception:
- time.sleep(1)
+ def CONNECT(self):
+ self._reset_offline()
+ self._connect()
+
+
+ def unsubscribe( self, args ):
+ #print(args['channel'])
+ channel = str(args['channel'])
+ if not (channel in self.subscriptions):
+ return False
- return True
+ ## DISCONNECT
+ self.subscriptions[channel]['connected'] = 0
+ self.subscriptions[channel]['subscribed'] = False
+ self.subscriptions[channel]['timetoken'] = 0
+ self.subscriptions[channel]['first'] = False
+ self.CONNECT()
try:
@@ -665,6 +797,34 @@ try:
except:
import urllib2
+import threading
+import json
+import time
+
+current_req_id = -1
+
+class HTTPClient:
+ def __init__(self, url, callback, id=None):
+ self.url = url
+ self.id = id
+ self.callback = callback
+ self.stop = False
+
+ def cancel(self):
+ self.stop = True
+ self.callback = None
+
+ def run(self):
+ global current_req_id
+ data = urllib2.urlopen(self.url, timeout=310).read()
+ if self.stop is True:
+ return
+ if self.id is not None and current_req_id != self.id:
+ return
+ if self.callback is not None:
+ self.callback(json.loads(data))
+
+
class Pubnub(PubnubCore):
def __init__(
self,
@@ -690,7 +850,33 @@ class Pubnub(PubnubCore):
else:
self._request = self._request3
- def _request2( self, request, callback = None ) :
+ def timeout(self, interval, func):
+ def cb():
+ time.sleep(interval)
+ func()
+ thread = threading.Thread(target=cb)
+ thread.start()
+
+ def _request2_async( self, request, callback, single=False ) :
+ global current_req_id
+ ## Build URL
+ url = self.getUrl(request)
+ if single is True:
+ id = time.time()
+ client = HTTPClient(url, callback, id)
+ current_req_id = id
+ else:
+ client = HTTPClient(url, callback)
+
+ thread = threading.Thread(target=client.run)
+ thread.start()
+ def abort():
+ client.cancel();
+ return abort
+
+
+ def _request2_sync( self, request) :
+
## Build URL
url = self.getUrl(request)
@@ -704,13 +890,18 @@ class Pubnub(PubnubCore):
except:
return None
- if (callback):
- callback(resp_json)
- else:
return resp_json
- def _request3( self, request, callback = None ) :
+ def _request2(self, request, callback=None, single=False):
+ if callback is None:
+ return self._request2_sync(request,single=single)
+ else:
+ self._request2_async(request, callback, single=single)
+
+
+
+ def _request3_sync( self, request) :
## Build URL
url = self.getUrl(request)
## Send Request Expecting JSONP Response
@@ -718,18 +909,15 @@ class Pubnub(PubnubCore):
response = urllib.request.urlopen(url,timeout=310)
resp_json = json.loads(response.read().decode("utf-8"))
except Exception as e:
- print(e)
return None
- if (callback):
- callback(resp_json)
- else:
- return resp_json
+ return resp_json
- '''
- def _request(self, request, callback = None):
- if self.python_version == 2:
- return self._request2(request,callback)
+ def _request3_async( self, request, callback, single=False ) :
+ pass
+
+ def _request3(self, request, callback=None, single=False):
+ if callback is None:
+ return self._request3_sync(request,single=single)
else:
- return self._request3(request, callback)
- '''
+ self._request3_async(request, callback, single=single)
diff --git a/python/unassembled/Platform.py b/python/unassembled/Platform.py
index c60690f..f0f9327 100644
--- a/python/unassembled/Platform.py
+++ b/python/unassembled/Platform.py
@@ -3,6 +3,34 @@ try:
except:
import urllib2
+import threading
+import json
+import time
+
+current_req_id = -1
+
+class HTTPClient:
+ def __init__(self, url, callback, id=None):
+ self.url = url
+ self.id = id
+ self.callback = callback
+ self.stop = False
+
+ def cancel(self):
+ self.stop = True
+ self.callback = None
+
+ def run(self):
+ global current_req_id
+ data = urllib2.urlopen(self.url, timeout=310).read()
+ if self.stop is True:
+ return
+ if self.id is not None and current_req_id != self.id:
+ return
+ if self.callback is not None:
+ self.callback(json.loads(data))
+
+
class Pubnub(PubnubCore):
def __init__(
self,
@@ -28,7 +56,33 @@ class Pubnub(PubnubCore):
else:
self._request = self._request3
- def _request2( self, request, callback = None ) :
+ def timeout(self, interval, func):
+ def cb():
+ time.sleep(interval)
+ func()
+ thread = threading.Thread(target=cb)
+ thread.start()
+
+ def _request2_async( self, request, callback, single=False ) :
+ global current_req_id
+ ## Build URL
+ url = self.getUrl(request)
+ if single is True:
+ id = time.time()
+ client = HTTPClient(url, callback, id)
+ current_req_id = id
+ else:
+ client = HTTPClient(url, callback)
+
+ thread = threading.Thread(target=client.run)
+ thread.start()
+ def abort():
+ client.cancel();
+ return abort
+
+
+ def _request2_sync( self, request) :
+
## Build URL
url = self.getUrl(request)
@@ -42,13 +96,18 @@ class Pubnub(PubnubCore):
except:
return None
- if (callback):
- callback(resp_json)
- else:
return resp_json
- def _request3( self, request, callback = None ) :
+ def _request2(self, request, callback=None, single=False):
+ if callback is None:
+ return self._request2_sync(request,single=single)
+ else:
+ self._request2_async(request, callback, single=single)
+
+
+
+ def _request3_sync( self, request) :
## Build URL
url = self.getUrl(request)
## Send Request Expecting JSONP Response
@@ -56,10 +115,15 @@ class Pubnub(PubnubCore):
response = urllib.request.urlopen(url,timeout=310)
resp_json = json.loads(response.read().decode("utf-8"))
except Exception as e:
- print(e)
return None
- if (callback):
- callback(resp_json)
+ return resp_json
+
+ def _request3_async( self, request, callback, single=False ) :
+ pass
+
+ def _request3(self, request, callback=None, single=False):
+ if callback is None:
+ return self._request3_sync(request,single=single)
else:
- return resp_json
+ self._request3_async(request, callback, single=single)