From c470c7634b0c604a38bdab4075e24bc411d98f07 Mon Sep 17 00:00:00 2001 From: David Bickford Date: Mon, 6 Oct 2025 14:23:41 -0400 Subject: [PATCH 1/8] adding retry for contested locks and an operation timeout --- cacheaside.go | 66 ++++++++++++++++++++++++++++++++++------------ cacheaside_test.go | 30 +++++++++++++++++++++ 2 files changed, 79 insertions(+), 17 deletions(-) diff --git a/cacheaside.go b/cacheaside.go index b178198..9675a26 100644 --- a/cacheaside.go +++ b/cacheaside.go @@ -17,22 +17,30 @@ import ( ) type CacheAside struct { - client rueidis.Client - locks syncx.Map[string, chan struct{}] - lockTTL time.Duration + client rueidis.Client + locks syncx.Map[string, chan struct{}] + lockTTL time.Duration + operationTTL time.Duration } type CacheAsideOption struct { LockTTL time.Duration + // OperationTTL is the maximum time to wait for cache operations to complete. + // If zero, defaults to 10 seconds. This is separate from cache entry TTL. + OperationTTL time.Duration } func NewRedCacheAside(clientOption rueidis.ClientOption, caOption CacheAsideOption) (*CacheAside, error) { if caOption.LockTTL == 0 { caOption.LockTTL = 10 * time.Second } + if caOption.OperationTTL == 0 { + caOption.OperationTTL = 10 * time.Second + } rca := &CacheAside{ - lockTTL: caOption.LockTTL, + lockTTL: caOption.LockTTL, + operationTTL: caOption.OperationTTL, } clientOption.OnInvalidations = rca.onInvalidate client, err := rueidis.NewClient(clientOption) @@ -60,8 +68,9 @@ func (rca *CacheAside) onInvalidate(messages []rueidis.RedisMessage) { const prefix = "redcache:" var ( - delKeyLua = rueidis.NewLuaScript(`if redis.call("GET",KEYS[1]) == ARGV[1] then return redis.call("DEL",KEYS[1]) else return 0 end`) - setKeyLua = rueidis.NewLuaScript(`if redis.call("GET",KEYS[1]) == ARGV[1] then return redis.call("SET",KEYS[1],ARGV[2],"PX",ARGV[3]) else return 0 end`) + delKeyLua = rueidis.NewLuaScript(`if redis.call("GET",KEYS[1]) == ARGV[1] then return redis.call("DEL",KEYS[1]) else return 0 end`) + setKeyLua = rueidis.NewLuaScript(`if redis.call("GET",KEYS[1]) == ARGV[1] then return redis.call("SET",KEYS[1],ARGV[2],"PX",ARGV[3]) else return 0 end`) + errLockWaitTimeout = errors.New("lock wait timeout - retrying") ) func (rca *CacheAside) register(key string) <-chan struct{} { @@ -75,7 +84,7 @@ func (rca *CacheAside) Get( key string, fn func(ctx context.Context, key string) (val string, err error), ) (string, error) { - ctx, cancel := context.WithTimeout(ctx, ttl) + ctx, cancel := context.WithTimeout(ctx, rca.operationTTL) defer cancel() retry: wait := rca.register(key) @@ -98,11 +107,22 @@ retry: } if val == "" { + // Wait for lock release with a timeout to handle lost invalidation messages + // Use lockTTL as the wait timeout since that's the max time a lock can be held + waitCtx, cancel := context.WithTimeoutCause(ctx, rca.lockTTL, errLockWaitTimeout) + defer cancel() + select { case <-wait: goto retry - case <-ctx.Done(): - return "", ctx.Err() + case <-waitCtx.Done(): + // Check if it's our timeout or parent context cancellation + if errors.Is(context.Cause(waitCtx), errLockWaitTimeout) { + // Our timeout - retry to check Redis again + goto retry + } + // Parent context cancelled - propagate the error + return "", context.Cause(waitCtx) } } @@ -152,7 +172,8 @@ func (rca *CacheAside) trySetKeyFunc(ctx context.Context, ttl time.Duration, key if !setVal { toCtx, cancel := context.WithTimeout(context.Background(), rca.lockTTL) defer cancel() - rca.unlock(toCtx, key, lockVal) + // Best effort unlock - errors are 
non-fatal as lock will expire + _ = rca.unlock(toCtx, key, lockVal) } }() if val, err = fn(ctx, key); err == nil { @@ -192,8 +213,8 @@ func (rca *CacheAside) setWithLock(ctx context.Context, ttl time.Duration, key s return valLock.val, nil } -func (rca *CacheAside) unlock(ctx context.Context, key string, lock string) { - delKeyLua.Exec(ctx, rca.client, []string{key}, []string{lock}) +func (rca *CacheAside) unlock(ctx context.Context, key string, lock string) error { + return delKeyLua.Exec(ctx, rca.client, []string{key}, []string{lock}).Error() } func (rca *CacheAside) GetMulti( @@ -203,7 +224,7 @@ func (rca *CacheAside) GetMulti( fn func(ctx context.Context, key []string) (val map[string]string, err error), ) (map[string]string, error) { - ctx, cancel := context.WithTimeout(ctx, ttl) + ctx, cancel := context.WithTimeout(ctx, rca.operationTTL) defer cancel() res := make(map[string]string, len(keys)) @@ -238,9 +259,19 @@ retry: } if len(waitLock) > 0 { - err = syncx.WaitForAll(ctx, mapsx.Values(waitLock)) + // Wait for lock releases with a timeout to handle lost invalidation messages + waitCtx, cancel := context.WithTimeoutCause(ctx, rca.lockTTL, errLockWaitTimeout) + err = syncx.WaitForAll(waitCtx, mapsx.Values(waitLock)) + cancel() + + // Check what kind of error occurred if err != nil { - return nil, err + // If it's our timeout, retry to check Redis again + if errors.Is(context.Cause(waitCtx), errLockWaitTimeout) { + goto retry + } + // Parent context cancelled - propagate the error + return nil, context.Cause(waitCtx) } goto retry } @@ -406,7 +437,7 @@ func (rca *CacheAside) setMultiWithLock(ctx context.Context, ttl time.Duration, } continue } - keyByStmt[ii] = append(out, kos.keyOrder[j]) + keyByStmt[ii] = append(keyByStmt[ii], kos.keyOrder[j]) } return nil }) @@ -438,7 +469,8 @@ func (rca *CacheAside) unlockMulti(ctx context.Context, lockVals map[string]stri wg.Add(1) go func() { defer wg.Done() - delKeyLua.ExecMulti(ctx, rca.client, stmts...) + // Best effort unlock - errors are non-fatal as locks will expire + _ = delKeyLua.ExecMulti(ctx, rca.client, stmts...) 
}() } wg.Wait() diff --git a/cacheaside_test.go b/cacheaside_test.go index f96ffac..cf04b56 100644 --- a/cacheaside_test.go +++ b/cacheaside_test.go @@ -621,3 +621,33 @@ func TestCacheAside_DelMulti(t *testing.T) { require.True(t, rueidis.IsRedisNil(err)) } } + +func TestCacheAside_GetParentContextCancellation(t *testing.T) { + client := makeClient(t, addr) + defer client.Client().Close() + + ctx, cancel := context.WithCancel(context.Background()) + key := "key:" + uuid.New().String() + val := "val:" + uuid.New().String() + + // Set a lock on the key so Get will wait + innerClient := client.Client() + lockVal := "redcache:" + uuid.New().String() + err := innerClient.Do(context.Background(), innerClient.B().Set().Key(key).Value(lockVal).Nx().Get().Px(time.Second*30).Build()).Error() + require.True(t, rueidis.IsRedisNil(err)) + + // Cancel the parent context after a short delay + go func() { + time.Sleep(100 * time.Millisecond) + cancel() + }() + + cb := func(ctx context.Context, key string) (string, error) { + return val, nil + } + + // Should get parent context cancelled error, not a timeout + _, err = client.Get(ctx, time.Second*10, key, cb) + require.Error(t, err) + require.ErrorIs(t, err, context.Canceled) +} From 1c97bb42a2634ee9f009e2334bcc57ff70ec64d7 Mon Sep 17 00:00:00 2001 From: David Bickford Date: Mon, 6 Oct 2025 14:26:41 -0400 Subject: [PATCH 2/8] remove operation timeout --- cacheaside.go | 23 ++++++----------------- 1 file changed, 6 insertions(+), 17 deletions(-) diff --git a/cacheaside.go b/cacheaside.go index 9675a26..1440dc2 100644 --- a/cacheaside.go +++ b/cacheaside.go @@ -17,30 +17,24 @@ import ( ) type CacheAside struct { - client rueidis.Client - locks syncx.Map[string, chan struct{}] - lockTTL time.Duration - operationTTL time.Duration + client rueidis.Client + locks syncx.Map[string, chan struct{}] + lockTTL time.Duration } type CacheAsideOption struct { + // LockTTL is the maximum time a lock can be held, and also the timeout for waiting + // on locks when handling lost Redis invalidation messages. Defaults to 10 seconds. LockTTL time.Duration - // OperationTTL is the maximum time to wait for cache operations to complete. - // If zero, defaults to 10 seconds. This is separate from cache entry TTL. 
- OperationTTL time.Duration } func NewRedCacheAside(clientOption rueidis.ClientOption, caOption CacheAsideOption) (*CacheAside, error) { if caOption.LockTTL == 0 { caOption.LockTTL = 10 * time.Second } - if caOption.OperationTTL == 0 { - caOption.OperationTTL = 10 * time.Second - } rca := &CacheAside{ - lockTTL: caOption.LockTTL, - operationTTL: caOption.OperationTTL, + lockTTL: caOption.LockTTL, } clientOption.OnInvalidations = rca.onInvalidate client, err := rueidis.NewClient(clientOption) @@ -84,8 +78,6 @@ func (rca *CacheAside) Get( key string, fn func(ctx context.Context, key string) (val string, err error), ) (string, error) { - ctx, cancel := context.WithTimeout(ctx, rca.operationTTL) - defer cancel() retry: wait := rca.register(key) val, err := rca.tryGet(ctx, ttl, key) @@ -224,9 +216,6 @@ func (rca *CacheAside) GetMulti( fn func(ctx context.Context, key []string) (val map[string]string, err error), ) (map[string]string, error) { - ctx, cancel := context.WithTimeout(ctx, rca.operationTTL) - defer cancel() - res := make(map[string]string, len(keys)) waitLock := make(map[string]<-chan struct{}, len(keys)) From 80088b70bb5bdb5c0563e2ae36dd7a33420a6eda Mon Sep 17 00:00:00 2001 From: David Bickford Date: Mon, 6 Oct 2025 14:42:57 -0400 Subject: [PATCH 3/8] use context to ensure cancellation --- cacheaside.go | 82 ++++++++++++++++++++++++++++++--------------------- 1 file changed, 49 insertions(+), 33 deletions(-) diff --git a/cacheaside.go b/cacheaside.go index 1440dc2..fbd5a2f 100644 --- a/cacheaside.go +++ b/cacheaside.go @@ -16,9 +16,14 @@ import ( "golang.org/x/sync/errgroup" ) +type lockEntry struct { + ctx context.Context + cancel context.CancelFunc +} + type CacheAside struct { client rueidis.Client - locks syncx.Map[string, chan struct{}] + locks syncx.Map[string, *lockEntry] lockTTL time.Duration } @@ -52,9 +57,9 @@ func (rca *CacheAside) Client() rueidis.Client { func (rca *CacheAside) onInvalidate(messages []rueidis.RedisMessage) { for _, m := range messages { key, _ := m.ToString() - ch, loaded := rca.locks.LoadAndDelete(key) + entry, loaded := rca.locks.LoadAndDelete(key) if loaded { - close(ch) + entry.cancel() // Cancel context, which closes the channel } } } @@ -62,14 +67,42 @@ func (rca *CacheAside) onInvalidate(messages []rueidis.RedisMessage) { const prefix = "redcache:" var ( - delKeyLua = rueidis.NewLuaScript(`if redis.call("GET",KEYS[1]) == ARGV[1] then return redis.call("DEL",KEYS[1]) else return 0 end`) - setKeyLua = rueidis.NewLuaScript(`if redis.call("GET",KEYS[1]) == ARGV[1] then return redis.call("SET",KEYS[1],ARGV[2],"PX",ARGV[3]) else return 0 end`) - errLockWaitTimeout = errors.New("lock wait timeout - retrying") + delKeyLua = rueidis.NewLuaScript(`if redis.call("GET",KEYS[1]) == ARGV[1] then return redis.call("DEL",KEYS[1]) else return 0 end`) + setKeyLua = rueidis.NewLuaScript(`if redis.call("GET",KEYS[1]) == ARGV[1] then return redis.call("SET",KEYS[1],ARGV[2],"PX",ARGV[3]) else return 0 end`) ) func (rca *CacheAside) register(key string) <-chan struct{} { - ch, _ := rca.locks.LoadOrStore(key, make(chan struct{})) - return ch + // Try to load existing entry first + if entry, loaded := rca.locks.Load(key); loaded { + // Check if the context is still active (not cancelled/timed out) + select { + case <-entry.ctx.Done(): + // Context is done - clean it up and create a new one + rca.locks.Delete(key) + default: + // Context is still active - use it + return entry.ctx.Done() + } + } + entry. 
+ // Create new entry with context that auto-cancels after lockTTL + ctx, cancel := context.WithTimeout(context.Background(), rca.lockTTL) + + entry := &lockEntry{ + ctx: ctx, + cancel: cancel, + } + + // Store or get existing entry atomically + actual, _ := rca.locks.LoadOrStore(key, entry) + + // If another goroutine stored first, cancel our context and use theirs + if actual != entry { + cancel() + return actual.ctx.Done() + } + + return ctx.Done() } func (rca *CacheAside) Get( @@ -99,22 +132,13 @@ retry: } if val == "" { - // Wait for lock release with a timeout to handle lost invalidation messages - // Use lockTTL as the wait timeout since that's the max time a lock can be held - waitCtx, cancel := context.WithTimeoutCause(ctx, rca.lockTTL, errLockWaitTimeout) - defer cancel() - + // Wait for lock release (channel auto-closes after lockTTL or on invalidation) select { case <-wait: goto retry - case <-waitCtx.Done(): - // Check if it's our timeout or parent context cancellation - if errors.Is(context.Cause(waitCtx), errLockWaitTimeout) { - // Our timeout - retry to check Redis again - goto retry - } - // Parent context cancelled - propagate the error - return "", context.Cause(waitCtx) + case <-ctx.Done(): + // Parent context cancelled + return "", ctx.Err() } } @@ -248,19 +272,11 @@ retry: } if len(waitLock) > 0 { - // Wait for lock releases with a timeout to handle lost invalidation messages - waitCtx, cancel := context.WithTimeoutCause(ctx, rca.lockTTL, errLockWaitTimeout) - err = syncx.WaitForAll(waitCtx, mapsx.Values(waitLock)) - cancel() - - // Check what kind of error occurred + // Wait for lock releases (channels auto-close after lockTTL or on invalidation) + err = syncx.WaitForAll(ctx, mapsx.Values(waitLock)) if err != nil { - // If it's our timeout, retry to check Redis again - if errors.Is(context.Cause(waitCtx), errLockWaitTimeout) { - goto retry - } - // Parent context cancelled - propagate the error - return nil, context.Cause(waitCtx) + // Parent context cancelled + return nil, ctx.Err() } goto retry } From cfde17d5eeacc2802f65579c6a916d6ce618871a Mon Sep 17 00:00:00 2001 From: David Bickford Date: Mon, 6 Oct 2025 15:00:37 -0400 Subject: [PATCH 4/8] Update cacheaside.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- cacheaside.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cacheaside.go b/cacheaside.go index dad0090..56ea442 100644 --- a/cacheaside.go +++ b/cacheaside.go @@ -282,7 +282,7 @@ retry: // Wait for lock releases (channels auto-close after lockTTL or on invalidation) err = syncx.WaitForAll(ctx, maps.Values(waitLock), len(waitLock)) if err != nil { - // Parent context cancelled + // Parent context cancelled or deadline exceeded return nil, ctx.Err() } goto retry From 3741e78f7f348c63323baa87861a7934c70e1904 Mon Sep 17 00:00:00 2001 From: David Bickford Date: Mon, 6 Oct 2025 16:26:42 -0400 Subject: [PATCH 5/8] fixed raw deletion with compare and delete --- cacheaside.go | 63 +++++++++++++++++++++++++++++++++------------------ 1 file changed, 41 insertions(+), 22 deletions(-) diff --git a/cacheaside.go b/cacheaside.go index dad0090..afd77d6 100644 --- a/cacheaside.go +++ b/cacheaside.go @@ -4,6 +4,7 @@ import ( "context" "errors" "iter" + "log/slog" "maps" "strconv" "strings" @@ -23,10 +24,15 @@ type lockEntry struct { cancel context.CancelFunc } +type Logger interface { + Error(msg string, args ...any) +} + type CacheAside struct { client rueidis.Client locks syncx.Map[string, *lockEntry] lockTTL time.Duration + 
logger Logger } type CacheAsideOption struct { @@ -34,6 +40,8 @@ type CacheAsideOption struct { // on locks when handling lost Redis invalidation messages. Defaults to 10 seconds. LockTTL time.Duration ClientBuilder func(option rueidis.ClientOption) (rueidis.Client, error) + // Logger for logging non-fatal errors. Defaults to slog.Default(). + Logger Logger } func NewRedCacheAside(clientOption rueidis.ClientOption, caOption CacheAsideOption) (*CacheAside, error) { @@ -41,9 +49,13 @@ func NewRedCacheAside(clientOption rueidis.ClientOption, caOption CacheAsideOpti if caOption.LockTTL == 0 { caOption.LockTTL = 10 * time.Second } + if caOption.Logger == nil { + caOption.Logger = slog.Default() + } rca := &CacheAside{ lockTTL: caOption.LockTTL, + logger: caOption.Logger, } clientOption.OnInvalidations = rca.onInvalidate if caOption.ClientBuilder != nil { @@ -79,37 +91,37 @@ var ( ) func (rca *CacheAside) register(key string) <-chan struct{} { - // Try to load existing entry first - if entry, loaded := rca.locks.Load(key); loaded { - // Check if the context is still active (not cancelled/timed out) - select { - case <-entry.ctx.Done(): - // Context is done - clean it up and create a new one - rca.locks.Delete(key) - default: - // Context is still active - use it - return entry.ctx.Done() - } - } - +retry: // Create new entry with context that auto-cancels after lockTTL ctx, cancel := context.WithTimeout(context.Background(), rca.lockTTL) - entry := &lockEntry{ + newEntry := &lockEntry{ ctx: ctx, cancel: cancel, } // Store or get existing entry atomically - actual, _ := rca.locks.LoadOrStore(key, entry) + actual, loaded := rca.locks.LoadOrStore(key, newEntry) - // If another goroutine stored first, cancel our context and use theirs - if actual != entry { - cancel() - return actual.ctx.Done() + // If we successfully stored, return our context + if !loaded { + return ctx.Done() } - return ctx.Done() + // Another goroutine stored first, cancel our context + cancel() + + // Check if their context is still active (not cancelled/timed out) + select { + case <-actual.ctx.Done(): + // Context is done - try to atomically delete it and retry + // If CompareAndDelete fails, another goroutine already replaced it + rca.locks.CompareAndDelete(key, actual) + goto retry + default: + // Context is still active - use it + return actual.ctx.Done() + } } func (rca *CacheAside) Get( @@ -196,7 +208,9 @@ func (rca *CacheAside) trySetKeyFunc(ctx context.Context, ttl time.Duration, key toCtx, cancel := context.WithTimeout(context.Background(), rca.lockTTL) defer cancel() // Best effort unlock - errors are non-fatal as lock will expire - _ = rca.unlock(toCtx, key, lockVal) + if err := rca.unlock(toCtx, key, lockVal); err != nil { + rca.logger.Error("failed to unlock key", "key", key, "error", err) + } } }() if val, err = fn(ctx, key); err == nil { @@ -482,7 +496,12 @@ func (rca *CacheAside) unlockMulti(ctx context.Context, lockVals map[string]stri go func() { defer wg.Done() // Best effort unlock - errors are non-fatal as locks will expire - _ = delKeyLua.ExecMulti(ctx, rca.client, stmts...) + resps := delKeyLua.ExecMulti(ctx, rca.client, stmts...) 
+ for _, resp := range resps { + if err := resp.Error(); err != nil { + rca.logger.Error("failed to unlock key in batch", "error", err) + } + } }() } wg.Wait() From 4e13ff710cd69becd83223b81b359314cdd8aead Mon Sep 17 00:00:00 2001 From: David Bickford Date: Tue, 7 Oct 2025 11:19:57 -0400 Subject: [PATCH 6/8] added fixes to register, added linter and updated example --- .github/workflows/CI.yml | 7 +- .golangci.yml | 162 +++++++++++++++++++++++++ README.md | 50 ++++---- cacheaside.go | 233 ++++++++++++++++++++++++++++-------- cacheaside_test.go | 195 +++++++++++++++++++++++++++++- internal/cmdx/slot.go | 11 +- internal/cmdx/slot_test.go | 182 ++++++++++++++++++++++++++++ internal/mapsx/maps_test.go | 3 +- internal/syncx/map_test.go | 3 +- internal/syncx/wait_test.go | 3 +- 10 files changed, 765 insertions(+), 84 deletions(-) create mode 100644 .golangci.yml create mode 100644 internal/cmdx/slot_test.go diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml index 819d625..0c0a0bd 100644 --- a/.github/workflows/CI.yml +++ b/.github/workflows/CI.yml @@ -16,9 +16,14 @@ jobs: - name: Setup Go uses: actions/setup-go@v5 with: - go-version: '1.22.x' + go-version-file: 'go.mod' - name: Install dependencies run: go mod vendor + - name: Run golangci-lint + uses: golangci/golangci-lint-action@v4 + with: + version: latest + args: --timeout=5m - name: Test with Go run: go test -json > TestResults.json - name: Upload Go test results diff --git a/.golangci.yml b/.golangci.yml new file mode 100644 index 0000000..a2df677 --- /dev/null +++ b/.golangci.yml @@ -0,0 +1,162 @@ +# golangci-lint configuration +# Documentation: https://golangci-lint.run/usage/configuration/ + +run: + timeout: 5m + tests: true + modules-download-mode: readonly + +# Output configuration +output: + formats: + - format: colored-line-number + print-issued-lines: true + print-linter-name: true + sort-results: true + +linters: + disable-all: true + enable: + # Enabled by default + - errcheck # Checks for unchecked errors + - gosimple # Simplify code + - govet # Reports suspicious constructs + - ineffassign # Detects ineffectual assignments + - staticcheck # Staticcheck is a go vet on steroids + - unused # Checks for unused constants, variables, functions and types + + # Additional recommended linters + - gofmt # Checks whether code was gofmt-ed + - goimports # Check import statements are formatted according to goimport command + - misspell # Finds commonly misspelled English words + - revive # Fast, configurable, extensible, flexible, and beautiful linter for Go + - gocyclo # Computes cyclomatic complexities + - goconst # Finds repeated strings that could be replaced by a constant + - gosec # Inspects source code for security problems + - unconvert # Remove unnecessary type conversions + - unparam # Reports unused function parameters + - nakedret # Finds naked returns in functions greater than a specified length + - gocognit # Computes cognitive complexities + - godot # Check if comments end in a period + - whitespace # Detection of leading and trailing whitespace + - gci # Controls Go package import order and makes it deterministic + +linters-settings: + goimports: + # Use goimports as the formatter + local-prefixes: github.com/dcbickfo/redcache + + gofmt: + # Simplify code: gofmt with `-s` option + simplify: true + + gocyclo: + # Minimal cyclomatic complexity to report + min-complexity: 15 + + gocognit: + # Minimal cognitive complexity to report + min-complexity: 15 + + goconst: + # Minimal length of string constant + min-len: 
3 + # Minimum occurrences to report + min-occurrences: 3 + + gosec: + # Exclude some checks + excludes: + - G104 # Audit errors not checked (we use errcheck for this) + + revive: + confidence: 0.8 + rules: + - name: blank-imports + - name: context-as-argument + - name: context-keys-type + - name: dot-imports + - name: error-return + - name: error-strings + - name: error-naming + - name: exported + - name: if-return + - name: increment-decrement + - name: var-naming + - name: var-declaration + - name: package-comments + - name: range + - name: receiver-naming + - name: time-naming + - name: unexported-return + - name: indent-error-flow + - name: errorf + - name: empty-block + - name: superfluous-else + - name: unused-parameter + - name: unreachable-code + - name: redefines-builtin-id + + nakedret: + # Make an issue if func has more lines of code than this setting and has naked return + max-func-lines: 30 + + unparam: + # Check exported functions + check-exported: false + + whitespace: + multi-if: false + multi-func: false + + gci: + # Section configuration to compare against + sections: + - standard # Standard section: captures all standard packages + - default # Default section: contains all imports that could not be matched to another section type + - prefix(github.com/dcbickfo/redcache) # Custom section: groups all imports with the specified Prefix + +issues: + # Excluding configuration per-path, per-linter, per-text and per-source + exclude-rules: + # Exclude some linters from running on tests files + - path: _test\.go + linters: + - gocyclo + - gocognit + - errcheck + - gosec + - unparam + - revive + - goconst + - godot + - whitespace + - gci + + # Exclude known issues in vendor + - path: vendor/ + linters: + - all + + # Ignore "new" parameter name shadowing built-in + - text: "redefines-builtin-id" + linters: + - revive + + # Ignore integer overflow in CRC16 - this is intentional and safe + - text: "G115.*integer overflow" + path: internal/cmdx/slot.go + linters: + - gosec + + # Maximum issues count per one linter + max-issues-per-linter: 50 + + # Maximum count of issues with the same text + max-same-issues: 3 + + # Show only new issues + new: false + + # Fix found issues (if it's supported by the linter) + fix: false diff --git a/README.md b/README.md index 184d916..b9a22b5 100644 --- a/README.md +++ b/README.md @@ -10,7 +10,7 @@ package main import ( "context" "database/sql" - "github.com/google/go-cmp/cmp/internal/value" + "log" "time" "github.com/redis/rueidis" @@ -20,15 +20,15 @@ import ( func main() { if err := run(); err != nil { log.Fatal(err) - } + } } func run() error { - var db sql.DB + var db *sql.DB // initialize db client, err := redcache.NewRedCacheAside( rueidis.ClientOption{ - InitAddress: addr, + InitAddress: []string{"127.0.0.1:6379"}, }, redcache.CacheAsideOption{ LockTTL: time.Second * 1, @@ -37,31 +37,33 @@ func run() error { if err != nil { return err } - + repo := Repository{ client: client, db: &db, } - - val, err := Repository.GetByID(context.Background(), "key") + + val, err := repo.GetByID(context.Background(), "key") if err != nil { - return err + return err } - - vals, err := Repository.GetByIDs(context.Background(), map[string]string{"key1": "val1", "key2": "val2"}) + + vals, err := repo.GetByIDs(context.Background(), []string{"key1", "key2"}) if err != nil { return err } + _, _ = val, vals + return nil } type Repository struct { - client redcache.CacheAside + client *redcache.CacheAside db *sql.DB } func (r Repository) GetByID(ctx context.Context, key 
string) (string, error) { val, err := r.client.Get(ctx, time.Minute, key, func(ctx context.Context, key string) (val string, err error) { - if err = db.QueryRowContext(ctx, "SELECT val FROM mytab WHERE id = ?", key).Scan(&val); err == sql.ErrNoRows { + if err = r.db.QueryRowContext(ctx, "SELECT val FROM mytab WHERE id = ?", key).Scan(&val); err == sql.ErrNoRows { val = "NULL" // cache null to avoid penetration. err = nil // clear err in case of sql.ErrNoRows. } @@ -72,33 +74,37 @@ func (r Repository) GetByID(ctx context.Context, key string) (string, error) { } else if val == "NULL" { val = "" err = sql.ErrNoRows - } - // ... + } + return val, err } -func (r Repository) GetByIDs(ctx context.Context, key []string) (map[string]string, error) { - val, err := r.client.GetMulti(ctx, time.Minute, key, func(ctx context.Context, key []string) (val map[string]string, err error) { - rows := db.QueryContext(ctx, "SELECT id, val FROM mytab WHERE id = ?", key) +func (r Repository) GetByIDs(ctx context.Context, keys []string) (map[string]string, error) { + val, err := r.client.GetMulti(ctx, time.Minute, keys, func(ctx context.Context, keys []string) (val map[string]string, err error) { + val = make(map[string]string) + rows, err := r.db.QueryContext(ctx, "SELECT id, val FROM mytab WHERE id IN (?)", keys) + if err != nil { + return nil, err + } defer rows.Close() for rows.Next() { var id, rowVal string if err = rows.Scan(&id, &rowVal); err != nil { - return + return nil, err } val[id] = rowVal } - if len(val) != len(key) { - for _, k := range key { + if len(val) != len(keys) { + for _, k := range keys { if _, ok := val[k]; !ok { val[k] = "NULL" // cache null to avoid penetration. } } } - return + return val, nil }) if err != nil { return nil, err - } + } // handle any NULL vals if desired // ... diff --git a/cacheaside.go b/cacheaside.go index ab5eebd..c09419c 100644 --- a/cacheaside.go +++ b/cacheaside.go @@ -1,8 +1,67 @@ +// Package redcache provides a cache-aside implementation for Redis with distributed locking. +// +// This library builds on the rueidis Redis client to provide: +// - Cache-aside pattern with automatic cache population +// - Distributed locking to prevent thundering herd +// - Client-side caching to reduce Redis round trips +// - Redis cluster support with slot-aware batching +// - Automatic cleanup of expired lock entries +// +// # Basic Usage +// +// client, err := redcache.NewRedCacheAside( +// rueidis.ClientOption{InitAddress: []string{"localhost:6379"}}, +// redcache.CacheAsideOption{LockTTL: 10 * time.Second}, +// ) +// if err != nil { +// return err +// } +// defer client.Client().Close() +// +// // Get a single value with automatic cache population +// value, err := client.Get(ctx, time.Minute, "user:123", func(ctx context.Context, key string) (string, error) { +// return fetchFromDatabase(ctx, key) +// }) +// +// // Get multiple values with batched cache population +// values, err := client.GetMulti(ctx, time.Minute, []string{"user:1", "user:2"}, func(ctx context.Context, keys []string) (map[string]string, error) { +// return fetchMultipleFromDatabase(ctx, keys) +// }) +// +// # Distributed Locking +// +// The library ensures that only one goroutine (across all instances of your application) +// executes the callback function for a given key at a time. Other goroutines will wait +// for the lock to be released and then return the cached value. +// +// Locks are implemented using Redis SET NX with a configurable TTL. 
Lock values use +// UUIDv7 for uniqueness and are prefixed (default: "__redcache:lock:") to avoid +// collisions with application data. +// +// # Context and Timeouts +// +// All operations respect context cancellation. The LockTTL option controls: +// - Maximum time a lock can be held before automatic expiration +// - Timeout for waiting on locks when handling invalidation messages +// - Context timeout for cleanup operations +// +// Use context deadlines to control overall operation timeout: +// +// ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) +// defer cancel() +// value, err := client.Get(ctx, time.Minute, key, callback) +// +// # Client-Side Caching +// +// The library uses rueidis client-side caching with Redis invalidation messages. +// When a key is modified in Redis, invalidation messages automatically clear the +// local cache, ensuring consistency across distributed instances. package redcache import ( "context" "errors" + "fmt" "iter" "log/slog" "maps" @@ -11,12 +70,13 @@ import ( "sync" "time" - "github.com/dcbickfo/redcache/internal/cmdx" - "github.com/dcbickfo/redcache/internal/mapsx" - "github.com/dcbickfo/redcache/internal/syncx" "github.com/google/uuid" "github.com/redis/rueidis" "golang.org/x/sync/errgroup" + + "github.com/dcbickfo/redcache/internal/cmdx" + "github.com/dcbickfo/redcache/internal/mapsx" + "github.com/dcbickfo/redcache/internal/syncx" ) type lockEntry struct { @@ -24,15 +84,19 @@ type lockEntry struct { cancel context.CancelFunc } +// Logger defines the logging interface used by CacheAside. +// Implementations must be safe for concurrent use and should handle log levels internally. type Logger interface { Error(msg string, args ...any) + Debug(msg string, args ...any) } type CacheAside struct { - client rueidis.Client - locks syncx.Map[string, *lockEntry] - lockTTL time.Duration - logger Logger + client rueidis.Client + locks syncx.Map[string, *lockEntry] + lockTTL time.Duration + logger Logger + lockPrefix string } type CacheAsideOption struct { @@ -40,24 +104,45 @@ type CacheAsideOption struct { // on locks when handling lost Redis invalidation messages. Defaults to 10 seconds. LockTTL time.Duration ClientBuilder func(option rueidis.ClientOption) (rueidis.Client, error) - // Logger for logging non-fatal errors. Defaults to slog.Default(). + // Logger for logging errors and debug information. Defaults to slog.Default(). + // The logger should handle log levels internally (e.g., only log Debug if level is enabled). Logger Logger + // LockPrefix for distributed locks. Defaults to "__redcache:lock:". + // Choose a prefix unlikely to conflict with your data keys. 
+ LockPrefix string } func NewRedCacheAside(clientOption rueidis.ClientOption, caOption CacheAsideOption) (*CacheAside, error) { - var err error + // Validate client options + if len(clientOption.InitAddress) == 0 { + return nil, errors.New("at least one Redis address must be provided in InitAddress") + } + + // Validate and set defaults for cache aside options + if caOption.LockTTL < 0 { + return nil, errors.New("LockTTL must not be negative") + } + if caOption.LockTTL > 0 && caOption.LockTTL < 100*time.Millisecond { + return nil, errors.New("LockTTL should be at least 100ms to avoid excessive lock churn") + } if caOption.LockTTL == 0 { caOption.LockTTL = 10 * time.Second } if caOption.Logger == nil { caOption.Logger = slog.Default() } + if caOption.LockPrefix == "" { + caOption.LockPrefix = "__redcache:lock:" + } rca := &CacheAside{ - lockTTL: caOption.LockTTL, - logger: caOption.Logger, + lockTTL: caOption.LockTTL, + logger: caOption.Logger, + lockPrefix: caOption.LockPrefix, } clientOption.OnInvalidations = rca.onInvalidate + + var err error if caOption.ClientBuilder != nil { rca.client, err = caOption.ClientBuilder(clientOption) } else { @@ -69,13 +154,20 @@ func NewRedCacheAside(clientOption rueidis.ClientOption, caOption CacheAsideOpti return rca, nil } +// Client returns the underlying rueidis.Client for advanced operations. +// Most users should not need direct client access. Use with caution as +// direct operations bypass the cache-aside pattern and distributed locking. func (rca *CacheAside) Client() rueidis.Client { return rca.client } func (rca *CacheAside) onInvalidate(messages []rueidis.RedisMessage) { for _, m := range messages { - key, _ := m.ToString() + key, err := m.ToString() + if err != nil { + rca.logger.Error("failed to parse invalidation message", "error", err) + continue + } entry, loaded := rca.locks.LoadAndDelete(key) if loaded { entry.cancel() // Cancel context, which closes the channel @@ -83,8 +175,6 @@ func (rca *CacheAside) onInvalidate(messages []rueidis.RedisMessage) { } } -const prefix = "redcache:" - var ( delKeyLua = rueidis.NewLuaScript(`if redis.call("GET",KEYS[1]) == ARGV[1] then return redis.call("DEL",KEYS[1]) else return 0 end`) setKeyLua = rueidis.NewLuaScript(`if redis.call("GET",KEYS[1]) == ARGV[1] then return redis.call("SET",KEYS[1],ARGV[2],"PX",ARGV[3]) else return 0 end`) @@ -103,21 +193,35 @@ retry: // Store or get existing entry atomically actual, loaded := rca.locks.LoadOrStore(key, newEntry) - // If we successfully stored, return our context + // If we successfully stored, schedule automatic cleanup on expiration if !loaded { + // Use context.AfterFunc to clean up expired entry without blocking goroutine + context.AfterFunc(ctx, func() { + rca.locks.CompareAndDelete(key, newEntry) + }) return ctx.Done() } - // Another goroutine stored first, cancel our context + // Another goroutine stored first, cancel our context to prevent leak cancel() // Check if their context is still active (not cancelled/timed out) select { case <-actual.ctx.Done(): // Context is done - try to atomically delete it and retry - // If CompareAndDelete fails, another goroutine already replaced it - rca.locks.CompareAndDelete(key, actual) - goto retry + if rca.locks.CompareAndDelete(key, actual) { + // We successfully deleted the expired entry, retry + goto retry + } + // CompareAndDelete failed - another goroutine modified it + // Load the new entry and use it + newEntry, loaded := rca.locks.Load(key) + if !loaded { + // Entry was deleted by another goroutine, 
retry registration + goto retry + } + // Use the new entry's context + return newEntry.ctx.Done() default: // Context is still active - use it return actual.ctx.Done() @@ -182,18 +286,30 @@ func (rca *CacheAside) DelMulti(ctx context.Context, keys ...string) error { return nil } -var errNotFound = errors.New("not found") -var errLockFailed = errors.New("lock failed") +var ( + errNotFound = errors.New("not found") + errLockFailed = errors.New("lock failed") +) + +// ErrLockLost indicates the distributed lock was lost or expired before the value could be set. +// This can occur if the lock TTL expires during callback execution or if Redis invalidates the lock. +var ErrLockLost = errors.New("lock was lost or expired before value could be set") func (rca *CacheAside) tryGet(ctx context.Context, ttl time.Duration, key string) (string, error) { resp := rca.client.DoCache(ctx, rca.client.B().Get().Key(key).Cache(), ttl) val, err := resp.ToString() - if rueidis.IsRedisNil(err) || strings.HasPrefix(val, prefix) { // no response or is a lock value + if rueidis.IsRedisNil(err) || strings.HasPrefix(val, rca.lockPrefix) { // no response or is a lock value + if rueidis.IsRedisNil(err) { + rca.logger.Debug("cache miss - key not found", "key", key) + } else { + rca.logger.Debug("cache miss - lock value found", "key", key) + } return "", errNotFound } if err != nil { return "", err } + rca.logger.Debug("cache hit", "key", key) return val, nil } @@ -205,7 +321,9 @@ func (rca *CacheAside) trySetKeyFunc(ctx context.Context, ttl time.Duration, key } defer func() { if !setVal { - toCtx, cancel := context.WithTimeout(context.Background(), rca.lockTTL) + // Use context.WithoutCancel to preserve tracing/request context while allowing cleanup + cleanupCtx := context.WithoutCancel(ctx) + toCtx, cancel := context.WithTimeout(cleanupCtx, rca.lockTTL) defer cancel() // Best effort unlock - errors are non-fatal as lock will expire if err := rca.unlock(toCtx, key, lockVal); err != nil { @@ -226,27 +344,28 @@ func (rca *CacheAside) trySetKeyFunc(ctx context.Context, ttl time.Duration, key func (rca *CacheAside) tryLock(ctx context.Context, key string) (string, error) { uuidv7, err := uuid.NewV7() if err != nil { - return "", err + return "", fmt.Errorf("failed to generate lock UUID for key %q: %w", key, err) } - lockVal := prefix + uuidv7.String() + lockVal := rca.lockPrefix + uuidv7.String() err = rca.client.Do(ctx, rca.client.B().Set().Key(key).Value(lockVal).Nx().Get().Px(rca.lockTTL).Build()).Error() if !rueidis.IsRedisNil(err) { - return "", errLockFailed + rca.logger.Debug("lock contention - failed to acquire lock", "key", key) + return "", fmt.Errorf("failed to acquire lock for key %q: %w", key, errLockFailed) } + rca.logger.Debug("lock acquired", "key", key, "lockVal", lockVal) return lockVal, nil } func (rca *CacheAside) setWithLock(ctx context.Context, ttl time.Duration, key string, valLock valAndLock) (string, error) { - err := setKeyLua.Exec(ctx, rca.client, []string{key}, []string{valLock.lockVal, valLock.val, strconv.FormatInt(ttl.Milliseconds(), 10)}).Error() - if err != nil { if !rueidis.IsRedisNil(err) { - return "", err + return "", fmt.Errorf("failed to set value for key %q: %w", key, err) } - return "", errors.New("set failed") + rca.logger.Debug("lock lost during set operation", "key", key) + return "", fmt.Errorf("lock lost for key %q: %w", key, ErrLockLost) } - + rca.logger.Debug("value set successfully", "key", key) return valLock.val, nil } @@ -260,7 +379,6 @@ func (rca *CacheAside) GetMulti( keys 
[]string, fn func(ctx context.Context, key []string) (val map[string]string, err error), ) (map[string]string, error) { - res := make(map[string]string, len(keys)) waitLock := make(map[string]<-chan struct{}, len(keys)) @@ -329,9 +447,9 @@ func (rca *CacheAside) tryGetMulti(ctx context.Context, ttl time.Duration, keys if err != nil && rueidis.IsRedisNil(err) { continue } else if err != nil { - return nil, err + return nil, fmt.Errorf("failed to get key %q: %w", keys[i], err) } - if !strings.HasPrefix(val, prefix) { + if !strings.HasPrefix(val, rca.lockPrefix) { res[keys[i]] = val continue } @@ -345,7 +463,6 @@ func (rca *CacheAside) trySetMultiKeyFn( keys []string, fn func(ctx context.Context, key []string) (val map[string]string, err error), ) (map[string]string, error) { - res := make(map[string]string) lockVals, err := rca.tryLockMulti(ctx, keys) @@ -361,7 +478,9 @@ func (rca *CacheAside) trySetMultiKeyFn( } } if len(toUnlock) > 0 { - toCtx, cancel := context.WithTimeout(context.Background(), rca.lockTTL) + // Use context.WithoutCancel to preserve tracing/request context while allowing cleanup + cleanupCtx := context.WithoutCancel(ctx) + toCtx, cancel := context.WithTimeout(cleanupCtx, rca.lockTTL) defer cancel() rca.unlockMulti(toCtx, toUnlock) } @@ -393,7 +512,6 @@ func (rca *CacheAside) trySetMultiKeyFn( } return res, err - } func (rca *CacheAside) tryLockMulti(ctx context.Context, keys []string) (map[string]string, error) { @@ -404,13 +522,16 @@ func (rca *CacheAside) tryLockMulti(ctx context.Context, keys []string) (map[str if err != nil { return nil, err } - lockVals[k] = prefix + uuidv7.String() + lockVals[k] = rca.lockPrefix + uuidv7.String() cmds = append(cmds, rca.client.B().Set().Key(k).Value(lockVals[k]).Nx().Get().Px(rca.lockTTL).Build()) } resps := rca.client.DoMulti(ctx, cmds...) for i, r := range resps { err := r.Error() if !rueidis.IsRedisNil(err) { + if err != nil { + rca.logger.Error("failed to acquire lock", "key", keys[i], "error", err) + } delete(lockVals, keys[i]) } } @@ -422,23 +543,18 @@ type valAndLock struct { lockVal string } -func (rca *CacheAside) setMultiWithLock(ctx context.Context, ttl time.Duration, keyValLock map[string]valAndLock) ([]string, error) { - type keyOrderAndSet struct { - keyOrder []string - setStmts []rueidis.LuaExec - } +type keyOrderAndSet struct { + keyOrder []string + setStmts []rueidis.LuaExec +} +// groupBySlot groups keys by their Redis cluster slot for efficient batching. +func groupBySlot(keyValLock map[string]valAndLock, ttl time.Duration) map[uint16]keyOrderAndSet { stmts := make(map[uint16]keyOrderAndSet) for k, vl := range keyValLock { slot := cmdx.Slot(k) - kos, ok := stmts[slot] - if !ok { - kos = keyOrderAndSet{ - keyOrder: make([]string, 0), - setStmts: make([]rueidis.LuaExec, 0), - } - } + kos := stmts[slot] kos.keyOrder = append(kos.keyOrder, k) kos.setStmts = append(kos.setStmts, rueidis.LuaExec{ Keys: []string{k}, @@ -447,10 +563,15 @@ func (rca *CacheAside) setMultiWithLock(ctx context.Context, ttl time.Duration, stmts[slot] = kos } - out := make([]string, 0) + return stmts +} + +// executeSetStatements executes Lua set statements in parallel, grouped by slot. 
+func (rca *CacheAside) executeSetStatements(ctx context.Context, stmts map[uint16]keyOrderAndSet) ([]string, error) { keyByStmt := make([][]string, len(stmts)) i := 0 eg, ctx := errgroup.WithContext(ctx) + for _, kos := range stmts { ii := i eg.Go(func() error { @@ -467,17 +588,25 @@ func (rca *CacheAside) setMultiWithLock(ctx context.Context, ttl time.Duration, } return nil }) - i += 1 + i++ } + if err := eg.Wait(); err != nil { return nil, err } + + out := make([]string, 0) for _, keys := range keyByStmt { out = append(out, keys...) } return out, nil } +func (rca *CacheAside) setMultiWithLock(ctx context.Context, ttl time.Duration, keyValLock map[string]valAndLock) ([]string, error) { + stmts := groupBySlot(keyValLock, ttl) + return rca.executeSetStatements(ctx, stmts) +} + func (rca *CacheAside) unlockMulti(ctx context.Context, lockVals map[string]string) { if len(lockVals) == 0 { return diff --git a/cacheaside_test.go b/cacheaside_test.go index cf04b56..e131849 100644 --- a/cacheaside_test.go +++ b/cacheaside_test.go @@ -11,12 +11,13 @@ import ( "github.com/dcbickfo/redcache" - "github.com/dcbickfo/redcache/internal/mapsx" "github.com/google/go-cmp/cmp" "github.com/google/uuid" "github.com/redis/rueidis" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "github.com/dcbickfo/redcache/internal/mapsx" ) var addr = []string{"127.0.0.1:6379"} @@ -191,7 +192,7 @@ func TestCacheAside_GetMulti_PartLock(t *testing.T) { } innerClient := client.Client() - lockVal := "redcache:" + uuid.New().String() + lockVal := "__redcache:lock:" + uuid.New().String() err := innerClient.Do(ctx, innerClient.B().Set().Key(keys[0]).Value(lockVal).Nx().Get().Px(time.Millisecond*100).Build()).Error() require.True(t, rueidis.IsRedisNil(err)) @@ -632,7 +633,7 @@ func TestCacheAside_GetParentContextCancellation(t *testing.T) { // Set a lock on the key so Get will wait innerClient := client.Client() - lockVal := "redcache:" + uuid.New().String() + lockVal := "__redcache:lock:" + uuid.New().String() err := innerClient.Do(context.Background(), innerClient.B().Set().Key(key).Value(lockVal).Nx().Get().Px(time.Second*30).Build()).Error() require.True(t, rueidis.IsRedisNil(err)) @@ -651,3 +652,191 @@ func TestCacheAside_GetParentContextCancellation(t *testing.T) { require.Error(t, err) require.ErrorIs(t, err, context.Canceled) } + +// TestConcurrentRegisterRace tests the register() method under high contention +// to ensure the CompareAndDelete race condition fix works correctly +func TestConcurrentRegisterRace(t *testing.T) { + // Use minimum allowed lock TTL to force lock expirations during concurrent access + client, err := redcache.NewRedCacheAside( + rueidis.ClientOption{ + InitAddress: addr, + }, + redcache.CacheAsideOption{ + LockTTL: 100 * time.Millisecond, + }, + ) + require.NoError(t, err) + defer client.Client().Close() + + ctx := context.Background() + key := "key:" + uuid.New().String() + val := "val:" + uuid.New().String() + + callCount := 0 + var mu sync.Mutex + cb := func(ctx context.Context, key string) (string, error) { + mu.Lock() + callCount++ + mu.Unlock() + // Very short sleep to keep test fast while still triggering some lock expirations + time.Sleep(5 * time.Millisecond) + return val, nil + } + + // Run concurrent goroutines to stress test the register race condition fix + wg := sync.WaitGroup{} + for i := 0; i < 100; i++ { + wg.Add(4) + go func() { + defer wg.Done() + res, err := client.Get(ctx, time.Second*10, key, cb) + assert.NoError(t, err) + assert.Equal(t, val, res) 
+ }() + go func() { + defer wg.Done() + res, err := client.Get(ctx, time.Second*10, key, cb) + assert.NoError(t, err) + assert.Equal(t, val, res) + }() + go func() { + defer wg.Done() + res, err := client.Get(ctx, time.Second*10, key, cb) + assert.NoError(t, err) + assert.Equal(t, val, res) + }() + go func() { + defer wg.Done() + res, err := client.Get(ctx, time.Second*10, key, cb) + assert.NoError(t, err) + assert.Equal(t, val, res) + }() + } + wg.Wait() + + // The callback should be called, but we might get multiple calls due to lock expiration + mu.Lock() + defer mu.Unlock() + assert.Greater(t, callCount, 0, "callback should be called at least once") +} + +// TestConcurrentGetSameKeySingleClient tests that multiple goroutines getting +// the same key from a single client instance only triggers one callback when locks don't expire +func TestConcurrentGetSameKeySingleClient(t *testing.T) { + client := makeClient(t, addr) + defer client.Client().Close() + + ctx := context.Background() + key := "key:" + uuid.New().String() + val := "val:" + uuid.New().String() + + callCount := 0 + var mu sync.Mutex + + cb := func(ctx context.Context, key string) (string, error) { + mu.Lock() + callCount++ + mu.Unlock() + return val, nil + } + + // Run multiple iterations with concurrent goroutines, matching existing test pattern + wg := sync.WaitGroup{} + for i := 0; i < 100; i++ { + wg.Add(4) + go func() { + defer wg.Done() + res, err := client.Get(ctx, time.Second*10, key, cb) + assert.NoError(t, err) + assert.Equal(t, val, res) + }() + go func() { + defer wg.Done() + res, err := client.Get(ctx, time.Second*10, key, cb) + assert.NoError(t, err) + assert.Equal(t, val, res) + }() + go func() { + defer wg.Done() + res, err := client.Get(ctx, time.Second*10, key, cb) + assert.NoError(t, err) + assert.Equal(t, val, res) + }() + go func() { + defer wg.Done() + res, err := client.Get(ctx, time.Second*10, key, cb) + assert.NoError(t, err) + assert.Equal(t, val, res) + }() + } + + wg.Wait() + + // Callback should only be called once due to distributed locking + mu.Lock() + defer mu.Unlock() + assert.Equal(t, 1, callCount, "callback should only be called once") +} + +// TestConcurrentInvalidation tests that cache invalidation works correctly +// when multiple goroutines are accessing the same keys +func TestConcurrentInvalidation(t *testing.T) { + client := makeClient(t, addr) + defer client.Client().Close() + + ctx := context.Background() + key := "key:" + uuid.New().String() + + callCount := 0 + var mu sync.Mutex + cb := func(ctx context.Context, key string) (string, error) { + mu.Lock() + callCount++ + mu.Unlock() + return "value", nil + } + + // Populate cache + _, err := client.Get(ctx, time.Second*10, key, cb) + require.NoError(t, err) + + mu.Lock() + initialCount := callCount + mu.Unlock() + + // Delete the key + err = client.Del(ctx, key) + require.NoError(t, err) + + // Run multiple iterations with concurrent reads after deletion, matching existing test pattern + wg := sync.WaitGroup{} + for i := 0; i < 100; i++ { + wg.Add(4) + go func() { + defer wg.Done() + _, err := client.Get(ctx, time.Second*10, key, cb) + assert.NoError(t, err) + }() + go func() { + defer wg.Done() + _, err := client.Get(ctx, time.Second*10, key, cb) + assert.NoError(t, err) + }() + go func() { + defer wg.Done() + _, err := client.Get(ctx, time.Second*10, key, cb) + assert.NoError(t, err) + }() + go func() { + defer wg.Done() + _, err := client.Get(ctx, time.Second*10, key, cb) + assert.NoError(t, err) + }() + } + wg.Wait() + + // 
Callback should have been invoked at least once more due to invalidation + mu.Lock() + defer mu.Unlock() + assert.Greater(t, callCount, initialCount, "callbacks should be invoked after invalidation") +} diff --git a/internal/cmdx/slot.go b/internal/cmdx/slot.go index 1351291..3bc324c 100644 --- a/internal/cmdx/slot.go +++ b/internal/cmdx/slot.go @@ -2,6 +2,11 @@ package cmdx // https://redis.io/topics/cluster-spec +const ( + // RedisClusterSlots is the number of hash slots in a Redis cluster (16384 slots, 0-16383). + RedisClusterSlots = 16383 +) + func Slot(key string) uint16 { var s, e int for ; s < len(key); s++ { @@ -10,7 +15,7 @@ func Slot(key string) uint16 { } } if s == len(key) { - return crc16(key) & 16383 + return crc16(key) & RedisClusterSlots } for e = s + 1; e < len(key); e++ { if key[e] == '}' { @@ -18,9 +23,9 @@ func Slot(key string) uint16 { } } if e == len(key) || e == s+1 { - return crc16(key) & 16383 + return crc16(key) & RedisClusterSlots } - return crc16(key[s+1:e]) & 16383 + return crc16(key[s+1:e]) & RedisClusterSlots } /* diff --git a/internal/cmdx/slot_test.go b/internal/cmdx/slot_test.go new file mode 100644 index 0000000..54ca2b0 --- /dev/null +++ b/internal/cmdx/slot_test.go @@ -0,0 +1,182 @@ +package cmdx_test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/dcbickfo/redcache/internal/cmdx" +) + +func TestSlot(t *testing.T) { + tests := []struct { + name string + key string + expected uint16 + }{ + // Basic keys - verified against Redis cluster spec + { + name: "simple key", + key: "key", + expected: 12539, + }, + { + name: "numeric key", + key: "123", + expected: 5970, + }, + { + name: "empty key", + key: "", + expected: 0, + }, + // Hash tags - only the content between { and } is hashed + { + name: "hash tag simple", + key: "{user:1000}:profile", + expected: cmdx.Slot("user:1000"), + }, + { + name: "hash tag at start", + key: "{tag}key", + expected: cmdx.Slot("tag"), + }, + { + name: "hash tag at end", + key: "key{tag}", + expected: cmdx.Slot("tag"), + }, + { + name: "hash tag in middle", + key: "prefix{tag}suffix", + expected: cmdx.Slot("tag"), + }, + // Edge cases with braces + { + name: "empty hash tag", + key: "key{}value", + expected: cmdx.Slot("key{}value"), // Empty tags are ignored + }, + { + name: "no closing brace", + key: "key{value", + expected: cmdx.Slot("key{value"), // No closing brace, whole key hashed + }, + { + name: "only opening brace", + key: "{key", + expected: cmdx.Slot("{key"), + }, + { + name: "only closing brace", + key: "key}", + expected: cmdx.Slot("key}"), + }, + { + name: "multiple hash tags - first wins", + key: "{tag1}{tag2}", + expected: cmdx.Slot("tag1"), + }, + { + name: "nested braces", + key: "{{nested}}", + expected: cmdx.Slot("{nested"), // First { to first } + }, + // Common patterns - these should be deterministic + { + name: "user pattern", + key: "user:1000", + expected: 1649, // Verified against Redis CLUSTER KEYSLOT + }, + { + name: "session pattern", + key: "session:abc123", + expected: 11692, // Verified against Redis CLUSTER KEYSLOT + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := cmdx.Slot(tt.key) + assert.Equalf(t, tt.expected, result, "Slot(%q) = %d, want %d", tt.key, result, tt.expected) + }) + } +} + +func TestSlot_Consistency(t *testing.T) { + // Test that the same key always produces the same slot + key := "test:key:123" + slot1 := cmdx.Slot(key) + slot2 := cmdx.Slot(key) + assert.Equal(t, slot1, slot2, "Slot function should be 
deterministic") +} + +func TestSlot_Distribution(t *testing.T) { + // Test that slots are distributed across the valid range + keys := []string{ + "key1", "key2", "key3", "key4", "key5", + "user:1", "user:2", "user:3", "user:4", "user:5", + "session:a", "session:b", "session:c", "session:d", "session:e", + } + + slots := make(map[uint16]bool) + for _, key := range keys { + slot := cmdx.Slot(key) + assert.LessOrEqualf(t, slot, uint16(16383), "Slot for key %q should be <= 16383", key) + slots[slot] = true + } + + // With 15 different keys, we should have some distribution (not all the same slot) + assert.Greater(t, len(slots), 1, "Keys should distribute across multiple slots") +} + +func TestSlot_HashTagCollision(t *testing.T) { + // Keys with the same hash tag should go to the same slot + keys := []string{ + "{user:1000}:profile", + "{user:1000}:settings", + "{user:1000}:preferences", + } + + expectedSlot := cmdx.Slot("user:1000") + for _, key := range keys { + slot := cmdx.Slot(key) + assert.Equalf(t, expectedSlot, slot, "Key %q with hash tag should map to slot %d", key, expectedSlot) + } +} + +func TestSlot_BoundaryValues(t *testing.T) { + tests := []struct { + name string + key string + }{ + {"single char", "a"}, + {"special chars", "!@#$%^&*()"}, + {"unicode", "你好世界"}, + {"long key", string(make([]byte, 1000))}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + slot := cmdx.Slot(tt.key) + assert.LessOrEqualf(t, slot, uint16(16383), "Slot should be within valid range") + }) + } +} + +func BenchmarkSlot(b *testing.B) { + keys := []string{ + "simple", + "user:1000", + "{tag}key", + "prefix{tag}suffix", + } + + for _, key := range keys { + b.Run(key, func(b *testing.B) { + for i := 0; i < b.N; i++ { + _ = cmdx.Slot(key) + } + }) + } +} diff --git a/internal/mapsx/maps_test.go b/internal/mapsx/maps_test.go index fe79bc0..ae40851 100644 --- a/internal/mapsx/maps_test.go +++ b/internal/mapsx/maps_test.go @@ -3,8 +3,9 @@ package mapsx_test import ( "testing" - "github.com/dcbickfo/redcache/internal/mapsx" "github.com/stretchr/testify/assert" + + "github.com/dcbickfo/redcache/internal/mapsx" ) func TestKeys(t *testing.T) { diff --git a/internal/syncx/map_test.go b/internal/syncx/map_test.go index dfa86d4..dc71d25 100644 --- a/internal/syncx/map_test.go +++ b/internal/syncx/map_test.go @@ -3,8 +3,9 @@ package syncx_test import ( "testing" - "github.com/dcbickfo/redcache/internal/syncx" "github.com/stretchr/testify/assert" + + "github.com/dcbickfo/redcache/internal/syncx" ) func TestMap_CompareAndDelete(t *testing.T) { diff --git a/internal/syncx/wait_test.go b/internal/syncx/wait_test.go index 56d8bc6..89da67a 100644 --- a/internal/syncx/wait_test.go +++ b/internal/syncx/wait_test.go @@ -6,8 +6,9 @@ import ( "testing" "time" - "github.com/dcbickfo/redcache/internal/syncx" "github.com/stretchr/testify/assert" + + "github.com/dcbickfo/redcache/internal/syncx" ) func delayedSend[T any](ch chan T, val T, delay time.Duration) { From f16bff3d9a4095f1e6603a3dc5c9e9447a079773 Mon Sep 17 00:00:00 2001 From: David Bickford Date: Tue, 7 Oct 2025 11:47:23 -0400 Subject: [PATCH 7/8] Update internal/cmdx/slot.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- internal/cmdx/slot.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/cmdx/slot.go b/internal/cmdx/slot.go index 3bc324c..2c5ee85 100644 --- a/internal/cmdx/slot.go +++ b/internal/cmdx/slot.go @@ -3,7 +3,7 @@ package cmdx // https://redis.io/topics/cluster-spec const ( - // 
RedisClusterSlots is the number of hash slots in a Redis cluster (16384 slots, 0-16383). + // RedisClusterSlots is the maximum slot number in a Redis cluster (16384 total slots, numbered 0-16383). RedisClusterSlots = 16383 ) From c5989bbc4f8211144907e3038f28d67356296a3a Mon Sep 17 00:00:00 2001 From: David Bickford Date: Tue, 7 Oct 2025 11:47:44 -0400 Subject: [PATCH 8/8] Update cacheaside.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- cacheaside.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/cacheaside.go b/cacheaside.go index c09419c..536a2ad 100644 --- a/cacheaside.go +++ b/cacheaside.go @@ -87,7 +87,11 @@ type lockEntry struct { // Logger defines the logging interface used by CacheAside. // Implementations must be safe for concurrent use and should handle log levels internally. type Logger interface { + // Error logs error messages. Should be used for unexpected failures or critical issues. Error(msg string, args ...any) + // Debug logs detailed diagnostic information useful for development and troubleshooting. + // Call Debug to record verbose output about internal state, cache operations, or lock handling. + // Debug messages should not include sensitive information and may be omitted in production. Debug(msg string, args ...any) }
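
Taken together, these patches change the caller-facing surface in a few ways: CacheAsideOption gains Logger and LockPrefix fields alongside a validated LockTTL, the lock sentinel is exported as ErrLockLost, and GetMulti batches its write-backs per cluster slot. The sketch below is a minimal illustration of that surface, not part of the diff: the Redis address, key names, TTL values, and loader bodies are placeholder assumptions, and the ErrLockLost branch is shown only for illustration — whether Get returns it to the caller or retries internally depends on the retry path, which these hunks do not show in full.

```go
package main

import (
	"context"
	"errors"
	"log"
	"log/slog"
	"time"

	"github.com/redis/rueidis"

	"github.com/dcbickfo/redcache"
)

func main() {
	// Options touched by this series: LockTTL (validated, at least 100ms when set),
	// Logger (defaults to slog.Default()), LockPrefix (defaults to "__redcache:lock:").
	client, err := redcache.NewRedCacheAside(
		rueidis.ClientOption{
			InitAddress: []string{"127.0.0.1:6379"}, // placeholder address
		},
		redcache.CacheAsideOption{
			LockTTL:    2 * time.Second,
			Logger:     slog.Default(),
			LockPrefix: "__redcache:lock:",
		},
	)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Client().Close()

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// Single-key path: the callback runs only for the caller that wins the
	// distributed lock; other callers wait on the lock entry and then re-read.
	val, err := client.Get(ctx, time.Minute, "user:123",
		func(ctx context.Context, key string) (string, error) {
			return "value-from-database", nil // placeholder loader
		})
	switch {
	case errors.Is(err, redcache.ErrLockLost):
		// The lock expired before the value could be written back.
		log.Println("lock lost, value not cached:", err)
	case err != nil:
		log.Fatal(err)
	default:
		log.Println("user:123 =", val)
	}

	// Multi-key path: keys sharing a hash tag map to the same cluster slot,
	// so the write-back batches into one Lua round trip per slot.
	keys := []string{"{user:1000}:profile", "{user:1000}:settings"}
	vals, err := client.GetMulti(ctx, time.Minute, keys,
		func(ctx context.Context, missing []string) (map[string]string, error) {
			out := make(map[string]string, len(missing))
			for _, k := range missing {
				out[k] = "loaded:" + k // placeholder loader; normally a batched DB query
			}
			return out, nil
		})
	if err != nil {
		log.Fatal(err)
	}
	for k, v := range vals {
		log.Println(k, "=", v)
	}
}
```

Any logger exposing the Error and Debug methods from the interface documented in PATCH 8 can replace slog.Default(); the hash-tagged keys rely on the slot behaviour exercised by TestSlot_HashTagCollision in PATCH 6.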