Refine codes of datanode buffer (#23168)

This PR refines deltabuffer of datanode:
- Add last sync time for compacted segment, see also: #23210
- Ensure all deltabuffermanager handles all delete related operations
- Change usedMemory to atomic.Int64
- Remove allocator in delete buffer

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
This commit is contained in:
XuanYang-cn 2023-04-10 18:42:30 +08:00 committed by GitHub
parent bbfa396754
commit b90fa5f459
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 369 additions and 357 deletions

View File

@ -24,6 +24,7 @@ import (
"sync" "sync"
"github.com/cockroachdb/errors" "github.com/cockroachdb/errors"
"go.uber.org/atomic"
"go.uber.org/zap" "go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/msgpb" "github.com/milvus-io/milvus-proto/go-api/msgpb"
@ -36,160 +37,164 @@ import (
"github.com/milvus-io/milvus/pkg/util/typeutil" "github.com/milvus-io/milvus/pkg/util/typeutil"
) )
// DelBufferManager is in charge of managing insertBuf and delBuf from an overall prospect // DeltaBufferManager is in charge of managing insertBuf and delBuf from an overall prospect
// not only controlling buffered data size based on every segment size, but also triggering // not only controlling buffered data size based on every segment size, but also triggering
// insert/delete flush when the memory usage of the whole manager reach a certain level. // insert/delete flush when the memory usage of the whole manager reach a certain level.
// but at the first stage, this struct is only used for delete buff // but at the first stage, this struct is only used for delete buff
type DelBufferManager struct { //
channel Channel // DeltaBufferManager manages channel, usedMemory and delBufHeap.
mu sync.Mutex // guards delMemorySize and delBufHeap type DeltaBufferManager struct {
delMemorySize int64 channel Channel
delBufHeap *PriorityQueue usedMemory atomic.Int64
heapGuard sync.Mutex // guards delBufHeap
delBufHeap *PriorityQueue
} }
func (bm *DelBufferManager) GetSegDelBufMemSize(segID UniqueID) int64 { func (m *DeltaBufferManager) GetEntriesNum(segID UniqueID) int64 {
bm.mu.Lock() if buffer, ok := m.Load(segID); ok {
defer bm.mu.Unlock() return buffer.GetEntriesNum()
if delDataBuf, ok := bm.channel.getCurDeleteBuffer(segID); ok {
return delDataBuf.item.memorySize
} }
return 0 return 0
} }
func (bm *DelBufferManager) GetEntriesNum(segID UniqueID) int64 { func (m *DeltaBufferManager) UpdateCompactedSegments() {
bm.mu.Lock() compactedTo2From := m.channel.listCompactedSegmentIDs()
defer bm.mu.Unlock() for compactedTo, compactedFrom := range compactedTo2From {
if delDataBuf, ok := bm.channel.getCurDeleteBuffer(segID); ok {
return delDataBuf.GetEntriesNum() // if the compactedTo segment has 0 numRows, there'll be no segments
// in the channel meta, so remove all compacted from segments related
if !m.channel.hasSegment(compactedTo, true) {
for _, segID := range compactedFrom {
m.Delete(segID)
}
m.channel.removeSegments(compactedFrom...)
continue
}
compactToDelBuff, loaded := m.Load(compactedTo)
if !loaded {
compactToDelBuff = newDelDataBuf(compactedTo)
}
for _, segID := range compactedFrom {
if delDataBuf, loaded := m.Load(segID); loaded {
compactToDelBuff.MergeDelDataBuf(delDataBuf)
m.Delete(segID)
}
}
// only store delBuf if EntriesNum > 0
if compactToDelBuff.EntriesNum > 0 {
m.pushOrFixHeap(compactedTo, compactToDelBuff)
// We need to re-add the memorySize because m.Delete(segID) sub them all.
m.usedMemory.Add(compactToDelBuff.GetMemorySize())
m.updateMeta(compactedTo, compactToDelBuff)
}
log.Info("update delBuf for compacted segments",
zap.Int64("compactedTo segmentID", compactedTo),
zap.Int64s("compactedFrom segmentIDs", compactedFrom),
zap.Int64("usedMemory", m.usedMemory.Load()),
)
m.channel.removeSegments(compactedFrom...)
} }
return 0
} }
// Store :the method only for unit test func (m *DeltaBufferManager) updateMeta(segID UniqueID, delDataBuf *DelDataBuf) {
func (bm *DelBufferManager) Store(segID UniqueID, delDataBuf *DelDataBuf) { m.channel.setCurDeleteBuffer(segID, delDataBuf)
bm.channel.setCurDeleteBuffer(segID, delDataBuf)
} }
func (bm *DelBufferManager) StoreNewDeletes(segID UniqueID, pks []primaryKey, // pushOrFixHeap updates and sync memory size with priority queue
func (m *DeltaBufferManager) pushOrFixHeap(segID UniqueID, buffer *DelDataBuf) {
m.heapGuard.Lock()
defer m.heapGuard.Unlock()
if _, loaded := m.Load(segID); loaded {
heap.Fix(m.delBufHeap, buffer.item.index)
} else {
heap.Push(m.delBufHeap, buffer.item)
}
}
// deleteFromHeap deletes an item from the heap
func (m *DeltaBufferManager) deleteFromHeap(buffer *DelDataBuf) {
m.heapGuard.Lock()
defer m.heapGuard.Unlock()
if itemIdx, ok := buffer.GetItemIndex(); ok {
heap.Remove(m.delBufHeap, itemIdx)
}
}
func (m *DeltaBufferManager) StoreNewDeletes(segID UniqueID, pks []primaryKey,
tss []Timestamp, tr TimeRange, startPos, endPos *msgpb.MsgPosition) { tss []Timestamp, tr TimeRange, startPos, endPos *msgpb.MsgPosition) {
//1. load or create delDataBuf buffer, loaded := m.Load(segID)
var delDataBuf *DelDataBuf
buffer, loaded := bm.channel.getCurDeleteBuffer(segID)
if loaded {
delDataBuf = buffer
} else {
delDataBuf = newDelDataBuf()
}
//2. fill in new delta
delData := delDataBuf.delData
rowCount := len(pks)
var bufSize int64
for i := 0; i < rowCount; i++ {
delData.Pks = append(delData.Pks, pks[i])
delData.Tss = append(delData.Tss, tss[i])
switch pks[i].Type() {
case schemapb.DataType_Int64:
bufSize += 8
case schemapb.DataType_VarChar:
varCharPk := pks[i].(*varCharPrimaryKey)
bufSize += int64(len(varCharPk.Value))
}
//accumulate buf size for timestamp, which is 8 bytes
bufSize += 8
}
//3. update statistics of del data
delDataBuf.accumulateEntriesNum(int64(rowCount))
delDataBuf.updateTimeRange(tr)
delDataBuf.updateStartAndEndPosition(startPos, endPos)
//4. update and sync memory size with priority queue
bm.mu.Lock()
defer bm.mu.Unlock()
if !loaded { if !loaded {
delDataBuf.item.segmentID = segID buffer = newDelDataBuf(segID)
delDataBuf.item.memorySize = bufSize
heap.Push(bm.delBufHeap, delDataBuf.item)
} else {
bm.delBufHeap.update(delDataBuf.item, delDataBuf.item.memorySize+bufSize)
} }
bm.channel.setCurDeleteBuffer(segID, delDataBuf)
bm.delMemorySize += bufSize size := buffer.Buffer(pks, tss, tr, startPos, endPos)
//4. sync metrics
m.pushOrFixHeap(segID, buffer)
m.updateMeta(segID, buffer)
m.usedMemory.Add(size)
metrics.DataNodeConsumeMsgRowsCount.WithLabelValues( metrics.DataNodeConsumeMsgRowsCount.WithLabelValues(
fmt.Sprint(paramtable.GetNodeID()), metrics.DeleteLabel).Add(float64(rowCount)) fmt.Sprint(paramtable.GetNodeID()), metrics.DeleteLabel).Add(float64(len(pks)))
} }
func (bm *DelBufferManager) Load(segID UniqueID) (delDataBuf *DelDataBuf, ok bool) { func (m *DeltaBufferManager) Load(segID UniqueID) (delDataBuf *DelDataBuf, ok bool) {
return bm.channel.getCurDeleteBuffer(segID) return m.channel.getCurDeleteBuffer(segID)
} }
func (bm *DelBufferManager) Delete(segID UniqueID) { func (m *DeltaBufferManager) Delete(segID UniqueID) {
bm.mu.Lock() if buffer, loaded := m.Load(segID); loaded {
defer bm.mu.Unlock() m.usedMemory.Sub(buffer.GetMemorySize())
if buf, ok := bm.channel.getCurDeleteBuffer(segID); ok { m.deleteFromHeap(buffer)
item := buf.item m.channel.rollDeleteBuffer(segID)
bm.delMemorySize -= item.memorySize
heap.Remove(bm.delBufHeap, item.index)
bm.channel.rollDeleteBuffer(segID)
} }
} }
func (bm *DelBufferManager) CompactSegBuf(compactedToSegID UniqueID, compactedFromSegIDs []UniqueID) { func (m *DeltaBufferManager) popHeapItem() *Item {
var compactToDelBuff *DelDataBuf m.heapGuard.Lock()
compactToDelBuff, loaded := bm.Load(compactedToSegID) defer m.heapGuard.Unlock()
if !loaded { return heap.Pop(m.delBufHeap).(*Item)
compactToDelBuff = newDelDataBuf()
compactToDelBuff.item.segmentID = compactedToSegID
}
for _, segID := range compactedFromSegIDs {
if delDataBuf, loaded := bm.Load(segID); loaded {
compactToDelBuff.mergeDelDataBuf(delDataBuf)
bm.Delete(segID)
}
}
bm.mu.Lock()
defer bm.mu.Unlock()
// only store delBuf if EntriesNum > 0
if compactToDelBuff.EntriesNum > 0 {
if loaded {
bm.delBufHeap.update(compactToDelBuff.item, compactToDelBuff.item.memorySize)
} else {
heap.Push(bm.delBufHeap, compactToDelBuff.item)
}
// We need to re-add the memorySize because bm.Delete(segID) sub them all.
bm.delMemorySize += compactToDelBuff.item.memorySize
bm.channel.setCurDeleteBuffer(compactedToSegID, compactToDelBuff)
}
} }
func (bm *DelBufferManager) ShouldFlushSegments() []UniqueID { func (m *DeltaBufferManager) ShouldFlushSegments() []UniqueID {
bm.mu.Lock() var memUsage = m.usedMemory.Load()
defer bm.mu.Unlock() if memUsage < Params.DataNodeCfg.FlushDeleteBufferBytes.GetAsInt64() {
var shouldFlushSegments []UniqueID return nil
if bm.delMemorySize < Params.DataNodeCfg.FlushDeleteBufferBytes.GetAsInt64() {
return shouldFlushSegments
} }
mmUsage := bm.delMemorySize
var poppedSegMem []*Item var (
poppedSegmentIDs []UniqueID
poppedItems []*Item
)
for { for {
segMem := heap.Pop(bm.delBufHeap).(*Item) segItem := m.popHeapItem()
poppedSegMem = append(poppedSegMem, segMem) poppedItems = append(poppedItems, segItem)
shouldFlushSegments = append(shouldFlushSegments, segMem.segmentID) poppedSegmentIDs = append(poppedSegmentIDs, segItem.segmentID)
log.Debug("add segment for delete buf flush", zap.Int64("segmentID", segMem.segmentID)) memUsage -= segItem.memorySize
mmUsage -= segMem.memorySize if memUsage < Params.DataNodeCfg.FlushDeleteBufferBytes.GetAsInt64() {
if mmUsage < Params.DataNodeCfg.FlushDeleteBufferBytes.GetAsInt64() {
break break
} }
} }
//here we push all selected segment back into the heap //here we push all selected segment back into the heap
//in order to keep the heap semantically correct //in order to keep the heap semantically correct
for _, segMem := range poppedSegMem { m.heapGuard.Lock()
heap.Push(bm.delBufHeap, segMem) for _, segMem := range poppedItems {
heap.Push(m.delBufHeap, segMem)
} }
return shouldFlushSegments m.heapGuard.Unlock()
log.Info("Add segments to sync delete buffer for stressfull memory", zap.Any("segments", poppedItems))
return poppedSegmentIDs
} }
// An Item is something we manage in a memorySize priority queue. // An Item is something we manage in a memorySize priority queue.
@ -313,6 +318,49 @@ type DelDataBuf struct {
endPos *msgpb.MsgPosition endPos *msgpb.MsgPosition
} }
// Buffer returns the memory size buffered
func (ddb *DelDataBuf) Buffer(pks []primaryKey, tss []Timestamp, tr TimeRange, startPos, endPos *msgpb.MsgPosition) int64 {
var (
rowCount = len(pks)
bufSize int64
)
for i := 0; i < rowCount; i++ {
ddb.delData.Append(pks[i], tss[i])
switch pks[i].Type() {
case schemapb.DataType_Int64:
bufSize += 8
case schemapb.DataType_VarChar:
varCharPk := pks[i].(*varCharPrimaryKey)
bufSize += int64(len(varCharPk.Value))
}
//accumulate buf size for timestamp, which is 8 bytes
bufSize += 8
}
ddb.accumulateEntriesNum(int64(rowCount))
ddb.updateTimeRange(tr)
ddb.updateStartAndEndPosition(startPos, endPos)
// update memorysize
ddb.item.memorySize += bufSize
return bufSize
}
func (ddb *DelDataBuf) GetMemorySize() int64 {
if ddb.item != nil {
return ddb.item.memorySize
}
return 0
}
func (ddb *DelDataBuf) GetItemIndex() (int, bool) {
if ddb.item != nil {
return ddb.item.index, true
}
return 0, false
}
func (ddb *DelDataBuf) accumulateEntriesNum(entryNum int64) { func (ddb *DelDataBuf) accumulateEntriesNum(entryNum int64) {
ddb.EntriesNum += entryNum ddb.EntriesNum += entryNum
} }
@ -326,7 +374,7 @@ func (ddb *DelDataBuf) updateTimeRange(tr TimeRange) {
} }
} }
func (ddb *DelDataBuf) mergeDelDataBuf(buf *DelDataBuf) { func (ddb *DelDataBuf) MergeDelDataBuf(buf *DelDataBuf) {
ddb.accumulateEntriesNum(buf.EntriesNum) ddb.accumulateEntriesNum(buf.EntriesNum)
tr := TimeRange{timestampMax: buf.TimestampTo, timestampMin: buf.TimestampFrom} tr := TimeRange{timestampMax: buf.TimestampTo, timestampMin: buf.TimestampFrom}
@ -391,7 +439,7 @@ func newBufferData(collSchema *schemapb.CollectionSchema) (*BufferData, error) {
tsTo: 0}, nil tsTo: 0}, nil
} }
func newDelDataBuf() *DelDataBuf { func newDelDataBuf(segmentID UniqueID) *DelDataBuf {
return &DelDataBuf{ return &DelDataBuf{
delData: &DeleteData{}, delData: &DeleteData{},
Binlog: datapb.Binlog{ Binlog: datapb.Binlog{
@ -400,7 +448,7 @@ func newDelDataBuf() *DelDataBuf {
TimestampTo: 0, TimestampTo: 0,
}, },
item: &Item{ item: &Item{
memorySize: 0, segmentID: segmentID,
}, },
} }
} }

View File

@ -18,10 +18,12 @@ package datanode
import ( import (
"container/heap" "container/heap"
"context"
"fmt" "fmt"
"math" "math"
"strconv" "strconv"
"testing" "testing"
"time"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
@ -29,6 +31,9 @@ import (
"github.com/milvus-io/milvus-proto/go-api/commonpb" "github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus-proto/go-api/msgpb" "github.com/milvus-io/milvus-proto/go-api/msgpb"
"github.com/milvus-io/milvus-proto/go-api/schemapb" "github.com/milvus-io/milvus-proto/go-api/schemapb"
"github.com/milvus-io/milvus/internal/datanode/allocator"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/paramtable"
) )
@ -158,58 +163,68 @@ func TestPriorityQueueString(t *testing.T) {
func Test_CompactSegBuff(t *testing.T) { func Test_CompactSegBuff(t *testing.T) {
channelSegments := make(map[UniqueID]*Segment) channelSegments := make(map[UniqueID]*Segment)
delBufferManager := &DelBufferManager{ delBufferManager := &DeltaBufferManager{
channel: &ChannelMeta{ channel: &ChannelMeta{
segments: channelSegments, segments: channelSegments,
}, },
delMemorySize: 0, delBufHeap: &PriorityQueue{},
delBufHeap: &PriorityQueue{},
} }
//1. set compactTo and compactFrom //1. set compactTo and compactFrom
compactedFromSegIDs := make([]UniqueID, 2) targetSeg := &Segment{segmentID: 3333}
var segID1 UniqueID = 1111 targetSeg.setType(datapb.SegmentType_Flushed)
var segID2 UniqueID = 2222
compactedFromSegIDs[0] = segID1 seg1 := &Segment{
compactedFromSegIDs[1] = segID2 segmentID: 1111,
channelSegments[segID1] = &Segment{} compactedTo: targetSeg.segmentID,
channelSegments[segID2] = &Segment{} }
var compactedToSegID UniqueID = 3333 seg1.setType(datapb.SegmentType_Compacted)
channelSegments[compactedToSegID] = &Segment{}
seg2 := &Segment{
segmentID: 2222,
compactedTo: targetSeg.segmentID,
}
seg2.setType(datapb.SegmentType_Compacted)
channelSegments[seg1.segmentID] = seg1
channelSegments[seg2.segmentID] = seg2
channelSegments[targetSeg.segmentID] = targetSeg
//2. set up deleteDataBuf for seg1 and seg2 //2. set up deleteDataBuf for seg1 and seg2
delDataBuf1 := newDelDataBuf() delDataBuf1 := newDelDataBuf(seg1.segmentID)
delDataBuf1.EntriesNum++ delDataBuf1.EntriesNum++
delDataBuf1.updateStartAndEndPosition(nil, &msgpb.MsgPosition{Timestamp: 50}) delDataBuf1.updateStartAndEndPosition(nil, &msgpb.MsgPosition{Timestamp: 50})
delBufferManager.Store(segID1, delDataBuf1) delBufferManager.updateMeta(seg1.segmentID, delDataBuf1)
heap.Push(delBufferManager.delBufHeap, delDataBuf1.item) heap.Push(delBufferManager.delBufHeap, delDataBuf1.item)
delDataBuf2 := newDelDataBuf()
delDataBuf2 := newDelDataBuf(seg2.segmentID)
delDataBuf2.EntriesNum++ delDataBuf2.EntriesNum++
delDataBuf2.updateStartAndEndPosition(nil, &msgpb.MsgPosition{Timestamp: 50}) delDataBuf2.updateStartAndEndPosition(nil, &msgpb.MsgPosition{Timestamp: 50})
delBufferManager.Store(segID2, delDataBuf2) delBufferManager.updateMeta(seg2.segmentID, delDataBuf2)
heap.Push(delBufferManager.delBufHeap, delDataBuf2.item) heap.Push(delBufferManager.delBufHeap, delDataBuf2.item)
//3. test compact //3. test compact
delBufferManager.CompactSegBuf(compactedToSegID, compactedFromSegIDs) delBufferManager.UpdateCompactedSegments()
//4. expect results in two aspects: //4. expect results in two aspects:
//4.1 compactedFrom segments are removed from delBufferManager //4.1 compactedFrom segments are removed from delBufferManager
//4.2 compactedTo seg is set properly with correct entriesNum //4.2 compactedTo seg is set properly with correct entriesNum
_, seg1Exist := delBufferManager.Load(segID1) _, seg1Exist := delBufferManager.Load(seg1.segmentID)
_, seg2Exist := delBufferManager.Load(segID2) _, seg2Exist := delBufferManager.Load(seg2.segmentID)
assert.False(t, seg1Exist) assert.False(t, seg1Exist)
assert.False(t, seg2Exist) assert.False(t, seg2Exist)
assert.Equal(t, int64(2), delBufferManager.GetEntriesNum(compactedToSegID)) assert.Equal(t, int64(2), delBufferManager.GetEntriesNum(targetSeg.segmentID))
// test item of compactedToSegID is correct // test item of compactedToSegID is correct
compactTo, ok := delBufferManager.Load(compactedToSegID) targetSegBuf, ok := delBufferManager.Load(targetSeg.segmentID)
assert.True(t, ok) assert.True(t, ok)
assert.Equal(t, compactedToSegID, compactTo.item.segmentID) assert.NotNil(t, targetSegBuf.item)
assert.Equal(t, targetSeg.segmentID, targetSegBuf.item.segmentID)
//5. test roll and evict (https://github.com/milvus-io/milvus/issues/20501) //5. test roll and evict (https://github.com/milvus-io/milvus/issues/20501)
delBufferManager.channel.rollDeleteBuffer(compactedToSegID) delBufferManager.channel.rollDeleteBuffer(targetSeg.segmentID)
_, segCompactedToExist := delBufferManager.Load(compactedToSegID) _, segCompactedToExist := delBufferManager.Load(targetSeg.segmentID)
assert.False(t, segCompactedToExist) assert.False(t, segCompactedToExist)
delBufferManager.channel.evictHistoryDeleteBuffer(compactedToSegID, &msgpb.MsgPosition{ delBufferManager.channel.evictHistoryDeleteBuffer(targetSeg.segmentID, &msgpb.MsgPosition{
Timestamp: 100, Timestamp: 100,
}) })
cp := delBufferManager.channel.getChannelCheckpoint(&msgpb.MsgPosition{ cp := delBufferManager.channel.getChannelCheckpoint(&msgpb.MsgPosition{
@ -217,3 +232,94 @@ func Test_CompactSegBuff(t *testing.T) {
}) })
assert.Equal(t, Timestamp(200), cp.Timestamp) // evict all buffer, use ttPos as cp assert.Equal(t, Timestamp(200), cp.Timestamp) // evict all buffer, use ttPos as cp
} }
func TestUpdateCompactedSegments(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
cm := storage.NewLocalChunkManager(storage.RootPath(deleteNodeTestDir))
defer cm.RemoveWithPrefix(ctx, cm.RootPath())
fm := NewRendezvousFlushManager(allocator.NewMockAllocator(t), cm, nil, func(*segmentFlushPack) {}, emptyFlushAndDropFunc)
chanName := "datanode-test-FlowGraphDeletenode-showDelBuf"
testPath := "/test/datanode/root/meta"
assert.NoError(t, clearEtcd(testPath))
Params.BaseTable.Save("etcd.rootPath", "/test/datanode/root")
channel := ChannelMeta{
segments: make(map[UniqueID]*Segment),
}
c := &nodeConfig{
channel: &channel,
vChannelName: chanName,
}
delBufManager := &DeltaBufferManager{
channel: &channel,
delBufHeap: &PriorityQueue{},
}
delNode, err := newDeleteNode(ctx, fm, delBufManager, make(chan string, 1), c)
require.NoError(t, err)
tests := []struct {
description string
compactToExist bool
compactedToIDs []UniqueID
compactedFromIDs []UniqueID
expectedSegsRemain []UniqueID
}{
{"zero segments", false,
[]UniqueID{}, []UniqueID{}, []UniqueID{}},
{"segment no compaction", false,
[]UniqueID{}, []UniqueID{}, []UniqueID{100, 101}},
{"segment compacted", true,
[]UniqueID{200}, []UniqueID{103}, []UniqueID{100, 101}},
{"segment compacted 100>201", true,
[]UniqueID{201}, []UniqueID{100}, []UniqueID{101, 201}},
{"segment compacted 100+101>201", true,
[]UniqueID{201, 201}, []UniqueID{100, 101}, []UniqueID{201}},
{"segment compacted 100>201, 101>202", true,
[]UniqueID{201, 202}, []UniqueID{100, 101}, []UniqueID{201, 202}},
// false
{"segment compacted 100>201", false,
[]UniqueID{201}, []UniqueID{100}, []UniqueID{101}},
{"segment compacted 100+101>201", false,
[]UniqueID{201, 201}, []UniqueID{100, 101}, []UniqueID{}},
{"segment compacted 100>201, 101>202", false,
[]UniqueID{201, 202}, []UniqueID{100, 101}, []UniqueID{}},
}
for _, test := range tests {
t.Run(test.description, func(t *testing.T) {
if test.compactToExist {
for _, segID := range test.compactedToIDs {
seg := Segment{
segmentID: segID,
numRows: 10,
}
seg.setType(datapb.SegmentType_Flushed)
channel.segments[segID] = &seg
}
} else { // clear all segments in channel
channel.segments = make(map[UniqueID]*Segment)
}
for i, segID := range test.compactedFromIDs {
seg := Segment{
segmentID: segID,
compactedTo: test.compactedToIDs[i],
}
seg.setType(datapb.SegmentType_Compacted)
channel.segments[segID] = &seg
}
delNode.delBufferManager.UpdateCompactedSegments()
for _, remain := range test.expectedSegsRemain {
delNode.channel.hasSegment(remain, true)
}
})
}
}

View File

@ -57,7 +57,7 @@ type dataSyncService struct {
dataCoord types.DataCoord // DataCoord instance to interact with dataCoord types.DataCoord // DataCoord instance to interact with
clearSignal chan<- string // signal channel to notify flowgraph close for collection/partition drop msg consumed clearSignal chan<- string // signal channel to notify flowgraph close for collection/partition drop msg consumed
delBufferManager *DelBufferManager delBufferManager *DeltaBufferManager
flushingSegCache *Cache // a guarding cache stores currently flushing segment ids flushingSegCache *Cache // a guarding cache stores currently flushing segment ids
flushManager flushManager // flush manager handles flush process flushManager flushManager // flush manager handles flush process
chunkManager storage.ChunkManager chunkManager storage.ChunkManager
@ -91,10 +91,9 @@ func newDataSyncService(ctx context.Context,
ctx1, cancel := context.WithCancel(ctx) ctx1, cancel := context.WithCancel(ctx)
delBufferManager := &DelBufferManager{ delBufferManager := &DeltaBufferManager{
channel: channel, channel: channel,
delMemorySize: 0, delBufHeap: &PriorityQueue{},
delBufHeap: &PriorityQueue{},
} }
service := &dataSyncService{ service := &dataSyncService{

View File

@ -25,7 +25,6 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/msgpb" "github.com/milvus-io/milvus-proto/go-api/msgpb"
"github.com/milvus-io/milvus/internal/datanode/allocator"
"github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/mq/msgstream" "github.com/milvus-io/milvus/pkg/mq/msgstream"
@ -39,9 +38,8 @@ type deleteNode struct {
BaseNode BaseNode
ctx context.Context ctx context.Context
channelName string channelName string
delBufferManager *DelBufferManager // manager of delete msg delBufferManager *DeltaBufferManager // manager of delete msg
channel Channel channel Channel
idAllocator allocator.Allocator
flushManager flushManager flushManager flushManager
clearSignal chan<- string clearSignal chan<- string
@ -57,12 +55,12 @@ func (dn *deleteNode) Close() {
func (dn *deleteNode) showDelBuf(segIDs []UniqueID, ts Timestamp) { func (dn *deleteNode) showDelBuf(segIDs []UniqueID, ts Timestamp) {
for _, segID := range segIDs { for _, segID := range segIDs {
if _, ok := dn.delBufferManager.Load(segID); ok { if buffer, ok := dn.delBufferManager.Load(segID); ok {
log.Debug("delta buffer status", log.Debug("delta buffer status",
zap.Int64("segmentID", segID),
zap.Uint64("timestamp", ts), zap.Uint64("timestamp", ts),
zap.Int64("segment ID", segID), zap.Int64("entriesNum", buffer.GetEntriesNum()),
zap.Int64("entries", dn.delBufferManager.GetEntriesNum(segID)), zap.Int64("memorySize", buffer.GetMemorySize()),
zap.Int64("memory size", dn.delBufferManager.GetSegDelBufMemSize(segID)),
zap.String("vChannel", dn.channelName)) zap.String("vChannel", dn.channelName))
} }
} }
@ -92,7 +90,7 @@ func (dn *deleteNode) Operate(in []Msg) []Msg {
} }
// update compacted segment before operation // update compacted segment before operation
dn.updateCompactedSegments() dn.delBufferManager.UpdateCompactedSegments()
// process delete messages // process delete messages
segIDs := typeutil.NewUniqueSet() segIDs := typeutil.NewUniqueSet()
@ -154,29 +152,6 @@ func (dn *deleteNode) Operate(in []Msg) []Msg {
return in return in
} }
// update delBuf for compacted segments
func (dn *deleteNode) updateCompactedSegments() {
compactedTo2From := dn.channel.listCompactedSegmentIDs()
for compactedTo, compactedFrom := range compactedTo2From {
// if the compactedTo segment has 0 numRows, remove all segments related
if !dn.channel.hasSegment(compactedTo, true) {
for _, segID := range compactedFrom {
dn.delBufferManager.Delete(segID)
}
dn.channel.removeSegments(compactedFrom...)
continue
}
dn.delBufferManager.CompactSegBuf(compactedTo, compactedFrom)
log.Info("update delBuf for compacted segments",
zap.Int64("compactedTo segmentID", compactedTo),
zap.Int64s("compactedFrom segmentIDs", compactedFrom),
)
dn.channel.removeSegments(compactedFrom...)
}
}
func (dn *deleteNode) bufferDeleteMsg(msg *msgstream.DeleteMsg, tr TimeRange, startPos, endPos *msgpb.MsgPosition) ([]UniqueID, error) { func (dn *deleteNode) bufferDeleteMsg(msg *msgstream.DeleteMsg, tr TimeRange, startPos, endPos *msgpb.MsgPosition) ([]UniqueID, error) {
log.Debug("bufferDeleteMsg", zap.Any("primary keys", msg.PrimaryKeys), zap.String("vChannelName", dn.channelName)) log.Debug("bufferDeleteMsg", zap.Any("primary keys", msg.PrimaryKeys), zap.String("vChannelName", dn.channelName))
@ -218,7 +193,7 @@ func (dn *deleteNode) filterSegmentByPK(partID UniqueID, pks []primaryKey, tss [
return segID2Pks, segID2Tss return segID2Pks, segID2Tss
} }
func newDeleteNode(ctx context.Context, fm flushManager, delBufManager *DelBufferManager, sig chan<- string, config *nodeConfig) (*deleteNode, error) { func newDeleteNode(ctx context.Context, fm flushManager, manager *DeltaBufferManager, sig chan<- string, config *nodeConfig) (*deleteNode, error) {
baseNode := BaseNode{} baseNode := BaseNode{}
baseNode.SetMaxQueueLength(config.maxQueueLength) baseNode.SetMaxQueueLength(config.maxQueueLength)
baseNode.SetMaxParallelism(config.maxParallelism) baseNode.SetMaxParallelism(config.maxParallelism)
@ -226,9 +201,8 @@ func newDeleteNode(ctx context.Context, fm flushManager, delBufManager *DelBuffe
return &deleteNode{ return &deleteNode{
ctx: ctx, ctx: ctx,
BaseNode: baseNode, BaseNode: baseNode,
delBufferManager: delBufManager, delBufferManager: manager,
channel: config.channel, channel: config.channel,
idAllocator: config.allocator,
channelName: config.vChannelName, channelName: config.vChannelName,
flushManager: fm, flushManager: fm,
clearSignal: sig, clearSignal: sig,

View File

@ -186,13 +186,11 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
fm := NewRendezvousFlushManager(alloc, cm, channel, func(*segmentFlushPack) {}, emptyFlushAndDropFunc) fm := NewRendezvousFlushManager(alloc, cm, channel, func(*segmentFlushPack) {}, emptyFlushAndDropFunc)
c := &nodeConfig{ c := &nodeConfig{
channel: channel, channel: channel,
allocator: alloc,
vChannelName: chanName, vChannelName: chanName,
} }
delBufManager := &DelBufferManager{ delBufManager := &DeltaBufferManager{
channel: channel, channel: channel,
delMemorySize: 0, delBufHeap: &PriorityQueue{},
delBufHeap: &PriorityQueue{},
} }
dn, err := newDeleteNode(context.Background(), fm, delBufManager, make(chan string, 1), c) dn, err := newDeleteNode(context.Background(), fm, delBufManager, make(chan string, 1), c)
@ -221,14 +219,12 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
t.Run("Test get segment by int64 primary keys", func(te *testing.T) { t.Run("Test get segment by int64 primary keys", func(te *testing.T) {
c := &nodeConfig{ c := &nodeConfig{
channel: channel, channel: channel,
allocator: alloc,
vChannelName: chanName, vChannelName: chanName,
} }
delBufManager := &DelBufferManager{ delBufManager := &DeltaBufferManager{
channel: channel, channel: channel,
delMemorySize: 0, delBufHeap: &PriorityQueue{},
delBufHeap: &PriorityQueue{},
} }
dn, err := newDeleteNode(context.Background(), fm, delBufManager, make(chan string, 1), c) dn, err := newDeleteNode(context.Background(), fm, delBufManager, make(chan string, 1), c)
@ -263,13 +259,11 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
c := &nodeConfig{ c := &nodeConfig{
channel: channel, channel: channel,
allocator: allocator.NewMockAllocator(t),
vChannelName: chanName, vChannelName: chanName,
} }
delBufManager := &DelBufferManager{ delBufManager := &DeltaBufferManager{
channel: channel, channel: channel,
delMemorySize: 0, delBufHeap: &PriorityQueue{},
delBufHeap: &PriorityQueue{},
} }
delNode, err := newDeleteNode(ctx, fm, delBufManager, make(chan string, 1), c) delNode, err := newDeleteNode(ctx, fm, delBufManager, make(chan string, 1), c)
assert.Nil(te, err) assert.Nil(te, err)
@ -291,13 +285,11 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
c := &nodeConfig{ c := &nodeConfig{
channel: channel, channel: channel,
allocator: allocator.NewMockAllocator(t),
vChannelName: chanName, vChannelName: chanName,
} }
delBufManager := &DelBufferManager{ delBufManager := &DeltaBufferManager{
channel: channel, channel: channel,
delMemorySize: 0, delBufHeap: &PriorityQueue{},
delBufHeap: &PriorityQueue{},
} }
delNode, err := newDeleteNode(ctx, fm, delBufManager, make(chan string, 1), c) delNode, err := newDeleteNode(ctx, fm, delBufManager, make(chan string, 1), c)
assert.Nil(te, err) assert.Nil(te, err)
@ -325,13 +317,11 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
c := &nodeConfig{ c := &nodeConfig{
channel: channel, channel: channel,
allocator: allocator.NewMockAllocator(t),
vChannelName: chanName, vChannelName: chanName,
} }
delBufManager := &DelBufferManager{ delBufManager := &DeltaBufferManager{
channel: channel, channel: channel,
delMemorySize: 0, delBufHeap: &PriorityQueue{},
delBufHeap: &PriorityQueue{},
} }
sig := make(chan string, 1) sig := make(chan string, 1)
delNode, err := newDeleteNode(ctx, fm, delBufManager, sig, c) delNode, err := newDeleteNode(ctx, fm, delBufManager, sig, c)
@ -367,13 +357,11 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
c := &nodeConfig{ c := &nodeConfig{
channel: channel, channel: channel,
allocator: allocator.NewMockAllocator(t),
vChannelName: chanName, vChannelName: chanName,
} }
delBufManager := &DelBufferManager{ delBufManager := &DeltaBufferManager{
channel: channel, channel: channel,
delMemorySize: 0, delBufHeap: &PriorityQueue{},
delBufHeap: &PriorityQueue{},
} }
delNode, err := newDeleteNode(ctx, fm, delBufManager, make(chan string, 1), c) delNode, err := newDeleteNode(ctx, fm, delBufManager, make(chan string, 1), c)
assert.Nil(t, err) assert.Nil(t, err)
@ -391,7 +379,7 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
msg.segmentsToSync = []UniqueID{compactedSegment} msg.segmentsToSync = []UniqueID{compactedSegment}
bufItem := &Item{memorySize: 0} bufItem := &Item{memorySize: 0}
delNode.delBufferManager.Store(compactedSegment, delNode.delBufferManager.updateMeta(compactedSegment,
&DelDataBuf{delData: &DeleteData{}, item: bufItem}) &DelDataBuf{delData: &DeleteData{}, item: bufItem})
heap.Push(delNode.delBufferManager.delBufHeap, bufItem) heap.Push(delNode.delBufferManager.delBufHeap, bufItem)
@ -422,13 +410,11 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
c := &nodeConfig{ c := &nodeConfig{
channel: channel, channel: channel,
allocator: allocator.NewMockAllocator(t),
vChannelName: chanName, vChannelName: chanName,
} }
delBufManager := &DelBufferManager{ delBufManager := &DeltaBufferManager{
channel: channel, channel: channel,
delMemorySize: 0, delBufHeap: &PriorityQueue{},
delBufHeap: &PriorityQueue{},
} }
mockFlushManager := &mockFlushManager{ mockFlushManager := &mockFlushManager{
recordFlushedSeg: true, recordFlushedSeg: true,
@ -454,7 +440,7 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
fgMsg.(*flowGraphMsg).segmentsToSync = delNode.delBufferManager.ShouldFlushSegments() fgMsg.(*flowGraphMsg).segmentsToSync = delNode.delBufferManager.ShouldFlushSegments()
delNode.Operate([]flowgraph.Msg{fgMsg}) delNode.Operate([]flowgraph.Msg{fgMsg})
assert.Equal(t, 0, len(mockFlushManager.flushedSegIDs)) assert.Equal(t, 0, len(mockFlushManager.flushedSegIDs))
assert.Equal(t, int64(208), delNode.delBufferManager.delMemorySize) assert.Equal(t, int64(208), delNode.delBufferManager.usedMemory.Load())
assert.Equal(t, 5, delNode.delBufferManager.delBufHeap.Len()) assert.Equal(t, 5, delNode.delBufferManager.delBufHeap.Len())
//3. note that the whole memory size used by 5 segments will be 208 //3. note that the whole memory size used by 5 segments will be 208
@ -467,7 +453,7 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
fgMsg.(*flowGraphMsg).segmentsToSync = delNode.delBufferManager.ShouldFlushSegments() fgMsg.(*flowGraphMsg).segmentsToSync = delNode.delBufferManager.ShouldFlushSegments()
delNode.Operate([]flowgraph.Msg{fgMsg}) delNode.Operate([]flowgraph.Msg{fgMsg})
assert.Equal(t, 1, len(mockFlushManager.flushedSegIDs)) assert.Equal(t, 1, len(mockFlushManager.flushedSegIDs))
assert.Equal(t, int64(160), delNode.delBufferManager.delMemorySize) assert.Equal(t, int64(160), delNode.delBufferManager.usedMemory.Load())
assert.Equal(t, 4, delNode.delBufferManager.delBufHeap.Len()) assert.Equal(t, 4, delNode.delBufferManager.delBufHeap.Len())
//4. there is no new delete msg and delBufferSize is still 200 //4. there is no new delete msg and delBufferSize is still 200
@ -475,7 +461,7 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
fgMsg.(*flowGraphMsg).segmentsToSync = delNode.delBufferManager.ShouldFlushSegments() fgMsg.(*flowGraphMsg).segmentsToSync = delNode.delBufferManager.ShouldFlushSegments()
delNode.Operate([]flowgraph.Msg{fgMsg}) delNode.Operate([]flowgraph.Msg{fgMsg})
assert.Equal(t, 1, len(mockFlushManager.flushedSegIDs)) assert.Equal(t, 1, len(mockFlushManager.flushedSegIDs))
assert.Equal(t, int64(160), delNode.delBufferManager.delMemorySize) assert.Equal(t, int64(160), delNode.delBufferManager.usedMemory.Load())
assert.Equal(t, 4, delNode.delBufferManager.delBufHeap.Len()) assert.Equal(t, 4, delNode.delBufferManager.delBufHeap.Len())
//5. we reset buffer bytes to 150, then we expect there would be one more //5. we reset buffer bytes to 150, then we expect there would be one more
@ -485,7 +471,7 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
fgMsg.(*flowGraphMsg).segmentsToSync = delNode.delBufferManager.ShouldFlushSegments() fgMsg.(*flowGraphMsg).segmentsToSync = delNode.delBufferManager.ShouldFlushSegments()
delNode.Operate([]flowgraph.Msg{fgMsg}) delNode.Operate([]flowgraph.Msg{fgMsg})
assert.Equal(t, 2, len(mockFlushManager.flushedSegIDs)) assert.Equal(t, 2, len(mockFlushManager.flushedSegIDs))
assert.Equal(t, int64(112), delNode.delBufferManager.delMemorySize) assert.Equal(t, int64(112), delNode.delBufferManager.usedMemory.Load())
assert.Equal(t, 3, delNode.delBufferManager.delBufHeap.Len()) assert.Equal(t, 3, delNode.delBufferManager.delBufHeap.Len())
//6. we reset buffer bytes to 60, then most of the segments will be flushed //6. we reset buffer bytes to 60, then most of the segments will be flushed
@ -494,7 +480,7 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
fgMsg.(*flowGraphMsg).segmentsToSync = delNode.delBufferManager.ShouldFlushSegments() fgMsg.(*flowGraphMsg).segmentsToSync = delNode.delBufferManager.ShouldFlushSegments()
delNode.Operate([]flowgraph.Msg{fgMsg}) delNode.Operate([]flowgraph.Msg{fgMsg})
assert.Equal(t, 4, len(mockFlushManager.flushedSegIDs)) assert.Equal(t, 4, len(mockFlushManager.flushedSegIDs))
assert.Equal(t, int64(32), delNode.delBufferManager.delMemorySize) assert.Equal(t, int64(32), delNode.delBufferManager.usedMemory.Load())
assert.Equal(t, 1, delNode.delBufferManager.delBufHeap.Len()) assert.Equal(t, 1, delNode.delBufferManager.delBufHeap.Len())
//7. we reset buffer bytes to 20, then as all segment-memory consumption //7. we reset buffer bytes to 20, then as all segment-memory consumption
@ -504,7 +490,7 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) {
fgMsg.(*flowGraphMsg).segmentsToSync = delNode.delBufferManager.ShouldFlushSegments() fgMsg.(*flowGraphMsg).segmentsToSync = delNode.delBufferManager.ShouldFlushSegments()
delNode.Operate([]flowgraph.Msg{fgMsg}) delNode.Operate([]flowgraph.Msg{fgMsg})
assert.Equal(t, 5, len(mockFlushManager.flushedSegIDs)) assert.Equal(t, 5, len(mockFlushManager.flushedSegIDs))
assert.Equal(t, int64(0), delNode.delBufferManager.delMemorySize) assert.Equal(t, int64(0), delNode.delBufferManager.usedMemory.Load())
assert.Equal(t, 0, delNode.delBufferManager.delBufHeap.Len()) assert.Equal(t, 0, delNode.delBufferManager.delBufHeap.Len())
}) })
} }
@ -528,13 +514,11 @@ func TestFlowGraphDeleteNode_showDelBuf(t *testing.T) {
} }
c := &nodeConfig{ c := &nodeConfig{
channel: channel, channel: channel,
allocator: allocator.NewMockAllocator(t),
vChannelName: chanName, vChannelName: chanName,
} }
delBufManager := &DelBufferManager{ delBufManager := &DeltaBufferManager{
channel: channel, channel: channel,
delMemorySize: 0, delBufHeap: &PriorityQueue{},
delBufHeap: &PriorityQueue{},
} }
delNode, err := newDeleteNode(ctx, fm, delBufManager, make(chan string, 1), c) delNode, err := newDeleteNode(ctx, fm, delBufManager, make(chan string, 1), c)
require.NoError(t, err) require.NoError(t, err)
@ -549,105 +533,11 @@ func TestFlowGraphDeleteNode_showDelBuf(t *testing.T) {
} }
for _, test := range tests { for _, test := range tests {
delBuf := newDelDataBuf() delBuf := newDelDataBuf(test.seg)
delBuf.accumulateEntriesNum(test.numRows) delBuf.accumulateEntriesNum(test.numRows)
delNode.delBufferManager.Store(test.seg, delBuf) delNode.delBufferManager.updateMeta(test.seg, delBuf)
heap.Push(delNode.delBufferManager.delBufHeap, delBuf.item) heap.Push(delNode.delBufferManager.delBufHeap, delBuf.item)
} }
delNode.showDelBuf([]UniqueID{111, 112, 113}, 100) delNode.showDelBuf([]UniqueID{111, 112, 113}, 100)
} }
func TestFlowGraphDeleteNode_updateCompactedSegments(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
cm := storage.NewLocalChunkManager(storage.RootPath(deleteNodeTestDir))
defer cm.RemoveWithPrefix(ctx, cm.RootPath())
fm := NewRendezvousFlushManager(allocator.NewMockAllocator(t), cm, nil, func(*segmentFlushPack) {}, emptyFlushAndDropFunc)
chanName := "datanode-test-FlowGraphDeletenode-showDelBuf"
testPath := "/test/datanode/root/meta"
assert.NoError(t, clearEtcd(testPath))
Params.BaseTable.Save("etcd.rootPath", "/test/datanode/root")
channel := ChannelMeta{
segments: make(map[UniqueID]*Segment),
}
c := &nodeConfig{
channel: &channel,
allocator: allocator.NewMockAllocator(t),
vChannelName: chanName,
}
delBufManager := &DelBufferManager{
channel: &channel,
delMemorySize: 0,
delBufHeap: &PriorityQueue{},
}
delNode, err := newDeleteNode(ctx, fm, delBufManager, make(chan string, 1), c)
require.NoError(t, err)
tests := []struct {
description string
compactToExist bool
compactedToIDs []UniqueID
compactedFromIDs []UniqueID
expectedSegsRemain []UniqueID
}{
{"zero segments", false,
[]UniqueID{}, []UniqueID{}, []UniqueID{}},
{"segment no compaction", false,
[]UniqueID{}, []UniqueID{}, []UniqueID{100, 101}},
{"segment compacted", true,
[]UniqueID{200}, []UniqueID{103}, []UniqueID{100, 101}},
{"segment compacted 100>201", true,
[]UniqueID{201}, []UniqueID{100}, []UniqueID{101, 201}},
{"segment compacted 100+101>201", true,
[]UniqueID{201, 201}, []UniqueID{100, 101}, []UniqueID{201}},
{"segment compacted 100>201, 101>202", true,
[]UniqueID{201, 202}, []UniqueID{100, 101}, []UniqueID{201, 202}},
// false
{"segment compacted 100>201", false,
[]UniqueID{201}, []UniqueID{100}, []UniqueID{101}},
{"segment compacted 100+101>201", false,
[]UniqueID{201, 201}, []UniqueID{100, 101}, []UniqueID{}},
{"segment compacted 100>201, 101>202", false,
[]UniqueID{201, 202}, []UniqueID{100, 101}, []UniqueID{}},
}
for _, test := range tests {
t.Run(test.description, func(t *testing.T) {
if test.compactToExist {
for _, segID := range test.compactedToIDs {
seg := Segment{
segmentID: segID,
numRows: 10,
}
seg.setType(datapb.SegmentType_Flushed)
channel.segments[segID] = &seg
}
} else { // clear all segments in channel
channel.segments = make(map[UniqueID]*Segment)
}
for i, segID := range test.compactedFromIDs {
seg := Segment{
segmentID: segID,
compactedTo: test.compactedToIDs[i],
}
seg.setType(datapb.SegmentType_Compacted)
channel.segments[segID] = &seg
}
delNode.updateCompactedSegments()
for _, remain := range test.expectedSegsRemain {
delNode.channel.hasSegment(remain, true)
}
})
}
}

View File

@ -49,7 +49,7 @@ type insertBufferNode struct {
ctx context.Context ctx context.Context
channelName string channelName string
delBufferManager *DelBufferManager // manager of delete msg delBufferManager *DeltaBufferManager // manager of delete msg
channel Channel channel Channel
idAllocator allocator.Allocator idAllocator allocator.Allocator
@ -661,7 +661,7 @@ func (ibNode *insertBufferNode) getCollectionandPartitionIDbySegID(segmentID Uni
return ibNode.channel.getCollectionAndPartitionID(segmentID) return ibNode.channel.getCollectionAndPartitionID(segmentID)
} }
func newInsertBufferNode(ctx context.Context, collID UniqueID, delBufManager *DelBufferManager, flushCh <-chan flushMsg, resendTTCh <-chan resendTTMsg, func newInsertBufferNode(ctx context.Context, collID UniqueID, delBufManager *DeltaBufferManager, flushCh <-chan flushMsg, resendTTCh <-chan resendTTMsg,
fm flushManager, flushingSegCache *Cache, config *nodeConfig) (*insertBufferNode, error) { fm flushManager, flushingSegCache *Cache, config *nodeConfig) (*insertBufferNode, error) {
baseNode := BaseNode{} baseNode := BaseNode{}

View File

@ -111,10 +111,9 @@ func TestFlowGraphInsertBufferNodeCreate(t *testing.T) {
allocator: alloc, allocator: alloc,
vChannelName: "string", vChannelName: "string",
} }
delBufManager := &DelBufferManager{ delBufManager := &DeltaBufferManager{
channel: channel, channel: channel,
delMemorySize: 0, delBufHeap: &PriorityQueue{},
delBufHeap: &PriorityQueue{},
} }
iBNode, err := newInsertBufferNode(ctx, collMeta.ID, delBufManager, flushChan, resendTTChan, fm, newCache(), c) iBNode, err := newInsertBufferNode(ctx, collMeta.ID, delBufManager, flushChan, resendTTChan, fm, newCache(), c)
@ -219,10 +218,9 @@ func TestFlowGraphInsertBufferNode_Operate(t *testing.T) {
allocator: alloc, allocator: alloc,
vChannelName: "string", vChannelName: "string",
} }
delBufManager := &DelBufferManager{ delBufManager := &DeltaBufferManager{
channel: channel, channel: channel,
delMemorySize: 0, delBufHeap: &PriorityQueue{},
delBufHeap: &PriorityQueue{},
} }
iBNode, err := newInsertBufferNode(ctx, collMeta.ID, delBufManager, flushChan, resendTTChan, fm, newCache(), c) iBNode, err := newInsertBufferNode(ctx, collMeta.ID, delBufManager, flushChan, resendTTChan, fm, newCache(), c)
@ -398,10 +396,9 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) {
allocator: alloc, allocator: alloc,
vChannelName: "string", vChannelName: "string",
} }
delBufManager := &DelBufferManager{ delBufManager := &DeltaBufferManager{
channel: channel, channel: channel,
delMemorySize: 0, delBufHeap: &PriorityQueue{},
delBufHeap: &PriorityQueue{},
} }
iBNode, err := newInsertBufferNode(ctx, collMeta.ID, delBufManager, flushChan, resendTTChan, fm, newCache(), c) iBNode, err := newInsertBufferNode(ctx, collMeta.ID, delBufManager, flushChan, resendTTChan, fm, newCache(), c)
require.NoError(t, err) require.NoError(t, err)
@ -644,10 +641,9 @@ func TestInsertBufferNodeRollBF(t *testing.T) {
allocator: alloc, allocator: alloc,
vChannelName: "string", vChannelName: "string",
} }
delBufManager := &DelBufferManager{ delBufManager := &DeltaBufferManager{
channel: channel, channel: channel,
delMemorySize: 0, delBufHeap: &PriorityQueue{},
delBufHeap: &PriorityQueue{},
} }
iBNode, err := newInsertBufferNode(ctx, collMeta.ID, delBufManager, flushChan, resendTTChan, fm, newCache(), c) iBNode, err := newInsertBufferNode(ctx, collMeta.ID, delBufManager, flushChan, resendTTChan, fm, newCache(), c)
require.NoError(t, err) require.NoError(t, err)
@ -729,7 +725,7 @@ type InsertBufferNodeSuite struct {
suite.Suite suite.Suite
channel *ChannelMeta channel *ChannelMeta
delBufManager *DelBufferManager delBufManager *DeltaBufferManager
collID UniqueID collID UniqueID
partID UniqueID partID UniqueID
@ -748,10 +744,9 @@ func (s *InsertBufferNodeSuite) SetupSuite() {
s.partID = 10 s.partID = 10
s.channel = newChannel("channel", s.collID, nil, rc, s.cm) s.channel = newChannel("channel", s.collID, nil, rc, s.cm)
s.delBufManager = &DelBufferManager{ s.delBufManager = &DeltaBufferManager{
channel: s.channel, channel: s.channel,
delMemorySize: 0, delBufHeap: &PriorityQueue{},
delBufHeap: &PriorityQueue{},
} }
s.cm = storage.NewLocalChunkManager(storage.RootPath(insertBufferNodeTestDir)) s.cm = storage.NewLocalChunkManager(storage.RootPath(insertBufferNodeTestDir))
@ -1021,10 +1016,9 @@ func TestInsertBufferNode_bufferInsertMsg(t *testing.T) {
allocator: alloc, allocator: alloc,
vChannelName: "string", vChannelName: "string",
} }
delBufManager := &DelBufferManager{ delBufManager := &DeltaBufferManager{
channel: channel, channel: channel,
delMemorySize: 0, delBufHeap: &PriorityQueue{},
delBufHeap: &PriorityQueue{},
} }
iBNode, err := newInsertBufferNode(ctx, collMeta.ID, delBufManager, flushChan, resendTTChan, fm, newCache(), c) iBNode, err := newInsertBufferNode(ctx, collMeta.ID, delBufManager, flushChan, resendTTChan, fm, newCache(), c)
require.NoError(t, err) require.NoError(t, err)

View File

@ -387,6 +387,7 @@ func (node *DataNode) SyncSegments(ctx context.Context, req *datapb.SyncSegments
partitionID: partID, partitionID: partID,
segmentID: req.GetCompactedTo(), segmentID: req.GetCompactedTo(),
numRows: req.GetNumOfRows(), numRows: req.GetNumOfRows(),
lastSyncTs: tsoutil.GetCurrentTime(),
} }
err = channel.InitPKstats(ctx, targetSeg, req.GetStatsLogs(), tsoutil.GetCurrentTime()) err = channel.InitPKstats(ctx, targetSeg, req.GetStatsLogs(), tsoutil.GetCurrentTime())