diff --git a/internal/core/src/index/Index.h b/internal/core/src/index/Index.h index c91fe2282f..2de8d8abb1 100644 --- a/internal/core/src/index/Index.h +++ b/internal/core/src/index/Index.h @@ -68,6 +68,9 @@ class IndexBase { virtual BinarySet UploadV2(const Config& config = {}) = 0; + virtual const bool + HasRawData() const = 0; + bool IsMmapSupported() const { return index_type_ == knowhere::IndexEnum::INDEX_HNSW || diff --git a/internal/core/src/index/ScalarIndex.h b/internal/core/src/index/ScalarIndex.h index e8dcfe834b..cc0799a2dc 100644 --- a/internal/core/src/index/ScalarIndex.h +++ b/internal/core/src/index/ScalarIndex.h @@ -68,6 +68,9 @@ class ScalarIndex : public IndexBase { virtual const TargetBitmap Query(const DatasetPtr& dataset); + virtual const bool + HasRawData() const override = 0; + virtual int64_t Size() = 0; }; diff --git a/internal/core/src/index/ScalarIndexSort.h b/internal/core/src/index/ScalarIndexSort.h index c45e308677..d6a5523986 100644 --- a/internal/core/src/index/ScalarIndexSort.h +++ b/internal/core/src/index/ScalarIndexSort.h @@ -95,6 +95,11 @@ class ScalarIndexSort : public ScalarIndex { BinarySet UploadV2(const Config& config = {}) override; + const bool + HasRawData() const override { + return true; + } + private: bool ShouldSkip(const T lower_value, const T upper_value, const OpType op); diff --git a/internal/core/src/index/StringIndexMarisa.h b/internal/core/src/index/StringIndexMarisa.h index acf2a3c549..b5aa9f9238 100644 --- a/internal/core/src/index/StringIndexMarisa.h +++ b/internal/core/src/index/StringIndexMarisa.h @@ -93,6 +93,11 @@ class StringIndexMarisa : public StringIndex { BinarySet UploadV2(const Config& config = {}); + const bool + HasRawData() const override { + return true; + } + private: void fill_str_ids(size_t n, const std::string* values); diff --git a/internal/core/src/query/visitors/ExecExprVisitor.cpp b/internal/core/src/query/visitors/ExecExprVisitor.cpp index 68af56dad4..8f8803aaee 100644 --- a/internal/core/src/query/visitors/ExecExprVisitor.cpp +++ b/internal/core/src/query/visitors/ExecExprVisitor.cpp @@ -2280,8 +2280,16 @@ ExecExprVisitor::ExecCompareExprDispatcher(CompareExpr& expr, Op op) // for case, sealed segment has loaded index for scalar field instead of raw data auto& indexing = segment_.chunk_scalar_index( field_id, chunk_id); - return [&indexing](int i) -> const number { - return indexing.Reverse_Lookup(i); + if (indexing.HasRawData()) { + return [&indexing](int i) -> const number { + return indexing.Reverse_Lookup(i); + }; + } + auto chunk_data = + segment_.chunk_data(field_id, chunk_id) + .data(); + return [chunk_data](int i) -> const number { + return chunk_data[i]; }; } } @@ -2297,8 +2305,16 @@ ExecExprVisitor::ExecCompareExprDispatcher(CompareExpr& expr, Op op) // for case, sealed segment has loaded index for scalar field instead of raw data auto& indexing = segment_.chunk_scalar_index( field_id, chunk_id); - return [&indexing](int i) -> const number { - return indexing.Reverse_Lookup(i); + if (indexing.HasRawData()) { + return [&indexing](int i) -> const number { + return indexing.Reverse_Lookup(i); + }; + } + auto chunk_data = + segment_.chunk_data(field_id, chunk_id) + .data(); + return [chunk_data](int i) -> const number { + return chunk_data[i]; }; } } @@ -2314,8 +2330,16 @@ ExecExprVisitor::ExecCompareExprDispatcher(CompareExpr& expr, Op op) // for case, sealed segment has loaded index for scalar field instead of raw data auto& indexing = segment_.chunk_scalar_index( field_id, chunk_id); - return [&indexing](int i) -> const number { - return indexing.Reverse_Lookup(i); + if (indexing.HasRawData()) { + return [&indexing](int i) -> const number { + return indexing.Reverse_Lookup(i); + }; + } + auto chunk_data = + segment_.chunk_data(field_id, chunk_id) + .data(); + return [chunk_data](int i) -> const number { + return chunk_data[i]; }; } } @@ -2331,8 +2355,16 @@ ExecExprVisitor::ExecCompareExprDispatcher(CompareExpr& expr, Op op) // for case, sealed segment has loaded index for scalar field instead of raw data auto& indexing = segment_.chunk_scalar_index( field_id, chunk_id); - return [&indexing](int i) -> const number { - return indexing.Reverse_Lookup(i); + if (indexing.HasRawData()) { + return [&indexing](int i) -> const number { + return indexing.Reverse_Lookup(i); + }; + } + auto chunk_data = + segment_.chunk_data(field_id, chunk_id) + .data(); + return [chunk_data](int i) -> const number { + return chunk_data[i]; }; } } @@ -2348,8 +2380,16 @@ ExecExprVisitor::ExecCompareExprDispatcher(CompareExpr& expr, Op op) // for case, sealed segment has loaded index for scalar field instead of raw data auto& indexing = segment_.chunk_scalar_index( field_id, chunk_id); - return [&indexing](int i) -> const number { - return indexing.Reverse_Lookup(i); + if (indexing.HasRawData()) { + return [&indexing](int i) -> const number { + return indexing.Reverse_Lookup(i); + }; + } + auto chunk_data = + segment_.chunk_data(field_id, chunk_id) + .data(); + return [chunk_data](int i) -> const number { + return chunk_data[i]; }; } } @@ -2365,8 +2405,16 @@ ExecExprVisitor::ExecCompareExprDispatcher(CompareExpr& expr, Op op) // for case, sealed segment has loaded index for scalar field instead of raw data auto& indexing = segment_.chunk_scalar_index( field_id, chunk_id); - return [&indexing](int i) -> const number { - return indexing.Reverse_Lookup(i); + if (indexing.HasRawData()) { + return [&indexing](int i) -> const number { + return indexing.Reverse_Lookup(i); + }; + } + auto chunk_data = + segment_.chunk_data(field_id, chunk_id) + .data(); + return [chunk_data](int i) -> const number { + return chunk_data[i]; }; } } @@ -2382,8 +2430,16 @@ ExecExprVisitor::ExecCompareExprDispatcher(CompareExpr& expr, Op op) // for case, sealed segment has loaded index for scalar field instead of raw data auto& indexing = segment_.chunk_scalar_index( field_id, chunk_id); - return [&indexing](int i) -> const number { - return indexing.Reverse_Lookup(i); + if (indexing.HasRawData()) { + return [&indexing](int i) -> const number { + return indexing.Reverse_Lookup(i); + }; + } + auto chunk_data = + segment_.chunk_data(field_id, chunk_id) + .data(); + return [chunk_data](int i) -> const number { + return chunk_data[i]; }; } } @@ -2411,8 +2467,16 @@ ExecExprVisitor::ExecCompareExprDispatcher(CompareExpr& expr, Op op) auto& indexing = segment_.chunk_scalar_index(field_id, chunk_id); - return [&indexing](int i) -> const number { - return indexing.Reverse_Lookup(i); + if (indexing.HasRawData()) { + return [&indexing](int i) -> const number { + return indexing.Reverse_Lookup(i); + }; + } + auto chunk_data = + segment_.chunk_data(field_id, chunk_id) + .data(); + return [chunk_data](int i) -> const number { + return chunk_data[i]; }; } } diff --git a/internal/core/src/segcore/SegmentSealedImpl.cpp b/internal/core/src/segcore/SegmentSealedImpl.cpp index c29799fbe9..bf66fe5c7d 100644 --- a/internal/core/src/segcore/SegmentSealedImpl.cpp +++ b/internal/core/src/segcore/SegmentSealedImpl.cpp @@ -323,8 +323,8 @@ SegmentSealedImpl::LoadFieldData(FieldId field_id, FieldDataInfo& data) { auto data_type = field_meta.get_data_type(); // Don't allow raw data and index exist at the same time - AssertInfo(!get_bit(index_ready_bitset_, field_id), - "field data can't be loaded when indexing exists"); + // AssertInfo(!get_bit(index_ready_bitset_, field_id), + // "field data can't be loaded when indexing exists"); std::shared_ptr column{}; if (datatype_is_variable(data_type)) { @@ -1071,30 +1071,11 @@ SegmentSealedImpl::fill_with_empty(FieldId field_id, int64_t count) const { } std::unique_ptr -SegmentSealedImpl::bulk_subscript(FieldId field_id, - const int64_t* seg_offsets, - int64_t count) const { - auto& field_meta = schema_->operator[](field_id); - // if count == 0, return empty data array - if (count == 0) { - return fill_with_empty(field_id, count); - } - - if (HasIndex(field_id)) { - // if field has load scalar index, reverse raw data from index - if (!datatype_is_vector(field_meta.get_data_type())) { - AssertInfo(num_chunk() == 1, - "num chunk not equal to 1 for sealed segment"); - auto index = chunk_index_impl(field_id, 0); - return ReverseDataFromIndex(index, seg_offsets, count, field_meta); - } - - return get_vector(field_id, seg_offsets, count); - } - - Assert(get_bit(field_data_ready_bitset_, field_id)); - - // DO NOT directly access the column byh map like: `fields_.at(field_id)->Data()`, +SegmentSealedImpl::get_raw_data(FieldId field_id, + const FieldMeta& field_meta, + const int64_t* seg_offsets, + int64_t count) const { + // DO NOT directly access the column by map like: `fields_.at(field_id)->Data()`, // we have to clone the shared pointer, // to make sure it won't get released if segment released auto column = fields_.at(field_id); @@ -1235,10 +1216,39 @@ SegmentSealedImpl::bulk_subscript(FieldId field_id, field_meta.get_data_type())); } } - return ret; } +std::unique_ptr +SegmentSealedImpl::bulk_subscript(FieldId field_id, + const int64_t* seg_offsets, + int64_t count) const { + auto& field_meta = schema_->operator[](field_id); + // if count == 0, return empty data array + if (count == 0) { + return fill_with_empty(field_id, count); + } + + if (HasIndex(field_id)) { + // if field has load scalar index, reverse raw data from index + if (!datatype_is_vector(field_meta.get_data_type())) { + AssertInfo(num_chunk() == 1, + "num chunk not equal to 1 for sealed segment"); + auto index = chunk_index_impl(field_id, 0); + if (index->HasRawData()) { + return ReverseDataFromIndex( + index, seg_offsets, count, field_meta); + } + return get_raw_data(field_id, field_meta, seg_offsets, count); + } + return get_vector(field_id, seg_offsets, count); + } + + Assert(get_bit(field_data_ready_bitset_, field_id)); + + return get_raw_data(field_id, field_meta, seg_offsets, count); +} + bool SegmentSealedImpl::HasIndex(FieldId field_id) const { std::shared_lock lck(mutex_); @@ -1271,6 +1281,11 @@ SegmentSealedImpl::HasRawData(int64_t field_id) const { field_indexing->indexing_.get()); return vec_index->HasRawData(); } + } else { + auto scalar_index = scalar_indexings_.find(fieldID); + if (scalar_index != scalar_indexings_.end()) { + return scalar_index->second->HasRawData(); + } } return true; } diff --git a/internal/core/src/segcore/SegmentSealedImpl.h b/internal/core/src/segcore/SegmentSealedImpl.h index a26a186ab1..8c4ffbfe88 100644 --- a/internal/core/src/segcore/SegmentSealedImpl.h +++ b/internal/core/src/segcore/SegmentSealedImpl.h @@ -207,6 +207,12 @@ class SegmentSealedImpl : public SegmentSealed { std::unique_ptr fill_with_empty(FieldId field_id, int64_t count) const; + std::unique_ptr + get_raw_data(FieldId field_id, + const FieldMeta& field_meta, + const int64_t* seg_offsets, + int64_t count) const; + void update_row_count(int64_t row_count) { // if (row_count_opt_.has_value()) { diff --git a/internal/core/unittest/test_scalar_index.cpp b/internal/core/unittest/test_scalar_index.cpp index 4663e38b7a..f9264b794f 100644 --- a/internal/core/unittest/test_scalar_index.cpp +++ b/internal/core/unittest/test_scalar_index.cpp @@ -81,6 +81,26 @@ TYPED_TEST_P(TypedScalarIndexTest, Count) { } } +TYPED_TEST_P(TypedScalarIndexTest, HasRawData) { + using T = TypeParam; + auto dtype = milvus::GetDType(); + auto index_types = GetIndexTypes(); + for (const auto& index_type : index_types) { + milvus::index::CreateIndexInfo create_index_info; + create_index_info.field_type = milvus::DataType(dtype); + create_index_info.index_type = index_type; + auto index = + milvus::index::IndexFactory::GetInstance().CreateScalarIndex( + create_index_info); + auto scalar_index = + dynamic_cast*>(index.get()); + auto arr = GenArr(nb); + scalar_index->Build(nb, arr.data()); + ASSERT_EQ(nb, scalar_index->Count()); + ASSERT_TRUE(scalar_index->HasRawData()); + } +} + TYPED_TEST_P(TypedScalarIndexTest, In) { using T = TypeParam; auto dtype = milvus::GetDType(); @@ -200,7 +220,8 @@ REGISTER_TYPED_TEST_CASE_P(TypedScalarIndexTest, NotIn, Range, Codec, - Reverse); + Reverse, + HasRawData); INSTANTIATE_TYPED_TEST_CASE_P(ArithmeticCheck, TypedScalarIndexTest, ScalarT); diff --git a/internal/core/unittest/test_string_index.cpp b/internal/core/unittest/test_string_index.cpp index 9855d71b92..430f7ed24a 100644 --- a/internal/core/unittest/test_string_index.cpp +++ b/internal/core/unittest/test_string_index.cpp @@ -52,6 +52,12 @@ TEST_F(StringIndexMarisaTest, Build) { index->Build(strs.size(), strs.data()); } +TEST_F(StringIndexMarisaTest, HasRawData) { + auto index = milvus::index::CreateStringIndexMarisa(); + index->Build(nb, strs.data()); + ASSERT_TRUE(index->HasRawData()); +} + TEST_F(StringIndexMarisaTest, Count) { auto index = milvus::index::CreateStringIndexMarisa(); index->Build(nb, strs.data()); diff --git a/internal/querynodev2/segments/segment.go b/internal/querynodev2/segments/segment.go index c4fb609fb3..38173dfbef 100644 --- a/internal/querynodev2/segments/segment.go +++ b/internal/querynodev2/segments/segment.go @@ -703,6 +703,7 @@ func (s *LocalSegment) LoadFieldData(fieldID int64, rowCount int64, field *datap } } loadFieldDataInfo.appendMMapDirPath(paramtable.Get().QueryNodeCfg.MmapDirPath.GetValue()) + loadFieldDataInfo.enableMmap(fieldID, mmapEnabled) var status C.CStatus GetLoadPool().Submit(func() (any, error) { diff --git a/internal/querynodev2/segments/segment_loader.go b/internal/querynodev2/segments/segment_loader.go index 4739cdbd47..345677989d 100644 --- a/internal/querynodev2/segments/segment_loader.go +++ b/internal/querynodev2/segments/segment_loader.go @@ -564,12 +564,27 @@ func (loader *segmentLoader) loadSegment(ctx context.Context, } } + schemaHelper, _ := typeutil.CreateSchemaHelper(collection.Schema()) + log.Info("load fields...", zap.Int64s("indexedFields", lo.Keys(indexedFieldInfos)), ) - if err := loader.loadFieldsIndex(ctx, collection.Schema(), segment, loadInfo.GetNumOfRows(), indexedFieldInfos); err != nil { + if err := loader.loadFieldsIndex(ctx, schemaHelper, segment, loadInfo.GetNumOfRows(), indexedFieldInfos); err != nil { return err } + for fieldID, info := range indexedFieldInfos { + field, err := schemaHelper.GetFieldFromID(fieldID) + if err != nil { + return err + } + if !typeutil.IsVectorType(field.GetDataType()) && !segment.HasRawData(fieldID) { + log.Info("field index doesn't include raw data, load binlog...", zap.Int64("fieldID", fieldID), zap.String("index", info.IndexInfo.GetIndexName())) + if err = segment.LoadFieldData(fieldID, loadInfo.GetNumOfRows(), info.FieldBinlog, true); err != nil { + log.Warn("load raw data failed", zap.Int64("fieldID", fieldID), zap.Error(err)) + return err + } + } + } if err := loader.loadSealedSegmentFields(ctx, segment, fieldBinlogs, loadInfo.GetNumOfRows()); err != nil { return err } @@ -654,13 +669,11 @@ func (loader *segmentLoader) loadSealedSegmentFields(ctx context.Context, segmen } func (loader *segmentLoader) loadFieldsIndex(ctx context.Context, - schema *schemapb.CollectionSchema, + schemaHelper *typeutil.SchemaHelper, segment *LocalSegment, numRows int64, indexedFieldInfos map[int64]*IndexedFieldInfo, ) error { - schemaHelper, _ := typeutil.CreateSchemaHelper(schema) - for fieldID, fieldInfo := range indexedFieldInfos { indexInfo := fieldInfo.IndexInfo err := loader.loadFieldIndex(ctx, segment, indexInfo)