From 08ad77c71b001c0d852f35ce04bf03d998966b0f Mon Sep 17 00:00:00 2001 From: xige-16 Date: Thu, 12 May 2022 14:09:53 +0800 Subject: [PATCH] Delete all repeated primary keys (#16863) Signed-off-by: xige-16 --- internal/core/src/common/Types.h | 2 + internal/core/src/segcore/Record.h | 2 +- internal/core/src/segcore/SegmentGrowing.h | 12 -- .../core/src/segcore/SegmentGrowingImpl.cpp | 55 +----- .../core/src/segcore/SegmentGrowingImpl.h | 8 +- .../core/src/segcore/SegmentSealedImpl.cpp | 58 +------ internal/core/src/segcore/SegmentSealedImpl.h | 9 +- internal/core/src/segcore/Utils.cpp | 64 +++++++ internal/core/src/segcore/Utils.h | 10 ++ internal/core/unittest/test_c_api.cpp | 164 +++++++++++++++++- internal/core/unittest/test_utils/DataGen.h | 26 +-- 11 files changed, 255 insertions(+), 155 deletions(-) diff --git a/internal/core/src/common/Types.h b/internal/core/src/common/Types.h index 84f16e07bd..90f06d8e8a 100644 --- a/internal/core/src/common/Types.h +++ b/internal/core/src/common/Types.h @@ -21,6 +21,7 @@ #include #include #include +#include #include #include #include @@ -67,6 +68,7 @@ using IdArray = proto::schema::IDs; using MetricType = faiss::MetricType; using InsertData = proto::segcore::InsertRecord; using PkType = std::variant; +using Pk2OffsetType = tbb::concurrent_unordered_multimap>; MetricType GetMetricType(const std::string& type); diff --git a/internal/core/src/segcore/Record.h b/internal/core/src/segcore/Record.h index d22fa1431e..eda7314c10 100644 --- a/internal/core/src/segcore/Record.h +++ b/internal/core/src/segcore/Record.h @@ -21,7 +21,7 @@ get_barrier(const RecordType& record, Timestamp timestamp) { int64_t end = record.ack_responder_.GetAck(); while (beg < end) { auto mid = (beg + end) / 2; - if (vec[mid] < timestamp) { + if (vec[mid] <= timestamp) { beg = mid + 1; } else { end = mid; diff --git a/internal/core/src/segcore/SegmentGrowing.h b/internal/core/src/segcore/SegmentGrowing.h index d5cbdde85e..95e2bedd3b 100644 --- 
a/internal/core/src/segcore/SegmentGrowing.h +++ b/internal/core/src/segcore/SegmentGrowing.h @@ -23,18 +23,6 @@ namespace milvus::segcore { -using SearchResult = milvus::SearchResult; -struct RowBasedRawData { - void* raw_data; // schema - int sizeof_per_row; // alignment - int64_t count; -}; - -struct ColumnBasedRawData { - std::vector> columns_; - int64_t count; -}; - class SegmentGrowing : public SegmentInternalInterface { public: virtual void diff --git a/internal/core/src/segcore/SegmentGrowingImpl.cpp b/internal/core/src/segcore/SegmentGrowingImpl.cpp index 9f80d3defc..c4d770acae 100644 --- a/internal/core/src/segcore/SegmentGrowingImpl.cpp +++ b/internal/core/src/segcore/SegmentGrowingImpl.cpp @@ -38,65 +38,14 @@ SegmentGrowingImpl::PreDelete(int64_t size) { return reserved_begin; } -std::shared_ptr -SegmentGrowingImpl::get_deleted_bitmap(int64_t del_barrier, - Timestamp query_timestamp, - int64_t insert_barrier, - bool force) const { - auto old = deleted_record_.get_lru_entry(); - if (old->bitmap_ptr->size() == insert_barrier) { - if (old->del_barrier == del_barrier) { - return old; - } - } - - auto current = old->clone(insert_barrier); - current->del_barrier = del_barrier; - - auto bitmap = current->bitmap_ptr; - - int64_t start, end; - if (del_barrier < old->del_barrier) { - start = del_barrier; - end = old->del_barrier; - } else { - start = old->del_barrier; - end = del_barrier; - } - for (auto del_index = start; del_index < end; ++del_index) { - // get uid in delete logs - auto uid = deleted_record_.pks_[del_index]; - - // map uid to corresponding offsets, select the max one, which should be the target - // the max one should be closest to query_timestamp, so the delete log should refer to it - int64_t the_offset = -1; - auto [iter_b, iter_e] = pk2offset_.equal_range(uid); - - for (auto iter = iter_b; iter != iter_e; ++iter) { - auto offset = iter->second; - AssertInfo(offset < insert_barrier, "Timestamp offset is larger than insert barrier"); - 
the_offset = std::max(the_offset, offset); - if (the_offset == -1) { - continue; - } - if (insert_record_.timestamps_[the_offset] >= query_timestamp) { - bitmap->reset(the_offset); - } else { - bitmap->set(the_offset); - } - } - } - this->deleted_record_.insert_lru_entry(current); - return current; -} - void SegmentGrowingImpl::mask_with_delete(BitsetType& bitset, int64_t ins_barrier, Timestamp timestamp) const { auto del_barrier = get_barrier(get_deleted_record(), timestamp); if (del_barrier == 0) { return; } - auto bitmap_holder = get_deleted_bitmap(del_barrier, timestamp, ins_barrier); + auto bitmap_holder = + get_deleted_bitmap(del_barrier, ins_barrier, deleted_record_, insert_record_, pk2offset_, timestamp); if (!bitmap_holder || !bitmap_holder->bitmap_ptr) { return; } diff --git a/internal/core/src/segcore/SegmentGrowingImpl.h b/internal/core/src/segcore/SegmentGrowingImpl.h index 3cc1dfa6fc..67fe99371e 100644 --- a/internal/core/src/segcore/SegmentGrowingImpl.h +++ b/internal/core/src/segcore/SegmentGrowingImpl.h @@ -195,12 +195,6 @@ class SegmentGrowingImpl : public SegmentGrowing { } protected: - std::shared_ptr - get_deleted_bitmap(int64_t del_barrier, - Timestamp query_timestamp, - int64_t insert_barrier, - bool force = false) const; - int64_t num_chunk() const override; @@ -227,7 +221,7 @@ class SegmentGrowingImpl : public SegmentGrowing { mutable DeletedRecord deleted_record_; // pks to row offset - tbb::concurrent_unordered_multimap> pk2offset_; + Pk2OffsetType pk2offset_; int64_t id_; private: diff --git a/internal/core/src/segcore/SegmentSealedImpl.cpp b/internal/core/src/segcore/SegmentSealedImpl.cpp index cf2df789c8..f02c8d1991 100644 --- a/internal/core/src/segcore/SegmentSealedImpl.cpp +++ b/internal/core/src/segcore/SegmentSealedImpl.cpp @@ -279,67 +279,14 @@ SegmentSealedImpl::get_schema() const { return *schema_; } -std::shared_ptr -SegmentSealedImpl::get_deleted_bitmap(int64_t del_barrier, - Timestamp query_timestamp, - int64_t 
insert_barrier, - bool force) const { - auto old = deleted_record_.get_lru_entry(); - auto current = old->clone(insert_barrier); - current->del_barrier = del_barrier; - auto bitmap = current->bitmap_ptr; - // Sealed segment only has one chunk with chunk_id 0 - auto delete_pks_data = deleted_record_.pks_.get_chunk_data(0); - auto delete_pks = reinterpret_cast(delete_pks_data); - auto del_size = deleted_record_.reserved.load(); - - std::vector seg_offsets; - std::vector pks; - for (int i = 0; i < del_size; ++i) { - auto [iter_b, iter_e] = pk2offset_.equal_range(delete_pks[i]); - for (auto iter = iter_b; iter != iter_e; ++iter) { - auto [entry_pk, entry_offset] = *iter; - pks.emplace_back(entry_pk); - seg_offsets.emplace_back(SegOffset(entry_offset)); - } - } - - for (int i = 0; i < pks.size(); ++i) { - bitmap->set(seg_offsets[i].get()); - } - if (pks.size() == 0 || seg_offsets.size() == 0) { - return current; - } - - int64_t start, end; - if (del_barrier < old->del_barrier) { - start = del_barrier; - end = old->del_barrier; - } else { - start = old->del_barrier; - end = del_barrier; - } - - for (auto del_index = start; del_index < end; ++del_index) { - int64_t the_offset = seg_offsets[del_index].get(); - AssertInfo(the_offset >= 0, "Seg offset is invalid"); - if (deleted_record_.timestamps_[del_index] >= query_timestamp) { - bitmap->reset(the_offset); - } else { - bitmap->set(the_offset); - } - } - this->deleted_record_.insert_lru_entry(current); - return current; -} - void SegmentSealedImpl::mask_with_delete(BitsetType& bitset, int64_t ins_barrier, Timestamp timestamp) const { auto del_barrier = get_barrier(get_deleted_record(), timestamp); if (del_barrier == 0) { return; } - auto bitmap_holder = get_deleted_bitmap(del_barrier, timestamp, ins_barrier); + auto bitmap_holder = + get_deleted_bitmap(del_barrier, ins_barrier, deleted_record_, insert_record_, pk2offset_, timestamp); if (!bitmap_holder || !bitmap_holder->bitmap_ptr) { return; } @@ -463,7 +410,6 @@ 
SegmentSealedImpl::check_search(const query::Plan* plan) const { SegmentSealedImpl::SegmentSealedImpl(SchemaPtr schema, int64_t segment_id) : schema_(schema), - // fields_data_(schema->size()), insert_record_(*schema, MAX_ROW_COUNT), field_data_ready_bitset_(schema->size()), vecindex_ready_bitset_(schema->size()), diff --git a/internal/core/src/segcore/SegmentSealedImpl.h b/internal/core/src/segcore/SegmentSealedImpl.h index d9710bc730..9575d8b14b 100644 --- a/internal/core/src/segcore/SegmentSealedImpl.h +++ b/internal/core/src/segcore/SegmentSealedImpl.h @@ -19,7 +19,6 @@ #include #include #include -#include #include #include "ConcurrentVector.h" @@ -105,12 +104,6 @@ class SegmentSealedImpl : public SegmentSealed { int64_t get_active_count(Timestamp ts) const override; - std::shared_ptr - get_deleted_bitmap(int64_t del_barrier, - Timestamp query_timestamp, - int64_t insert_barrier, - bool force = false) const; - private: template static void @@ -194,7 +187,7 @@ class SegmentSealedImpl : public SegmentSealed { mutable DeletedRecord deleted_record_; // pks to row offset - tbb::concurrent_unordered_multimap> pk2offset_; + Pk2OffsetType pk2offset_; // std::unique_ptr primary_key_index_; SchemaPtr schema_; diff --git a/internal/core/src/segcore/Utils.cpp b/internal/core/src/segcore/Utils.cpp index 1befe92674..01645bf1bd 100644 --- a/internal/core/src/segcore/Utils.cpp +++ b/internal/core/src/segcore/Utils.cpp @@ -255,4 +255,68 @@ MergeDataArray(std::vector>& result_of return data_array; } + +// insert_barrier means num row of insert data in a segment +// del_barrier means that if the pk of the insert data is in delete record[0 : del_barrier] +// then the data corresponding to this pk may be ignored when searching/querying +// and refer to func get_barrier, all ts in delete record[0 : del_barrier] < query_timestamp +// assert old insert record pks = [5, 2, 4, 1, 3, 8, 7, 6] +// assert old delete record pks = [2, 4, 3, 8, 5], old delete record ts = [100, 100, 150, 200, 
400, 500, 500, 500] +// if delete_barrier = 3, query time = 180, then insert records with pks in [2, 4, 3] will be deleted +// then the old bitmap = [0, 1, 1, 0, 1, 0, 0, 0] +std::shared_ptr +get_deleted_bitmap(int64_t del_barrier, + int64_t insert_barrier, + DeletedRecord& delete_record, + const InsertRecord& insert_record, + const Pk2OffsetType& pk2offset, + Timestamp query_timestamp) { + auto old = delete_record.get_lru_entry(); + // if insert_barrier and del_barrier have not changed, use cache data directly + if (old->bitmap_ptr->size() == insert_barrier) { + if (old->del_barrier == del_barrier) { + return old; + } + } + + auto current = old->clone(insert_barrier); + current->del_barrier = del_barrier; + + auto bitmap = current->bitmap_ptr; + + int64_t start, end; + if (del_barrier < old->del_barrier) { + // in this case, ts of delete record[current_del_barrier : old_del_barrier] > query_timestamp + // so these deletion records do not take effect in query/search + // so bitmap corresponding to those pks in delete record[current_del_barrier:old_del_barrier] wil be reset to 0 + // for example, current_del_barrier = 2, query_time = 120, the bitmap will be reset to [0, 1, 1, 0, 0, 0, 0, 0] + start = del_barrier; + end = old->del_barrier; + } else { + // the cache is not enough, so update bitmap using new pks in delete record[old_del_barrier:current_del_barrier] + // for example, current_del_barrier = 4, query_time = 300, bitmap will be updated to [0, 1, 1, 0, 1, 1, 0, 0] + start = old->del_barrier; + end = del_barrier; + } + for (auto del_index = start; del_index < end; ++del_index) { + // get pk in delete logs + auto pk = delete_record.pks_[del_index]; + // find insert data which has same pk + auto [iter_b, iter_e] = pk2offset.equal_range(pk); + for (auto iter = iter_b; iter != iter_e; ++iter) { + auto insert_row_offset = iter->second; + AssertInfo(insert_row_offset < insert_barrier, "Timestamp offset is larger than insert barrier"); + if 
(delete_record.timestamps_[del_index] > query_timestamp) { + // the deletion record do not take effect in search/query, and reset bitmap to 0 + bitmap->reset(insert_row_offset); + } else { + // insert data corresponding to the insert_row_offset will be ignored in search/query + bitmap->set(insert_row_offset); + } + } + } + delete_record.insert_lru_entry(current); + return current; +} + } // namespace milvus::segcore diff --git a/internal/core/src/segcore/Utils.h b/internal/core/src/segcore/Utils.h index 76c5855229..6db370769d 100644 --- a/internal/core/src/segcore/Utils.h +++ b/internal/core/src/segcore/Utils.h @@ -18,6 +18,8 @@ #include #include #include "common/QueryResult.h" +#include "DeletedRecord.h" +#include "InsertRecord.h" namespace milvus::segcore { @@ -79,4 +81,12 @@ CreateDataArrayFrom(const void* data_raw, int64_t count, const FieldMeta& field_ std::unique_ptr MergeDataArray(std::vector>& result_offsets, const FieldMeta& field_meta); +std::shared_ptr +get_deleted_bitmap(int64_t del_barrier, + int64_t insert_barrier, + DeletedRecord& delete_record, + const InsertRecord& insert_record, + const Pk2OffsetType& pk2offset, + Timestamp query_timestamp); + } // namespace milvus::segcore diff --git a/internal/core/unittest/test_c_api.cpp b/internal/core/unittest/test_c_api.cpp index 636feedb32..1ec70e8ecc 100644 --- a/internal/core/unittest/test_c_api.cpp +++ b/internal/core/unittest/test_c_api.cpp @@ -240,6 +240,168 @@ TEST(CApiTest, DeleteTest) { DeleteSegment(segment); } +TEST(CApiTest, DeleteRepeatedPksFromGrowingSegment) { + auto collection = NewCollection(get_default_schema_config()); + auto segment = NewSegment(collection, Growing, -1); + auto col = (milvus::segcore::Collection*)collection; + + int N = 10; + auto dataset = DataGen(col->get_schema(), N); + + std::string insert_data; + auto marshal = google::protobuf::TextFormat::PrintToString(*dataset.raw_, &insert_data); + assert(marshal == true); + + // first insert, pks= {0, 1, 2, 3, 4, 5, 6, 7, 8, 9} 
+ int64_t offset; + PreInsert(segment, N, &offset); + auto res = Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(), insert_data.c_str()); + assert(res.error_code == Success); + + // second insert, pks= {0, 1, 2, 3, 4, 5, 6, 7, 8, 9} + PreInsert(segment, N, &offset); + res = Insert(segment, offset, N, dataset.row_ids_.data(), dataset.timestamps_.data(), insert_data.c_str()); + assert(res.error_code == Success); + + // create retrieve plan pks in {1, 2, 3} + std::vector retrive_row_ids = {1, 2, 3}; + auto schema = ((milvus::segcore::Collection*)collection)->get_schema(); + auto plan = std::make_unique(*schema); + auto term_expr = std::make_unique>(FieldId(101), DataType::INT64, retrive_row_ids); + plan->plan_node_ = std::make_unique(); + plan->plan_node_->predicate_ = std::move(term_expr); + std::vector target_field_ids{FieldId(100), FieldId(101)}; + plan->field_ids_ = target_field_ids; + + CRetrieveResult retrieve_result; + res = Retrieve(segment, plan.get(), dataset.timestamps_[N - 1], &retrieve_result); + ASSERT_EQ(res.error_code, Success); + auto query_result = std::make_unique(); + auto suc = query_result->ParseFromArray(retrieve_result.proto_blob, retrieve_result.proto_size); + ASSERT_TRUE(suc); + ASSERT_EQ(query_result->ids().int_id().data().size(), 6); + + // delete data pks = {1, 2, 3} + std::vector delete_row_ids = {1, 2, 3}; + auto ids = std::make_unique(); + ids->mutable_int_id()->mutable_data()->Add(delete_row_ids.begin(), delete_row_ids.end()); + std::string delete_data; + marshal = google::protobuf::TextFormat::PrintToString(*ids.get(), &delete_data); + assert(marshal == true); + std::vector delete_timestamps(3, dataset.timestamps_[N - 1]); + + offset = PreDelete(segment, 3); + auto del_res = Delete(segment, offset, 3, delete_data.c_str(), delete_timestamps.data()); + assert(del_res.error_code == Success); + + // retrieve pks in {1, 2, 3} + res = Retrieve(segment, plan.get(), dataset.timestamps_[N - 1], &retrieve_result); + 
ASSERT_EQ(res.error_code, Success); + + query_result = std::make_unique(); + suc = query_result->ParseFromArray(retrieve_result.proto_blob, retrieve_result.proto_size); + ASSERT_TRUE(suc); + ASSERT_EQ(query_result->ids().int_id().data().size(), 0); + + DeleteRetrievePlan(plan.release()); + DeleteRetrieveResult(&retrieve_result); + + DeleteCollection(collection); + DeleteSegment(segment); +} + +TEST(CApiTest, DeleteRepeatedPksFromSealedSegment) { + auto collection = NewCollection(get_default_schema_config()); + auto segment = NewSegment(collection, Sealed, -1); + auto col = (milvus::segcore::Collection*)collection; + + int N = 20; + auto dataset = DataGen(col->get_schema(), N, 42, 0, 2); + + for (auto& [field_id, field_meta] : col->get_schema()->get_fields()) { + auto array = dataset.get_col(field_id); + std::string data; + auto marshal = google::protobuf::TextFormat::PrintToString(*array.get(), &data); + assert(marshal == true); + + auto load_info = CLoadFieldDataInfo{field_id.get(), data.c_str(), N}; + + auto res = LoadFieldData(segment, load_info); + assert(res.error_code == Success); + auto count = GetRowCount(segment); + assert(count == N); + } + + FieldMeta ts_field_meta(FieldName("Timestamp"), TimestampFieldID, DataType::INT64); + auto ts_array = CreateScalarDataArrayFrom(dataset.timestamps_.data(), N, ts_field_meta); + std::string ts_data; + auto marshal = google::protobuf::TextFormat::PrintToString(*ts_array.get(), &ts_data); + assert(marshal == true); + auto load_info = CLoadFieldDataInfo{TimestampFieldID.get(), ts_data.c_str(), N}; + auto res = LoadFieldData(segment, load_info); + assert(res.error_code == Success); + auto count = GetRowCount(segment); + assert(count == N); + + FieldMeta row_id_field_meta(FieldName("RowID"), RowFieldID, DataType::INT64); + auto row_id_array = CreateScalarDataArrayFrom(dataset.row_ids_.data(), N, row_id_field_meta); + std::string row_is_data; + marshal = google::protobuf::TextFormat::PrintToString(*row_id_array.get(), 
&row_is_data); + assert(marshal == true); + load_info = CLoadFieldDataInfo{RowFieldID.get(), row_is_data.c_str(), N}; + res = LoadFieldData(segment, load_info); + assert(res.error_code == Success); + count = GetRowCount(segment); + assert(count == N); + + // create retrieve plan pks in {1, 2, 3} + std::vector retrive_row_ids = {1, 2, 3}; + auto schema = ((milvus::segcore::Collection*)collection)->get_schema(); + auto plan = std::make_unique(*schema); + auto term_expr = std::make_unique>(FieldId(101), DataType::INT64, retrive_row_ids); + plan->plan_node_ = std::make_unique(); + plan->plan_node_->predicate_ = std::move(term_expr); + std::vector target_field_ids{FieldId(100), FieldId(101)}; + plan->field_ids_ = target_field_ids; + + CRetrieveResult retrieve_result; + res = Retrieve(segment, plan.get(), dataset.timestamps_[N - 1], &retrieve_result); + ASSERT_EQ(res.error_code, Success); + auto query_result = std::make_unique(); + auto suc = query_result->ParseFromArray(retrieve_result.proto_blob, retrieve_result.proto_size); + ASSERT_TRUE(suc); + ASSERT_EQ(query_result->ids().int_id().data().size(), 6); + + // delete data pks = {1, 2, 3} + std::vector delete_row_ids = {1, 2, 3}; + auto ids = std::make_unique(); + ids->mutable_int_id()->mutable_data()->Add(delete_row_ids.begin(), delete_row_ids.end()); + std::string delete_data; + marshal = google::protobuf::TextFormat::PrintToString(*ids.get(), &delete_data); + assert(marshal == true); + std::vector delete_timestamps(3, dataset.timestamps_[N - 1]); + + auto offset = PreDelete(segment, 3); + + auto del_res = Delete(segment, offset, 3, delete_data.c_str(), delete_timestamps.data()); + assert(del_res.error_code == Success); + + // retrieve pks in {1, 2, 3} + res = Retrieve(segment, plan.get(), dataset.timestamps_[N - 1], &retrieve_result); + ASSERT_EQ(res.error_code, Success); + + query_result = std::make_unique(); + suc = query_result->ParseFromArray(retrieve_result.proto_blob, retrieve_result.proto_size); + 
ASSERT_TRUE(suc); + ASSERT_EQ(query_result->ids().int_id().data().size(), 0); + + DeleteRetrievePlan(plan.release()); + DeleteRetrieveResult(&retrieve_result); + + DeleteCollection(collection); + DeleteSegment(segment); +} + TEST(CApiTest, SearchTest) { auto c_collection = NewCollection(get_default_schema_config()); auto segment = NewSegment(c_collection, Growing, -1); @@ -388,7 +550,7 @@ TEST(CApiTest, RetrieveTestWithExpr) { plan->field_ids_ = target_field_ids; CRetrieveResult retrieve_result; - auto res = Retrieve(segment, plan.release(), dataset.timestamps_[0], &retrieve_result); + auto res = Retrieve(segment, plan.get(), dataset.timestamps_[0], &retrieve_result); ASSERT_EQ(res.error_code, Success); DeleteRetrievePlan(plan.release()); diff --git a/internal/core/unittest/test_utils/DataGen.h b/internal/core/unittest/test_utils/DataGen.h index bcc434dc18..fab1823df4 100644 --- a/internal/core/unittest/test_utils/DataGen.h +++ b/internal/core/unittest/test_utils/DataGen.h @@ -123,11 +123,11 @@ struct GeneratedData { private: GeneratedData() = default; friend GeneratedData - DataGen(SchemaPtr schema, int64_t N, uint64_t seed, uint64_t ts_offset); + DataGen(SchemaPtr schema, int64_t N, uint64_t seed, uint64_t ts_offset, int repeat_count); }; inline GeneratedData -DataGen(SchemaPtr schema, int64_t N, uint64_t seed = 42, uint64_t ts_offset = 0) { +DataGen(SchemaPtr schema, int64_t N, uint64_t seed = 42, uint64_t ts_offset = 0, int repeat_count = 1) { using std::vector; std::default_random_engine er(seed); std::normal_distribution<> distr(0, 1); @@ -181,19 +181,8 @@ DataGen(SchemaPtr schema, int64_t N, uint64_t seed = 42, uint64_t ts_offset = 0) } case DataType::INT64: { vector data(N); - // begin with counter - if (starts_with(field_meta.get_name().get(), "counter")) { - int64_t index = 0; - for (auto& x : data) { - x = index++; - } - } else { - int i = 0; - for (auto& x : data) { - x = er() % (2 * N); - x = i; - i++; - } + for (int i = 0; i < N; i++) { + data[i] = i 
/ repeat_count; } insert_cols(data, N, field_meta); break; @@ -240,8 +229,11 @@ DataGen(SchemaPtr schema, int64_t N, uint64_t seed = 42, uint64_t ts_offset = 0) } case DataType::VARCHAR: { vector data(N); - for (auto& x : data) { - x = std::to_string(er()); + for (int i = 0; i < N / repeat_count; i++) { + auto str = std::to_string(er()); + for (int j = 0; j < repeat_count; j++) { + data[i * repeat_count + j] = str; + } } insert_cols(data, N, field_meta); break;