aboutsummaryrefslogtreecommitdiffstats
path: root/tests/test_simple_datasync.py
blob: 9b170e24e3986ed349a9a7f426120729c6a325d2 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
import functools
import threading
import time

import pytest

# Identifier of the synced object that every test in this module reads and
# writes.  It is also the suffix of the "pn_ds_" / "pn_dstr_" channel names
# that the test teardown unsubscribes from.
object_id = "python-unittest"

from Pubnub import Pubnub
# Single module-level client shared by all tests in this file, configured
# with the "demo" publish/subscribe keys and the pubsub-beta origin.
p = Pubnub(
    publish_key="demo",
    subscribe_key="demo",
    origin="pubsub-beta.pubnub.com"
)

@pytest.fixture
def done(request):
    """Provide a fresh threading.Event for each test that requests it.

    The fixture is named ``done`` rather than ``async`` because ``async`` is
    a reserved keyword from Python 3.7 onwards and cannot be used as a
    function or parameter name there.
    """
    return threading.Event()


class TestSimpleDataSync():
    """Round-trip tests for set / merge / delete on a synced object."""

    # Latest object state delivered to the DataSync callback.  The class-level
    # dict is only a default: the callback rebinds ``self.data`` on the
    # instance instead of mutating this shared object.
    data = {}

    # Upper bound, in seconds, on how long wait() blocks for a callback.
    WAIT_TIMEOUT = 30.0

    def asyncTest(fn):
        """
        Decorator that creates an asynchronous test case with setup and teardown
        methods.

        The wrapped test receives the ``done`` Event fixture.  ``setUp`` runs
        inside the ``try`` so that ``tearDown`` is executed even when setup
        itself fails (for example when the initial callback times out);
        otherwise the subscribe thread described in ``tearDown`` would be
        left running.
        """

        @functools.wraps(fn)
        def decorator(self, done):
            try:
                self.setUp(done)
                fn(self, done)
            finally:
                self.tearDown(done)

        return decorator

    def setUp(self, done):
        """Initialize the DataSync object and its callback.

        Registers a callback for ``object_id`` that stores every delivered
        message in ``self.data`` and sets ``done``, then blocks until the
        first callback has arrived.
        """

        def callback(message, action_list):
            # A single-argument print() call produces the same output on
            # Python 2 and Python 3.
            print("DataSync callback: %s %s" % (message, action_list))
            self.data = message
            done.set()

        p.get_synced_object(object_id, callback)
        self.wait(done)

    def tearDown(self, done):
        """
        Python SDK will not exit because it spawns a subscribe thread that keeps
        the process active.

        Workaround; unsubscribe from all hidden DataSync channels and publish to
        the last channel to terminate the subscribe thread.

        ``done`` is unused here; it is accepted so that setUp and tearDown
        share the same signature.
        """

        p.unsubscribe("pn_ds_" + object_id)
        p.unsubscribe("pn_ds_" + object_id + ".*")
        p.unsubscribe("pn_dstr_" + object_id)
        p.publish(channel="pn_dstr_" + object_id, message={ 'trans_id' : -1 })

    def wait(self, done, timeout=None):
        """
        Wait for the threading.Event object to be set, then clear it for any
        following async operations.

        ``timeout`` is the maximum number of seconds to block and defaults to
        ``WAIT_TIMEOUT``.  An AssertionError is raised when the event is still
        unset after the timeout, so a callback that never arrives fails the
        test instead of hanging the whole run.
        """

        if timeout is None:
            timeout = self.WAIT_TIMEOUT

        done.wait(timeout)
        # Check the flag explicitly instead of relying on wait()'s return
        # value, which is not a boolean on every Python version.
        assert done.is_set(), (
            "timed out after %s seconds waiting for the DataSync callback"
            % timeout
        )
        done.clear()

    @asyncTest
    def test_set(self, done):
        """Set object data with PUT."""

        now = time.time()
        p.set(object_id, {
            "time" : now
        })
        self.wait(done)

        assert 'time' in self.data
        assert self.data['time'] == now

    @asyncTest
    def test_merge(self, done):
        """Update object data with PATCH."""

        now = time.time()
        p.merge(object_id, {
            "time" : now
        })
        self.wait(done)

        assert 'time' in self.data
        assert self.data['time'] == now

    @asyncTest
    def test_delete(self, done):
        """Delete object data with DELETE."""

        p.delete(object_id)
        self.wait(done)

        assert self.data == {}