diff --git a/internal/master/collection_task.go b/internal/master/collection_task.go
index 590b36c3f8..7beb94607d 100644
--- a/internal/master/collection_task.go
+++ b/internal/master/collection_task.go
@@ -19,7 +19,8 @@ type createCollectionTask struct {
 
 type dropCollectionTask struct {
 	baseTask
-	req *internalpb.DropCollectionRequest
+	req        *internalpb.DropCollectionRequest
+	segManager SegmentManager
 }
 
 type hasCollectionTask struct {
@@ -163,6 +164,11 @@ func (t *dropCollectionTask) Execute() error {
 		return err
 	}
 
+	// Drop the collection in the segment manager as well: once the collection is gone
+	// from the meta table, a time tick arriving from a write node can no longer resolve it.
+	if err = t.segManager.DropCollection(collectionID); err != nil {
+		return err
+	}
 	ts, err := t.Ts()
 	if err != nil {
 		return err
diff --git a/internal/master/grpc_service.go b/internal/master/grpc_service.go
index f9583b1109..d17b5acf98 100644
--- a/internal/master/grpc_service.go
+++ b/internal/master/grpc_service.go
@@ -50,6 +50,7 @@ func (s *Master) DropCollection(ctx context.Context, in *internalpb.DropCollecti
 			mt: s.metaTable,
 			cv: make(chan error),
 		},
+		segManager: s.segmentManager,
 	}
 
 	response := &commonpb.Status{
diff --git a/internal/master/index_task.go b/internal/master/index_task.go
index 580d8f73a6..7fd630355d 100644
--- a/internal/master/index_task.go
+++ b/internal/master/index_task.go
@@ -12,7 +12,7 @@ type createIndexTask struct {
 	req                 *internalpb.CreateIndexRequest
 	indexBuildScheduler *IndexBuildScheduler
 	indexLoadScheduler  *IndexLoadScheduler
-	segManager          *SegmentManager
+	segManager          SegmentManager
 }
 
 func (task *createIndexTask) Type() internalpb.MsgType {
diff --git a/internal/master/master.go b/internal/master/master.go
index 6d9734e425..adb002adf8 100644
--- a/internal/master/master.go
+++ b/internal/master/master.go
@@ -63,7 +63,7 @@ type Master struct {
 	startCallbacks []func()
 	closeCallbacks []func()
 
-	segmentManager   *SegmentManager
+	segmentManager   SegmentManager
 	segmentAssigner  *SegmentAssigner
 	statProcessor    *StatsProcessor
 	segmentStatusMsg ms.MsgStream
diff --git a/internal/master/scheduler_test.go b/internal/master/scheduler_test.go
index f81c4e78d2..a40f7584fa 100644
--- a/internal/master/scheduler_test.go
+++ b/internal/master/scheduler_test.go
@@ -130,6 +130,7 @@ func TestMaster_Scheduler_Collection(t *testing.T) {
 			mt: meta,
 			cv: make(chan error),
 		},
+		segManager: NewMockSegmentManager(),
 	}
 
 	err = scheduler.Enqueue(dropCollectionTask)
diff --git a/internal/master/segment_manager.go b/internal/master/segment_manager.go
index 2e7ae0f24f..e7a4b64dce 100644
--- a/internal/master/segment_manager.go
+++ b/internal/master/segment_manager.go
@@ -31,7 +31,16 @@ type channelRange struct {
 	channelStart int32
 	channelEnd   int32
 }
-type SegmentManager struct {
+
+type SegmentManager interface {
+	Start()
+	Close()
+	AssignSegment(segIDReq []*internalpb.SegIDRequest) ([]*internalpb.SegIDAssignment, error)
+	ForceClose(collID UniqueID) error
+	DropCollection(collID UniqueID) error
+}
+
+type SegmentManagerImpl struct {
 	metaTable     *metaTable
 	channelRanges []*channelRange
 	collStatus    map[UniqueID]*collectionStatus // collection id to collection status
@@ -54,7 +63,7 @@ type SegmentManager struct {
 	waitGroup sync.WaitGroup
 }
 
-func (manager *SegmentManager) AssignSegment(segIDReq []*internalpb.SegIDRequest) ([]*internalpb.SegIDAssignment, error) {
+func (manager *SegmentManagerImpl) AssignSegment(segIDReq []*internalpb.SegIDRequest) ([]*internalpb.SegIDAssignment, error) {
 	manager.mu.Lock()
 	defer manager.mu.Unlock()
 
@@ -97,7 +106,7 @@ func (manager *SegmentManager) AssignSegment(segIDReq []*internalpb.SegIDRequest
 	return res, nil
 }
 
-func (manager *SegmentManager) assignSegment(
+func (manager *SegmentManagerImpl) assignSegment(
 	collName string,
 	collID UniqueID,
 	partitionTag string,
@@ -180,7 +189,7 @@ func (manager *SegmentManager) assignSegment(
 	}, nil
 }
 
-func (manager *SegmentManager) isMatch(segmentID UniqueID, partitionTag string, channelID int32) (bool, error) {
+func (manager *SegmentManagerImpl) isMatch(segmentID UniqueID, partitionTag string, channelID int32) (bool, error) {
 	segMeta, err := manager.metaTable.GetSegmentByID(segmentID)
 	if err != nil {
 		return false, err
@@ -193,7 +202,7 @@ func (manager *SegmentManager) isMatch(segmentID UniqueID, partitionTag string,
 	return true, nil
 }
 
-func (manager *SegmentManager) estimateTotalRows(collName string) (int, error) {
+func (manager *SegmentManagerImpl) estimateTotalRows(collName string) (int, error) {
 	collMeta, err := manager.metaTable.GetCollectionByName(collName)
 	if err != nil {
 		return -1, err
@@ -205,7 +214,7 @@ func (manager *SegmentManager) estimateTotalRows(collName string) (int, error) {
 	return int(manager.segmentThreshold / float64(sizePerRecord)), nil
 }
 
-func (manager *SegmentManager) openNewSegment(channelID int32, collID UniqueID, partitionTag string, numRows int) (UniqueID, error) {
+func (manager *SegmentManagerImpl) openNewSegment(channelID int32, collID UniqueID, partitionTag string, numRows int) (UniqueID, error) {
 	// find the channel range
 	channelStart, channelEnd := int32(-1), int32(-1)
 	for _, r := range manager.channelRanges {
@@ -259,17 +268,17 @@ func (manager *SegmentManager) openNewSegment(channelID int32, collID UniqueID,
 	return newID, nil
 }
 
-func (manager *SegmentManager) Start() {
+func (manager *SegmentManagerImpl) Start() {
 	manager.waitGroup.Add(1)
 	go manager.startWriteNodeTimeSync()
 }
 
-func (manager *SegmentManager) Close() {
+func (manager *SegmentManagerImpl) Close() {
 	manager.cancel()
 	manager.waitGroup.Wait()
 }
 
-func (manager *SegmentManager) startWriteNodeTimeSync() {
+func (manager *SegmentManagerImpl) startWriteNodeTimeSync() {
 	defer manager.waitGroup.Done()
 	for {
 		select {
@@ -284,7 +293,7 @@ func (manager *SegmentManager) startWriteNodeTimeSync() {
 		}
 	}
 }
-func (manager *SegmentManager) syncWriteNodeTimestamp(timeTick Timestamp) error {
+func (manager *SegmentManagerImpl) syncWriteNodeTimestamp(timeTick Timestamp) error {
 	manager.mu.Lock()
 	defer manager.mu.Unlock()
 	for _, status := range manager.collStatus {
@@ -292,7 +301,8 @@ func (manager *SegmentManager) syncWriteNodeTimestamp(timeTick Timestamp) error
 		if !segStatus.closable {
 			closable, err := manager.judgeSegmentClosable(segStatus)
 			if err != nil {
-				return err
+				log.Printf("check segment closable error: %s", err.Error())
+				continue
 			}
 			segStatus.closable = closable
 			if !segStatus.closable {
@@ -310,16 +320,20 @@ func (manager *SegmentManager) syncWriteNodeTimestamp(timeTick Timestamp) error
 			status.segments = append(status.segments[:i], status.segments[i+1:]...)
 			ts, err := manager.globalTSOAllocator()
 			if err != nil {
-				return err
+				log.Printf("allocate tso error: %s", err.Error())
+				continue
 			}
 			if err = manager.metaTable.CloseSegment(segStatus.segmentID, ts); err != nil {
-				return err
+				log.Printf("meta table close segment error: %s", err.Error())
+				continue
 			}
 			if err = manager.assigner.CloseSegment(segStatus.segmentID); err != nil {
-				return err
+				log.Printf("assigner close segment error: %s", err.Error())
+				continue
 			}
 			if err = manager.flushScheduler.Enqueue(segStatus.segmentID); err != nil {
-				return err
+				log.Printf("flush scheduler enqueue error: %s", err.Error())
+				continue
 			}
 		}
 	}
@@ -327,7 +341,7 @@ func (manager *SegmentManager) syncWriteNodeTimestamp(timeTick Timestamp) error
 	return nil
 }
 
-func (manager *SegmentManager) judgeSegmentClosable(status *segmentStatus) (bool, error) {
+func (manager *SegmentManagerImpl) judgeSegmentClosable(status *segmentStatus) (bool, error) {
 	segMeta, err := manager.metaTable.GetSegmentByID(status.segmentID)
 	if err != nil {
 		return false, err
@@ -339,7 +353,7 @@ func (manager *SegmentManager) judgeSegmentClosable(status *segmentStatus) (bool
 	return false, nil
 }
 
-func (manager *SegmentManager) initChannelRanges() error {
+func (manager *SegmentManagerImpl) initChannelRanges() error {
 	div, rem := manager.numOfChannels/manager.numOfQueryNodes, manager.numOfChannels%manager.numOfQueryNodes
 	for i, j := 0, 0; i < manager.numOfChannels; j++ {
 		if j < rem {
@@ -360,7 +374,9 @@ func (manager *SegmentManager) initChannelRanges() error {
 }
 
 // ForceClose marks all segments of the collection with collID closable; each segment is closed after its assignments expire
-func (manager *SegmentManager) ForceClose(collID UniqueID) error {
+func (manager *SegmentManagerImpl) ForceClose(collID UniqueID) error {
+	manager.mu.Lock()
+	defer manager.mu.Unlock()
 	status, ok := manager.collStatus[collID]
 	if !ok {
 		return nil
@@ -372,16 +388,35 @@ func (manager *SegmentManager) ForceClose(collID UniqueID) error {
 
 	return nil
 }
 
+func (manager *SegmentManagerImpl) DropCollection(collID UniqueID) error {
+	manager.mu.Lock()
+	defer manager.mu.Unlock()
+
+	status, ok := manager.collStatus[collID]
+	if !ok {
+		return nil
+	}
+
+	for _, segStatus := range status.segments {
+		if err := manager.assigner.CloseSegment(segStatus.segmentID); err != nil {
+			return err
+		}
+	}
+
+	delete(manager.collStatus, collID)
+	return nil
+}
+
 func NewSegmentManager(ctx context.Context,
 	meta *metaTable,
 	globalIDAllocator func() (UniqueID, error),
 	globalTSOAllocator func() (Timestamp, error),
 	syncWriteNodeChan chan *ms.TimeTickMsg,
 	scheduler persistenceScheduler,
-	assigner *SegmentAssigner) (*SegmentManager, error) {
+	assigner *SegmentAssigner) (*SegmentManagerImpl, error) {
 	assignerCtx, cancel := context.WithCancel(ctx)
-	segAssigner := &SegmentManager{
+	segManager := &SegmentManagerImpl{
 		metaTable:     meta,
 		channelRanges: make([]*channelRange, 0),
 		collStatus:    make(map[UniqueID]*collectionStatus),
@@ -401,9 +436,35 @@ func NewSegmentManager(ctx context.Context,
 		cancel:             cancel,
 	}
 
-	if err := segAssigner.initChannelRanges(); err != nil {
+	if err := segManager.initChannelRanges(); err != nil {
 		return nil, err
 	}
 
-	return segAssigner, nil
+	return segManager, nil
+}
+
+type mockSegmentManager struct {
+}
+
+func (manager *mockSegmentManager) Start() {
+}
+
+func (manager *mockSegmentManager) Close() {
+}
+
+func (manager *mockSegmentManager) AssignSegment(segIDReq []*internalpb.SegIDRequest) ([]*internalpb.SegIDAssignment, error) {
+	return nil, nil
+}
+
+func (manager *mockSegmentManager) ForceClose(collID UniqueID) error {
+	return nil
+}
+
+func (manager *mockSegmentManager) DropCollection(collID UniqueID) error {
+	return nil
+}
+
+// NewMockSegmentManager is only used in unit tests.
+func NewMockSegmentManager() SegmentManager {
+	return &mockSegmentManager{}
 }
diff --git a/internal/master/task_test.go b/internal/master/task_test.go
index 3753fd6181..d901bf11a9 100644
--- a/internal/master/task_test.go
+++ b/internal/master/task_test.go
@@ -46,8 +46,9 @@ func TestMaster_DropCollectionTask(t *testing.T) {
 		CollectionName: nil,
 	}
 	var collectionTask task = &dropCollectionTask{
-		req:      &req,
-		baseTask: baseTask{},
+		req:        &req,
+		baseTask:   baseTask{},
+		segManager: NewMockSegmentManager(),
 	}
 	assert.Equal(t, internalpb.MsgType_kDropPartition, collectionTask.Type())
 	ts, err := collectionTask.Ts()
@@ -55,8 +56,9 @@ func TestMaster_DropCollectionTask(t *testing.T) {
 	assert.Nil(t, err)
 
 	collectionTask = &dropCollectionTask{
-		req:      nil,
-		baseTask: baseTask{},
+		req:        nil,
+		baseTask:   baseTask{},
+		segManager: NewMockSegmentManager(),
 	}
 	assert.Equal(t, internalpb.MsgType_kNone, collectionTask.Type())
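
Note on the shape of this change: turning the concrete `SegmentManager` struct into an interface backed by a production `SegmentManagerImpl` and a no-op `mockSegmentManager` means task code depends only on the interface, so unit tests can inject the mock instead of wiring up a metaTable, allocators, and schedulers. Below is a minimal, self-contained sketch of that pattern, with the method set and names simplified from the patch; `Execute` here is a stand-in for the task's real method, not the actual signature.

```go
package main

import "fmt"

// UniqueID mirrors the alias used in the patch.
type UniqueID = int64

// SegmentManager is a trimmed copy of the interface introduced by the patch;
// the request/assignment types are elided for brevity.
type SegmentManager interface {
	DropCollection(collID UniqueID) error
}

// segmentManagerImpl stands in for SegmentManagerImpl: it owns real per-collection state.
type segmentManagerImpl struct {
	collStatus map[UniqueID]struct{}
}

func (m *segmentManagerImpl) DropCollection(collID UniqueID) error {
	delete(m.collStatus, collID)
	return nil
}

// mockSegmentManager is the no-op test double, as in the patch.
type mockSegmentManager struct{}

func (m *mockSegmentManager) DropCollection(collID UniqueID) error { return nil }

// dropCollectionTask depends only on the interface, so tests can swap in the mock.
type dropCollectionTask struct {
	segManager SegmentManager
}

func (t *dropCollectionTask) Execute(collID UniqueID) error {
	return t.segManager.DropCollection(collID)
}

func main() {
	// Production wiring uses the real implementation...
	prod := &dropCollectionTask{segManager: &segmentManagerImpl{collStatus: map[UniqueID]struct{}{1: {}}}}
	// ...while a unit test injects the mock, exactly as task_test.go does above.
	test := &dropCollectionTask{segManager: &mockSegmentManager{}}
	fmt.Println(prod.Execute(1), test.Execute(1))
}
```

Because Go interfaces are satisfied implicitly, neither implementation references the other, and call sites such as `Master.segmentManager` and `createIndexTask.segManager` need only the type change shown in the patch.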
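
On the locking added to `ForceClose` and `DropCollection`: `syncWriteNodeTimestamp` already iterates and rewrites `collStatus` under `manager.mu`, so any path that mutates the same map must take that mutex, or a collection drop racing with a write-node time tick becomes a concurrent map access. The sketch below illustrates just that invariant; the manager is reduced to the map and mutex, and all names here are illustrative rather than taken from the patch.

```go
package main

import (
	"fmt"
	"sync"
)

// manager is a pared-down stand-in for SegmentManagerImpl: the sweep goroutine
// iterates collStatus under mu, so a concurrent drop must hold the same lock
// to avoid a "concurrent map iteration and map write" runtime fault.
type manager struct {
	mu         sync.Mutex
	collStatus map[int64][]int64 // collection ID -> open segment IDs
}

// syncTimeTick plays the role of syncWriteNodeTimestamp: it reads collStatus under mu.
func (m *manager) syncTimeTick() {
	m.mu.Lock()
	defer m.mu.Unlock()
	for collID, segs := range m.collStatus {
		fmt.Printf("collection %d: %d open segment(s)\n", collID, len(segs))
	}
}

// dropCollection plays the role of DropCollection: it mutates collStatus under the same mu.
func (m *manager) dropCollection(collID int64) {
	m.mu.Lock()
	defer m.mu.Unlock()
	delete(m.collStatus, collID)
}

func main() {
	m := &manager{collStatus: map[int64][]int64{1: {10, 11}, 2: {20}}}
	var wg sync.WaitGroup
	wg.Add(2)
	// A time-tick sweep and a drop may run concurrently, as they can in the master.
	go func() { defer wg.Done(); m.syncTimeTick() }()
	go func() { defer wg.Done(); m.dropCollection(1) }()
	wg.Wait()
}
```

The same reasoning explains the call ordering in `dropCollectionTask.Execute`: the segment manager is told about the drop promptly so that a later time tick never tries to close segments for a collection the meta table no longer knows.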