diff options
Diffstat (limited to 'common/PubnubCoreAsync.py')
| -rw-r--r-- | common/PubnubCoreAsync.py | 125 | 
1 files changed, 65 insertions, 60 deletions
| diff --git a/common/PubnubCoreAsync.py b/common/PubnubCoreAsync.py index 4251d47..f7b57cc 100644 --- a/common/PubnubCoreAsync.py +++ b/common/PubnubCoreAsync.py @@ -5,9 +5,14 @@ except ImportError:      import Crypto.Hash.SHA256 as digestmod      sha256 = digestmod.new  import hmac -import threading -from threading import current_thread -import threading + +class EmptyLock(): +    def __enter__(self): +        pass +    def __exit__(self,a,b,c): +        pass + +empty_lock = EmptyLock()  class PubnubCoreAsync(PubnubBase): @@ -23,7 +28,9 @@ class PubnubCoreAsync(PubnubBase):          auth_key = None,          ssl_on = False,          origin = 'pubsub.pubnub.com', -        uuid = None +        uuid = None, +        _tt_lock=empty_lock, +        _channel_list_lock=empty_lock      ) :          """          #** @@ -53,29 +60,20 @@ class PubnubCoreAsync(PubnubBase):              UUID=uuid          )         -        self.subscriptions = {} -        self.timetoken     = 0 -        self.last_timetoken = 0 -        self.version       = '3.3.4' -        self.accept_encoding = 'gzip' -        self.SUB_RECEIVER  = None -        self._connect    = None -        self._tt_lock    = threading.RLock() +        self.subscriptions              = {} +        self.timetoken                  = 0 +        self.last_timetoken             = 0 +        self.version                    = '3.3.4' +        self.accept_encoding            = 'gzip' +        self.SUB_RECEIVER               = None +        self._connect                   = None +        self._tt_lock                   = _tt_lock +        self._channel_list_lock         = _channel_list_lock      def get_channel_list(self, channels):          channel = ''          first = True -        if self._channel_list_lock: -            with self._channel_list_lock: -                for ch in channels: -                    if not channels[ch]['subscribed']: -                        continue -                    if not first: -                        channel += ',' -                    else: -                        first = False -                    channel += ch -        else: +        with self._channel_list_lock:              for ch in channels:                  if not channels[ch]['subscribed']:                      continue @@ -84,9 +82,15 @@ class PubnubCoreAsync(PubnubBase):                  else:                      first = False                  channel += ch -          return channel + +    def each(l, func): +        if func is None: +            return +        for i in l: +            func(i) +      def subscribe( self, args=None, sync=False ) :          """          #** @@ -122,12 +126,12 @@ class PubnubCoreAsync(PubnubBase):          if args is None:              _invoke(error, "Arguments Missing")              return -        channel         = args['channel'] if 'channel' in args else None -        callback        = args['callback'] if 'callback' in args else None -        connect         = args['connect'] if 'connect' in args else None -        disconnect      = args['disconnect'] if 'disconnect' in args else None -        reconnect       = args['reconnect'] if 'reconnect' in args else None -        error           = args['error'] if 'error' in args else None +        channel         = args['channel']       if 'channel'    in args else None +        callback        = args['callback']      if 'callback'   in args else None +        connect         = args['connect']       if 'connect'    in args else None +        disconnect      = args['disconnect']    if 'disconnect' in args else None +        reconnect       = args['reconnect']     if 'reconnect'  in args else None +        error           = args['error']         if 'error'      in args else None          with self._tt_lock:              self.last_timetoken = self.timetoken if self.timetoken != 0 else self.last_timetoken @@ -160,10 +164,15 @@ class PubnubCoreAsync(PubnubBase):                              chobj['connected'] = True                              _invoke(chobj['connect'],chobj['name']) -        def _invoke_error(err=None): -            for ch in self.subscriptions: -                chobj = self.subscriptions[ch] -                _invoke(chobj.error,err) +        def _invoke_error(channel_list=None, err=None): +            if channel_list is None: +                for ch in self.subscriptions: +                    chobj = self.subscriptions[ch] +                    _invoke(chobj['error'],err) +            else: +                for ch in channel_list: +                    chobj = self.subscriptions[ch] +                    _invoke(chobj['error'],err)          '''          if callback is None: @@ -184,19 +193,7 @@ class PubnubCoreAsync(PubnubBase):          ## New Channel?          if not channel in self.subscriptions: -            if self._channel_list_lock: -                with self._channel_list_lock: -                    self.subscriptions[channel] = { -                        'name'          : channel, -                        'first'         : False, -                        'connected'     : False, -                        'subscribed'    : True, -                        'callback'      : callback, -                        'connect'       : connect, -                        'disconnect'    : disconnect, -                        'reconnect'     : reconnect -                    } -            else: +            with self._channel_list_lock:                  self.subscriptions[channel] = {                      'name'          : channel,                      'first'         : False, @@ -205,9 +202,11 @@ class PubnubCoreAsync(PubnubBase):                      'callback'      : callback,                      'connect'       : connect,                      'disconnect'    : disconnect, -                    'reconnect'     : reconnect +                    'reconnect'     : reconnect, +                    'error'         : error                  } +          ## return if already connected to channel          if channel in self.subscriptions and 'connected' in self.subscriptions[channel] and self.subscriptions[channel]['connected'] is True:              _invoke(error, "Already Connected") @@ -222,8 +221,11 @@ class PubnubCoreAsync(PubnubBase):              def sub_callback(response):                  ## ERROR ? -                if not response or error in response: -                    _invoke_error() +                #print response +                if not response or ('message' in response and response['message'] == 'Forbidden'): +                    _invoke_error(response['payload']['channels'], response['message']) +                    _connect() +                    return                  _invoke_connect() @@ -250,7 +252,6 @@ class PubnubCoreAsync(PubnubBase):                      _connect() -              channel_list = self.get_channel_list(self.subscriptions)              ## CONNECT TO PUBNUB SUBSCRIBE SERVERS              try: @@ -260,9 +261,9 @@ class PubnubCoreAsync(PubnubBase):                      channel_list,                      '0',                      str(self.timetoken) -                ], "urlparams" : {"uuid":self.uuid} }, sub_callback, single=True ) +                ], "urlparams" : {"uuid":self.uuid, "auth" : self.auth_key} }, sub_callback, sub_callback, single=True )              except Exception as e: -                print e +                print(e)                  self.timeout( 1, _connect)                  return @@ -283,14 +284,18 @@ class PubnubCoreAsync(PubnubBase):      def unsubscribe( self, args ): -        #print(args['channel']) -        channel = str(args['channel']) -        if not (channel in self.subscriptions): + +        if 'channel' in self.subscriptions is False:              return False +        channel = str(args['channel']) + +          ## DISCONNECT -        self.subscriptions[channel]['connected'] = 0 -        self.subscriptions[channel]['subscribed'] = False -        self.subscriptions[channel]['timetoken'] = 0 -        self.subscriptions[channel]['first']     = False +        with self._channel_list_lock: +            if channel in self.subscriptions: +                self.subscriptions[channel]['connected']    = 0 +                self.subscriptions[channel]['subscribed']   = False +                self.subscriptions[channel]['timetoken']    = 0 +                self.subscriptions[channel]['first']        = False          self.CONNECT() | 
