diff --git a/internal/datanode/collection_replica.go b/internal/datanode/collection_replica.go deleted file mode 100644 index aa86aa78b6..0000000000 --- a/internal/datanode/collection_replica.go +++ /dev/null @@ -1,285 +0,0 @@ -// Copyright (C) 2019-2020 Zilliz. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software distributed under the License -// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -// or implied. See the License for the specific language governing permissions and limitations under the License. - -package datanode - -import ( - "context" - "fmt" - "sync" - "sync/atomic" - - "go.uber.org/zap" - - "github.com/milvus-io/milvus/internal/log" - "github.com/milvus-io/milvus/internal/proto/datapb" - "github.com/milvus-io/milvus/internal/proto/internalpb" - "github.com/milvus-io/milvus/internal/proto/schemapb" - "github.com/milvus-io/milvus/internal/types" -) - -type Replica interface { - getCollectionID() UniqueID - getCollectionSchema(collectionID UniqueID, ts Timestamp) (*schemapb.CollectionSchema, error) - - // segment - addSegment(segmentID, collID, partitionID UniqueID, channelName string) error - removeSegment(segmentID UniqueID) error - hasSegment(segmentID UniqueID) bool - updateStatistics(segmentID UniqueID, numRows int64) error - getSegmentStatisticsUpdates(segmentID UniqueID) (*internalpb.SegmentStatisticsUpdates, error) - getSegmentByID(segmentID UniqueID) (*Segment, error) - getChannelName(segID UniqueID) (string, error) - setStartPositions(segmentID UniqueID, startPos []*internalpb.MsgPosition) error - setEndPositions(segmentID UniqueID, endPos []*internalpb.MsgPosition) error - getAllStartPositions() []*datapb.SegmentStartPosition - getSegmentPositions(segID UniqueID) ([]*internalpb.MsgPosition, []*internalpb.MsgPosition) - listOpenSegmentCheckPointAndNumRows(segs []UniqueID) (map[UniqueID]internalpb.MsgPosition, map[UniqueID]int64) -} - -// Segment is the data structure of segments in data node replica. -type Segment struct { - segmentID UniqueID - collectionID UniqueID - partitionID UniqueID - numRows int64 - memorySize int64 - isNew atomic.Value // bool - channelName string - field2Paths map[UniqueID][]string // fieldID to binlog paths, only auto-flushed paths will be buffered. -} - -// CollectionSegmentReplica is the data replication of persistent data in datanode. -// It implements `Replica` interface. 
-type CollectionSegmentReplica struct { - mu sync.RWMutex - collectionID UniqueID - collSchema *schemapb.CollectionSchema - - segments map[UniqueID]*Segment - metaService *metaService - - posMu sync.Mutex - startPositions map[UniqueID][]*internalpb.MsgPosition - endPositions map[UniqueID][]*internalpb.MsgPosition -} - -var _ Replica = &CollectionSegmentReplica{} - -func newReplica(ms types.MasterService, collectionID UniqueID) Replica { - metaService := newMetaService(ms, collectionID) - segments := make(map[UniqueID]*Segment) - - var replica Replica = &CollectionSegmentReplica{ - segments: segments, - collectionID: collectionID, - metaService: metaService, - startPositions: make(map[UniqueID][]*internalpb.MsgPosition), - endPositions: make(map[UniqueID][]*internalpb.MsgPosition), - } - return replica -} - -func (replica *CollectionSegmentReplica) getChannelName(segID UniqueID) (string, error) { - replica.mu.RLock() - defer replica.mu.RUnlock() - - seg, ok := replica.segments[segID] - if !ok { - return "", fmt.Errorf("Cannot find segment, id = %v", segID) - } - - return seg.channelName, nil -} - -func (replica *CollectionSegmentReplica) getSegmentByID(segmentID UniqueID) (*Segment, error) { - replica.mu.RLock() - defer replica.mu.RUnlock() - - if seg, ok := replica.segments[segmentID]; ok { - return seg, nil - } - return nil, fmt.Errorf("Cannot find segment, id = %v", segmentID) - -} - -// `addSegment` add a new segment into replica when data node see the segment -func (replica *CollectionSegmentReplica) addSegment( - segmentID UniqueID, - collID UniqueID, - partitionID UniqueID, - channelName string) error { - - replica.mu.Lock() - defer replica.mu.Unlock() - log.Debug("Add Segment", zap.Int64("Segment ID", segmentID)) - - seg := &Segment{ - segmentID: segmentID, - collectionID: collID, - partitionID: partitionID, - channelName: channelName, - field2Paths: make(map[UniqueID][]string), - } - - seg.isNew.Store(true) - - replica.segments[segmentID] = seg - return nil -} - -func (replica *CollectionSegmentReplica) getAllStartPositions() []*datapb.SegmentStartPosition { - replica.mu.RLock() - defer replica.mu.RUnlock() - - result := make([]*datapb.SegmentStartPosition, 0, len(replica.segments)) - for id, seg := range replica.segments { - - if seg.isNew.Load().(bool) { - - pos, ok := replica.startPositions[id] - if !ok { - log.Warn("Segment has no start positions") - continue - } - - result = append(result, &datapb.SegmentStartPosition{ - SegmentID: id, - StartPosition: pos[0], - }) - seg.isNew.Store(false) - } - - } - return result -} - -func (replica *CollectionSegmentReplica) removeSegment(segmentID UniqueID) error { - replica.mu.Lock() - delete(replica.segments, segmentID) - replica.mu.Unlock() - - replica.posMu.Lock() - delete(replica.startPositions, segmentID) - delete(replica.endPositions, segmentID) - replica.posMu.Unlock() - - return nil -} - -func (replica *CollectionSegmentReplica) hasSegment(segmentID UniqueID) bool { - replica.mu.RLock() - defer replica.mu.RUnlock() - - _, ok := replica.segments[segmentID] - return ok -} - -// `updateStatistics` updates the number of rows of a segment in replica. 
-func (replica *CollectionSegmentReplica) updateStatistics(segmentID UniqueID, numRows int64) error { - replica.mu.Lock() - defer replica.mu.Unlock() - - if seg, ok := replica.segments[segmentID]; ok { - log.Debug("updating segment", zap.Int64("Segment ID", segmentID), zap.Int64("numRows", numRows)) - seg.memorySize = 0 - seg.numRows += numRows - return nil - } - - return fmt.Errorf("There's no segment %v", segmentID) -} - -// `getSegmentStatisticsUpdates` gives current segment's statistics updates. -func (replica *CollectionSegmentReplica) getSegmentStatisticsUpdates(segmentID UniqueID) (*internalpb.SegmentStatisticsUpdates, error) { - replica.mu.Lock() - defer replica.mu.Unlock() - - if seg, ok := replica.segments[segmentID]; ok { - updates := &internalpb.SegmentStatisticsUpdates{ - SegmentID: segmentID, - MemorySize: seg.memorySize, - NumRows: seg.numRows, - } - - return updates, nil - } - return nil, fmt.Errorf("Error, there's no segment %v", segmentID) -} - -// --- collection --- - -func (replica *CollectionSegmentReplica) getCollectionID() UniqueID { - return replica.collectionID -} - -// getCollectionSchema will get collection schema from masterservice for a certain time. -// If you want the latest collection schema, ts should be 0 -func (replica *CollectionSegmentReplica) getCollectionSchema(collID UniqueID, ts Timestamp) (*schemapb.CollectionSchema, error) { - replica.mu.Lock() - defer replica.mu.Unlock() - - if !replica.validCollection(collID) { - log.Error("Illegal Collection for the replica") - return nil, fmt.Errorf("Not supported collection %v", collID) - } - - sch, err := replica.metaService.getCollectionSchema(context.Background(), collID, ts) - if err != nil { - log.Error("Grpc error", zap.Error(err)) - return nil, err - } - - return sch, nil -} - -func (replica *CollectionSegmentReplica) validCollection(collID UniqueID) bool { - return collID == replica.collectionID -} - -// setStartPositions set segment `Start Position` - means the `startPositions` from the MsgPack when segment is first found -func (replica *CollectionSegmentReplica) setStartPositions(segID UniqueID, startPositions []*internalpb.MsgPosition) error { - replica.posMu.Lock() - defer replica.posMu.Unlock() - replica.startPositions[segID] = startPositions - return nil -} - -// setEndPositions set segment `End Position` - means the `endPositions` from the MsgPack when segment need to be flushed -func (replica *CollectionSegmentReplica) setEndPositions(segID UniqueID, endPositions []*internalpb.MsgPosition) error { - replica.posMu.Lock() - defer replica.posMu.Unlock() - replica.endPositions[segID] = endPositions - return nil -} - -// getSegmentPositions returns stored segment start-end Positions -// To te Noted: start/end positions are NOT start&end position from one single MsgPack, they are from different MsgPack! 
-// see setStartPositions, setEndPositions comment -func (replica *CollectionSegmentReplica) getSegmentPositions(segID UniqueID) ([]*internalpb.MsgPosition, []*internalpb.MsgPosition) { - replica.posMu.Lock() - defer replica.posMu.Unlock() - startPos := replica.startPositions[segID] - endPos := replica.endPositions[segID] - return startPos, endPos -} - -func (replica *CollectionSegmentReplica) listOpenSegmentCheckPointAndNumRows(segs []UniqueID) (map[UniqueID]internalpb.MsgPosition, map[UniqueID]int64) { - replica.posMu.Lock() - defer replica.posMu.Unlock() - r1 := make(map[UniqueID]internalpb.MsgPosition) - r2 := make(map[UniqueID]int64) - for _, seg := range segs { - r1[seg] = *replica.endPositions[seg][0] - r2[seg] = replica.segments[seg].numRows - } - return r1, r2 -} diff --git a/internal/datanode/collection_replica_test.go b/internal/datanode/collection_replica_test.go deleted file mode 100644 index 56b992516d..0000000000 --- a/internal/datanode/collection_replica_test.go +++ /dev/null @@ -1,94 +0,0 @@ -// Copyright (C) 2019-2020 Zilliz. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software distributed under the License -// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express -// or implied. See the License for the specific language governing permissions and limitations under the License. - -package datanode - -import ( - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "github.com/milvus-io/milvus/internal/proto/internalpb" - "github.com/milvus-io/milvus/internal/types" -) - -func newCollectionSegmentReplica(ms types.MasterService, collectionID UniqueID) *CollectionSegmentReplica { - metaService := newMetaService(ms, collectionID) - segments := make(map[UniqueID]*Segment) - - replica := &CollectionSegmentReplica{ - segments: segments, - collectionID: collectionID, - metaService: metaService, - startPositions: make(map[UniqueID][]*internalpb.MsgPosition), - endPositions: make(map[UniqueID][]*internalpb.MsgPosition), - } - return replica -} - -func TestReplica_Collection(t *testing.T) { - // collID := UniqueID(100) -} - -func TestReplica_Segment(t *testing.T) { - mockMaster := &MasterServiceFactory{} - collID := UniqueID(1) - - t.Run("Test segment", func(t *testing.T) { - replica := newReplica(mockMaster, collID) - assert.False(t, replica.hasSegment(0)) - - err := replica.addSegment(0, 1, 2, "insert-01") - assert.NoError(t, err) - assert.True(t, replica.hasSegment(0)) - - seg, err := replica.getSegmentByID(0) - assert.NoError(t, err) - assert.NotNil(t, seg) - assert.Equal(t, UniqueID(1), seg.collectionID) - assert.Equal(t, UniqueID(2), seg.partitionID) - - assert.Equal(t, int64(0), seg.numRows) - - err = replica.updateStatistics(0, 100) - assert.NoError(t, err) - assert.Equal(t, int64(100), seg.numRows) - - update, err := replica.getSegmentStatisticsUpdates(0) - assert.NoError(t, err) - assert.Equal(t, UniqueID(0), update.SegmentID) - assert.Equal(t, int64(100), update.NumRows) - - update, err = replica.getSegmentStatisticsUpdates(0) - assert.NoError(t, err) - - err = replica.removeSegment(0) - assert.NoError(t, err) - assert.False(t, replica.hasSegment(0)) - }) - - t.Run("Test errors", func(t *testing.T) { - replica := 
newReplica(mockMaster, collID) - require.False(t, replica.hasSegment(0)) - - seg, err := replica.getSegmentByID(0) - assert.Error(t, err) - assert.Nil(t, seg) - - err = replica.updateStatistics(0, 0) - assert.Error(t, err) - - update, err := replica.getSegmentStatisticsUpdates(0) - assert.Error(t, err) - assert.Nil(t, update) - }) -} diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index 6795929250..ca93fc242e 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -339,18 +339,6 @@ func (node *DataNode) ReadyToFlush() error { return nil } -func (node *DataNode) getSegmentPositionPair(segmentID UniqueID, chanName string) ([]*internalpb.MsgPosition, []*internalpb.MsgPosition) { - node.chanMut.Lock() - defer node.chanMut.Unlock() - sync, ok := node.vchan2SyncService[chanName] - if !ok { - return nil, nil - } - - starts, ends := sync.replica.getSegmentPositions(segmentID) - return starts, ends -} - // FlushSegments packs flush messages into flowgraph through flushChan. // If DataNode receives a valid segment to flush, new flush message for the segment should be ignored. // So if receiving calls to flush segment A, DataNode should guarantee the segment to be flushed. diff --git a/internal/datanode/data_node_test.go b/internal/datanode/data_node_test.go index 5408f79e17..04a50c73f6 100644 --- a/internal/datanode/data_node_test.go +++ b/internal/datanode/data_node_test.go @@ -134,8 +134,7 @@ func TestDataNode(t *testing.T) { sync, ok := node1.vchan2SyncService[dmChannelName] assert.True(t, ok) - sync.replica.addSegment(0, 1, 1, dmChannelName) - // sync.replica.addSegment(1, 1, 1, dmChannelName) unable to deal with this. + sync.replica.addNewSegment(0, 1, 1, dmChannelName, &internalpb.MsgPosition{}, &internalpb.MsgPosition{}) req := &datapb.FlushSegmentsRequest{ Base: &commonpb.MsgBase{}, @@ -187,9 +186,6 @@ func TestDataNode(t *testing.T) { err = insertMsgStream.Broadcast(&timeTickMsgPack) assert.NoError(t, err) - _, err = sync.replica.getSegmentByID(0) - assert.NoError(t, err) - defer func() { <-node1.ctx.Done() node1.Stop() diff --git a/internal/datanode/data_sync_service.go b/internal/datanode/data_sync_service.go index de607f8650..52f3a72c34 100644 --- a/internal/datanode/data_sync_service.go +++ b/internal/datanode/data_sync_service.go @@ -185,8 +185,9 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro zap.Int64("SegmentID", us.GetID()), zap.Int64("NumOfRows", us.GetNumOfRows()), ) - dsService.replica.addSegment(us.GetID(), us.CollectionID, us.PartitionID, us.GetInsertChannel()) - dsService.replica.updateStatistics(us.GetID(), us.GetNumOfRows()) + + dsService.replica.addNormalSegment(us.GetID(), us.CollectionID, us.PartitionID, us.GetInsertChannel(), + us.GetNumOfRows(), &segmentCheckPoint{us.GetNumOfRows(), *us.GetDmlPosition()}) } dsService.fg.AddNode(dmStreamNode) diff --git a/internal/datanode/flow_graph_insert_buffer_node.go b/internal/datanode/flow_graph_insert_buffer_node.go index a51f9924fa..cd4f45cfe7 100644 --- a/internal/datanode/flow_graph_insert_buffer_node.go +++ b/internal/datanode/flow_graph_insert_buffer_node.go @@ -162,22 +162,13 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { collID := msg.GetCollectionID() partitionID := msg.GetPartitionID() - // log.Debug("InsertBufferNode Operating Segment", - // zap.Int64("ID", currentSegID), - // zap.Int("NumOfRows", len(msg.RowIDs)), - // ) - if !ibNode.replica.hasSegment(currentSegID) { - err := ibNode.replica.addSegment(currentSegID, collID, partitionID,
msg.GetChannelID()) + err := ibNode.replica.addNewSegment(currentSegID, collID, partitionID, msg.GetChannelID(), + iMsg.startPositions[0], iMsg.endPositions[0]) if err != nil { log.Error("add segment wrong", zap.Error(err)) } - // set msg pack start positions - // this position is the start position of current segment, not start position of current MsgPack - // so setStartPositions will only call once when meet new segment - ibNode.replica.setStartPositions(currentSegID, iMsg.startPositions) - ibNode.setSegmentCheckPoint(currentSegID, segmentCheckPoint{0, *iMsg.startPositions[0]}) } segNum := uniqueSeg[currentSegID] @@ -473,7 +464,7 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { ibNode.insertBuffer.insertData[currentSegID] = idata // store current endPositions as Segment->EndPostion - ibNode.replica.setEndPositions(currentSegID, iMsg.endPositions) + ibNode.replica.updateSegmentEndPosition(currentSegID, iMsg.endPositions[0]) } if len(iMsg.insertMessages) > 0 { @@ -524,15 +515,14 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { log.Debug("segment is empty") continue } - fu.checkPoint = ibNode.listSegmentCheckPoints() + fu.checkPoint = ibNode.replica.listSegmentsCheckPoints() fu.flushed = false if err := ibNode.dsSaveBinlog(&fu); err != nil { log.Debug("data service save bin log path failed", zap.Error(err)) } } - // iMsg is Flush() msg from dataservice - // 1. insertBuffer(not empty) -> binLogs -> minIO/S3 + // iMsg is Flush() msg from data coordinator select { case fmsg := <-ibNode.flushChan: currentSegID := fmsg.segmentID @@ -547,12 +537,12 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { collID: fmsg.collectionID, segID: currentSegID, field2Path: map[UniqueID]string{}, - checkPoint: ibNode.listSegmentCheckPoints(), + checkPoint: ibNode.replica.listSegmentsCheckPoints(), flushed: true, }) - ibNode.removeSegmentCheckPoint(fmsg.segmentID) + ibNode.replica.segmentFlushed(currentSegID) fmsg.dmlFlushedCh <- []*datapb.ID2PathList{{ID: currentSegID, Paths: []string{}}} - } else { + } else { // insertBuffer(not empty) -> binLogs -> minIO/S3 log.Debug("..
Buffer not empty, flushing ..") finishCh := make(chan segmentFlushUnit, 1) @@ -587,13 +577,12 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { fu := <-finishCh close(finishCh) if fu.field2Path != nil { - fu.checkPoint = ibNode.listSegmentCheckPoints() + fu.checkPoint = ibNode.replica.listSegmentsCheckPoints() fu.flushed = true if err := ibNode.dsSaveBinlog(&fu); err != nil { log.Debug("Data service save binlog path failed", zap.Error(err)) } else { - // this segment has flushed, so it's not `open segment`, so remove from the check point - ibNode.removeSegmentCheckPoint(fu.segID) + ibNode.replica.segmentFlushed(fu.segID) } } fmsg.dmlFlushedCh <- []*datapb.ID2PathList{{ID: currentSegID, Paths: []string{}}} @@ -714,39 +703,15 @@ func flushSegment( return } - _, ep := ibNode.replica.getSegmentPositions(segID) - sta, _ := ibNode.replica.getSegmentStatisticsUpdates(segID) - ibNode.setSegmentCheckPoint(segID, segmentCheckPoint{sta.NumRows, *ep[0]}) - - startPos := ibNode.replica.getAllStartPositions() + ibNode.replica.updateSegmentCheckPoint(segID) + startPos := ibNode.replica.listNewSegmentsStartPositions() flushUnit <- segmentFlushUnit{collID: collID, segID: segID, field2Path: field2Path, startPositions: startPos} clearFn(true) } -func (ibNode *insertBufferNode) setSegmentCheckPoint(segID UniqueID, chk segmentCheckPoint) { - ibNode.segmentCheckPointLock.Lock() - defer ibNode.segmentCheckPointLock.Unlock() - ibNode.segmentCheckPoints[segID] = chk -} -func (ibNode *insertBufferNode) removeSegmentCheckPoint(segID UniqueID) { - ibNode.segmentCheckPointLock.Lock() - defer ibNode.segmentCheckPointLock.Unlock() - delete(ibNode.segmentCheckPoints, segID) -} -func (ibNode *insertBufferNode) listSegmentCheckPoints() map[UniqueID]segmentCheckPoint { - ibNode.segmentCheckPointLock.Lock() - defer ibNode.segmentCheckPointLock.Unlock() - segs := make(map[UniqueID]segmentCheckPoint) - for k, v := range ibNode.segmentCheckPoints { - segs[k] = v - } - return segs -} - func (ibNode *insertBufferNode) writeHardTimeTick(ts Timestamp) error { msgPack := msgstream.MsgPack{} timeTickMsg := msgstream.DataNodeTtMsg{ - // timeTickMsg := msgstream.TimeTickMsg{ BaseMsg: msgstream.BaseMsg{ BeginTimestamp: ts, EndTimestamp: ts, @@ -826,13 +791,7 @@ func (ibNode *insertBufferNode) getCollMetabySegID(segmentID UniqueID, ts Timest } func (ibNode *insertBufferNode) getCollectionandPartitionIDbySegID(segmentID UniqueID) (collID, partitionID UniqueID, err error) { - seg, err := ibNode.replica.getSegmentByID(segmentID) - if err != nil { - return - } - collID = seg.collectionID - partitionID = seg.partitionID - return + return ibNode.replica.getCollectionAndPartitionID(segmentID) } func newInsertBufferNode( diff --git a/internal/datanode/flow_graph_insert_buffer_node_test.go b/internal/datanode/flow_graph_insert_buffer_node_test.go index 66f097deea..9ae4be4c30 100644 --- a/internal/datanode/flow_graph_insert_buffer_node_test.go +++ b/internal/datanode/flow_graph_insert_buffer_node_test.go @@ -53,7 +53,7 @@ func TestFlowGraphInsertBufferNode_Operate(t *testing.T) { replica := newReplica(mockMaster, collMeta.ID) - err = replica.addSegment(1, collMeta.ID, 0, insertChannelName) + err = replica.addNewSegment(1, collMeta.ID, 0, insertChannelName, &internalpb.MsgPosition{}, &internalpb.MsgPosition{}) require.NoError(t, err) msFactory := msgstream.NewPmsFactory() @@ -141,9 +141,9 @@ func TestFlushSegment(t *testing.T) { replica := newReplica(mockMaster, collMeta.ID) - err := replica.addSegment(segmentID, 
collMeta.ID, 0, insertChannelName) + err := replica.addNewSegment(segmentID, collMeta.ID, 0, insertChannelName, &internalpb.MsgPosition{}, &internalpb.MsgPosition{}) require.NoError(t, err) - replica.setEndPositions(segmentID, []*internalpb.MsgPosition{{ChannelName: "TestChannel"}}) + replica.updateSegmentEndPosition(segmentID, &internalpb.MsgPosition{ChannelName: "TestChannel"}) finishCh := make(chan segmentFlushUnit, 1) @@ -265,11 +265,11 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) { mockMaster := &MasterServiceFactory{} - colRep := &CollectionSegmentReplica{ - segments: make(map[UniqueID]*Segment), - collectionID: collMeta.ID, - startPositions: make(map[UniqueID][]*internalpb.MsgPosition), - endPositions: make(map[UniqueID][]*internalpb.MsgPosition), + colRep := &SegmentReplica{ + collectionID: collMeta.ID, + newSegments: make(map[UniqueID]*Segment), + normalSegments: make(map[UniqueID]*Segment), + flushedSegments: make(map[UniqueID]*Segment), } colRep.metaService = newMetaService(mockMaster, collMeta.ID) @@ -291,170 +291,196 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) { flushChan := make(chan *flushMsg, 100) iBNode := newInsertBufferNode(ctx, colRep, msFactory, NewAllocatorFactory(), flushChan, saveBinlog, "string") + // Auto flush number of rows set to 2 + inMsg := genInsertMsg("datanode-03-test-autoflush") - inMsg.insertMessages = dataFactory.GetMsgStreamInsertMsgs(100) - inMsg.insertMessages = append(inMsg.insertMessages, dataFactory.GetMsgStreamInsertMsgs(32000)...) - for i := range inMsg.insertMessages { - inMsg.insertMessages[i].SegmentID = int64(i%2) + 1 - } - inMsg.startPositions = []*internalpb.MsgPosition{{Timestamp: 100}} - inMsg.endPositions = []*internalpb.MsgPosition{{Timestamp: 123}} - + inMsg.insertMessages = dataFactory.GetMsgStreamInsertMsgs(2) var iMsg flowgraph.Msg = &inMsg - iBNode.Operate([]flowgraph.Msg{iMsg}) - assert.Equal(t, len(colRep.endPositions), 2) - assert.Equal(t, colRep.endPositions[1][0].Timestamp, Timestamp(123)) - assert.Equal(t, colRep.endPositions[2][0].Timestamp, Timestamp(123)) - assert.Equal(t, len(iBNode.segmentCheckPoints), 2) - assert.Equal(t, iBNode.segmentCheckPoints[1].numRows, int64(0)) - assert.Equal(t, iBNode.segmentCheckPoints[2].numRows, int64(0)) - assert.Equal(t, iBNode.segmentCheckPoints[1].pos.Timestamp, Timestamp(100)) - assert.Equal(t, iBNode.segmentCheckPoints[2].pos.Timestamp, Timestamp(100)) - assert.Equal(t, len(iBNode.insertBuffer.insertData), 2) - assert.Equal(t, iBNode.insertBuffer.size(1), int32(50+16000)) - assert.Equal(t, iBNode.insertBuffer.size(2), int32(50+16000)) - assert.Equal(t, len(flushUnit), 0) - for i := range inMsg.insertMessages { - inMsg.insertMessages[i].SegmentID = int64(i%2) + 2 - } - inMsg.startPositions = []*internalpb.MsgPosition{{Timestamp: 123}} - inMsg.endPositions = []*internalpb.MsgPosition{{Timestamp: 234}} - iBNode.Operate([]flowgraph.Msg{iMsg}) - assert.Equal(t, len(colRep.endPositions), 3) - assert.Equal(t, colRep.endPositions[1][0].Timestamp, Timestamp(123)) - assert.Equal(t, colRep.endPositions[2][0].Timestamp, Timestamp(234)) - assert.Equal(t, colRep.endPositions[3][0].Timestamp, Timestamp(234)) - assert.Equal(t, len(iBNode.segmentCheckPoints), 3) - assert.Equal(t, iBNode.segmentCheckPoints[1].numRows, int64(0)) - assert.Equal(t, iBNode.segmentCheckPoints[2].numRows, int64(100+32000)) - assert.Equal(t, iBNode.segmentCheckPoints[3].numRows, int64(0)) - assert.Equal(t, iBNode.segmentCheckPoints[1].pos.Timestamp, Timestamp(100)) - assert.Equal(t, 
iBNode.segmentCheckPoints[2].pos.Timestamp, Timestamp(234)) - assert.Equal(t, iBNode.segmentCheckPoints[3].pos.Timestamp, Timestamp(123)) + t.Run("Pure auto flush", func(t *testing.T) { + iBNode.insertBuffer.maxSize = 2 - assert.Equal(t, len(flushUnit), 1) - assert.Equal(t, flushUnit[0].segID, int64(2)) - assert.Equal(t, len(flushUnit[0].checkPoint), 3) - assert.Equal(t, flushUnit[0].checkPoint[1].numRows, int64(0)) - assert.Equal(t, flushUnit[0].checkPoint[2].numRows, int64(100+32000)) - assert.Equal(t, flushUnit[0].checkPoint[3].numRows, int64(0)) - assert.Equal(t, flushUnit[0].checkPoint[1].pos.Timestamp, Timestamp(100)) - assert.Equal(t, flushUnit[0].checkPoint[2].pos.Timestamp, Timestamp(234)) - assert.Equal(t, flushUnit[0].checkPoint[3].pos.Timestamp, Timestamp(123)) + for i := range inMsg.insertMessages { + inMsg.insertMessages[i].SegmentID = int64(i%2) + 1 + } + inMsg.startPositions = []*internalpb.MsgPosition{{Timestamp: 100}} + inMsg.endPositions = []*internalpb.MsgPosition{{Timestamp: 123}} - assert.Greater(t, len(flushUnit[0].field2Path), 0) - assert.False(t, flushUnit[0].flushed) - assert.Equal(t, len(iBNode.insertBuffer.insertData), 2) - assert.Equal(t, iBNode.insertBuffer.size(1), int32(50+16000)) - assert.Equal(t, iBNode.insertBuffer.size(3), int32(50+16000)) + type Test struct { + expectedSegID UniqueID + expectedNumOfRows int64 + expectedStartPosTs Timestamp + expectedEndPosTs Timestamp + expectedCpNumOfRows int64 + expectedCpPosTs Timestamp + } - for i := range inMsg.insertMessages { - inMsg.insertMessages[i].SegmentID = 1 - } - inMsg.startPositions = []*internalpb.MsgPosition{{Timestamp: 234}} - inMsg.endPositions = []*internalpb.MsgPosition{{Timestamp: 345}} - iBNode.Operate([]flowgraph.Msg{iMsg}) - assert.Equal(t, len(colRep.endPositions), 3) - assert.Equal(t, colRep.endPositions[1][0].Timestamp, Timestamp(345)) - assert.Equal(t, colRep.endPositions[2][0].Timestamp, Timestamp(234)) - assert.Equal(t, colRep.endPositions[3][0].Timestamp, Timestamp(234)) - assert.Equal(t, len(iBNode.segmentCheckPoints), 3) - assert.Equal(t, iBNode.segmentCheckPoints[1].numRows, int64(50+16000+100+32000)) - assert.Equal(t, iBNode.segmentCheckPoints[2].numRows, int64(100+32000)) - assert.Equal(t, iBNode.segmentCheckPoints[3].numRows, int64(0)) - assert.Equal(t, iBNode.segmentCheckPoints[1].pos.Timestamp, Timestamp(345)) - assert.Equal(t, iBNode.segmentCheckPoints[2].pos.Timestamp, Timestamp(234)) - assert.Equal(t, iBNode.segmentCheckPoints[3].pos.Timestamp, Timestamp(123)) + beforeAutoFlushTests := []Test{ + // segID, numOfRow, startTs, endTs, cp.numOfRow, cp.Ts + {1, 1, 100, 123, 0, 100}, + {2, 1, 100, 123, 0, 100}, + } - assert.Equal(t, len(flushUnit), 2) - assert.Equal(t, flushUnit[1].segID, int64(1)) - assert.Equal(t, len(flushUnit[1].checkPoint), 3) - assert.Equal(t, flushUnit[1].checkPoint[1].numRows, int64(50+16000+100+32000)) - assert.Equal(t, flushUnit[1].checkPoint[2].numRows, int64(100+32000)) - assert.Equal(t, flushUnit[1].checkPoint[3].numRows, int64(0)) - assert.Equal(t, flushUnit[1].checkPoint[1].pos.Timestamp, Timestamp(345)) - assert.Equal(t, flushUnit[1].checkPoint[2].pos.Timestamp, Timestamp(234)) - assert.Equal(t, flushUnit[1].checkPoint[3].pos.Timestamp, Timestamp(123)) - assert.False(t, flushUnit[1].flushed) - assert.Greater(t, len(flushUnit[1].field2Path), 0) - assert.Equal(t, len(iBNode.insertBuffer.insertData), 1) - assert.Equal(t, iBNode.insertBuffer.size(3), int32(50+16000)) + iBNode.Operate([]flowgraph.Msg{iMsg}) + require.Equal(t, 2, len(colRep.newSegments)) + 
require.Equal(t, 0, len(colRep.normalSegments)) + assert.Equal(t, 0, len(flushUnit)) - dmlFlushedCh := make(chan []*datapb.ID2PathList, 1) + for i, test := range beforeAutoFlushTests { + seg, ok := colRep.newSegments[UniqueID(i+1)] + assert.True(t, ok) + assert.Equal(t, test.expectedSegID, seg.segmentID) + assert.Equal(t, test.expectedNumOfRows, seg.numRows) + assert.Equal(t, test.expectedStartPosTs, seg.startPos.GetTimestamp()) + assert.Equal(t, test.expectedCpNumOfRows, seg.checkPoint.numRows) + assert.Equal(t, test.expectedCpPosTs, seg.checkPoint.pos.GetTimestamp()) + } - flushChan <- &flushMsg{ - msgID: 3, - timestamp: 456, - segmentID: UniqueID(1), - collectionID: UniqueID(1), - dmlFlushedCh: dmlFlushedCh, - } + for i := range inMsg.insertMessages { + inMsg.insertMessages[i].SegmentID = int64(i%2) + 2 + } + inMsg.startPositions = []*internalpb.MsgPosition{{Timestamp: 200}} + inMsg.endPositions = []*internalpb.MsgPosition{{Timestamp: 234}} + iMsg = &inMsg - inMsg.insertMessages = []*msgstream.InsertMsg{} - inMsg.endPositions = []*internalpb.MsgPosition{{Timestamp: 345}} - inMsg.endPositions = []*internalpb.MsgPosition{{Timestamp: 456}} - iBNode.Operate([]flowgraph.Msg{iMsg}) + // Trigger auto flush + iBNode.Operate([]flowgraph.Msg{iMsg}) + require.Equal(t, 0, len(colRep.newSegments)) + require.Equal(t, 3, len(colRep.normalSegments)) - flushSeg := <-dmlFlushedCh - assert.NotNil(t, flushSeg) - assert.Equal(t, len(flushSeg), 1) - assert.Equal(t, flushSeg[0].ID, int64(1)) - assert.NotNil(t, flushSeg[0].Paths) - assert.Equal(t, len(colRep.endPositions), 3) - assert.Equal(t, colRep.endPositions[1][0].Timestamp, Timestamp(345)) - assert.Equal(t, colRep.endPositions[2][0].Timestamp, Timestamp(234)) - assert.Equal(t, colRep.endPositions[3][0].Timestamp, Timestamp(234)) - assert.Equal(t, len(iBNode.segmentCheckPoints), 2) - assert.Equal(t, iBNode.segmentCheckPoints[2].numRows, int64(100+32000)) - assert.Equal(t, iBNode.segmentCheckPoints[3].numRows, int64(0)) - assert.Equal(t, iBNode.segmentCheckPoints[2].pos.Timestamp, Timestamp(234)) - assert.Equal(t, iBNode.segmentCheckPoints[3].pos.Timestamp, Timestamp(123)) + assert.Equal(t, 1, len(flushUnit)) + assert.Equal(t, 3, len(flushUnit[0].checkPoint)) + assert.Less(t, 0, len(flushUnit[0].field2Path)) + assert.False(t, flushUnit[0].flushed) - assert.Equal(t, len(flushUnit), 3) - assert.Equal(t, flushUnit[2].segID, int64(1)) - assert.Equal(t, len(flushUnit[2].checkPoint), 3) - assert.Equal(t, flushUnit[2].checkPoint[1].numRows, int64(50+16000+100+32000)) - assert.Equal(t, flushUnit[2].checkPoint[2].numRows, int64(100+32000)) - assert.Equal(t, flushUnit[2].checkPoint[3].numRows, int64(0)) - assert.Equal(t, flushUnit[2].checkPoint[1].pos.Timestamp, Timestamp(345)) - assert.Equal(t, flushUnit[2].checkPoint[2].pos.Timestamp, Timestamp(234)) - assert.Equal(t, flushUnit[2].checkPoint[3].pos.Timestamp, Timestamp(123)) - assert.Equal(t, len(flushUnit[2].field2Path), 0) - assert.NotNil(t, flushUnit[2].field2Path) - assert.True(t, flushUnit[2].flushed) - assert.Equal(t, len(iBNode.insertBuffer.insertData), 1) - assert.Equal(t, iBNode.insertBuffer.size(3), int32(50+16000)) + afterAutoFlushTests := []Test{ + // segID, numOfRow, startTs, endTs, cp.numOfRow, cp.Ts + {1, 1, 100, 123, 0, 100}, + {2, 2, 100, 234, 2, 234}, + {3, 1, 200, 234, 0, 200}, + } - flushChan <- &flushMsg{ - msgID: 4, - timestamp: 567, - segmentID: UniqueID(3), - collectionID: UniqueID(3), - dmlFlushedCh: dmlFlushedCh, - } - iBNode.Operate([]flowgraph.Msg{iMsg}) - flushSeg = <-dmlFlushedCh -
assert.NotNil(t, flushSeg) - assert.Equal(t, len(flushSeg), 1) - assert.Equal(t, flushSeg[0].ID, int64(3)) - assert.NotNil(t, flushSeg[0].Paths) - assert.Equal(t, len(colRep.endPositions), 3) - assert.Equal(t, colRep.endPositions[1][0].Timestamp, Timestamp(345)) - assert.Equal(t, colRep.endPositions[2][0].Timestamp, Timestamp(234)) - assert.Equal(t, colRep.endPositions[3][0].Timestamp, Timestamp(234)) - assert.Equal(t, len(iBNode.segmentCheckPoints), 1) - assert.Equal(t, iBNode.segmentCheckPoints[2].numRows, int64(100+32000)) - assert.Equal(t, iBNode.segmentCheckPoints[2].pos.Timestamp, Timestamp(234)) - assert.Equal(t, len(flushUnit), 4) - assert.Equal(t, flushUnit[3].segID, int64(3)) - assert.Equal(t, len(flushUnit[3].checkPoint), 2) - assert.Equal(t, flushUnit[3].checkPoint[3].numRows, int64(50+16000)) - assert.Equal(t, flushUnit[3].checkPoint[2].numRows, int64(100+32000)) - assert.Equal(t, flushUnit[3].checkPoint[3].pos.Timestamp, Timestamp(234)) - assert.Equal(t, flushUnit[3].checkPoint[2].pos.Timestamp, Timestamp(234)) - assert.Greater(t, len(flushUnit[3].field2Path), 0) - assert.NotNil(t, flushUnit[3].field2Path) - assert.True(t, flushUnit[3].flushed) - assert.Equal(t, len(iBNode.insertBuffer.insertData), 0) + for i, test := range afterAutoFlushTests { + seg, ok := colRep.normalSegments[UniqueID(i+1)] + assert.True(t, ok) + assert.Equal(t, test.expectedSegID, seg.segmentID) + assert.Equal(t, test.expectedNumOfRows, seg.numRows) + assert.Equal(t, test.expectedStartPosTs, seg.startPos.GetTimestamp()) + assert.Equal(t, test.expectedCpNumOfRows, seg.checkPoint.numRows) + assert.Equal(t, test.expectedCpPosTs, seg.checkPoint.pos.GetTimestamp()) + + assert.Equal(t, test.expectedCpNumOfRows, flushUnit[0].checkPoint[UniqueID(i+1)].numRows) + assert.Equal(t, test.expectedCpPosTs, flushUnit[0].checkPoint[UniqueID(i+1)].pos.Timestamp) + + if i == 1 { + assert.Equal(t, test.expectedSegID, flushUnit[0].segID) + assert.Equal(t, int32(0), iBNode.insertBuffer.size(UniqueID(i+1))) + } else { + assert.Equal(t, int32(1), iBNode.insertBuffer.size(UniqueID(i+1))) + } + } + + }) + + t.Run("Auto with manual flush", func(t *testing.T) { + t.Skipf("Skip, fix later") + for i := range inMsg.insertMessages { + inMsg.insertMessages[i].SegmentID = 1 + } + + inMsg.startPositions = []*internalpb.MsgPosition{{Timestamp: 234}} + inMsg.endPositions = []*internalpb.MsgPosition{{Timestamp: 345}} + iBNode.Operate([]flowgraph.Msg{iMsg}) + assert.Equal(t, len(iBNode.segmentCheckPoints), 3) + assert.Equal(t, iBNode.segmentCheckPoints[1].numRows, int64(50+16000+100+32000)) + assert.Equal(t, iBNode.segmentCheckPoints[2].numRows, int64(100+32000)) + assert.Equal(t, iBNode.segmentCheckPoints[3].numRows, int64(0)) + assert.Equal(t, iBNode.segmentCheckPoints[1].pos.Timestamp, Timestamp(345)) + assert.Equal(t, iBNode.segmentCheckPoints[2].pos.Timestamp, Timestamp(234)) + assert.Equal(t, iBNode.segmentCheckPoints[3].pos.Timestamp, Timestamp(123)) + + assert.Equal(t, len(flushUnit), 2) + assert.Equal(t, flushUnit[1].segID, int64(1)) + assert.Equal(t, len(flushUnit[1].checkPoint), 3) + assert.Equal(t, flushUnit[1].checkPoint[1].numRows, int64(50+16000+100+32000)) + assert.Equal(t, flushUnit[1].checkPoint[2].numRows, int64(100+32000)) + assert.Equal(t, flushUnit[1].checkPoint[3].numRows, int64(0)) + assert.Equal(t, flushUnit[1].checkPoint[1].pos.Timestamp, Timestamp(345)) + assert.Equal(t, flushUnit[1].checkPoint[2].pos.Timestamp, Timestamp(234)) + assert.Equal(t, flushUnit[1].checkPoint[3].pos.Timestamp, Timestamp(123)) +
assert.False(t, flushUnit[1].flushed) + assert.Greater(t, len(flushUnit[1].field2Path), 0) + assert.Equal(t, len(iBNode.insertBuffer.insertData), 1) + assert.Equal(t, iBNode.insertBuffer.size(3), int32(50+16000)) + + dmlFlushedCh := make(chan []*datapb.ID2PathList, 1) + + flushChan <- &flushMsg{ + msgID: 3, + timestamp: 456, + segmentID: UniqueID(1), + collectionID: UniqueID(1), + dmlFlushedCh: dmlFlushedCh, + } + + inMsg.insertMessages = []*msgstream.InsertMsg{} + inMsg.endPositions = []*internalpb.MsgPosition{{Timestamp: 345}} + inMsg.endPositions = []*internalpb.MsgPosition{{Timestamp: 456}} + iBNode.Operate([]flowgraph.Msg{iMsg}) + + flushSeg := <-dmlFlushedCh + assert.NotNil(t, flushSeg) + assert.Equal(t, len(flushSeg), 1) + assert.Equal(t, flushSeg[0].ID, int64(1)) + assert.NotNil(t, flushSeg[0].Paths) + assert.Equal(t, len(iBNode.segmentCheckPoints), 2) + assert.Equal(t, iBNode.segmentCheckPoints[2].numRows, int64(100+32000)) + assert.Equal(t, iBNode.segmentCheckPoints[3].numRows, int64(0)) + assert.Equal(t, iBNode.segmentCheckPoints[2].pos.Timestamp, Timestamp(234)) + assert.Equal(t, iBNode.segmentCheckPoints[3].pos.Timestamp, Timestamp(123)) + + assert.Equal(t, len(flushUnit), 3) + assert.Equal(t, flushUnit[2].segID, int64(1)) + assert.Equal(t, len(flushUnit[2].checkPoint), 3) + assert.Equal(t, flushUnit[2].checkPoint[1].numRows, int64(50+16000+100+32000)) + assert.Equal(t, flushUnit[2].checkPoint[2].numRows, int64(100+32000)) + assert.Equal(t, flushUnit[2].checkPoint[3].numRows, int64(0)) + assert.Equal(t, flushUnit[2].checkPoint[1].pos.Timestamp, Timestamp(345)) + assert.Equal(t, flushUnit[2].checkPoint[2].pos.Timestamp, Timestamp(234)) + assert.Equal(t, flushUnit[2].checkPoint[3].pos.Timestamp, Timestamp(123)) + assert.Equal(t, len(flushUnit[2].field2Path), 0) + assert.NotNil(t, flushUnit[2].field2Path) + assert.True(t, flushUnit[2].flushed) + assert.Equal(t, len(iBNode.insertBuffer.insertData), 1) + assert.Equal(t, iBNode.insertBuffer.size(3), int32(50+16000)) + + flushChan <- &flushMsg{ + msgID: 4, + timestamp: 567, + segmentID: UniqueID(3), + collectionID: UniqueID(3), + dmlFlushedCh: dmlFlushedCh, + } + iBNode.Operate([]flowgraph.Msg{iMsg}) + flushSeg = <-dmlFlushedCh + assert.NotNil(t, flushSeg) + assert.Equal(t, len(flushSeg), 1) + assert.Equal(t, flushSeg[0].ID, int64(3)) + assert.NotNil(t, flushSeg[0].Paths) + assert.Equal(t, len(iBNode.segmentCheckPoints), 1) + assert.Equal(t, iBNode.segmentCheckPoints[2].numRows, int64(100+32000)) + assert.Equal(t, iBNode.segmentCheckPoints[2].pos.Timestamp, Timestamp(234)) + assert.Equal(t, len(flushUnit), 4) + assert.Equal(t, flushUnit[3].segID, int64(3)) + assert.Equal(t, len(flushUnit[3].checkPoint), 2) + assert.Equal(t, flushUnit[3].checkPoint[3].numRows, int64(50+16000)) + assert.Equal(t, flushUnit[3].checkPoint[2].numRows, int64(100+32000)) + assert.Equal(t, flushUnit[3].checkPoint[3].pos.Timestamp, Timestamp(234)) + assert.Equal(t, flushUnit[3].checkPoint[2].pos.Timestamp, Timestamp(234)) + assert.Greater(t, len(flushUnit[3].field2Path), 0) + assert.NotNil(t, flushUnit[3].field2Path) + assert.True(t, flushUnit[3].flushed) + assert.Equal(t, len(iBNode.insertBuffer.insertData), 0) + + }) } diff --git a/internal/datanode/segment_replica.go b/internal/datanode/segment_replica.go new file mode 100644 index 0000000000..b06b791f54 --- /dev/null +++ b/internal/datanode/segment_replica.go @@ -0,0 +1,378 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package datanode + +import ( + "context" + "fmt" + "sync" + "sync/atomic" + + "go.uber.org/zap" + + "github.com/milvus-io/milvus/internal/log" + "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/internal/proto/internalpb" + "github.com/milvus-io/milvus/internal/proto/schemapb" + "github.com/milvus-io/milvus/internal/types" +) + +type Replica interface { + getCollectionID() UniqueID + getCollectionSchema(collectionID UniqueID, ts Timestamp) (*schemapb.CollectionSchema, error) + getCollectionAndPartitionID(segID UniqueID) (collID, partitionID UniqueID, err error) + + addNewSegment(segID, collID, partitionID UniqueID, channelName string, startPos, endPos *internalpb.MsgPosition) error + addNormalSegment(segID, collID, partitionID UniqueID, channelName string, numOfRows int64, cp *segmentCheckPoint) error + listNewSegmentsStartPositions() []*datapb.SegmentStartPosition + listSegmentsCheckPoints() map[UniqueID]segmentCheckPoint + updateSegmentEndPosition(segID UniqueID, endPos *internalpb.MsgPosition) + updateSegmentCheckPoint(segID UniqueID) + hasSegment(segID UniqueID) bool + + updateStatistics(segID UniqueID, numRows int64) error + getSegmentStatisticsUpdates(segID UniqueID) (*internalpb.SegmentStatisticsUpdates, error) + segmentFlushed(segID UniqueID) +} + +// Segment is the data structure of segments in data node replica. +type Segment struct { + collectionID UniqueID + partitionID UniqueID + segmentID UniqueID + numRows int64 + memorySize int64 + isNew atomic.Value // bool + isFlushed atomic.Value // bool + channelName string + + checkPoint segmentCheckPoint + startPos *internalpb.MsgPosition // TODO readonly + endPos *internalpb.MsgPosition +} + +// SegmentReplica is the data replication of persistent data in datanode. +// It implements `Replica` interface. 
+type SegmentReplica struct { + collectionID UniqueID + collSchema *schemapb.CollectionSchema + + segMu sync.RWMutex + newSegments map[UniqueID]*Segment + normalSegments map[UniqueID]*Segment + flushedSegments map[UniqueID]*Segment + + metaService *metaService +} + +var _ Replica = &SegmentReplica{} + +func newReplica(ms types.MasterService, collID UniqueID) Replica { + metaService := newMetaService(ms, collID) + + var replica Replica = &SegmentReplica{ + collectionID: collID, + + newSegments: make(map[UniqueID]*Segment), + normalSegments: make(map[UniqueID]*Segment), + flushedSegments: make(map[UniqueID]*Segment), + + metaService: metaService, + } + return replica +} + +func (replica *SegmentReplica) segmentFlushed(segID UniqueID) { + replica.segMu.Lock() + defer replica.segMu.Unlock() + + if _, ok := replica.newSegments[segID]; ok { + replica.new2FlushedSegment(segID) + } + + if _, ok := replica.normalSegments[segID]; ok { + replica.normal2FlushedSegment(segID) + } +} + +func (replica *SegmentReplica) new2NormalSegment(segID UniqueID) { + var seg Segment = *replica.newSegments[segID] + + seg.isNew.Store(false) + replica.normalSegments[segID] = &seg + + delete(replica.newSegments, segID) +} + +func (replica *SegmentReplica) new2FlushedSegment(segID UniqueID) { + var seg Segment = *replica.newSegments[segID] + + seg.isNew.Store(false) + seg.isFlushed.Store(true) + replica.flushedSegments[segID] = &seg + + delete(replica.newSegments, segID) +} + +func (replica *SegmentReplica) normal2FlushedSegment(segID UniqueID) { + var seg Segment = *replica.normalSegments[segID] + + seg.isFlushed.Store(true) + replica.flushedSegments[segID] = &seg + + delete(replica.normalSegments, segID) +} + +func (replica *SegmentReplica) getCollectionAndPartitionID(segID UniqueID) (collID, partitionID UniqueID, err error) { + replica.segMu.RLock() + defer replica.segMu.RUnlock() + + if seg, ok := replica.newSegments[segID]; ok { + return seg.collectionID, seg.partitionID, nil + } + + if seg, ok := replica.normalSegments[segID]; ok { + return seg.collectionID, seg.partitionID, nil + } + + return 0, 0, fmt.Errorf("Cannot find segment, id = %v", segID) +} + +func (replica *SegmentReplica) addNewSegment(segID, collID, partitionID UniqueID, channelName string, + startPos, endPos *internalpb.MsgPosition) error { + + replica.segMu.Lock() + defer replica.segMu.Unlock() + + if collID != replica.collectionID { + log.Warn("Mismatch collection", zap.Int64("ID", collID)) + return fmt.Errorf("Mismatch collection, ID=%d", collID) + } + + log.Debug("Add new segment", + zap.Int64("segment ID", segID), + zap.Int64("collection ID", collID), + zap.Int64("partition ID", partitionID), + zap.String("channel name", channelName), + ) + + seg := &Segment{ + collectionID: collID, + partitionID: partitionID, + segmentID: segID, + channelName: channelName, + + checkPoint: segmentCheckPoint{0, *startPos}, + startPos: startPos, + endPos: endPos, + } + + seg.isNew.Store(true) + seg.isFlushed.Store(false) + + replica.newSegments[segID] = seg + return nil +} + +func (replica *SegmentReplica) addNormalSegment(segID, collID, partitionID UniqueID, channelName string, numOfRows int64, cp *segmentCheckPoint) error { + replica.segMu.Lock() + defer replica.segMu.Unlock() + + if collID != replica.collectionID { + log.Warn("Mismatch collection", zap.Int64("ID", collID)) + return fmt.Errorf("Mismatch collection, ID=%d", collID) + } + + log.Debug("Add Normal segment", + zap.Int64("segment ID", segID), + zap.Int64("collection ID", collID), +
zap.Int64("partition ID", partitionID), + zap.String("channel name", channelName), + ) + + seg := &Segment{ + collectionID: collID, + partitionID: partitionID, + segmentID: segID, + channelName: channelName, + numRows: numOfRows, + + checkPoint: *cp, + endPos: &cp.pos, + } + + seg.isNew.Store(false) + seg.isFlushed.Store(false) + + replica.normalSegments[segID] = seg + return nil +} +func (replica *SegmentReplica) listNewSegmentsStartPositions() []*datapb.SegmentStartPosition { + replica.segMu.Lock() + defer replica.segMu.Unlock() + + result := make([]*datapb.SegmentStartPosition, 0, len(replica.newSegments)) + for id, seg := range replica.newSegments { + + result = append(result, &datapb.SegmentStartPosition{ + SegmentID: id, + StartPosition: seg.startPos, + }) + + replica.new2NormalSegment(id) + } + return result +} + +func (replica *SegmentReplica) listSegmentsCheckPoints() map[UniqueID]segmentCheckPoint { + replica.segMu.RLock() + defer replica.segMu.RUnlock() + + result := make(map[UniqueID]segmentCheckPoint) + + for id, seg := range replica.newSegments { + result[id] = seg.checkPoint + } + + for id, seg := range replica.normalSegments { + result[id] = seg.checkPoint + } + + return result +} + +func (replica *SegmentReplica) updateSegmentEndPosition(segID UniqueID, endPos *internalpb.MsgPosition) { + replica.segMu.Lock() + defer replica.segMu.Unlock() + + seg, ok := replica.newSegments[segID] + if ok { + seg.endPos = endPos + return + } + + seg, ok = replica.normalSegments[segID] + if ok { + seg.endPos = endPos + return + } + + log.Warn("No matching segment", zap.Int64("ID", segID)) +} + +func (replica *SegmentReplica) removeSegment(segID UniqueID) error { + return nil +} + +func (replica *SegmentReplica) hasSegment(segID UniqueID) bool { + replica.segMu.RLock() + defer replica.segMu.RUnlock() + + _, inNew := replica.newSegments[segID] + _, inNormal := replica.normalSegments[segID] + _, inFlush := replica.flushedSegments[segID] + + return inNew || inNormal || inFlush +} + +// `updateStatistics` updates the number of rows of a segment in replica. +func (replica *SegmentReplica) updateStatistics(segID UniqueID, numRows int64) error { + replica.segMu.Lock() + defer replica.segMu.Unlock() + + log.Debug("updating segment", zap.Int64("Segment ID", segID), zap.Int64("numRows", numRows)) + if seg, ok := replica.newSegments[segID]; ok { + seg.memorySize = 0 + seg.numRows += numRows + return nil + } + + if seg, ok := replica.normalSegments[segID]; ok { + seg.memorySize = 0 + seg.numRows += numRows + return nil + } + + return fmt.Errorf("There's no segment %v", segID) +} + +// `getSegmentStatisticsUpdates` gives current segment's statistics updates. +func (replica *SegmentReplica) getSegmentStatisticsUpdates(segID UniqueID) (*internalpb.SegmentStatisticsUpdates, error) { + replica.segMu.Lock() + defer replica.segMu.Unlock() + updates := &internalpb.SegmentStatisticsUpdates{ + SegmentID: segID, + } + + if seg, ok := replica.newSegments[segID]; ok { + updates.NumRows = seg.numRows + return updates, nil + } + + if seg, ok := replica.normalSegments[segID]; ok { + updates.NumRows = seg.numRows + return updates, nil + } + + return nil, fmt.Errorf("Error, there's no segment %v", segID) +} + +// --- collection --- + +func (replica *SegmentReplica) getCollectionID() UniqueID { + return replica.collectionID +} + +// getCollectionSchema will get collection schema from masterservice for a certain time.
+// If you want the latest collection schema, ts should be 0 +func (replica *SegmentReplica) getCollectionSchema(collID UniqueID, ts Timestamp) (*schemapb.CollectionSchema, error) { + replica.segMu.Lock() + defer replica.segMu.Unlock() + + if !replica.validCollection(collID) { + log.Error("Mismatch collection for the replica", + zap.Int64("Want", replica.collectionID), + zap.Int64("Actual", collID), + ) + return nil, fmt.Errorf("Not supported collection %v", collID) + } + + sch, err := replica.metaService.getCollectionSchema(context.Background(), collID, ts) + if err != nil { + log.Error("Grpc error", zap.Error(err)) + return nil, err + } + + return sch, nil +} + +func (replica *SegmentReplica) validCollection(collID UniqueID) bool { + return collID == replica.collectionID +} + +// Auto flush or manual flush +func (replica *SegmentReplica) updateSegmentCheckPoint(segID UniqueID) { + replica.segMu.Lock() + defer replica.segMu.Unlock() + + if seg, ok := replica.newSegments[segID]; ok { + seg.checkPoint = segmentCheckPoint{seg.numRows, *seg.endPos} + return + } + + if seg, ok := replica.normalSegments[segID]; ok { + seg.checkPoint = segmentCheckPoint{seg.numRows, *seg.endPos} + return + } + + log.Warn("There's no segment", zap.Int64("ID", segID)) +} diff --git a/internal/datanode/segment_replica_test.go b/internal/datanode/segment_replica_test.go new file mode 100644 index 0000000000..4baa9b2375 --- /dev/null +++ b/internal/datanode/segment_replica_test.go @@ -0,0 +1,126 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License.
+ +package datanode + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/milvus-io/milvus/internal/proto/internalpb" + "github.com/milvus-io/milvus/internal/types" +) + +func newSegmentReplica(ms types.MasterService, collID UniqueID) *SegmentReplica { + metaService := newMetaService(ms, collID) + + var replica = &SegmentReplica{ + collectionID: collID, + + newSegments: make(map[UniqueID]*Segment), + normalSegments: make(map[UniqueID]*Segment), + flushedSegments: make(map[UniqueID]*Segment), + + metaService: metaService, + } + return replica +} + +func TestSegmentReplica(t *testing.T) { + mockMaster := &MasterServiceFactory{} + collID := UniqueID(1) + + t.Run("Test inner function segment", func(t *testing.T) { + replica := newSegmentReplica(mockMaster, collID) + assert.False(t, replica.hasSegment(0)) + + startPos := &internalpb.MsgPosition{ChannelName: "insert-01", Timestamp: Timestamp(100)} + endPos := &internalpb.MsgPosition{ChannelName: "insert-01", Timestamp: Timestamp(200)} + err := replica.addNewSegment(0, 1, 2, "insert-01", startPos, endPos) + assert.NoError(t, err) + assert.True(t, replica.hasSegment(0)) + assert.Equal(t, 1, len(replica.newSegments)) + + seg, ok := replica.newSegments[UniqueID(0)] + assert.True(t, ok) + require.NotNil(t, seg) + assert.Equal(t, UniqueID(0), seg.segmentID) + assert.Equal(t, UniqueID(1), seg.collectionID) + assert.Equal(t, UniqueID(2), seg.partitionID) + assert.Equal(t, "insert-01", seg.channelName) + assert.Equal(t, Timestamp(100), seg.startPos.Timestamp) + assert.Equal(t, Timestamp(200), seg.endPos.Timestamp) + assert.Equal(t, startPos.ChannelName, seg.checkPoint.pos.ChannelName) + assert.Equal(t, startPos.Timestamp, seg.checkPoint.pos.Timestamp) + assert.Equal(t, int64(0), seg.numRows) + assert.True(t, seg.isNew.Load().(bool)) + assert.False(t, seg.isFlushed.Load().(bool)) + + err = replica.updateStatistics(0, 10) + assert.NoError(t, err) + assert.Equal(t, int64(10), seg.numRows) + + cpPos := &internalpb.MsgPosition{ChannelName: "insert-01", Timestamp: Timestamp(10)} + cp := &segmentCheckPoint{int64(10), *cpPos} + err = replica.addNormalSegment(1, 1, 2, "insert-01", int64(10), cp) + assert.NoError(t, err) + assert.True(t, replica.hasSegment(1)) + assert.Equal(t, 1, len(replica.normalSegments)) + seg, ok = replica.normalSegments[UniqueID(1)] + assert.True(t, ok) + require.NotNil(t, seg) + assert.Equal(t, UniqueID(1), seg.segmentID) + assert.Equal(t, UniqueID(1), seg.collectionID) + assert.Equal(t, UniqueID(2), seg.partitionID) + assert.Equal(t, "insert-01", seg.channelName) + assert.Equal(t, cpPos.ChannelName, seg.checkPoint.pos.ChannelName) + assert.Equal(t, cpPos.Timestamp, seg.checkPoint.pos.Timestamp) + assert.Equal(t, int64(10), seg.numRows) + assert.False(t, seg.isNew.Load().(bool)) + assert.False(t, seg.isFlushed.Load().(bool)) + + err = replica.updateStatistics(1, 10) + assert.NoError(t, err) + assert.Equal(t, int64(20), seg.numRows) + + segPos := replica.listNewSegmentsStartPositions() + assert.Equal(t, 1, len(segPos)) + assert.Equal(t, UniqueID(0), segPos[0].SegmentID) + assert.Equal(t, "insert-01", segPos[0].StartPosition.ChannelName) + assert.Equal(t, Timestamp(100), segPos[0].StartPosition.Timestamp) + + assert.Equal(t, 0, len(replica.newSegments)) + assert.Equal(t, 2, len(replica.normalSegments)) + + cps := replica.listSegmentsCheckPoints() + assert.Equal(t, 2, len(cps)) + assert.Equal(t, startPos.Timestamp, cps[UniqueID(0)].pos.Timestamp) + assert.Equal(t, int64(0), 
cps[UniqueID(0)].numRows) + assert.Equal(t, cp.pos.Timestamp, cps[UniqueID(1)].pos.Timestamp) + assert.Equal(t, int64(10), cps[UniqueID(1)].numRows) + + updates, err := replica.getSegmentStatisticsUpdates(0) + assert.NoError(t, err) + assert.Equal(t, int64(10), updates.NumRows) + + updates, err = replica.getSegmentStatisticsUpdates(1) + assert.NoError(t, err) + assert.Equal(t, int64(20), updates.NumRows) + + replica.updateSegmentCheckPoint(0) + assert.Equal(t, int64(10), replica.normalSegments[UniqueID(0)].checkPoint.numRows) + replica.updateSegmentCheckPoint(1) + assert.Equal(t, int64(20), replica.normalSegments[UniqueID(1)].checkPoint.numRows) + + }) +}
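
Reviewer note: the sketch below is not part of the patch. It walks a single segment through the lifecycle the new SegmentReplica encodes (newSegments -> normalSegments -> flushedSegments), using only methods from the Replica interface added above. It assumes it sits in a _test.go file inside the datanode package so it can reuse the MasterServiceFactory mock from the tests in this diff; the function name segmentLifecycleSketch is illustrative only.

package datanode

import (
	"fmt"

	"github.com/milvus-io/milvus/internal/proto/internalpb"
)

// segmentLifecycleSketch (hypothetical) exercises the new -> normal -> flushed
// transitions of SegmentReplica through the Replica interface.
func segmentLifecycleSketch() {
	replica := newReplica(&MasterServiceFactory{}, UniqueID(1))

	// A segment first seen in the insert channel starts out "new"; its
	// check point is seeded from the start position.
	startPos := &internalpb.MsgPosition{ChannelName: "insert-01", Timestamp: 100}
	endPos := &internalpb.MsgPosition{ChannelName: "insert-01", Timestamp: 100}
	_ = replica.addNewSegment(0, 1, 2, "insert-01", startPos, endPos)

	// Reporting start positions moves the segment from "new" to "normal".
	fmt.Println(len(replica.listNewSegmentsStartPositions())) // 1

	// Buffered inserts advance the row count and the end position...
	_ = replica.updateStatistics(0, 10)
	replica.updateSegmentEndPosition(0, &internalpb.MsgPosition{ChannelName: "insert-01", Timestamp: 200})

	// ...and a flush freezes them into the check point that
	// listSegmentsCheckPoints() later hands to dsSaveBinlog.
	replica.updateSegmentCheckPoint(0)
	fmt.Println(replica.listSegmentsCheckPoints()[0].numRows) // 10

	// A manual Flush() finally retires the segment to flushedSegments;
	// it stays visible to hasSegment, so the insert buffer node will not
	// re-add it as a new segment.
	replica.segmentFlushed(0)
	fmt.Println(replica.hasSegment(0)) // true
}

This mirrors what flow_graph_insert_buffer_node.go now does: addNewSegment on first sight of a segment, updateSegmentEndPosition per MsgPack, updateSegmentCheckPoint on auto flush, and segmentFlushed on manual flush.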