gh-131507: Add support for syntax highlighting in PyREPL (GH-133247)

Co-authored-by: Victorien <65306057+Viicos@users.noreply.github.com>
Co-authored-by: Hugo van Kemenade <1324225+hugovk@users.noreply.github.com>
This commit is contained in:
Łukasz Langa 2025-05-02 20:22:31 +02:00 committed by GitHub
parent bfcbb28223
commit fac41f56d4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
21 changed files with 654 additions and 99 deletions

View File

@ -560,6 +560,23 @@ For further information on how to build Python, see
(Contributed by Ken Jin in :gh:`128563`, with ideas on how to implement this
in CPython by Mark Shannon, Garrett Gu, Haoran Xu, and Josh Haberman.)
Syntax highlighting in PyREPL
-----------------------------
The default :term:`interactive` shell now highlights Python syntax as you
type. The feature is enabled by default unless the
:envvar:`PYTHON_BASIC_REPL` environment is set or any color-disabling
environment variables are used. See :ref:`using-on-controlling-color` for
details.
The default color theme for syntax highlighting strives for good contrast
and uses exclusively the 4-bit VGA standard ANSI color codes for maximum
compatibility. The theme can be customized using an experimental API
``_colorize.set_theme()``. This can be called interactively, as well as
in the :envvar:`PYTHONSTARTUP` script.
(Contributed by Łukasz Langa in :gh:`131507`.)
Other language changes
======================

View File

@ -7,7 +7,22 @@ COLORIZE = True
# types
if False:
from typing import IO
from typing import IO, Literal
type ColorTag = Literal[
"PROMPT",
"KEYWORD",
"BUILTIN",
"COMMENT",
"STRING",
"NUMBER",
"OP",
"DEFINITION",
"SOFT_KEYWORD",
"RESET",
]
theme: dict[ColorTag, str]
class ANSIColors:
@ -23,6 +38,7 @@ class ANSIColors:
WHITE = "\x1b[37m" # more like LIGHT GRAY
YELLOW = "\x1b[33m"
BOLD = "\x1b[1m"
BOLD_BLACK = "\x1b[1;30m" # DARK GRAY
BOLD_BLUE = "\x1b[1;34m"
BOLD_CYAN = "\x1b[1;36m"
@ -120,3 +136,28 @@ def can_colorize(*, file: IO[str] | IO[bytes] | None = None) -> bool:
return os.isatty(file.fileno())
except io.UnsupportedOperation:
return hasattr(file, "isatty") and file.isatty()
def set_theme(t: dict[ColorTag, str] | None = None) -> None:
global theme
if t:
theme = t
return
colors = get_colors()
theme = {
"PROMPT": colors.BOLD_MAGENTA,
"KEYWORD": colors.BOLD_BLUE,
"BUILTIN": colors.CYAN,
"COMMENT": colors.RED,
"STRING": colors.GREEN,
"NUMBER": colors.YELLOW,
"OP": colors.RESET,
"DEFINITION": colors.BOLD,
"SOFT_KEYWORD": colors.BOLD_BLUE,
"RESET": colors.RESET,
}
set_theme()

View File

@ -2,6 +2,7 @@ from __future__ import annotations
import pkgutil
import sys
import token
import tokenize
from io import StringIO
from contextlib import contextmanager
@ -180,8 +181,8 @@ class ImportParser:
when parsing multiple statements.
"""
_ignored_tokens = {
tokenize.INDENT, tokenize.DEDENT, tokenize.COMMENT,
tokenize.NL, tokenize.NEWLINE, tokenize.ENDMARKER
token.INDENT, token.DEDENT, token.COMMENT,
token.NL, token.NEWLINE, token.ENDMARKER
}
_keywords = {'import', 'from', 'as'}
@ -350,11 +351,11 @@ class TokenQueue:
def peek_name(self) -> bool:
if not (tok := self.peek()):
return False
return tok.type == tokenize.NAME
return tok.type == token.NAME
def pop_name(self) -> str:
tok = self.pop()
if tok.type != tokenize.NAME:
if tok.type != token.NAME:
raise ParseError('pop_name')
return tok.string

View File

@ -21,6 +21,7 @@
from __future__ import annotations
import os
import time
# Categories of actions:
# killing
@ -31,6 +32,7 @@ import os
# finishing
# [completion]
from .trace import trace
# types
if False:
@ -471,19 +473,24 @@ class show_history(Command):
class paste_mode(Command):
def do(self) -> None:
self.reader.paste_mode = not self.reader.paste_mode
self.reader.dirty = True
class enable_bracketed_paste(Command):
class perform_bracketed_paste(Command):
def do(self) -> None:
self.reader.paste_mode = True
self.reader.in_bracketed_paste = True
class disable_bracketed_paste(Command):
def do(self) -> None:
self.reader.paste_mode = False
self.reader.in_bracketed_paste = False
self.reader.dirty = True
done = "\x1b[201~"
data = ""
start = time.time()
while done not in data:
self.reader.console.wait(100)
ev = self.reader.console.getpending()
data += ev.data
trace(
"bracketed pasting of {l} chars done in {s:.2f}s",
l=len(data),
s=time.time() - start,
)
self.reader.insert(data.replace(done, ""))
self.reader.last_refresh_cache.invalidated = True

View File

@ -23,7 +23,3 @@ check_untyped_defs = False
# Various internal modules that typeshed deliberately doesn't have stubs for:
[mypy-_abc.*,_opcode.*,_overlapped.*,_testcapi.*,_testinternalcapi.*,test.*]
ignore_missing_imports = True
# Other untyped parts of the stdlib
[mypy-idlelib.*]
ignore_missing_imports = True

View File

@ -22,14 +22,13 @@
from __future__ import annotations
import sys
import _colorize
from contextlib import contextmanager
from dataclasses import dataclass, field, fields
from _colorize import can_colorize, ANSIColors
from . import commands, console, input
from .utils import wlen, unbracket, disp_str
from .utils import wlen, unbracket, disp_str, gen_colors
from .trace import trace
@ -38,8 +37,7 @@ Command = commands.Command
from .types import Callback, SimpleContextManager, KeySpec, CommandName
# syntax classes:
# syntax classes
SYNTAX_WHITESPACE, SYNTAX_WORD, SYNTAX_SYMBOL = range(3)
@ -105,8 +103,7 @@ default_keymap: tuple[tuple[KeySpec, CommandName], ...] = tuple(
(r"\M-9", "digit-arg"),
(r"\M-\n", "accept"),
("\\\\", "self-insert"),
(r"\x1b[200~", "enable_bracketed_paste"),
(r"\x1b[201~", "disable_bracketed_paste"),
(r"\x1b[200~", "perform-bracketed-paste"),
(r"\x03", "ctrl-c"),
]
+ [(c, "self-insert") for c in map(chr, range(32, 127)) if c != "\\"]
@ -144,16 +141,17 @@ class Reader:
Instance variables of note include:
* buffer:
A *list* (*not* a string at the moment :-) containing all the
characters that have been entered.
A per-character list containing all the characters that have been
entered. Does not include color information.
* console:
Hopefully encapsulates the OS dependent stuff.
* pos:
A 0-based index into 'buffer' for where the insertion point
is.
* screeninfo:
Ahem. This list contains some info needed to move the
insertion point around reasonably efficiently.
A list of screen position tuples. Each list element is a tuple
representing information on visible line length for a given line.
Allows for efficient skipping of color escape sequences.
* cxy, lxy:
the position of the insertion point in screen ...
* syntax_table:
@ -203,7 +201,6 @@ class Reader:
dirty: bool = False
finished: bool = False
paste_mode: bool = False
in_bracketed_paste: bool = False
commands: dict[str, type[Command]] = field(default_factory=make_default_commands)
last_command: type[Command] | None = None
syntax_table: dict[str, int] = field(default_factory=make_default_syntax_table)
@ -221,7 +218,6 @@ class Reader:
## cached metadata to speed up screen refreshes
@dataclass
class RefreshCache:
in_bracketed_paste: bool = False
screen: list[str] = field(default_factory=list)
screeninfo: list[tuple[int, list[int]]] = field(init=False)
line_end_offsets: list[int] = field(default_factory=list)
@ -235,7 +231,6 @@ class Reader:
screen: list[str],
screeninfo: list[tuple[int, list[int]]],
) -> None:
self.in_bracketed_paste = reader.in_bracketed_paste
self.screen = screen.copy()
self.screeninfo = screeninfo.copy()
self.pos = reader.pos
@ -248,8 +243,7 @@ class Reader:
return False
dimensions = reader.console.width, reader.console.height
dimensions_changed = dimensions != self.dimensions
paste_changed = reader.in_bracketed_paste != self.in_bracketed_paste
return not (dimensions_changed or paste_changed)
return not dimensions_changed
def get_cached_location(self, reader: Reader) -> tuple[int, int]:
if self.invalidated:
@ -279,7 +273,7 @@ class Reader:
self.screeninfo = [(0, [])]
self.cxy = self.pos2xy()
self.lxy = (self.pos, 0)
self.can_colorize = can_colorize()
self.can_colorize = _colorize.can_colorize()
self.last_refresh_cache.screeninfo = self.screeninfo
self.last_refresh_cache.pos = self.pos
@ -316,6 +310,12 @@ class Reader:
pos -= offset
prompt_from_cache = (offset and self.buffer[offset - 1] != "\n")
if self.can_colorize:
colors = list(gen_colors(self.get_unicode()))
else:
colors = None
trace("colors = {colors}", colors=colors)
lines = "".join(self.buffer[offset:]).split("\n")
cursor_found = False
lines_beyond_cursor = 0
@ -343,9 +343,8 @@ class Reader:
screeninfo.append((0, []))
pos -= line_len + 1
prompt, prompt_len = self.process_prompt(prompt)
chars, char_widths = disp_str(line)
chars, char_widths = disp_str(line, colors, offset)
wrapcount = (sum(char_widths) + prompt_len) // self.console.width
trace("wrapcount = {wrapcount}", wrapcount=wrapcount)
if wrapcount == 0 or not char_widths:
offset += line_len + 1 # Takes all of the line plus the newline
last_refresh_line_end_offsets.append(offset)
@ -479,7 +478,7 @@ class Reader:
'lineno'."""
if self.arg is not None and cursor_on_line:
prompt = f"(arg: {self.arg}) "
elif self.paste_mode and not self.in_bracketed_paste:
elif self.paste_mode:
prompt = "(paste) "
elif "\n" in self.buffer:
if lineno == 0:
@ -492,7 +491,11 @@ class Reader:
prompt = self.ps1
if self.can_colorize:
prompt = f"{ANSIColors.BOLD_MAGENTA}{prompt}{ANSIColors.RESET}"
prompt = (
f"{_colorize.theme["PROMPT"]}"
f"{prompt}"
f"{_colorize.theme["RESET"]}"
)
return prompt
def push_input_trans(self, itrans: input.KeymapTranslator) -> None:
@ -567,6 +570,7 @@ class Reader:
def update_cursor(self) -> None:
"""Move the cursor to reflect changes in self.pos"""
self.cxy = self.pos2xy()
trace("update_cursor({pos}) = {cxy}", pos=self.pos, cxy=self.cxy)
self.console.move_cursor(*self.cxy)
def after_command(self, cmd: Command) -> None:
@ -633,9 +637,6 @@ class Reader:
def refresh(self) -> None:
"""Recalculate and refresh the screen."""
if self.in_bracketed_paste and self.buffer and not self.buffer[-1] == "\n":
return
# this call sets up self.cxy, so call it first.
self.screen = self.calc_screen()
self.console.refresh(self.screen, self.cxy)

View File

@ -276,10 +276,6 @@ class maybe_accept(commands.Command):
r = self.reader # type: ignore[assignment]
r.dirty = True # this is needed to hide the completion menu, if visible
if self.reader.in_bracketed_paste:
r.insert("\n")
return
# if there are already several lines and the cursor
# is not on the last one, always insert a new \n.
text = r.get_unicode()

View File

@ -157,7 +157,6 @@ def run_multiline_interactive_console(
r.pos = len(r.get_unicode())
r.dirty = True
r.refresh()
r.in_bracketed_paste = False
console.write("\nKeyboardInterrupt\n")
console.resetbuffer()
except MemoryError:

View File

@ -150,8 +150,6 @@ class UnixConsole(Console):
self.pollob = poll()
self.pollob.register(self.input_fd, select.POLLIN)
self.input_buffer = b""
self.input_buffer_pos = 0
curses.setupterm(term or None, self.output_fd)
self.term = term
@ -199,22 +197,8 @@ class UnixConsole(Console):
self.event_queue = EventQueue(self.input_fd, self.encoding)
self.cursor_visible = 1
def more_in_buffer(self) -> bool:
return bool(
self.input_buffer
and self.input_buffer_pos < len(self.input_buffer)
)
def __read(self, n: int) -> bytes:
if not self.more_in_buffer():
self.input_buffer = os.read(self.input_fd, 10000)
ret = self.input_buffer[self.input_buffer_pos : self.input_buffer_pos + n]
self.input_buffer_pos += len(ret)
if self.input_buffer_pos >= len(self.input_buffer):
self.input_buffer = b""
self.input_buffer_pos = 0
return ret
return os.read(self.input_fd, n)
def change_encoding(self, encoding: str) -> None:
@ -422,7 +406,6 @@ class UnixConsole(Console):
"""
return (
not self.event_queue.empty()
or self.more_in_buffer()
or bool(self.pollob.poll(timeout))
)
@ -525,6 +508,7 @@ class UnixConsole(Console):
e.raw += e.raw
amount = struct.unpack("i", ioctl(self.input_fd, FIONREAD, b"\0\0\0\0"))[0]
trace("getpending({a})", a=amount)
raw = self.__read(amount)
data = str(raw, self.encoding, "replace")
e.data += data

View File

@ -1,6 +1,17 @@
import re
import unicodedata
from __future__ import annotations
import builtins
import functools
import keyword
import re
import token as T
import tokenize
import unicodedata
import _colorize
from collections import deque
from io import StringIO
from tokenize import TokenInfo as TI
from typing import Iterable, Iterator, Match, NamedTuple, Self
from .types import CharBuffer, CharWidths
from .trace import trace
@ -8,6 +19,32 @@ from .trace import trace
ANSI_ESCAPE_SEQUENCE = re.compile(r"\x1b\[[ -@]*[A-~]")
ZERO_WIDTH_BRACKET = re.compile(r"\x01.*?\x02")
ZERO_WIDTH_TRANS = str.maketrans({"\x01": "", "\x02": ""})
IDENTIFIERS_AFTER = {"def", "class"}
BUILTINS = {str(name) for name in dir(builtins) if not name.startswith('_')}
class Span(NamedTuple):
"""Span indexing that's inclusive on both ends."""
start: int
end: int
@classmethod
def from_re(cls, m: Match[str], group: int | str) -> Self:
re_span = m.span(group)
return cls(re_span[0], re_span[1] - 1)
@classmethod
def from_token(cls, token: TI, line_len: list[int]) -> Self:
return cls(
line_len[token.start[0] - 1] + token.start[1],
line_len[token.end[0] - 1] + token.end[1] - 1,
)
class ColorSpan(NamedTuple):
span: Span
tag: _colorize.ColorTag
@functools.cache
@ -41,17 +78,207 @@ def unbracket(s: str, including_content: bool = False) -> str:
return s.translate(ZERO_WIDTH_TRANS)
def disp_str(buffer: str) -> tuple[CharBuffer, CharWidths]:
r"""Decompose the input buffer into a printable variant.
def gen_colors(buffer: str) -> Iterator[ColorSpan]:
"""Returns a list of index spans to color using the given color tag.
The input `buffer` should be a valid start of a Python code block, i.e.
it cannot be a block starting in the middle of a multiline string.
"""
sio = StringIO(buffer)
line_lengths = [0] + [len(line) for line in sio.readlines()]
# make line_lengths cumulative
for i in range(1, len(line_lengths)):
line_lengths[i] += line_lengths[i-1]
sio.seek(0)
gen = tokenize.generate_tokens(sio.readline)
last_emitted: ColorSpan | None = None
try:
for color in gen_colors_from_token_stream(gen, line_lengths):
yield color
last_emitted = color
except tokenize.TokenError as te:
yield from recover_unterminated_string(
te, line_lengths, last_emitted, buffer
)
def recover_unterminated_string(
exc: tokenize.TokenError,
line_lengths: list[int],
last_emitted: ColorSpan | None,
buffer: str,
) -> Iterator[ColorSpan]:
msg, loc = exc.args
if loc is None:
return
line_no, column = loc
if msg.startswith(
(
"unterminated string literal",
"unterminated f-string literal",
"unterminated t-string literal",
"EOF in multi-line string",
"unterminated triple-quoted f-string literal",
"unterminated triple-quoted t-string literal",
)
):
start = line_lengths[line_no - 1] + column - 1
end = line_lengths[-1] - 1
# in case FSTRING_START was already emitted
if last_emitted and start <= last_emitted.span.start:
trace("before last emitted = {s}", s=start)
start = last_emitted.span.end + 1
span = Span(start, end)
trace("yielding span {a} -> {b}", a=span.start, b=span.end)
yield ColorSpan(span, "STRING")
else:
trace(
"unhandled token error({buffer}) = {te}",
buffer=repr(buffer),
te=str(exc),
)
def gen_colors_from_token_stream(
token_generator: Iterator[TI],
line_lengths: list[int],
) -> Iterator[ColorSpan]:
token_window = prev_next_window(token_generator)
is_def_name = False
bracket_level = 0
for prev_token, token, next_token in token_window:
assert token is not None
if token.start == token.end:
continue
match token.type:
case (
T.STRING
| T.FSTRING_START | T.FSTRING_MIDDLE | T.FSTRING_END
| T.TSTRING_START | T.TSTRING_MIDDLE | T.TSTRING_END
):
span = Span.from_token(token, line_lengths)
yield ColorSpan(span, "STRING")
case T.COMMENT:
span = Span.from_token(token, line_lengths)
yield ColorSpan(span, "COMMENT")
case T.NUMBER:
span = Span.from_token(token, line_lengths)
yield ColorSpan(span, "NUMBER")
case T.OP:
if token.string in "([{":
bracket_level += 1
elif token.string in ")]}":
bracket_level -= 1
span = Span.from_token(token, line_lengths)
yield ColorSpan(span, "OP")
case T.NAME:
if is_def_name:
is_def_name = False
span = Span.from_token(token, line_lengths)
yield ColorSpan(span, "DEFINITION")
elif keyword.iskeyword(token.string):
span = Span.from_token(token, line_lengths)
yield ColorSpan(span, "KEYWORD")
if token.string in IDENTIFIERS_AFTER:
is_def_name = True
elif (
keyword.issoftkeyword(token.string)
and bracket_level == 0
and is_soft_keyword_used(prev_token, token, next_token)
):
span = Span.from_token(token, line_lengths)
yield ColorSpan(span, "SOFT_KEYWORD")
elif token.string in BUILTINS:
span = Span.from_token(token, line_lengths)
yield ColorSpan(span, "BUILTIN")
keyword_first_sets_match = {"False", "None", "True", "await", "lambda", "not"}
keyword_first_sets_case = {"False", "None", "True"}
def is_soft_keyword_used(*tokens: TI | None) -> bool:
"""Returns True if the current token is a keyword in this context.
For the `*tokens` to match anything, they have to be a three-tuple of
(previous, current, next).
"""
trace("is_soft_keyword_used{t}", t=tokens)
match tokens:
case (
None | TI(T.NEWLINE) | TI(T.INDENT) | TI(string=":"),
TI(string="match"),
TI(T.NUMBER | T.STRING | T.FSTRING_START | T.TSTRING_START)
| TI(T.OP, string="(" | "*" | "[" | "{" | "~" | "...")
):
return True
case (
None | TI(T.NEWLINE) | TI(T.INDENT) | TI(string=":"),
TI(string="match"),
TI(T.NAME, string=s)
):
if keyword.iskeyword(s):
return s in keyword_first_sets_match
return True
case (
None | TI(T.NEWLINE) | TI(T.INDENT) | TI(string=":"),
TI(string="case"),
TI(T.NUMBER | T.STRING | T.FSTRING_START | T.TSTRING_START)
| TI(T.OP, string="(" | "*" | "-" | "[" | "{")
):
return True
case (
None | TI(T.NEWLINE) | TI(T.INDENT) | TI(string=":"),
TI(string="case"),
TI(T.NAME, string=s)
):
if keyword.iskeyword(s):
return s in keyword_first_sets_case
return True
case (TI(string="case"), TI(string="_"), TI(string=":")):
return True
case _:
return False
def disp_str(
buffer: str, colors: list[ColorSpan] | None = None, start_index: int = 0
) -> tuple[CharBuffer, CharWidths]:
r"""Decompose the input buffer into a printable variant with applied colors.
Returns a tuple of two lists:
- the first list is the input buffer, character by character;
- the first list is the input buffer, character by character, with color
escape codes added (while those codes contain multiple ASCII characters,
each code is considered atomic *and is attached for the corresponding
visible character*);
- the second list is the visible width of each character in the input
buffer.
Note on colors:
- The `colors` list, if provided, is partially consumed within. We're using
a list and not a generator since we need to hold onto the current
unfinished span between calls to disp_str in case of multiline strings.
- The `colors` list is computed from the start of the input block. `buffer`
is only a subset of that input block, a single line within. This is why
we need `start_index` to inform us which position is the start of `buffer`
actually within user input. This allows us to match color spans correctly.
Examples:
>>> utils.disp_str("a = 9")
(['a', ' ', '=', ' ', '9'], [1, 1, 1, 1, 1])
>>> line = "while 1:"
>>> colors = list(utils.gen_colors(line))
>>> utils.disp_str(line, colors=colors)
(['\x1b[1;34mw', 'h', 'i', 'l', 'e\x1b[0m', ' ', '1', ':'], [1, 1, 1, 1, 1, 1, 1, 1])
"""
chars: CharBuffer = []
char_widths: CharWidths = []
@ -59,7 +286,20 @@ def disp_str(buffer: str) -> tuple[CharBuffer, CharWidths]:
if not buffer:
return chars, char_widths
for c in buffer:
while colors and colors[0].span.end < start_index:
# move past irrelevant spans
colors.pop(0)
pre_color = ""
post_color = ""
if colors and colors[0].span.start < start_index:
# looks like we're continuing a previous color (e.g. a multiline str)
pre_color = _colorize.theme[colors[0].tag]
for i, c in enumerate(buffer, start_index):
if colors and colors[0].span.start == i: # new color starts now
pre_color = _colorize.theme[colors[0].tag]
if c == "\x1a": # CTRL-Z on Windows
chars.append(c)
char_widths.append(2)
@ -73,5 +313,43 @@ def disp_str(buffer: str) -> tuple[CharBuffer, CharWidths]:
else:
chars.append(c)
char_widths.append(str_width(c))
trace("disp_str({buffer}) = {s}, {b}", buffer=repr(buffer), s=chars, b=char_widths)
if colors and colors[0].span.end == i: # current color ends now
post_color = _colorize.theme["RESET"]
colors.pop(0)
chars[-1] = pre_color + chars[-1] + post_color
pre_color = ""
post_color = ""
if colors and colors[0].span.start < i and colors[0].span.end > i:
# even though the current color should be continued, reset it for now.
# the next call to `disp_str()` will revive it.
chars[-1] += _colorize.theme["RESET"]
return chars, char_widths
def prev_next_window[T](
iterable: Iterable[T]
) -> Iterator[tuple[T | None, ...]]:
"""Generates three-tuples of (previous, current, next) items.
On the first iteration previous is None. On the last iteration next
is None. In case of exception next is None and the exception is re-raised
on a subsequent next() call.
Inspired by `sliding_window` from `itertools` recipes.
"""
iterator = iter(iterable)
window = deque((None, next(iterator)), maxlen=3)
try:
for x in iterator:
window.append(x)
yield tuple(window)
except Exception:
raise
finally:
window.append(None)
yield tuple(window)

View File

@ -426,6 +426,20 @@ class WindowsConsole(Console):
return rec
def _read_input_bulk(
self, block: bool, n: int
) -> tuple[ctypes.Array[INPUT_RECORD], int]:
rec = (n * INPUT_RECORD)()
read = DWORD()
if not block and not self.wait(timeout=0):
return rec, 0
if not ReadConsoleInput(InHandle, rec, n, read):
raise WinError(GetLastError())
return rec, read.value
def get_event(self, block: bool = True) -> Event | None:
"""Return an Event instance. Returns None if |block| is false
and there is no event pending, otherwise waits for the
@ -521,7 +535,23 @@ class WindowsConsole(Console):
def getpending(self) -> Event:
"""Return the characters that have been typed but not yet
processed."""
return Event("key", "", b"")
e = Event("key", "", b"")
while not self.event_queue.empty():
e2 = self.event_queue.get()
if e2:
e.data += e2.data
recs, rec_count = self._read_input_bulk(False, 1024)
for i in range(rec_count):
rec = recs[i]
if rec and rec.EventType == KEY_EVENT:
key_event = rec.Event.KeyEvent
ch = key_event.uChar.UnicodeChar
if ch == "\r":
ch += "\n"
e.data += ch
return e
def wait(self, timeout: float | None) -> bool:
"""Wait for an event."""

View File

@ -45,6 +45,7 @@ class ReplTestCase(TestCase):
cmdline_args: list[str] | None = None,
cwd: str | None = None,
skip: bool = False,
timeout: float = SHORT_TIMEOUT,
) -> tuple[str, int]:
temp_dir = None
if cwd is None:
@ -52,7 +53,12 @@ class ReplTestCase(TestCase):
cwd = temp_dir.name
try:
return self._run_repl(
repl_input, env=env, cmdline_args=cmdline_args, cwd=cwd, skip=skip,
repl_input,
env=env,
cmdline_args=cmdline_args,
cwd=cwd,
skip=skip,
timeout=timeout,
)
finally:
if temp_dir is not None:
@ -66,6 +72,7 @@ class ReplTestCase(TestCase):
cmdline_args: list[str] | None,
cwd: str,
skip: bool,
timeout: float,
) -> tuple[str, int]:
assert pty
master_fd, slave_fd = pty.openpty()
@ -103,7 +110,7 @@ class ReplTestCase(TestCase):
os.write(master_fd, repl_input.encode("utf-8"))
output = []
while select.select([master_fd], [], [], SHORT_TIMEOUT)[0]:
while select.select([master_fd], [], [], timeout)[0]:
try:
data = os.read(master_fd, 1024).decode("utf-8")
if not data:
@ -114,12 +121,12 @@ class ReplTestCase(TestCase):
else:
os.close(master_fd)
process.kill()
process.wait(timeout=SHORT_TIMEOUT)
process.wait(timeout=timeout)
self.fail(f"Timeout while waiting for output, got: {''.join(output)}")
os.close(master_fd)
try:
exit_code = process.wait(timeout=SHORT_TIMEOUT)
exit_code = process.wait(timeout=timeout)
except subprocess.TimeoutExpired:
process.kill()
exit_code = process.wait()
@ -1561,25 +1568,29 @@ class TestMain(ReplTestCase):
def test_history_survive_crash(self):
env = os.environ.copy()
commands = "1\nexit()\n"
output, exit_code = self.run_repl(commands, env=env, skip=True)
with tempfile.NamedTemporaryFile() as hfile:
env["PYTHON_HISTORY"] = hfile.name
commands = "spam\nimport time\ntime.sleep(1000)\npreved\n"
commands = "1\n2\n3\nexit()\n"
output, exit_code = self.run_repl(commands, env=env, skip=True)
commands = "spam\nimport time\ntime.sleep(1000)\nquit\n"
try:
self.run_repl(commands, env=env)
self.run_repl(commands, env=env, timeout=3)
except AssertionError:
pass
history = pathlib.Path(hfile.name).read_text()
self.assertIn("2", history)
self.assertIn("exit()", history)
self.assertIn("spam", history)
self.assertIn("time", history)
self.assertIn("import time", history)
self.assertNotIn("sleep", history)
self.assertNotIn("preved", history)
self.assertNotIn("quit", history)
def test_keyboard_interrupt_after_isearch(self):
output, exit_code = self.run_repl(["\x12", "\x03", "exit"])
output, exit_code = self.run_repl("\x12\x03exit\n")
self.assertEqual(exit_code, 0)
def test_prompt_after_help(self):

View File

@ -1,14 +1,21 @@
import itertools
import functools
import rlcompleter
from textwrap import dedent
from unittest import TestCase
from unittest.mock import MagicMock
from .support import handle_all_events, handle_events_narrow_console
from .support import ScreenEqualMixin, code_to_events
from .support import prepare_reader, prepare_console
from .support import prepare_console, reader_force_colors
from .support import reader_no_colors as prepare_reader
from _pyrepl.console import Event
from _pyrepl.reader import Reader
from _colorize import theme
overrides = {"RESET": "z", "SOFT_KEYWORD": "K"}
colors = {overrides.get(k, k[0].lower()): v for k, v in theme.items()}
class TestReader(ScreenEqualMixin, TestCase):
@ -123,8 +130,9 @@ class TestReader(ScreenEqualMixin, TestCase):
def test_control_characters(self):
code = 'flag = "🏳️‍🌈"'
events = code_to_events(code)
reader, _ = handle_all_events(events)
reader, _ = handle_all_events(events, prepare_reader=reader_force_colors)
self.assert_screen_equal(reader, 'flag = "🏳️\\u200d🌈"', clean=True)
self.assert_screen_equal(reader, 'flag {o}={z} {s}"🏳️\\u200d🌈"{z}'.format(**colors))
def test_setpos_from_xy_multiple_lines(self):
# fmt: off
@ -355,3 +363,140 @@ class TestReader(ScreenEqualMixin, TestCase):
reader, _ = handle_all_events(events)
reader.setpos_from_xy(8, 0)
self.assertEqual(reader.pos, 7)
def test_syntax_highlighting_basic(self):
code = dedent(
"""\
import re, sys
def funct(case: str = sys.platform) -> None:
match = re.search(
"(me)",
'''
Come on
Come on now
You know that it's time to emerge
''',
)
match case:
case "emscripten": print("on the web")
case "ios" | "android": print("on the phone")
case _: print('arms around', match.group(1))
"""
)
expected = dedent(
"""\
{k}import{z} re{o},{z} sys
{a}{k}def{z} {d}funct{z}{o}({z}case{o}:{z} {b}str{z} {o}={z} sys{o}.{z}platform{o}){z} {o}->{z} {k}None{z}{o}:{z}
match {o}={z} re{o}.{z}search{o}({z}
{s}"(me)"{z}{o},{z}
{s}'''{z}
{s} Come on{z}
{s} Come on now{z}
{s} You know that it's time to emerge{z}
{s} '''{z}{o},{z}
{o}){z}
{K}match{z} case{o}:{z}
{K}case{z} {s}"emscripten"{z}{o}:{z} {b}print{z}{o}({z}{s}"on the web"{z}{o}){z}
{K}case{z} {s}"ios"{z} {o}|{z} {s}"android"{z}{o}:{z} {b}print{z}{o}({z}{s}"on the phone"{z}{o}){z}
{K}case{z} {K}_{z}{o}:{z} {b}print{z}{o}({z}{s}'arms around'{z}{o},{z} match{o}.{z}group{o}({z}{n}1{z}{o}){z}{o}){z}
"""
)
expected_sync = expected.format(a="", **colors)
events = code_to_events(code)
reader, _ = handle_all_events(events, prepare_reader=reader_force_colors)
self.assert_screen_equal(reader, code, clean=True)
self.assert_screen_equal(reader, expected_sync)
self.assertEqual(reader.pos, 2**7 + 2**8)
self.assertEqual(reader.cxy, (0, 14))
async_msg = "{k}async{z} ".format(**colors)
expected_async = expected.format(a=async_msg, **colors)
more_events = itertools.chain(
code_to_events(code),
[Event(evt="key", data="up", raw=bytearray(b"\x1bOA"))] * 13,
code_to_events("async "),
)
reader, _ = handle_all_events(more_events, prepare_reader=reader_force_colors)
self.assert_screen_equal(reader, expected_async)
self.assertEqual(reader.pos, 21)
self.assertEqual(reader.cxy, (6, 1))
def test_syntax_highlighting_incomplete_string_first_line(self):
code = dedent(
"""\
def unfinished_function(arg: str = "still typing
"""
)
expected = dedent(
"""\
{k}def{z} {d}unfinished_function{z}{o}({z}arg{o}:{z} {b}str{z} {o}={z} {s}"still typing{z}
"""
).format(**colors)
events = code_to_events(code)
reader, _ = handle_all_events(events, prepare_reader=reader_force_colors)
self.assert_screen_equal(reader, code, clean=True)
self.assert_screen_equal(reader, expected)
def test_syntax_highlighting_incomplete_string_another_line(self):
code = dedent(
"""\
def unfinished_function(
arg: str = "still typing
"""
)
expected = dedent(
"""\
{k}def{z} {d}unfinished_function{z}{o}({z}
arg{o}:{z} {b}str{z} {o}={z} {s}"still typing{z}
"""
).format(**colors)
events = code_to_events(code)
reader, _ = handle_all_events(events, prepare_reader=reader_force_colors)
self.assert_screen_equal(reader, code, clean=True)
self.assert_screen_equal(reader, expected)
def test_syntax_highlighting_incomplete_multiline_string(self):
code = dedent(
"""\
def unfinished_function():
'''Still writing
the docstring
"""
)
expected = dedent(
"""\
{k}def{z} {d}unfinished_function{z}{o}({z}{o}){z}{o}:{z}
{s}'''Still writing{z}
{s} the docstring{z}
"""
).format(**colors)
events = code_to_events(code)
reader, _ = handle_all_events(events, prepare_reader=reader_force_colors)
self.assert_screen_equal(reader, code, clean=True)
self.assert_screen_equal(reader, expected)
def test_syntax_highlighting_incomplete_fstring(self):
code = dedent(
"""\
def unfinished_function():
var = f"Single-quote but {
1
+
1
} multi-line!
"""
)
expected = dedent(
"""\
{k}def{z} {d}unfinished_function{z}{o}({z}{o}){z}{o}:{z}
var {o}={z} {s}f"{z}{s}Single-quote but {z}{o}{OB}{z}
{n}1{z}
{o}+{z}
{n}1{z}
{o}{CB}{z}{s} multi-line!{z}
"""
).format(OB="{", CB="}", **colors)
events = code_to_events(code)
reader, _ = handle_all_events(events, prepare_reader=reader_force_colors)
self.assert_screen_equal(reader, code, clean=True)
self.assert_screen_equal(reader, expected)

View File

@ -33,10 +33,12 @@ def unix_console(events, **kwargs):
handle_events_unix_console = partial(
handle_all_events,
prepare_console=partial(unix_console),
prepare_reader=reader_no_colors,
prepare_console=unix_console,
)
handle_events_narrow_unix_console = partial(
handle_all_events,
prepare_reader=reader_no_colors,
prepare_console=partial(unix_console, width=5),
)
handle_events_short_unix_console = partial(

View File

@ -1,6 +1,6 @@
from unittest import TestCase
from _pyrepl.utils import str_width, wlen
from _pyrepl.utils import str_width, wlen, prev_next_window
class TestUtils(TestCase):
@ -25,3 +25,38 @@ class TestUtils(TestCase):
self.assertEqual(wlen('hello'), 5)
self.assertEqual(wlen('hello' + '\x1a'), 7)
def test_prev_next_window(self):
def gen_normal():
yield 1
yield 2
yield 3
yield 4
pnw = prev_next_window(gen_normal())
self.assertEqual(next(pnw), (None, 1, 2))
self.assertEqual(next(pnw), (1, 2, 3))
self.assertEqual(next(pnw), (2, 3, 4))
self.assertEqual(next(pnw), (3, 4, None))
with self.assertRaises(StopIteration):
next(pnw)
def gen_short():
yield 1
pnw = prev_next_window(gen_short())
self.assertEqual(next(pnw), (None, 1, None))
with self.assertRaises(StopIteration):
next(pnw)
def gen_raise():
yield from gen_normal()
1/0
pnw = prev_next_window(gen_raise())
self.assertEqual(next(pnw), (None, 1, 2))
self.assertEqual(next(pnw), (1, 2, 3))
self.assertEqual(next(pnw), (2, 3, 4))
self.assertEqual(next(pnw), (3, 4, None))
with self.assertRaises(ZeroDivisionError):
next(pnw)

View File

@ -12,6 +12,7 @@ from unittest import TestCase
from unittest.mock import MagicMock, call
from .support import handle_all_events, code_to_events
from .support import reader_no_colors as default_prepare_reader
try:
from _pyrepl.console import Event, Console
@ -47,14 +48,22 @@ class WindowsConsoleTests(TestCase):
setattr(console, key, val)
return console
def handle_events(self, events: Iterable[Event], **kwargs):
return handle_all_events(events, partial(self.console, **kwargs))
def handle_events(
self,
events: Iterable[Event],
prepare_console=None,
prepare_reader=None,
**kwargs,
):
prepare_console = prepare_console or partial(self.console, **kwargs)
prepare_reader = prepare_reader or default_prepare_reader
return handle_all_events(events, prepare_console, prepare_reader)
def handle_events_narrow(self, events):
return self.handle_events(events, width=5)
def handle_events_short(self, events):
return self.handle_events(events, height=1)
def handle_events_short(self, events, **kwargs):
return self.handle_events(events, height=1, **kwargs)
def handle_events_height_3(self, events):
return self.handle_events(events, height=3)

6
Lib/token.py generated
View File

@ -134,11 +134,11 @@ EXACT_TOKEN_TYPES = {
'~': TILDE,
}
def ISTERMINAL(x):
def ISTERMINAL(x: int) -> bool:
return x < NT_OFFSET
def ISNONTERMINAL(x):
def ISNONTERMINAL(x: int) -> bool:
return x >= NT_OFFSET
def ISEOF(x):
def ISEOF(x: int) -> bool:
return x == ENDMARKER

View File

@ -0,0 +1 @@
PyREPL now supports syntax highlighing. Contributed by Łukasz Langa.

1
Misc/mypy/token.py Symbolic link
View File

@ -0,0 +1 @@
../../Lib/token.py

View File

@ -2,4 +2,5 @@
_colorize.py
_pyrepl
token.py
tomllib

View File

@ -278,13 +278,13 @@ EXACT_TOKEN_TYPES = {
%s
}
def ISTERMINAL(x):
def ISTERMINAL(x: int) -> bool:
return x < NT_OFFSET
def ISNONTERMINAL(x):
def ISNONTERMINAL(x: int) -> bool:
return x >= NT_OFFSET
def ISEOF(x):
def ISEOF(x: int) -> bool:
return x == ENDMARKER
'''