mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
fix: erase pk empty check when pk index replace raw data (#30432)
#30350 Signed-off-by: luzhang <luzhang@zilliz.com> Co-authored-by: luzhang <luzhang@zilliz.com>
This commit is contained in:
parent
5bbace1094
commit
e8a6f1ea2b
@ -176,12 +176,11 @@ SegmentSealedImpl::LoadScalarIndex(const LoadIndexInfo& info) {
|
||||
// reverse pk from scalar index and set pks to offset
|
||||
if (schema_->get_primary_field_id() == field_id) {
|
||||
AssertInfo(field_id.get() != -1, "Primary key is -1");
|
||||
AssertInfo(insert_record_.empty_pks(), "already exists");
|
||||
switch (field_meta.get_data_type()) {
|
||||
case DataType::INT64: {
|
||||
auto int64_index = dynamic_cast<index::ScalarIndex<int64_t>*>(
|
||||
scalar_indexings_[field_id].get());
|
||||
if (int64_index->HasRawData()) {
|
||||
if (insert_record_.empty_pks() && int64_index->HasRawData()) {
|
||||
for (int i = 0; i < row_count; ++i) {
|
||||
insert_record_.insert_pk(int64_index->Reverse_Lookup(i),
|
||||
i);
|
||||
@ -194,7 +193,7 @@ SegmentSealedImpl::LoadScalarIndex(const LoadIndexInfo& info) {
|
||||
auto string_index =
|
||||
dynamic_cast<index::ScalarIndex<std::string>*>(
|
||||
scalar_indexings_[field_id].get());
|
||||
if (string_index->HasRawData()) {
|
||||
if (insert_record_.empty_pks() && string_index->HasRawData()) {
|
||||
for (int i = 0; i < row_count; ++i) {
|
||||
insert_record_.insert_pk(
|
||||
string_index->Reverse_Lookup(i), i);
|
||||
@ -931,8 +930,8 @@ SegmentSealedImpl::DropFieldData(const FieldId field_id) {
|
||||
auto& field_meta = schema_->operator[](field_id);
|
||||
std::unique_lock lck(mutex_);
|
||||
if (get_bit(field_data_ready_bitset_, field_id)) {
|
||||
fields_.erase(field_id);
|
||||
set_bit(field_data_ready_bitset_, field_id, false);
|
||||
insert_record_.drop_field_data(field_id);
|
||||
}
|
||||
if (get_bit(binlog_index_bitset_, field_id)) {
|
||||
set_bit(binlog_index_bitset_, field_id, false);
|
||||
|
||||
@ -609,6 +609,37 @@ TEST(Sealed, LoadFieldDataMmap) {
|
||||
ASSERT_ANY_THROW(segment->Search(plan.get(), ph_group.get(), timestamp));
|
||||
}
|
||||
|
||||
TEST(Sealed, LoadPkScalarIndex) {
|
||||
size_t N = ROW_COUNT;
|
||||
auto schema = std::make_shared<Schema>();
|
||||
auto pk_id = schema->AddDebugField("counter", DataType::INT64);
|
||||
auto nothing_id = schema->AddDebugField("nothing", DataType::INT32);
|
||||
schema->set_primary_field_id(pk_id);
|
||||
|
||||
auto dataset = DataGen(schema, N);
|
||||
auto segment = CreateSealedSegment(schema);
|
||||
auto fields = schema->get_fields();
|
||||
for (auto field_data : dataset.raw_->fields_data()) {
|
||||
int64_t field_id = field_data.field_id();
|
||||
|
||||
auto info = FieldDataInfo(field_data.field_id(), N);
|
||||
auto field_meta = fields.at(FieldId(field_id));
|
||||
info.channel->push(
|
||||
CreateFieldDataFromDataArray(N, &field_data, field_meta));
|
||||
info.channel->close();
|
||||
|
||||
segment->LoadFieldData(FieldId(field_id), info);
|
||||
}
|
||||
|
||||
LoadIndexInfo pk_index;
|
||||
pk_index.field_id = pk_id.get();
|
||||
pk_index.field_type = DataType::INT64;
|
||||
pk_index.index_params["index_type"] = "sort";
|
||||
auto pk_data = dataset.get_col<int64_t>(pk_id);
|
||||
pk_index.index = GenScalarIndexing<int64_t>(N, pk_data.data());
|
||||
segment->LoadIndex(pk_index);
|
||||
}
|
||||
|
||||
TEST(Sealed, LoadScalarIndex) {
|
||||
auto dim = 16;
|
||||
size_t N = ROW_COUNT;
|
||||
|
||||
@ -418,7 +418,7 @@ func (node *QueryNode) LoadSegments(ctx context.Context, req *querypb.LoadSegmen
|
||||
log.Info("received load segments request",
|
||||
zap.Int64("version", req.GetVersion()),
|
||||
zap.Bool("needTransfer", req.GetNeedTransfer()),
|
||||
)
|
||||
zap.String("loadScope", req.GetLoadScope().String()))
|
||||
// check node healthy
|
||||
if err := node.lifetime.Add(merr.IsHealthy); err != nil {
|
||||
return merr.Status(err), nil
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user