Feat: Dynamic worker label assign (#3137)

* feat: initial wiring work on desired labels

* feat: initial wiring

* chore: gen python

* fix: use the whole desired label thing instead

* fix: more wiring, improve types

* fix: sql type

* fix: len check

* chore: gen python

* fix: initial plural label work

* fix: store the labels properly on the task

* fix: skip cache on override

* fix: bug

* fix: scoping bug whoops

* chore: lint

* fix: send labels back over the api correctly

* feat: python test

* fix: lint

* fix: comment

* fix: override

* fix: namespaces, ugh

* fix: no need for error here

* chore: version

* feat: ruby, go, ts

* feat: versions

* fix: appease the rubocop

* chore: lint

* chore: bundle install

* fix: tests

* chore: lint

* chore: lint more

* fix: ts test

* fix: rb

* chore: gen

* chore: reset gemfile

* chore: reset changelog

* fix: pgroup

* fix: tests, part i

* Revert "chore: reset changelog"

This reverts commit b63bf7d3e5.

* Revert "chore: reset gemfile"

This reverts commit bb848bb6f0.

* fix: go -> golang mapping hack

* fix: go enums

* fix: appease the cop

* fix: namespace

* chore: gen
This commit is contained in:
matt
2026-03-04 08:03:58 -08:00
committed by GitHub
parent 7acc5c611d
commit 6c29e48204
71 changed files with 2994 additions and 658 deletions
@@ -0,0 +1,97 @@
import pytest
from hatchet_sdk import Hatchet, TriggerWorkflowOptions
from hatchet_sdk.labels import DesiredWorkerLabel
from subprocess import Popen
from typing import Any, Generator
from examples.runtime_affinity.worker import affinity_example_task
from random import choice
from conftest import _on_demand_worker_fixture
labels = ["foo", "bar"]
@pytest.fixture(scope="session")
def on_demand_worker_a(
request: pytest.FixtureRequest,
) -> Generator[Popen[bytes], None, None]:
yield from _on_demand_worker_fixture(request)
@pytest.fixture(scope="session")
def on_demand_worker_b(
request: pytest.FixtureRequest,
) -> Generator[Popen[bytes], None, None]:
yield from _on_demand_worker_fixture(request)
@pytest.mark.parametrize(
"on_demand_worker_a",
[
(
[
"poetry",
"run",
"python",
"examples/runtime_affinity/worker.py",
"--label",
labels[0],
],
8003,
)
],
indirect=True,
)
@pytest.mark.parametrize(
"on_demand_worker_b",
[
(
[
"poetry",
"run",
"python",
"examples/runtime_affinity/worker.py",
"--label",
labels[1],
],
8004,
)
],
indirect=True,
)
@pytest.mark.asyncio(loop_scope="session")
async def test_runtime_affinity(
hatchet: Hatchet,
on_demand_worker_a: Popen[Any],
on_demand_worker_b: Popen[Any],
) -> None:
workers = [
w
for w in (await hatchet.workers.aio_list()).rows or []
if w.status == "ACTIVE"
and w.name == hatchet.config.apply_namespace("runtime-affinity-worker")
]
assert len(workers) == 2
worker_label_to_id = {
label.value: worker.metadata.id
for worker in workers
for label in (worker.labels or [])
if label.key == "affinity" and label.value in labels
}
assert set(worker_label_to_id.keys()) == set(labels)
for _ in range(20):
target_worker = choice(labels)
res = await affinity_example_task.aio_run(
options=TriggerWorkflowOptions(
desired_worker_label={
"affinity": DesiredWorkerLabel(
value=target_worker,
required=True,
),
}
)
)
assert res.worker_id == worker_label_to_id[target_worker]
@@ -0,0 +1,33 @@
import argparse
from hatchet_sdk import Context, EmptyModel, Hatchet
from pydantic import BaseModel
hatchet = Hatchet(debug=True)
class AffinityResult(BaseModel):
worker_id: str
@hatchet.task()
async def affinity_example_task(i: EmptyModel, c: Context) -> AffinityResult:
return AffinityResult(worker_id=c.worker_id)
def main() -> None:
parser = argparse.ArgumentParser()
parser.add_argument("--label", type=str, required=True)
args = parser.parse_args()
worker = hatchet.worker(
"runtime-affinity-worker",
labels={"affinity": args.label},
workflows=[affinity_example_task],
)
worker.start()
if __name__ == "__main__":
main()