1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
|
import pytest
import time
import threading
# Identifier of the synced object that every test in this module reads and
# writes; tearDown also builds the hidden channel names ("pn_ds_<id>",
# "pn_ds_<id>.*", "pn_dstr_<id>") from it.
object_id = "python-unittest"
from Pubnub import Pubnub
# Module-wide client shared by all tests, created with the "demo"
# publish/subscribe keys against the "pubsub-beta.pubnub.com" origin.
p = Pubnub(
publish_key="demo",
subscribe_key="demo",
origin="pubsub-beta.pubnub.com"
)
@pytest.fixture
def async_event(request):
    """Return a new ``threading.Event`` for the requesting test.

    pytest matches fixtures to test parameters by name, so every test that
    wants an event declares an ``async_event`` parameter. The name ``async``
    cannot be used for this: it is a reserved word since Python 3.7.
    """
    return threading.Event()


class TestSimpleDataSync(object):
    """Round-trip tests for set / merge / delete on a synced object.

    Each test opens the synced object (``setUp``), performs one write through
    the module-level client ``p``, waits for the DataSync callback to fire and
    then checks the state the callback stored in ``self.data``.
    """

    # Last object state handed to the DataSync callback. Only ever rebound on
    # the instance (never mutated), so the shared class-level dict is safe.
    data = {}

    # Default number of seconds wait() blocks before failing the test.
    # Set to None to restore the old "block forever" behaviour.
    wait_timeout = 30.0

    def asyncTest(fn):
        """Wrap a test method with ``setUp`` / ``tearDown`` calls.

        This runs as a plain function while the class body is being evaluated,
        which is why it takes no ``self``.

        ``setUp`` is executed inside the ``try`` so that ``tearDown`` still
        runs when the initial wait fails; otherwise the subscribe thread
        described in ``tearDown`` would be left running.
        """
        import functools

        # wraps() keeps the test's own name and docstring on the wrapper.
        @functools.wraps(fn)
        def decorator(self, async_event):
            try:
                self.setUp(async_event)
                fn(self, async_event)
            finally:
                self.tearDown(async_event)
        return decorator

    def setUp(self, async_event):
        """Open the synced object and wait for its first callback.

        The callback stores the received message in ``self.data`` and sets
        ``async_event`` so the waiting test thread can continue.
        """
        def callback(message, action_list):
            # Single pre-formatted argument: prints the same text on
            # Python 2 and Python 3.
            print("DataSync callback: %s %s" % (message, action_list))
            self.data = message
            async_event.set()

        p.get_synced_object(object_id, callback)
        self.wait(async_event)

    def tearDown(self, async_event):
        """Stop the background subscription so the process can exit.

        Python SDK will not exit because it spawns a subscribe thread that
        keeps the process active.

        Workaround: unsubscribe from all hidden DataSync channels and publish
        to the last channel to terminate the subscribe thread.
        """
        p.unsubscribe("pn_ds_" + object_id)
        p.unsubscribe("pn_ds_" + object_id + ".*")
        p.unsubscribe("pn_dstr_" + object_id)
        p.publish(channel="pn_dstr_" + object_id, message={'trans_id': -1})

    def wait(self, async_event, timeout=None):
        """Block until ``async_event`` is set, then clear it for reuse.

        Args:
            async_event: the ``threading.Event`` set by the DataSync callback.
            timeout: seconds to wait; ``None`` means use ``self.wait_timeout``.

        Raises:
            AssertionError: if the event was not set within the timeout, so a
                lost callback fails the test instead of hanging the run.
        """
        if timeout is None:
            timeout = self.wait_timeout
        async_event.wait(timeout)
        # Check the flag explicitly rather than trusting wait()'s return
        # value, which is None on very old Python versions.
        if not async_event.is_set():
            raise AssertionError(
                "Timed out after %s seconds waiting for the DataSync callback"
                % (timeout,)
            )
        async_event.clear()

    @asyncTest
    def test_set(self, async_event):
        """Set object data with PUT."""
        now = time.time()
        p.set(object_id, {
            "time": now,
        })
        self.wait(async_event)
        assert 'time' in self.data
        assert self.data['time'] == now

    @asyncTest
    def test_merge(self, async_event):
        """Update object data with PATCH."""
        now = time.time()
        p.merge(object_id, {
            "time": now,
        })
        self.wait(async_event)
        assert 'time' in self.data
        assert self.data['time'] == now

    @asyncTest
    def test_delete(self, async_event):
        """Delete object data with DELETE."""
        p.delete(object_id)
        self.wait(async_event)
        assert self.data == {}
|