Database dump hooks for MySQL/MariaDB (#228).

This commit is contained in:
Dan Helfman 2019-11-08 11:17:52 -08:00
parent e4f0a336c2
commit 427b57e2a9
13 changed files with 389 additions and 138 deletions

4
NEWS
View file

@ -1,3 +1,7 @@
1.4.9.dev0
* #228: Database dump hooks for MySQL/MariaDB, so you can easily dump your databases before backups
run.
1.4.8
* Monitor backups with Cronhub hook integration. See the documentation for more information:
https://torsion.org/borgmatic/docs/how-to/monitor-your-backups/#cronhub-hook

View file

@ -18,7 +18,7 @@ from borgmatic.borg import list as borg_list
from borgmatic.borg import prune as borg_prune
from borgmatic.commands.arguments import parse_arguments
from borgmatic.config import checks, collect, convert, validate
from borgmatic.hooks import command, cronhub, cronitor, healthchecks, postgresql
from borgmatic.hooks import command, cronhub, cronitor, healthchecks, mysql, postgresql
from borgmatic.logger import configure_logging, should_do_markup
from borgmatic.signals import configure_signals
from borgmatic.verbosity import verbosity_to_log_level
@ -72,6 +72,9 @@ def run_configuration(config_filename, config, arguments):
postgresql.dump_databases(
hooks.get('postgresql_databases'), config_filename, global_arguments.dry_run
)
mysql.dump_databases(
hooks.get('mysql_databases'), config_filename, global_arguments.dry_run
)
except (OSError, CalledProcessError) as error:
encountered_error = error
yield from make_error_log_records(

View file

@ -434,6 +434,54 @@ map:
directories at runtime, backed up, and then removed afterwards. Requires
pg_dump/pg_dumpall/pg_restore commands. See
https://www.postgresql.org/docs/current/app-pgdump.html for details.
mysql_databases:
seq:
- map:
name:
required: true
type: str
desc: |
Database name (required if using this hook). Or "all" to dump all
databases on the host.
example: users
hostname:
type: str
desc: |
Database hostname to connect to. Defaults to connecting via local
Unix socket.
example: database.example.org
port:
type: int
desc: Port to connect to. Defaults to 3306.
example: 3307
username:
type: str
desc: |
Username with which to connect to the database. Defaults to the
username of the current user.
example: dbuser
password:
type: str
desc: |
Password with which to connect to the database. Omitting a password
will only work if MySQL is configured to trust the configured
username without a password.
example: trustsome1
options:
type: str
desc: |
Additional mysqldump options to pass directly to the dump command,
without performing any validation on them. See
https://dev.mysql.com/doc/refman/8.0/en/mysqldump.html or
https://mariadb.com/kb/en/library/mysqldump/ for details.
example: --skip-comments
desc: |
List of one or more MySQL/MariaDB databases to dump before creating a backup,
run once per configuration file. The database dumps are added to your source
directories at runtime, backed up, and then removed afterwards. Requires
mysqldump/mysql commands (from either MySQL or MariaDB). See
https://dev.mysql.com/doc/refman/8.0/en/mysqldump.html or
https://mariadb.com/kb/en/library/mysqldump/ for details.
healthchecks:
type: str
desc: |

View file

@ -20,21 +20,17 @@ def exit_code_indicates_error(command, exit_code, error_on_warnings=False):
return bool(exit_code != 0)
def execute_and_log_output(
full_command, output_log_level, shell, environment, working_directory, error_on_warnings
):
def log_output(command, process, output_buffer, output_log_level, error_on_warnings):
'''
Given a command already executed, its process opened by subprocess.Popen(), and the process'
relevant output buffer (stderr or stdout), log its output with the requested log level.
Additionally, raise a CalledProcessException if the process exits with an error (or a warning,
if error on warnings is True).
'''
last_lines = []
process = subprocess.Popen(
full_command,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
shell=shell,
env=environment,
cwd=working_directory,
)
while process.poll() is None:
line = process.stdout.readline().rstrip().decode()
line = output_buffer.readline().rstrip().decode()
if not line:
continue
@ -46,26 +42,25 @@ def execute_and_log_output(
logger.log(output_log_level, line)
remaining_output = process.stdout.read().rstrip().decode()
remaining_output = output_buffer.read().rstrip().decode()
if remaining_output: # pragma: no cover
logger.log(output_log_level, remaining_output)
exit_code = process.poll()
if exit_code_indicates_error(full_command, exit_code, error_on_warnings):
if exit_code_indicates_error(command, exit_code, error_on_warnings):
# If an error occurs, include its output in the raised exception so that we don't
# inadvertently hide error output.
if len(last_lines) == ERROR_OUTPUT_MAX_LINE_COUNT:
last_lines.insert(0, '...')
raise subprocess.CalledProcessError(
exit_code, ' '.join(full_command), '\n'.join(last_lines)
)
raise subprocess.CalledProcessError(exit_code, ' '.join(command), '\n'.join(last_lines))
def execute_command(
full_command,
output_log_level=logging.INFO,
output_file=None,
shell=False,
extra_environment=None,
working_directory=None,
@ -73,10 +68,12 @@ def execute_command(
):
'''
Execute the given command (a sequence of command/argument strings) and log its output at the
given log level. If output log level is None, instead capture and return the output. If
shell is True, execute the command within a shell. If an extra environment dict is given, then
use it to augment the current environment, and pass the result into the command. If a working
directory is given, use that as the present working directory when running the command.
given log level. If output log level is None, instead capture and return the output. If an
open output file object is given, then write stdout to the file and only log stderr (but only
if an output log level is set). If shell is True, execute the command within a shell. If an
extra environment dict is given, then use it to augment the current environment, and pass the
result into the command. If a working directory is given, use that as the present working
directory when running the command.
Raise subprocesses.CalledProcessError if an error occurs while running the command.
'''
@ -89,13 +86,20 @@ def execute_command(
)
return output.decode() if output is not None else None
else:
execute_and_log_output(
process = subprocess.Popen(
full_command,
output_log_level,
stdout=output_file or subprocess.PIPE,
stderr=subprocess.PIPE if output_file else subprocess.STDOUT,
shell=shell,
environment=environment,
working_directory=working_directory,
error_on_warnings=error_on_warnings,
env=environment,
cwd=working_directory,
)
log_output(
full_command,
process,
process.stderr if output_file else process.stdout,
output_log_level,
error_on_warnings,
)

View file

@ -0,0 +1,14 @@
import os
def make_database_dump_filename(dump_path, name, hostname=None):
'''
Based on the given dump directory path, database name, and hostname, return a filename to use
for the database dump. The hostname defaults to localhost.
Raise ValueError if the database name is invalid.
'''
if os.path.sep in name:
raise ValueError('Invalid database name {}'.format(name))
return os.path.join(os.path.expanduser(dump_path), hostname or 'localhost', name)

48
borgmatic/hooks/mysql.py Normal file
View file

@ -0,0 +1,48 @@
import logging
import os
from borgmatic.execute import execute_command
from borgmatic.hooks.database import make_database_dump_filename
DUMP_PATH = '~/.borgmatic/mysql_databases'
logger = logging.getLogger(__name__)
def dump_databases(databases, log_prefix, dry_run):
'''
Dump the given MySQL/MariaDB databases to disk. The databases are supplied as a sequence of
dicts, one dict describing each database as per the configuration schema. Use the given log
prefix in any log entries. If this is a dry run, then don't actually dump anything.
'''
if not databases:
logger.debug('{}: No MySQL databases configured'.format(log_prefix))
return
dry_run_label = ' (dry run; not actually dumping anything)' if dry_run else ''
logger.info('{}: Dumping MySQL databases{}'.format(log_prefix, dry_run_label))
for database in databases:
name = database['name']
dump_filename = make_database_dump_filename(DUMP_PATH, name, database.get('hostname'))
command = (
('mysqldump', '--add-drop-database')
+ (('--host', database['hostname']) if 'hostname' in database else ())
+ (('--port', str(database['port'])) if 'port' in database else ())
+ (('--protocol', 'tcp') if 'hostname' in database or 'port' in database else ())
+ (('--user', database['username']) if 'username' in database else ())
+ (tuple(database['options'].split(' ')) if 'options' in database else ())
+ (('--all-databases',) if name == 'all' else ('--databases', name))
)
extra_environment = {'MYSQL_PWD': database['password']} if 'password' in database else None
logger.debug(
'{}: Dumping MySQL database {} to {}{}'.format(
log_prefix, name, dump_filename, dry_run_label
)
)
if not dry_run:
os.makedirs(os.path.dirname(dump_filename), mode=0o700, exist_ok=True)
execute_command(
command, output_file=open(dump_filename, 'w'), extra_environment=extra_environment
)

View file

@ -3,23 +3,12 @@ import logging
import os
from borgmatic.execute import execute_command
from borgmatic.hooks.database import make_database_dump_filename
DUMP_PATH = '~/.borgmatic/postgresql_databases'
logger = logging.getLogger(__name__)
def make_database_dump_filename(name, hostname=None):
'''
Based on the given database name and hostname, return a filename to use for the database dump.
Raise ValueError if the database name is invalid.
'''
if os.path.sep in name:
raise ValueError('Invalid database name {}'.format(name))
return os.path.join(os.path.expanduser(DUMP_PATH), hostname or 'localhost', name)
def dump_databases(databases, log_prefix, dry_run):
'''
Dump the given PostgreSQL databases to disk. The databases are supplied as a sequence of dicts,
@ -36,7 +25,7 @@ def dump_databases(databases, log_prefix, dry_run):
for database in databases:
name = database['name']
dump_filename = make_database_dump_filename(name, database.get('hostname'))
dump_filename = make_database_dump_filename(DUMP_PATH, name, database.get('hostname'))
all_databases = bool(name == 'all')
command = (
('pg_dumpall' if all_databases else 'pg_dump', '--no-password', '--clean')
@ -50,7 +39,11 @@ def dump_databases(databases, log_prefix, dry_run):
)
extra_environment = {'PGPASSWORD': database['password']} if 'password' in database else None
logger.debug('{}: Dumping PostgreSQL database {}{}'.format(log_prefix, name, dry_run_label))
logger.debug(
'{}: Dumping PostgreSQL database {} to {}{}'.format(
log_prefix, name, dump_filename, dry_run_label
)
)
if not dry_run:
os.makedirs(os.path.dirname(dump_filename), mode=0o700, exist_ok=True)
execute_command(command, extra_environment=extra_environment)
@ -71,7 +64,9 @@ def remove_database_dumps(databases, log_prefix, dry_run):
logger.info('{}: Removing PostgreSQL database dumps{}'.format(log_prefix, dry_run_label))
for database in databases:
dump_filename = make_database_dump_filename(database['name'], database.get('hostname'))
dump_filename = make_database_dump_filename(
DUMP_PATH, database['name'], database.get('hostname')
)
logger.debug(
'{}: Removing PostgreSQL database dump {} from {}{}'.format(
@ -94,7 +89,7 @@ def make_database_dump_patterns(names):
dumps in an archive. An empty sequence of names indicates that the patterns should match all
dumps.
'''
return [make_database_dump_filename(name, hostname='*') for name in (names or ['*'])]
return [make_database_dump_filename(DUMP_PATH, name, hostname='*') for name in (names or ['*'])]
def convert_glob_patterns_to_borg_patterns(patterns):
@ -155,7 +150,9 @@ def restore_database_dumps(databases, log_prefix, dry_run):
dry_run_label = ' (dry run; not actually restoring anything)' if dry_run else ''
for database in databases:
dump_filename = make_database_dump_filename(database['name'], database.get('hostname'))
dump_filename = make_database_dump_filename(
DUMP_PATH, database['name'], database.get('hostname')
)
restore_command = (
('pg_restore', '--no-password', '--clean', '--if-exists', '--exit-on-error')
+ (('--host', database['hostname']) if 'hostname' in database else ())

View file

@ -1,6 +1,6 @@
from setuptools import find_packages, setup
VERSION = '1.4.8'
VERSION = '1.4.9.dev0'
setup(

View file

@ -7,40 +7,42 @@ from flexmock import flexmock
from borgmatic import execute as module
def test_execute_and_log_output_logs_each_line_separately():
def test_log_output_logs_each_line_separately():
flexmock(module.logger).should_receive('log').with_args(logging.INFO, 'hi').once()
flexmock(module.logger).should_receive('log').with_args(logging.INFO, 'there').once()
flexmock(module).should_receive('exit_code_indicates_error').and_return(False)
module.execute_and_log_output(
hi_process = subprocess.Popen(['echo', 'hi'], stdout=subprocess.PIPE)
module.log_output(
['echo', 'hi'],
hi_process,
hi_process.stdout,
output_log_level=logging.INFO,
shell=False,
environment=None,
working_directory=None,
error_on_warnings=False,
)
module.execute_and_log_output(
there_process = subprocess.Popen(['echo', 'there'], stdout=subprocess.PIPE)
module.log_output(
['echo', 'there'],
there_process,
there_process.stdout,
output_log_level=logging.INFO,
shell=False,
environment=None,
working_directory=None,
error_on_warnings=False,
)
def test_execute_and_log_output_includes_error_output_in_exception():
def test_log_output_includes_error_output_in_exception():
flexmock(module.logger).should_receive('log')
flexmock(module).should_receive('exit_code_indicates_error').and_return(True)
process = subprocess.Popen(['grep'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
with pytest.raises(subprocess.CalledProcessError) as error:
module.execute_and_log_output(
module.log_output(
['grep'],
process,
process.stdout,
output_log_level=logging.INFO,
shell=False,
environment=None,
working_directory=None,
error_on_warnings=False,
)
@ -48,18 +50,19 @@ def test_execute_and_log_output_includes_error_output_in_exception():
assert error.value.output
def test_execute_and_log_output_truncates_long_error_output():
def test_log_output_truncates_long_error_output():
flexmock(module).ERROR_OUTPUT_MAX_LINE_COUNT = 0
flexmock(module.logger).should_receive('log')
flexmock(module).should_receive('exit_code_indicates_error').and_return(True)
process = subprocess.Popen(['grep'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
with pytest.raises(subprocess.CalledProcessError) as error:
module.execute_and_log_output(
module.log_output(
['grep'],
process,
process.stdout,
output_log_level=logging.INFO,
shell=False,
environment=None,
working_directory=None,
error_on_warnings=False,
)
@ -67,15 +70,11 @@ def test_execute_and_log_output_truncates_long_error_output():
assert error.value.output.startswith('...')
def test_execute_and_log_output_with_no_output_logs_nothing():
def test_log_output_with_no_output_logs_nothing():
flexmock(module.logger).should_receive('log').never()
flexmock(module).should_receive('exit_code_indicates_error').and_return(False)
module.execute_and_log_output(
['true'],
output_log_level=logging.INFO,
shell=False,
environment=None,
working_directory=None,
error_on_warnings=False,
process = subprocess.Popen(['true'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
module.log_output(
['true'], process, process.stdout, output_log_level=logging.INFO, error_on_warnings=False
)

View file

@ -0,0 +1,26 @@
import pytest
from flexmock import flexmock
from borgmatic.hooks import database as module
def test_make_database_dump_filename_uses_name_and_hostname():
flexmock(module.os.path).should_receive('expanduser').and_return('databases')
assert (
module.make_database_dump_filename('databases', 'test', 'hostname')
== 'databases/hostname/test'
)
def test_make_database_dump_filename_without_hostname_defaults_to_localhost():
flexmock(module.os.path).should_receive('expanduser').and_return('databases')
assert module.make_database_dump_filename('databases', 'test') == 'databases/localhost/test'
def test_make_database_dump_filename_with_invalid_name_raises():
flexmock(module.os.path).should_receive('expanduser').and_return('databases')
with pytest.raises(ValueError):
module.make_database_dump_filename('databases', 'invalid/name')

View file

@ -0,0 +1,122 @@
import sys
from flexmock import flexmock
from borgmatic.hooks import mysql as module
def test_dump_databases_runs_mysqldump_for_each_database():
databases = [{'name': 'foo'}, {'name': 'bar'}]
output_file = flexmock()
flexmock(module).should_receive('make_database_dump_filename').and_return(
'databases/localhost/foo'
).and_return('databases/localhost/bar')
flexmock(module.os).should_receive('makedirs')
flexmock(sys.modules['builtins']).should_receive('open').and_return(output_file)
for name in ('foo', 'bar'):
flexmock(module).should_receive('execute_command').with_args(
('mysqldump', '--add-drop-database', '--databases', name),
output_file=output_file,
extra_environment=None,
).once()
module.dump_databases(databases, 'test.yaml', dry_run=False)
def test_dump_databases_with_dry_run_skips_mysqldump():
databases = [{'name': 'foo'}, {'name': 'bar'}]
flexmock(module).should_receive('make_database_dump_filename').and_return(
'databases/localhost/foo'
).and_return('databases/localhost/bar')
flexmock(module.os).should_receive('makedirs').never()
flexmock(module).should_receive('execute_command').never()
module.dump_databases(databases, 'test.yaml', dry_run=True)
def test_dump_databases_without_databases_does_not_raise():
module.dump_databases([], 'test.yaml', dry_run=False)
def test_dump_databases_runs_mysqldump_with_hostname_and_port():
databases = [{'name': 'foo', 'hostname': 'database.example.org', 'port': 5433}]
output_file = flexmock()
flexmock(module).should_receive('make_database_dump_filename').and_return(
'databases/database.example.org/foo'
)
flexmock(module.os).should_receive('makedirs')
flexmock(sys.modules['builtins']).should_receive('open').and_return(output_file)
flexmock(module).should_receive('execute_command').with_args(
(
'mysqldump',
'--add-drop-database',
'--host',
'database.example.org',
'--port',
'5433',
'--protocol',
'tcp',
'--databases',
'foo',
),
output_file=output_file,
extra_environment=None,
).once()
module.dump_databases(databases, 'test.yaml', dry_run=False)
def test_dump_databases_runs_mysqldump_with_username_and_password():
databases = [{'name': 'foo', 'username': 'root', 'password': 'trustsome1'}]
output_file = flexmock()
flexmock(module).should_receive('make_database_dump_filename').and_return(
'databases/localhost/foo'
)
flexmock(module.os).should_receive('makedirs')
flexmock(sys.modules['builtins']).should_receive('open').and_return(output_file)
flexmock(module).should_receive('execute_command').with_args(
('mysqldump', '--add-drop-database', '--user', 'root', '--databases', 'foo'),
output_file=output_file,
extra_environment={'MYSQL_PWD': 'trustsome1'},
).once()
module.dump_databases(databases, 'test.yaml', dry_run=False)
def test_dump_databases_runs_mysqldump_with_options():
databases = [{'name': 'foo', 'options': '--stuff=such'}]
output_file = flexmock()
flexmock(module).should_receive('make_database_dump_filename').and_return(
'databases/localhost/foo'
)
flexmock(module.os).should_receive('makedirs')
flexmock(sys.modules['builtins']).should_receive('open').and_return(output_file)
flexmock(module).should_receive('execute_command').with_args(
('mysqldump', '--add-drop-database', '--stuff=such', '--databases', 'foo'),
output_file=output_file,
extra_environment=None,
).once()
module.dump_databases(databases, 'test.yaml', dry_run=False)
def test_dump_databases_runs_mysqldump_for_all_databases():
databases = [{'name': 'all'}]
output_file = flexmock()
flexmock(module).should_receive('make_database_dump_filename').and_return(
'databases/localhost/all'
)
flexmock(module.os).should_receive('makedirs')
flexmock(sys.modules['builtins']).should_receive('open').and_return(output_file)
flexmock(module).should_receive('execute_command').with_args(
('mysqldump', '--add-drop-database', '--all-databases'),
output_file=output_file,
extra_environment=None,
).once()
module.dump_databases(databases, 'test.yaml', dry_run=False)

View file

@ -4,25 +4,6 @@ from flexmock import flexmock
from borgmatic.hooks import postgresql as module
def test_make_database_dump_filename_uses_name_and_hostname():
flexmock(module.os.path).should_receive('expanduser').and_return('databases')
assert module.make_database_dump_filename('test', 'hostname') == 'databases/hostname/test'
def test_make_database_dump_filename_without_hostname_defaults_to_localhost():
flexmock(module.os.path).should_receive('expanduser').and_return('databases')
assert module.make_database_dump_filename('test') == 'databases/localhost/test'
def test_make_database_dump_filename_with_invalid_name_raises():
flexmock(module.os.path).should_receive('expanduser').and_return('databases')
with pytest.raises(ValueError):
module.make_database_dump_filename('invalid/name')
def test_dump_databases_runs_pg_dump_for_each_database():
databases = [{'name': 'foo'}, {'name': 'bar'}]
flexmock(module).should_receive('make_database_dump_filename').and_return(
@ -53,7 +34,7 @@ def test_dump_databases_with_dry_run_skips_pg_dump():
flexmock(module).should_receive('make_database_dump_filename').and_return(
'databases/localhost/foo'
).and_return('databases/localhost/bar')
flexmock(module.os).should_receive('makedirs')
flexmock(module.os).should_receive('makedirs').never()
flexmock(module).should_receive('execute_command').never()
module.dump_databases(databases, 'test.yaml', dry_run=True)
@ -199,6 +180,7 @@ def test_remove_database_dumps_removes_dump_for_each_database():
def test_remove_database_dumps_with_dry_run_skips_removal():
databases = [{'name': 'foo'}, {'name': 'bar'}]
flexmock(module.os).should_receive('rmdir').never()
flexmock(module.os).should_receive('remove').never()
module.remove_database_dumps(databases, 'test.yaml', dry_run=True)
@ -220,9 +202,9 @@ def test_make_database_dump_patterns_converts_names_to_glob_paths():
def test_make_database_dump_patterns_treats_empty_names_as_matching_all_databases():
flexmock(module).should_receive('make_database_dump_filename').with_args('*', '*').and_return(
'databases/*/*'
)
flexmock(module).should_receive('make_database_dump_filename').with_args(
module.DUMP_PATH, '*', '*'
).and_return('databases/*/*')
assert module.make_database_dump_patterns(()) == ['databases/*/*']

View file

@ -1,5 +1,3 @@
import logging
import pytest
from flexmock import flexmock
@ -47,31 +45,52 @@ def test_exit_code_indicates_error_with_non_borg_success_is_false():
def test_execute_command_calls_full_command():
full_command = ['foo', 'bar']
flexmock(module.os, environ={'a': 'b'})
flexmock(module).should_receive('execute_and_log_output').with_args(
flexmock(module.subprocess).should_receive('Popen').with_args(
full_command,
output_log_level=logging.INFO,
stdout=module.subprocess.PIPE,
stderr=module.subprocess.STDOUT,
shell=False,
environment=None,
working_directory=None,
error_on_warnings=False,
).once()
env=None,
cwd=None,
).and_return(flexmock(stdout=None)).once()
flexmock(module).should_receive('log_output')
output = module.execute_command(full_command)
assert output is None
def test_execute_command_calls_full_command_with_output_file():
full_command = ['foo', 'bar']
output_file = flexmock()
flexmock(module.os, environ={'a': 'b'})
flexmock(module.subprocess).should_receive('Popen').with_args(
full_command,
stdout=output_file,
stderr=module.subprocess.PIPE,
shell=False,
env=None,
cwd=None,
).and_return(flexmock(stderr=None)).once()
flexmock(module).should_receive('log_output')
output = module.execute_command(full_command, output_file=output_file)
assert output is None
def test_execute_command_calls_full_command_with_shell():
full_command = ['foo', 'bar']
flexmock(module.os, environ={'a': 'b'})
flexmock(module).should_receive('execute_and_log_output').with_args(
flexmock(module.subprocess).should_receive('Popen').with_args(
full_command,
output_log_level=logging.INFO,
stdout=module.subprocess.PIPE,
stderr=module.subprocess.STDOUT,
shell=True,
environment=None,
working_directory=None,
error_on_warnings=False,
).once()
env=None,
cwd=None,
).and_return(flexmock(stdout=None)).once()
flexmock(module).should_receive('log_output')
output = module.execute_command(full_command, shell=True)
@ -81,14 +100,15 @@ def test_execute_command_calls_full_command_with_shell():
def test_execute_command_calls_full_command_with_extra_environment():
full_command = ['foo', 'bar']
flexmock(module.os, environ={'a': 'b'})
flexmock(module).should_receive('execute_and_log_output').with_args(
flexmock(module.subprocess).should_receive('Popen').with_args(
full_command,
output_log_level=logging.INFO,
stdout=module.subprocess.PIPE,
stderr=module.subprocess.STDOUT,
shell=False,
environment={'a': 'b', 'c': 'd'},
working_directory=None,
error_on_warnings=False,
).once()
env={'a': 'b', 'c': 'd'},
cwd=None,
).and_return(flexmock(stdout=None)).once()
flexmock(module).should_receive('log_output')
output = module.execute_command(full_command, extra_environment={'c': 'd'})
@ -98,37 +118,21 @@ def test_execute_command_calls_full_command_with_extra_environment():
def test_execute_command_calls_full_command_with_working_directory():
full_command = ['foo', 'bar']
flexmock(module.os, environ={'a': 'b'})
flexmock(module).should_receive('execute_and_log_output').with_args(
flexmock(module.subprocess).should_receive('Popen').with_args(
full_command,
output_log_level=logging.INFO,
stdout=module.subprocess.PIPE,
stderr=module.subprocess.STDOUT,
shell=False,
environment=None,
working_directory='/working',
error_on_warnings=False,
).once()
env=None,
cwd='/working',
).and_return(flexmock(stdout=None)).once()
flexmock(module).should_receive('log_output')
output = module.execute_command(full_command, working_directory='/working')
assert output is None
def test_execute_command_calls_full_command_with_error_on_warnings():
full_command = ['foo', 'bar']
flexmock(module.os, environ={'a': 'b'})
flexmock(module).should_receive('execute_and_log_output').with_args(
full_command,
output_log_level=logging.INFO,
shell=False,
environment=None,
working_directory=None,
error_on_warnings=True,
).once()
output = module.execute_command(full_command, error_on_warnings=True)
assert output is None
def test_execute_command_captures_output():
full_command = ['foo', 'bar']
expected_output = '[]'