From fd30034c778420ca808faaab906ebdab4bd59cbd Mon Sep 17 00:00:00 2001 From: "yihao.dai" Date: Thu, 21 Nov 2024 21:35:42 +0800 Subject: [PATCH] fix: [10kcp] Fix data view and add more ut (#37915) Signed-off-by: bigsheeper --- internal/datacoord/dataview/data_view.go | 2 +- internal/datacoord/dataview/view_manager.go | 37 ++- .../datacoord/dataview/view_manager_test.go | 217 ++++++++++++++++++ internal/datacoord/handler.go | 3 + internal/datacoord/services_test.go | 34 +++ .../datacoord/client/client_test.go | 51 ++++ .../distributed/datacoord/service_test.go | 7 + 7 files changed, 339 insertions(+), 12 deletions(-) create mode 100644 internal/datacoord/dataview/view_manager_test.go diff --git a/internal/datacoord/dataview/data_view.go b/internal/datacoord/dataview/data_view.go index c34b59b820..bfea1c1ff8 100644 --- a/internal/datacoord/dataview/data_view.go +++ b/internal/datacoord/dataview/data_view.go @@ -18,7 +18,7 @@ package dataview import "github.com/milvus-io/milvus/internal/proto/datapb" -const InitialDataViewVersion = 0 +const InitialDataViewVersion int64 = 0 type DataView struct { CollectionID int64 diff --git a/internal/datacoord/dataview/view_manager.go b/internal/datacoord/dataview/view_manager.go index 9eed50065b..f5638f4e27 100644 --- a/internal/datacoord/dataview/view_manager.go +++ b/internal/datacoord/dataview/view_manager.go @@ -33,6 +33,7 @@ type PullNewDataViewFunction func(collectionID int64) (*DataView, error) type ViewManager interface { Get(collectionID int64) (*DataView, error) GetVersion(collectionID int64) int64 + Remove(collectionID int64) Start() Close() @@ -63,8 +64,12 @@ func (m *dataViewManager) Get(collectionID int64) (*DataView, error) { if err != nil { return nil, err } - m.currentViews.GetOrInsert(collectionID, view) - return view, nil + + v, ok := m.currentViews.GetOrInsert(collectionID, view) + if !ok { + log.Info("update new data view", zap.Int64("collectionID", collectionID), zap.Int64("version", view.Version)) + } + return v, nil } func (m *dataViewManager) GetVersion(collectionID int64) int64 { @@ -74,6 +79,12 @@ func (m *dataViewManager) GetVersion(collectionID int64) int64 { return InitialDataViewVersion } +func (m *dataViewManager) Remove(collectionID int64) { + if view, ok := m.currentViews.GetAndRemove(collectionID); ok { + log.Info("data view removed", zap.Int64("collectionID", collectionID), zap.Int64("version", view.Version)) + } +} + func (m *dataViewManager) Start() { ticker := time.NewTicker(paramtable.Get().DataCoordCfg.DataViewUpdateInterval.GetAsDuration(time.Second)) defer ticker.Stop() @@ -100,35 +111,36 @@ func (m *dataViewManager) Close() { } func (m *dataViewManager) update(view *DataView) { - _, ok := m.currentViews.GetOrInsert(view.CollectionID, view) - if ok { - log.Info("update new data view", zap.Int64("collectionID", view.CollectionID), zap.Int64("version", view.Version)) - } + m.currentViews.Insert(view.CollectionID, view) + log.Info("update new data view", zap.Int64("collectionID", view.CollectionID), zap.Int64("version", view.Version)) } func (m *dataViewManager) TryUpdateDataView(collectionID int64) { newView, err := m.pullFn(collectionID) if err != nil { log.Warn("pull new data view failed", zap.Int64("collectionID", collectionID), zap.Error(err)) - // notify to trigger pull again + // notify to trigger retry NotifyUpdate(collectionID) return } currentView, ok := m.currentViews.Get(collectionID) if !ok { - m.currentViews.GetOrInsert(collectionID, newView) + // update due to data view is empty + m.update(newView) return } // no-op if the incoming version is less than the current version. if newView.Version <= currentView.Version { + log.Warn("stale version, skip update", zap.Int64("collectionID", collectionID), + zap.Int64("new", newView.Version), zap.Int64("current", currentView.Version)) return } - // check if channel info has been updated. for channel, new := range newView.Channels { current, ok := currentView.Channels[channel] if !ok { + // update due to channel info is empty m.update(newView) return } @@ -137,22 +149,25 @@ func (m *dataViewManager) TryUpdateDataView(collectionID int64) { !funcutil.SliceSetEqual(new.GetFlushedSegmentIds(), current.GetFlushedSegmentIds()) || !funcutil.SliceSetEqual(new.GetIndexedSegmentIds(), current.GetIndexedSegmentIds()) || !funcutil.SliceSetEqual(new.GetDroppedSegmentIds(), current.GetDroppedSegmentIds()) { + // update due to segments list changed m.update(newView) return } if !typeutil.MapEqual(new.GetPartitionStatsVersions(), current.GetPartitionStatsVersions()) { + // update due to partition stats changed m.update(newView) return } // TODO: It might be too frequent. if new.GetSeekPosition().GetTimestamp() > current.GetSeekPosition().GetTimestamp() { + // update due to channel cp advanced m.update(newView) return } } - // check if segment info has been updated. if !typeutil.MapEqual(newView.Segments, currentView.Segments) { - m.currentViews.GetOrInsert(collectionID, newView) + // update due to segments list changed + m.update(newView) } } diff --git a/internal/datacoord/dataview/view_manager_test.go b/internal/datacoord/dataview/view_manager_test.go new file mode 100644 index 0000000000..afd41a6b7b --- /dev/null +++ b/internal/datacoord/dataview/view_manager_test.go @@ -0,0 +1,217 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package dataview + +import ( + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" + "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/pkg/util/paramtable" +) + +func init() { + paramtable.Init() +} + +func TestNewDataViewManager_Get(t *testing.T) { + pullFn := func(collectionID int64) (*DataView, error) { + return &DataView{ + CollectionID: collectionID, + Channels: nil, + Segments: nil, + Version: time.Now().UnixNano(), + }, nil + } + manager := NewDataViewManager(pullFn) + + collectionID := int64(1) + // No data view + version := manager.GetVersion(collectionID) + assert.Equal(t, InitialDataViewVersion, version) + + // Lazy get data view + v1, err := manager.Get(collectionID) + assert.NoError(t, err) + assert.NotEqual(t, InitialDataViewVersion, v1) + version = manager.GetVersion(v1.CollectionID) + assert.Equal(t, v1.Version, version) + + // Get again, data view should not update + v2, err := manager.Get(collectionID) + assert.NoError(t, err) + assert.Equal(t, v1, v2) +} + +func TestNewDataViewManager_TryUpdateDataView(t *testing.T) { + manager := NewDataViewManager(nil) + go manager.Start() + defer manager.Close() + + collectionID := int64(1) + + // Update due to data view is empty + v1 := &DataView{ + CollectionID: collectionID, + Version: time.Now().UnixNano(), + } + manager.(*dataViewManager).pullFn = func(collectionID int64) (*DataView, error) { + return v1, nil + } + NotifyUpdate(collectionID) + assert.Eventually(t, func() bool { + version := manager.GetVersion(collectionID) + return version == v1.Version + }, 1*time.Second, 10*time.Millisecond) + + // Update due to channel info is empty + v2 := &DataView{ + CollectionID: collectionID, + Channels: map[string]*datapb.VchannelInfo{"ch0": { + CollectionID: collectionID, + ChannelName: "ch0", + }}, + Version: time.Now().UnixNano(), + } + manager.(*dataViewManager).pullFn = func(collectionID int64) (*DataView, error) { + return v2, nil + } + NotifyUpdate(collectionID) + assert.Eventually(t, func() bool { + version := manager.GetVersion(collectionID) + return version == v2.Version + }, 1*time.Second, 10*time.Millisecond) + + // Update due to segments list changed + v3 := &DataView{ + CollectionID: collectionID, + Channels: map[string]*datapb.VchannelInfo{"ch0": { + CollectionID: collectionID, + ChannelName: "ch0", + UnflushedSegmentIds: []int64{100, 200}, + }}, + Version: time.Now().UnixNano(), + } + manager.(*dataViewManager).pullFn = func(collectionID int64) (*DataView, error) { + return v3, nil + } + NotifyUpdate(collectionID) + assert.Eventually(t, func() bool { + version := manager.GetVersion(collectionID) + return version == v3.Version + }, 1*time.Second, 10*time.Millisecond) + + // Update due to partition stats changed + v4 := &DataView{ + CollectionID: collectionID, + Channels: map[string]*datapb.VchannelInfo{"ch0": { + CollectionID: collectionID, + ChannelName: "ch0", + SeekPosition: &msgpb.MsgPosition{ + Timestamp: uint64(time.Now().UnixNano()), + }, + UnflushedSegmentIds: []int64{100, 200}, + PartitionStatsVersions: map[int64]int64{1000: 2000}, + }}, + Version: time.Now().UnixNano(), + } + manager.(*dataViewManager).pullFn = func(collectionID int64) (*DataView, error) { + return v4, nil + } + NotifyUpdate(collectionID) + assert.Eventually(t, func() bool { + version := manager.GetVersion(collectionID) + return version == v4.Version + }, 1*time.Second, 10*time.Millisecond) + + // Update due to channel cp advanced + v5 := &DataView{ + CollectionID: collectionID, + Channels: map[string]*datapb.VchannelInfo{"ch0": { + CollectionID: collectionID, + ChannelName: "ch0", + SeekPosition: &msgpb.MsgPosition{ + Timestamp: uint64(time.Now().UnixNano()), + }, + UnflushedSegmentIds: []int64{100, 200}, + PartitionStatsVersions: map[int64]int64{1000: 2000}, + }}, + Version: time.Now().UnixNano(), + } + manager.(*dataViewManager).pullFn = func(collectionID int64) (*DataView, error) { + return v5, nil + } + NotifyUpdate(collectionID) + assert.Eventually(t, func() bool { + version := manager.GetVersion(collectionID) + return version == v5.Version + }, 1*time.Second, 10*time.Millisecond) + + // Update due to segments list changed + v6 := &DataView{ + CollectionID: collectionID, + Channels: map[string]*datapb.VchannelInfo{"ch0": { + CollectionID: collectionID, + ChannelName: "ch0", + SeekPosition: &msgpb.MsgPosition{ + Timestamp: v5.Channels["ch0"].GetSeekPosition().GetTimestamp(), + }, + UnflushedSegmentIds: []int64{100, 200}, + PartitionStatsVersions: map[int64]int64{1000: 2000}, + }}, + Segments: map[int64]struct{}{ + 300: {}, + }, + Version: time.Now().UnixNano(), + } + manager.(*dataViewManager).pullFn = func(collectionID int64) (*DataView, error) { + return v6, nil + } + NotifyUpdate(collectionID) + assert.Eventually(t, func() bool { + version := manager.GetVersion(collectionID) + return version == v6.Version + }, 1*time.Second, 10*time.Millisecond) + + // Won't update anymore + NotifyUpdate(collectionID) + assert.Never(t, func() bool { + version := manager.GetVersion(collectionID) + return version != v6.Version + }, 100*time.Millisecond, 10*time.Millisecond) +} + +func TestNewDataViewManager_TryUpdateDataView_Failed(t *testing.T) { + manager := NewDataViewManager(nil) + go manager.Start() + defer manager.Close() + + collectionID := int64(1) + + manager.(*dataViewManager).pullFn = func(collectionID int64) (*DataView, error) { + return nil, fmt.Errorf("mock err") + } + NotifyUpdate(collectionID) + assert.Never(t, func() bool { + version := manager.GetVersion(collectionID) + return version > InitialDataViewVersion + }, 100*time.Millisecond, 10*time.Millisecond) +} diff --git a/internal/datacoord/handler.go b/internal/datacoord/handler.go index 60bec831c3..089c370993 100644 --- a/internal/datacoord/handler.go +++ b/internal/datacoord/handler.go @@ -474,5 +474,8 @@ func (h *ServerHandler) FinishDropChannel(channel string, collectionID int64) er // clean collection info cache when meet drop collection info h.s.meta.DropCollection(collectionID) + // clean data view + h.s.viewManager.Remove(collectionID) + return nil } diff --git a/internal/datacoord/services_test.go b/internal/datacoord/services_test.go index 356d8da0a0..cc14114d77 100644 --- a/internal/datacoord/services_test.go +++ b/internal/datacoord/services_test.go @@ -20,6 +20,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/datacoord/dataview" "github.com/milvus-io/milvus/internal/metastore/mocks" "github.com/milvus-io/milvus/internal/metastore/model" mocks2 "github.com/milvus-io/milvus/internal/mocks" @@ -1060,6 +1061,39 @@ func TestServer_GcConfirm(t *testing.T) { }) } +func TestGetDataViewVersions(t *testing.T) { + t.Run("server not healthy", func(t *testing.T) { + svr := newTestServer(t) + closeTestServer(t, svr) + resp, err := svr.GetDataViewVersions(context.TODO(), &datapb.GetDataViewVersionsRequest{}) + assert.NoError(t, err) + err = merr.Error(resp.GetStatus()) + assert.ErrorIs(t, err, merr.ErrServiceNotReady) + }) + + t.Run("normal", func(t *testing.T) { + svr := newTestServer(t) + defer closeTestServer(t, svr) + + pullFn := func(collectionID int64) (*dataview.DataView, error) { + return &dataview.DataView{ + CollectionID: collectionID, + Version: time.Now().UnixNano(), + }, nil + } + manager := dataview.NewDataViewManager(pullFn) + svr.viewManager = manager + + req := &datapb.GetDataViewVersionsRequest{ + CollectionIDs: []int64{100, 200, 300}, + } + resp, err := svr.GetDataViewVersions(context.TODO(), req) + assert.NoError(t, err) + assert.EqualValues(t, commonpb.ErrorCode_Success, resp.GetStatus().GetCode()) + assert.EqualValues(t, 3, len(resp.GetDataViewVersions())) + }) +} + func TestGetRecoveryInfoV2(t *testing.T) { t.Run("test get recovery info with no segments", func(t *testing.T) { svr := newTestServer(t) diff --git a/internal/distributed/datacoord/client/client_test.go b/internal/distributed/datacoord/client/client_test.go index 2cb56dbd44..7504a66b4e 100644 --- a/internal/distributed/datacoord/client/client_test.go +++ b/internal/distributed/datacoord/client/client_test.go @@ -692,6 +692,57 @@ func Test_SaveBinlogPaths(t *testing.T) { assert.ErrorIs(t, err, context.DeadlineExceeded) } +func Test_GetDataViewVersions(t *testing.T) { + paramtable.Init() + + ctx := context.Background() + client, err := NewClient(ctx) + assert.NoError(t, err) + assert.NotNil(t, client) + defer client.Close() + + mockDC := mocks.NewMockDataCoordClient(t) + mockGrpcClient := mocks.NewMockGrpcClient[datapb.DataCoordClient](t) + mockGrpcClient.EXPECT().Close().Return(nil) + mockGrpcClient.EXPECT().ReCall(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, f func(datapb.DataCoordClient) (interface{}, error)) (interface{}, error) { + return f(mockDC) + }) + client.(*Client).grpcClient = mockGrpcClient + + // test success + mockDC.EXPECT().GetDataViewVersions(mock.Anything, mock.Anything).Return(&datapb.GetDataViewVersionsResponse{ + Status: merr.Success(), + }, nil) + _, err = client.GetDataViewVersions(ctx, &datapb.GetDataViewVersionsRequest{}) + assert.Nil(t, err) + + // test return error status + mockDC.ExpectedCalls = nil + mockDC.EXPECT().GetDataViewVersions(mock.Anything, mock.Anything).Return(&datapb.GetDataViewVersionsResponse{ + Status: merr.Status(merr.ErrServiceNotReady), + }, nil) + + rsp, err := client.GetDataViewVersions(ctx, &datapb.GetDataViewVersionsRequest{}) + assert.NotEqual(t, int32(0), rsp.GetStatus().GetCode()) + assert.Nil(t, err) + + // test return error + mockDC.ExpectedCalls = nil + mockDC.EXPECT().GetDataViewVersions(mock.Anything, mock.Anything).Return(&datapb.GetDataViewVersionsResponse{ + Status: merr.Success(), + }, mockErr) + + _, err = client.GetDataViewVersions(ctx, &datapb.GetDataViewVersionsRequest{}) + assert.NotNil(t, err) + + // test ctx done + ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond) + defer cancel() + time.Sleep(20 * time.Millisecond) + _, err = client.GetDataViewVersions(ctx, &datapb.GetDataViewVersionsRequest{}) + assert.ErrorIs(t, err, context.DeadlineExceeded) +} + func Test_GetRecoveryInfo(t *testing.T) { paramtable.Init() diff --git a/internal/distributed/datacoord/service_test.go b/internal/distributed/datacoord/service_test.go index 319c915995..adab9b434f 100644 --- a/internal/distributed/datacoord/service_test.go +++ b/internal/distributed/datacoord/service_test.go @@ -130,6 +130,13 @@ func Test_NewServer(t *testing.T) { assert.NotNil(t, resp) }) + t.Run("GetDataViewVersions", func(t *testing.T) { + mockDataCoord.EXPECT().GetDataViewVersions(mock.Anything, mock.Anything).Return(&datapb.GetDataViewVersionsResponse{}, nil) + resp, err := server.GetDataViewVersions(ctx, nil) + assert.NoError(t, err) + assert.NotNil(t, resp) + }) + t.Run("GetRecoveryInfo", func(t *testing.T) { mockDataCoord.EXPECT().GetRecoveryInfo(mock.Anything, mock.Anything).Return(&datapb.GetRecoveryInfoResponse{}, nil) resp, err := server.GetRecoveryInfo(ctx, nil)