From aa8a038305ceceff75c936724a61b1d2a178fc41 Mon Sep 17 00:00:00 2001
From: sunby
Date: Thu, 3 Jun 2021 19:06:33 +0800
Subject: [PATCH] Rename SegmentAllocator to SegmentManager (#5559)

Add numRows to segmentStatus, rename SegmentAllocator to SegmentManager,
and remove SegmentAllocStats.

Signed-off-by: sunby
---
 internal/dataservice/grpc_services.go         |   6 +-
 internal/dataservice/meta_test.go             |  22 +-
 .../dataservice/segment_allocation_policy.go  |   4 +-
 .../dataservice/segment_allocation_stats.go   | 153 --------------
 ...egment_allocator.go => segment_manager.go} | 199 ++++++++++++------
 ...ocator_test.go => segment_manager_test.go} |  18 +-
 internal/dataservice/server.go                |  43 +---
 internal/dataservice/server_test.go           | 116 +---------
 8 files changed, 167 insertions(+), 394 deletions(-)
 delete mode 100644 internal/dataservice/segment_allocation_stats.go
 rename internal/dataservice/{segment_allocator.go => segment_manager.go} (58%)
 rename internal/dataservice/{segment_allocator_test.go => segment_manager_test.go} (85%)

diff --git a/internal/dataservice/grpc_services.go b/internal/dataservice/grpc_services.go
index e61ac2b041..e5d0a6bcec 100644
--- a/internal/dataservice/grpc_services.go
+++ b/internal/dataservice/grpc_services.go
@@ -46,7 +46,7 @@ func (s *Server) Flush(ctx context.Context, req *datapb.FlushRequest) (*commonpb
 		resp.Reason = "server is closed"
 		return resp, nil
 	}
-	if err := s.segAllocator.SealAllSegments(ctx, req.CollectionID); err != nil {
+	if err := s.segmentManager.SealAllSegments(ctx, req.CollectionID); err != nil {
 		resp.Reason = fmt.Sprintf("Seal all segments error %s", err)
 		return resp, nil
 	}
@@ -94,7 +94,7 @@ func (s *Server) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentI
 
 		s.cluster.watchIfNeeded(r.ChannelName, r.CollectionID)
 
-		segmentID, retCount, expireTs, err := s.segAllocator.AllocSegment(ctx,
+		segmentID, retCount, expireTs, err := s.segmentManager.AllocSegment(ctx,
 			r.CollectionID, r.PartitionID, r.ChannelName, int64(r.Count))
 		if err != nil {
 			errMsg := fmt.Sprintf("allocation of collection %d, partition %d, channel %s, count %d error: %s",
@@ -325,7 +325,7 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath
 	log.Debug("flush segment with meta", zap.Int64("id", req.SegmentID),
 		zap.Any("meta", meta))
 
-	s.segAllocator.DropSegment(ctx, req.SegmentID)
+	s.segmentManager.DropSegment(ctx, req.SegmentID)
 	s.flushCh <- req.SegmentID
 
 	resp.ErrorCode = commonpb.ErrorCode_Success
diff --git a/internal/dataservice/meta_test.go b/internal/dataservice/meta_test.go
index fc5658c276..4deb779400 100644
--- a/internal/dataservice/meta_test.go
+++ b/internal/dataservice/meta_test.go
@@ -16,7 +16,6 @@ import (
 	"github.com/golang/protobuf/proto"
 	"github.com/milvus-io/milvus/internal/proto/commonpb"
 	"github.com/milvus-io/milvus/internal/proto/datapb"
-	"github.com/milvus-io/milvus/internal/proto/internalpb"
 	"github.com/stretchr/testify/assert"
 )
 
@@ -209,23 +208,6 @@ func TestMeta_Basic(t *testing.T) {
 		err = meta.AddSegment(segInfo0)
 		assert.Nil(t, err)
 
-		// update seg1 to 300 rows
-		stat := &internalpb.SegmentStatisticsUpdates{}
-		stat.SegmentID = segInfo0.ID
-		stat.NumRows = rowCount1
-		err = meta.UpdateSegmentStatistic(stat)
-		assert.Nil(t, err)
-
-		nums, err = meta.GetNumRowsOfCollection(collID)
-		assert.Nil(t, err)
-		assert.EqualValues(t, rowCount1, nums)
-
-		// check update non-exist segment
-		stat.SegmentID, err = mockAllocator.allocID()
-		assert.Nil(t, err)
-		err = meta.UpdateSegmentStatistic(stat)
-		assert.NotNil(t, err)
-
 		// add seg2 with 300 rows
 		segID1, err := mockAllocator.allocID()
 		assert.Nil(t, err)
@@ -238,10 +220,10 @@ func TestMeta_Basic(t *testing.T) {
 		// check partition/collection statistics
 		nums, err = meta.GetNumRowsOfPartition(collID, partID0)
 		assert.Nil(t, err)
-		assert.EqualValues(t, (rowCount1 + rowCount1), nums)
+		assert.EqualValues(t, (rowCount0 + rowCount1), nums)
 		nums, err = meta.GetNumRowsOfCollection(collID)
 		assert.Nil(t, err)
-		assert.EqualValues(t, (rowCount1 + rowCount1), nums)
+		assert.EqualValues(t, (rowCount0 + rowCount1), nums)
 	})
 
 	t.Run("Test Invalid", func(t *testing.T) {
diff --git a/internal/dataservice/segment_allocation_policy.go b/internal/dataservice/segment_allocation_policy.go
index e9065b528e..e35f4356a3 100644
--- a/internal/dataservice/segment_allocation_policy.go
+++ b/internal/dataservice/segment_allocation_policy.go
@@ -58,13 +58,13 @@ func newSealPolicyV1() sealPolicy {
 }
 
 type flushPolicy interface {
-	apply(status *segAllocStatus, t Timestamp) bool
+	apply(status *segmentStatus, t Timestamp) bool
 }
 
 type flushPolicyV1 struct {
 }
 
-func (p *flushPolicyV1) apply(status *segAllocStatus, t Timestamp) bool {
+func (p *flushPolicyV1) apply(status *segmentStatus, t Timestamp) bool {
 	return status.sealed && status.lastExpireTime <= t
 }
 
diff --git a/internal/dataservice/segment_allocation_stats.go b/internal/dataservice/segment_allocation_stats.go
deleted file mode 100644
index 7089a1f2c2..0000000000
--- a/internal/dataservice/segment_allocation_stats.go
+++ /dev/null
@@ -1,153 +0,0 @@
-package dataservice
-
-import (
-	"github.com/milvus-io/milvus/internal/log"
-	"github.com/milvus-io/milvus/internal/proto/commonpb"
-	"github.com/milvus-io/milvus/internal/proto/datapb"
-	"go.uber.org/zap"
-)
-
-type segAllocStatus struct {
-	id             UniqueID
-	collectionID   UniqueID
-	partitionID    UniqueID
-	sealed         bool
-	total          int64
-	insertChannel  string
-	allocations    []*allocation
-	lastExpireTime Timestamp
-}
-type allocation struct {
-	rowNums    int64
-	expireTime Timestamp
-}
-
-func (s *segAllocStatus) getAllocationSize() int64 {
-	var totalOfAllocations int64
-	for _, allocation := range s.allocations {
-		totalOfAllocations += allocation.rowNums
-	}
-	return totalOfAllocations
-}
-
-func (s *segAllocStatus) appendAllocation(rowNums int64, expireTime Timestamp) {
-	alloc := &allocation{
-		rowNums:    rowNums,
-		expireTime: expireTime,
-	}
-	s.lastExpireTime = expireTime
-	s.allocations = append(s.allocations, alloc)
-}
-
-type segAllocStats struct {
-	meta  *meta
-	stats map[UniqueID]*segAllocStatus //segment id -> status
-}
-
-func newAllocStats(meta *meta) *segAllocStats {
-	s := &segAllocStats{
-		meta:  meta,
-		stats: make(map[UniqueID]*segAllocStatus),
-	}
-	s.loadSegmentsFromMeta()
-	return s
-}
-
-func (s *segAllocStats) loadSegmentsFromMeta() {
-	// load unflushed segments from meta
-	segments := s.meta.GetUnFlushedSegments()
-	for _, seg := range segments {
-		stat := &segAllocStatus{
-			id:             seg.ID,
-			collectionID:   seg.CollectionID,
-			partitionID:    seg.PartitionID,
-			total:          seg.MaxRowNum,
-			allocations:    []*allocation{},
-			insertChannel:  seg.InsertChannel,
-			lastExpireTime: seg.LastExpireTime,
-			sealed:         seg.State == commonpb.SegmentState_Sealed,
-		}
-		s.stats[seg.ID] = stat
-	}
-}
-
-func (s *segAllocStats) getSegments(collectionID UniqueID, partitionID UniqueID, channelName string) []*segAllocStatus {
-	ret := make([]*segAllocStatus, 0)
-	for _, segment := range s.stats {
-		if segment.sealed || segment.collectionID != collectionID || segment.partitionID != partitionID || segment.insertChannel != channelName {
-			continue
-		}
-		ret = append(ret, segment)
-	}
-	return ret
-}
-
-func (s *segAllocStats) appendAllocation(segmentID UniqueID, numRows int64, expireTime Timestamp) error {
-	segStatus := s.stats[segmentID]
-	segStatus.appendAllocation(numRows, expireTime)
-	return s.meta.SetLastExpireTime(segStatus.id, expireTime)
-}
-
-func (s *segAllocStats) sealSegment(id UniqueID) error {
-	s.stats[id].sealed = true
-	return s.meta.SealSegment(id)
-}
-
-func (s *segAllocStats) sealSegmentsBy(collectionID UniqueID) error {
-	for _, status := range s.stats {
-		if status.collectionID == collectionID {
-			if status.sealed {
-				continue
-			}
-			if err := s.meta.SealSegment(status.id); err != nil {
-				return err
-			}
-			status.sealed = true
-		}
-	}
-	return nil
-}
-func (s *segAllocStats) dropSegment(id UniqueID) {
-	delete(s.stats, id)
-}
-
-func (s *segAllocStats) expire(t Timestamp) {
-	for _, segStatus := range s.stats {
-		for i := 0; i < len(segStatus.allocations); i++ {
-			if t < segStatus.allocations[i].expireTime {
-				continue
-			}
-			log.Debug("dataservice::ExpireAllocations: ",
-				zap.Any("segStatus.id", segStatus.id),
-				zap.Any("segStatus.allocations.rowNums", segStatus.allocations[i].rowNums))
-			segStatus.allocations = append(segStatus.allocations[:i], segStatus.allocations[i+1:]...)
-			i--
-		}
-	}
-}
-
-func (s *segAllocStats) getAllSegments() []*segAllocStatus {
-	ret := make([]*segAllocStatus, 0)
-	for _, status := range s.stats {
-		ret = append(ret, status)
-	}
-	return ret
-}
-
-func (s *segAllocStats) getSegmentBy(id UniqueID) *segAllocStatus {
-	return s.stats[id]
-}
-
-func (s *segAllocStats) addSegment(segment *datapb.SegmentInfo) error {
-	s.stats[segment.ID] = &segAllocStatus{
-		id:             segment.ID,
-		collectionID:   segment.CollectionID,
-		partitionID:    segment.PartitionID,
-		sealed:         false,
-		total:          segment.MaxRowNum,
-		insertChannel:  segment.InsertChannel,
-		allocations:    []*allocation{},
-		lastExpireTime: segment.LastExpireTime,
-	}
-	return s.meta.AddSegment(segment)
-}
diff --git a/internal/dataservice/segment_allocator.go b/internal/dataservice/segment_manager.go
similarity index 58%
rename from internal/dataservice/segment_allocator.go
rename to internal/dataservice/segment_manager.go
index 0ed627d418..fc5dbe1906 100644
--- a/internal/dataservice/segment_allocator.go
+++ b/internal/dataservice/segment_manager.go
@@ -20,6 +20,7 @@ import (
 	"github.com/milvus-io/milvus/internal/log"
 	"github.com/milvus-io/milvus/internal/msgstream"
 	"github.com/milvus-io/milvus/internal/proto/commonpb"
+	"github.com/milvus-io/milvus/internal/proto/internalpb"
 
 	"github.com/milvus-io/milvus/internal/util/trace"
 	"github.com/milvus-io/milvus/internal/util/tsoutil"
@@ -39,8 +40,8 @@ func (err errRemainInSufficient) Error() string {
 	return fmt.Sprintf("segment remaining is insufficient for %d", err.requestRows)
 }
 
-// segmentAllocator is used to allocate rows for segments and record the allocations.
-type segmentAllocator interface {
+// Manager manages segment-related operations.
+type Manager interface {
 	// AllocSegment allocate rows and record the allocation.
 	AllocSegment(ctx context.Context, collectionID, partitionID UniqueID, channelName string, requestRows int64) (UniqueID, int64, Timestamp, error)
 	// DropSegment drop the segment from allocator.
@@ -49,14 +50,33 @@ type segmentAllocator interface {
 	SealAllSegments(ctx context.Context, collectionID UniqueID) error
 	// GetFlushableSegments return flushable segment ids
 	GetFlushableSegments(ctx context.Context, channel string, ts Timestamp) ([]UniqueID, error)
+	// UpdateSegmentStats updates the segment statistics
+	UpdateSegmentStats(stat *internalpb.SegmentStatisticsUpdates)
 }
 
-type channelSegmentAllocator struct {
-	mt         *meta
-	mu         sync.RWMutex
-	allocator  allocator
-	helper     allocHelper
-	allocStats *segAllocStats
+type segmentStatus struct {
+	id             UniqueID
+	collectionID   UniqueID
+	partitionID    UniqueID
+	sealed         bool
+	total          int64
+	insertChannel  string
+	allocations    []*allocation
+	lastExpireTime Timestamp
+	currentRows    int64
+}
+
+type allocation struct {
+	rowNums    int64
+	expireTime Timestamp
+}
+
+type SegmentManager struct {
+	meta      *meta
+	mu        sync.RWMutex
+	allocator allocator
+	helper    allocHelper
+	stats     map[UniqueID]*segmentStatus //segment id -> status
 
 	estimatePolicy calUpperLimitPolicy
 	allocPolicy    allocatePolicy
@@ -69,12 +89,12 @@ type allocHelper struct {
 }
 
 type allocOption struct {
-	apply func(alloc *channelSegmentAllocator)
+	apply func(manager *SegmentManager)
 }
 
 func withAllocHelper(helper allocHelper) allocOption {
 	return allocOption{
-		apply: func(alloc *channelSegmentAllocator) { alloc.helper = helper },
+		apply: func(manager *SegmentManager) { manager.helper = helper },
 	}
 }
 
@@ -86,25 +106,25 @@ func defaultAllocHelper() allocHelper {
 
 func withCalUpperLimitPolicy(policy calUpperLimitPolicy) allocOption {
 	return allocOption{
-		apply: func(alloc *channelSegmentAllocator) { alloc.estimatePolicy = policy },
+		apply: func(manager *SegmentManager) { manager.estimatePolicy = policy },
 	}
 }
 
 func withAllocPolicy(policy allocatePolicy) allocOption {
 	return allocOption{
-		apply: func(alloc *channelSegmentAllocator) { alloc.allocPolicy = policy },
+		apply: func(manager *SegmentManager) { manager.allocPolicy = policy },
 	}
 }
 
 func withSealPolicy(policy sealPolicy) allocOption {
 	return allocOption{
-		apply: func(alloc *channelSegmentAllocator) { alloc.sealPolicy = policy },
+		apply: func(manager *SegmentManager) { manager.sealPolicy = policy },
 	}
 }
 
 func withFlushPolicy(policy flushPolicy) allocOption {
 	return allocOption{
-		apply: func(alloc *channelSegmentAllocator) { alloc.flushPolicy = policy },
+		apply: func(manager *SegmentManager) { manager.flushPolicy = policy },
 	}
 }
 
@@ -124,12 +144,12 @@ func defaultFlushPolicy() flushPolicy {
 	return newFlushPolicyV1()
 }
 
-func newSegmentAllocator(meta *meta, allocator allocator, opts ...allocOption) *channelSegmentAllocator {
-	alloc := &channelSegmentAllocator{
-		mt:         meta,
-		allocator:  allocator,
-		helper:     defaultAllocHelper(),
-		allocStats: newAllocStats(meta),
+func newSegmentManager(meta *meta, allocator allocator, opts ...allocOption) *SegmentManager {
+	manager := &SegmentManager{
+		meta:      meta,
+		allocator: allocator,
+		helper:    defaultAllocHelper(),
+		stats:     make(map[UniqueID]*segmentStatus),
 
 		estimatePolicy: defaultCalUpperLimitPolicy(),
 		allocPolicy:    defaultAlocatePolicy(),
@@ -137,12 +157,30 @@ func newSegmentAllocator(meta *meta, allocator allocator, opts ...allocOption) *
 		flushPolicy: defaultFlushPolicy(),
 	}
 	for _, opt := range opts {
-		opt.apply(alloc)
+		opt.apply(manager)
 	}
-	return alloc
+	manager.loadSegmentsFromMeta()
+	return manager
 }
 
-func (s *channelSegmentAllocator) AllocSegment(ctx context.Context, collectionID UniqueID,
+func (s *SegmentManager) loadSegmentsFromMeta() {
+	// load unflushed segments from meta
+	segments := s.meta.GetUnFlushedSegments()
+	for _, seg := range segments {
+		stat := &segmentStatus{
+			id:             seg.ID,
+			collectionID:   seg.CollectionID,
+			partitionID:    seg.PartitionID,
+			total:          seg.MaxRowNum,
+			allocations:    []*allocation{},
+			insertChannel:  seg.InsertChannel,
+			lastExpireTime: seg.LastExpireTime,
+			sealed:         seg.State == commonpb.SegmentState_Sealed,
+		}
+		s.stats[seg.ID] = stat
+	}
+}
+func (s *SegmentManager) AllocSegment(ctx context.Context, collectionID UniqueID,
 	partitionID UniqueID, channelName string, requestRows int64) (segID UniqueID, retCount int64, expireTime Timestamp, err error) {
 	sp, _ := trace.StartSpanFromContext(ctx)
 	defer sp.Finish()
@@ -150,9 +188,12 @@ func (s *channelSegmentAllocator) AllocSegment(ctx context.Context, collectionID
 	defer s.mu.Unlock()
 
 	var success bool
-	var status *segAllocStatus
-	segments := s.allocStats.getSegments(collectionID, partitionID, channelName)
-	for _, segStatus := range segments {
+	var status *segmentStatus
+	for _, segStatus := range s.stats {
+		if segStatus.sealed || segStatus.collectionID != collectionID ||
+			segStatus.partitionID != partitionID || segStatus.insertChannel != channelName {
+			continue
+		}
 		success, err = s.alloc(segStatus, requestRows)
 		if err != nil {
 			return
@@ -185,13 +226,12 @@ func (s *channelSegmentAllocator) AllocSegment(ctx context.Context, collectionID
 	return
 }
 
-func (s *channelSegmentAllocator) alloc(segStatus *segAllocStatus, numRows int64) (bool, error) {
-	info, err := s.mt.GetSegment(segStatus.id)
-	if err != nil {
-		return false, err
+func (s *SegmentManager) alloc(segStatus *segmentStatus, numRows int64) (bool, error) {
+	var allocSize int64
+	for _, allocation := range segStatus.allocations {
+		allocSize += allocation.rowNums
 	}
-	allocSize := segStatus.getAllocationSize()
-	if !s.allocPolicy.apply(segStatus.total, info.NumRows, allocSize, numRows) {
+	if !s.allocPolicy.apply(segStatus.total, segStatus.currentRows, allocSize, numRows) {
 		return false, nil
 	}
 
@@ -199,14 +239,21 @@ func (s *channelSegmentAllocator) alloc(segStatus *segAllocStatus, numRows int64
 	if err != nil {
 		return false, err
 	}
-	if err := s.allocStats.appendAllocation(segStatus.id, numRows, expireTs); err != nil {
+
+	alloc := &allocation{
+		rowNums:    numRows,
+		expireTime: expireTs,
+	}
+	segStatus.lastExpireTime = expireTs
+	segStatus.allocations = append(segStatus.allocations, alloc)
+
+	if err := s.meta.SetLastExpireTime(segStatus.id, expireTs); err != nil {
 		return false, err
 	}
-
 	return true, nil
 }
 
-func (s *channelSegmentAllocator) genExpireTs() (Timestamp, error) {
+func (s *SegmentManager) genExpireTs() (Timestamp, error) {
 	ts, err := s.allocator.allocTimestamp()
 	if err != nil {
 		return 0, err
@@ -217,7 +264,7 @@ func (s *channelSegmentAllocator) genExpireTs() (Timestamp, error) {
 	return expireTs, nil
 }
 
-func (s *channelSegmentAllocator) openNewSegment(ctx context.Context, collectionID UniqueID, partitionID UniqueID, channelName string) (*segAllocStatus, error) {
+func (s *SegmentManager) openNewSegment(ctx context.Context, collectionID UniqueID, partitionID UniqueID, channelName string) (*segmentStatus, error) {
 	sp, _ := trace.StartSpanFromContext(ctx)
 	defer sp.Finish()
 	id, err := s.allocator.allocID()
@@ -228,6 +275,19 @@ func (s *channelSegmentAllocator) openNewSegment(ctx context.Context, collection
 	if err != nil {
 		return nil, err
 	}
+	segStatus := &segmentStatus{
+		id:             id,
+		collectionID:   collectionID,
+		partitionID:    partitionID,
+		sealed:         false,
+		total:          int64(totalRows),
+		insertChannel:  channelName,
+		allocations:    []*allocation{},
+		lastExpireTime: 0,
+		currentRows:    0,
+	}
+
+	s.stats[id] = segStatus
 	segmentInfo := &datapb.SegmentInfo{
 		ID:             id,
 		CollectionID:   collectionID,
@@ -239,7 +299,7 @@ func (s *channelSegmentAllocator) openNewSegment(ctx context.Context, collection
 		LastExpireTime: 0,
 	}
 
-	if err := s.allocStats.addSegment(segmentInfo); err != nil {
+	if err := s.meta.AddSegment(segmentInfo); err != nil {
 		return nil, err
 	}
 
@@ -250,35 +310,45 @@ func (s *channelSegmentAllocator) openNewSegment(ctx context.Context, collection
 
 	s.helper.afterCreateSegment(segmentInfo)
 
-	return s.allocStats.getSegmentBy(segmentInfo.ID), nil
+	return segStatus, nil
 }
 
-func (s *channelSegmentAllocator) estimateTotalRows(collectionID UniqueID) (int, error) {
-	collMeta, err := s.mt.GetCollection(collectionID)
+func (s *SegmentManager) estimateTotalRows(collectionID UniqueID) (int, error) {
+	collMeta, err := s.meta.GetCollection(collectionID)
 	if err != nil {
 		return -1, err
 	}
 	return s.estimatePolicy.apply(collMeta.Schema)
 }
 
-func (s *channelSegmentAllocator) DropSegment(ctx context.Context, segmentID UniqueID) {
+func (s *SegmentManager) DropSegment(ctx context.Context, segmentID UniqueID) {
 	sp, _ := trace.StartSpanFromContext(ctx)
 	defer sp.Finish()
 	s.mu.Lock()
 	defer s.mu.Unlock()
-	s.allocStats.dropSegment(segmentID)
+	delete(s.stats, segmentID)
 }
 
-func (s *channelSegmentAllocator) SealAllSegments(ctx context.Context, collectionID UniqueID) error {
+func (s *SegmentManager) SealAllSegments(ctx context.Context, collectionID UniqueID) error {
 	sp, _ := trace.StartSpanFromContext(ctx)
 	defer sp.Finish()
 	s.mu.Lock()
 	defer s.mu.Unlock()
-	s.allocStats.sealSegmentsBy(collectionID)
+	for _, status := range s.stats {
+		if status.collectionID == collectionID {
+			if status.sealed {
+				continue
+			}
+			if err := s.meta.SealSegment(status.id); err != nil {
+				return err
+			}
+			status.sealed = true
+		}
+	}
 	return nil
 }
 
-func (s *channelSegmentAllocator) GetFlushableSegments(ctx context.Context, channel string,
+func (s *SegmentManager) GetFlushableSegments(ctx context.Context, channel string,
 	t Timestamp) ([]UniqueID, error) {
 	s.mu.Lock()
 	defer s.mu.Unlock()
@@ -289,8 +359,7 @@ func (s *channelSegmentAllocator) GetFlushableSegments(ctx context.Context, chan
 	}
 
 	ret := make([]UniqueID, 0)
-	segments := s.allocStats.getAllSegments()
-	for _, segStatus := range segments {
+	for _, segStatus := range s.stats {
 		if segStatus.insertChannel != channel {
 			continue
 		}
@@ -302,9 +371,18 @@ func (s *channelSegmentAllocator) GetFlushableSegments(ctx context.Context, chan
 	return ret, nil
 }
 
-func (s *channelSegmentAllocator) tryToSealSegment() error {
-	segments := s.allocStats.getAllSegments()
-	for _, segStatus := range segments {
+func (s *SegmentManager) UpdateSegmentStats(stat *internalpb.SegmentStatisticsUpdates) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	segment, ok := s.stats[stat.SegmentID]
+	if !ok {
+		return
+	}
+	segment.currentRows = stat.NumRows
+}
+
+func (s *SegmentManager) tryToSealSegment() error {
+	for _, segStatus := range s.stats {
 		if segStatus.sealed {
 			continue
 		}
@@ -315,31 +393,34 @@ func (s *channelSegmentAllocator) tryToSealSegment() error {
 		if !sealed {
 			continue
 		}
-		if err := s.allocStats.sealSegment(segStatus.id); err != nil {
+		if err := s.meta.SealSegment(segStatus.id); err != nil {
 			return err
 		}
+		segStatus.sealed = true
 	}
 	return nil
 }
 
-func (s *channelSegmentAllocator) checkSegmentSealed(segStatus *segAllocStatus) (bool, error) {
-	segMeta, err := s.mt.GetSegment(segStatus.id)
-	if err != nil {
-		return false, err
+func (s *SegmentManager) checkSegmentSealed(segStatus *segmentStatus) (bool, error) {
+	var allocSize int64
+	for _, allocation := range segStatus.allocations {
+		allocSize += allocation.rowNums
 	}
-
-	ret := s.sealPolicy.apply(segStatus.total, segMeta.NumRows, segStatus.getAllocationSize())
+	ret := s.sealPolicy.apply(segStatus.total, segStatus.currentRows, allocSize)
 	return ret, nil
 }
 
 // only for test
-func (s *channelSegmentAllocator) SealSegment(ctx context.Context, segmentID UniqueID) error {
+func (s *SegmentManager) SealSegment(ctx context.Context, segmentID UniqueID) error {
 	sp, _ := trace.StartSpanFromContext(ctx)
 	defer sp.Finish()
 	s.mu.Lock()
 	defer s.mu.Unlock()
-	s.allocStats.sealSegment(segmentID)
+	if err := s.meta.SealSegment(segmentID); err != nil {
+		return err
+	}
+	s.stats[segmentID].sealed = true
 	return nil
 }
diff --git a/internal/dataservice/segment_allocator_test.go b/internal/dataservice/segment_manager_test.go
similarity index 85%
rename from internal/dataservice/segment_allocator_test.go
rename to internal/dataservice/segment_manager_test.go
index 9ea3a1c3cf..d379268f76 100644
--- a/internal/dataservice/segment_allocator_test.go
+++ b/internal/dataservice/segment_manager_test.go
@@ -26,7 +26,7 @@ func TestAllocSegment(t *testing.T) {
 	mockAllocator := newMockAllocator()
 	meta, err := newMemoryMeta(mockAllocator)
 	assert.Nil(t, err)
-	segAllocator := newSegmentAllocator(meta, mockAllocator)
+	segmentManager := newSegmentManager(meta, mockAllocator)
 
 	schema := newTestSchema()
 	collID, err := mockAllocator.allocID()
@@ -47,7 +47,7 @@ func TestAllocSegment(t *testing.T) {
 		{collID, 100, "c1", math.MaxInt64, false},
 	}
 	for _, c := range cases {
-		id, count, expireTime, err := segAllocator.AllocSegment(ctx, c.collectionID, c.partitionID, c.channelName, c.requestRows)
+		id, count, expireTime, err := segmentManager.AllocSegment(ctx, c.collectionID, c.partitionID, c.channelName, c.requestRows)
 		if c.expectResult {
 			assert.Nil(t, err)
 			assert.EqualValues(t, c.requestRows, count)
@@ -108,11 +108,11 @@ func TestLoadSegmentsFromMeta(t *testing.T) {
 	err = meta.AddSegment(flushedSegment)
 	assert.Nil(t, err)
 
-	segAllocator := newSegmentAllocator(meta, mockAllocator)
-	segments := segAllocator.allocStats.getAllSegments()
+	segmentManager := newSegmentManager(meta, mockAllocator)
+	segments := segmentManager.stats
 	assert.EqualValues(t, 2, len(segments))
-	assert.NotNil(t, segments[0])
 	assert.NotNil(t, segments[1])
+	assert.NotNil(t, segments[2])
 }
 
 func TestSaveSegmentsToMeta(t *testing.T) {
@@ -130,12 +130,12 @@ func TestSaveSegmentsToMeta(t *testing.T) {
 	})
 	assert.Nil(t, err)
-	allocator := newSegmentAllocator(meta, mockAllocator)
-	segID, _, expireTs, err := allocator.AllocSegment(context.Background(), collID, 0, "c1", 1000)
+	segmentManager := newSegmentManager(meta, mockAllocator)
+	segID, _, expireTs, err := segmentManager.AllocSegment(context.Background(), collID, 0, "c1", 1000)
 	assert.Nil(t, err)
-	segStatus := allocator.allocStats.getSegmentBy(segID)
+	segStatus := segmentManager.stats[segID]
 	assert.NotNil(t, segStatus)
-	err = allocator.SealAllSegments(context.Background(), collID)
+	err = segmentManager.SealAllSegments(context.Background(), collID)
 	assert.Nil(t, err)
 
 	segment, err := meta.GetSegment(segID)
diff --git a/internal/dataservice/server.go b/internal/dataservice/server.go
index 9b8cd7c939..9526d2b4c7 100644
--- a/internal/dataservice/server.go
+++ b/internal/dataservice/server.go
@@ -55,7 +55,7 @@ type Server struct {
 	kvClient          *etcdkv.EtcdKV
 	meta              *meta
 	segmentInfoStream msgstream.MsgStream
-	segAllocator      segmentAllocator
+	segmentManager    Manager
 	allocator         allocator
 	cluster           *cluster
 	masterClient      types.MasterService
@@ -135,7 +135,7 @@ func (s *Server) Start() error {
 
 	s.allocator = newAllocator(s.masterClient)
 
-	s.startSegmentAllocator()
+	s.startSegmentManager()
 	if err = s.initFlushMsgStream(); err != nil {
 		return err
 	}
@@ -187,9 +187,9 @@ func (s *Server) initServiceDiscovery() error {
 	return nil
 }
 
-func (s *Server) startSegmentAllocator() {
+func (s *Server) startSegmentManager() {
 	helper := createNewSegmentHelper(s.segmentInfoStream)
-	s.segAllocator = newSegmentAllocator(s.meta, s.allocator, withAllocHelper(helper))
+	s.segmentManager = newSegmentManager(s.meta, s.allocator, withAllocHelper(helper))
 }
 
 func (s *Server) initSegmentInfoChannel() error {
@@ -260,19 +260,9 @@ func (s *Server) startStatsChannel(ctx context.Context) {
 	defer s.serverLoopWg.Done()
 	statsStream, _ := s.msFactory.NewMsgStream(ctx)
 	statsStream.AsConsumer([]string{Params.StatisticsChannelName}, Params.DataServiceSubscriptionName)
-	log.Debug("dataservice AsConsumer: " + Params.StatisticsChannelName + " : " + Params.DataServiceSubscriptionName)
-	// try to restore last processed pos
-	pos, err := s.loadStreamLastPos(streamTypeStats)
-	log.Debug("load last pos of stats channel", zap.Any("pos", pos), zap.Error(err))
-	if err == nil {
-		err = statsStream.Seek([]*internalpb.MsgPosition{pos})
-		if err != nil {
-			log.Error("Failed to seek to last pos for statsStream",
-				zap.String("StatisticsChanName", Params.StatisticsChannelName),
-				zap.String("DataServiceSubscriptionName", Params.DataServiceSubscriptionName),
-				zap.Error(err))
-		}
-	}
+	log.Debug("dataservice stats stream",
+		zap.String("channelName", Params.StatisticsChannelName),
+		zap.String("subscriptionName", Params.DataServiceSubscriptionName))
 	statsStream.Start()
 	defer statsStream.Close()
 	for {
@@ -293,22 +283,7 @@ func (s *Server) startStatsChannel(ctx context.Context) {
 		}
 		ssMsg := msg.(*msgstream.SegmentStatisticsMsg)
 		for _, stat := range ssMsg.SegStats {
-			if err := s.meta.UpdateSegmentStatistic(stat); err != nil {
-				log.Error("handle segment stat error",
-					zap.Int64("segmentID", stat.SegmentID),
-					zap.Error(err))
-				continue
-			}
-		}
-		if ssMsg.MsgPosition != nil {
-			err := s.storeStreamPos(streamTypeStats, ssMsg.MsgPosition)
-			if err != nil {
-				log.Error("Fail to store current success pos for Stats stream",
-					zap.Stringer("pos", ssMsg.MsgPosition),
-					zap.Error(err))
-			}
-		} else {
-			log.Warn("Empty Msg Pos found ", zap.Int64("msgid", msg.ID()))
+			s.segmentManager.UpdateSegmentStats(stat)
 		}
 	}
 }
@@ -349,7 +324,7 @@ func (s *Server) startDataNodeTtLoop(ctx context.Context) {
 		ch := ttMsg.ChannelName
 		ts := ttMsg.Timestamp
 
-		segments, err := s.segAllocator.GetFlushableSegments(ctx, ch, ts)
+		segments, err := s.segmentManager.GetFlushableSegments(ctx, ch, ts)
 		if err != nil {
 			log.Warn("get flushable segments failed", zap.Error(err))
 			continue
diff --git a/internal/dataservice/server_test.go b/internal/dataservice/server_test.go
index 103895039d..02700cd31c 100644
--- a/internal/dataservice/server_test.go
+++ b/internal/dataservice/server_test.go
@@ -13,12 +13,10 @@ package dataservice
 import (
 	"context"
 	"math"
-	"math/rand"
 	"path"
 	"testing"
 	"time"
 
-	"github.com/milvus-io/milvus/internal/log"
 	"github.com/milvus-io/milvus/internal/msgstream"
 	"github.com/milvus-io/milvus/internal/proto/commonpb"
 	"github.com/milvus-io/milvus/internal/proto/datapb"
@@ -29,7 +27,6 @@ import (
 	"github.com/milvus-io/milvus/internal/util/typeutil"
 	"github.com/stretchr/testify/assert"
"go.etcd.io/etcd/clientv3" - "go.uber.org/zap" ) func TestGetSegmentInfoChannel(t *testing.T) { @@ -167,7 +164,7 @@ func TestFlush(t *testing.T) { Partitions: []int64{}, }) assert.Nil(t, err) - segID, _, expireTs, err := svr.segAllocator.AllocSegment(context.TODO(), 0, 1, "channel-1", 1) + segID, _, expireTs, err := svr.segmentManager.AllocSegment(context.TODO(), 0, 1, "channel-1", 1) assert.Nil(t, err) resp, err := svr.Flush(context.TODO(), &datapb.FlushRequest{ Base: &commonpb.MsgBase{ @@ -181,7 +178,7 @@ func TestFlush(t *testing.T) { }) assert.Nil(t, err) assert.EqualValues(t, commonpb.ErrorCode_Success, resp.ErrorCode) - ids, err := svr.segAllocator.GetFlushableSegments(context.TODO(), "channel-1", expireTs) + ids, err := svr.segmentManager.GetFlushableSegments(context.TODO(), "channel-1", expireTs) assert.Nil(t, err) assert.EqualValues(t, 1, len(ids)) assert.EqualValues(t, segID, ids[0]) @@ -649,115 +646,6 @@ func TestDataNodeTtChannel(t *testing.T) { } -func TestResumeChannel(t *testing.T) { - Params.Init() - - segmentIDs := make([]int64, 0, 1000) - - t.Run("Prepare Resume test set", func(t *testing.T) { - svr := newTestServer(t, nil) - defer svr.Stop() - - i := int64(-1) - cnt := 0 - for ; cnt < 1000; i-- { - svr.meta.RLock() - _, has := svr.meta.segments[i] - svr.meta.RUnlock() - if has { - continue - } - err := svr.meta.AddSegment(&datapb.SegmentInfo{ - ID: i, - CollectionID: -1, - }) - assert.Nil(t, err) - segmentIDs = append(segmentIDs, i) - cnt++ - } - }) - - t.Run("Test ResumeSegmentStatsChannel", func(t *testing.T) { - svr := newTestServer(t, nil) - - segRows := rand.Int63n(1000) - - statsStream, _ := svr.msFactory.NewMsgStream(svr.ctx) - statsStream.AsProducer([]string{Params.StatisticsChannelName}) - statsStream.Start() - defer statsStream.Close() - - genMsg := func(msgType commonpb.MsgType, t Timestamp, stats *internalpb.SegmentStatisticsUpdates) *msgstream.SegmentStatisticsMsg { - return &msgstream.SegmentStatisticsMsg{ - BaseMsg: msgstream.BaseMsg{ - HashValues: []uint32{0}, - }, - SegmentStatistics: internalpb.SegmentStatistics{ - Base: &commonpb.MsgBase{ - MsgType: msgType, - MsgID: 0, - Timestamp: t, - SourceID: 0, - }, - SegStats: []*internalpb.SegmentStatisticsUpdates{stats}, - }, - } - } - ch := make(chan struct{}) - - go func() { - for _, segID := range segmentIDs { - stats := &internalpb.SegmentStatisticsUpdates{ - SegmentID: segID, - NumRows: segRows, - } - - msgPack := msgstream.MsgPack{} - msgPack.Msgs = append(msgPack.Msgs, genMsg(commonpb.MsgType_SegmentStatistics, uint64(time.Now().Unix()), stats)) - - err := statsStream.Produce(&msgPack) - assert.Nil(t, err) - time.Sleep(time.Millisecond * 5) - } - ch <- struct{}{} - }() - - time.Sleep(time.Second) - - svr.Stop() - time.Sleep(time.Millisecond * 50) - - svr = newTestServer(t, nil) - defer svr.Stop() - <-ch - - //wait for Server processing last messages - time.Sleep(time.Second) - - svr.meta.RLock() - defer svr.meta.RUnlock() - for _, segID := range segmentIDs { - seg, has := svr.meta.segments[segID] - log.Debug("check segment in meta", zap.Any("id", seg.ID), zap.Any("has", has)) - assert.True(t, has) - if has { - log.Debug("compare num rows", zap.Any("id", seg.ID), zap.Any("expected", segRows), zap.Any("actual", seg.NumRows)) - assert.Equal(t, segRows, seg.NumRows) - } - } - }) - - t.Run("Clean up test segments", func(t *testing.T) { - svr := newTestServer(t, nil) - defer closeTestServer(t, svr) - var err error - for _, segID := range segmentIDs { - err = svr.meta.DropSegment(segID) - assert.Nil(t, err) - 
-		}
-	})
-}
-
 func TestGetVChannelPos(t *testing.T) {
 	svr := newTestServer(t, nil)
 	defer closeTestServer(t, svr)
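
Note on the behavioral change in this patch: per-segment row counts now live in
memory on SegmentManager (segmentStatus.currentRows, refreshed from the
statistics stream via UpdateSegmentStats) rather than being persisted through
meta.UpdateSegmentStatistic, and the allocation and seal checks read that
in-memory state. Below is a minimal, self-contained Go sketch of how the
reported row count combines with outstanding allocations; segmentStatus is
simplified from the patch, and canAlloc is a hypothetical stand-in for
allocPolicy.apply, not the actual policy implementation:

	package main

	import "fmt"

	// segmentStatus mirrors the in-memory bookkeeping SegmentManager keeps per
	// segment: capacity, rows reported by DataNodes, and pending allocations.
	type segmentStatus struct {
		total       int64   // max rows the segment can hold
		currentRows int64   // rows reported via UpdateSegmentStats
		allocations []int64 // row counts of not-yet-expired allocations
	}

	// canAlloc is a hypothetical stand-in for allocPolicy.apply: a request fits
	// only if reported rows plus outstanding allocations leave enough room.
	func canAlloc(s *segmentStatus, requestRows int64) bool {
		var allocSize int64
		for _, rows := range s.allocations {
			allocSize += rows
		}
		return s.currentRows+allocSize+requestRows <= s.total
	}

	func main() {
		s := &segmentStatus{total: 1000, currentRows: 600, allocations: []int64{200}}
		fmt.Println(canAlloc(s, 100)) // true: 600+200+100 <= 1000
		fmt.Println(canAlloc(s, 300)) // false: would exceed the 1000-row cap
	}

Because the row count is no longer persisted, a restarted DataService sees
currentRows as zero until the next statistics update arrives, which is why this
patch also drops TestResumeChannel and the stream-position restore logic from
startStatsChannel.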