diff --git a/internal/querynode/flow_graph_delete_node.go b/internal/querynode/flow_graph_delete_node.go index 9589ab2740..16be978d57 100644 --- a/internal/querynode/flow_graph_delete_node.go +++ b/internal/querynode/flow_graph_delete_node.go @@ -40,7 +40,8 @@ var newVarCharPrimaryKey = storage.NewVarCharPrimaryKey // deleteNode is the one of nodes in delta flow graph type deleteNode struct { baseNode - metaReplica ReplicaInterface // historical + collectionID UniqueID + metaReplica ReplicaInterface // historical } // Name returns the name of deleteNode @@ -82,6 +83,14 @@ func (dNode *deleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { msg.SetTraceCtx(ctx) } + collection, err := dNode.metaReplica.getCollectionByID(dNode.collectionID) + if err != nil { + // QueryNode should add collection before start flow graph + panic(fmt.Errorf("%s getCollectionByID failed, collectionID = %d", dNode.Name(), dNode.collectionID)) + } + collection.RLock() + defer collection.RUnlock() + // 1. filter segment by bloom filter for i, delMsg := range dMsg.deleteMessages { traceID, _, _ := trace.InfoFromSpan(spans[i]) @@ -173,7 +182,7 @@ func (dNode *deleteNode) delete(deleteData *deleteData, segmentID UniqueID, wg * } // newDeleteNode returns a new deleteNode -func newDeleteNode(metaReplica ReplicaInterface) *deleteNode { +func newDeleteNode(metaReplica ReplicaInterface, collectionID UniqueID) *deleteNode { maxQueueLength := Params.QueryNodeCfg.FlowGraphMaxQueueLength maxParallelism := Params.QueryNodeCfg.FlowGraphMaxParallelism @@ -182,7 +191,8 @@ func newDeleteNode(metaReplica ReplicaInterface) *deleteNode { baseNode.SetMaxParallelism(maxParallelism) return &deleteNode{ - baseNode: baseNode, - metaReplica: metaReplica, + baseNode: baseNode, + collectionID: collectionID, + metaReplica: metaReplica, } } diff --git a/internal/querynode/flow_graph_delete_node_test.go b/internal/querynode/flow_graph_delete_node_test.go index 9c347a304b..6b84013954 100644 --- a/internal/querynode/flow_graph_delete_node_test.go +++ b/internal/querynode/flow_graph_delete_node_test.go @@ -32,7 +32,7 @@ func TestFlowGraphDeleteNode_delete(t *testing.T) { t.Run("test delete", func(t *testing.T) { historical, err := genSimpleReplica() assert.NoError(t, err) - deleteNode := newDeleteNode(historical) + deleteNode := newDeleteNode(historical, defaultCollectionID) err = historical.addSegment(defaultSegmentID, defaultPartitionID, @@ -53,7 +53,7 @@ func TestFlowGraphDeleteNode_delete(t *testing.T) { t.Run("test segment delete error", func(t *testing.T) { historical, err := genSimpleReplica() assert.NoError(t, err) - deleteNode := newDeleteNode(historical) + deleteNode := newDeleteNode(historical, defaultCollectionID) err = historical.addSegment(defaultSegmentID, defaultPartitionID, @@ -75,7 +75,7 @@ func TestFlowGraphDeleteNode_delete(t *testing.T) { t.Run("test no target segment", func(t *testing.T) { historical, err := genSimpleReplica() assert.NoError(t, err) - deleteNode := newDeleteNode(historical) + deleteNode := newDeleteNode(historical, defaultCollectionID) wg := &sync.WaitGroup{} wg.Add(1) err = deleteNode.delete(nil, defaultSegmentID, wg) @@ -85,7 +85,7 @@ func TestFlowGraphDeleteNode_delete(t *testing.T) { t.Run("test invalid segmentType", func(t *testing.T) { historical, err := genSimpleReplica() assert.NoError(t, err) - deleteNode := newDeleteNode(historical) + deleteNode := newDeleteNode(historical, defaultCollectionID) err = historical.addSegment(defaultSegmentID, defaultPartitionID, @@ -105,7 +105,7 @@ func TestFlowGraphDeleteNode_operate(t *testing.T) { t.Run("test operate", func(t *testing.T) { historical, err := genSimpleReplica() assert.NoError(t, err) - deleteNode := newDeleteNode(historical) + deleteNode := newDeleteNode(historical, defaultCollectionID) err = historical.addSegment(defaultSegmentID, defaultPartitionID, @@ -139,7 +139,7 @@ func TestFlowGraphDeleteNode_operate(t *testing.T) { t.Run("test invalid partitionID", func(t *testing.T) { historical, err := genSimpleReplica() assert.NoError(t, err) - deleteNode := newDeleteNode(historical) + deleteNode := newDeleteNode(historical, defaultCollectionID) err = historical.addSegment(defaultSegmentID, defaultPartitionID, @@ -163,7 +163,7 @@ func TestFlowGraphDeleteNode_operate(t *testing.T) { t.Run("test collection partition not exist", func(t *testing.T) { historical, err := genSimpleReplica() assert.NoError(t, err) - deleteNode := newDeleteNode(historical) + deleteNode := newDeleteNode(historical, defaultCollectionID) err = historical.addSegment(defaultSegmentID, defaultPartitionID, @@ -190,7 +190,7 @@ func TestFlowGraphDeleteNode_operate(t *testing.T) { t.Run("test partition not exist", func(t *testing.T) { historical, err := genSimpleReplica() assert.NoError(t, err) - deleteNode := newDeleteNode(historical) + deleteNode := newDeleteNode(historical, defaultCollectionID) err = historical.addSegment(defaultSegmentID, defaultPartitionID, @@ -216,7 +216,7 @@ func TestFlowGraphDeleteNode_operate(t *testing.T) { t.Run("test invalid input length", func(t *testing.T) { historical, err := genSimpleReplica() assert.NoError(t, err) - deleteNode := newDeleteNode(historical) + deleteNode := newDeleteNode(historical, defaultCollectionID) err = historical.addSegment(defaultSegmentID, defaultPartitionID, diff --git a/internal/querynode/flow_graph_filter_delete_node.go b/internal/querynode/flow_graph_filter_delete_node.go index 5f87eb29eb..f5ad6641c6 100644 --- a/internal/querynode/flow_graph_filter_delete_node.go +++ b/internal/querynode/flow_graph_filter_delete_node.go @@ -78,10 +78,18 @@ func (fddNode *filterDeleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { }, } + collection, err := fddNode.metaReplica.getCollectionByID(fddNode.collectionID) + if err != nil { + // QueryNode should add collection before start flow graph + panic(fmt.Errorf("%s getCollectionByID failed, collectionID = %d", fddNode.Name(), fddNode.collectionID)) + } + collection.RLock() + defer collection.RUnlock() + for _, msg := range msgStreamMsg.TsMessages() { switch msg.Type() { case commonpb.MsgType_Delete: - resMsg, err := fddNode.filterInvalidDeleteMessage(msg.(*msgstream.DeleteMsg)) + resMsg, err := fddNode.filterInvalidDeleteMessage(msg.(*msgstream.DeleteMsg), collection.getLoadType()) if err != nil { // error occurs when missing meta info or data is misaligned, should not happen err = fmt.Errorf("filterInvalidDeleteMessage failed, err = %s", err) @@ -103,7 +111,7 @@ func (fddNode *filterDeleteNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { } // filterInvalidDeleteMessage would filter invalid delete messages -func (fddNode *filterDeleteNode) filterInvalidDeleteMessage(msg *msgstream.DeleteMsg) (*msgstream.DeleteMsg, error) { +func (fddNode *filterDeleteNode) filterInvalidDeleteMessage(msg *msgstream.DeleteMsg, loadType loadType) (*msgstream.DeleteMsg, error) { if err := msg.CheckAligned(); err != nil { return nil, fmt.Errorf("CheckAligned failed, err = %s", err) } @@ -123,13 +131,7 @@ func (fddNode *filterDeleteNode) filterInvalidDeleteMessage(msg *msgstream.Delet return nil, nil } - // check if collection exists - col, err := fddNode.metaReplica.getCollectionByID(msg.CollectionID) - if err != nil { - // QueryNode should add collection before start flow graph - return nil, fmt.Errorf("filter invalid delete message, collection does not exist, collectionID = %d", msg.CollectionID) - } - if col.getLoadType() == loadTypePartition { + if loadType == loadTypePartition { if !fddNode.metaReplica.hasPartition(msg.PartitionID) { // filter out msg which not belongs to the loaded partitions return nil, nil diff --git a/internal/querynode/flow_graph_filter_delete_node_test.go b/internal/querynode/flow_graph_filter_delete_node_test.go index ee01b2430b..82e18ceb62 100644 --- a/internal/querynode/flow_graph_filter_delete_node_test.go +++ b/internal/querynode/flow_graph_filter_delete_node_test.go @@ -48,29 +48,17 @@ func TestFlowGraphFilterDeleteNode_filterInvalidDeleteMessage(t *testing.T) { msg := genDeleteMsg(defaultCollectionID, schemapb.DataType_Int64, defaultDelLength) fg, err := getFilterDeleteNode() assert.NoError(t, err) - res, err := fg.filterInvalidDeleteMessage(msg) + res, err := fg.filterInvalidDeleteMessage(msg, loadTypeCollection) assert.NoError(t, err) assert.NotNil(t, res) }) - t.Run("test delete no collection", func(t *testing.T) { - msg := genDeleteMsg(defaultCollectionID, schemapb.DataType_Int64, defaultDelLength) - msg.CollectionID = UniqueID(1003) - fg, err := getFilterDeleteNode() - assert.NoError(t, err) - fg.collectionID = UniqueID(1003) - res, err := fg.filterInvalidDeleteMessage(msg) - assert.Error(t, err) - assert.Nil(t, res) - fg.collectionID = defaultCollectionID - }) - t.Run("test delete not target collection", func(t *testing.T) { msg := genDeleteMsg(defaultCollectionID, schemapb.DataType_Int64, defaultDelLength) fg, err := getFilterDeleteNode() assert.NoError(t, err) fg.collectionID = UniqueID(1000) - res, err := fg.filterInvalidDeleteMessage(msg) + res, err := fg.filterInvalidDeleteMessage(msg, loadTypeCollection) assert.NoError(t, err) assert.Nil(t, res) }) @@ -83,11 +71,11 @@ func TestFlowGraphFilterDeleteNode_filterInvalidDeleteMessage(t *testing.T) { msg.Int64PrimaryKeys = make([]IntPrimaryKey, 0) msg.PrimaryKeys = &schemapb.IDs{} msg.NumRows = 0 - res, err := fg.filterInvalidDeleteMessage(msg) + res, err := fg.filterInvalidDeleteMessage(msg, loadTypeCollection) assert.NoError(t, err) assert.Nil(t, res) msg.PrimaryKeys = storage.ParsePrimaryKeys2IDs([]primaryKey{}) - res, err = fg.filterInvalidDeleteMessage(msg) + res, err = fg.filterInvalidDeleteMessage(msg, loadTypeCollection) assert.NoError(t, err) assert.Nil(t, res) }) @@ -96,13 +84,10 @@ func TestFlowGraphFilterDeleteNode_filterInvalidDeleteMessage(t *testing.T) { msg := genDeleteMsg(defaultCollectionID, schemapb.DataType_Int64, defaultDelLength) fg, err := getFilterDeleteNode() assert.NoError(t, err) - col, err := fg.metaReplica.getCollectionByID(defaultCollectionID) - assert.NoError(t, err) - col.setLoadType(loadTypePartition) err = fg.metaReplica.removePartition(defaultPartitionID) assert.NoError(t, err) - res, err := fg.filterInvalidDeleteMessage(msg) + res, err := fg.filterInvalidDeleteMessage(msg, loadTypePartition) assert.NoError(t, err) assert.Nil(t, res) }) diff --git a/internal/querynode/flow_graph_filter_dm_node.go b/internal/querynode/flow_graph_filter_dm_node.go index c88565e766..6bb21e80bd 100644 --- a/internal/querynode/flow_graph_filter_dm_node.go +++ b/internal/querynode/flow_graph_filter_dm_node.go @@ -79,12 +79,20 @@ func (fdmNode *filterDmNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { }, } + collection, err := fdmNode.metaReplica.getCollectionByID(fdmNode.collectionID) + if err != nil { + // QueryNode should add collection before start flow graph + panic(fmt.Errorf("%s getCollectionByID failed, collectionID = %d", fdmNode.Name(), fdmNode.collectionID)) + } + collection.RLock() + defer collection.RUnlock() + for i, msg := range msgStreamMsg.TsMessages() { traceID, _, _ := trace.InfoFromSpan(spans[i]) log.Debug("Filter invalid message in QueryNode", zap.String("traceID", traceID)) switch msg.Type() { case commonpb.MsgType_Insert: - resMsg, err := fdmNode.filterInvalidInsertMessage(msg.(*msgstream.InsertMsg)) + resMsg, err := fdmNode.filterInvalidInsertMessage(msg.(*msgstream.InsertMsg), collection.getLoadType()) if err != nil { // error occurs when missing meta info or data is misaligned, should not happen err = fmt.Errorf("filterInvalidInsertMessage failed, err = %s", err) @@ -95,7 +103,7 @@ func (fdmNode *filterDmNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { iMsg.insertMessages = append(iMsg.insertMessages, resMsg) } case commonpb.MsgType_Delete: - resMsg, err := fdmNode.filterInvalidDeleteMessage(msg.(*msgstream.DeleteMsg)) + resMsg, err := fdmNode.filterInvalidDeleteMessage(msg.(*msgstream.DeleteMsg), collection.getLoadType()) if err != nil { // error occurs when missing meta info or data is misaligned, should not happen err = fmt.Errorf("filterInvalidDeleteMessage failed, err = %s", err) @@ -118,7 +126,7 @@ func (fdmNode *filterDmNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { } // filterInvalidDeleteMessage would filter out invalid delete messages -func (fdmNode *filterDmNode) filterInvalidDeleteMessage(msg *msgstream.DeleteMsg) (*msgstream.DeleteMsg, error) { +func (fdmNode *filterDmNode) filterInvalidDeleteMessage(msg *msgstream.DeleteMsg, loadType loadType) (*msgstream.DeleteMsg, error) { if err := msg.CheckAligned(); err != nil { return nil, fmt.Errorf("CheckAligned failed, err = %s", err) } @@ -139,13 +147,7 @@ func (fdmNode *filterDmNode) filterInvalidDeleteMessage(msg *msgstream.DeleteMsg return nil, nil } - // check if collection exist - col, err := fdmNode.metaReplica.getCollectionByID(msg.CollectionID) - if err != nil { - // QueryNode should add collection before start flow graph - return nil, fmt.Errorf("filter invalid delete message, collection does not exist, collectionID = %d", msg.CollectionID) - } - if col.getLoadType() == loadTypePartition { + if loadType == loadTypePartition { if !fdmNode.metaReplica.hasPartition(msg.PartitionID) { // filter out msg which not belongs to the loaded partitions return nil, nil @@ -156,7 +158,7 @@ func (fdmNode *filterDmNode) filterInvalidDeleteMessage(msg *msgstream.DeleteMsg } // filterInvalidInsertMessage would filter out invalid insert messages -func (fdmNode *filterDmNode) filterInvalidInsertMessage(msg *msgstream.InsertMsg) (*msgstream.InsertMsg, error) { +func (fdmNode *filterDmNode) filterInvalidInsertMessage(msg *msgstream.InsertMsg, loadType loadType) (*msgstream.InsertMsg, error) { if err := msg.CheckAligned(); err != nil { return nil, fmt.Errorf("CheckAligned failed, err = %s", err) } @@ -180,13 +182,7 @@ func (fdmNode *filterDmNode) filterInvalidInsertMessage(msg *msgstream.InsertMsg return nil, nil } - // check if collection exists - col, err := fdmNode.metaReplica.getCollectionByID(msg.CollectionID) - if err != nil { - // QueryNode should add collection before start flow graph - return nil, fmt.Errorf("filter invalid insert message, collection does not exist, collectionID = %d", msg.CollectionID) - } - if col.getLoadType() == loadTypePartition { + if loadType == loadTypePartition { if !fdmNode.metaReplica.hasPartition(msg.PartitionID) { // filter out msg which not belongs to the loaded partitions return nil, nil diff --git a/internal/querynode/flow_graph_filter_dm_node_test.go b/internal/querynode/flow_graph_filter_dm_node_test.go index ea35271fdf..b12261c5db 100644 --- a/internal/querynode/flow_graph_filter_dm_node_test.go +++ b/internal/querynode/flow_graph_filter_dm_node_test.go @@ -53,7 +53,7 @@ func TestFlowGraphFilterDmNode_filterInvalidInsertMessage(t *testing.T) { assert.NoError(t, err) fg, err := getFilterDMNode() assert.NoError(t, err) - res, err := fg.filterInvalidInsertMessage(msg) + res, err := fg.filterInvalidInsertMessage(msg, loadTypeCollection) assert.NoError(t, err) assert.NotNil(t, res) }) @@ -65,7 +65,7 @@ func TestFlowGraphFilterDmNode_filterInvalidInsertMessage(t *testing.T) { fg, err := getFilterDMNode() assert.NoError(t, err) fg.collectionID = UniqueID(1000) - res, err := fg.filterInvalidInsertMessage(msg) + res, err := fg.filterInvalidInsertMessage(msg, loadTypeCollection) assert.Error(t, err) assert.Nil(t, res) fg.collectionID = defaultCollectionID @@ -78,11 +78,7 @@ func TestFlowGraphFilterDmNode_filterInvalidInsertMessage(t *testing.T) { fg, err := getFilterDMNode() assert.NoError(t, err) - col, err := fg.metaReplica.getCollectionByID(defaultCollectionID) - assert.NoError(t, err) - col.setLoadType(loadTypePartition) - - res, err := fg.filterInvalidInsertMessage(msg) + res, err := fg.filterInvalidInsertMessage(msg, loadTypePartition) assert.NoError(t, err) assert.Nil(t, res) }) @@ -93,7 +89,7 @@ func TestFlowGraphFilterDmNode_filterInvalidInsertMessage(t *testing.T) { fg, err := getFilterDMNode() assert.NoError(t, err) fg.collectionID = UniqueID(1000) - res, err := fg.filterInvalidInsertMessage(msg) + res, err := fg.filterInvalidInsertMessage(msg, loadTypeCollection) assert.NoError(t, err) assert.Nil(t, res) }) @@ -104,7 +100,7 @@ func TestFlowGraphFilterDmNode_filterInvalidInsertMessage(t *testing.T) { fg, err := getFilterDMNode() assert.NoError(t, err) fg.metaReplica.removeExcludedSegments(defaultCollectionID) - res, err := fg.filterInvalidInsertMessage(msg) + res, err := fg.filterInvalidInsertMessage(msg, loadTypeCollection) assert.Error(t, err) assert.Nil(t, res) }) @@ -124,7 +120,7 @@ func TestFlowGraphFilterDmNode_filterInvalidInsertMessage(t *testing.T) { }, }, }) - res, err := fg.filterInvalidInsertMessage(msg) + res, err := fg.filterInvalidInsertMessage(msg, loadTypeCollection) assert.NoError(t, err) assert.Nil(t, res) }) @@ -135,7 +131,7 @@ func TestFlowGraphFilterDmNode_filterInvalidInsertMessage(t *testing.T) { fg, err := getFilterDMNode() assert.NoError(t, err) msg.Timestamps = make([]Timestamp, 0) - res, err := fg.filterInvalidInsertMessage(msg) + res, err := fg.filterInvalidInsertMessage(msg, loadTypeCollection) assert.Error(t, err) assert.Nil(t, res) }) @@ -150,7 +146,7 @@ func TestFlowGraphFilterDmNode_filterInvalidInsertMessage(t *testing.T) { msg.RowData = make([]*commonpb.Blob, 0) msg.NumRows = 0 msg.FieldsData = nil - res, err := fg.filterInvalidInsertMessage(msg) + res, err := fg.filterInvalidInsertMessage(msg, loadTypeCollection) assert.NoError(t, err) assert.Nil(t, res) }) @@ -161,34 +157,18 @@ func TestFlowGraphFilterDmNode_filterInvalidDeleteMessage(t *testing.T) { msg := genDeleteMsg(defaultCollectionID, schemapb.DataType_Int64, defaultDelLength) fg, err := getFilterDMNode() assert.NoError(t, err) - res, err := fg.filterInvalidDeleteMessage(msg) + res, err := fg.filterInvalidDeleteMessage(msg, loadTypeCollection) assert.NoError(t, err) assert.NotNil(t, res) }) - t.Run("test delete no collection", func(t *testing.T) { - msg := genDeleteMsg(defaultCollectionID, schemapb.DataType_Int64, defaultDelLength) - msg.CollectionID = UniqueID(1003) - fg, err := getFilterDMNode() - assert.NoError(t, err) - fg.collectionID = UniqueID(1003) - res, err := fg.filterInvalidDeleteMessage(msg) - assert.Error(t, err) - assert.Nil(t, res) - fg.collectionID = defaultCollectionID - }) - t.Run("test delete no partition", func(t *testing.T) { msg := genDeleteMsg(defaultCollectionID, schemapb.DataType_Int64, defaultDelLength) msg.PartitionID = UniqueID(1000) fg, err := getFilterDMNode() assert.NoError(t, err) - col, err := fg.metaReplica.getCollectionByID(defaultCollectionID) - assert.NoError(t, err) - col.setLoadType(loadTypePartition) - - res, err := fg.filterInvalidDeleteMessage(msg) + res, err := fg.filterInvalidDeleteMessage(msg, loadTypePartition) assert.NoError(t, err) assert.Nil(t, res) }) @@ -198,7 +178,7 @@ func TestFlowGraphFilterDmNode_filterInvalidDeleteMessage(t *testing.T) { fg, err := getFilterDMNode() assert.NoError(t, err) fg.collectionID = UniqueID(1000) - res, err := fg.filterInvalidDeleteMessage(msg) + res, err := fg.filterInvalidDeleteMessage(msg, loadTypeCollection) assert.NoError(t, err) assert.Nil(t, res) }) @@ -208,7 +188,7 @@ func TestFlowGraphFilterDmNode_filterInvalidDeleteMessage(t *testing.T) { fg, err := getFilterDMNode() assert.NoError(t, err) msg.Timestamps = make([]Timestamp, 0) - res, err := fg.filterInvalidDeleteMessage(msg) + res, err := fg.filterInvalidDeleteMessage(msg, loadTypeCollection) assert.Error(t, err) assert.Nil(t, res) }) @@ -221,11 +201,11 @@ func TestFlowGraphFilterDmNode_filterInvalidDeleteMessage(t *testing.T) { msg.NumRows = 0 msg.Int64PrimaryKeys = make([]IntPrimaryKey, 0) msg.PrimaryKeys = &schemapb.IDs{} - res, err := fg.filterInvalidDeleteMessage(msg) + res, err := fg.filterInvalidDeleteMessage(msg, loadTypeCollection) assert.NoError(t, err) assert.Nil(t, res) msg.PrimaryKeys = storage.ParsePrimaryKeys2IDs([]primaryKey{}) - res, err = fg.filterInvalidDeleteMessage(msg) + res, err = fg.filterInvalidDeleteMessage(msg, loadTypeCollection) assert.NoError(t, err) assert.Nil(t, res) }) diff --git a/internal/querynode/flow_graph_insert_node.go b/internal/querynode/flow_graph_insert_node.go index b9f3448620..dd1376ddf4 100644 --- a/internal/querynode/flow_graph_insert_node.go +++ b/internal/querynode/flow_graph_insert_node.go @@ -43,7 +43,8 @@ import ( // insertNode is one of the nodes in query flow graph type insertNode struct { baseNode - metaReplica ReplicaInterface // streaming + collectionID UniqueID + metaReplica ReplicaInterface // streaming } // insertData stores the valid insert data @@ -103,6 +104,13 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { msg.SetTraceCtx(ctx) } + collection, err := iNode.metaReplica.getCollectionByID(iNode.collectionID) + if err != nil { + // QueryNode should add collection before start flow graph + panic(fmt.Errorf("%s getCollectionByID failed, collectionID = %d", iNode.Name(), iNode.collectionID)) + } + collection.RLock() + defer collection.RUnlock() // 1. hash insertMessages to insertData // sort timestamps ensures that the data in iData.insertRecords is sorted in ascending order of timestamp // avoiding re-sorting in segCore, which will need data copying @@ -111,14 +119,7 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { }) for _, insertMsg := range iMsg.insertMessages { // if loadType is loadCollection, check if partition exists, if not, create partition - col, err := iNode.metaReplica.getCollectionByID(insertMsg.CollectionID) - if err != nil { - // should not happen, QueryNode should create collection before start flow graph - err = fmt.Errorf("insertNode getCollectionByID failed, err = %s", err) - log.Error(err.Error()) - panic(err) - } - if col.getLoadType() == loadTypeCollection { + if collection.getLoadType() == loadTypeCollection { err = iNode.metaReplica.addPartition(insertMsg.CollectionID, insertMsg.PartitionID) if err != nil { // error occurs only when collection cannot be found, should not happen @@ -144,7 +145,7 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { } } - insertRecord, err := storage.TransferInsertMsgToInsertRecord(col.schema, insertMsg) + insertRecord, err := storage.TransferInsertMsgToInsertRecord(collection.schema, insertMsg) if err != nil { // occurs only when schema doesn't have dim param, this should not happen err = fmt.Errorf("failed to transfer msgStream.insertMsg to storage.InsertRecord, err = %s", err) @@ -503,7 +504,7 @@ func getPKsFromColumnBasedInsertMsg(msg *msgstream.InsertMsg, schema *schemapb.C } // newInsertNode returns a new insertNode -func newInsertNode(metaReplica ReplicaInterface) *insertNode { +func newInsertNode(metaReplica ReplicaInterface, collectionID UniqueID) *insertNode { maxQueueLength := Params.QueryNodeCfg.FlowGraphMaxQueueLength maxParallelism := Params.QueryNodeCfg.FlowGraphMaxParallelism @@ -512,7 +513,8 @@ func newInsertNode(metaReplica ReplicaInterface) *insertNode { baseNode.SetMaxParallelism(maxParallelism) return &insertNode{ - baseNode: baseNode, - metaReplica: metaReplica, + baseNode: baseNode, + collectionID: collectionID, + metaReplica: metaReplica, } } diff --git a/internal/querynode/flow_graph_insert_node_test.go b/internal/querynode/flow_graph_insert_node_test.go index 686e2c79dd..99f8a64e40 100644 --- a/internal/querynode/flow_graph_insert_node_test.go +++ b/internal/querynode/flow_graph_insert_node_test.go @@ -48,7 +48,7 @@ func getInsertNode() (*insertNode, error) { return nil, err } - return newInsertNode(streaming), nil + return newInsertNode(streaming, defaultCollectionID), nil } func genFlowGraphInsertData(schema *schemapb.CollectionSchema, numRows int) (*insertData, error) { @@ -128,7 +128,7 @@ func TestFlowGraphInsertNode_insert(t *testing.T) { t.Run("test no target segment", func(t *testing.T) { streaming, err := genSimpleReplica() assert.NoError(t, err) - insertNode := newInsertNode(streaming) + insertNode := newInsertNode(streaming, defaultCollectionID) wg := &sync.WaitGroup{} wg.Add(1) err = insertNode.insert(nil, defaultSegmentID, wg) @@ -203,7 +203,7 @@ func TestFlowGraphInsertNode_delete(t *testing.T) { t.Run("test no target segment", func(t *testing.T) { streaming, err := genSimpleReplica() assert.NoError(t, err) - insertNode := newInsertNode(streaming) + insertNode := newInsertNode(streaming, defaultCollectionID) wg := &sync.WaitGroup{} wg.Add(1) err = insertNode.delete(nil, defaultSegmentID, wg) @@ -338,7 +338,7 @@ func TestFlowGraphInsertNode_operate(t *testing.T) { t.Run("test getCollectionByID failed", func(t *testing.T) { streaming, err := genSimpleReplica() assert.NoError(t, err) - insertNode := newInsertNode(streaming) + insertNode := newInsertNode(streaming, defaultCollectionID) msg := []flowgraph.Msg{genInsertMsg()} diff --git a/internal/querynode/flow_graph_query_node.go b/internal/querynode/flow_graph_query_node.go index 4b3caee824..6da87dc64a 100644 --- a/internal/querynode/flow_graph_query_node.go +++ b/internal/querynode/flow_graph_query_node.go @@ -65,7 +65,7 @@ func newQueryNodeFlowGraph(ctx context.Context, return nil, err } var filterDmNode node = newFilteredDmNode(metaReplica, collectionID) - var insertNode node = newInsertNode(metaReplica) + var insertNode node = newInsertNode(metaReplica, collectionID) var serviceTimeNode node = newServiceTimeNode(tSafeReplica, collectionID, channel) q.flowGraph.AddNode(dmStreamNode) @@ -135,7 +135,7 @@ func newQueryNodeDeltaFlowGraph(ctx context.Context, return nil, err } var filterDeleteNode node = newFilteredDeleteNode(metaReplica, collectionID) - var deleteNode node = newDeleteNode(metaReplica) + var deleteNode node = newDeleteNode(metaReplica, collectionID) var serviceTimeNode node = newServiceTimeNode(tSafeReplica, collectionID, channel) q.flowGraph.AddNode(dmStreamNode) diff --git a/internal/querynode/segment.go b/internal/querynode/segment.go index 666e224da0..16b18f4f73 100644 --- a/internal/querynode/segment.go +++ b/internal/querynode/segment.go @@ -76,7 +76,6 @@ type IndexedFieldInfo struct { // Segment is a wrapper of the underlying C-structure segment. type Segment struct { - segPtrMu sync.RWMutex // guards segmentPtr segmentPtr C.CSegmentInterface segmentID UniqueID @@ -219,15 +218,11 @@ func deleteSegment(segment *Segment) { return } - segment.segPtrMu.Lock() - defer segment.segPtrMu.Unlock() cPtr := segment.segmentPtr C.DeleteSegment(cPtr) segment.segmentPtr = nil log.Info("delete segment from memory", zap.Int64("collectionID", segment.collectionID), zap.Int64("partitionID", segment.partitionID), zap.Int64("segmentID", segment.ID())) - - segment = nil } func (s *Segment) getRowCount() int64 { @@ -235,8 +230,6 @@ func (s *Segment) getRowCount() int64 { long int getRowCount(CSegmentInterface c_segment); */ - s.segPtrMu.RLock() - defer s.segPtrMu.RUnlock() if s.segmentPtr == nil { return -1 } @@ -249,8 +242,6 @@ func (s *Segment) getDeletedCount() int64 { long int getDeletedCount(CSegmentInterface c_segment); */ - s.segPtrMu.RLock() - defer s.segPtrMu.RUnlock() if s.segmentPtr == nil { return -1 } @@ -263,8 +254,6 @@ func (s *Segment) getMemSize() int64 { long int GetMemoryUsageInBytes(CSegmentInterface c_segment); */ - s.segPtrMu.RLock() - defer s.segPtrMu.RUnlock() if s.segmentPtr == nil { return -1 } @@ -283,8 +272,6 @@ func (s *Segment) search(searchReq *searchRequest) (*SearchResult, error) { long int* result_ids, float* result_distances); */ - s.segPtrMu.RLock() - defer s.segPtrMu.RUnlock() if s.segmentPtr == nil { return nil, errors.New("null seg core pointer") } @@ -317,8 +304,6 @@ func HandleCProto(cRes *C.CProto, msg proto.Message) error { } func (s *Segment) retrieve(plan *RetrievePlan) (*segcorepb.RetrieveResults, error) { - s.segPtrMu.RLock() - defer s.segPtrMu.RUnlock() if s.segmentPtr == nil { return nil, errors.New("null seg core pointer") } @@ -566,8 +551,6 @@ func (s *Segment) segmentPreInsert(numOfRecords int) (int64, error) { long int PreInsert(CSegmentInterface c_segment, long int size); */ - s.segPtrMu.RLock() - defer s.segPtrMu.RUnlock() // thread safe guaranteed by segCore, use RLock if s.segmentType != segmentTypeGrowing { return 0, nil } @@ -585,16 +568,12 @@ func (s *Segment) segmentPreDelete(numOfRecords int) int64 { long int PreDelete(CSegmentInterface c_segment, long int size); */ - s.segPtrMu.RLock() - defer s.segPtrMu.RUnlock() // thread safe guaranteed by segCore, use RLock var offset = C.PreDelete(s.segmentPtr, C.int64_t(int64(numOfRecords))) return int64(offset) } func (s *Segment) segmentInsert(offset int64, entityIDs []UniqueID, timestamps []Timestamp, record *segcorepb.InsertRecord) error { - s.segPtrMu.RLock() - defer s.segPtrMu.RUnlock() // thread safe guaranteed by segCore, use RLock if s.segmentType != segmentTypeGrowing { return fmt.Errorf("unexpected segmentType when segmentInsert, segmentType = %s", s.segmentType.String()) } @@ -642,8 +621,6 @@ func (s *Segment) segmentDelete(offset int64, entityIDs []primaryKey, timestamps return fmt.Errorf("empty pks to delete") } - s.segPtrMu.RLock() - defer s.segPtrMu.RUnlock() // thread safe guaranteed by segCore, use RLock if s.segmentPtr == nil { return errors.New("null seg core pointer") } @@ -702,8 +679,6 @@ func (s *Segment) segmentLoadFieldData(fieldID int64, rowCount int64, data *sche CStatus LoadFieldData(CSegmentInterface c_segment, CLoadFieldDataInfo load_field_data_info); */ - s.segPtrMu.RLock() - defer s.segPtrMu.RUnlock() // thread safe guaranteed by segCore, use RLock if s.segmentPtr == nil { return errors.New("null seg core pointer") } @@ -738,8 +713,6 @@ func (s *Segment) segmentLoadFieldData(fieldID int64, rowCount int64, data *sche } func (s *Segment) segmentLoadDeletedRecord(primaryKeys []primaryKey, timestamps []Timestamp, rowCount int64) error { - s.segPtrMu.RLock() - defer s.segPtrMu.RUnlock() // thread safe guaranteed by segCore, use RLock if s.segmentPtr == nil { return errors.New("null seg core pointer") } @@ -812,8 +785,6 @@ func (s *Segment) segmentLoadIndexData(bytesIndex [][]byte, indexInfo *querypb.F return err } - s.segPtrMu.RLock() - defer s.segPtrMu.RUnlock() // thread safe guaranteed by segCore, use RLock if s.segmentPtr == nil { return errors.New("null seg core pointer") } diff --git a/internal/querynode/segment_test.go b/internal/querynode/segment_test.go index 1e3d1de2f4..55d520f5d6 100644 --- a/internal/querynode/segment_test.go +++ b/internal/querynode/segment_test.go @@ -21,7 +21,6 @@ import ( "fmt" "log" "math" - "sync" "testing" "github.com/milvus-io/milvus/internal/proto/commonpb" @@ -530,53 +529,6 @@ func TestSegment_segmentLoadFieldData(t *testing.T) { assert.NoError(t, err) } -func TestSegment_ConcurrentOperation(t *testing.T) { - const N = 16 - var ages = []int32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16} - - ageData := &schemapb.FieldData{ - Type: simpleInt32Field.dataType, - FieldId: simpleInt32Field.id, - Field: &schemapb.FieldData_Scalars{ - Scalars: &schemapb.ScalarField{ - Data: &schemapb.ScalarField_IntData{ - IntData: &schemapb.IntArray{ - Data: ages, - }, - }, - }, - }, - } - - collectionID := UniqueID(0) - partitionID := UniqueID(0) - schema := genTestCollectionSchema() - collection := newCollection(collectionID, schema) - assert.Equal(t, collection.ID(), collectionID) - - wg := sync.WaitGroup{} - for i := 0; i < 100; i++ { - segmentID := UniqueID(i) - segment, err := newSegment(collection, segmentID, partitionID, collectionID, "", segmentTypeSealed) - assert.Equal(t, segmentID, segment.segmentID) - assert.Equal(t, partitionID, segment.partitionID) - assert.Nil(t, err) - - wg.Add(2) - go func() { - deleteSegment(segment) - wg.Done() - }() - go func() { - // segmentLoadFieldData result error may be nil or not, we just expected this test would not crash. - _ = segment.segmentLoadFieldData(simpleInt32Field.id, N, ageData) - wg.Done() - }() - } - wg.Wait() - deleteCollection(collection) -} - func TestSegment_indexInfo(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel()