Refactor arguments parsing to fix bootstrap action CLI issues (#712).

This commit is contained in:
Dan Helfman 2023-06-19 16:18:47 -07:00
parent ee2ebb79b8
commit 7b8be800a4
8 changed files with 617 additions and 364 deletions

4
NEWS
View file

@ -1,7 +1,7 @@
1.7.15.dev0
* #399: Add a documentation troubleshooting note for MySQL/MariaDB authentication errors.
* #697: Extract borgmatic configuration from backup via "bootstrap" action—even when borgmatic
has no configuration yet!
* #697, #712: Extract borgmatic configuration from backup via "bootstrap" action—even when
borgmatic has no configuration yet!
* #669: Add sample systemd user service for running borgmatic as a non-root user.
1.7.14

View file

@ -38,10 +38,7 @@ def run_arbitrary_borg(
borg_command = tuple(options[:command_options_start_index])
command_options = tuple(options[command_options_start_index:])
if (
borg_command
and borg_command[0] in borgmatic.commands.arguments.SUBPARSER_ALIASES.keys()
):
if borg_command and borg_command[0] in borgmatic.commands.arguments.ACTION_ALIASES.keys():
logger.warning(
f"Borg's {borg_command[0]} subcommand is supported natively by borgmatic. Try this instead: borgmatic {borg_command[0]}"
)

View file

@ -1,18 +1,17 @@
import argparse
import collections
import itertools
import sys
from argparse import Action, ArgumentParser
from borgmatic.config import collect
SUBPARSER_ALIASES = {
ACTION_ALIASES = {
'rcreate': ['init', '-I'],
'prune': ['-p'],
'compact': [],
'create': ['-C'],
'check': ['-k'],
'config': [],
'config_bootstrap': [],
'extract': ['-x'],
'export-tar': [],
'mount': ['-m'],
@ -28,124 +27,192 @@ SUBPARSER_ALIASES = {
}
def get_unparsable_arguments(remaining_subparser_arguments):
def get_subaction_parsers(action_parser):
'''
Determine the remaining arguments that no subparsers have consumed.
Given an argparse.ArgumentParser instance, lookup the subactions in it and return a dict from
subaction name to subaction parser.
'''
if remaining_subparser_arguments:
remaining_arguments = [
argument
for argument in dict.fromkeys(
itertools.chain.from_iterable(remaining_subparser_arguments)
).keys()
if all(
argument in subparser_arguments
for subparser_arguments in remaining_subparser_arguments
)
]
else:
remaining_arguments = []
if not action_parser._subparsers:
return {}
return remaining_arguments
def parse_subparser_arguments(unparsed_arguments, subparsers):
'''
Given a sequence of arguments and a dict from subparser name to argparse.ArgumentParser
instance, give each requested action's subparser a shot at parsing all arguments. This allows
common arguments like "--repository" to be shared across multiple subparsers.
Return the result as a tuple of (a dict mapping from subparser name to a parsed namespace of
arguments, a list of remaining arguments not claimed by any subparser).
'''
arguments = collections.OrderedDict()
remaining_arguments = list(unparsed_arguments)
alias_to_subparser_name = {
alias: subparser_name
for subparser_name, aliases in SUBPARSER_ALIASES.items()
for alias in aliases
}
subcommand_parsers_mapping = {
'config': ['bootstrap'],
return {
subaction_name: subaction_parser
for group_action in action_parser._subparsers._group_actions
for subaction_name, subaction_parser in group_action.choices.items()
}
# If the "borg" action is used, skip all other subparsers. This avoids confusion like
# "borg list" triggering borgmatic's own list action.
if 'borg' in unparsed_arguments:
subparsers = {'borg': subparsers['borg']}
for argument in remaining_arguments:
canonical_name = alias_to_subparser_name.get(argument, argument)
subparser = subparsers.get(canonical_name)
if not subparser:
continue
# If a parsed value happens to be the same as the name of a subparser, remove it from the
# remaining arguments. This prevents, for instance, "check --only extract" from triggering
# the "extract" subparser.
parsed, unused_remaining = subparser.parse_known_args(
[argument for argument in unparsed_arguments if argument != canonical_name]
def get_subactions_for_actions(action_parsers):
'''
Given a dict from action name to an argparse.ArgumentParser instance, make a map from action
name to the names of contained sub-actions.
'''
return {
action: tuple(
subaction_name
for group_action in action_parser._subparsers._group_actions
for subaction_name in group_action.choices.keys()
)
for action, action_parser in action_parsers.items()
if action_parser._subparsers
}
def omit_values_colliding_with_action_names(unparsed_arguments, parsed_arguments):
'''
Given a sequence of string arguments and a dict from action name to parsed argparse.Namespace
arguments, return the string arguments with any values omitted that happen to be the same as
the name of a borgmatic action.
This prevents, for instance, "check --only extract" from triggering the "extract" action.
'''
remaining_arguments = list(unparsed_arguments)
for action_name, parsed in parsed_arguments.items():
for value in vars(parsed).values():
if isinstance(value, str):
if value in subparsers:
if value in ACTION_ALIASES.keys():
remaining_arguments.remove(value)
elif isinstance(value, list):
for item in value:
if item in subparsers:
if item in ACTION_ALIASES.keys():
remaining_arguments.remove(item)
arguments[canonical_name] = None if canonical_name in subcommand_parsers_mapping else parsed
return tuple(remaining_arguments)
for argument in arguments:
if not arguments[argument]:
if not any(
subcommand in arguments for subcommand in subcommand_parsers_mapping[argument]
):
raise ValueError(
f'Missing subcommand for {argument}. Expected one of {subcommand_parsers_mapping[argument]}'
)
# If no actions are explicitly requested, assume defaults.
if not arguments and '--help' not in unparsed_arguments and '-h' not in unparsed_arguments:
for subparser_name in ('create', 'prune', 'compact', 'check'):
subparser = subparsers[subparser_name]
parsed, unused_remaining = subparser.parse_known_args(unparsed_arguments)
arguments[subparser_name] = parsed
def parse_and_record_action_arguments(
unparsed_arguments, parsed_arguments, action_parser, action_name, canonical_name=None
):
'''
Given unparsed arguments as a sequence of strings, parsed arguments as a dict from action name
to parsed argparse.Namespace, a parser to parse with, an action name, and an optional canonical
action name (in case this the action name is an alias), parse the arguments and return a list of
any remaining string arguments that were not parsed. Also record the parsed argparse.Namespace
by setting it into the given parsed arguments. Return None if no parsing occurs because the
given action doesn't apply to the given unparsed arguments.
'''
filtered_arguments = omit_values_colliding_with_action_names(
unparsed_arguments, parsed_arguments
)
remaining_arguments = list(unparsed_arguments)
if action_name not in filtered_arguments:
return tuple(unparsed_arguments)
# Now ask each subparser, one by one, to greedily consume arguments, from last to first. This
# allows subparsers to consume arguments before their parent subparsers do.
remaining_subparser_arguments = []
parsed, remaining = action_parser.parse_known_args(filtered_arguments)
parsed_arguments[canonical_name or action_name] = parsed
for subparser_name, subparser in reversed(subparsers.items()):
if subparser_name not in arguments.keys():
# Special case: If this is a "borg" action, greedily consume all arguments after (+1) the "borg"
# argument.
if action_name == 'borg':
borg_options_index = remaining.index('borg') + 1
parsed_arguments['borg'].options = remaining[borg_options_index:]
remaining = remaining[:borg_options_index]
return tuple(argument for argument in remaining if argument != action_name)
def get_unparsable_arguments(remaining_action_arguments):
'''
Given a sequence of argument tuples (one tuple per action parser that parsed arguments),
determine the remaining arguments that no action parsers have consumed.
'''
if not remaining_action_arguments:
return ()
return tuple(
argument
for argument in dict.fromkeys(
itertools.chain.from_iterable(remaining_action_arguments)
).keys()
if all(argument in action_arguments for action_arguments in remaining_action_arguments)
)
def parse_arguments_for_actions(unparsed_arguments, action_parsers):
'''
Given a sequence of arguments and a dict from action name to argparse.ArgumentParser
instance, give each requested action's parser a shot at parsing all arguments. This allows
common arguments like "--repository" to be shared across multiple action parsers.
Return the result as a tuple of: (a dict mapping from action name to an argparse.Namespace of
parsed arguments, a list of strings of remaining arguments not claimed by any action parser).
'''
arguments = collections.OrderedDict()
help_requested = bool('--help' in unparsed_arguments or '-h' in unparsed_arguments)
remaining_action_arguments = []
alias_to_action_name = {
alias: action_name for action_name, aliases in ACTION_ALIASES.items() for alias in aliases
}
# If the "borg" action is used, skip all other action parsers. This avoids confusion like
# "borg list" triggering borgmatic's own list action.
if 'borg' in unparsed_arguments:
action_parsers = {'borg': action_parsers['borg']}
# Ask each action parser, one by one, to parse arguments.
for argument in unparsed_arguments:
action_name = argument
canonical_name = alias_to_action_name.get(action_name, action_name)
action_parser = action_parsers.get(action_name)
if not action_parser:
continue
subparser = subparsers[subparser_name]
unused_parsed, remaining = subparser.parse_known_args(
[argument for argument in unparsed_arguments if argument != subparser_name]
)
remaining_subparser_arguments.append(remaining)
subaction_parsers = get_subaction_parsers(action_parser)
if remaining_subparser_arguments:
remaining_arguments = get_unparsable_arguments(remaining_subparser_arguments)
# Parse with subaction parsers, if any.
if subaction_parsers:
subactions_parsed = False
# Special case: If "borg" is present in the arguments, consume all arguments after (+1) the
# "borg" action.
if 'borg' in arguments:
borg_options_index = remaining_arguments.index('borg') + 1
arguments['borg'].options = remaining_arguments[borg_options_index:]
remaining_arguments = remaining_arguments[:borg_options_index]
for subaction_name, subaction_parser in subaction_parsers.items():
remaining_action_arguments.append(
parse_and_record_action_arguments(
unparsed_arguments,
arguments,
subaction_parser,
subaction_name,
)
)
# Remove the subparser names themselves.
for subparser_name, subparser in subparsers.items():
if subparser_name in remaining_arguments:
remaining_arguments.remove(subparser_name)
if subaction_name in arguments:
subactions_parsed = True
return (arguments, remaining_arguments)
if not subactions_parsed:
if help_requested:
action_parser.print_help()
sys.exit(0)
else:
raise ValueError(
f"Missing sub-action after {action_name} action. Expected one of: {', '.join(get_subactions_for_actions(action_parsers)[action_name])}"
)
# Otherwise, parse with the main action parser.
else:
remaining_action_arguments.append(
parse_and_record_action_arguments(
unparsed_arguments, arguments, action_parser, action_name, canonical_name
)
)
# If no actions were explicitly requested, assume defaults.
if not arguments and not help_requested:
for default_action_name in ('create', 'prune', 'compact', 'check'):
default_action_parser = action_parsers[default_action_name]
remaining_action_arguments.append(
parse_and_record_action_arguments(
tuple(unparsed_arguments) + (default_action_name,),
arguments,
default_action_parser,
default_action_name,
)
)
return (
arguments,
get_unparsable_arguments(tuple(remaining_action_arguments))
if arguments
else unparsed_arguments,
)
class Extend_action(Action):
@ -164,7 +231,7 @@ class Extend_action(Action):
def make_parsers():
'''
Build a top-level parser and its subparsers and return them as a tuple.
Build a top-level parser and its action parsers and return them as a tuple.
'''
config_paths = collect.get_default_config_paths(expand_home=True)
unexpanded_config_paths = collect.get_default_config_paths(expand_home=False)
@ -283,14 +350,14 @@ def make_parsers():
parents=[global_parser],
)
subparsers = top_level_parser.add_subparsers(
action_parsers = top_level_parser.add_subparsers(
title='actions',
metavar='',
help='Specify zero or more actions. Defaults to create, prune, compact, and check. Use --help with action for details:',
)
rcreate_parser = subparsers.add_parser(
rcreate_parser = action_parsers.add_parser(
'rcreate',
aliases=SUBPARSER_ALIASES['rcreate'],
aliases=ACTION_ALIASES['rcreate'],
help='Create a new, empty Borg repository',
description='Create a new, empty Borg repository',
add_help=False,
@ -336,9 +403,9 @@ def make_parsers():
'-h', '--help', action='help', help='Show this help message and exit'
)
transfer_parser = subparsers.add_parser(
transfer_parser = action_parsers.add_parser(
'transfer',
aliases=SUBPARSER_ALIASES['transfer'],
aliases=ACTION_ALIASES['transfer'],
help='Transfer archives from one repository to another, optionally upgrading the transferred data [Borg 2.0+ only]',
description='Transfer archives from one repository to another, optionally upgrading the transferred data [Borg 2.0+ only]',
add_help=False,
@ -409,9 +476,9 @@ def make_parsers():
'-h', '--help', action='help', help='Show this help message and exit'
)
prune_parser = subparsers.add_parser(
prune_parser = action_parsers.add_parser(
'prune',
aliases=SUBPARSER_ALIASES['prune'],
aliases=ACTION_ALIASES['prune'],
help='Prune archives according to the retention policy (with Borg 1.2+, run compact afterwards to actually free space)',
description='Prune archives according to the retention policy (with Borg 1.2+, run compact afterwards to actually free space)',
add_help=False,
@ -453,9 +520,9 @@ def make_parsers():
)
prune_group.add_argument('-h', '--help', action='help', help='Show this help message and exit')
compact_parser = subparsers.add_parser(
compact_parser = action_parsers.add_parser(
'compact',
aliases=SUBPARSER_ALIASES['compact'],
aliases=ACTION_ALIASES['compact'],
help='Compact segments to free space [Borg 1.2+, borgmatic 1.5.23+ only]',
description='Compact segments to free space [Borg 1.2+, borgmatic 1.5.23+ only]',
add_help=False,
@ -489,9 +556,9 @@ def make_parsers():
'-h', '--help', action='help', help='Show this help message and exit'
)
create_parser = subparsers.add_parser(
create_parser = action_parsers.add_parser(
'create',
aliases=SUBPARSER_ALIASES['create'],
aliases=ACTION_ALIASES['create'],
help='Create an archive (actually perform a backup)',
description='Create an archive (actually perform a backup)',
add_help=False,
@ -523,9 +590,9 @@ def make_parsers():
)
create_group.add_argument('-h', '--help', action='help', help='Show this help message and exit')
check_parser = subparsers.add_parser(
check_parser = action_parsers.add_parser(
'check',
aliases=SUBPARSER_ALIASES['check'],
aliases=ACTION_ALIASES['check'],
help='Check archives for consistency',
description='Check archives for consistency',
add_help=False,
@ -565,9 +632,9 @@ def make_parsers():
)
check_group.add_argument('-h', '--help', action='help', help='Show this help message and exit')
extract_parser = subparsers.add_parser(
extract_parser = action_parsers.add_parser(
'extract',
aliases=SUBPARSER_ALIASES['extract'],
aliases=ACTION_ALIASES['extract'],
help='Extract files from a named archive to the current directory',
description='Extract a named archive to the current directory',
add_help=False,
@ -611,9 +678,9 @@ def make_parsers():
'-h', '--help', action='help', help='Show this help message and exit'
)
config_parser = subparsers.add_parser(
config_parser = action_parsers.add_parser(
'config',
aliases=SUBPARSER_ALIASES['config'],
aliases=ACTION_ALIASES['config'],
help='Perform configuration file related operations',
description='Perform configuration file related operations',
add_help=False,
@ -622,15 +689,14 @@ def make_parsers():
config_group = config_parser.add_argument_group('config arguments')
config_group.add_argument('-h', '--help', action='help', help='Show this help message and exit')
config_subparsers = config_parser.add_subparsers(
title='config subcommands',
description='Valid subcommands for config',
config_parsers = config_parser.add_subparsers(
title='config sub-actions',
description='Valid sub-actions for config',
help='Additional help',
)
config_bootstrap_parser = config_subparsers.add_parser(
config_bootstrap_parser = config_parsers.add_parser(
'bootstrap',
aliases=SUBPARSER_ALIASES['config_bootstrap'],
help='Extract the config files used to create a borgmatic repository',
description='Extract config files that were used to create a borgmatic repository during the "create" action',
add_help=False,
@ -676,9 +742,9 @@ def make_parsers():
'-h', '--help', action='help', help='Show this help message and exit'
)
export_tar_parser = subparsers.add_parser(
export_tar_parser = action_parsers.add_parser(
'export-tar',
aliases=SUBPARSER_ALIASES['export-tar'],
aliases=ACTION_ALIASES['export-tar'],
help='Export an archive to a tar-formatted file or stream',
description='Export an archive to a tar-formatted file or stream',
add_help=False,
@ -722,9 +788,9 @@ def make_parsers():
'-h', '--help', action='help', help='Show this help message and exit'
)
mount_parser = subparsers.add_parser(
mount_parser = action_parsers.add_parser(
'mount',
aliases=SUBPARSER_ALIASES['mount'],
aliases=ACTION_ALIASES['mount'],
help='Mount files from a named archive as a FUSE filesystem',
description='Mount a named archive as a FUSE filesystem',
add_help=False,
@ -787,9 +853,9 @@ def make_parsers():
mount_group.add_argument('--options', dest='options', help='Extra Borg mount options')
mount_group.add_argument('-h', '--help', action='help', help='Show this help message and exit')
umount_parser = subparsers.add_parser(
umount_parser = action_parsers.add_parser(
'umount',
aliases=SUBPARSER_ALIASES['umount'],
aliases=ACTION_ALIASES['umount'],
help='Unmount a FUSE filesystem that was mounted with "borgmatic mount"',
description='Unmount a mounted FUSE filesystem',
add_help=False,
@ -804,9 +870,9 @@ def make_parsers():
)
umount_group.add_argument('-h', '--help', action='help', help='Show this help message and exit')
restore_parser = subparsers.add_parser(
restore_parser = action_parsers.add_parser(
'restore',
aliases=SUBPARSER_ALIASES['restore'],
aliases=ACTION_ALIASES['restore'],
help='Restore database dumps from a named archive',
description='Restore database dumps from a named archive. (To extract files instead, use "borgmatic extract".)',
add_help=False,
@ -837,9 +903,9 @@ def make_parsers():
'-h', '--help', action='help', help='Show this help message and exit'
)
rlist_parser = subparsers.add_parser(
rlist_parser = action_parsers.add_parser(
'rlist',
aliases=SUBPARSER_ALIASES['rlist'],
aliases=ACTION_ALIASES['rlist'],
help='List repository',
description='List the archives in a repository',
add_help=False,
@ -897,9 +963,9 @@ def make_parsers():
)
rlist_group.add_argument('-h', '--help', action='help', help='Show this help message and exit')
list_parser = subparsers.add_parser(
list_parser = action_parsers.add_parser(
'list',
aliases=SUBPARSER_ALIASES['list'],
aliases=ACTION_ALIASES['list'],
help='List archive',
description='List the files in an archive or search for a file across archives',
add_help=False,
@ -970,9 +1036,9 @@ def make_parsers():
)
list_group.add_argument('-h', '--help', action='help', help='Show this help message and exit')
rinfo_parser = subparsers.add_parser(
rinfo_parser = action_parsers.add_parser(
'rinfo',
aliases=SUBPARSER_ALIASES['rinfo'],
aliases=ACTION_ALIASES['rinfo'],
help='Show repository summary information such as disk space used',
description='Show repository summary information such as disk space used',
add_help=False,
@ -987,9 +1053,9 @@ def make_parsers():
)
rinfo_group.add_argument('-h', '--help', action='help', help='Show this help message and exit')
info_parser = subparsers.add_parser(
info_parser = action_parsers.add_parser(
'info',
aliases=SUBPARSER_ALIASES['info'],
aliases=ACTION_ALIASES['info'],
help='Show archive summary information such as disk space used',
description='Show archive summary information such as disk space used',
add_help=False,
@ -1048,9 +1114,9 @@ def make_parsers():
)
info_group.add_argument('-h', '--help', action='help', help='Show this help message and exit')
break_lock_parser = subparsers.add_parser(
break_lock_parser = action_parsers.add_parser(
'break-lock',
aliases=SUBPARSER_ALIASES['break-lock'],
aliases=ACTION_ALIASES['break-lock'],
help='Break the repository and cache locks left behind by Borg aborting',
description='Break Borg repository and cache locks left behind by Borg aborting',
add_help=False,
@ -1064,9 +1130,9 @@ def make_parsers():
'-h', '--help', action='help', help='Show this help message and exit'
)
borg_parser = subparsers.add_parser(
borg_parser = action_parsers.add_parser(
'borg',
aliases=SUBPARSER_ALIASES['borg'],
aliases=ACTION_ALIASES['borg'],
help='Run an arbitrary Borg command',
description="Run an arbitrary Borg command based on borgmatic's configuration",
add_help=False,
@ -1086,42 +1152,21 @@ def make_parsers():
)
borg_group.add_argument('-h', '--help', action='help', help='Show this help message and exit')
merged_subparsers = merge_subparsers(subparsers, config_subparsers)
return top_level_parser, merged_subparsers
def merge_subparsers(*subparsers):
'''
Merge multiple subparsers into a single subparser.
'''
merged_subparsers = argparse._SubParsersAction(
None, None, metavar=None, dest='merged', parser_class=None
)
for subparser in subparsers:
for name, subparser in subparser.choices.items():
merged_subparsers._name_parser_map[name] = subparser
return merged_subparsers
return top_level_parser, action_parsers
def parse_arguments(*unparsed_arguments):
'''
Given command-line arguments with which this script was invoked, parse the arguments and return
them as a dict mapping from subparser name (or "global") to an argparse.Namespace instance.
them as a dict mapping from action name (or "global") to an argparse.Namespace instance.
'''
top_level_parser, subparsers = make_parsers()
top_level_parser, action_parsers = make_parsers()
arguments, remaining_arguments = parse_subparser_arguments(
unparsed_arguments, subparsers.choices
arguments, remaining_arguments = parse_arguments_for_actions(
unparsed_arguments, action_parsers.choices
)
if (
'bootstrap' in arguments.keys()
and 'config' in arguments.keys()
and len(arguments.keys()) > 2
):
if 'bootstrap' in arguments.keys() and len(arguments.keys()) > 1:
raise ValueError(
'The bootstrap action cannot be combined with other actions. Please run it separately.'
)

View file

@ -621,7 +621,7 @@ def collect_configuration_run_summary_logs(configs, arguments):
)
yield logging.makeLogRecord(
dict(
levelno=logging.INFO,
levelno=logging.ANSWER,
levelname='INFO',
msg='Bootstrap successful',
)

View file

@ -1,3 +1,6 @@
import borgmatic.commands.arguments
def upgrade_message(language: str, upgrade_command: str, completion_file: str):
return f'''
Your {language} completions script is from a different version of borgmatic than is
@ -18,24 +21,16 @@ def available_actions(subparsers, current_action=None):
"bootstrap" is a sub-action for "config", then "bootstrap" should be able to follow a current
action of "config" but not "list".
'''
# Make a map from action name to the names of contained sub-actions.
actions_to_subactions = {
action: tuple(
subaction_name
for group_action in subparser._subparsers._group_actions
for subaction_name in group_action.choices.keys()
)
for action, subparser in subparsers.choices.items()
if subparser._subparsers
}
current_subactions = actions_to_subactions.get(current_action)
action_to_subactions = borgmatic.commands.arguments.get_subactions_for_actions(
subparsers.choices
)
current_subactions = action_to_subactions.get(current_action)
if current_subactions:
return current_subactions
all_subactions = set(
subaction for subactions in actions_to_subactions.values() for subaction in subactions
subaction for subactions in action_to_subactions.values() for subaction in subactions
)
return tuple(action for action in subparsers.choices.keys() if action not in all_subactions)

View file

@ -1,5 +1,3 @@
import argparse
import pytest
from flexmock import flexmock
@ -534,24 +532,62 @@ def test_parse_arguments_extract_with_check_only_extract_does_not_raise():
module.parse_arguments('extract', '--archive', 'name', 'check', '--only', 'extract')
def test_merging_two_subparser_collections_merges_their_choices():
top_level_parser = argparse.ArgumentParser()
def test_parse_arguments_bootstrap_without_config_errors():
flexmock(module.collect).should_receive('get_default_config_paths').and_return(['default'])
subparsers = top_level_parser.add_subparsers()
subparser1 = subparsers.add_parser('subparser1')
with pytest.raises(SystemExit) as exit:
module.parse_arguments('bootstrap')
subparser2 = subparsers.add_parser('subparser2')
subsubparsers = subparser2.add_subparsers()
subsubparser1 = subsubparsers.add_parser('subsubparser1')
assert exit.value.code == 2
merged_subparsers = argparse._SubParsersAction(
None, None, metavar=None, dest='merged', parser_class=None
)
merged_subparsers = module.merge_subparsers(subparsers, subsubparsers)
def test_parse_arguments_config_with_no_subaction_errors():
flexmock(module.collect).should_receive('get_default_config_paths').and_return(['default'])
assert merged_subparsers.choices == {
'subparser1': subparser1,
'subparser2': subparser2,
'subsubparser1': subsubparser1,
}
with pytest.raises(ValueError):
module.parse_arguments('config')
def test_parse_arguments_config_with_help_shows_config_help(capsys):
flexmock(module.collect).should_receive('get_default_config_paths').and_return(['default'])
with pytest.raises(SystemExit) as exit:
module.parse_arguments('config', '--help')
assert exit.value.code == 0
captured = capsys.readouterr()
assert 'global arguments:' not in captured.out
assert 'config arguments:' in captured.out
assert 'config sub-actions:' in captured.out
def test_parse_arguments_config_with_subaction_but_missing_flags_errors(capsys):
flexmock(module.collect).should_receive('get_default_config_paths').and_return(['default'])
with pytest.raises(SystemExit) as exit:
module.parse_arguments('config', 'bootstrap')
assert exit.value.code == 2
def test_parse_arguments_config_with_subaction_and_help_shows_subaction_help(capsys):
flexmock(module.collect).should_receive('get_default_config_paths').and_return(['default'])
with pytest.raises(SystemExit) as exit:
module.parse_arguments('config', 'bootstrap', '--help')
assert exit.value.code == 0
captured = capsys.readouterr()
assert 'config bootstrap arguments:' in captured.out
def test_parse_arguments_config_with_subaction_and_required_flags_does_not_raise(capsys):
flexmock(module.collect).should_receive('get_default_config_paths').and_return(['default'])
module.parse_arguments('config', 'bootstrap', '--repository', 'repo.borg')
def test_parse_arguments_config_with_subaction_and_global_flags_does_not_raise(capsys):
flexmock(module.collect).should_receive('get_default_config_paths').and_return(['default'])
module.parse_arguments('--verbosity', '1', 'config', 'bootstrap', '--repository', 'repo.borg')

View file

@ -6,175 +6,128 @@ from flexmock import flexmock
from borgmatic.commands import arguments as module
def test_parse_subparser_arguments_consumes_subparser_arguments_before_subparser_name():
action_namespace = flexmock(foo=True)
subparsers = {
'action': flexmock(parse_known_args=lambda arguments: (action_namespace, ['action'])),
'other': flexmock(),
}
def test_get_subaction_parsers_with_no_subactions_returns_empty_result():
assert module.get_subaction_parsers(flexmock(_subparsers=None)) == {}
arguments, remaining_arguments = module.parse_subparser_arguments(
('--foo', 'true', 'action'), subparsers
def test_get_subaction_parsers_with_subactions_returns_one_entry_per_subaction():
foo_parser = flexmock()
bar_parser = flexmock()
baz_parser = flexmock()
assert module.get_subaction_parsers(
flexmock(
_subparsers=flexmock(
_group_actions=(
flexmock(choices={'foo': foo_parser, 'bar': bar_parser}),
flexmock(choices={'baz': baz_parser}),
)
)
)
) == {'foo': foo_parser, 'bar': bar_parser, 'baz': baz_parser}
def test_get_subactions_for_actions_with_no_subactions_returns_empty_result():
assert module.get_subactions_for_actions({'action': flexmock(_subparsers=None)}) == {}
def test_get_subactions_for_actions_with_subactions_returns_one_entry_per_action():
assert module.get_subactions_for_actions(
{
'action': flexmock(
_subparsers=flexmock(
_group_actions=(
flexmock(choices={'foo': flexmock(), 'bar': flexmock()}),
flexmock(choices={'baz': flexmock()}),
)
)
),
'other': flexmock(
_subparsers=flexmock(_group_actions=(flexmock(choices={'quux': flexmock()}),))
),
}
) == {'action': ('foo', 'bar', 'baz'), 'other': ('quux',)}
def test_omit_values_colliding_with_action_names_drops_action_names_that_have__been_parsed_as_values():
assert module.omit_values_colliding_with_action_names(
('check', '--only', 'extract', '--some-list', 'borg'),
{'check': flexmock(only='extract', some_list=['borg'])},
) == ('check', '--only', '--some-list')
def test_parse_and_record_action_arguments_without_action_name_leaves_arguments_untouched():
unparsed_arguments = ('--foo', '--bar')
flexmock(module).should_receive('omit_values_colliding_with_action_names').and_return(
unparsed_arguments
)
assert arguments == {'action': action_namespace}
assert remaining_arguments == []
def test_parse_subparser_arguments_consumes_subparser_arguments_after_subparser_name():
action_namespace = flexmock(foo=True)
subparsers = {
'action': flexmock(parse_known_args=lambda arguments: (action_namespace, ['action'])),
'other': flexmock(),
}
arguments, remaining_arguments = module.parse_subparser_arguments(
('action', '--foo', 'true'), subparsers
assert (
module.parse_and_record_action_arguments(
unparsed_arguments, flexmock(), flexmock(), 'action'
)
== unparsed_arguments
)
assert arguments == {'action': action_namespace}
assert remaining_arguments == []
def test_parse_subparser_arguments_consumes_subparser_arguments_with_alias():
action_namespace = flexmock(foo=True)
action_subparser = flexmock(parse_known_args=lambda arguments: (action_namespace, ['action']))
subparsers = {
'action': action_subparser,
'-a': action_subparser,
'other': flexmock(),
'-o': flexmock(),
}
flexmock(module).SUBPARSER_ALIASES = {'action': ['-a'], 'other': ['-o']}
arguments, remaining_arguments = module.parse_subparser_arguments(
('-a', '--foo', 'true'), subparsers
def test_parse_and_record_action_arguments_updates_parsed_arguments_and_returns_remaining():
unparsed_arguments = ('action', '--foo', '--bar', '--verbosity', '1')
other_parsed_arguments = flexmock()
parsed_arguments = {'other': other_parsed_arguments}
action_parsed_arguments = flexmock()
flexmock(module).should_receive('omit_values_colliding_with_action_names').and_return(
unparsed_arguments
)
action_parser = flexmock()
flexmock(action_parser).should_receive('parse_known_args').and_return(
action_parsed_arguments, ('action', '--verbosity', '1')
)
assert arguments == {'action': action_namespace}
assert remaining_arguments == []
assert module.parse_and_record_action_arguments(
unparsed_arguments, parsed_arguments, action_parser, 'action'
) == ('--verbosity', '1')
assert parsed_arguments == {'other': other_parsed_arguments, 'action': action_parsed_arguments}
def test_parse_subparser_arguments_consumes_multiple_subparser_arguments():
action_namespace = flexmock(foo=True)
other_namespace = flexmock(bar=3)
subparsers = {
'action': flexmock(
parse_known_args=lambda arguments: (action_namespace, ['action', '--bar', '3'])
),
'other': flexmock(parse_known_args=lambda arguments: (other_namespace, [])),
}
arguments, remaining_arguments = module.parse_subparser_arguments(
('action', '--foo', 'true', 'other', '--bar', '3'), subparsers
def test_parse_and_record_action_arguments_with_alias_updates_canonical_parsed_arguments():
unparsed_arguments = ('action', '--foo', '--bar', '--verbosity', '1')
other_parsed_arguments = flexmock()
parsed_arguments = {'other': other_parsed_arguments}
action_parsed_arguments = flexmock()
flexmock(module).should_receive('omit_values_colliding_with_action_names').and_return(
unparsed_arguments
)
action_parser = flexmock()
flexmock(action_parser).should_receive('parse_known_args').and_return(
action_parsed_arguments, ('action', '--verbosity', '1')
)
assert arguments == {'action': action_namespace, 'other': other_namespace}
assert remaining_arguments == []
assert module.parse_and_record_action_arguments(
unparsed_arguments, parsed_arguments, action_parser, 'action', canonical_name='doit'
) == ('--verbosity', '1')
assert parsed_arguments == {'other': other_parsed_arguments, 'doit': action_parsed_arguments}
def test_parse_subparser_arguments_respects_command_line_action_ordering():
other_namespace = flexmock()
action_namespace = flexmock(foo=True)
subparsers = {
'action': flexmock(
parse_known_args=lambda arguments: (action_namespace, ['action', '--foo', 'true'])
),
'other': flexmock(parse_known_args=lambda arguments: (other_namespace, ['other'])),
}
arguments, remaining_arguments = module.parse_subparser_arguments(
('other', '--foo', 'true', 'action'), subparsers
def test_parse_and_record_action_arguments_with_borg_action_consumes_arguments_after_action_name():
unparsed_arguments = ('--verbosity', '1', 'borg', 'list')
parsed_arguments = {}
borg_parsed_arguments = flexmock(options=flexmock())
flexmock(module).should_receive('omit_values_colliding_with_action_names').and_return(
unparsed_arguments
)
borg_parser = flexmock()
flexmock(borg_parser).should_receive('parse_known_args').and_return(
borg_parsed_arguments, ('--verbosity', '1', 'borg', 'list')
)
assert arguments == collections.OrderedDict(
[('other', other_namespace), ('action', action_namespace)]
)
assert remaining_arguments == []
def test_parse_subparser_arguments_applies_default_subparsers():
prune_namespace = flexmock()
compact_namespace = flexmock()
create_namespace = flexmock(progress=True)
check_namespace = flexmock()
subparsers = {
'prune': flexmock(
parse_known_args=lambda arguments: (prune_namespace, ['prune', '--progress'])
),
'compact': flexmock(parse_known_args=lambda arguments: (compact_namespace, [])),
'create': flexmock(parse_known_args=lambda arguments: (create_namespace, [])),
'check': flexmock(parse_known_args=lambda arguments: (check_namespace, [])),
'other': flexmock(),
}
arguments, remaining_arguments = module.parse_subparser_arguments(('--progress'), subparsers)
assert arguments == {
'prune': prune_namespace,
'compact': compact_namespace,
'create': create_namespace,
'check': check_namespace,
}
assert remaining_arguments == []
def test_parse_subparser_arguments_passes_through_unknown_arguments_before_subparser_name():
action_namespace = flexmock()
subparsers = {
'action': flexmock(
parse_known_args=lambda arguments: (action_namespace, ['action', '--verbosity', 'lots'])
),
'other': flexmock(),
}
arguments, remaining_arguments = module.parse_subparser_arguments(
('--verbosity', 'lots', 'action'), subparsers
)
assert arguments == {'action': action_namespace}
assert remaining_arguments == ['--verbosity', 'lots']
def test_parse_subparser_arguments_passes_through_unknown_arguments_after_subparser_name():
action_namespace = flexmock()
subparsers = {
'action': flexmock(
parse_known_args=lambda arguments: (action_namespace, ['action', '--verbosity', 'lots'])
),
'other': flexmock(),
}
arguments, remaining_arguments = module.parse_subparser_arguments(
('action', '--verbosity', 'lots'), subparsers
)
assert arguments == {'action': action_namespace}
assert remaining_arguments == ['--verbosity', 'lots']
def test_parse_subparser_arguments_parses_borg_options_and_skips_other_subparsers():
action_namespace = flexmock(options=[])
subparsers = {
'borg': flexmock(parse_known_args=lambda arguments: (action_namespace, ['borg', 'list'])),
'list': flexmock(),
}
arguments, remaining_arguments = module.parse_subparser_arguments(('borg', 'list'), subparsers)
assert arguments == {'borg': action_namespace}
assert arguments['borg'].options == ['list']
assert remaining_arguments == []
def test_parse_subparser_arguments_raises_error_when_no_subparser_is_specified():
action_namespace = flexmock(options=[])
subparsers = {
'config': flexmock(parse_known_args=lambda arguments: (action_namespace, ['config'])),
}
with pytest.raises(ValueError):
module.parse_subparser_arguments(('config',), subparsers)
assert module.parse_and_record_action_arguments(
unparsed_arguments,
parsed_arguments,
borg_parser,
'borg',
) == ('--verbosity', '1')
assert parsed_arguments == {'borg': borg_parsed_arguments}
assert borg_parsed_arguments.options == ('list',)
@pytest.mark.parametrize(
@ -187,9 +140,7 @@ def test_parse_subparser_arguments_raises_error_when_no_subparser_is_specified()
('prune', 'check', 'list', '--test-flag'),
('prune', 'check', 'extract', '--test-flag'),
),
[
'--test-flag',
],
('--test-flag',),
),
(
(
@ -198,12 +149,241 @@ def test_parse_subparser_arguments_raises_error_when_no_subparser_is_specified()
('prune', 'check', 'list'),
('prune', 'check', 'extract'),
),
[],
(),
),
((), []),
((), ()),
],
)
def test_get_unparsable_arguments_returns_remaining_arguments_that_no_subparser_can_parse(
def test_get_unparsable_arguments_returns_remaining_arguments_that_no_action_can_parse(
arguments, expected
):
assert module.get_unparsable_arguments(arguments) == expected
def test_parse_arguments_for_actions_consumes_action_arguments_before_action_name():
action_namespace = flexmock(foo=True)
flexmock(module).should_receive('get_subaction_parsers').and_return({})
flexmock(module).should_receive('parse_and_record_action_arguments').replace_with(
lambda unparsed, parsed, parser, action, canonical=None: parsed.update(
{action: action_namespace}
)
).and_return(())
flexmock(module).should_receive('get_subactions_for_actions').and_return({})
flexmock(module).should_receive('get_unparsable_arguments').and_return(())
action_parsers = {'action': flexmock(), 'other': flexmock()}
arguments, remaining_arguments = module.parse_arguments_for_actions(
('--foo', 'true', 'action'), action_parsers
)
assert arguments == {'action': action_namespace}
assert remaining_arguments == ()
def test_parse_arguments_for_actions_consumes_action_arguments_after_action_name():
action_namespace = flexmock(foo=True)
flexmock(module).should_receive('get_subaction_parsers').and_return({})
flexmock(module).should_receive('parse_and_record_action_arguments').replace_with(
lambda unparsed, parsed, parser, action, canonical=None: parsed.update(
{action: action_namespace}
)
).and_return(())
flexmock(module).should_receive('get_subactions_for_actions').and_return({})
flexmock(module).should_receive('get_unparsable_arguments').and_return(())
action_parsers = {'action': flexmock(), 'other': flexmock()}
arguments, remaining_arguments = module.parse_arguments_for_actions(
('action', '--foo', 'true'), action_parsers
)
assert arguments == {'action': action_namespace}
assert remaining_arguments == ()
def test_parse_arguments_for_actions_consumes_action_arguments_with_alias():
action_namespace = flexmock(foo=True)
flexmock(module).should_receive('get_subaction_parsers').and_return({})
flexmock(module).should_receive('parse_and_record_action_arguments').replace_with(
lambda unparsed, parsed, parser, action, canonical=None: parsed.update(
{canonical or action: action_namespace}
)
).and_return(())
flexmock(module).should_receive('get_subactions_for_actions').and_return({})
flexmock(module).should_receive('get_unparsable_arguments').and_return(())
action_parsers = {
'action': flexmock(),
'-a': flexmock(),
'other': flexmock(),
'-o': flexmock(),
}
flexmock(module).ACTION_ALIASES = {'action': ['-a'], 'other': ['-o']}
arguments, remaining_arguments = module.parse_arguments_for_actions(
('-a', '--foo', 'true'), action_parsers
)
assert arguments == {'action': action_namespace}
assert remaining_arguments == ()
def test_parse_arguments_for_actions_consumes_multiple_action_arguments():
action_namespace = flexmock(foo=True)
other_namespace = flexmock(bar=3)
flexmock(module).should_receive('get_subaction_parsers').and_return({})
flexmock(module).should_receive('parse_and_record_action_arguments').replace_with(
lambda unparsed, parsed, parser, action, canonical=None: parsed.update(
{action: action_namespace if action == 'action' else other_namespace}
)
).and_return(('other', '--bar', '3')).and_return('action', '--foo', 'true')
flexmock(module).should_receive('get_subactions_for_actions').and_return({})
flexmock(module).should_receive('get_unparsable_arguments').and_return(())
action_parsers = {
'action': flexmock(),
'other': flexmock(),
}
arguments, remaining_arguments = module.parse_arguments_for_actions(
('action', '--foo', 'true', 'other', '--bar', '3'), action_parsers
)
assert arguments == {'action': action_namespace, 'other': other_namespace}
assert remaining_arguments == ()
def test_parse_arguments_for_actions_respects_command_line_action_ordering():
other_namespace = flexmock()
action_namespace = flexmock(foo=True)
flexmock(module).should_receive('get_subaction_parsers').and_return({})
flexmock(module).should_receive('parse_and_record_action_arguments').replace_with(
lambda unparsed, parsed, parser, action, canonical=None: parsed.update(
{action: other_namespace if action == 'other' else action_namespace}
)
).and_return(('action',)).and_return(('other', '--foo', 'true'))
flexmock(module).should_receive('get_subactions_for_actions').and_return({})
flexmock(module).should_receive('get_unparsable_arguments').and_return(())
action_parsers = {
'action': flexmock(),
'other': flexmock(),
}
arguments, remaining_arguments = module.parse_arguments_for_actions(
('other', '--foo', 'true', 'action'), action_parsers
)
assert arguments == collections.OrderedDict(
[('other', other_namespace), ('action', action_namespace)]
)
assert remaining_arguments == ()
def test_parse_arguments_for_actions_applies_default_action_parsers():
namespaces = {
'prune': flexmock(),
'compact': flexmock(),
'create': flexmock(progress=True),
'check': flexmock(),
}
flexmock(module).should_receive('get_subaction_parsers').and_return({})
flexmock(module).should_receive('parse_and_record_action_arguments').replace_with(
lambda unparsed, parsed, parser, action, canonical=None: parsed.update(
{action: namespaces.get(action)}
)
).and_return(())
flexmock(module).should_receive('get_subactions_for_actions').and_return({})
flexmock(module).should_receive('get_unparsable_arguments').and_return(())
action_parsers = {
'prune': flexmock(),
'compact': flexmock(),
'create': flexmock(),
'check': flexmock(),
'other': flexmock(),
}
arguments, remaining_arguments = module.parse_arguments_for_actions(
('--progress'), action_parsers
)
assert arguments == namespaces
assert remaining_arguments == ()
def test_parse_arguments_for_actions_passes_through_unknown_arguments_before_action_name():
action_namespace = flexmock()
flexmock(module).should_receive('get_subaction_parsers').and_return({})
flexmock(module).should_receive('parse_and_record_action_arguments').replace_with(
lambda unparsed, parsed, parser, action, canonical=None: parsed.update(
{action: action_namespace}
)
).and_return(('--verbosity', 'lots'))
flexmock(module).should_receive('get_subactions_for_actions').and_return({})
flexmock(module).should_receive('get_unparsable_arguments').and_return(('--verbosity', 'lots'))
action_parsers = {
'action': flexmock(),
'other': flexmock(),
}
arguments, remaining_arguments = module.parse_arguments_for_actions(
('--verbosity', 'lots', 'action'), action_parsers
)
assert arguments == {'action': action_namespace}
assert remaining_arguments == ('--verbosity', 'lots')
def test_parse_arguments_for_actions_passes_through_unknown_arguments_after_action_name():
action_namespace = flexmock()
flexmock(module).should_receive('get_subaction_parsers').and_return({})
flexmock(module).should_receive('parse_and_record_action_arguments').replace_with(
lambda unparsed, parsed, parser, action, canonical=None: parsed.update(
{action: action_namespace}
)
).and_return(('--verbosity', 'lots'))
flexmock(module).should_receive('get_subactions_for_actions').and_return({})
flexmock(module).should_receive('get_unparsable_arguments').and_return(('--verbosity', 'lots'))
action_parsers = {
'action': flexmock(),
'other': flexmock(),
}
arguments, remaining_arguments = module.parse_arguments_for_actions(
('action', '--verbosity', 'lots'), action_parsers
)
assert arguments == {'action': action_namespace}
assert remaining_arguments == ('--verbosity', 'lots')
def test_parse_arguments_for_actions_with_borg_action_skips_other_action_parsers():
action_namespace = flexmock(options=[])
flexmock(module).should_receive('get_subaction_parsers').and_return({})
flexmock(module).should_receive('parse_and_record_action_arguments').replace_with(
lambda unparsed, parsed, parser, action, canonical=None: parsed.update(
{action: action_namespace}
)
).and_return(())
flexmock(module).should_receive('get_subactions_for_actions').and_return({})
flexmock(module).should_receive('get_unparsable_arguments').and_return(())
action_parsers = {
'borg': flexmock(),
'list': flexmock(),
}
arguments, remaining_arguments = module.parse_arguments_for_actions(
('borg', 'list'), action_parsers
)
assert arguments == {'borg': action_namespace}
assert remaining_arguments == ()
def test_parse_arguments_for_actions_raises_error_when_no_action_is_specified():
flexmock(module).should_receive('get_subaction_parsers').and_return({'bootstrap': [flexmock()]})
flexmock(module).should_receive('parse_and_record_action_arguments').and_return(flexmock())
flexmock(module).should_receive('get_subactions_for_actions').and_return(
{'config': ['bootstrap']}
)
action_parsers = {'config': flexmock()}
with pytest.raises(ValueError):
module.parse_arguments_for_actions(('config',), action_parsers)

View file

@ -1013,7 +1013,7 @@ def test_collect_configuration_run_summary_logs_info_for_success_with_bootstrap(
logs = tuple(
module.collect_configuration_run_summary_logs({'test.yaml': {}}, arguments=arguments)
)
assert {log.levelno for log in logs} == {logging.INFO}
assert {log.levelno for log in logs} == {logging.ANSWER}
def test_collect_configuration_run_summary_logs_error_on_bootstrap_failure():