Feat: durable eviction take 2 (#3075)

* feat: simplified eviction feature

* fix: assign new worker id

* test: shorter sleep

* fix: completion race on same worker

* chore: address todo

* chore: lint

* chore: generate

* fix: n+1 queries

* refactor: WasEvicted bool

* feat: evicted state

* chore: generate

* fix: map status

* fix: update PendingCallback structure to include InvocationCount

* revert: comment

* feat: add support for EVICTED status in waterfall component and metrics display

* fix: implicit eviction

* chore: readable cte

* refactor: queued bool

* refactor: rename eviction_policy

* fix: aio only

* chore: example return type

* fix: map

* feat: eviction error cases

* refactor: change external ID maps to use UUID type

* chore: feedback, cleanup

* tests: additional cases

* chore: generate

* chore: lint

* chore: lint generate

* chore: clean up comments to make matt happy

* refactor: more feedback

* chore: add TODO for worker state reconciliation and clean up comments in eviction policy

* tests: fix

* chore: gen

* test: increase ruby timeout...

* fix: invocation count

* fix: test cases

* fix: stale log entry

* chore: lint

* revert: durable tests to use time.time

* chore: lint
This commit is contained in:
Gabe Ruttner
2026-02-27 09:25:50 -08:00
committed by GitHub
parent 05230016c2
commit daff28dbfe
95 changed files with 4096 additions and 868 deletions
+141
View File
@@ -240,6 +240,8 @@ const (
V1TaskEventTypeCANCELLED V1TaskEventType = "CANCELLED"
V1TaskEventTypeCOULDNOTSENDTOWORKER V1TaskEventType = "COULD_NOT_SEND_TO_WORKER"
V1TaskEventTypeCREATED V1TaskEventType = "CREATED"
V1TaskEventTypeDURABLEEVICTED V1TaskEventType = "DURABLE_EVICTED"
V1TaskEventTypeDURABLERESTORING V1TaskEventType = "DURABLE_RESTORING"
V1TaskEventTypeFAILED V1TaskEventType = "FAILED"
V1TaskEventTypeFINISHED V1TaskEventType = "FINISHED"
V1TaskEventTypeQUEUED V1TaskEventType = "QUEUED"
@@ -262,6 +264,7 @@ const (
const (
V1TaskStatusCANCELLED V1TaskStatus = "CANCELLED"
V1TaskStatusCOMPLETED V1TaskStatus = "COMPLETED"
V1TaskStatusEVICTED V1TaskStatus = "EVICTED"
V1TaskStatusFAILED V1TaskStatus = "FAILED"
V1TaskStatusQUEUED V1TaskStatus = "QUEUED"
V1TaskStatusRUNNING V1TaskStatus = "RUNNING"
@@ -1699,6 +1702,11 @@ type V1ReplayedTasks struct {
Ids *[]openapi_types.UUID `json:"ids,omitempty"`
}
// V1RestoreTaskResponse defines model for V1RestoreTaskResponse.
type V1RestoreTaskResponse struct {
Requeued bool `json:"requeued"`
}
// V1TaskEvent defines model for V1TaskEvent.
type V1TaskEvent struct {
// Attempt The attempt number of the task.
@@ -3287,6 +3295,9 @@ type ClientInterface interface {
// V1LogLineList request
V1LogLineList(ctx context.Context, task openapi_types.UUID, params *V1LogLineListParams, reqEditors ...RequestEditorFn) (*http.Response, error)
// V1TaskRestore request
V1TaskRestore(ctx context.Context, task openapi_types.UUID, reqEditors ...RequestEditorFn) (*http.Response, error)
// V1TaskEventList request
V1TaskEventList(ctx context.Context, task openapi_types.UUID, params *V1TaskEventListParams, reqEditors ...RequestEditorFn) (*http.Response, error)
@@ -3936,6 +3947,18 @@ func (c *Client) V1LogLineList(ctx context.Context, task openapi_types.UUID, par
return c.Client.Do(req)
}
func (c *Client) V1TaskRestore(ctx context.Context, task openapi_types.UUID, reqEditors ...RequestEditorFn) (*http.Response, error) {
req, err := NewV1TaskRestoreRequest(c.Server, task)
if err != nil {
return nil, err
}
req = req.WithContext(ctx)
if err := c.applyEditors(ctx, req, reqEditors); err != nil {
return nil, err
}
return c.Client.Do(req)
}
func (c *Client) V1TaskEventList(ctx context.Context, task openapi_types.UUID, params *V1TaskEventListParams, reqEditors ...RequestEditorFn) (*http.Response, error) {
req, err := NewV1TaskEventListRequest(c.Server, task, params)
if err != nil {
@@ -6546,6 +6569,40 @@ func NewV1LogLineListRequest(server string, task openapi_types.UUID, params *V1L
return req, nil
}
// NewV1TaskRestoreRequest generates requests for V1TaskRestore
func NewV1TaskRestoreRequest(server string, task openapi_types.UUID) (*http.Request, error) {
var err error
var pathParam0 string
pathParam0, err = runtime.StyleParamWithLocation("simple", false, "task", runtime.ParamLocationPath, task)
if err != nil {
return nil, err
}
serverURL, err := url.Parse(server)
if err != nil {
return nil, err
}
operationPath := fmt.Sprintf("/api/v1/stable/tasks/%s/restore", pathParam0)
if operationPath[0] == '/' {
operationPath = "." + operationPath
}
queryURL, err := serverURL.Parse(operationPath)
if err != nil {
return nil, err
}
req, err := http.NewRequest("POST", queryURL.String(), nil)
if err != nil {
return nil, err
}
return req, nil
}
// NewV1TaskEventListRequest generates requests for V1TaskEventList
func NewV1TaskEventListRequest(server string, task openapi_types.UUID, params *V1TaskEventListParams) (*http.Request, error) {
var err error
@@ -13365,6 +13422,9 @@ type ClientWithResponsesInterface interface {
// V1LogLineListWithResponse request
V1LogLineListWithResponse(ctx context.Context, task openapi_types.UUID, params *V1LogLineListParams, reqEditors ...RequestEditorFn) (*V1LogLineListResponse, error)
// V1TaskRestoreWithResponse request
V1TaskRestoreWithResponse(ctx context.Context, task openapi_types.UUID, reqEditors ...RequestEditorFn) (*V1TaskRestoreResponse, error)
// V1TaskEventListWithResponse request
V1TaskEventListWithResponse(ctx context.Context, task openapi_types.UUID, params *V1TaskEventListParams, reqEditors ...RequestEditorFn) (*V1TaskEventListResponse, error)
@@ -14198,6 +14258,31 @@ func (r V1LogLineListResponse) StatusCode() int {
return 0
}
type V1TaskRestoreResponse struct {
Body []byte
HTTPResponse *http.Response
JSON200 *V1RestoreTaskResponse
JSON400 *APIErrors
JSON403 *APIErrors
JSON404 *APIErrors
}
// Status returns HTTPResponse.Status
func (r V1TaskRestoreResponse) Status() string {
if r.HTTPResponse != nil {
return r.HTTPResponse.Status
}
return http.StatusText(0)
}
// StatusCode returns HTTPResponse.StatusCode
func (r V1TaskRestoreResponse) StatusCode() int {
if r.HTTPResponse != nil {
return r.HTTPResponse.StatusCode
}
return 0
}
type V1TaskEventListResponse struct {
Body []byte
HTTPResponse *http.Response
@@ -17236,6 +17321,15 @@ func (c *ClientWithResponses) V1LogLineListWithResponse(ctx context.Context, tas
return ParseV1LogLineListResponse(rsp)
}
// V1TaskRestoreWithResponse request returning *V1TaskRestoreResponse
func (c *ClientWithResponses) V1TaskRestoreWithResponse(ctx context.Context, task openapi_types.UUID, reqEditors ...RequestEditorFn) (*V1TaskRestoreResponse, error) {
rsp, err := c.V1TaskRestore(ctx, task, reqEditors...)
if err != nil {
return nil, err
}
return ParseV1TaskRestoreResponse(rsp)
}
// V1TaskEventListWithResponse request returning *V1TaskEventListResponse
func (c *ClientWithResponses) V1TaskEventListWithResponse(ctx context.Context, task openapi_types.UUID, params *V1TaskEventListParams, reqEditors ...RequestEditorFn) (*V1TaskEventListResponse, error) {
rsp, err := c.V1TaskEventList(ctx, task, params, reqEditors...)
@@ -19234,6 +19328,53 @@ func ParseV1LogLineListResponse(rsp *http.Response) (*V1LogLineListResponse, err
return response, nil
}
// ParseV1TaskRestoreResponse parses an HTTP response from a V1TaskRestoreWithResponse call
func ParseV1TaskRestoreResponse(rsp *http.Response) (*V1TaskRestoreResponse, error) {
bodyBytes, err := io.ReadAll(rsp.Body)
defer func() { _ = rsp.Body.Close() }()
if err != nil {
return nil, err
}
response := &V1TaskRestoreResponse{
Body: bodyBytes,
HTTPResponse: rsp,
}
switch {
case strings.Contains(rsp.Header.Get("Content-Type"), "json") && rsp.StatusCode == 200:
var dest V1RestoreTaskResponse
if err := json.Unmarshal(bodyBytes, &dest); err != nil {
return nil, err
}
response.JSON200 = &dest
case strings.Contains(rsp.Header.Get("Content-Type"), "json") && rsp.StatusCode == 400:
var dest APIErrors
if err := json.Unmarshal(bodyBytes, &dest); err != nil {
return nil, err
}
response.JSON400 = &dest
case strings.Contains(rsp.Header.Get("Content-Type"), "json") && rsp.StatusCode == 403:
var dest APIErrors
if err := json.Unmarshal(bodyBytes, &dest); err != nil {
return nil, err
}
response.JSON403 = &dest
case strings.Contains(rsp.Header.Get("Content-Type"), "json") && rsp.StatusCode == 404:
var dest APIErrors
if err := json.Unmarshal(bodyBytes, &dest); err != nil {
return nil, err
}
response.JSON404 = &dest
}
return response, nil
}
// ParseV1TaskEventListResponse parses an HTTP response from a V1TaskEventListWithResponse call
func ParseV1TaskEventListResponse(rsp *http.Response) (*V1TaskEventListResponse, error) {
bodyBytes, err := io.ReadAll(rsp.Body)