Mirror of https://github.com/hatchet-dev/hatchet.git
Synced 2026-03-18 10:42:44 -05:00
530 lines · 16 KiB · Python
import glob
|
|
import json
|
|
import os
|
|
import re
|
|
from dataclasses import asdict, dataclass
|
|
from enum import Enum
|
|
from typing import Any, Callable, cast
|
|
|
|
# Repository root, relative to this script's working directory.
ROOT = "../../"

# Docs-frontend library directory and the generated-snippets output folder.
BASE_SNIPPETS_DIR = os.path.join(ROOT, "frontend", "docs", "lib")
OUTPUT_DIR = os.path.join(BASE_SNIPPETS_DIR, "generated", "snippets")

# GitHub org/repo used to build permalink URLs for each snippet.
OUTPUT_GITHUB_ORG = "hatchet-dev"
OUTPUT_GITHUB_REPO = "hatchet"

# Files whose path matches any of these regexes are skipped entirely
# (test files, package scaffolding, READMEs, etc.).
IGNORED_FILE_PATTERNS = [
    r"__init__\.py$",
    r"test_.*\.py$",
    r"\.test\.ts$",
    r"\.test-d\.ts$",
    r"test_.*\.go$",
    r"_test\.go$",
    r"\.e2e\.ts$",
    r"test_.*_spec\.rb$",
    r"spec_helper\.rb$",
    r"Gemfile",
    r"\.rspec$",
    r"README\.md$",
]

# Source directory (relative to ROOT) holding per-language guide examples.
GUIDES_BASE = "sdks/guides"
|
|
|
|
|
|
@dataclass
class ParsingContext:
    """Per-language settings for locating and parsing SDK example files."""

    # Repo-relative directory containing this SDK's example sources.
    example_path: str
    # Source-file extension to glob for (e.g. ".py", ".ts").
    extension: str
    # Line-comment prefix used by snippet markers (e.g. "#" or "//").
    comment_prefix: str
|
|
|
|
|
|
class SDKParsingContext(Enum):
    """One ParsingContext per supported SDK language.

    The enum member name (lowercased) doubles as the language identifier
    used in output paths and snippet metadata.
    """

    PYTHON = ParsingContext(
        example_path="sdks/python/examples", extension=".py", comment_prefix="#"
    )
    TYPESCRIPT = ParsingContext(
        example_path="sdks/typescript/src/v1/examples",
        extension=".ts",
        comment_prefix="//",
    )
    GO = ParsingContext(
        example_path="sdks/go/examples", extension=".go", comment_prefix="//"
    )
    RUBY = ParsingContext(
        example_path="sdks/ruby/examples", extension=".rb", comment_prefix="#"
    )
|
|
|
|
|
|
@dataclass
class Snippet:
    """A single extracted code snippet destined for the docs site.

    Field names are intentionally camelCase: instances are serialized via
    asdict() straight into generated TypeScript, so the names must match
    the TS consumer.
    """

    title: str
    content: str
    githubUrl: str
    language: str
    codePath: str
|
|
|
|
|
|
@dataclass
class ProcessedExample:
    """One example source file together with its extracted snippets."""

    # SDK language context the file belongs to.
    context: SDKParsingContext
    # Path of the source file that was parsed.
    filepath: str
    # Snippets extracted from the file (or a single "all" snippet).
    snippets: list[Snippet]
    # The file's original, uncleaned text.
    raw_content: str
    # Repo-relative path where the cleaned copy is written.
    output_path: str
|
|
|
|
|
|
@dataclass
class DocumentationPage:
    """A docs-site page entry emitted into the generated docs index."""

    title: str
    # Absolute URL of the page on docs.hatchet.run.
    href: str
|
|
|
|
|
|
# Type aliases for the (title, content) pairs produced by snippet parsing.
Title = str
Content = str
|
|
|
|
|
|
def to_snake_case(text):
    """Convert *text* to lower snake_case.

    Drops punctuation (keeping alphanumerics, whitespace, hyphens and
    underscores), turns separators into underscores, splits camelCase and
    ACRONYMWord boundaries, collapses repeated underscores, and lowercases
    the stripped result.
    """
    transforms = (
        (r"[^a-zA-Z0-9\s\-_]", ""),          # drop punctuation
        (r"[-\s]+", "_"),                    # separators -> underscore
        (r"([a-z0-9])([A-Z])", r"\1_\2"),    # camelCase boundary
        (r"([A-Z])([A-Z][a-z])", r"\1_\2"),  # ACRONYMWord boundary
        (r"_+", "_"),                        # collapse repeats
    )
    for pattern, repl in transforms:
        text = re.sub(pattern, repl, text)
    return text.strip("_").lower()
|
|
|
|
|
|
# NOTE: duplicate "Title"/"Content" alias definitions removed here; they
# are already declared once earlier in this module.
|
|
|
|
|
|
def dedent_code(code: str) -> str:
    """Strip the common leading indentation from *code*.

    The minimum indent is computed over non-blank lines only; the result
    is stripped of surrounding whitespace and terminated with exactly one
    trailing newline.
    """
    lines = code.split("\n")

    # default=0 guards against empty or all-blank input, where the
    # generator is empty (the previous version raised ValueError there).
    # Note: str.split always yields at least one element, so a separate
    # emptiness check is unnecessary.
    min_indent = min(
        (len(line) - len(line.lstrip()) for line in lines if line.strip()),
        default=0,
    )

    dedented_lines = [
        line[min_indent:] if len(line) >= min_indent else line for line in lines
    ]

    return "\n".join(dedented_lines).strip() + "\n"
|
|
|
|
|
|
def parse_snippet_from_block(match: re.Match[str]) -> tuple[Title, Content]:
    """Turn a snippet-marker regex match into a (title, content) pair.

    Group 1 is the human-readable title (normalized to snake_case);
    group 2 is the code body (dedented and newline-terminated).
    """
    raw_title, raw_code = match.group(1), match.group(2)
    return to_snake_case(raw_title.strip()), dedent_code(raw_code)
|
|
|
|
|
|
def parse_snippets(ctx: SDKParsingContext, filename: str) -> list[Snippet]:
    """Extract all marked snippets from *filename*.

    Snippets are delimited by "<prefix> > <title>" ... "<prefix> !!"
    marker comments. When a file contains no markers, the entire file is
    returned as a single snippet titled "all".
    """
    prefix = re.escape(ctx.value.comment_prefix)
    block_re = rf"{prefix} >\s+(.+?)\n(.*?){prefix} !!"

    examples_dir = ROOT + ctx.value.example_path.strip("/")

    with open(filename) as f:
        content = f.read()

    code_path = f"examples/{ctx.name.lower()}{filename.replace(examples_dir, '')}"
    github_url = f"https://github.com/{OUTPUT_GITHUB_ORG}/{OUTPUT_GITHUB_REPO}/tree/main/{code_path}"

    def build(title: str, body: str) -> Snippet:
        # Every snippet from one file shares the same URL/path metadata.
        return Snippet(
            title=title,
            content=body,
            githubUrl=github_url,
            language=ctx.name.lower(),
            codePath=code_path,
        )

    matches = list(re.finditer(block_re, content, re.DOTALL))

    if not matches:
        # No markers: expose the whole file as a single "all" snippet.
        return [build("all", content)]

    return [build(*parse_snippet_from_block(m)) for m in matches]
|
|
|
|
|
|
def process_example(ctx: SDKParsingContext, filename: str) -> ProcessedExample:
    """Read one example file and bundle its snippets with its raw content."""
    with open(filename) as f:
        raw = f.read()

    relative = filename.replace(ROOT + ctx.value.example_path, "")
    return ProcessedExample(
        context=ctx,
        filepath=filename,
        output_path=f"examples/{ctx.name.lower()}{relative}",
        snippets=parse_snippets(ctx, filename),
        raw_content=raw,
    )
|
|
|
|
|
|
def process_examples() -> list[ProcessedExample]:
    """Discover and process every example file for every SDK context."""
    results: list[ProcessedExample] = []

    for ctx in SDKParsingContext:
        examples_dir = ROOT + ctx.value.example_path.strip("/")
        glob_pattern = examples_dir + "/**/*" + ctx.value.extension

        for candidate in glob.iglob(glob_pattern, recursive=True):
            # Skip test files, scaffolding, etc.
            if any(re.search(p, candidate) for p in IGNORED_FILE_PATTERNS):
                continue
            results.append(process_example(ctx, candidate))

    return results
|
|
|
|
|
|
# Maps sdks/guides/<dir> directory names to their SDK parsing context.
GUIDES_LANG_TO_CTX: dict[str, SDKParsingContext] = {
    "python": SDKParsingContext.PYTHON,
    "typescript": SDKParsingContext.TYPESCRIPT,
    "go": SDKParsingContext.GO,
    "ruby": SDKParsingContext.RUBY,
}
|
|
|
|
|
|
def process_guides() -> list[ProcessedExample]:
    """Process guide examples from sdks/guides/{lang}/ into examples/{lang}/guides/.

    Mirrors process_examples() for the guides tree: files matching
    IGNORED_FILE_PATTERNS are skipped, and each remaining file yields a
    ProcessedExample whose output path lives under examples/{lang}/guides/.
    """
    examples: list[ProcessedExample] = []

    for lang_dir, ctx in GUIDES_LANG_TO_CTX.items():
        guides_base = os.path.join(ROOT, GUIDES_BASE, lang_dir)
        if not os.path.isdir(guides_base):
            # Language has no guides directory yet.
            continue

        pattern = guides_base + "/**/*" + ctx.value.extension

        for filename in glob.iglob(pattern, recursive=True):
            if any(re.search(p, filename) for p in IGNORED_FILE_PATTERNS):
                continue

            with open(filename) as f:
                content = f.read()

            rel_path = filename.replace(guides_base, "")
            output_path = f"examples/{ctx.name.lower()}/guides{rel_path}"

            examples.append(
                ProcessedExample(
                    context=ctx,
                    filepath=filename,
                    # codePath equals output_path for guides, so the
                    # snippet helper reuses it directly.
                    output_path=output_path,
                    snippets=_guide_snippets(ctx, content, output_path),
                    raw_content=content,
                )
            )

    return examples


def _guide_snippets(
    ctx: SDKParsingContext, content: str, code_path: str
) -> list[Snippet]:
    """Extract marked snippets from guide *content* (helper for process_guides).

    Falls back to a single "all" snippet when the file has no snippet
    markers, matching parse_snippets() behavior. Previously the two
    branches duplicated the Snippet construction; they now share it.
    """
    github_url = f"https://github.com/{OUTPUT_GITHUB_ORG}/{OUTPUT_GITHUB_REPO}/tree/main/{code_path}"

    comment_prefix = re.escape(ctx.value.comment_prefix)
    snippet_pattern = rf"{comment_prefix} >\s+(.+?)\n(.*?){comment_prefix} !!"
    matches = list(re.finditer(snippet_pattern, content, re.DOTALL))

    # (title, body) pairs; no markers -> the whole file as "all".
    parsed = (
        [parse_snippet_from_block(m) for m in matches]
        if matches
        else [("all", content)]
    )

    return [
        Snippet(
            title=title,
            content=body,
            githubUrl=github_url,
            language=ctx.name.lower(),
            codePath=code_path,
        )
        for title, body in parsed
    ]
|
|
|
|
|
|
def create_snippet_tree(examples: list[ProcessedExample]) -> dict[str, dict[str, Any]]:
    """Build a nested dict of snippets keyed by output-path segments + title.

    e.g. "examples/python/foo/bar.py" with snippet "baz" becomes
    tree["python"]["foo"]["bar"]["baz"] = asdict(snippet).
    """
    tree: dict[str, Any] = {}

    for example in examples:
        # Strip the "examples/" prefix and the extension, then split into
        # path segments.
        path_keys = (
            example.output_path.replace("examples/", "")
            .replace(example.context.value.extension, "")
            .split("/")
        )

        for snippet in example.snippets:
            *branch, leaf = path_keys + [snippet.title]

            node = tree
            for raw_key in branch:
                # Intermediate keys are snake_cased; the leaf (the snippet
                # title) is already snake_cased upstream.
                node = node.setdefault(to_snake_case(raw_key), {})

            node[leaf] = asdict(snippet)

    return tree
|
|
|
|
|
|
def is_excluded_line(line: str, comment_prefix: str) -> bool:
    """Return True for lines that must not appear in published examples."""
    # Snippet end markers, lint-suppression pragmas, and internal HH- tags.
    if line.strip() == f"{comment_prefix} !!":
        return True
    return "eslint-disable" in line or "HH-" in line
|
|
|
|
|
|
def process_line_content(line: str) -> str:
    """Rewrite the internal import alias to the published package name."""
    internal_alias = "@hatchet/"
    published_package = "@hatchet-dev/typescript-sdk/"
    return line.replace(internal_alias, published_package)
|
|
|
|
|
|
def clean_example_content(content: str, comment_prefix: str) -> str:
    """Drop excluded lines and rewrite import aliases in *content*."""
    kept = (
        process_line_content(line)
        for line in content.split("\n")
        if not is_excluded_line(line, comment_prefix)
    )
    return "\n".join(kept)
|
|
|
|
|
|
# Source directory for guide dep files.
# NOTE(review): duplicates GUIDES_BASE defined earlier — consider unifying.
GUIDES_SOURCE = "sdks/guides"
# Top-level output directory for published guide examples.
GUIDES_OUTPUT = "examples"
|
|
|
|
|
|
def _read_sdk_version(lang: str) -> str:
|
|
"""Read the published SDK version from the source package file."""
|
|
if lang == "python":
|
|
path = os.path.join(ROOT, "sdks", "python", "pyproject.toml")
|
|
for line in open(path):
|
|
if line.startswith("version = "):
|
|
return line.split('"')[1].strip()
|
|
elif lang == "typescript":
|
|
data = json.load(open(os.path.join(ROOT, "sdks", "typescript", "package.json")))
|
|
return data["version"]
|
|
elif lang == "ruby":
|
|
path = os.path.join(ROOT, "sdks", "ruby", "src", "lib", "hatchet", "version.rb")
|
|
for line in open(path):
|
|
if "VERSION" in line:
|
|
return line.split('"')[1].strip()
|
|
elif lang == "go":
|
|
# Go module uses monorepo; use Python SDK version as proxy for hatchet release
|
|
return _read_sdk_version("python")
|
|
return "0.0.0"
|
|
|
|
|
|
def copy_guide_dep_file(
    lang: str,
    filename: str,
    use_published: bool = True,
) -> None:
    """Copy a dep file from sdks/guides/{lang}/ to examples/{lang}/guides/.

    If use_published, replace local path refs with published package versions."""
    src = os.path.join(ROOT, GUIDES_SOURCE, lang, filename)
    out_dir = os.path.join(ROOT, GUIDES_OUTPUT, lang, "guides")
    if not os.path.isfile(src) or not os.path.isdir(out_dir):
        # Nothing to copy (or nowhere to put it); silently skip.
        return

    # with-block closes the source handle (previously leaked via
    # open(src).read()).
    with open(src) as f:
        content = f.read()

    if use_published:
        ver = _read_sdk_version(lang)
        if lang == "go":
            content = content.replace(
                "module github.com/hatchet-dev/hatchet/sdks/guides/go",
                "module github.com/hatchet-dev/hatchet/examples/go/guides",
            )
            go_ver = f"v{ver}" if not ver.startswith("v") else ver
            content = content.replace(
                "github.com/hatchet-dev/hatchet v0.0.0",
                f"github.com/hatchet-dev/hatchet {go_ver}",
            )
            # Drop the local "replace" directive pointing at the repo root.
            content = re.sub(
                r"\nreplace github\.com/hatchet-dev/hatchet => \.\./\.\./\.\.\s*\n?",
                "\n",
                content,
            )
        elif lang == "python":
            content = content.replace(
                'hatchet-sdk = { path = "../../python", develop = true }',
                f'hatchet-sdk = "^{ver}"',
            )
        elif lang == "ruby":
            content = content.replace(
                'gem "hatchet-sdk", path: "../../ruby/src"',
                f'gem "hatchet-sdk", "~> {ver}"',
            )
        elif lang == "typescript":
            content = content.replace(
                '"@hatchet-dev/typescript-sdk": "file:../../typescript"',
                f'"@hatchet-dev/typescript-sdk": "^{ver}"',
            )

    with open(os.path.join(out_dir, filename), "w") as f:
        f.write(content)
|
|
|
|
|
|
def write_examples(examples: list[ProcessedExample]) -> None:
    """Write cleaned example files into the repo's examples/ tree.

    Each example's raw content is passed through clean_example_content
    (marker/pragma stripping, import-alias rewriting) before writing.
    """
    for example in examples:
        out_path = os.path.join(ROOT, example.output_path)
        out_dir = os.path.dirname(out_path)
        os.makedirs(out_dir, exist_ok=True)

        with open(out_path, "w") as f:
            f.write(
                clean_example_content(
                    example.raw_content, example.context.value.comment_prefix
                )
            )

    # Copy dep files from sdks/guides/ to examples/*/guides/ with published SDK refs
    # NOTE(review): placed inside write_examples per the original layout —
    # confirm these were not intended as module-level statements.
    copy_guide_dep_file("go", "go.mod")
    copy_guide_dep_file("python", "pyproject.toml")
    copy_guide_dep_file("ruby", "Gemfile")
    copy_guide_dep_file("typescript", "package.json")
|
|
|
|
|
|
class JavaScriptObjectDecoder(json.JSONDecoder):
    """JSON decoder that tolerates JS object-literal syntax from _meta.js files.

    Quotes bare identifier keys and strips trailing commas before
    delegating to the standard JSON decoder.
    """

    def replacement(self, match: re.Match[str]) -> str:
        """Requote a bare `key:` as `"key":`, preserving indentation."""
        indent = match.group(1)
        key = match.group(2)
        return f'{indent}"{key}":'

    def decode(self, s: str, _w: Callable[..., Any] = re.compile(r"\s").match) -> Any:  # type: ignore[override]
        # Quote bare keys at the start of each line. (A previous extra pass
        # without re.MULTILINE was redundant: the MULTILINE pass also
        # matches at the start of the string, and already-quoted keys
        # cannot re-match since '"' is outside the identifier class.)
        pattern = r"^(\s*)([a-zA-Z_$][a-zA-Z0-9_$-]*)\s*:"
        result = re.sub(pattern, self.replacement, s, flags=re.MULTILINE)
        # Quote bare keys that directly follow an opening brace.
        result = re.sub(
            r"(\{\s*)([a-zA-Z_$][a-zA-Z0-9_$-]*)\s*:",
            r'\1"\2":',
            result,
        )
        # Strip trailing commas before a closing brace and a trailing ';'.
        result = re.sub(r",(\s*\n?\s*})(\s*);?", r"\1", result)

        return super().decode(result)
|
|
|
|
|
|
def is_doc_page(key: str, children: str | dict[str, Any]) -> bool:
|
|
if key.strip().startswith("--"):
|
|
return False
|
|
|
|
if isinstance(children, str):
|
|
return True
|
|
|
|
return "title" in children
|
|
|
|
|
|
def extract_doc_name(value: str | dict[str, Any]) -> str:
|
|
if isinstance(value, str):
|
|
return value
|
|
|
|
if "title" in value:
|
|
return value["title"]
|
|
|
|
raise ValueError(f"Invalid doc value: {value}")
|
|
|
|
|
|
def keys_to_path(keys: list[str]) -> str:
    """Join the non-empty *keys* into a leading-slash URL path."""
    parts = [k for k in keys if k]

    if not parts:
        return ""
    if len(parts) == 1:
        return "/" + parts[0]

    # Collapse accidental double slashes and drop a trailing slash from
    # the joined segments before prepending the root slash.
    joined = "/".join(parts).replace("//", "/").rstrip("/")
    return "/" + joined
|
|
|
|
|
|
def write_doc_index_to_app() -> None:
    """Parse every docs _meta.js file and emit a nested docs-page index.

    Writes frontend/app/src/lib/generated/docs/index.ts containing a
    `docsPages` tree of {title, href} leaves keyed by URL path segments.
    """
    docs_root = os.path.join(ROOT, "frontend", "docs")
    pages_dir = os.path.join(docs_root, "pages/")

    path = docs_root + "/**/_meta.js"
    tree: dict[str, Any] = {}

    for filename in glob.iglob(path, recursive=True):
        with open(filename) as f:
            # _meta.js files are "export default { ... };" JS modules;
            # strip the export wrapper before decoding.
            content = f.read().replace("export default ", "").strip().rstrip(";")
        parsed_meta = cast(
            dict[str, Any], json.loads(content, cls=JavaScriptObjectDecoder)
        )

        # Directory segments under pages/ become the nesting keys.
        keys = (
            filename.replace(pages_dir, "")
            .replace("_meta.js", "")
            .rstrip("/")
            .split("/")
        )
        docs = {
            key: extract_doc_name(value)
            for key, value in parsed_meta.items()
            if is_doc_page(key, value)
        }

        for key, title in docs.items():
            # Empty keys map to the directory's index page.
            key = key.strip() or "index"
            full_keys = keys + [key]
            # NOTE(review): no-op copy — full_keys is already a fresh list.
            full_keys = [k for k in full_keys]

            current = tree
            for k in full_keys[:-1]:
                k = k or "index"
                if k not in current:
                    current[k] = {}
                elif isinstance(current[k], str):
                    # A string leaf already occupies this key; abandon the
                    # descent (the for/else below then skips this page).
                    break

                current = current[k]
            else:
                # for/else: only record the page when the descent above
                # completed without hitting a conflicting leaf.
                current[full_keys[-1]] = asdict(
                    DocumentationPage(
                        title=title,
                        href=f"https://docs.hatchet.run{keys_to_path(full_keys[:-1])}/{key}",
                    )
                )

    out_dir = os.path.join(ROOT, "frontend", "app", "src", "lib", "generated", "docs")
    os.makedirs(out_dir, exist_ok=True)

    with open(os.path.join(out_dir, "index.ts"), "w") as f:
        f.write("export const docsPages = ")
        json.dump(tree, f, indent=2)
        f.write(" as const;\n")
|
|
|
|
|
|
if __name__ == "__main__":
    # 1) Collect snippets from SDK examples and guide files.
    processed_examples = process_examples() + process_guides()

    tree = create_snippet_tree(processed_examples)

    # 2) Emit the snippet tree as a TypeScript constant for the docs site.
    print(f"Writing snippets to {OUTPUT_DIR}/index.ts")
    os.makedirs(OUTPUT_DIR, exist_ok=True)

    with open(os.path.join(OUTPUT_DIR, "index.ts"), "w") as f:
        f.write("export const snippets = ")
        json.dump(tree, f, indent=2)
        f.write(" as const;\n")

    # 3) Emit the Snippet TS type; its language field is a union of the
    # lowercased SDK enum names.
    language_union = ' | '.join([f"'{v.name.lower()}'" for v in SDKParsingContext])
    snippet_type = (
        "export type Snippet = {\n"
        " title: string;\n"
        " content: string;\n"
        " githubUrl: string;\n"
        " codePath: string;\n"
        f" language: {language_union}\n"
        "};\n"
    )

    print(f"Writing snippet type to {BASE_SNIPPETS_DIR}/snippet.ts")
    with open(os.path.join(BASE_SNIPPETS_DIR, "snippet.ts"), "w") as f:
        f.write(snippet_type)

    # 4) Write cleaned example files back to the repo and regenerate the
    # docs-page index consumed by the app frontend.
    write_examples(processed_examples)
    write_doc_index_to_app()
|