aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTom Christie2015-01-17 00:10:43 +0000
committerTom Christie2015-01-17 00:10:43 +0000
commit4919492582547d227a22852ad2339fa73739cc94 (patch)
tree8c7b8de16849b288be2f97657396a4c74481b585
parentdc18040ba47325afb38ae62042a6103bfd794c4b (diff)
downloaddjango-rest-framework-4919492582547d227a22852ad2339fa73739cc94.tar.bz2
First pass at cursor pagination
-rw-r--r--rest_framework/pagination.py51
-rw-r--r--tests/test_pagination.py88
2 files changed, 139 insertions, 0 deletions
diff --git a/rest_framework/pagination.py b/rest_framework/pagination.py
index 1b7524c6..89d6f9f4 100644
--- a/rest_framework/pagination.py
+++ b/rest_framework/pagination.py
@@ -3,10 +3,12 @@ Pagination serializers determine the structure of the output that should
be used for paginated responses.
"""
from __future__ import unicode_literals
+from base64 import b64encode, b64decode
from collections import namedtuple
from django.core.paginator import InvalidPage, Paginator as DjangoPaginator
from django.template import Context, loader
from django.utils import six
+from django.utils.six.moves.urllib import parse as urlparse
from django.utils.translation import ugettext as _
from rest_framework.compat import OrderedDict
from rest_framework.exceptions import NotFound
@@ -377,3 +379,52 @@ class LimitOffsetPagination(BasePagination):
template = loader.get_template(self.template)
context = Context(self.get_html_context())
return template.render(context)
+
+
+class CursorPagination(BasePagination):
+ # reverse
+ # limit
+ # multiple orderings
+ cursor_query_param = 'cursor'
+ page_size = 5
+
+ def paginate_queryset(self, queryset, request, view=None):
+ self.base_url = request.build_absolute_uri()
+ self.ordering = self.get_ordering()
+ encoded = request.query_params.get(self.cursor_query_param)
+
+ if encoded is None:
+ cursor = None
+ else:
+ cursor = self.decode_cursor(encoded, self.ordering)
+
+ if cursor is not None:
+ kwargs = {self.ordering + '__gt': cursor}
+ queryset = queryset.filter(**kwargs)
+
+ results = list(queryset[:self.page_size + 1])
+ self.page = results[:self.page_size]
+ self.has_next = len(results) > len(self.page)
+ return self.page
+
+ def get_next_link(self):
+ if not self.has_next:
+ return None
+ last_item = self.page[-1]
+ cursor = self.get_cursor_from_instance(last_item, self.ordering)
+ encoded = self.encode_cursor(cursor, self.ordering)
+ return replace_query_param(self.base_url, self.cursor_query_param, encoded)
+
+ def get_ordering(self):
+ return 'created'
+
+ def get_cursor_from_instance(self, instance, ordering):
+ return getattr(instance, ordering)
+
+ def decode_cursor(self, encoded, ordering):
+ items = urlparse.parse_qs(b64decode(encoded))
+ return items.get(ordering)[0]
+
+ def encode_cursor(self, cursor, ordering):
+ items = [(ordering, cursor)]
+ return b64encode(urlparse.urlencode(items, doseq=True))
diff --git a/tests/test_pagination.py b/tests/test_pagination.py
index 7cc92347..7f18b446 100644
--- a/tests/test_pagination.py
+++ b/tests/test_pagination.py
@@ -422,6 +422,94 @@ class TestLimitOffset:
assert queryset == [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
+class TestCursorPagination:
+ """
+ Unit tests for `pagination.CursorPagination`.
+ """
+
+ def setup(self):
+ class MockObject(object):
+ def __init__(self, idx):
+ self.created = idx
+
+ class MockQuerySet(object):
+ def __init__(self, items):
+ self.items = items
+
+ def filter(self, created__gt):
+ return [
+ item for item in self.items
+ if item.created > int(created__gt)
+ ]
+
+ def __getitem__(self, sliced):
+ return self.items[sliced]
+
+ self.pagination = pagination.CursorPagination()
+ self.queryset = MockQuerySet(
+ [MockObject(idx) for idx in range(1, 21)]
+ )
+
+ def paginate_queryset(self, request):
+ return list(self.pagination.paginate_queryset(self.queryset, request))
+
+ # def get_paginated_content(self, queryset):
+ # response = self.pagination.get_paginated_response(queryset)
+ # return response.data
+
+ # def get_html_context(self):
+ # return self.pagination.get_html_context()
+
+ def test_following_cursor(self):
+ request = Request(factory.get('/'))
+ queryset = self.paginate_queryset(request)
+ assert [item.created for item in queryset] == [1, 2, 3, 4, 5]
+
+ next_url = self.pagination.get_next_link()
+ assert next_url
+
+ request = Request(factory.get(next_url))
+ queryset = self.paginate_queryset(request)
+ assert [item.created for item in queryset] == [6, 7, 8, 9, 10]
+
+ next_url = self.pagination.get_next_link()
+ assert next_url
+
+ request = Request(factory.get(next_url))
+ queryset = self.paginate_queryset(request)
+ assert [item.created for item in queryset] == [11, 12, 13, 14, 15]
+
+ next_url = self.pagination.get_next_link()
+ assert next_url
+
+ request = Request(factory.get(next_url))
+ queryset = self.paginate_queryset(request)
+ assert [item.created for item in queryset] == [16, 17, 18, 19, 20]
+
+ next_url = self.pagination.get_next_link()
+ assert next_url is None
+
+ # assert content == {
+ # 'results': [1, 2, 3, 4, 5],
+ # 'previous': None,
+ # 'next': 'http://testserver/?limit=5&offset=5',
+ # 'count': 100
+ # }
+ # assert context == {
+ # 'previous_url': None,
+ # 'next_url': 'http://testserver/?limit=5&offset=5',
+ # 'page_links': [
+ # PageLink('http://testserver/?limit=5', 1, True, False),
+ # PageLink('http://testserver/?limit=5&offset=5', 2, False, False),
+ # PageLink('http://testserver/?limit=5&offset=10', 3, False, False),
+ # PAGE_BREAK,
+ # PageLink('http://testserver/?limit=5&offset=95', 20, False, False),
+ # ]
+ # }
+ # assert self.pagination.display_page_controls
+ # assert isinstance(self.pagination.to_html(), type(''))
+
+
def test_get_displayed_page_numbers():
"""
Test our contextual page display function.