aboutsummaryrefslogtreecommitdiffstats
path: root/python-twisted
diff options
context:
space:
mode:
authorDevendra2014-05-15 19:48:06 +0530
committerDevendra2014-05-15 19:48:06 +0530
commit32492b68b66a46bdbf6736e1ddd16a34dca12b9b (patch)
treefdd9ad83c8a7ec116383242b3424fa596583fe30 /python-twisted
parent3faa087aca856d14e4dca7b3ec41a691f8d468f2 (diff)
downloadpubnub-python-32492b68b66a46bdbf6736e1ddd16a34dca12b9b.tar.bz2
adding more unit tests
Diffstat (limited to 'python-twisted')
-rw-r--r--python-twisted/tests/test_grant_async.py359
-rw-r--r--python-twisted/tests/test_publish_async.py304
-rw-r--r--python-twisted/tests/unit-tests.py107
3 files changed, 663 insertions, 107 deletions
diff --git a/python-twisted/tests/test_grant_async.py b/python-twisted/tests/test_grant_async.py
new file mode 100644
index 0000000..5b33b11
--- /dev/null
+++ b/python-twisted/tests/test_grant_async.py
@@ -0,0 +1,359 @@
+
+
+from Pubnub import PubnubTwisted as Pubnub
+import time
+
+pubnub = Pubnub("demo","demo")
+pubnub_pam = Pubnub("pub-c-c077418d-f83c-4860-b213-2f6c77bde29a",
+ "sub-c-e8839098-f568-11e2-a11a-02ee2ddab7fe", "sec-c-OGU3Y2Q4ZWUtNDQwMC00NTI1LThjNWYtNWJmY2M4OGIwNjEy")
+
+
+
+# Grant permission read true, write true, on channel ( Async Mode )
+def test_1():
+
+ def _callback(resp, ch= None):
+ assert resp == {
+ 'message': u'Success',
+ 'payload': {u'auths': {u'abcd': {u'r': 1, u'w': 1}},
+ u'subscribe_key': u'sub-c-e8839098-f568-11e2-a11a-02ee2ddab7fe',
+ u'level': u'user', u'channel': u'abcd', u'ttl': 1}
+ }
+
+ def _error(response):
+ assert False
+
+ pubnub_pam.grant(channel="abcd", auth_key="abcd", read=True, write=True, ttl=1, callback=_callback, error=_error)
+
+
+# Grant permission read false, write false, on channel ( Async Mode )
+def test_2():
+
+ def _callback(resp, ch=None):
+ assert resp == {
+ 'message': u'Success',
+ 'payload': {u'auths': {u'abcd': {u'r': 0, u'w': 0}},
+ u'subscribe_key': u'sub-c-e8839098-f568-11e2-a11a-02ee2ddab7fe',
+ u'level': u'user', u'channel': u'abcd', u'ttl': 1}
+ }
+
+ def _error(response):
+ assert False
+
+ pubnub_pam.grant(channel="abcd", auth_key="abcd", read=False, write=False, ttl=1, callback=_callback, error=_error)
+
+
+# Grant permission read True, write false, on channel ( Async Mode )
+def test_3():
+
+ def _callback(resp, ch=None):
+ assert resp == {
+ 'message': u'Success',
+ 'payload': {u'auths': {u'abcd': {u'r': 1, u'w': 0}},
+ u'subscribe_key': u'sub-c-e8839098-f568-11e2-a11a-02ee2ddab7fe',
+ u'level': u'user', u'channel': u'abcd', u'ttl': 1}
+ }
+
+ def _error(response):
+ assert False
+
+ pubnub_pam.grant(channel="abcd", auth_key="abcd", read=True, write=False, ttl=1, callback=_callback, error=_error)
+
+
+# Grant permission read False, write True, on channel ( Async Mode )
+def test_4():
+
+ def _callback(resp, ch=None):
+ assert resp == {
+ 'message': u'Success',
+ 'payload': {u'auths': {u'abcd': {u'r': 0, u'w': 1}},
+ u'subscribe_key': u'sub-c-e8839098-f568-11e2-a11a-02ee2ddab7fe',
+ u'level': u'user', u'channel': u'abcd', u'ttl': 1}
+ }
+
+ def _error(response):
+ assert False
+
+ pubnub_pam.grant(channel="abcd", auth_key="abcd", read=False, write=True, ttl=1, callback=_callback, error=_error)
+
+
+# Grant permission read False, write True, on channel ( Async Mode ), TTL 10
+def test_5():
+
+ def _callback(resp,ch=None):
+ assert resp == {
+ 'message': u'Success',
+ 'payload': {u'auths': {u'abcd': {u'r': 0, u'w': 1}},
+ u'subscribe_key': u'sub-c-e8839098-f568-11e2-a11a-02ee2ddab7fe',
+ u'level': u'user', u'channel': u'abcd', u'ttl': 10}
+ }
+
+
+ def _error(response):
+ assert False
+
+ pubnub_pam.grant(channel="abcd", auth_key="abcd", read=False, write=True, ttl=10, callback=_callback, error=_error)
+
+
+# Grant permission read False, write True, without channel ( Async Mode ), TTL 10
+def test_6():
+ def _callback(resp, ch=None):
+ assert resp == {
+ 'message': u'Success',
+ 'payload': { u'r': 0, u'w': 1,
+ u'subscribe_key': u'sub-c-e8839098-f568-11e2-a11a-02ee2ddab7fe',
+ u'level': u'subkey', u'ttl': 10}
+ }
+
+ def _error(response):
+ assert False
+
+ pubnub_pam.grant(auth_key="abcd", read=False, write=True, ttl=10, callback=_callback, error=_error)
+
+
+
+# Grant permission read False, write False, without channel ( Async Mode )
+def test_7():
+
+ def _callback(resp, ch=None):
+ assert resp == {
+ 'message': u'Success',
+ 'payload': { u'r': 0, u'w': 0,
+ u'subscribe_key': u'sub-c-e8839098-f568-11e2-a11a-02ee2ddab7fe',
+ u'level': u'subkey', u'ttl': 1}
+ }
+
+ def _error(response):
+ resp['response'] = response
+
+ pubnub_pam.grant(auth_key="abcd", read=False, write=False, callback=_callback, error=_error)
+
+
+# Complete flow , try publish on forbidden channel, grant permission to subkey and try again. ( Sync Mode)
+
+def test_8():
+ channel = "test_8-" + str(time.time())
+ message = "Hello World"
+ auth_key = "auth-" + channel
+ pubnub_pam.set_auth_key(auth_key)
+
+ def _cb1(resp, ch=None):
+ assert False
+ def _err1(resp):
+ assert resp['message'] == 'Forbidden'
+ assert resp['payload'] == {u'channels': [channel]}
+ def _cb2(resp, ch=None):
+ assert resp == {
+ 'message': u'Success',
+ 'payload': {u'auths': {auth_key : {u'r': 1, u'w': 1}},
+ u'subscribe_key': u'sub-c-e8839098-f568-11e2-a11a-02ee2ddab7fe',
+ u'level': u'user', u'channel': channel, u'ttl': 10}
+ }
+ def _cb3(resp, ch=None):
+ assert resp[0] == 1
+ def _err3(resp):
+ assert False
+
+ pubnub_pam.publish(channel=channel,message=message, callback=_cb3, error=_err3)
+ def _err2(resp):
+ assert False
+
+
+ pubnub_pam.grant(channel=channel, read=True, write=True, auth_key=auth_key, ttl=10, callback=_cb2, error=_err2)
+
+ pubnub_pam.publish(channel=channel,message=message, callback=_cb1, error=_err1)
+
+
+# Complete flow , try publish on forbidden channel, grant permission to authkey and try again.
+# then revoke and try again
+def test_9():
+ channel = "test_9-" + str(time.time())
+ message = "Hello World"
+ auth_key = "auth-" + channel
+ pubnub_pam.set_auth_key(auth_key)
+
+ def _cb1(resp, ch=None):
+ assert False
+ def _err1(resp):
+ assert resp['message'] == 'Forbidden'
+ assert resp['payload'] == {u'channels': [channel]}
+ def _cb2(resp, ch=None):
+ assert resp == {
+ 'message': u'Success',
+ 'payload': {u'auths': {auth_key : {u'r': 1, u'w': 1}},
+ u'subscribe_key': u'sub-c-e8839098-f568-11e2-a11a-02ee2ddab7fe',
+ u'level': u'user', u'channel': channel, u'ttl': 10}
+ }
+ def _cb3(resp, ch=None):
+ assert resp[0] == 1
+ def _cb4(resp, ch=None):
+ assert resp == {
+ 'message': u'Success',
+ 'payload': {u'auths': {auth_key : {u'r': 0, u'w': 0}},
+ u'subscribe_key': u'sub-c-e8839098-f568-11e2-a11a-02ee2ddab7fe',
+ u'level': u'user', u'channel': channel, u'ttl': 1}
+ }
+
+ def _cb5(resp, ch=None):
+ assert False
+ def _err5(resp):
+ assert resp['message'] == 'Forbidden'
+ assert resp['payload'] == {u'channels': [channel]}
+
+ pubnub_pam.publish(channel=channel,message=message, callback=_cb5, error=_err5)
+ def _err4(resp):
+ assert False
+ pubnub_pam.revoke(channel=channel, auth_key=auth_key, callback=_cb4, error=_err4)
+ def _err3(resp):
+ assert False
+
+ pubnub_pam.publish(channel=channel,message=message, callback=_cb3, error=_err3)
+ def _err2(resp):
+ assert False
+
+
+ pubnub_pam.grant(channel=channel, read=True, write=True, auth_key=auth_key, ttl=10, callback=_cb2, error=_err2)
+
+ pubnub_pam.publish(channel=channel,message=message, callback=_cb1, error=_err1)
+
+
+# Complete flow , try publish on forbidden channel, grant permission channel level for subkey and try again.
+# then revoke and try again
+def test_10():
+ channel = "test_10-" + str(time.time())
+ message = "Hello World"
+ auth_key = "auth-" + channel
+ pubnub_pam.set_auth_key(auth_key)
+
+ def _cb1(resp, ch=None):
+ assert False
+ def _err1(resp):
+ assert resp['message'] == 'Forbidden'
+ assert resp['payload'] == {u'channels': [channel]}
+ def _cb2(resp, ch=None):
+ assert resp == {
+ 'message': u'Success',
+ 'payload': { u'channels': {channel: {u'r': 1, u'w': 1}},
+ u'subscribe_key': u'sub-c-e8839098-f568-11e2-a11a-02ee2ddab7fe',
+ u'level': u'channel', u'ttl': 10}
+ }
+ def _cb3(resp, ch=None):
+ assert resp[0] == 1
+ def _cb4(resp, ch=None):
+ assert resp == {
+ 'message': u'Success',
+ 'payload': { u'channels': {channel : {u'r': 0, u'w': 0}},
+ u'subscribe_key': u'sub-c-e8839098-f568-11e2-a11a-02ee2ddab7fe',
+ u'level': u'channel', u'ttl': 1}
+ }
+
+ def _cb5(resp, ch=None):
+ assert False
+ def _err5(resp):
+ assert resp['message'] == 'Forbidden'
+ assert resp['payload'] == {u'channels': [channel]}
+
+ pubnub_pam.publish(channel=channel,message=message, callback=_cb5, error=_err5)
+ def _err4(resp):
+ assert False
+ pubnub_pam.revoke(channel=channel, callback=_cb4, error=_err4)
+ def _err3(resp):
+ assert False
+
+ pubnub_pam.publish(channel=channel,message=message, callback=_cb3, error=_err3)
+ def _err2(resp):
+ assert False
+
+
+ pubnub_pam.grant(channel=channel, read=True, write=True, ttl=10, callback=_cb2, error=_err2)
+
+ pubnub_pam.publish(channel=channel,message=message, callback=_cb1, error=_err1)
+
+
+
+
+
+
+# Complete flow , try publish on forbidden channel, grant permission subkey level for subkey and try again.
+# then revoke and try again
+def test_11():
+ channel = "test_11-" + str(time.time())
+ message = "Hello World"
+ auth_key = "auth-" + channel
+ pubnub_pam.set_auth_key(auth_key)
+
+ def _cb1(resp, ch=None):
+ assert False
+ def _err1(resp):
+ assert resp['message'] == 'Forbidden'
+ assert resp['payload'] == {u'channels': [channel]}
+ def _cb2(resp, ch=None):
+ assert resp == {
+ 'message': u'Success',
+ 'payload': { u'r': 1, u'w': 1,
+ u'subscribe_key': u'sub-c-e8839098-f568-11e2-a11a-02ee2ddab7fe',
+ u'level': u'subkey', u'ttl': 10}
+ }
+ def _cb3(resp, ch=None):
+ assert resp[0] == 1
+ def _cb4(resp, ch=None):
+ assert resp == {
+ 'message': u'Success',
+ 'payload': {u'r': 0, u'w': 0,
+ u'subscribe_key': u'sub-c-e8839098-f568-11e2-a11a-02ee2ddab7fe',
+ u'level': u'subkey', u'ttl': 1}
+ }
+
+ def _cb5(resp, ch=None):
+ assert False
+ def _err5(resp):
+ assert resp['message'] == 'Forbidden'
+ assert resp['payload'] == {u'channels': [channel]}
+
+ pubnub_pam.publish(channel=channel,message=message, callback=_cb5, error=_err5)
+ def _err4(resp):
+ assert False
+ pubnub_pam.revoke(callback=_cb4, error=_err4)
+ def _err3(resp):
+ assert False
+
+ pubnub_pam.publish(channel=channel,message=message, callback=_cb3, error=_err3)
+ def _err2(resp):
+ assert False
+
+
+ pubnub_pam.grant(read=True, write=True, ttl=10, callback=_cb2, error=_err2)
+
+ pubnub_pam.publish(channel=channel,message=message, callback=_cb1, error=_err1)
+
+
+x = 5
+def run_test(t):
+ global x
+ x += 5
+ i = (x / 5) - 1
+ def _print():
+ print('Running test ' + str(i))
+ pubnub.timeout(x, _print)
+ pubnub.timeout(x + 1,t)
+
+def stop():
+ pubnub.stop()
+
+run_test(test_1)
+run_test(test_2)
+run_test(test_3)
+run_test(test_4)
+run_test(test_5)
+run_test(test_6)
+run_test(test_7)
+run_test(test_8)
+run_test(test_9)
+run_test(test_10)
+run_test(test_11)
+run_test(stop)
+
+
+pubnub_pam.start()
+
+
diff --git a/python-twisted/tests/test_publish_async.py b/python-twisted/tests/test_publish_async.py
new file mode 100644
index 0000000..391297d
--- /dev/null
+++ b/python-twisted/tests/test_publish_async.py
@@ -0,0 +1,304 @@
+
+
+from Pubnub import PubnubTwisted as Pubnub
+import time
+
+pubnub = Pubnub("demo","demo")
+pubnub_enc = Pubnub("demo", "demo", cipher_key="enigma")
+pubnub_pam = Pubnub("pub-c-c077418d-f83c-4860-b213-2f6c77bde29a",
+ "sub-c-e8839098-f568-11e2-a11a-02ee2ddab7fe", "sec-c-OGU3Y2Q4ZWUtNDQwMC00NTI1LThjNWYtNWJmY2M4OGIwNjEy")
+
+
+
+# Publish and receive string
+def test_1():
+
+ channel = "test_1-" + str(time.time())
+ message = "I am a string"
+
+ def _cb(resp, ch=None):
+ assert resp == message
+ pubnub.unsubscribe(channel)
+
+ def _connect(resp):
+ def _cb1(resp,ch=None):
+ assert resp[0] == 1
+ def _err1(resp):
+ assert False
+ pubnub.publish(channel,message, callback=_cb1, error=_err1)
+
+ def _error(resp):
+ assert False
+
+ pubnub.subscribe(channel, callback=_cb, connect=_connect, error=_error)
+
+# Publish and receive array
+def test_2():
+
+ channel = "test_2-" + str(time.time())
+ message = [1,2]
+
+ def _cb(resp, ch=None):
+ assert resp == message
+ pubnub.unsubscribe(channel)
+
+ def _connect(resp):
+ def _cb1(resp,ch=None):
+ assert resp[0] == 1
+ def _err1(resp):
+ assert False
+ pubnub.publish(channel,message, callback=_cb1, error=_err1)
+
+ def _error(resp):
+ assert False
+
+ pubnub.subscribe(channel, callback=_cb, connect=_connect, error=_error)
+
+# Publish and receive json object
+def test_3():
+
+ channel = "test_2-" + str(time.time())
+ message = { "a" : "b" }
+
+ def _cb(resp, ch=None):
+ assert resp == message
+ pubnub.unsubscribe(channel)
+
+ def _connect(resp):
+ def _cb1(resp,ch=None):
+ assert resp[0] == 1
+ def _err1(resp):
+ assert False
+ pubnub.publish(channel,message, callback=_cb1, error=_err1)
+
+ def _error(resp):
+ assert False
+
+ pubnub.subscribe(channel, callback=_cb, connect=_connect, error=_error)
+
+# Publish and receive number
+def test_4():
+
+ channel = "test_2-" + str(time.time())
+ message = 100
+
+ def _cb(resp, ch=None):
+ assert resp == message
+ pubnub.unsubscribe(channel)
+
+ def _connect(resp):
+ def _cb1(resp,ch=None):
+ assert resp[0] == 1
+ def _err1(resp):
+ assert False
+ pubnub.publish(channel,message, callback=_cb1, error=_err1)
+
+ def _error(resp):
+ assert False
+
+ pubnub.subscribe(channel, callback=_cb, connect=_connect, error=_error)
+
+# Publish and receive number string
+def test_5():
+
+ channel = "test_5-" + str(time.time())
+ message = "100"
+
+ def _cb(resp, ch=None):
+ assert resp == message
+ pubnub.unsubscribe(channel)
+
+ def _connect(resp):
+ def _cb1(resp,ch=None):
+ assert resp[0] == 1
+ def _err1(resp):
+ assert False
+ pubnub.publish(channel,message, callback=_cb1, error=_err1)
+
+ def _error(resp):
+ assert False
+
+ pubnub.subscribe(channel, callback=_cb, connect=_connect, error=_error)
+
+
+# Publish and receive string (Encryption enabled)
+def test_6():
+
+ channel = "test_6-" + str(time.time())
+ message = "I am a string"
+
+ def _cb(resp, ch=None):
+ assert resp == message
+ pubnub_enc.unsubscribe(channel)
+
+ def _connect(resp):
+ def _cb1(resp,ch=None):
+ assert resp[0] == 1
+ def _err1(resp):
+ assert False
+ pubnub_enc.publish(channel,message, callback=_cb1, error=_err1)
+
+ def _error(resp):
+ assert False
+
+ pubnub_enc.subscribe(channel, callback=_cb, connect=_connect, error=_error)
+
+# Publish and receive array (Encryption enabled)
+def test_7():
+
+ channel = "test_7-" + str(time.time())
+ message = [1,2]
+
+ def _cb(resp, ch=None):
+ assert resp == message
+ pubnub_enc.unsubscribe(channel)
+
+ def _connect(resp):
+ def _cb1(resp,ch=None):
+ assert resp[0] == 1
+ def _err1(resp):
+ assert False
+ pubnub_enc.publish(channel,message, callback=_cb1, error=_err1)
+
+ def _error(resp):
+ assert False
+
+ pubnub_enc.subscribe(channel, callback=_cb, connect=_connect, error=_error)
+
+# Publish and receive json object (Encryption enabled)
+def test_8():
+
+ channel = "test_8-" + str(time.time())
+ message = { "a" : "b" }
+
+ def _cb(resp, ch=None):
+ assert resp == message
+ pubnub_enc.unsubscribe(channel)
+
+ def _connect(resp):
+ def _cb1(resp,ch=None):
+ assert resp[0] == 1
+ def _err1(resp):
+ assert False
+ pubnub_enc.publish(channel,message, callback=_cb1, error=_err1)
+
+ def _error(resp):
+ assert False
+
+ pubnub_enc.subscribe(channel, callback=_cb, connect=_connect, error=_error)
+
+# Publish and receive number (Encryption enabled)
+def test_9():
+
+ channel = "test_9-" + str(time.time())
+ message = 100
+
+ def _cb(resp, ch=None):
+ assert resp == message
+ pubnub_enc.unsubscribe(channel)
+
+ def _connect(resp):
+ def _cb1(resp,ch=None):
+ assert resp[0] == 1
+ def _err1(resp):
+ assert False
+ pubnub_enc.publish(channel,message, callback=_cb1, error=_err1)
+
+ def _error(resp):
+ assert False
+
+ pubnub_enc.subscribe(channel, callback=_cb, connect=_connect, error=_error)
+
+# Publish and receive number string (Encryption enabled)
+def test_10():
+
+ channel = "test_10-" + str(time.time())
+ message = "100"
+
+ def _cb(resp, ch=None):
+ assert resp == message
+ pubnub_enc.unsubscribe(channel)
+
+ def _connect(resp):
+ def _cb1(resp,ch=None):
+ assert resp[0] == 1
+ def _err1(resp):
+ assert False
+ pubnub_enc.publish(channel,message, callback=_cb1, error=_err1)
+
+ def _error(resp):
+ assert False
+
+ pubnub_enc.subscribe(channel, callback=_cb, connect=_connect, error=_error)
+
+# Publish and receive object string (Encryption enabled)
+def test_11():
+
+ channel = "test_11-" + str(time.time())
+ message = '{"a" : "b"}'
+
+ def _cb(resp, ch=None):
+ assert resp == message
+ pubnub_enc.unsubscribe(channel)
+
+ def _connect(resp):
+ def _cb1(resp,ch=None):
+ assert resp[0] == 1
+ def _err1(resp):
+ assert False
+ pubnub_enc.publish(channel,message, callback=_cb1, error=_err1)
+
+ def _error(resp):
+ assert False
+
+ pubnub_enc.subscribe(channel, callback=_cb, connect=_connect, error=_error)
+
+# Publish and receive array string (Encryption enabled)
+def test_12():
+
+ channel = "test_12-" + str(time.time())
+ message = '[1,2]'
+
+ def _cb(resp, ch=None):
+ assert resp == message
+ pubnub_enc.unsubscribe(channel)
+
+ def _connect(resp):
+ def _cb1(resp,ch=None):
+ assert resp[0] == 1
+ def _err1(resp):
+ assert False
+ pubnub_enc.publish(channel,message, callback=_cb1, error=_err1)
+
+ def _error(resp):
+ assert False
+
+ pubnub_enc.subscribe(channel, callback=_cb, connect=_connect, error=_error)
+
+x = 5
+def run_test(t):
+ global x
+ x += 5
+ i = (x / 5) - 1
+ def _print():
+ print('Running test ' + str(i))
+ pubnub.timeout(x, _print)
+ pubnub.timeout(x + 1,t)
+
+def stop():
+ pubnub.stop()
+
+run_test(test_1)
+run_test(test_2)
+run_test(test_3)
+run_test(test_4)
+run_test(test_5)
+run_test(test_6)
+run_test(test_7)
+run_test(test_8)
+run_test(test_9)
+run_test(test_10)
+run_test(test_11)
+run_test(stop)
+
+pubnub_enc.start()
diff --git a/python-twisted/tests/unit-tests.py b/python-twisted/tests/unit-tests.py
deleted file mode 100644
index 70c373b..0000000
--- a/python-twisted/tests/unit-tests.py
+++ /dev/null
@@ -1,107 +0,0 @@
-
-import sys
-import time
-
-
-from PubnubUnitTest import Suite
-from Pubnub import PubnubTwisted as Pubnub
-
-pubnub = Pubnub("demo", "demo")
-
-tests_count = 1 + 2 + 1
-test_suite = Suite(pubnub, tests_count)
-
-tests = []
-
-
-def test_publish():
- channel = "hello" + str(time.time())
- name = "Publish Test"
-
- def success(r):
- test_suite.test(r[0] == 1, name)
-
- def fail(e):
- test_suite.test(False, msg, e)
-
- pubnub.publish({
- 'channel': 'hello',
- 'message': 'hi',
- 'callback': success,
- 'error': fail
- })
-tests.append(test_publish)
-
-#"""
-
-
-def test_subscribe_publish():
- channel = "hello" + str(time.time())
- name = "Subscribe Publish Test"
- publish_msg = "This is Pubnub Python-Twisted"
-
- def connect():
- #print 'connect'
- def success(r):
- test_suite.test(r[0] == 1, name, "publish success")
-
- def fail(e):
- test_suite.test(False, name, "Publish Failed", e)
-
- pubnub.publish({
- 'channel': channel,
- 'message': publish_msg,
- 'callback': success,
- 'error': fail
- })
-
- def callback(r):
- test_suite.test(r == publish_msg, name, "message received")
-
- pubnub.subscribe({
- 'channel': channel,
- 'callback': callback,
- 'connect': connect
- })
-tests.append(test_subscribe_publish)
-#"""
-
-
-def test_here_now():
- channel = "hello12" # + str(time.time())
- name = "Here Now Test"
-
- def connect():
- print 'connect'
-
- def call_here_now():
- print 'call_here_now'
-
- def success(r):
- test_suite.test(r['occupancy'] == 1, name, "Here Now success")
-
- def fail(e):
- test_suite.test(False, name, "Here Now Failed", e)
-
- pubnub.here_now({
- 'channel': channel,
- 'callback': success,
- 'error': fail
- })
- pubnub.timeout(5, call_here_now)
-
- def callback(r):
- pass
- print 'Subscribe'
- pubnub.subscribe({
- 'channel': channel,
- 'callback': callback,
- 'connect': connect
- })
-tests.append(test_here_now)
-
-
-for t in tests:
- t()
-
-pubnub.start()