1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
|
import sys
import time
from PubnubUnitTest import Suite
from Pubnub import PubnubTwisted as Pubnub
# Shared client used by every test below; both constructor arguments are
# the literal "demo" (presumably the publish and subscribe keys -- confirm
# against the Pubnub constructor).
pubnub = Pubnub("demo", "demo")

# Number of results the suite should expect in total:
# 1 from the publish test, 2 from subscribe/publish, 1 from here_now.
tests_count = sum((1, 2, 1))
test_suite = Suite(pubnub, tests_count)

# Registry of test functions; each test appends itself after its definition.
tests = list()
def test_publish():
    """Publish a single message and report the outcome to ``test_suite``.

    The message is sent on a channel whose name carries a timestamp suffix
    so that separate runs do not share a channel. One result is reported:
    a pass when the first element of the publish response equals 1, or a
    failure (with the error attached) when the error callback fires.
    """
    channel = "hello" + str(time.time())
    name = "Publish Test"

    def success(r):
        # r[0] == 1 is treated as the success indicator of the response.
        test_suite.test(r[0] == 1, name)

    def fail(e):
        # Report under this test's own name, using the same four-argument
        # form as the other tests' error callbacks, so a failed publish is
        # recorded instead of raising NameError inside the callback.
        test_suite.test(False, name, "Publish Failed", e)

    pubnub.publish({
        'channel': channel,
        'message': 'hi',
        'callback': success,
        'error': fail,
    })


tests.append(test_publish)
#"""
def test_subscribe_publish():
    """Subscribe to a fresh channel, publish to it, and check both sides.

    Two results are reported to ``test_suite``: one from the publish
    callback (pass when the first response element equals 1) and one when
    the subscription delivers a message equal to the one published.
    """
    # Timestamp suffix gives every run its own channel name.
    topic = "hello" + str(time.time())
    test_name = "Subscribe Publish Test"
    payload = "This is Pubnub Python-Twisted"

    def on_publish_ok(response):
        test_suite.test(response[0] == 1, test_name, "publish success")

    def on_publish_error(err):
        test_suite.test(False, test_name, "Publish Failed", err)

    def on_connect():
        # Registered as the subscription's 'connect' handler, so the
        # publish is issued from there rather than immediately.
        publish_args = {
            'channel': topic,
            'message': payload,
            'callback': on_publish_ok,
            'error': on_publish_error,
        }
        pubnub.publish(publish_args)

    def on_message(received):
        test_suite.test(received == payload, test_name, "message received")

    subscribe_args = {
        'channel': topic,
        'callback': on_message,
        'connect': on_connect,
    }
    pubnub.subscribe(subscribe_args)


tests.append(test_subscribe_publish)
#"""
def test_here_now():
    """Subscribe to a fixed channel, then check its here_now occupancy.

    After the subscription's 'connect' handler fires, a here_now request
    is scheduled through ``pubnub.timeout(5, ...)`` (presumably a delay in
    seconds -- confirm). One result is reported to ``test_suite``: a pass
    when the response's 'occupancy' equals 1, or a failure with the error.
    """
    test_name = "Here Now Test"
    # Fixed channel name; the timestamp suffix used by the other tests is
    # deliberately not applied here.
    topic = "hello12"

    def on_occupancy(response):
        test_suite.test(response['occupancy'] == 1, test_name,
                        "Here Now success")

    def on_error(err):
        test_suite.test(False, test_name, "Here Now Failed", err)

    def request_here_now():
        print('call_here_now')
        here_now_args = {
            'channel': topic,
            'callback': on_occupancy,
            'error': on_error,
        }
        pubnub.here_now(here_now_args)

    def on_connect():
        print('connect')
        # Defer the occupancy query instead of issuing it straight away.
        pubnub.timeout(5, request_here_now)

    def on_message(_received):
        # Messages on this channel are irrelevant to the test.
        pass

    print('Subscribe')
    subscribe_args = {
        'channel': topic,
        'callback': on_message,
        'connect': on_connect,
    }
    pubnub.subscribe(subscribe_args)


tests.append(test_here_now)
# Invoke every registered test in registration order, then call the
# client's start() (presumably this runs the event loop so the callbacks
# registered above can fire -- confirm).
for run_test in tests:
    run_test()

pubnub.start()
|