diff --git a/pre_commit/clientlib.py b/pre_commit/clientlib.py index 4570e107..2fa7b153 100644 --- a/pre_commit/clientlib.py +++ b/pre_commit/clientlib.py @@ -56,6 +56,7 @@ MANIFEST_HOOK_DICT = cfgv.Map( cfgv.Optional('language_version', cfgv.check_string, 'default'), cfgv.Optional('log_file', cfgv.check_string, ''), cfgv.Optional('minimum_pre_commit_version', cfgv.check_string, '0'), + cfgv.Optional('require_serial', cfgv.check_bool, False), cfgv.Optional('stages', cfgv.check_array(cfgv.check_one_of(C.STAGES)), []), cfgv.Optional('verbose', cfgv.check_bool, False), ) diff --git a/pre_commit/languages/docker.py b/pre_commit/languages/docker.py index f3c46a33..7f00fe60 100644 --- a/pre_commit/languages/docker.py +++ b/pre_commit/languages/docker.py @@ -97,4 +97,8 @@ def run_hook(prefix, hook, file_args): # pragma: windows no cover entry_tag = ('--entrypoint', entry_exe, docker_tag(prefix)) cmd = docker_cmd() + entry_tag + cmd_rest - return xargs(cmd, file_args) + return xargs( + cmd, + file_args, + target_concurrency=helpers.target_concurrency(hook), + ) diff --git a/pre_commit/languages/docker_image.py b/pre_commit/languages/docker_image.py index 6301970c..e990f18a 100644 --- a/pre_commit/languages/docker_image.py +++ b/pre_commit/languages/docker_image.py @@ -16,4 +16,8 @@ install_environment = helpers.no_install def run_hook(prefix, hook, file_args): # pragma: windows no cover assert_docker_available() cmd = docker_cmd() + helpers.to_cmd(hook) - return xargs(cmd, file_args) + return xargs( + cmd, + file_args, + target_concurrency=helpers.target_concurrency(hook), + ) diff --git a/pre_commit/languages/golang.py b/pre_commit/languages/golang.py index 14354e0c..7d273e75 100644 --- a/pre_commit/languages/golang.py +++ b/pre_commit/languages/golang.py @@ -81,4 +81,8 @@ def install_environment(prefix, version, additional_dependencies): def run_hook(prefix, hook, file_args): with in_env(prefix): - return xargs(helpers.to_cmd(hook), file_args) + return xargs( + helpers.to_cmd(hook), + file_args, + target_concurrency=helpers.target_concurrency(hook), + ) diff --git a/pre_commit/languages/helpers.py b/pre_commit/languages/helpers.py index ddbe2e80..b6a3fc2d 100644 --- a/pre_commit/languages/helpers.py +++ b/pre_commit/languages/helpers.py @@ -1,5 +1,6 @@ from __future__ import unicode_literals +import multiprocessing import shlex from pre_commit.util import cmd_output @@ -45,3 +46,11 @@ def basic_healthy(prefix, language_version): def no_install(prefix, version, additional_dependencies): raise AssertionError('This type is not installable') + + +def target_concurrency(hook): + if hook['require_serial']: + return 1 + else: + # TODO: something smart! + return multiprocessing.cpu_count() diff --git a/pre_commit/languages/node.py b/pre_commit/languages/node.py index 7b464930..494ca878 100644 --- a/pre_commit/languages/node.py +++ b/pre_commit/languages/node.py @@ -71,4 +71,8 @@ def install_environment(prefix, version, additional_dependencies): def run_hook(prefix, hook, file_args): with in_env(prefix, hook['language_version']): - return xargs(helpers.to_cmd(hook), file_args) + return xargs( + helpers.to_cmd(hook), + file_args, + target_concurrency=helpers.target_concurrency(hook), + ) diff --git a/pre_commit/languages/python.py b/pre_commit/languages/python.py index ee7b2a4f..bb8a81a6 100644 --- a/pre_commit/languages/python.py +++ b/pre_commit/languages/python.py @@ -127,7 +127,11 @@ def py_interface(_dir, _make_venv): def run_hook(prefix, hook, file_args): with in_env(prefix, hook['language_version']): - return xargs(helpers.to_cmd(hook), file_args) + return xargs( + helpers.to_cmd(hook), + file_args, + target_concurrency=helpers.target_concurrency(hook), + ) def install_environment(prefix, version, additional_dependencies): additional_dependencies = tuple(additional_dependencies) diff --git a/pre_commit/languages/ruby.py b/pre_commit/languages/ruby.py index bef3fe38..3c5745df 100644 --- a/pre_commit/languages/ruby.py +++ b/pre_commit/languages/ruby.py @@ -126,4 +126,8 @@ def install_environment( def run_hook(prefix, hook, file_args): # pragma: windows no cover with in_env(prefix, hook['language_version']): - return xargs(helpers.to_cmd(hook), file_args) + return xargs( + helpers.to_cmd(hook), + file_args, + target_concurrency=helpers.target_concurrency(hook), + ) diff --git a/pre_commit/languages/rust.py b/pre_commit/languages/rust.py index 41053f88..e602adcc 100644 --- a/pre_commit/languages/rust.py +++ b/pre_commit/languages/rust.py @@ -91,4 +91,8 @@ def install_environment(prefix, version, additional_dependencies): def run_hook(prefix, hook, file_args): with in_env(prefix): - return xargs(helpers.to_cmd(hook), file_args) + return xargs( + helpers.to_cmd(hook), + file_args, + target_concurrency=helpers.target_concurrency(hook), + ) diff --git a/pre_commit/languages/script.py b/pre_commit/languages/script.py index 551b4d80..d242694f 100644 --- a/pre_commit/languages/script.py +++ b/pre_commit/languages/script.py @@ -13,4 +13,8 @@ install_environment = helpers.no_install def run_hook(prefix, hook, file_args): cmd = helpers.to_cmd(hook) cmd = (prefix.path(cmd[0]),) + cmd[1:] - return xargs(cmd, file_args) + return xargs( + cmd, + file_args, + target_concurrency=helpers.target_concurrency(hook), + ) diff --git a/pre_commit/languages/swift.py b/pre_commit/languages/swift.py index 2863fbee..eff4f9b0 100644 --- a/pre_commit/languages/swift.py +++ b/pre_commit/languages/swift.py @@ -53,4 +53,8 @@ def install_environment( def run_hook(prefix, hook, file_args): # pragma: windows no cover with in_env(prefix): - return xargs(helpers.to_cmd(hook), file_args) + return xargs( + helpers.to_cmd(hook), + file_args, + target_concurrency=helpers.target_concurrency(hook), + ) diff --git a/pre_commit/languages/system.py b/pre_commit/languages/system.py index 84cd1fe4..70a42ddc 100644 --- a/pre_commit/languages/system.py +++ b/pre_commit/languages/system.py @@ -11,4 +11,8 @@ install_environment = helpers.no_install def run_hook(prefix, hook, file_args): - return xargs(helpers.to_cmd(hook), file_args) + return xargs( + helpers.to_cmd(hook), + file_args, + target_concurrency=helpers.target_concurrency(hook), + ) diff --git a/pre_commit/xargs.py b/pre_commit/xargs.py index 2fe8a454..aa4f27e0 100644 --- a/pre_commit/xargs.py +++ b/pre_commit/xargs.py @@ -1,8 +1,11 @@ from __future__ import absolute_import from __future__ import unicode_literals +import contextlib +import multiprocessing.pool import sys +import concurrent.futures import six from pre_commit import parse_shebang @@ -65,12 +68,23 @@ def partition(cmd, varargs, _max_length=None): return tuple(ret) +@contextlib.contextmanager +def _threadpool(size): + pool = multiprocessing.pool.ThreadPool(size) + try: + yield pool + finally: + pool.terminate() + + def xargs(cmd, varargs, **kwargs): """A simplified implementation of xargs. negate: Make nonzero successful and zero a failure + target_concurrency: Target number of partitions to run concurrently """ negate = kwargs.pop('negate', False) + target_concurrency = kwargs.pop('target_concurrency', 1) retcode = 0 stdout = b'' stderr = b'' @@ -80,10 +94,17 @@ def xargs(cmd, varargs, **kwargs): except parse_shebang.ExecutableNotFoundError as e: return e.to_output() - for run_cmd in partition(cmd, varargs, **kwargs): - proc_retcode, proc_out, proc_err = cmd_output( - *run_cmd, encoding=None, retcode=None - ) + # TODO: teach partition to intelligently target our desired concurrency + # while still respecting max_length. + partitions = partition(cmd, varargs, **kwargs) + + def run_cmd_partition(run_cmd): + return cmd_output(*run_cmd, encoding=None, retcode=None) + + with _threadpool(min(len(partitions), target_concurrency)) as pool: + results = pool.map(run_cmd_partition, partitions) + + for proc_retcode, proc_out, proc_err in results: # This is *slightly* too clever so I'll explain it. # First the xor boolean table: # T | F | diff --git a/tests/repository_test.py b/tests/repository_test.py index 8d578f39..f1b0f6e0 100644 --- a/tests/repository_test.py +++ b/tests/repository_test.py @@ -837,6 +837,7 @@ def test_manifest_hooks(tempdir_factory, store): 'minimum_pre_commit_version': '0', 'name': 'Bash hook', 'pass_filenames': True, + 'require_serial': False, 'stages': [], 'types': ['file'], 'exclude_types': [], diff --git a/tests/xargs_test.py b/tests/xargs_test.py index bf685e16..b60a37d6 100644 --- a/tests/xargs_test.py +++ b/tests/xargs_test.py @@ -3,6 +3,7 @@ from __future__ import absolute_import from __future__ import unicode_literals import sys +import time import mock import pytest @@ -132,3 +133,20 @@ def test_xargs_retcode_normal(): ret, _, _ = xargs.xargs(exit_cmd, ('0', '1'), _max_length=max_length) assert ret == 1 + + +def test_xargs_concurrency(): + bash_cmd = ('bash', '-c') + print_pid = ('sleep 0.5 && echo $$',) + + start = time.time() + ret, stdout, _ = xargs.xargs( + bash_cmd, print_pid * 5, + target_concurrency=5, + _max_length=len(' '.join(bash_cmd + print_pid)), + ) + elapsed = time.time() - start + assert ret == 0 + pids = stdout.splitlines() + assert len(pids) == 5 + assert elapsed < 1