Add staged_files_only context manager.

This commit is contained in:
Anthony Sottile
2014-04-05 18:41:49 -07:00
parent 749615118e
commit 4ed9120ae9
7 changed files with 287 additions and 9 deletions

View File

@@ -6,6 +6,9 @@ import subprocess
class CalledProcessError(RuntimeError):
def __init__(self, returncode, cmd, expected_returncode, output=None):
super(CalledProcessError, self).__init__(
returncode, cmd, expected_returncode, output,
)
self.returncode = returncode
self.cmd = cmd
self.expected_returncode = expected_returncode
@@ -15,13 +18,13 @@ class CalledProcessError(RuntimeError):
return (
'Command: {0!r}\n'
'Return code: {1}\n'
'Expected return code {2}\n',
'Expected return code: {2}\n'
'Output: {3!r}\n'.format(
self.cmd,
self.returncode,
self.expected_returncode,
self.output,
),
)
)
@@ -48,15 +51,15 @@ class PrefixedCommandRunner(object):
self.__makedirs(self.prefix_dir)
def run(self, cmd, retcode=0, stdin=None, **kwargs):
popen_kwargs = {
'stdin': subprocess.PIPE,
'stdout': subprocess.PIPE,
'stderr': subprocess.PIPE,
}
popen_kwargs.update(kwargs)
self._create_path_if_not_exists()
replaced_cmd = _replace_cmd(cmd, prefix=self.prefix_dir)
proc = self.__popen(
replaced_cmd,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
**kwargs
)
proc = self.__popen(replaced_cmd, **popen_kwargs)
stdout, stderr = proc.communicate(stdin)
returncode = proc.returncode

View File

@@ -0,0 +1,47 @@
import contextlib
import time
from pre_commit.prefixed_command_runner import CalledProcessError
@contextlib.contextmanager
def staged_files_only(cmd_runner):
"""Clear any unstaged changes from the git working directory inside this
context.
Args:
cmd_runner - PrefixedCommandRunner
"""
# Determine if there are unstaged files
retcode, _, _ = cmd_runner.run(
['git', 'diff-files', '--quiet'],
retcode=None,
)
if retcode:
# TODO: print a warning message that unstaged things are being stashed
# Save the current unstaged changes as a patch
# TODO: use a more unique patch filename
patch_filename = cmd_runner.path('patch{0}'.format(time.time()))
with open(patch_filename, 'w') as patch_file:
cmd_runner.run(['git', 'diff', '--binary'], stdout=patch_file)
# Clear the working directory of unstaged changes
cmd_runner.run(['git', 'checkout', '--', '.'])
try:
yield
finally:
# Try to apply the patch we saved
try:
cmd_runner.run(['git', 'apply', patch_filename])
except CalledProcessError:
# TOOD: print a warning about rolling back changes made by hooks
# We failed to apply the patch, presumably due to fixes made
# by hooks.
# Roll back the changes made by hooks.
cmd_runner.run(['git', 'checkout', '--', '.'])
cmd_runner.run(['git', 'apply', patch_filename])
else:
# There weren't any staged files so we don't need to do anything
# special
yield

BIN
testing/resources/img1.jpg Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 843 B

BIN
testing/resources/img2.jpg Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 891 B

BIN
testing/resources/img3.jpg Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 859 B

View File

@@ -9,6 +9,16 @@ from pre_commit.prefixed_command_runner import CalledProcessError
from pre_commit.prefixed_command_runner import PrefixedCommandRunner
def test_CalledProcessError_str():
error = CalledProcessError(1, ['git', 'status'], 0, ('stdout', 'stderr'))
assert str(error) == (
"Command: ['git', 'status']\n"
"Return code: 1\n"
"Expected return code: 0\n"
"Output: ('stdout', 'stderr')\n"
)
@pytest.fixture
def popen_mock():
popen = mock.Mock(spec=subprocess.Popen)

View File

@@ -0,0 +1,218 @@
import os.path
import pytest
import shutil
from plumbum import local
import pre_commit.constants as C
from pre_commit.prefixed_command_runner import PrefixedCommandRunner
from pre_commit.staged_files_only import staged_files_only
from testing.auto_namedtuple import auto_namedtuple
from testing.util import get_resource_path
FOO_CONTENTS = '\n'.join(('1', '2', '3', '4', '5', '6', '7', '8', ''))
def get_short_git_status():
git_status = local['git']['status', '-s']()
return dict(reversed(line.split()) for line in git_status.splitlines())
@pytest.yield_fixture
def foo_staged(empty_git_dir):
with open('.gitignore', 'w') as gitignore_file:
gitignore_file.write(C.HOOKS_WORKSPACE + '\n')
local['git']['add', '.']()
local['git']['commit', '-m', 'add gitignore']()
with open('foo', 'w') as foo_file:
foo_file.write(FOO_CONTENTS)
local['git']['add', 'foo']()
foo_filename = os.path.join(empty_git_dir, 'foo')
yield auto_namedtuple(path=empty_git_dir, foo_filename=foo_filename)
@pytest.fixture
def cmd_runner():
return PrefixedCommandRunner(C.HOOKS_WORKSPACE)
def _test_foo_state(path, foo_contents=FOO_CONTENTS, status='A'):
assert os.path.exists(path.foo_filename)
assert open(path.foo_filename).read() == foo_contents
actual_status = get_short_git_status()['foo']
assert status == actual_status
def test_foo_staged(foo_staged):
_test_foo_state(foo_staged)
def test_foo_nothing_unstaged(foo_staged, cmd_runner):
with staged_files_only(cmd_runner):
_test_foo_state(foo_staged)
_test_foo_state(foo_staged)
def test_foo_something_unstaged(foo_staged, cmd_runner):
with open(foo_staged.foo_filename, 'w') as foo_file:
foo_file.write('herp\nderp\n')
_test_foo_state(foo_staged, 'herp\nderp\n', 'AM')
with staged_files_only(cmd_runner):
_test_foo_state(foo_staged)
_test_foo_state(foo_staged, 'herp\nderp\n', 'AM')
def test_foo_both_modify_non_conflicting(foo_staged, cmd_runner):
with open(foo_staged.foo_filename, 'w') as foo_file:
foo_file.write(FOO_CONTENTS + '9\n')
_test_foo_state(foo_staged, FOO_CONTENTS + '9\n', 'AM')
with staged_files_only(cmd_runner):
_test_foo_state(foo_staged)
# Modify the file as part of the "pre-commit"
with open(foo_staged.foo_filename, 'w') as foo_file:
foo_file.write(FOO_CONTENTS.replace('1', 'a'))
_test_foo_state(foo_staged, FOO_CONTENTS.replace('1', 'a'), 'AM')
_test_foo_state(foo_staged, FOO_CONTENTS.replace('1', 'a') + '9\n', 'AM')
def test_foo_both_modify_conflicting(foo_staged, cmd_runner):
with open(foo_staged.foo_filename, 'w') as foo_file:
foo_file.write(FOO_CONTENTS.replace('1', 'a'))
_test_foo_state(foo_staged, FOO_CONTENTS.replace('1', 'a'), 'AM')
with staged_files_only(cmd_runner):
_test_foo_state(foo_staged)
# Modify in the same place as the stashed diff
with open(foo_staged.foo_filename, 'w') as foo_file:
foo_file.write(FOO_CONTENTS.replace('1', 'b'))
_test_foo_state(foo_staged, FOO_CONTENTS.replace('1', 'b'), 'AM')
_test_foo_state(foo_staged, FOO_CONTENTS.replace('1', 'a'), 'AM')
@pytest.yield_fixture
def img_staged(empty_git_dir):
with open('.gitignore', 'w') as gitignore_file:
gitignore_file.write(C.HOOKS_WORKSPACE + '\n')
local['git']['add', '.']()
local['git']['commit', '-m', 'add gitignore']()
img_filename = os.path.join(empty_git_dir, 'img.jpg')
shutil.copy(get_resource_path('img1.jpg'), img_filename)
local['git']['add', 'img.jpg']()
yield auto_namedtuple(path=empty_git_dir, img_filename=img_filename)
def _test_img_state(path, expected_file='img1.jpg', status='A'):
assert os.path.exists(path.img_filename)
assert (
open(path.img_filename).read() ==
open(get_resource_path(expected_file)).read()
)
actual_status = get_short_git_status()['img.jpg']
assert status == actual_status
def test_img_staged(img_staged):
_test_img_state(img_staged)
def test_img_nothing_unstaged(img_staged, cmd_runner):
with staged_files_only(cmd_runner):
_test_img_state(img_staged)
_test_img_state(img_staged)
def test_img_something_unstaged(img_staged, cmd_runner):
shutil.copy(get_resource_path('img2.jpg'), img_staged.img_filename)
_test_img_state(img_staged, 'img2.jpg', 'AM')
with staged_files_only(cmd_runner):
_test_img_state(img_staged)
_test_img_state(img_staged, 'img2.jpg', 'AM')
def test_img_conflict(img_staged, cmd_runner):
"""Admittedly, this shouldn't happen, but just in case."""
shutil.copy(get_resource_path('img2.jpg'), img_staged.img_filename)
_test_img_state(img_staged, 'img2.jpg', 'AM')
with staged_files_only(cmd_runner):
_test_img_state(img_staged)
shutil.copy(get_resource_path('img3.jpg'), img_staged.img_filename)
_test_img_state(img_staged, 'img3.jpg', 'AM')
_test_img_state(img_staged, 'img2.jpg', 'AM')
@pytest.yield_fixture
def submodule_with_commits(empty_git_dir):
local['git']['commit', '--allow-empty', '-m', 'foo']()
sha1 = local['git']['rev-parse', 'HEAD']().strip()
local['git']['commit', '--allow-empty', '-m', 'bar']()
sha2 = local['git']['rev-parse', 'HEAD']().strip()
yield auto_namedtuple(path=empty_git_dir, sha1=sha1, sha2=sha2)
def checkout_submodule(sha):
with local.cwd('sub'):
local['git']['checkout', sha]()
@pytest.yield_fixture
def sub_staged(submodule_with_commits, empty_git_dir):
local['git']['submodule', 'add', submodule_with_commits.path, 'sub']()
checkout_submodule(submodule_with_commits.sha1)
local['git']['add', 'sub']()
yield auto_namedtuple(
path=empty_git_dir,
sub_path=os.path.join(empty_git_dir, 'sub'),
submodule=submodule_with_commits,
)
def _test_sub_state(path, sha='sha1', status='A'):
assert os.path.exists(path.sub_path)
with local.cwd(path.sub_path):
actual_sha = local['git']['rev-parse', 'HEAD']().strip()
assert actual_sha == getattr(path.submodule, sha)
actual_status = get_short_git_status()['sub']
assert actual_status == status
def test_sub_staged(sub_staged):
_test_sub_state(sub_staged)
def test_sub_nothing_unstaged(sub_staged, cmd_runner):
with staged_files_only(cmd_runner):
_test_sub_state(sub_staged)
_test_sub_state(sub_staged)
def test_sub_something_unstaged(sub_staged, cmd_runner):
checkout_submodule(sub_staged.submodule.sha2)
_test_sub_state(sub_staged, 'sha2', 'AM')
with staged_files_only(cmd_runner):
# This is different from others, we don't want to touch subs
_test_sub_state(sub_staged, 'sha2', 'AM')
_test_sub_state(sub_staged, 'sha2', 'AM')