aboutsummaryrefslogtreecommitdiffstats
path: root/common/PubnubCoreAsync.py
diff options
context:
space:
mode:
Diffstat (limited to 'common/PubnubCoreAsync.py')
-rw-r--r--common/PubnubCoreAsync.py171
1 files changed, 115 insertions, 56 deletions
diff --git a/common/PubnubCoreAsync.py b/common/PubnubCoreAsync.py
index a7fbb7d..0038243 100644
--- a/common/PubnubCoreAsync.py
+++ b/common/PubnubCoreAsync.py
@@ -10,8 +10,6 @@ class PubnubCoreAsync(PubnubBase):
def start(self): pass
def stop(self): pass
- def timeout( self, delay, callback ):
- pass
def __init__(
self,
@@ -54,8 +52,23 @@ class PubnubCoreAsync(PubnubBase):
self.timetoken = 0
self.version = '3.3.4'
self.accept_encoding = 'gzip'
-
- def subscribe( self, args ) :
+ self.SUB_RECEIVER = None
+ self._connect = None
+
+ def get_channel_list(self, channels):
+ channel = ''
+ first = True
+ for ch in channels:
+ if not channels[ch]['subscribed']:
+ continue
+ if not first:
+ channel += ','
+ else:
+ first = False
+ channel += ch
+ return channel
+
+ def subscribe( self, args=None, sync=False ) :
"""
#**
#* Subscribe
@@ -87,94 +100,140 @@ class PubnubCoreAsync(PubnubBase):
})
"""
- ## Fail if missing channel
- if not 'channel' in args :
- return 'Missing Channel.'
- ## Fail if missing callback
- if not 'callback' in args :
- return 'Missing Callback.'
+ if sync is True and self.susbcribe_sync is not None:
+ self.susbcribe_sync(args)
+ return
+
+ def _invoke(func,msg=None):
+ if func is not None:
+ if msg is not None:
+ func(msg)
+ else:
+ func()
+
+ def _invoke_connect():
+ for ch in self.subscriptions:
+ chobj = self.subscriptions[ch]
+ if chobj['connected'] is False:
+ chobj['connected'] = True
+ _invoke(chobj['connect'])
+
+ def _invoke_error(err=None):
+ for ch in self.subscriptions:
+ chobj = self.subscriptions[ch]
+ _invoke(chobj.error,err)
+
- ## Capture User Input
- channel = str(args['channel'])
- callback = args['callback']
- connectcb = args['connect']
+ if callback is None:
+ _invoke(error, "Callback Missing")
+ return
+
+ if channel is None:
+ _invoke(error, "Channel Missing")
+ return
+
+ def _get_channel():
+ for ch in self.subscriptions:
+ chobj = self.subscriptions[ch]
+ if chobj['subscribed'] is True:
+ return chobj
- if 'errorback' in args:
- errorback = args['errorback']
- else:
- errorback = lambda x: x
## New Channel?
- if not (channel in self.subscriptions) :
+ if not channel in self.subscriptions:
self.subscriptions[channel] = {
- 'first' : False,
- 'connected' : False,
+ 'name' : channel,
+ 'first' : False,
+ 'connected' : False,
+ 'subscribed' : True,
+ 'callback' : callback,
+ 'connect' : connect,
+ 'disconnect' : disconnect,
+ 'reconnect' : reconnect
}
- ## Ensure Single Connection
+ ## return if already connected to channel
if self.subscriptions[channel]['connected'] :
- return "Already Connected"
+ _invoke(error, "Already Connected")
+ return
+
- self.subscriptions[channel]['connected'] = 1
## SUBSCRIPTION RECURSION
- def _subscribe():
- ## STOP CONNECTION?
- if not self.subscriptions[channel]['connected']:
- return
+ def _connect():
+ self._reset_offline()
+
def sub_callback(response):
- if not self.subscriptions[channel]['first'] :
- self.subscriptions[channel]['first'] = True
- connectcb()
+ print response
+ ## ERROR ?
+ if not response or error in response:
+ _invoke_error()
- ## STOP CONNECTION?
- if not self.subscriptions[channel]['connected']:
- return
+ _invoke_connect()
+ self.timetoken = response[1]
- ## PROBLEM?
- if not response:
- def time_callback(_time):
- if not _time:
- self.timeout( 1, _subscribe )
- return errorback("Lost Network Connection")
- else:
- self.timeout( 1, _subscribe)
+ if len(response) > 2:
+ channel_list = response[2].split(',')
+ response_list = response[0]
+ for ch in enumerate(channel_list):
+ if ch[1] in self.subscriptions:
+ chobj = self.subscriptions[ch[1]]
+ _invoke(chobj['callback'],self.decrypt(response_list[ch[0]]))
+ else:
+ response_list = response[0]
+ chobj = _get_channel()
+ for r in response_list:
+ if chobj:
+ _invoke(chobj['callback'], self.decrypt(r))
- ## ENSURE CONNECTED (Call Time Function)
- return self.time({ 'callback' : time_callback })
- self.timetoken = response[1]
- _subscribe()
+ _connect()
+
- pc = PubnubCrypto()
- out = []
- for message in response[0]:
- callback(self.decrypt(message))
+ channel_list = self.get_channel_list(self.subscriptions)
+ print channel_list
## CONNECT TO PUBNUB SUBSCRIBE SERVERS
try:
- self._request( { "urlcomponents" : [
+ self.SUB_RECEIVER = self._request( { "urlcomponents" : [
'subscribe',
self.subscribe_key,
- channel,
+ channel_list,
'0',
str(self.timetoken)
- ], "urlparams" : {"uuid":self.uuid} }, sub_callback )
- except :
- self.timeout( 1, _subscribe)
+ ], "urlparams" : {"uuid":self.uuid} }, sub_callback, single=True )
+ except Exception as e:
+ self.timeout( 1, _connect)
return
+ self._connect = _connect
+
+
## BEGIN SUBSCRIPTION (LISTEN FOR MESSAGES)
- _subscribe()
+ _connect()
+
+ def _reset_offline(self):
+ if self.SUB_RECEIVER is not None:
+ self.SUB_RECEIVER()
+ self.SUB_RECEIVER = None
+
+ def CONNECT(self):
+ self._reset_offline()
+ self._connect()
+
+
def unsubscribe( self, args ):
+ #print(args['channel'])
channel = str(args['channel'])
if not (channel in self.subscriptions):
return False
## DISCONNECT
self.subscriptions[channel]['connected'] = 0
+ self.subscriptions[channel]['subscribed'] = False
self.subscriptions[channel]['timetoken'] = 0
self.subscriptions[channel]['first'] = False
+ self.CONNECT()