Files
hatchet/frontend/snippets/generate.py
Gabe Ruttner 5babd09679 docs: chunky overhaul (#3129)
* improve overall docs structure and things
2026-03-04 14:33:15 -08:00

530 lines
16 KiB
Python

import glob
import json
import os
import re
from dataclasses import asdict, dataclass
from enum import Enum
from typing import Any, Callable, cast
# Relative path prefix used for every input/output location in this script.
# NOTE(review): presumably resolves to the repository root when the script is
# run from its own directory - confirm against how the script is invoked.
ROOT = "../../"
# Docs-frontend directory that receives the generated snippet artifacts.
BASE_SNIPPETS_DIR = os.path.join(ROOT, "frontend", "docs", "lib")
OUTPUT_DIR = os.path.join(BASE_SNIPPETS_DIR, "generated", "snippets")
# GitHub organisation / repository used when building per-snippet source links.
OUTPUT_GITHUB_ORG = "hatchet-dev"
OUTPUT_GITHUB_REPO = "hatchet"
# Regexes applied with re.search to candidate file paths; a file matching any
# of them is skipped by process_examples() and process_guides().
IGNORED_FILE_PATTERNS = [
    r"__init__\.py$",
    r"test_.*\.py$",
    r"\.test\.ts$",
    r"\.test-d\.ts$",
    r"test_.*\.go$",
    r"_test\.go$",
    r"\.e2e\.ts$",
    r"test_.*_spec\.rb$",
    r"spec_helper\.rb$",
    r"Gemfile",
    r"\.rspec$",
    r"README\.md$",
]
# Directory (joined onto ROOT) that holds the per-language guide sources.
GUIDES_BASE = "sdks/guides"
@dataclass
class ParsingContext:
    """Per-language settings for locating example files and parsing snippet markers."""

    # Directory (appended to ROOT) that contains the SDK's example sources.
    example_path: str
    # File extension, including the leading dot, of the files to pick up.
    extension: str
    # Line-comment token; snippet markers are written as "<prefix> > Title"
    # and "<prefix> !!".
    comment_prefix: str
class SDKParsingContext(Enum):
    """Supported SDK languages, each carrying the ParsingContext used to scan it.

    The lower-cased member name (e.g. ``"python"``) is what the rest of the
    script uses as the language identifier in output paths and snippet records.
    """

    PYTHON = ParsingContext(
        example_path="sdks/python/examples",
        extension=".py",
        comment_prefix="#",
    )
    TYPESCRIPT = ParsingContext(
        example_path="sdks/typescript/src/v1/examples",
        extension=".ts",
        comment_prefix="//",
    )
    GO = ParsingContext(
        example_path="sdks/go/examples",
        extension=".go",
        comment_prefix="//",
    )
    RUBY = ParsingContext(
        example_path="sdks/ruby/examples",
        extension=".rb",
        comment_prefix="#",
    )
@dataclass
class Snippet:
    """One extracted code snippet, serialised as-is into the generated index.

    Field names are camelCase because the record is dumped directly (via
    ``asdict``) into the generated TypeScript file.
    """

    # Snake-cased marker title, or "all" when the file has no markers.
    title: str
    # Snippet source text.
    content: str
    # Link to the containing file on GitHub.
    githubUrl: str
    # Lower-cased SDKParsingContext member name.
    language: str
    # Path of the containing file under "examples/".
    codePath: str
@dataclass
class ProcessedExample:
    """A source file that has been read and split into snippets."""

    # Language context the file was processed under.
    context: SDKParsingContext
    # Path the file was read from.
    filepath: str
    # Snippets extracted from the file (a single "all" snippet if unmarked).
    snippets: list[Snippet]
    # Unmodified file contents.
    raw_content: str
    # Destination path (joined onto ROOT by write_examples) for the cleaned copy.
    output_path: str
@dataclass
class DocumentationPage:
    """A single docs page entry written into the generated docs index."""

    # Human-readable page title taken from the page's _meta.js entry.
    title: str
    # Absolute URL of the page on the docs site.
    href: str
# Readability aliases for the (title, content) pair returned by
# parse_snippet_from_block(); both are plain strings.
Title = str
Content = str
def to_snake_case(text: str) -> str:
    """Convert an arbitrary title or path segment into a snake_case identifier.

    Args:
        text: Free-form text such as a snippet title or a file/directory name.

    Returns:
        Lower-cased text containing only ASCII letters, digits and single
        underscores, with no leading or trailing underscore.
    """
    # Drop everything except ASCII alphanumerics, whitespace, '-' and '_'.
    text = re.sub(r"[^a-zA-Z0-9\s\-_]", "", text)
    # Runs of dashes/whitespace become a single word separator.
    text = re.sub(r"[-\s]+", "_", text)
    # camelCase boundary: "fooBar" -> "foo_Bar".
    text = re.sub(r"([a-z0-9])([A-Z])", r"\1_\2", text)
    # Acronym followed by a word: "HTTPServer" -> "HTTP_Server".
    text = re.sub(r"([A-Z])([A-Z][a-z])", r"\1_\2", text)
    # Collapse separators produced by the steps above.
    text = re.sub(r"_+", "_", text)
    return text.strip("_").lower()
# NOTE(review): Title and Content are already defined with identical values
# earlier in this file; this second declaration is redundant.
Title = str
Content = str
def dedent_code(code: str) -> str:
    """Strip the common leading indentation from a block of code.

    The indentation width is the smallest number of leading whitespace
    characters found on any non-blank line. Surrounding whitespace is trimmed
    and the result always ends with exactly one newline.

    Args:
        code: Raw snippet text, possibly indented and padded with blank lines.

    Returns:
        The dedented text terminated by ``"\\n"``. Input with no non-blank
        line yields ``"\\n"``.
    """
    lines = code.split("\n")
    # `default=0` covers snippets with no non-blank line (e.g. an empty marker
    # block), for which a bare min() would raise ValueError.
    min_indent = min(
        (len(line) - len(line.lstrip()) for line in lines if line.strip()),
        default=0,
    )
    # Lines shorter than the common indent can only be blank; keep them as-is.
    dedented_lines = [
        line[min_indent:] if len(line) >= min_indent else line for line in lines
    ]
    return "\n".join(dedented_lines).strip() + "\n"
def parse_snippet_from_block(match: re.Match[str]) -> tuple[Title, Content]:
    """Turn one snippet-marker regex match into a ``(title, content)`` pair.

    Group 1 of the match is the raw title and group 2 the raw code; the title
    is snake-cased and the code is dedented.
    """
    raw_title, raw_code = match.group(1), match.group(2)
    return to_snake_case(raw_title.strip()), dedent_code(raw_code)
def parse_snippets(ctx: SDKParsingContext, filename: str) -> list[Snippet]:
    """Extract the marked snippets from a single example file.

    A snippet starts at a ``<comment prefix> > Title`` line and runs up to the
    next ``<comment prefix> !!`` marker. A file without any such block is
    returned whole as one snippet titled ``"all"``.

    Args:
        ctx: Language context supplying the comment prefix and example root.
        filename: Path of the example file, as produced by the glob over
            ``ROOT + ctx.value.example_path``.

    Returns:
        One ``Snippet`` per marked block, in file order, or a single ``"all"``
        snippet when the file has no markers.
    """
    language = ctx.name.lower()
    comment_prefix = re.escape(ctx.value.comment_prefix)
    pattern = rf"{comment_prefix} >\s+(.+?)\n(.*?){comment_prefix} !!"

    base_path = ROOT + ctx.value.example_path.strip("/")

    # Read as UTF-8 explicitly so the extracted text does not depend on the
    # platform's default encoding.
    with open(filename, encoding="utf-8") as f:
        content = f.read()

    code_path = f"examples/{language}{filename.replace(base_path, '')}"
    github_url = f"https://github.com/{OUTPUT_GITHUB_ORG}/{OUTPUT_GITHUB_REPO}/tree/main/{code_path}"

    # re.DOTALL lets the code group span multiple lines.
    blocks = [
        parse_snippet_from_block(match)
        for match in re.finditer(pattern, content, re.DOTALL)
    ]
    if not blocks:
        blocks = [("all", content)]

    return [
        Snippet(
            title=title,
            content=body,
            githubUrl=github_url,
            language=language,
            codePath=code_path,
        )
        for title, body in blocks
    ]
def process_example(ctx: SDKParsingContext, filename: str) -> ProcessedExample:
    """Read one example file and bundle its raw content with its snippets.

    Args:
        ctx: Language context the file belongs to.
        filename: Path of the example file under ``ROOT + ctx.value.example_path``.

    Returns:
        A ``ProcessedExample`` whose ``output_path`` is
        ``examples/<language>/<path relative to the example root>``.
    """
    # Read as UTF-8 explicitly rather than with the platform default encoding.
    with open(filename, encoding="utf-8") as f:
        raw_content = f.read()

    relative_path = filename.replace(ROOT + ctx.value.example_path, "")

    return ProcessedExample(
        context=ctx,
        filepath=filename,
        output_path=f"examples/{ctx.name.lower()}{relative_path}",
        snippets=parse_snippets(ctx, filename),
        raw_content=raw_content,
    )
def process_examples() -> list[ProcessedExample]:
    """Process every non-ignored example file of every SDK language.

    For each ``SDKParsingContext`` member the example directory is searched
    recursively for files with that language's extension; paths matching any
    entry of ``IGNORED_FILE_PATTERNS`` are skipped.
    """

    def is_ignored(candidate: str) -> bool:
        return any(re.search(pattern, candidate) for pattern in IGNORED_FILE_PATTERNS)

    examples: list[ProcessedExample] = []
    for ctx in SDKParsingContext:
        base_path = ROOT + ctx.value.example_path.strip("/")
        glob_pattern = f"{base_path}/**/*{ctx.value.extension}"
        for filename in glob.iglob(glob_pattern, recursive=True):
            if is_ignored(filename):
                continue
            examples.append(process_example(ctx, filename))
    return examples
# Maps each language directory name under GUIDES_BASE to the parsing context
# (extension, comment prefix) used for the guide files found there.
GUIDES_LANG_TO_CTX: dict[str, SDKParsingContext] = {
    "python": SDKParsingContext.PYTHON,
    "typescript": SDKParsingContext.TYPESCRIPT,
    "go": SDKParsingContext.GO,
    "ruby": SDKParsingContext.RUBY,
}
def _guide_snippets(
    ctx: SDKParsingContext, content: str, code_path: str
) -> list[Snippet]:
    """Split guide file content into marked snippets, or one "all" snippet if unmarked."""
    language = ctx.name.lower()
    github_url = f"https://github.com/{OUTPUT_GITHUB_ORG}/{OUTPUT_GITHUB_REPO}/tree/main/{code_path}"
    comment_prefix = re.escape(ctx.value.comment_prefix)
    snippet_pattern = rf"{comment_prefix} >\s+(.+?)\n(.*?){comment_prefix} !!"

    # re.DOTALL lets the code group span multiple lines.
    blocks = [
        parse_snippet_from_block(match)
        for match in re.finditer(snippet_pattern, content, re.DOTALL)
    ]
    if not blocks:
        blocks = [("all", content)]

    return [
        Snippet(
            title=title,
            content=body,
            githubUrl=github_url,
            language=language,
            codePath=code_path,
        )
        for title, body in blocks
    ]


def process_guides() -> list[ProcessedExample]:
    """Process guide examples from sdks/guides/{lang}/ into examples/{lang}/guides/.

    Language directories that do not exist are skipped, as are files matching
    any entry of ``IGNORED_FILE_PATTERNS``.

    Returns:
        One ``ProcessedExample`` per guide file, with ``output_path`` (also used
        as the snippet ``codePath``) set to
        ``examples/<language>/guides/<path relative to the language directory>``.
    """
    examples: list[ProcessedExample] = []
    for lang_dir, ctx in GUIDES_LANG_TO_CTX.items():
        guides_base = os.path.join(ROOT, GUIDES_BASE, lang_dir)
        if not os.path.isdir(guides_base):
            continue

        pattern = guides_base + "/**/*" + ctx.value.extension
        for filename in glob.iglob(pattern, recursive=True):
            if any(re.search(p, filename) for p in IGNORED_FILE_PATTERNS):
                continue

            # Read as UTF-8 explicitly rather than with the platform default.
            with open(filename, encoding="utf-8") as f:
                content = f.read()

            rel_path = filename.replace(guides_base, "")
            output_path = f"examples/{ctx.name.lower()}/guides{rel_path}"

            examples.append(
                ProcessedExample(
                    context=ctx,
                    filepath=filename,
                    output_path=output_path,
                    snippets=_guide_snippets(ctx, content, output_path),
                    raw_content=content,
                )
            )
    return examples
def create_snippet_tree(examples: list[ProcessedExample]) -> dict[str, dict[str, Any]]:
    """Arrange all snippets into a nested dict keyed by output path, then title.

    The path keys come from each example's ``output_path`` with every
    ``"examples/"`` occurrence and every occurrence of the language extension
    removed, split on ``"/"`` and snake-cased. The innermost key is the snippet
    title (used as-is) and maps to the snippet's ``asdict`` form.
    """
    tree: dict[str, Any] = {}
    for example in examples:
        extension = example.context.value.extension
        trimmed_path = example.output_path.replace("examples/", "").replace(
            extension, ""
        )
        path_keys = [to_snake_case(part) for part in trimmed_path.split("/")]

        for snippet in example.snippets:
            node = tree
            # Walk down the path, creating intermediate levels on demand.
            for part in path_keys:
                node = node.setdefault(part, {})
            node[snippet.title] = asdict(snippet)
    return tree
def is_excluded_line(line: str, comment_prefix: str) -> bool:
    """Return True for lines that must be dropped from the published example.

    Dropped lines are the snippet end marker (``<comment_prefix> !!`` once
    surrounding whitespace is ignored) and any line containing
    ``eslint-disable`` or ``HH-``.
    """
    if line.strip() == f"{comment_prefix} !!":
        return True
    return any(marker in line for marker in ("eslint-disable", "HH-"))
def process_line_content(line: str) -> str:
    """Rewrite every ``@hatchet/`` occurrence to ``@hatchet-dev/typescript-sdk/``."""
    source_prefix = "@hatchet/"
    published_prefix = "@hatchet-dev/typescript-sdk/"
    # split/join on a non-empty separator substitutes every occurrence.
    return published_prefix.join(line.split(source_prefix))
def clean_example_content(content: str, comment_prefix: str) -> str:
    """Prepare raw example source for publishing.

    Lines rejected by ``is_excluded_line`` are removed; every remaining line is
    passed through ``process_line_content``. Lines are split and re-joined on
    ``"\\n"``.
    """
    kept_lines: list[str] = []
    for line in content.split("\n"):
        if is_excluded_line(line, comment_prefix):
            continue
        kept_lines.append(process_line_content(line))
    return "\n".join(kept_lines)
# Source and destination roots (joined onto ROOT) used by copy_guide_dep_file().
# NOTE(review): GUIDES_SOURCE has the same value as GUIDES_BASE defined earlier
# in this file.
GUIDES_SOURCE = "sdks/guides"
GUIDES_OUTPUT = "examples"
def _read_sdk_version(lang: str) -> str:
"""Read the published SDK version from the source package file."""
if lang == "python":
path = os.path.join(ROOT, "sdks", "python", "pyproject.toml")
for line in open(path):
if line.startswith("version = "):
return line.split('"')[1].strip()
elif lang == "typescript":
data = json.load(open(os.path.join(ROOT, "sdks", "typescript", "package.json")))
return data["version"]
elif lang == "ruby":
path = os.path.join(ROOT, "sdks", "ruby", "src", "lib", "hatchet", "version.rb")
for line in open(path):
if "VERSION" in line:
return line.split('"')[1].strip()
elif lang == "go":
# Go module uses monorepo; use Python SDK version as proxy for hatchet release
return _read_sdk_version("python")
return "0.0.0"
def _apply_published_sdk_refs(lang: str, content: str, ver: str) -> str:
    """Rewrite local-path SDK references in a dependency file to published version ``ver``."""
    if lang == "go":
        content = content.replace(
            "module github.com/hatchet-dev/hatchet/sdks/guides/go",
            "module github.com/hatchet-dev/hatchet/examples/go/guides",
        )
        go_ver = ver if ver.startswith("v") else f"v{ver}"
        content = content.replace(
            "github.com/hatchet-dev/hatchet v0.0.0",
            f"github.com/hatchet-dev/hatchet {go_ver}",
        )
        # Drop the `replace` directive that points the module at the local checkout.
        return re.sub(
            r"\nreplace github\.com/hatchet-dev/hatchet => \.\./\.\./\.\.\s*\n?",
            "\n",
            content,
        )
    if lang == "python":
        return content.replace(
            'hatchet-sdk = { path = "../../python", develop = true }',
            f'hatchet-sdk = "^{ver}"',
        )
    if lang == "ruby":
        return content.replace(
            'gem "hatchet-sdk", path: "../../ruby/src"',
            f'gem "hatchet-sdk", "~> {ver}"',
        )
    if lang == "typescript":
        return content.replace(
            '"@hatchet-dev/typescript-sdk": "file:../../typescript"',
            f'"@hatchet-dev/typescript-sdk": "^{ver}"',
        )
    return content


def copy_guide_dep_file(
    lang: str,
    filename: str,
    use_published: bool = True,
) -> None:
    """Copy a dep file from sdks/guides/{lang}/ to examples/{lang}/guides/.

    If use_published, replace local path refs with published package versions.

    Nothing is written when the source file or the destination directory does
    not exist.

    Args:
        lang: Language directory name (``"go"``, ``"python"``, ``"ruby"``,
            ``"typescript"``).
        filename: Name of the dependency file inside the language directory.
        use_published: Whether to rewrite local SDK references to the version
            returned by ``_read_sdk_version(lang)``.
    """
    src = os.path.join(ROOT, GUIDES_SOURCE, lang, filename)
    out_dir = os.path.join(ROOT, GUIDES_OUTPUT, lang, "guides")
    if not os.path.isfile(src) or not os.path.isdir(out_dir):
        return

    # Context manager so the source handle is closed deterministically.
    with open(src, encoding="utf-8") as f:
        content = f.read()

    if use_published:
        content = _apply_published_sdk_refs(lang, content, _read_sdk_version(lang))

    with open(os.path.join(out_dir, filename), "w", encoding="utf-8") as f:
        f.write(content)
def write_examples(examples: list[ProcessedExample]) -> None:
    """Write the cleaned copy of every example, then copy guide dependency files.

    Each example is written to ``ROOT/<output_path>`` (parent directories are
    created as needed) after being passed through ``clean_example_content``.

    Args:
        examples: Processed examples and guides to publish.
    """
    for example in examples:
        out_path = os.path.join(ROOT, example.output_path)
        os.makedirs(os.path.dirname(out_path), exist_ok=True)

        cleaned = clean_example_content(
            example.raw_content, example.context.value.comment_prefix
        )
        # Write as UTF-8 explicitly rather than with the platform default.
        with open(out_path, "w", encoding="utf-8") as f:
            f.write(cleaned)

    # Copy dep files from sdks/guides/ to examples/*/guides/ with published SDK refs
    # (copy_guide_dep_file skips any language whose guides output directory
    # does not exist).
    copy_guide_dep_file("go", "go.mod")
    copy_guide_dep_file("python", "pyproject.toml")
    copy_guide_dep_file("ruby", "Gemfile")
    copy_guide_dep_file("typescript", "package.json")
class JavaScriptObjectDecoder(json.JSONDecoder):
    """JSON decoder that tolerates a small subset of JavaScript object-literal syntax.

    Before delegating to the standard decoder the input text is rewritten to:

    * quote bare keys that start a line (``  foo: 1`` -> ``  "foo": 1``),
    * quote bare keys that directly follow ``{`` (``{ foo: 1 }``),
    * drop a trailing comma before ``}`` (and an optional ``;`` after it).

    The rewrite is purely regex-based: it does not look inside string values,
    and it does not handle trailing commas before ``]``, single-quoted strings
    or comments.
    """

    def replacement(self, match: re.Match[str]) -> str:
        """Rebuild a matched ``<indent>key:`` as ``<indent>"key":``."""
        indent, key = match.group(1), match.group(2)
        return f'{indent}"{key}":'

    def decode(self, s: str, _w: Callable[..., Any] = re.compile(r"\s").match) -> Any:  # type: ignore[override]
        """Normalise ``s`` to strict JSON and decode it.

        ``_w`` is accepted for signature compatibility with
        ``json.JSONDecoder.decode`` and is not used.
        """
        # Bare keys at the start of any line. With re.MULTILINE, `^` also
        # matches at the very start of the string, so one pass covers both.
        result = re.sub(
            r"^(\s*)([a-zA-Z_$][a-zA-Z0-9_$-]*)\s*:",
            self.replacement,
            s,
            flags=re.MULTILINE,
        )
        # Bare keys immediately after an opening brace.
        result = re.sub(
            r"(\{\s*)([a-zA-Z_$][a-zA-Z0-9_$-]*)\s*:",
            r'\1"\2":',
            result,
        )
        # Trailing comma before a closing brace, plus any `;` that follows it.
        result = re.sub(r",(\s*\n?\s*})(\s*);?", r"\1", result)
        return super().decode(result)
def is_doc_page(key: str, children: str | dict[str, Any]) -> bool:
if key.strip().startswith("--"):
return False
if isinstance(children, str):
return True
return "title" in children
def extract_doc_name(value: str | dict[str, Any]) -> str:
if isinstance(value, str):
return value
if "title" in value:
return value["title"]
raise ValueError(f"Invalid doc value: {value}")
def keys_to_path(keys: list[str]) -> str:
    """Join path segments into a URL path with a leading slash.

    Empty segments are ignored. No segments gives ``""``; a single segment is
    returned as ``"/" + segment`` unchanged; several segments are joined with
    ``"/"``, with ``"//"`` collapsed to ``"/"`` and trailing slashes removed.
    """
    segments = list(filter(None, keys))
    if not segments:
        return ""
    if len(segments) == 1:
        return f"/{segments[0]}"
    joined = "/".join(segments).replace("//", "/").rstrip("/")
    return "/" + joined
def write_doc_index_to_app() -> None:
    """Build an index of documentation pages and write it as a TypeScript module.

    Every ``_meta.js`` under ``ROOT/frontend/docs`` is parsed (after removing
    ``export default`` and a trailing ``;``) with ``JavaScriptObjectDecoder``.
    Each entry accepted by ``is_doc_page`` becomes a ``DocumentationPage``
    stored in a nested dict keyed by the file's directory segments relative to
    the ``pages/`` directory, then by the entry key. Empty segments and empty
    entry keys are stored under ``"index"``.

    The result is written to ``ROOT/frontend/app/src/lib/generated/docs/index.ts``
    as ``export const docsPages = {...} as const;``.
    """
    docs_root = os.path.join(ROOT, "frontend", "docs")
    pages_dir = os.path.join(docs_root, "pages/")
    path = docs_root + "/**/_meta.js"

    tree: dict[str, Any] = {}
    for filename in glob.iglob(path, recursive=True):
        # Read as UTF-8 explicitly rather than with the platform default.
        with open(filename, encoding="utf-8") as f:
            content = f.read().replace("export default ", "").strip().rstrip(";")

        parsed_meta = cast(
            dict[str, Any], json.loads(content, cls=JavaScriptObjectDecoder)
        )

        # Directory segments of this _meta.js relative to the pages directory.
        keys = (
            filename.replace(pages_dir, "")
            .replace("_meta.js", "")
            .rstrip("/")
            .split("/")
        )

        docs = {
            key: extract_doc_name(value)
            for key, value in parsed_meta.items()
            if is_doc_page(key, value)
        }

        for key, title in docs.items():
            key = key.strip() or "index"

            current = tree
            for k in keys:
                k = k or "index"
                if k not in current:
                    current[k] = {}
                elif isinstance(current[k], str):
                    # A non-container already occupies this path; skip the page.
                    break
                current = current[k]
            else:
                # Reached only when the walk above was not cut short by `break`.
                current[key] = asdict(
                    DocumentationPage(
                        title=title,
                        href=f"https://docs.hatchet.run{keys_to_path(keys)}/{key}",
                    )
                )

    out_dir = os.path.join(ROOT, "frontend", "app", "src", "lib", "generated", "docs")
    os.makedirs(out_dir, exist_ok=True)
    with open(os.path.join(out_dir, "index.ts"), "w", encoding="utf-8") as f:
        f.write("export const docsPages = ")
        json.dump(tree, f, indent=2)
        f.write(" as const;\n")
if __name__ == "__main__":
    # 1. Collect every SDK example and guide, then index their snippets.
    processed_examples = process_examples() + process_guides()
    tree = create_snippet_tree(processed_examples)

    # 2. Emit the snippet index consumed by the docs frontend.
    print(f"Writing snippets to {OUTPUT_DIR}/index.ts")
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    with open(os.path.join(OUTPUT_DIR, "index.ts"), "w") as f:
        f.write("export const snippets = ")
        json.dump(tree, f, indent=2)
        f.write(" as const;\n")

    # 3. Emit the Snippet type; its `language` union lists every SDK language.
    language_union = " | ".join(f"'{v.name.lower()}'" for v in SDKParsingContext)
    snippet_type = "".join(
        [
            "export type Snippet = {\n",
            " title: string;\n",
            " content: string;\n",
            " githubUrl: string;\n",
            " codePath: string;\n",
            f" language: {language_union}\n",
            "};\n",
        ]
    )

    print(f"Writing snippet type to {BASE_SNIPPETS_DIR}/snippet.ts")
    with open(os.path.join(BASE_SNIPPETS_DIR, "snippet.ts"), "w") as f:
        f.write(snippet_type)

    # 4. Publish the cleaned example sources and the docs page index.
    write_examples(processed_examples)
    write_doc_index_to_app()