aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--Pubnub.py285
1 files changed, 261 insertions, 24 deletions
diff --git a/Pubnub.py b/Pubnub.py
index ef857d2..9286c9a 100644
--- a/Pubnub.py
+++ b/Pubnub.py
@@ -1,4 +1,3 @@
-
## www.pubnub.com - PubNub Real-time push service in the cloud.
# coding=utf8
@@ -227,6 +226,17 @@ class PubnubBase(object):
self.ssl = ssl_on
self.auth_key = auth_key
+ self.ds_obj = {}
+ self.ds_temp = []
+ self.ds_timetoken = 0
+ self.ds_transaction = {}
+ self.ds_action_list = []
+ self.ds_delete = ''
+ self.ds_location = ''
+ self.ds_set = False
+ self.ready_callback = None
+ self.error_callback = None
+
if self.ssl:
self.origin = 'https://' + self.origin
else:
@@ -299,7 +309,7 @@ class PubnubBase(object):
return auth_key
def grant(self, channel=None, auth_key=False, read=True,
- write=True, ttl=5, callback=None, error=None):
+ write=True, ttl=5, callback=None, error=None, object_id=None):
"""Method for granting permissions.
This function establishes subscribe and/or write permissions for
@@ -377,10 +387,11 @@ class PubnubBase(object):
'r' : read and 1 or 0,
'w' : write and 1 or 0,
'ttl' : ttl,
- 'pnsdk' : self.pnsdk
+ 'pnsdk' : self.pnsdk,
+ 'obj-id' : object_id
}, callback=callback, error=error)
- def revoke(self, channel=None, auth_key=None, ttl=1, callback=None, error=None):
+ def revoke(self, channel=None, auth_key=None, ttl=1, callback=None, error=None, object_id=None):
"""Method for revoking permissions.
Args:
@@ -441,10 +452,11 @@ class PubnubBase(object):
'r' : 0,
'w' : 0,
'ttl' : ttl,
- 'pnsdk' : self.pnsdk
+ 'pnsdk' : self.pnsdk,
+ 'obj-id' : object_id
}, callback=callback, error=error)
- def audit(self, channel=None, auth_key=None, callback=None, error=None):
+ def audit(self, channel=None, auth_key=None, callback=None, error=None, object_id=None):
"""Method for fetching permissions from pubnub servers.
This method provides a mechanism to reveal existing PubNub Access Manager attributes
@@ -501,7 +513,8 @@ class PubnubBase(object):
return self._pam_auth({
'channel' : channel,
'auth' : auth_key,
- 'pnsdk' : self.pnsdk
+ 'pnsdk' : self.pnsdk,
+ 'obj-id' : object_id
}, 1, callback=callback, error=error)
def encrypt(self, message):
@@ -637,6 +650,196 @@ class PubnubBase(object):
'subscribe_key': self.subscribe_key,
'callback': self._return_wrapped_callback(callback)})
+ def merge(self, object_id, data, path=''):
+ path = self._format_path(path)[1]
+
+ def _patch_callback(message):
+ self.ds_action_list.append(message)
+ if 'message' in message and message['message'] != 'Path Updated':
+ self.error_callback(message['message'])
+
+ url_components = ['v1', 'datasync', 'sub-key', self.subscribe_key, 'pub-key', self.publish_key, 'obj-id', object_id, path]
+ if self.auth_key is not None:
+ url_params = {'method': 'PATCH', 'auth': self.auth_key}
+ else:
+ url_params = {'method': 'PATCH'}
+
+ self._request({'urlcomponents': url_components, 'urlparams': url_params}, body=data, callback=_patch_callback)
+
+ def delete(self, object_id, path=''):
+ path, path_slashes = self._format_path(path)
+ self.ds_delete = object_id + path_slashes
+
+ def _delete_callback(message):
+ self.ds_action_list.append(message)
+ if 'message' in message and message['message'] != 'Path Deleted':
+ self.error_callback(message['message'])
+
+ url_components = ['v1', 'datasync', 'sub-key', self.subscribe_key, 'pub-key', self.publish_key, 'obj-id', object_id, path]
+ if self.auth_key is not None:
+ url_params = {'method': 'DELETE', 'auth': self.auth_key}
+ else:
+ url_params = {'method': 'DELETE'}
+
+ self._request({'urlcomponents': url_components, 'urlparams': url_params}, callback=_delete_callback)
+
+ def set(self, object_id, data, path=''):
+ path = self._format_path(path)[1]
+ self.ds_set = True
+
+ def _put_callback(message):
+ self.ds_action_list.append(message)
+ if 'message' in message and message['message'] != 'Path Set':
+ self.error_callback(message['message'])
+
+ url_components = ['v1', 'datasync', 'sub-key', self.subscribe_key, 'pub-key', self.publish_key, 'obj-id', object_id, path]
+ if self.auth_key is not None:
+ url_params = {'method': 'PUT', 'auth': self.auth_key}
+ else:
+ url_params = {'method': 'PUT'}
+
+ self._request({'urlcomponents': url_components, 'urlparams': url_params}, body=data, callback=_put_callback)
+
+ def get_synced_object(self, object_id, ready_callback, path='', error_callback=None):
+ path, path_slashes = self._format_path(path)
+
+ def _error(message):
+ if error_callback is None:
+ ready_callback(message, self.ds_action_list)
+ else:
+ error_callback(message, self.ds_action_list)
+ self.ds_action_list = []
+
+ self.ready_callback = ready_callback
+ self.error_callback = _error
+
+ def _connect(message):
+ timestamp = self.timetoken
+ _read(object_id, timestamp, path=path_slashes)
+
+ def _callback(message, channel):
+ self.ds_action_list.append(message)
+ if self.timetoken > self.ds_timetoken:
+ self.ds_timetoken = self.timetoken
+ if channel.startswith('pn_ds_'):
+ if message['action'] == 'update':
+ if message['trans_id'] not in self.ds_transaction:
+ self.ds_transaction[message['trans_id']] = [{'value': message['value'], 'location': message['location']}]
+ else:
+ self.ds_transaction[message['trans_id']].append({'value': message['value'], 'location': message['location']})
+ elif message['action'] == 'delete' and not self.ds_set:
+ self._sync_ds(self.ds_delete, 'DELETE')
+ elif channel.startswith('pn_dstr_'):
+ if message['trans_id'] in self.ds_transaction:
+ self._sync_ds(self.ds_transaction[message['trans_id']], 'SYNC')
+ del message['trans_id']
+ else:
+ if 'Path Deleted' in str(self.ds_action_list) or '\'delete\'' in str(self.ds_action_list):
+ pass
+ else:
+ _error('Error. Received a transaction complete without getting the transaction')
+
+ #The object is synced and ready to go
+ temp_action_list = str(self.ds_action_list).replace("u'", '')
+ if "status': complete'" in temp_action_list or "status': 200" in temp_action_list:
+ self.ready_callback(dict(self.ds_obj), self.ds_action_list)
+ self.ds_action_list = []
+
+ def _read(object_id, obj_at, path='', error=None):
+
+ def _read_callback(message):
+ self.ds_action_list.append(message)
+ if 'messaage' in message and message['message'] != 'OK':
+ _error(message['message'])
+
+ if 'next_page' in message and message['next_page'] is not None:
+ self.ds_temp.append(message['payload'])
+ _read(object_id, obj_at, path=message['start_at'])
+ else:
+ self.ds_temp.append(message['payload'])
+ self._sync_ds(self.ds_temp, 'GET')
+ self.ds_temp = []
+ _callback('', '')
+
+
+ url_components = ['v1', 'datasync', 'sub-key', self.subscribe_key, 'obj-id', object_id, path]
+ if self.auth_key is not None:
+ url_params = {'method': 'GET', 'obj_at': obj_at, 'auth': self.auth_key}
+ else:
+ url_params = {'method': 'GET', 'obj_at': obj_at}
+ self._request({'urlcomponents': url_components, 'urlparams': url_params}, callback=_read_callback)
+
+ self.ds_location = 'pn_ds_' + object_id + path
+ self.subscribe(channels=['pn_ds_' + object_id + path, 'pn_ds_' + object_id + path + '.*'],
+ connect=_connect,
+ callback=_callback,
+ error=_error)
+
+ self.subscribe(channels='pn_dstr_' + object_id,
+ callback=_callback,
+ error=_error)
+
+ def _format_path(self, path):
+ if not path.startswith('.'):
+ path = '.' + path
+ if path.endswith('.'):
+ path = path[:-1]
+
+ return(path, path.replace('.', '/')[1:])
+
+ def _sync_ds(self, updates, operation):
+ if operation == 'GET':
+ final = {}
+ for update in updates:
+ if type(update) != dict:
+ final = dict(update)
+ else:
+ final = dict(final.items() + update.items())
+ self.ds_obj = final
+ elif operation == 'SYNC':
+ for update in updates:
+ path = update['location']
+ path = path[len(self.ds_location)+1:]
+ #path = update['location'].split('.')[1:]
+ path = path.split('.')
+ ref = self.ds_obj
+ previous = ''
+ previous_ref = {}
+ two_previous_ref = {}
+ for level in path:
+ if level in ref:
+ ref = ref[level]
+ two_previous_ref = previous_ref
+ previous_ref = ref
+ elif previous != '':
+ ref[previous] = {}
+ ref = ref[previous]
+ previous = level
+ else:
+ previous = level
+
+ if type(ref) is not dict:
+ two_previous_ref[path[-1]] = update['value']
+ else:
+ ref[path[-1]] = update['value']
+ if self.ds_set:
+ self.ds_set = False
+
+ elif operation == 'DELETE':
+ path = updates.split('.')[1:]
+ if len(path) == 0:
+ self.ds_obj = {}
+ self.ds_delete = ''
+ else:
+ ref = self.ds_obj
+ previous = ''
+ for level in path[:-1]:
+ ref = ref[level]
+ previous = level
+
+ del ref[previous]
+ self.ds_delete = ''
+
def here_now(self, channel, callback=None, error=None):
"""Get here now data.
@@ -1057,7 +1260,18 @@ class PubnubCoreAsync(PubnubBase):
self.timetoken = \
self.last_timetoken if self.timetoken == 0 and \
self.last_timetoken != 0 else response[1]
- if len(response) > 2:
+
+ if len(response) > 3:
+ channel_list = response[3].split(',')
+ response_list = response[0]
+ for ch in enumerate(channel_list):
+ if ch[1] in self.subscriptions:
+ chobj = self.subscriptions[ch[1]]
+ _invoke(chobj['callback'],
+ self.decrypt(response_list[ch[0]]),
+ chobj['name'])
+
+ elif len(response) > 2:
channel_list = response[2].split(',')
response_list = response[0]
for ch in enumerate(channel_list):
@@ -1066,6 +1280,7 @@ class PubnubCoreAsync(PubnubBase):
_invoke(chobj['callback'],
self.decrypt(response_list[ch[0]]),
chobj['name'])
+
else:
response_list = response[0]
chobj = _get_channel()
@@ -1232,8 +1447,9 @@ class PubnubCore(PubnubCoreAsync):
class HTTPClient:
def __init__(self, pubnub, url, urllib_func=None,
- callback=None, error=None, id=None, timeout=5):
+ callback=None, error=None, id=None, timeout=5, body={}):
self.url = url
+ self.body = body
self.id = id
self.callback = callback
self.error = error
@@ -1251,12 +1467,15 @@ class HTTPClient:
def _invoke(func, data):
if func is not None:
- func(get_data_for_user(data))
+ if type(data) is dict and 'service' in data and data['service'] == 'DataSync':
+ func(data)
+ else:
+ func(get_data_for_user(data))
if self._urllib_func is None:
return
- resp = self._urllib_func(self.url, timeout=self.timeout)
+ resp = self._urllib_func(self.url, timeout=self.timeout, body=self.body)
data = resp[0]
code = resp[1]
@@ -1292,7 +1511,7 @@ class HTTPClient:
_invoke(self.callback, data)
-def _urllib_request_2(url, timeout=5):
+def _urllib_request_2(url, method='GET', timeout=5):
try:
resp = urllib2.urlopen(url, timeout=timeout)
except urllib2.HTTPError as http_error:
@@ -1304,12 +1523,30 @@ def _urllib_request_2(url, timeout=5):
return (resp.read(), resp.code)
s = requests.Session()
-s.mount('http://pubsub.pubnub.com', HTTPAdapter(max_retries=1))
-s.mount('https://pubsub.pubnub.com', HTTPAdapter(max_retries=1))
+s.mount('http://pubsub-beta.pubnub.com', HTTPAdapter(max_retries=1))
+s.mount('https://pubsub-beta.pubnub.com', HTTPAdapter(max_retries=1))
+
+def _requests_request(url, timeout=5, body={}):
+ #This is a hack because the server doesn't support trailing slashes
+ if '/datasync/' in url:
+ if '?' in url:
+ url_0, url_1 = url.split('?')
+ if url_0[-1] == '/':
+ url = url_0[:-1] + '?' + url_1
+ else:
+ url = url_0 + '?' + url_1
+ else:
+ url = url[:-1]
-def _requests_request(url, timeout=5):
try:
- resp = s.get(url, timeout=timeout)
+ if 'method=DELETE' in url:
+ resp = s.delete(url, timeout=timeout)
+ elif 'method=PATCH' in url:
+ resp = s.patch(url, timeout=timeout, data=json.dumps(body))
+ elif 'method=PUT' in url:
+ resp = s.put(url, timeout=timeout, data=json.dumps(body))
+ else:
+ resp = s.get(url, timeout=timeout)
except requests.exceptions.HTTPError as http_error:
resp = http_error
except requests.exceptions.ConnectionError as error:
@@ -1321,7 +1558,7 @@ def _requests_request(url, timeout=5):
return (resp.text, resp.status_code)
-def _urllib_request_3(url, timeout=5):
+def _urllib_request_3(url, method='GET', timeout=5):
try:
resp = urllib.request.urlopen(url, timeout=timeout)
except (urllib.request.HTTPError, urllib.request.URLError) as http_error:
@@ -1343,7 +1580,7 @@ class Pubnub(PubnubCore):
cipher_key=None,
auth_key=None,
ssl_on=False,
- origin='pubsub.pubnub.com',
+ origin='pubsub-beta.pubnub.com',
uuid=None,
pooling=True,
pres_uuid=None
@@ -1380,20 +1617,20 @@ class Pubnub(PubnubCore):
thread = threading.Thread(target=cb)
thread.start()
- def _request_async(self, request, callback=None, error=None, single=False, timeout=5):
+ def _request_async(self, request, callback=None, error=None, body={}, single=False, timeout=5):
global _urllib_request
## Build URL
url = self.getUrl(request)
if single is True:
id = time.time()
- client = HTTPClient(self, url=url, urllib_func=_urllib_request,
+ client = HTTPClient(self, url=url, body=body, urllib_func=_urllib_request,
callback=None, error=None, id=id, timeout=timeout)
with self.latest_sub_callback_lock:
self.latest_sub_callback['id'] = id
self.latest_sub_callback['callback'] = callback
self.latest_sub_callback['error'] = error
else:
- client = HTTPClient(self, url=url, urllib_func=_urllib_request,
+ client = HTTPClient(self, url=url, body=body, urllib_func=_urllib_request,
callback=callback, error=error, timeout=timeout)
thread = threading.Thread(target=client.run)
@@ -1423,11 +1660,11 @@ class Pubnub(PubnubCore):
return resp_json
- def _request(self, request, callback=None, error=None, single=False, timeout=5):
+ def _request(self, request, callback=None, error=None, single=False, timeout=5, body={}):
if callback is None:
return get_data_for_user(self._request_sync(request, timeout=timeout))
else:
- self._request_async(request, callback, error, single=single, timeout=timeout)
+ self._request_async(request, callback, error, body=body, single=single, timeout=timeout)
# Pubnub Twisted
@@ -1630,4 +1867,4 @@ class PubnubTornado(PubnubCoreAsync):
def abort():
pass
- return abort
+ return abort \ No newline at end of file