Files
pre-commit/pre_commit/util.py
2015-02-27 00:48:01 -08:00

176 lines
4.3 KiB
Python

from __future__ import unicode_literals
import contextlib
import errno
import functools
import os
import os.path
import shutil
import stat
import subprocess
import tarfile
import tempfile
import pkg_resources
from pre_commit import five
@contextlib.contextmanager
def cwd(path):
    """Context manager that runs its body with ``path`` as the working dir.

    The working directory that was current on entry is restored on exit,
    whether the body finishes normally or raises.
    """
    previous_dir = os.getcwd()
    os.chdir(path)
    try:
        yield
    finally:
        # Always switch back, even when the body raised.
        os.chdir(previous_dir)
def memoize_by_cwd(func):
    """Decorator caching ``func`` results per working directory.

    The cache key is the current working directory followed by the
    positional arguments, so the arguments must be hashable.  Only
    positional arguments are accepted by the wrapper.  The cache dict is
    exposed as ``wrapper._cache``.
    """
    @functools.wraps(func)
    def wrapper(*args):
        cache_key = (os.getcwd(),) + args
        if cache_key not in wrapper._cache:
            wrapper._cache[cache_key] = func(*args)
        return wrapper._cache[cache_key]

    wrapper._cache = {}
    return wrapper
@contextlib.contextmanager
def clean_path_on_failure(path):
    """Context manager that removes ``path`` if the body raises.

    Any exception (``BaseException`` included) triggers the cleanup and is
    then re-raised.  Nothing is removed when the body completes normally.
    """
    try:
        yield
    except BaseException:
        path_is_present = os.path.exists(path)
        if path_is_present:
            rmtree(path)
        # Cleanup never swallows the original failure.
        raise
@contextlib.contextmanager
def noop_context():
    """Context manager that does nothing on entry or exit (binds ``None``)."""
    yield None
def shell_escape(arg):
    """Return ``arg`` wrapped in single quotes with inner quotes escaped.

    Every single quote inside ``arg`` is replaced by ``'"'"'``: close the
    single-quoted string, emit a double-quoted single quote, then reopen
    the single-quoted string.

    Args:
        arg: the text to quote.

    Returns:
        The quoted text; an empty ``arg`` yields ``''``.
    """
    # The replacement literal contains no whitespace, so it needs no
    # further normalisation before being substituted in.
    return "'" + arg.replace("'", "'\"'\"'") + "'"
@contextlib.contextmanager
def tarfile_open(*args, **kwargs):
    """Open a tar archive as a context manager (python2.6 compatibility).

    All arguments are forwarded to ``tarfile.open``.  The opened archive is
    yielded and closed when the block exits, including on error.
    """
    with contextlib.closing(tarfile.open(*args, **kwargs)) as archive:
        yield archive
@contextlib.contextmanager
def tmpdir():
    """Context manager yielding the path of a fresh temporary directory.

    The directory is created with ``tempfile.mkdtemp`` and removed with
    ``rmtree`` when the block exits, whether or not the body raised.
    """
    created_dir = tempfile.mkdtemp()
    try:
        yield created_dir
    finally:
        rmtree(created_dir)
def resource_filename(filename):
    """Return the path of ``resources/<filename>`` in the pre_commit package.

    The lookup is delegated to ``pkg_resources.resource_filename``.
    """
    relative_path = os.path.join('resources', filename)
    return pkg_resources.resource_filename('pre_commit', relative_path)
class CalledProcessError(RuntimeError):
    """Error describing a command that exited with an unexpected code.

    Attributes:
        returncode: the return code the command actually produced.
        cmd: the command that was run.
        expected_returncode: the return code that was expected.
        output: ``(stdout, stderr)`` pair, or ``None`` when not supplied.
    """

    def __init__(self, returncode, cmd, expected_returncode, output=None):
        super(CalledProcessError, self).__init__(
            returncode, cmd, expected_returncode, output,
        )
        self.returncode = returncode
        self.cmd = cmd
        self.expected_returncode = expected_returncode
        self.output = output

    def __str__(self):
        """Render the command, both return codes and the captured streams.

        Empty or missing streams are shown as ``(none)``; non-empty ones
        start on a new line with every line indented.
        """
        # `output` defaults to None; render that as two missing streams
        # rather than failing while trying to iterate over None.
        streams = (None, None) if self.output is None else self.output
        rendered = []
        for text in streams:
            if text:
                rendered.append('\n ' + text.replace('\n', '\n '))
            else:
                rendered.append('(none)')
        return (
            'Command: {0!r}\n'
            'Return code: {1}\n'
            'Expected return code: {2}\n'
            'Output: {3}\n'
            'Errors: {4}\n'.format(
                self.cmd,
                self.returncode,
                self.expected_returncode,
                *rendered
            )
        )
def cmd_output(*cmd, **kwargs):
    """Run ``cmd`` and return ``(returncode, stdout, stderr)``.

    Positional arguments form the command line.  Recognised keyword
    arguments:

    * ``retcode`` -- expected return code (default ``0``); ``None``
      disables the check.
    * ``stdin`` -- text sent to the process after encoding it as UTF-8.
    * ``encoding`` -- codec used to decode stdout and stderr (default
      ``'UTF-8'``); ``None`` leaves them undecoded.
    * ``env`` -- mapping of environment variables.  A missing, ``None`` or
      empty mapping results in ``env=None`` being passed on.
    * ``__popen`` -- callable used instead of ``subprocess.Popen``.

    Remaining keyword arguments are forwarded to the popen callable and
    override the default pipes for stdin, stdout and stderr.

    Raises:
        CalledProcessError: when ``retcode`` is not ``None`` and differs
            from the process's return code.
    """
    retcode = kwargs.pop('retcode', 0)
    stdin = kwargs.pop('stdin', None)
    encoding = kwargs.pop('encoding', 'UTF-8')
    popen_factory = kwargs.pop('__popen', subprocess.Popen)

    popen_kwargs = {
        'stdin': subprocess.PIPE,
        'stdout': subprocess.PIPE,
        'stderr': subprocess.PIPE,
    }

    if stdin is not None:
        stdin = stdin.encode('UTF-8')

    # py2/py3 on windows are more strict about the types here
    # (five.n presumably converts to the native str type -- confirm in
    # pre_commit.five).
    cmd = [five.n(arg) for arg in cmd]
    # An explicit env=None is treated like an absent env instead of
    # failing on `None.items()`.
    env = kwargs.pop('env', None) or {}
    kwargs['env'] = dict(
        (five.n(key), five.n(value)) for key, value in env.items()
    ) or None

    popen_kwargs.update(kwargs)
    proc = popen_factory(cmd, **popen_kwargs)
    stdout, stderr = proc.communicate(stdin)
    if encoding is not None:
        if stdout is not None:
            stdout = stdout.decode(encoding)
        if stderr is not None:
            stderr = stderr.decode(encoding)

    returncode = proc.returncode
    if retcode is not None and retcode != returncode:
        raise CalledProcessError(
            returncode, cmd, retcode, output=(stdout, stderr),
        )

    return returncode, stdout, stderr
def rmtree(path):
    """Remove a directory tree, retrying removals that fail with EACCES.

    On windows, rmtree fails for readonly dirs.  When ``os.rmdir`` or
    ``os.remove`` fails with ``errno.EACCES`` the entry is chmod'ed to be
    fully accessible and the removal is retried; any other failure is
    re-raised.
    """
    def handle_remove_readonly(func, path, exc):  # pragma: no cover (windows)
        error = exc[1]
        is_removal = func in (os.rmdir, os.remove)
        if not (is_removal and error.errno == errno.EACCES):
            raise
        os.chmod(path, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
        func(path)

    shutil.rmtree(path, ignore_errors=False, onerror=handle_remove_readonly)