import copy
import os
import re
from .core import Argument
from .core import MultiCommand
from .core import Option
from .parser import split_arg_string
from .types import Choice
from .utils import echo
from collections import abc
except ImportError:
import collections as abc
# Note, only BASH version 4.4 and later have the nosort option.
%(complete_func)s() {
local IFS=$'\n'
%(autocomplete_var)s=complete $1 ) )
return 0
%(complete_func)setup() {
# Only BASH version 4.4 and later have the nosort option.
if [ ${BASH_VERSION_ARR[0]} -gt 4 ] || ([ ${BASH_VERSION_ARR[0]} -eq 4 ] \
&& [ ${BASH_VERSION_ARR[1]} -ge 4 ]); then
complete $COMPLETION_OPTIONS -F %(complete_func)s %(script_names)s
#compdef %(script_names)s
%(complete_func)s() {
local -a completions
local -a completions_with_descriptions
local -a response
(( ! $+commands[%(script_names)s] )) && return 1
response=("${(@f)$( env COMP_WORDS=\"${words[*]}\" \\
%(autocomplete_var)s=\"complete_zsh\" \\
%(script_names)s )}")
for key descr in ${(kv)response}; do
if [[ "$descr" == "_" ]]; then
if [ -n "$completions_with_descriptions" ]; then
_describe -V unsorted completions_with_descriptions -U
if [ -n "$completions" ]; then
compadd -U -V unsorted -a completions
compdef %(complete_func)s %(script_names)s
"complete --no-files --command %(script_names)s --arguments"
' "(env %(autocomplete_var)s=complete_fish'
" COMP_WORDS=(commandline -cp) COMP_CWORD=(commandline -t)"
' %(script_names)s)"'
_completion_scripts = {
_invalid_ident_char_re = re.compile(r"[^a-zA-Z0-9_]")
def get_completion_script(prog_name, complete_var, shell):
cf_name = _invalid_ident_char_re.sub("", prog_name.replace("-", "_"))
script = _completion_scripts.get(shell, COMPLETION_SCRIPT_BASH)
return (
% {
"complete_func": "_{}_completion".format(cf_name),
"script_names": prog_name,
"autocomplete_var": complete_var,
).strip() + ";"
def resolve_ctx(cli, prog_name, args):
"""Parse into a hierarchy of contexts. Contexts are connected
through the parent variable.
:param cli: command definition
:param prog_name: the program that is running
:param args: full list of args
:return: the final context/command parsed
ctx = cli.make_context(prog_name, args, resilient_parsing=True)
args = ctx.protected_args + ctx.args
while args:
if isinstance(ctx.command, MultiCommand):
if not ctx.command.chain:
cmd_name, cmd, args = ctx.command.resolve_command(ctx, args)
if cmd is None:
return ctx
ctx = cmd.make_context(
cmd_name, args, parent=ctx, resilient_parsing=True
args = ctx.protected_args + ctx.args
# Walk chained subcommand contexts saving the last one.
while args:
cmd_name, cmd, args = ctx.command.resolve_command(ctx, args)
if cmd is None:
return ctx
sub_ctx = cmd.make_context(
args = sub_ctx.args
ctx = sub_ctx
args = sub_ctx.protected_args + sub_ctx.args
return ctx
def start_of_option(param_str):
:param param_str: param_str to check
:return: whether or not this is the start of an option declaration
(i.e. starts "-" or "--")
return param_str and param_str[:1] == "-"
def is_incomplete_option(all_args, cmd_param):
:param all_args: the full original list of args supplied
:param cmd_param: the current command paramter
:return: whether or not the last option declaration (i.e. starts
"-" or "--") is incomplete and corresponds to this cmd_param. In
other words whether this cmd_param option can still accept
if not isinstance(cmd_param, Option):
return False
if cmd_param.is_flag:
return False
last_option = None
for index, arg_str in enumerate(
reversed([arg for arg in all_args if arg != WORDBREAK])
if index + 1 > cmd_param.nargs:
if start_of_option(arg_str):
last_option = arg_str
return True if last_option and last_option in cmd_param.opts else False
def is_incomplete_argument(current_params, cmd_param):
:param current_params: the current params and values for this
argument as already entered
:param cmd_param: the current command parameter
:return: whether or not the last argument is incomplete and
corresponds to this cmd_param. In other words whether or not the
this cmd_param argument can still accept values
if not isinstance(cmd_param, Argument):
return False
current_param_values = current_params[]
if current_param_values is None:
return True
if cmd_param.nargs == -1:
return True
if (
isinstance(current_param_values, abc.Iterable)
and cmd_param.nargs > 1
and len(current_param_values) < cmd_param.nargs
return True
return False
def get_user_autocompletions(ctx, args, incomplete, cmd_param):
:param ctx: context associated with the parsed command
:param args: full list of args
:param incomplete: the incomplete text to autocomplete
:param cmd_param: command definition
:return: all the possible user-specified completions for the param
results = []
if isinstance(cmd_param.type, Choice):
# Choices don't support descriptions.
results = [
(c, None) for c in cmd_param.type.choices if str(c).startswith(incomplete)
elif cmd_param.autocompletion is not None:
dynamic_completions = cmd_param.autocompletion(
ctx=ctx, args=args, incomplete=incomplete
results = [
c if isinstance(c, tuple) else (c, None) for c in dynamic_completions
return results
def get_visible_commands_starting_with(ctx, starts_with):
:param ctx: context associated with the parsed command
:starts_with: string that visible commands must start with.
:return: all visible (not hidden) commands that start with starts_with.
for c in ctx.command.list_commands(ctx):
if c.startswith(starts_with):
command = ctx.command.get_command(ctx, c)
if not command.hidden:
yield command
def add_subcommand_completions(ctx, incomplete, completions_out):
# Add subcommand completions.
if isinstance(ctx.command, MultiCommand):
(, c.get_short_help_str())
for c in get_visible_commands_starting_with(ctx, incomplete)
# Walk up the context list and add any other completion
# possibilities from chained commands
while ctx.parent is not None:
ctx = ctx.parent
if isinstance(ctx.command, MultiCommand) and ctx.command.chain:
remaining_commands = [
for c in get_visible_commands_starting_with(ctx, incomplete)
if not in ctx.protected_args
[(, c.get_short_help_str()) for c in remaining_commands]
def get_choices(cli, prog_name, args, incomplete):
:param cli: command definition
:param prog_name: the program that is running
:param args: full list of args
:param incomplete: the incomplete text to autocomplete
:return: all the possible completions for the incomplete
all_args = copy.deepcopy(args)
ctx = resolve_ctx(cli, prog_name, args)
if ctx is None:
return []
has_double_dash = "--" in all_args
# In newer versions of bash long opts with '='s are partitioned, but
# it's easier to parse without the '='
if start_of_option(incomplete) and WORDBREAK in incomplete:
partition_incomplete = incomplete.partition(WORDBREAK)
incomplete = partition_incomplete[2]
elif incomplete == WORDBREAK:
incomplete = ""
completions = []
if not has_double_dash and start_of_option(incomplete):
# completions for partial options
for param in ctx.command.params:
if isinstance(param, Option) and not param.hidden:
param_opts = [
for param_opt in param.opts + param.secondary_opts
if param_opt not in all_args or param.multiple
[(o, for o in param_opts if o.startswith(incomplete)]
return completions
# completion for option values from user supplied values
for param in ctx.command.params:
if is_incomplete_option(all_args, param):
return get_user_autocompletions(ctx, all_args, incomplete, param)
# completion for argument values from user supplied values
for param in ctx.command.params:
if is_incomplete_argument(ctx.params, param):
return get_user_autocompletions(ctx, all_args, incomplete, param)
add_subcommand_completions(ctx, incomplete, completions)
# Sort before returning so that proper ordering can be enforced in custom types.
return sorted(completions)
def do_complete(cli, prog_name, include_descriptions):
cwords = split_arg_string(os.environ["COMP_WORDS"])
cword = int(os.environ["COMP_CWORD"])
args = cwords[1:cword]
incomplete = cwords[cword]
except IndexError:
incomplete = ""
for item in get_choices(cli, prog_name, args, incomplete):
if include_descriptions:
# ZSH has trouble dealing with empty array parameters when
# returned from commands, use '_' to indicate no description
# is present.
echo(item[1] if item[1] else "_")
return True
def do_complete_fish(cli, prog_name):
cwords = split_arg_string(os.environ["COMP_WORDS"])
incomplete = os.environ["COMP_CWORD"]
args = cwords[1:]
for item in get_choices(cli, prog_name, args, incomplete):
if item[1]:
echo("{arg}\t{desc}".format(arg=item[0], desc=item[1]))
return True
def bashcomplete(cli, prog_name, complete_var, complete_instr):
if "_" in complete_instr:
command, shell = complete_instr.split("_", 1)
command = complete_instr
shell = "bash"
if command == "source":
echo(get_completion_script(prog_name, complete_var, shell))
return True
elif command == "complete":
if shell == "fish":
return do_complete_fish(cli, prog_name)
elif shell in {"bash", "zsh"}:
return do_complete(cli, prog_name, shell == "zsh")
return False
import codecs
import io
import os
import re
import sys
from weakref import WeakKeyDictionary
PY2 = sys.version_info[0] == 2
CYGWIN = sys.platform.startswith("cygwin")
MSYS2 = sys.platform.startswith("win") and ("GCC" in sys.version)
# Determine local App Engine environment, per Google's own suggestion
APP_ENGINE = "APPENGINE_RUNTIME" in os.environ and "Development/" in os.environ.get(
WIN = sys.platform.startswith("win") and not APP_ENGINE and not MSYS2
_ansi_re = re.compile(r"\033\[[;?0-9]*[a-zA-Z]")
def get_filesystem_encoding():
return sys.getfilesystemencoding() or sys.getdefaultencoding()
def _make_text_stream(
stream, encoding, errors, force_readable=False, force_writable=False
if encoding is None:
encoding = get_best_encoding(stream)
if errors is None:
errors = "replace"
return _NonClosingTextIOWrapper(
def is_ascii_encoding(encoding):
"""Checks if a given encoding is ascii."""
return codecs.lookup(encoding).name == "ascii"
except LookupError:
return False
def get_best_encoding(stream):
"""Returns the default stream encoding if not found."""
rv = getattr(stream, "encoding", None) or sys.getdefaultencoding()
if is_ascii_encoding(rv):
return "utf-8"
return rv
class _NonClosingTextIOWrapper(io.TextIOWrapper):
def __init__(
self._stream = stream = _FixupStream(stream, force_readable, force_writable)
io.TextIOWrapper.__init__(self, stream, encoding, errors, **extra)
# The io module is a place where the Python 3 text behavior
# was forced upon Python 2, so we need to unbreak
# it to look like Python 2.
if PY2:
def write(self, x):
if isinstance(x, str) or is_bytes(x):
except Exception:
return self.buffer.write(str(x))
return io.TextIOWrapper.write(self, x)
def writelines(self, lines):
for line in lines:
def __del__(self):
except Exception:
def isatty(self):
return self._stream.isatty()
class _FixupStream(object):
"""The new io interface needs more from streams than streams
traditionally implement. As such, this fix-up code is necessary in
some circumstances.
The forcing of readable and writable flags are there because some tools
put badly patched objects on sys (one such offender are certain version
of jupyter notebook).
def __init__(self, stream, force_readable=False, force_writable=False):
self._stream = stream
self._force_readable = force_readable
self._force_writable = force_writable
def __getattr__(self, name):
return getattr(self._stream, name)
def read1(self, size):
f = getattr(self._stream, "read1", None)
if f is not None:
return f(size)
# We only dispatch to readline instead of read in Python 2 as we
# do not want cause problems with the different implementation
# of line buffering.
if PY2:
return self._stream.readline(size)
def readable(self):
if self._force_readable:
return True
x = getattr(self._stream, "readable", None)
if x is not None:
return x()
except Exception:
return False
return True
def writable(self):
if self._force_writable:
return True
x = getattr(self._stream, "writable", None)
if x is not None:
return x()
except Exception:
except Exception:
return False
return True
def seekable(self):
x = getattr(self._stream, "seekable", None)
if x is not None:
return x()
except Exception:
return False
return True
if PY2:
text_type = unicode
raw_input = raw_input
string_types = (str, unicode)
int_types = (int, long)
iteritems = lambda x: x.iteritems()
range_type = xrange
from pipes import quote as shlex_quote
def is_bytes(x):
return isinstance(x, (buffer, bytearray))
_identifier_re = re.compile(r"^[a-zA-Z_][a-zA-Z0-9_]*$")
# For Windows, we need to force stdout/stdin/stderr to binary if it's
# fetched for that. This obviously is not the most correct way to do
# it as it changes global state. Unfortunately, there does not seem to
# be a clear better way to do it as just reopening the file in binary
# mode does not change anything.
# An option would be to do what Python 3 does and to open the file as
# binary only, patch it back to the system, and then use a wrapper
# stream that converts newlines. It's not quite clear what's the
# correct option here.
# This code also lives in _winconsole for the fallback to the console
# emulation stream.
# There are also Windows environments where the `msvcrt` module is not
# available (which is why we use try-catch instead of the WIN variable
# here), such as the Google App Engine development server on Windows. In
# those cases there is just nothing we can do.
def set_binary_mode(f):
return f
import msvcrt
except ImportError:
def set_binary_mode(f):
fileno = f.fileno()
except Exception:
msvcrt.setmode(fileno, os.O_BINARY)
return f
import fcntl
except ImportError:
def set_binary_mode(f):
fileno = f.fileno()
except Exception:
flags = fcntl.fcntl(fileno, fcntl.F_GETFL)
fcntl.fcntl(fileno, fcntl.F_SETFL, flags & ~os.O_NONBLOCK)
return f
def isidentifier(x):
return is not None
def get_binary_stdin():
return set_binary_mode(sys.stdin)
def get_binary_stdout():
return set_binary_mode(sys.stdout)
def get_binary_stderr():
return set_binary_mode(sys.stderr)
def get_text_stdin(encoding=None, errors=None):
rv = _get_windows_console_stream(sys.stdin, encoding, errors)
if rv is not None:
return rv
return _make_text_stream(sys.stdin, encoding, errors, force_readable=True)
def get_text_stdout(encoding=None, errors=None):
rv = _get_windows_console_stream(sys.stdout, encoding, errors)
if rv is not None:
return rv
return _make_text_stream(sys.stdout, encoding, errors, force_writable=True)
def get_text_stderr(encoding=None, errors=None):
rv = _get_windows_console_stream(sys.stderr, encoding, errors)
if rv is not None:
return rv
return _make_text_stream(sys.stderr, encoding, errors, force_writable=True)
def filename_to_ui(value):
if isinstance(value, bytes):
value = value.decode(get_filesystem_encoding(), "replace")
return value
import io
text_type = str
raw_input = input
string_types = (str,)
int_types = (int,)
range_type = range
isidentifier = lambda x: x.isidentifier()
iteritems = lambda x: iter(x.items())
from shlex import quote as shlex_quote
def is_bytes(x):
return isinstance(x, (bytes, memoryview, bytearray))
def _is_binary_reader(stream, default=False):
return isinstance(, bytes)
except Exception:
return default
# This happens in some cases where the stream was already
# closed. In this case, we assume the default.
def _is_binary_writer(stream, default=False):
except Exception:
return False
except Exception:
return default
return True
def _find_binary_reader(stream):
# We need to figure out if the given stream is already binary.
# This can happen because the official docs recommend detaching
# the streams to get binary streams. Some code might do this, so
# we need to deal with this case explicitly.
if _is_binary_reader(stream, False):
return stream
buf = getattr(stream, "buffer", None)
# Same situation here; this time we assume that the buffer is
# actually binary in case it's closed.
if buf is not None and _is_binary_reader(buf, True):
return buf
def _find_binary_writer(stream):
# We need to figure out if the given stream is already binary.
# This can happen because the official docs recommend detatching
# the streams to get binary streams. Some code might do this, so
# we need to deal with this case explicitly.
if _is_binary_writer(stream, False):
return stream
buf = getattr(stream, "buffer", None)
# Same situation here; this time we assume that the buffer is
# actually binary in case it's closed.
if buf is not None and _is_binary_writer(buf, True):
return buf
def _stream_is_misconfigured(stream):
"""A stream is misconfigured if its encoding is ASCII."""
# If the stream does not have an encoding set, we assume it's set
# to ASCII. This appears to happen in certain unittest
# environments. It's not quite clear what the correct behavior is
# but this at least will force Click to recover somehow.
return is_ascii_encoding(getattr(stream, "encoding", None) or "ascii")
def _is_compat_stream_attr(stream, attr, value):
"""A stream attribute is compatible if it is equal to the
desired value or the desired value is unset and the attribute
has a value.
stream_value = getattr(stream, attr, None)
return stream_value == value or (value is None and stream_value is not None)
def _is_compatible_text_stream(stream, encoding, errors):
"""Check if a stream's encoding and errors attributes are
compatible with the desired values.
return _is_compat_stream_attr(
stream, "encoding", encoding
) and _is_compat_stream_attr(stream, "errors", errors)
def _force_correct_text_stream(
if is_binary(text_stream, False):
binary_reader = text_stream
# If the stream looks compatible, and won't default to a
# misconfigured ascii encoding, return it as-is.
if _is_compatible_text_stream(text_stream, encoding, errors) and not (
encoding is None and _stream_is_misconfigured(text_stream)
return text_stream
# Otherwise, get the underlying binary reader.
binary_reader = find_binary(text_stream)
# If that's not possible, silently use the original reader
# and get mojibake instead of exceptions.
if binary_reader is None:
return text_stream
# Default errors to replace instead of strict in order to get
# something that works.
if errors is None:
errors = "replace"
# Wrap the binary stream in a text stream with the correct
# encoding parameters.
return _make_text_stream(
def _force_correct_text_reader(text_reader, encoding, errors, force_readable=False):
return _force_correct_text_stream(
def _force_correct_text_writer(text_writer, encoding, errors, force_writable=False):
return _force_correct_text_stream(
def get_binary_stdin():
reader = _find_binary_reader(sys.stdin)
if reader is None:
raise RuntimeError("Was not able to determine binary stream for sys.stdin.")
return reader
def get_binary_stdout():
writer = _find_binary_writer(sys.stdout)
if writer is None:
raise RuntimeError(
"Was not able to determine binary stream for sys.stdout."
return writer
def get_binary_stderr():
writer = _find_binary_writer(sys.stderr)
if writer is None:
raise RuntimeError(
"Was not able to determine binary stream for sys.stderr."
return writer
def get_text_stdin(encoding=None, errors=None):
rv = _get_windows_console_stream(sys.stdin, encoding, errors)
if rv is not None:
return rv
return _force_correct_text_reader(
sys.stdin, encoding, errors, force_readable=True
def get_text_stdout(encoding=None, errors=None):
rv = _get_windows_console_stream(sys.stdout, encoding, errors)
if rv is not None:
return rv
return _force_correct_text_writer(
sys.stdout, encoding, errors, force_writable=True
def get_text_stderr(encoding=None, errors=None):
rv = _get_windows_console_stream(sys.stderr, encoding, errors)
if rv is not None:
return rv
return _force_correct_text_writer(
sys.stderr, encoding, errors, force_writable=True
def filename_to_ui(value):
if isinstance(value, bytes):
value = value.decode(get_filesystem_encoding(), "replace")
value = value.encode("utf-8", "surrogateescape").decode("utf-8", "replace")
return value
def get_streerror(e, default=None):
if hasattr(e, "strerror"):
msg = e.strerror
if default is not None:
msg = default
msg = str(e)
if isinstance(msg, bytes):
msg = msg.decode("utf-8", "replace")
return msg
def _wrap_io_open(file, mode, encoding, errors):
"""On Python 2, :func:`` returns a text file wrapper that
requires passing ``unicode`` to ``write``. Need to open the file in
binary mode then wrap it in a subclass that can write ``str`` and
Also handles not passing ``encoding`` and ``errors`` in binary mode.
binary = "b" in mode
if binary:
kwargs = {}
kwargs = {"encoding": encoding, "errors": errors}
if not PY2 or binary:
return, mode, **kwargs)
f =, "{}b".format(mode.replace("t", "")))
return _make_text_stream(f, **kwargs)
def open_stream(filename, mode="r", encoding=None, errors="strict", atomic=False):
binary = "b" in mode
# Standard streams first. These are simple because they don't need
# special handling for the atomic flag. It's entirely ignored.
if filename == "-":
if any(m in mode for m in ["w", "a", "x"]):
if binary:
return get_binary_stdout(), False
return get_text_stdout(encoding=encoding, errors=errors), False
if binary:
return get_binary_stdin(), False
return get_text_stdin(encoding=encoding, errors=errors), False
# Non-atomic writes directly go out through the regular open functions.
if not atomic:
return _wrap_io_open(filename, mode, encoding, errors), True
# Some usability stuff for atomic writes
if "a" in mode:
raise ValueError(
"Appending to an existing file is not supported, because that"
" would involve an expensive `copy`-operation to a temporary"
" file. Open the file in normal `w`-mode and copy explicitly"
" if that's what you're after."
if "x" in mode:
raise ValueError("Use the `overwrite`-parameter instead.")
if "w" not in mode:
raise ValueError("Atomic writes only make sense with `w`-mode.")
# Atomic writes are more complicated. They work by opening a file
# as a proxy in the same folder and then using the fdopen
# functionality to wrap it in a Python file. Then we wrap it in an
# atomic file that moves the file over on close.
import errno
import random
perm = os.stat(filename).st_mode
except OSError:
perm = None
flags = os.O_RDWR | os.O_CREAT | os.O_EXCL
if binary:
flags |= getattr(os, "O_BINARY", 0)
while True:
tmp_filename = os.path.join(
".__atomic-write{:08x}".format(random.randrange(1 << 32)),
fd =, flags, 0o666 if perm is None else perm)
except OSError as e:
if e.errno == errno.EEXIST or ( == "nt"
and e.errno == errno.EACCES
and os.path.isdir(e.filename)
and os.access(e.filename, os.W_OK)
if perm is not None:
os.chmod(tmp_filename, perm) # in case perm includes bits in umask
f = _wrap_io_open(fd, mode, encoding, errors)
return _AtomicFile(f, tmp_filename, os.path.realpath(filename)), True
# Used in a destructor call, needs extra protection from interpreter cleanup.
if hasattr(os, "replace"):
_replace = os.replace
_can_replace = True
_replace = os.rename
_can_replace = not WIN
class _AtomicFile(object):
def __init__(self, f, tmp_filename, real_filename):
self._f = f
self._tmp_filename = tmp_filename
self._real_filename = real_filename
self.closed = False
def name(self):
return self._real_filename
def close(self, delete=False):
if self.closed:
if not _can_replace:
except OSError:
_replace(self._tmp_filename, self._real_filename)
self.closed = True
def __getattr__(self, name):
return getattr(self._f, name)
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, tb):
self.close(delete=exc_type is not None)
def __repr__(self):
return repr(self._f)
auto_wrap_for_ansi = None
colorama = None
get_winterm_size = None
def strip_ansi(value):
return _ansi_re.sub("", value)
def _is_jupyter_kernel_output(stream):
if WIN:
# TODO: Couldn't test on Windows, should't try to support until
# someone tests the details wrt colorama.
while isinstance(stream, (_FixupStream, _NonClosingTextIOWrapper)):
stream = stream._stream
return stream.__class__.__module__.startswith("ipykernel.")
def should_strip_ansi(stream=None, color=None):
if color is None:
if stream is None:
stream = sys.stdin
return not isatty(stream) and not _is_jupyter_kernel_output(stream)
return not color
# If we're on Windows, we provide transparent integration through
# colorama. This will make ANSI colors through the echo function
# work automatically.
if WIN:
# Windows has a smaller terminal
from ._winconsole import _get_windows_console_stream, _wrap_std_stream
def _get_argv_encoding():
import locale
return locale.getpreferredencoding()
if PY2:
def raw_input(prompt=""):
if prompt:
stdout = _default_text_stdout()
stdin = _default_text_stdin()
return stdin.readline().rstrip("\r\n")
import colorama
except ImportError:
_ansi_stream_wrappers = WeakKeyDictionary()
def auto_wrap_for_ansi(stream, color=None):
"""This function wraps a stream so that calls through colorama
are issued to the win32 console API to recolor on demand. It
also ensures to reset the colors if a write call is interrupted
to not destroy the console afterwards.
cached = _ansi_stream_wrappers.get(stream)
except Exception:
cached = None
if cached is not None:
return cached
strip = should_strip_ansi(stream, color)
ansi_wrapper = colorama.AnsiToWin32(stream, strip=strip)
rv =
_write = rv.write
def _safe_write(s):
return _write(s)
rv.write = _safe_write
_ansi_stream_wrappers[stream] = rv
except Exception:
return rv
def get_winterm_size():
win = colorama.win32.GetConsoleScreenBufferInfo(
return win.Right - win.Left, win.Bottom - win.Top
def _get_argv_encoding():
return getattr(sys.stdin, "encoding", None) or get_filesystem_encoding()
_get_windows_console_stream = lambda *x: None
_wrap_std_stream = lambda *x: None
def term_len(x):
return len(strip_ansi(x))
def isatty(stream):
return stream.isatty()
except Exception:
return False
def _make_cached_stream_func(src_func, wrapper_func):
cache = WeakKeyDictionary()
def func():
stream = src_func()
rv = cache.get(stream)
except Exception:
rv = None
if rv is not None:
return rv
rv = wrapper_func()
stream = src_func() # In case wrapper_func() modified the stream
cache[stream] = rv
except Exception:
return rv
return func
_default_text_stdin = _make_cached_stream_func(lambda: sys.stdin, get_text_stdin)
_default_text_stdout = _make_cached_stream_func(lambda: sys.stdout, get_text_stdout)
_default_text_stderr = _make_cached_stream_func(lambda: sys.stderr, get_text_stderr)
binary_streams = {
"stdin": get_binary_stdin,
"stdout": get_binary_stdout,
"stderr": get_binary_stderr,
text_streams = {
"stdin": get_text_stdin,
"stdout": get_text_stdout,
"stderr": get_text_stderr,
This module contains implementations for the termui module. To keep the
import time of Click down, some infrequently used functionality is
placed in this module and only imported as needed.
import contextlib
import math
import os
import sys
import time
from ._compat import _default_text_stdout
from ._compat import CYGWIN
from ._compat import get_best_encoding
from ._compat import int_types
from ._compat import isatty
from ._compat import open_stream
from ._compat import range_type
from ._compat import shlex_quote
from ._compat import strip_ansi
from ._compat import term_len
from ._compat import WIN
from .exceptions import ClickException
from .utils import echo
if == "nt":
AFTER_BAR = "\n"
BEFORE_BAR = "\r\033[?25l"
AFTER_BAR = "\033[?25h\n"
def _length_hint(obj):
"""Returns the length hint of an object."""
return len(obj)
except (AttributeError, TypeError):
get_hint = type(obj).__length_hint__
except AttributeError:
return None
hint = get_hint(obj)
except TypeError:
return None
if hint is NotImplemented or not isinstance(hint, int_types) or hint < 0:
return None
return hint
class ProgressBar(object):
def __init__(
empty_char=" ",
info_sep=" ",
self.fill_char = fill_char
self.empty_char = empty_char
self.bar_template = bar_template
self.info_sep = info_sep
self.show_eta = show_eta
self.show_percent = show_percent
self.show_pos = show_pos
self.item_show_func = item_show_func
self.label = label or ""
if file is None:
file = _default_text_stdout()
self.file = file
self.color = color
self.width = width
self.autowidth = width == 0
if length is None:
length = _length_hint(iterable)
if iterable is None:
if length is None:
raise TypeError("iterable or length is required")
iterable = range_type(length)
self.iter = iter(iterable)
self.length = length
self.length_known = length is not None
self.pos = 0
self.avg = []
self.start = self.last_eta = time.time()
self.eta_known = False
self.finished = False
self.max_width = None
self.entered = False
self.current_item = None
self.is_hidden = not isatty(self.file)
self._last_line = None
self.short_limit = 0.5
def __enter__(self):
self.entered = True
return self
def __exit__(self, exc_type, exc_value, tb):
def __iter__(self):
if not self.entered:
raise RuntimeError("You need to use progress bars in a with block.")
return self.generator()
def __next__(self):
# Iteration is defined in terms of a generator function,
# returned by iter(self); use that to define next(). This works
# because `self.iter` is an iterable consumed by that generator,
# so it is re-entry safe. Calling `next(self.generator())`
# twice works and does "what you want".
return next(iter(self))
next = __next__
def is_fast(self):
return time.time() - self.start <= self.short_limit
def render_finish(self):
if self.is_hidden or self.is_fast():
def pct(self):
if self.finished:
return 1.0
return min(self.pos / (float(self.length) or 1), 1.0)
def time_per_iteration(self):
if not self.avg:
return 0.0
return sum(self.avg) / float(len(self.avg))
def eta(self):
if self.length_known and not self.finished:
return self.time_per_iteration * (self.length - self.pos)
return 0.0
def format_eta(self):
if self.eta_known:
t = int(self.eta)
seconds = t % 60
t //= 60
minutes = t % 60
t //= 60
hours = t % 24
t //= 24
if t > 0:
return "{}d {:02}:{:02}:{:02}".format(t, hours, minutes, seconds)
return "{:02}:{:02}:{:02}".format(hours, minutes, seconds)
return ""
def format_pos(self):
pos = str(self.pos)
if self.length_known:
pos += "/{}".format(self.length)
return pos
def format_pct(self):
return "{: 4}%".format(int(self.pct * 100))[1:]
def format_bar(self):
if self.length_known:
bar_length = int(self.pct * self.width)
bar = self.fill_char * bar_length
bar += self.empty_char * (self.width - bar_length)
elif self.finished:
bar = self.fill_char * self.width
bar = list(self.empty_char * (self.width or 1))
if self.time_per_iteration != 0:
(math.cos(self.pos * self.time_per_iteration) / 2.0 + 0.5)
* self.width
] = self.fill_char
bar = "".join(bar)
return bar
def format_progress_line(self):
show_percent = self.show_percent
info_bits = []
if self.length_known and show_percent is None:
show_percent = not self.show_pos
if self.show_pos:
if show_percent:
if self.show_eta and self.eta_known and not self.finished:
if self.item_show_func is not None:
item_info = self.item_show_func(self.current_item)
if item_info is not None:
return (
% {
"label": self.label,
"bar": self.format_bar(),
"info": self.info_sep.join(info_bits),
def render_progress(self):
from .termui import get_terminal_size
if self.is_hidden:
buf = []
# Update width in case the terminal has been resized
if self.autowidth:
old_width = self.width
self.width = 0
clutter_length = term_len(self.format_progress_line())
new_width = max(0, get_terminal_size()[0] - clutter_length)
if new_width < old_width:
buf.append(" " * self.max_width)
self.max_width = new_width
self.width = new_width
clear_width = self.width
if self.max_width is not None:
clear_width = self.max_width
line = self.format_progress_line()
line_len = term_len(line)
if self.max_width is None or self.max_width < line_len:
self.max_width = line_len
buf.append(" " * (clear_width - line_len))
line = "".join(buf)
# Render the line only if it changed.
if line != self._last_line and not self.is_fast():
self._last_line = line
echo(line, file=self.file, color=self.color, nl=False)
def make_step(self, n_steps):
self.pos += n_steps
if self.length_known and self.pos >= self.length:
self.finished = True
if (time.time() - self.last_eta) < 1.0:
self.last_eta = time.time()
# self.avg is a rolling list of length <= 7 of steps where steps are
# defined as time elapsed divided by the total progress through
# self.length.
if self.pos:
step = (time.time() - self.start) / self.pos
step = time.time() - self.start
self.avg = self.avg[-6:] + [step]
self.eta_known = self.length_known
def update(self, n_steps):
def finish(self):
self.eta_known = 0
self.current_item = None
self.finished = True
def generator(self):
"""Return a generator which yields the items added to the bar
during construction, and updates the progress bar *after* the
yielded block returns.
# WARNING: the iterator interface for `ProgressBar` relies on
# this and only works because this is a simple generator which
# doesn't create or manage additional state. If this function
# changes, the impact should be evaluated both against
# `iter(bar)` and `next(bar)`. `next()` in particular may call
# `self.generator()` repeatedly, and this must remain safe in
# order for that interface to work.
if not self.entered:
raise RuntimeError("You need to use progress bars in a with block.")
if self.is_hidden:
for rv in self.iter:
yield rv
for rv in self.iter:
self.current_item = rv
yield rv
def pager(generator, color=None):
"""Decide what method to use for paging through text."""
stdout = _default_text_stdout()
if not isatty(sys.stdin) or not isatty(stdout):
return _nullpager(stdout, generator, color)
pager_cmd = (os.environ.get("PAGER", None) or "").strip()
if pager_cmd:
if WIN:
return _tempfilepager(generator, pager_cmd, color)
return _pipepager(generator, pager_cmd, color)
if os.environ.get("TERM") in ("dumb", "emacs"):
return _nullpager(stdout, generator, color)
if WIN or sys.platform.startswith("os2"):
return _tempfilepager(generator, "more <", color)
if hasattr(os, "system") and os.system("(less) 2>/dev/null") == 0:
return _pipepager(generator, "less", color)
import tempfile
fd, filename = tempfile.mkstemp()
if (
hasattr(os, "system")
and os.system("more {}".format(shlex_quote(filename))) == 0
return _pipepager(generator, "more", color)
return _nullpager(stdout, generator, color)
def _pipepager(generator, cmd, color):
"""Page through text by feeding it to another program. Invoking a
pager through this might support colors.
import subprocess
env = dict(os.environ)
# If we're piping to less we might support colors under the
# condition that
cmd_detail = cmd.rsplit("/", 1)[-1].split()
if color is None and cmd_detail[0] == "less":
less_flags = "{}{}".format(os.environ.get("LESS", ""), " ".join(cmd_detail[1:]))
if not less_flags:
env["LESS"] = "-R"
color = True
elif "r" in less_flags or "R" in less_flags:
color = True
c = subprocess.Popen(cmd, shell=True, stdin=subprocess.PIPE, env=env)
encoding = get_best_encoding(c.stdin)
for text in generator:
if not color:
text = strip_ansi(text)
c.stdin.write(text.encode(encoding, "replace"))
except (IOError, KeyboardInterrupt):
# Less doesn't respect ^C, but catches it for its own UI purposes (aborting
# search or other commands inside less).
# That means when the user hits ^C, the parent process (click) terminates,
# but less is still alive, paging the output and messing up the terminal.
# If the user wants to make the pager exit on ^C, they should set
# `LESS='-K'`. It's not our decision to make.
while True:
except KeyboardInterrupt:
def _tempfilepager(generator, cmd, color):
"""Page through text by invoking a program on a temporary file."""
import tempfile
filename = tempfile.mktemp()
# TODO: This never terminates if the passed generator never terminates.
text = "".join(generator)
if not color:
text = strip_ansi(text)
encoding = get_best_encoding(sys.stdout)
with open_stream(filename, "wb")[0] as f:
os.system("{} {}".format(shlex_quote(cmd), shlex_quote(filename)))
def _nullpager(stream, generator, color):
"""Simply print unformatted text. This is the ultimate fallback."""
for text in generator:
if not color:
text = strip_ansi(text)
class Editor(object):
def __init__(self, editor=None, env=None, require_save=True, extension=".txt"):
self.editor = editor
self.env = env
self.require_save = require_save
self.extension = extension
def get_editor(self):
if self.editor is not None:
return self.editor
for key in "VISUAL", "EDITOR":
rv = os.environ.get(key)
if rv:
return rv
if WIN:
return "notepad"
for editor in "sensible-editor", "vim", "nano":
if os.system("which {} >/dev/null 2>&1".format(editor)) == 0:
return editor
return "vi"
def edit_file(self, filename):
import subprocess
editor = self.get_editor()
if self.env:
environ = os.environ.copy()
environ = None
c = subprocess.Popen(
"{} {}".format(shlex_quote(editor), shlex_quote(filename)),
exit_code = c.wait()
if exit_code != 0:
raise ClickException("{}: Editing failed!".format(editor))
except OSError as e:
raise ClickException("{}: Editing failed: {}".format(editor, e))
def edit(self, text):
import tempfile
text = text or ""
if text and not text.endswith("\n"):
text += "\n"
fd, name = tempfile.mkstemp(prefix="editor-", suffix=self.extension)
if WIN:
encoding = "utf-8-sig"
text = text.replace("\n", "\r\n")
encoding = "utf-8"
text = text.encode(encoding)
f = os.fdopen(fd, "wb")
timestamp = os.path.getmtime(name)
if self.require_save and os.path.getmtime(name) == timestamp:
return None
f = open(name, "rb")
rv =
return rv.decode("utf-8-sig").replace("\r\n", "\n")
def open_url(url, wait=False, locate=False):
import subprocess
def _unquote_file(url):
import urllib
except ImportError:
import urllib
if url.startswith("file://"):
url = urllib.unquote(url[7:])
return url
if sys.platform == "darwin":
args = ["open"]
if wait:
if locate:
null = open("/dev/null", "w")
return subprocess.Popen(args, stderr=null).wait()
elif WIN:
if locate:
url = _unquote_file(url)
args = "explorer /select,{}".format(shlex_quote(url))
args = 'start {} "" {}'.format("/WAIT" if wait else "", shlex_quote(url))
return os.system(args)
elif CYGWIN:
if locate:
url = _unquote_file(url)
args = "cygstart {}".format(shlex_quote(os.path.dirname(url)))
args = "cygstart {} {}".format("-w" if wait else "", shlex_quote(url))
return os.system(args)
if locate:
url = os.path.dirname(_unquote_file(url)) or "."
url = _unquote_file(url)
c = subprocess.Popen(["xdg-open", url])
if wait:
return c.wait()
return 0
except OSError:
if url.startswith(("http://", "https://")) and not locate and not wait:
import webbrowser
return 0
return 1
def _translate_ch_to_exc(ch):
if ch == u"\x03":
raise KeyboardInterrupt()
if ch == u"\x04" and not WIN: # Unix-like, Ctrl+D
raise EOFError()
if ch == u"\x1a" and WIN: # Windows, Ctrl+Z
raise EOFError()
if WIN:
import msvcrt
def raw_terminal():
def getchar(echo):
# The function `getch` will return a bytes object corresponding to
# the pressed character. Since Windows 10 build 1803, it will also
# return \x00 when called a second time after pressing a regular key.
# `getwch` does not share this probably-bugged behavior. Moreover, it
# returns a Unicode object by default, which is what we want.
# Either of these functions will return \x00 or \xe0 to indicate
# a special key, and you need to call the same function again to get
# the "rest" of the code. The fun part is that \u00e0 is
# "latin small letter a with grave", so if you type that on a French
# keyboard, you _also_ get a \xe0.
# E.g., consider the Up arrow. This returns \xe0 and then \x48. The
# resulting Unicode string reads as "a with grave" + "capital H".
# This is indistinguishable from when the user actually types
# "a with grave" and then "capital H".
# When \xe0 is returned, we assume it's part of a special-key sequence
# and call `getwch` again, but that means that when the user types
# the \u00e0 character, `getchar` doesn't return until a second
# character is typed.
# The alternative is returning immediately, but that would mess up
# cross-platform handling of arrow keys and others that start with
# \xe0. Another option is using `getch`, but then we can't reliably
# read non-ASCII characters, because return values of `getch` are
# limited to the current 8-bit codepage.
# Anyway, Click doesn't claim to do this Right(tm), and using `getwch`
# is doing the right thing in more situations than with `getch`.
if echo:
func = msvcrt.getwche
func = msvcrt.getwch
rv = func()
if rv in (u"\x00", u"\xe0"):
# \x00 and \xe0 are control characters that indicate special key,
# see above.
rv += func()
return rv
import tty
import termios
def raw_terminal():
if not isatty(sys.stdin):
f = open("/dev/tty")
fd = f.fileno()
fd = sys.stdin.fileno()
f = None
old_settings = termios.tcgetattr(fd)
yield fd
termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
if f is not None:
except termios.error:
def getchar(echo):
with raw_terminal() as fd:
ch =, 32)
ch = ch.decode(get_best_encoding(sys.stdin), "replace")
if echo and isatty(sys.stdout):
return ch
import textwrap
from contextlib import contextmanager
class TextWrapper(textwrap.TextWrapper):
def _handle_long_word(self, reversed_chunks, cur_line, cur_len, width):
space_left = max(width - cur_len, 1)
if self.break_long_words:
last = reversed_chunks[-1]
cut = last[:space_left]
res = last[space_left:]
reversed_chunks[-1] = res
elif not cur_line:
def extra_indent(self, indent):
old_initial_indent = self.initial_indent
old_subsequent_indent = self.subsequent_indent
self.initial_indent += indent
self.subsequent_indent += indent
self.initial_indent = old_initial_indent
self.subsequent_indent = old_subsequent_indent
def indent_only(self, text):
rv = []
for idx, line in enumerate(text.splitlines()):
indent = self.initial_indent
if idx > 0:
indent = self.subsequent_indent
rv.append(indent + line)
return "\n".join(rv)
import codecs
import os
import sys
from ._compat import PY2
def _find_unicode_literals_frame():
import __future__
if not hasattr(sys, "_getframe"): # not all Python implementations have it
return 0
frm = sys._getframe(1)
idx = 1
while frm is not None:
if frm.f_globals.get("__name__", "").startswith("click."):
frm = frm.f_back
idx += 1
elif frm.f_code.co_flags & __future__.unicode_literals.compiler_flag:
return idx
return 0
def _check_for_unicode_literals():
if not __debug__:
from . import disable_unicode_literals_warning
if not PY2 or disable_unicode_literals_warning:
bad_frame = _find_unicode_literals_frame()
if bad_frame <= 0:
from warnings import warn
"Click detected the use of the unicode_literals __future__"
" import. This is heavily discouraged because it can"
" introduce subtle bugs in your code. You should instead"
' use explicit u"" literals for your unicode strings. For'
" more information see"
def _verify_python3_env():
"""Ensures that the environment is good for unicode on Python 3."""
if PY2:
import locale
fs_enc = codecs.lookup(locale.getpreferredencoding()).name
except Exception:
fs_enc = "ascii"
if fs_enc != "ascii":
extra = ""
if == "posix":
import subprocess
rv = subprocess.Popen(
["locale", "-a"], stdout=subprocess.PIPE, stderr=subprocess.PIPE
except OSError:
rv = b""
good_locales = set()
has_c_utf8 = False
# Make sure we're operating on text here.
if isinstance(rv, bytes):
rv = rv.decode("ascii", "replace")
for line in rv.splitlines():
locale = line.strip()
if locale.lower().endswith((".utf-8", ".utf8")):
if locale.lower() in ("c.utf8", "c.utf-8"):
has_c_utf8 = True
extra += "\n\n"
if not good_locales:
extra += (
"Additional information: on this system no suitable"
" UTF-8 locales were discovered. This most likely"
" requires resolving by reconfiguring the locale"
" system."
elif has_c_utf8:
extra += (
"This system supports the C.UTF-8 locale which is"
" recommended. You might be able to resolve your issue"
" by exporting the following environment variables:\n\n"
" export LC_ALL=C.UTF-8\n"
" export LANG=C.UTF-8"
extra += (
"This system lists a couple of UTF-8 supporting locales"
" that you can pick from. The following suitable"
" locales were discovered: {}".format(", ".join(sorted(good_locales)))
bad_locale = None
for locale in os.environ.get("LC_ALL"), os.environ.get("LANG"):
if locale and locale.lower().endswith((".utf-8", ".utf8")):
bad_locale = locale
if locale is not None:
if bad_locale is not None:
extra += (
"\n\nClick discovered that you exported a UTF-8 locale"
" but the locale system could not pick up from it"
" because it does not exist. The exported locale is"
" '{}' but it is not supported".format(bad_locale)
raise RuntimeError(
"Click will abort further execution because Python 3 was"
" configured to use ASCII as encoding for the environment."
" Consult for"
" mitigation steps.{}".format(extra)
# This module is based on the excellent work by Adam Bartoš who
# provided a lot of what went into the implementation here in
# the discussion to issue1602 in the Python bug tracker.
# There are some general differences in regards to how this works
# compared to the original patches as we do not need to patch
# the entire interpreter but just work in our little world of
# echo and prmopt.
import ctypes
import io
import os
import sys
import time
import zlib
from ctypes import byref
from ctypes import c_char
from ctypes import c_char_p
from ctypes import c_int
from ctypes import c_ssize_t
from ctypes import c_ulong
from ctypes import c_void_p
from ctypes import POINTER
from ctypes import py_object
from ctypes import windll
from ctypes import WinError
from ctypes import WINFUNCTYPE
from ctypes.wintypes import DWORD
from ctypes.wintypes import HANDLE
from ctypes.wintypes import LPCWSTR
from ctypes.wintypes import LPWSTR
import msvcrt
from ._compat import _NonClosingTextIOWrapper
from ._compat import PY2
from ._compat import text_type
from ctypes import pythonapi
PyObject_GetBuffer = pythonapi.PyObject_GetBuffer
PyBuffer_Release = pythonapi.PyBuffer_Release
except ImportError:
pythonapi = None
c_ssize_p = POINTER(c_ssize_t)
kernel32 = windll.kernel32
GetStdHandle = kernel32.GetStdHandle
ReadConsoleW = kernel32.ReadConsoleW
WriteConsoleW = kernel32.WriteConsoleW
GetConsoleMode = kernel32.GetConsoleMode
GetLastError = kernel32.GetLastError
GetCommandLineW = WINFUNCTYPE(LPWSTR)(("GetCommandLineW", windll.kernel32))
("CommandLineToArgvW", windll.shell32)
LocalFree = WINFUNCTYPE(ctypes.c_void_p, ctypes.c_void_p)(
("LocalFree", windll.kernel32)
STDIN_HANDLE = GetStdHandle(-10)
STDOUT_HANDLE = GetStdHandle(-11)
STDERR_HANDLE = GetStdHandle(-12)
EOF = b"\x1a"
class Py_buffer(ctypes.Structure):
_fields_ = [
("buf", c_void_p),
("obj", py_object),
("len", c_ssize_t),
("itemsize", c_ssize_t),
("readonly", c_int),
("ndim", c_int),
("format", c_char_p),
("shape", c_ssize_p),
("strides", c_ssize_p),
("suboffsets", c_ssize_p),
("internal", c_void_p),
if PY2:
_fields_.insert(-1, ("smalltable", c_ssize_t * 2))
# On PyPy we cannot get buffers so our ability to operate here is
# serverly limited.
if pythonapi is None:
get_buffer = None
def get_buffer(obj, writable=False):
buf = Py_buffer()
flags = PyBUF_WRITABLE if writable else PyBUF_SIMPLE
PyObject_GetBuffer(py_object(obj), byref(buf), flags)
buffer_type = c_char * buf.len
return buffer_type.from_address(buf.buf)
class _WindowsConsoleRawIOBase(io.RawIOBase):
def __init__(self, handle):
self.handle = handle
def isatty(self):
return True
class _WindowsConsoleReader(_WindowsConsoleRawIOBase):
def readable(self):
return True
def readinto(self, b):
bytes_to_be_read = len(b)
if not bytes_to_be_read:
return 0
elif bytes_to_be_read % 2:
raise ValueError(
"cannot read odd number of bytes from UTF-16-LE encoded console"
buffer = get_buffer(b, writable=True)
code_units_to_be_read = bytes_to_be_read // 2
code_units_read = c_ulong()
rv = ReadConsoleW(
# wait for KeyboardInterrupt
if not rv:
raise OSError("Windows error: {}".format(GetLastError()))
if buffer[0] == EOF:
return 0
return 2 * code_units_read.value
class _WindowsConsoleWriter(_WindowsConsoleRawIOBase):
def writable(self):
return True
def _get_error_message(errno):
if errno == ERROR_SUCCESS:
return "Windows error {}".format(errno)
def write(self, b):
bytes_to_be_written = len(b)
buf = get_buffer(b)
code_units_to_be_written = min(bytes_to_be_written, MAX_BYTES_WRITTEN) // 2
code_units_written = c_ulong()
bytes_written = 2 * code_units_written.value
if bytes_written == 0 and bytes_to_be_written > 0:
raise OSError(self._get_error_message(GetLastError()))
return bytes_written
class ConsoleStream(object):
def __init__(self, text_stream, byte_stream):
self._text_stream = text_stream
self.buffer = byte_stream
def name(self):
def write(self, x):
if isinstance(x, text_type):
return self._text_stream.write(x)
except Exception:
return self.buffer.write(x)
def writelines(self, lines):
for line in lines:
def __getattr__(self, name):
return getattr(self._text_stream, name)
def isatty(self):
return self.buffer.isatty()
def __repr__(self):
return "<ConsoleStream name={!r} encoding={!r}>".format(, self.encoding
class WindowsChunkedWriter(object):
Wraps a stream (such as stdout), acting as a transparent proxy for all
attribute access apart from method 'write()' which we wrap to write in
limited chunks due to a Windows limitation on binary console streams.
def __init__(self, wrapped):
# double-underscore everything to prevent clashes with names of
# attributes on the wrapped stream object.
self.__wrapped = wrapped
def __getattr__(self, name):
return getattr(self.__wrapped, name)
def write(self, text):
total_to_write = len(text)
written = 0
while written < total_to_write:
to_write = min(total_to_write - written, MAX_BYTES_WRITTEN)
self.__wrapped.write(text[written : written + to_write])
written += to_write
_wrapped_std_streams = set()
def _wrap_std_stream(name):
# Python 2 & Windows 7 and below
if (
and sys.getwindowsversion()[:2] <= (6, 1)
and name not in _wrapped_std_streams
setattr(sys, name, WindowsChunkedWriter(getattr(sys, name)))
def _get_text_stdin(buffer_stream):
text_stream = _NonClosingTextIOWrapper(
return ConsoleStream(text_stream, buffer_stream)
def _get_text_stdout(buffer_stream):
text_stream = _NonClosingTextIOWrapper(
return ConsoleStream(text_stream, buffer_stream)
def _get_text_stderr(buffer_stream):
text_stream = _NonClosingTextIOWrapper(
return ConsoleStream(text_stream, buffer_stream)
if PY2:
def _hash_py_argv():
return zlib.crc32("\x00".join(sys.argv[1:]))
_initial_argv_hash = _hash_py_argv()
def _get_windows_argv():
argc = c_int(0)
argv_unicode = CommandLineToArgvW(GetCommandLineW(), byref(argc))
if not argv_unicode:
raise WinError()
argv = [argv_unicode[i] for i in range(0, argc.value)]
del argv_unicode
if not hasattr(sys, "frozen"):
argv = argv[1:]
while len(argv) > 0:
arg = argv[0]
if not arg.startswith("-") or arg == "-":
argv = argv[1:]
if arg.startswith(("-c", "-m")):
return argv[1:]
_stream_factories = {
0: _get_text_stdin,
1: _get_text_stdout,
2: _get_text_stderr,
def _is_console(f):
if not hasattr(f, "fileno"):
return False
fileno = f.fileno()
except OSError:
return False
handle = msvcrt.get_osfhandle(fileno)
return bool(GetConsoleMode(handle, byref(DWORD())))
def _get_windows_console_stream(f, encoding, errors):
if (
get_buffer is not None
and encoding in ("utf-16-le", None)
and errors in ("strict", None)
and _is_console(f)
func = _stream_factories.get(f.fileno())
if func is not None:
if not PY2:
f = getattr(f, "buffer", None)
if f is None:
return None
# If we are on Python 2 we need to set the stream that we
# deal with to binary mode as otherwise the exercise if a
# bit moot. The same problems apply as for
# get_binary_stdin and friends from _compat.
msvcrt.setmode(f.fileno(), os.O_BINARY)
return func(f)
import errno
import inspect
import os
import sys
from contextlib import contextmanager
from functools import update_wrapper
from itertools import repeat
from ._compat import isidentifier
from ._compat import iteritems
from ._compat import PY2
from ._compat import string_types
from ._unicodefun import _check_for_unicode_literals
from ._unicodefun import _verify_python3_env
from .exceptions import Abort
from .exceptions import BadParameter
from .exceptions import ClickException
from .exceptions import Exit
from .exceptions import MissingParameter
from .exceptions import UsageError
from .formatting import HelpFormatter
from .formatting import join_options
from .globals import pop_context
from .globals import push_context
from .parser import OptionParser
from .parser import split_opt
from .termui import confirm
from .termui import prompt
from .termui import style
from .types import BOOL
from .types import convert_type
from .types import IntRange
from .utils import echo
from .utils import get_os_args
from .utils import make_default_short_help
from .utils import make_str
from .utils import PacifyFlushWrapper
_missing = object()
DEPRECATED_INVOKE_NOTICE = "DeprecationWarning: The command %(name)s is deprecated."
def _maybe_show_deprecated_notice(cmd):
if cmd.deprecated:
echo(style(DEPRECATED_INVOKE_NOTICE % {"name":}, fg="red"), err=True)
def fast_exit(code):
"""Exit without garbage collection, this speeds up exit by about 10ms for
things like bash completion.
def _bashcomplete(cmd, prog_name, complete_var=None):
"""Internal handler for the bash completion support."""
if complete_var is None:
complete_var = "_{}_COMPLETE".format(prog_name.replace("-", "_").upper())
complete_instr = os.environ.get(complete_var)
if not complete_instr:
from ._bashcomplete import bashcomplete
if bashcomplete(cmd, prog_name, complete_var, complete_instr):
def _check_multicommand(base_command, cmd_name, cmd, register=False):
if not base_command.chain or not isinstance(cmd, MultiCommand):
if register:
hint = (
"It is not possible to add multi commands as children to"
" another multi command that is in chain mode."
hint = (
"Found a multi command as subcommand to a multi command"
" that is in chain mode. This is not supported."
raise RuntimeError(
"{}. Command '{}' is set to chain and '{}' was added as"
" subcommand but it in itself is a multi command. ('{}' is a {}"
" within a chained {} named '{}').".format(
def batch(iterable, batch_size):
return list(zip(*repeat(iter(iterable), batch_size)))
def invoke_param_callback(callback, ctx, param, value):
code = getattr(callback, "__code__", None)
args = getattr(code, "co_argcount", 3)
if args < 3:
from warnings import warn
"Parameter callbacks take 3 args, (ctx, param, value). The"
" 2-arg style is deprecated and will be removed in 8.0.".format(callback),
return callback(ctx, value)
return callback(ctx, param, value)
def augment_usage_errors(ctx, param=None):
"""Context manager that attaches extra information to exceptions."""
except BadParameter as e:
if e.ctx is None:
e.ctx = ctx
if param is not None and e.param is None:
e.param = param
except UsageError as e:
if e.ctx is None:
e.ctx = ctx
def iter_params_for_processing(invocation_order, declaration_order):
"""Given a sequence of parameters in the order as should be considered
for processing and an iterable of parameters that exist, this returns
a list in the correct order as they should be processed.
def sort_key(item):
idx = invocation_order.index(item)
except ValueError:
idx = float("inf")
return (not item.is_eager, idx)
return sorted(declaration_order, key=sort_key)
class Context(object):
"""The context is a special internal object that holds state relevant
for the script execution at every single level. It's normally invisible
to commands unless they opt-in to getting access to it.
The context is useful as it can pass internal objects around and can
control special execution features such as reading data from
environment variables.
A context can be used as context manager in which case it will call
:meth:`close` on teardown.
.. versionadded:: 2.0
Added the `resilient_parsing`, `help_option_names`,
`token_normalize_func` parameters.
.. versionadded:: 3.0
Added the `allow_extra_args` and `allow_interspersed_args`
.. versionadded:: 4.0
Added the `color`, `ignore_unknown_options`, and
`max_content_width` parameters.
.. versionadded:: 7.1
Added the `show_default` parameter.
:param command: the command class for this context.
:param parent: the parent context.
:param info_name: the info name for this invocation. Generally this
is the most descriptive name for the script or
command. For the toplevel script it is usually
the name of the script, for commands below it it's
the name of the script.
:param obj: an arbitrary object of user data.
:param auto_envvar_prefix: the prefix to use for automatic environment
variables. If this is `None` then reading
from environment variables is disabled. This
does not affect manually set environment
variables which are always read.
:param default_map: a dictionary (like object) with default values
for parameters.
:param terminal_width: the width of the terminal. The default is
inherit from parent context. If no context
defines the terminal width then auto
detection will be applied.
:param max_content_width: the maximum width for content rendered by
Click (this currently only affects help
pages). This defaults to 80 characters if
not overridden. In other words: even if the
terminal is larger than that, Click will not
format things wider than 80 characters by
default. In addition to that, formatters might
add some safety mapping on the right.
:param resilient_parsing: if this flag is enabled then Click will
parse without any interactivity or callback
invocation. Default values will also be
ignored. This is useful for implementing
things such as completion support.
:param allow_extra_args: if this is set to `True` then extra arguments
at the end will not raise an error and will be
kept on the context. The default is to inherit
from the command.
:param allow_interspersed_args: if this is set to `False` then options
and arguments cannot be mixed. The
default is to inherit from the command.
:param ignore_unknown_options: instructs click to ignore options it does
not know and keeps them for later
:param help_option_names: optionally a list of strings that define how
the default help parameter is named. The
default is ``['--help']``.
:param token_normalize_func: an optional function that is used to
normalize tokens (options, choices,
etc.). This for instance can be used to
implement case insensitive behavior.
:param color: controls if the terminal supports ANSI colors or not. The
default is autodetection. This is only needed if ANSI
codes are used in texts that Click prints which is by
default not the case. This for instance would affect
help output.
:param show_default: if True, shows defaults for all options.
Even if an option is later created with show_default=False,
this command-level setting overrides it.
def __init__(
#: the parent context or `None` if none exists.
self.parent = parent
#: the :class:`Command` for this context.
self.command = command
#: the descriptive information name
self.info_name = info_name
#: the parsed parameters except if the value is hidden in which
#: case it's not remembered.
self.params = {}
#: the leftover arguments.
self.args = []
#: protected arguments. These are arguments that are prepended
#: to `args` when certain parsing scenarios are encountered but
#: must be never propagated to another arguments. This is used
#: to implement nested parsing.
self.protected_args = []
if obj is None and parent is not None:
obj = parent.obj
#: the user object stored.
self.obj = obj
self._meta = getattr(parent, "meta", {})
#: A dictionary (-like object) with defaults for parameters.
if (
default_map is None
and parent is not None
and parent.default_map is not None
default_map = parent.default_map.get(info_name)
self.default_map = default_map
#: This flag indicates if a subcommand is going to be executed. A
#: group callback can use this information to figure out if it's
#: being executed directly or because the execution flow passes
#: onwards to a subcommand. By default it's None, but it can be
#: the name of the subcommand to execute.
#: If chaining is enabled this will be set to ``'*'`` in case
#: any commands are executed. It is however not possible to
#: figure out which ones. If you require this knowledge you
#: should use a :func:`resultcallback`.
self.invoked_subcommand = None
if terminal_width is None and parent is not None:
terminal_width = parent.terminal_width
#: The width of the terminal (None is autodetection).
self.terminal_width = terminal_width
if max_content_width is None and parent is not None:
max_content_width = parent.max_content_width
#: The maximum width of formatted content (None implies a sensible
#: default which is 80 for most things).
self.max_content_width = max_content_width
if allow_extra_args is None:
allow_extra_args = command.allow_extra_args
#: Indicates if the context allows extra args or if it should
#: fail on parsing.
#: .. versionadded:: 3.0
self.allow_extra_args = allow_extra_args
if allow_interspersed_args is None:
allow_interspersed_args = command.allow_interspersed_args
#: Indicates if the context allows mixing of arguments and
#: options or not.
#: .. versionadded:: 3.0
self.allow_interspersed_args = allow_interspersed_args
if ignore_unknown_options is None:
ignore_unknown_options = command.ignore_unknown_options
#: Instructs click to ignore options that a command does not
#: understand and will store it on the context for later
#: processing. This is primarily useful for situations where you
#: want to call into external programs. Generally this pattern is
#: strongly discouraged because it's not possibly to losslessly
#: forward all arguments.
#: .. versionadded:: 4.0
self.ignore_unknown_options = ignore_unknown_options
if help_option_names is None:
if parent is not None:
help_option_names = parent.help_option_names
help_option_names = ["--help"]
#: The names for the help options.
self.help_option_names = help_option_names
if token_normalize_func is None and parent is not None:
token_normalize_func = parent.token_normalize_func
#: An optional normalization function for tokens. This is
#: options, choices, commands etc.
self.token_normalize_func = token_normalize_func
#: Indicates if resilient parsing is enabled. In that case Click
#: will do its best to not cause any failures and default values
#: will be ignored. Useful for completion.
self.resilient_parsing = resilient_parsing
# If there is no envvar prefix yet, but the parent has one and
# the command on this level has a name, we can expand the envvar
# prefix automatically.
if auto_envvar_prefix is None:
if (
parent is not None
and parent.auto_envvar_prefix is not None
and self.info_name is not None
auto_envvar_prefix = "{}_{}".format(
parent.auto_envvar_prefix, self.info_name.upper()
auto_envvar_prefix = auto_envvar_prefix.upper()
if auto_envvar_prefix is not None:
auto_envvar_prefix = auto_envvar_prefix.replace("-", "_")
self.auto_envvar_prefix = auto_envvar_prefix
if color is None and parent is not None:
color = parent.color
#: Controls if styling output is wanted or not.
self.color = color
self.show_default = show_default
self._close_callbacks = []
self._depth = 0
def __enter__(self):
self._depth += 1
return self
def __exit__(self, exc_type, exc_value, tb):
self._depth -= 1
if self._depth == 0:
def scope(self, cleanup=True):
"""This helper method can be used with the context object to promote
it to the current thread local (see :func:`get_current_context`).
The default behavior of this is to invoke the cleanup functions which
can be disabled by setting `cleanup` to `False`. The cleanup
functions are typically used for things such as closing file handles.
If the cleanup is intended the context object can also be directly
used as a context manager.
Example usage::
with ctx.scope():
assert get_current_context() is ctx
This is equivalent::
with ctx:
assert get_current_context() is ctx
.. versionadded:: 5.0
:param cleanup: controls if the cleanup functions should be run or
not. The default is to run these functions. In
some situations the context only wants to be
temporarily pushed in which case this can be disabled.
Nested pushes automatically defer the cleanup.
if not cleanup:
self._depth += 1
with self as rv:
yield rv
if not cleanup:
self._depth -= 1
def meta(self):
"""This is a dictionary which is shared with all the contexts
that are nested. It exists so that click utilities can store some
state here if they need to. It is however the responsibility of
that code to manage this dictionary well.
The keys are supposed to be unique dotted strings. For instance
module paths are a good choice for it. What is stored in there is
irrelevant for the operation of click. However what is important is
that code that places data here adheres to the general semantics of
the system.
Example usage::
LANG_KEY = f'{__name__}.lang'
def set_language(value):
ctx = get_current_context()
ctx.meta[LANG_KEY] = value
def get_language():
return get_current_context().meta.get(LANG_KEY, 'en_US')
.. versionadded:: 5.0
return self._meta
def make_formatter(self):
"""Creates the formatter for the help and usage output."""
return HelpFormatter(
width=self.terminal_width, max_width=self.max_content_width
def call_on_close(self, f):
"""This decorator remembers a function as callback that should be
executed when the context tears down. This is most useful to bind
resource handling to the script execution. For instance, file objects
opened by the :class:`File` type will register their close callbacks
:param f: the function to execute on teardown.
return f
def close(self):
"""Invokes all close callbacks."""
for cb in self._close_callbacks:
self._close_callbacks = []
def command_path(self):
"""The computed command path. This is used for the ``usage``
information on the help page. It's automatically created by
combining the info names of the chain of contexts to the root.
rv = ""
if self.info_name is not None:
rv = self.info_name
if self.parent is not None:
rv = "{} {}".format(self.parent.command_path, rv)
return rv.lstrip()
def find_root(self):
"""Finds the outermost context."""
node = self
while node.parent is not None:
node = node.parent
return node
def find_object(self, object_type):
"""Finds the closest object of a given type."""
node = self
while node is not None:
if isinstance(node.obj, object_type):
return node.obj
node = node.parent
def ensure_object(self, object_type):
"""Like :meth:`find_object` but sets the innermost object to a
new instance of `object_type` if it does not exist.
rv = self.find_object(object_type)
if rv is None:
self.obj = rv = object_type()
return rv
def lookup_default(self, name):
"""Looks up the default for a parameter name. This by default
looks into the :attr:`default_map` if available.
if self.default_map is not None:
rv = self.default_map.get(name)
if callable(rv):
rv = rv()
return rv
def fail(self, message):
"""Aborts the execution of the program with a specific error
:param message: the error message to fail with.
raise UsageError(message, self)
def abort(self):
"""Aborts the script."""
raise Abort()
def exit(self, code=0):
"""Exits the application with a given exit code."""
raise Exit(code)
def get_usage(self):
"""Helper method to get formatted usage string for the current
context and command.
return self.command.get_usage(self)
def get_help(self):
"""Helper method to get formatted help page for the current
context and command.
return self.command.get_help(self)
def invoke(*args, **kwargs): # noqa: B902
"""Invokes a command callback in exactly the way it expects. There
are two ways to invoke this method:
1. the first argument can be a callback and all other arguments and
keyword arguments are forwarded directly to the function.
2. the first argument is a click command object. In that case all
arguments are forwarded as well but proper click parameters
(options and click arguments) must be keyword arguments and Click
will fill in defaults.
Note that before Click 3.2 keyword arguments were not properly filled
in against the intention of this code and no context was created. For
more information about this change and why it was done in a bugfix
release see :ref:`upgrade-to-3.2`.
self, callback = args[:2]
ctx = self
# It's also possible to invoke another command which might or
# might not have a callback. In that case we also fill
# in defaults and make a new context for this command.
if isinstance(callback, Command):
other_cmd = callback
callback = other_cmd.callback
ctx = Context(other_cmd,, parent=self)
if callback is None:
raise TypeError(
"The given command does not have a callback that can be invoked."
for param in other_cmd.params:
if not in kwargs and param.expose_value:
kwargs[] = param.get_default(ctx)
args = args[2:]
with augment_usage_errors(self):
with ctx:
return callback(*args, **kwargs)
def forward(*args, **kwargs): # noqa: B902
"""Similar to :meth:`invoke` but fills in default keyword
arguments from the current context if the other command expects
it. This cannot invoke callbacks directly, only other commands.
self, cmd = args[:2]
# It's also possible to invoke another command which might or
# might not have a callback.
if not isinstance(cmd, Command):
raise TypeError("Callback is not a command.")
for param in self.params:
if param not in kwargs:
kwargs[param] = self.params[param]
return self.invoke(cmd, **kwargs)
class BaseCommand(object):
"""The base command implements the minimal API contract of commands.
Most code will never use this as it does not implement a lot of useful
functionality but it can act as the direct subclass of alternative
parsing methods that do not depend on the Click parser.
For instance, this can be used to bridge Click and other systems like
argparse or docopt.
Because base commands do not implement a lot of the API that other
parts of Click take for granted, they are not supported for all
operations. For instance, they cannot be used with the decorators
usually and they have no built-in callback system.
.. versionchanged:: 2.0
Added the `context_settings` parameter.
:param name: the name of the command to use unless a group overrides it.
:param context_settings: an optional dictionary with defaults that are
passed to the context object.
#: the default for the :attr:`Context.allow_extra_args` flag.
allow_extra_args = False
#: the default for the :attr:`Context.allow_interspersed_args` flag.
allow_interspersed_args = True
#: the default for the :attr:`Context.ignore_unknown_options` flag.
ignore_unknown_options = False
def __init__(self, name, context_settings=None):
#: the name the command thinks it has. Upon registering a command
#: on a :class:`Group` the group will default the command name
#: with this information. You should instead use the
#: :class:`Context`\'s :attr:`~Context.info_name` attribute. = name
if context_settings is None:
context_settings = {}
#: an optional dictionary with defaults passed to the context.
self.context_settings = context_settings
def __repr__(self):
return "<{} {}>".format(self.__class__.__name__,
def get_usage(self, ctx):
raise NotImplementedError("Base commands cannot get usage")
def get_help(self, ctx):
raise NotImplementedError("Base commands cannot get help")
def make_context(self, info_name, args, parent=None, **extra):
"""This function when given an info name and arguments will kick
off the parsing and create a new :class:`Context`. It does not
invoke the actual command callback though.
:param info_name: the info name for this invokation. Generally this
is the most descriptive name for the script or
command. For the toplevel script it's usually
the name of the script, for commands below it it's
the name of the script.
:param args: the arguments to parse as list of strings.
:param parent: the parent context if available.
:param extra: extra keyword arguments forwarded to the context
for key, value in iteritems(self.context_settings):
if key not in extra:
extra[key] = value
ctx = Context(self, info_name=info_name, parent=parent, **extra)
with ctx.scope(cleanup=False):
self.parse_args(ctx, args)
return ctx
def parse_args(self, ctx, args):
"""Given a context and a list of arguments this creates the parser
and parses the arguments, then modifies the context as necessary.
This is automatically invoked by :meth:`make_context`.
raise NotImplementedError("Base commands do not know how to parse arguments.")
def invoke(self, ctx):
"""Given a context, this invokes the command. The default
implementation is raising a not implemented error.
raise NotImplementedError("Base commands are not invokable by default")
def main(
"""This is the way to invoke a script with all the bells and
whistles as a command line application. This will always terminate
the application after a call. If this is not wanted, ``SystemExit``
needs to be caught.
This method is also available by directly calling the instance of
a :class:`Command`.
.. versionadded:: 3.0
Added the `standalone_mode` flag to control the standalone mode.
:param args: the arguments that should be used for parsing. If not
provided, ``sys.argv[1:]`` is used.
:param prog_name: the program name that should be used. By default
the program name is constructed by taking the file
name from ``sys.argv[0]``.
:param complete_var: the environment variable that controls the
bash completion support. The default is
``"_<prog_name>_COMPLETE"`` with prog_name in
:param standalone_mode: the default behavior is to invoke the script
in standalone mode. Click will then
handle exceptions and convert them into
error messages and the function will never
return but shut down the interpreter. If
this is set to `False` they will be
propagated to the caller and the return
value of this function is the return value
of :meth:`invoke`.
:param extra: extra keyword arguments are forwarded to the context
constructor. See :class:`Context` for more information.
# If we are in Python 3, we will verify that the environment is
# sane at this point or reject further execution to avoid a
# broken script.
if not PY2:
if args is None:
args = get_os_args()
args = list(args)
if prog_name is None:
prog_name = make_str(
os.path.basename(sys.argv[0] if sys.argv else __file__)
# Hook for the Bash completion. This only activates if the Bash
# completion is actually enabled, otherwise this is quite a fast
# noop.
_bashcomplete(self, prog_name, complete_var)
with self.make_context(prog_name, args, **extra) as ctx:
rv = self.invoke(ctx)
if not standalone_mode:
return rv
# it's not safe to `ctx.exit(rv)` here!
# note that `rv` may actually contain data like "1" which
# has obvious effects
# more subtle case: `rv=[None, None]` can come out of
# chained commands which all returned `None` -- so it's not
# even always obvious that `rv` indicates success/failure
# by its truthiness/falsiness
except (EOFError, KeyboardInterrupt):
raise Abort()
except ClickException as e:
if not standalone_mode:
except IOError as e:
if e.errno == errno.EPIPE:
sys.stdout = PacifyFlushWrapper(sys.stdout)
sys.stderr = PacifyFlushWrapper(sys.stderr)
except Exit as e:
if standalone_mode:
# in non-standalone mode, return the exit code
# note that this is only reached if `self.invoke` above raises
# an Exit explicitly -- thus bypassing the check there which
# would return its result
# the results of non-standalone execution may therefore be
# somewhat ambiguous: if there are codepaths which lead to
# `ctx.exit(1)` and to `return 1`, the caller won't be able to
# tell the difference between the two
return e.exit_code
except Abort:
if not standalone_mode:
echo("Aborted!", file=sys.stderr)
def __call__(self, *args, **kwargs):
"""Alias for :meth:`main`."""
return self.main(*args, **kwargs)
class Command(BaseCommand):
"""Commands are the basic building block of command line interfaces in
Click. A basic command handles command line parsing and might dispatch
more parsing to commands nested below it.
.. versionchanged:: 2.0
Added the `context_settings` parameter.
.. versionchanged:: 7.1
Added the `no_args_is_help` parameter.
:param name: the name of the command to use unless a group overrides it.
:param context_settings: an optional dictionary with defaults that are
passed to the context object.
:param callback: the callback to invoke. This is optional.
:param params: the parameters to register with this command. This can
be either :class:`Option` or :class:`Argument` objects.
:param help: the help string to use for this command.
:param epilog: like the help string but it's printed at the end of the
help page after everything else.
:param short_help: the short help to use for this command. This is
shown on the command listing of the parent command.
:param add_help_option: by default each command registers a ``--help``
option. This can be disabled by this parameter.
:param no_args_is_help: this controls what happens if no arguments are
provided. This option is disabled by default.
If enabled this will add ``--help`` as argument
if no arguments are passed
:param hidden: hide this command from help outputs.
:param deprecated: issues a message indicating that
the command is deprecated.
def __init__(
BaseCommand.__init__(self, name, context_settings)
#: the callback to execute when the command fires. This might be
#: `None` in which case nothing happens.
self.callback = callback
#: the list of parameters for this command in the order they
#: should show up in the help page and execute. Eager parameters
#: will automatically be handled before non eager ones.
self.params = params or []
# if a form feed (page break) is found in the help text, truncate help
# text to the content preceding the first form feed
if help and "\f" in help:
help = help.split("\f", 1)[0] = help
self.epilog = epilog
self.options_metavar = options_metavar
self.short_help = short_help
self.add_help_option = add_help_option
self.no_args_is_help = no_args_is_help
self.hidden = hidden
self.deprecated = deprecated
def get_usage(self, ctx):
"""Formats the usage line into a string and returns it.
Calls :meth:`format_usage` internally.
formatter = ctx.make_formatter()
self.format_usage(ctx, formatter)
return formatter.getvalue().rstrip("\n")
def get_params(self, ctx):
rv = self.params
help_option = self.get_help_option(ctx)
if help_option is not None:
rv = rv + [help_option]
return rv
def format_usage(self, ctx, formatter):
"""Writes the usage line into the formatter.
This is a low-level method called by :meth:`get_usage`.
pieces = self.collect_usage_pieces(ctx)
formatter.write_usage(ctx.command_path, " ".join(pieces))
def collect_usage_pieces(self, ctx):
"""Returns all the pieces that go into the usage line and returns
it as a list of strings.
rv = [self.options_metavar]
for param in self.get_params(ctx):
return rv
def get_help_option_names(self, ctx):
"""Returns the names for the help option."""
all_names = set(ctx.help_option_names)
for param in self.params:
return all_names
def get_help_option(self, ctx):
"""Returns the help option object."""
help_options = self.get_help_option_names(ctx)
if not help_options or not self.add_help_option:
def show_help(ctx, param, value):
if value and not ctx.resilient_parsing:
echo(ctx.get_help(), color=ctx.color)
return Option(
help="Show this message and exit.",
def make_parser(self, ctx):
"""Creates the underlying option parser for this command."""
parser = OptionParser(ctx)
for param in self.get_params(ctx):
param.add_to_parser(parser, ctx)
return parser
def get_help(self, ctx):
"""Formats the help into a string and returns it.
Calls :meth:`format_help` internally.
formatter = ctx.make_formatter()
self.format_help(ctx, formatter)
return formatter.getvalue().rstrip("\n")
def get_short_help_str(self, limit=45):
"""Gets short help for the command or makes it by shortening the
long help string.
return (
and make_default_short_help(, limit)
or ""
def format_help(self, ctx, formatter):
"""Writes the help into the formatter if it exists.
This is a low-level method called by :meth:`get_help`.
This calls the following methods:
- :meth:`format_usage`
- :meth:`format_help_text`
- :meth:`format_options`
- :meth:`format_epilog`
self.format_usage(ctx, formatter)
self.format_help_text(ctx, formatter)
self.format_options(ctx, formatter)
self.format_epilog(ctx, formatter)
def format_help_text(self, ctx, formatter):
"""Writes the help text to the formatter if it exists."""
with formatter.indentation():
help_text =
if self.deprecated:
elif self.deprecated:
with formatter.indentation():
def format_options(self, ctx, formatter):
"""Writes all the options into the formatter if they exist."""
opts = []
for param in self.get_params(ctx):
rv = param.get_help_record(ctx)
if rv is not None:
if opts:
with formatter.section("Options"):
def format_epilog(self, ctx, formatter):
"""Writes the epilog into the formatter if it exists."""
if self.epilog:
with formatter.indentation():
def parse_args(self, ctx, args):
if not args and self.no_args_is_help and not ctx.resilient_parsing:
echo(ctx.get_help(), color=ctx.color)
parser = self.make_parser(ctx)
opts, args, param_order = parser.parse_args(args=args)
for param in iter_params_for_processing(param_order, self.get_params(ctx)):
value, args = param.handle_parse_result(ctx, opts, args)
if args and not ctx.allow_extra_args and not ctx.resilient_parsing:
"Got unexpected extra argument{} ({})".format(
"s" if len(args) != 1 else "", " ".join(map(make_str, args))
ctx.args = args
return args
def invoke(self, ctx):
"""Given a context, this invokes the attached callback (if it exists)
in the right way.
if self.callback is not None:
return ctx.invoke(self.callback, **ctx.params)
class MultiCommand(Command):
"""A multi command is the basic implementation of a command that
dispatches to subcommands. The most common version is the
:param invoke_without_command: this controls how the multi command itself
is invoked. By default it's only invoked
if a subcommand is provided.
:param no_args_is_help: this controls what happens if no arguments are
provided. This option is enabled by default if
`invoke_without_command` is disabled or disabled
if it's enabled. If enabled this will add
``--help`` as argument if no arguments are
:param subcommand_metavar: the string that is used in the documentation
to indicate the subcommand place.
:param chain: if this is set to `True` chaining of multiple subcommands
is enabled. This restricts the form of commands in that
they cannot have optional arguments but it allows
multiple commands to be chained together.
:param result_callback: the result callback to attach to this multi
allow_extra_args = True
allow_interspersed_args = False
def __init__(
Command.__init__(self, name, **attrs)
if no_args_is_help is None:
no_args_is_help = not invoke_without_command
self.no_args_is_help = no_args_is_help
self.invoke_without_command = invoke_without_command
if subcommand_metavar is None:
if chain:
subcommand_metavar = SUBCOMMANDS_METAVAR
subcommand_metavar = SUBCOMMAND_METAVAR
self.subcommand_metavar = subcommand_metavar
self.chain = chain
#: The result callback that is stored. This can be set or
#: overridden with the :func:`resultcallback` decorator.
self.result_callback = result_callback
if self.chain:
for param in self.params:
if isinstance(param, Argument) and not param.required:
raise RuntimeError(
"Multi commands in chain mode cannot have"
" optional arguments."
def collect_usage_pieces(self, ctx):
rv = Command.collect_usage_pieces(self, ctx)
return rv
def format_options(self, ctx, formatter):
Command.format_options(self, ctx, formatter)
self.format_commands(ctx, formatter)
def resultcallback(self, replace=False):
"""Adds a result callback to the chain command. By default if a
result callback is already registered this will chain them but
this can be disabled with the `replace` parameter. The result
callback is invoked with the return value of the subcommand
(or the list of return values from all subcommands if chaining
is enabled) as well as the parameters as they would be passed
to the main callback.
@click.option('-i', '--input', default=23)
def cli(input):
return 42
def process_result(result, input):
return result + input
.. versionadded:: 3.0
:param replace: if set to `True` an already existing result
callback will be removed.
def decorator(f):
old_callback = self.result_callback
if old_callback is None or replace:
self.result_callback = f
return f
def function(__value, *args, **kwargs):
return f(old_callback(__value, *args, **kwargs), *args, **kwargs)
self.result_callback = rv = update_wrapper(function, f)
return rv
return decorator
def format_commands(self, ctx, formatter):
"""Extra format methods for multi methods that adds all the commands
after the options.
commands = []
for subcommand in self.list_commands(ctx):
cmd = self.get_command(ctx, subcommand)
# What is this, the tool lied about a command. Ignore it
if cmd is None:
if cmd.hidden:
commands.append((subcommand, cmd))
# allow for 3 times the default spacing
if len(commands):
limit = formatter.width - 6 - max(len(cmd[0]) for cmd in commands)
rows = []
for subcommand, cmd in commands:
help = cmd.get_short_help_str(limit)
rows.append((subcommand, help))
if rows:
with formatter.section("Commands"):
def parse_args(self, ctx, args):
if not args and self.no_args_is_help and not ctx.resilient_parsing:
echo(ctx.get_help(), color=ctx.color)
rest = Command.parse_args(self, ctx, args)
if self.chain:
ctx.protected_args = rest
ctx.args = []
elif rest:
ctx.protected_args, ctx.args = rest[:1], rest[1:]
return ctx.args
def invoke(self, ctx):
def _process_result(value):
if self.result_callback is not None:
value = ctx.invoke(self.result_callback, value, **ctx.params)
return value
if not ctx.protected_args:
# If we are invoked without command the chain flag controls
# how this happens. If we are not in chain mode, the return
# value here is the return value of the command.
# If however we are in chain mode, the return value is the
# return value of the result processor invoked with an empty
# list (which means that no subcommand actually was executed).
if self.invoke_without_command:
if not self.chain:
return Command.invoke(self, ctx)
with ctx:
Command.invoke(self, ctx)
return _process_result([])"Missing command.")
# Fetch args back out
args = ctx.protected_args + ctx.args
ctx.args = []
ctx.protected_args = []
# If we're not in chain mode, we only allow the invocation of a
# single command but we also inform the current context about the
# name of the command to invoke.
if not self.chain:
# Make sure the context is entered so we do not clean up
# resources until the result processor has worked.
with ctx:
cmd_name, cmd, args = self.resolve_command(ctx, args)
ctx.invoked_subcommand = cmd_name
Command.invoke(self, ctx)
sub_ctx = cmd.make_context(cmd_name, args, parent=ctx)
with sub_ctx:
return _process_result(sub_ctx.command.invoke(sub_ctx))
# In chain mode we create the contexts step by step, but after the
# base command has been invoked. Because at that point we do not
# know the subcommands yet, the invoked subcommand attribute is
# set to ``*`` to inform the command that subcommands are executed
# but nothing else.
with ctx:
ctx.invoked_subcommand = "*" if args else None
Command.invoke(self, ctx)
# Otherwise we make every single context and invoke them in a
# chain. In that case the return value to the result processor
# is the list of all invoked subcommand's results.
contexts = []
while args:
cmd_name, cmd, args = self.resolve_command(ctx, args)
sub_ctx = cmd.make_context(
args, sub_ctx.args = sub_ctx.args, []
rv = []
for sub_ctx in contexts:
with sub_ctx:
return _process_result(rv)
def resolve_command(self, ctx, args):
cmd_name = make_str(args[0])
original_cmd_name = cmd_name
# Get the command
cmd = self.get_command(ctx, cmd_name)
# If we can't find the command but there is a normalization
# function available, we try with that one.
if cmd is None and ctx.token_normalize_func is not None:
cmd_name = ctx.token_normalize_func(cmd_name)
cmd = self.get_command(ctx, cmd_name)
# If we don't find the command we want to show an error message
# to the user that it was not provided. However, there is
# something else we should do: if the first argument looks like
# an option we want to kick off parsing again for arguments to
# resolve things like --help which now should go to the main
# place.
if cmd is None and not ctx.resilient_parsing:
if split_opt(cmd_name)[0]:
self.parse_args(ctx, ctx.args)"No such command '{}'.".format(original_cmd_name))
return cmd_name, cmd, args[1:]
def get_command(self, ctx, cmd_name):
"""Given a context and a command name, this returns a
:class:`Command` object if it exists or returns `None`.
raise NotImplementedError()
def list_commands(self, ctx):
"""Returns a list of subcommand names in the order they should
return []
class Group(MultiCommand):
"""A group allows a command to have subcommands attached. This is the
most common way to implement nesting in Click.
:param commands: a dictionary of commands.
def __init__(self, name=None, commands=None, **attrs):
MultiCommand.__init__(self, name, **attrs)
#: the registered subcommands by their exported names.
self.commands = commands or {}
def add_command(self, cmd, name=None):
"""Registers another :class:`Command` with this group. If the name
is not provided, the name of the command is used.
name = name or
if name is None:
raise TypeError("Command has no name.")
_check_multicommand(self, name, cmd, register=True)
self.commands[name] = cmd
def command(self, *args, **kwargs):
"""A shortcut decorator for declaring and attaching a command to
the group. This takes the same arguments as :func:`command` but
immediately registers the created command with this instance by
calling into :meth:`add_command`.
from .decorators import command
def decorator(f):
cmd = command(*args, **kwargs)(f)
return cmd
return decorator
def group(self, *args, **kwargs):
"""A shortcut decorator for declaring and attaching a group to
the group. This takes the same arguments as :func:`group` but
immediately registers the created command with this instance by
calling into :meth:`add_command`.
from .decorators import group
def decorator(f):
cmd = group(*args, **kwargs)(f)
return cmd
return decorator
def get_command(self, ctx, cmd_name):
return self.commands.get(cmd_name)
def list_commands(self, ctx):
return sorted(self.commands)
class CommandCollection(MultiCommand):
"""A command collection is a multi command that merges multiple multi
commands together into one. This is a straightforward implementation
that accepts a list of different multi commands as sources and
provides all the commands for each of them.
def __init__(self, name=None, sources=None, **attrs):
MultiCommand.__init__(self, name, **attrs)
#: The list of registered multi commands.
self.sources = sources or []
def add_source(self, multi_cmd):
"""Adds a new multi command to the chain dispatcher."""
def get_command(self, ctx, cmd_name):
for source in self.sources:
rv = source.get_command(ctx, cmd_name)
if rv is not None:
if self.chain:
_check_multicommand(self, cmd_name, rv)
return rv
def list_commands(self, ctx):
rv = set()
for source in self.sources:
return sorted(rv)
class Parameter(object):
r"""A parameter to a command comes in two versions: they are either
:class:`Option`\s or :class:`Argument`\s. Other subclasses are currently
not supported by design as some of the internals for parsing are
intentionally not finalized.
Some settings are supported by both options and arguments.
:param param_decls: the parameter declarations for this option or
argument. This is a list of flags or argument
:param type: the type that should be used. Either a :class:`ParamType`
or a Python type. The later is converted into the former
automatically if supported.
:param required: controls if this is optional or not.
:param default: the default value if omitted. This can also be a callable,
in which case it's invoked when the default is needed
without any arguments.
:param callback: a callback that should be executed after the parameter
was matched. This is called as ``fn(ctx, param,
value)`` and needs to return the value.
:param nargs: the number of arguments to match. If not ``1`` the return
value is a tuple instead of single value. The default for
nargs is ``1`` (except if the type is a tuple, then it's
the arity of the tuple).
:param metavar: how the value is represented in the help page.
:param expose_value: if this is `True` then the value is passed onwards
to the command callback and stored on the context,
otherwise it's skipped.
:param is_eager: eager values are processed before non eager ones. This
should not be set for arguments or it will inverse the
order of processing.
:param envvar: a string or list of strings that are environment variables
that should be checked.
.. versionchanged:: 7.1
Empty environment variables are ignored rather than taking the
empty string value. This makes it possible for scripts to clear
variables if they can't unset them.
.. versionchanged:: 2.0
Changed signature for parameter callback to also be passed the
parameter. The old callback format will still work, but it will
raise a warning to give you a chance to migrate the code easier.
param_type_name = "parameter"
def __init__(
):, self.opts, self.secondary_opts = self._parse_decls(
param_decls or (), expose_value
self.type = convert_type(type, default)
# Default nargs to what the type tells us if we have that
# information available.
if nargs is None:
if self.type.is_composite:
nargs = self.type.arity
nargs = 1
self.required = required
self.callback = callback
self.nargs = nargs
self.multiple = False
self.expose_value = expose_value
self.default = default
self.is_eager = is_eager
self.metavar = metavar
self.envvar = envvar
self.autocompletion = autocompletion
def __repr__(self):
return "<{} {}>".format(self.__class__.__name__,
def human_readable_name(self):
"""Returns the human readable name of this parameter. This is the
same as the name for options, but the metavar for arguments.
def make_metavar(self):
if self.metavar is not None:
return self.metavar
metavar = self.type.get_metavar(self)
if metavar is None:
metavar =
if self.nargs != 1:
metavar += "..."
return metavar
def get_default(self, ctx):
"""Given a context variable this calculates the default value."""
# Otherwise go with the regular default.
if callable(self.default):
rv = self.default()
rv = self.default
return self.type_cast_value(ctx, rv)
def add_to_parser(self, parser, ctx):
def consume_value(self, ctx, opts):
value = opts.get(
if value is None:
value = self.value_from_envvar(ctx)
if value is None:
value = ctx.lookup_default(
return value
def type_cast_value(self, ctx, value):
"""Given a value this runs it properly through the type system.
This automatically handles things like `nargs` and `multiple` as
well as composite types.
if self.type.is_composite:
if self.nargs <= 1:
raise TypeError(
"Attempted to invoke composite type but nargs has"
" been set to {}. This is not supported; nargs"
" needs to be set to a fixed value > 1.".format(self.nargs)
if self.multiple:
return tuple(self.type(x or (), self, ctx) for x in value or ())
return self.type(value or (), self, ctx)
def _convert(value, level):
if level == 0:
return self.type(value, self, ctx)
return tuple(_convert(x, level - 1) for x in value or ())
return _convert(value, (self.nargs != 1) + bool(self.multiple))
def process_value(self, ctx, value):
"""Given a value and context this runs the logic to convert the
value as necessary.
# If the value we were given is None we do nothing. This way
# code that calls this can easily figure out if something was
# not provided. Otherwise it would be converted into an empty
# tuple for multiple invocations which is inconvenient.
if value is not None:
return self.type_cast_value(ctx, value)
def value_is_missing(self, value):
if value is None:
return True
if (self.nargs != 1 or self.multiple) and value == ():
return True
return False
def full_process_value(self, ctx, value):
value = self.process_value(ctx, value)
if value is None and not ctx.resilient_parsing:
value = self.get_default(ctx)
if self.required and self.value_is_missing(value):
raise MissingParameter(ctx=ctx, param=self)
return value
def resolve_envvar_value(self, ctx):
if self.envvar is None:
if isinstance(self.envvar, (tuple, list)):
for envvar in self.envvar:
rv = os.environ.get(envvar)
if rv is not None:
return rv
rv = os.environ.get(self.envvar)
if rv != "":
return rv
def value_from_envvar(self, ctx):
rv = self.resolve_envvar_value(ctx)
if rv is not None and self.nargs != 1:
rv = self.type.split_envvar_value(rv)
return rv
def handle_parse_result(self, ctx, opts, args):
with augment_usage_errors(ctx, param=self):
value = self.consume_value(ctx, opts)
value = self.full_process_value(ctx, value)
except Exception:
if not ctx.resilient_parsing:
value = None
if self.callback is not None:
value = invoke_param_callback(self.callback, ctx, self, value)
except Exception:
if not ctx.resilient_parsing:
if self.expose_value:
ctx.params[] = value
return value, args
def get_help_record(self, ctx):
def get_usage_pieces(self, ctx):
return []
def get_error_hint(self, ctx):
"""Get a stringified version of the param for use in error messages to
indicate which param caused the error.
hint_list = self.opts or [self.human_readable_name]
return " / ".join(repr(x) for x in hint_list)
class Option(Parameter):
"""Options are usually optional values on the command line and
have some extra features that arguments don't have.
All other parameters are passed onwards to the parameter constructor.
:param show_default: controls if the default value should be shown on the
help page. Normally, defaults are not shown. If this
value is a string, it shows the string instead of the
value. This is particularly useful for dynamic options.
:param show_envvar: controls if an environment variable should be shown on
the help page. Normally, environment variables
are not shown.
:param prompt: if set to `True` or a non empty string then the user will be
prompted for input. If set to `True` the prompt will be the
option name capitalized.
:param confirmation_prompt: if set then the value will need to be confirmed
if it was prompted for.
:param hide_input: if this is `True` then the input on the prompt will be
hidden from the user. This is useful for password
:param is_flag: forces this option to act as a flag. The default is
auto detection.
:param flag_value: which value should be used for this flag if it's
enabled. This is set to a boolean automatically if
the option string contains a slash to mark two options.
:param multiple: if this is set to `True` then the argument is accepted
multiple times and recorded. This is similar to ``nargs``
in how it works but supports arbitrary number of
:param count: this flag makes an option increment an integer.
:param allow_from_autoenv: if this is enabled then the value of this
parameter will be pulled from an environment
variable in case a prefix is defined on the
:param help: the help string.
:param hidden: hide this option from help outputs.
param_type_name = "option"
def __init__(
default_is_missing = attrs.get("default", _missing) is _missing
Parameter.__init__(self, param_decls, type=type, **attrs)
if prompt is True:
prompt_text ="_", " ").capitalize()
elif prompt is False:
prompt_text = None
prompt_text = prompt
self.prompt = prompt_text
self.confirmation_prompt = confirmation_prompt
self.hide_input = hide_input
self.hidden = hidden
# Flags
if is_flag is None:
if flag_value is not None:
is_flag = True
is_flag = bool(self.secondary_opts)
if is_flag and default_is_missing:
self.default = False
if flag_value is None:
flag_value = not self.default
self.is_flag = is_flag
self.flag_value = flag_value
if self.is_flag and isinstance(self.flag_value, bool) and type in [None, bool]:
self.type = BOOL
self.is_bool_flag = True
self.is_bool_flag = False
# Counting
self.count = count
if count:
if type is None:
self.type = IntRange(min=0)
if default_is_missing:
self.default = 0
self.multiple = multiple
self.allow_from_autoenv = allow_from_autoenv = help
self.show_default = show_default
self.show_choices = show_choices
self.show_envvar = show_envvar
# Sanity check for stuff we don't support
if __debug__:
if self.nargs < 0:
raise TypeError("Options cannot have nargs < 0")
if self.prompt and self.is_flag and not self.is_bool_flag:
raise TypeError("Cannot prompt for flags that are not bools.")
if not self.is_bool_flag and self.secondary_opts:
raise TypeError("Got secondary option for non boolean flag.")
if self.is_bool_flag and self.hide_input and self.prompt is not None:
raise TypeError("Hidden input does not work with boolean flag prompts.")
if self.count:
if self.multiple:
raise TypeError(
"Options cannot be multiple and count at the same time."
elif self.is_flag:
raise TypeError(
"Options cannot be count and flags at the same time."
def _parse_decls(self, decls, expose_value):
opts = []
secondary_opts = []
name = None
possible_names = []
for decl in decls:
if isidentifier(decl):
if name is not None:
raise TypeError("Name defined twice")
name = decl
split_char = ";" if decl[:1] == "/" else "/"
if split_char in decl:
first, second = decl.split(split_char, 1)
first = first.rstrip()
if first:
second = second.lstrip()
if second:
if name is None and possible_names:
possible_names.sort(key=lambda x: -len(x[0])) # group long options first
name = possible_names[0][1].replace("-", "_").lower()
if not isidentifier(name):
name = None
if name is None:
if not expose_value:
return None, opts, secondary_opts
raise TypeError("Could not determine name for option")
if not opts and not secondary_opts:
raise TypeError(
"No options defined but a name was passed ({}). Did you"
" mean to declare an argument instead of an option?".format(name)
return name, opts, secondary_opts
def add_to_parser(self, parser, ctx):
kwargs = {
"nargs": self.nargs,
"obj": self,
if self.multiple:
action = "append"
elif self.count:
action = "count"
action = "store"
if self.is_flag:
kwargs.pop("nargs", None)
action_const = "{}_const".format(action)
if self.is_bool_flag and self.secondary_opts:
parser.add_option(self.opts, action=action_const, const=True, **kwargs)
self.secondary_opts, action=action_const, const=False, **kwargs
self.opts, action=action_const, const=self.flag_value, **kwargs
kwargs["action"] = action
parser.add_option(self.opts, **kwargs)
def get_help_record(self, ctx):
if self.hidden:
any_prefix_is_slash = []
def _write_opts(opts):
rv, any_slashes = join_options(opts)
if any_slashes:
any_prefix_is_slash[:] = [True]
if not self.is_flag and not self.count:
rv += " {}".format(self.make_metavar())
return rv
rv = [_write_opts(self.opts)]
if self.secondary_opts:
help = or ""
extra = []
if self.show_envvar:
envvar = self.envvar
if envvar is None:
if self.allow_from_autoenv and ctx.auto_envvar_prefix is not None:
envvar = "{}_{}".format(ctx.auto_envvar_prefix,
if envvar is not None:
"env var: {}".format(
", ".join(str(d) for d in envvar)
if isinstance(envvar, (list, tuple))
else envvar
if self.default is not None and (self.show_default or ctx.show_default):
if isinstance(self.show_default, string_types):
default_string = "({})".format(self.show_default)
elif isinstance(self.default, (list, tuple)):
default_string = ", ".join(str(d) for d in self.default)
elif inspect.isfunction(self.default):
default_string = "(dynamic)"
default_string = self.default
extra.append("default: {}".format(default_string))
if self.required:
if extra:
help = "{}[{}]".format(
"{} ".format(help) if help else "", "; ".join(extra)
return ("; " if any_prefix_is_slash else " / ").join(rv), help
def get_default(self, ctx):
# If we're a non boolean flag our default is more complex because
# we need to look at all flags in the same group to figure out
# if we're the the default one in which case we return the flag
# value as default.
if self.is_flag and not self.is_bool_flag:
for param in ctx.command.params:
if == and param.default:
return param.flag_value
return None
return Parameter.get_default(self, ctx)
def prompt_for_value(self, ctx):
"""This is an alternative flow that can be activated in the full
value processing if a value does not exist. It will prompt the
user until a valid value exists and then returns the processed
value as result.
# Calculate the default before prompting anything to be stable.
default = self.get_default(ctx)
# If this is a prompt for a flag we need to handle this
# differently.
if self.is_bool_flag:
return confirm(self.prompt, default)
return prompt(
value_proc=lambda x: self.process_value(ctx, x),
def resolve_envvar_value(self, ctx):
rv = Parameter.resolve_envvar_value(self, ctx)
if rv is not None:
return rv
if self.allow_from_autoenv and ctx.auto_envvar_prefix is not None:
envvar = "{}_{}".format(ctx.auto_envvar_prefix,
return os.environ.get(envvar)
def value_from_envvar(self, ctx):
rv = self.resolve_envvar_value(ctx)
if rv is None:
return None
value_depth = (self.nargs != 1) + bool(self.multiple)
if value_depth > 0 and rv is not None:
rv = self.type.split_envvar_value(rv)
if self.multiple and self.nargs != 1:
rv = batch(rv, self.nargs)
return rv
def full_process_value(self, ctx, value):
if value is None and self.prompt is not None and not ctx.resilient_parsing:
return self.prompt_for_value(ctx)
return Parameter.full_process_value(self, ctx, value)
class Argument(Parameter):
"""Arguments are positional parameters to a command. They generally
provide fewer features than options but can have infinite ``nargs``
and are required by default.
All parameters are passed onwards to the parameter constructor.
param_type_name = "argument"
def __init__(self, param_decls, required=None, **attrs):
if required is None:
if attrs.get("default") is not None:
required = False
required = attrs.get("nargs", 1) > 0
Parameter.__init__(self, param_decls, required=required, **attrs)
if self.default is not None and self.nargs < 0:
raise TypeError(
"nargs=-1 in combination with a default value is not supported."
def human_readable_name(self):
if self.metavar is not None:
return self.metavar
def make_metavar(self):
if self.metavar is not None:
return self.metavar
var = self.type.get_metavar(self)
if not var:
var =
if not self.required:
var = "[{}]".format(var)
if self.nargs != 1:
var += "..."
return var
def _parse_decls(self, decls, expose_value):
if not decls:
if not expose_value:
return None, [], []
raise TypeError("Could not determine name for argument")
if len(decls) == 1:
name = arg = decls[0]
name = name.replace("-", "_").lower()
raise TypeError(
"Arguments take exactly one parameter declaration, got"
" {}".format(len(decls))
return name, [arg], []
def get_usage_pieces(self, ctx):
return [self.make_metavar()]
def get_error_hint(self, ctx):
return repr(self.make_metavar())
def add_to_parser(self, parser, ctx):
parser.add_argument(, nargs=self.nargs, obj=self)
import inspect
import sys
from functools import update_wrapper
from ._compat import iteritems
from ._unicodefun import _check_for_unicode_literals
from .core import Argument
from .core import Command
from .core import Group
from .core import Option
from .globals import get_current_context
from .utils import echo
def pass_context(f):
"""Marks a callback as wanting to receive the current context
object as first argument.
def new_func(*args, **kwargs):
return f(get_current_context(), *args, **kwargs)
return update_wrapper(new_func, f)
def pass_obj(f):
"""Similar to :func:`pass_context`, but only pass the object on the
context onwards (:attr:`Context.obj`). This is useful if that object
represents the state of a nested system.
def new_func(*args, **kwargs):
return f(get_current_context().obj, *args, **kwargs)
return update_wrapper(new_func, f)
def make_pass_decorator(object_type, ensure=False):
"""Given an object type this creates a decorator that will work
similar to :func:`pass_obj` but instead of passing the object of the
current context, it will find the innermost context of type
This generates a decorator that works roughly like this::
from functools import update_wrapper
def decorator(f):
def new_func(ctx, *args, **kwargs):
obj = ctx.find_object(object_type)
return ctx.invoke(f, obj, *args, **kwargs)
return update_wrapper(new_func, f)
return decorator
:param object_type: the type of the object to pass.
:param ensure: if set to `True`, a new object will be created and
remembered on the context if it's not there yet.
def decorator(f):
def new_func(*args, **kwargs):
ctx = get_current_context()
if ensure:
obj = ctx.ensure_object(object_type)
obj = ctx.find_object(object_type)
if obj is None:
raise RuntimeError(
"Managed to invoke callback without a context"
" object of type '{}' existing".format(object_type.__name__)
return ctx.invoke(f, obj, *args, **kwargs)
return update_wrapper(new_func, f)
return decorator
def _make_command(f, name, attrs, cls):
if isinstance(f, Command):
raise TypeError("Attempted to convert a callback into a command twice.")
params = f.__click_params__
del f.__click_params__
except AttributeError:
params = []
help = attrs.get("help")
if help is None:
help = inspect.getdoc(f)
if isinstance(help, bytes):
help = help.decode("utf-8")
help = inspect.cleandoc(help)
attrs["help"] = help
return cls(
name=name or f.__name__.lower().replace("_", "-"),
def command(name=None, cls=None, **attrs):
r"""Creates a new :class:`Command` and uses the decorated function as
callback. This will also automatically attach all decorated
:func:`option`\s and :func:`argument`\s as parameters to the command.
The name of the command defaults to the name of the function with
underscores replaced by dashes. If you want to change that, you can
pass the intended name as the first argument.
All keyword arguments are forwarded to the underlying command class.
Once decorated the function turns into a :class:`Command` instance
that can be invoked as a command line utility or be attached to a
command :class:`Group`.
:param name: the name of the command. This defaults to the function
name with underscores replaced by dashes.
:param cls: the command class to instantiate. This defaults to
if cls is None:
cls = Command
def decorator(f):
cmd = _make_command(f, name, attrs, cls)
cmd.__doc__ = f.__doc__
return cmd
return decorator
def group(name=None, **attrs):
"""Creates a new :class:`Group` with a function as callback. This
works otherwise the same as :func:`command` just that the `cls`
parameter is set to :class:`Group`.
attrs.setdefault("cls", Group)
return command(name, **attrs)
def _param_memo(f, param):
if isinstance(f, Command):
if not hasattr(f, "__click_params__"):
f.__click_params__ = []
def argument(*param_decls, **attrs):
"""Attaches an argument to the command. All positional arguments are
passed as parameter declarations to :class:`Argument`; all keyword
arguments are forwarded unchanged (except ``cls``).
This is equivalent to creating an :class:`Argument` instance manually
and attaching it to the :attr:`Command.params` list.
:param cls: the argument class to instantiate. This defaults to
def decorator(f):
ArgumentClass = attrs.pop("cls", Argument)
_param_memo(f, ArgumentClass(param_decls, **attrs))
return f
return decorator
def option(*param_decls, **attrs):
"""Attaches an option to the command. All positional arguments are
passed as parameter declarations to :class:`Option`; all keyword
arguments are forwarded unchanged (except ``cls``).
This is equivalent to creating an :class:`Option` instance manually
and attaching it to the :attr:`Command.params` list.
:param cls: the option class to instantiate. This defaults to
def decorator(f):
# Issue 926, copy attrs, so pre-defined options can re-use the same cls=
option_attrs = attrs.copy()
if "help" in option_attrs:
option_attrs["help"] = inspect.cleandoc(option_attrs["help"])
OptionClass = option_attrs.pop("cls", Option)
_param_memo(f, OptionClass(param_decls, **option_attrs))
return f
return decorator
def confirmation_option(*param_decls, **attrs):
"""Shortcut for confirmation prompts that can be ignored by passing
``--yes`` as parameter.
This is equivalent to decorating a function with :func:`option` with
the following parameters::
def callback(ctx, param, value):
if not value:
@click.option('--yes', is_flag=True, callback=callback,
expose_value=False, prompt='Do you want to continue?')
def dropdb():
def decorator(f):
def callback(ctx, param, value):
if not value:
attrs.setdefault("is_flag", True)
attrs.setdefault("callback", callback)
attrs.setdefault("expose_value", False)
attrs.setdefault("prompt", "Do you want to continue?")
attrs.setdefault("help", "Confirm the action without prompting.")
return option(*(param_decls or ("--yes",)), **attrs)(f)
return decorator
def password_option(*param_decls, **attrs):
"""Shortcut for password prompts.
This is equivalent to decorating a function with :func:`option` with
the following parameters::
@click.option('--password', prompt=True, confirmation_prompt=True,
def changeadmin(password):
def decorator(f):
attrs.setdefault("prompt", True)
attrs.setdefault("confirmation_prompt", True)
attrs.setdefault("hide_input", True)
return option(*(param_decls or ("--password",)), **attrs)(f)
return decorator
def version_option(version=None, *param_decls, **attrs):
"""Adds a ``--version`` option which immediately ends the program
printing out the version number. This is implemented as an eager
option that prints the version and exits the program in the callback.
:param version: the version number to show. If not provided Click
attempts an auto discovery via setuptools.
:param prog_name: the name of the program (defaults to autodetection)
:param message: custom message to show instead of the default
(``'%(prog)s, version %(version)s'``)
:param others: everything else is forwarded to :func:`option`.
if version is None:
if hasattr(sys, "_getframe"):
module = sys._getframe(1).f_globals.get("__name__")
module = ""
def decorator(f):
prog_name = attrs.pop("prog_name", None)
message = attrs.pop("message", "%(prog)s, version %(version)s")
def callback(ctx, param, value):
if not value or ctx.resilient_parsing:
prog = prog_name
if prog is None:
prog = ctx.find_root().info_name
ver = version
if ver is None:
import pkg_resources
except ImportError:
for dist in pkg_resources.working_set:
scripts = dist.get_entry_map().get("console_scripts") or {}
for _, entry_point in iteritems(scripts):
if entry_point.module_name == module:
ver = dist.version
if ver is None:
raise RuntimeError("Could not determine version")
echo(message % {"prog": prog, "version": ver}, color=ctx.color)
attrs.setdefault("is_flag", True)
attrs.setdefault("expose_value", False)
attrs.setdefault("is_eager", True)
attrs.setdefault("help", "Show the version and exit.")
attrs["callback"] = callback
return option(*(param_decls or ("--version",)), **attrs)(f)
return decorator
def help_option(*param_decls, **attrs):
"""Adds a ``--help`` option which immediately ends the program
printing out the help page. This is usually unnecessary to add as
this is added by default to all commands unless suppressed.
Like :func:`version_option`, this is implemented as eager option that
prints in the callback and exits.
All arguments are forwarded to :func:`option`.
def decorator(f):
def callback(ctx, param, value):
if value and not ctx.resilient_parsing:
echo(ctx.get_help(), color=ctx.color)
attrs.setdefault("is_flag", True)
attrs.setdefault("expose_value", False)
attrs.setdefault("help", "Show this message and exit.")
attrs.setdefault("is_eager", True)
attrs["callback"] = callback
return option(*(param_decls or ("--help",)), **attrs)(f)
return decorator
from ._compat import filename_to_ui
from ._compat import get_text_stderr
from ._compat import PY2
from .utils import echo
def _join_param_hints(param_hint):
if isinstance(param_hint, (tuple, list)):
return " / ".join(repr(x) for x in param_hint)
return param_hint
class ClickException(Exception):
"""An exception that Click can handle and show to the user."""
#: The exit code for this exception
exit_code = 1
def __init__(self, message):
ctor_msg = message
if PY2:
if ctor_msg is not None:
ctor_msg = ctor_msg.encode("utf-8")
Exception.__init__(self, ctor_msg)
self.message = message
def format_message(self):
return self.message
def __str__(self):
return self.message
if PY2:
__unicode__ = __str__
def __str__(self):
return self.message.encode("utf-8")
def show(self, file=None):
if file is None:
file = get_text_stderr()
echo("Error: {}".format(self.format_message()), file=file)
class UsageError(ClickException):
"""An internal exception that signals a usage error. This typically
aborts any further handling.
:param message: the error message to display.
:param ctx: optionally the context that caused this error. Click will
fill in the context automatically in some situations.
exit_code = 2
def __init__(self, message, ctx=None):
ClickException.__init__(self, message)
self.ctx = ctx
self.cmd = self.ctx.command if self.ctx else None
def show(self, file=None):
if file is None:
file = get_text_stderr()
color = None
hint = ""
if self.cmd is not None and self.cmd.get_help_option(self.ctx) is not None:
hint = "Try '{} {}' for help.\n".format(
self.ctx.command_path, self.ctx.help_option_names[0]
if self.ctx is not None:
color = self.ctx.color
echo("{}\n{}".format(self.ctx.get_usage(), hint), file=file, color=color)
echo("Error: {}".format(self.format_message()), file=file, color=color)
class BadParameter(UsageError):
"""An exception that formats out a standardized error message for a
bad parameter. This is useful when thrown from a callback or type as
Click will attach contextual information to it (for instance, which
parameter it is).
.. versionadded:: 2.0
:param param: the parameter object that caused this error. This can
be left out, and Click will attach this info itself
if possible.
:param param_hint: a string that shows up as parameter name. This
can be used as alternative to `param` in cases
where custom validation should happen. If it is
a string it's used as such, if it's a list then
each item is quoted and separated.
def __init__(self, message, ctx=None, param=None, param_hint=None):
UsageError.__init__(self, message, ctx)
self.param = param
self.param_hint = param_hint
def format_message(self):
if self.param_hint is not None:
param_hint = self.param_hint
elif self.param is not None:
param_hint = self.param.get_error_hint(self.ctx)
return "Invalid value: {}".format(self.message)
param_hint = _join_param_hints(param_hint)
return "Invalid value for {}: {}".format(param_hint, self.message)
class MissingParameter(BadParameter):
"""Raised if click required an option or argument but it was not
provided when invoking the script.
.. versionadded:: 4.0
:param param_type: a string that indicates the type of the parameter.
The default is to inherit the parameter type from
the given `param`. Valid values are ``'parameter'``,
``'option'`` or ``'argument'``.
def __init__(
self, message=None, ctx=None, param=None, param_hint=None, param_type=None
BadParameter.__init__(self, message, ctx, param, param_hint)
self.param_type = param_type
def format_message(self):
if self.param_hint is not None:
param_hint = self.param_hint
elif self.param is not None:
param_hint = self.param.get_error_hint(self.ctx)
param_hint = None
param_hint = _join_param_hints(param_hint)
param_type = self.param_type
if param_type is None and self.param is not None:
param_type = self.param.param_type_name
msg = self.message
if self.param is not None:
msg_extra = self.param.type.get_missing_message(self.param)
if msg_extra:
if msg:
msg += ". {}".format(msg_extra)
msg = msg_extra
return "Missing {}{}{}{}".format(
" {}".format(param_hint) if param_hint else "",
". " if msg else ".",
msg or "",
def __str__(self):
if self.message is None:
param_name = if self.param else None
return "missing parameter: {}".format(param_name)
return self.message
if PY2:
__unicode__ = __str__
def __str__(self):
return self.__unicode__().encode("utf-8")
class NoSuchOption(UsageError):
"""Raised if click attempted to handle an option that does not
.. versionadded:: 4.0
def __init__(self, option_name, message=None, possibilities=None, ctx=None):
if message is None:
message = "no such option: {}".format(option_name)
UsageError.__init__(self, message, ctx)
self.option_name = option_name
self.possibilities = possibilities
def format_message(self):
bits = [self.message]
if self.possibilities:
if len(self.possibilities) == 1:
bits.append("Did you mean {}?".format(self.possibilities[0]))
possibilities = sorted(self.possibilities)
bits.append("(Possible options: {})".format(", ".join(possibilities)))
return " ".join(bits)
class BadOptionUsage(UsageError):
"""Raised if an option is generally supplied but the use of the option
was incorrect. This is for instance raised if the number of arguments
for an option is not correct.
.. versionadded:: 4.0
:param option_name: the name of the option being used incorrectly.
def __init__(self, option_name, message, ctx=None):
UsageError.__init__(self, message, ctx)
self.option_name = option_name
class BadArgumentUsage(UsageError):
"""Raised if an argument is generally supplied but the use of the argument
was incorrect. This is for instance raised if the number of values
for an argument is not correct.
.. versionadded:: 6.0
def __init__(self, message, ctx=None):
UsageError.__init__(self, message, ctx)
class FileError(ClickException):
"""Raised if a file cannot be opened."""
def __init__(self, filename, hint=None):
ui_filename = filename_to_ui(filename)
if hint is None:
hint = "unknown error"
ClickException.__init__(self, hint)
self.ui_filename = ui_filename
self.filename = filename
def format_message(self):
return "Could not open file {}: {}".format(self.ui_filename, self.message)
class Abort(RuntimeError):
"""An internal signalling exception that signals Click to abort."""
class Exit(RuntimeError):
"""An exception that indicates that the application should exit with some
status code.
:param code: the status code to exit with.
__slots__ = ("exit_code",)
def __init__(self, code=0):
self.exit_code = code
