mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-06 19:02:18 +08:00
issue: #41544 - unify the log field of message; use the minimum of the flusher and recovery storage checkpoints as the truncate position. Signed-off-by: chyezh <chyezh@outlook.com>
41 lines
1.2 KiB
Go
41 lines
1.2 KiB
Go
package recovery
|
|
|
|
import (
|
|
"math/rand"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/stretchr/testify/mock"
|
|
|
|
"github.com/milvus-io/milvus/pkg/v2/mocks/streaming/mock_walimpls"
|
|
"github.com/milvus-io/milvus/pkg/v2/streaming/util/types"
|
|
"github.com/milvus-io/milvus/pkg/v2/streaming/walimpls/impls/rmq"
|
|
"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
|
|
"github.com/milvus-io/milvus/pkg/v2/util/tsoutil"
|
|
)
|
|
|
|
func TestTruncator(t *testing.T) {
|
|
w := mock_walimpls.NewMockWALImpls(t)
|
|
w.EXPECT().Truncate(mock.Anything, mock.Anything).Return(nil)
|
|
paramtable.Get().Save(paramtable.Get().StreamingCfg.WALTruncateSampleInterval.Key, "1ms")
|
|
paramtable.Get().Save(paramtable.Get().StreamingCfg.WALTruncateRetentionInterval.Key, "2ms")
|
|
|
|
truncator := newSamplingTruncator(&WALCheckpoint{
|
|
MessageID: rmq.NewRmqID(1),
|
|
TimeTick: 1,
|
|
Magic: RecoveryMagicStreamingInitialized,
|
|
}, w, newRecoveryStorageMetrics(types.PChannelInfo{Name: "test", Term: 1}))
|
|
|
|
for i := 0; i < 20; i++ {
|
|
time.Sleep(1 * time.Millisecond)
|
|
for rand.Int31n(3) < 1 {
|
|
truncator.SampleCheckpoint(&WALCheckpoint{
|
|
MessageID: rmq.NewRmqID(int64(i)),
|
|
TimeTick: tsoutil.ComposeTSByTime(time.Now(), 0),
|
|
Magic: RecoveryMagicStreamingInitialized,
|
|
})
|
|
}
|
|
}
|
|
truncator.Close()
|
|
}
|