aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--rest_framework/pagination.py105
-rw-r--r--tests/test_pagination.py146
2 files changed, 251 insertions, 0 deletions
diff --git a/rest_framework/pagination.py b/rest_framework/pagination.py
index 55c173df..5482788a 100644
--- a/rest_framework/pagination.py
+++ b/rest_framework/pagination.py
@@ -1,12 +1,15 @@
+# coding: utf-8
"""
Pagination serializers determine the structure of the output that should
be used for paginated responses.
"""
from __future__ import unicode_literals
+from base64 import b64encode, b64decode
from collections import namedtuple
from django.core.paginator import InvalidPage, Paginator as DjangoPaginator
from django.template import Context, loader
from django.utils import six
+from django.utils.six.moves.urllib import parse as urlparse
from django.utils.translation import ugettext as _
from rest_framework.compat import OrderedDict
from rest_framework.exceptions import NotFound
@@ -377,3 +380,105 @@ class LimitOffsetPagination(BasePagination):
template = loader.get_template(self.template)
context = Context(self.get_html_context())
return template.render(context)
+
+
+Cursor = namedtuple('Cursor', ['offset', 'reverse', 'position'])
+
+
+def decode_cursor(encoded):
+ tokens = urlparse.parse_qs(b64decode(encoded), keep_blank_values=True)
+ try:
+ offset = int(tokens['offset'][0])
+ reverse = bool(int(tokens['reverse'][0]))
+ position = tokens['position'][0]
+ except (TypeError, ValueError):
+ return None
+
+ return Cursor(offset=offset, reverse=reverse, position=position)
+
+
+def encode_cursor(cursor):
+ tokens = {
+ 'offset': str(cursor.offset),
+ 'reverse': '1' if cursor.reverse else '0',
+ 'position': cursor.position
+ }
+ return b64encode(urlparse.urlencode(tokens, doseq=True))
+
+
+class CursorPagination(BasePagination):
+ # TODO: reverse cursors
+ cursor_query_param = 'cursor'
+ page_size = 5
+
+ def paginate_queryset(self, queryset, request, view=None):
+ self.base_url = request.build_absolute_uri()
+ self.ordering = self.get_ordering()
+ encoded = request.query_params.get(self.cursor_query_param)
+
+ if encoded is None:
+ self.cursor = None
+ else:
+ self.cursor = decode_cursor(encoded)
+ # TODO: Invalid cursors should 404
+
+ if self.cursor is not None and self.cursor.position != '':
+ kwargs = {self.ordering + '__gt': self.cursor.position}
+ queryset = queryset.filter(**kwargs)
+
+ # The offset is used in order to deal with cases where we have
+ # items with an identical position. This allows the cursors
+ # to gracefully deal with non-unique fields as the ordering.
+ offset = 0 if (self.cursor is None) else self.cursor.offset
+
+ # We fetch an extra item in order to determine if there is a next page.
+ results = list(queryset[offset:offset + self.page_size + 1])
+ self.page = results[:self.page_size]
+ self.has_next = len(results) > len(self.page)
+ self.next_item = results[-1] if self.has_next else None
+ return self.page
+
+ def get_next_link(self):
+ if not self.has_next:
+ return None
+
+ compare = self.get_position_from_instance(self.next_item, self.ordering)
+ offset = 0
+ for item in reversed(self.page):
+ position = self.get_position_from_instance(item, self.ordering)
+ if position != compare:
+ # The item in this position and the item following it
+ # have different positions. We can use this position as
+ # our marker.
+ break
+
+ # The item in this postion has the same position as the item
+ # following it, we can't use it as a marker position, so increment
+ # the offset and keep seeking to the previous item.
+ compare = position
+ offset += 1
+
+ else:
+ if self.cursor is None:
+ # There were no unique positions in the page, and we were
+ # on the first page, ie. there was no existing cursor.
+ # Our cursor will have an offset equal to the page size,
+ # but no position to filter against yet.
+ offset = self.page_size
+ position = ''
+ else:
+ # There were no unique positions in the page.
+ # Use the position from the existing cursor and increment
+ # it's offset by the page size.
+ offset = self.cursor.offset + self.page_size
+ position = self.cursor.position
+
+ cursor = Cursor(offset=offset, reverse=False, position=position)
+ encoded = encode_cursor(cursor)
+ return replace_query_param(self.base_url, self.cursor_query_param, encoded)
+
+ def get_ordering(self):
+ return 'created'
+
+ def get_position_from_instance(self, instance, ordering):
+ return str(getattr(instance, ordering))
diff --git a/tests/test_pagination.py b/tests/test_pagination.py
index 7cc92347..f04079a7 100644
--- a/tests/test_pagination.py
+++ b/tests/test_pagination.py
@@ -422,6 +422,152 @@ class TestLimitOffset:
assert queryset == [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
+class TestCursorPagination:
+ """
+ Unit tests for `pagination.CursorPagination`.
+ """
+
+ def setup(self):
+ class MockObject(object):
+ def __init__(self, idx):
+ self.created = idx
+
+ class MockQuerySet(object):
+ def __init__(self, items):
+ self.items = items
+
+ def filter(self, created__gt):
+ return [
+ item for item in self.items
+ if item.created > int(created__gt)
+ ]
+
+ def __getitem__(self, sliced):
+ return self.items[sliced]
+
+ self.pagination = pagination.CursorPagination()
+ self.queryset = MockQuerySet(
+ [MockObject(idx) for idx in range(1, 16)]
+ )
+
+ def paginate_queryset(self, request):
+ return list(self.pagination.paginate_queryset(self.queryset, request))
+
+ # def get_paginated_content(self, queryset):
+ # response = self.pagination.get_paginated_response(queryset)
+ # return response.data
+
+ # def get_html_context(self):
+ # return self.pagination.get_html_context()
+
+ def test_following_cursor(self):
+ request = Request(factory.get('/'))
+ queryset = self.paginate_queryset(request)
+ assert [item.created for item in queryset] == [1, 2, 3, 4, 5]
+
+ next_url = self.pagination.get_next_link()
+ assert next_url
+
+ request = Request(factory.get(next_url))
+ queryset = self.paginate_queryset(request)
+ assert [item.created for item in queryset] == [6, 7, 8, 9, 10]
+
+ next_url = self.pagination.get_next_link()
+ assert next_url
+
+ request = Request(factory.get(next_url))
+ queryset = self.paginate_queryset(request)
+ assert [item.created for item in queryset] == [11, 12, 13, 14, 15]
+
+ next_url = self.pagination.get_next_link()
+ assert next_url is None
+
+
+class TestCrazyCursorPagination:
+ """
+ Unit tests for `pagination.CursorPagination`.
+ """
+
+ def setup(self):
+ class MockObject(object):
+ def __init__(self, idx):
+ self.created = idx
+
+ class MockQuerySet(object):
+ def __init__(self, items):
+ self.items = items
+
+ def filter(self, created__gt):
+ return [
+ item for item in self.items
+ if item.created > int(created__gt)
+ ]
+
+ def __getitem__(self, sliced):
+ return self.items[sliced]
+
+ self.pagination = pagination.CursorPagination()
+ self.queryset = MockQuerySet([
+ MockObject(idx) for idx in [
+ 1, 1, 1, 1, 1,
+ 1, 1, 1, 1, 1,
+ 1, 1, 2, 3, 4,
+ 5, 6, 7, 8, 9
+ ]
+ ])
+
+ def paginate_queryset(self, request):
+ return list(self.pagination.paginate_queryset(self.queryset, request))
+
+ def test_following_cursor_identical_items(self):
+ request = Request(factory.get('/'))
+ queryset = self.paginate_queryset(request)
+ assert [item.created for item in queryset] == [1, 1, 1, 1, 1]
+
+ next_url = self.pagination.get_next_link()
+ assert next_url
+
+ request = Request(factory.get(next_url))
+ queryset = self.paginate_queryset(request)
+ assert [item.created for item in queryset] == [1, 1, 1, 1, 1]
+
+ next_url = self.pagination.get_next_link()
+ assert next_url
+
+ request = Request(factory.get(next_url))
+ queryset = self.paginate_queryset(request)
+ assert [item.created for item in queryset] == [1, 1, 2, 3, 4]
+
+ next_url = self.pagination.get_next_link()
+ assert next_url
+
+ request = Request(factory.get(next_url))
+ queryset = self.paginate_queryset(request)
+ assert [item.created for item in queryset] == [5, 6, 7, 8, 9]
+
+ next_url = self.pagination.get_next_link()
+ assert next_url is None
+ # assert content == {
+ # 'results': [1, 2, 3, 4, 5],
+ # 'previous': None,
+ # 'next': 'http://testserver/?limit=5&offset=5',
+ # 'count': 100
+ # }
+ # assert context == {
+ # 'previous_url': None,
+ # 'next_url': 'http://testserver/?limit=5&offset=5',
+ # 'page_links': [
+ # PageLink('http://testserver/?limit=5', 1, True, False),
+ # PageLink('http://testserver/?limit=5&offset=5', 2, False, False),
+ # PageLink('http://testserver/?limit=5&offset=10', 3, False, False),
+ # PAGE_BREAK,
+ # PageLink('http://testserver/?limit=5&offset=95', 20, False, False),
+ # ]
+ # }
+ # assert self.pagination.display_page_controls
+ # assert isinstance(self.pagination.to_html(), type(''))
+
+
def test_get_displayed_page_numbers():
"""
Test our contextual page display function.