diff --git a/internal/querynode/data_sync_service.go b/internal/querynode/data_sync_service.go index 568b1e7c26..fe30f7fbdb 100644 --- a/internal/querynode/data_sync_service.go +++ b/internal/querynode/data_sync_service.go @@ -26,6 +26,7 @@ import ( "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/metrics" "github.com/milvus-io/milvus/internal/mq/msgstream" + "github.com/milvus-io/milvus/internal/util/funcutil" ) // dataSyncService manages a lot of flow graphs @@ -246,6 +247,50 @@ func (dsService *dataSyncService) removeFlowGraphsByDeltaChannels(channels []Cha } } +// removeEmptyFlowGraphByChannel would remove delta flow graph if no seal segment exists in meta. +// *segment shall be added into meta before add flow graph +// *release sealed segment shall always come after load completed +func (dsService *dataSyncService) removeEmptyFlowGraphByChannel(collectionID int64, channel string) { + log := log.With( + zap.Int64("collectionID", collectionID), + zap.String("channel", channel), + ) + dsService.mu.Lock() + defer dsService.mu.Unlock() + + // convert dml channel name to delta channel name + dc, err := funcutil.ConvertChannelName(channel, Params.CommonCfg.RootCoordDml, Params.CommonCfg.RootCoordDelta) + if err != nil { + log.Warn("removeEmptyFGByDelta failed to convert channel to delta", zap.Error(err)) + return + } + + // check flow graph exists first + fg, ok := dsService.deltaChannel2FlowGraph[dc] + if !ok { + log.Warn("remove delta flowgraph does not exist") + return + } + + // get all sealed segments associated with this channel + segments, err := dsService.metaReplica.getSegmentIDsByVChannel(nil, channel, segmentTypeSealed) + if err != nil { + log.Warn("removeEmptyFGByDelta failed to check segments with VChannel", zap.Error(err)) + } + + // check whether there are still not released segments + if len(segments) > 0 { + return + } + + // start to release flow graph + log.Info("all segments released, start to remove deltaChannel flowgraph") + fg.close() + delete(dsService.deltaChannel2FlowGraph, dc) + dsService.tSafeReplica.removeTSafe(dc) + rateCol.removeTSafeChannel(dc) +} + // newDataSyncService returns a new dataSyncService func newDataSyncService(ctx context.Context, metaReplica ReplicaInterface, diff --git a/internal/querynode/data_sync_service_test.go b/internal/querynode/data_sync_service_test.go index 89a400f06b..5962f781c6 100644 --- a/internal/querynode/data_sync_service_test.go +++ b/internal/querynode/data_sync_service_test.go @@ -18,9 +18,13 @@ package querynode import ( "context" + "fmt" "testing" + "github.com/milvus-io/milvus/internal/util/dependency" + "github.com/milvus-io/milvus/internal/util/funcutil" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/suite" ) func TestDataSyncService_DMLFlowGraphs(t *testing.T) { @@ -199,3 +203,83 @@ func TestDataSyncService_checkReplica(t *testing.T) { dataSyncService.tSafeReplica.addTSafe(defaultDMLChannel) }) } + +type DataSyncServiceSuite struct { + suite.Suite + factory dependency.Factory + dsService *dataSyncService +} + +func (s *DataSyncServiceSuite) SetupSuite() { + s.factory = genFactory() +} + +func (s *DataSyncServiceSuite) SetupTest() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + replica, err := genSimpleReplica() + s.Require().NoError(err) + + tSafe := newTSafeReplica() + s.dsService = newDataSyncService(ctx, replica, tSafe, s.factory) + s.Require().NoError(err) +} + +func (s *DataSyncServiceSuite) TearDownTest() { + s.dsService.close() + s.dsService = nil +} + +func (s *DataSyncServiceSuite) TestRemoveEmptyFlowgraphByChannel() { + s.Run("non existing channel", func() { + s.Assert().NotPanics(func() { + channelName := fmt.Sprintf("%s_%d_1", Params.CommonCfg.RootCoordDml, defaultCollectionID) + s.dsService.removeEmptyFlowGraphByChannel(defaultCollectionID, channelName) + }) + }) + + s.Run("bad format channel", func() { + s.Assert().NotPanics(func() { + s.dsService.removeEmptyFlowGraphByChannel(defaultCollectionID, "") + }) + }) + + s.Run("non-empty flowgraph", func() { + channelName := fmt.Sprintf("%s_%d_1", Params.CommonCfg.RootCoordDml, defaultCollectionID) + deltaChannelName, err := funcutil.ConvertChannelName(channelName, Params.CommonCfg.RootCoordDml, Params.CommonCfg.RootCoordDelta) + s.Require().NoError(err) + err = s.dsService.metaReplica.addSegment(defaultSegmentID, defaultPartitionID, defaultCollectionID, channelName, defaultSegmentVersion, segmentTypeSealed) + s.Require().NoError(err) + + _, err = s.dsService.addFlowGraphsForDeltaChannels(defaultCollectionID, []string{deltaChannelName}) + s.Require().NoError(err) + + s.Assert().NotPanics(func() { + s.dsService.removeEmptyFlowGraphByChannel(defaultCollectionID, channelName) + }) + + _, err = s.dsService.getFlowGraphByDeltaChannel(defaultCollectionID, deltaChannelName) + s.Assert().NoError(err) + }) + + s.Run("empty flowgraph", func() { + channelName := fmt.Sprintf("%s_%d_2", Params.CommonCfg.RootCoordDml, defaultCollectionID) + deltaChannelName, err := funcutil.ConvertChannelName(channelName, Params.CommonCfg.RootCoordDml, Params.CommonCfg.RootCoordDelta) + s.Require().NoError(err) + + _, err = s.dsService.addFlowGraphsForDeltaChannels(defaultCollectionID, []string{deltaChannelName}) + s.Require().NoError(err) + + s.Assert().NotPanics(func() { + s.dsService.removeEmptyFlowGraphByChannel(defaultCollectionID, channelName) + }) + + _, err = s.dsService.getFlowGraphByDeltaChannel(defaultCollectionID, deltaChannelName) + s.Assert().Error(err) + }) +} + +func TestDataSyncServiceSuite(t *testing.T) { + suite.Run(t, new(DataSyncServiceSuite)) +} diff --git a/internal/querynode/impl.go b/internal/querynode/impl.go index 114d3f32d6..f7d837a93d 100644 --- a/internal/querynode/impl.go +++ b/internal/querynode/impl.go @@ -599,6 +599,9 @@ func (node *QueryNode) ReleaseSegments(ctx context.Context, in *querypb.ReleaseS } } + // note that argument is dmlchannel name + node.dataSyncService.removeEmptyFlowGraphByChannel(in.GetCollectionID(), in.GetShard()) + log.Info("release segments done", zap.Int64("collectionID", in.CollectionID), zap.Int64s("segmentIDs", in.SegmentIDs), zap.String("Scope", in.GetScope().String())) return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_Success,