(db/snapshot): fix bug in metadata snapshot that handling CollectionCommit and PartitionCommit mappings (#3864)

Signed-off-by: peng.xu <peng.xu@zilliz.com>
This commit is contained in:
XuPeng-SH 2020-09-25 14:13:19 +08:00 committed by GitHub
parent 53360cda66
commit b2386626de
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -22,7 +22,6 @@ CollectionCommitOperation::DoExecute(StorePtr store) {
auto prev_resource = GetPrevResource();
auto row_cnt = 0;
auto size = 0;
bool flush_ids_changed = false;
if (!prev_resource) {
std::stringstream emsg;
emsg << GetRepr() << ". Cannot find prev collection commit resource";
@ -37,19 +36,16 @@ CollectionCommitOperation::DoExecute(StorePtr store) {
auto prev_partition_commit = GetStartedSS()->GetPartitionCommitByPartitionId(pc->GetPartitionId());
if (prev_partition_commit) {
resource_->GetMappings().erase(prev_partition_commit->GetID());
flush_ids_changed = true;
row_cnt -= prev_partition_commit->GetRowCount();
size -= prev_partition_commit->GetSize();
}
resource_->GetMappings().insert(pc->GetID());
flush_ids_changed = true;
row_cnt += pc->GetRowCount();
size += pc->GetSize();
};
if (context_.stale_partition_commit) {
resource_->GetMappings().erase(context_.stale_partition_commit->GetID());
flush_ids_changed = true;
row_cnt -= context_.stale_partition_commit->GetRowCount();
size -= context_.stale_partition_commit->GetSize();
} else if (context_.new_partition_commit) {
@ -63,11 +59,9 @@ CollectionCommitOperation::DoExecute(StorePtr store) {
resource_->SetSchemaId(context_.new_schema_commit->GetID());
}
if (flush_ids_changed) {
resource_->UpdateFlushIds();
auto path = GetResPath<Collection>(store->GetRootPath(), GetStartedSS()->GetCollection());
resource_->FlushIds(path);
}
resource_->UpdateFlushIds();
auto path = GetResPath<Collection>(store->GetRootPath(), GetStartedSS()->GetCollection());
resource_->FlushIds(path);
resource_->SetID(0);
resource_->SetRowCount(row_cnt);
@ -119,7 +113,6 @@ PartitionCommitOperation::DoExecute(StorePtr store) {
auto prev_resource = GetPrevResource();
auto row_cnt = 0;
auto size = 0;
bool flush_ids_changed = false;
PartitionPtr partition = nullptr;
if (prev_resource) {
resource_ = std::make_shared<PartitionCommit>(*prev_resource);
@ -134,7 +127,6 @@ PartitionCommitOperation::DoExecute(StorePtr store) {
auto prev_sc = GetStartedSS()->GetSegmentCommitBySegmentId(sc->GetSegmentId());
if (prev_sc) {
resource_->GetMappings().erase(prev_sc->GetID());
flush_ids_changed = true;
row_cnt -= prev_sc->GetRowCount();
size -= prev_sc->GetSize();
}
@ -155,7 +147,6 @@ PartitionCommitOperation::DoExecute(StorePtr store) {
}
auto stale_segment_commit = GetStartedSS()->GetSegmentCommitBySegmentId(stale_segment->GetID());
resource_->GetMappings().erase(stale_segment_commit->GetID());
flush_ids_changed = true;
row_cnt -= stale_segment_commit->GetRowCount();
size -= stale_segment_commit->GetSize();
}
@ -174,23 +165,19 @@ PartitionCommitOperation::DoExecute(StorePtr store) {
if (context_.new_segment_commit) {
resource_->GetMappings().insert(context_.new_segment_commit->GetID());
flush_ids_changed = true;
row_cnt += context_.new_segment_commit->GetRowCount();
size += context_.new_segment_commit->GetSize();
} else if (context_.new_segment_commits.size() > 0) {
for (auto& sc : context_.new_segment_commits) {
resource_->GetMappings().insert(sc->GetID());
flush_ids_changed = true;
row_cnt += sc->GetRowCount();
size += sc->GetSize();
}
}
if (flush_ids_changed) {
resource_->UpdateFlushIds();
auto path = GetResPath<Partition>(store->GetRootPath(), partition);
resource_->FlushIds(path);
}
resource_->UpdateFlushIds();
auto path = GetResPath<Partition>(store->GetRootPath(), partition);
resource_->FlushIds(path);
resource_->SetRowCount(row_cnt);
resource_->SetSize(size);