feat: make step run replays more intuitive (#507)

* feat: make step run replays more intuitive

* fix: npmrc file for vercel pnpm version diff

* fix: address changes from PR review
This commit is contained in:
abelanger5
2024-05-17 17:32:15 -04:00
committed by GitHub
parent f652dd67df
commit 0dd38e45f4
19 changed files with 882 additions and 140 deletions

View File

@@ -287,6 +287,7 @@ StepRunEventReason:
- REASSIGNED
- TIMED_OUT
- SLOT_RELEASED
- RETRIED_BY_USER
StepRunEventSeverity:
type: string

View File

@@ -2,6 +2,7 @@ package stepruns
import (
"encoding/json"
"errors"
"fmt"
"time"
@@ -22,6 +23,7 @@ func (t *StepRunService) StepRunUpdateRerun(ctx echo.Context, request gen.StepRu
stepRun := ctx.Get("step-run").(*db.StepRunModel)
// preflight check to make sure there's at least one worker to serve this request
// FIXME: merge this preflight check with the one below
action := stepRun.Step().ActionID
sixSecAgo := time.Now().Add(-6 * time.Second)
@@ -38,6 +40,25 @@ func (t *StepRunService) StepRunUpdateRerun(ctx echo.Context, request gen.StepRu
), nil
}
// preflight check to verify step run status
err = t.config.EngineRepository.StepRun().PreflightCheckReplayStepRun(ctx.Request().Context(), tenant.ID, stepRun.ID)
if err != nil {
if errors.Is(err, repository.ErrPreflightReplayStepRunNotInFinalState) {
return gen.StepRunUpdateRerun400JSONResponse(
apierrors.NewAPIErrors("Step run cannot be replayed because it is not finished running yet."),
), nil
}
if errors.Is(err, repository.ErrPreflightReplayChildStepRunNotInFinalState) {
return gen.StepRunUpdateRerun400JSONResponse(
apierrors.NewAPIErrors("Step run cannot be replayed because it has child step runs that are not finished running yet."),
), nil
}
return nil, fmt.Errorf("could not preflight check step run: %w", err)
}
// make sure input can be marshalled and unmarshalled to input type
inputBytes, err := json.Marshal(request.Body.Input)
@@ -49,7 +70,7 @@ func (t *StepRunService) StepRunUpdateRerun(ctx echo.Context, request gen.StepRu
data := &datautils.StepRunData{}
if err := json.Unmarshal(inputBytes, data); err != nil || data == nil {
if err := json.Unmarshal(inputBytes, data); err != nil {
return gen.StepRunUpdateRerun400JSONResponse(
apierrors.NewAPIErrors("Invalid input"),
), nil
@@ -63,33 +84,17 @@ func (t *StepRunService) StepRunUpdateRerun(ctx echo.Context, request gen.StepRu
), nil
}
// set the job run and workflow run to running status
err = t.config.APIRepository.JobRun().SetJobRunStatusRunning(tenant.ID, stepRun.JobRunID)
if err != nil {
return nil, err
}
engineStepRun, err := t.config.EngineRepository.StepRun().GetStepRunForEngine(ctx.Request().Context(), tenant.ID, stepRun.ID)
if err != nil {
return nil, fmt.Errorf("could not get step run for engine: %w", err)
}
// Unlink the step run from its existing worker. This is necessary because automatic retries increment the
// worker semaphore on failure/cancellation, but in this case we don't want to increment the semaphore.
// FIXME: this is very far decoupled from the actual worker logic, and should be refactored.
err = t.config.EngineRepository.StepRun().UnlinkStepRunFromWorker(ctx.Request().Context(), tenant.ID, stepRun.ID)
if err != nil {
return nil, fmt.Errorf("could not unlink step run from worker: %w", err)
}
// send a task to the taskqueue
err = t.config.MessageQueue.AddMessage(
ctx.Request().Context(),
msgqueue.JOB_PROCESSING_QUEUE,
tasktypes.StepRunRetryToTask(engineStepRun, inputBytes),
tasktypes.StepRunReplayToTask(engineStepRun, inputBytes),
)
if err != nil {

View File

@@ -80,6 +80,7 @@ const (
StepRunEventReasonREASSIGNED StepRunEventReason = "REASSIGNED"
StepRunEventReasonREQUEUEDNOWORKER StepRunEventReason = "REQUEUED_NO_WORKER"
StepRunEventReasonREQUEUEDRATELIMIT StepRunEventReason = "REQUEUED_RATE_LIMIT"
StepRunEventReasonRETRIEDBYUSER StepRunEventReason = "RETRIED_BY_USER"
StepRunEventReasonRETRYING StepRunEventReason = "RETRYING"
StepRunEventReasonSCHEDULINGTIMEDOUT StepRunEventReason = "SCHEDULING_TIMED_OUT"
StepRunEventReasonSLOTRELEASED StepRunEventReason = "SLOT_RELEASED"
@@ -7999,106 +8000,106 @@ var swaggerSpec = []string{
"AwHKqxnvwSgLPkKM6KJN75Hqkzusa+jvM8KEjqAQse40eA7a9TIlKBWnL42rw57hUEOIbq4VO1lDm7sS",
"blkgSKON1EAGmkFjePZ/b85uzk7vLi7vvl0Ofz8b+r38x+Hx9dnd+eDr4Nrv+aOT385Ob84HF1/urgdf",
"z07vLm/Yz8ej0eDLBXeQj66Ph9fCZz64GIx+K7rPh2fXw38L93ruSe/5bKzLm+u74dnn4ZnsMzzTRtUn",
"G51fspbnZ8cjixveSLfagjXbuoRlOLgenByf141WFzQg/7oTEH89uyghZaWggussjrykpEcgXFAUkMuE",
"XqbmXDqZe5vdTGaAeHHCrkNS+8wG2Tem3W48lc6Wl7ByYkNz4p01R8GY9bPddJ/NBF7WZP0Y17wDss28",
"F6bsqGm8J0jOH7IJuNzTeqNoOpKhptUlhYCIVnYXJc+x5Um4IPIAa8tTb4lMGnB0UIJnBcxnrulGgSUz",
"YQ6evYlq4gHqPc1QMPNoLOZer6nGTjFGgO30M8hs2ptJFHvJ0mXt12otSVoMs9UE4uWy0ZosTlJ62+xl",
"6rMda6JFncWMj1BIn11CshbS6PK90lOCGmhnZ0SOJOV2kkbsaRX+VyMo9+wzxnpNrW8IxKLHVXofoqCO",
"FPh4NQmVOsw7s+ly/5bZ9KHcJ6UcXn674Hr08enXwYXf87+eff10Zg7huMZoOoVYs2dZPU7mPMnqhbKt",
"Y+qGV3L4GdOd9ZU3eGzXkmlsZQgdEPv+b0Pn35yS4r6t8RzRCIW9capi8Fa/UpjiP26ISV436itgPMaQ",
"EF1vKagX6iCsqi/sw2+AzEx8MwNkpg/5f0hpOslJ4ugX9axGojSUdzID1DrhHxBndjE7vXDti1HLo2zO",
"fkW4CIOZZmaAXAFCnmLsOgfwEtnBI5Bu8fY5RiQJwaJAMmr/Wis6RezeWgjsZAaiKVQIspcPgE92JHIq",
"h0851pSaZoZ9CQGqRubrTmoByYCoxd9qMFQCUOWXXgFPNpSfx1MULV+4Yjn+XqmOxc5hXK0xacL1EE4R",
"oez/bwjdbmeJRTDs4G6palKum6YrKmSGEvJWlfDKpWSLp/kmThkxmWnbvnEPnC1aw2ISkR+FoUD48LwA",
"RF4CMVvffiubJHgEKAT3IVQ+94as/+q08BkGKYVeEEcyLjlc7JuL0SKS8EozeNBQx4+H+aBpBMde3mkH",
"KvqFgIegYnoPAT2mtSarHEvc0EhgRD3gzVTv/fVWRpyDZ/sOzsEzmqfz9e3kxp0IAqp9c2RAACNV6oHY",
"Qn5YGx6EI5aa1RbOB141LzePFzC4M/i36mKUKeH45HrwxxnPCZF/3rb1dtjFyQ6IfSnXjJ5LZRQxVS1I",
"wngxh81GLjXGadbjJI4maNpY8tpS+0DlHe9b8tkt28y+mIZwwpHMgS+TFRMW7bOvt8KQVgwp/cYgC8F0",
"eQypNV4D49Eliyu0o0o23h+iYxbUWhdC7sp2bNwTJTcDQ4mhKaTa98wBWTIYRPJ0l3aXKaSE4y7Iu3pT",
"1je7LGqEYNybEM0RHVEMKJwubNJKfPVoLGwRqmKrPisfx+N1akEwg2NdnAlv993g4u5qePlleDYa+T3/",
"dHh5dXdx9u1sdO33fB56kP/zy/Dy5upueHlzcXo3vPw0uDCaUVseavm5ldGasYTQ+0NzKlFh3+XUZQT2",
"jBtZRxUVGfVzlA2Y2kogLZXwbRzN5lvLtUgxnnecJJ5eU8ApUWEDvtcWZQzsS77VaEvkm5WNjhnxD06N",
"W6N6mxWFlcK3t6xjcD3CqeS9av0VUowCQ8AAl6q/w8WJivYzHGal/MWqUH6AC2LWmNXwTKjUTFHS0JmQ",
"Bx5JYIAmKMgn8f6RAELg2HtEwJugkEL8T8f0yG/FCi8uzqj81yutNcUpNIwvr8nWvML1xlxnbrFWMIrA",
"VXdSy4Ou1xbSo4KphQzb9j1WzD3Sgz63DcLGajHp9R6zKO368GrhrYXjT4sWg19rvbT6R1K5bKmLGkZY",
"vYrSH1r1MYm74mJv6+XDjlwktWuOu5xfV00oqbma4zgrkymMtV2bRqglAWMhOEM5jji60iRKNcEGx9GI",
"Ke6pJSccPjpcvLOIZJlau5kEsJakn3VqoGdywhORrE6NQvWxIo1uMku5NG3TIqwaDI9ub0N1aqgT0bGJ",
"n0rNK/NL7jHmPSjOM36UHGb8phjV+DHnXcPnutVcg6kJf2GM12MyWtmmYnYFCQjrCETKiBPMZO7ELCZq",
"UrvukIXzmiaUeQgTS5WAO1t6z4rTEvMK258vJbyZcgUfK6lvLQbO8LNeHVIc+mb05XrAnbSUtUezpsyU",
"eaVg6nLBhG4d06yqq9hKV8lyxONSOqLNMNTjD7mxY/M6z/+16ZBt6YJohkyzwJAfncTOk2Zad706qz4t",
"z1wFs8JkYaDbZpI6hey6Z66ZgcFT8bPBBAWevH8ffz33xlnD9lK1OI8D0OZH0rZEhT8BlfBExiDFiC5G",
"+eOJ9xBgiNUbi+JVRXZN5T/nC5xRytNIgjh+QFA1RwxD4idlwj/yKy+Fyqf3uH8KRZPYjGT1KOvx1YBn",
"IfLqSn7x12yX/IP9d/vv+CYnMAIJ8o/89/sH+++4wkpnfGl9kKB+iB6h9BBU5/2iPACsVQQJ8bKbHKPB",
"zA7qn8vvX/i6sLxw8VkO372rDvwbBCGdcTH60fT9IqbZnIWd8Y/+vO35RD2lwCDMGypf0J9y/GAGgwf/",
"lvXna8UQjBfNi2XNUN1qh6rBOpfLgeOxrvyJNo9iMJnImPi61WfQNi7/8aAPZCDuHo8K2eMmPdL/zn/W",
"f3sRMIaQGjTJU/478UD2dAxPXxKxL7x7BWOlGHAxgrg8gTmk/Oj6syaRpDKDxy9CnL8YPefcVVmKr8sH",
"YZITMmblm9XLbWXvP1SxNUqDABIyScNw4QmUjgvv7lSQ99LzPwgqCeKIynuxfPyXDdr/Sybc5utweZBX",
"xjeV7fVzEDIswLEXY+8ejD2c11398O792sEwQfE5xvdoPIYiLDunb0EndWSmKF4mntz2/Oc99ZYl/yDz",
"VnoGwrjlVwBq8nqJ4PlVSFyM8GOQuChyFwvZuRZicMgPMZBJLbZoLJ8srWDjxSyi17IQSz5pFfaCGJAP",
"nXZiwE0MCGrZnBjQD8gE7fFXMtmpqP7mp2FifIZ5CB/jB57Dmz8wK/xm2YwlMZEgXhRdrEl0d5ES2fAW",
"maBg3anjDvPlSTpXpeB/YKImbahakg7b2Gu5c4qM89/qKDnb8gIFC1NR/zv//0tf3Q1tKi/fm+x9PBDl",
"79YV6TZ7d0/ovI30Kl7Zsx1hwty/TVJdH83lLxA2iXkMKUbwUTKAwAjfj44LClcYDTM5DwhnTw39Cxoq",
"0L6IWtkDSdLXI26IlQHOEaG2OJ3qvS8LEGLdBqWmG6M3h4cy2hFicZG7RIsH2wHjJgIpncUY/QeOxcQf",
"tzPxV0hn8diLYqbFhPGTenI3v95/L1iQ/rx9Kdz3m8hV8Y5o4sYb/e/T2Z7+y0ufh9g580wWkIdgA8vw",
"h0hcDg8dHOsZUgL7jZ4mtmda2rF0YQ86jn67HF1ipjJDV07DMhOsxPL8d/bXHo+sfcn/zVjupX8v3ypy",
"Fg1Zh1qx8Clv9dYkQ88lQtkKZI7qWhDbTqreErXPKVu4T7kdCVh5C6udEMyorROAb1cAaiJjHcKv/6SV",
"0zdacLS5p2F8D0JVyd0itITh5gtv+i1r2ewDKhBugmP2DzjOJutodpdotuhlExQCTBTSrHErCux/l3+8",
"ONGiNHa60KIweua02HiIykGt5+eTRtZb1ag7jvnhOKZCx3UcM4f1xkqSvZmXZSCqAAh+EEQBrHCKyiWx",
"++rXhT6ZgNVGZcmiVXaFmBuCDfSQf7mPWa4OP59V5704EQkx0gVX2uQ+Kj3Oab9OgDD0Cq1tGyyMcoWG",
"G9VZTQ/zttp8VdausLpdIoSiklbahOr+65vM38Tpf+f/cwgm8Ub6GzqVLdYfRnKPHSmMaT3sOIg7GSRS",
"xEl33L3+cVcOTClTreIJ/nud70IQXZFjItL/TiLixC3Fd6Cq/BKRFmxSelTKyihSpO4cm5SQ0THKDjJK",
"hWAzVrkY1TJKRAxsIj6/KCe3/ULF5lW+5gqLtA7RsnFGBu1WzYzCayrellnKxa7BcPjxYwGIg+7K1l3Z",
"nK5shMJkD6f88JJ/vvTFE9d7CbZz5glv4gEvScNQ7YwMncpyBCpMK3LGBeOKEa6wCwOrWlz2w03C/vZC",
"KSUa8geGP+N4nlUOMwdR8sIFHo3FQ/nlXdhqAGVb8AsSRj6lzm9ThRX83AE2bNYP25n1ImZMm0blc1+y",
"d4mslCDJknvqTn7Fkc3iZiwfdauPcUOTiSppoqTBPaRPUJabmseEqtJ97BuIBF1NECb8l32bOPoCKX9W",
"7i3JoQ1x8xdItYf2lvTj8e3sOPiVOZjxzViQ9YbYNk9qtlv/RJsS39oYkfU4E4O+EV7s1ZRhorFHHlCi",
"YPs7hXiRAxdPJoRbtA2goIj+8sFYkal+OlFl735hmZJ/bjnjJoVN5dnEJeJxSSdotiVoCjz3BAi/mpgk",
"T5nxNbbPrAbsJ6YhrkcShfG0Xg4RL4ynXogiWJZFVa3gPJ6eo0i8PNWJod0QQz37o0IhfIQhYfOKUnY1",
"E/OWhZlrfUSSDlivzwiGY9vKCQQ4mHl8Ng2OSYwtgIgObQEZiV4GIL7xF+5ij6fJ29fPP39aiLW0nPxS",
"72vBg5h+jDBUr83XQHGqNVsGkrz/hqPbNGnQdD4xkuwOJ0uiCD8VMimsnQXn8bT9MSA+kyb7FPEAfyXH",
"kswnIm9EU3+T5p3iM171qbGZWecVEmFbWWxo9lhgl+eqG00yYmvKajVRdOYcEdUSa7LbebLfMyIURdN6",
"An87jpItpKu7MWFepOdVE9M7flxb3nmLLPNavjTXYKmPegOZtmrLgSdN9ShcryM7wcHbLNawhOXAvgkd",
"7xTUtTpqdWemXgsVrX2hlkx7+1kPN13DXF8tFmcV9OCVa7FUT8CuFourjrpSLRa3U7JPIGX/J81121QX",
"T3Wpr8SikQuKpiPZx7G8xU9yTGqIWeGM1PekY6WCh82KprXxUVbQqN7RltUXIm71izp9MssC4PhYNmlV",
"xTp1tr6y8pgVQSLtKiM1KYxLFOvqdESOAEXrmlq4+dCwfNKOv9bFX5IRliw9Vn/gOER1EJ7UVQjtEL0t",
"ZcfeylnzM7tRH+DCyYnK2hVmdSrBz8mAF8muPmdih0l7is4JtlxWtAZQexNvORBxGsly09AJVtXW2f1p",
"fi7olVzSfD9fxyHNp94Bd7QOh+6MriGWLPn5AS68RxCm0EsAwhV6yV5T+5Ox28ERb3rg99i/DsW/Dpl4",
"N63H8AifkRmaXhnaeAXGzqO+Fi0bqnhJx7qLLod/nwt5Rw1AnBwOWsDvcNFdOvOjcCn658jueMDEA57U",
"TNbJBxgmIVjUVY9m33X/nuho4QBVM5oP+vNeRgUC5KNWtQ4KVYSYn5BY4W17Tvg2ockMuO6ostbKZuhZ",
"82GFokdEYVvXu+pldicM+NfunFJeBA0fS/kPFLY7r4HJsZ7T4oa86WKCWlrvbKOa/1ygxM1tLnD7qr5y",
"Ae4yLnJJGB1bmv3iGd+sx4kn+Vz9sCf+3e6RLgdWbv0s125ZWot8VQ/bXoaOt362NnKv4c2xHeNeU8WY",
"bH9scf3FfWzzlpcDJ7zx0jA7yAmbDcpe7tx9tbBsR841PBO2y5wrw6Vbc27dyTeH83v5sHGLO5rqZWbx",
"r/xrd0dT1KjhY6k7msJ2pwya7mg5La5HFxSlA2tjuQplCUltNc2OC0QYl46TNqGOJVR3Bc52qPagxguW",
"2rUNdTodGLFPKMDUyo4j9lVEJ18ep3TmGcua3RCIxeHJAbpkCOU93yJnvn932FAXkKNMHiEFrMwgGMvD",
"PowFwRRppTz3S6unJjhKizMqQmA7sDQdNIXWlopfElMtyk4OSzl8MSrU8G4hictY7mTxzsniKiM4lYFt",
"jOh1qIfcmak5Aor8VRvIuz6aLU7qbG7uCjvvMENbOc+Ro2tPVEONptr6jnpJx4XgXFulxjdr0/vRS0e6",
"1nwtmiCy2mBdGbddK+PGGHOtpduc5EQ/AFEAQ3vM1TGlcJ5QHhIk2jpUlpQVp8XQnQR52xJkjAh3FUkR",
"Iogg3D0d45VjrpoYZVsMjSHrWBNDyTo48zBv3rHwLkZ14jSSW9XgyMvK14unr03LfdkJTaWL6ayN6RQ5",
"T1sXKPmaagvGi2aOpae/QDoSw3ai5fW0AzlefP8XDOiSNwm5792FYqcvFGqXNiI1nmL8AHG9xSEMPdGs",
"oV7GN96oM++LBNSlwgy6FH5L8RhJgKUXGpb3ZClEyxNT/2eT/a2Qx9zIEDIP+S2b4woLtj6krWHwDXOt",
"3K4l2bYzz5k5N8NNuwdWCjS1PD/3E+zw8LL+fhMpvc5mVIc1cmGDaO96kY7VNwVg4Qk9QoWf01Y0AjpX",
"KNA2b8Q7bt79r9PLkgV/VChogXQ7+VNyxRexs3EJRJyUad7STXvoFOr8aO5U6vUfzO14wpEJGn1TJ8rM",
"fg9oMCvXByJ1p+3b8U1tyH6s4YIIZLhakaVzo6yzkrUblfNHAdi/nnKAB2NSKK6zEoKr5ada2sWkQ8yg",
"xnfiQo+AEWRTJpmy1Ki3SblKDj646xnKQXG+hXfV9Xa1up5ewoTNOYU029p9y8S8/WDsb+tO5A6Z6rJ5",
"4BKAGdIs97YSWKLxN10Ybwk+g8fFCJt0MW0Wrp0uSdiVwKtB3Qrqf3emN9jmNnMN4Oa4OduRgDTFtQrQ",
"GOkXGVN2b2Nr/ypnfKtnfXcUdkfhdo/C7rSp3q2VFFnhxFHCqzt5NnLypARi0g9SjOVS6p/NkQ091s2Y",
"mvoF0hM52AZpjKdgtiMqDnGXFfT6WUGuGbiMyEvkVszArZLxFNFZer8HkqQfgDC8ryt+cBIzKUxhi4Tr",
"L3z44yThSdcnaobWOc2BnHr7Wc1q0XWJzbVoXW8GexGhKov9R8kQtyFyg7S5w4S5NjrcCA3uJgGuTG9x",
"PA3hZuiND/2D05tA35rpLUfcD0dvTUWS82r+xZq03DDhpF6yEfSyaMTfparEWgX9n6okscvlxfVYdStZ",
"bKW9PggCmNCalE7+vV2FR9HH34wvWgxeKUpo8R/XUJ9YeVd6tz5RkSOpsfSunb4w5H7xmhRD9r0dfYk+",
"/qZy5djga6AvsfKOvhoS1RiSlqCvMJ6imszV83hKPBR5gJ+N+zUKxjkfaENlVNkRzMbf0quOTnaeMJ5O",
"4dhDXdGX3TLvFI91RjWudpwwnsYpbWCGOKVu3MCG2hEaZaB0RPp2bJCCelzJVlZvnaGkxRVI6+R2DdLr",
"8PJuMiZrowRunrT9fUhHUXcnWuZOpGOwmSQTQMhTjMd2WSoLcQtJ6qn2dSL1So25OR3jZAaiaTbRLikb",
"AYdsnCGqE+dvSJwLsipSugMTYThlggzXXfpEC1KrkZzoT/dsgm0UGLvEMAp5nRv2TejpioRcdR5R2HoT",
"Hoa8vPXP4GmV6fki8RZilxR6g24oUvodU+XFGLVJqnyKt1s/Yom4ox2TTjtTOKJF3YieIp0KgYuI1iy3",
"3OFNMj2F3Cl61f1ZMi0Usz5Le6ss8KFBoukPdGUAduWHtlR+6MJSbUgSq0Yxy+RI89rlLnVTnDihxSmw",
"e2yw/nDUJWNQu9PAHHa6PIk3nAn9EEUPeyLUp8YCjKIHD3iimYdhEhNEY7zwaKwxipU3pG0YRQ8i/OdN",
"Mcr67445IoYZJl0TokPLTmy1yqYzkzNoJYdXIe6O0Vc+RjlXmyhpQ6LGJa2KkUoxfyoTBI8QE9MLItoJ",
"3CKNahfkiyG9RSRqVspLuOd4Lp/haQZoiuM04Uk2OQhqg6yg8E6/w0UBmNdQQ1bMiJEk1yXF7IrIKqTi",
"zDNu35DAohhNp3WG6GvRQD7Mv1ShRffHj3ZSYl0b2GXfG0y4BZSkjDrguCeqfwEKCc14ChFvAnmpf1sq",
"Zi7wd1yhk2Sg7WqbKumlon3bV+Nc6keqh5+66pG7JhKVDGqoW9lUf7mFWJR8SVzrziqOdxKJf4jGb8ic",
"8iPIxA1LGLmpK6phnazZKfUrJ8UNqV9KzvTHcIIipHyGbURO3rOt9DnN5+zk0A8mh7S9XfFiqFFmJ5x2",
"UDjpG7S8nCpHItxDgCHOIhF6xtgEiB+VvEhx6B/5/svty/8PAAD//+RAmEcTbgEA",
"G51fspbnZ8ejbMzB2endp3/f3YzOzFZyIyVrKNCs7RK64eB6cHJ8XjdaXRiB/OtOrOHr2UUJTSuFGVxn",
"keUltT0C4YKigFwm9DI1Z9fJbNzsrjIDxIsTdkGS+mg2yL4xEXfjyXW2TIWVUx2aU/GsWQvGPKDtJgBt",
"JhSzJg/IuOYdkHbmvTDlS03jPUFy/pBNwCWh1htF05EMPq0uKQREtLI7LXnWLU/LBZEHWFuejEtkGoGj",
"yxI8K2A+c903Ciy5CnPw7E1UEw9Q72mGgplHYzH3eo03dooxAmynn0Fm5d5M6thLlkBrv2hradNimK2m",
"FC+Xn9Zkg5LS22ZBU5/tWBMt6mxofIRCQu0SkrWQWJfvlZ4k1EA7OyNyJCm3kzRiT6vwvxpBueejMdZr",
"an1DIBY9rtL7EAV1pMDHq0mx1GHemU2X+7fMpg/lPinl8PLbBdesj0+/Di78nv/17Osni7p6jdF0CrFm",
"4bL6oMyZk9UrZltX1Q2v7fAzJkDrK2/w4a4l99jKEDog9v3fhs6/OSXFfVvjOaIRCnvjVEXlrX6lMEWE",
"3BCTvG7UV8B4jCEhut5SUC/UQVhVX9iH3wCZmfhmBshMH/L/kNJ0kpPE0S8qXI1EsSjvZAaodcI/IM4s",
"ZXZ64doXo5ZH2Zz9inARBjPNzAC5AoQ8xdh1DuAlsoNHIN3i7XOMSBKCRYFk1P61VnSK2L21ENjJDERT",
"qBBkLygAn+xI5FQOn3KsKTXNDPsSAlSNzNed1AKSAVGLv9VgqISkyi+9Ap5sKD+PpyhavpTFcvy9UmWL",
"ncO4WmPShOshnCJC2f/fELrdzhKLYNjB3VL1pVw3TVdUyAwl5K0q4ZVLyRZP802cMmIy07Z94z45W/yG",
"xSQiPwpDgfDqeQGIvARitr79VjZJ8AhQCO5DqLzwDXUAqtPCZxikFHpBHMlI5XCxby5Pi0jCa8/gQUNl",
"Px74g6YRHHt5px2o8RcCHpSK6T0E9JjWmqxyLHFDI4ER9YA3U73311srcQ6e7Ts4B89ons7Xt5MbdyII",
"qPbNsQIBjFTxB2ILAmJteFiOWGpWbTgfeNVM3TyCwODO4N+qi1GmhOOT68EfZzxLRP5529bbYRcnOyD2",
"pVwz+jKVUcRUxyAJ48UcNhu51BinWY+TOJqgaWMRbEs1BJWJvG/JcLdsM/tiGsIJRzIrvkxWTFi0z8fe",
"CkNaMaT0G4MsBNPlMaTWeA2MR5cst9COKtl4f4iOWZhrXVC5K9uxcU+U3AwMRYemkGrfMwdkyWAQydNd",
"2l2mkBKOuyDv6k1Z3+yyqBGCcW9CNEd0RDGgcLqwSSvx1aOxsEWoGq76rHwcj1euBcEMjnVxJrzdd4OL",
"u6vh5Zfh2Wjk9/zT4eXV3cXZt7PRtd/zeTBC/s8vw8ubq7vh5c3F6d3w8tPgwmhGbXmo5edWRmvGokLv",
"D83JRYV9l1OXEdgzbmQdVVRk1M9RSGBqK4q0VAq4cTSbby3XIsV43nGSeHqVAafUhQ34XlsUNrAv+Vaj",
"LZGBVjY6ZsQ/ODVujeptVhRWCujeso7B9QinIviq9VdIMQoMAQNcqv4OFycq/s9wmJUyGqtC+QEuiFlj",
"VsMzoVIzRUlDZ0IeeCSBAZqgIJ/E+0cCCIFj7xEBb4JCCvE/HRMmvxVrvrg4o/Jfr7TWFKfQML68Jlsz",
"DdcbhZ25xVrBKEJZ3UktD8NeW0iPCq8WMmzb91gx90gPA902CBurzqRXgMzitusDroW3Fo4/LVoMfq31",
"0ioiSeWypS5qGGH1ukp/aPXIJO6Ki72tlw87cpHUrjnucn5dVaKk5mqO46xMpjDWdm0aoZYEjIXgDAU6",
"4uhKkyjVlBscRyOmuKeWLHH46HDxzmKUZbLtZlLCWpJ+1qmBnskJT02yOjUK9ciKNLrJvOXStE2LsGow",
"PN69DdWpoU5ExyZ+KjWvzC+5x5gJoTjP+FFymPGbYlTjx5x3DZ/rVnMNpib8hTFej8loZZuK2RUkIKwj",
"ECkjTjCTuROzmKhJ9rpDFs5rmlBmJkwsdQPubAk/K05LzCtsf76U8GbKHnysJMO1GDjDz3p1SHHom9GX",
"6wF30lLWHs2aMlPmlYKpywUTunVMs6quYitdJe8Rj0sJijbDUI8/7caOzes8I9imQ7alC6IZMs0CQ350",
"EjtPmmnd9eqs+rQ8cxXMCpOFgW6bSeoUsuueuYoGBk/FzwYTFHjy/n389dwbZw3bS9XiPA5Am59N2xIV",
"/gRUwlMbgxQjuhjlzyneQ4AhVq8uincW2TWV/5wvcEYpTyMJ4vgBQdUcMQyJn5QJ/8ivvB0qH+Pj/ikU",
"TWIzktUzrcdXA56XyOst+cVfs13yD/bf7b/jm5zACCTIP/Lf7x/sv+MKK53xpfVBgvoheoTSQ1Cd94vy",
"ALBWESTEy25yjAYzO6h/Lr9/4evC8sLFZzl896468G8QhHTGxehH0/eLmGZzFnbGP/rztucT9bgCgzBv",
"qHxBf8rxgxkMHvxb1p+vFUMwXjQvljVDdasdqgbrXC4Hjse68kfbPIrBZCJj4utWn0HbuPzHgz6Qgbh7",
"PCpkj5v0SP87/1n/7UXAGEJq0CRP+e/EA9ljMjx9ScS+8O4VjJViwMUI4vIE5pDyo+vPmkSSygwevwhx",
"/mL0nHNXZSm+Lh+ESU7ImJVvVi+3lb3/UMXWKA0CSMgkDcOFJ1A6LrzEU0HeS8//IKgkiCMq78XyOWA2",
"aP8vmYKbr8PliV4Z31S2189ByLAAx16MvXsw9nBeifXDu/drB8MExecY36PxGIqw7Jy+BZ3UkZmieJl4",
"ctvzn/fU65b8g8xb6RkI45ZfAajJ6yWC51chcTHCj0HiouxdLGTnWojBIT/EQCa12KKxfMS0go0Xs4he",
"y0Is+aRV2AtiQD592okBNzEgqGVzYkA/IBO0x9/NZKei+pufhonxYeYhfIwfeA5v/uSs8JtlM5bERIJ4",
"mXSxJtHdRUpkw1tkgoJ1p447zJcn6VwVh/+BiZq0oWpJOmxjr+XOKTLOf6uj5GzLCxQsTEX97/z/L311",
"N7SpvHxvshfzQJS/ZFek2+wlPqHzNtKreHfPdoQJc/82SXV9NJe/Sdgk5jGkGMFHyQACI3w/Oi4oXGE0",
"zOQ8IJw9NfQvaKhA+yJqZQ8kSV+PuCFWBjhHhNridKr3vixAiHUblJpujN4cns5oR4jFRe4SLR5sB4yb",
"CKR0FmP0HzgWE3/czsRfIZ3FYy+KmRYTxk/qEd78ev+9YEH68/alcN9vIlfFO6KJG2/0v09ne/ovL30e",
"YufMM1lAHoINLMOfJnE5PHRwrGdICew3eprYHm5px9KFPeg4+u1ydImZygxdOQ3LTLASy/Pf2V97PLL2",
"Jf83Y7mX/r18vchZNGQdasXCp7zVW5MMPZcIZSuQOaprQWw7qXpd1D6nbOE+5XYkYOV1rHZCMKO2TgC+",
"XQGoiYx1CL/+k1Zg32jB0eaehvE9CFVtd4vQEoabL7zpt6xlsw+oQLgJjtk/4DibrKPZXaLZopdNUAgw",
"UUizxq0osP9d/vHiRIvS2OlCi8LomdNi4yEqB7Wen08aWW9Vo+445ofjmAod13HMHNYbK0n2il6WgagC",
"IPhBEAWwwikql8Tuq18X+mQCVhuVJYtW2RVibgg20EP+5T5muTr8fFad9+JEJMRIF1xpk/uo9Fyn/ToB",
"wtArtLZtsDDKFRpuVGc1PdXbavNVWbvC6naJEIpKWmkTqvuvbzJ/Jaf/nf/PIZjEG+mv6lS2WH8qyT12",
"pDCm9bDjIO5kkEgRJ91x9/rHXTkwpUy1iif473W+C0F0RY6JSP87iYgTtxRfhqryS0RasEnpmSkro0iR",
"unNsUkJGxyg7yCgVgs1Y5WJUyygRMbCJ+PyinNz2CxWbV/maKyzSOkTLxhkZtFs1MwqvqXhtZikXuwbD",
"4cePBSAOuitbd2VzurIRCpM9nPLDS/750hePXu8l2M6ZJ7yJB7wkDUO1MzJ0KssRqDCtyBkXjCtGuMIu",
"DKxqcdkPNwn72wullGjInxz+jON5VjnMHETJCxd4NBZP55d3YasBlG3BL0gY+bg6v00VVvBzB9iwWT9s",
"Z9aLmDFtGpXPfcneJbJSgiRL7qk7+RVHNoubsXzmrT7GDU0mqqSJkgb3kD5BWW5qHhOqSvexbyASdDVB",
"mPBf9m3i6Auk/KG5tySHNsTNXyDVnt5b0o/Ht7Pj4FfmYMY3Y0HWG2LbPKnZbv0TbUp8a2NE1uNMDPpG",
"eLFXU4aJxh55QImC7e8U4kUOXDyZEG7RNoCCIvrLB2NFpvrpRJW9+4VlSv655YybFDaVhxSXiMclnaDZ",
"lqAp8NwTIPxqYpI8ZcbX2D6zGrCfmIa4HkkUxtN6OUS8MJ56IYpgWRZVtYLzeHqOIvHyVCeGdkMM9eyP",
"CoXwEYaEzStK2dVMzFsWZq71EUk6YL0+IxiObSsnEOBg5vHZNDgmMbYAIjq0BWQkehmA+MZfuIs9niZv",
"Xz///Gkh1tJy8ku9rwUPYvoxwlC9P18DxanWbBlI8v4bjm7TpEHT+cRIsjucLIki/FTIpLB2FpzH0/bH",
"gPhMmuxTxAP8lRxLMp+IvBFN/U2ad4rPeNWnxmZmnVdIhG1lsaHZY4FdnqtuNMmIrSmr1UTRmXNEVEus",
"yW7nyX7PiFAUTesJ/O04SraQru7GhHmRnldNTO/4cW155y2yzGv50lyDpT7qDWTaqi0HnjTVo3C9juwE",
"B2+zWMMSlgP7JnS8U1DX6qjVnZl6LVS09oVaMu3tZz3cdA1zfbVYnFXQg1euxVI9AbtaLK466kq1WNxO",
"yT6BlP2fNNdtU1081aW+EotGLiiajmQfx/IWP8kxqSFmhTNS35OOlQoeNiua1sZHWUGjekdbVl+IuNUv",
"6vTJLAuA42PZpFUV69TZ+srKY1YEibSrjNSkMC5RrKvTETkCFK1rauHmQ8PySTv+Whd/SUZYsvRY/YHj",
"ENVBeFJXIbRD9LaUHXsrZ83P7EZ9gAsnJyprV5jVqQQ/JwNeJLv6nIkdJu0pOifYclnRGkDtTbzlQMRp",
"JMtNQydYVVtn96f5uaBXcknz/XwdhzSfegfc0TocujO6hliy5OcHuPAeQZhCLwEIV+gle03tT8ZuB0e8",
"6YHfY/86FP86ZOLdtB7DI3xGZmh6ZWjjFRg7j/patGyo4iUd6y66HP59LuQdNQBxcjhoAb/DRXfpzI/C",
"peifI7vjARMPeFIzWScfYJiEYFFXPZp91/17oqOFA1TNaD7oz3sZFQiQj1rVOihUEWJ+QmKFt+054duE",
"JjPguqPKWiuboWfNhxWKHhGFbV3vqpfZnTDgX7tzSnkRNHws5T9Q2O68BibHek6LG/Kmiwlqab2zjWr+",
"c4ESN7e5wO2r+soFuMu4yCVhdGxp9otnfLMeJ57kc/XDnvh3u0e6HFi59bNcu2VpLfJVPWx7GTre+tna",
"yL2GN8d2jHtNFWOy/bHF9Rf3sc1bXg6c8MZLw+wgJ2w2KHu5c/fVwrIdOdfwTNguc64Ml27NuXUn3xzO",
"7+XDxi3uaKqXmcW/8q/dHU1Ro4aPpe5oCtudMmi6o+W0uB5dUJQOrI3lKpQlJLXVNDsuEGFcOk7ahDqW",
"UN0VONuh2oMaL1hq1zbU6XRgxD6hAFMrO47YVxGdfHmc0plnLGt2QyAWhycH6JIhlPd8i5z5/t1hQ11A",
"jjJ5hBSwMoNgLA/7MBYEU6SV8twvrZ6a4CgtzqgIge3A0nTQFFpbKn5JTLUoOzks5fDFqFDDu4UkLmO5",
"k8U7J4urjOBUBrYxotehHnJnpuYIKPJXbSDv+mi2OKmzubkr7LzDDG3lPEeOrj1RDTWaaus76iUdF4Jz",
"bZUa36xN70cvHela87Vogshqg3Vl3HatjBtjzLWWbnOSE/0ARAEM7TFXx5TCeUJ5SJBo61BZUlacFkN3",
"EuRtS5AxItxVJEWIIIJw93SMV465amKUbTE0hqxjTQwl6+DMw7x5x8K7GNWJ00huVYMjLytfL56+Ni33",
"ZSc0lS6mszamU+Q8bV2g5GuqLRgvmjmWnv4C6UgM24mW19MO5Hjx/V8woEveJOS+dxeKnb5QqF3aiNR4",
"ivEDxPUWhzD0RLOGehnfeKPOvC8SUJcKM+hS+C3FYyQBll5oWN6TpRAtT0z9n032t0IecyNDyDzkt2yO",
"KyzY+pC2hsE3zLVyu5Zk2848Z+bcDDftHlgp0NTy/NxPsMPDy/r7TaT0OptRHdbIhQ2ivetFOlbfFICF",
"J/QIFX5OW9EI6FyhQNu8Ee+4efe/Ti9LFvxRoaAF0u3kT8kVX8TOxiUQcVKmeUs37aFTqPOjuVOp138w",
"t+MJRyZo9E2dKDP7PaDBrFwfiNSdtm/HN7Uh+7GGCyKQ4WpFls6Nss5K1m5Uzh8FYP96ygEejEmhuM5K",
"CK6Wn2ppF5MOMYMa34kLPQJGkE2ZZMpSo94m5So5+OCuZygHxfkW3lXX29XqenoJEzbnFNJsa/ctE/P2",
"g7G/rTuRO2Sqy+aBSwBmSLPc20pgicbfdGG8JfgMHhcjbNLFtFm4drokYVcCrwZ1K6j/3ZneYJvbzDWA",
"m+PmbEcC0hTXKkBjpF9kTNm9ja39q5zxrZ713VHYHYXbPQq706Z6t1ZSZIUTRwmv7uTZyMmTEohJP0gx",
"lkupfzZHNvRYN2Nq6hdIT+RgG6QxnoLZjqg4xF1W0OtnBblm4DIiL5FbMQO3SsZTRGfp/R5Ikn4AwvC+",
"rvjBScykMIUtEq6/8OGPk4QnXZ+oGVrnNAdy6u1nNatF1yU216J1vRnsRYSqLPYfJUPchsgN0uYOE+ba",
"6HAjNLibBLgyvcXxNISboTc+9A9ObwJ9a6a3HHE/HL01FUnOq/kXa9Jyw4STeslG0MuiEX+XqhJrFfR/",
"qpLELpcX12PVrWSxlfb6IAhgQmtSOvn3dhUeRR9/M75oMXilKKHFf1xDfWLlXend+kRFjqTG0rt2+sKQ",
"+8VrUgzZ93b0Jfr4m8qVY4Ovgb7Eyjv6akhUY0hagr7CeIpqMlfP4ynxUOQBfjbu1ygY53ygDZVRZUcw",
"G39Lrzo62XnCeDqFYw91RV92y7xTPNYZ1bjaccJ4Gqe0gRnilLpxAxtqR2iUgdIR6duxQQrqcSVbWb11",
"hpIWVyCtk9s1SK/Dy7vJmKyNErh50vb3IR1F3Z1omTuRjsFmkkwAIU8xHttlqSzELSSpp9rXidQrNebm",
"dIyTGYim2US7pGwEHLJxhqhOnL8hcS7IqkjpDkyE4ZQJMlx36RMtSK1GcqI/3bMJtlFg7BLDKOR1btg3",
"oacrEnLVeURh6014GPLy1j+Dp1Wm54vEW4hdUugNuqFI6XdMlRdj1Cap8inebv2IJeKOdkw67UzhiBZ1",
"I3qKdCoELiJas9xyhzfJ9BRyp+hV92fJtFDM+iztrbLAhwaJpj/QlQHYlR/aUvmhC0u1IUmsGsUskyPN",
"a5e71E1x4oQWp8DuscH6w1GXjEHtTgNz2OnyJN5wJvRDFD3siVCfGgswih484IlmHoZJTBCN8cKjscYo",
"Vt6QtmEUPYjwnzfFKOu/O+aIGGaYdE2IDi07sdUqm85MzqCVHF6FuDtGX/kY5VxtoqQNiRqXtCpGKsX8",
"qUwQPEJMTC+IaCdwizSqXZAvhvQWkahZKS/hnuO5fIanGaApjtOEJ9nkIKgNsoLCO/0OFwVgXkMNWTEj",
"RpJclxSzKyKrkIozz7h9QwKLYjSd1hmir0UD+TD/UoUW3R8/2kmJdW1gl31vMOEWUJIy6oDjnqj+BSgk",
"NOMpRLwJ5KX+bamYucDfcYVOkoG2q22qpJeK9m1fjXOpH6kefuqqR+6aSFQyqKFuZVP95RZiUfIlca07",
"qzjeSST+IRq/IXPKjyATNyxh5KauqIZ1sman1K+cFDekfik50x/DCYqQ8hm2ETl5z7bS5zSfs5NDP5gc",
"0vZ2xYuhRpmdcNpB4aRv0PJyqhyJcA8BhjiLROgZYxMgflTyIsWhf+T7L7cv/z8AAP//wIITRCVuAQA=",
}
// GetSwagger returns the content of the embedded swagger specification file

View File

@@ -691,6 +691,7 @@ export enum StepRunEventReason {
REASSIGNED = "REASSIGNED",
TIMED_OUT = "TIMED_OUT",
SLOT_RELEASED = "SLOT_RELEASED",
RETRIED_BY_USER = "RETRIED_BY_USER",
}
export enum StepRunEventSeverity {

View File

@@ -93,6 +93,7 @@ const REASON_TO_TITLE: Record<StepRunEventReason, string> = {
[StepRunEventReason.REASSIGNED]: 'Reassigned',
[StepRunEventReason.TIMED_OUT]: 'Execution timed out',
[StepRunEventReason.SLOT_RELEASED]: 'Slot released',
[StepRunEventReason.RETRIED_BY_USER]: 'Replayed by user',
};
function getTitleFromReason(reason: StepRunEventReason, message: string) {

1
frontend/docs/.npmrc Normal file
View File

@@ -0,0 +1 @@
package-manager-strict=false

View File

@@ -245,6 +245,7 @@ const (
StepRunEventReasonREASSIGNED StepRunEventReason = "REASSIGNED"
StepRunEventReasonSLOTRELEASED StepRunEventReason = "SLOT_RELEASED"
StepRunEventReasonTIMEOUTREFRESHED StepRunEventReason = "TIMEOUT_REFRESHED"
StepRunEventReasonRETRIEDBYUSER StepRunEventReason = "RETRIED_BY_USER"
)
func (e *StepRunEventReason) Scan(src interface{}) error {

View File

@@ -14,7 +14,7 @@ CREATE TYPE "JobRunStatus" AS ENUM ('PENDING', 'RUNNING', 'SUCCEEDED', 'FAILED',
CREATE TYPE "LogLineLevel" AS ENUM ('DEBUG', 'INFO', 'WARN', 'ERROR');
-- CreateEnum
CREATE TYPE "StepRunEventReason" AS ENUM ('REQUEUED_NO_WORKER', 'REQUEUED_RATE_LIMIT', 'SCHEDULING_TIMED_OUT', 'ASSIGNED', 'STARTED', 'FINISHED', 'FAILED', 'RETRYING', 'CANCELLED', 'TIMED_OUT', 'REASSIGNED', 'SLOT_RELEASED', 'TIMEOUT_REFRESHED');
CREATE TYPE "StepRunEventReason" AS ENUM ('REQUEUED_NO_WORKER', 'REQUEUED_RATE_LIMIT', 'SCHEDULING_TIMED_OUT', 'ASSIGNED', 'STARTED', 'FINISHED', 'FAILED', 'RETRYING', 'CANCELLED', 'TIMED_OUT', 'REASSIGNED', 'SLOT_RELEASED', 'TIMEOUT_REFRESHED', 'RETRIED_BY_USER');
-- CreateEnum
CREATE TYPE "StepRunEventSeverity" AS ENUM ('INFO', 'WARNING', 'CRITICAL');

View File

@@ -730,3 +730,137 @@ WHERE
srl."tenantId" = lrl."tenantId" AND
srl."key" = lrl."key"
RETURNING srl.*;
-- name: ReplayStepRunResetWorkflowRun :one
WITH workflow_run_id AS (
SELECT
"workflowRunId"
FROM
"JobRun"
WHERE
"id" = @jobRunId::uuid
)
UPDATE
"WorkflowRun"
SET
"status" = 'RUNNING',
"updatedAt" = CURRENT_TIMESTAMP,
"startedAt" = NULL,
"finishedAt" = NULL
WHERE
"id" = (SELECT "workflowRunId" FROM workflow_run_id)
RETURNING *;
-- name: ReplayStepRunResetJobRun :one
UPDATE
"JobRun"
SET
"status" = 'RUNNING',
"updatedAt" = CURRENT_TIMESTAMP,
"startedAt" = NULL,
"finishedAt" = NULL,
"timeoutAt" = NULL,
"cancelledAt" = NULL,
"cancelledReason" = NULL,
"cancelledError" = NULL
WHERE
"id" = @jobRunId::uuid
RETURNING *;
-- name: GetLaterStepRunsForReplay :many
WITH RECURSIVE currStepRun AS (
SELECT *
FROM "StepRun"
WHERE
"id" = @stepRunId::uuid AND
"tenantId" = @tenantId::uuid
), childStepRuns AS (
SELECT sr."id", sr."status"
FROM "StepRun" sr
JOIN "_StepRunOrder" sro ON sr."id" = sro."B"
WHERE sro."A" = (SELECT "id" FROM currStepRun)
UNION ALL
SELECT sr."id", sr."status"
FROM "StepRun" sr
JOIN "_StepRunOrder" sro ON sr."id" = sro."B"
JOIN childStepRuns csr ON sro."A" = csr."id"
)
SELECT
sr.*
FROM
"StepRun" sr
JOIN
childStepRuns csr ON sr."id" = csr."id"
WHERE
sr."tenantId" = @tenantId::uuid;
-- name: ReplayStepRunResetLaterStepRuns :many
WITH RECURSIVE currStepRun AS (
SELECT *
FROM "StepRun"
WHERE
"id" = @stepRunId::uuid AND
"tenantId" = @tenantId::uuid
), childStepRuns AS (
SELECT sr."id", sr."status"
FROM "StepRun" sr
JOIN "_StepRunOrder" sro ON sr."id" = sro."B"
WHERE sro."A" = (SELECT "id" FROM currStepRun)
UNION ALL
SELECT sr."id", sr."status"
FROM "StepRun" sr
JOIN "_StepRunOrder" sro ON sr."id" = sro."B"
JOIN childStepRuns csr ON sro."A" = csr."id"
)
UPDATE
"StepRun" as sr
SET
"status" = 'PENDING',
"scheduleTimeoutAt" = NULL,
"finishedAt" = NULL,
"startedAt" = NULL,
"output" = NULL,
"error" = NULL,
"cancelledAt" = NULL,
"cancelledReason" = NULL
FROM
childStepRuns csr
WHERE
sr."id" = csr."id" AND
sr."tenantId" = @tenantId::uuid
RETURNING sr.*;
-- name: ListNonFinalChildStepRuns :many
WITH RECURSIVE currStepRun AS (
SELECT *
FROM "StepRun"
WHERE
"id" = @stepRunId::uuid AND
"tenantId" = @tenantId::uuid
), childStepRuns AS (
SELECT sr."id", sr."status"
FROM "StepRun" sr
JOIN "_StepRunOrder" sro ON sr."id" = sro."B"
WHERE sro."A" = (SELECT "id" FROM currStepRun)
UNION ALL
SELECT sr."id", sr."status"
FROM "StepRun" sr
JOIN "_StepRunOrder" sro ON sr."id" = sro."B"
JOIN childStepRuns csr ON sro."A" = csr."id"
)
-- Select all child step runs that are not in a final state
SELECT
sr.*
FROM
"StepRun" sr
JOIN
childStepRuns csr ON sr."id" = csr."id"
WHERE
sr."tenantId" = @tenantId::uuid AND
sr."status" NOT IN ('SUCCEEDED', 'FAILED', 'CANCELLED');

View File

@@ -297,6 +297,89 @@ func (q *Queries) CreateStepRunEvent(ctx context.Context, db DBTX, arg CreateSte
return err
}
const getLaterStepRunsForReplay = `-- name: GetLaterStepRunsForReplay :many
WITH RECURSIVE currStepRun AS (
SELECT id, "createdAt", "updatedAt", "deletedAt", "tenantId", "jobRunId", "stepId", "order", "workerId", "tickerId", status, input, output, "requeueAfter", "scheduleTimeoutAt", error, "startedAt", "finishedAt", "timeoutAt", "cancelledAt", "cancelledReason", "cancelledError", "inputSchema", "callerFiles", "gitRepoBranch", "retryCount", "semaphoreReleased"
FROM "StepRun"
WHERE
"id" = $2::uuid AND
"tenantId" = $1::uuid
), childStepRuns AS (
SELECT sr."id", sr."status"
FROM "StepRun" sr
JOIN "_StepRunOrder" sro ON sr."id" = sro."B"
WHERE sro."A" = (SELECT "id" FROM currStepRun)
UNION ALL
SELECT sr."id", sr."status"
FROM "StepRun" sr
JOIN "_StepRunOrder" sro ON sr."id" = sro."B"
JOIN childStepRuns csr ON sro."A" = csr."id"
)
SELECT
sr.id, sr."createdAt", sr."updatedAt", sr."deletedAt", sr."tenantId", sr."jobRunId", sr."stepId", sr."order", sr."workerId", sr."tickerId", sr.status, sr.input, sr.output, sr."requeueAfter", sr."scheduleTimeoutAt", sr.error, sr."startedAt", sr."finishedAt", sr."timeoutAt", sr."cancelledAt", sr."cancelledReason", sr."cancelledError", sr."inputSchema", sr."callerFiles", sr."gitRepoBranch", sr."retryCount", sr."semaphoreReleased"
FROM
"StepRun" sr
JOIN
childStepRuns csr ON sr."id" = csr."id"
WHERE
sr."tenantId" = $1::uuid
`
type GetLaterStepRunsForReplayParams struct {
Tenantid pgtype.UUID `json:"tenantid"`
Steprunid pgtype.UUID `json:"steprunid"`
}
func (q *Queries) GetLaterStepRunsForReplay(ctx context.Context, db DBTX, arg GetLaterStepRunsForReplayParams) ([]*StepRun, error) {
rows, err := db.Query(ctx, getLaterStepRunsForReplay, arg.Tenantid, arg.Steprunid)
if err != nil {
return nil, err
}
defer rows.Close()
var items []*StepRun
for rows.Next() {
var i StepRun
if err := rows.Scan(
&i.ID,
&i.CreatedAt,
&i.UpdatedAt,
&i.DeletedAt,
&i.TenantId,
&i.JobRunId,
&i.StepId,
&i.Order,
&i.WorkerId,
&i.TickerId,
&i.Status,
&i.Input,
&i.Output,
&i.RequeueAfter,
&i.ScheduleTimeoutAt,
&i.Error,
&i.StartedAt,
&i.FinishedAt,
&i.TimeoutAt,
&i.CancelledAt,
&i.CancelledReason,
&i.CancelledError,
&i.InputSchema,
&i.CallerFiles,
&i.GitRepoBranch,
&i.RetryCount,
&i.SemaphoreReleased,
); err != nil {
return nil, err
}
items = append(items, &i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const getStepRun = `-- name: GetStepRun :one
SELECT
"StepRun".id, "StepRun"."createdAt", "StepRun"."updatedAt", "StepRun"."deletedAt", "StepRun"."tenantId", "StepRun"."jobRunId", "StepRun"."stepId", "StepRun"."order", "StepRun"."workerId", "StepRun"."tickerId", "StepRun".status, "StepRun".input, "StepRun".output, "StepRun"."requeueAfter", "StepRun"."scheduleTimeoutAt", "StepRun".error, "StepRun"."startedAt", "StepRun"."finishedAt", "StepRun"."timeoutAt", "StepRun"."cancelledAt", "StepRun"."cancelledReason", "StepRun"."cancelledError", "StepRun"."inputSchema", "StepRun"."callerFiles", "StepRun"."gitRepoBranch", "StepRun"."retryCount", "StepRun"."semaphoreReleased"
@@ -549,6 +632,91 @@ func (q *Queries) GetTotalSlots(ctx context.Context, db DBTX, arg GetTotalSlotsP
return items, nil
}
const listNonFinalChildStepRuns = `-- name: ListNonFinalChildStepRuns :many
WITH RECURSIVE currStepRun AS (
SELECT id, "createdAt", "updatedAt", "deletedAt", "tenantId", "jobRunId", "stepId", "order", "workerId", "tickerId", status, input, output, "requeueAfter", "scheduleTimeoutAt", error, "startedAt", "finishedAt", "timeoutAt", "cancelledAt", "cancelledReason", "cancelledError", "inputSchema", "callerFiles", "gitRepoBranch", "retryCount", "semaphoreReleased"
FROM "StepRun"
WHERE
"id" = $2::uuid AND
"tenantId" = $1::uuid
), childStepRuns AS (
SELECT sr."id", sr."status"
FROM "StepRun" sr
JOIN "_StepRunOrder" sro ON sr."id" = sro."B"
WHERE sro."A" = (SELECT "id" FROM currStepRun)
UNION ALL
SELECT sr."id", sr."status"
FROM "StepRun" sr
JOIN "_StepRunOrder" sro ON sr."id" = sro."B"
JOIN childStepRuns csr ON sro."A" = csr."id"
)
SELECT
sr.id, sr."createdAt", sr."updatedAt", sr."deletedAt", sr."tenantId", sr."jobRunId", sr."stepId", sr."order", sr."workerId", sr."tickerId", sr.status, sr.input, sr.output, sr."requeueAfter", sr."scheduleTimeoutAt", sr.error, sr."startedAt", sr."finishedAt", sr."timeoutAt", sr."cancelledAt", sr."cancelledReason", sr."cancelledError", sr."inputSchema", sr."callerFiles", sr."gitRepoBranch", sr."retryCount", sr."semaphoreReleased"
FROM
"StepRun" sr
JOIN
childStepRuns csr ON sr."id" = csr."id"
WHERE
sr."tenantId" = $1::uuid AND
sr."status" NOT IN ('SUCCEEDED', 'FAILED', 'CANCELLED')
`
type ListNonFinalChildStepRunsParams struct {
Tenantid pgtype.UUID `json:"tenantid"`
Steprunid pgtype.UUID `json:"steprunid"`
}
// Select all child step runs that are not in a final state
func (q *Queries) ListNonFinalChildStepRuns(ctx context.Context, db DBTX, arg ListNonFinalChildStepRunsParams) ([]*StepRun, error) {
rows, err := db.Query(ctx, listNonFinalChildStepRuns, arg.Tenantid, arg.Steprunid)
if err != nil {
return nil, err
}
defer rows.Close()
var items []*StepRun
for rows.Next() {
var i StepRun
if err := rows.Scan(
&i.ID,
&i.CreatedAt,
&i.UpdatedAt,
&i.DeletedAt,
&i.TenantId,
&i.JobRunId,
&i.StepId,
&i.Order,
&i.WorkerId,
&i.TickerId,
&i.Status,
&i.Input,
&i.Output,
&i.RequeueAfter,
&i.ScheduleTimeoutAt,
&i.Error,
&i.StartedAt,
&i.FinishedAt,
&i.TimeoutAt,
&i.CancelledAt,
&i.CancelledReason,
&i.CancelledError,
&i.InputSchema,
&i.CallerFiles,
&i.GitRepoBranch,
&i.RetryCount,
&i.SemaphoreReleased,
); err != nil {
return nil, err
}
items = append(items, &i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const listStartableStepRuns = `-- name: ListStartableStepRuns :many
WITH job_run AS (
SELECT "status"
@@ -983,6 +1151,186 @@ func (q *Queries) RefreshTimeoutBy(ctx context.Context, db DBTX, arg RefreshTime
return &i, err
}
const replayStepRunResetJobRun = `-- name: ReplayStepRunResetJobRun :one
UPDATE
"JobRun"
SET
"status" = 'RUNNING',
"updatedAt" = CURRENT_TIMESTAMP,
"startedAt" = NULL,
"finishedAt" = NULL,
"timeoutAt" = NULL,
"cancelledAt" = NULL,
"cancelledReason" = NULL,
"cancelledError" = NULL
WHERE
"id" = $1::uuid
RETURNING id, "createdAt", "updatedAt", "deletedAt", "tenantId", "jobId", "tickerId", status, result, "startedAt", "finishedAt", "timeoutAt", "cancelledAt", "cancelledReason", "cancelledError", "workflowRunId"
`
func (q *Queries) ReplayStepRunResetJobRun(ctx context.Context, db DBTX, jobrunid pgtype.UUID) (*JobRun, error) {
row := db.QueryRow(ctx, replayStepRunResetJobRun, jobrunid)
var i JobRun
err := row.Scan(
&i.ID,
&i.CreatedAt,
&i.UpdatedAt,
&i.DeletedAt,
&i.TenantId,
&i.JobId,
&i.TickerId,
&i.Status,
&i.Result,
&i.StartedAt,
&i.FinishedAt,
&i.TimeoutAt,
&i.CancelledAt,
&i.CancelledReason,
&i.CancelledError,
&i.WorkflowRunId,
)
return &i, err
}
const replayStepRunResetLaterStepRuns = `-- name: ReplayStepRunResetLaterStepRuns :many
WITH RECURSIVE currStepRun AS (
SELECT id, "createdAt", "updatedAt", "deletedAt", "tenantId", "jobRunId", "stepId", "order", "workerId", "tickerId", status, input, output, "requeueAfter", "scheduleTimeoutAt", error, "startedAt", "finishedAt", "timeoutAt", "cancelledAt", "cancelledReason", "cancelledError", "inputSchema", "callerFiles", "gitRepoBranch", "retryCount", "semaphoreReleased"
FROM "StepRun"
WHERE
"id" = $2::uuid AND
"tenantId" = $1::uuid
), childStepRuns AS (
SELECT sr."id", sr."status"
FROM "StepRun" sr
JOIN "_StepRunOrder" sro ON sr."id" = sro."B"
WHERE sro."A" = (SELECT "id" FROM currStepRun)
UNION ALL
SELECT sr."id", sr."status"
FROM "StepRun" sr
JOIN "_StepRunOrder" sro ON sr."id" = sro."B"
JOIN childStepRuns csr ON sro."A" = csr."id"
)
UPDATE
"StepRun" as sr
SET
"status" = 'PENDING',
"scheduleTimeoutAt" = NULL,
"finishedAt" = NULL,
"startedAt" = NULL,
"output" = NULL,
"error" = NULL,
"cancelledAt" = NULL,
"cancelledReason" = NULL
FROM
childStepRuns csr
WHERE
sr."id" = csr."id" AND
sr."tenantId" = $1::uuid
RETURNING sr.id, sr."createdAt", sr."updatedAt", sr."deletedAt", sr."tenantId", sr."jobRunId", sr."stepId", sr."order", sr."workerId", sr."tickerId", sr.status, sr.input, sr.output, sr."requeueAfter", sr."scheduleTimeoutAt", sr.error, sr."startedAt", sr."finishedAt", sr."timeoutAt", sr."cancelledAt", sr."cancelledReason", sr."cancelledError", sr."inputSchema", sr."callerFiles", sr."gitRepoBranch", sr."retryCount", sr."semaphoreReleased"
`
type ReplayStepRunResetLaterStepRunsParams struct {
Tenantid pgtype.UUID `json:"tenantid"`
Steprunid pgtype.UUID `json:"steprunid"`
}
func (q *Queries) ReplayStepRunResetLaterStepRuns(ctx context.Context, db DBTX, arg ReplayStepRunResetLaterStepRunsParams) ([]*StepRun, error) {
rows, err := db.Query(ctx, replayStepRunResetLaterStepRuns, arg.Tenantid, arg.Steprunid)
if err != nil {
return nil, err
}
defer rows.Close()
var items []*StepRun
for rows.Next() {
var i StepRun
if err := rows.Scan(
&i.ID,
&i.CreatedAt,
&i.UpdatedAt,
&i.DeletedAt,
&i.TenantId,
&i.JobRunId,
&i.StepId,
&i.Order,
&i.WorkerId,
&i.TickerId,
&i.Status,
&i.Input,
&i.Output,
&i.RequeueAfter,
&i.ScheduleTimeoutAt,
&i.Error,
&i.StartedAt,
&i.FinishedAt,
&i.TimeoutAt,
&i.CancelledAt,
&i.CancelledReason,
&i.CancelledError,
&i.InputSchema,
&i.CallerFiles,
&i.GitRepoBranch,
&i.RetryCount,
&i.SemaphoreReleased,
); err != nil {
return nil, err
}
items = append(items, &i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const replayStepRunResetWorkflowRun = `-- name: ReplayStepRunResetWorkflowRun :one
WITH workflow_run_id AS (
SELECT
"workflowRunId"
FROM
"JobRun"
WHERE
"id" = $1::uuid
)
UPDATE
"WorkflowRun"
SET
"status" = 'RUNNING',
"updatedAt" = CURRENT_TIMESTAMP,
"startedAt" = NULL,
"finishedAt" = NULL
WHERE
"id" = (SELECT "workflowRunId" FROM workflow_run_id)
RETURNING "createdAt", "updatedAt", "deletedAt", "tenantId", "workflowVersionId", status, error, "startedAt", "finishedAt", "concurrencyGroupId", "displayName", id, "gitRepoBranch", "childIndex", "childKey", "parentId", "parentStepRunId", "additionalMetadata"
`
func (q *Queries) ReplayStepRunResetWorkflowRun(ctx context.Context, db DBTX, jobrunid pgtype.UUID) (*WorkflowRun, error) {
row := db.QueryRow(ctx, replayStepRunResetWorkflowRun, jobrunid)
var i WorkflowRun
err := row.Scan(
&i.CreatedAt,
&i.UpdatedAt,
&i.DeletedAt,
&i.TenantId,
&i.WorkflowVersionId,
&i.Status,
&i.Error,
&i.StartedAt,
&i.FinishedAt,
&i.ConcurrencyGroupId,
&i.DisplayName,
&i.ID,
&i.GitRepoBranch,
&i.ChildIndex,
&i.ChildKey,
&i.ParentId,
&i.ParentStepRunId,
&i.AdditionalMetadata,
)
return &i, err
}
const resolveLaterStepRuns = `-- name: ResolveLaterStepRuns :many
WITH RECURSIVE currStepRun AS (
SELECT id, "createdAt", "updatedAt", "deletedAt", "tenantId", "jobRunId", "stepId", "order", "workerId", "tickerId", status, input, output, "requeueAfter", "scheduleTimeoutAt", error, "startedAt", "finishedAt", "timeoutAt", "cancelledAt", "cancelledReason", "cancelledError", "inputSchema", "callerFiles", "gitRepoBranch", "retryCount", "semaphoreReleased"

View File

@@ -514,10 +514,6 @@ func (s *stepRunEngineRepository) assignStepRunToWorkerAttempt(ctx context.Conte
defer deferRollback(ctx, s.l, tx.Rollback)
if err != nil {
return nil, err
}
assigned, err := s.queries.AssignStepRunToWorker(ctx, tx, dbsqlc.AssignStepRunToWorkerParams{
Steprunid: stepRun.StepRun.ID,
Tenantid: stepRun.StepRun.TenantId,
@@ -769,6 +765,140 @@ func (s *stepRunEngineRepository) UpdateStepRun(ctx context.Context, tenantId, s
return stepRun, updateInfo, nil
}
func (s *stepRunEngineRepository) ReplayStepRun(ctx context.Context, tenantId, stepRunId string, opts *repository.UpdateStepRunOpts) (*dbsqlc.GetStepRunForEngineRow, error) {
ctx, span := telemetry.NewSpan(ctx, "replay-step-run")
defer span.End()
if err := s.v.Validate(opts); err != nil {
return nil, err
}
updateParams, createEventParams, updateJobRunLookupDataParams, _, _, err := getUpdateParams(tenantId, stepRunId, opts)
if err != nil {
return nil, err
}
var stepRun *dbsqlc.GetStepRunForEngineRow
err = deadlockRetry(s.l, func() error {
tx, err := s.pool.Begin(ctx)
if err != nil {
return err
}
defer deferRollback(ctx, s.l, tx.Rollback)
innerStepRun, err := s.queries.GetStepRun(ctx, tx, dbsqlc.GetStepRunParams{
ID: sqlchelpers.UUIDFromStr(stepRunId),
Tenantid: sqlchelpers.UUIDFromStr(tenantId),
})
if err != nil {
return err
}
stepRun, err = s.updateStepRunCore(ctx, tx, tenantId, updateParams, createEventParams, updateJobRunLookupDataParams, innerStepRun)
if err != nil {
return err
}
// reset the job run, workflow run and all fields as part of the core tx
_, err = s.queries.ReplayStepRunResetWorkflowRun(ctx, tx, stepRun.JobRunId)
if err != nil {
return err
}
_, err = s.queries.ReplayStepRunResetJobRun(ctx, tx, stepRun.JobRunId)
if err != nil {
return err
}
laterStepRuns, err := s.queries.GetLaterStepRunsForReplay(ctx, tx, dbsqlc.GetLaterStepRunsForReplayParams{
Tenantid: sqlchelpers.UUIDFromStr(tenantId),
Steprunid: sqlchelpers.UUIDFromStr(stepRunId),
})
if err != nil {
return err
}
// archive each of the later step run results
for _, laterStepRun := range laterStepRuns {
laterStepRunCp := laterStepRun
laterStepRunId := sqlchelpers.UUIDToStr(laterStepRun.ID)
err = s.archiveStepRunResult(ctx, tx, tenantId, laterStepRunId)
if err != nil {
return err
}
// create a deferred event for each of these step runs
defer s.deferredStepRunEvent(
laterStepRunCp.ID,
dbsqlc.StepRunEventReasonRETRIEDBYUSER,
dbsqlc.StepRunEventSeverityINFO,
fmt.Sprintf("Parent step run %s was replayed, resetting step run result", stepRun.StepReadableId.String),
nil,
)
}
// reset all later step runs to a pending state
_, err = s.queries.ReplayStepRunResetLaterStepRuns(ctx, tx, dbsqlc.ReplayStepRunResetLaterStepRunsParams{
Tenantid: sqlchelpers.UUIDFromStr(tenantId),
Steprunid: sqlchelpers.UUIDFromStr(stepRunId),
})
if err != nil {
return err
}
err = tx.Commit(ctx)
return err
})
if err != nil {
return nil, err
}
return stepRun, nil
}
func (s *stepRunEngineRepository) PreflightCheckReplayStepRun(ctx context.Context, tenantId, stepRunId string) error {
// verify that the step run is in a final state
stepRun, err := s.getStepRunForEngineTx(ctx, s.pool, tenantId, stepRunId)
if err != nil {
return err
}
if !repository.IsFinalStepRunStatus(stepRun.StepRun.Status) {
return repository.ErrPreflightReplayStepRunNotInFinalState
}
// verify that child step runs are in a final state
childStepRuns, err := s.queries.ListNonFinalChildStepRuns(ctx, s.pool, dbsqlc.ListNonFinalChildStepRunsParams{
Steprunid: sqlchelpers.UUIDFromStr(stepRunId),
Tenantid: sqlchelpers.UUIDFromStr(tenantId),
})
if err != nil && !errors.Is(err, pgx.ErrNoRows) {
return fmt.Errorf("could not list non-final child step runs: %w", err)
}
if len(childStepRuns) > 0 {
return repository.ErrPreflightReplayChildStepRunNotInFinalState
}
return nil
}
func (s *stepRunEngineRepository) UnlinkStepRunFromWorker(ctx context.Context, tenantId, stepRunId string) error {
_, err := s.queries.UnlinkStepRunFromWorker(ctx, s.pool, dbsqlc.UnlinkStepRunFromWorkerParams{
Steprunid: sqlchelpers.UUIDFromStr(stepRunId),
@@ -795,10 +925,6 @@ func (s *stepRunEngineRepository) UpdateStepRunOverridesData(ctx context.Context
defer deferRollback(ctx, s.l, tx.Rollback)
if err != nil {
return nil, err
}
pgTenantId := sqlchelpers.UUIDFromStr(tenantId)
pgStepRunId := sqlchelpers.UUIDFromStr(stepRunId)
@@ -923,16 +1049,13 @@ func (s *stepRunEngineRepository) QueueStepRun(ctx context.Context, tenantId, st
}
sr, err := s.updateStepRunCore(ctx, tx, tenantId, updateParams, createEventParams, updateJobRunLookupDataParams, innerStepRun)
if err != nil {
return err
}
stepRun = sr
if err != nil {
return err
}
if err := tx.Commit(ctx); err != nil {
return err
}
@@ -1317,7 +1440,11 @@ func (s *stepRunEngineRepository) ListStartableStepRuns(ctx context.Context, ten
}
func (s *stepRunEngineRepository) ArchiveStepRunResult(ctx context.Context, tenantId, stepRunId string) error {
_, err := s.queries.ArchiveStepRunResultFromStepRun(ctx, s.pool, dbsqlc.ArchiveStepRunResultFromStepRunParams{
return s.archiveStepRunResult(ctx, s.pool, tenantId, stepRunId)
}
func (s *stepRunEngineRepository) archiveStepRunResult(ctx context.Context, db dbsqlc.DBTX, tenantId, stepRunId string) error {
_, err := s.queries.ArchiveStepRunResultFromStepRun(ctx, db, dbsqlc.ArchiveStepRunResultFromStepRunParams{
Tenantid: sqlchelpers.UUIDFromStr(tenantId),
Steprunid: sqlchelpers.UUIDFromStr(stepRunId),
})

View File

@@ -54,7 +54,7 @@ func (w *workflowRunAPIRepository) WorkflowRunMetricsCount(tenantId string, opts
return nil, err
}
return workflowRunMetricsCount(context.Background(), w.pool, w.queries, w.l, tenantId, opts)
return workflowRunMetricsCount(context.Background(), w.pool, w.queries, tenantId, opts)
}
func (w *workflowRunAPIRepository) CreateNewWorkflowRun(ctx context.Context, tenantId string, opts *repository.CreateWorkflowRunOpts) (*db.WorkflowRunModel, error) {
@@ -377,7 +377,7 @@ func listWorkflowRuns(ctx context.Context, pool *pgxpool.Pool, queries *dbsqlc.Q
return res, nil
}
func workflowRunMetricsCount(ctx context.Context, pool *pgxpool.Pool, queries *dbsqlc.Queries, l *zerolog.Logger, tenantId string, opts *repository.WorkflowRunsMetricsOpts) (*dbsqlc.WorkflowRunsMetricsCountRow, error) {
func workflowRunMetricsCount(ctx context.Context, pool *pgxpool.Pool, queries *dbsqlc.Queries, tenantId string, opts *repository.WorkflowRunsMetricsOpts) (*dbsqlc.WorkflowRunsMetricsCountRow, error) {
pgTenantId := &pgtype.UUID{}

View File

@@ -118,6 +118,9 @@ type RefreshTimeoutBy struct {
IncrementTimeoutBy string `validate:"required,duration"`
}
var ErrPreflightReplayStepRunNotInFinalState = fmt.Errorf("step run is not in a final state")
var ErrPreflightReplayChildStepRunNotInFinalState = fmt.Errorf("child step run is not in a final state")
type StepRunAPIRepository interface {
GetStepRunById(tenantId, stepRunId string) (*db.StepRunModel, error)
@@ -138,6 +141,11 @@ type StepRunEngineRepository interface {
UpdateStepRun(ctx context.Context, tenantId, stepRunId string, opts *UpdateStepRunOpts) (*dbsqlc.GetStepRunForEngineRow, *StepRunUpdateInfo, error)
ReplayStepRun(ctx context.Context, tenantId, stepRunId string, opts *UpdateStepRunOpts) (*dbsqlc.GetStepRunForEngineRow, error)
// PreflightCheckReplayStepRun checks if a step run can be replayed. If it can, it will return nil.
PreflightCheckReplayStepRun(ctx context.Context, tenantId, stepRunId string) error
CreateStepRunEvent(ctx context.Context, tenantId, stepRunId string, opts CreateStepRunEventOpts) error
UnlinkStepRunFromWorker(ctx context.Context, tenantId, stepRunId string) error

View File

@@ -215,6 +215,8 @@ func (ec *JobsControllerImpl) handleTask(ctx context.Context, task *msgqueue.Mes
return ec.handleJobRunQueued(ctx, task)
case "job-run-cancelled":
return ec.handleJobRunCancelled(ctx, task)
case "step-run-replay":
return ec.handleStepRunReplay(ctx, task)
case "step-run-retry":
return ec.handleStepRunRetry(ctx, task)
case "step-run-queued":
@@ -370,7 +372,80 @@ func (ec *JobsControllerImpl) handleStepRunRetry(ctx context.Context, task *msgq
return fmt.Errorf("could not archive step run result: %w", err)
}
ec.l.Error().Err(fmt.Errorf("starting step run retry"))
stepRun, err := ec.repo.StepRun().GetStepRunForEngine(ctx, metadata.TenantId, payload.StepRunId)
if err != nil {
return fmt.Errorf("could not get step run: %w", err)
}
inputBytes := stepRun.StepRun.Input
retryCount := int(stepRun.StepRun.RetryCount) + 1
// update step run
_, _, err = ec.repo.StepRun().UpdateStepRun(
ctx,
metadata.TenantId,
sqlchelpers.UUIDToStr(stepRun.StepRun.ID),
&repository.UpdateStepRunOpts{
Input: inputBytes,
Status: repository.StepRunStatusPtr(db.StepRunStatusPending),
IsRerun: true,
RetryCount: &retryCount,
Event: &repository.CreateStepRunEventOpts{
EventReason: repository.StepRunEventReasonPtr(dbsqlc.StepRunEventReasonRETRYING),
EventMessage: repository.StringPtr(
fmt.Sprintf("Retrying step run. This is retry %d / %d", retryCount, stepRun.StepRetries),
)},
},
)
if err != nil {
return fmt.Errorf("could not update step run: %w", err)
}
// send a task to the taskqueue
return ec.mq.AddMessage(
ctx,
msgqueue.JOB_PROCESSING_QUEUE,
tasktypes.StepRunQueuedToTask(stepRun),
)
}
// handleStepRunReplay replays a step run from scratch - it resets the workflow run state, job run state, and
// all cancelled step runs which are children of the step run being replayed.
func (ec *JobsControllerImpl) handleStepRunReplay(ctx context.Context, task *msgqueue.Message) error {
ctx, span := telemetry.NewSpan(ctx, "handle-step-run-replay")
defer span.End()
payload := tasktypes.StepRunReplayTaskPayload{}
metadata := tasktypes.StepRunReplayTaskMetadata{}
err := ec.dv.DecodeAndValidate(task.Payload, &payload)
if err != nil {
return fmt.Errorf("could not decode job task payload: %w", err)
}
err = ec.dv.DecodeAndValidate(task.Metadata, &metadata)
if err != nil {
return fmt.Errorf("could not decode job task metadata: %w", err)
}
err = ec.repo.StepRun().ArchiveStepRunResult(ctx, metadata.TenantId, payload.StepRunId)
if err != nil {
return fmt.Errorf("could not archive step run result: %w", err)
}
// Unlink the step run from its existing worker. This is necessary because automatic retries increment the
// worker semaphore on failure/cancellation, but in this case we don't want to increment the semaphore.
// FIXME: this is very far decoupled from the actual worker logic, and should be refactored.
err = ec.repo.StepRun().UnlinkStepRunFromWorker(ctx, metadata.TenantId, payload.StepRunId)
if err != nil {
return fmt.Errorf("could not unlink step run from worker: %w", err)
}
stepRun, err := ec.repo.StepRun().GetStepRunForEngine(ctx, metadata.TenantId, payload.StepRunId)
@@ -427,7 +502,7 @@ func (ec *JobsControllerImpl) handleStepRunRetry(ctx context.Context, task *msgq
}
// update step run
_, _, err = ec.repo.StepRun().UpdateStepRun(
_, err = ec.repo.StepRun().ReplayStepRun(
ctx,
metadata.TenantId,
sqlchelpers.UUIDToStr(stepRun.StepRun.ID),
@@ -437,15 +512,15 @@ func (ec *JobsControllerImpl) handleStepRunRetry(ctx context.Context, task *msgq
IsRerun: true,
RetryCount: &retryCount,
Event: &repository.CreateStepRunEventOpts{
EventReason: repository.StepRunEventReasonPtr(dbsqlc.StepRunEventReasonRETRYING),
EventReason: repository.StepRunEventReasonPtr(dbsqlc.StepRunEventReasonRETRIEDBYUSER),
EventMessage: repository.StringPtr(
fmt.Sprintf("Retrying step run. This is retry %d / %d", retryCount, stepRun.StepRetries),
"This step was manually replayed by a user",
)},
},
)
if err != nil {
return fmt.Errorf("could not update step run: %w", err)
return fmt.Errorf("could not update step run for replay: %w", err)
}
// send a task to the taskqueue

View File

@@ -121,6 +121,18 @@ type StepRunRetryTaskMetadata struct {
TenantId string `json:"tenant_id" validate:"required,uuid"`
}
type StepRunReplayTaskPayload struct {
StepRunId string `json:"step_run_id" validate:"required,uuid"`
JobRunId string `json:"job_run_id" validate:"required,uuid"`
// optional - if not provided, the step run will be retried with the same input
InputData string `json:"input_data,omitempty"`
}
type StepRunReplayTaskMetadata struct {
TenantId string `json:"tenant_id" validate:"required,uuid"`
}
func TenantToStepRunRequeueTask(tenant db.TenantModel) *msgqueue.Message {
payload, _ := datautils.ToJSONMap(StepRunRequeueTaskPayload{
TenantId: tenant.ID,
@@ -161,6 +173,29 @@ func StepRunRetryToTask(stepRun *dbsqlc.GetStepRunForEngineRow, inputData []byte
}
}
func StepRunReplayToTask(stepRun *dbsqlc.GetStepRunForEngineRow, inputData []byte) *msgqueue.Message {
jobRunId := sqlchelpers.UUIDToStr(stepRun.JobRunId)
stepRunId := sqlchelpers.UUIDToStr(stepRun.StepRun.ID)
tenantId := sqlchelpers.UUIDToStr(stepRun.StepRun.TenantId)
payload, _ := datautils.ToJSONMap(StepRunReplayTaskPayload{
JobRunId: jobRunId,
StepRunId: stepRunId,
InputData: string(inputData),
})
metadata, _ := datautils.ToJSONMap(StepRunReplayTaskMetadata{
TenantId: tenantId,
})
return &msgqueue.Message{
ID: "step-run-replay",
Payload: payload,
Metadata: metadata,
Retries: 3,
}
}
func StepRunCancelToTask(stepRun *dbsqlc.GetStepRunForEngineRow, reason string) *msgqueue.Message {
stepRunId := sqlchelpers.UUIDToStr(stepRun.StepRun.ID)
tenantId := sqlchelpers.UUIDToStr(stepRun.StepRun.TenantId)

View File

@@ -203,7 +203,7 @@ func newFromOpts(opts *ClientOpts) (Client, error) {
transportCreds = credentials.NewTLS(opts.tls)
}
conn, err := grpc.Dial(
conn, err := grpc.NewClient(
opts.hostPort,
grpc.WithTransportCredentials(transportCreds),
)

View File

@@ -77,6 +77,7 @@ const (
StepRunEventReasonREASSIGNED StepRunEventReason = "REASSIGNED"
StepRunEventReasonREQUEUEDNOWORKER StepRunEventReason = "REQUEUED_NO_WORKER"
StepRunEventReasonREQUEUEDRATELIMIT StepRunEventReason = "REQUEUED_RATE_LIMIT"
StepRunEventReasonRETRIEDBYUSER StepRunEventReason = "RETRIED_BY_USER"
StepRunEventReasonRETRYING StepRunEventReason = "RETRYING"
StepRunEventReasonSCHEDULINGTIMEDOUT StepRunEventReason = "SCHEDULING_TIMED_OUT"
StepRunEventReasonSLOTRELEASED StepRunEventReason = "SLOT_RELEASED"

View File

@@ -0,0 +1,2 @@
-- AlterEnum
ALTER TYPE "StepRunEventReason" ADD VALUE 'RETRIED_BY_USER';

View File

@@ -1042,6 +1042,7 @@ enum StepRunEventReason {
FINISHED
FAILED
RETRYING
RETRIED_BY_USER
CANCELLED
TIMEOUT_REFRESHED
SLOT_RELEASED