fix build index crash (#3167)

* fix build index crash

Signed-off-by: yhmo <yihua.mo@zilliz.com>

Co-authored-by: shengjun.li <shengjun.li@zilliz.com>
This commit is contained in:
groot 2020-08-07 14:08:13 +08:00 committed by GitHub
parent c05f67baf2
commit 15a07951ab
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 63 additions and 24 deletions

View File

@ -55,8 +55,9 @@ BuildIndexTask::OnLoad(milvus::scheduler::LoadType type, uint8_t device_id) {
stat = execution_engine_->Load(context);
type_str = "DISK2CPU";
} else if (type == LoadType::CPU2GPU) {
stat = execution_engine_->CopyToGpu(device_id);
type_str = "CPU2GPU:" + std::to_string(device_id);
// no need to copy flat to gpu,
// stat = execution_engine_->CopyToGpu(device_id);
// type_str = "CPU2GPU:" + std::to_string(device_id);
} else {
error_msg = "Wrong load type";
stat = Status(SERVER_UNEXPECTED_ERROR, error_msg);

View File

@ -149,30 +149,20 @@ Segment::DeleteEntity(std::vector<offset_t>& offsets) {
// sort offset in descendant
std::sort(offsets.begin(), offsets.end(), std::greater<offset_t>());
// delete entity data
// delete entity data from max offset to min offset
for (auto& pair : fixed_fields_) {
int64_t width = fixed_fields_width_[pair.first];
if (width == 0 || pair.second == nullptr) {
continue;
}
auto& data = pair.second->data_;
std::vector<bool> marked(data.size(), false);
std::vector<uint8_t> temp;
temp.reserve(data.size() - offsets.size() * width);
for (auto pos = offsets.begin(); pos != offsets.end(); pos++) {
for (auto i = 0; i < width; ++i) {
marked[(*pos) * width + i] = true;
auto& data = pair.second;
for (auto offset : offsets) {
if (offset >= 0 && offset < row_count_) {
auto step = offset * width;
data->data_.erase(data->data_.begin() + step, data->data_.begin() + step + width);
}
}
for (size_t i = 0; i < data.size(); i++) {
if (!marked[i]) {
temp.push_back(data[i]);
}
}
data = std::move(temp);
}
// reset row count

View File

@ -544,9 +544,16 @@ TEST_F(DBTest, InsertTest) {
ASSERT_TRUE(status.ok());
};
// create collection with auto-generate id, insert entities with user id, insert action failed
do_insert(true, true);
// create collection with auto-generate id, insert entities without user id, insert action succeed
do_insert(true, false);
// create collection without auto-generate id, insert entities with user id, insert action succeed
do_insert(false, true);
// create collection without auto-generate id, insert entities without user id, insert action failed
do_insert(false, false);
}
@ -559,6 +566,7 @@ TEST_F(DBTest, MergeTest) {
milvus::engine::DataChunkPtr data_chunk;
BuildEntities(entity_count, 0, data_chunk);
// insert entities into collection multiple times
int64_t repeat = 2;
for (int32_t i = 0; i < repeat; i++) {
status = db_->Insert(collection_name, "", data_chunk);
@ -570,13 +578,16 @@ TEST_F(DBTest, MergeTest) {
ASSERT_TRUE(status.ok());
}
sleep(2); // wait to merge
// wait to merge finished
sleep(2);
// validate entities count
int64_t row_count = 0;
status = db_->CountEntities(collection_name, row_count);
ASSERT_TRUE(status.ok());
ASSERT_EQ(row_count, entity_count * repeat);
// validate segment files count is correct
ScopedSnapshotT ss;
status = Snapshots::GetInstance().GetSnapshot(ss, collection_name);
ASSERT_TRUE(status.ok());
@ -631,6 +642,7 @@ TEST_F(DBTest, CompactTest) {
auto status = CreateCollection2(db_, collection_name, 0);
ASSERT_TRUE(status.ok());
// insert 1000 entities into default partition
const uint64_t entity_count = 1000;
milvus::engine::DataChunkPtr data_chunk;
BuildEntities(entity_count, 0, data_chunk);
@ -654,6 +666,8 @@ TEST_F(DBTest, CompactTest) {
status = db_->DeleteEntityByID(collection_name, delete_ids);
ASSERT_TRUE(status.ok());
};
// delete entities from 100 to 300
int64_t delete_count_1 = 200;
delete_entity(100, 100 + delete_count_1);
@ -677,8 +691,11 @@ TEST_F(DBTest, CompactTest) {
ASSERT_EQ(p[index++], i);
}
};
// validate the left data is correct after deletion
validate_entity_data();
// delete entities from 700 to 800
int64_t delete_count_2 = 100;
delete_entity(700, 700 + delete_count_2);
@ -703,6 +720,7 @@ TEST_F(DBTest, CompactTest) {
validate_entity_data();
};
// compact the collection, when threshold = 0.001, the compact do nothing
validate_compact(0.001); // compact skip
validate_compact(0.5); // do compact
}
@ -712,6 +730,7 @@ TEST_F(DBTest, IndexTest) {
auto status = CreateCollection2(db_, collection_name, 0);
ASSERT_TRUE(status.ok());
// insert 10000 entities into default partition
const uint64_t entity_count = 10000;
milvus::engine::DataChunkPtr data_chunk;
BuildEntities(entity_count, 0, data_chunk);
@ -722,6 +741,7 @@ TEST_F(DBTest, IndexTest) {
status = db_->Flush();
ASSERT_TRUE(status.ok());
// create index for vector field, validate the index
{
milvus::engine::CollectionIndex index;
index.index_name_ = "my_index1";
@ -740,6 +760,7 @@ TEST_F(DBTest, IndexTest) {
ASSERT_EQ(index.extra_params_, index_get.extra_params_);
}
// create index for structured fields, validate the index
{
milvus::engine::CollectionIndex index;
index.index_name_ = "my_index2";
@ -758,6 +779,7 @@ TEST_F(DBTest, IndexTest) {
ASSERT_EQ(index.index_type_, index_get.index_type_);
}
// drop index of vector field
{
status = db_->DropIndex(collection_name, VECTOR_FIELD_NAME);
ASSERT_TRUE(status.ok());
@ -768,6 +790,7 @@ TEST_F(DBTest, IndexTest) {
ASSERT_TRUE(index_get.index_name_.empty());
}
// drop index of structured field 'field_0'
{
status = db_->DropIndex(collection_name, "field_0");
ASSERT_TRUE(status.ok());
@ -788,6 +811,8 @@ TEST_F(DBTest, StatsTest) {
status = db_->CreatePartition(collection_name, partition_name);
ASSERT_TRUE(status.ok());
// insert 10000 entities into default partition
// insert 10000 entities into partition 'p1'
const uint64_t entity_count = 10000;
milvus::engine::DataChunkPtr data_chunk;
BuildEntities(entity_count, 0, data_chunk);
@ -803,6 +828,7 @@ TEST_F(DBTest, StatsTest) {
status = db_->Flush();
ASSERT_TRUE(status.ok());
// create index for vector field
{
milvus::engine::CollectionIndex index;
index.index_type_ = milvus::knowhere::IndexEnum::INDEX_FAISS_IVFFLAT;
@ -812,6 +838,7 @@ TEST_F(DBTest, StatsTest) {
ASSERT_TRUE(status.ok());
}
// create index for structured fields
{
milvus::engine::CollectionIndex index;
index.index_type_ = milvus::engine::DEFAULT_STRUCTURED_INDEX_NAME;
@ -823,6 +850,7 @@ TEST_F(DBTest, StatsTest) {
ASSERT_TRUE(status.ok());
}
// validate collection stats
milvus::json json_stats;
status = db_->GetCollectionStats(collection_name, json_stats);
@ -876,6 +904,8 @@ TEST_F(DBTest, FetchTest) {
status = db_->CreatePartition(collection_name, partition_name);
ASSERT_TRUE(status.ok());
// insert 1000 entities into default partition
// insert 1000 entities into partition 'p1'
const uint64_t entity_count = 1000;
milvus::engine::DataChunkPtr data_chunk;
BuildEntities(entity_count, 0, data_chunk);
@ -891,10 +921,12 @@ TEST_F(DBTest, FetchTest) {
status = db_->Flush();
ASSERT_TRUE(status.ok());
// get all the entities id of partition 'p1'
milvus::engine::IDNumbers batch_entity_ids;
milvus::engine::utils::GetIDFromChunk(data_chunk, batch_entity_ids);
ASSERT_EQ(batch_entity_ids.size(), entity_count);
// delete first 10 entities from partition 'p1'
int64_t delete_count = 10;
milvus::engine::IDNumbers delete_entity_ids = batch_entity_ids;
delete_entity_ids.resize(delete_count);
@ -904,6 +936,7 @@ TEST_F(DBTest, FetchTest) {
status = db_->Flush();
ASSERT_TRUE(status.ok());
// try fetch first 10 entities of parititon 'p1', since they were deleted, nothing returned
std::vector<std::string> field_names = {milvus::engine::DEFAULT_UID_NAME, VECTOR_FIELD_NAME, "field_0"};
std::vector<bool> valid_row;
milvus::engine::DataChunkPtr fetch_chunk;
@ -914,6 +947,7 @@ TEST_F(DBTest, FetchTest) {
ASSERT_FALSE(valid);
}
// fetch all entities from the partition 'p1', 990 entities returned
fetch_chunk = nullptr;
status = db_->GetEntityByID(collection_name, batch_entity_ids, field_names, valid_row, fetch_chunk);
ASSERT_TRUE(status.ok());
@ -923,6 +957,9 @@ TEST_F(DBTest, FetchTest) {
auto& vectors = fetch_chunk->fixed_fields_[VECTOR_FIELD_NAME];
ASSERT_EQ(vectors->Size() / (COLLECTION_DIM * sizeof(float)), fetch_chunk->count_);
// get collection stats
// the segment of default partition has 1000 entities
// the segment of partition 'p1' only has 990 entities
milvus::json json_stats;
status = db_->GetCollectionStats(collection_name, json_stats);
ASSERT_TRUE(status.ok());
@ -968,12 +1005,13 @@ TEST_F(DBTest, DeleteEntitiesTest) {
return Status::OK();
};
// insert 1000 entities into default partiiton
milvus::engine::IDNumbers entity_ids;
auto status = insert_entities(collection_name, "", 10000, 0, entity_ids);
auto status = insert_entities(collection_name, "", 1000, 0, entity_ids);
ASSERT_TRUE(status.ok()) << status.ToString();
// delete the first entity
milvus::engine::IDNumbers delete_ids = {entity_ids[0]};
status = db_->DeleteEntityByID(collection_name, delete_ids);
ASSERT_TRUE(status.ok()) << status.ToString();
@ -990,25 +1028,35 @@ TEST_F(DBTest, DeleteEntitiesTest) {
status = db_->CreatePartition(collection_name, partition1);
ASSERT_TRUE(status.ok()) << status.ToString();
// insert 1000 entities into partiiton_0
milvus::engine::IDNumbers partition0_ids;
status = insert_entities(collection_name, partition0, 10000, 2 * i + 1, partition0_ids);
status = insert_entities(collection_name, partition0, 1000, 2 * i + 1, partition0_ids);
ASSERT_TRUE(status.ok()) << status.ToString();
// insert 1000 entities into partiiton_1
milvus::engine::IDNumbers partition1_ids;
status = insert_entities(collection_name, partition1, 10000, 2 * i + 2, partition1_ids);
status = insert_entities(collection_name, partition1, 1000, 2 * i + 2, partition1_ids);
ASSERT_TRUE(status.ok()) << status.ToString();
// delete the first entity of each partition
milvus::engine::IDNumbers partition_delete_ids = {partition0_ids[0], partition1_ids[0]};
whole_delete_ids.insert(whole_delete_ids.begin(), partition_delete_ids.begin(), partition_delete_ids.end());
db_->DeleteEntityByID(collection_name, partition_delete_ids);
ASSERT_TRUE(status.ok()) << status.ToString();
// drop partition_1
status = db_->DropPartition(collection_name, partition1);
ASSERT_TRUE(status.ok()) << status.ToString();
}
sleep(3);
// flush will apply the deletion
status = db_->Flush();
ASSERT_TRUE(status.ok());
fiu_disable("MemCollection.ApplyDeletes.RandomSleep");
// get first entity from partition_0, the entity has been deleted, no entity returned
// get first entity from partition_1, the partition has been deleted, no entity returned
std::vector<bool> valid_row;
milvus::engine::DataChunkPtr entity_data_chunk;
for (auto& id : whole_delete_ids) {