Refactor: Write to S3 outside of goroutine (#2646)

* feat: refactor olap job

* fix: core

* fix: bug
This commit is contained in:
matt
2025-12-11 18:37:50 -05:00
committed by GitHub
parent ba4b097bbc
commit b131ec94cd
2 changed files with 36 additions and 27 deletions

View File

@@ -2739,8 +2739,10 @@ func (p *OLAPRepositoryImpl) processOLAPPayloadCutoverBatch(ctx context.Context,
mu := sync.Mutex{}
eg := errgroup.Group{}
externalIdToKey := make(map[PayloadExternalId]ExternalPayloadLocationKey)
externalIdToPayload := make(map[PayloadExternalId]sqlcv1.ListPaginatedOLAPPayloadsForOffloadRow)
alreadyExternalPayloads := make(map[PayloadExternalId]ExternalPayloadLocationKey)
offloadToExternalStoreOpts := make([]OffloadToExternalStoreOpts, 0)
numPayloads := 0
for _, payloadRange := range payloadRanges {
@@ -2758,18 +2760,18 @@ func (p *OLAPRepositoryImpl) processOLAPPayloadCutoverBatch(ctx context.Context,
return fmt.Errorf("failed to list paginated payloads for offload")
}
alreadyExternalPayloads := make(map[PayloadExternalId]ExternalPayloadLocationKey)
alreadyExternalPayloadsInner := make(map[PayloadExternalId]ExternalPayloadLocationKey)
externalIdToPayloadInner := make(map[PayloadExternalId]sqlcv1.ListPaginatedOLAPPayloadsForOffloadRow)
offloadToExternalStoreOpts := make([]OffloadToExternalStoreOpts, 0)
offloadToExternalStoreOptsInner := make([]OffloadToExternalStoreOpts, 0)
for _, payload := range payloads {
externalId := PayloadExternalId(payload.ExternalID.String())
externalIdToPayloadInner[externalId] = *payload
if payload.Location != sqlcv1.V1PayloadLocationOlapINLINE {
alreadyExternalPayloads[externalId] = ExternalPayloadLocationKey(payload.ExternalLocationKey)
alreadyExternalPayloadsInner[externalId] = ExternalPayloadLocationKey(payload.ExternalLocationKey)
} else {
offloadToExternalStoreOpts = append(offloadToExternalStoreOpts, OffloadToExternalStoreOpts{
offloadToExternalStoreOptsInner = append(offloadToExternalStoreOptsInner, OffloadToExternalStoreOpts{
TenantId: TenantID(payload.TenantID.String()),
ExternalID: externalId,
InsertedAt: payload.InsertedAt,
@@ -2778,16 +2780,10 @@ func (p *OLAPRepositoryImpl) processOLAPPayloadCutoverBatch(ctx context.Context,
}
}
newlyOffloadedExternalIdToKey, err := p.PayloadStore().ExternalStore().Store(ctx, offloadToExternalStoreOpts...)
if err != nil {
return fmt.Errorf("failed to offload payloads to external store: %w", err)
}
mu.Lock()
maps.Copy(externalIdToKey, newlyOffloadedExternalIdToKey)
maps.Copy(externalIdToKey, alreadyExternalPayloads)
maps.Copy(externalIdToPayload, externalIdToPayloadInner)
maps.Copy(alreadyExternalPayloads, alreadyExternalPayloadsInner)
offloadToExternalStoreOpts = append(offloadToExternalStoreOpts, offloadToExternalStoreOptsInner...)
numPayloads += len(payloads)
mu.Unlock()
@@ -2801,6 +2797,14 @@ func (p *OLAPRepositoryImpl) processOLAPPayloadCutoverBatch(ctx context.Context,
return nil, err
}
externalIdToKey, err := p.PayloadStore().ExternalStore().Store(ctx, offloadToExternalStoreOpts...)
if err != nil {
return nil, fmt.Errorf("failed to offload payloads to external store: %w", err)
}
maps.Copy(externalIdToKey, alreadyExternalPayloads)
span.SetAttributes(attribute.Int("num_payloads_read", numPayloads))
payloadsToInsert := make([]sqlcv1.CutoverOLAPPayloadToInsert, 0, numPayloads)

View File

@@ -467,11 +467,13 @@ func (p *payloadStoreRepositoryImpl) ProcessPayloadCutoverBatch(ctx context.Cont
}, nil
}
eg := errgroup.Group{}
mu := sync.Mutex{}
eg := errgroup.Group{}
externalIdToKey := make(map[PayloadExternalId]ExternalPayloadLocationKey)
externalIdToPayload := make(map[PayloadExternalId]sqlcv1.ListPaginatedPayloadsForOffloadRow)
alreadyExternalPayloads := make(map[PayloadExternalId]ExternalPayloadLocationKey)
offloadToExternalStoreOpts := make([]OffloadToExternalStoreOpts, 0)
numPayloads := 0
for _, payloadRange := range payloadRanges {
@@ -490,9 +492,9 @@ func (p *payloadStoreRepositoryImpl) ProcessPayloadCutoverBatch(ctx context.Cont
return fmt.Errorf("failed to list paginated payloads for offload")
}
alreadyExternalPayloads := make(map[PayloadExternalId]ExternalPayloadLocationKey)
alreadyExternalPayloadsInner := make(map[PayloadExternalId]ExternalPayloadLocationKey)
externalIdToPayloadInner := make(map[PayloadExternalId]sqlcv1.ListPaginatedPayloadsForOffloadRow)
offloadOpts := make([]OffloadToExternalStoreOpts, 0, len(payloads))
offloadToExternalStoreOptsInner := make([]OffloadToExternalStoreOpts, 0)
for _, payload := range payloads {
externalId := PayloadExternalId(payload.ExternalID.String())
@@ -502,10 +504,11 @@ func (p *payloadStoreRepositoryImpl) ProcessPayloadCutoverBatch(ctx context.Cont
}
externalIdToPayloadInner[externalId] = *payload
if payload.Location != sqlcv1.V1PayloadLocationINLINE {
alreadyExternalPayloads[externalId] = ExternalPayloadLocationKey(payload.ExternalLocationKey)
alreadyExternalPayloadsInner[externalId] = ExternalPayloadLocationKey(payload.ExternalLocationKey)
} else {
offloadOpts = append(offloadOpts, OffloadToExternalStoreOpts{
offloadToExternalStoreOptsInner = append(offloadToExternalStoreOptsInner, OffloadToExternalStoreOpts{
TenantId: TenantID(payload.TenantID.String()),
ExternalID: externalId,
InsertedAt: payload.InsertedAt,
@@ -514,16 +517,10 @@ func (p *payloadStoreRepositoryImpl) ProcessPayloadCutoverBatch(ctx context.Cont
}
}
externalIdToKeyInner, err := p.ExternalStore().Store(ctx, offloadOpts...)
if err != nil {
return fmt.Errorf("failed to offload payloads to external store")
}
mu.Lock()
maps.Copy(externalIdToKey, externalIdToKeyInner)
maps.Copy(externalIdToKey, alreadyExternalPayloads)
maps.Copy(externalIdToPayload, externalIdToPayloadInner)
maps.Copy(alreadyExternalPayloads, alreadyExternalPayloadsInner)
offloadToExternalStoreOpts = append(offloadToExternalStoreOpts, offloadToExternalStoreOptsInner...)
numPayloads += len(payloads)
mu.Unlock()
@@ -537,6 +534,14 @@ func (p *payloadStoreRepositoryImpl) ProcessPayloadCutoverBatch(ctx context.Cont
return nil, err
}
externalIdToKey, err := p.ExternalStore().Store(ctx, offloadToExternalStoreOpts...)
if err != nil {
return nil, fmt.Errorf("failed to offload payloads to external store: %w", err)
}
maps.Copy(externalIdToKey, alreadyExternalPayloads)
span.SetAttributes(attribute.Int("num_payloads_read", numPayloads))
payloadsToInsert := make([]sqlcv1.CutoverPayloadToInsert, 0, numPayloads)