Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 59 additions & 45 deletions cmd2/cmd2.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,15 @@
import rich.box
from prompt_toolkit import print_formatted_text
from prompt_toolkit.application import get_app
from prompt_toolkit.auto_suggest import AutoSuggestFromHistory
from prompt_toolkit.completion import Completer, DummyCompleter
from prompt_toolkit.formatted_text import ANSI, FormattedText
from prompt_toolkit.history import InMemoryHistory
from prompt_toolkit.input import DummyInput, create_input
from prompt_toolkit.key_binding import KeyBindings
from prompt_toolkit.output import DummyOutput, create_output
from prompt_toolkit.patch_stdout import patch_stdout
from prompt_toolkit.shortcuts import CompleteStyle, PromptSession, set_title
from rich.console import (
Group,
RenderableType,
Expand Down Expand Up @@ -158,16 +167,6 @@
with contextlib.suppress(ImportError):
from IPython import start_ipython

from prompt_toolkit.auto_suggest import AutoSuggestFromHistory
from prompt_toolkit.completion import Completer, DummyCompleter
from prompt_toolkit.formatted_text import ANSI, FormattedText
from prompt_toolkit.history import InMemoryHistory
from prompt_toolkit.input import DummyInput, create_input
from prompt_toolkit.key_binding import KeyBindings
from prompt_toolkit.output import DummyOutput, create_output
from prompt_toolkit.patch_stdout import patch_stdout
from prompt_toolkit.shortcuts import CompleteStyle, PromptSession, set_title

try:
if sys.platform == "win32":
from prompt_toolkit.output.win32 import NoConsoleScreenBufferError # type: ignore[attr-defined]
Expand Down Expand Up @@ -413,9 +412,6 @@ def __init__(
else:
self.stdout = sys.stdout

# Key used for completion
self.completekey = completekey

# Attributes which should NOT be dynamically settable via the set command at runtime
self.default_to_shell = False # Attempt to run unrecognized commands as shell commands
self.allow_redirection = allow_redirection # Security setting to prevent redirection of stdout
Expand Down Expand Up @@ -468,17 +464,14 @@ def __init__(
self._persistent_history_length = persistent_history_length
self._initialize_history(persistent_history_file)

# Initialize prompt-toolkit PromptSession
self.history_adapter = Cmd2History(self)
self.completer = Cmd2Completer(self)
self.lexer = Cmd2Lexer(self)
# Create the main PromptSession
self.bottom_toolbar = bottom_toolbar
self.main_session = self._create_main_session(auto_suggest, completekey)

self.auto_suggest = None
if auto_suggest:
self.auto_suggest = AutoSuggestFromHistory()

self.session = self._init_session()
# The session currently holding focus (either the main REPL or a command's
# custom prompt). Completion and UI logic should reference this variable
# to ensure they modify the correct session state.
self.active_session = self.main_session

# Commands to exclude from the history command
self.exclude_from_history = ['_eof', 'history']
Expand Down Expand Up @@ -651,18 +644,18 @@ def __init__(
# the current command being executed
self.current_command: Statement | None = None

def _init_session(self) -> PromptSession[str]:
"""Initialize and return the core PromptSession for the application.
def _create_main_session(self, auto_suggest: bool, completekey: str) -> PromptSession[str]:
"""Create and return the main PromptSession for the application.

Builds an interactive session if stdin is a TTY. Otherwise, uses
dummy drivers to support non-interactive streams like pipes or files.
"""
key_bindings = None
if self.completekey != self.DEFAULT_COMPLETEKEY:
if completekey != self.DEFAULT_COMPLETEKEY:
# Configure prompt_toolkit `KeyBindings` with the custom key for completion
key_bindings = KeyBindings()

@key_bindings.add(self.completekey)
@key_bindings.add(completekey)
def _(event: Any) -> None: # pragma: no cover
"""Trigger completion."""
b = event.current_buffer
Expand All @@ -673,15 +666,15 @@ def _(event: Any) -> None: # pragma: no cover

# Base configuration
kwargs: dict[str, Any] = {
"auto_suggest": self.auto_suggest,
"auto_suggest": AutoSuggestFromHistory() if auto_suggest else None,
"bottom_toolbar": self.get_bottom_toolbar if self.bottom_toolbar else None,
"complete_style": CompleteStyle.MULTI_COLUMN,
"complete_in_thread": True,
"complete_while_typing": False,
"completer": self.completer,
"history": self.history_adapter,
"completer": Cmd2Completer(self),
"history": Cmd2History(self),
"key_bindings": key_bindings,
"lexer": self.lexer,
"lexer": Cmd2Lexer(self),
"rprompt": self.get_rprompt,
}

Expand Down Expand Up @@ -2448,9 +2441,9 @@ def complete(

# Swap between COLUMN and MULTI_COLUMN style based on the number of matches.
if len(completions) > self.max_column_completion_results:
self.session.complete_style = CompleteStyle.MULTI_COLUMN
self.active_session.complete_style = CompleteStyle.MULTI_COLUMN
else:
self.session.complete_style = CompleteStyle.COLUMN
self.active_session.complete_style = CompleteStyle.COLUMN

return completions # noqa: TRY300

Expand Down Expand Up @@ -3227,11 +3220,23 @@ def completedefault(self, *_ignored: Sequence[str]) -> Completions:
def _suggest_similar_command(self, command: str) -> str | None:
return suggest_similar(command, self.get_visible_commands())

@staticmethod
def _is_tty_session(session: PromptSession[str]) -> bool:
"""Determine if the session supports full terminal interactions.

Returns True if the session is attached to a real TTY or a virtual
terminal (like PipeInput in tests). Returns False if the session is
running in a headless environment (DummyInput).
"""
# Validate against the session's assigned input driver rather than sys.stdin.
# This respects the fallback logic in _create_main_session() and allows unit
# tests to inject PipeInput for programmatic interaction.
return not isinstance(session.input, DummyInput)

def _read_raw_input(
self,
prompt: Callable[[], ANSI | str] | ANSI | str,
session: PromptSession[str],
completer: Completer,
**prompt_kwargs: Any,
) -> str:
"""Execute the low-level input read from either a terminal or a redirected stream.
Expand All @@ -3242,17 +3247,23 @@ def _read_raw_input(

:param prompt: the prompt text or a callable that returns the prompt.
:param session: the PromptSession instance to use for reading.
:param completer: the completer to use for this specific input.
:param prompt_kwargs: additional arguments passed directly to session.prompt().
:return: the stripped input string.
:raises EOFError: if the input stream is closed or the user signals EOF (e.g., Ctrl+D)
"""
# Check if the session is configured for interactive terminal use.
if not isinstance(session.input, DummyInput):
if self._is_tty_session(session):
if not callable(prompt):
prompt = pt_filter_style(prompt)

with patch_stdout():
if not callable(prompt):
prompt = pt_filter_style(prompt)
return session.prompt(prompt, completer=completer, **prompt_kwargs)
try:
# Set this session as the active one for UI/completion logic.
self.active_session = session
return session.prompt(prompt, **prompt_kwargs)
finally:
# Revert back to the main session.
self.active_session = self.main_session

# We're not at a terminal, so we're likely reading from a file or a pipe.
prompt_obj = prompt() if callable(prompt) else prompt
Expand Down Expand Up @@ -3350,14 +3361,18 @@ def read_input(
)

temp_session: PromptSession[str] = PromptSession(
complete_style=self.session.complete_style,
complete_while_typing=self.session.complete_while_typing,
auto_suggest=self.main_session.auto_suggest,
complete_style=self.main_session.complete_style,
complete_in_thread=self.main_session.complete_in_thread,
complete_while_typing=self.main_session.complete_while_typing,
completer=completer_to_use,
history=InMemoryHistory(history) if history is not None else InMemoryHistory(),
input=self.session.input,
output=self.session.output,
key_bindings=self.main_session.key_bindings,
input=self.main_session.input,
output=self.main_session.output,
)

return self._read_raw_input(prompt, temp_session, completer_to_use)
return self._read_raw_input(prompt, temp_session)

def _process_alerts(self) -> None:
"""Background worker that processes queued alerts and dynamic prompt updates."""
Expand Down Expand Up @@ -3452,8 +3467,7 @@ def _pre_prompt() -> None:
try:
return self._read_raw_input(
prompt=prompt_to_use,
session=self.session,
completer=self.completer,
session=self.main_session,
pre_run=_pre_prompt,
)
finally:
Expand Down
6 changes: 2 additions & 4 deletions cmd2/pt_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,7 @@ def get_completions(self, document: Document, _complete_event: object) -> Iterab

# Define delimiters for completion to match cmd2/readline behavior
delimiters = BASE_DELIMITERS
if hasattr(self.cmd_app, 'statement_parser'):
delimiters += "".join(self.cmd_app.statement_parser.terminators)
delimiters += "".join(self.cmd_app.statement_parser.terminators)

# Find last delimiter before cursor to determine the word being completed
begidx = 0
Expand Down Expand Up @@ -275,8 +274,7 @@ def get_line(lineno: int) -> list[tuple[str, str]]:

# Get redirection tokens and terminators to avoid highlighting them as values
exclude_tokens = set(constants.REDIRECTION_TOKENS)
if hasattr(self.cmd_app, 'statement_parser'):
exclude_tokens.update(self.cmd_app.statement_parser.terminators)
exclude_tokens.update(self.cmd_app.statement_parser.terminators)

for m in arg_pattern.finditer(rest):
space, flag, quoted, word = m.groups()
Expand Down
6 changes: 3 additions & 3 deletions examples/async_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,11 @@ def __init__(self) -> None:
super().__init__()
self.intro = 'Welcome to the Async Commands example. Type "help" to see available commands.'

if self.session.key_bindings is None:
self.session.key_bindings = KeyBindings()
if self.main_session.key_bindings is None:
self.main_session.key_bindings = KeyBindings()

# Add a custom key binding for <CTRL>+T that calls a method so it has access to self
@self.session.key_bindings.add('c-t')
@self.main_session.key_bindings.add('c-t')
def _(_event: Any) -> None:
self.handle_control_t(_event)

Expand Down
Loading
Loading