diff --git a/internal/datacoord/segment_reference_manager.go b/internal/datacoord/segment_reference_manager.go index 94eca4a3a1..8aa2a0ca38 100644 --- a/internal/datacoord/segment_reference_manager.go +++ b/internal/datacoord/segment_reference_manager.go @@ -109,7 +109,7 @@ func (srm *SegmentReferenceManager) AddSegmentsLock(taskID int64, segIDs []Uniqu for _, segID := range segIDs { srm.segmentReferCnt[segID]++ } - log.Info("add reference lock on segments successfully", zap.Int64s("segIDs", segIDs), zap.Int64("nodeID", nodeID)) + log.Info("add reference lock on segments successfully", zap.Int64("taskID", taskID), zap.Int64s("segIDs", segIDs), zap.Int64("nodeID", nodeID)) return nil } diff --git a/internal/datanode/flow_graph_delete_node.go b/internal/datanode/flow_graph_delete_node.go index 316691e340..126ef19879 100644 --- a/internal/datanode/flow_graph_delete_node.go +++ b/internal/datanode/flow_graph_delete_node.go @@ -105,7 +105,7 @@ func (dn *deleteNode) Close() { log.Info("Flowgraph Delete Node closing") } -func (dn *deleteNode) bufferDeleteMsg(msg *msgstream.DeleteMsg, tr TimeRange) error { +func (dn *deleteNode) bufferDeleteMsg(msg *msgstream.DeleteMsg, tr TimeRange) ([]UniqueID, error) { log.Debug("bufferDeleteMsg", zap.Any("primary keys", msg.PrimaryKeys), zap.String("vChannelName", dn.channelName)) // Update delBuf for merged segments @@ -129,11 +129,14 @@ func (dn *deleteNode) bufferDeleteMsg(msg *msgstream.DeleteMsg, tr TimeRange) er primaryKeys := storage.ParseIDs2PrimaryKeys(msg.PrimaryKeys) segIDToPks, segIDToTss := dn.filterSegmentByPK(msg.PartitionID, primaryKeys, msg.Timestamps) + segIDs := make([]UniqueID, 0, len(segIDToPks)) for segID, pks := range segIDToPks { + segIDs = append(segIDs, segID) + rows := len(pks) tss, ok := segIDToTss[segID] if !ok || rows != len(tss) { - return fmt.Errorf("primary keys and timestamp's element num mis-match, segmentID = %d", segID) + return nil, fmt.Errorf("primary keys and timestamp's element num mis-match, segmentID = %d", segID) } var delDataBuf *DelDataBuf @@ -164,23 +167,18 @@ func (dn *deleteNode) bufferDeleteMsg(msg *msgstream.DeleteMsg, tr TimeRange) er dn.delBuf.Store(segID, delDataBuf) } - return nil + return segIDs, nil } -func (dn *deleteNode) showDelBuf() { - segments := dn.replica.filterSegments(dn.channelName, common.InvalidPartitionID) - for _, seg := range segments { - segID := seg.segmentID +func (dn *deleteNode) showDelBuf(segIDs []UniqueID, ts Timestamp) { + for _, segID := range segIDs { if v, ok := dn.delBuf.Load(segID); ok { delDataBuf, _ := v.(*DelDataBuf) log.Debug("delta buffer status", + zap.Uint64("timestamp", ts), zap.Int64("segment ID", segID), zap.Int64("entries", delDataBuf.GetEntriesNum()), zap.String("vChannel", dn.channelName)) - } else { - log.Debug("segment not exist", - zap.Int64("segment ID", segID), - zap.String("vChannel", dn.channelName)) } } } @@ -211,22 +209,24 @@ func (dn *deleteNode) Operate(in []Msg) []Msg { msg.SetTraceCtx(ctx) } + var segIDs []UniqueID for i, msg := range fgMsg.deleteMessages { traceID, _, _ := trace.InfoFromSpan(spans[i]) log.Info("Buffer delete request in DataNode", zap.String("traceID", traceID)) - err := dn.bufferDeleteMsg(msg, fgMsg.timeRange) + tmpSegIDs, err := dn.bufferDeleteMsg(msg, fgMsg.timeRange) if err != nil { // error occurs only when deleteMsg is misaligned, should not happen err = fmt.Errorf("buffer delete msg failed, err = %s", err) log.Error(err.Error()) panic(err) } + segIDs = append(segIDs, tmpSegIDs...) } - // show all data in dn.delBuf + // show changed segment's status in dn.delBuf of a certain ts if len(fgMsg.deleteMessages) != 0 { - dn.showDelBuf() + dn.showDelBuf(segIDs, fgMsg.timeRange.timestampMax) } // handle flush diff --git a/internal/datanode/flow_graph_delete_node_test.go b/internal/datanode/flow_graph_delete_node_test.go index b16055ced9..9eda476f27 100644 --- a/internal/datanode/flow_graph_delete_node_test.go +++ b/internal/datanode/flow_graph_delete_node_test.go @@ -31,6 +31,7 @@ import ( "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/util/flowgraph" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) var deleteNodeTestDir = "/tmp/milvus_test/deleteNode" @@ -426,3 +427,44 @@ func TestFlowGraphDeleteNode_Operate(t *testing.T) { }) }) } + +func TestFlowGraphDeleteNode_showDelBuf(t *testing.T) { + cm := storage.NewLocalChunkManager(storage.RootPath(deleteNodeTestDir)) + defer cm.RemoveWithPrefix("") + + fm := NewRendezvousFlushManager(NewAllocatorFactory(), cm, &mockReplica{}, func(*segmentFlushPack) {}, emptyFlushAndDropFunc) + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + chanName := "datanode-test-FlowGraphDeletenode-showDelBuf" + testPath := "/test/datanode/root/meta" + assert.NoError(t, clearEtcd(testPath)) + Params.EtcdCfg.MetaRootPath = testPath + Params.DataNodeCfg.DeleteBinlogRootPath = testPath + + c := &nodeConfig{ + replica: &mockReplica{}, + allocator: NewAllocatorFactory(), + vChannelName: chanName, + } + delNode, err := newDeleteNode(ctx, fm, make(chan string, 1), c) + require.NoError(t, err) + + tests := []struct { + seg UniqueID + numRows int64 + }{ + {111, 10}, + {112, 10}, + {113, 1}, + } + + for _, test := range tests { + delBuf := newDelDataBuf() + delBuf.updateSize(test.numRows) + delNode.delBuf.Store(test.seg, delBuf) + } + + delNode.showDelBuf([]UniqueID{111, 112, 113}, 100) +}