feat: dashboard playground ui (#162)

* hotfix: add repository for npm publish

* release(py-sdk): bump version

* chore: ignore venv

* feat(dashboard): collapsible sidebar

* fix: hangup workflow listener when workflow run finishes (#161)

* wip: class based listener pattern

* fix: workflow run listener hangups

* fix: hang up workflow listener on finished

* fix: case for current workflow run

* address review comments

* bump version

---------

Co-authored-by: g <gabriel.ruttner@gmail.com>

* wip: focus state and flat playground

* fix: focus state

* feat: unify state

* wip: playground state

* cleanup: rm logging

* fix: default output

* cleanup: rm deadcode

* feat: can replay individual steps

* feat: icon tabs

* feat: sticky output

* cleanup: linting

---------

Co-authored-by: abelanger5 <belanger@sas.upenn.edu>
This commit is contained in:
Gabe Ruttner
2024-02-13 12:24:46 -05:00
committed by GitHub
parent 8d86f63300
commit 45813fdb1b
34 changed files with 1146 additions and 426 deletions

3
.gitignore vendored
View File

@@ -24,6 +24,7 @@ dump.rdb
*.pfx
*.cert
.next
.venv
node_modules
@@ -50,6 +51,8 @@ node_modules
# Crash log files
crash.log
.venv
# Exclude all .tfvars files, which are likely to contain sentitive data, such as
# password, private keys, and other secrets. These should not be part of version
# control as they are data points which are potentially sensitive and subject

View File

@@ -206,6 +206,10 @@ message WorkflowEvent {
// the event payload
string eventPayload = 6;
// whether this is the last event for the workflow run - server
// will hang up the connection but clients might want to case
bool hangup = 7;
}
message OverridesData {

View File

@@ -63,7 +63,7 @@ func (t *StepRunService) StepRunUpdateRerun(ctx echo.Context, request gen.StepRu
}
// update step run
_, err = t.config.Repository.StepRun().UpdateStepRun(tenant.ID, stepRun.ID, &repository.UpdateStepRunOpts{
_, _, err = t.config.Repository.StepRun().UpdateStepRun(tenant.ID, stepRun.ID, &repository.UpdateStepRunOpts{
Input: inputBytes,
Status: repository.StepRunStatusPtr(db.StepRunStatusPending),
IsRerun: true,

View File

@@ -35,6 +35,9 @@
"@radix-ui/react-slot": "^1.0.2",
"@radix-ui/react-tabs": "^1.0.4",
"@radix-ui/react-toast": "^1.1.5",
"@rjsf/core": "^5.17.0",
"@rjsf/utils": "^5.17.0",
"@rjsf/validator-ajv8": "^5.17.0",
"@tanstack/react-query": "^5.12.1",
"@tanstack/react-table": "^8.10.7",
"@visx/axis": "^3.5.0",
@@ -62,6 +65,7 @@
"react": "^18.2.0",
"react-dom": "^18.2.0",
"react-hook-form": "^7.48.2",
"react-icons": "^5.0.1",
"react-router-dom": "^6.20.0",
"react-syntax-highlighter": "^15.5.0",
"reactflow": "^11.10.3",

View File

@@ -62,6 +62,15 @@ dependencies:
'@radix-ui/react-toast':
specifier: ^1.1.5
version: 1.1.5(@types/react-dom@18.2.17)(@types/react@18.2.39)(react-dom@18.2.0)(react@18.2.0)
'@rjsf/core':
specifier: ^5.17.0
version: 5.17.0(@rjsf/utils@5.17.0)(react@18.2.0)
'@rjsf/utils':
specifier: ^5.17.0
version: 5.17.0(react@18.2.0)
'@rjsf/validator-ajv8':
specifier: ^5.17.0
version: 5.17.0(@rjsf/utils@5.17.0)
'@tanstack/react-query':
specifier: ^5.12.1
version: 5.12.1(react@18.2.0)
@@ -143,6 +152,9 @@ dependencies:
react-hook-form:
specifier: ^7.48.2
version: 7.48.2(react@18.2.0)
react-icons:
specifier: ^5.0.1
version: 5.0.1(react@18.2.0)
react-router-dom:
specifier: ^6.20.0
version: 6.20.0(react-dom@18.2.0)(react@18.2.0)
@@ -2068,6 +2080,49 @@ packages:
engines: {node: '>=14.0.0'}
dev: false
/@rjsf/core@5.17.0(@rjsf/utils@5.17.0)(react@18.2.0):
resolution: {integrity: sha512-0woSU+VU+t2kbDNSyMQhjxJOXJbk3F6lSHxf8XmS4yV3sXP/yr/vo7J3qcvXbSvCLPYMQHvskBFhCIaQqyHWBg==}
engines: {node: '>=14'}
peerDependencies:
'@rjsf/utils': ^5.16.x
react: ^16.14.0 || >=17
dependencies:
'@rjsf/utils': 5.17.0(react@18.2.0)
lodash: 4.17.21
lodash-es: 4.17.21
markdown-to-jsx: 7.4.1(react@18.2.0)
nanoid: 3.3.7
prop-types: 15.8.1
react: 18.2.0
dev: false
/@rjsf/utils@5.17.0(react@18.2.0):
resolution: {integrity: sha512-Hy2uAxMKWZIZSMzc2AiHrdACYvHj9GDynrdApMgUTxfjpzj5DT7Rghl/FGj7gg8Zy8VtdVNTCbkIzfS8xt4x7g==}
engines: {node: '>=14'}
peerDependencies:
react: ^16.14.0 || >=17
dependencies:
json-schema-merge-allof: 0.8.1
jsonpointer: 5.0.1
lodash: 4.17.21
lodash-es: 4.17.21
react: 18.2.0
react-is: 18.2.0
dev: false
/@rjsf/validator-ajv8@5.17.0(@rjsf/utils@5.17.0):
resolution: {integrity: sha512-ZLTpvZDzBt1+Wftao2AkpRaSvxaVRrutvFX3/oy640/KsWUfl0ofV33ai9O4aptKSnOPjfRiLqPJgbPHgQAhmw==}
engines: {node: '>=14'}
peerDependencies:
'@rjsf/utils': ^5.16.x
dependencies:
'@rjsf/utils': 5.17.0(react@18.2.0)
ajv: 8.12.0
ajv-formats: 2.1.1(ajv@8.12.0)
lodash: 4.17.21
lodash-es: 4.17.21
dev: false
/@rollup/pluginutils@4.2.1:
resolution: {integrity: sha512-iKnFXr7NkdZAIHiIWE+BX5ULi/ucVFYWD6TbAV+rZctiRTY2PL6tsIKhoIOaoskiWAkgu+VsbXgUVDNLHf+InQ==}
engines: {node: '>= 8.0.0'}
@@ -2834,6 +2889,17 @@ packages:
hasBin: true
dev: true
/ajv-formats@2.1.1(ajv@8.12.0):
resolution: {integrity: sha512-Wx0Kx52hxE7C18hkMEggYlEifqWZtYaRgouJor+WMdPnQyEK13vgEWyVNup7SoeeoLMsr4kf5h6dOW11I15MUA==}
peerDependencies:
ajv: ^8.0.0
peerDependenciesMeta:
ajv:
optional: true
dependencies:
ajv: 8.12.0
dev: false
/ajv@6.12.6:
resolution: {integrity: sha512-j3fVLgvTo527anyYyJOGTYJbG+vnnQYvE0m5mmkc1TK+nxAppkCLMIL0aZ4dblVCNoGShhm+kzE4ZUykBoMg4g==}
dependencies:
@@ -2843,6 +2909,15 @@ packages:
uri-js: 4.4.1
dev: true
/ajv@8.12.0:
resolution: {integrity: sha512-sRu1kpcO9yLtYxBKvqfTeh9KzZEwO3STyX1HT+4CaDzC6HpTGYhIhPIzj9XuKU7KYDwnaeh5hcOwjy1QuJzBPA==}
dependencies:
fast-deep-equal: 3.1.3
json-schema-traverse: 1.0.0
require-from-string: 2.0.2
uri-js: 4.4.1
dev: false
/ansi-regex@5.0.1:
resolution: {integrity: sha512-quJQXlTSUGL2LH9SUXo8VwsY4soanhgo6LNSm84E1LBcE8s3O0wpdiRzyR9z/ZZJMlMWv37qOOb9pdJlMUEKFQ==}
engines: {node: '>=8'}
@@ -3193,6 +3268,23 @@ packages:
resolution: {integrity: sha512-NOKm8xhkzAjzFx8B2v5OAHT+u5pRQc2UCa2Vq9jYL/31o2wi9mxBA7LIFs3sV5VSC49z6pEhfbMULvShKj26WA==}
engines: {node: '>= 6'}
/compute-gcd@1.2.1:
resolution: {integrity: sha512-TwMbxBNz0l71+8Sc4czv13h4kEqnchV9igQZBi6QUaz09dnz13juGnnaWWJTRsP3brxOoxeB4SA2WELLw1hCtg==}
dependencies:
validate.io-array: 1.0.6
validate.io-function: 1.0.2
validate.io-integer-array: 1.0.0
dev: false
/compute-lcm@1.1.2:
resolution: {integrity: sha512-OFNPdQAXnQhDSKioX8/XYT6sdUlXwpeMjfd6ApxMJfyZ4GxmLR1xvMERctlYhlHwIiz6CSpBc2+qYKjHGZw4TQ==}
dependencies:
compute-gcd: 1.2.1
validate.io-array: 1.0.6
validate.io-function: 1.0.2
validate.io-integer-array: 1.0.0
dev: false
/concat-map@0.0.1:
resolution: {integrity: sha512-/Srv4dswyQNBfohGpz9o6Yb3Gz3SrUDqBH5rTuhGR7ahtlbYKnVxw2bCFMRljaA7EXHaXZ8wsHdodFvbkhKmqg==}
@@ -3973,7 +4065,6 @@ packages:
/fast-deep-equal@3.1.3:
resolution: {integrity: sha512-f3qQ9oQy9j2AhBe/H9VC91wLmKBCCU/gDOnKNAYG5hswO7BLKj09Hc5HYNz9cGI++xlpDCIgDaitVs03ATR84Q==}
dev: true
/fast-diff@1.3.0:
resolution: {integrity: sha512-VxPP4NqbUjj6MaAOafWeUn2cXWLcCtljklUtZf0Ind4XQ+QPtmA0b18zZy0jIQx+ExRVCR/ZQpBmik5lXshNsw==}
@@ -4612,10 +4703,29 @@ packages:
resolution: {integrity: sha512-4bV5BfR2mqfQTJm+V5tPPdf+ZpuhiIvTuAB5g8kcrXOZpTT/QwwVRWBywX1ozr6lEuPdbHxwaJlm9G6mI2sfSQ==}
dev: true
/json-schema-compare@0.2.2:
resolution: {integrity: sha512-c4WYmDKyJXhs7WWvAWm3uIYnfyWFoIp+JEoX34rctVvEkMYCPGhXtvmFFXiffBbxfZsvQ0RNnV5H7GvDF5HCqQ==}
dependencies:
lodash: 4.17.21
dev: false
/json-schema-merge-allof@0.8.1:
resolution: {integrity: sha512-CTUKmIlPJbsWfzRRnOXz+0MjIqvnleIXwFTzz+t9T86HnYX/Rozria6ZVGLktAU9e+NygNljveP+yxqtQp/Q4w==}
engines: {node: '>=12.0.0'}
dependencies:
compute-lcm: 1.1.2
json-schema-compare: 0.2.2
lodash: 4.17.21
dev: false
/json-schema-traverse@0.4.1:
resolution: {integrity: sha512-xbbCH5dCYU5T8LcEhhuh7HJ88HXuW3qsI3Y0zOZFKfZEHcpWiHU/Jxzk629Brsab/mMiHQti9wMP+845RPe3Vg==}
dev: true
/json-schema-traverse@1.0.0:
resolution: {integrity: sha512-NM8/P9n3XjXhIZn1lLhkFaACTOURQXjWhV4BA/RnOv8xvgqtqpAX9IO4mRQxSx1Rlo4tqzeqb0sOlruaOy3dug==}
dev: false
/json-stable-stringify-without-jsonify@1.0.1:
resolution: {integrity: sha512-Bdboy+l7tA3OGW6FjyFHWkP5LuByj1Tk33Ljyq0axyzdk9//JSi2u3fP1QSmd1KNwq6VOKYGlAu87CisVir6Pw==}
dev: true
@@ -4633,6 +4743,11 @@ packages:
hasBin: true
dev: true
/jsonpointer@5.0.1:
resolution: {integrity: sha512-p/nXbhSEcu3pZRdkW1OfJhpsVtW1gd4Wa1fnQc9YLiTfAjn0312eMKimbdIQzuZl9aa9xUGaRlP9T/CJE/ditQ==}
engines: {node: '>=0.10.0'}
dev: false
/jsx-ast-utils@3.3.5:
resolution: {integrity: sha512-ZZow9HBI5O6EPgSJLUb8n2NKgmVWTwCvHGwFuJlMjvLFqlGG6pjirPhtdsseaLZjSibD8eegzmYpUZwoIlj2cQ==}
engines: {node: '>=4.0'}
@@ -4675,6 +4790,10 @@ packages:
p-locate: 5.0.0
dev: true
/lodash-es@4.17.21:
resolution: {integrity: sha512-mKnC+QJ9pWVzv+C4/U3rRsHapFfHvQFoFB92e52xeyGMcX6/OlIl78je1u8vePzYZSkkogMPJ2yjxxsb89cxyw==}
dev: false
/lodash.merge@4.6.2:
resolution: {integrity: sha512-0KpjqXRVvrYyCsX1swR/XTK0va6VQkQM6MNo7PqW77ByjAhoARA8EfrP1N4+KlKj8YS0ZUCtRT/YUuhyYDujIQ==}
dev: true
@@ -4714,6 +4833,15 @@ packages:
engines: {node: '>=12'}
dev: false
/markdown-to-jsx@7.4.1(react@18.2.0):
resolution: {integrity: sha512-GbrbkTnHp9u6+HqbPRFJbObi369AgJNXi/sGqq5HRsoZW063xR1XDCaConqq+whfEIAlzB1YPnOgsPc7B7bc/A==}
engines: {node: '>= 10'}
peerDependencies:
react: '>= 0.14.0'
dependencies:
react: 18.2.0
dev: false
/math-expression-evaluator@1.4.0:
resolution: {integrity: sha512-4vRUvPyxdO8cWULGTh9dZWL2tZK6LDBvj+OGHBER7poH9Qdt7kXEoj20wiz4lQUbUXQZFjPbe5mVDo9nutizCw==}
dev: false
@@ -5126,7 +5254,6 @@ packages:
/punycode@2.3.1:
resolution: {integrity: sha512-vYt7UD1U9Wg6138shLtLOvdAu+8DsC/ilFtEVHcH+wydcSpNE20AfSOduf6MkRFahL5FY7X1oU7nKVZFtfq8Fg==}
engines: {node: '>=6'}
dev: true
/qs@6.11.2:
resolution: {integrity: sha512-tDNIz22aBzCDxLtVH++VnTfzxlfeK5CbqohpSqpJgj1Wg/cQbStNAz3NuqCs5vV+pjBsK4x4pN9HlVh7rcYRiA==}
@@ -5157,9 +5284,21 @@ packages:
react: 18.2.0
dev: false
/react-icons@5.0.1(react@18.2.0):
resolution: {integrity: sha512-WqLZJ4bLzlhmsvme6iFdgO8gfZP17rfjYEJ2m9RsZjZ+cc4k1hTzknEz63YS1MeT50kVzoa1Nz36f4BEx+Wigw==}
peerDependencies:
react: '*'
dependencies:
react: 18.2.0
dev: false
/react-is@16.13.1:
resolution: {integrity: sha512-24e6ynE2H+OKt4kqsOvNd8kBpV65zoxbA4BVsEOB3ARVWQki/DHzaUoC5KuON/BiccDaCCTZBuOcfZs70kR8bQ==}
/react-is@18.2.0:
resolution: {integrity: sha512-xWGDIW6x921xtzPkhiULtthJHoJvBbF3q26fzloPCK0hsvxtPVelvftw3zjbHWSkR2km9Z+4uxbDDK/6Zw9B8w==}
dev: false
/react-refresh@0.14.0:
resolution: {integrity: sha512-wViHqhAd8OHeLS/IRMJjTSDHF3U9eWi62F/MledQGPdJGDhodXJ9PBLNGr6WWL7qlH12Mt3TyTpbS+hGXMjCzQ==}
engines: {node: '>=0.10.0'}
@@ -5356,6 +5495,11 @@ packages:
set-function-name: 2.0.1
dev: true
/require-from-string@2.0.2:
resolution: {integrity: sha512-Xf0nWe6RseziFMu+Ap9biiUbmplq6S9/p+7w7YXP/JBHhrUDDUhwa+vANyubuqfZWTveU//DYVGsDG7RKL/vEw==}
engines: {node: '>=0.10.0'}
dev: false
/resolve-from@4.0.0:
resolution: {integrity: sha512-pb/MYmXstAkysRFx8piNI1tGFNQIFA3vkE3Gq4EuA1dF6gHp/+vgZqsCGJapvy8N3Q+4o7FwvquPJcnZ7RYy4g==}
engines: {node: '>=4'}
@@ -5834,7 +5978,6 @@ packages:
resolution: {integrity: sha512-7rKUyy33Q1yc98pQ1DAmLtwX109F7TIfWlW1Ydo8Wl1ii1SeHieeh0HHfPeL2fMXK6z0s8ecKs9frCuLJvndBg==}
dependencies:
punycode: 2.3.1
dev: true
/use-callback-ref@1.3.0(@types/react@18.2.39)(react@18.2.0):
resolution: {integrity: sha512-3FT9PRuRdbB9HfXhEq35u4oZkvpJ5kuYbpqhCfmiZyReuRgpnhDlbr2ZEnnuS0RrJAPn6l23xjFg9kpDM+Ms7w==}
@@ -5878,6 +6021,31 @@ packages:
/util-deprecate@1.0.2:
resolution: {integrity: sha512-EPD5q1uXyFxJpCrLnCc1nHnq3gOa6DZBocAIiI2TaSCA7VCJ1UJDMagCzIkXNsUYfD1daK//LTEQ8xiIbrHtcw==}
/validate.io-array@1.0.6:
resolution: {integrity: sha512-DeOy7CnPEziggrOO5CZhVKJw6S3Yi7e9e65R1Nl/RTN1vTQKnzjfvks0/8kQ40FP/dsjRAOd4hxmJ7uLa6vxkg==}
dev: false
/validate.io-function@1.0.2:
resolution: {integrity: sha512-LlFybRJEriSuBnUhQyG5bwglhh50EpTL2ul23MPIuR1odjO7XaMLFV8vHGwp7AZciFxtYOeiSCT5st+XSPONiQ==}
dev: false
/validate.io-integer-array@1.0.0:
resolution: {integrity: sha512-mTrMk/1ytQHtCY0oNO3dztafHYyGU88KL+jRxWuzfOmQb+4qqnWmI+gykvGp8usKZOM0H7keJHEbRaFiYA0VrA==}
dependencies:
validate.io-array: 1.0.6
validate.io-integer: 1.0.5
dev: false
/validate.io-integer@1.0.5:
resolution: {integrity: sha512-22izsYSLojN/P6bppBqhgUDjCkr5RY2jd+N2a3DCAUey8ydvrZ/OkGvFPR7qfOpwR2LC5p4Ngzxz36g5Vgr/hQ==}
dependencies:
validate.io-number: 1.0.3
dev: false
/validate.io-number@1.0.3:
resolution: {integrity: sha512-kRAyotcbNaSYoDnXvb4MHg/0a1egJdLwS6oJ38TJY7aw9n93Fl/3blIXdyYvPOp55CNxywooG/3BcrwNrBpcSg==}
dev: false
/vite-plugin-eslint@1.8.1(eslint@8.56.0)(vite@5.0.6):
resolution: {integrity: sha512-PqdMf3Y2fLO9FsNPmMX+//2BF5SF8nEWspZdgl4kSt7UvHDRHVVfHvxsD7ULYzZrJDGRxR81Nq7TOFgwMnUang==}
peerDependencies:

View File

@@ -14,6 +14,8 @@ import { useNavigate } from 'react-router-dom';
import api, { User } from '@/lib/api';
import { useApiError } from '@/lib/hooks';
import { useMutation } from '@tanstack/react-query';
import hatchet from '@/assets/hatchet_logo.png';
import { useSidebar } from '@/components/sidebar-provider';
interface MainNavProps {
user: User;
@@ -22,6 +24,7 @@ interface MainNavProps {
export default function MainNav({ user }: MainNavProps) {
const navigate = useNavigate();
const { handleApiError } = useApiError({});
const { toggleSidebarOpen } = useSidebar();
const logoutMutation = useMutation({
mutationKey: ['user:update:logout'],
@@ -35,8 +38,11 @@ export default function MainNav({ user }: MainNavProps) {
});
return (
<div className="fixed top-0 w-screen h-16">
<div className="flex h-16 items-center pr-4 pl-7">
<div className="fixed top-0 w-screen h-16 border-b">
<div className="flex h-16 items-center pr-4 pl-4">
<button onClick={() => toggleSidebarOpen()}>
<img src={hatchet} alt="Hatchet" className="h-9 rounded" />
</button>
<div className="ml-auto flex items-center space-x-4">
<DropdownMenu>
<DropdownMenuTrigger asChild>

View File

@@ -0,0 +1,84 @@
import {
PropsWithChildren,
createContext,
useContext,
useEffect,
useState,
} from 'react';
type SidebarState = 'open' | 'closed';
type SidebarProviderProps = PropsWithChildren & {
defaultSidebarOpen?: SidebarState;
};
type SidebarProviderState = {
sidebarOpen: SidebarState;
setSidebarOpen: (open: SidebarState) => void;
toggleSidebarOpen: () => void;
};
const initialState: SidebarProviderState = {
sidebarOpen: 'closed',
setSidebarOpen: () => null,
toggleSidebarOpen: () => null,
};
const SidebarProviderContext =
createContext<SidebarProviderState>(initialState);
export function SidebarProvider({
children,
defaultSidebarOpen = 'closed',
...props
}: SidebarProviderProps) {
const [sidebarOpen, setSidebarOpen] = useState<SidebarState>(
() => defaultSidebarOpen,
);
useEffect(() => {
const handleResize = () => {
if (window.innerWidth >= 768) {
setSidebarOpen('open');
} else {
setSidebarOpen('closed');
}
};
handleResize();
window.addEventListener('resize', handleResize);
return () => {
window.removeEventListener('resize', handleResize);
};
}, []);
return (
<SidebarProviderContext.Provider
{...props}
value={{
sidebarOpen,
setSidebarOpen: (open: SidebarState) => {
setSidebarOpen(open);
},
toggleSidebarOpen: () => {
setSidebarOpen((state) => (state === 'open' ? 'closed' : 'open'));
},
}}
>
{children}
</SidebarProviderContext.Provider>
);
}
export const useSidebar = () => {
const context = useContext(SidebarProviderContext);
if (context === undefined) {
throw new Error('useSidebar must be used within a SidebarProvider');
}
return context;
};

View File

@@ -0,0 +1,109 @@
import { cn } from '@/lib/utils';
import { RJSFSchema } from '@rjsf/utils';
import validator from '@rjsf/validator-ajv8';
import Form from '@rjsf/core';
import { PlayIcon } from '@radix-ui/react-icons';
import { Button } from './button';
type JSONPrimitive = string | number | boolean | null;
type JSONType = { [key: string]: JSONType | JSONPrimitive };
export function JsonForm({
// json,
className,
setInput,
disabled,
onSubmit,
}: {
json: JSONType;
className?: string;
setInput: (input: string) => void;
disabled?: boolean;
onSubmit: () => void;
}) {
// const input = json ? json.input : json || ({} as JSONType);
const schema: RJSFSchema = {
type: 'object',
properties: {
input: {
type: 'object',
properties: {
test: {
type: 'string',
default: 'test',
},
},
additionalProperties: false,
},
parents: {
type: 'object',
properties: {},
additionalProperties: false,
},
overrides: {
type: 'object',
required: ['test', 'test2'],
properties: {
test: {
type: 'string',
default: 'test',
},
test2: {
type: 'integer',
default: 100,
},
},
additionalProperties: false,
},
user_data: {
type: 'object',
properties: {},
additionalProperties: false,
},
triggered_by: {
type: 'string',
default: 'schedule',
},
},
additionalProperties: false,
};
const uiSchema = {
input: {
test: {
'ui:widget': 'textarea',
},
},
};
return (
<div
className={cn(
className,
'w-full h-fit relative rounded-lg overflow-hidden',
)}
>
<Form
schema={schema}
disabled={disabled}
uiSchema={uiSchema}
validator={validator}
onChange={(data) => {
setInput(JSON.stringify(data.formData));
}}
onSubmit={onSubmit}
onError={(e) => {
console.error(e);
}}
>
<Button className="w-fit" disabled={disabled}>
<PlayIcon
className={cn(disabled ? 'rotate-180' : '', 'h-4 w-4 mr-2')}
/>
Play Step
</Button>
</Form>
</div>
);
}

View File

@@ -150,3 +150,89 @@ body {
color: rgb(203 213 225);
}
/* rjsf JSON Schema Form style hack */
.rjsf {
/* Left side is not needed due to fieldset left margin */
@apply mr-2;
}
/* Aka sections */
.rjsf fieldset > legend {
/* Display block doesn't work for some reason. Does not fill parent width which is still a div, not sure why */
@apply mb-3 pb-1 w-full border-b pt-4 text-xl font-semibold;
}
.rjsf fieldset > div {
/* Offset to indicate hierarchy */
@apply ml-3;
}
/* Label + component = group */
.rjsf .form-group {
@apply mb-2;
}
.rjsf label.control-label {
@apply block mb-1 font-bold;
}
.rjsf p.field-description {
@apply mb-1;
}
/* Select component, and maybe other .form-control too */
.rjsf .form-control {
@apply block w-full rounded-md border border-input p-2;
}
/* Input component */
.rjsf input {
@apply flex h-10 w-full rounded-md border border-input bg-transparent px-3 py-2 text-sm ring-offset-background file:border-0 file:bg-transparent file:text-sm file:font-medium placeholder:text-muted-foreground focus-visible:outline-none focus-visible:ring-2 focus-visible:ring-ring focus-visible:ring-offset-2 disabled:cursor-not-allowed disabled:opacity-50;
}
/* Array elements */
.rjsf .array-item {
/* @apply grid grid-cols-12; */
@apply flex flex-row items-end gap-4;
}
.rjsf .array-item .col-xs-9 {
/* @apply col-span-9; */
@apply grow;
}
.rjsf .array-item .col-xs-3 {
/* @apply col-span-3; */
@apply shrink-0;
}
.rjsf .array-item .array-item-toolbox {
/* mb-4 to match .form-group */
@apply mb-4 flex items-center justify-end;
}
/* Icons */
.rjsf .glyphicon {
@apply font-normal not-italic;
}
.rjsf .glyphicon-remove::before {
content: 'Remove';
}
.rjsf .glyphicon-arrow-up::before {
content: 'Up';
}
.rjsf .glyphicon-arrow-down::before {
content: 'Down';
}
.rjsf .glyphicon-plus::before {
content: 'Add';
}
/* Buttons (tends to be icon buttons */
.rjsf .btn {
@apply rounded-md p-2 border mx-1;
}
.rjsf .btn-danger {
@apply border-red-200;
}
.rjsf .btn-add {
@apply border-blue-200;
}
.rjsf button[type='submit'] {
@apply bg-primary text-primary-foreground hover:bg-primary/90;
}
.rjsf textarea{
@apply flex h-20 w-full rounded-md border border-input bg-transparent px-3 py-2 text-sm ring-offset-background file:border-0 file:bg-transparent file:text-sm file:font-medium placeholder:text-muted-foreground focus-visible:outline-none focus-visible:ring-2 focus-visible:ring-ring focus-visible:ring-offset-2 disabled:cursor-not-allowed disabled:opacity-50;
}
/* END rjsf JSON Schema Form style hack */

View File

@@ -59,7 +59,26 @@ export function relativeDate(date?: string | number) {
return capitalize(rtf.format(value, time.unitOfTime));
}
function timeFrom(time: string | number, secondTime?: string | number) {
export function timeBetween(start: string | number, end: string | number) {
console.log('start', start);
console.log('end', end);
const startUnixTime = new Date(start).getTime();
const endUnixTime = new Date(end).getTime();
if (!startUnixTime || !endUnixTime) {
return;
}
// Calculate difference
const difference = endUnixTime - startUnixTime;
console.log('difference', difference);
return formatDuration(difference);
}
export function timeFrom(time: string | number, secondTime?: string | number) {
// Get timestamps
const unixTime = new Date(time).getTime();
if (!unixTime) {
@@ -126,3 +145,24 @@ function timeFrom(time: string | number, secondTime?: string | number) {
// Return time from now data
return tfn;
}
export function formatDuration(duration: number) {
const milliseconds = duration % 1000;
const seconds = Math.round(duration / 1000);
const minutes = Math.round(seconds / 60);
const hours = Math.round(minutes / 60);
if (seconds == 0 && hours == 0 && minutes == 0) {
return `${milliseconds}ms`;
}
if (hours > 0) {
return `${hours}h ${minutes % 60}m`;
}
if (minutes > 0) {
return `${minutes}m ${seconds % 60}s`;
}
return `${seconds}s`;
}

View File

@@ -109,7 +109,7 @@ export default function Authenticated() {
return (
<div className="flex flex-row flex-1 w-full h-full">
<MainNav user={user} />
<div className="pt-12 flex-grow overflow-y-auto overflow-x-hidden">
<div className="pt-16 flex-grow overflow-y-auto overflow-x-hidden">
<Outlet context={ctx} />
</div>
</div>

View File

@@ -9,7 +9,6 @@ import {
ServerStackIcon,
Squares2X2Icon,
} from '@heroicons/react/24/outline';
import hatchet from '@/assets/hatchet_logo.png';
import invariant from 'tiny-invariant';
import {
@@ -40,6 +39,7 @@ import {
} from '@/lib/outlet';
import { useTenantContext } from '@/lib/atoms';
import { Loading, Spinner } from '@/components/ui/loading.tsx';
import { useSidebar } from '@/components/sidebar-provider';
function Main() {
const ctx = useOutletContext<UserContextType & MembershipsContextType>();
@@ -61,7 +61,7 @@ function Main() {
return (
<div className="flex flex-row flex-1 w-full h-full">
<Sidebar memberships={memberships} currTenant={currTenant} />
<div className="pt-12 pl-80 flex-grow overflow-y-auto overflow-x-hidden">
<div className="pt-6 flex-grow overflow-y-auto overflow-x-hidden">
<Outlet context={childCtx} />
</div>
</div>
@@ -76,12 +76,17 @@ interface SidebarProps extends React.HTMLAttributes<HTMLDivElement> {
}
function Sidebar({ className, memberships, currTenant }: SidebarProps) {
const { sidebarOpen } = useSidebar();
if (sidebarOpen === 'closed') {
return null;
}
return (
<div className={cn('h-full border-r w-80 absolute top-0', className)}>
<div className={cn('h-full border-r w-80 top-0', className)}>
<div className="flex flex-col justify-between items-start space-y-4 px-4 py-4 h-full">
<div className="grow">
<div className="py-2">
<img src={hatchet} alt="Hatchet" className="h-9 rounded mb-6" />
<h2 className="mb-2 text-lg font-semibold tracking-tight">
Events
</h2>

View File

@@ -1,53 +0,0 @@
import { StepRun } from '@/lib/api';
import { CodeEditor } from '@/components/ui/code-editor';
import { Tabs, TabsContent, TabsList, TabsTrigger } from '@/components/ui/tabs';
import { Loading } from '@/components/ui/loading';
export function StepInputOutputSection({
stepRun,
onInputChanged,
}: {
stepRun: StepRun;
onInputChanged?: (input: string) => void;
}) {
const input = stepRun.input || '{}';
const output = stepRun.output || '{}';
const isLoading =
stepRun?.status != 'SUCCEEDED' &&
stepRun?.status != 'FAILED' &&
stepRun?.status != 'CANCELLED';
return (
<Tabs defaultValue="input" className="w-full">
<TabsList className="grid w-full grid-cols-2">
<TabsTrigger value="input">Input</TabsTrigger>
<TabsTrigger value="output">Output</TabsTrigger>
</TabsList>
<TabsContent value="input">
<CodeEditor
language="json"
className="my-4"
height="400px"
code={JSON.stringify(JSON.parse(input), null, 2)}
setCode={(code: string | undefined) => {
if (onInputChanged && code) {
onInputChanged(code);
}
}}
/>
</TabsContent>
<TabsContent value="output">
{isLoading && <Loading />}
{!isLoading && (
<CodeEditor
language="json"
className="my-4"
height="400px"
code={JSON.stringify(JSON.parse(output), null, 2)}
/>
)}
</TabsContent>
</Tabs>
);
}

View File

@@ -0,0 +1,53 @@
import { CodeEditor } from '@/components/ui/code-editor';
import { JsonForm } from '@/components/ui/json-form';
import { Tabs, TabsContent, TabsList, TabsTrigger } from '@/components/ui/tabs';
import { VscNote, VscJson } from 'react-icons/vsc';
export interface StepRunOutputProps {
input: string;
setInput: (input: string) => void;
disabled: boolean;
handleOnPlay: () => void;
}
export const StepRunInputs: React.FC<StepRunOutputProps> = ({
input,
disabled,
handleOnPlay,
setInput,
}) => {
return (
<Tabs defaultValue="output" className="w-full">
<TabsList className="grid w-1/3 grid-cols-2">
<TabsTrigger value="form" aria-label="Form Editor">
<VscNote />
</TabsTrigger>
<TabsTrigger value="json" aria-label="JSON Editor">
<VscJson />
</TabsTrigger>
</TabsList>
<TabsContent value="form">
<JsonForm
json={JSON.parse(input)}
setInput={setInput}
onSubmit={handleOnPlay}
disabled={disabled}
/>
</TabsContent>
<TabsContent value="json">
<CodeEditor
language="json"
className="my-4"
height="400px"
code={JSON.stringify(JSON.parse(input), null, 2)}
setCode={(code: string | undefined) => {
if (!code) {
return;
}
setInput(code);
}}
/>
</TabsContent>
</Tabs>
);
};

View File

@@ -1,65 +1,69 @@
import { Card, CardContent } from '@/components/ui/card';
import { Label } from '@/components/ui/label';
import { StepRun, StepRunStatus } from '@/lib/api';
import { cn, relativeDate } from '@/lib/utils';
import { cn, formatDuration, relativeDate } from '@/lib/utils';
import { memo } from 'react';
import { Handle, Position } from 'reactflow';
import { RunIndicator } from '../../components/run-statuses';
// eslint-disable-next-line react/display-name
export default memo(
({
data,
}: {
data: {
stepRun: StepRun;
variant: 'default' | 'input_only' | 'output_only';
onClick: () => void;
};
}) => {
const variant = data.variant;
export interface StepRunNodeProps {
stepRun: StepRun;
variant: 'default' | 'input_only' | 'output_only';
selected: 'none' | 'selected' | 'not_selected';
onClick: () => void;
}
return (
<Card
className={cn(
data.stepRun.status == StepRunStatus.RUNNING ? 'active' : '',
'step-run-card p-3 cursor-pointer bg-[#020817] shadow-[0_1000px_0_0_hsl(0_0%_20%)_inset] transition-colors duration-200',
)}
onClick={data.onClick}
>
{data.stepRun.status == StepRunStatus.RUNNING && (
<span className="spark mask-gradient animate-flip before:animate-rotate absolute inset-0 h-[100%] w-[100%] overflow-hidden rounded-full [mask:linear-gradient(#4EB4D7,_transparent_50%)] before:absolute before:aspect-square before:w-[200%] before:rotate-[-90deg] before:bg-[conic-gradient(from_0deg,transparent_0_340deg,#4EB4D7_360deg)] before:content-[''] before:[inset:0_auto_auto_50%] before:[translate:-50%_-15%]" />
)}
<span className="step-run-backdrop absolute inset-[1px] rounded-full bg-background transition-colors duration-200" />
{(variant == 'default' || variant == 'input_only') && (
<Handle
type="target"
position={Position.Left}
style={{ visibility: 'hidden' }}
isConnectable={false}
/>
)}
<CardContent className="p-0 z-10 bg-background">
<div className="flex flex-row justify-between gap-2 items-center">
<RunIndicator status={data.stepRun.status} />
<div className="font-bold text-sm">
{data.stepRun.step?.readableId || data.stepRun.metadata.id}
</div>
// eslint-disable-next-line react/display-name
export default memo(({ data }: { data: StepRunNodeProps }) => {
const variant = data.variant;
const selected = data.selected;
return (
<Card
className={cn(
data.stepRun.status == StepRunStatus.RUNNING ? 'active' : '',
selected === 'none' || selected === 'selected'
? 'opacity-100'
: 'opacity-20',
selected === 'selected' ? 'border-primary' : '',
'step-run-card p-3 cursor-pointer bg-[#020817] shadow-[0_1000px_0_0_hsl(0_0%_20%)_inset] transition-colors duration-200',
)}
onClick={data.onClick}
>
{data.stepRun.status == StepRunStatus.RUNNING && (
<span className="spark mask-gradient animate-flip before:animate-rotate absolute inset-0 h-[100%] w-[100%] overflow-hidden rounded-full [mask:linear-gradient(#4EB4D7,_transparent_50%)] before:absolute before:aspect-square before:w-[200%] before:rotate-[-90deg] before:bg-[conic-gradient(from_0deg,transparent_0_340deg,#4EB4D7_360deg)] before:content-[''] before:[inset:0_auto_auto_50%] before:[translate:-50%_-15%]" />
)}
<span className="step-run-backdrop absolute inset-[1px] rounded-full bg-background transition-colors duration-200" />
{(variant == 'default' || variant == 'input_only') && (
<Handle
type="target"
position={Position.Left}
style={{ visibility: 'hidden' }}
isConnectable={false}
/>
)}
<CardContent className="p-0 z-10 bg-background">
<div className="flex flex-row justify-between gap-2 items-center">
<RunIndicator status={data.stepRun.status} />
<div className="font-bold text-sm">
{data.stepRun.step?.readableId || data.stepRun.metadata.id}
</div>
{getTiming({ stepRun: data.stepRun })}
</CardContent>
{(variant == 'default' || variant == 'output_only') && (
<Handle
type="source"
position={Position.Right}
style={{ visibility: 'hidden' }}
isConnectable={false}
/>
)}
</Card>
);
},
);
</div>
{getTiming({ stepRun: data.stepRun })}
</CardContent>
{(variant == 'default' || variant == 'output_only') && (
<Handle
type="source"
position={Position.Right}
style={{ visibility: 'hidden' }}
isConnectable={false}
/>
)}
</Card>
);
});
export function getTiming({ stepRun }: { stepRun: StepRun }) {
const start = stepRun.startedAtEpoch;
@@ -94,24 +98,3 @@ export function getTiming({ stepRun }: { stepRun: StepRun }) {
</Label>
);
}
export function formatDuration(duration: number) {
const milliseconds = duration % 1000;
const seconds = Math.round(duration / 1000);
const minutes = Math.round(seconds / 60);
const hours = Math.round(minutes / 60);
if (seconds == 0 && hours == 0 && minutes == 0) {
return `${milliseconds}ms`;
}
if (hours > 0) {
return `${hours}h ${minutes % 60}m`;
}
if (minutes > 0) {
return `${minutes}m ${seconds % 60}s`;
}
return `${seconds}s`;
}

View File

@@ -0,0 +1,45 @@
import { CodeEditor } from '@/components/ui/code-editor';
import { Loading } from '@/components/ui/loading';
import { Tabs, TabsContent, TabsList, TabsTrigger } from '@/components/ui/tabs';
export interface StepRunOutputProps {
output: string;
isLoading: boolean;
errors: string[];
}
export const StepRunOutput: React.FC<StepRunOutputProps> = ({
output,
isLoading,
errors,
}) => {
if (isLoading) {
return <Loading />;
}
return (
<Tabs defaultValue="output" className="w-full">
<TabsList className="grid w-full grid-cols-4">
<TabsTrigger value="output">Output</TabsTrigger>
<TabsTrigger value="logs">Logs</TabsTrigger>
<TabsTrigger value="eval">Eval</TabsTrigger>
<TabsTrigger value="timing">Timing</TabsTrigger>
</TabsList>
<TabsContent value="output">
<CodeEditor
language="json"
className="my-4"
height="400px"
code={JSON.stringify(
errors.length > 0 ? errors : JSON.parse(output),
null,
2,
)}
/>
</TabsContent>
<TabsContent value="logs">Logs Coming Soon!</TabsContent>
<TabsContent value="eval">Evaluations Coming Soon!</TabsContent>
<TabsContent value="timing">Execution Timing Coming Soon!</TabsContent>
</Tabs>
);
};

View File

@@ -1,30 +1,23 @@
import {
Dialog,
DialogContent,
DialogDescription,
DialogHeader,
DialogTitle,
} from '@/components/ui/dialog';
import api, { StepRun, StepRunStatus, queries } from '@/lib/api';
import { useEffect, useState } from 'react';
import { RunStatus } from '../../components/run-statuses';
import { getTiming } from './step-run-node';
import { StepInputOutputSection } from './step-run-input-output';
import { Button } from '@/components/ui/button';
import invariant from 'tiny-invariant';
import { useApiError } from '@/lib/hooks';
import { useMutation, useQuery } from '@tanstack/react-query';
import { ArrowPathIcon } from '@heroicons/react/24/outline';
import { cn } from '@/lib/utils';
import { useOutletContext } from 'react-router-dom';
import { TenantContextType } from '@/lib/outlet';
import { PlayIcon } from '@radix-ui/react-icons';
import { StepRunOutput } from './step-run-output';
import { StepRunInputs } from './step-run-inputs';
export function StepRunPlayground({
stepRun,
setStepRun,
}: {
stepRun: StepRun | null;
setStepRun: (stepRun: StepRun | null) => void;
stepRun: StepRun | undefined;
setStepRun: (stepRun: StepRun | undefined) => void;
}) {
const { tenant } = useOutletContext<TenantContextType>();
invariant(tenant);
@@ -99,66 +92,68 @@ export function StepRunPlayground({
}
}, [getStepRunQuery.data, setStepRun]);
// const input = stepRun?.input || '{}';
const output = stepRun?.output || '{}';
const isLoading =
stepRun?.status != 'SUCCEEDED' &&
stepRun?.status != 'FAILED' &&
stepRun?.status != 'CANCELLED';
const handleOnPlay = () => {
const inputObj = JSON.parse(stepInput);
rerunStepMutation.mutate(inputObj);
};
return (
<Dialog
open={!!stepRun}
onOpenChange={(open) => {
if (!open) {
setStepRun(null);
}
}}
>
<DialogContent className="sm:max-w-[625px] py-12">
<DialogHeader>
<div className="flex flex-row justify-between items-center">
<DialogTitle>
{stepRun?.step?.readableId || stepRun?.metadata.id}
</DialogTitle>
<RunStatus status={stepRun?.status || StepRunStatus.PENDING} />
</div>
{stepRun && getTiming({ stepRun })}
<DialogDescription>
You can change the input to your step and see the output here. By
default, this will trigger all child steps.
</DialogDescription>
</DialogHeader>
<div className="flex flex-row justify-between items-center">
<div className="font-bold">Input</div>
<Button
className="w-fit"
disabled={rerunStepMutation.isPending}
onClick={() => {
const inputObj = JSON.parse(stepInput);
rerunStepMutation.mutate(inputObj);
}}
>
<ArrowPathIcon
className={cn(
rerunStepMutation.isPending ? 'rotate-180' : '',
'h-4 w-4 mr-2',
)}
/>
Rerun Step
</Button>
</div>
{stepRun && (
<StepInputOutputSection
stepRun={stepRun}
onInputChanged={(input: string) => {
setStepInput(input);
}}
/>
)}
{errors.length > 0 && (
<div className="mt-4">
{errors.map((error, index) => (
<div key={index} className="text-red-500 text-sm">
{error}
<div className="">
{stepRun && (
<>
<div className="flex flex-row gap-4 mt-4">
<div className="flex-grow w-1/2">
<StepRunInputs
input={stepInput}
setInput={setStepInput}
disabled={rerunStepMutation.isPending}
handleOnPlay={handleOnPlay}
/>
</div>
<div className="flex-grow flex-col flex gap-4 w-1/2 ">
<div className="flex flex-col sticky top-0">
<div className="flex flex-row justify-between items-center mb-4">
<Button
className="w-fit"
disabled={rerunStepMutation.isPending}
onClick={handleOnPlay}
>
<PlayIcon
className={cn(
rerunStepMutation.isPending ? 'rotate-180' : '',
'h-4 w-4 mr-2',
)}
/>
Play Step
</Button>
<RunStatus
status={
errors.length > 0
? StepRunStatus.FAILED
: stepRun?.status || StepRunStatus.PENDING
}
/>
</div>
<StepRunOutput
output={output}
isLoading={isLoading}
errors={errors}
/>
</div>
))}
</div>
</div>
)}
</DialogContent>
</Dialog>
</>
)}
{errors.length > 0 && <div className="mt-4"></div>}
</div>
);
}

View File

@@ -8,10 +8,9 @@ import ReactFlow, {
Edge,
} from 'reactflow';
import 'reactflow/dist/style.css';
import StepRunNode from './step-run-node';
import StepRunNode, { StepRunNodeProps } from './step-run-node';
import { StepRun, StepRunStatus, WorkflowRun } from '@/lib/api';
import dagre from 'dagre';
import { StepRunPlayground } from './step-run-playground';
import invariant from 'tiny-invariant';
const initBgColor = '#050c1c';
@@ -23,12 +22,15 @@ const nodeTypes = {
const WorkflowRunVisualizer = ({
workflowRun,
selectedStepRun,
setSelectedStepRun,
}: {
workflowRun: WorkflowRun;
selectedStepRun?: StepRun;
setSelectedStepRun: (stepRun: StepRun) => void;
}) => {
const [nodes, setNodes, onNodesChange] = useNodesState([]);
const [edges, setEdges, onEdgesChange] = useEdgesState([]);
const [selectedStepRun, setSelectedStepRun] = useState<StepRun | null>(null);
const [bgColor] = useState(initBgColor);
const dagreGraph = new dagre.graphlib.Graph();
dagreGraph.setDefaultEdgeLabel(() => ({}));
@@ -79,23 +81,32 @@ const WorkflowRunVisualizer = ({
const hasParent =
stepRun.step?.parents?.length && stepRun.step.parents.length > 0;
const data: StepRunNodeProps = {
stepRun: stepRun,
onClick: () => {
console.log('clicked');
console.log(setSelectedStepRun);
setSelectedStepRun(stepRun);
},
variant:
hasParent && hasChild
? 'default'
: hasChild
? 'output_only'
: 'input_only',
selected: !selectedStepRun
? 'none'
: selectedStepRun.stepId === stepRun.stepId
? 'selected'
: 'not_selected',
};
return {
id: stepRun.step.metadata.id,
selectable: false,
type: 'stepNode',
position: { x: 0, y: 0 }, // positioning gets set by dagre later
data: {
stepRun: stepRun,
onClick: () => {
setSelectedStepRun(stepRun);
},
variant:
hasParent && hasChild
? 'default'
: hasChild
? 'output_only'
: 'input_only',
},
data,
};
});
})
@@ -103,7 +114,7 @@ const WorkflowRunVisualizer = ({
setNodes(stepNodes);
setEdges(stepEdges);
}, [workflowRun, setNodes, setEdges]);
}, [workflowRun, setNodes, setEdges, setSelectedStepRun, selectedStepRun]);
const nodeWidth = 230;
const nodeHeight = 70;
@@ -167,10 +178,6 @@ const WorkflowRunVisualizer = ({
className="border-1 border-gray-800 rounded-lg"
maxZoom={1}
/>
<StepRunPlayground
stepRun={selectedStepRun}
setStepRun={setSelectedStepRun}
/>
</>
);
};

View File

@@ -5,12 +5,8 @@ import { useQuery } from '@tanstack/react-query';
import { Link, useOutletContext, useParams } from 'react-router-dom';
import invariant from 'tiny-invariant';
import { Badge } from '@/components/ui/badge';
import { relativeDate } from '@/lib/utils';
import {
AdjustmentsHorizontalIcon,
BoltIcon,
Square3Stack3DIcon,
} from '@heroicons/react/24/outline';
import { relativeDate, timeBetween } from '@/lib/utils';
import { ArrowLeftCircleIcon, BoltIcon } from '@heroicons/react/24/outline';
import { Button } from '@/components/ui/button';
import { DataTable } from '@/components/molecules/data-table/data-table';
import { JobRunColumns, columns } from './components/job-runs-columns';
@@ -22,10 +18,11 @@ import { CodeEditor } from '@/components/ui/code-editor';
import { Loading } from '@/components/ui/loading.tsx';
import { TenantContextType } from '@/lib/outlet';
import WorkflowRunVisualizer from './components/workflow-run-visualizer';
import { StepInputOutputSection } from './components/step-run-input-output';
import { Tabs, TabsContent, TabsList, TabsTrigger } from '@/components/ui/tabs';
import { StepRunPlayground } from './components/step-run-playground';
export default function ExpandedWorkflowRun() {
const [expandedStepRuns, setExpandedStepRuns] = useState<string[]>([]);
const [selectedStepRun, setSelectedStepRun] = useState<StepRun | undefined>();
const { tenant } = useOutletContext<TenantContextType>();
invariant(tenant);
@@ -56,50 +53,59 @@ export default function ExpandedWorkflowRun() {
return (
<div className="flex-grow h-full w-full">
<div className="mx-auto max-w-7xl py-8 px-4 sm:px-6 lg:px-8">
<div className="mx-auto max-w-7xl px-4 sm:px-6 lg:px-8">
<div className="flex flex-row justify-between items-center">
<div className="flex flex-row gap-4 items-center">
<AdjustmentsHorizontalIcon className="h-6 w-6 text-foreground mt-1" />
<h2 className="text-2xl font-bold leading-tight text-foreground">
{run?.displayName || run?.metadata.id}
<h2 className="text-2xl font-bold leading-tight text-foreground flex flex-row items-center">
{run?.workflowVersion?.workflow && (
<Link
to={`/workflows/${run?.workflowVersion?.workflow?.metadata.id}`}
>
<Button
variant="ghost"
className="flex flex-row items-center text-2xl gap-2 text-foreground hover:bg-muted"
>
<ArrowLeftCircleIcon className="h-4 w-4" />
</Button>
</Link>
)}
{run?.workflowVersion?.workflow?.name}-
{run?.displayName?.split('-')[1] || run?.metadata.id}/
{selectedStepRun?.step?.readableId || '*'}
</h2>
<Badge className="text-sm mt-1" variant={'secondary'}>
{/* {workflow.versions && workflow.versions[0].version} */}
{run.status}
</Badge>
</div>
</div>
<div className="flex flex-row justify-start items-center mt-4 gap-2">
{run?.workflowVersion?.workflow && (
<Link
to={`/workflows/${run?.workflowVersion?.workflow?.metadata.id}`}
>
<Button
variant="ghost"
className="flex flex-row items-center gap-2 text-sm text-foreground hover:bg-muted"
>
<Square3Stack3DIcon className="h-4 w-4" />
{run?.workflowVersion?.workflow?.name}
</Button>
</Link>
)}
<div className="text-sm text-muted-foreground">
Created {relativeDate(run?.metadata.createdAt)}
</div>
{run?.startedAt && (
<div className="text-sm text-muted-foreground">
Started {relativeDate(run?.startedAt)}
Started {relativeDate(run.startedAt)}
</div>
)}
{run?.finishedAt && (
{run?.startedAt && run?.finishedAt && (
<div className="text-sm text-muted-foreground">
Finished {relativeDate(run?.startedAt)}
Duration {timeBetween(run.startedAt, run.finishedAt)}
</div>
)}
<Badge className="text-sm mt-1" variant={'secondary'}>
{/* {workflow.versions && workflow.versions[0].version} */}
{run.status}
</Badge>
</div>
<Separator className="my-4" />
<div className="w-full h-[400px]">
<WorkflowRunVisualizer workflowRun={run} />
<div className="w-full h-[150px]">
<WorkflowRunVisualizer
workflowRun={run}
selectedStepRun={selectedStepRun}
setSelectedStepRun={(step) => {
setSelectedStepRun(
step.stepId === selectedStepRun?.stepId ? undefined : step,
);
}}
/>
</div>
<Separator className="my-4" />
{run.triggeredBy?.event && (
@@ -108,70 +114,79 @@ export default function ExpandedWorkflowRun() {
{run.triggeredBy?.cronSchedule && (
<TriggeringCronSection cron={run.triggeredBy.cronSchedule} />
)}
<h3 className="text-xl font-bold leading-tight text-foreground mb-4">
Job Runs
</h3>
<DataTable
columns={columns}
data={
run.jobRuns
?.map((jobRun): JobRunColumns[] => {
return [
{
kind: 'job',
isExpandable: false,
getRow: () => {
return getJobRunRow({ jobRun, columns });
},
...jobRun,
},
...(jobRun.stepRuns
?.map((stepRun): JobRunColumns[] => {
const res: JobRunColumns[] = [
{
kind: 'step',
isExpandable: true,
onClick: () => {
if (
expandedStepRuns.includes(stepRun.metadata.id)
) {
setExpandedStepRuns(
expandedStepRuns.filter(
(id) => id != stepRun.metadata.id,
),
);
} else {
setExpandedStepRuns([
...expandedStepRuns,
stepRun.metadata.id,
]);
}
},
...stepRun,
<Tabs defaultValue="playground" className="w-full">
<TabsList className="grid w-full grid-cols-2">
<TabsTrigger value="playground">Playground</TabsTrigger>
<TabsTrigger value="details">Run Details</TabsTrigger>
</TabsList>
<TabsContent value="playground">
{!selectedStepRun ? (
'Select a step to play with'
) : (
<StepRunPlayground
stepRun={selectedStepRun}
setStepRun={setSelectedStepRun}
/>
)}
</TabsContent>
<TabsContent value="details">
<DataTable
columns={columns}
data={
run.jobRuns
?.map((jobRun): JobRunColumns[] => {
return [
{
kind: 'job',
isExpandable: false,
getRow: () => {
return getJobRunRow({ jobRun, columns });
},
];
...jobRun,
},
...(jobRun.stepRuns
?.map((stepRun): JobRunColumns[] => {
const res: JobRunColumns[] = [
{
kind: 'step',
isExpandable: true,
onClick: () => {
setSelectedStepRun(
stepRun.stepId === selectedStepRun?.stepId
? undefined
: stepRun,
);
},
...stepRun,
},
];
if (expandedStepRuns.includes(stepRun.metadata.id)) {
res.push({
kind: 'step',
isExpandable: false,
if (selectedStepRun?.stepId == stepRun.stepId) {
res.push({
kind: 'step',
isExpandable: false,
getRow: () => {
return getExpandedStepRunRow({ stepRun, columns });
},
...stepRun,
});
}
getRow: () => {
return getExpandedStepRunRow({
stepRun,
columns,
});
},
...stepRun,
});
}
return res;
})
.flat() || []),
];
})
.flat() || []
}
filters={[]}
/>
return res;
})
.flat() || []),
];
})
.flat() || []
}
filters={[]}
/>
</TabsContent>
</Tabs>
</div>
</div>
);
@@ -216,7 +231,6 @@ function getExpandedStepRunRow({
<TableCell colSpan={columns.length} className="px-8 py-4">
<StepStatusSection stepRun={stepRun} />
<StepConfigurationSection stepRun={stepRun} />
<StepInputOutputSection stepRun={stepRun} />
</TableCell>
</TableRow>
);

View File

@@ -1,3 +1,4 @@
import { SidebarProvider } from '@/components/sidebar-provider';
import { ThemeProvider } from '@/components/theme-provider';
import { Toaster } from '@/components/ui/toaster';
import { Outlet } from 'react-router-dom';
@@ -5,10 +6,12 @@ import { Outlet } from 'react-router-dom';
function Root() {
return (
<ThemeProvider defaultTheme="dark" storageKey="vite-ui-theme">
<div className="fixed h-full w-full">
<Toaster />
<Outlet />
</div>
<SidebarProvider>
<div className="fixed h-full w-full">
<Toaster />
<Outlet />
</div>
</SidebarProvider>
</ThemeProvider>
);
}

View File

@@ -2,11 +2,7 @@
"compilerOptions": {
"target": "ES2020",
"useDefineForClassFields": true,
"lib": [
"ES2020",
"DOM",
"DOM.Iterable"
],
"lib": ["ES2020", "DOM", "DOM.Iterable"],
"module": "ESNext",
"skipLibCheck": true,
"moduleResolution": "bundler",
@@ -17,9 +13,7 @@
"jsx": "react-jsx",
"baseUrl": ".",
"paths": {
"@/*": [
"./src/*"
]
"@/*": ["./src/*"]
},
"strict": true,
"noUnusedLocals": true,

View File

@@ -165,17 +165,19 @@ var retrier = func(l *zerolog.Logger, f func() error) error {
return nil
}
func (s *stepRunRepository) UpdateStepRun(tenantId, stepRunId string, opts *repository.UpdateStepRunOpts) (*db.StepRunModel, error) {
func (s *stepRunRepository) UpdateStepRun(tenantId, stepRunId string, opts *repository.UpdateStepRunOpts) (*db.StepRunModel, *repository.StepRunUpdateInfo, error) {
if err := s.v.Validate(opts); err != nil {
return nil, err
return nil, nil, err
}
updateParams, updateJobRunLookupDataParams, resolveJobRunParams, resolveLaterStepRunsParams, err := getUpdateParams(tenantId, stepRunId, opts)
if err != nil {
return nil, err
return nil, nil, err
}
var updateInfo *repository.StepRunUpdateInfo
err = retrier(s.l, func() error {
tx, err := s.pool.Begin(context.Background())
@@ -185,7 +187,7 @@ func (s *stepRunRepository) UpdateStepRun(tenantId, stepRunId string, opts *repo
defer deferRollback(context.Background(), s.l, tx.Rollback)
err = s.updateStepRun(tx, tenantId, updateParams, updateJobRunLookupDataParams, resolveJobRunParams, resolveLaterStepRunsParams)
updateInfo, err = s.updateStepRun(tx, tenantId, updateParams, updateJobRunLookupDataParams, resolveJobRunParams, resolveLaterStepRunsParams)
if err != nil {
return err
@@ -197,10 +199,10 @@ func (s *stepRunRepository) UpdateStepRun(tenantId, stepRunId string, opts *repo
})
if err != nil {
return nil, err
return nil, nil, err
}
return s.client.StepRun.FindUnique(
stepRun, err := s.client.StepRun.FindUnique(
db.StepRun.ID.Equals(stepRunId),
).With(
db.StepRun.Children.Fetch(),
@@ -215,6 +217,12 @@ func (s *stepRunRepository) UpdateStepRun(tenantId, stepRunId string, opts *repo
),
db.StepRun.Ticker.Fetch(),
).Exec(context.Background())
if err != nil {
return nil, nil, err
}
return stepRun, updateInfo, nil
}
func (s *stepRunRepository) UpdateStepRunOverridesData(tenantId, stepRunId string, opts *repository.UpdateStepRunOverridesDataOpts) ([]byte, error) {
@@ -332,7 +340,7 @@ func (s *stepRunRepository) QueueStepRun(tenantId, stepRunId string, opts *repos
return nil, repository.ErrStepRunIsNotPending
}
err = s.updateStepRun(tx, tenantId, updateParams, updateJobRunLookupDataParams, resolveJobRunParams, resolveLaterStepRunsParams)
_, err = s.updateStepRun(tx, tenantId, updateParams, updateJobRunLookupDataParams, resolveJobRunParams, resolveLaterStepRunsParams)
if err != nil {
return nil, err
@@ -458,23 +466,23 @@ func (s *stepRunRepository) updateStepRun(
updateJobRunLookupDataParams *dbsqlc.UpdateJobRunLookupDataWithStepRunParams,
resolveJobRunParams dbsqlc.ResolveJobRunStatusParams,
resolveLaterStepRunsParams dbsqlc.ResolveLaterStepRunsParams,
) error {
) (*repository.StepRunUpdateInfo, error) {
_, err := s.queries.UpdateStepRun(context.Background(), tx, updateParams)
if err != nil {
return fmt.Errorf("could not update step run: %w", err)
return nil, fmt.Errorf("could not update step run: %w", err)
}
_, err = s.queries.ResolveLaterStepRuns(context.Background(), tx, resolveLaterStepRunsParams)
if err != nil {
return fmt.Errorf("could not resolve later step runs: %w", err)
return nil, fmt.Errorf("could not resolve later step runs: %w", err)
}
jobRun, err := s.queries.ResolveJobRunStatus(context.Background(), tx, resolveJobRunParams)
if err != nil {
return fmt.Errorf("could not resolve job run status: %w", err)
return nil, fmt.Errorf("could not resolve job run status: %w", err)
}
resolveWorkflowRunParams := dbsqlc.ResolveWorkflowRunStatusParams{
@@ -482,10 +490,10 @@ func (s *stepRunRepository) updateStepRun(
Tenantid: sqlchelpers.UUIDFromStr(tenantId),
}
_, err = s.queries.ResolveWorkflowRunStatus(context.Background(), tx, resolveWorkflowRunParams)
workflowRun, err := s.queries.ResolveWorkflowRunStatus(context.Background(), tx, resolveWorkflowRunParams)
if err != nil {
return fmt.Errorf("could not resolve workflow run status: %w", err)
return nil, fmt.Errorf("could not resolve workflow run status: %w", err)
}
// update the job run lookup data if not nil
@@ -493,11 +501,24 @@ func (s *stepRunRepository) updateStepRun(
err = s.queries.UpdateJobRunLookupDataWithStepRun(context.Background(), tx, *updateJobRunLookupDataParams)
if err != nil {
return fmt.Errorf("could not update job run lookup data: %w", err)
return nil, fmt.Errorf("could not update job run lookup data: %w", err)
}
}
return nil
return &repository.StepRunUpdateInfo{
JobRunFinalState: isFinalJobRunStatus(jobRun.Status),
WorkflowRunFinalState: isFinalWorkflowRunStatus(workflowRun.Status),
WorkflowRunId: sqlchelpers.UUIDToStr(workflowRun.ID),
WorkflowRunStatus: string(workflowRun.Status),
}, nil
}
func isFinalJobRunStatus(status dbsqlc.JobRunStatus) bool {
return status != dbsqlc.JobRunStatusPENDING && status != dbsqlc.JobRunStatusRUNNING
}
func isFinalWorkflowRunStatus(status dbsqlc.WorkflowRunStatus) bool {
return status != dbsqlc.WorkflowRunStatusPENDING && status != dbsqlc.WorkflowRunStatusRUNNING && status != dbsqlc.WorkflowRunStatusQUEUED
}
func (s *stepRunRepository) GetStepRunById(tenantId, stepRunId string) (*db.StepRunModel, error) {
@@ -514,6 +535,7 @@ func (s *stepRunRepository) GetStepRunById(tenantId, stepRunId string) (*db.Step
),
db.StepRun.JobRun.Fetch().With(
db.JobRun.LookupData.Fetch(),
db.JobRun.WorkflowRun.Fetch(),
),
db.StepRun.Ticker.Fetch(),
).Exec(context.Background())

View File

@@ -63,6 +63,13 @@ func StepRunStatusPtr(status db.StepRunStatus) *db.StepRunStatus {
var ErrStepRunIsNotPending = fmt.Errorf("step run is not pending")
type StepRunUpdateInfo struct {
JobRunFinalState bool
WorkflowRunFinalState bool
WorkflowRunId string
WorkflowRunStatus string
}
type StepRunRepository interface {
// ListAllStepRuns returns a list of all step runs which match the given options.
ListAllStepRuns(opts *ListAllStepRunsOpts) ([]db.StepRunModel, error)
@@ -70,7 +77,7 @@ type StepRunRepository interface {
// ListStepRuns returns a list of step runs for a tenant which match the given options.
ListStepRuns(tenantId string, opts *ListStepRunsOpts) ([]db.StepRunModel, error)
UpdateStepRun(tenantId, stepRunId string, opts *UpdateStepRunOpts) (*db.StepRunModel, error)
UpdateStepRun(tenantId, stepRunId string, opts *UpdateStepRunOpts) (*db.StepRunModel, *StepRunUpdateInfo, error)
// UpdateStepRunOverridesData updates the overrides data field in the input for a step run. This returns the input
// bytes.

View File

@@ -287,12 +287,14 @@ func (ec *JobsControllerImpl) handleJobRunTimedOut(ctx context.Context, task *ta
now := time.Now().UTC()
// cancel current step run
stepRun, err := ec.repo.StepRun().UpdateStepRun(metadata.TenantId, currStepRun.ID, &repository.UpdateStepRunOpts{
stepRun, updateInfo, err := ec.repo.StepRun().UpdateStepRun(metadata.TenantId, currStepRun.ID, &repository.UpdateStepRunOpts{
CancelledAt: &now,
CancelledReason: repository.StringPtr("JOB_RUN_TIMED_OUT"),
Status: repository.StepRunStatusPtr(db.StepRunStatusCancelled),
})
defer ec.handleStepRunUpdateInfo(stepRun, updateInfo)
if err != nil {
return fmt.Errorf("could not update step run: %w", err)
}
@@ -421,12 +423,14 @@ func (ec *JobsControllerImpl) handleStepRunRequeue(ctx context.Context, task *ta
// if the current time is after the scheduleTimeoutAt, then mark this as timed out
if scheduleTimeoutAt, ok := stepRunCp.ScheduleTimeoutAt(); ok && scheduleTimeoutAt.Before(now) {
_, err = ec.repo.StepRun().UpdateStepRun(payload.TenantId, stepRunCp.ID, &repository.UpdateStepRunOpts{
stepRun, updateInfo, err := ec.repo.StepRun().UpdateStepRun(payload.TenantId, stepRunCp.ID, &repository.UpdateStepRunOpts{
CancelledAt: &now,
CancelledReason: repository.StringPtr("SCHEDULING_TIMED_OUT"),
Status: repository.StepRunStatusPtr(db.StepRunStatusCancelled),
})
defer ec.handleStepRunUpdateInfo(stepRun, updateInfo)
if err != nil {
return fmt.Errorf("could not update step run %s: %w", stepRunCp.ID, err)
}
@@ -436,7 +440,7 @@ func (ec *JobsControllerImpl) handleStepRunRequeue(ctx context.Context, task *ta
requeueAfter := time.Now().UTC().Add(time.Second * 5)
stepRun, err := ec.repo.StepRun().UpdateStepRun(payload.TenantId, stepRunCp.ID, &repository.UpdateStepRunOpts{
stepRun, _, err := ec.repo.StepRun().UpdateStepRun(payload.TenantId, stepRunCp.ID, &repository.UpdateStepRunOpts{
RequeueAfter: &requeueAfter,
})
@@ -680,11 +684,13 @@ func (ec *JobsControllerImpl) handleStepRunStarted(ctx context.Context, task *ta
return fmt.Errorf("could not parse started at: %w", err)
}
_, err = ec.repo.StepRun().UpdateStepRun(metadata.TenantId, payload.StepRunId, &repository.UpdateStepRunOpts{
stepRun, updateInfo, err := ec.repo.StepRun().UpdateStepRun(metadata.TenantId, payload.StepRunId, &repository.UpdateStepRunOpts{
StartedAt: &startedAt,
Status: repository.StepRunStatusPtr(db.StepRunStatusRunning),
})
defer ec.handleStepRunUpdateInfo(stepRun, updateInfo)
return err
}
@@ -726,12 +732,14 @@ func (ec *JobsControllerImpl) handleStepRunFinished(ctx context.Context, task *t
stepOutput = []byte(stepOutputStr)
}
stepRun, err := ec.repo.StepRun().UpdateStepRun(metadata.TenantId, payload.StepRunId, &repository.UpdateStepRunOpts{
stepRun, updateInfo, err := ec.repo.StepRun().UpdateStepRun(metadata.TenantId, payload.StepRunId, &repository.UpdateStepRunOpts{
FinishedAt: &finishedAt,
Status: repository.StepRunStatusPtr(db.StepRunStatusSucceeded),
Output: stepOutput,
})
defer ec.handleStepRunUpdateInfo(stepRun, updateInfo)
if err != nil {
return fmt.Errorf("could not update step run: %w", err)
}
@@ -804,12 +812,14 @@ func (ec *JobsControllerImpl) handleStepRunFailed(ctx context.Context, task *tas
return fmt.Errorf("could not parse started at: %w", err)
}
stepRun, err := ec.repo.StepRun().UpdateStepRun(metadata.TenantId, payload.StepRunId, &repository.UpdateStepRunOpts{
stepRun, updateInfo, err := ec.repo.StepRun().UpdateStepRun(metadata.TenantId, payload.StepRunId, &repository.UpdateStepRunOpts{
FinishedAt: &failedAt,
Error: &payload.Error,
Status: repository.StepRunStatusPtr(db.StepRunStatusFailed),
})
defer ec.handleStepRunUpdateInfo(stepRun, updateInfo)
if err != nil {
return fmt.Errorf("could not update step run: %w", err)
}
@@ -885,12 +895,14 @@ func (ec *JobsControllerImpl) cancelStepRun(ctx context.Context, tenantId, stepR
// cancel current step run
now := time.Now().UTC()
stepRun, err := ec.repo.StepRun().UpdateStepRun(tenantId, stepRunId, &repository.UpdateStepRunOpts{
stepRun, updateInfo, err := ec.repo.StepRun().UpdateStepRun(tenantId, stepRunId, &repository.UpdateStepRunOpts{
CancelledAt: &now,
CancelledReason: repository.StringPtr(reason),
Status: repository.StepRunStatusPtr(db.StepRunStatusCancelled),
})
defer ec.handleStepRunUpdateInfo(stepRun, updateInfo)
if err != nil {
return fmt.Errorf("could not update step run: %w", err)
}
@@ -923,6 +935,20 @@ func (ec *JobsControllerImpl) cancelStepRun(ctx context.Context, tenantId, stepR
return nil
}
func (ec *JobsControllerImpl) handleStepRunUpdateInfo(stepRun *db.StepRunModel, updateInfo *repository.StepRunUpdateInfo) {
if updateInfo.WorkflowRunFinalState {
err := ec.tq.AddTask(
context.Background(),
taskqueue.WORKFLOW_PROCESSING_QUEUE,
tasktypes.WorkflowRunFinishedToTask(stepRun.TenantID, updateInfo.WorkflowRunId, updateInfo.WorkflowRunStatus),
)
if err != nil {
ec.l.Error().Err(err).Msg("could not add workflow run finished task to task queue")
}
}
}
func (ec *JobsControllerImpl) handleTickerRemoved(ctx context.Context, task *taskqueue.Task) error {
ctx, span := telemetry.NewSpan(ctx, "handle-ticker-removed")
defer span.End()

View File

@@ -1041,6 +1041,9 @@ type WorkflowEvent struct {
EventTimestamp *timestamppb.Timestamp `protobuf:"bytes,5,opt,name=eventTimestamp,proto3" json:"eventTimestamp,omitempty"`
// the event payload
EventPayload string `protobuf:"bytes,6,opt,name=eventPayload,proto3" json:"eventPayload,omitempty"`
// whether this is the last event for the workflow run - server
// will hang up the connection but clients might want to case
Hangup bool `protobuf:"varint,7,opt,name=hangup,proto3" json:"hangup,omitempty"`
}
func (x *WorkflowEvent) Reset() {
@@ -1117,6 +1120,7 @@ func (x *WorkflowEvent) GetEventPayload() string {
return ""
}
type OverridesData struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
@@ -1327,7 +1331,7 @@ var file_dispatcher_proto_rawDesc = []byte{
0x65, 0x6e, 0x74, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x24, 0x0a, 0x0d, 0x77,
0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x52, 0x75, 0x6e, 0x49, 0x64, 0x18, 0x01, 0x20, 0x01,
0x28, 0x09, 0x52, 0x0d, 0x77, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x52, 0x75, 0x6e, 0x49,
0x64, 0x22, 0xa2, 0x02, 0x0a, 0x0d, 0x57, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x45, 0x76,
0x64, 0x22, 0xba, 0x02, 0x0a, 0x0d, 0x57, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x45, 0x76,
0x65, 0x6e, 0x74, 0x12, 0x24, 0x0a, 0x0d, 0x77, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x52,
0x75, 0x6e, 0x49, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x77, 0x6f, 0x72, 0x6b,
0x66, 0x6c, 0x6f, 0x77, 0x52, 0x75, 0x6e, 0x49, 0x64, 0x12, 0x31, 0x0a, 0x0c, 0x72, 0x65, 0x73,

View File

@@ -290,7 +290,8 @@ func (s *DispatcherImpl) SubscribeToWorkflowEvents(request *contracts.SubscribeT
return err
}
ctx := stream.Context()
ctx, cancel := context.WithCancel(stream.Context())
defer cancel()
// subscribe to the task queue for the tenant
taskChan, err := s.tq.Subscribe(ctx, q)
@@ -321,6 +322,10 @@ func (s *DispatcherImpl) SubscribeToWorkflowEvents(request *contracts.SubscribeT
if err != nil {
s.l.Error().Err(err).Msgf("could not send workflow event to client")
}
if e.Hangup {
cancel()
}
}(task)
}
}
@@ -605,63 +610,75 @@ func (s *DispatcherImpl) handleGetGroupKeyRunFailed(ctx context.Context, request
}
func (s *DispatcherImpl) tenantTaskToWorkflowEvent(task *taskqueue.Task, tenantId, workflowRunId string) (*contracts.WorkflowEvent, error) {
// TODO: eventually process workflows as well, this is just steps
workflowEvent := &contracts.WorkflowEvent{
ResourceType: contracts.ResourceType_RESOURCE_TYPE_STEP_RUN,
}
workflowEvent := &contracts.WorkflowEvent{}
var stepRunId string
switch task.ID {
case "step-run-started":
stepRunId = task.Payload["step_run_id"].(string)
workflowEvent.ResourceType = contracts.ResourceType_RESOURCE_TYPE_STEP_RUN
workflowEvent.ResourceId = stepRunId
workflowEvent.EventType = contracts.ResourceEventType_RESOURCE_EVENT_TYPE_STARTED
case "step-run-finished":
stepRunId = task.Payload["step_run_id"].(string)
workflowEvent.ResourceType = contracts.ResourceType_RESOURCE_TYPE_STEP_RUN
workflowEvent.ResourceId = stepRunId
workflowEvent.EventType = contracts.ResourceEventType_RESOURCE_EVENT_TYPE_COMPLETED
workflowEvent.EventPayload = task.Payload["step_output_data"].(string)
case "step-run-failed":
stepRunId = task.Payload["step_run_id"].(string)
workflowEvent.ResourceType = contracts.ResourceType_RESOURCE_TYPE_STEP_RUN
workflowEvent.ResourceId = stepRunId
workflowEvent.EventType = contracts.ResourceEventType_RESOURCE_EVENT_TYPE_FAILED
workflowEvent.EventPayload = task.Payload["error"].(string)
case "step-run-cancelled":
stepRunId = task.Payload["step_run_id"].(string)
workflowEvent.ResourceType = contracts.ResourceType_RESOURCE_TYPE_STEP_RUN
workflowEvent.ResourceId = stepRunId
workflowEvent.EventType = contracts.ResourceEventType_RESOURCE_EVENT_TYPE_CANCELLED
case "step-run-timed-out":
stepRunId = task.Payload["step_run_id"].(string)
workflowEvent.ResourceType = contracts.ResourceType_RESOURCE_TYPE_STEP_RUN
workflowEvent.ResourceId = stepRunId
workflowEvent.EventType = contracts.ResourceEventType_RESOURCE_EVENT_TYPE_TIMED_OUT
case "workflow-run-finished":
workflowRunId := task.Payload["workflow_run_id"].(string)
workflowEvent.ResourceType = contracts.ResourceType_RESOURCE_TYPE_WORKFLOW_RUN
workflowEvent.ResourceId = workflowRunId
workflowEvent.EventType = contracts.ResourceEventType_RESOURCE_EVENT_TYPE_COMPLETED
workflowEvent.Hangup = true
}
if stepRunId == "" {
// expected because not all tasks have step run ids
return nil, nil
if workflowEvent.ResourceType == contracts.ResourceType_RESOURCE_TYPE_STEP_RUN {
// determine if this step run matches the workflow run id
stepRun, err := s.repo.StepRun().GetStepRunById(tenantId, stepRunId)
if err != nil {
return nil, err
}
if stepRun.JobRun().WorkflowRunID != workflowRunId {
// this is an expected error, so we don't return it
return nil, nil
}
// attempt to unquote the payload
unquoted, err := strconv.Unquote(workflowEvent.EventPayload)
if err != nil {
unquoted = workflowEvent.EventPayload
}
workflowEvent.EventPayload = unquoted
} else if workflowEvent.ResourceType == contracts.ResourceType_RESOURCE_TYPE_WORKFLOW_RUN {
if workflowEvent.ResourceId != workflowRunId {
return nil, nil
}
workflowEvent.Hangup = true
}
// determine if this step run matches the workflow run id
stepRun, err := s.repo.StepRun().GetStepRunById(tenantId, stepRunId)
if err != nil {
return nil, err
}
if stepRun.JobRun().WorkflowRunID != workflowRunId {
// this is an expected error, so we don't return it
return nil, nil
}
// attempt to unquote the payload
unquoted, err := strconv.Unquote(workflowEvent.EventPayload)
if err != nil {
unquoted = workflowEvent.EventPayload
}
workflowEvent.EventPayload = unquoted
return workflowEvent, nil
}

View File

@@ -15,6 +15,32 @@ type WorkflowRunQueuedTaskMetadata struct {
WorkflowVersionId string `json:"workflow_version_id" validate:"required,uuid"`
}
type WorkflowRunFinishedTask struct {
WorkflowRunId string `json:"workflow_run_id" validate:"required,uuid"`
Status string `json:"status" validate:"required"`
}
type WorkflowRunFinishedTaskMetadata struct {
TenantId string `json:"tenant_id" validate:"required,uuid"`
}
func WorkflowRunFinishedToTask(tenantId, workflowRunId, status string) *taskqueue.Task {
payload, _ := datautils.ToJSONMap(WorkflowRunFinishedTask{
WorkflowRunId: workflowRunId,
Status: status,
})
metadata, _ := datautils.ToJSONMap(WorkflowRunFinishedTaskMetadata{
TenantId: tenantId,
})
return &taskqueue.Task{
ID: "workflow-run-finished",
Payload: payload,
Metadata: metadata,
}
}
func WorkflowRunQueuedToTask(workflowRun *db.WorkflowRunModel) *taskqueue.Task {
payload, _ := datautils.ToJSONMap(WorkflowRunQueuedTaskPayload{
WorkflowRunId: workflowRun.ID,

View File

@@ -2,15 +2,31 @@ from hatchet_sdk import Hatchet
from dotenv import load_dotenv
import json
class StepRunEventType:
STEP_RUN_EVENT_TYPE_STARTED = 'STEP_RUN_EVENT_TYPE_STARTED'
STEP_RUN_EVENT_TYPE_COMPLETED = 'STEP_RUN_EVENT_TYPE_COMPLETED'
STEP_RUN_EVENT_TYPE_FAILED = 'STEP_RUN_EVENT_TYPE_FAILED'
STEP_RUN_EVENT_TYPE_CANCELLED = 'STEP_RUN_EVENT_TYPE_CANCELLED'
STEP_RUN_EVENT_TYPE_TIMED_OUT = 'STEP_RUN_EVENT_TYPE_TIMED_OUT'
load_dotenv()
client = Hatchet().client
hatchet = Hatchet().client
workflowRunId = client.admin.run_workflow("ManualTriggerWorkflow", {
workflowRunId = hatchet.admin.run_workflow("ManualTriggerWorkflow", {
"test": "test"
})
for event in client.listener.generator(workflowRunId):
print('EVENT: ' + event.type + ' ' + json.dumps(event.payload))
listener = hatchet.listener.stream(workflowRunId)
# TODO - need to hangup the listener if the workflow is completed
for event in listener:
# TODO FIXME step run is not exported easily from the hatchet_sdk and event type and event.step is not defined on
# the event object, so fix this before merging...
# if event.step == 'step2' and event.type == StepRunEventType.STEP_RUN_EVENT_TYPE_COMPLETED:
# listener.abort()
if event.type == StepRunEventType.STEP_RUN_EVENT_TYPE_COMPLETED:
print('Step completed: ' + json.dumps(event.payload))
print('Workflow finished')

View File

@@ -6,7 +6,7 @@ load_dotenv()
hatchet = Hatchet(debug=True)
@hatchet.workflow(on_events=["user:create"])
@hatchet.workflow(on_events=["man:create"])
class ManualTriggerWorkflow:
@hatchet.step()
def step1(self, context):
@@ -22,7 +22,7 @@ class ManualTriggerWorkflow:
workflow = ManualTriggerWorkflow()
worker = hatchet.worker('test-worker', max_threads=4)
worker = hatchet.worker('manual-worker', max_threads=4)
worker.register_workflow(workflow)
worker.start()

View File

@@ -1,4 +1,5 @@
from .hatchet import Hatchet
from .clients.listener import StepRunEventType
from .worker import Worker
from .client import new_client
from .context import Context

View File

@@ -2,7 +2,7 @@ from typing import List
import grpc
from ..dispatcher_pb2_grpc import DispatcherStub
from ..dispatcher_pb2 import SubscribeToWorkflowEventsRequest, ResourceEventType
from ..dispatcher_pb2 import SubscribeToWorkflowEventsRequest, ResourceEventType, WorkflowEvent, RESOURCE_TYPE_STEP_RUN, RESOURCE_TYPE_WORKFLOW_RUN
from ..loader import ClientConfig
from ..metadata import get_metadata
import json
@@ -20,8 +20,14 @@ class StepRunEventType:
STEP_RUN_EVENT_TYPE_CANCELLED = 'STEP_RUN_EVENT_TYPE_CANCELLED'
STEP_RUN_EVENT_TYPE_TIMED_OUT = 'STEP_RUN_EVENT_TYPE_TIMED_OUT'
class WorkflowRunEventType:
WORKFLOW_RUN_EVENT_TYPE_STARTED = 'WORKFLOW_RUN_EVENT_TYPE_STARTED'
WORKFLOW_RUN_EVENT_TYPE_COMPLETED = 'WORKFLOW_RUN_EVENT_TYPE_COMPLETED'
WORKFLOW_RUN_EVENT_TYPE_FAILED = 'WORKFLOW_RUN_EVENT_TYPE_FAILED'
WORKFLOW_RUN_EVENT_TYPE_CANCELLED = 'WORKFLOW_RUN_EVENT_TYPE_CANCELLED'
WORKFLOW_RUN_EVENT_TYPE_TIMED_OUT = 'WORKFLOW_RUN_EVENT_TYPE_TIMED_OUT'
event_type_mapping = {
step_run_event_type_mapping = {
ResourceEventType.RESOURCE_EVENT_TYPE_STARTED: StepRunEventType.STEP_RUN_EVENT_TYPE_STARTED,
ResourceEventType.RESOURCE_EVENT_TYPE_COMPLETED: StepRunEventType.STEP_RUN_EVENT_TYPE_COMPLETED,
ResourceEventType.RESOURCE_EVENT_TYPE_FAILED: StepRunEventType.STEP_RUN_EVENT_TYPE_FAILED,
@@ -29,6 +35,13 @@ event_type_mapping = {
ResourceEventType.RESOURCE_EVENT_TYPE_TIMED_OUT: StepRunEventType.STEP_RUN_EVENT_TYPE_TIMED_OUT,
}
workflow_run_event_type_mapping = {
ResourceEventType.RESOURCE_EVENT_TYPE_STARTED: WorkflowRunEventType.WORKFLOW_RUN_EVENT_TYPE_STARTED,
ResourceEventType.RESOURCE_EVENT_TYPE_COMPLETED: WorkflowRunEventType.WORKFLOW_RUN_EVENT_TYPE_COMPLETED,
ResourceEventType.RESOURCE_EVENT_TYPE_FAILED: WorkflowRunEventType.WORKFLOW_RUN_EVENT_TYPE_FAILED,
ResourceEventType.RESOURCE_EVENT_TYPE_CANCELLED: WorkflowRunEventType.WORKFLOW_RUN_EVENT_TYPE_CANCELLED,
ResourceEventType.RESOURCE_EVENT_TYPE_TIMED_OUT: WorkflowRunEventType.WORKFLOW_RUN_EVENT_TYPE_TIMED_OUT,
}
class StepRunEvent:
def __init__(self, type: StepRunEventType, payload: str):
@@ -43,32 +56,58 @@ def new_listener(conn, config: ClientConfig):
)
class ListenerClientImpl:
def __init__(self, client: DispatcherStub, token):
class HatchetListener:
def __init__(self, client: DispatcherStub, workflow_run_id: str, token: str):
self.client = client
self.stop_signal = False
self.workflow_run_id = workflow_run_id
self.token = token
async def generator(self, workflowRunId: str) -> List[StepRunEvent]:
listener = self.retry_subscribe(workflowRunId)
def __iter__(self):
return self._generator()
def abort(self):
self.stop_signal = True
def _generator(self, stop_step: str = None) -> List[StepRunEvent]:
listener = self.retry_subscribe()
while listener:
if self.stop_signal:
listener = None
break
while True:
try:
for workflow_event in listener:
eventType = None
if workflow_event.eventType in event_type_mapping:
eventType = event_type_mapping[workflow_event.eventType]
else:
raise Exception(
f"Unknown event type: {workflow_event.eventType}")
if workflow_event.resourceType == RESOURCE_TYPE_STEP_RUN:
if workflow_event.eventType in step_run_event_type_mapping:
eventType = step_run_event_type_mapping[workflow_event.eventType]
else:
raise Exception(
f"Unknown event type: {workflow_event.eventType}")
payload = None
if workflow_event.eventPayload:
payload = json.loads(workflow_event.eventPayload)
payload = None
if workflow_event.eventPayload:
payload = json.loads(workflow_event.eventPayload)
# call the handler
event = StepRunEvent(type=eventType, payload=payload)
yield event
# call the handler
event = StepRunEvent(type=eventType, payload=payload)
yield event
elif workflow_event.resourceType == RESOURCE_TYPE_WORKFLOW_RUN:
if workflow_event.eventType in workflow_run_event_type_mapping:
eventType = workflow_run_event_type_mapping[workflow_event.eventType]
else:
raise Exception(
f"Unknown event type: {workflow_event.eventType}")
payload = None
if workflow_event.eventPayload:
payload = json.loads(workflow_event.eventPayload)
if workflow_event.hangup:
listener = None
print('hangup stopping listener...')
break
except grpc.RpcError as e:
# Handle different types of errors
@@ -78,7 +117,7 @@ class ListenerClientImpl:
elif e.code() == grpc.StatusCode.UNAVAILABLE:
# Retry logic
# logger.info("Could not connect to Hatchet, retrying...")
listener = self.retry_subscribe(workflowRunId)
listener = self.retry_subscribe()
elif e.code() == grpc.StatusCode.DEADLINE_EXCEEDED:
# logger.info("Deadline exceeded, retrying subscription")
continue
@@ -87,13 +126,7 @@ class ListenerClientImpl:
# logger.error(f"Failed to receive message: {e}")
break
def on(self, workflowRunId: str, handler: callable = None):
for event in self.generator(workflowRunId):
# call the handler if provided
if handler:
handler(event)
def retry_subscribe(self, workflowRunId: str):
def retry_subscribe(self):
retries = 0
while retries < DEFAULT_ACTION_LISTENER_RETRY_COUNT:
@@ -103,7 +136,7 @@ class ListenerClientImpl:
listener = self.client.SubscribeToWorkflowEvents(
SubscribeToWorkflowEventsRequest(
workflowRunId=workflowRunId,
workflowRunId=self.workflow_run_id,
), metadata=get_metadata(self.token))
return listener
except grpc.RpcError as e:
@@ -111,3 +144,18 @@ class ListenerClientImpl:
retries = retries + 1
else:
raise ValueError(f"gRPC error: {e}")
class ListenerClientImpl:
def __init__(self, client: DispatcherStub, token: str):
self.client = client
self.token = token
def stream(self, workflow_run_id: str):
return HatchetListener(self.client, workflow_run_id, self.token)
def on(self, workflow_run_id: str, handler: callable = None):
for event in self.stream(workflow_run_id):
# call the handler if provided
if handler:
handler(event)

View File

@@ -15,8 +15,10 @@ _sym_db = _symbol_database.Default()
from google.protobuf import timestamp_pb2 as google_dot_protobuf_dot_timestamp__pb2
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x10\x64ispatcher.proto\x1a\x1fgoogle/protobuf/timestamp.proto\"N\n\x15WorkerRegisterRequest\x12\x12\n\nworkerName\x18\x01 \x01(\t\x12\x0f\n\x07\x61\x63tions\x18\x02 \x03(\t\x12\x10\n\x08services\x18\x03 \x03(\t\"P\n\x16WorkerRegisterResponse\x12\x10\n\x08tenantId\x18\x01 \x01(\t\x12\x10\n\x08workerId\x18\x02 \x01(\t\x12\x12\n\nworkerName\x18\x03 \x01(\t\"\xf2\x01\n\x0e\x41ssignedAction\x12\x10\n\x08tenantId\x18\x01 \x01(\t\x12\x15\n\rworkflowRunId\x18\x02 \x01(\t\x12\x18\n\x10getGroupKeyRunId\x18\x03 \x01(\t\x12\r\n\x05jobId\x18\x04 \x01(\t\x12\x0f\n\x07jobName\x18\x05 \x01(\t\x12\x10\n\x08jobRunId\x18\x06 \x01(\t\x12\x0e\n\x06stepId\x18\x07 \x01(\t\x12\x11\n\tstepRunId\x18\x08 \x01(\t\x12\x10\n\x08\x61\x63tionId\x18\t \x01(\t\x12\x1f\n\nactionType\x18\n \x01(\x0e\x32\x0b.ActionType\x12\x15\n\ractionPayload\x18\x0b \x01(\t\"\'\n\x13WorkerListenRequest\x12\x10\n\x08workerId\x18\x01 \x01(\t\",\n\x18WorkerUnsubscribeRequest\x12\x10\n\x08workerId\x18\x01 \x01(\t\"?\n\x19WorkerUnsubscribeResponse\x12\x10\n\x08tenantId\x18\x01 \x01(\t\x12\x10\n\x08workerId\x18\x02 \x01(\t\"\xe1\x01\n\x13GroupKeyActionEvent\x12\x10\n\x08workerId\x18\x01 \x01(\t\x12\x15\n\rworkflowRunId\x18\x02 \x01(\t\x12\x18\n\x10getGroupKeyRunId\x18\x03 \x01(\t\x12\x10\n\x08\x61\x63tionId\x18\x04 \x01(\t\x12\x32\n\x0e\x65ventTimestamp\x18\x05 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12+\n\teventType\x18\x06 \x01(\x0e\x32\x18.GroupKeyActionEventType\x12\x14\n\x0c\x65ventPayload\x18\x07 \x01(\t\"\xec\x01\n\x0fStepActionEvent\x12\x10\n\x08workerId\x18\x01 \x01(\t\x12\r\n\x05jobId\x18\x02 \x01(\t\x12\x10\n\x08jobRunId\x18\x03 \x01(\t\x12\x0e\n\x06stepId\x18\x04 \x01(\t\x12\x11\n\tstepRunId\x18\x05 \x01(\t\x12\x10\n\x08\x61\x63tionId\x18\x06 \x01(\t\x12\x32\n\x0e\x65ventTimestamp\x18\x07 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\'\n\teventType\x18\x08 \x01(\x0e\x32\x14.StepActionEventType\x12\x14\n\x0c\x65ventPayload\x18\t \x01(\t\"9\n\x13\x41\x63tionEventResponse\x12\x10\n\x08tenantId\x18\x01 \x01(\t\x12\x10\n\x08workerId\x18\x02 \x01(\t\"9\n SubscribeToWorkflowEventsRequest\x12\x15\n\rworkflowRunId\x18\x01 \x01(\t\"\xd0\x01\n\rWorkflowEvent\x12\x15\n\rworkflowRunId\x18\x01 \x01(\t\x12#\n\x0cresourceType\x18\x02 \x01(\x0e\x32\r.ResourceType\x12%\n\teventType\x18\x03 \x01(\x0e\x32\x12.ResourceEventType\x12\x12\n\nresourceId\x18\x04 \x01(\t\x12\x32\n\x0e\x65ventTimestamp\x18\x05 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x14\n\x0c\x65ventPayload\x18\x06 \x01(\t\"?\n\rOverridesData\x12\x11\n\tstepRunId\x18\x01 \x01(\t\x12\x0c\n\x04path\x18\x02 \x01(\t\x12\r\n\x05value\x18\x03 \x01(\t\"\x17\n\x15OverridesDataResponse*N\n\nActionType\x12\x12\n\x0eSTART_STEP_RUN\x10\x00\x12\x13\n\x0f\x43\x41NCEL_STEP_RUN\x10\x01\x12\x17\n\x13START_GET_GROUP_KEY\x10\x02*\xa2\x01\n\x17GroupKeyActionEventType\x12 \n\x1cGROUP_KEY_EVENT_TYPE_UNKNOWN\x10\x00\x12 \n\x1cGROUP_KEY_EVENT_TYPE_STARTED\x10\x01\x12\"\n\x1eGROUP_KEY_EVENT_TYPE_COMPLETED\x10\x02\x12\x1f\n\x1bGROUP_KEY_EVENT_TYPE_FAILED\x10\x03*\x8a\x01\n\x13StepActionEventType\x12\x1b\n\x17STEP_EVENT_TYPE_UNKNOWN\x10\x00\x12\x1b\n\x17STEP_EVENT_TYPE_STARTED\x10\x01\x12\x1d\n\x19STEP_EVENT_TYPE_COMPLETED\x10\x02\x12\x1a\n\x16STEP_EVENT_TYPE_FAILED\x10\x03*e\n\x0cResourceType\x12\x19\n\x15RESOURCE_TYPE_UNKNOWN\x10\x00\x12\x1a\n\x16RESOURCE_TYPE_STEP_RUN\x10\x01\x12\x1e\n\x1aRESOURCE_TYPE_WORKFLOW_RUN\x10\x02*\xde\x01\n\x11ResourceEventType\x12\x1f\n\x1bRESOURCE_EVENT_TYPE_UNKNOWN\x10\x00\x12\x1f\n\x1bRESOURCE_EVENT_TYPE_STARTED\x10\x01\x12!\n\x1dRESOURCE_EVENT_TYPE_COMPLETED\x10\x02\x12\x1e\n\x1aRESOURCE_EVENT_TYPE_FAILED\x10\x03\x12!\n\x1dRESOURCE_EVENT_TYPE_CANCELLED\x10\x04\x12!\n\x1dRESOURCE_EVENT_TYPE_TIMED_OUT\x10\x05\x32\xe4\x03\n\nDispatcher\x12=\n\x08Register\x12\x16.WorkerRegisterRequest\x1a\x17.WorkerRegisterResponse\"\x00\x12\x33\n\x06Listen\x12\x14.WorkerListenRequest\x1a\x0f.AssignedAction\"\x00\x30\x01\x12R\n\x19SubscribeToWorkflowEvents\x12!.SubscribeToWorkflowEventsRequest\x1a\x0e.WorkflowEvent\"\x00\x30\x01\x12?\n\x13SendStepActionEvent\x12\x10.StepActionEvent\x1a\x14.ActionEventResponse\"\x00\x12G\n\x17SendGroupKeyActionEvent\x12\x14.GroupKeyActionEvent\x1a\x14.ActionEventResponse\"\x00\x12<\n\x10PutOverridesData\x12\x0e.OverridesData\x1a\x16.OverridesDataResponse\"\x00\x12\x46\n\x0bUnsubscribe\x12\x19.WorkerUnsubscribeRequest\x1a\x1a.WorkerUnsubscribeResponse\"\x00\x42GZEgithub.com/hatchet-dev/hatchet/internal/services/dispatcher/contractsb\x06proto3')
_globals = globals()
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'dispatcher_pb2', _globals)

View File

@@ -183,13 +183,14 @@ class SubscribeToWorkflowEventsRequest(_message.Message):
def __init__(self, workflowRunId: _Optional[str] = ...) -> None: ...
class WorkflowEvent(_message.Message):
__slots__ = ("workflowRunId", "resourceType", "eventType", "resourceId", "eventTimestamp", "eventPayload")
__slots__ = ("workflowRunId", "resourceType", "eventType", "resourceId", "eventTimestamp", "eventPayload", "hangup")
WORKFLOWRUNID_FIELD_NUMBER: _ClassVar[int]
RESOURCETYPE_FIELD_NUMBER: _ClassVar[int]
EVENTTYPE_FIELD_NUMBER: _ClassVar[int]
RESOURCEID_FIELD_NUMBER: _ClassVar[int]
EVENTTIMESTAMP_FIELD_NUMBER: _ClassVar[int]
EVENTPAYLOAD_FIELD_NUMBER: _ClassVar[int]
HANGUP_FIELD_NUMBER: _ClassVar[int]
workflowRunId: str
resourceType: ResourceType
eventType: ResourceEventType

View File

@@ -1,6 +1,6 @@
[tool.poetry]
name = "hatchet-sdk"
version = "0.9.3"
version = "0.9.4"
description = ""
authors = ["Alexander Belanger <alexander@hatchet.run>"]
readme = "README.md"