diff --git a/pre_commit/clientlib.py b/pre_commit/clientlib.py index 4570e107..2fa7b153 100644 --- a/pre_commit/clientlib.py +++ b/pre_commit/clientlib.py @@ -56,6 +56,7 @@ MANIFEST_HOOK_DICT = cfgv.Map( cfgv.Optional('language_version', cfgv.check_string, 'default'), cfgv.Optional('log_file', cfgv.check_string, ''), cfgv.Optional('minimum_pre_commit_version', cfgv.check_string, '0'), + cfgv.Optional('require_serial', cfgv.check_bool, False), cfgv.Optional('stages', cfgv.check_array(cfgv.check_one_of(C.STAGES)), []), cfgv.Optional('verbose', cfgv.check_bool, False), ) diff --git a/pre_commit/languages/docker.py b/pre_commit/languages/docker.py index f3c46a33..bfdd3585 100644 --- a/pre_commit/languages/docker.py +++ b/pre_commit/languages/docker.py @@ -9,7 +9,6 @@ from pre_commit.languages import helpers from pre_commit.util import CalledProcessError from pre_commit.util import clean_path_on_failure from pre_commit.util import cmd_output -from pre_commit.xargs import xargs ENVIRONMENT_DIR = 'docker' @@ -97,4 +96,4 @@ def run_hook(prefix, hook, file_args): # pragma: windows no cover entry_tag = ('--entrypoint', entry_exe, docker_tag(prefix)) cmd = docker_cmd() + entry_tag + cmd_rest - return xargs(cmd, file_args) + return helpers.run_xargs(hook, cmd, file_args) diff --git a/pre_commit/languages/docker_image.py b/pre_commit/languages/docker_image.py index 6301970c..e7ebad7f 100644 --- a/pre_commit/languages/docker_image.py +++ b/pre_commit/languages/docker_image.py @@ -4,7 +4,6 @@ from __future__ import unicode_literals from pre_commit.languages import helpers from pre_commit.languages.docker import assert_docker_available from pre_commit.languages.docker import docker_cmd -from pre_commit.xargs import xargs ENVIRONMENT_DIR = None @@ -16,4 +15,4 @@ install_environment = helpers.no_install def run_hook(prefix, hook, file_args): # pragma: windows no cover assert_docker_available() cmd = docker_cmd() + helpers.to_cmd(hook) - return xargs(cmd, file_args) + return 
helpers.run_xargs(hook, cmd, file_args) diff --git a/pre_commit/languages/golang.py b/pre_commit/languages/golang.py index 14354e0c..09e3476c 100644 --- a/pre_commit/languages/golang.py +++ b/pre_commit/languages/golang.py @@ -11,7 +11,6 @@ from pre_commit.languages import helpers from pre_commit.util import clean_path_on_failure from pre_commit.util import cmd_output from pre_commit.util import rmtree -from pre_commit.xargs import xargs ENVIRONMENT_DIR = 'golangenv' @@ -81,4 +80,4 @@ def install_environment(prefix, version, additional_dependencies): def run_hook(prefix, hook, file_args): with in_env(prefix): - return xargs(helpers.to_cmd(hook), file_args) + return helpers.run_xargs(hook, helpers.to_cmd(hook), file_args) diff --git a/pre_commit/languages/helpers.py b/pre_commit/languages/helpers.py index ddbe2e80..aa5a5d13 100644 --- a/pre_commit/languages/helpers.py +++ b/pre_commit/languages/helpers.py @@ -1,8 +1,11 @@ from __future__ import unicode_literals +import multiprocessing +import os import shlex from pre_commit.util import cmd_output +from pre_commit.xargs import xargs def run_setup_cmd(prefix, cmd): @@ -45,3 +48,21 @@ def basic_healthy(prefix, language_version): def no_install(prefix, version, additional_dependencies): raise AssertionError('This type is not installable') + + +def target_concurrency(hook): + if hook['require_serial'] or 'PRE_COMMIT_NO_CONCURRENCY' in os.environ: + return 1 + else: + # Travis appears to have a bunch of CPUs, but we can't use them all. 
+ if 'TRAVIS' in os.environ: + return 2 + else: + try: + return multiprocessing.cpu_count() + except NotImplementedError: + return 1 + + +def run_xargs(hook, cmd, file_args): + return xargs(cmd, file_args, target_concurrency=target_concurrency(hook)) diff --git a/pre_commit/languages/node.py b/pre_commit/languages/node.py index 7b464930..8e5dc7e5 100644 --- a/pre_commit/languages/node.py +++ b/pre_commit/languages/node.py @@ -10,7 +10,6 @@ from pre_commit.languages import helpers from pre_commit.languages.python import bin_dir from pre_commit.util import clean_path_on_failure from pre_commit.util import cmd_output -from pre_commit.xargs import xargs ENVIRONMENT_DIR = 'node_env' @@ -71,4 +70,4 @@ def install_environment(prefix, version, additional_dependencies): def run_hook(prefix, hook, file_args): with in_env(prefix, hook['language_version']): - return xargs(helpers.to_cmd(hook), file_args) + return helpers.run_xargs(hook, helpers.to_cmd(hook), file_args) diff --git a/pre_commit/languages/python.py b/pre_commit/languages/python.py index ee7b2a4f..4b7580a4 100644 --- a/pre_commit/languages/python.py +++ b/pre_commit/languages/python.py @@ -12,7 +12,6 @@ from pre_commit.parse_shebang import find_executable from pre_commit.util import CalledProcessError from pre_commit.util import clean_path_on_failure from pre_commit.util import cmd_output -from pre_commit.xargs import xargs ENVIRONMENT_DIR = 'py_env' @@ -127,7 +126,7 @@ def py_interface(_dir, _make_venv): def run_hook(prefix, hook, file_args): with in_env(prefix, hook['language_version']): - return xargs(helpers.to_cmd(hook), file_args) + return helpers.run_xargs(hook, helpers.to_cmd(hook), file_args) def install_environment(prefix, version, additional_dependencies): additional_dependencies = tuple(additional_dependencies) diff --git a/pre_commit/languages/ruby.py b/pre_commit/languages/ruby.py index bef3fe38..0330ae8d 100644 --- a/pre_commit/languages/ruby.py +++ b/pre_commit/languages/ruby.py @@ -12,7 +12,6 @@ 
from pre_commit.languages import helpers from pre_commit.util import CalledProcessError from pre_commit.util import clean_path_on_failure from pre_commit.util import resource_bytesio -from pre_commit.xargs import xargs ENVIRONMENT_DIR = 'rbenv' @@ -126,4 +125,4 @@ def install_environment( def run_hook(prefix, hook, file_args): # pragma: windows no cover with in_env(prefix, hook['language_version']): - return xargs(helpers.to_cmd(hook), file_args) + return helpers.run_xargs(hook, helpers.to_cmd(hook), file_args) diff --git a/pre_commit/languages/rust.py b/pre_commit/languages/rust.py index 41053f88..8a5a0704 100644 --- a/pre_commit/languages/rust.py +++ b/pre_commit/languages/rust.py @@ -10,7 +10,6 @@ from pre_commit.envcontext import Var from pre_commit.languages import helpers from pre_commit.util import clean_path_on_failure from pre_commit.util import cmd_output -from pre_commit.xargs import xargs ENVIRONMENT_DIR = 'rustenv' @@ -91,4 +90,4 @@ def install_environment(prefix, version, additional_dependencies): def run_hook(prefix, hook, file_args): with in_env(prefix): - return xargs(helpers.to_cmd(hook), file_args) + return helpers.run_xargs(hook, helpers.to_cmd(hook), file_args) diff --git a/pre_commit/languages/script.py b/pre_commit/languages/script.py index 551b4d80..809efb85 100644 --- a/pre_commit/languages/script.py +++ b/pre_commit/languages/script.py @@ -1,7 +1,6 @@ from __future__ import unicode_literals from pre_commit.languages import helpers -from pre_commit.xargs import xargs ENVIRONMENT_DIR = None @@ -13,4 +12,4 @@ install_environment = helpers.no_install def run_hook(prefix, hook, file_args): cmd = helpers.to_cmd(hook) cmd = (prefix.path(cmd[0]),) + cmd[1:] - return xargs(cmd, file_args) + return helpers.run_xargs(hook, cmd, file_args) diff --git a/pre_commit/languages/swift.py b/pre_commit/languages/swift.py index 2863fbee..c282de5d 100644 --- a/pre_commit/languages/swift.py +++ b/pre_commit/languages/swift.py @@ -8,7 +8,6 @@ from 
pre_commit.envcontext import Var from pre_commit.languages import helpers from pre_commit.util import clean_path_on_failure from pre_commit.util import cmd_output -from pre_commit.xargs import xargs ENVIRONMENT_DIR = 'swift_env' get_default_version = helpers.basic_get_default_version @@ -53,4 +52,4 @@ def install_environment( def run_hook(prefix, hook, file_args): # pragma: windows no cover with in_env(prefix): - return xargs(helpers.to_cmd(hook), file_args) + return helpers.run_xargs(hook, helpers.to_cmd(hook), file_args) diff --git a/pre_commit/languages/system.py b/pre_commit/languages/system.py index 84cd1fe4..e590d486 100644 --- a/pre_commit/languages/system.py +++ b/pre_commit/languages/system.py @@ -1,7 +1,6 @@ from __future__ import unicode_literals from pre_commit.languages import helpers -from pre_commit.xargs import xargs ENVIRONMENT_DIR = None @@ -11,4 +10,4 @@ install_environment = helpers.no_install def run_hook(prefix, hook, file_args): - return xargs(helpers.to_cmd(hook), file_args) + return helpers.run_xargs(hook, helpers.to_cmd(hook), file_args) diff --git a/pre_commit/xargs.py b/pre_commit/xargs.py index 2fe8a454..3b4a25f9 100644 --- a/pre_commit/xargs.py +++ b/pre_commit/xargs.py @@ -1,8 +1,12 @@ from __future__ import absolute_import +from __future__ import division from __future__ import unicode_literals +import contextlib +import math import sys +import concurrent.futures import six from pre_commit import parse_shebang @@ -34,8 +38,13 @@ class ArgumentTooLongError(RuntimeError): pass -def partition(cmd, varargs, _max_length=None): +def partition(cmd, varargs, target_concurrency, _max_length=None): _max_length = _max_length or _get_platform_max_length() + + # Generally, we try to partition evenly into at least `target_concurrency` + # partitions, but we don't want a bunch of tiny partitions. 
+ max_args = max(4, math.ceil(len(varargs) / target_concurrency)) + cmd = tuple(cmd) ret = [] @@ -48,7 +57,10 @@ def partition(cmd, varargs, _max_length=None): arg = varargs.pop() arg_length = _command_length(arg) + 1 - if total_length + arg_length <= _max_length: + if ( + total_length + arg_length <= _max_length + and len(ret_cmd) < max_args + ): ret_cmd.append(arg) total_length += arg_length elif not ret_cmd: @@ -65,12 +77,23 @@ def partition(cmd, varargs, _max_length=None): return tuple(ret) +@contextlib.contextmanager +def _thread_mapper(maxsize): + if maxsize == 1: + yield map + else: + with concurrent.futures.ThreadPoolExecutor(maxsize) as ex: + yield ex.map + + def xargs(cmd, varargs, **kwargs): """A simplified implementation of xargs. negate: Make nonzero successful and zero a failure + target_concurrency: Target number of partitions to run concurrently """ negate = kwargs.pop('negate', False) + target_concurrency = kwargs.pop('target_concurrency', 1) retcode = 0 stdout = b'' stderr = b'' @@ -80,22 +103,28 @@ def xargs(cmd, varargs, **kwargs): except parse_shebang.ExecutableNotFoundError as e: return e.to_output() - for run_cmd in partition(cmd, varargs, **kwargs): - proc_retcode, proc_out, proc_err = cmd_output( - *run_cmd, encoding=None, retcode=None - ) - # This is *slightly* too clever so I'll explain it. 
- # First the xor boolean table: - # T | F | - # +-------+ - # T | F | T | - # --+-------+ - # F | T | F | - # --+-------+ - # When negate is True, it has the effect of flipping the return code - # Otherwise, the retuncode is unchanged - retcode |= bool(proc_retcode) ^ negate - stdout += proc_out - stderr += proc_err + partitions = partition(cmd, varargs, target_concurrency, **kwargs) + + def run_cmd_partition(run_cmd): + return cmd_output(*run_cmd, encoding=None, retcode=None) + + threads = min(len(partitions), target_concurrency) + with _thread_mapper(threads) as thread_map: + results = thread_map(run_cmd_partition, partitions) + + for proc_retcode, proc_out, proc_err in results: + # This is *slightly* too clever so I'll explain it. + # First the xor boolean table: + # T | F | + # +-------+ + # T | F | T | + # --+-------+ + # F | T | F | + # --+-------+ + # When negate is True, it has the effect of flipping the return + # code. Otherwise, the returncode is unchanged. + retcode |= bool(proc_retcode) ^ negate + stdout += proc_out + stderr += proc_err return retcode, stdout, stderr diff --git a/setup.py b/setup.py index 7c0a958f..dd3eb425 100644 --- a/setup.py +++ b/setup.py @@ -47,7 +47,10 @@ setup( 'toml', 'virtualenv', ], - extras_require={':python_version<"3.7"': ['importlib-resources']}, + extras_require={ + ':python_version<"3.2"': ['futures'], + ':python_version<"3.7"': ['importlib-resources'], + }, entry_points={ 'console_scripts': [ 'pre-commit = pre_commit.main:main', diff --git a/tests/languages/helpers_test.py b/tests/languages/helpers_test.py index ada2095b..e7bd4702 100644 --- a/tests/languages/helpers_test.py +++ b/tests/languages/helpers_test.py @@ -1,8 +1,11 @@ from __future__ import absolute_import from __future__ import unicode_literals +import multiprocessing +import os import sys +import mock import pytest from pre_commit.languages import helpers @@ -28,3 +31,34 @@ def test_failed_setup_command_does_not_unicode_error(): # an assertion that this 
does not raise `UnicodeError` with pytest.raises(CalledProcessError): helpers.run_setup_cmd(Prefix('.'), (sys.executable, '-c', script)) + + +def test_target_concurrency_normal(): + with mock.patch.object(multiprocessing, 'cpu_count', return_value=123): + with mock.patch.dict(os.environ, {}, clear=True): + assert helpers.target_concurrency({'require_serial': False}) == 123 + + +def test_target_concurrency_cpu_count_require_serial_true(): + with mock.patch.dict(os.environ, {}, clear=True): + assert helpers.target_concurrency({'require_serial': True}) == 1 + + +def test_target_concurrency_testing_env_var(): + with mock.patch.dict( + os.environ, {'PRE_COMMIT_NO_CONCURRENCY': '1'}, clear=True, + ): + assert helpers.target_concurrency({'require_serial': False}) == 1 + + +def test_target_concurrency_on_travis(): + with mock.patch.dict(os.environ, {'TRAVIS': '1'}, clear=True): + assert helpers.target_concurrency({'require_serial': False}) == 2 + + +def test_target_concurrency_cpu_count_not_implemented(): + with mock.patch.object( + multiprocessing, 'cpu_count', side_effect=NotImplementedError, + ): + with mock.patch.dict(os.environ, {}, clear=True): + assert helpers.target_concurrency({'require_serial': False}) == 1 diff --git a/tests/repository_test.py b/tests/repository_test.py index 8d578f39..f1b0f6e0 100644 --- a/tests/repository_test.py +++ b/tests/repository_test.py @@ -837,6 +837,7 @@ def test_manifest_hooks(tempdir_factory, store): 'minimum_pre_commit_version': '0', 'name': 'Bash hook', 'pass_filenames': True, + 'require_serial': False, 'stages': [], 'types': ['file'], 'exclude_types': [], diff --git a/tests/xargs_test.py b/tests/xargs_test.py index bf685e16..ed65ed46 100644 --- a/tests/xargs_test.py +++ b/tests/xargs_test.py @@ -3,7 +3,9 @@ from __future__ import absolute_import from __future__ import unicode_literals import sys +import time +import concurrent.futures import mock import pytest import six @@ -35,11 +37,11 @@ def linux_mock(): def 
test_partition_trivial(): - assert xargs.partition(('cmd',), ()) == (('cmd',),) + assert xargs.partition(('cmd',), (), 1) == (('cmd',),) def test_partition_simple(): - assert xargs.partition(('cmd',), ('foo',)) == (('cmd', 'foo'),) + assert xargs.partition(('cmd',), ('foo',), 1) == (('cmd', 'foo'),) def test_partition_limits(): @@ -53,6 +55,7 @@ def test_partition_limits(): '.' * 5, '.' * 6, ), + 1, _max_length=20, ) assert ret == ( @@ -67,21 +70,21 @@ def test_partition_limit_win32_py3(win32_py3_mock): cmd = ('ninechars',) # counted as half because of utf-16 encode varargs = ('😑' * 5,) - ret = xargs.partition(cmd, varargs, _max_length=20) + ret = xargs.partition(cmd, varargs, 1, _max_length=20) assert ret == (cmd + varargs,) def test_partition_limit_win32_py2(win32_py2_mock): cmd = ('ninechars',) varargs = ('😑' * 5,) # 4 bytes * 5 - ret = xargs.partition(cmd, varargs, _max_length=30) + ret = xargs.partition(cmd, varargs, 1, _max_length=30) assert ret == (cmd + varargs,) def test_partition_limit_linux(linux_mock): cmd = ('ninechars',) varargs = ('😑' * 5,) - ret = xargs.partition(cmd, varargs, _max_length=30) + ret = xargs.partition(cmd, varargs, 1, _max_length=30) assert ret == (cmd + varargs,) @@ -89,12 +92,39 @@ def test_argument_too_long_with_large_unicode(linux_mock): cmd = ('ninechars',) varargs = ('😑' * 10,) # 4 bytes * 10 with pytest.raises(xargs.ArgumentTooLongError): - xargs.partition(cmd, varargs, _max_length=20) + xargs.partition(cmd, varargs, 1, _max_length=20) + + +def test_partition_target_concurrency(): + ret = xargs.partition( + ('foo',), ('A',) * 22, + 4, + _max_length=50, + ) + assert ret == ( + ('foo',) + ('A',) * 6, + ('foo',) + ('A',) * 6, + ('foo',) + ('A',) * 6, + ('foo',) + ('A',) * 4, + ) + + +def test_partition_target_concurrency_wont_make_tiny_partitions(): + ret = xargs.partition( + ('foo',), ('A',) * 10, + 4, + _max_length=50, + ) + assert ret == ( + ('foo',) + ('A',) * 4, + ('foo',) + ('A',) * 4, + ('foo',) + ('A',) * 2, + ) def 
test_argument_too_long(): with pytest.raises(xargs.ArgumentTooLongError): - xargs.partition(('a' * 5,), ('a' * 5,), _max_length=10) + xargs.partition(('a' * 5,), ('a' * 5,), 1, _max_length=10) def test_xargs_smoke(): @@ -132,3 +162,34 @@ def test_xargs_retcode_normal(): ret, _, _ = xargs.xargs(exit_cmd, ('0', '1'), _max_length=max_length) assert ret == 1 + + +def test_xargs_concurrency(): + bash_cmd = ('bash', '-c') + print_pid = ('sleep 0.5 && echo $$',) + + start = time.time() + ret, stdout, _ = xargs.xargs( + bash_cmd, print_pid * 5, + target_concurrency=5, + _max_length=len(' '.join(bash_cmd + print_pid)), + ) + elapsed = time.time() - start + assert ret == 0 + pids = stdout.splitlines() + assert len(pids) == 5 + # It would take 0.5*5=2.5 seconds to run all of these in serial, so if it + # takes less, they must have run concurrently. + assert elapsed < 2.5 + + +def test_thread_mapper_concurrency_uses_threadpoolexecutor_map(): + with xargs._thread_mapper(10) as thread_map: + assert isinstance( + thread_map.__self__, concurrent.futures.ThreadPoolExecutor, + ) is True + + +def test_thread_mapper_concurrency_uses_regular_map(): + with xargs._thread_mapper(1) as thread_map: + assert thread_map is map diff --git a/tox.ini b/tox.ini index d4b590bf..52f3d3ee 100644 --- a/tox.ini +++ b/tox.ini @@ -27,3 +27,4 @@ env = GIT_AUTHOR_EMAIL=test@example.com GIT_COMMITTER_EMAIL=test@example.com VIRTUALENV_NO_DOWNLOAD=1 + PRE_COMMIT_NO_CONCURRENCY=1