diff --git a/internal/core/src/segcore/SegmentGrowingImpl.cpp b/internal/core/src/segcore/SegmentGrowingImpl.cpp
index a2d2f16e50..9f80d3defc 100644
--- a/internal/core/src/segcore/SegmentGrowingImpl.cpp
+++ b/internal/core/src/segcore/SegmentGrowingImpl.cpp
@@ -194,6 +194,28 @@ SegmentGrowingImpl::GetMemoryUsageInBytes() const {
     return total_bytes;
 }
 
+void
+SegmentGrowingImpl::LoadDeletedRecord(const LoadDeletedRecordInfo& info) {
+    AssertInfo(info.row_count > 0, "The row count of deleted record is 0");
+    AssertInfo(info.primary_keys, "Deleted primary keys is null");
+    AssertInfo(info.timestamps, "Deleted timestamps is null");
+    // step 1: get pks and timestamps
+    auto field_id = schema_->get_primary_field_id().value_or(FieldId(INVALID_FIELD_ID));
+    AssertInfo(field_id.get() != INVALID_FIELD_ID, "Primary key has invalid field id");
+    auto& field_meta = schema_->operator[](field_id);
+    int64_t size = info.row_count;
+    std::vector<PkType> pks(size);
+    ParsePksFromIDs(pks, field_meta.get_data_type(), *info.primary_keys);
+    auto timestamps = reinterpret_cast<const Timestamp*>(info.timestamps);
+
+    // step 2: fill pks and timestamps
+    deleted_record_.pks_.set_data_raw(0, pks.data(), size);
+    deleted_record_.timestamps_.set_data_raw(0, timestamps, size);
+    deleted_record_.ack_responder_.AddSegment(0, size);
+    deleted_record_.reserved.fetch_add(size);
+    deleted_record_.record_size_ = size;
+}
+
 SpanBase
 SegmentGrowingImpl::chunk_data_impl(FieldId field_id, int64_t chunk_id) const {
     auto vec = get_insert_record().get_field_data_base(field_id);
diff --git a/internal/core/src/segcore/SegmentGrowingImpl.h b/internal/core/src/segcore/SegmentGrowingImpl.h
index 49978721c2..3cc1dfa6fc 100644
--- a/internal/core/src/segcore/SegmentGrowingImpl.h
+++ b/internal/core/src/segcore/SegmentGrowingImpl.h
@@ -59,6 +59,9 @@ class SegmentGrowingImpl : public SegmentGrowing {
     int64_t
     GetMemoryUsageInBytes() const override;
 
+    void
+    LoadDeletedRecord(const LoadDeletedRecordInfo& info) override;
+
     std::string
     debug() const override;
 
diff --git a/internal/core/src/segcore/SegmentInterface.h b/internal/core/src/segcore/SegmentInterface.h
index 27b11e52f1..a7e5db7815 100644
--- a/internal/core/src/segcore/SegmentInterface.h
+++ b/internal/core/src/segcore/SegmentInterface.h
@@ -24,6 +24,7 @@
 #include "common/Span.h"
 #include "common/SystemProperty.h"
 #include "common/Types.h"
+#include "common/LoadInfo.h"
 #include "common/BitsetView.h"
 #include "common/QueryResult.h"
 #include "knowhere/index/vector_index/VecIndex.h"
@@ -66,6 +67,9 @@ class SegmentInterface {
 
     virtual Status
     Delete(int64_t reserved_offset, int64_t size, const IdArray* pks, const Timestamp* timestamps) = 0;
+
+    virtual void
+    LoadDeletedRecord(const LoadDeletedRecordInfo& info) = 0;
 };
 
 // internal API for DSL calculation
diff --git a/internal/core/src/segcore/SegmentSealed.h b/internal/core/src/segcore/SegmentSealed.h
index 061921306e..eb17689dca 100644
--- a/internal/core/src/segcore/SegmentSealed.h
+++ b/internal/core/src/segcore/SegmentSealed.h
@@ -29,8 +29,6 @@ class SegmentSealed : public SegmentInternalInterface {
     virtual void
     LoadFieldData(const LoadFieldDataInfo& info) = 0;
     virtual void
-    LoadDeletedRecord(const LoadDeletedRecordInfo& info) = 0;
-    virtual void
     DropIndex(const FieldId field_id) = 0;
     virtual void
     DropFieldData(const FieldId field_id) = 0;
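
Reviewer note: the hunks above are the core of this change. LoadDeletedRecord moves from SegmentSealed up to SegmentInterface and gains a growing-segment implementation, so delta logs can now be applied to both segment types. A minimal sketch of driving the new method directly from C++ follows; it assumes an existing SegmentGrowing pointer and an int64 primary key, and the include paths are best-effort guesses, none of which this diff guarantees.

    // Sketch only: exercises the new SegmentInterface::LoadDeletedRecord on a
    // growing segment. The {timestamps, primary_keys, row_count} field order of
    // LoadDeletedRecordInfo matches the aggregate initializer in segment_c.cpp.
    #include <cstdint>
    #include <vector>

    #include "common/LoadInfo.h"         // LoadDeletedRecordInfo (now included by SegmentInterface.h)
    #include "pb/schema.pb.h"            // milvus::proto::schema::IDs (assumed path)
    #include "segcore/SegmentGrowing.h"  // milvus::segcore::SegmentGrowing

    void
    ApplyDeltaLog(milvus::segcore::SegmentGrowing* segment) {
        // Primary keys of the deleted rows, in the IDs proto form that
        // ParsePksFromIDs consumes in step 1 above (int64 pk assumed).
        milvus::proto::schema::IDs pks;
        pks.mutable_int_id()->add_data(1);
        pks.mutable_int_id()->add_data(2);

        // One delete timestamp per primary key; Timestamp is uint64_t.
        std::vector<uint64_t> timestamps = {100, 101};

        LoadDeletedRecordInfo info{timestamps.data(), &pks, 2};
        segment->LoadDeletedRecord(info);  // step 2 fills deleted_record_
    }
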
diff --git a/internal/core/src/segcore/segment_c.cpp b/internal/core/src/segcore/segment_c.cpp
index 2efad6609e..28f13b8672 100644
--- a/internal/core/src/segcore/segment_c.cpp
+++ b/internal/core/src/segcore/segment_c.cpp
@@ -210,8 +210,7 @@ CStatus
 LoadDeletedRecord(CSegmentInterface c_segment, CLoadDeletedRecordInfo deleted_record_info) {
     try {
         auto segment_interface = reinterpret_cast<milvus::segcore::SegmentInterface*>(c_segment);
-        auto segment = dynamic_cast<milvus::segcore::SegmentSealed*>(segment_interface);
-        AssertInfo(segment != nullptr, "segment conversion failed");
+        AssertInfo(segment_interface != nullptr, "segment conversion failed");
         auto proto = std::string(deleted_record_info.primary_keys);
         Assert(!proto.empty());
         auto pks = std::make_unique<milvus::proto::schema::IDs>();
@@ -219,7 +218,7 @@ LoadDeletedRecord(CSegmentInterface c_segment, CLoadDeletedRecordInfo deleted_re
         AssertInfo(suc, "unmarshal field data string failed");
         auto load_info = LoadDeletedRecordInfo{deleted_record_info.timestamps, pks.get(),
                                                deleted_record_info.row_count};
-        segment->LoadDeletedRecord(load_info);
+        segment_interface->LoadDeletedRecord(load_info);
         return milvus::SuccessCStatus();
     } catch (std::exception& e) {
         return milvus::FailureCStatus(UnexpectedError, e.what());
diff --git a/internal/querynode/mock_test.go b/internal/querynode/mock_test.go
index 6f5afc6881..9ddb23e30d 100644
--- a/internal/querynode/mock_test.go
+++ b/internal/querynode/mock_test.go
@@ -1062,13 +1062,15 @@ func saveDeltaLog(collectionID UniqueID,
 
     kvs := make(map[string][]byte, 1)
 
-    // write insert binlog
+    // write delta log
+    pkFieldID := UniqueID(106)
     fieldBinlog := make([]*datapb.FieldBinlog, 0)
-    log.Debug("[query node unittest] save delta log", zap.Int64("fieldID", 999))
-    key := JoinIDPath(collectionID, partitionID, segmentID, 999)
+    log.Debug("[query node unittest] save delta log", zap.Int64("fieldID", pkFieldID))
+    key := JoinIDPath(collectionID, partitionID, segmentID, pkFieldID)
+    key += "delta" // append the suffix 'delta' to avoid a key conflict with the binlog
     kvs[key] = blob.Value[:]
     fieldBinlog = append(fieldBinlog, &datapb.FieldBinlog{
-        FieldID: 999,
+        FieldID: pkFieldID,
         Binlogs: []*datapb.Binlog{{LogPath: key}},
     })
     log.Debug("[query node unittest] save delta log file to MinIO/S3")
diff --git a/internal/querynode/segment.go b/internal/querynode/segment.go
index 3497268337..b7f276d47f 100644
--- a/internal/querynode/segment.go
+++ b/internal/querynode/segment.go
@@ -774,10 +774,6 @@ func (s *Segment) segmentLoadDeletedRecord(primaryKeys []primaryKey, timestamps
     if s.segmentPtr == nil {
         return errors.New("null seg core pointer")
     }
-    if s.segmentType != segmentTypeSealed {
-        errMsg := fmt.Sprintln("segmentLoadFieldData failed, illegal segment type ", s.segmentType, "segmentID = ", s.ID())
-        return errors.New(errMsg)
-    }
 
     if len(primaryKeys) <= 0 {
         return fmt.Errorf("empty pks to delete")
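
Reviewer note: with the dynamic_cast to SegmentSealed gone from segment_c.cpp and the sealed-only guard dropped from segment.go, the C entry point now accepts any segment handle. A hedged sketch of calling it straight from C++ follows; the CLoadDeletedRecordInfo field order is inferred from the LoadDeletedRecordInfo aggregate visible above, and the error_code/Success naming is an assumption about CStatus, not something this diff defines.

    // Sketch only: drives the C API with a protobuf text-format IDs payload,
    // which is exactly what LoadDeletedRecord in segment_c.cpp unmarshals.
    #include <cstdint>
    #include <string>
    #include <vector>

    #include "segcore/segment_c.h"  // CSegmentInterface, CLoadDeletedRecordInfo, CStatus

    void
    LoadDeletesThroughCApi(CSegmentInterface segment) {
        // Text-format milvus::proto::schema::IDs carrying three int64 pks.
        std::string pks = "int_id: <data: 1 data: 2 data: 3>";
        std::vector<uint64_t> timestamps = {100, 101, 102};

        // Assumed field order {timestamps, primary_keys, row_count}.
        CLoadDeletedRecordInfo info{timestamps.data(), pks.c_str(), 3};

        // Growing segments are accepted too now that the SegmentSealed cast is gone.
        CStatus status = LoadDeletedRecord(segment, info);
        (void)status;  // expect status.error_code == Success on the happy path
    }
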
diff --git a/internal/querynode/segment_loader.go b/internal/querynode/segment_loader.go
index a0e0e06c29..5a2c18a90b 100644
--- a/internal/querynode/segment_loader.go
+++ b/internal/querynode/segment_loader.go
@@ -25,6 +25,9 @@ import (
     "strconv"
     "sync"
 
+    "github.com/panjf2000/ants/v2"
+    "go.uber.org/zap"
+
     "github.com/milvus-io/milvus/internal/common"
     etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
     "github.com/milvus-io/milvus/internal/log"
@@ -41,8 +44,6 @@ import (
     "github.com/milvus-io/milvus/internal/util/funcutil"
     "github.com/milvus-io/milvus/internal/util/metricsinfo"
     "github.com/milvus-io/milvus/internal/util/timerecord"
-    "github.com/panjf2000/ants/v2"
-    "go.uber.org/zap"
 )
 
 // segmentLoader is only responsible for loading the field data from binlog
@@ -97,10 +98,8 @@ func (loader *segmentLoader) loadSegment(req *querypb.LoadSegmentsRequest, segme
     switch segmentType {
     case segmentTypeGrowing:
         metaReplica = loader.streamingReplica
-
     case segmentTypeSealed:
         metaReplica = loader.historicalReplica
-
     default:
         err := fmt.Errorf("illegal segment type when load segment, collectionID = %d", req.CollectionID)
         log.Error("load segment failed, illegal segment type",
diff --git a/internal/querynode/segment_loader_test.go b/internal/querynode/segment_loader_test.go
index d2df3b95d9..881d34aecb 100644
--- a/internal/querynode/segment_loader_test.go
+++ b/internal/querynode/segment_loader_test.go
@@ -430,7 +430,7 @@ func TestSegmentLoader_testLoadGrowingAndSealed(t *testing.T) {
     deltaLogs, err := saveDeltaLog(defaultCollectionID, defaultPartitionID, defaultSegmentID)
     assert.NoError(t, err)
 
-    t.Run("test load growing and sealed segments", func(t *testing.T) {
+    t.Run("test load sealed segments", func(t *testing.T) {
         node, err := genSimpleQueryNode(ctx)
         assert.NoError(t, err)
 
@@ -451,7 +451,6 @@ func TestSegmentLoader_testLoadGrowingAndSealed(t *testing.T) {
                     PartitionID:  defaultPartitionID,
                     CollectionID: defaultCollectionID,
                     BinlogPaths:  fieldBinlog,
-                    Deltalogs:    deltaLogs,
                 },
             },
         }
@@ -459,6 +458,10 @@ func TestSegmentLoader_testLoadGrowingAndSealed(t *testing.T) {
         err = loader.loadSegment(req1, segmentTypeSealed)
         assert.NoError(t, err)
 
+        segment1, err := loader.historicalReplica.getSegmentByID(segmentID1)
+        assert.NoError(t, err)
+        assert.Equal(t, segment1.getRowCount(), int64(100))
+
         segmentID2 := UniqueID(101)
         req2 := &querypb.LoadSegmentsRequest{
             Base: &commonpb.MsgBase{
@@ -473,25 +476,29 @@ func TestSegmentLoader_testLoadGrowingAndSealed(t *testing.T) {
                     PartitionID:  defaultPartitionID,
                     CollectionID: defaultCollectionID,
                     BinlogPaths:  fieldBinlog,
+                    Deltalogs:    deltaLogs,
                 },
             },
         }
 
-        err = loader.loadSegment(req2, segmentTypeGrowing)
+        err = loader.loadSegment(req2, segmentTypeSealed)
         assert.NoError(t, err)
 
-        segment1, err := loader.historicalReplica.getSegmentByID(segmentID1)
+        segment2, err := loader.historicalReplica.getSegmentByID(segmentID2)
+        assert.NoError(t, err)
+        // Note: getRowCount is not accurate yet; deleted rows are still counted.
+        assert.Equal(t, segment2.getRowCount(), int64(100)) // the accurate count would be 98
+    })
+
+    t.Run("test load growing segments", func(t *testing.T) {
+        node, err := genSimpleQueryNode(ctx)
         assert.NoError(t, err)
 
-        segment2, err := loader.streamingReplica.getSegmentByID(segmentID2)
-        assert.NoError(t, err)
+        loader := node.loader
+        assert.NotNil(t, loader)
 
-        assert.Equal(t, segment1.getRowCount(), segment2.getRowCount())
-
-        // Loading growing segments with delta log, expect to fail (this is a bug).
-        // See: https://github.com/milvus-io/milvus/issues/16821
-        segmentID3 := UniqueID(102)
-        req3 := &querypb.LoadSegmentsRequest{
+        segmentID1 := UniqueID(100)
+        req1 := &querypb.LoadSegmentsRequest{
             Base: &commonpb.MsgBase{
                 MsgType: commonpb.MsgType_WatchQueryChannels,
                 MsgID:   rand.Int63(),
             },
@@ -500,7 +507,32 @@ func TestSegmentLoader_testLoadGrowingAndSealed(t *testing.T) {
             Schema:    schema,
             Infos: []*querypb.SegmentLoadInfo{
                 {
-                    SegmentID:    segmentID3,
+                    SegmentID:    segmentID1,
+                    PartitionID:  defaultPartitionID,
+                    CollectionID: defaultCollectionID,
+                    BinlogPaths:  fieldBinlog,
+                },
+            },
+        }
+
+        err = loader.loadSegment(req1, segmentTypeGrowing)
+        assert.NoError(t, err)
+
+        segment1, err := loader.streamingReplica.getSegmentByID(segmentID1)
+        assert.NoError(t, err)
+        assert.Equal(t, segment1.getRowCount(), int64(100))
+
+        segmentID2 := UniqueID(101)
+        req2 := &querypb.LoadSegmentsRequest{
+            Base: &commonpb.MsgBase{
+                MsgType: commonpb.MsgType_WatchQueryChannels,
+                MsgID:   rand.Int63(),
+            },
+            DstNodeID: 0,
+            Schema:    schema,
+            Infos: []*querypb.SegmentLoadInfo{
+                {
+                    SegmentID:    segmentID2,
                     PartitionID:  defaultPartitionID,
                     CollectionID: defaultCollectionID,
                     BinlogPaths:  fieldBinlog,
@@ -509,8 +541,13 @@ func TestSegmentLoader_testLoadGrowingAndSealed(t *testing.T) {
             },
         }
 
-        err = loader.loadSegment(req3, segmentTypeGrowing)
-        assert.Error(t, err)
+        err = loader.loadSegment(req2, segmentTypeGrowing)
+        assert.NoError(t, err)
+
+        segment2, err := loader.streamingReplica.getSegmentByID(segmentID2)
+        assert.NoError(t, err)
+        // Note: getRowCount is not accurate yet; deleted rows are still counted.
+        assert.Equal(t, segment2.getRowCount(), int64(100)) // the accurate count would be 98
     })
 }
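
Reviewer note: on the getRowCount caveat flagged in both subtests, the segment keeps every inserted row, and the deletes loaded here are recorded separately and resolved against a query timestamp, so the reported row count stays at 100 while only 98 rows are actually visible. The toy model below (plain C++, deliberately not segcore code) illustrates that semantic.

    // Toy model of MVCC-style delete visibility: the row count tracks inserts,
    // while deletes live in a (pk -> delete timestamp) side record applied per query.
    #include <cstdint>
    #include <unordered_map>
    #include <vector>

    struct ToyDeletedRecord {
        std::unordered_map<int64_t, uint64_t> delete_ts;  // pk -> delete timestamp
    };

    // Counts the rows a reader at query_ts can see; the segment's stored
    // row count is unchanged by deletes.
    int64_t
    VisibleRows(const std::vector<int64_t>& pks, const ToyDeletedRecord& deleted, uint64_t query_ts) {
        int64_t visible = 0;
        for (auto pk : pks) {
            auto it = deleted.delete_ts.find(pk);
            bool is_deleted = (it != deleted.delete_ts.end() && it->second <= query_ts);
            if (!is_deleted) {
                ++visible;
            }
        }
        return visible;
    }

With 100 inserted pks and 2 entries in the delete record at or before query_ts, VisibleRows returns 98 while the stored row count still reads 100, which is exactly the gap the test comments call out.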