remove write handoff event (#23565)

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
wei liu 2023-04-23 15:18:33 +08:00 committed by GitHub
parent 5f96f4bbfd
commit cb82783d34
6 changed files with 14 additions and 853 deletions

View File

@@ -53,7 +53,6 @@ type flushedSegmentWatcher struct {
meta *metaTable
builder *indexBuilder
ic *IndexCoord
handoff *handoff
internalTasks map[UniqueID]*internalTask
}
@@ -63,8 +62,7 @@ type internalTask struct {
segmentInfo *datapb.SegmentInfo
}
func newFlushSegmentWatcher(ctx context.Context, kv kv.MetaKv, meta *metaTable, builder *indexBuilder,
handoff *handoff, ic *IndexCoord) (*flushedSegmentWatcher, error) {
func newFlushSegmentWatcher(ctx context.Context, kv kv.MetaKv, meta *metaTable, builder *indexBuilder, ic *IndexCoord) (*flushedSegmentWatcher, error) {
ctx, cancel := context.WithCancel(ctx)
fsw := &flushedSegmentWatcher{
ctx: ctx,
@@ -76,7 +74,6 @@ func newFlushSegmentWatcher(ctx context.Context, kv kv.MetaKv, meta *metaTable,
internalNotify: make(chan struct{}, 1),
meta: meta,
builder: builder,
handoff: handoff,
ic: ic,
}
err := fsw.reloadFromKV()
@@ -239,7 +236,6 @@ func (fsw *flushedSegmentWatcher) internalProcess(segID UniqueID) {
switch t.state {
case indexTaskPrepare:
if t.segmentInfo.GetIsFake() {
fsw.handoff.enqueue(t.segmentInfo)
fsw.updateInternalTaskState(segID, indexTaskDone)
fsw.internalNotifyFunc()
return
@@ -258,7 +254,7 @@ func (fsw *flushedSegmentWatcher) internalProcess(segID UniqueID) {
fsw.updateInternalTaskState(segID, indexTaskInProgress)
fsw.internalNotifyFunc()
case indexTaskInProgress:
if fsw.handoff.taskDone(segID) {
if fsw.checkIndexState(segID) {
fsw.updateInternalTaskState(segID, indexTaskDone)
fsw.internalNotifyFunc()
}
@@ -274,6 +270,11 @@ func (fsw *flushedSegmentWatcher) internalProcess(segID UniqueID) {
}
}
func (fsw *flushedSegmentWatcher) checkIndexState(segID int64) bool {
state := fsw.meta.GetSegmentIndexState(segID)
return state.state == commonpb.IndexState_Finished
}
func (fsw *flushedSegmentWatcher) constructTask(t *internalTask) error {
// Make sure index is not being written.
fsw.ic.indexGCLock.Lock()
@@ -318,7 +319,6 @@ func (fsw *flushedSegmentWatcher) constructTask(t *internalTask) error {
log.Ctx(fsw.ctx).Info("flushedSegmentWatcher construct task success", zap.Int64("segID", t.segmentInfo.ID),
zap.Int64("buildID", buildID), zap.Bool("already have index task", have))
}
fsw.handoff.enqueue(t.segmentInfo)
return nil
}
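With the handoff queue gone, the watcher no longer asks handoff.taskDone(segID) whether a segment has been handed off; the indexTaskInProgress branch now queries the meta table directly and treats IndexState_Finished as completion. Below is a minimal, self-contained sketch of that check; fakeMeta, segmentIndexState, and the IndexState constants are illustrative stand-ins rather than the actual Milvus types.

```go
package main

import "fmt"

// Illustrative stand-ins for the Milvus types referenced in the diff above.
type UniqueID = int64

type IndexState int

const (
	IndexStateInProgress IndexState = iota
	IndexStateFinished
	IndexStateFailed
)

type segmentIndexState struct {
	state IndexState
}

// fakeMeta mimics the single call the watcher now depends on.
type fakeMeta struct {
	states map[UniqueID]segmentIndexState
}

func (m *fakeMeta) GetSegmentIndexState(segID UniqueID) segmentIndexState {
	return m.states[segID]
}

// checkIndexState mirrors the new helper: a task is done once the meta
// table reports the segment's index as finished.
func checkIndexState(meta *fakeMeta, segID UniqueID) bool {
	return meta.GetSegmentIndexState(segID).state == IndexStateFinished
}

func main() {
	meta := &fakeMeta{states: map[UniqueID]segmentIndexState{
		1: {state: IndexStateFinished},
		2: {state: IndexStateInProgress},
	}}
	fmt.Println(checkIndexState(meta, 1)) // true  -> task moves to indexTaskDone
	fmt.Println(checkIndexState(meta, 2)) // false -> watcher waits for the next pass
}
```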

View File

@@ -70,7 +70,7 @@ func Test_flushSegmentWatcher(t *testing.T) {
segmentIndexes: map[UniqueID]map[UniqueID]*model.SegmentIndex{},
buildID2SegmentIndex: map[UniqueID]*model.SegmentIndex{},
},
&indexBuilder{}, &handoff{}, &IndexCoord{
&indexBuilder{}, &IndexCoord{
dataCoordClient: NewDataCoordMock(),
})
assert.NoError(t, err)
@@ -97,7 +97,7 @@ func Test_flushSegmentWatcher_newFlushSegmentWatcher(t *testing.T) {
loadWithRevision: func(key string) ([]string, []string, int64, error) {
return []string{"segID1"}, []string{segBytes}, 1, nil
},
}, &metaTable{}, &indexBuilder{}, &handoff{}, &IndexCoord{
}, &metaTable{}, &indexBuilder{}, &IndexCoord{
dataCoordClient: NewDataCoordMock(),
})
assert.NoError(t, err)
@@ -108,7 +108,7 @@ func Test_flushSegmentWatcher_newFlushSegmentWatcher(t *testing.T) {
loadWithRevision: func(key string) ([]string, []string, int64, error) {
return []string{"segID1"}, []string{"10"}, 1, nil
},
}, &metaTable{}, &indexBuilder{}, &handoff{}, &IndexCoord{
}, &metaTable{}, &indexBuilder{}, &IndexCoord{
dataCoordClient: NewDataCoordMock(),
})
assert.NoError(t, err)
@@ -121,7 +121,7 @@ func Test_flushSegmentWatcher_newFlushSegmentWatcher(t *testing.T) {
loadWithRevision: func(key string) ([]string, []string, int64, error) {
return []string{"segID1"}, []string{segBytes}, 1, errors.New("error")
},
}, &metaTable{}, &indexBuilder{}, &handoff{}, &IndexCoord{
}, &metaTable{}, &indexBuilder{}, &IndexCoord{
dataCoordClient: NewDataCoordMock(),
})
assert.Error(t, err)
@@ -134,7 +134,7 @@ func Test_flushSegmentWatcher_newFlushSegmentWatcher(t *testing.T) {
loadWithRevision: func(key string) ([]string, []string, int64, error) {
return []string{"segID1"}, []string{"segID"}, 1, nil
},
}, &metaTable{}, &indexBuilder{}, &handoff{}, &IndexCoord{
}, &metaTable{}, &indexBuilder{}, &IndexCoord{
dataCoordClient: NewDataCoordMock(),
})
assert.Error(t, err)
@@ -158,7 +158,6 @@ func Test_flushedSegmentWatcher_internalRun(t *testing.T) {
ic: &IndexCoord{
dataCoordClient: NewDataCoordMock(),
},
handoff: nil,
internalTasks: map[UniqueID]*internalTask{
segID: {
state: indexTaskPrepare,
@@ -266,15 +265,6 @@ func Test_flushSegmentWatcher_internalProcess_success(t *testing.T) {
fsw := &flushedSegmentWatcher{
ctx: context.Background(),
handoff: &handoff{
segments: map[UniqueID]*datapb.SegmentInfo{},
taskMutex: sync.RWMutex{},
wg: sync.WaitGroup{},
meta: meta,
notifyChan: make(chan struct{}, 1),
scheduleDuration: time.Second,
kvClient: nil,
},
ic: &IndexCoord{
dataCoordClient: &DataCoordMock{
CallGetSegmentInfo: func(ctx context.Context, req *datapb.GetSegmentInfoRequest) (*datapb.GetSegmentInfoResponse, error) {
@@ -351,8 +341,6 @@ func Test_flushSegmentWatcher_internalProcess_success(t *testing.T) {
fsw.internalTaskMutex.RUnlock()
})
fsw.handoff.deleteTask(segID)
t.Run("inProgress", func(t *testing.T) {
fsw.internalProcess(segID)
fsw.internalTaskMutex.RLock()
@@ -498,7 +486,6 @@ func Test_flushSegmentWatcher_prepare_error(t *testing.T) {
loopCtx: context.Background(),
dataCoordClient: NewDataCoordMock(),
},
handoff: nil,
internalTasks: map[UniqueID]*internalTask{
segID: {
state: indexTaskPrepare,
@@ -540,7 +527,6 @@ func Test_flushSegmentWatcher_prepare_error(t *testing.T) {
},
},
},
handoff: nil,
internalTasks: map[UniqueID]*internalTask{
segID: {
state: indexTaskPrepare,
@@ -642,7 +628,6 @@ func Test_flushSegmentWatcher_constructTask_error(t *testing.T) {
ic: &IndexCoord{
rootCoordClient: NewRootCoordMock(),
},
handoff: nil,
internalTasks: map[UniqueID]*internalTask{
segID: task,
},

View File

@@ -1,334 +0,0 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package indexcoord
import (
"context"
"errors"
"sort"
"sync"
"time"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/golang/protobuf/proto"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus/internal/kv"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/querypb"
)
type handoff struct {
ctx context.Context
cancel context.CancelFunc
segments map[UniqueID]*datapb.SegmentInfo
taskMutex sync.RWMutex
wg sync.WaitGroup
meta *metaTable
notifyChan chan struct{}
scheduleDuration time.Duration
kvClient kv.MetaKv
ic *IndexCoord
}
func newHandoff(ctx context.Context, metaTable *metaTable, kvClient kv.MetaKv, ic *IndexCoord) *handoff {
ctx, cancel := context.WithCancel(ctx)
hd := &handoff{
ctx: ctx,
cancel: cancel,
segments: make(map[UniqueID]*datapb.SegmentInfo),
taskMutex: sync.RWMutex{},
wg: sync.WaitGroup{},
meta: metaTable,
notifyChan: make(chan struct{}, 1),
scheduleDuration: time.Second,
kvClient: kvClient,
ic: ic,
}
hd.recoveryFromMeta()
log.Ctx(ctx).Info("new handoff success")
return hd
}
func (hd *handoff) recoveryFromMeta() {
allSegIndexes := hd.meta.GetAllSegIndexes()
hd.taskMutex.Lock()
defer hd.taskMutex.Unlock()
hd.segments = make(map[UniqueID]*datapb.SegmentInfo, 0)
for segID, segIdx := range allSegIndexes {
if segIdx.IsDeleted {
continue
}
if segIdx.WriteHandoff {
continue
}
hd.segments[segID] = &datapb.SegmentInfo{ID: segID}
}
log.Ctx(hd.ctx).Info("recovery from meta success", zap.Int("task num", len(hd.segments)))
}
func (hd *handoff) enqueue(segment *datapb.SegmentInfo) {
defer hd.Notify()
hd.taskMutex.Lock()
defer hd.taskMutex.Unlock()
// note: don't reset state if the task contains state
hd.segments[segment.GetID()] = segment
log.Ctx(hd.ctx).Info("handoff task enqueue successfully",
zap.Int64("segID", segment.GetID()),
zap.Bool("isFake", segment.GetIsFake()),
)
}
func (hd *handoff) Start() {
hd.wg.Add(1)
go hd.scheduler()
}
func (hd *handoff) Stop() {
hd.cancel()
hd.wg.Wait()
}
func (hd *handoff) Notify() {
select {
case hd.notifyChan <- struct{}{}:
default:
}
}
func (hd *handoff) scheduler() {
log.Ctx(hd.ctx).Info("IndexCoord handoff start...")
defer hd.wg.Done()
ticker := time.NewTicker(hd.scheduleDuration)
defer ticker.Stop()
for {
select {
case <-hd.ctx.Done():
log.Info("IndexCoord handoff context done, exit...")
return
case <-ticker.C:
hd.run()
case <-hd.notifyChan:
hd.run()
}
}
}
func (hd *handoff) run() {
hd.taskMutex.RLock()
segIDs := make([]UniqueID, 0, len(hd.segments))
for segID := range hd.segments {
segIDs = append(segIDs, segID)
}
hd.taskMutex.RUnlock()
sort.Slice(segIDs, func(i, j int) bool {
return segIDs[i] < segIDs[j]
})
if len(segIDs) > 0 {
log.Ctx(hd.ctx).Debug("handoff process...", zap.Int("task num", len(segIDs)))
}
for _, segID := range segIDs {
hd.process(segID)
}
}
func (hd *handoff) handoffFakedSegment(segment *datapb.SegmentInfo) {
if hd.allParentsDone(segment.GetCompactionFrom()) {
handoffSegment := &querypb.SegmentInfo{
SegmentID: segment.GetID(),
CollectionID: segment.GetCollectionID(),
PartitionID: segment.GetPartitionID(),
CompactionFrom: segment.GetCompactionFrom(),
CreatedByCompaction: segment.GetCreatedByCompaction(),
IsFake: segment.GetIsFake(),
}
if err := hd.writeHandoffSegment(handoffSegment); err != nil {
log.Ctx(hd.ctx).Warn("write handoff task fail, need to retry", zap.Int64("segID", segment.GetID()), zap.Error(err))
return
}
log.Ctx(hd.ctx).Info("write handoff task success",
zap.Int64("segID", segment.GetID()),
zap.Bool("isFake", segment.GetIsFake()),
zap.Any("segment", segment))
hd.deleteTask(segment.GetID())
}
}
func (hd *handoff) process(segID UniqueID) {
hd.taskMutex.RLock()
segment, ok := hd.segments[segID]
hd.taskMutex.RUnlock()
if !ok {
log.Ctx(hd.ctx).Warn("handoff get task fail", zap.Int64("segID", segID))
return
}
if segment.GetIsFake() {
hd.handoffFakedSegment(segment)
return
}
state := hd.meta.GetSegmentIndexState(segID)
log.Ctx(hd.ctx).RatedDebug(30, "handoff task is process", zap.Int64("segID", segID),
zap.String("state", state.state.String()))
if state.state == commonpb.IndexState_Failed {
log.Ctx(hd.ctx).Error("build index failed, may be need manual intervention", zap.Int64("segID", segID),
zap.String("fail reason", state.failReason))
hd.deleteTask(segID)
// TODO @xiaocai2333: need write handoff event?
return
}
if state.state == commonpb.IndexState_Finished {
log.Ctx(hd.ctx).Info("build index for segment success, write handoff event...", zap.Int64("segID", segID))
info, err := hd.ic.pullSegmentInfo(hd.ctx, segID)
if err != nil {
if errors.Is(err, ErrSegmentNotFound) {
log.Ctx(hd.ctx).Warn("handoff get segment fail, remove task", zap.Error(err))
hd.deleteTask(segID)
return
}
log.Ctx(hd.ctx).Warn("handoff get segment fail, need to retry", zap.Error(err))
return
}
if info.IsImporting {
log.Info("segment is importing, can't write handoff event", zap.Int64("segID", segID))
return
}
if hd.allParentsDone(info.CompactionFrom) {
log.Ctx(hd.ctx).Debug("segment can write handoff event", zap.Int64("segID", segID),
zap.Int64s("compactionFrom", info.CompactionFrom))
indexInfos := hd.meta.GetSegmentIndexes(segID)
if len(indexInfos) == 0 {
log.Ctx(hd.ctx).Warn("ready to write handoff, but there is no index, may be dropped, remove task",
zap.Int64("segID", segID))
hd.deleteTask(segID)
return
}
handoffTask := &querypb.SegmentInfo{
SegmentID: segID,
CollectionID: info.CollectionID,
PartitionID: info.PartitionID,
NumRows: info.NumOfRows,
DmChannel: info.GetInsertChannel(),
CompactionFrom: info.CompactionFrom,
CreatedByCompaction: info.CreatedByCompaction,
SegmentState: info.State,
IndexInfos: make([]*querypb.FieldIndexInfo, 0),
EnableIndex: true,
}
for _, indexInfo := range indexInfos {
indexParams := hd.meta.GetIndexParams(info.CollectionID, indexInfo.IndexID)
indexParams = append(indexParams, hd.meta.GetTypeParams(info.CollectionID, indexInfo.IndexID)...)
handoffTask.IndexInfos = append(handoffTask.IndexInfos, &querypb.FieldIndexInfo{
FieldID: hd.meta.GetFieldIDByIndexID(info.CollectionID, indexInfo.IndexID),
EnableIndex: true,
IndexName: hd.meta.GetIndexNameByID(info.CollectionID, indexInfo.IndexID),
IndexID: indexInfo.IndexID,
BuildID: indexInfo.BuildID,
IndexParams: indexParams,
//IndexFileKeys: nil,
//IndexSize: 0,
})
}
if !hd.meta.AlreadyWrittenHandoff(segID) {
if err := hd.writeHandoffSegment(handoffTask); err != nil {
log.Ctx(hd.ctx).Warn("write handoff task fail, need to retry", zap.Int64("segID", segID), zap.Error(err))
return
}
log.Ctx(hd.ctx).Info("write handoff success", zap.Int64("segID", segID))
if err := hd.meta.MarkSegmentWriteHandoff(segID); err != nil {
log.Ctx(hd.ctx).Warn("mark segment as write handoff fail, need to retry", zap.Int64("segID", segID), zap.Error(err))
return
}
}
log.Ctx(hd.ctx).Info("mark segment as write handoff success, remove task", zap.Int64("segID", segID))
hd.deleteTask(segID)
return
}
log.Ctx(hd.ctx).RatedDebug(5, "the handoff of the parent segment has not been written yet",
zap.Int64("segID", segID), zap.Int64s("compactionFrom", info.CompactionFrom))
}
}
func (hd *handoff) Len() int {
hd.taskMutex.RLock()
defer hd.taskMutex.RUnlock()
return len(hd.segments)
}
func (hd *handoff) deleteTask(segID UniqueID) {
hd.taskMutex.Lock()
defer hd.taskMutex.Unlock()
delete(hd.segments, segID)
}
func (hd *handoff) taskDone(segID UniqueID) bool {
hd.taskMutex.RLock()
defer hd.taskMutex.RUnlock()
_, ok := hd.segments[segID]
return !ok
}
func (hd *handoff) allParentsDone(segIDs []UniqueID) bool {
hd.taskMutex.RLock()
defer hd.taskMutex.RUnlock()
for _, segID := range segIDs {
if _, ok := hd.segments[segID]; ok {
return false
}
}
return true
}
func (hd *handoff) writeHandoffSegment(info *querypb.SegmentInfo) error {
key := buildHandoffKey(info.CollectionID, info.PartitionID, info.SegmentID)
value, err := proto.Marshal(info)
if err != nil {
log.Error("IndexCoord marshal handoff task fail", zap.Int64("collID", info.CollectionID),
zap.Int64("partID", info.PartitionID), zap.Int64("segID", info.SegmentID), zap.Error(err))
return err
}
err = hd.kvClient.Save(key, string(value))
if err != nil {
log.Error("IndexCoord save handoff task fail", zap.Int64("collID", info.CollectionID),
zap.Int64("partID", info.PartitionID), zap.Int64("segID", info.SegmentID), zap.Error(err))
return err
}
log.Info("IndexCoord write handoff task success", zap.Int64("collID", info.CollectionID),
zap.Int64("partID", info.PartitionID), zap.Int64("segID", info.SegmentID))
return nil
}
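The deleted handoff drove its work loop with a ticker for periodic retries plus a one-slot notify channel for immediate wake-ups, where Notify does a non-blocking send so producers never stall; the surviving flushedSegmentWatcher relies on the same idiom. A stripped-down, generic sketch of that pattern follows; the scheduler type is illustrative and not part of the Milvus code base.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// scheduler reproduces the idiom used above: work runs on a ticker, and a
// buffered notify channel lets callers wake the loop early without blocking.
type scheduler struct {
	ctx        context.Context
	cancel     context.CancelFunc
	wg         sync.WaitGroup
	notifyChan chan struct{}
	interval   time.Duration
	run        func()
}

func newScheduler(interval time.Duration, run func()) *scheduler {
	ctx, cancel := context.WithCancel(context.Background())
	return &scheduler{
		ctx:        ctx,
		cancel:     cancel,
		notifyChan: make(chan struct{}, 1), // capacity 1: at most one pending wake-up
		interval:   interval,
		run:        run,
	}
}

// Notify performs a non-blocking send; if a wake-up is already pending,
// the new one is dropped because the loop will run anyway.
func (s *scheduler) Notify() {
	select {
	case s.notifyChan <- struct{}{}:
	default:
	}
}

func (s *scheduler) Start() {
	s.wg.Add(1)
	go func() {
		defer s.wg.Done()
		ticker := time.NewTicker(s.interval)
		defer ticker.Stop()
		for {
			select {
			case <-s.ctx.Done():
				return
			case <-ticker.C:
				s.run()
			case <-s.notifyChan:
				s.run()
			}
		}
	}()
}

func (s *scheduler) Stop() {
	s.cancel()
	s.wg.Wait()
}

func main() {
	s := newScheduler(time.Second, func() { fmt.Println("processing pending tasks") })
	s.Start()
	s.Notify() // triggers an immediate run instead of waiting for the ticker
	time.Sleep(100 * time.Millisecond)
	s.Stop()
}
```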

View File

@@ -1,483 +0,0 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package indexcoord
import (
"context"
"errors"
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus/internal/metastore"
"github.com/milvus-io/milvus/internal/metastore/kv/indexcoord"
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
)
func createMetaForHandoff(catalog metastore.IndexCoordCatalog) *metaTable {
return &metaTable{
catalog: catalog,
segmentIndexLock: sync.RWMutex{},
indexLock: sync.RWMutex{},
collectionIndexes: map[UniqueID]map[UniqueID]*model.Index{
collID: {
indexID: {
TenantID: "",
CollectionID: collID,
FieldID: fieldID,
IndexID: indexID,
IndexName: indexName,
IsDeleted: false,
CreateTime: 0,
TypeParams: nil,
IndexParams: nil,
},
},
},
segmentIndexes: map[UniqueID]map[UniqueID]*model.SegmentIndex{
segID: {
indexID: {
SegmentID: segID,
CollectionID: collID,
PartitionID: partID,
BuildID: buildID,
IndexState: 1,
IsDeleted: false,
WriteHandoff: false,
},
},
segID + 1: {
indexID: {
SegmentID: segID + 1,
CollectionID: collID,
PartitionID: partID,
BuildID: buildID + 1,
IndexState: 1,
IsDeleted: true,
WriteHandoff: false,
},
},
segID + 2: {
indexID: {
SegmentID: segID + 2,
CollectionID: collID,
PartitionID: partID,
BuildID: buildID + 2,
IndexState: 1,
IsDeleted: false,
WriteHandoff: true,
},
},
},
buildID2SegmentIndex: map[UniqueID]*model.SegmentIndex{
buildID: {
SegmentID: segID,
CollectionID: collID,
PartitionID: partID,
BuildID: buildID,
IndexID: indexID,
IndexState: 1,
IsDeleted: false,
WriteHandoff: false,
},
buildID + 1: {
SegmentID: segID + 1,
CollectionID: collID,
PartitionID: partID,
BuildID: buildID + 1,
IndexID: indexID,
IndexState: 1,
IsDeleted: true,
WriteHandoff: false,
},
buildID + 2: {
SegmentID: segID + 2,
CollectionID: collID,
PartitionID: partID,
BuildID: buildID + 2,
IndexID: indexID,
IndexState: 1,
IsDeleted: false,
WriteHandoff: true,
},
},
}
}
func Test_newHandoff(t *testing.T) {
ctx := context.Background()
hd := newHandoff(ctx, createMetaForHandoff(&indexcoord.Catalog{Txn: NewMockEtcdKV()}), NewMockEtcdKV(), &IndexCoord{dataCoordClient: NewDataCoordMock()})
assert.NotNil(t, hd)
assert.Equal(t, 1, len(hd.segments))
hd.enqueue(&datapb.SegmentInfo{ID: segID})
assert.Equal(t, 1, len(hd.segments))
err := hd.meta.AddIndex(&model.SegmentIndex{
SegmentID: segID + 3,
CollectionID: collID,
PartitionID: partID,
NumRows: 0,
IndexID: indexID,
BuildID: buildID + 3,
})
assert.NoError(t, err)
hd.enqueue(&datapb.SegmentInfo{ID: segID + 3})
assert.Equal(t, 2, len(hd.segments))
hd.Start()
err = hd.meta.FinishTask(&indexpb.IndexTaskInfo{
BuildID: buildID,
State: commonpb.IndexState_Finished,
IndexFileKeys: []string{"file1", "file2"},
SerializedSize: 100,
FailReason: "",
})
assert.NoError(t, err)
err = hd.meta.FinishTask(&indexpb.IndexTaskInfo{
BuildID: buildID + 3,
State: commonpb.IndexState_Failed,
IndexFileKeys: nil,
SerializedSize: 0,
FailReason: "failed",
})
assert.NoError(t, err)
// handle ticker
time.Sleep(time.Second * 2)
for hd.Len() != 0 {
time.Sleep(500 * time.Millisecond)
}
assert.True(t, hd.taskDone(segID))
assert.True(t, hd.taskDone(segID+3))
hd.Stop()
}
func Test_process(t *testing.T) {
t.Run("not found segment", func(t *testing.T) {
hd := &handoff{segments: map[UniqueID]*datapb.SegmentInfo{}}
hd.process(segID)
assert.Equal(t, 0, hd.Len())
})
t.Run("write handoff ok for faked segment", func(t *testing.T) {
hd := &handoff{
segments: map[UniqueID]*datapb.SegmentInfo{
segID: {
ID: segID,
IsFake: true,
},
},
taskMutex: sync.RWMutex{},
kvClient: &mockETCDKV{
save: func(s string, s2 string) error {
return nil
},
},
}
hd.process(segID)
assert.Equal(t, 0, hd.Len())
})
}
func Test_handoff_error(t *testing.T) {
t.Run("pullSegmentInfo fail", func(t *testing.T) {
hd := &handoff{
ctx: context.Background(),
segments: map[UniqueID]*datapb.SegmentInfo{
segID: {},
},
taskMutex: sync.RWMutex{},
wg: sync.WaitGroup{},
meta: &metaTable{
segmentIndexLock: sync.RWMutex{},
segmentIndexes: map[UniqueID]map[UniqueID]*model.SegmentIndex{
segID: {
indexID: {
SegmentID: segID,
CollectionID: collID,
PartitionID: partID,
IndexID: indexID,
BuildID: buildID,
IndexState: commonpb.IndexState_Finished,
FailReason: "",
IsDeleted: false,
WriteHandoff: false,
},
},
},
},
notifyChan: make(chan struct{}, 1),
scheduleDuration: 0,
kvClient: nil,
ic: &IndexCoord{
dataCoordClient: &DataCoordMock{
CallGetSegmentInfo: func(ctx context.Context, req *datapb.GetSegmentInfoRequest) (*datapb.GetSegmentInfoResponse, error) {
return nil, errors.New("error")
},
},
},
}
hd.process(segID)
assert.Equal(t, 1, hd.Len())
hd.ic.dataCoordClient = &DataCoordMock{
CallGetSegmentInfo: func(ctx context.Context, req *datapb.GetSegmentInfoRequest) (*datapb.GetSegmentInfoResponse, error) {
return nil, errSegmentNotFound(segID)
},
}
hd.process(segID)
assert.Equal(t, 0, hd.Len())
})
t.Run("is importing", func(t *testing.T) {
hd := &handoff{
ctx: context.Background(),
segments: map[UniqueID]*datapb.SegmentInfo{
segID: {},
},
taskMutex: sync.RWMutex{},
wg: sync.WaitGroup{},
meta: &metaTable{
segmentIndexLock: sync.RWMutex{},
segmentIndexes: map[UniqueID]map[UniqueID]*model.SegmentIndex{
segID: {
indexID: {
SegmentID: segID,
CollectionID: collID,
PartitionID: partID,
IndexID: indexID,
BuildID: buildID,
IndexState: commonpb.IndexState_Finished,
FailReason: "",
IsDeleted: false,
WriteHandoff: false,
},
},
},
},
notifyChan: make(chan struct{}, 1),
scheduleDuration: 0,
kvClient: nil,
ic: &IndexCoord{
dataCoordClient: &DataCoordMock{
CallGetSegmentInfo: func(ctx context.Context, req *datapb.GetSegmentInfoRequest) (*datapb.GetSegmentInfoResponse, error) {
return &datapb.GetSegmentInfoResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
Infos: []*datapb.SegmentInfo{
{
ID: segID,
CollectionID: collID,
PartitionID: partID,
InsertChannel: "",
NumOfRows: 1024,
State: commonpb.SegmentState_Flushed,
IsImporting: true,
},
},
}, nil
},
},
},
}
hd.process(segID)
assert.Equal(t, 1, hd.Len())
})
t.Run("get index info fail", func(t *testing.T) {
hd := &handoff{
ctx: context.Background(),
segments: map[UniqueID]*datapb.SegmentInfo{
segID: {},
},
taskMutex: sync.RWMutex{},
wg: sync.WaitGroup{},
meta: &metaTable{
segmentIndexLock: sync.RWMutex{},
segmentIndexes: map[UniqueID]map[UniqueID]*model.SegmentIndex{
segID: {
indexID: {
SegmentID: segID,
CollectionID: collID,
PartitionID: partID,
IndexID: indexID,
BuildID: buildID,
IndexState: commonpb.IndexState_Finished,
FailReason: "",
IsDeleted: true,
WriteHandoff: false,
},
},
},
},
notifyChan: make(chan struct{}, 1),
scheduleDuration: 0,
kvClient: nil,
ic: &IndexCoord{
dataCoordClient: NewDataCoordMock(),
},
}
hd.process(segID)
assert.Equal(t, 0, hd.Len())
})
t.Run("write handoff fail", func(t *testing.T) {
hd := &handoff{
ctx: context.Background(),
segments: map[UniqueID]*datapb.SegmentInfo{
segID: {},
},
taskMutex: sync.RWMutex{},
wg: sync.WaitGroup{},
meta: &metaTable{
catalog: &indexcoord.Catalog{Txn: NewMockEtcdKV()},
segmentIndexLock: sync.RWMutex{},
segmentIndexes: map[UniqueID]map[UniqueID]*model.SegmentIndex{
segID: {
indexID: {
SegmentID: segID,
CollectionID: collID,
PartitionID: partID,
IndexID: indexID,
BuildID: buildID,
IndexState: commonpb.IndexState_Finished,
FailReason: "",
IsDeleted: false,
WriteHandoff: false,
},
},
},
},
notifyChan: make(chan struct{}, 1),
scheduleDuration: 0,
kvClient: &mockETCDKV{
save: func(s string, s2 string) error {
return errors.New("error")
},
},
ic: &IndexCoord{
dataCoordClient: NewDataCoordMock(),
},
}
hd.process(segID)
assert.Equal(t, 1, hd.Len())
})
t.Run("write handoff fail for faked segment", func(t *testing.T) {
hd := &handoff{
ctx: context.Background(),
segments: map[UniqueID]*datapb.SegmentInfo{
segID: {
ID: segID,
IsFake: true,
},
},
taskMutex: sync.RWMutex{},
kvClient: &mockETCDKV{
save: func(s string, s2 string) error {
return errors.New("error")
},
},
}
hd.process(segID)
assert.Equal(t, 1, hd.Len())
})
t.Run("mark meta as write handoff fail", func(t *testing.T) {
hd := &handoff{
ctx: context.Background(),
segments: map[UniqueID]*datapb.SegmentInfo{
segID: {},
},
taskMutex: sync.RWMutex{},
wg: sync.WaitGroup{},
meta: &metaTable{
catalog: &indexcoord.Catalog{Txn: &mockETCDKV{
multiSave: func(m map[string]string) error {
return errors.New("error")
},
}},
segmentIndexLock: sync.RWMutex{},
segmentIndexes: map[UniqueID]map[UniqueID]*model.SegmentIndex{
segID: {
indexID: {
SegmentID: segID,
CollectionID: collID,
PartitionID: partID,
IndexID: indexID,
BuildID: buildID,
IndexState: commonpb.IndexState_Finished,
FailReason: "",
IsDeleted: false,
WriteHandoff: false,
},
},
},
},
notifyChan: make(chan struct{}, 1),
scheduleDuration: 0,
kvClient: NewMockEtcdKV(),
ic: &IndexCoord{
dataCoordClient: NewDataCoordMock(),
},
}
hd.process(segID)
assert.Equal(t, 1, hd.Len())
})
}
func Test_handoff_allParentsDone(t *testing.T) {
t.Run("done", func(t *testing.T) {
hd := &handoff{
segments: map[UniqueID]*datapb.SegmentInfo{
segID: {},
},
taskMutex: sync.RWMutex{},
}
done := hd.allParentsDone([]UniqueID{segID + 1, segID + 2, segID + 3})
assert.True(t, done)
})
t.Run("not done", func(t *testing.T) {
hd := &handoff{
segments: map[UniqueID]*datapb.SegmentInfo{
segID: {},
segID + 1: {},
},
taskMutex: sync.RWMutex{},
}
done := hd.allParentsDone([]UniqueID{segID + 1, segID + 2, segID + 3})
assert.False(t, done)
})
}
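A side note on the deleted test above: it drains the scheduler with an unbounded polling loop (for hd.Len() != 0 { time.Sleep(...) }), so a task that never completes hangs the suite. A bounded wait is the usual safer alternative; the waitUntil helper below is a generic, hypothetical sketch rather than Milvus code.

```go
package main

import (
	"fmt"
	"time"
)

// waitUntil polls cond until it returns true or the timeout elapses,
// returning whether the condition was eventually met.
func waitUntil(timeout, interval time.Duration, cond func() bool) bool {
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		if cond() {
			return true
		}
		time.Sleep(interval)
	}
	return cond()
}

func main() {
	start := time.Now()
	ok := waitUntil(2*time.Second, 100*time.Millisecond, func() bool {
		return time.Since(start) > 300*time.Millisecond
	})
	fmt.Println("condition met:", ok)
}
```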

View File

@@ -91,7 +91,6 @@ type IndexCoord struct {
indexBuilder *indexBuilder
garbageCollector *garbageCollector
flushedSegmentWatcher *flushedSegmentWatcher
handoff *handoff
metricsCacheManager *metricsinfo.MetricsCacheManager
@@ -265,8 +264,7 @@ func (i *IndexCoord) initIndexCoord() error {
i.chunkManager = chunkManager
i.garbageCollector = newGarbageCollector(i.loopCtx, i.metaTable, i.chunkManager, i)
i.handoff = newHandoff(i.loopCtx, i.metaTable, i.etcdKV, i)
i.flushedSegmentWatcher, err = newFlushSegmentWatcher(i.loopCtx, i.etcdKV, i.metaTable, i.indexBuilder, i.handoff, i)
i.flushedSegmentWatcher, err = newFlushSegmentWatcher(i.loopCtx, i.etcdKV, i.metaTable, i.indexBuilder, i)
if err != nil {
return err
}
@@ -308,7 +306,6 @@ func (i *IndexCoord) startIndexCoord() {
i.indexBuilder.Start()
i.garbageCollector.Start()
i.handoff.Start()
i.flushedSegmentWatcher.Start()
})

View File

@@ -232,7 +232,7 @@ func (cit *CreateIndexTask) Execute(ctx context.Context) error {
return err
}
buildIDs, segments, err := cit.createIndexAtomic(index, segmentsInfo.GetInfos())
buildIDs, _, err := cit.createIndexAtomic(index, segmentsInfo.GetInfos())
if err != nil {
log.Error("IndexCoord create index fail", zap.Int64("collectionID", cit.req.CollectionID),
zap.Int64("fieldID", cit.req.FieldID), zap.String("indexName", cit.req.IndexName), zap.Error(err))
@@ -241,10 +241,6 @@ func (cit *CreateIndexTask) Execute(ctx context.Context) error {
for _, buildID := range buildIDs {
cit.indexCoordClient.indexBuilder.enqueue(buildID)
}
// If the handoff is not notified here, the segment that has been loaded will not be able to replace the index
for _, segment := range segments {
cit.indexCoordClient.handoff.enqueue(segment)
}
return nil
}