Files
hatchet/python-sdk/examples/manual_trigger/trigger-generator.py
Gabe Ruttner 45813fdb1b feat: dashboard playground ui (#162)
* hotfix: add repository for npm publish

* release(py-sdk): bump version

* chore: ignore venv

* feat(dashboard): collapsible sidebar

* fix: hangup workflow listener when workflow run finishes (#161)

* wip: class based listener pattern

* fix: workflow run listener hangups

* fix: hang up workflow listener on finished

* fix: case for current workflow run

* address review comments

* bump version

---------

Co-authored-by: g <gabriel.ruttner@gmail.com>

* wip: focus state and flat playground

* fix: focus state

* feat: unify state

* wip: playground state

* cleanup: rm logging

* fix: default output

* cleanup: rm deadcode

* feat: can replay individual steps

* feat: icon tabs

* feat: sticky output

* cleanup: linting

---------

Co-authored-by: abelanger5 <belanger@sas.upenn.edu>
2024-02-13 10:24:46 -07:00

32 lines
1.1 KiB
Python

from hatchet_sdk import Hatchet
from dotenv import load_dotenv
import json
class StepRunEventType:
    """Names of the step-run event types this script compares events against.

    Each constant's value is the same string as its own name.
    NOTE(review): presumably these mirror the event type names emitted by the
    Hatchet SDK's listener -- confirm against hatchet_sdk.
    """

    STEP_RUN_EVENT_TYPE_STARTED = 'STEP_RUN_EVENT_TYPE_STARTED'
    STEP_RUN_EVENT_TYPE_COMPLETED = 'STEP_RUN_EVENT_TYPE_COMPLETED'
    STEP_RUN_EVENT_TYPE_FAILED = 'STEP_RUN_EVENT_TYPE_FAILED'
    STEP_RUN_EVENT_TYPE_CANCELLED = 'STEP_RUN_EVENT_TYPE_CANCELLED'
    STEP_RUN_EVENT_TYPE_TIMED_OUT = 'STEP_RUN_EVENT_TYPE_TIMED_OUT'
# Load variables from a .env file into the process environment before the
# client is built (presumably the Hatchet client reads its configuration
# from there -- confirm against the SDK).
load_dotenv()

hatchet = Hatchet().client

# Trigger one run of "ManualTriggerWorkflow" with a small test input and keep
# the returned value so the run's events can be streamed below.
workflow_run_id = hatchet.admin.run_workflow(
    "ManualTriggerWorkflow",
    {"test": "test"},
)

listener = hatchet.listener.stream(workflow_run_id)

for event in listener:
    # TODO: step run is not exported easily from the hatchet_sdk, and
    # event.step is not defined on the event object. Once it is, stop
    # listening as soon as a specific step completes, e.g.:
    #     if event.step == 'step2' and event.type == StepRunEventType.STEP_RUN_EVENT_TYPE_COMPLETED:
    #         listener.abort()
    if event.type == StepRunEventType.STEP_RUN_EVENT_TYPE_COMPLETED:
        print(f"Step completed: {json.dumps(event.payload)}")

# Reached once iteration over the listener has ended.
print('Workflow finished')