diff options
| author | Madison Smith | 2014-07-18 14:01:21 -0700 | 
|---|---|---|
| committer | Madison Smith | 2014-07-18 14:01:21 -0700 | 
| commit | 359b831b3eed191159e79e943d77077bc112a7cf (patch) | |
| tree | 8f223a092ad5c0b53a5ff18924f02dc493c76760 | |
| parent | 1ecaa93e2ec929ca8ebc185dbdde46d649da7534 (diff) | |
| download | pubnub-python-359b831b3eed191159e79e943d77077bc112a7cf.tar.bz2 | |
init
| -rw-r--r-- | Pubnub.py | 285 | 
1 files changed, 261 insertions, 24 deletions
| @@ -1,4 +1,3 @@ -  ## www.pubnub.com - PubNub Real-time push service in the cloud.  # coding=utf8 @@ -227,6 +226,17 @@ class PubnubBase(object):          self.ssl = ssl_on          self.auth_key = auth_key +        self.ds_obj = {} +        self.ds_temp = [] +        self.ds_timetoken = 0 +        self.ds_transaction = {} +        self.ds_action_list = [] +        self.ds_delete = '' +        self.ds_location = '' +        self.ds_set = False +        self.ready_callback = None +        self.error_callback = None +          if self.ssl:              self.origin = 'https://' + self.origin          else: @@ -299,7 +309,7 @@ class PubnubBase(object):          return auth_key      def grant(self, channel=None, auth_key=False, read=True, -              write=True, ttl=5, callback=None, error=None): +              write=True, ttl=5, callback=None, error=None, object_id=None):          """Method for granting permissions.          This function establishes subscribe and/or write permissions for @@ -377,10 +387,11 @@ class PubnubBase(object):              'r'         : read and 1 or 0,              'w'         : write and 1 or 0,              'ttl'       : ttl, -            'pnsdk'     : self.pnsdk +            'pnsdk'     : self.pnsdk, +            'obj-id'    : object_id          }, callback=callback, error=error) -    def revoke(self, channel=None, auth_key=None, ttl=1, callback=None, error=None): +    def revoke(self, channel=None, auth_key=None, ttl=1, callback=None, error=None, object_id=None):          """Method for revoking permissions.          Args: @@ -441,10 +452,11 @@ class PubnubBase(object):              'r'         : 0,              'w'         : 0,              'ttl'       : ttl, -            'pnsdk'     : self.pnsdk +            'pnsdk'     : self.pnsdk, +            'obj-id'    : object_id          }, callback=callback, error=error) -    def audit(self, channel=None, auth_key=None, callback=None, error=None): +    def audit(self, channel=None, auth_key=None, callback=None, error=None, object_id=None):          """Method for fetching permissions from pubnub servers.          This method provides a mechanism to reveal existing PubNub Access Manager attributes @@ -501,7 +513,8 @@ class PubnubBase(object):          return self._pam_auth({              'channel'   : channel,              'auth'      : auth_key, -            'pnsdk'     : self.pnsdk +            'pnsdk'     : self.pnsdk, +            'obj-id'    : object_id          }, 1, callback=callback, error=error)      def encrypt(self, message): @@ -637,6 +650,196 @@ class PubnubBase(object):              'subscribe_key': self.subscribe_key,              'callback': self._return_wrapped_callback(callback)}) +    def merge(self, object_id, data, path=''): +        path = self._format_path(path)[1] + +        def _patch_callback(message): +            self.ds_action_list.append(message) +            if 'message' in message and message['message'] != 'Path Updated': +                self.error_callback(message['message']) + +        url_components = ['v1', 'datasync', 'sub-key', self.subscribe_key, 'pub-key', self.publish_key, 'obj-id', object_id, path] +        if self.auth_key is not None: +            url_params = {'method': 'PATCH', 'auth': self.auth_key} +        else: +            url_params = {'method': 'PATCH'} + +        self._request({'urlcomponents': url_components, 'urlparams': url_params}, body=data, callback=_patch_callback) + +    def delete(self, object_id, path=''): +        path, path_slashes = self._format_path(path) +        self.ds_delete = object_id + path_slashes + +        def _delete_callback(message): +            self.ds_action_list.append(message) +            if 'message' in message and message['message'] != 'Path Deleted': +                self.error_callback(message['message']) + +        url_components = ['v1', 'datasync', 'sub-key', self.subscribe_key, 'pub-key', self.publish_key, 'obj-id', object_id, path] +        if self.auth_key is not None: +            url_params = {'method': 'DELETE', 'auth': self.auth_key} +        else: +            url_params = {'method': 'DELETE'} + +        self._request({'urlcomponents': url_components, 'urlparams': url_params}, callback=_delete_callback) + +    def set(self, object_id, data, path=''): +        path = self._format_path(path)[1] +        self.ds_set = True + +        def _put_callback(message): +            self.ds_action_list.append(message) +            if 'message' in message and message['message'] != 'Path Set': +                self.error_callback(message['message']) + +        url_components = ['v1', 'datasync', 'sub-key', self.subscribe_key, 'pub-key', self.publish_key, 'obj-id', object_id, path] +        if self.auth_key is not None: +            url_params = {'method': 'PUT', 'auth': self.auth_key} +        else: +            url_params = {'method': 'PUT'} + +        self._request({'urlcomponents': url_components, 'urlparams': url_params}, body=data, callback=_put_callback) + +    def get_synced_object(self, object_id, ready_callback, path='', error_callback=None): +        path, path_slashes = self._format_path(path) + +        def _error(message): +            if error_callback is None: +                ready_callback(message, self.ds_action_list) +            else: +                error_callback(message, self.ds_action_list) +            self.ds_action_list = [] + +        self.ready_callback = ready_callback +        self.error_callback = _error + +        def _connect(message): +            timestamp = self.timetoken +            _read(object_id, timestamp, path=path_slashes) + +        def _callback(message, channel): +            self.ds_action_list.append(message) +            if self.timetoken > self.ds_timetoken: +                self.ds_timetoken = self.timetoken +                if channel.startswith('pn_ds_'): +                    if message['action'] == 'update': +                        if message['trans_id'] not in self.ds_transaction: +                            self.ds_transaction[message['trans_id']] = [{'value': message['value'], 'location': message['location']}] +                        else: +                            self.ds_transaction[message['trans_id']].append({'value': message['value'], 'location': message['location']}) +                    elif message['action'] == 'delete' and not self.ds_set: +                        self._sync_ds(self.ds_delete, 'DELETE') +                elif channel.startswith('pn_dstr_'): +                    if message['trans_id'] in self.ds_transaction: +                        self._sync_ds(self.ds_transaction[message['trans_id']], 'SYNC') +                        del message['trans_id'] +                    else: +                        if 'Path Deleted' in str(self.ds_action_list) or '\'delete\'' in str(self.ds_action_list): +                            pass +                        else: +                            _error('Error. Received a transaction complete without getting the transaction') + +                #The object is synced and ready to go +                temp_action_list = str(self.ds_action_list).replace("u'", '') +                if "status': complete'" in temp_action_list or "status': 200" in temp_action_list: +                    self.ready_callback(dict(self.ds_obj), self.ds_action_list) +                    self.ds_action_list = [] + +        def _read(object_id, obj_at, path='', error=None): + +            def _read_callback(message): +                self.ds_action_list.append(message) +                if 'messaage' in message and message['message'] != 'OK': +                    _error(message['message']) + +                if 'next_page' in message and message['next_page'] is not None: +                    self.ds_temp.append(message['payload']) +                    _read(object_id, obj_at, path=message['start_at']) +                else: +                    self.ds_temp.append(message['payload']) +                    self._sync_ds(self.ds_temp, 'GET') +                    self.ds_temp = [] +                    _callback('', '') + + +            url_components = ['v1', 'datasync', 'sub-key', self.subscribe_key, 'obj-id', object_id, path] +            if self.auth_key is not None: +                url_params = {'method': 'GET', 'obj_at': obj_at, 'auth': self.auth_key} +            else: +                url_params = {'method': 'GET', 'obj_at': obj_at} +            self._request({'urlcomponents': url_components, 'urlparams': url_params}, callback=_read_callback) + +        self.ds_location = 'pn_ds_' + object_id + path +        self.subscribe(channels=['pn_ds_' + object_id + path, 'pn_ds_' + object_id + path + '.*'], +            connect=_connect, +            callback=_callback, +            error=_error) + +        self.subscribe(channels='pn_dstr_' + object_id, +            callback=_callback, +            error=_error) + +    def _format_path(self, path): +        if not path.startswith('.'): +            path = '.' + path +        if path.endswith('.'): +            path = path[:-1] + +        return(path, path.replace('.', '/')[1:]) + +    def _sync_ds(self, updates, operation): +        if operation == 'GET':  +            final = {}    +            for update in updates: +                if type(update) != dict: +                    final = dict(update) +                else: +                    final = dict(final.items() + update.items()) +            self.ds_obj = final +        elif operation == 'SYNC': +            for update in updates: +                path = update['location'] +                path = path[len(self.ds_location)+1:] +                #path = update['location'].split('.')[1:] +                path = path.split('.') +                ref = self.ds_obj +                previous = '' +                previous_ref = {} +                two_previous_ref = {} +                for level in path: +                    if level in ref: +                        ref = ref[level] +                        two_previous_ref = previous_ref +                        previous_ref = ref +                    elif previous != '': +                        ref[previous] = {} +                        ref = ref[previous] +                        previous = level +                    else: +                        previous = level + +                if type(ref) is not dict: +                    two_previous_ref[path[-1]] = update['value'] +                else: +                    ref[path[-1]] = update['value'] +            if self.ds_set: +                self.ds_set = False + +        elif operation == 'DELETE': +            path = updates.split('.')[1:] +            if len(path) == 0: +                self.ds_obj = {} +                self.ds_delete = '' +            else: +                ref = self.ds_obj +                previous = '' +                for level in path[:-1]: +                    ref = ref[level] +                    previous = level + +                del ref[previous] +                self.ds_delete = '' +      def here_now(self, channel, callback=None, error=None):          """Get here now data. @@ -1057,7 +1260,18 @@ class PubnubCoreAsync(PubnubBase):                      self.timetoken = \                          self.last_timetoken if self.timetoken == 0 and \                          self.last_timetoken != 0 else response[1] -                    if len(response) > 2: + +                    if len(response) > 3: +                        channel_list = response[3].split(',') +                        response_list = response[0] +                        for ch in enumerate(channel_list): +                            if ch[1] in self.subscriptions: +                                chobj = self.subscriptions[ch[1]] +                                _invoke(chobj['callback'], +                                        self.decrypt(response_list[ch[0]]), +                                        chobj['name']) + +                    elif len(response) > 2:                          channel_list = response[2].split(',')                          response_list = response[0]                          for ch in enumerate(channel_list): @@ -1066,6 +1280,7 @@ class PubnubCoreAsync(PubnubBase):                                  _invoke(chobj['callback'],                                          self.decrypt(response_list[ch[0]]),                                          chobj['name']) +                      else:                          response_list = response[0]                          chobj = _get_channel() @@ -1232,8 +1447,9 @@ class PubnubCore(PubnubCoreAsync):  class HTTPClient:      def __init__(self, pubnub, url, urllib_func=None, -                 callback=None, error=None, id=None, timeout=5): +                 callback=None, error=None, id=None, timeout=5, body={}):          self.url = url +        self.body = body          self.id = id          self.callback = callback          self.error = error @@ -1251,12 +1467,15 @@ class HTTPClient:          def _invoke(func, data):              if func is not None: -                func(get_data_for_user(data)) +                if type(data) is dict and 'service' in data and data['service'] == 'DataSync': +                    func(data) +                else: +                    func(get_data_for_user(data))          if self._urllib_func is None:              return -        resp = self._urllib_func(self.url, timeout=self.timeout) +        resp = self._urllib_func(self.url, timeout=self.timeout, body=self.body)          data = resp[0]          code = resp[1] @@ -1292,7 +1511,7 @@ class HTTPClient:                  _invoke(self.callback, data) -def _urllib_request_2(url, timeout=5): +def _urllib_request_2(url, method='GET', timeout=5):      try:          resp = urllib2.urlopen(url, timeout=timeout)      except urllib2.HTTPError as http_error: @@ -1304,12 +1523,30 @@ def _urllib_request_2(url, timeout=5):      return (resp.read(), resp.code)  s = requests.Session() -s.mount('http://pubsub.pubnub.com', HTTPAdapter(max_retries=1)) -s.mount('https://pubsub.pubnub.com', HTTPAdapter(max_retries=1)) +s.mount('http://pubsub-beta.pubnub.com', HTTPAdapter(max_retries=1)) +s.mount('https://pubsub-beta.pubnub.com', HTTPAdapter(max_retries=1)) + +def _requests_request(url, timeout=5, body={}): +    #This is a hack because the server doesn't support trailing slashes +    if '/datasync/' in url: +        if '?' in url: +            url_0, url_1 = url.split('?') +            if url_0[-1] == '/': +                url = url_0[:-1] + '?' + url_1 +            else: +                url = url_0 + '?' + url_1 +        else: +            url = url[:-1] -def _requests_request(url, timeout=5):      try: -        resp = s.get(url, timeout=timeout) +        if 'method=DELETE' in url: +            resp = s.delete(url, timeout=timeout) +        elif 'method=PATCH' in url: +            resp = s.patch(url, timeout=timeout, data=json.dumps(body)) +        elif 'method=PUT' in url: +            resp = s.put(url, timeout=timeout, data=json.dumps(body)) +        else: +            resp = s.get(url, timeout=timeout)      except requests.exceptions.HTTPError as http_error:          resp = http_error      except requests.exceptions.ConnectionError as error: @@ -1321,7 +1558,7 @@ def _requests_request(url, timeout=5):      return (resp.text, resp.status_code) -def _urllib_request_3(url, timeout=5): +def _urllib_request_3(url, method='GET', timeout=5):      try:          resp = urllib.request.urlopen(url, timeout=timeout)      except (urllib.request.HTTPError, urllib.request.URLError) as http_error: @@ -1343,7 +1580,7 @@ class Pubnub(PubnubCore):          cipher_key=None,          auth_key=None,          ssl_on=False, -        origin='pubsub.pubnub.com', +        origin='pubsub-beta.pubnub.com',          uuid=None,          pooling=True,          pres_uuid=None @@ -1380,20 +1617,20 @@ class Pubnub(PubnubCore):          thread = threading.Thread(target=cb)          thread.start() -    def _request_async(self, request, callback=None, error=None, single=False, timeout=5): +    def _request_async(self, request, callback=None, error=None, body={}, single=False, timeout=5):          global _urllib_request          ## Build URL          url = self.getUrl(request)          if single is True:              id = time.time() -            client = HTTPClient(self, url=url, urllib_func=_urllib_request, +            client = HTTPClient(self, url=url, body=body, urllib_func=_urllib_request,                                  callback=None, error=None, id=id, timeout=timeout)              with self.latest_sub_callback_lock:                  self.latest_sub_callback['id'] = id                  self.latest_sub_callback['callback'] = callback                  self.latest_sub_callback['error'] = error          else: -            client = HTTPClient(self, url=url, urllib_func=_urllib_request, +            client = HTTPClient(self, url=url, body=body, urllib_func=_urllib_request,                                  callback=callback, error=error, timeout=timeout)          thread = threading.Thread(target=client.run) @@ -1423,11 +1660,11 @@ class Pubnub(PubnubCore):          return resp_json -    def _request(self, request, callback=None, error=None, single=False, timeout=5): +    def _request(self, request, callback=None, error=None, single=False, timeout=5, body={}):          if callback is None:              return get_data_for_user(self._request_sync(request, timeout=timeout))          else: -            self._request_async(request, callback, error, single=single, timeout=timeout) +            self._request_async(request, callback, error, body=body, single=single, timeout=timeout)  # Pubnub Twisted @@ -1630,4 +1867,4 @@ class PubnubTornado(PubnubCoreAsync):          def abort():              pass -        return abort +        return abort
\ No newline at end of file | 
