diff options
| -rw-r--r-- | rest_framework/pagination.py | 105 | ||||
| -rw-r--r-- | tests/test_pagination.py | 146 |
2 files changed, 251 insertions, 0 deletions
diff --git a/rest_framework/pagination.py b/rest_framework/pagination.py index 55c173df..5482788a 100644 --- a/rest_framework/pagination.py +++ b/rest_framework/pagination.py @@ -1,12 +1,15 @@ +# coding: utf-8 """ Pagination serializers determine the structure of the output that should be used for paginated responses. """ from __future__ import unicode_literals +from base64 import b64encode, b64decode from collections import namedtuple from django.core.paginator import InvalidPage, Paginator as DjangoPaginator from django.template import Context, loader from django.utils import six +from django.utils.six.moves.urllib import parse as urlparse from django.utils.translation import ugettext as _ from rest_framework.compat import OrderedDict from rest_framework.exceptions import NotFound @@ -377,3 +380,105 @@ class LimitOffsetPagination(BasePagination): template = loader.get_template(self.template) context = Context(self.get_html_context()) return template.render(context) + + +Cursor = namedtuple('Cursor', ['offset', 'reverse', 'position']) + + +def decode_cursor(encoded): + tokens = urlparse.parse_qs(b64decode(encoded), keep_blank_values=True) + try: + offset = int(tokens['offset'][0]) + reverse = bool(int(tokens['reverse'][0])) + position = tokens['position'][0] + except (TypeError, ValueError): + return None + + return Cursor(offset=offset, reverse=reverse, position=position) + + +def encode_cursor(cursor): + tokens = { + 'offset': str(cursor.offset), + 'reverse': '1' if cursor.reverse else '0', + 'position': cursor.position + } + return b64encode(urlparse.urlencode(tokens, doseq=True)) + + +class CursorPagination(BasePagination): + # TODO: reverse cursors + cursor_query_param = 'cursor' + page_size = 5 + + def paginate_queryset(self, queryset, request, view=None): + self.base_url = request.build_absolute_uri() + self.ordering = self.get_ordering() + encoded = request.query_params.get(self.cursor_query_param) + + if encoded is None: + self.cursor = None + else: + self.cursor = decode_cursor(encoded) + # TODO: Invalid cursors should 404 + + if self.cursor is not None and self.cursor.position != '': + kwargs = {self.ordering + '__gt': self.cursor.position} + queryset = queryset.filter(**kwargs) + + # The offset is used in order to deal with cases where we have + # items with an identical position. This allows the cursors + # to gracefully deal with non-unique fields as the ordering. + offset = 0 if (self.cursor is None) else self.cursor.offset + + # We fetch an extra item in order to determine if there is a next page. + results = list(queryset[offset:offset + self.page_size + 1]) + self.page = results[:self.page_size] + self.has_next = len(results) > len(self.page) + self.next_item = results[-1] if self.has_next else None + return self.page + + def get_next_link(self): + if not self.has_next: + return None + + compare = self.get_position_from_instance(self.next_item, self.ordering) + offset = 0 + for item in reversed(self.page): + position = self.get_position_from_instance(item, self.ordering) + if position != compare: + # The item in this position and the item following it + # have different positions. We can use this position as + # our marker. + break + + # The item in this postion has the same position as the item + # following it, we can't use it as a marker position, so increment + # the offset and keep seeking to the previous item. + compare = position + offset += 1 + + else: + if self.cursor is None: + # There were no unique positions in the page, and we were + # on the first page, ie. there was no existing cursor. + # Our cursor will have an offset equal to the page size, + # but no position to filter against yet. + offset = self.page_size + position = '' + else: + # There were no unique positions in the page. + # Use the position from the existing cursor and increment + # it's offset by the page size. + offset = self.cursor.offset + self.page_size + position = self.cursor.position + + cursor = Cursor(offset=offset, reverse=False, position=position) + encoded = encode_cursor(cursor) + return replace_query_param(self.base_url, self.cursor_query_param, encoded) + + def get_ordering(self): + return 'created' + + def get_position_from_instance(self, instance, ordering): + return str(getattr(instance, ordering)) diff --git a/tests/test_pagination.py b/tests/test_pagination.py index 7cc92347..f04079a7 100644 --- a/tests/test_pagination.py +++ b/tests/test_pagination.py @@ -422,6 +422,152 @@ class TestLimitOffset: assert queryset == [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] +class TestCursorPagination: + """ + Unit tests for `pagination.CursorPagination`. + """ + + def setup(self): + class MockObject(object): + def __init__(self, idx): + self.created = idx + + class MockQuerySet(object): + def __init__(self, items): + self.items = items + + def filter(self, created__gt): + return [ + item for item in self.items + if item.created > int(created__gt) + ] + + def __getitem__(self, sliced): + return self.items[sliced] + + self.pagination = pagination.CursorPagination() + self.queryset = MockQuerySet( + [MockObject(idx) for idx in range(1, 16)] + ) + + def paginate_queryset(self, request): + return list(self.pagination.paginate_queryset(self.queryset, request)) + + # def get_paginated_content(self, queryset): + # response = self.pagination.get_paginated_response(queryset) + # return response.data + + # def get_html_context(self): + # return self.pagination.get_html_context() + + def test_following_cursor(self): + request = Request(factory.get('/')) + queryset = self.paginate_queryset(request) + assert [item.created for item in queryset] == [1, 2, 3, 4, 5] + + next_url = self.pagination.get_next_link() + assert next_url + + request = Request(factory.get(next_url)) + queryset = self.paginate_queryset(request) + assert [item.created for item in queryset] == [6, 7, 8, 9, 10] + + next_url = self.pagination.get_next_link() + assert next_url + + request = Request(factory.get(next_url)) + queryset = self.paginate_queryset(request) + assert [item.created for item in queryset] == [11, 12, 13, 14, 15] + + next_url = self.pagination.get_next_link() + assert next_url is None + + +class TestCrazyCursorPagination: + """ + Unit tests for `pagination.CursorPagination`. + """ + + def setup(self): + class MockObject(object): + def __init__(self, idx): + self.created = idx + + class MockQuerySet(object): + def __init__(self, items): + self.items = items + + def filter(self, created__gt): + return [ + item for item in self.items + if item.created > int(created__gt) + ] + + def __getitem__(self, sliced): + return self.items[sliced] + + self.pagination = pagination.CursorPagination() + self.queryset = MockQuerySet([ + MockObject(idx) for idx in [ + 1, 1, 1, 1, 1, + 1, 1, 1, 1, 1, + 1, 1, 2, 3, 4, + 5, 6, 7, 8, 9 + ] + ]) + + def paginate_queryset(self, request): + return list(self.pagination.paginate_queryset(self.queryset, request)) + + def test_following_cursor_identical_items(self): + request = Request(factory.get('/')) + queryset = self.paginate_queryset(request) + assert [item.created for item in queryset] == [1, 1, 1, 1, 1] + + next_url = self.pagination.get_next_link() + assert next_url + + request = Request(factory.get(next_url)) + queryset = self.paginate_queryset(request) + assert [item.created for item in queryset] == [1, 1, 1, 1, 1] + + next_url = self.pagination.get_next_link() + assert next_url + + request = Request(factory.get(next_url)) + queryset = self.paginate_queryset(request) + assert [item.created for item in queryset] == [1, 1, 2, 3, 4] + + next_url = self.pagination.get_next_link() + assert next_url + + request = Request(factory.get(next_url)) + queryset = self.paginate_queryset(request) + assert [item.created for item in queryset] == [5, 6, 7, 8, 9] + + next_url = self.pagination.get_next_link() + assert next_url is None + # assert content == { + # 'results': [1, 2, 3, 4, 5], + # 'previous': None, + # 'next': 'http://testserver/?limit=5&offset=5', + # 'count': 100 + # } + # assert context == { + # 'previous_url': None, + # 'next_url': 'http://testserver/?limit=5&offset=5', + # 'page_links': [ + # PageLink('http://testserver/?limit=5', 1, True, False), + # PageLink('http://testserver/?limit=5&offset=5', 2, False, False), + # PageLink('http://testserver/?limit=5&offset=10', 3, False, False), + # PAGE_BREAK, + # PageLink('http://testserver/?limit=5&offset=95', 20, False, False), + # ] + # } + # assert self.pagination.display_page_controls + # assert isinstance(self.pagination.to_html(), type('')) + + def test_get_displayed_page_numbers(): """ Test our contextual page display function. |
