diff --git a/api-contracts/openapi/components/schemas/workflow_run.yaml b/api-contracts/openapi/components/schemas/workflow_run.yaml index 30fdc2edf..7e0d72940 100644 --- a/api-contracts/openapi/components/schemas/workflow_run.yaml +++ b/api-contracts/openapi/components/schemas/workflow_run.yaml @@ -287,6 +287,7 @@ StepRunEventReason: - REASSIGNED - TIMED_OUT - SLOT_RELEASED + - RETRIED_BY_USER StepRunEventSeverity: type: string diff --git a/api/v1/server/handlers/step-runs/rerun.go b/api/v1/server/handlers/step-runs/rerun.go index 9e3013eba..1d67e3a1a 100644 --- a/api/v1/server/handlers/step-runs/rerun.go +++ b/api/v1/server/handlers/step-runs/rerun.go @@ -2,6 +2,7 @@ package stepruns import ( "encoding/json" + "errors" "fmt" "time" @@ -22,6 +23,7 @@ func (t *StepRunService) StepRunUpdateRerun(ctx echo.Context, request gen.StepRu stepRun := ctx.Get("step-run").(*db.StepRunModel) // preflight check to make sure there's at least one worker to serve this request + // FIXME: merge this preflight check with the one below action := stepRun.Step().ActionID sixSecAgo := time.Now().Add(-6 * time.Second) @@ -38,6 +40,25 @@ func (t *StepRunService) StepRunUpdateRerun(ctx echo.Context, request gen.StepRu ), nil } + // preflight check to verify step run status + err = t.config.EngineRepository.StepRun().PreflightCheckReplayStepRun(ctx.Request().Context(), tenant.ID, stepRun.ID) + + if err != nil { + if errors.Is(err, repository.ErrPreflightReplayStepRunNotInFinalState) { + return gen.StepRunUpdateRerun400JSONResponse( + apierrors.NewAPIErrors("Step run cannot be replayed because it is not finished running yet."), + ), nil + } + + if errors.Is(err, repository.ErrPreflightReplayChildStepRunNotInFinalState) { + return gen.StepRunUpdateRerun400JSONResponse( + apierrors.NewAPIErrors("Step run cannot be replayed because it has child step runs that are not finished running yet."), + ), nil + } + + return nil, fmt.Errorf("could not preflight check step run: %w", err) + } + // make sure input can be marshalled and unmarshalled to input type inputBytes, err := json.Marshal(request.Body.Input) @@ -49,7 +70,7 @@ func (t *StepRunService) StepRunUpdateRerun(ctx echo.Context, request gen.StepRu data := &datautils.StepRunData{} - if err := json.Unmarshal(inputBytes, data); err != nil || data == nil { + if err := json.Unmarshal(inputBytes, data); err != nil { return gen.StepRunUpdateRerun400JSONResponse( apierrors.NewAPIErrors("Invalid input"), ), nil @@ -63,33 +84,17 @@ func (t *StepRunService) StepRunUpdateRerun(ctx echo.Context, request gen.StepRu ), nil } - // set the job run and workflow run to running status - err = t.config.APIRepository.JobRun().SetJobRunStatusRunning(tenant.ID, stepRun.JobRunID) - - if err != nil { - return nil, err - } - engineStepRun, err := t.config.EngineRepository.StepRun().GetStepRunForEngine(ctx.Request().Context(), tenant.ID, stepRun.ID) if err != nil { return nil, fmt.Errorf("could not get step run for engine: %w", err) } - // Unlink the step run from its existing worker. This is necessary because automatic retries increment the - // worker semaphore on failure/cancellation, but in this case we don't want to increment the semaphore. - // FIXME: this is very far decoupled from the actual worker logic, and should be refactored. - err = t.config.EngineRepository.StepRun().UnlinkStepRunFromWorker(ctx.Request().Context(), tenant.ID, stepRun.ID) - - if err != nil { - return nil, fmt.Errorf("could not unlink step run from worker: %w", err) - } - // send a task to the taskqueue err = t.config.MessageQueue.AddMessage( ctx.Request().Context(), msgqueue.JOB_PROCESSING_QUEUE, - tasktypes.StepRunRetryToTask(engineStepRun, inputBytes), + tasktypes.StepRunReplayToTask(engineStepRun, inputBytes), ) if err != nil { diff --git a/api/v1/server/oas/gen/openapi.gen.go b/api/v1/server/oas/gen/openapi.gen.go index a2fdd6767..74dd6ff31 100644 --- a/api/v1/server/oas/gen/openapi.gen.go +++ b/api/v1/server/oas/gen/openapi.gen.go @@ -80,6 +80,7 @@ const ( StepRunEventReasonREASSIGNED StepRunEventReason = "REASSIGNED" StepRunEventReasonREQUEUEDNOWORKER StepRunEventReason = "REQUEUED_NO_WORKER" StepRunEventReasonREQUEUEDRATELIMIT StepRunEventReason = "REQUEUED_RATE_LIMIT" + StepRunEventReasonRETRIEDBYUSER StepRunEventReason = "RETRIED_BY_USER" StepRunEventReasonRETRYING StepRunEventReason = "RETRYING" StepRunEventReasonSCHEDULINGTIMEDOUT StepRunEventReason = "SCHEDULING_TIMED_OUT" StepRunEventReasonSLOTRELEASED StepRunEventReason = "SLOT_RELEASED" @@ -7999,106 +8000,106 @@ var swaggerSpec = []string{ "AwHKqxnvwSgLPkKM6KJN75Hqkzusa+jvM8KEjqAQse40eA7a9TIlKBWnL42rw57hUEOIbq4VO1lDm7sS", "blkgSKON1EAGmkFjePZ/b85uzk7vLi7vvl0Ofz8b+r38x+Hx9dnd+eDr4Nrv+aOT385Ob84HF1/urgdf", "z07vLm/Yz8ej0eDLBXeQj66Ph9fCZz64GIx+K7rPh2fXw38L93ruSe/5bKzLm+u74dnn4ZnsMzzTRtUn", - "G51fspbnZ8cjixveSLfagjXbuoRlOLgenByf141WFzQg/7oTEH89uyghZaWggussjrykpEcgXFAUkMuE", - "XqbmXDqZe5vdTGaAeHHCrkNS+8wG2Tem3W48lc6Wl7ByYkNz4p01R8GY9bPddJ/NBF7WZP0Y17wDss28", - "F6bsqGm8J0jOH7IJuNzTeqNoOpKhptUlhYCIVnYXJc+x5Um4IPIAa8tTb4lMGnB0UIJnBcxnrulGgSUz", - "YQ6evYlq4gHqPc1QMPNoLOZer6nGTjFGgO30M8hs2ptJFHvJ0mXt12otSVoMs9UE4uWy0ZosTlJ62+xl", - "6rMda6JFncWMj1BIn11CshbS6PK90lOCGmhnZ0SOJOV2kkbsaRX+VyMo9+wzxnpNrW8IxKLHVXofoqCO", - "FPh4NQmVOsw7s+ly/5bZ9KHcJ6UcXn674Hr08enXwYXf87+eff10Zg7huMZoOoVYs2dZPU7mPMnqhbKt", - "Y+qGV3L4GdOd9ZU3eGzXkmlsZQgdEPv+b0Pn35yS4r6t8RzRCIW9capi8Fa/UpjiP26ISV436itgPMaQ", - "EF1vKagX6iCsqi/sw2+AzEx8MwNkpg/5f0hpOslJ4ugX9axGojSUdzID1DrhHxBndjE7vXDti1HLo2zO", - "fkW4CIOZZmaAXAFCnmLsOgfwEtnBI5Bu8fY5RiQJwaJAMmr/Wis6RezeWgjsZAaiKVQIspcPgE92JHIq", - "h0851pSaZoZ9CQGqRubrTmoByYCoxd9qMFQCUOWXXgFPNpSfx1MULV+4Yjn+XqmOxc5hXK0xacL1EE4R", - "oez/bwjdbmeJRTDs4G6palKum6YrKmSGEvJWlfDKpWSLp/kmThkxmWnbvnEPnC1aw2ISkR+FoUD48LwA", - "RF4CMVvffiubJHgEKAT3IVQ+94as/+q08BkGKYVeEEcyLjlc7JuL0SKS8EozeNBQx4+H+aBpBMde3mkH", - "KvqFgIegYnoPAT2mtSarHEvc0EhgRD3gzVTv/fVWRpyDZ/sOzsEzmqfz9e3kxp0IAqp9c2RAACNV6oHY", - "Qn5YGx6EI5aa1RbOB141LzePFzC4M/i36mKUKeH45HrwxxnPCZF/3rb1dtjFyQ6IfSnXjJ5LZRQxVS1I", - "wngxh81GLjXGadbjJI4maNpY8tpS+0DlHe9b8tkt28y+mIZwwpHMgS+TFRMW7bOvt8KQVgwp/cYgC8F0", - "eQypNV4D49Eliyu0o0o23h+iYxbUWhdC7sp2bNwTJTcDQ4mhKaTa98wBWTIYRPJ0l3aXKaSE4y7Iu3pT", - "1je7LGqEYNybEM0RHVEMKJwubNJKfPVoLGwRqmKrPisfx+N1akEwg2NdnAlv993g4u5qePlleDYa+T3/", - "dHh5dXdx9u1sdO33fB56kP/zy/Dy5upueHlzcXo3vPw0uDCaUVseavm5ldGasYTQ+0NzKlFh3+XUZQT2", - "jBtZRxUVGfVzlA2Y2kogLZXwbRzN5lvLtUgxnnecJJ5eU8ApUWEDvtcWZQzsS77VaEvkm5WNjhnxD06N", - "W6N6mxWFlcK3t6xjcD3CqeS9av0VUowCQ8AAl6q/w8WJivYzHGal/MWqUH6AC2LWmNXwTKjUTFHS0JmQ", - "Bx5JYIAmKMgn8f6RAELg2HtEwJugkEL8T8f0yG/FCi8uzqj81yutNcUpNIwvr8nWvML1xlxnbrFWMIrA", - "VXdSy4Ou1xbSo4KphQzb9j1WzD3Sgz63DcLGajHp9R6zKO368GrhrYXjT4sWg19rvbT6R1K5bKmLGkZY", - "vYrSH1r1MYm74mJv6+XDjlwktWuOu5xfV00oqbma4zgrkymMtV2bRqglAWMhOEM5jji60iRKNcEGx9GI", - "Ke6pJSccPjpcvLOIZJlau5kEsJakn3VqoGdywhORrE6NQvWxIo1uMku5NG3TIqwaDI9ub0N1aqgT0bGJ", - "n0rNK/NL7jHmPSjOM36UHGb8phjV+DHnXcPnutVcg6kJf2GM12MyWtmmYnYFCQjrCETKiBPMZO7ELCZq", - "UrvukIXzmiaUeQgTS5WAO1t6z4rTEvMK258vJbyZcgUfK6lvLQbO8LNeHVIc+mb05XrAnbSUtUezpsyU", - "eaVg6nLBhG4d06yqq9hKV8lyxONSOqLNMNTjD7mxY/M6z/+16ZBt6YJohkyzwJAfncTOk2Zad706qz4t", - "z1wFs8JkYaDbZpI6hey6Z66ZgcFT8bPBBAWevH8ffz33xlnD9lK1OI8D0OZH0rZEhT8BlfBExiDFiC5G", - "+eOJ9xBgiNUbi+JVRXZN5T/nC5xRytNIgjh+QFA1RwxD4idlwj/yKy+Fyqf3uH8KRZPYjGT1KOvx1YBn", - "IfLqSn7x12yX/IP9d/vv+CYnMAIJ8o/89/sH+++4wkpnfGl9kKB+iB6h9BBU5/2iPACsVQQJ8bKbHKPB", - "zA7qn8vvX/i6sLxw8VkO372rDvwbBCGdcTH60fT9IqbZnIWd8Y/+vO35RD2lwCDMGypf0J9y/GAGgwf/", - "lvXna8UQjBfNi2XNUN1qh6rBOpfLgeOxrvyJNo9iMJnImPi61WfQNi7/8aAPZCDuHo8K2eMmPdL/zn/W", - "f3sRMIaQGjTJU/478UD2dAxPXxKxL7x7BWOlGHAxgrg8gTmk/Oj6syaRpDKDxy9CnL8YPefcVVmKr8sH", - "YZITMmblm9XLbWXvP1SxNUqDABIyScNw4QmUjgvv7lSQ99LzPwgqCeKIynuxfPyXDdr/Sybc5utweZBX", - "xjeV7fVzEDIswLEXY+8ejD2c11398O792sEwQfE5xvdoPIYiLDunb0EndWSmKF4mntz2/Oc99ZYl/yDz", - "VnoGwrjlVwBq8nqJ4PlVSFyM8GOQuChyFwvZuRZicMgPMZBJLbZoLJ8srWDjxSyi17IQSz5pFfaCGJAP", - "nXZiwE0MCGrZnBjQD8gE7fFXMtmpqP7mp2FifIZ5CB/jB57Dmz8wK/xm2YwlMZEgXhRdrEl0d5ES2fAW", - "maBg3anjDvPlSTpXpeB/YKImbahakg7b2Gu5c4qM89/qKDnb8gIFC1NR/zv//0tf3Q1tKi/fm+x9PBDl", - "79YV6TZ7d0/ovI30Kl7Zsx1hwty/TVJdH83lLxA2iXkMKUbwUTKAwAjfj44LClcYDTM5DwhnTw39Cxoq", - "0L6IWtkDSdLXI26IlQHOEaG2OJ3qvS8LEGLdBqWmG6M3h4cy2hFicZG7RIsH2wHjJgIpncUY/QeOxcQf", - "tzPxV0hn8diLYqbFhPGTenI3v95/L1iQ/rx9Kdz3m8hV8Y5o4sYb/e/T2Z7+y0ufh9g580wWkIdgA8vw", - "h0hcDg8dHOsZUgL7jZ4mtmda2rF0YQ86jn67HF1ipjJDV07DMhOsxPL8d/bXHo+sfcn/zVjupX8v3ypy", - "Fg1Zh1qx8Clv9dYkQ88lQtkKZI7qWhDbTqreErXPKVu4T7kdCVh5C6udEMyorROAb1cAaiJjHcKv/6SV", - "0zdacLS5p2F8D0JVyd0itITh5gtv+i1r2ewDKhBugmP2DzjOJutodpdotuhlExQCTBTSrHErCux/l3+8", - "ONGiNHa60KIweua02HiIykGt5+eTRtZb1ag7jvnhOKZCx3UcM4f1xkqSvZmXZSCqAAh+EEQBrHCKyiWx", - "++rXhT6ZgNVGZcmiVXaFmBuCDfSQf7mPWa4OP59V5704EQkx0gVX2uQ+Kj3Oab9OgDD0Cq1tGyyMcoWG", - "G9VZTQ/zttp8VdausLpdIoSiklbahOr+65vM38Tpf+f/cwgm8Ub6GzqVLdYfRnKPHSmMaT3sOIg7GSRS", - "xEl33L3+cVcOTClTreIJ/nud70IQXZFjItL/TiLixC3Fd6Cq/BKRFmxSelTKyihSpO4cm5SQ0THKDjJK", - "hWAzVrkY1TJKRAxsIj6/KCe3/ULF5lW+5gqLtA7RsnFGBu1WzYzCayrellnKxa7BcPjxYwGIg+7K1l3Z", - "nK5shMJkD6f88JJ/vvTFE9d7CbZz5glv4gEvScNQ7YwMncpyBCpMK3LGBeOKEa6wCwOrWlz2w03C/vZC", - "KSUa8geGP+N4nlUOMwdR8sIFHo3FQ/nlXdhqAGVb8AsSRj6lzm9ThRX83AE2bNYP25n1ImZMm0blc1+y", - "d4mslCDJknvqTn7Fkc3iZiwfdauPcUOTiSppoqTBPaRPUJabmseEqtJ97BuIBF1NECb8l32bOPoCKX9W", - "7i3JoQ1x8xdItYf2lvTj8e3sOPiVOZjxzViQ9YbYNk9qtlv/RJsS39oYkfU4E4O+EV7s1ZRhorFHHlCi", - "YPs7hXiRAxdPJoRbtA2goIj+8sFYkal+OlFl735hmZJ/bjnjJoVN5dnEJeJxSSdotiVoCjz3BAi/mpgk", - "T5nxNbbPrAbsJ6YhrkcShfG0Xg4RL4ynXogiWJZFVa3gPJ6eo0i8PNWJod0QQz37o0IhfIQhYfOKUnY1", - "E/OWhZlrfUSSDlivzwiGY9vKCQQ4mHl8Ng2OSYwtgIgObQEZiV4GIL7xF+5ij6fJ29fPP39aiLW0nPxS", - "72vBg5h+jDBUr83XQHGqNVsGkrz/hqPbNGnQdD4xkuwOJ0uiCD8VMimsnQXn8bT9MSA+kyb7FPEAfyXH", - "kswnIm9EU3+T5p3iM171qbGZWecVEmFbWWxo9lhgl+eqG00yYmvKajVRdOYcEdUSa7LbebLfMyIURdN6", - "An87jpItpKu7MWFepOdVE9M7flxb3nmLLPNavjTXYKmPegOZtmrLgSdN9ShcryM7wcHbLNawhOXAvgkd", - "7xTUtTpqdWemXgsVrX2hlkx7+1kPN13DXF8tFmcV9OCVa7FUT8CuFourjrpSLRa3U7JPIGX/J81121QX", - "T3Wpr8SikQuKpiPZx7G8xU9yTGqIWeGM1PekY6WCh82KprXxUVbQqN7RltUXIm71izp9MssC4PhYNmlV", - "xTp1tr6y8pgVQSLtKiM1KYxLFOvqdESOAEXrmlq4+dCwfNKOv9bFX5IRliw9Vn/gOER1EJ7UVQjtEL0t", - "ZcfeylnzM7tRH+DCyYnK2hVmdSrBz8mAF8muPmdih0l7is4JtlxWtAZQexNvORBxGsly09AJVtXW2f1p", - "fi7olVzSfD9fxyHNp94Bd7QOh+6MriGWLPn5AS68RxCm0EsAwhV6yV5T+5Ox28ERb3rg99i/DsW/Dpl4", - "N63H8AifkRmaXhnaeAXGzqO+Fi0bqnhJx7qLLod/nwt5Rw1AnBwOWsDvcNFdOvOjcCn658jueMDEA57U", - "TNbJBxgmIVjUVY9m33X/nuho4QBVM5oP+vNeRgUC5KNWtQ4KVYSYn5BY4W17Tvg2ockMuO6ostbKZuhZ", - "82GFokdEYVvXu+pldicM+NfunFJeBA0fS/kPFLY7r4HJsZ7T4oa86WKCWlrvbKOa/1ygxM1tLnD7qr5y", - "Ae4yLnJJGB1bmv3iGd+sx4kn+Vz9sCf+3e6RLgdWbv0s125ZWot8VQ/bXoaOt362NnKv4c2xHeNeU8WY", - "bH9scf3FfWzzlpcDJ7zx0jA7yAmbDcpe7tx9tbBsR841PBO2y5wrw6Vbc27dyTeH83v5sHGLO5rqZWbx", - "r/xrd0dT1KjhY6k7msJ2pwya7mg5La5HFxSlA2tjuQplCUltNc2OC0QYl46TNqGOJVR3Bc52qPagxguW", - "2rUNdTodGLFPKMDUyo4j9lVEJ18ep3TmGcua3RCIxeHJAbpkCOU93yJnvn932FAXkKNMHiEFrMwgGMvD", - "PowFwRRppTz3S6unJjhKizMqQmA7sDQdNIXWlopfElMtyk4OSzl8MSrU8G4hictY7mTxzsniKiM4lYFt", - "jOh1qIfcmak5Aor8VRvIuz6aLU7qbG7uCjvvMENbOc+Ro2tPVEONptr6jnpJx4XgXFulxjdr0/vRS0e6", - "1nwtmiCy2mBdGbddK+PGGHOtpduc5EQ/AFEAQ3vM1TGlcJ5QHhIk2jpUlpQVp8XQnQR52xJkjAh3FUkR", - "Iogg3D0d45VjrpoYZVsMjSHrWBNDyTo48zBv3rHwLkZ14jSSW9XgyMvK14unr03LfdkJTaWL6ayN6RQ5", - "T1sXKPmaagvGi2aOpae/QDoSw3ai5fW0AzlefP8XDOiSNwm5792FYqcvFGqXNiI1nmL8AHG9xSEMPdGs", - "oV7GN96oM++LBNSlwgy6FH5L8RhJgKUXGpb3ZClEyxNT/2eT/a2Qx9zIEDIP+S2b4woLtj6krWHwDXOt", - "3K4l2bYzz5k5N8NNuwdWCjS1PD/3E+zw8LL+fhMpvc5mVIc1cmGDaO96kY7VNwVg4Qk9QoWf01Y0AjpX", - "KNA2b8Q7bt79r9PLkgV/VChogXQ7+VNyxRexs3EJRJyUad7STXvoFOr8aO5U6vUfzO14wpEJGn1TJ8rM", - "fg9oMCvXByJ1p+3b8U1tyH6s4YIIZLhakaVzo6yzkrUblfNHAdi/nnKAB2NSKK6zEoKr5ada2sWkQ8yg", - "xnfiQo+AEWRTJpmy1Ki3SblKDj646xnKQXG+hXfV9Xa1up5ewoTNOYU029p9y8S8/WDsb+tO5A6Z6rJ5", - "4BKAGdIs97YSWKLxN10Ybwk+g8fFCJt0MW0Wrp0uSdiVwKtB3Qrqf3emN9jmNnMN4Oa4OduRgDTFtQrQ", - "GOkXGVN2b2Nr/ypnfKtnfXcUdkfhdo/C7rSp3q2VFFnhxFHCqzt5NnLypARi0g9SjOVS6p/NkQ091s2Y", - "mvoF0hM52AZpjKdgtiMqDnGXFfT6WUGuGbiMyEvkVszArZLxFNFZer8HkqQfgDC8ryt+cBIzKUxhi4Tr", - "L3z44yThSdcnaobWOc2BnHr7Wc1q0XWJzbVoXW8GexGhKov9R8kQtyFyg7S5w4S5NjrcCA3uJgGuTG9x", - "PA3hZuiND/2D05tA35rpLUfcD0dvTUWS82r+xZq03DDhpF6yEfSyaMTfparEWgX9n6okscvlxfVYdStZ", - "bKW9PggCmNCalE7+vV2FR9HH34wvWgxeKUpo8R/XUJ9YeVd6tz5RkSOpsfSunb4w5H7xmhRD9r0dfYk+", - "/qZy5djga6AvsfKOvhoS1RiSlqCvMJ6imszV83hKPBR5gJ+N+zUKxjkfaENlVNkRzMbf0quOTnaeMJ5O", - "4dhDXdGX3TLvFI91RjWudpwwnsYpbWCGOKVu3MCG2hEaZaB0RPp2bJCCelzJVlZvnaGkxRVI6+R2DdLr", - "8PJuMiZrowRunrT9fUhHUXcnWuZOpGOwmSQTQMhTjMd2WSoLcQtJ6qn2dSL1So25OR3jZAaiaTbRLikb", - "AYdsnCGqE+dvSJwLsipSugMTYThlggzXXfpEC1KrkZzoT/dsgm0UGLvEMAp5nRv2TejpioRcdR5R2HoT", - "Hoa8vPXP4GmV6fki8RZilxR6g24oUvodU+XFGLVJqnyKt1s/Yom4ox2TTjtTOKJF3YieIp0KgYuI1iy3", - "3OFNMj2F3Cl61f1ZMi0Usz5Le6ss8KFBoukPdGUAduWHtlR+6MJSbUgSq0Yxy+RI89rlLnVTnDihxSmw", - "e2yw/nDUJWNQu9PAHHa6PIk3nAn9EEUPeyLUp8YCjKIHD3iimYdhEhNEY7zwaKwxipU3pG0YRQ8i/OdN", - "Mcr67445IoYZJl0TokPLTmy1yqYzkzNoJYdXIe6O0Vc+RjlXmyhpQ6LGJa2KkUoxfyoTBI8QE9MLItoJ", - "3CKNahfkiyG9RSRqVspLuOd4Lp/haQZoiuM04Uk2OQhqg6yg8E6/w0UBmNdQQ1bMiJEk1yXF7IrIKqTi", - "zDNu35DAohhNp3WG6GvRQD7Mv1ShRffHj3ZSYl0b2GXfG0y4BZSkjDrguCeqfwEKCc14ChFvAnmpf1sq", - "Zi7wd1yhk2Sg7WqbKumlon3bV+Nc6keqh5+66pG7JhKVDGqoW9lUf7mFWJR8SVzrziqOdxKJf4jGb8ic", - "8iPIxA1LGLmpK6phnazZKfUrJ8UNqV9KzvTHcIIipHyGbURO3rOt9DnN5+zk0A8mh7S9XfFiqFFmJ5x2", - "UDjpG7S8nCpHItxDgCHOIhF6xtgEiB+VvEhx6B/5/svty/8PAAD//+RAmEcTbgEA", + "G51fspbnZ8ejbMzB2endp3/f3YzOzFZyIyVrKNCs7RK64eB6cHJ8XjdaXRiB/OtOrOHr2UUJTSuFGVxn", + "keUltT0C4YKigFwm9DI1Z9fJbNzsrjIDxIsTdkGS+mg2yL4xEXfjyXW2TIWVUx2aU/GsWQvGPKDtJgBt", + "JhSzJg/IuOYdkHbmvTDlS03jPUFy/pBNwCWh1htF05EMPq0uKQREtLI7LXnWLU/LBZEHWFuejEtkGoGj", + "yxI8K2A+c903Ciy5CnPw7E1UEw9Q72mGgplHYzH3eo03dooxAmynn0Fm5d5M6thLlkBrv2hradNimK2m", + "FC+Xn9Zkg5LS22ZBU5/tWBMt6mxofIRCQu0SkrWQWJfvlZ4k1EA7OyNyJCm3kzRiT6vwvxpBueejMdZr", + "an1DIBY9rtL7EAV1pMDHq0mx1GHemU2X+7fMpg/lPinl8PLbBdesj0+/Di78nv/17Osni7p6jdF0CrFm", + "4bL6oMyZk9UrZltX1Q2v7fAzJkDrK2/w4a4l99jKEDog9v3fhs6/OSXFfVvjOaIRCnvjVEXlrX6lMEWE", + "3BCTvG7UV8B4jCEhut5SUC/UQVhVX9iH3wCZmfhmBshMH/L/kNJ0kpPE0S8qXI1EsSjvZAaodcI/IM4s", + "ZXZ64doXo5ZH2Zz9inARBjPNzAC5AoQ8xdh1DuAlsoNHIN3i7XOMSBKCRYFk1P61VnSK2L21ENjJDERT", + "qBBkLygAn+xI5FQOn3KsKTXNDPsSAlSNzNed1AKSAVGLv9VgqISkyi+9Ap5sKD+PpyhavpTFcvy9UmWL", + "ncO4WmPShOshnCJC2f/fELrdzhKLYNjB3VL1pVw3TVdUyAwl5K0q4ZVLyRZP802cMmIy07Z94z45W/yG", + "xSQiPwpDgfDqeQGIvARitr79VjZJ8AhQCO5DqLzwDXUAqtPCZxikFHpBHMlI5XCxby5Pi0jCa8/gQUNl", + "Px74g6YRHHt5px2o8RcCHpSK6T0E9JjWmqxyLHFDI4ER9YA3U73311srcQ6e7Ts4B89ons7Xt5MbdyII", + "qPbNsQIBjFTxB2ILAmJteFiOWGpWbTgfeNVM3TyCwODO4N+qi1GmhOOT68EfZzxLRP5529bbYRcnOyD2", + "pVwz+jKVUcRUxyAJ48UcNhu51BinWY+TOJqgaWMRbEs1BJWJvG/JcLdsM/tiGsIJRzIrvkxWTFi0z8fe", + "CkNaMaT0G4MsBNPlMaTWeA2MR5cst9COKtl4f4iOWZhrXVC5K9uxcU+U3AwMRYemkGrfMwdkyWAQydNd", + "2l2mkBKOuyDv6k1Z3+yyqBGCcW9CNEd0RDGgcLqwSSvx1aOxsEWoGq76rHwcj1euBcEMjnVxJrzdd4OL", + "u6vh5Zfh2Wjk9/zT4eXV3cXZt7PRtd/zeTBC/s8vw8ubq7vh5c3F6d3w8tPgwmhGbXmo5edWRmvGokLv", + "D83JRYV9l1OXEdgzbmQdVVRk1M9RSGBqK4q0VAq4cTSbby3XIsV43nGSeHqVAafUhQ34XlsUNrAv+Vaj", + "LZGBVjY6ZsQ/ODVujeptVhRWCujeso7B9QinIviq9VdIMQoMAQNcqv4OFycq/s9wmJUyGqtC+QEuiFlj", + "VsMzoVIzRUlDZ0IeeCSBAZqgIJ/E+0cCCIFj7xEBb4JCCvE/HRMmvxVrvrg4o/Jfr7TWFKfQML68Jlsz", + "DdcbhZ25xVrBKEJZ3UktD8NeW0iPCq8WMmzb91gx90gPA902CBurzqRXgMzitusDroW3Fo4/LVoMfq31", + "0ioiSeWypS5qGGH1ukp/aPXIJO6Ki72tlw87cpHUrjnucn5dVaKk5mqO46xMpjDWdm0aoZYEjIXgDAU6", + "4uhKkyjVlBscRyOmuKeWLHH46HDxzmKUZbLtZlLCWpJ+1qmBnskJT02yOjUK9ciKNLrJvOXStE2LsGow", + "PN69DdWpoU5ExyZ+KjWvzC+5x5gJoTjP+FFymPGbYlTjx5x3DZ/rVnMNpib8hTFej8loZZuK2RUkIKwj", + "ECkjTjCTuROzmKhJ9rpDFs5rmlBmJkwsdQPubAk/K05LzCtsf76U8GbKHnysJMO1GDjDz3p1SHHom9GX", + "6wF30lLWHs2aMlPmlYKpywUTunVMs6quYitdJe8Rj0sJijbDUI8/7caOzes8I9imQ7alC6IZMs0CQ350", + "EjtPmmnd9eqs+rQ8cxXMCpOFgW6bSeoUsuueuYoGBk/FzwYTFHjy/n389dwbZw3bS9XiPA5Am59N2xIV", + "/gRUwlMbgxQjuhjlzyneQ4AhVq8uincW2TWV/5wvcEYpTyMJ4vgBQdUcMQyJn5QJ/8ivvB0qH+Pj/ikU", + "TWIzktUzrcdXA56XyOst+cVfs13yD/bf7b/jm5zACCTIP/Lf7x/sv+MKK53xpfVBgvoheoTSQ1Cd94vy", + "ALBWESTEy25yjAYzO6h/Lr9/4evC8sLFZzl896468G8QhHTGxehH0/eLmGZzFnbGP/rztucT9bgCgzBv", + "qHxBf8rxgxkMHvxb1p+vFUMwXjQvljVDdasdqgbrXC4Hjse68kfbPIrBZCJj4utWn0HbuPzHgz6Qgbh7", + "PCpkj5v0SP87/1n/7UXAGEJq0CRP+e/EA9ljMjx9ScS+8O4VjJViwMUI4vIE5pDyo+vPmkSSygwevwhx", + "/mL0nHNXZSm+Lh+ESU7ImJVvVi+3lb3/UMXWKA0CSMgkDcOFJ1A6LrzEU0HeS8//IKgkiCMq78XyOWA2", + "aP8vmYKbr8PliV4Z31S2189ByLAAx16MvXsw9nBeifXDu/drB8MExecY36PxGIqw7Jy+BZ3UkZmieJl4", + "ctvzn/fU65b8g8xb6RkI45ZfAajJ6yWC51chcTHCj0HiouxdLGTnWojBIT/EQCa12KKxfMS0go0Xs4he", + "y0Is+aRV2AtiQD592okBNzEgqGVzYkA/IBO0x9/NZKei+pufhonxYeYhfIwfeA5v/uSs8JtlM5bERIJ4", + "mXSxJtHdRUpkw1tkgoJ1p447zJcn6VwVh/+BiZq0oWpJOmxjr+XOKTLOf6uj5GzLCxQsTEX97/z/L311", + "N7SpvHxvshfzQJS/ZFek2+wlPqHzNtKreHfPdoQJc/82SXV9NJe/Sdgk5jGkGMFHyQACI3w/Oi4oXGE0", + "zOQ8IJw9NfQvaKhA+yJqZQ8kSV+PuCFWBjhHhNridKr3vixAiHUblJpujN4cns5oR4jFRe4SLR5sB4yb", + "CKR0FmP0HzgWE3/czsRfIZ3FYy+KmRYTxk/qEd78ev+9YEH68/alcN9vIlfFO6KJG2/0v09ne/ovL30e", + "YufMM1lAHoINLMOfJnE5PHRwrGdICew3eprYHm5px9KFPeg4+u1ydImZygxdOQ3LTLASy/Pf2V97PLL2", + "Jf83Y7mX/r18vchZNGQdasXCp7zVW5MMPZcIZSuQOaprQWw7qXpd1D6nbOE+5XYkYOV1rHZCMKO2TgC+", + "XQGoiYx1CL/+k1Zg32jB0eaehvE9CFVtd4vQEoabL7zpt6xlsw+oQLgJjtk/4DibrKPZXaLZopdNUAgw", + "UUizxq0osP9d/vHiRIvS2OlCi8LomdNi4yEqB7Wen08aWW9Vo+445ofjmAod13HMHNYbK0n2il6WgagC", + "IPhBEAWwwikql8Tuq18X+mQCVhuVJYtW2RVibgg20EP+5T5muTr8fFad9+JEJMRIF1xpk/uo9Fyn/ToB", + "wtArtLZtsDDKFRpuVGc1PdXbavNVWbvC6naJEIpKWmkTqvuvbzJ/Jaf/nf/PIZjEG+mv6lS2WH8qyT12", + "pDCm9bDjIO5kkEgRJ91x9/rHXTkwpUy1iif473W+C0F0RY6JSP87iYgTtxRfhqryS0RasEnpmSkro0iR", + "unNsUkJGxyg7yCgVgs1Y5WJUyygRMbCJ+PyinNz2CxWbV/maKyzSOkTLxhkZtFs1MwqvqXhtZikXuwbD", + "4cePBSAOuitbd2VzurIRCpM9nPLDS/750hePXu8l2M6ZJ7yJB7wkDUO1MzJ0KssRqDCtyBkXjCtGuMIu", + "DKxqcdkPNwn72wullGjInxz+jON5VjnMHETJCxd4NBZP55d3YasBlG3BL0gY+bg6v00VVvBzB9iwWT9s", + "Z9aLmDFtGpXPfcneJbJSgiRL7qk7+RVHNoubsXzmrT7GDU0mqqSJkgb3kD5BWW5qHhOqSvexbyASdDVB", + "mPBf9m3i6Auk/KG5tySHNsTNXyDVnt5b0o/Ht7Pj4FfmYMY3Y0HWG2LbPKnZbv0TbUp8a2NE1uNMDPpG", + "eLFXU4aJxh55QImC7e8U4kUOXDyZEG7RNoCCIvrLB2NFpvrpRJW9+4VlSv655YybFDaVhxSXiMclnaDZ", + "lqAp8NwTIPxqYpI8ZcbX2D6zGrCfmIa4HkkUxtN6OUS8MJ56IYpgWRZVtYLzeHqOIvHyVCeGdkMM9eyP", + "CoXwEYaEzStK2dVMzFsWZq71EUk6YL0+IxiObSsnEOBg5vHZNDgmMbYAIjq0BWQkehmA+MZfuIs9niZv", + "Xz///Gkh1tJy8ku9rwUPYvoxwlC9P18DxanWbBlI8v4bjm7TpEHT+cRIsjucLIki/FTIpLB2FpzH0/bH", + "gPhMmuxTxAP8lRxLMp+IvBFN/U2ad4rPeNWnxmZmnVdIhG1lsaHZY4FdnqtuNMmIrSmr1UTRmXNEVEus", + "yW7nyX7PiFAUTesJ/O04SraQru7GhHmRnldNTO/4cW155y2yzGv50lyDpT7qDWTaqi0HnjTVo3C9juwE", + "B2+zWMMSlgP7JnS8U1DX6qjVnZl6LVS09oVaMu3tZz3cdA1zfbVYnFXQg1euxVI9AbtaLK466kq1WNxO", + "yT6BlP2fNNdtU1081aW+EotGLiiajmQfx/IWP8kxqSFmhTNS35OOlQoeNiua1sZHWUGjekdbVl+IuNUv", + "6vTJLAuA42PZpFUV69TZ+srKY1YEibSrjNSkMC5RrKvTETkCFK1rauHmQ8PySTv+Whd/SUZYsvRY/YHj", + "ENVBeFJXIbRD9LaUHXsrZ83P7EZ9gAsnJyprV5jVqQQ/JwNeJLv6nIkdJu0pOifYclnRGkDtTbzlQMRp", + "JMtNQydYVVtn96f5uaBXcknz/XwdhzSfegfc0TocujO6hliy5OcHuPAeQZhCLwEIV+gle03tT8ZuB0e8", + "6YHfY/86FP86ZOLdtB7DI3xGZmh6ZWjjFRg7j/patGyo4iUd6y66HP59LuQdNQBxcjhoAb/DRXfpzI/C", + "peifI7vjARMPeFIzWScfYJiEYFFXPZp91/17oqOFA1TNaD7oz3sZFQiQj1rVOihUEWJ+QmKFt+054duE", + "JjPguqPKWiuboWfNhxWKHhGFbV3vqpfZnTDgX7tzSnkRNHws5T9Q2O68BibHek6LG/Kmiwlqab2zjWr+", + "c4ESN7e5wO2r+soFuMu4yCVhdGxp9otnfLMeJ57kc/XDnvh3u0e6HFi59bNcu2VpLfJVPWx7GTre+tna", + "yL2GN8d2jHtNFWOy/bHF9Rf3sc1bXg6c8MZLw+wgJ2w2KHu5c/fVwrIdOdfwTNguc64Ml27NuXUn3xzO", + "7+XDxi3uaKqXmcW/8q/dHU1Ro4aPpe5oCtudMmi6o+W0uB5dUJQOrI3lKpQlJLXVNDsuEGFcOk7ahDqW", + "UN0VONuh2oMaL1hq1zbU6XRgxD6hAFMrO47YVxGdfHmc0plnLGt2QyAWhycH6JIhlPd8i5z5/t1hQ11A", + "jjJ5hBSwMoNgLA/7MBYEU6SV8twvrZ6a4CgtzqgIge3A0nTQFFpbKn5JTLUoOzks5fDFqFDDu4UkLmO5", + "k8U7J4urjOBUBrYxotehHnJnpuYIKPJXbSDv+mi2OKmzubkr7LzDDG3lPEeOrj1RDTWaaus76iUdF4Jz", + "bZUa36xN70cvHela87Vogshqg3Vl3HatjBtjzLWWbnOSE/0ARAEM7TFXx5TCeUJ5SJBo61BZUlacFkN3", + "EuRtS5AxItxVJEWIIIJw93SMV465amKUbTE0hqxjTQwl6+DMw7x5x8K7GNWJ00huVYMjLytfL56+Ni33", + "ZSc0lS6mszamU+Q8bV2g5GuqLRgvmjmWnv4C6UgM24mW19MO5Hjx/V8woEveJOS+dxeKnb5QqF3aiNR4", + "ivEDxPUWhzD0RLOGehnfeKPOvC8SUJcKM+hS+C3FYyQBll5oWN6TpRAtT0z9n032t0IecyNDyDzkt2yO", + "KyzY+pC2hsE3zLVyu5Zk2848Z+bcDDftHlgp0NTy/NxPsMPDy/r7TaT0OptRHdbIhQ2ivetFOlbfFICF", + "J/QIFX5OW9EI6FyhQNu8Ee+4efe/Ti9LFvxRoaAF0u3kT8kVX8TOxiUQcVKmeUs37aFTqPOjuVOp138w", + "t+MJRyZo9E2dKDP7PaDBrFwfiNSdtm/HN7Uh+7GGCyKQ4WpFls6Nss5K1m5Uzh8FYP96ygEejEmhuM5K", + "CK6Wn2ppF5MOMYMa34kLPQJGkE2ZZMpSo94m5So5+OCuZygHxfkW3lXX29XqenoJEzbnFNJsa/ctE/P2", + "g7G/rTuRO2Sqy+aBSwBmSLPc20pgicbfdGG8JfgMHhcjbNLFtFm4drokYVcCrwZ1K6j/3ZneYJvbzDWA", + "m+PmbEcC0hTXKkBjpF9kTNm9ja39q5zxrZ713VHYHYXbPQq706Z6t1ZSZIUTRwmv7uTZyMmTEohJP0gx", + "lkupfzZHNvRYN2Nq6hdIT+RgG6QxnoLZjqg4xF1W0OtnBblm4DIiL5FbMQO3SsZTRGfp/R5Ikn4AwvC+", + "rvjBScykMIUtEq6/8OGPk4QnXZ+oGVrnNAdy6u1nNatF1yU216J1vRnsRYSqLPYfJUPchsgN0uYOE+ba", + "6HAjNLibBLgyvcXxNISboTc+9A9ObwJ9a6a3HHE/HL01FUnOq/kXa9Jyw4STeslG0MuiEX+XqhJrFfR/", + "qpLELpcX12PVrWSxlfb6IAhgQmtSOvn3dhUeRR9/M75oMXilKKHFf1xDfWLlXend+kRFjqTG0rt2+sKQ", + "+8VrUgzZ93b0Jfr4m8qVY4Ovgb7Eyjv6akhUY0hagr7CeIpqMlfP4ynxUOQBfjbu1ygY53ygDZVRZUcw", + "G39Lrzo62XnCeDqFYw91RV92y7xTPNYZ1bjaccJ4Gqe0gRnilLpxAxtqR2iUgdIR6duxQQrqcSVbWb11", + "hpIWVyCtk9s1SK/Dy7vJmKyNErh50vb3IR1F3Z1omTuRjsFmkkwAIU8xHttlqSzELSSpp9rXidQrNebm", + "dIyTGYim2US7pGwEHLJxhqhOnL8hcS7IqkjpDkyE4ZQJMlx36RMtSK1GcqI/3bMJtlFg7BLDKOR1btg3", + "oacrEnLVeURh6014GPLy1j+Dp1Wm54vEW4hdUugNuqFI6XdMlRdj1Cap8inebv2IJeKOdkw67UzhiBZ1", + "I3qKdCoELiJas9xyhzfJ9BRyp+hV92fJtFDM+iztrbLAhwaJpj/QlQHYlR/aUvmhC0u1IUmsGsUskyPN", + "a5e71E1x4oQWp8DuscH6w1GXjEHtTgNz2OnyJN5wJvRDFD3siVCfGgswih484IlmHoZJTBCN8cKjscYo", + "Vt6QtmEUPYjwnzfFKOu/O+aIGGaYdE2IDi07sdUqm85MzqCVHF6FuDtGX/kY5VxtoqQNiRqXtCpGKsX8", + "qUwQPEJMTC+IaCdwizSqXZAvhvQWkahZKS/hnuO5fIanGaApjtOEJ9nkIKgNsoLCO/0OFwVgXkMNWTEj", + "RpJclxSzKyKrkIozz7h9QwKLYjSd1hmir0UD+TD/UoUW3R8/2kmJdW1gl31vMOEWUJIy6oDjnqj+BSgk", + "NOMpRLwJ5KX+bamYucDfcYVOkoG2q22qpJeK9m1fjXOpH6kefuqqR+6aSFQyqKFuZVP95RZiUfIlca07", + "qzjeSST+IRq/IXPKjyATNyxh5KauqIZ1sman1K+cFDekfik50x/DCYqQ8hm2ETl5z7bS5zSfs5NDP5gc", + "0vZ2xYuhRpmdcNpB4aRv0PJyqhyJcA8BhjiLROgZYxMgflTyIsWhf+T7L7cv/z8AAP//wIITRCVuAQA=", } // GetSwagger returns the content of the embedded swagger specification file diff --git a/frontend/app/src/lib/api/generated/data-contracts.ts b/frontend/app/src/lib/api/generated/data-contracts.ts index f51291a3a..22c1c7545 100644 --- a/frontend/app/src/lib/api/generated/data-contracts.ts +++ b/frontend/app/src/lib/api/generated/data-contracts.ts @@ -691,6 +691,7 @@ export enum StepRunEventReason { REASSIGNED = "REASSIGNED", TIMED_OUT = "TIMED_OUT", SLOT_RELEASED = "SLOT_RELEASED", + RETRIED_BY_USER = "RETRIED_BY_USER", } export enum StepRunEventSeverity { diff --git a/frontend/app/src/pages/main/workflow-runs/$run/components/step-run-events.tsx b/frontend/app/src/pages/main/workflow-runs/$run/components/step-run-events.tsx index f5cf1900d..da480c0ac 100644 --- a/frontend/app/src/pages/main/workflow-runs/$run/components/step-run-events.tsx +++ b/frontend/app/src/pages/main/workflow-runs/$run/components/step-run-events.tsx @@ -93,6 +93,7 @@ const REASON_TO_TITLE: Record = { [StepRunEventReason.REASSIGNED]: 'Reassigned', [StepRunEventReason.TIMED_OUT]: 'Execution timed out', [StepRunEventReason.SLOT_RELEASED]: 'Slot released', + [StepRunEventReason.RETRIED_BY_USER]: 'Replayed by user', }; function getTitleFromReason(reason: StepRunEventReason, message: string) { diff --git a/frontend/docs/.npmrc b/frontend/docs/.npmrc new file mode 100644 index 000000000..e941d13c2 --- /dev/null +++ b/frontend/docs/.npmrc @@ -0,0 +1 @@ +package-manager-strict=false diff --git a/internal/repository/prisma/dbsqlc/models.go b/internal/repository/prisma/dbsqlc/models.go index 527437ea0..baa1084c2 100644 --- a/internal/repository/prisma/dbsqlc/models.go +++ b/internal/repository/prisma/dbsqlc/models.go @@ -245,6 +245,7 @@ const ( StepRunEventReasonREASSIGNED StepRunEventReason = "REASSIGNED" StepRunEventReasonSLOTRELEASED StepRunEventReason = "SLOT_RELEASED" StepRunEventReasonTIMEOUTREFRESHED StepRunEventReason = "TIMEOUT_REFRESHED" + StepRunEventReasonRETRIEDBYUSER StepRunEventReason = "RETRIED_BY_USER" ) func (e *StepRunEventReason) Scan(src interface{}) error { diff --git a/internal/repository/prisma/dbsqlc/schema.sql b/internal/repository/prisma/dbsqlc/schema.sql index a67dac04d..7f9da8092 100644 --- a/internal/repository/prisma/dbsqlc/schema.sql +++ b/internal/repository/prisma/dbsqlc/schema.sql @@ -14,7 +14,7 @@ CREATE TYPE "JobRunStatus" AS ENUM ('PENDING', 'RUNNING', 'SUCCEEDED', 'FAILED', CREATE TYPE "LogLineLevel" AS ENUM ('DEBUG', 'INFO', 'WARN', 'ERROR'); -- CreateEnum -CREATE TYPE "StepRunEventReason" AS ENUM ('REQUEUED_NO_WORKER', 'REQUEUED_RATE_LIMIT', 'SCHEDULING_TIMED_OUT', 'ASSIGNED', 'STARTED', 'FINISHED', 'FAILED', 'RETRYING', 'CANCELLED', 'TIMED_OUT', 'REASSIGNED', 'SLOT_RELEASED', 'TIMEOUT_REFRESHED'); +CREATE TYPE "StepRunEventReason" AS ENUM ('REQUEUED_NO_WORKER', 'REQUEUED_RATE_LIMIT', 'SCHEDULING_TIMED_OUT', 'ASSIGNED', 'STARTED', 'FINISHED', 'FAILED', 'RETRYING', 'CANCELLED', 'TIMED_OUT', 'REASSIGNED', 'SLOT_RELEASED', 'TIMEOUT_REFRESHED', 'RETRIED_BY_USER'); -- CreateEnum CREATE TYPE "StepRunEventSeverity" AS ENUM ('INFO', 'WARNING', 'CRITICAL'); diff --git a/internal/repository/prisma/dbsqlc/step_runs.sql b/internal/repository/prisma/dbsqlc/step_runs.sql index 49e1c9426..d648a1ee1 100644 --- a/internal/repository/prisma/dbsqlc/step_runs.sql +++ b/internal/repository/prisma/dbsqlc/step_runs.sql @@ -730,3 +730,137 @@ WHERE srl."tenantId" = lrl."tenantId" AND srl."key" = lrl."key" RETURNING srl.*; + +-- name: ReplayStepRunResetWorkflowRun :one +WITH workflow_run_id AS ( + SELECT + "workflowRunId" + FROM + "JobRun" + WHERE + "id" = @jobRunId::uuid +) +UPDATE + "WorkflowRun" +SET + "status" = 'RUNNING', + "updatedAt" = CURRENT_TIMESTAMP, + "startedAt" = NULL, + "finishedAt" = NULL +WHERE + "id" = (SELECT "workflowRunId" FROM workflow_run_id) +RETURNING *; + +-- name: ReplayStepRunResetJobRun :one +UPDATE + "JobRun" +SET + "status" = 'RUNNING', + "updatedAt" = CURRENT_TIMESTAMP, + "startedAt" = NULL, + "finishedAt" = NULL, + "timeoutAt" = NULL, + "cancelledAt" = NULL, + "cancelledReason" = NULL, + "cancelledError" = NULL +WHERE + "id" = @jobRunId::uuid +RETURNING *; + +-- name: GetLaterStepRunsForReplay :many +WITH RECURSIVE currStepRun AS ( + SELECT * + FROM "StepRun" + WHERE + "id" = @stepRunId::uuid AND + "tenantId" = @tenantId::uuid +), childStepRuns AS ( + SELECT sr."id", sr."status" + FROM "StepRun" sr + JOIN "_StepRunOrder" sro ON sr."id" = sro."B" + WHERE sro."A" = (SELECT "id" FROM currStepRun) + + UNION ALL + + SELECT sr."id", sr."status" + FROM "StepRun" sr + JOIN "_StepRunOrder" sro ON sr."id" = sro."B" + JOIN childStepRuns csr ON sro."A" = csr."id" +) +SELECT + sr.* +FROM + "StepRun" sr +JOIN + childStepRuns csr ON sr."id" = csr."id" +WHERE + sr."tenantId" = @tenantId::uuid; + +-- name: ReplayStepRunResetLaterStepRuns :many +WITH RECURSIVE currStepRun AS ( + SELECT * + FROM "StepRun" + WHERE + "id" = @stepRunId::uuid AND + "tenantId" = @tenantId::uuid +), childStepRuns AS ( + SELECT sr."id", sr."status" + FROM "StepRun" sr + JOIN "_StepRunOrder" sro ON sr."id" = sro."B" + WHERE sro."A" = (SELECT "id" FROM currStepRun) + + UNION ALL + + SELECT sr."id", sr."status" + FROM "StepRun" sr + JOIN "_StepRunOrder" sro ON sr."id" = sro."B" + JOIN childStepRuns csr ON sro."A" = csr."id" +) +UPDATE + "StepRun" as sr +SET + "status" = 'PENDING', + "scheduleTimeoutAt" = NULL, + "finishedAt" = NULL, + "startedAt" = NULL, + "output" = NULL, + "error" = NULL, + "cancelledAt" = NULL, + "cancelledReason" = NULL +FROM + childStepRuns csr +WHERE + sr."id" = csr."id" AND + sr."tenantId" = @tenantId::uuid +RETURNING sr.*; + +-- name: ListNonFinalChildStepRuns :many +WITH RECURSIVE currStepRun AS ( + SELECT * + FROM "StepRun" + WHERE + "id" = @stepRunId::uuid AND + "tenantId" = @tenantId::uuid +), childStepRuns AS ( + SELECT sr."id", sr."status" + FROM "StepRun" sr + JOIN "_StepRunOrder" sro ON sr."id" = sro."B" + WHERE sro."A" = (SELECT "id" FROM currStepRun) + + UNION ALL + + SELECT sr."id", sr."status" + FROM "StepRun" sr + JOIN "_StepRunOrder" sro ON sr."id" = sro."B" + JOIN childStepRuns csr ON sro."A" = csr."id" +) +-- Select all child step runs that are not in a final state +SELECT + sr.* +FROM + "StepRun" sr +JOIN + childStepRuns csr ON sr."id" = csr."id" +WHERE + sr."tenantId" = @tenantId::uuid AND + sr."status" NOT IN ('SUCCEEDED', 'FAILED', 'CANCELLED'); diff --git a/internal/repository/prisma/dbsqlc/step_runs.sql.go b/internal/repository/prisma/dbsqlc/step_runs.sql.go index c1c9d30be..4e3838033 100644 --- a/internal/repository/prisma/dbsqlc/step_runs.sql.go +++ b/internal/repository/prisma/dbsqlc/step_runs.sql.go @@ -297,6 +297,89 @@ func (q *Queries) CreateStepRunEvent(ctx context.Context, db DBTX, arg CreateSte return err } +const getLaterStepRunsForReplay = `-- name: GetLaterStepRunsForReplay :many +WITH RECURSIVE currStepRun AS ( + SELECT id, "createdAt", "updatedAt", "deletedAt", "tenantId", "jobRunId", "stepId", "order", "workerId", "tickerId", status, input, output, "requeueAfter", "scheduleTimeoutAt", error, "startedAt", "finishedAt", "timeoutAt", "cancelledAt", "cancelledReason", "cancelledError", "inputSchema", "callerFiles", "gitRepoBranch", "retryCount", "semaphoreReleased" + FROM "StepRun" + WHERE + "id" = $2::uuid AND + "tenantId" = $1::uuid +), childStepRuns AS ( + SELECT sr."id", sr."status" + FROM "StepRun" sr + JOIN "_StepRunOrder" sro ON sr."id" = sro."B" + WHERE sro."A" = (SELECT "id" FROM currStepRun) + + UNION ALL + + SELECT sr."id", sr."status" + FROM "StepRun" sr + JOIN "_StepRunOrder" sro ON sr."id" = sro."B" + JOIN childStepRuns csr ON sro."A" = csr."id" +) +SELECT + sr.id, sr."createdAt", sr."updatedAt", sr."deletedAt", sr."tenantId", sr."jobRunId", sr."stepId", sr."order", sr."workerId", sr."tickerId", sr.status, sr.input, sr.output, sr."requeueAfter", sr."scheduleTimeoutAt", sr.error, sr."startedAt", sr."finishedAt", sr."timeoutAt", sr."cancelledAt", sr."cancelledReason", sr."cancelledError", sr."inputSchema", sr."callerFiles", sr."gitRepoBranch", sr."retryCount", sr."semaphoreReleased" +FROM + "StepRun" sr +JOIN + childStepRuns csr ON sr."id" = csr."id" +WHERE + sr."tenantId" = $1::uuid +` + +type GetLaterStepRunsForReplayParams struct { + Tenantid pgtype.UUID `json:"tenantid"` + Steprunid pgtype.UUID `json:"steprunid"` +} + +func (q *Queries) GetLaterStepRunsForReplay(ctx context.Context, db DBTX, arg GetLaterStepRunsForReplayParams) ([]*StepRun, error) { + rows, err := db.Query(ctx, getLaterStepRunsForReplay, arg.Tenantid, arg.Steprunid) + if err != nil { + return nil, err + } + defer rows.Close() + var items []*StepRun + for rows.Next() { + var i StepRun + if err := rows.Scan( + &i.ID, + &i.CreatedAt, + &i.UpdatedAt, + &i.DeletedAt, + &i.TenantId, + &i.JobRunId, + &i.StepId, + &i.Order, + &i.WorkerId, + &i.TickerId, + &i.Status, + &i.Input, + &i.Output, + &i.RequeueAfter, + &i.ScheduleTimeoutAt, + &i.Error, + &i.StartedAt, + &i.FinishedAt, + &i.TimeoutAt, + &i.CancelledAt, + &i.CancelledReason, + &i.CancelledError, + &i.InputSchema, + &i.CallerFiles, + &i.GitRepoBranch, + &i.RetryCount, + &i.SemaphoreReleased, + ); err != nil { + return nil, err + } + items = append(items, &i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const getStepRun = `-- name: GetStepRun :one SELECT "StepRun".id, "StepRun"."createdAt", "StepRun"."updatedAt", "StepRun"."deletedAt", "StepRun"."tenantId", "StepRun"."jobRunId", "StepRun"."stepId", "StepRun"."order", "StepRun"."workerId", "StepRun"."tickerId", "StepRun".status, "StepRun".input, "StepRun".output, "StepRun"."requeueAfter", "StepRun"."scheduleTimeoutAt", "StepRun".error, "StepRun"."startedAt", "StepRun"."finishedAt", "StepRun"."timeoutAt", "StepRun"."cancelledAt", "StepRun"."cancelledReason", "StepRun"."cancelledError", "StepRun"."inputSchema", "StepRun"."callerFiles", "StepRun"."gitRepoBranch", "StepRun"."retryCount", "StepRun"."semaphoreReleased" @@ -549,6 +632,91 @@ func (q *Queries) GetTotalSlots(ctx context.Context, db DBTX, arg GetTotalSlotsP return items, nil } +const listNonFinalChildStepRuns = `-- name: ListNonFinalChildStepRuns :many +WITH RECURSIVE currStepRun AS ( + SELECT id, "createdAt", "updatedAt", "deletedAt", "tenantId", "jobRunId", "stepId", "order", "workerId", "tickerId", status, input, output, "requeueAfter", "scheduleTimeoutAt", error, "startedAt", "finishedAt", "timeoutAt", "cancelledAt", "cancelledReason", "cancelledError", "inputSchema", "callerFiles", "gitRepoBranch", "retryCount", "semaphoreReleased" + FROM "StepRun" + WHERE + "id" = $2::uuid AND + "tenantId" = $1::uuid +), childStepRuns AS ( + SELECT sr."id", sr."status" + FROM "StepRun" sr + JOIN "_StepRunOrder" sro ON sr."id" = sro."B" + WHERE sro."A" = (SELECT "id" FROM currStepRun) + + UNION ALL + + SELECT sr."id", sr."status" + FROM "StepRun" sr + JOIN "_StepRunOrder" sro ON sr."id" = sro."B" + JOIN childStepRuns csr ON sro."A" = csr."id" +) +SELECT + sr.id, sr."createdAt", sr."updatedAt", sr."deletedAt", sr."tenantId", sr."jobRunId", sr."stepId", sr."order", sr."workerId", sr."tickerId", sr.status, sr.input, sr.output, sr."requeueAfter", sr."scheduleTimeoutAt", sr.error, sr."startedAt", sr."finishedAt", sr."timeoutAt", sr."cancelledAt", sr."cancelledReason", sr."cancelledError", sr."inputSchema", sr."callerFiles", sr."gitRepoBranch", sr."retryCount", sr."semaphoreReleased" +FROM + "StepRun" sr +JOIN + childStepRuns csr ON sr."id" = csr."id" +WHERE + sr."tenantId" = $1::uuid AND + sr."status" NOT IN ('SUCCEEDED', 'FAILED', 'CANCELLED') +` + +type ListNonFinalChildStepRunsParams struct { + Tenantid pgtype.UUID `json:"tenantid"` + Steprunid pgtype.UUID `json:"steprunid"` +} + +// Select all child step runs that are not in a final state +func (q *Queries) ListNonFinalChildStepRuns(ctx context.Context, db DBTX, arg ListNonFinalChildStepRunsParams) ([]*StepRun, error) { + rows, err := db.Query(ctx, listNonFinalChildStepRuns, arg.Tenantid, arg.Steprunid) + if err != nil { + return nil, err + } + defer rows.Close() + var items []*StepRun + for rows.Next() { + var i StepRun + if err := rows.Scan( + &i.ID, + &i.CreatedAt, + &i.UpdatedAt, + &i.DeletedAt, + &i.TenantId, + &i.JobRunId, + &i.StepId, + &i.Order, + &i.WorkerId, + &i.TickerId, + &i.Status, + &i.Input, + &i.Output, + &i.RequeueAfter, + &i.ScheduleTimeoutAt, + &i.Error, + &i.StartedAt, + &i.FinishedAt, + &i.TimeoutAt, + &i.CancelledAt, + &i.CancelledReason, + &i.CancelledError, + &i.InputSchema, + &i.CallerFiles, + &i.GitRepoBranch, + &i.RetryCount, + &i.SemaphoreReleased, + ); err != nil { + return nil, err + } + items = append(items, &i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const listStartableStepRuns = `-- name: ListStartableStepRuns :many WITH job_run AS ( SELECT "status" @@ -983,6 +1151,186 @@ func (q *Queries) RefreshTimeoutBy(ctx context.Context, db DBTX, arg RefreshTime return &i, err } +const replayStepRunResetJobRun = `-- name: ReplayStepRunResetJobRun :one +UPDATE + "JobRun" +SET + "status" = 'RUNNING', + "updatedAt" = CURRENT_TIMESTAMP, + "startedAt" = NULL, + "finishedAt" = NULL, + "timeoutAt" = NULL, + "cancelledAt" = NULL, + "cancelledReason" = NULL, + "cancelledError" = NULL +WHERE + "id" = $1::uuid +RETURNING id, "createdAt", "updatedAt", "deletedAt", "tenantId", "jobId", "tickerId", status, result, "startedAt", "finishedAt", "timeoutAt", "cancelledAt", "cancelledReason", "cancelledError", "workflowRunId" +` + +func (q *Queries) ReplayStepRunResetJobRun(ctx context.Context, db DBTX, jobrunid pgtype.UUID) (*JobRun, error) { + row := db.QueryRow(ctx, replayStepRunResetJobRun, jobrunid) + var i JobRun + err := row.Scan( + &i.ID, + &i.CreatedAt, + &i.UpdatedAt, + &i.DeletedAt, + &i.TenantId, + &i.JobId, + &i.TickerId, + &i.Status, + &i.Result, + &i.StartedAt, + &i.FinishedAt, + &i.TimeoutAt, + &i.CancelledAt, + &i.CancelledReason, + &i.CancelledError, + &i.WorkflowRunId, + ) + return &i, err +} + +const replayStepRunResetLaterStepRuns = `-- name: ReplayStepRunResetLaterStepRuns :many +WITH RECURSIVE currStepRun AS ( + SELECT id, "createdAt", "updatedAt", "deletedAt", "tenantId", "jobRunId", "stepId", "order", "workerId", "tickerId", status, input, output, "requeueAfter", "scheduleTimeoutAt", error, "startedAt", "finishedAt", "timeoutAt", "cancelledAt", "cancelledReason", "cancelledError", "inputSchema", "callerFiles", "gitRepoBranch", "retryCount", "semaphoreReleased" + FROM "StepRun" + WHERE + "id" = $2::uuid AND + "tenantId" = $1::uuid +), childStepRuns AS ( + SELECT sr."id", sr."status" + FROM "StepRun" sr + JOIN "_StepRunOrder" sro ON sr."id" = sro."B" + WHERE sro."A" = (SELECT "id" FROM currStepRun) + + UNION ALL + + SELECT sr."id", sr."status" + FROM "StepRun" sr + JOIN "_StepRunOrder" sro ON sr."id" = sro."B" + JOIN childStepRuns csr ON sro."A" = csr."id" +) +UPDATE + "StepRun" as sr +SET + "status" = 'PENDING', + "scheduleTimeoutAt" = NULL, + "finishedAt" = NULL, + "startedAt" = NULL, + "output" = NULL, + "error" = NULL, + "cancelledAt" = NULL, + "cancelledReason" = NULL +FROM + childStepRuns csr +WHERE + sr."id" = csr."id" AND + sr."tenantId" = $1::uuid +RETURNING sr.id, sr."createdAt", sr."updatedAt", sr."deletedAt", sr."tenantId", sr."jobRunId", sr."stepId", sr."order", sr."workerId", sr."tickerId", sr.status, sr.input, sr.output, sr."requeueAfter", sr."scheduleTimeoutAt", sr.error, sr."startedAt", sr."finishedAt", sr."timeoutAt", sr."cancelledAt", sr."cancelledReason", sr."cancelledError", sr."inputSchema", sr."callerFiles", sr."gitRepoBranch", sr."retryCount", sr."semaphoreReleased" +` + +type ReplayStepRunResetLaterStepRunsParams struct { + Tenantid pgtype.UUID `json:"tenantid"` + Steprunid pgtype.UUID `json:"steprunid"` +} + +func (q *Queries) ReplayStepRunResetLaterStepRuns(ctx context.Context, db DBTX, arg ReplayStepRunResetLaterStepRunsParams) ([]*StepRun, error) { + rows, err := db.Query(ctx, replayStepRunResetLaterStepRuns, arg.Tenantid, arg.Steprunid) + if err != nil { + return nil, err + } + defer rows.Close() + var items []*StepRun + for rows.Next() { + var i StepRun + if err := rows.Scan( + &i.ID, + &i.CreatedAt, + &i.UpdatedAt, + &i.DeletedAt, + &i.TenantId, + &i.JobRunId, + &i.StepId, + &i.Order, + &i.WorkerId, + &i.TickerId, + &i.Status, + &i.Input, + &i.Output, + &i.RequeueAfter, + &i.ScheduleTimeoutAt, + &i.Error, + &i.StartedAt, + &i.FinishedAt, + &i.TimeoutAt, + &i.CancelledAt, + &i.CancelledReason, + &i.CancelledError, + &i.InputSchema, + &i.CallerFiles, + &i.GitRepoBranch, + &i.RetryCount, + &i.SemaphoreReleased, + ); err != nil { + return nil, err + } + items = append(items, &i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const replayStepRunResetWorkflowRun = `-- name: ReplayStepRunResetWorkflowRun :one +WITH workflow_run_id AS ( + SELECT + "workflowRunId" + FROM + "JobRun" + WHERE + "id" = $1::uuid +) +UPDATE + "WorkflowRun" +SET + "status" = 'RUNNING', + "updatedAt" = CURRENT_TIMESTAMP, + "startedAt" = NULL, + "finishedAt" = NULL +WHERE + "id" = (SELECT "workflowRunId" FROM workflow_run_id) +RETURNING "createdAt", "updatedAt", "deletedAt", "tenantId", "workflowVersionId", status, error, "startedAt", "finishedAt", "concurrencyGroupId", "displayName", id, "gitRepoBranch", "childIndex", "childKey", "parentId", "parentStepRunId", "additionalMetadata" +` + +func (q *Queries) ReplayStepRunResetWorkflowRun(ctx context.Context, db DBTX, jobrunid pgtype.UUID) (*WorkflowRun, error) { + row := db.QueryRow(ctx, replayStepRunResetWorkflowRun, jobrunid) + var i WorkflowRun + err := row.Scan( + &i.CreatedAt, + &i.UpdatedAt, + &i.DeletedAt, + &i.TenantId, + &i.WorkflowVersionId, + &i.Status, + &i.Error, + &i.StartedAt, + &i.FinishedAt, + &i.ConcurrencyGroupId, + &i.DisplayName, + &i.ID, + &i.GitRepoBranch, + &i.ChildIndex, + &i.ChildKey, + &i.ParentId, + &i.ParentStepRunId, + &i.AdditionalMetadata, + ) + return &i, err +} + const resolveLaterStepRuns = `-- name: ResolveLaterStepRuns :many WITH RECURSIVE currStepRun AS ( SELECT id, "createdAt", "updatedAt", "deletedAt", "tenantId", "jobRunId", "stepId", "order", "workerId", "tickerId", status, input, output, "requeueAfter", "scheduleTimeoutAt", error, "startedAt", "finishedAt", "timeoutAt", "cancelledAt", "cancelledReason", "cancelledError", "inputSchema", "callerFiles", "gitRepoBranch", "retryCount", "semaphoreReleased" diff --git a/internal/repository/prisma/step_run.go b/internal/repository/prisma/step_run.go index 8de1d010e..d41339914 100644 --- a/internal/repository/prisma/step_run.go +++ b/internal/repository/prisma/step_run.go @@ -514,10 +514,6 @@ func (s *stepRunEngineRepository) assignStepRunToWorkerAttempt(ctx context.Conte defer deferRollback(ctx, s.l, tx.Rollback) - if err != nil { - return nil, err - } - assigned, err := s.queries.AssignStepRunToWorker(ctx, tx, dbsqlc.AssignStepRunToWorkerParams{ Steprunid: stepRun.StepRun.ID, Tenantid: stepRun.StepRun.TenantId, @@ -769,6 +765,140 @@ func (s *stepRunEngineRepository) UpdateStepRun(ctx context.Context, tenantId, s return stepRun, updateInfo, nil } +func (s *stepRunEngineRepository) ReplayStepRun(ctx context.Context, tenantId, stepRunId string, opts *repository.UpdateStepRunOpts) (*dbsqlc.GetStepRunForEngineRow, error) { + ctx, span := telemetry.NewSpan(ctx, "replay-step-run") + defer span.End() + + if err := s.v.Validate(opts); err != nil { + return nil, err + } + + updateParams, createEventParams, updateJobRunLookupDataParams, _, _, err := getUpdateParams(tenantId, stepRunId, opts) + + if err != nil { + return nil, err + } + + var stepRun *dbsqlc.GetStepRunForEngineRow + + err = deadlockRetry(s.l, func() error { + tx, err := s.pool.Begin(ctx) + + if err != nil { + return err + } + + defer deferRollback(ctx, s.l, tx.Rollback) + + innerStepRun, err := s.queries.GetStepRun(ctx, tx, dbsqlc.GetStepRunParams{ + ID: sqlchelpers.UUIDFromStr(stepRunId), + Tenantid: sqlchelpers.UUIDFromStr(tenantId), + }) + + if err != nil { + return err + } + + stepRun, err = s.updateStepRunCore(ctx, tx, tenantId, updateParams, createEventParams, updateJobRunLookupDataParams, innerStepRun) + + if err != nil { + return err + } + + // reset the job run, workflow run and all fields as part of the core tx + _, err = s.queries.ReplayStepRunResetWorkflowRun(ctx, tx, stepRun.JobRunId) + + if err != nil { + return err + } + + _, err = s.queries.ReplayStepRunResetJobRun(ctx, tx, stepRun.JobRunId) + + if err != nil { + return err + } + + laterStepRuns, err := s.queries.GetLaterStepRunsForReplay(ctx, tx, dbsqlc.GetLaterStepRunsForReplayParams{ + Tenantid: sqlchelpers.UUIDFromStr(tenantId), + Steprunid: sqlchelpers.UUIDFromStr(stepRunId), + }) + + if err != nil { + return err + } + + // archive each of the later step run results + for _, laterStepRun := range laterStepRuns { + laterStepRunCp := laterStepRun + laterStepRunId := sqlchelpers.UUIDToStr(laterStepRun.ID) + + err = s.archiveStepRunResult(ctx, tx, tenantId, laterStepRunId) + + if err != nil { + return err + } + + // create a deferred event for each of these step runs + defer s.deferredStepRunEvent( + laterStepRunCp.ID, + dbsqlc.StepRunEventReasonRETRIEDBYUSER, + dbsqlc.StepRunEventSeverityINFO, + fmt.Sprintf("Parent step run %s was replayed, resetting step run result", stepRun.StepReadableId.String), + nil, + ) + } + + // reset all later step runs to a pending state + _, err = s.queries.ReplayStepRunResetLaterStepRuns(ctx, tx, dbsqlc.ReplayStepRunResetLaterStepRunsParams{ + Tenantid: sqlchelpers.UUIDFromStr(tenantId), + Steprunid: sqlchelpers.UUIDFromStr(stepRunId), + }) + + if err != nil { + return err + } + + err = tx.Commit(ctx) + + return err + }) + + if err != nil { + return nil, err + } + + return stepRun, nil +} + +func (s *stepRunEngineRepository) PreflightCheckReplayStepRun(ctx context.Context, tenantId, stepRunId string) error { + // verify that the step run is in a final state + stepRun, err := s.getStepRunForEngineTx(ctx, s.pool, tenantId, stepRunId) + + if err != nil { + return err + } + + if !repository.IsFinalStepRunStatus(stepRun.StepRun.Status) { + return repository.ErrPreflightReplayStepRunNotInFinalState + } + + // verify that child step runs are in a final state + childStepRuns, err := s.queries.ListNonFinalChildStepRuns(ctx, s.pool, dbsqlc.ListNonFinalChildStepRunsParams{ + Steprunid: sqlchelpers.UUIDFromStr(stepRunId), + Tenantid: sqlchelpers.UUIDFromStr(tenantId), + }) + + if err != nil && !errors.Is(err, pgx.ErrNoRows) { + return fmt.Errorf("could not list non-final child step runs: %w", err) + } + + if len(childStepRuns) > 0 { + return repository.ErrPreflightReplayChildStepRunNotInFinalState + } + + return nil +} + func (s *stepRunEngineRepository) UnlinkStepRunFromWorker(ctx context.Context, tenantId, stepRunId string) error { _, err := s.queries.UnlinkStepRunFromWorker(ctx, s.pool, dbsqlc.UnlinkStepRunFromWorkerParams{ Steprunid: sqlchelpers.UUIDFromStr(stepRunId), @@ -795,10 +925,6 @@ func (s *stepRunEngineRepository) UpdateStepRunOverridesData(ctx context.Context defer deferRollback(ctx, s.l, tx.Rollback) - if err != nil { - return nil, err - } - pgTenantId := sqlchelpers.UUIDFromStr(tenantId) pgStepRunId := sqlchelpers.UUIDFromStr(stepRunId) @@ -923,16 +1049,13 @@ func (s *stepRunEngineRepository) QueueStepRun(ctx context.Context, tenantId, st } sr, err := s.updateStepRunCore(ctx, tx, tenantId, updateParams, createEventParams, updateJobRunLookupDataParams, innerStepRun) + if err != nil { return err } stepRun = sr - if err != nil { - return err - } - if err := tx.Commit(ctx); err != nil { return err } @@ -1317,7 +1440,11 @@ func (s *stepRunEngineRepository) ListStartableStepRuns(ctx context.Context, ten } func (s *stepRunEngineRepository) ArchiveStepRunResult(ctx context.Context, tenantId, stepRunId string) error { - _, err := s.queries.ArchiveStepRunResultFromStepRun(ctx, s.pool, dbsqlc.ArchiveStepRunResultFromStepRunParams{ + return s.archiveStepRunResult(ctx, s.pool, tenantId, stepRunId) +} + +func (s *stepRunEngineRepository) archiveStepRunResult(ctx context.Context, db dbsqlc.DBTX, tenantId, stepRunId string) error { + _, err := s.queries.ArchiveStepRunResultFromStepRun(ctx, db, dbsqlc.ArchiveStepRunResultFromStepRunParams{ Tenantid: sqlchelpers.UUIDFromStr(tenantId), Steprunid: sqlchelpers.UUIDFromStr(stepRunId), }) diff --git a/internal/repository/prisma/workflow_run.go b/internal/repository/prisma/workflow_run.go index 0240fa3cf..d6e92fed0 100644 --- a/internal/repository/prisma/workflow_run.go +++ b/internal/repository/prisma/workflow_run.go @@ -54,7 +54,7 @@ func (w *workflowRunAPIRepository) WorkflowRunMetricsCount(tenantId string, opts return nil, err } - return workflowRunMetricsCount(context.Background(), w.pool, w.queries, w.l, tenantId, opts) + return workflowRunMetricsCount(context.Background(), w.pool, w.queries, tenantId, opts) } func (w *workflowRunAPIRepository) CreateNewWorkflowRun(ctx context.Context, tenantId string, opts *repository.CreateWorkflowRunOpts) (*db.WorkflowRunModel, error) { @@ -377,7 +377,7 @@ func listWorkflowRuns(ctx context.Context, pool *pgxpool.Pool, queries *dbsqlc.Q return res, nil } -func workflowRunMetricsCount(ctx context.Context, pool *pgxpool.Pool, queries *dbsqlc.Queries, l *zerolog.Logger, tenantId string, opts *repository.WorkflowRunsMetricsOpts) (*dbsqlc.WorkflowRunsMetricsCountRow, error) { +func workflowRunMetricsCount(ctx context.Context, pool *pgxpool.Pool, queries *dbsqlc.Queries, tenantId string, opts *repository.WorkflowRunsMetricsOpts) (*dbsqlc.WorkflowRunsMetricsCountRow, error) { pgTenantId := &pgtype.UUID{} diff --git a/internal/repository/step_run.go b/internal/repository/step_run.go index a267b9240..1aca3f4fa 100644 --- a/internal/repository/step_run.go +++ b/internal/repository/step_run.go @@ -118,6 +118,9 @@ type RefreshTimeoutBy struct { IncrementTimeoutBy string `validate:"required,duration"` } +var ErrPreflightReplayStepRunNotInFinalState = fmt.Errorf("step run is not in a final state") +var ErrPreflightReplayChildStepRunNotInFinalState = fmt.Errorf("child step run is not in a final state") + type StepRunAPIRepository interface { GetStepRunById(tenantId, stepRunId string) (*db.StepRunModel, error) @@ -138,6 +141,11 @@ type StepRunEngineRepository interface { UpdateStepRun(ctx context.Context, tenantId, stepRunId string, opts *UpdateStepRunOpts) (*dbsqlc.GetStepRunForEngineRow, *StepRunUpdateInfo, error) + ReplayStepRun(ctx context.Context, tenantId, stepRunId string, opts *UpdateStepRunOpts) (*dbsqlc.GetStepRunForEngineRow, error) + + // PreflightCheckReplayStepRun checks if a step run can be replayed. If it can, it will return nil. + PreflightCheckReplayStepRun(ctx context.Context, tenantId, stepRunId string) error + CreateStepRunEvent(ctx context.Context, tenantId, stepRunId string, opts CreateStepRunEventOpts) error UnlinkStepRunFromWorker(ctx context.Context, tenantId, stepRunId string) error diff --git a/internal/services/controllers/jobs/controller.go b/internal/services/controllers/jobs/controller.go index 19e07023b..99cd120a9 100644 --- a/internal/services/controllers/jobs/controller.go +++ b/internal/services/controllers/jobs/controller.go @@ -215,6 +215,8 @@ func (ec *JobsControllerImpl) handleTask(ctx context.Context, task *msgqueue.Mes return ec.handleJobRunQueued(ctx, task) case "job-run-cancelled": return ec.handleJobRunCancelled(ctx, task) + case "step-run-replay": + return ec.handleStepRunReplay(ctx, task) case "step-run-retry": return ec.handleStepRunRetry(ctx, task) case "step-run-queued": @@ -370,7 +372,80 @@ func (ec *JobsControllerImpl) handleStepRunRetry(ctx context.Context, task *msgq return fmt.Errorf("could not archive step run result: %w", err) } - ec.l.Error().Err(fmt.Errorf("starting step run retry")) + stepRun, err := ec.repo.StepRun().GetStepRunForEngine(ctx, metadata.TenantId, payload.StepRunId) + + if err != nil { + return fmt.Errorf("could not get step run: %w", err) + } + + inputBytes := stepRun.StepRun.Input + retryCount := int(stepRun.StepRun.RetryCount) + 1 + + // update step run + _, _, err = ec.repo.StepRun().UpdateStepRun( + ctx, + metadata.TenantId, + sqlchelpers.UUIDToStr(stepRun.StepRun.ID), + &repository.UpdateStepRunOpts{ + Input: inputBytes, + Status: repository.StepRunStatusPtr(db.StepRunStatusPending), + IsRerun: true, + RetryCount: &retryCount, + Event: &repository.CreateStepRunEventOpts{ + EventReason: repository.StepRunEventReasonPtr(dbsqlc.StepRunEventReasonRETRYING), + EventMessage: repository.StringPtr( + fmt.Sprintf("Retrying step run. This is retry %d / %d", retryCount, stepRun.StepRetries), + )}, + }, + ) + + if err != nil { + return fmt.Errorf("could not update step run: %w", err) + } + + // send a task to the taskqueue + return ec.mq.AddMessage( + ctx, + msgqueue.JOB_PROCESSING_QUEUE, + tasktypes.StepRunQueuedToTask(stepRun), + ) +} + +// handleStepRunReplay replays a step run from scratch - it resets the workflow run state, job run state, and +// all cancelled step runs which are children of the step run being replayed. +func (ec *JobsControllerImpl) handleStepRunReplay(ctx context.Context, task *msgqueue.Message) error { + ctx, span := telemetry.NewSpan(ctx, "handle-step-run-replay") + defer span.End() + + payload := tasktypes.StepRunReplayTaskPayload{} + metadata := tasktypes.StepRunReplayTaskMetadata{} + + err := ec.dv.DecodeAndValidate(task.Payload, &payload) + + if err != nil { + return fmt.Errorf("could not decode job task payload: %w", err) + } + + err = ec.dv.DecodeAndValidate(task.Metadata, &metadata) + + if err != nil { + return fmt.Errorf("could not decode job task metadata: %w", err) + } + + err = ec.repo.StepRun().ArchiveStepRunResult(ctx, metadata.TenantId, payload.StepRunId) + + if err != nil { + return fmt.Errorf("could not archive step run result: %w", err) + } + + // Unlink the step run from its existing worker. This is necessary because automatic retries increment the + // worker semaphore on failure/cancellation, but in this case we don't want to increment the semaphore. + // FIXME: this is very far decoupled from the actual worker logic, and should be refactored. + err = ec.repo.StepRun().UnlinkStepRunFromWorker(ctx, metadata.TenantId, payload.StepRunId) + + if err != nil { + return fmt.Errorf("could not unlink step run from worker: %w", err) + } stepRun, err := ec.repo.StepRun().GetStepRunForEngine(ctx, metadata.TenantId, payload.StepRunId) @@ -427,7 +502,7 @@ func (ec *JobsControllerImpl) handleStepRunRetry(ctx context.Context, task *msgq } // update step run - _, _, err = ec.repo.StepRun().UpdateStepRun( + _, err = ec.repo.StepRun().ReplayStepRun( ctx, metadata.TenantId, sqlchelpers.UUIDToStr(stepRun.StepRun.ID), @@ -437,15 +512,15 @@ func (ec *JobsControllerImpl) handleStepRunRetry(ctx context.Context, task *msgq IsRerun: true, RetryCount: &retryCount, Event: &repository.CreateStepRunEventOpts{ - EventReason: repository.StepRunEventReasonPtr(dbsqlc.StepRunEventReasonRETRYING), + EventReason: repository.StepRunEventReasonPtr(dbsqlc.StepRunEventReasonRETRIEDBYUSER), EventMessage: repository.StringPtr( - fmt.Sprintf("Retrying step run. This is retry %d / %d", retryCount, stepRun.StepRetries), + "This step was manually replayed by a user", )}, }, ) if err != nil { - return fmt.Errorf("could not update step run: %w", err) + return fmt.Errorf("could not update step run for replay: %w", err) } // send a task to the taskqueue diff --git a/internal/services/shared/tasktypes/step.go b/internal/services/shared/tasktypes/step.go index 76537c6db..a076d3492 100644 --- a/internal/services/shared/tasktypes/step.go +++ b/internal/services/shared/tasktypes/step.go @@ -121,6 +121,18 @@ type StepRunRetryTaskMetadata struct { TenantId string `json:"tenant_id" validate:"required,uuid"` } +type StepRunReplayTaskPayload struct { + StepRunId string `json:"step_run_id" validate:"required,uuid"` + JobRunId string `json:"job_run_id" validate:"required,uuid"` + + // optional - if not provided, the step run will be retried with the same input + InputData string `json:"input_data,omitempty"` +} + +type StepRunReplayTaskMetadata struct { + TenantId string `json:"tenant_id" validate:"required,uuid"` +} + func TenantToStepRunRequeueTask(tenant db.TenantModel) *msgqueue.Message { payload, _ := datautils.ToJSONMap(StepRunRequeueTaskPayload{ TenantId: tenant.ID, @@ -161,6 +173,29 @@ func StepRunRetryToTask(stepRun *dbsqlc.GetStepRunForEngineRow, inputData []byte } } +func StepRunReplayToTask(stepRun *dbsqlc.GetStepRunForEngineRow, inputData []byte) *msgqueue.Message { + jobRunId := sqlchelpers.UUIDToStr(stepRun.JobRunId) + stepRunId := sqlchelpers.UUIDToStr(stepRun.StepRun.ID) + tenantId := sqlchelpers.UUIDToStr(stepRun.StepRun.TenantId) + + payload, _ := datautils.ToJSONMap(StepRunReplayTaskPayload{ + JobRunId: jobRunId, + StepRunId: stepRunId, + InputData: string(inputData), + }) + + metadata, _ := datautils.ToJSONMap(StepRunReplayTaskMetadata{ + TenantId: tenantId, + }) + + return &msgqueue.Message{ + ID: "step-run-replay", + Payload: payload, + Metadata: metadata, + Retries: 3, + } +} + func StepRunCancelToTask(stepRun *dbsqlc.GetStepRunForEngineRow, reason string) *msgqueue.Message { stepRunId := sqlchelpers.UUIDToStr(stepRun.StepRun.ID) tenantId := sqlchelpers.UUIDToStr(stepRun.StepRun.TenantId) diff --git a/pkg/client/client.go b/pkg/client/client.go index 1020be38d..bf82877df 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -203,7 +203,7 @@ func newFromOpts(opts *ClientOpts) (Client, error) { transportCreds = credentials.NewTLS(opts.tls) } - conn, err := grpc.Dial( + conn, err := grpc.NewClient( opts.hostPort, grpc.WithTransportCredentials(transportCreds), ) diff --git a/pkg/client/rest/gen.go b/pkg/client/rest/gen.go index 0b1dfd297..d791ddeb6 100644 --- a/pkg/client/rest/gen.go +++ b/pkg/client/rest/gen.go @@ -77,6 +77,7 @@ const ( StepRunEventReasonREASSIGNED StepRunEventReason = "REASSIGNED" StepRunEventReasonREQUEUEDNOWORKER StepRunEventReason = "REQUEUED_NO_WORKER" StepRunEventReasonREQUEUEDRATELIMIT StepRunEventReason = "REQUEUED_RATE_LIMIT" + StepRunEventReasonRETRIEDBYUSER StepRunEventReason = "RETRIED_BY_USER" StepRunEventReasonRETRYING StepRunEventReason = "RETRYING" StepRunEventReasonSCHEDULINGTIMEDOUT StepRunEventReason = "SCHEDULING_TIMED_OUT" StepRunEventReasonSLOTRELEASED StepRunEventReason = "SLOT_RELEASED" diff --git a/prisma/migrations/20240517204453_v0_28_1/migration.sql b/prisma/migrations/20240517204453_v0_28_1/migration.sql new file mode 100644 index 000000000..9b806a59c --- /dev/null +++ b/prisma/migrations/20240517204453_v0_28_1/migration.sql @@ -0,0 +1,2 @@ +-- AlterEnum +ALTER TYPE "StepRunEventReason" ADD VALUE 'RETRIED_BY_USER'; diff --git a/prisma/schema.prisma b/prisma/schema.prisma index 07f5041c0..a598c54ea 100644 --- a/prisma/schema.prisma +++ b/prisma/schema.prisma @@ -1042,6 +1042,7 @@ enum StepRunEventReason { FINISHED FAILED RETRYING + RETRIED_BY_USER CANCELLED TIMEOUT_REFRESHED SLOT_RELEASED