diff --git a/internal/datanode/data_sync_service.go b/internal/datanode/data_sync_service.go index c2e932a3bb..802ac52f53 100644 --- a/internal/datanode/data_sync_service.go +++ b/internal/datanode/data_sync_service.go @@ -36,17 +36,17 @@ import ( type dataSyncService struct { ctx context.Context cancelFn context.CancelFunc - fg *flowgraph.TimeTickedFlowGraph - flushCh chan flushMsg - replica Replica - idAllocator allocatorInterface + fg *flowgraph.TimeTickedFlowGraph // internal flowgraph processes insert/delta messages + flushCh chan flushMsg // chan to notify flush + replica Replica // segment replica stores meta + idAllocator allocatorInterface // id/timestamp allocator msFactory msgstream.Factory - collectionID UniqueID - dataCoord types.DataCoord - clearSignal chan<- UniqueID + collectionID UniqueID // collection id of vchan for which this data sync service serves + dataCoord types.DataCoord // DataCoord instance to interact with + clearSignal chan<- UniqueID // signal channel to notify flowgraph close for collection/partition drop msg consumed - flushingSegCache *Cache - flushManager flushManager + flushingSegCache *Cache // a guarding cache stores currently flushing segment ids + flushManager flushManager // flush manager handles flush process } func newDataSyncService(ctx context.Context, @@ -156,6 +156,7 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro return err } + // initialize flush manager for DataSync Service dsService.flushManager = NewRendezvousFlushManager(dsService.idAllocator, minIOKV, dsService.replica, func(pack *segmentFlushPack) error { fieldInsert := []*datapb.FieldBinlog{} fieldStats := []*datapb.FieldBinlog{}