Implement concurrent execution of individual hooks

This commit is contained in:
Chris Kuehl
2018-10-20 13:05:55 -07:00
committed by Chris Kuehl
parent 1f1cd2bc39
commit ba5e27e4ec
15 changed files with 104 additions and 14 deletions

View File

@@ -56,6 +56,7 @@ MANIFEST_HOOK_DICT = cfgv.Map(
cfgv.Optional('language_version', cfgv.check_string, 'default'),
cfgv.Optional('log_file', cfgv.check_string, ''),
cfgv.Optional('minimum_pre_commit_version', cfgv.check_string, '0'),
cfgv.Optional('require_serial', cfgv.check_bool, False),
cfgv.Optional('stages', cfgv.check_array(cfgv.check_one_of(C.STAGES)), []),
cfgv.Optional('verbose', cfgv.check_bool, False),
)

View File

@@ -97,4 +97,8 @@ def run_hook(prefix, hook, file_args): # pragma: windows no cover
entry_tag = ('--entrypoint', entry_exe, docker_tag(prefix))
cmd = docker_cmd() + entry_tag + cmd_rest
return xargs(cmd, file_args)
return xargs(
cmd,
file_args,
target_concurrency=helpers.target_concurrency(hook),
)

View File

@@ -16,4 +16,8 @@ install_environment = helpers.no_install
def run_hook(prefix, hook, file_args): # pragma: windows no cover
assert_docker_available()
cmd = docker_cmd() + helpers.to_cmd(hook)
return xargs(cmd, file_args)
return xargs(
cmd,
file_args,
target_concurrency=helpers.target_concurrency(hook),
)

View File

@@ -81,4 +81,8 @@ def install_environment(prefix, version, additional_dependencies):
def run_hook(prefix, hook, file_args):
with in_env(prefix):
return xargs(helpers.to_cmd(hook), file_args)
return xargs(
helpers.to_cmd(hook),
file_args,
target_concurrency=helpers.target_concurrency(hook),
)

View File

@@ -1,5 +1,6 @@
from __future__ import unicode_literals
import multiprocessing
import shlex
from pre_commit.util import cmd_output
@@ -45,3 +46,11 @@ def basic_healthy(prefix, language_version):
def no_install(prefix, version, additional_dependencies):
raise AssertionError('This type is not installable')
def target_concurrency(hook):
if hook['require_serial']:
return 1
else:
# TODO: something smart!
return multiprocessing.cpu_count()

View File

@@ -71,4 +71,8 @@ def install_environment(prefix, version, additional_dependencies):
def run_hook(prefix, hook, file_args):
with in_env(prefix, hook['language_version']):
return xargs(helpers.to_cmd(hook), file_args)
return xargs(
helpers.to_cmd(hook),
file_args,
target_concurrency=helpers.target_concurrency(hook),
)

View File

@@ -127,7 +127,11 @@ def py_interface(_dir, _make_venv):
def run_hook(prefix, hook, file_args):
with in_env(prefix, hook['language_version']):
return xargs(helpers.to_cmd(hook), file_args)
return xargs(
helpers.to_cmd(hook),
file_args,
target_concurrency=helpers.target_concurrency(hook),
)
def install_environment(prefix, version, additional_dependencies):
additional_dependencies = tuple(additional_dependencies)

View File

@@ -126,4 +126,8 @@ def install_environment(
def run_hook(prefix, hook, file_args): # pragma: windows no cover
with in_env(prefix, hook['language_version']):
return xargs(helpers.to_cmd(hook), file_args)
return xargs(
helpers.to_cmd(hook),
file_args,
target_concurrency=helpers.target_concurrency(hook),
)

View File

@@ -91,4 +91,8 @@ def install_environment(prefix, version, additional_dependencies):
def run_hook(prefix, hook, file_args):
with in_env(prefix):
return xargs(helpers.to_cmd(hook), file_args)
return xargs(
helpers.to_cmd(hook),
file_args,
target_concurrency=helpers.target_concurrency(hook),
)

View File

@@ -13,4 +13,8 @@ install_environment = helpers.no_install
def run_hook(prefix, hook, file_args):
cmd = helpers.to_cmd(hook)
cmd = (prefix.path(cmd[0]),) + cmd[1:]
return xargs(cmd, file_args)
return xargs(
cmd,
file_args,
target_concurrency=helpers.target_concurrency(hook),
)

View File

@@ -53,4 +53,8 @@ def install_environment(
def run_hook(prefix, hook, file_args): # pragma: windows no cover
with in_env(prefix):
return xargs(helpers.to_cmd(hook), file_args)
return xargs(
helpers.to_cmd(hook),
file_args,
target_concurrency=helpers.target_concurrency(hook),
)

View File

@@ -11,4 +11,8 @@ install_environment = helpers.no_install
def run_hook(prefix, hook, file_args):
return xargs(helpers.to_cmd(hook), file_args)
return xargs(
helpers.to_cmd(hook),
file_args,
target_concurrency=helpers.target_concurrency(hook),
)

View File

@@ -1,8 +1,11 @@
from __future__ import absolute_import
from __future__ import unicode_literals
import contextlib
import multiprocessing.pool
import sys
import concurrent.futures
import six
from pre_commit import parse_shebang
@@ -65,12 +68,23 @@ def partition(cmd, varargs, _max_length=None):
return tuple(ret)
@contextlib.contextmanager
def _threadpool(size):
pool = multiprocessing.pool.ThreadPool(size)
try:
yield pool
finally:
pool.terminate()
def xargs(cmd, varargs, **kwargs):
"""A simplified implementation of xargs.
negate: Make nonzero successful and zero a failure
target_concurrency: Target number of partitions to run concurrently
"""
negate = kwargs.pop('negate', False)
target_concurrency = kwargs.pop('target_concurrency', 1)
retcode = 0
stdout = b''
stderr = b''
@@ -80,10 +94,17 @@ def xargs(cmd, varargs, **kwargs):
except parse_shebang.ExecutableNotFoundError as e:
return e.to_output()
for run_cmd in partition(cmd, varargs, **kwargs):
proc_retcode, proc_out, proc_err = cmd_output(
*run_cmd, encoding=None, retcode=None
)
# TODO: teach partition to intelligently target our desired concurrency
# while still respecting max_length.
partitions = partition(cmd, varargs, **kwargs)
def run_cmd_partition(run_cmd):
return cmd_output(*run_cmd, encoding=None, retcode=None)
with _threadpool(min(len(partitions), target_concurrency)) as pool:
results = pool.map(run_cmd_partition, partitions)
for proc_retcode, proc_out, proc_err in results:
# This is *slightly* too clever so I'll explain it.
# First the xor boolean table:
# T | F |

View File

@@ -837,6 +837,7 @@ def test_manifest_hooks(tempdir_factory, store):
'minimum_pre_commit_version': '0',
'name': 'Bash hook',
'pass_filenames': True,
'require_serial': False,
'stages': [],
'types': ['file'],
'exclude_types': [],

View File

@@ -3,6 +3,7 @@ from __future__ import absolute_import
from __future__ import unicode_literals
import sys
import time
import mock
import pytest
@@ -132,3 +133,20 @@ def test_xargs_retcode_normal():
ret, _, _ = xargs.xargs(exit_cmd, ('0', '1'), _max_length=max_length)
assert ret == 1
def test_xargs_concurrency():
bash_cmd = ('bash', '-c')
print_pid = ('sleep 0.5 && echo $$',)
start = time.time()
ret, stdout, _ = xargs.xargs(
bash_cmd, print_pid * 5,
target_concurrency=5,
_max_length=len(' '.join(bash_cmd + print_pid)),
)
elapsed = time.time() - start
assert ret == 0
pids = stdout.splitlines()
assert len(pids) == 5
assert elapsed < 1