aboutsummaryrefslogtreecommitdiffstats
path: root/common/PubnubCoreAsync.py
diff options
context:
space:
mode:
authorDevendra2014-04-11 14:49:43 +0530
committerDevendra2014-04-11 14:49:43 +0530
commit99096b8c11b9a541f6350639e8735495cf90091c (patch)
tree446e63037f76cb98d7e3cc0f93316a8bce96e19e /common/PubnubCoreAsync.py
parent765ee5db6fc39d77e55dcf4fe97fb96da2f46d30 (diff)
downloadpubnub-python-99096b8c11b9a541f6350639e8735495cf90091c.tar.bz2
v1 MX and async code for python, twisted, tornado
Diffstat (limited to 'common/PubnubCoreAsync.py')
-rw-r--r--common/PubnubCoreAsync.py149
1 files changed, 103 insertions, 46 deletions
diff --git a/common/PubnubCoreAsync.py b/common/PubnubCoreAsync.py
index 0038243..4251d47 100644
--- a/common/PubnubCoreAsync.py
+++ b/common/PubnubCoreAsync.py
@@ -5,6 +5,9 @@ except ImportError:
import Crypto.Hash.SHA256 as digestmod
sha256 = digestmod.new
import hmac
+import threading
+from threading import current_thread
+import threading
class PubnubCoreAsync(PubnubBase):
@@ -17,6 +20,7 @@ class PubnubCoreAsync(PubnubBase):
subscribe_key,
secret_key = False,
cipher_key = False,
+ auth_key = None,
ssl_on = False,
origin = 'pubsub.pubnub.com',
uuid = None
@@ -43,6 +47,7 @@ class PubnubCoreAsync(PubnubBase):
subscribe_key=subscribe_key,
secret_key=secret_key,
cipher_key=cipher_key,
+ auth_key=auth_key,
ssl_on=ssl_on,
origin=origin,
UUID=uuid
@@ -50,22 +55,36 @@ class PubnubCoreAsync(PubnubBase):
self.subscriptions = {}
self.timetoken = 0
+ self.last_timetoken = 0
self.version = '3.3.4'
self.accept_encoding = 'gzip'
self.SUB_RECEIVER = None
self._connect = None
+ self._tt_lock = threading.RLock()
def get_channel_list(self, channels):
channel = ''
first = True
- for ch in channels:
- if not channels[ch]['subscribed']:
- continue
- if not first:
- channel += ','
- else:
- first = False
- channel += ch
+ if self._channel_list_lock:
+ with self._channel_list_lock:
+ for ch in channels:
+ if not channels[ch]['subscribed']:
+ continue
+ if not first:
+ channel += ','
+ else:
+ first = False
+ channel += ch
+ else:
+ for ch in channels:
+ if not channels[ch]['subscribed']:
+ continue
+ if not first:
+ channel += ','
+ else:
+ first = False
+ channel += ch
+
return channel
def subscribe( self, args=None, sync=False ) :
@@ -100,6 +119,26 @@ class PubnubCoreAsync(PubnubBase):
})
"""
+ if args is None:
+ _invoke(error, "Arguments Missing")
+ return
+ channel = args['channel'] if 'channel' in args else None
+ callback = args['callback'] if 'callback' in args else None
+ connect = args['connect'] if 'connect' in args else None
+ disconnect = args['disconnect'] if 'disconnect' in args else None
+ reconnect = args['reconnect'] if 'reconnect' in args else None
+ error = args['error'] if 'error' in args else None
+
+ with self._tt_lock:
+ self.last_timetoken = self.timetoken if self.timetoken != 0 else self.last_timetoken
+ self.timetoken = 0
+
+ if channel is None:
+ _invoke(error, "Channel Missing")
+ return
+ if callback is None:
+ _invoke(error, "Callback Missing")
+ return
if sync is True and self.susbcribe_sync is not None:
self.susbcribe_sync(args)
@@ -113,18 +152,20 @@ class PubnubCoreAsync(PubnubBase):
func()
def _invoke_connect():
- for ch in self.subscriptions:
- chobj = self.subscriptions[ch]
- if chobj['connected'] is False:
- chobj['connected'] = True
- _invoke(chobj['connect'])
+ if self._channel_list_lock:
+ with self._channel_list_lock:
+ for ch in self.subscriptions:
+ chobj = self.subscriptions[ch]
+ if chobj['connected'] is False:
+ chobj['connected'] = True
+ _invoke(chobj['connect'],chobj['name'])
def _invoke_error(err=None):
for ch in self.subscriptions:
chobj = self.subscriptions[ch]
_invoke(chobj.error,err)
-
+ '''
if callback is None:
_invoke(error, "Callback Missing")
return
@@ -132,6 +173,7 @@ class PubnubCoreAsync(PubnubBase):
if channel is None:
_invoke(error, "Channel Missing")
return
+ '''
def _get_channel():
for ch in self.subscriptions:
@@ -142,22 +184,36 @@ class PubnubCoreAsync(PubnubBase):
## New Channel?
if not channel in self.subscriptions:
- self.subscriptions[channel] = {
- 'name' : channel,
- 'first' : False,
- 'connected' : False,
- 'subscribed' : True,
- 'callback' : callback,
- 'connect' : connect,
- 'disconnect' : disconnect,
- 'reconnect' : reconnect
- }
+ if self._channel_list_lock:
+ with self._channel_list_lock:
+ self.subscriptions[channel] = {
+ 'name' : channel,
+ 'first' : False,
+ 'connected' : False,
+ 'subscribed' : True,
+ 'callback' : callback,
+ 'connect' : connect,
+ 'disconnect' : disconnect,
+ 'reconnect' : reconnect
+ }
+ else:
+ self.subscriptions[channel] = {
+ 'name' : channel,
+ 'first' : False,
+ 'connected' : False,
+ 'subscribed' : True,
+ 'callback' : callback,
+ 'connect' : connect,
+ 'disconnect' : disconnect,
+ 'reconnect' : reconnect
+ }
## return if already connected to channel
- if self.subscriptions[channel]['connected'] :
+ if channel in self.subscriptions and 'connected' in self.subscriptions[channel] and self.subscriptions[channel]['connected'] is True:
_invoke(error, "Already Connected")
return
+
## SUBSCRIPTION RECURSION
def _connect():
@@ -165,37 +221,37 @@ class PubnubCoreAsync(PubnubBase):
self._reset_offline()
def sub_callback(response):
- print response
## ERROR ?
if not response or error in response:
_invoke_error()
_invoke_connect()
-
- self.timetoken = response[1]
-
- if len(response) > 2:
- channel_list = response[2].split(',')
- response_list = response[0]
- for ch in enumerate(channel_list):
- if ch[1] in self.subscriptions:
- chobj = self.subscriptions[ch[1]]
- _invoke(chobj['callback'],self.decrypt(response_list[ch[0]]))
- else:
- response_list = response[0]
- chobj = _get_channel()
- for r in response_list:
- if chobj:
- _invoke(chobj['callback'], self.decrypt(r))
-
-
- _connect()
+ with self._tt_lock:
+ #print 'A tt : ', self.timetoken , ' last tt : ' , self.last_timetoken
+ self.timetoken = self.last_timetoken if self.timetoken == 0 and self.last_timetoken != 0 else response[1]
+ #print 'B tt : ', self.timetoken , ' last tt : ' , self.last_timetoken
+ if len(response) > 2:
+ channel_list = response[2].split(',')
+ response_list = response[0]
+ for ch in enumerate(channel_list):
+ if ch[1] in self.subscriptions:
+ chobj = self.subscriptions[ch[1]]
+ _invoke(chobj['callback'],self.decrypt(response_list[ch[0]]))
+ else:
+ response_list = response[0]
+ chobj = _get_channel()
+ for r in response_list:
+ if chobj:
+ _invoke(chobj['callback'], self.decrypt(r))
+
+ #with self._tt_lock:
+ # self.timetoken = self.last_timetoken if self.timetoken == 0 and self.last_timetoken != 0 else response[1]
+ _connect()
channel_list = self.get_channel_list(self.subscriptions)
- print channel_list
## CONNECT TO PUBNUB SUBSCRIBE SERVERS
try:
self.SUB_RECEIVER = self._request( { "urlcomponents" : [
@@ -206,6 +262,7 @@ class PubnubCoreAsync(PubnubBase):
str(self.timetoken)
], "urlparams" : {"uuid":self.uuid} }, sub_callback, single=True )
except Exception as e:
+ print e
self.timeout( 1, _connect)
return