Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
* [ENHANCEMENT] Ingester: Add fetch timeout for Ingester expanded postings cache. #7185
* [ENHANCEMENT] Ingester: Add feature flag to collect metrics of how expensive an unoptimized regex matcher is and new limits to protect Ingester query path against expensive unoptimized regex matchers. #7194 #7210
* [ENHANCEMENT] Compactor: Add partition group creation time to visit marker. #7217
* [ENHANCEMENT] discard ooo samples in some special cases. #7226
* [BUGFIX] Ring: Change DynamoDB KV to retry indefinitely for WatchKey. #7088
* [BUGFIX] Ruler: Add XFunctions validation support. #7111
* [BUGFIX] Querier: propagate Prometheus info annotations in protobuf responses. #7132
Expand Down
241 changes: 146 additions & 95 deletions pkg/cortexpb/cortex.pb.go

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions pkg/cortexpb/cortex.proto
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ message WriteRequest {

bool skip_label_name_validation = 1000; //set intentionally high to keep WriteRequest compatible with upstream Prometheus
MessageWithBufRef Ref = 1001 [(gogoproto.embed) = true, (gogoproto.customtype) = "MessageWithBufRef", (gogoproto.nullable) = false];
// When true, indicates that out-of-order samples should be discarded even if OOO is enabled.
bool discard_out_of_order = 1002;
}

// refer to https://github.com/prometheus/prometheus/blob/v3.5.0/prompb/io/prometheus/write/v2/types.proto
Expand Down
7 changes: 7 additions & 0 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -1440,6 +1440,13 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte

// Walk the samples, appending them to the users database
app := db.Appender(ctx).(extendedAppender)

// Even when OOO is enabled globally, we want to reject OOO samples in some cases.
// prometheus implementation: https://github.com/prometheus/prometheus/pull/14710
if req.DiscardOutOfOrder {
app.SetOptions(&storage.AppendOptions{DiscardOutOfOrder: true})
}

var newSeries []labels.Labels

for _, ts := range req.Timeseries {
Expand Down
205 changes: 205 additions & 0 deletions pkg/ingester/ingester_ooo_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
package ingester

import (
"context"
"testing"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/user"

"github.com/cortexproject/cortex/pkg/cortexpb"
"github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/util/services"
"github.com/cortexproject/cortex/pkg/util/test"
)

// mockAppender implements the extendedAppender interface for testing
type mockAppender struct {
storage.Appender
lastOptions *storage.AppendOptions
}

func (m *mockAppender) SetOptions(opts *storage.AppendOptions) {
m.lastOptions = opts
}

func TestIngester_Push_DiscardOutOfOrder_True(t *testing.T) {
req := &cortexpb.WriteRequest{
Source: cortexpb.RULE,
DiscardOutOfOrder: true,
Timeseries: []cortexpb.PreallocTimeseries{},
}

assert.True(t, req.DiscardOutOfOrder, "DiscardOutOfOrder should be true")
assert.True(t, req.GetDiscardOutOfOrder(), "GetDiscardOutOfOrder should return true")
}

func TestIngester_Push_DiscardOutOfOrder_Default(t *testing.T) {
// Create a WriteRequest without setting DiscardOutOfOrder
req := &cortexpb.WriteRequest{
Source: cortexpb.API,
Timeseries: []cortexpb.PreallocTimeseries{},
}

// Verify the default value is false
assert.False(t, req.DiscardOutOfOrder, "DiscardOutOfOrder should default to false")
assert.False(t, req.GetDiscardOutOfOrder(), "GetDiscardOutOfOrder should return false by default")
}

func TestIngester_WriteRequest_MultipleScenarios(t *testing.T) {
scenarios := []struct {
name string
setupReq func() *cortexpb.WriteRequest
expectOpts bool
description string
}{
{
name: "Stale marker during rule migration",
setupReq: func() *cortexpb.WriteRequest {
return &cortexpb.WriteRequest{
Source: cortexpb.RULE,
DiscardOutOfOrder: true,
}
},
expectOpts: true,
description: "Should set appender options to discard OOO",
},
{
name: "Normal rule evaluation",
setupReq: func() *cortexpb.WriteRequest {
return &cortexpb.WriteRequest{
Source: cortexpb.RULE,
DiscardOutOfOrder: false,
}
},
expectOpts: false,
description: "Should not set appender options",
},
{
name: "API write request",
setupReq: func() *cortexpb.WriteRequest {
return &cortexpb.WriteRequest{
Source: cortexpb.API,
DiscardOutOfOrder: false,
}
},
expectOpts: false,
description: "API requests should never trigger OOO discard",
},
{
name: "Default values",
setupReq: func() *cortexpb.WriteRequest {
return &cortexpb.WriteRequest{}
},
expectOpts: false,
description: "Default values should not trigger OOO discard",
},
}

for _, scenario := range scenarios {
t.Run(scenario.name, func(t *testing.T) {
req := scenario.setupReq()
mock := &mockAppender{}

// Simulate the ingester logic
if req.DiscardOutOfOrder {
mock.SetOptions(&storage.AppendOptions{DiscardOutOfOrder: true})
}

// Verify expectations
if scenario.expectOpts {
require.NotNil(t, mock.lastOptions)
assert.True(t, mock.lastOptions.DiscardOutOfOrder)
}
})
}
}

func TestIngester_DiscardOutOfOrderFlagIngegrationTest(t *testing.T) {
registry := prometheus.NewRegistry()
cfg := defaultIngesterTestConfig(t)
cfg.LifecyclerConfig.JoinAfter = 0

limits := defaultLimitsTestConfig()
limits.EnableNativeHistograms = true
limits.OutOfOrderTimeWindow = model.Duration(60 * time.Minute)

i, err := prepareIngesterWithBlocksStorageAndLimits(t, cfg, limits, nil, "", registry)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), i))
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck

// Wait until it's ACTIVE
test.Poll(t, 100*time.Millisecond, ring.ACTIVE, func() any {
return i.lifecycler.GetState()
})

ctx := user.InjectOrgID(context.Background(), "test-user")

// Create labels for our test metric
metricLabels := labels.FromStrings("__name__", "test_metric", "job", "test")

currentTime := time.Now().UnixMilli()
olderTime := currentTime - 60000 // 1 minute earlier (within OOO window)

// First, push a sample with current timestamp with discardOutOfOrder=true
req1 := cortexpb.ToWriteRequest(
[]labels.Labels{metricLabels},
[]cortexpb.Sample{{Value: 100, TimestampMs: currentTime}},
nil, nil, cortexpb.RULE)
req1.DiscardOutOfOrder = true

_, err = i.Push(ctx, req1)
require.NoError(t, err, "First sample push should succeed")

// Now try to push a sample with older timestamp with discardOutOfOrder=true
// This should be discarded because DiscardOutOfOrder is true
req2 := cortexpb.ToWriteRequest(
[]labels.Labels{metricLabels},
[]cortexpb.Sample{{Value: 50, TimestampMs: olderTime}},
nil, nil, cortexpb.RULE)
req2.DiscardOutOfOrder = true

_, _ = i.Push(ctx, req2)

// Query back the data to ensure only the first (current time) sample was stored
s := &mockQueryStreamServer{ctx: ctx}
err = i.QueryStream(&client.QueryRequest{
StartTimestampMs: olderTime - 1000,
EndTimestampMs: currentTime + 1000,
Matchers: []*client.LabelMatcher{
{Type: client.EQUAL, Name: "__name__", Value: "test_metric"},
},
}, s)
require.NoError(t, err)

// Verify we only have one series with one sample (the current time sample)
require.Len(t, s.series, 1, "Should have exactly one series")

// Convert chunks to samples to verify content
series := s.series[0]
require.Len(t, series.Chunks, 1, "Should have exactly one chunk")

chunk := series.Chunks[0]
chunkData, err := chunkenc.FromData(chunkenc.EncXOR, chunk.Data)
require.NoError(t, err)

iter := chunkData.Iterator(nil)
sampleCount := 0
for iter.Next() != chunkenc.ValNone {
ts, val := iter.At()
require.Equal(t, currentTime, ts, "Sample timestamp should match current time")
require.Equal(t, 100.0, val, "Sample value should match first push")
sampleCount++
}
require.NoError(t, iter.Err())
require.Equal(t, 1, sampleCount, "Should have exactly one sample stored")
}
11 changes: 10 additions & 1 deletion pkg/ruler/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type PusherAppender struct {
histogramLabels []labels.Labels
histograms []cortexpb.Histogram
userID string
opts *storage.AppendOptions
}

func (a *PusherAppender) AppendHistogram(_ storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
Expand All @@ -73,7 +74,9 @@ func (a *PusherAppender) Append(_ storage.SeriesRef, l labels.Labels, t int64, v
return 0, nil
}

func (a *PusherAppender) SetOptions(opts *storage.AppendOptions) {}
func (a *PusherAppender) SetOptions(opts *storage.AppendOptions) {
a.opts = opts
}

func (a *PusherAppender) AppendHistogramCTZeroSample(ref storage.SeriesRef, l labels.Labels, t, ct int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
// AppendHistogramCTZeroSample is a no-op for PusherAppender as it happens during scrape time only.
Expand All @@ -93,6 +96,12 @@ func (a *PusherAppender) Commit() error {
a.totalWrites.Inc()

req := cortexpb.ToWriteRequest(a.labels, a.samples, nil, nil, cortexpb.RULE)

// Set DiscardOutOfOrder flag if requested via AppendOptions
if a.opts != nil && a.opts.DiscardOutOfOrder {
req.DiscardOutOfOrder = true
}

req.AddHistogramTimeSeries(a.histogramLabels, a.histograms)
// Since a.pusher is distributor, client.ReuseSlice will be called in a.pusher.Push.
// We shouldn't call client.ReuseSlice here.
Expand Down
93 changes: 93 additions & 0 deletions pkg/ruler/compat_ooo_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package ruler

import (
"context"
"testing"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/cortexproject/cortex/pkg/cortexpb"
)

type mockPusher struct {
lastRequest *cortexpb.WriteRequest
pushError error
}

func (m *mockPusher) Push(ctx context.Context, req *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error) {
m.lastRequest = req
return &cortexpb.WriteResponse{}, m.pushError
}

func TestPusherAppender_Commit_WithDiscardOutOfOrder(t *testing.T) {
mock := &mockPusher{}
counter := prometheus.NewCounter(prometheus.CounterOpts{Name: "test"})

appender := &PusherAppender{
ctx: context.Background(),
pusher: mock,
userID: "test-user",
totalWrites: counter,
failedWrites: counter,
labels: []labels.Labels{labels.FromStrings("__name__", "test_metric")},
samples: []cortexpb.Sample{{TimestampMs: 1000, Value: 1.0}},
}

appender.SetOptions(&storage.AppendOptions{DiscardOutOfOrder: true})

err := appender.Commit()
require.NoError(t, err)

// Verify that DiscardOutOfOrder was set in the WriteRequest
require.NotNil(t, mock.lastRequest, "WriteRequest should have been sent")
assert.True(t, mock.lastRequest.DiscardOutOfOrder, "DiscardOutOfOrder should be true in WriteRequest")
}

func TestPusherAppender_Commit_WithoutDiscardOutOfOrder(t *testing.T) {
mock := &mockPusher{}
counter := prometheus.NewCounter(prometheus.CounterOpts{Name: "test"})

appender := &PusherAppender{
ctx: context.Background(),
pusher: mock,
userID: "test-user",
totalWrites: counter,
failedWrites: counter,
labels: []labels.Labels{labels.FromStrings("__name__", "test_metric")},
samples: []cortexpb.Sample{{TimestampMs: 1000, Value: 1.0}},
}

appender.SetOptions(&storage.AppendOptions{DiscardOutOfOrder: false})

err := appender.Commit()
require.NoError(t, err)

require.NotNil(t, mock.lastRequest, "WriteRequest should have been sent")
assert.False(t, mock.lastRequest.DiscardOutOfOrder, "DiscardOutOfOrder should be false in WriteRequest")
}

func TestPusherAppender_Commit_WithNilOptions(t *testing.T) {
mock := &mockPusher{}
counter := prometheus.NewCounter(prometheus.CounterOpts{Name: "test"})

appender := &PusherAppender{
ctx: context.Background(),
pusher: mock,
userID: "test-user",
totalWrites: counter,
failedWrites: counter,
labels: []labels.Labels{labels.FromStrings("__name__", "test_metric")},
samples: []cortexpb.Sample{{TimestampMs: 1000, Value: 1.0}},
opts: nil, // Explicitly nil
}

err := appender.Commit()
require.NoError(t, err)

require.NotNil(t, mock.lastRequest, "WriteRequest should have been sent")
assert.False(t, mock.lastRequest.DiscardOutOfOrder, "DiscardOutOfOrder should be false when opts is nil")
}
Loading