diff options
| -rw-r--r-- | rest_framework/pagination.py | 30 | ||||
| -rw-r--r-- | tests/test_pagination.py | 212 | 
2 files changed, 107 insertions, 135 deletions
| diff --git a/rest_framework/pagination.py b/rest_framework/pagination.py index d5af2ac8..61835239 100644 --- a/rest_framework/pagination.py +++ b/rest_framework/pagination.py @@ -171,6 +171,8 @@ class PageNumberPagination(BasePagination):      template = 'rest_framework/pagination/numbers.html' +    invalid_page_message = _('Invalid page "{page_number}": {message}.') +      def _handle_backwards_compat(self, view):          """          Prior to version 3.1, pagination was handled in the view, and the @@ -203,7 +205,7 @@ class PageNumberPagination(BasePagination):          try:              self.page = paginator.page(page_number)          except InvalidPage as exc: -            msg = _('Invalid page "{page_number}": {message}.').format( +            msg = self.invalid_page_message.format(                  page_number=page_number, message=six.text_type(exc)              )              raise NotFound(msg) @@ -386,8 +388,8 @@ Cursor = namedtuple('Cursor', ['offset', 'reverse', 'position'])  def decode_cursor(encoded): -    tokens = urlparse.parse_qs(b64decode(encoded), keep_blank_values=True)      try: +        tokens = urlparse.parse_qs(b64decode(encoded), keep_blank_values=True)          offset = int(tokens['offset'][0])          reverse = bool(int(tokens['reverse'][0]))          position = tokens['position'][0] @@ -411,7 +413,8 @@ class CursorPagination(BasePagination):      # Support case where ordering is already negative      # Support tuple orderings      cursor_query_param = 'cursor' -    page_size = 5 +    page_size = api_settings.PAGINATE_BY +    invalid_cursor_message = _('Invalid cursor')      def paginate_queryset(self, queryset, request, view=None):          self.base_url = request.build_absolute_uri() @@ -424,8 +427,9 @@ class CursorPagination(BasePagination):              (offset, reverse, current_position) = (0, False, '')          else:              self.cursor = decode_cursor(encoded) +            if self.cursor is None: +                raise NotFound(self.invalid_cursor_message)              (offset, reverse, current_position) = self.cursor -            # TODO: Invalid cursors should 404          # Cursor pagination always enforces an ordering.          if reverse: @@ -458,7 +462,7 @@ class CursorPagination(BasePagination):          # If we have a reverse queryset, then the query ordering was in reverse          # so we need to reverse the items again before returning them to the user.          if reverse: -            self.page = reversed(self.page) +            self.page = list(reversed(self.page))          if reverse:              # Determine next and previous positions for reverse cursors. @@ -483,8 +487,14 @@ class CursorPagination(BasePagination):          if not self.has_next:              return None -        compare = self.next_position +        if self.cursor and self.cursor.reverse and self.cursor.offset != 0: +            # If we're reversing direction and we have an offset cursor +            # then we cannot use the first position we find as a marker. +            compare = self._get_position_from_instance(self.page[-1], self.ordering) +        else: +            compare = self.next_position          offset = 0 +          for item in reversed(self.page):              position = self._get_position_from_instance(item, self.ordering)              if position != compare: @@ -526,8 +536,14 @@ class CursorPagination(BasePagination):          if not self.has_previous:              return None -        compare = self.previous_position +        if self.cursor and not self.cursor.reverse and self.cursor.offset != 0: +            # If we're reversing direction and we have an offset cursor +            # then we cannot use the first position we find as a marker. +            compare = self._get_position_from_instance(self.page[0], self.ordering) +        else: +            compare = self.previous_position          offset = 0 +          for item in self.page:              position = self._get_position_from_instance(item, self.ordering)              if position != compare: diff --git a/tests/test_pagination.py b/tests/test_pagination.py index 4907a080..e32dd028 100644 --- a/tests/test_pagination.py +++ b/tests/test_pagination.py @@ -451,171 +451,127 @@ class TestCursorPagination:              def order_by(self, ordering):                  if ordering.startswith('-'): -                    return MockQuerySet(reversed(self.items)) +                    return MockQuerySet(list(reversed(self.items)))                  return self              def __getitem__(self, sliced):                  return self.items[sliced] -        self.pagination = pagination.CursorPagination() -        self.queryset = MockQuerySet( -            [MockObject(idx) for idx in range(1, 16)] -        ) - -    def paginate_queryset(self, request): -        return list(self.pagination.paginate_queryset(self.queryset, request)) - -    # def get_paginated_content(self, queryset): -    #     response = self.pagination.get_paginated_response(queryset) -    #     return response.data - -    # def get_html_context(self): -    #     return self.pagination.get_html_context() - -    def test_following_cursor(self): -        request = Request(factory.get('/')) -        queryset = self.paginate_queryset(request) -        assert [item.created for item in queryset] == [1, 2, 3, 4, 5] - -        next_url = self.pagination.get_next_link() -        assert next_url +        class ExamplePagination(pagination.CursorPagination): +            page_size = 5 -        request = Request(factory.get(next_url)) -        queryset = self.paginate_queryset(request) -        assert [item.created for item in queryset] == [6, 7, 8, 9, 10] +        self.pagination = ExamplePagination() +        self.queryset = MockQuerySet([ +            MockObject(idx) for idx in [ +                1, 1, 1, 1, 1, +                1, 2, 3, 4, 4, +                4, 4, 5, 6, 7, +                7, 7, 7, 7, 7, +                7, 7, 7, 8, 9, +                9, 9, 9, 9, 9 +            ] +        ]) -        next_url = self.pagination.get_next_link() -        assert next_url +    def get_pages(self, url): +        """ +        Given a URL return a tuple of: -        request = Request(factory.get(next_url)) -        queryset = self.paginate_queryset(request) -        assert [item.created for item in queryset] == [11, 12, 13, 14, 15] +        (previous page, current page, next page, previous url, next url) +        """ +        request = Request(factory.get(url)) +        queryset = self.pagination.paginate_queryset(self.queryset, request) +        current = [item.created for item in queryset]          next_url = self.pagination.get_next_link() -        assert next_url is None - -        # Now page back again -          previous_url = self.pagination.get_previous_link() -        assert previous_url -        request = Request(factory.get(previous_url)) -        queryset = self.paginate_queryset(request) -        assert [item.created for item in queryset] == [6, 7, 8, 9, 10] +        if next_url is not None: +            request = Request(factory.get(next_url)) +            queryset = self.pagination.paginate_queryset(self.queryset, request) +            next = [item.created for item in queryset] +        else: +            next = None -        previous_url = self.pagination.get_previous_link() -        assert previous_url +        if previous_url is not None: +            request = Request(factory.get(previous_url)) +            queryset = self.pagination.paginate_queryset(self.queryset, request) +            previous = [item.created for item in queryset] +        else: +            previous = None -        request = Request(factory.get(previous_url)) -        queryset = self.paginate_queryset(request) -        assert [item.created for item in queryset] == [1, 2, 3, 4, 5] +        return (previous, current, next, previous_url, next_url) -        previous_url = self.pagination.get_previous_link() -        assert previous_url is None - - -class TestCrazyCursorPagination: -    """ -    Unit tests for `pagination.CursorPagination`. -    """ - -    def setup(self): -        class MockObject(object): -            def __init__(self, idx): -                self.created = idx - -        class MockQuerySet(object): -            def __init__(self, items): -                self.items = items +    def test_invalid_cursor(self): +        request = Request(factory.get('/', {'cursor': '123'})) +        with pytest.raises(exceptions.NotFound): +            self.pagination.paginate_queryset(self.queryset, request) -            def filter(self, created__gt=None, created__lt=None): -                if created__gt is not None: -                    return MockQuerySet([ -                        item for item in self.items -                        if item.created > int(created__gt) -                    ]) +    def test_cursor_pagination(self): +        (previous, current, next, previous_url, next_url) = self.get_pages('/') -                assert created__lt is not None -                return MockQuerySet([ -                    item for item in self.items -                    if item.created < int(created__lt) -                ]) +        assert previous is None +        assert current == [1, 1, 1, 1, 1] +        assert next == [1, 2, 3, 4, 4] -            def order_by(self, ordering): -                if ordering.startswith('-'): -                    return MockQuerySet(reversed(self.items)) -                return self +        (previous, current, next, previous_url, next_url) = self.get_pages(next_url) -            def __getitem__(self, sliced): -                return self.items[sliced] +        assert previous == [1, 1, 1, 1, 1] +        assert current == [1, 2, 3, 4, 4] +        assert next == [4, 4, 5, 6, 7] -        self.pagination = pagination.CursorPagination() -        self.queryset = MockQuerySet([ -            MockObject(idx) for idx in [ -                1, 1, 1, 1, 1, -                1, 1, 1, 1, 1, -                1, 1, 2, 3, 4, -                5, 6, 7, 8, 9 -            ] -        ]) +        (previous, current, next, previous_url, next_url) = self.get_pages(next_url) -    def paginate_queryset(self, request): -        return list(self.pagination.paginate_queryset(self.queryset, request)) +        assert previous == [1, 2, 3, 4, 4] +        assert current == [4, 4, 5, 6, 7] +        assert next == [7, 7, 7, 7, 7] -    def test_following_cursor_identical_items(self): -        request = Request(factory.get('/')) -        queryset = self.paginate_queryset(request) -        assert [item.created for item in queryset] == [1, 1, 1, 1, 1] +        (previous, current, next, previous_url, next_url) = self.get_pages(next_url) -        next_url = self.pagination.get_next_link() -        assert next_url +        assert previous == [4, 4, 4, 5, 6]  # Paging artifact +        assert current == [7, 7, 7, 7, 7] +        assert next == [7, 7, 7, 8, 9] -        request = Request(factory.get(next_url)) -        queryset = self.paginate_queryset(request) -        assert [item.created for item in queryset] == [1, 1, 1, 1, 1] +        (previous, current, next, previous_url, next_url) = self.get_pages(next_url) -        next_url = self.pagination.get_next_link() -        assert next_url +        assert previous == [7, 7, 7, 7, 7] +        assert current == [7, 7, 7, 8, 9] +        assert next == [9, 9, 9, 9, 9] -        request = Request(factory.get(next_url)) -        queryset = self.paginate_queryset(request) -        assert [item.created for item in queryset] == [1, 1, 2, 3, 4] +        (previous, current, next, previous_url, next_url) = self.get_pages(next_url) -        next_url = self.pagination.get_next_link() -        assert next_url +        assert previous == [7, 7, 7, 8, 9] +        assert current == [9, 9, 9, 9, 9] +        assert next is None -        request = Request(factory.get(next_url)) -        queryset = self.paginate_queryset(request) -        assert [item.created for item in queryset] == [5, 6, 7, 8, 9] +        (previous, current, next, previous_url, next_url) = self.get_pages(previous_url) -        next_url = self.pagination.get_next_link() -        assert next_url is None +        assert previous == [7, 7, 7, 7, 7] +        assert current == [7, 7, 7, 8, 9] +        assert next == [9, 9, 9, 9, 9] -        # Now page back again +        (previous, current, next, previous_url, next_url) = self.get_pages(previous_url) -        previous_url = self.pagination.get_previous_link() -        assert previous_url +        assert previous == [4, 4, 5, 6, 7] +        assert current == [7, 7, 7, 7, 7] +        assert next == [8, 9, 9, 9, 9]  # Paging artifact -        request = Request(factory.get(previous_url)) -        queryset = self.paginate_queryset(request) -        assert [item.created for item in queryset] == [1, 1, 2, 3, 4] +        (previous, current, next, previous_url, next_url) = self.get_pages(previous_url) -        previous_url = self.pagination.get_previous_link() -        assert previous_url +        assert previous == [1, 2, 3, 4, 4] +        assert current == [4, 4, 5, 6, 7] +        assert next == [7, 7, 7, 7, 7] -        request = Request(factory.get(previous_url)) -        queryset = self.paginate_queryset(request) -        assert [item.created for item in queryset] == [1, 1, 1, 1, 1] +        (previous, current, next, previous_url, next_url) = self.get_pages(previous_url) -        previous_url = self.pagination.get_previous_link() -        assert previous_url +        assert previous == [1, 1, 1, 1, 1] +        assert current == [1, 2, 3, 4, 4] +        assert next == [4, 4, 5, 6, 7] -        request = Request(factory.get(previous_url)) -        queryset = self.paginate_queryset(request) -        assert [item.created for item in queryset] == [1, 1, 1, 1, 1] +        (previous, current, next, previous_url, next_url) = self.get_pages(previous_url) -        previous_url = self.pagination.get_previous_link() -        assert previous_url is None +        assert previous is None +        assert current == [1, 1, 1, 1, 1] +        assert next == [1, 2, 3, 4, 4]  def test_get_displayed_page_numbers(): | 
