Files
pre-commit/pre_commit/repository.py
2020-01-12 13:39:53 -08:00

266 lines
8.2 KiB
Python

import json
import logging
import os
import shlex
from typing import Any
from typing import Dict
from typing import List
from typing import NamedTuple
from typing import Optional
from typing import Sequence
from typing import Set
from typing import Tuple
import pre_commit.constants as C
from pre_commit import five
from pre_commit.clientlib import load_manifest
from pre_commit.clientlib import LOCAL
from pre_commit.clientlib import MANIFEST_HOOK_DICT
from pre_commit.clientlib import META
from pre_commit.languages.all import languages
from pre_commit.languages.helpers import environment_dir
from pre_commit.prefix import Prefix
from pre_commit.store import Store
from pre_commit.util import parse_version
from pre_commit.util import rmtree
logger = logging.getLogger('pre_commit')
def _state(additional_deps: Sequence[str]) -> object:
return {'additional_dependencies': sorted(additional_deps)}
def _state_filename(prefix: Prefix, venv: str) -> str:
return prefix.path(venv, '.install_state_v' + C.INSTALLED_STATE_VERSION)
def _read_state(prefix: Prefix, venv: str) -> Optional[object]:
filename = _state_filename(prefix, venv)
if not os.path.exists(filename):
return None
else:
with open(filename) as f:
return json.load(f)
def _write_state(prefix: Prefix, venv: str, state: object) -> None:
state_filename = _state_filename(prefix, venv)
staging = state_filename + 'staging'
with open(staging, 'w') as state_file:
state_file.write(five.to_text(json.dumps(state)))
# Move the file into place atomically to indicate we've installed
os.rename(staging, state_filename)
_KEYS = tuple(item.key for item in MANIFEST_HOOK_DICT.items)
class Hook(NamedTuple):
src: str
prefix: Prefix
id: str
name: str
entry: str
language: str
alias: str
files: str
exclude: str
types: Sequence[str]
exclude_types: Sequence[str]
additional_dependencies: Sequence[str]
args: Sequence[str]
always_run: bool
pass_filenames: bool
description: str
language_version: str
log_file: str
minimum_pre_commit_version: str
require_serial: bool
stages: Sequence[str]
verbose: bool
@property
def cmd(self) -> Tuple[str, ...]:
return tuple(shlex.split(self.entry)) + tuple(self.args)
@property
def install_key(self) -> Tuple[Prefix, str, str, Tuple[str, ...]]:
return (
self.prefix,
self.language,
self.language_version,
tuple(self.additional_dependencies),
)
def installed(self) -> bool:
lang = languages[self.language]
venv = environment_dir(lang.ENVIRONMENT_DIR, self.language_version)
return (
venv is None or (
(
_read_state(self.prefix, venv) ==
_state(self.additional_dependencies)
) and
lang.healthy(self.prefix, self.language_version)
)
)
def install(self) -> None:
logger.info(f'Installing environment for {self.src}.')
logger.info('Once installed this environment will be reused.')
logger.info('This may take a few minutes...')
lang = languages[self.language]
assert lang.ENVIRONMENT_DIR is not None
venv = environment_dir(lang.ENVIRONMENT_DIR, self.language_version)
# There's potentially incomplete cleanup from previous runs
# Clean it up!
if self.prefix.exists(venv):
rmtree(self.prefix.path(venv))
lang.install_environment(
self.prefix, self.language_version, self.additional_dependencies,
)
# Write our state to indicate we're installed
_write_state(self.prefix, venv, _state(self.additional_dependencies))
def run(self, file_args: Sequence[str], color: bool) -> Tuple[int, bytes]:
lang = languages[self.language]
return lang.run_hook(self, file_args, color)
@classmethod
def create(cls, src: str, prefix: Prefix, dct: Dict[str, Any]) -> 'Hook':
# TODO: have cfgv do this (?)
extra_keys = set(dct) - set(_KEYS)
if extra_keys:
logger.warning(
f'Unexpected key(s) present on {src} => {dct["id"]}: '
f'{", ".join(sorted(extra_keys))}',
)
return cls(src=src, prefix=prefix, **{k: dct[k] for k in _KEYS})
def _hook(
*hook_dicts: Dict[str, Any],
root_config: Dict[str, Any],
) -> Dict[str, Any]:
ret, rest = dict(hook_dicts[0]), hook_dicts[1:]
for dct in rest:
ret.update(dct)
version = ret['minimum_pre_commit_version']
if parse_version(version) > parse_version(C.VERSION):
logger.error(
f'The hook `{ret["id"]}` requires pre-commit version {version} '
f'but version {C.VERSION} is installed. '
f'Perhaps run `pip install --upgrade pre-commit`.',
)
exit(1)
lang = ret['language']
if ret['language_version'] == C.DEFAULT:
ret['language_version'] = root_config['default_language_version'][lang]
if ret['language_version'] == C.DEFAULT:
ret['language_version'] = languages[lang].get_default_version()
if not ret['stages']:
ret['stages'] = root_config['default_stages']
return ret
def _non_cloned_repository_hooks(
repo_config: Dict[str, Any],
store: Store,
root_config: Dict[str, Any],
) -> Tuple[Hook, ...]:
def _prefix(language_name: str, deps: Sequence[str]) -> Prefix:
language = languages[language_name]
# pygrep / script / system / docker_image do not have
# environments so they work out of the current directory
if language.ENVIRONMENT_DIR is None:
return Prefix(os.getcwd())
else:
return Prefix(store.make_local(deps))
return tuple(
Hook.create(
repo_config['repo'],
_prefix(hook['language'], hook['additional_dependencies']),
_hook(hook, root_config=root_config),
)
for hook in repo_config['hooks']
)
def _cloned_repository_hooks(
repo_config: Dict[str, Any],
store: Store,
root_config: Dict[str, Any],
) -> Tuple[Hook, ...]:
repo, rev = repo_config['repo'], repo_config['rev']
manifest_path = os.path.join(store.clone(repo, rev), C.MANIFEST_FILE)
by_id = {hook['id']: hook for hook in load_manifest(manifest_path)}
for hook in repo_config['hooks']:
if hook['id'] not in by_id:
logger.error(
f'`{hook["id"]}` is not present in repository {repo}. '
f'Typo? Perhaps it is introduced in a newer version? '
f'Often `pre-commit autoupdate` fixes this.',
)
exit(1)
hook_dcts = [
_hook(by_id[hook['id']], hook, root_config=root_config)
for hook in repo_config['hooks']
]
return tuple(
Hook.create(
repo_config['repo'],
Prefix(store.clone(repo, rev, hook['additional_dependencies'])),
hook,
)
for hook in hook_dcts
)
def _repository_hooks(
repo_config: Dict[str, Any],
store: Store,
root_config: Dict[str, Any],
) -> Tuple[Hook, ...]:
if repo_config['repo'] in {LOCAL, META}:
return _non_cloned_repository_hooks(repo_config, store, root_config)
else:
return _cloned_repository_hooks(repo_config, store, root_config)
def install_hook_envs(hooks: Sequence[Hook], store: Store) -> None:
def _need_installed() -> List[Hook]:
seen: Set[Tuple[Prefix, str, str, Tuple[str, ...]]] = set()
ret = []
for hook in hooks:
if hook.install_key not in seen and not hook.installed():
ret.append(hook)
seen.add(hook.install_key)
return ret
if not _need_installed():
return
with store.exclusive_lock():
# Another process may have already completed this work
for hook in _need_installed():
hook.install()
def all_hooks(root_config: Dict[str, Any], store: Store) -> Tuple[Hook, ...]:
return tuple(
hook
for repo in root_config['repos']
for hook in _repository_hooks(repo, store, root_config)
)