Refactor batch-reading from cache and DB

This commit is contained in:
Taras Kushnir
2025-07-06 09:48:04 +03:00
parent 4a2cb859ff
commit 36c10eb60c
2 changed files with 127 additions and 104 deletions
+32 -103
View File
@@ -296,132 +296,61 @@ func (impl *BusinessStoreImpl) RetrievePropertyBySitekey(ctx context.Context, si
return reader.Read(ctx)
}
// TODO: Refactor this to use otter.Cache BulkGet() API
// and also it's clone RetrievePropertiesByID()
func (impl *BusinessStoreImpl) RetrievePropertiesBySitekey(ctx context.Context, sitekeys map[string]uint, minMissingCount uint) ([]*dbgen.Property, error) {
if len(sitekeys) == 0 {
return []*dbgen.Property{}, nil
reader := &StoreBulkReader[string, pgtype.UUID, dbgen.Property]{
ArgFunc: propertySitekeyFunc,
Cache: impl.cache,
CacheKeyFunc: PropertyBySitekeyCacheKey,
QueryKeyFunc: stringKeySitekeyUUID,
MinMissingCount: minMissingCount,
}
keys := make([]pgtype.UUID, 0, len(sitekeys))
keysMap := make(map[string]struct{})
result := make([]*dbgen.Property, 0, len(sitekeys))
t := struct{}{}
for sitekey := range sitekeys {
eid := UUIDFromSiteKey(sitekey)
if !eid.Valid {
continue
}
cacheKey := PropertyBySitekeyCacheKey(sitekey)
if property, err := FetchCachedOne[dbgen.Property](ctx, impl.cache, cacheKey); err == nil {
result = append(result, property)
continue
} else if err == ErrNegativeCacheHit {
continue
}
keys = append(keys, eid)
keysMap[sitekey] = t
if impl.querier != nil {
reader.QueryFunc = impl.querier.GetPropertiesByExternalID
}
if len(keys) == 0 {
if len(result) > 0 {
slog.DebugContext(ctx, "All properties are cached", "count", len(result))
return result, nil
}
slog.WarnContext(ctx, "No valid sitekeys to fetch from DB")
return nil, ErrInvalidInput
}
if impl.querier == nil {
return result, ErrMaintenance
}
properties, err := impl.querier.GetPropertiesByExternalID(ctx, keys)
if err != nil && err != pgx.ErrNoRows {
slog.ErrorContext(ctx, "Failed to retrieve properties by sitekeys", common.ErrAttr(err))
cached, items, err := reader.Read(ctx, sitekeys)
if err != nil {
return nil, err
}
slog.DebugContext(ctx, "Fetched properties from DB by sitekeys", "count", len(properties))
for _, p := range properties {
sitekey := UUIDToSiteKey(p.ExternalID)
for _, item := range items {
sitekey := UUIDToSiteKey(item.ExternalID)
cacheKey := PropertyBySitekeyCacheKey(sitekey)
_ = impl.cache.SetWithTTL(ctx, cacheKey, p, propertyTTL)
delete(keysMap, sitekey)
_ = impl.cache.SetWithTTL(ctx, cacheKey, item, propertyTTL)
}
for missingKey := range keysMap {
// TODO: Switch to a probabilistic logic via an interface for negative caching
if count, ok := sitekeys[missingKey]; ok && (count >= minMissingCount) {
_ = impl.cache.SetMissing(ctx, PropertyBySitekeyCacheKey(missingKey))
}
}
result = append(result, properties...)
result := cached
result = append(result, items...)
return result, nil
}
// this is pretty much a copy paste of RetrievePropertiesBySitekey
func (impl *BusinessStoreImpl) RetrievePropertiesByID(ctx context.Context, batch map[int32]uint) ([]*dbgen.Property, error) {
if len(batch) == 0 {
return []*dbgen.Property{}, nil
reader := &StoreBulkReader[int32, int32, dbgen.Property]{
ArgFunc: propertyIDFunc,
Cache: impl.cache,
CacheKeyFunc: propertyByIDCacheKey,
QueryKeyFunc: IdentityKeyFunc[int32],
}
keys := make([]int32, 0, len(batch))
keysMap := make(map[int32]struct{})
result := make([]*dbgen.Property, 0, len(batch))
t := struct{}{}
for pID := range batch {
cacheKey := propertyByIDCacheKey(pID)
if property, err := FetchCachedOne[dbgen.Property](ctx, impl.cache, cacheKey); err == nil {
result = append(result, property)
continue
} else if err == ErrNegativeCacheHit {
continue
}
keys = append(keys, pID)
keysMap[pID] = t
if impl.querier != nil {
reader.QueryFunc = impl.querier.GetPropertiesByID
}
if len(keys) == 0 {
if len(result) > 0 {
slog.DebugContext(ctx, "All properties are cached", "count", len(result))
return result, nil
}
slog.WarnContext(ctx, "No valid properties to fetch from DB")
return nil, ErrInvalidInput
}
if impl.querier == nil {
return result, ErrMaintenance
}
properties, err := impl.querier.GetPropertiesByID(ctx, keys)
if err != nil && err != pgx.ErrNoRows {
slog.ErrorContext(ctx, "Failed to retrieve properties by sitekeys", common.ErrAttr(err))
cached, items, err := reader.Read(ctx, batch)
if err != nil {
return nil, err
}
slog.DebugContext(ctx, "Fetched properties from DB by sitekeys", "count", len(properties))
for _, p := range properties {
sitekey := UUIDToSiteKey(p.ExternalID)
for _, item := range items {
sitekey := UUIDToSiteKey(item.ExternalID)
cacheKey := PropertyBySitekeyCacheKey(sitekey)
_ = impl.cache.SetWithTTL(ctx, cacheKey, p, propertyTTL)
delete(keysMap, p.ID)
_ = impl.cache.SetWithTTL(ctx, cacheKey, item, propertyTTL)
}
result = append(result, properties...)
result := cached
result = append(result, items...)
return result, nil
}
@@ -525,7 +454,7 @@ func (impl *BusinessStoreImpl) RetrieveUserOrganizations(ctx context.Context, us
}
if impl.querier != nil {
reader.QueryKeyFunc = queryKeyPgInt
reader.QueryKeyFunc = QueryKeyPgInt
reader.QueryFunc = impl.querier.GetUserOrganizations
}
@@ -818,7 +747,7 @@ func (impl *BusinessStoreImpl) RetrieveOrgProperties(ctx context.Context, orgID
}
if impl.querier != nil {
reader.QueryKeyFunc = queryKeyPgInt
reader.QueryKeyFunc = QueryKeyPgInt
reader.QueryFunc = impl.querier.GetOrgProperties
}
@@ -1043,7 +972,7 @@ func (impl *BusinessStoreImpl) RetrieveUserAPIKeys(ctx context.Context, userID i
}
if impl.querier != nil {
reader.QueryKeyFunc = queryKeyPgInt
reader.QueryKeyFunc = QueryKeyPgInt
reader.QueryFunc = impl.querier.GetUserAPIKeys
}
+95 -1
View File
@@ -196,7 +196,28 @@ func queryKeySitekeyUUID(key CacheKey) (pgtype.UUID, error) {
return result, nil
}
func queryKeyPgInt(key CacheKey) (pgtype.Int4, error) {
func stringKeySitekeyUUID(key string) (pgtype.UUID, error) {
result := UUIDFromSiteKey(key)
if !result.Valid {
return result, ErrInvalidInput
}
return result, nil
}
func IdentityKeyFunc[TKey any](key TKey) (TKey, error) {
return key, nil
}
func propertySitekeyFunc(p *dbgen.Property) string {
return UUIDToSiteKey(p.ExternalID)
}
func propertyIDFunc(p *dbgen.Property) int32 {
return p.ID
}
func QueryKeyPgInt(key CacheKey) (pgtype.Int4, error) {
return Int(key.IntValue), nil
}
@@ -344,3 +365,76 @@ func (sf *cachedPropertyReader) Read(ctx context.Context) (*dbgen.Property, erro
return nil, errInvalidCacheType
}
// TODO: Refactor this to use otter.Cache BulkGet() API
type StoreBulkReader[TArg comparable, TKey any, T any] struct {
ArgFunc func(*T) TArg
QueryFunc func(context.Context, []TKey) ([]*T, error)
QueryKeyFunc func(TArg) (TKey, error)
Cache common.Cache[CacheKey, any]
CacheKeyFunc func(TArg) CacheKey
MinMissingCount uint
}
// returns cached and fetched items separately
func (br *StoreBulkReader[TArg, TKey, T]) Read(ctx context.Context, args map[TArg]uint) ([]*T, []*T, error) {
if len(args) == 0 {
return []*T{}, []*T{}, nil
}
queryKeys := make([]TKey, 0, len(args))
argsMap := make(map[TArg]struct{})
cached := make([]*T, 0, len(args))
for arg := range args {
cacheKey := br.CacheKeyFunc(arg)
if t, err := FetchCachedOne[T](ctx, br.Cache, cacheKey); err == nil {
cached = append(cached, t)
continue
} else if err == ErrNegativeCacheHit {
continue
}
if key, err := br.QueryKeyFunc(arg); err == nil {
queryKeys = append(queryKeys, key)
argsMap[arg] = struct{}{}
}
}
if len(queryKeys) == 0 {
if len(cached) > 0 {
slog.DebugContext(ctx, "All items are cached", "count", len(cached))
return cached, []*T{}, nil
}
slog.WarnContext(ctx, "No valid keys to fetch from DB")
return nil, nil, ErrInvalidInput
}
if br.QueryFunc == nil {
return cached, []*T{}, ErrMaintenance
}
items, err := br.QueryFunc(ctx, queryKeys)
if err != nil && err != pgx.ErrNoRows {
slog.ErrorContext(ctx, "Failed to query items", "keys", len(queryKeys), common.ErrAttr(err))
return cached, nil, err
}
slog.DebugContext(ctx, "Fetched items from DB", "count", len(items))
for _, item := range items {
arg := br.ArgFunc(item)
delete(argsMap, arg)
}
for missingKey := range argsMap {
// TODO: Switch to a probabilistic logic via an interface for negative caching
if count, ok := args[missingKey]; ok && (count >= br.MinMissingCount) {
cacheKey := br.CacheKeyFunc(missingKey)
_ = br.Cache.SetMissing(ctx, cacheKey)
}
}
return cached, items, nil
}