1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
|
import sys
import time
sys.path.append('../../common')
sys.path.append('../common')
sys.path.append('..')
sys.path.append('.')
from PubnubUnitTest import Suite
from Pubnub import Pubnub
pubnub = Pubnub("demo","demo")
tests_count = 1 + 2 + 1
test_suite = Suite(pubnub,tests_count)
tests = []
def test_publish():
channel = "hello" + str(time.time())
name = "Publish Test"
def success(r):
test_suite.test(r[0] == 1, name)
def fail(e):
test_suite.test(False, msg , e)
pubnub.publish({
'channel' : 'hello',
'message' : 'hi',
'callback' : success,
'error' : fail
})
tests.append(test_publish)
#"""
def test_subscribe_publish():
channel = "hello" + str(time.time())
name = "Subscribe Publish Test"
publish_msg = "This is Pubnub Python-Twisted"
def connect():
#print 'connect'
def success(r):
test_suite.test(r[0] == 1, name, "publish success")
def fail(e):
test_suite.test(False, name , "Publish Failed", e)
pubnub.publish({
'channel' : channel,
'message' : publish_msg,
'callback' : success,
'error' : fail
})
def callback(r):
test_suite.test(r == publish_msg, name, "message received")
pubnub.subscribe({
'channel' : channel,
'callback' : callback,
'connect' : connect
})
tests.append(test_subscribe_publish)
#"""
def test_here_now():
channel = "hello12" #+ str(time.time())
name = "Here Now Test"
def connect():
print 'connect'
def call_here_now():
print 'call_here_now'
def success(r):
test_suite.test(r['occupancy'] == 1, name, "Here Now success")
def fail(e):
test_suite.test(False, name , "Here Now Failed", e)
pubnub.here_now({
'channel' : channel,
'callback' : success,
'error' : fail
})
pubnub.timeout(5, call_here_now)
def callback(r):
pass
print 'Subscribe'
pubnub.subscribe({
'channel' : channel,
'callback' : callback,
'connect' : connect
})
tests.append(test_here_now)
for t in tests: t()
pubnub.start()
|