diff --git a/configs/milvus.yaml b/configs/milvus.yaml index db2f4fae58..3ab0a9d555 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -316,7 +316,12 @@ dataCoord: maxSize: 512 # Maximum size of a segment in MB diskSegmentMaxSize: 2048 # Maximun size of a segment in MB for collection which has Disk index sealProportion: 0.23 - assignmentExpiration: 2000 # The time of the assignment expiration in ms + # The time of the assignment expiration in ms + # Warning! this parameter is an expert variable and closely related to data integrity. Without specific + # target and solid understanding of the scenarios, it should not be changed. If it's necessary to alter + # this parameter, make sure that the newly changed value is larger than the previous value used before restart + # otherwise there could be a large possibility of data loss + assignmentExpiration: 2000 maxLife: 86400 # The max lifetime of segment in seconds, 24*60*60 # If a segment didn't accept dml records in maxIdleTime and the size of segment is greater than # minSizeFromIdleToSealed, Milvus will automatically seal it. diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go index f1da5ee1cc..6906af5dba 100644 --- a/internal/datacoord/meta.go +++ b/internal/datacoord/meta.go @@ -913,24 +913,13 @@ func (m *meta) AddAllocation(segmentID UniqueID, allocation *Allocation) error { curSegInfo := m.segments.GetSegment(segmentID) if curSegInfo == nil { // TODO: Error handling. - log.Warn("meta update: add allocation failed - segment not found", - zap.Int64("segmentID", segmentID)) + log.Warn("meta update: add allocation failed - segment not found", zap.Int64("segmentID", segmentID)) return nil } - // Persist segment updates first. - clonedSegment := curSegInfo.Clone(AddAllocation(allocation)) - if clonedSegment != nil && isSegmentHealthy(clonedSegment) { - if err := m.catalog.AlterSegments(m.ctx, []*datapb.SegmentInfo{clonedSegment.SegmentInfo}); err != nil { - log.Error("meta update: add allocation failed", - zap.Int64("segmentID", segmentID), - zap.Error(err)) - return err - } - } - // Update in-memory meta. + // As we use global segment lastExpire to guarantee data correctness after restart + // there is no need to persist allocation to meta store, only update allocation in-memory meta. m.segments.AddAllocation(segmentID, allocation) - log.Info("meta update: add allocation - complete", - zap.Int64("segmentID", segmentID)) + log.Info("meta update: add allocation - complete", zap.Int64("segmentID", segmentID)) return nil } @@ -950,6 +939,16 @@ func (m *meta) SetCurrentRows(segmentID UniqueID, rows int64) { m.segments.SetCurrentRows(segmentID, rows) } +// SetLastExpire set lastExpire time for segment +// Note that last is not necessary to store in KV meta +func (m *meta) SetLastExpire(segmentID UniqueID, lastExpire uint64) { + m.Lock() + defer m.Unlock() + clonedSegment := m.segments.GetSegment(segmentID).Clone() + clonedSegment.LastExpireTime = lastExpire + m.segments.SetSegment(segmentID, clonedSegment) +} + // SetLastFlushTime set LastFlushTime for segment with provided `segmentID` // Note that lastFlushTime is not persisted in KV store func (m *meta) SetLastFlushTime(segmentID UniqueID, t time.Time) { diff --git a/internal/datacoord/segment_manager.go b/internal/datacoord/segment_manager.go index 56c1f95b27..b30b145235 100644 --- a/internal/datacoord/segment_manager.go +++ b/internal/datacoord/segment_manager.go @@ -29,8 +29,8 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus/internal/proto/datapb" - "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/retry" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -113,7 +113,6 @@ type SegmentManager struct { segmentSealPolicies []segmentSealPolicy channelSealPolicies []channelSealPolicy flushPolicy flushPolicy - rcc types.RootCoord } type allocHelper struct { @@ -198,7 +197,7 @@ func defaultFlushPolicy() flushPolicy { } // newSegmentManager should be the only way to retrieve SegmentManager. -func newSegmentManager(meta *meta, allocator allocator, rcc types.RootCoord, opts ...allocOption) *SegmentManager { +func newSegmentManager(meta *meta, allocator allocator, opts ...allocOption) (*SegmentManager, error) { manager := &SegmentManager{ meta: meta, allocator: allocator, @@ -209,13 +208,15 @@ func newSegmentManager(meta *meta, allocator allocator, rcc types.RootCoord, opt segmentSealPolicies: defaultSegmentSealPolicy(), // default only segment size policy channelSealPolicies: []channelSealPolicy{}, // no default channel seal policy flushPolicy: defaultFlushPolicy(), - rcc: rcc, } for _, opt := range opts { opt.apply(manager) } manager.loadSegmentsFromMeta() - return manager + if err := manager.maybeResetLastExpireForSegments(); err != nil { + return nil, err + } + return manager, nil } // loadSegmentsFromMeta generate corresponding segment status for each segment from meta @@ -228,6 +229,32 @@ func (s *SegmentManager) loadSegmentsFromMeta() { s.segments = segmentsID } +func (s *SegmentManager) maybeResetLastExpireForSegments() error { + //for all sealed and growing segments, need to reset last expire + if len(s.segments) > 0 { + var latestTs uint64 + allocateErr := retry.Do(context.Background(), func() error { + ts, tryErr := s.genExpireTs(context.Background(), false) + log.Warn("failed to get ts from rootCoord for globalLastExpire", zap.Error(tryErr)) + if tryErr != nil { + return tryErr + } + latestTs = ts + return nil + }, retry.Attempts(Params.DataCoordCfg.AllocLatestExpireAttempt.GetAsUint()), retry.Sleep(200*time.Millisecond)) + if allocateErr != nil { + log.Warn("cannot allocate latest lastExpire from rootCoord", zap.Error(allocateErr)) + return errors.New("global max expire ts is unavailable for segment manager") + } + for _, sID := range s.segments { + if segment := s.meta.GetSegment(sID); segment != nil && segment.GetState() == commonpb.SegmentState_Growing { + s.meta.SetLastExpire(sID, latestTs) + } + } + } + return nil +} + // AllocSegment allocate segment per request collcation, partication, channel and rows func (s *SegmentManager) AllocSegment(ctx context.Context, collectionID UniqueID, partitionID UniqueID, channelName string, requestRows int64) ([]*Allocation, error) { @@ -308,12 +335,6 @@ func (s *SegmentManager) allocSegmentForImport(ctx context.Context, collectionID if err != nil { return nil, err } - // ReportImport with the new segment so RootCoord can add segment ref lock onto it. - // TODO: This is a hack and will be removed once the whole ImportManager is migrated from RootCoord to DataCoord. - if s.rcc == nil { - log.Error("RootCoord client not set") - return nil, errors.New("RootCoord client not set") - } allocation.ExpireTime = expireTs allocation.SegmentID = segment.GetID() diff --git a/internal/datacoord/segment_manager_test.go b/internal/datacoord/segment_manager_test.go index 25ca396b44..532e79d50d 100644 --- a/internal/datacoord/segment_manager_test.go +++ b/internal/datacoord/segment_manager_test.go @@ -29,9 +29,11 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" mockkv "github.com/milvus-io/milvus/internal/kv/mocks" "github.com/milvus-io/milvus/internal/metastore/kv/datacoord" "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/pkg/util/etcd" "github.com/milvus-io/milvus/pkg/util/metautil" ) @@ -41,8 +43,7 @@ func TestManagerOptions(t *testing.T) { mockAllocator := newMockAllocator() meta, err := newMemoryMeta() assert.NoError(t, err) - segmentManager := newSegmentManager(meta, mockAllocator, nil) - + segmentManager, _ := newSegmentManager(meta, mockAllocator) t.Run("test with alloc helper", func(t *testing.T) { opt := withAllocHelper(allocHelper{}) opt.apply(segmentManager) @@ -99,10 +100,11 @@ func TestManagerOptions(t *testing.T) { func TestAllocSegment(t *testing.T) { ctx := context.Background() Params.Init() + Params.Save(Params.DataCoordCfg.AllocLatestExpireAttempt.Key, "1") mockAllocator := newMockAllocator() meta, err := newMemoryMeta() assert.NoError(t, err) - segmentManager := newSegmentManager(meta, mockAllocator, nil) + segmentManager, _ := newSegmentManager(meta, mockAllocator) schema := newTestSchema() collID, err := mockAllocator.allocID(ctx) @@ -121,30 +123,117 @@ func TestAllocSegment(t *testing.T) { t.Run("allocation fails 1", func(t *testing.T) { failsAllocator := &FailsAllocator{ allocTsSucceed: true, + allocIDSucceed: false, } - segmentManager := newSegmentManager(meta, failsAllocator, nil) - _, err := segmentManager.AllocSegment(ctx, collID, 100, "c2", 100) + segmentManager, err := newSegmentManager(meta, failsAllocator) + assert.NoError(t, err) + _, err = segmentManager.AllocSegment(ctx, collID, 100, "c2", 100) assert.Error(t, err) }) t.Run("allocation fails 2", func(t *testing.T) { failsAllocator := &FailsAllocator{ + allocTsSucceed: false, allocIDSucceed: true, } - segmentManager := newSegmentManager(meta, failsAllocator, nil) - _, err := segmentManager.AllocSegment(ctx, collID, 100, "c1", 100) + segmentManager, err := newSegmentManager(meta, failsAllocator) assert.Error(t, err) + assert.Nil(t, segmentManager) }) } +func TestLastExpireReset(t *testing.T) { + //set up meta on dc + ctx := context.Background() + Params.Init() + Params.Save(Params.DataCoordCfg.AllocLatestExpireAttempt.Key, "1") + Params.Save(Params.DataCoordCfg.SegmentMaxSize.Key, "1") + mockAllocator := newRootCoordAllocator(newMockRootCoordService()) + etcdCli, _ := etcd.GetEtcdClient( + Params.EtcdCfg.UseEmbedEtcd.GetAsBool(), + Params.EtcdCfg.EtcdUseSSL.GetAsBool(), + Params.EtcdCfg.Endpoints.GetAsStrings(), + Params.EtcdCfg.EtcdTLSCert.GetValue(), + Params.EtcdCfg.EtcdTLSKey.GetValue(), + Params.EtcdCfg.EtcdTLSCACert.GetValue(), + Params.EtcdCfg.EtcdTLSMinVersion.GetValue()) + rootPath := "/test/segment/last/expire" + metaKV := etcdkv.NewEtcdKV(etcdCli, rootPath) + metaKV.RemoveWithPrefix("") + catalog := datacoord.NewCatalog(metaKV, "", "") + meta, err := newMeta(context.TODO(), catalog, nil) + assert.Nil(t, err) + // add collection + channelName := "c1" + schema := newTestSchema() + collID, err := mockAllocator.allocID(ctx) + assert.Nil(t, err) + meta.AddCollection(&collectionInfo{ID: collID, Schema: schema}) + + //assign segments, set max segment to only 1MB, equalling to 10485 rows + var bigRows, smallRows int64 = 10000, 1000 + segmentManager, _ := newSegmentManager(meta, mockAllocator) + allocs, _ := segmentManager.AllocSegment(context.Background(), collID, 0, channelName, bigRows) + segmentID1, expire1 := allocs[0].SegmentID, allocs[0].ExpireTime + time.Sleep(100 * time.Millisecond) + allocs, _ = segmentManager.AllocSegment(context.Background(), collID, 0, channelName, bigRows) + segmentID2, expire2 := allocs[0].SegmentID, allocs[0].ExpireTime + time.Sleep(100 * time.Millisecond) + allocs, _ = segmentManager.AllocSegment(context.Background(), collID, 0, channelName, smallRows) + segmentID3, expire3 := allocs[0].SegmentID, allocs[0].ExpireTime + + //simulate handleTimeTick op on dataCoord + meta.SetCurrentRows(segmentID1, bigRows) + meta.SetCurrentRows(segmentID2, bigRows) + meta.SetCurrentRows(segmentID3, smallRows) + segmentManager.tryToSealSegment(expire1, channelName) + assert.Equal(t, commonpb.SegmentState_Sealed, meta.GetSegment(segmentID1).GetState()) + assert.Equal(t, commonpb.SegmentState_Sealed, meta.GetSegment(segmentID2).GetState()) + assert.Equal(t, commonpb.SegmentState_Growing, meta.GetSegment(segmentID3).GetState()) + + //pretend that dataCoord break down + metaKV.Close() + etcdCli.Close() + + //dataCoord restart + newEtcdCli, _ := etcd.GetEtcdClient(Params.EtcdCfg.UseEmbedEtcd.GetAsBool(), Params.EtcdCfg.EtcdUseSSL.GetAsBool(), + Params.EtcdCfg.Endpoints.GetAsStrings(), Params.EtcdCfg.EtcdTLSCert.GetValue(), + Params.EtcdCfg.EtcdTLSKey.GetValue(), Params.EtcdCfg.EtcdTLSCACert.GetValue(), Params.EtcdCfg.EtcdTLSMinVersion.GetValue()) + newMetaKV := etcdkv.NewEtcdKV(newEtcdCli, rootPath) + defer newMetaKV.RemoveWithPrefix("") + newCatalog := datacoord.NewCatalog(newMetaKV, "", "") + restartedMeta, err := newMeta(context.TODO(), newCatalog, nil) + restartedMeta.AddCollection(&collectionInfo{ID: collID, Schema: schema}) + assert.Nil(t, err) + newSegmentManager, _ := newSegmentManager(restartedMeta, mockAllocator) + //reset row number to avoid being cleaned by empty segment + restartedMeta.SetCurrentRows(segmentID1, bigRows) + restartedMeta.SetCurrentRows(segmentID2, bigRows) + restartedMeta.SetCurrentRows(segmentID3, smallRows) + + //verify lastExpire of growing and sealed segments + segment1, segment2, segment3 := restartedMeta.GetSegment(segmentID1), restartedMeta.GetSegment(segmentID2), restartedMeta.GetSegment(segmentID3) + //segmentState should not be altered but growing segment's lastExpire has been reset to the latest + assert.Equal(t, commonpb.SegmentState_Sealed, segment1.GetState()) + assert.Equal(t, commonpb.SegmentState_Sealed, segment2.GetState()) + assert.Equal(t, commonpb.SegmentState_Growing, segment3.GetState()) + assert.Equal(t, expire1, segment1.GetLastExpireTime()) + assert.Equal(t, expire2, segment2.GetLastExpireTime()) + assert.True(t, segment3.GetLastExpireTime() > expire3) + flushableSegIds, _ := newSegmentManager.GetFlushableSegments(context.Background(), channelName, expire3) + assert.ElementsMatch(t, []UniqueID{segmentID1, segmentID2}, flushableSegIds) // segment1 and segment2 can be flushed + newAlloc, err := newSegmentManager.AllocSegment(context.Background(), collID, 0, channelName, 2000) + assert.Nil(t, err) + assert.Equal(t, segmentID3, newAlloc[0].SegmentID) // segment3 still can be used to allocate +} + func TestAllocSegmentForImport(t *testing.T) { ctx := context.Background() Params.Init() mockAllocator := newMockAllocator() meta, err := newMemoryMeta() assert.NoError(t, err) - ms := newMockRootCoordService() - segmentManager := newSegmentManager(meta, mockAllocator, ms) + segmentManager, _ := newSegmentManager(meta, mockAllocator) schema := newTestSchema() collID, err := mockAllocator.allocID(ctx) @@ -164,7 +253,7 @@ func TestAllocSegmentForImport(t *testing.T) { failsAllocator := &FailsAllocator{ allocTsSucceed: true, } - segmentManager := newSegmentManager(meta, failsAllocator, ms) + segmentManager, _ := newSegmentManager(meta, failsAllocator) _, err := segmentManager.allocSegmentForImport(ctx, collID, 100, "c1", 100, 0) assert.Error(t, err) }) @@ -173,13 +262,7 @@ func TestAllocSegmentForImport(t *testing.T) { failsAllocator := &FailsAllocator{ allocIDSucceed: true, } - segmentManager := newSegmentManager(meta, failsAllocator, ms) - _, err := segmentManager.allocSegmentForImport(ctx, collID, 100, "c1", 100, 0) - assert.Error(t, err) - }) - - t.Run("nil RootCoord", func(t *testing.T) { - segmentManager := newSegmentManager(meta, mockAllocator, nil) + segmentManager, _ := newSegmentManager(meta, failsAllocator) _, err := segmentManager.allocSegmentForImport(ctx, collID, 100, "c1", 100, 0) assert.Error(t, err) }) @@ -231,7 +314,7 @@ func TestLoadSegmentsFromMeta(t *testing.T) { err = meta.AddSegment(NewSegmentInfo(flushedSegment)) assert.NoError(t, err) - segmentManager := newSegmentManager(meta, mockAllocator, nil) + segmentManager, _ := newSegmentManager(meta, mockAllocator) segments := segmentManager.segments assert.EqualValues(t, 2, len(segments)) } @@ -246,7 +329,7 @@ func TestSaveSegmentsToMeta(t *testing.T) { collID, err := mockAllocator.allocID(context.Background()) assert.NoError(t, err) meta.AddCollection(&collectionInfo{ID: collID, Schema: schema}) - segmentManager := newSegmentManager(meta, mockAllocator, nil) + segmentManager, _ := newSegmentManager(meta, mockAllocator) allocations, err := segmentManager.AllocSegment(context.Background(), collID, 0, "c1", 1000) assert.NoError(t, err) assert.EqualValues(t, 1, len(allocations)) @@ -268,7 +351,7 @@ func TestSaveSegmentsToMetaWithSpecificSegments(t *testing.T) { collID, err := mockAllocator.allocID(context.Background()) assert.NoError(t, err) meta.AddCollection(&collectionInfo{ID: collID, Schema: schema}) - segmentManager := newSegmentManager(meta, mockAllocator, nil) + segmentManager, _ := newSegmentManager(meta, mockAllocator) allocations, err := segmentManager.AllocSegment(context.Background(), collID, 0, "c1", 1000) assert.NoError(t, err) assert.EqualValues(t, 1, len(allocations)) @@ -290,7 +373,7 @@ func TestDropSegment(t *testing.T) { collID, err := mockAllocator.allocID(context.Background()) assert.NoError(t, err) meta.AddCollection(&collectionInfo{ID: collID, Schema: schema}) - segmentManager := newSegmentManager(meta, mockAllocator, nil) + segmentManager, _ := newSegmentManager(meta, mockAllocator) allocations, err := segmentManager.AllocSegment(context.Background(), collID, 0, "c1", 1000) assert.NoError(t, err) assert.EqualValues(t, 1, len(allocations)) @@ -317,7 +400,7 @@ func TestAllocRowsLargerThanOneSegment(t *testing.T) { var mockPolicy = func(schema *schemapb.CollectionSchema) (int, error) { return 1, nil } - segmentManager := newSegmentManager(meta, mockAllocator, nil, withCalUpperLimitPolicy(mockPolicy)) + segmentManager, _ := newSegmentManager(meta, mockAllocator, withCalUpperLimitPolicy(mockPolicy)) allocations, err := segmentManager.AllocSegment(context.TODO(), collID, 0, "c1", 2) assert.NoError(t, err) assert.EqualValues(t, 2, len(allocations)) @@ -339,7 +422,7 @@ func TestExpireAllocation(t *testing.T) { var mockPolicy = func(schema *schemapb.CollectionSchema) (int, error) { return 10000000, nil } - segmentManager := newSegmentManager(meta, mockAllocator, nil, withCalUpperLimitPolicy(mockPolicy)) + segmentManager, _ := newSegmentManager(meta, mockAllocator, withCalUpperLimitPolicy(mockPolicy)) // alloc 100 times and expire var maxts Timestamp var id int64 = -1 @@ -378,8 +461,7 @@ func TestCleanExpiredBulkloadSegment(t *testing.T) { collID, err := mockAllocator.allocID(context.Background()) assert.NoError(t, err) meta.AddCollection(&collectionInfo{ID: collID, Schema: schema}) - ms := newMockRootCoordService() - segmentManager := newSegmentManager(meta, mockAllocator, ms) + segmentManager, _ := newSegmentManager(meta, mockAllocator) allocation, err := segmentManager.allocSegmentForImport(context.TODO(), collID, 0, "c1", 2, 1) assert.NoError(t, err) @@ -409,7 +491,7 @@ func TestGetFlushableSegments(t *testing.T) { collID, err := mockAllocator.allocID(context.Background()) assert.NoError(t, err) meta.AddCollection(&collectionInfo{ID: collID, Schema: schema}) - segmentManager := newSegmentManager(meta, mockAllocator, nil) + segmentManager, _ := newSegmentManager(meta, mockAllocator) allocations, err := segmentManager.AllocSegment(context.TODO(), collID, 0, "c1", 2) assert.NoError(t, err) assert.EqualValues(t, 1, len(allocations)) @@ -455,7 +537,7 @@ func TestTryToSealSegment(t *testing.T) { collID, err := mockAllocator.allocID(context.Background()) assert.NoError(t, err) meta.AddCollection(&collectionInfo{ID: collID, Schema: schema}) - segmentManager := newSegmentManager(meta, mockAllocator, nil, withSegmentSealPolices(sealByLifetimePolicy(math.MinInt64))) //always seal + segmentManager, _ := newSegmentManager(meta, mockAllocator, withSegmentSealPolices(sealByLifetimePolicy(math.MinInt64))) //always seal allocations, err := segmentManager.AllocSegment(context.TODO(), collID, 0, "c1", 2) assert.NoError(t, err) assert.EqualValues(t, 1, len(allocations)) @@ -480,7 +562,7 @@ func TestTryToSealSegment(t *testing.T) { collID, err := mockAllocator.allocID(context.Background()) assert.NoError(t, err) meta.AddCollection(&collectionInfo{ID: collID, Schema: schema}) - segmentManager := newSegmentManager(meta, mockAllocator, nil, withChannelSealPolices(getChannelOpenSegCapacityPolicy(-1))) //always seal + segmentManager, _ := newSegmentManager(meta, mockAllocator, withChannelSealPolices(getChannelOpenSegCapacityPolicy(-1))) //always seal allocations, err := segmentManager.AllocSegment(context.TODO(), collID, 0, "c1", 2) assert.NoError(t, err) assert.EqualValues(t, 1, len(allocations)) @@ -505,7 +587,7 @@ func TestTryToSealSegment(t *testing.T) { collID, err := mockAllocator.allocID(context.Background()) assert.NoError(t, err) meta.AddCollection(&collectionInfo{ID: collID, Schema: schema}) - segmentManager := newSegmentManager(meta, mockAllocator, nil, + segmentManager, _ := newSegmentManager(meta, mockAllocator, withSegmentSealPolices(sealByLifetimePolicy(math.MinInt64)), withChannelSealPolices(getChannelOpenSegCapacityPolicy(-1))) //always seal allocations, err := segmentManager.AllocSegment(context.TODO(), collID, 0, "c1", 2) @@ -532,7 +614,7 @@ func TestTryToSealSegment(t *testing.T) { collID, err := mockAllocator.allocID(context.Background()) assert.NoError(t, err) meta.AddCollection(&collectionInfo{ID: collID, Schema: schema}) - segmentManager := newSegmentManager(meta, mockAllocator, nil) + segmentManager, _ := newSegmentManager(meta, mockAllocator) allocations, err := segmentManager.AllocSegment(context.TODO(), collID, 0, "c1", 2) assert.NoError(t, err) assert.EqualValues(t, 1, len(allocations)) @@ -619,7 +701,7 @@ func TestTryToSealSegment(t *testing.T) { collID, err := mockAllocator.allocID(context.Background()) assert.NoError(t, err) meta.AddCollection(&collectionInfo{ID: collID, Schema: schema}) - segmentManager := newSegmentManager(meta, mockAllocator, nil, withSegmentSealPolices(sealByLifetimePolicy(math.MinInt64))) //always seal + segmentManager, _ := newSegmentManager(meta, mockAllocator, withSegmentSealPolices(sealByLifetimePolicy(math.MinInt64))) //always seal allocations, err := segmentManager.AllocSegment(context.TODO(), collID, 0, "c1", 2) assert.NoError(t, err) assert.EqualValues(t, 1, len(allocations)) @@ -648,7 +730,7 @@ func TestTryToSealSegment(t *testing.T) { collID, err := mockAllocator.allocID(context.Background()) assert.NoError(t, err) meta.AddCollection(&collectionInfo{ID: collID, Schema: schema}) - segmentManager := newSegmentManager(meta, mockAllocator, nil, withChannelSealPolices(getChannelOpenSegCapacityPolicy(-1))) //always seal + segmentManager, _ := newSegmentManager(meta, mockAllocator, withChannelSealPolices(getChannelOpenSegCapacityPolicy(-1))) //always seal allocations, err := segmentManager.AllocSegment(context.TODO(), collID, 0, "c1", 2) assert.NoError(t, err) assert.EqualValues(t, 1, len(allocations)) diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index 75864075b4..e9c5798ed6 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -354,7 +354,10 @@ func (s *Server) initDataCoord() error { s.createCompactionHandler() s.createCompactionTrigger() } - s.initSegmentManager() + + if err = s.initSegmentManager(); err != nil { + return err + } s.initGarbageCollection(storageCli) s.initIndexBuilder(storageCli) @@ -516,10 +519,15 @@ func (s *Server) initServiceDiscovery() error { return nil } -func (s *Server) initSegmentManager() { +func (s *Server) initSegmentManager() error { if s.segmentManager == nil { - s.segmentManager = newSegmentManager(s.meta, s.allocator, s.rootCoordClient) + manager, err := newSegmentManager(s.meta, s.allocator) + if err != nil { + return err + } + s.segmentManager = manager } + return nil } func (s *Server) initMeta(chunkManager storage.ChunkManager) error { diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 94f8f5b3a7..15d835d986 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -1919,7 +1919,8 @@ type dataCoordConfig struct { SegmentMaxSize ParamItem `refreshable:"false"` DiskSegmentMaxSize ParamItem `refreshable:"true"` SegmentSealProportion ParamItem `refreshable:"false"` - SegAssignmentExpiration ParamItem `refreshable:"true"` + SegAssignmentExpiration ParamItem `refreshable:"false"` + AllocLatestExpireAttempt ParamItem `refreshable:"true"` SegmentMaxLifetime ParamItem `refreshable:"false"` SegmentMaxIdleTime ParamItem `refreshable:"false"` SegmentMinSizeFromIdleToSealed ParamItem `refreshable:"false"` @@ -2025,6 +2026,15 @@ func (p *dataCoordConfig) init(base *BaseTable) { } p.SegAssignmentExpiration.Init(base.mgr) + p.AllocLatestExpireAttempt = ParamItem{ + Key: "dataCoord.segment.allocLatestExpireAttempt", + Version: "2.2.0", + DefaultValue: "200", + Doc: "The time attempting to alloc latest lastExpire from rootCoord after restart", + Export: true, + } + p.AllocLatestExpireAttempt.Init(base.mgr) + p.SegmentMaxLifetime = ParamItem{ Key: "dataCoord.segment.maxLife", Version: "2.0.0", diff --git a/pkg/util/paramtable/param_item.go b/pkg/util/paramtable/param_item.go index eb3be4e9d1..e2ff38fc2d 100644 --- a/pkg/util/paramtable/param_item.go +++ b/pkg/util/paramtable/param_item.go @@ -109,6 +109,10 @@ func (pi *ParamItem) GetAsInt32() int32 { return int32(getAsInt64(pi.GetValue())) } +func (pi *ParamItem) GetAsUint() uint { + return uint(getAsInt64(pi.GetValue())) +} + func (pi *ParamItem) GetAsUint32() uint32 { return uint32(getAsInt64(pi.GetValue())) }