diff options
| author | Tom Christie | 2015-01-17 00:10:43 +0000 | 
|---|---|---|
| committer | Tom Christie | 2015-01-17 00:10:43 +0000 | 
| commit | 4919492582547d227a22852ad2339fa73739cc94 (patch) | |
| tree | 8c7b8de16849b288be2f97657396a4c74481b585 | |
| parent | dc18040ba47325afb38ae62042a6103bfd794c4b (diff) | |
| download | django-rest-framework-4919492582547d227a22852ad2339fa73739cc94.tar.bz2 | |
First pass at cursor pagination
| -rw-r--r-- | rest_framework/pagination.py | 51 | ||||
| -rw-r--r-- | tests/test_pagination.py | 88 | 
2 files changed, 139 insertions, 0 deletions
| diff --git a/rest_framework/pagination.py b/rest_framework/pagination.py index 1b7524c6..89d6f9f4 100644 --- a/rest_framework/pagination.py +++ b/rest_framework/pagination.py @@ -3,10 +3,12 @@ Pagination serializers determine the structure of the output that should  be used for paginated responses.  """  from __future__ import unicode_literals +from base64 import b64encode, b64decode  from collections import namedtuple  from django.core.paginator import InvalidPage, Paginator as DjangoPaginator  from django.template import Context, loader  from django.utils import six +from django.utils.six.moves.urllib import parse as urlparse  from django.utils.translation import ugettext as _  from rest_framework.compat import OrderedDict  from rest_framework.exceptions import NotFound @@ -377,3 +379,52 @@ class LimitOffsetPagination(BasePagination):          template = loader.get_template(self.template)          context = Context(self.get_html_context())          return template.render(context) + + +class CursorPagination(BasePagination): +    # reverse +    # limit +    # multiple orderings +    cursor_query_param = 'cursor' +    page_size = 5 + +    def paginate_queryset(self, queryset, request, view=None): +        self.base_url = request.build_absolute_uri() +        self.ordering = self.get_ordering() +        encoded = request.query_params.get(self.cursor_query_param) + +        if encoded is None: +            cursor = None +        else: +            cursor = self.decode_cursor(encoded, self.ordering) + +        if cursor is not None: +            kwargs = {self.ordering + '__gt': cursor} +            queryset = queryset.filter(**kwargs) + +        results = list(queryset[:self.page_size + 1]) +        self.page = results[:self.page_size] +        self.has_next = len(results) > len(self.page) +        return self.page + +    def get_next_link(self): +        if not self.has_next: +            return None +        last_item = self.page[-1] +        cursor = self.get_cursor_from_instance(last_item, self.ordering) +        encoded = self.encode_cursor(cursor, self.ordering) +        return replace_query_param(self.base_url, self.cursor_query_param, encoded) + +    def get_ordering(self): +        return 'created' + +    def get_cursor_from_instance(self, instance, ordering): +        return getattr(instance, ordering) + +    def decode_cursor(self, encoded, ordering): +        items = urlparse.parse_qs(b64decode(encoded)) +        return items.get(ordering)[0] + +    def encode_cursor(self, cursor, ordering): +        items = [(ordering, cursor)] +        return b64encode(urlparse.urlencode(items, doseq=True)) diff --git a/tests/test_pagination.py b/tests/test_pagination.py index 7cc92347..7f18b446 100644 --- a/tests/test_pagination.py +++ b/tests/test_pagination.py @@ -422,6 +422,94 @@ class TestLimitOffset:          assert queryset == [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] +class TestCursorPagination: +    """ +    Unit tests for `pagination.CursorPagination`. +    """ + +    def setup(self): +        class MockObject(object): +            def __init__(self, idx): +                self.created = idx + +        class MockQuerySet(object): +            def __init__(self, items): +                self.items = items + +            def filter(self, created__gt): +                return [ +                    item for item in self.items +                    if item.created > int(created__gt) +                ] + +            def __getitem__(self, sliced): +                return self.items[sliced] + +        self.pagination = pagination.CursorPagination() +        self.queryset = MockQuerySet( +            [MockObject(idx) for idx in range(1, 21)] +        ) + +    def paginate_queryset(self, request): +        return list(self.pagination.paginate_queryset(self.queryset, request)) + +    # def get_paginated_content(self, queryset): +    #     response = self.pagination.get_paginated_response(queryset) +    #     return response.data + +    # def get_html_context(self): +    #     return self.pagination.get_html_context() + +    def test_following_cursor(self): +        request = Request(factory.get('/')) +        queryset = self.paginate_queryset(request) +        assert [item.created for item in queryset] == [1, 2, 3, 4, 5] + +        next_url = self.pagination.get_next_link() +        assert next_url + +        request = Request(factory.get(next_url)) +        queryset = self.paginate_queryset(request) +        assert [item.created for item in queryset] == [6, 7, 8, 9, 10] + +        next_url = self.pagination.get_next_link() +        assert next_url + +        request = Request(factory.get(next_url)) +        queryset = self.paginate_queryset(request) +        assert [item.created for item in queryset] == [11, 12, 13, 14, 15] + +        next_url = self.pagination.get_next_link() +        assert next_url + +        request = Request(factory.get(next_url)) +        queryset = self.paginate_queryset(request) +        assert [item.created for item in queryset] == [16, 17, 18, 19, 20] + +        next_url = self.pagination.get_next_link() +        assert next_url is None + +        # assert content == { +        #     'results': [1, 2, 3, 4, 5], +        #     'previous': None, +        #     'next': 'http://testserver/?limit=5&offset=5', +        #     'count': 100 +        # } +        # assert context == { +        #     'previous_url': None, +        #     'next_url': 'http://testserver/?limit=5&offset=5', +        #     'page_links': [ +        #         PageLink('http://testserver/?limit=5', 1, True, False), +        #         PageLink('http://testserver/?limit=5&offset=5', 2, False, False), +        #         PageLink('http://testserver/?limit=5&offset=10', 3, False, False), +        #         PAGE_BREAK, +        #         PageLink('http://testserver/?limit=5&offset=95', 20, False, False), +        #     ] +        # } +        # assert self.pagination.display_page_controls +        # assert isinstance(self.pagination.to_html(), type('')) + +  def test_get_displayed_page_numbers():      """      Test our contextual page display function. | 
