| author | Aymeric Augustin | 2013-10-16 22:29:51 +0200 |
|---|---|---|
| committer | Aymeric Augustin | 2013-10-17 18:24:57 +0200 |
| commit | eacf116d61f0e3f47a7c581275a18b3aec8548b1 (patch) | |
| tree | 87674c6ea964f9404eb3d646c518a3ecd54cce4b /debug_toolbar/utils/sqlparse/lexer.py | |
| parent | c1f39b1a168d535e0b90d3c69273e391adf22637 (diff) | |
| download | django-debug-toolbar-eacf116d61f0e3f47a7c581275a18b3aec8548b1.tar.bz2 | |
Switch to an external version of sqlparse.
Diffstat (limited to 'debug_toolbar/utils/sqlparse/lexer.py')
| -rw-r--r-- | debug_toolbar/utils/sqlparse/lexer.py | 331 |
|---|---|---|
1 file changed, 0 insertions(+), 331 deletions(-)
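The practical effect of this commit is that the vendored copy under `debug_toolbar.utils.sqlparse` is removed and the toolbar depends on the standalone `sqlparse` package from PyPI instead. The toolbar's actual call sites are not part of this diff, so the snippet below is only an illustrative sketch of the kind of import change involved:

```python
# Before this commit: the vendored copy shipped inside the toolbar
# (hypothetical call site, shown only for contrast):
#     from debug_toolbar.utils.sqlparse import format as sqlparse_format

# After this commit: the external package (pip install sqlparse).
import sqlparse

# format() reindents the statement and uppercases keywords, which is the
# kind of pretty-printing the toolbar uses when displaying captured SQL.
pretty = sqlparse.format(
    "select id, username from auth_user where id = 1",
    reindent=True,
    keyword_case='upper',
)
print(pretty)
```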
```diff
diff --git a/debug_toolbar/utils/sqlparse/lexer.py b/debug_toolbar/utils/sqlparse/lexer.py
deleted file mode 100644
index ae3fc2e..0000000
--- a/debug_toolbar/utils/sqlparse/lexer.py
+++ /dev/null
@@ -1,331 +0,0 @@
-# -*- coding: utf-8 -*-
-
-# Copyright (C) 2008 Andi Albrecht, albrecht.andi@gmail.com
-#
-# This module is part of python-sqlparse and is released under
-# the BSD License: http://www.opensource.org/licenses/bsd-license.php.
-
-"""SQL Lexer"""
-
-# This code is based on the SqlLexer in pygments.
-# http://pygments.org/
-# It's separated from the rest of pygments to increase performance
-# and to allow some customizations.
-
-import re
-
-from debug_toolbar.utils.sqlparse import tokens
-from debug_toolbar.utils.sqlparse.keywords import KEYWORDS, KEYWORDS_COMMON
-
-
-class include(str):
-    pass
-
-
-class combined(tuple):
-    """Indicates a state combined from multiple states."""
-
-    def __new__(cls, *args):
-        return tuple.__new__(cls, args)
-
-    def __init__(self, *args):
-        # tuple.__init__ doesn't do anything
-        pass
-
-
-def is_keyword(value):
-    test = value.upper()
-    return KEYWORDS_COMMON.get(test, KEYWORDS.get(test, tokens.Name)), value
-
-
-def apply_filters(stream, filters, lexer=None):
-    """
-    Use this method to apply an iterable of filters to
-    a stream. If lexer is given it's forwarded to the
-    filter, otherwise the filter receives `None`.
-    """
-
-    def _apply(filter_, stream):
-        for token in filter_.filter(lexer, stream):
-            yield token
-
-    for filter_ in filters:
-        stream = _apply(filter_, stream)
-    return stream
-
-
-class LexerMeta(type):
-    """
-    Metaclass for Lexer, creates the self._tokens attribute from
-    self.tokens on the first instantiation.
-    """
-
-    def _process_state(cls, unprocessed, processed, state):
-        assert type(state) is str, "wrong state name %r" % state
-        assert state[0] != '#', "invalid state name %r" % state
-        if state in processed:
-            return processed[state]
-        tokenlist = processed[state] = []
-        rflags = cls.flags
-        for tdef in unprocessed[state]:
-            if isinstance(tdef, include):
-                # it's a state reference
-                assert tdef != state, "circular state reference %r" % state
-                tokenlist.extend(cls._process_state(
-                    unprocessed, processed, str(tdef)))
-                continue
-
-            assert type(tdef) is tuple, "wrong rule def %r" % tdef
-
-            try:
-                rex = re.compile(tdef[0], rflags).match
-            except Exception, err:
-                raise ValueError(("uncompilable regex %r in state"
-                                  " %r of %r: %s"
-                                  % (tdef[0], state, cls, err)))
-
-            assert type(tdef[1]) is tokens._TokenType or callable(tdef[1]), \
-                ('token type must be simple type or callable, not %r'
-                 % (tdef[1],))
-
-            if len(tdef) == 2:
-                new_state = None
-            else:
-                tdef2 = tdef[2]
-                if isinstance(tdef2, str):
-                    # an existing state
-                    if tdef2 == '#pop':
-                        new_state = -1
-                    elif tdef2 in unprocessed:
-                        new_state = (tdef2,)
-                    elif tdef2 == '#push':
-                        new_state = tdef2
-                    elif tdef2[:5] == '#pop:':
-                        new_state = -int(tdef2[5:])
-                    else:
-                        assert False, 'unknown new state %r' % tdef2
-                elif isinstance(tdef2, combined):
-                    # combine a new state from existing ones
-                    new_state = '_tmp_%d' % cls._tmpname
-                    cls._tmpname += 1
-                    itokens = []
-                    for istate in tdef2:
-                        assert istate != state, \
-                            'circular state ref %r' % istate
-                        itokens.extend(cls._process_state(unprocessed,
-                                                          processed, istate))
-                    processed[new_state] = itokens
-                    new_state = (new_state,)
-                elif isinstance(tdef2, tuple):
-                    # push more than one state
-                    for state in tdef2:
-                        assert (state in unprocessed or
-                                state in ('#pop', '#push')), \
-                            'unknown new state ' + state
-                    new_state = tdef2
-                else:
-                    assert False, 'unknown new state def %r' % tdef2
-            tokenlist.append((rex, tdef[1], new_state))
-        return tokenlist
-
-    def process_tokendef(cls):
-        cls._all_tokens = {}
-        cls._tmpname = 0
-        processed = cls._all_tokens[cls.__name__] = {}
-        #tokendefs = tokendefs or cls.tokens[name]
-        for state in cls.tokens.keys():
-            cls._process_state(cls.tokens, processed, state)
-        return processed
-
-    def __call__(cls, *args, **kwds):
-        if not hasattr(cls, '_tokens'):
-            cls._all_tokens = {}
-            cls._tmpname = 0
-            if hasattr(cls, 'token_variants') and cls.token_variants:
-                # don't process yet
-                pass
-            else:
-                cls._tokens = cls.process_tokendef()
-
-        return type.__call__(cls, *args, **kwds)
-
-
-class Lexer(object):
-
-    __metaclass__ = LexerMeta
-
-    encoding = 'utf-8'
-    stripall = False
-    stripnl = False
-    tabsize = 0
-    flags = re.IGNORECASE
-
-    tokens = {
-        'root': [
-            (r'--.*?(\r\n|\r|\n)', tokens.Comment.Single),
-            # $ matches *before* newline, therefore we have two patterns
-            # to match Comment.Single
-            (r'--.*?$', tokens.Comment.Single),
-            (r'(\r|\n|\r\n)', tokens.Newline),
-            (r'\s+', tokens.Whitespace),
-            (r'/\*', tokens.Comment.Multiline, 'multiline-comments'),
-            (r':=', tokens.Assignment),
-            (r'::', tokens.Punctuation),
-            (r'[*]', tokens.Wildcard),
-            (r'CASE\b', tokens.Keyword),  # extended CASE(foo)
-            (r"`(``|[^`])*`", tokens.Name),
-            (r"´(´´|[^´])*´", tokens.Name),
-            (r'\$([a-zA-Z_][a-zA-Z0-9_]*)?\$', tokens.Name.Builtin),
-            (r'\?{1}', tokens.Name.Placeholder),
-            (r'[$:?%][a-zA-Z0-9_]+[^$:?%]?', tokens.Name.Placeholder),
-            (r'@[a-zA-Z_][a-zA-Z0-9_]+', tokens.Name),
-            (r'[a-zA-Z_][a-zA-Z0-9_]*(?=[.(])', tokens.Name),  # see issue39
-            (r'[<>=~!]+', tokens.Operator.Comparison),
-            (r'[+/@#%^&|`?^-]+', tokens.Operator),
-            (r'0x[0-9a-fA-F]+', tokens.Number.Hexadecimal),
-            (r'[0-9]*\.[0-9]+', tokens.Number.Float),
-            (r'[0-9]+', tokens.Number.Integer),
-            # TODO: Backslash escapes?
-            (r"(''|'.*?[^\\]')", tokens.String.Single),
-            # not a real string literal in ANSI SQL:
-            (r'(""|".*?[^\\]")', tokens.String.Symbol),
-            (r'(\[.*[^\]]\])', tokens.Name),
-            (r'(LEFT |RIGHT )?(INNER |OUTER )?JOIN\b', tokens.Keyword),
-            (r'END( IF| LOOP)?\b', tokens.Keyword),
-            (r'NOT NULL\b', tokens.Keyword),
-            (r'CREATE( OR REPLACE)?\b', tokens.Keyword.DDL),
-            (r'[a-zA-Z_][a-zA-Z0-9_]*', is_keyword),
-            (r'[;:()\[\],\.]', tokens.Punctuation),
-        ],
-        'multiline-comments': [
-            (r'/\*', tokens.Comment.Multiline, 'multiline-comments'),
-            (r'\*/', tokens.Comment.Multiline, '#pop'),
-            (r'[^/\*]+', tokens.Comment.Multiline),
-            (r'[/*]', tokens.Comment.Multiline)
-        ]}
-
-    def __init__(self):
-        self.filters = []
-
-    def add_filter(self, filter_, **options):
-        from debug_toolbar.utils.sqlparse.filters import Filter
-        if not isinstance(filter_, Filter):
-            filter_ = filter_(**options)
-        self.filters.append(filter_)
-
-    def get_tokens(self, text, unfiltered=False):
-        """
-        Return an iterable of (tokentype, value) pairs generated from
-        `text`. If `unfiltered` is set to `True`, the filtering mechanism
-        is bypassed even if filters are defined.
-
-        Also preprocess the text, i.e. expand tabs and strip it if
-        wanted and applies registered filters.
-        """
-        if not isinstance(text, unicode):
-            if self.encoding == 'guess':
-                try:
-                    text = text.decode('utf-8')
-                    if text.startswith(u'\ufeff'):
-                        text = text[len(u'\ufeff'):]
-                except UnicodeDecodeError:
-                    text = text.decode('latin1')
-            elif self.encoding == 'chardet':
-                try:
-                    import chardet
-                except ImportError:
-                    raise ImportError('To enable chardet encoding guessing, '
-                                      'please install the chardet library '
-                                      'from http://chardet.feedparser.org/')
-                enc = chardet.detect(text)
-                text = text.decode(enc['encoding'])
-            else:
-                text = text.decode(self.encoding)
-        if self.stripall:
-            text = text.strip()
-        elif self.stripnl:
-            text = text.strip('\n')
-        if self.tabsize > 0:
-            text = text.expandtabs(self.tabsize)
-#        if not text.endswith('\n'):
-#            text += '\n'
-
-        def streamer():
-            for i, t, v in self.get_tokens_unprocessed(text):
-                yield t, v
-        stream = streamer()
-        if not unfiltered:
-            stream = apply_filters(stream, self.filters, self)
-        return stream
-
-    def get_tokens_unprocessed(self, text, stack=('root',)):
-        """
-        Split ``text`` into (tokentype, text) pairs.
-
-        ``stack`` is the inital stack (default: ``['root']``)
-        """
-        pos = 0
-        tokendefs = self._tokens
-        statestack = list(stack)
-        statetokens = tokendefs[statestack[-1]]
-        known_names = {}
-        while 1:
-            for rexmatch, action, new_state in statetokens:
-                m = rexmatch(text, pos)
-                if m:
-                    # print rex.pattern
-                    value = m.group()
-                    if value in known_names:
-                        yield pos, known_names[value], value
-                    elif type(action) is tokens._TokenType:
-                        yield pos, action, value
-                    elif hasattr(action, '__call__'):
-                        ttype, value = action(value)
-                        known_names[value] = ttype
-                        yield pos, ttype, value
-                    else:
-                        for item in action(self, m):
-                            yield item
-                    pos = m.end()
-                    if new_state is not None:
-                        # state transition
-                        if isinstance(new_state, tuple):
-                            for state in new_state:
-                                if state == '#pop':
-                                    statestack.pop()
-                                elif state == '#push':
-                                    statestack.append(statestack[-1])
-                                else:
-                                    statestack.append(state)
-                        elif isinstance(new_state, int):
-                            # pop
-                            del statestack[new_state:]
-                        elif new_state == '#push':
-                            statestack.append(statestack[-1])
-                        else:
-                            assert False, "wrong state def: %r" % new_state
-                        statetokens = tokendefs[statestack[-1]]
-                    break
-            else:
-                try:
-                    if text[pos] == '\n':
-                        # at EOL, reset state to "root"
-                        pos += 1
-                        statestack = ['root']
-                        statetokens = tokendefs['root']
-                        yield pos, tokens.Text, u'\n'
-                        continue
-                    yield pos, tokens.Error, text[pos]
-                    pos += 1
-                except IndexError:
-                    break
-
-
-def tokenize(sql):
-    """Tokenize sql.
-
-    Tokenize *sql* using the :class:`Lexer` and return a 2-tuple stream
-    of ``(token type, value)`` items.
-    """
-    lexer = Lexer()
-    return lexer.get_tokens(sql)
```
