gh-133390: Support SQL keyword completion for sqlite3 CLI (GH-133393) (GH-135292)
Co-authored-by: Tan Long <tanloong@foxmail.com>
This commit is contained in:
parent
e6c3039cb3
commit
e7a3c20b92
@ -134,6 +134,13 @@ shelve
|
|||||||
(Contributed by Andrea Oliveri in :gh:`134004`.)
|
(Contributed by Andrea Oliveri in :gh:`134004`.)
|
||||||
|
|
||||||
|
|
||||||
|
sqlite3
|
||||||
|
-------
|
||||||
|
|
||||||
|
* Support SQL keyword completion in the :mod:`sqlite3` command-line interface.
|
||||||
|
(Contributed by Long Tan in :gh:`133393`.)
|
||||||
|
|
||||||
|
|
||||||
ssl
|
ssl
|
||||||
---
|
---
|
||||||
|
|
||||||
|
@ -12,6 +12,8 @@ from code import InteractiveConsole
|
|||||||
from textwrap import dedent
|
from textwrap import dedent
|
||||||
from _colorize import get_theme, theme_no_color
|
from _colorize import get_theme, theme_no_color
|
||||||
|
|
||||||
|
from ._completer import completer
|
||||||
|
|
||||||
|
|
||||||
def execute(c, sql, suppress_errors=True, theme=theme_no_color):
|
def execute(c, sql, suppress_errors=True, theme=theme_no_color):
|
||||||
"""Helper that wraps execution of SQL code.
|
"""Helper that wraps execution of SQL code.
|
||||||
@ -136,12 +138,9 @@ def main(*args):
|
|||||||
execute(con, args.sql, suppress_errors=False, theme=theme)
|
execute(con, args.sql, suppress_errors=False, theme=theme)
|
||||||
else:
|
else:
|
||||||
# No SQL provided; start the REPL.
|
# No SQL provided; start the REPL.
|
||||||
console = SqliteInteractiveConsole(con, use_color=True)
|
with completer():
|
||||||
try:
|
console = SqliteInteractiveConsole(con, use_color=True)
|
||||||
import readline # noqa: F401
|
console.interact(banner, exitmsg="")
|
||||||
except ImportError:
|
|
||||||
pass
|
|
||||||
console.interact(banner, exitmsg="")
|
|
||||||
finally:
|
finally:
|
||||||
con.close()
|
con.close()
|
||||||
|
|
||||||
|
42
Lib/sqlite3/_completer.py
Normal file
42
Lib/sqlite3/_completer.py
Normal file
@ -0,0 +1,42 @@
|
|||||||
|
from contextlib import contextmanager
|
||||||
|
|
||||||
|
try:
|
||||||
|
from _sqlite3 import SQLITE_KEYWORDS
|
||||||
|
except ImportError:
|
||||||
|
SQLITE_KEYWORDS = ()
|
||||||
|
|
||||||
|
_completion_matches = []
|
||||||
|
|
||||||
|
|
||||||
|
def _complete(text, state):
|
||||||
|
global _completion_matches
|
||||||
|
|
||||||
|
if state == 0:
|
||||||
|
text_upper = text.upper()
|
||||||
|
_completion_matches = [c for c in SQLITE_KEYWORDS if c.startswith(text_upper)]
|
||||||
|
try:
|
||||||
|
return _completion_matches[state] + " "
|
||||||
|
except IndexError:
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
@contextmanager
|
||||||
|
def completer():
|
||||||
|
try:
|
||||||
|
import readline
|
||||||
|
except ImportError:
|
||||||
|
yield
|
||||||
|
return
|
||||||
|
|
||||||
|
old_completer = readline.get_completer()
|
||||||
|
try:
|
||||||
|
readline.set_completer(_complete)
|
||||||
|
if readline.backend == "editline":
|
||||||
|
# libedit uses "^I" instead of "tab"
|
||||||
|
command_string = "bind ^I rl_complete"
|
||||||
|
else:
|
||||||
|
command_string = "tab: complete"
|
||||||
|
readline.parse_and_bind(command_string)
|
||||||
|
yield
|
||||||
|
finally:
|
||||||
|
readline.set_completer(old_completer)
|
@ -1,14 +1,22 @@
|
|||||||
"""sqlite3 CLI tests."""
|
"""sqlite3 CLI tests."""
|
||||||
import sqlite3
|
import sqlite3
|
||||||
|
import sys
|
||||||
|
import textwrap
|
||||||
import unittest
|
import unittest
|
||||||
|
import unittest.mock
|
||||||
|
import os
|
||||||
|
|
||||||
from sqlite3.__main__ import main as cli
|
from sqlite3.__main__ import main as cli
|
||||||
|
from test.support.import_helper import import_module
|
||||||
from test.support.os_helper import TESTFN, unlink
|
from test.support.os_helper import TESTFN, unlink
|
||||||
|
from test.support.pty_helper import run_pty
|
||||||
from test.support import (
|
from test.support import (
|
||||||
captured_stdout,
|
captured_stdout,
|
||||||
captured_stderr,
|
captured_stderr,
|
||||||
captured_stdin,
|
captured_stdin,
|
||||||
force_not_colorized_test_class,
|
force_not_colorized_test_class,
|
||||||
|
requires_subprocess,
|
||||||
|
verbose,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@ -200,5 +208,108 @@ class InteractiveSession(unittest.TestCase):
|
|||||||
self.assertIn('\x1b[1;35mOperationalError (SQLITE_ERROR)\x1b[0m: '
|
self.assertIn('\x1b[1;35mOperationalError (SQLITE_ERROR)\x1b[0m: '
|
||||||
'\x1b[35mnear "sel": syntax error\x1b[0m', err)
|
'\x1b[35mnear "sel": syntax error\x1b[0m', err)
|
||||||
|
|
||||||
|
|
||||||
|
@requires_subprocess()
|
||||||
|
@force_not_colorized_test_class
|
||||||
|
class Completion(unittest.TestCase):
|
||||||
|
PS1 = "sqlite> "
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def setUpClass(cls):
|
||||||
|
_sqlite3 = import_module("_sqlite3")
|
||||||
|
if not hasattr(_sqlite3, "SQLITE_KEYWORDS"):
|
||||||
|
raise unittest.SkipTest("unable to determine SQLite keywords")
|
||||||
|
|
||||||
|
readline = import_module("readline")
|
||||||
|
if readline.backend == "editline":
|
||||||
|
raise unittest.SkipTest("libedit readline is not supported")
|
||||||
|
|
||||||
|
def write_input(self, input_, env=None):
|
||||||
|
script = textwrap.dedent("""
|
||||||
|
import readline
|
||||||
|
from sqlite3.__main__ import main
|
||||||
|
|
||||||
|
readline.parse_and_bind("set colored-completion-prefix off")
|
||||||
|
main()
|
||||||
|
""")
|
||||||
|
return run_pty(script, input_, env)
|
||||||
|
|
||||||
|
def test_complete_sql_keywords(self):
|
||||||
|
# List candidates starting with 'S', there should be multiple matches.
|
||||||
|
input_ = b"S\t\tEL\t 1;\n.quit\n"
|
||||||
|
output = self.write_input(input_)
|
||||||
|
self.assertIn(b"SELECT", output)
|
||||||
|
self.assertIn(b"SET", output)
|
||||||
|
self.assertIn(b"SAVEPOINT", output)
|
||||||
|
self.assertIn(b"(1,)", output)
|
||||||
|
|
||||||
|
# Keywords are completed in upper case for even lower case user input.
|
||||||
|
input_ = b"sel\t\t 1;\n.quit\n"
|
||||||
|
output = self.write_input(input_)
|
||||||
|
self.assertIn(b"SELECT", output)
|
||||||
|
self.assertIn(b"(1,)", output)
|
||||||
|
|
||||||
|
@unittest.skipIf(sys.platform.startswith("freebsd"),
|
||||||
|
"Two actual tabs are inserted when there are no matching"
|
||||||
|
" completions in the pseudo-terminal opened by run_pty()"
|
||||||
|
" on FreeBSD")
|
||||||
|
def test_complete_no_match(self):
|
||||||
|
input_ = b"xyzzy\t\t\b\b\b\b\b\b\b.quit\n"
|
||||||
|
# Set NO_COLOR to disable coloring for self.PS1.
|
||||||
|
output = self.write_input(input_, env={**os.environ, "NO_COLOR": "1"})
|
||||||
|
lines = output.decode().splitlines()
|
||||||
|
indices = (
|
||||||
|
i for i, line in enumerate(lines, 1)
|
||||||
|
if line.startswith(f"{self.PS1}xyzzy")
|
||||||
|
)
|
||||||
|
line_num = next(indices, -1)
|
||||||
|
self.assertNotEqual(line_num, -1)
|
||||||
|
# Completions occupy lines, assert no extra lines when there is nothing
|
||||||
|
# to complete.
|
||||||
|
self.assertEqual(line_num, len(lines))
|
||||||
|
|
||||||
|
def test_complete_no_input(self):
|
||||||
|
from _sqlite3 import SQLITE_KEYWORDS
|
||||||
|
|
||||||
|
script = textwrap.dedent("""
|
||||||
|
import readline
|
||||||
|
from sqlite3.__main__ import main
|
||||||
|
|
||||||
|
# Configure readline to ...:
|
||||||
|
# - hide control sequences surrounding each candidate
|
||||||
|
# - hide "Display all xxx possibilities? (y or n)"
|
||||||
|
# - hide "--More--"
|
||||||
|
# - show candidates one per line
|
||||||
|
readline.parse_and_bind("set colored-completion-prefix off")
|
||||||
|
readline.parse_and_bind("set colored-stats off")
|
||||||
|
readline.parse_and_bind("set completion-query-items 0")
|
||||||
|
readline.parse_and_bind("set page-completions off")
|
||||||
|
readline.parse_and_bind("set completion-display-width 0")
|
||||||
|
readline.parse_and_bind("set show-all-if-ambiguous off")
|
||||||
|
readline.parse_and_bind("set show-all-if-unmodified off")
|
||||||
|
|
||||||
|
main()
|
||||||
|
""")
|
||||||
|
input_ = b"\t\t.quit\n"
|
||||||
|
output = run_pty(script, input_, env={**os.environ, "NO_COLOR": "1"})
|
||||||
|
try:
|
||||||
|
lines = output.decode().splitlines()
|
||||||
|
indices = [
|
||||||
|
i for i, line in enumerate(lines)
|
||||||
|
if line.startswith(self.PS1)
|
||||||
|
]
|
||||||
|
self.assertEqual(len(indices), 2)
|
||||||
|
start, end = indices
|
||||||
|
candidates = [l.strip() for l in lines[start+1:end]]
|
||||||
|
self.assertEqual(candidates, sorted(SQLITE_KEYWORDS))
|
||||||
|
except:
|
||||||
|
if verbose:
|
||||||
|
print(' PTY output: '.center(30, '-'))
|
||||||
|
print(output.decode(errors='replace'))
|
||||||
|
print(' end PTY output '.center(30, '-'))
|
||||||
|
raise
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
@ -1869,6 +1869,7 @@ Neil Tallim
|
|||||||
Geoff Talvola
|
Geoff Talvola
|
||||||
Anish Tambe
|
Anish Tambe
|
||||||
Musashi Tamura
|
Musashi Tamura
|
||||||
|
Long Tan
|
||||||
William Tanksley
|
William Tanksley
|
||||||
Christian Tanzer
|
Christian Tanzer
|
||||||
Steven Taschuk
|
Steven Taschuk
|
||||||
|
@ -0,0 +1 @@
|
|||||||
|
Support keyword completion in the :mod:`sqlite3` command-line interface.
|
@ -32,6 +32,7 @@
|
|||||||
#include "microprotocols.h"
|
#include "microprotocols.h"
|
||||||
#include "row.h"
|
#include "row.h"
|
||||||
#include "blob.h"
|
#include "blob.h"
|
||||||
|
#include "util.h"
|
||||||
|
|
||||||
#if SQLITE_VERSION_NUMBER < 3015002
|
#if SQLITE_VERSION_NUMBER < 3015002
|
||||||
#error "SQLite 3.15.2 or higher required"
|
#error "SQLite 3.15.2 or higher required"
|
||||||
@ -404,6 +405,40 @@ pysqlite_error_name(int rc)
|
|||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int
|
||||||
|
add_keyword_tuple(PyObject *module)
|
||||||
|
{
|
||||||
|
#if SQLITE_VERSION_NUMBER >= 3024000
|
||||||
|
int count = sqlite3_keyword_count();
|
||||||
|
PyObject *keywords = PyTuple_New(count);
|
||||||
|
if (keywords == NULL) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
for (int i = 0; i < count; i++) {
|
||||||
|
const char *keyword;
|
||||||
|
int size;
|
||||||
|
int result = sqlite3_keyword_name(i, &keyword, &size);
|
||||||
|
if (result != SQLITE_OK) {
|
||||||
|
pysqlite_state *state = pysqlite_get_state(module);
|
||||||
|
set_error_from_code(state, result);
|
||||||
|
goto error;
|
||||||
|
}
|
||||||
|
PyObject *kwd = PyUnicode_FromStringAndSize(keyword, size);
|
||||||
|
if (!kwd) {
|
||||||
|
goto error;
|
||||||
|
}
|
||||||
|
PyTuple_SET_ITEM(keywords, i, kwd);
|
||||||
|
}
|
||||||
|
return PyModule_Add(module, "SQLITE_KEYWORDS", keywords);
|
||||||
|
|
||||||
|
error:
|
||||||
|
Py_DECREF(keywords);
|
||||||
|
return -1;
|
||||||
|
#else
|
||||||
|
return 0;
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
|
||||||
static int
|
static int
|
||||||
add_integer_constants(PyObject *module) {
|
add_integer_constants(PyObject *module) {
|
||||||
#define ADD_INT(ival) \
|
#define ADD_INT(ival) \
|
||||||
@ -702,6 +737,10 @@ module_exec(PyObject *module)
|
|||||||
goto error;
|
goto error;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (add_keyword_tuple(module) < 0) {
|
||||||
|
goto error;
|
||||||
|
}
|
||||||
|
|
||||||
if (PyModule_AddStringConstant(module, "sqlite_version", sqlite3_libversion())) {
|
if (PyModule_AddStringConstant(module, "sqlite_version", sqlite3_libversion())) {
|
||||||
goto error;
|
goto error;
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user