aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--common/unit-test-async.py143
-rw-r--r--python-tornado/tests/unit-test.py226
-rw-r--r--python-twisted/tests/unit-test.py116
3 files changed, 143 insertions, 342 deletions
diff --git a/common/unit-test-async.py b/common/unit-test-async.py
new file mode 100644
index 0000000..258bf6a
--- /dev/null
+++ b/common/unit-test-async.py
@@ -0,0 +1,143 @@
+## www.pubnub.com - PubNub Real-time push service in the cloud.
+# coding=utf8
+
+## PubNub Real-time Push APIs and Notifications Framework
+## Copyright (c) 2010 Stephen Blum
+## http://www.pubnub.com/
+
+## -----------------------------------
+## PubNub 3.1 Real-time Push Cloud API
+## -----------------------------------
+
+import sys
+import time
+sys.path.append('../')
+sys.path.append('./')
+sys.path.append('../common/')
+from Pubnub import Pubnub
+
+publish_key = len(sys.argv) > 1 and sys.argv[1] or 'demo'
+subscribe_key = len(sys.argv) > 2 and sys.argv[2] or 'demo'
+secret_key = len(sys.argv) > 3 and sys.argv[3] or None
+cipher_key = len(sys.argv) > 4 and sys.argv[4] or None
+ssl_on = len(sys.argv) > 5 and bool(sys.argv[5]) or False
+
+## -----------------------------------------------------------------------
+## Initiat Class
+## -----------------------------------------------------------------------
+pubnub = Pubnub( publish_key, subscribe_key, secret_key, cipher_key, ssl_on )
+crazy = 'python-async-test-channel'
+expect = 0
+done = 0
+failures = 0
+passes = 0
+
+def stop():
+ global done
+ global count
+ pubnub.stop()
+ print "============================"
+ print 'Total\t:\t' , failures + passes
+ print 'PASS\t:\t' , passes
+ print 'FAIL\t:\t', failures
+ print "============================"
+
+## ---------------------------------------------------------------------------
+## Unit Test Function
+## ---------------------------------------------------------------------------
+def test( trial, name ) :
+ global failures
+ global passes
+ global done
+ done += 1
+ #print trial
+ if trial == False:
+ print 'FAIL : ', name
+ failures += 1
+ else:
+ print 'PASS : ', name
+ passes += 1
+ if done == expect:
+ stop()
+
+def test_publish():
+ def publish_cb(messages):
+ test(messages[0] == 1, "Publish Test")
+
+ pubnub.publish({
+ 'channel' : crazy,
+ 'message' : {'one': 'Hello World! --> ɂ顶@#$%^&*()!', 'two': 'hello2'},
+ 'callback' : publish_cb
+ })
+
+
+def test_history():
+ def history_cb(messages):
+ test(len(messages) <= 1, "History Test")
+ pubnub.history({
+ 'channel' : crazy,
+ 'limit' : 1,
+ 'callback' : history_cb
+ })
+
+
+
+def test_subscribe():
+ message = "Testing Subscribe"
+ def subscribe_connect_cb():
+ def publish_cb(response):
+ test(response[0] == 1, 'Publish Test in subscribe Connect Callback')
+ pubnub.publish({
+ 'channel' : crazy,
+ 'message' : message,
+ 'callback' : publish_cb
+ })
+ def subscribe_cb(response):
+ print response
+ test(response == message , 'Subscribe Receive Test in subscribe Callback')
+ pubnub.subscribe({
+ 'channel' : crazy,
+ 'connect' : subscribe_connect_cb,
+ 'callback': subscribe_cb
+ })
+
+
+def test_here_now():
+ channel = crazy + '-here-now'
+ message = "Testing Subscribe"
+ def subscribe_connect_cb():
+ def here_now_cb(response):
+ test(response["occupancy"] > 0, 'Here Now Test')
+ def publish_cb(response):
+ test(response[0] == 1, 'Here Now Test: Publish Test in subscribe Connect Callback')
+ pubnub.publish({
+ 'channel' : channel,
+ 'message' : message,
+ 'callback' : publish_cb
+ })
+ time.sleep(5)
+ pubnub.here_now({
+ 'channel' : channel,
+ 'callback' : here_now_cb
+ })
+
+
+ def subscribe_cb(response):
+ test(response == message , 'Here Now Test: Subscribe Receive Test in subscribe Callback')
+ pubnub.subscribe({
+ 'channel' : channel,
+ 'connect' : subscribe_connect_cb,
+ 'callback': subscribe_cb
+ })
+
+expect = 7
+test_publish()
+test_history()
+test_subscribe()
+test_here_now()
+
+
+
+pubnub.start()
+if failures > 0:
+ raise Exception('Fail', failures)
diff --git a/python-tornado/tests/unit-test.py b/python-tornado/tests/unit-test.py
deleted file mode 100644
index c5940af..0000000
--- a/python-tornado/tests/unit-test.py
+++ /dev/null
@@ -1,226 +0,0 @@
-## www.pubnub.com - PubNub Real-time push service in the cloud.
-# coding=utf8
-
-## PubNub Real-time Push APIs and Notifications Framework
-## Copyright (c) 2010 Stephen Blum
-## http://www.pubnub.com/
-
-## TODO Tests
-##
-## - wait 20 minutes, send a message, receive and success.
-## -
-## -
-##
-##
-
-## -----------------------------------
-## PubNub 3.1 Real-time Push Cloud API
-## -----------------------------------
-
-import sys
-sys.path.append('../')
-sys.path.append('./')
-sys.path.append('../common/')
-from Pubnub import Pubnub
-
-publish_key = len(sys.argv) > 1 and sys.argv[1] or 'demo'
-subscribe_key = len(sys.argv) > 2 and sys.argv[2] or 'demo'
-secret_key = len(sys.argv) > 3 and sys.argv[3] or None
-cipher_key = len(sys.argv) > 4 and sys.argv[4] or None
-ssl_on = len(sys.argv) > 5 and bool(sys.argv[5]) or False
-
-## -----------------------------------------------------------------------
-## Command Line Options Supplied PubNub
-## -----------------------------------------------------------------------
-pubnub_user_supplied_options = Pubnub(
- publish_key, ## OPTIONAL (supply None to disable)
- subscribe_key, ## REQUIRED
- secret_key, ## OPTIONAL (supply None to disable)
- cipher_key, ## OPTIONAL (supply None to disable)
- ssl_on ## OPTIONAL (supply None to disable)
-)
-
-## -----------------------------------------------------------------------
-## High Security PubNub
-## -----------------------------------------------------------------------
-pubnub_high_security = Pubnub(
- ## Publish Key
- 'pub-c-a30c030e-9f9c-408d-be89-d70b336ca7a0',
-
- ## Subscribe Key
- 'sub-c-387c90f3-c018-11e1-98c9-a5220e0555fd',
-
- ## Secret Key
- 'sec-c-MTliNDE0NTAtYjY4Ni00MDRkLTllYTItNDhiZGE0N2JlYzBl',
-
- ## Cipher Key
- 'YWxzamRmbVjFaa05HVnGFqZHM3NXRBS73jxmhVMkjiwVVXV1d5UrXR1JLSkZFRr'+
- 'WVd4emFtUm1iR0TFpUZvbiBoYXMgYmVlbxWkhNaF3uUi8kM0YkJTEVlZYVFjBYi'+
- 'jFkWFIxSkxTa1pGUjd874hjklaTFpUwRVuIFNob3VsZCB5UwRkxUR1J6YVhlQWa'+
- 'V1ZkNGVH32mDkdho3pqtRnRVbTFpUjBaeGUgYXNrZWQtZFoKjda40ZWlyYWl1eX'+
- 'U4RkNtdmNub2l1dHE2TTA1jd84jkdJTbFJXYkZwWlZtRnKkWVrSRhhWbFpZVmFz'+
- 'c2RkZmTFpUpGa1dGSXhTa3hUYTFwR1Vpkm9yIGluZm9ybWFNfdsWQdSiiYXNWVX'+
- 'RSblJWYlRGcFVqQmFlRmRyYUU0MFpXbHlZV2wxZVhVNFJrTnR51YjJsMWRIRTJU'+
- 'W91ciBpbmZvcm1hdGliBzdWJtaXR0ZWQb3UZSBhIHJlc3BvbnNlLCB3ZWxsIHJl'+
- 'VEExWdHVybiB0am0aW9uIb24gYXMgd2UgcG9zc2libHkgY2FuLuhcFe24ldWVns'+
- 'dSaTFpU3hVUjFKNllWaFdhRmxZUWpCaQo34gcmVxdWlGFzIHNveqQl83snBfVl3',
-
- ## 2048bit SSL ON - ENABLED TRUE
- True
-)
-
-## -----------------------------------------------------------------------
-## Channel | Message Test Data (UTF-8)
-## -----------------------------------------------------------------------
-crazy = ' ~`â¦â§!@#$%^&*(顶顅Ȓ)+=[]\\{}|;\':",./<>?abcd'
-many_channels = [ str(x) + '-many_channel_test' for x in range(10) ]
-runthroughs = 0
-planned_tests = 2
-delivery_retries = 0
-max_retries = 10
-
-## -----------------------------------------------------------------------
-## Unit Test Function
-## -----------------------------------------------------------------------
-def test( trial, name ) :
- if trial : print( 'PASS: ' + name )
- else : print( '- FAIL - ' + name )
-
-def test_pubnub(pubnub):
- global runthroughs, planned_tests, delivery_retries, max_retries
-
- ## -----------------------------------------------------------------------
- ## Many Channels
- ## -----------------------------------------------------------------------
- def phase2():
- status = {
- 'sent' : 0,
- 'received' : 0,
- 'connections' : 0
- }
-
- def received( message, chan ):
- global runthroughs
-
- test( status['received'] <= status['sent'], 'many sends' )
- status['received'] += 1
- pubnub.unsubscribe({ 'channel' : chan })
- if status['received'] == len(many_channels):
- runthroughs += 1
- if runthroughs == planned_tests: pubnub.stop()
-
- def publish_complete( info, chan ):
- global delivery_retries, max_retries
- status['sent'] += 1
- test( info, 'publish complete' )
- test( info and len(info) > 2, 'publish response' )
- if not info[0]:
- delivery_retries += 1
- if max_retries > delivery_retries: sendit(chan)
-
- def sendit(chan):
- tchan = chan
- pubnub.publish({
- 'channel' : chan,
- 'message' : "Hello World",
- 'callback' : (lambda msg:publish_complete( msg, tchan ))
- })
-
- def connected(chan):
- status['connections'] += 1
- sendit(chan)
-
- def delivered(info):
- if info and info[0]: status['sent'] += 1
-
- def subscribe(chan):
- pubnub.subscribe({
- 'channel' : chan,
- 'connect' : (lambda:connected(chan+'')),
- 'callback' : (lambda msg:received( msg, chan ))
- })
-
- ## Subscribe All Channels
- for chan in many_channels: subscribe(chan)
-
- ## -----------------------------------------------------------------------
- ## Time Example
- ## -----------------------------------------------------------------------
- def time_complete(timetoken):
- test( timetoken, 'timetoken fetch' )
- test( isinstance( timetoken, int ), 'timetoken int type' )
-
- pubnub.time({ 'callback' : time_complete })
-
- ## -----------------------------------------------------------------------
- ## Publish Example
- ## -----------------------------------------------------------------------
- def publish_complete(info):
- test( info, 'publish complete' )
- test( info and len(info) > 2, 'publish response' )
-
- pubnub.history( {
- 'channel' : crazy,
- 'limit' : 10,
- 'callback' : history_complete
- })
-
- ## -----------------------------------------------------------------------
- ## History Example
- ## -----------------------------------------------------------------------
- def history_complete(messages):
- test( messages and len(messages) > 0, 'history' )
- test( messages, 'history' )
-
-
- pubnub.publish({
- 'channel' : crazy,
- 'message' : "Hello World",
- 'callback' : publish_complete
- })
-
- ## -----------------------------------------------------------------------
- ## Subscribe Example
- ## -----------------------------------------------------------------------
- def message_received(message):
- test( message, 'message received' )
- pubnub.unsubscribe({ 'channel' : crazy })
-
- def done() :
- pubnub.unsubscribe({ 'channel' : crazy })
- pubnub.publish({
- 'channel' : crazy,
- 'message' : "Hello World",
- 'callback' : (lambda x:x)
- })
-
- def dumpster(message) :
- test( 0, 'never see this' )
-
- pubnub.subscribe({
- 'channel' : crazy,
- 'connect' : done,
- 'callback' : dumpster
- })
-
- def connected() :
- pubnub.publish({
- 'channel' : crazy,
- 'message' : { 'Info' : 'Connected!' }
- })
-
- pubnub.subscribe({
- 'channel' : crazy,
- 'connect' : connected,
- 'callback' : message_received
- })
-
- phase2()
-
-## -----------------------------------------------------------------------
-## Run Tests
-## -----------------------------------------------------------------------
-test_pubnub(pubnub_user_supplied_options)
-test_pubnub(pubnub_high_security)
-pubnub_high_security.start()
-
diff --git a/python-twisted/tests/unit-test.py b/python-twisted/tests/unit-test.py
deleted file mode 100644
index ed7fcd5..0000000
--- a/python-twisted/tests/unit-test.py
+++ /dev/null
@@ -1,116 +0,0 @@
-## www.pubnub.com - PubNub Real-time push service in the cloud.
-# coding=utf8
-
-## PubNub Real-time Push APIs and Notifications Framework
-## Copyright (c) 2010 Stephen Blum
-## http://www.pubnub.com/
-
-## -----------------------------------
-## PubNub 3.1 Real-time Push Cloud API
-## -----------------------------------
-
-import sys
-sys.path.append('../')
-sys.path.append('./')
-sys.path.append('../common/')
-from Pubnub import Pubnub
-
-publish_key = len(sys.argv) > 1 and sys.argv[1] or 'demo'
-subscribe_key = len(sys.argv) > 2 and sys.argv[2] or 'demo'
-secret_key = len(sys.argv) > 3 and sys.argv[3] or 'demo'
-cipher_key = len(sys.argv) > 4 and sys.argv[4] or '' ##(Cipher key is Optional)
-ssl_on = len(sys.argv) > 5 and bool(sys.argv[5]) or False
-
-## -----------------------------------------------------------------------
-## Initiat Class
-## -----------------------------------------------------------------------
-pubnub = Pubnub( publish_key, subscribe_key, secret_key, cipher_key, ssl_on )
-crazy = 'python-twisted-test-channel'
-
-## ---------------------------------------------------------------------------
-## Unit Test Function
-## ---------------------------------------------------------------------------
-def test( trial, name ) :
- if trial :
- print( 'PASS: ' + name )
- else :
- print( 'FAIL: ' + name )
-
-## -----------------------------------------------------------------------
-## Time Example
-## -----------------------------------------------------------------------
-def time_complete(timestamp):
- print(timestamp)
-
-pubnub.time({ 'callback' : time_complete })
-
-## -----------------------------------------------------------------------
-## History Example
-## -----------------------------------------------------------------------
-def history_complete(messages):
- print(messages)
-
-pubnub.history( {
- 'channel' : crazy,
- 'limit' : 10,
- 'callback' : history_complete
-})
-
-## -----------------------------------------------------------------------
-## Publish Example
-## -----------------------------------------------------------------------
-def publish_complete(info):
- print(info)
-
-pubnub.publish({
- 'channel' : crazy,
- 'message' : {'one': 'Hello World! --> ɂ顶@#$%^&*()!', 'two': 'hello2'},
- 'callback' : publish_complete
-})
-
-## -----------------------------------------------------------------------
-## Subscribe Example
-## -----------------------------------------------------------------------
-def message_received(message):
- print(message)
- print('Disconnecting...')
- pubnub.unsubscribe({ 'channel' : crazy })
-
- def done() :
- print('final connection, done :)')
- pubnub.unsubscribe({ 'channel' : crazy })
- pubnub.stop()
-
- def dumpster(message) :
- print('never see this')
- print(message)
-
- print('reconnecting...')
- pubnub.subscribe({
- 'channel' : crazy,
- 'connect' : done,
- 'callback' : dumpster
- })
-
-
-def connected() :
- ch = crazy
- def dump_message( msg):
- print ch, msg
- pubnub.unsubscribe({'channel':ch})
- pubnub.stop()
- pubnub.publish({
- 'channel' : ch,
- 'message' : { 'Info' : 'Connected!' },
- 'callback' : dump_message
- })
-
-pubnub.subscribe({
- 'channel' : crazy,
- 'connect' : connected,
- 'callback' : message_received
-})
-## -----------------------------------------------------------------------
-## IO Event Loop
-## -----------------------------------------------------------------------
-pubnub.start()