aboutsummaryrefslogtreecommitdiffstats
path: root/common/PubnubCoreAsync.py
diff options
context:
space:
mode:
Diffstat (limited to 'common/PubnubCoreAsync.py')
-rw-r--r--common/PubnubCoreAsync.py125
1 files changed, 65 insertions, 60 deletions
diff --git a/common/PubnubCoreAsync.py b/common/PubnubCoreAsync.py
index 4251d47..f7b57cc 100644
--- a/common/PubnubCoreAsync.py
+++ b/common/PubnubCoreAsync.py
@@ -5,9 +5,14 @@ except ImportError:
import Crypto.Hash.SHA256 as digestmod
sha256 = digestmod.new
import hmac
-import threading
-from threading import current_thread
-import threading
+
+class EmptyLock():
+ def __enter__(self):
+ pass
+ def __exit__(self,a,b,c):
+ pass
+
+empty_lock = EmptyLock()
class PubnubCoreAsync(PubnubBase):
@@ -23,7 +28,9 @@ class PubnubCoreAsync(PubnubBase):
auth_key = None,
ssl_on = False,
origin = 'pubsub.pubnub.com',
- uuid = None
+ uuid = None,
+ _tt_lock=empty_lock,
+ _channel_list_lock=empty_lock
) :
"""
#**
@@ -53,29 +60,20 @@ class PubnubCoreAsync(PubnubBase):
UUID=uuid
)
- self.subscriptions = {}
- self.timetoken = 0
- self.last_timetoken = 0
- self.version = '3.3.4'
- self.accept_encoding = 'gzip'
- self.SUB_RECEIVER = None
- self._connect = None
- self._tt_lock = threading.RLock()
+ self.subscriptions = {}
+ self.timetoken = 0
+ self.last_timetoken = 0
+ self.version = '3.3.4'
+ self.accept_encoding = 'gzip'
+ self.SUB_RECEIVER = None
+ self._connect = None
+ self._tt_lock = _tt_lock
+ self._channel_list_lock = _channel_list_lock
def get_channel_list(self, channels):
channel = ''
first = True
- if self._channel_list_lock:
- with self._channel_list_lock:
- for ch in channels:
- if not channels[ch]['subscribed']:
- continue
- if not first:
- channel += ','
- else:
- first = False
- channel += ch
- else:
+ with self._channel_list_lock:
for ch in channels:
if not channels[ch]['subscribed']:
continue
@@ -84,9 +82,15 @@ class PubnubCoreAsync(PubnubBase):
else:
first = False
channel += ch
-
return channel
+
+ def each(l, func):
+ if func is None:
+ return
+ for i in l:
+ func(i)
+
def subscribe( self, args=None, sync=False ) :
"""
#**
@@ -122,12 +126,12 @@ class PubnubCoreAsync(PubnubBase):
if args is None:
_invoke(error, "Arguments Missing")
return
- channel = args['channel'] if 'channel' in args else None
- callback = args['callback'] if 'callback' in args else None
- connect = args['connect'] if 'connect' in args else None
- disconnect = args['disconnect'] if 'disconnect' in args else None
- reconnect = args['reconnect'] if 'reconnect' in args else None
- error = args['error'] if 'error' in args else None
+ channel = args['channel'] if 'channel' in args else None
+ callback = args['callback'] if 'callback' in args else None
+ connect = args['connect'] if 'connect' in args else None
+ disconnect = args['disconnect'] if 'disconnect' in args else None
+ reconnect = args['reconnect'] if 'reconnect' in args else None
+ error = args['error'] if 'error' in args else None
with self._tt_lock:
self.last_timetoken = self.timetoken if self.timetoken != 0 else self.last_timetoken
@@ -160,10 +164,15 @@ class PubnubCoreAsync(PubnubBase):
chobj['connected'] = True
_invoke(chobj['connect'],chobj['name'])
- def _invoke_error(err=None):
- for ch in self.subscriptions:
- chobj = self.subscriptions[ch]
- _invoke(chobj.error,err)
+ def _invoke_error(channel_list=None, err=None):
+ if channel_list is None:
+ for ch in self.subscriptions:
+ chobj = self.subscriptions[ch]
+ _invoke(chobj['error'],err)
+ else:
+ for ch in channel_list:
+ chobj = self.subscriptions[ch]
+ _invoke(chobj['error'],err)
'''
if callback is None:
@@ -184,19 +193,7 @@ class PubnubCoreAsync(PubnubBase):
## New Channel?
if not channel in self.subscriptions:
- if self._channel_list_lock:
- with self._channel_list_lock:
- self.subscriptions[channel] = {
- 'name' : channel,
- 'first' : False,
- 'connected' : False,
- 'subscribed' : True,
- 'callback' : callback,
- 'connect' : connect,
- 'disconnect' : disconnect,
- 'reconnect' : reconnect
- }
- else:
+ with self._channel_list_lock:
self.subscriptions[channel] = {
'name' : channel,
'first' : False,
@@ -205,9 +202,11 @@ class PubnubCoreAsync(PubnubBase):
'callback' : callback,
'connect' : connect,
'disconnect' : disconnect,
- 'reconnect' : reconnect
+ 'reconnect' : reconnect,
+ 'error' : error
}
+
## return if already connected to channel
if channel in self.subscriptions and 'connected' in self.subscriptions[channel] and self.subscriptions[channel]['connected'] is True:
_invoke(error, "Already Connected")
@@ -222,8 +221,11 @@ class PubnubCoreAsync(PubnubBase):
def sub_callback(response):
## ERROR ?
- if not response or error in response:
- _invoke_error()
+ #print response
+ if not response or ('message' in response and response['message'] == 'Forbidden'):
+ _invoke_error(response['payload']['channels'], response['message'])
+ _connect()
+ return
_invoke_connect()
@@ -250,7 +252,6 @@ class PubnubCoreAsync(PubnubBase):
_connect()
-
channel_list = self.get_channel_list(self.subscriptions)
## CONNECT TO PUBNUB SUBSCRIBE SERVERS
try:
@@ -260,9 +261,9 @@ class PubnubCoreAsync(PubnubBase):
channel_list,
'0',
str(self.timetoken)
- ], "urlparams" : {"uuid":self.uuid} }, sub_callback, single=True )
+ ], "urlparams" : {"uuid":self.uuid, "auth" : self.auth_key} }, sub_callback, sub_callback, single=True )
except Exception as e:
- print e
+ print(e)
self.timeout( 1, _connect)
return
@@ -283,14 +284,18 @@ class PubnubCoreAsync(PubnubBase):
def unsubscribe( self, args ):
- #print(args['channel'])
- channel = str(args['channel'])
- if not (channel in self.subscriptions):
+
+ if 'channel' in self.subscriptions is False:
return False
+ channel = str(args['channel'])
+
+
## DISCONNECT
- self.subscriptions[channel]['connected'] = 0
- self.subscriptions[channel]['subscribed'] = False
- self.subscriptions[channel]['timetoken'] = 0
- self.subscriptions[channel]['first'] = False
+ with self._channel_list_lock:
+ if channel in self.subscriptions:
+ self.subscriptions[channel]['connected'] = 0
+ self.subscriptions[channel]['subscribed'] = False
+ self.subscriptions[channel]['timetoken'] = 0
+ self.subscriptions[channel]['first'] = False
self.CONNECT()