diff options
| author | Madison Smith | 2014-07-18 14:01:21 -0700 |
|---|---|---|
| committer | Madison Smith | 2014-07-18 14:01:21 -0700 |
| commit | 359b831b3eed191159e79e943d77077bc112a7cf (patch) | |
| tree | 8f223a092ad5c0b53a5ff18924f02dc493c76760 /Pubnub.py | |
| parent | 1ecaa93e2ec929ca8ebc185dbdde46d649da7534 (diff) | |
| download | pubnub-python-359b831b3eed191159e79e943d77077bc112a7cf.tar.bz2 | |
init
Diffstat (limited to 'Pubnub.py')
| -rw-r--r-- | Pubnub.py | 285 |
1 files changed, 261 insertions, 24 deletions
@@ -1,4 +1,3 @@ - ## www.pubnub.com - PubNub Real-time push service in the cloud. # coding=utf8 @@ -227,6 +226,17 @@ class PubnubBase(object): self.ssl = ssl_on self.auth_key = auth_key + self.ds_obj = {} + self.ds_temp = [] + self.ds_timetoken = 0 + self.ds_transaction = {} + self.ds_action_list = [] + self.ds_delete = '' + self.ds_location = '' + self.ds_set = False + self.ready_callback = None + self.error_callback = None + if self.ssl: self.origin = 'https://' + self.origin else: @@ -299,7 +309,7 @@ class PubnubBase(object): return auth_key def grant(self, channel=None, auth_key=False, read=True, - write=True, ttl=5, callback=None, error=None): + write=True, ttl=5, callback=None, error=None, object_id=None): """Method for granting permissions. This function establishes subscribe and/or write permissions for @@ -377,10 +387,11 @@ class PubnubBase(object): 'r' : read and 1 or 0, 'w' : write and 1 or 0, 'ttl' : ttl, - 'pnsdk' : self.pnsdk + 'pnsdk' : self.pnsdk, + 'obj-id' : object_id }, callback=callback, error=error) - def revoke(self, channel=None, auth_key=None, ttl=1, callback=None, error=None): + def revoke(self, channel=None, auth_key=None, ttl=1, callback=None, error=None, object_id=None): """Method for revoking permissions. Args: @@ -441,10 +452,11 @@ class PubnubBase(object): 'r' : 0, 'w' : 0, 'ttl' : ttl, - 'pnsdk' : self.pnsdk + 'pnsdk' : self.pnsdk, + 'obj-id' : object_id }, callback=callback, error=error) - def audit(self, channel=None, auth_key=None, callback=None, error=None): + def audit(self, channel=None, auth_key=None, callback=None, error=None, object_id=None): """Method for fetching permissions from pubnub servers. This method provides a mechanism to reveal existing PubNub Access Manager attributes @@ -501,7 +513,8 @@ class PubnubBase(object): return self._pam_auth({ 'channel' : channel, 'auth' : auth_key, - 'pnsdk' : self.pnsdk + 'pnsdk' : self.pnsdk, + 'obj-id' : object_id }, 1, callback=callback, error=error) def encrypt(self, message): @@ -637,6 +650,196 @@ class PubnubBase(object): 'subscribe_key': self.subscribe_key, 'callback': self._return_wrapped_callback(callback)}) + def merge(self, object_id, data, path=''): + path = self._format_path(path)[1] + + def _patch_callback(message): + self.ds_action_list.append(message) + if 'message' in message and message['message'] != 'Path Updated': + self.error_callback(message['message']) + + url_components = ['v1', 'datasync', 'sub-key', self.subscribe_key, 'pub-key', self.publish_key, 'obj-id', object_id, path] + if self.auth_key is not None: + url_params = {'method': 'PATCH', 'auth': self.auth_key} + else: + url_params = {'method': 'PATCH'} + + self._request({'urlcomponents': url_components, 'urlparams': url_params}, body=data, callback=_patch_callback) + + def delete(self, object_id, path=''): + path, path_slashes = self._format_path(path) + self.ds_delete = object_id + path_slashes + + def _delete_callback(message): + self.ds_action_list.append(message) + if 'message' in message and message['message'] != 'Path Deleted': + self.error_callback(message['message']) + + url_components = ['v1', 'datasync', 'sub-key', self.subscribe_key, 'pub-key', self.publish_key, 'obj-id', object_id, path] + if self.auth_key is not None: + url_params = {'method': 'DELETE', 'auth': self.auth_key} + else: + url_params = {'method': 'DELETE'} + + self._request({'urlcomponents': url_components, 'urlparams': url_params}, callback=_delete_callback) + + def set(self, object_id, data, path=''): + path = self._format_path(path)[1] + self.ds_set = True + + def _put_callback(message): + self.ds_action_list.append(message) + if 'message' in message and message['message'] != 'Path Set': + self.error_callback(message['message']) + + url_components = ['v1', 'datasync', 'sub-key', self.subscribe_key, 'pub-key', self.publish_key, 'obj-id', object_id, path] + if self.auth_key is not None: + url_params = {'method': 'PUT', 'auth': self.auth_key} + else: + url_params = {'method': 'PUT'} + + self._request({'urlcomponents': url_components, 'urlparams': url_params}, body=data, callback=_put_callback) + + def get_synced_object(self, object_id, ready_callback, path='', error_callback=None): + path, path_slashes = self._format_path(path) + + def _error(message): + if error_callback is None: + ready_callback(message, self.ds_action_list) + else: + error_callback(message, self.ds_action_list) + self.ds_action_list = [] + + self.ready_callback = ready_callback + self.error_callback = _error + + def _connect(message): + timestamp = self.timetoken + _read(object_id, timestamp, path=path_slashes) + + def _callback(message, channel): + self.ds_action_list.append(message) + if self.timetoken > self.ds_timetoken: + self.ds_timetoken = self.timetoken + if channel.startswith('pn_ds_'): + if message['action'] == 'update': + if message['trans_id'] not in self.ds_transaction: + self.ds_transaction[message['trans_id']] = [{'value': message['value'], 'location': message['location']}] + else: + self.ds_transaction[message['trans_id']].append({'value': message['value'], 'location': message['location']}) + elif message['action'] == 'delete' and not self.ds_set: + self._sync_ds(self.ds_delete, 'DELETE') + elif channel.startswith('pn_dstr_'): + if message['trans_id'] in self.ds_transaction: + self._sync_ds(self.ds_transaction[message['trans_id']], 'SYNC') + del message['trans_id'] + else: + if 'Path Deleted' in str(self.ds_action_list) or '\'delete\'' in str(self.ds_action_list): + pass + else: + _error('Error. Received a transaction complete without getting the transaction') + + #The object is synced and ready to go + temp_action_list = str(self.ds_action_list).replace("u'", '') + if "status': complete'" in temp_action_list or "status': 200" in temp_action_list: + self.ready_callback(dict(self.ds_obj), self.ds_action_list) + self.ds_action_list = [] + + def _read(object_id, obj_at, path='', error=None): + + def _read_callback(message): + self.ds_action_list.append(message) + if 'messaage' in message and message['message'] != 'OK': + _error(message['message']) + + if 'next_page' in message and message['next_page'] is not None: + self.ds_temp.append(message['payload']) + _read(object_id, obj_at, path=message['start_at']) + else: + self.ds_temp.append(message['payload']) + self._sync_ds(self.ds_temp, 'GET') + self.ds_temp = [] + _callback('', '') + + + url_components = ['v1', 'datasync', 'sub-key', self.subscribe_key, 'obj-id', object_id, path] + if self.auth_key is not None: + url_params = {'method': 'GET', 'obj_at': obj_at, 'auth': self.auth_key} + else: + url_params = {'method': 'GET', 'obj_at': obj_at} + self._request({'urlcomponents': url_components, 'urlparams': url_params}, callback=_read_callback) + + self.ds_location = 'pn_ds_' + object_id + path + self.subscribe(channels=['pn_ds_' + object_id + path, 'pn_ds_' + object_id + path + '.*'], + connect=_connect, + callback=_callback, + error=_error) + + self.subscribe(channels='pn_dstr_' + object_id, + callback=_callback, + error=_error) + + def _format_path(self, path): + if not path.startswith('.'): + path = '.' + path + if path.endswith('.'): + path = path[:-1] + + return(path, path.replace('.', '/')[1:]) + + def _sync_ds(self, updates, operation): + if operation == 'GET': + final = {} + for update in updates: + if type(update) != dict: + final = dict(update) + else: + final = dict(final.items() + update.items()) + self.ds_obj = final + elif operation == 'SYNC': + for update in updates: + path = update['location'] + path = path[len(self.ds_location)+1:] + #path = update['location'].split('.')[1:] + path = path.split('.') + ref = self.ds_obj + previous = '' + previous_ref = {} + two_previous_ref = {} + for level in path: + if level in ref: + ref = ref[level] + two_previous_ref = previous_ref + previous_ref = ref + elif previous != '': + ref[previous] = {} + ref = ref[previous] + previous = level + else: + previous = level + + if type(ref) is not dict: + two_previous_ref[path[-1]] = update['value'] + else: + ref[path[-1]] = update['value'] + if self.ds_set: + self.ds_set = False + + elif operation == 'DELETE': + path = updates.split('.')[1:] + if len(path) == 0: + self.ds_obj = {} + self.ds_delete = '' + else: + ref = self.ds_obj + previous = '' + for level in path[:-1]: + ref = ref[level] + previous = level + + del ref[previous] + self.ds_delete = '' + def here_now(self, channel, callback=None, error=None): """Get here now data. @@ -1057,7 +1260,18 @@ class PubnubCoreAsync(PubnubBase): self.timetoken = \ self.last_timetoken if self.timetoken == 0 and \ self.last_timetoken != 0 else response[1] - if len(response) > 2: + + if len(response) > 3: + channel_list = response[3].split(',') + response_list = response[0] + for ch in enumerate(channel_list): + if ch[1] in self.subscriptions: + chobj = self.subscriptions[ch[1]] + _invoke(chobj['callback'], + self.decrypt(response_list[ch[0]]), + chobj['name']) + + elif len(response) > 2: channel_list = response[2].split(',') response_list = response[0] for ch in enumerate(channel_list): @@ -1066,6 +1280,7 @@ class PubnubCoreAsync(PubnubBase): _invoke(chobj['callback'], self.decrypt(response_list[ch[0]]), chobj['name']) + else: response_list = response[0] chobj = _get_channel() @@ -1232,8 +1447,9 @@ class PubnubCore(PubnubCoreAsync): class HTTPClient: def __init__(self, pubnub, url, urllib_func=None, - callback=None, error=None, id=None, timeout=5): + callback=None, error=None, id=None, timeout=5, body={}): self.url = url + self.body = body self.id = id self.callback = callback self.error = error @@ -1251,12 +1467,15 @@ class HTTPClient: def _invoke(func, data): if func is not None: - func(get_data_for_user(data)) + if type(data) is dict and 'service' in data and data['service'] == 'DataSync': + func(data) + else: + func(get_data_for_user(data)) if self._urllib_func is None: return - resp = self._urllib_func(self.url, timeout=self.timeout) + resp = self._urllib_func(self.url, timeout=self.timeout, body=self.body) data = resp[0] code = resp[1] @@ -1292,7 +1511,7 @@ class HTTPClient: _invoke(self.callback, data) -def _urllib_request_2(url, timeout=5): +def _urllib_request_2(url, method='GET', timeout=5): try: resp = urllib2.urlopen(url, timeout=timeout) except urllib2.HTTPError as http_error: @@ -1304,12 +1523,30 @@ def _urllib_request_2(url, timeout=5): return (resp.read(), resp.code) s = requests.Session() -s.mount('http://pubsub.pubnub.com', HTTPAdapter(max_retries=1)) -s.mount('https://pubsub.pubnub.com', HTTPAdapter(max_retries=1)) +s.mount('http://pubsub-beta.pubnub.com', HTTPAdapter(max_retries=1)) +s.mount('https://pubsub-beta.pubnub.com', HTTPAdapter(max_retries=1)) + +def _requests_request(url, timeout=5, body={}): + #This is a hack because the server doesn't support trailing slashes + if '/datasync/' in url: + if '?' in url: + url_0, url_1 = url.split('?') + if url_0[-1] == '/': + url = url_0[:-1] + '?' + url_1 + else: + url = url_0 + '?' + url_1 + else: + url = url[:-1] -def _requests_request(url, timeout=5): try: - resp = s.get(url, timeout=timeout) + if 'method=DELETE' in url: + resp = s.delete(url, timeout=timeout) + elif 'method=PATCH' in url: + resp = s.patch(url, timeout=timeout, data=json.dumps(body)) + elif 'method=PUT' in url: + resp = s.put(url, timeout=timeout, data=json.dumps(body)) + else: + resp = s.get(url, timeout=timeout) except requests.exceptions.HTTPError as http_error: resp = http_error except requests.exceptions.ConnectionError as error: @@ -1321,7 +1558,7 @@ def _requests_request(url, timeout=5): return (resp.text, resp.status_code) -def _urllib_request_3(url, timeout=5): +def _urllib_request_3(url, method='GET', timeout=5): try: resp = urllib.request.urlopen(url, timeout=timeout) except (urllib.request.HTTPError, urllib.request.URLError) as http_error: @@ -1343,7 +1580,7 @@ class Pubnub(PubnubCore): cipher_key=None, auth_key=None, ssl_on=False, - origin='pubsub.pubnub.com', + origin='pubsub-beta.pubnub.com', uuid=None, pooling=True, pres_uuid=None @@ -1380,20 +1617,20 @@ class Pubnub(PubnubCore): thread = threading.Thread(target=cb) thread.start() - def _request_async(self, request, callback=None, error=None, single=False, timeout=5): + def _request_async(self, request, callback=None, error=None, body={}, single=False, timeout=5): global _urllib_request ## Build URL url = self.getUrl(request) if single is True: id = time.time() - client = HTTPClient(self, url=url, urllib_func=_urllib_request, + client = HTTPClient(self, url=url, body=body, urllib_func=_urllib_request, callback=None, error=None, id=id, timeout=timeout) with self.latest_sub_callback_lock: self.latest_sub_callback['id'] = id self.latest_sub_callback['callback'] = callback self.latest_sub_callback['error'] = error else: - client = HTTPClient(self, url=url, urllib_func=_urllib_request, + client = HTTPClient(self, url=url, body=body, urllib_func=_urllib_request, callback=callback, error=error, timeout=timeout) thread = threading.Thread(target=client.run) @@ -1423,11 +1660,11 @@ class Pubnub(PubnubCore): return resp_json - def _request(self, request, callback=None, error=None, single=False, timeout=5): + def _request(self, request, callback=None, error=None, single=False, timeout=5, body={}): if callback is None: return get_data_for_user(self._request_sync(request, timeout=timeout)) else: - self._request_async(request, callback, error, single=single, timeout=timeout) + self._request_async(request, callback, error, body=body, single=single, timeout=timeout) # Pubnub Twisted @@ -1630,4 +1867,4 @@ class PubnubTornado(PubnubCoreAsync): def abort(): pass - return abort + return abort
\ No newline at end of file |
