snapshot code opt (#2609)

* opt test_snapshot

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* rename BaseHolders to ResourceHolder

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* rename DBBaseResource to BaseResource

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* remove ResourceHolder.inl

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>

* fix clang-format

Signed-off-by: yudong.cai <yudong.cai@zilliz.com>
This commit is contained in:
Cai Yudong 2020-06-19 11:35:25 +08:00 committed by GitHub
parent ecc52d365c
commit 2bc9c0cf25
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 249 additions and 313 deletions

View File

@ -1,84 +0,0 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#pragma once
#include <condition_variable>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include "db/snapshot/ResourceTypes.h"
#include "db/snapshot/ScopedResource.h"
#include "db/snapshot/Store.h"
namespace milvus {
namespace engine {
namespace snapshot {
template <typename ResourceT, typename Derived>
class ResourceHolder {
public:
using ResourcePtr = std::shared_ptr<ResourceT>;
/* using ResourcePtr = typename ResourceT::Ptr; */
using ScopedT = ScopedResource<ResourceT>;
using ScopedPtr = std::shared_ptr<ScopedT>;
using IdMapT = std::map<ID_TYPE, ResourcePtr>;
using Ptr = std::shared_ptr<Derived>;
// TODO: Resource should be loaded into holder in OperationExecutor thread
ScopedT
Load(Store& store, ID_TYPE id, bool scoped = true);
ScopedT
GetResource(ID_TYPE id, bool scoped = true);
bool
AddNoLock(ResourcePtr resource);
bool
ReleaseNoLock(ID_TYPE id);
virtual bool
Add(ResourcePtr resource);
virtual bool
Release(ID_TYPE id);
virtual bool
HardDelete(ID_TYPE id);
static Derived&
GetInstance() {
static Derived holder;
return holder;
}
virtual void
Reset();
virtual void
Dump(const std::string& tag = "");
protected:
virtual void
OnNoRefCallBack(ResourcePtr resource);
virtual ResourcePtr
DoLoad(Store& store, ID_TYPE id);
ResourceHolder() = default;
virtual ~ResourceHolder() = default;
std::mutex mutex_;
IdMapT id_map_;
};
} // namespace snapshot
} // namespace engine
} // namespace milvus
#include "BaseHolders.inl"

View File

@ -1,138 +0,0 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#include "BaseHolders.h"
#include "Operations.h"
#include <iostream>
#include <memory>
namespace milvus {
namespace engine {
namespace snapshot {
template <typename ResourceT, typename Derived>
void ResourceHolder<ResourceT, Derived>::Dump(const std::string& tag) {
std::unique_lock<std::mutex> lock(mutex_);
std::cout << typeid(*this).name() << " Dump Start [" << tag << "]:" << id_map_.size() << std::endl;
for (auto& kv : id_map_) {
/* std::cout << "\t" << kv.second->ToString() << std::endl; */
std::cout << "\t" << kv.first << " RefCnt " << kv.second->RefCnt() << std::endl;
}
std::cout << typeid(*this).name() << " Dump End [" << tag << "]" << std::endl;
}
template <typename ResourceT, typename Derived>
void ResourceHolder<ResourceT, Derived>::Reset() {
id_map_.clear();
}
template <typename ResourceT, typename Derived>
typename ResourceHolder<ResourceT, Derived>::ResourcePtr
ResourceHolder<ResourceT, Derived>::DoLoad(Store& store, ID_TYPE id) {
LoadOperationContext context;
context.id = id;
auto op = std::make_shared<LoadOperation<ResourceT>>(context);
(*op)(store);
typename ResourceT::Ptr c;
auto status = op->GetResource(c);
if (status.ok()) {
Add(c);
return c;
}
return nullptr;
}
template <typename ResourceT, typename Derived>
typename ResourceHolder<ResourceT, Derived>::ScopedT
ResourceHolder<ResourceT, Derived>::Load(Store& store, ID_TYPE id, bool scoped) {
{
std::unique_lock<std::mutex> lock(mutex_);
auto cit = id_map_.find(id);
if (cit != id_map_.end()) {
return ScopedT(cit->second, scoped);
}
}
auto ret = DoLoad(store, id);
if (!ret) return ScopedT();
return ScopedT(ret, scoped);
}
template <typename ResourceT, typename Derived>
typename ResourceHolder<ResourceT, Derived>::ScopedT
ResourceHolder<ResourceT, Derived>::GetResource(ID_TYPE id, bool scoped) {
// TODO: Temp to use Load here. Will be removed when resource is loaded just post Compound
// Operations.
return Load(Store::GetInstance(), id, scoped);
{
std::unique_lock<std::mutex> lock(mutex_);
auto cit = id_map_.find(id);
if (cit != id_map_.end()) {
return ScopedT(cit->second, scoped);
}
}
return ScopedT();
}
template <typename ResourceT, typename Derived>
void
ResourceHolder<ResourceT, Derived>::OnNoRefCallBack(typename ResourceHolder<ResourceT, Derived>::ResourcePtr resource) {
HardDelete(resource->GetID());
Release(resource->GetID());
}
template <typename ResourceT, typename Derived>
bool ResourceHolder<ResourceT, Derived>::ReleaseNoLock(ID_TYPE id) {
auto it = id_map_.find(id);
if (it == id_map_.end()) {
return false;
}
id_map_.erase(it);
return true;
}
template <typename ResourceT, typename Derived>
bool ResourceHolder<ResourceT, Derived>::Release(ID_TYPE id) {
std::unique_lock<std::mutex> lock(mutex_);
return ReleaseNoLock(id);
}
template <typename ResourceT, typename Derived>
bool
ResourceHolder<ResourceT, Derived>::HardDelete(ID_TYPE id) {
auto op = std::make_shared<HardDeleteOperation<ResourceT>>(id);
// TODO:
(*op)(Store::GetInstance());
return true;
}
template <typename ResourceT, typename Derived>
bool ResourceHolder<ResourceT, Derived>::AddNoLock(typename ResourceHolder<ResourceT, Derived>::ResourcePtr resource) {
if (!resource) return false;
if (id_map_.find(resource->GetID()) != id_map_.end()) {
return false;
}
id_map_[resource->GetID()] = resource;
resource->RegisterOnNoRefCB(std::bind(&Derived::OnNoRefCallBack, this, resource));
return true;
}
template <typename ResourceT, typename Derived>
bool ResourceHolder<ResourceT, Derived>::Add(typename ResourceHolder<ResourceT, Derived>::ResourcePtr resource) {
std::unique_lock<std::mutex> lock(mutex_);
return AddNoLock(resource);
}
} // namespace snapshot
} // namespace engine
} // namespace milvus

View File

@ -17,14 +17,14 @@ namespace milvus {
namespace engine {
namespace snapshot {
class DBBaseResource : public ReferenceProxy {
class BaseResource : public ReferenceProxy {
public:
virtual std::string
ToString() const {
return "";
}
virtual ~DBBaseResource() {
virtual ~BaseResource() {
}
};

View File

@ -0,0 +1,169 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
#pragma once
#include <condition_variable>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include "db/snapshot/Operations.h"
#include "db/snapshot/ResourceTypes.h"
#include "db/snapshot/ScopedResource.h"
#include "db/snapshot/Store.h"
namespace milvus {
namespace engine {
namespace snapshot {
template <typename ResourceT, typename Derived>
class ResourceHolder {
using ResourcePtr = std::shared_ptr<ResourceT>;
using ScopedT = ScopedResource<ResourceT>;
using ScopedPtr = std::shared_ptr<ScopedT>;
using IdMapT = std::map<ID_TYPE, ResourcePtr>;
using Ptr = std::shared_ptr<Derived>;
protected:
ResourceHolder() = default;
virtual ~ResourceHolder() = default;
public:
static Derived&
GetInstance() {
static Derived holder;
return holder;
}
ScopedT
GetResource(ID_TYPE id, bool scoped = true) {
// TODO: Temp to use Load here. Will be removed when resource is loaded just post Compound
// Operations.
return Load(Store::GetInstance(), id, scoped);
{
std::unique_lock<std::mutex> lock(mutex_);
auto cit = id_map_.find(id);
if (cit != id_map_.end()) {
return ScopedT(cit->second, scoped);
}
}
return ScopedT();
}
virtual bool
Add(ResourcePtr resource) {
std::unique_lock<std::mutex> lock(mutex_);
return AddNoLock(resource);
}
virtual bool
Release(ID_TYPE id) {
std::unique_lock<std::mutex> lock(mutex_);
return ReleaseNoLock(id);
}
// TODO: Resource should be loaded into holder in OperationExecutor thread
ScopedT
Load(Store& store, ID_TYPE id, bool scoped = true) {
{
std::unique_lock<std::mutex> lock(mutex_);
auto cit = id_map_.find(id);
if (cit != id_map_.end()) {
return ScopedT(cit->second, scoped);
}
}
auto ret = DoLoad(store, id);
if (!ret) {
return ScopedT();
}
return ScopedT(ret, scoped);
}
virtual bool
HardDelete(ID_TYPE id) {
auto op = std::make_shared<HardDeleteOperation<ResourceT>>(id);
// TODO:
(*op)(Store::GetInstance());
return true;
}
virtual void
Reset() {
id_map_.clear();
}
virtual void
Dump(const std::string& tag = "") {
std::unique_lock<std::mutex> lock(mutex_);
std::cout << typeid(*this).name() << " Dump Start [" << tag << "]:" << id_map_.size() << std::endl;
for (auto& kv : id_map_) {
/* std::cout << "\t" << kv.second->ToString() << std::endl; */
std::cout << "\t" << kv.first << " RefCnt " << kv.second->RefCnt() << std::endl;
}
std::cout << typeid(*this).name() << " Dump End [" << tag << "]" << std::endl;
}
private:
bool
AddNoLock(ResourcePtr resource) {
if (!resource) {
return false;
}
if (id_map_.find(resource->GetID()) != id_map_.end()) {
return false;
}
id_map_[resource->GetID()] = resource;
resource->RegisterOnNoRefCB(std::bind(&Derived::OnNoRefCallBack, this, resource));
return true;
}
bool
ReleaseNoLock(ID_TYPE id) {
auto it = id_map_.find(id);
if (it == id_map_.end()) {
return false;
}
id_map_.erase(it);
return true;
}
virtual void
OnNoRefCallBack(ResourcePtr resource) {
HardDelete(resource->GetID());
Release(resource->GetID());
}
virtual ResourcePtr
DoLoad(Store& store, ID_TYPE id) {
LoadOperationContext context;
context.id = id;
auto op = std::make_shared<LoadOperation<ResourceT>>(context);
(*op)(store);
typename ResourceT::Ptr c;
auto status = op->GetResource(c);
if (status.ok()) {
Add(c);
return c;
}
return nullptr;
}
private:
std::mutex mutex_;
IdMapT id_map_;
};
} // namespace snapshot
} // namespace engine
} // namespace milvus

View File

@ -18,7 +18,7 @@
#include <string>
#include <thread>
#include <vector>
#include "db/snapshot/BaseHolders.h"
#include "db/snapshot/ResourceHolder.h"
#include "db/snapshot/ResourceTypes.h"
#include "db/snapshot/Resources.h"
#include "db/snapshot/ScopedResource.h"

View File

@ -276,7 +276,7 @@ class NameField {
std::string name_;
};
class Collection : public DBBaseResource,
class Collection : public BaseResource,
public NameField,
public IdField,
public LsnField,
@ -296,7 +296,7 @@ class Collection : public DBBaseResource,
using CollectionPtr = Collection::Ptr;
class SchemaCommit : public DBBaseResource,
class SchemaCommit : public BaseResource,
public CollectionIdField,
public MappingsField,
public IdField,
@ -318,7 +318,7 @@ class SchemaCommit : public DBBaseResource,
using SchemaCommitPtr = SchemaCommit::Ptr;
class Field : public DBBaseResource,
class Field : public BaseResource,
public NameField,
public NumField,
public IdField,
@ -339,7 +339,7 @@ class Field : public DBBaseResource,
using FieldPtr = Field::Ptr;
class FieldCommit : public DBBaseResource,
class FieldCommit : public BaseResource,
public CollectionIdField,
public FieldIdField,
public MappingsField,
@ -362,7 +362,7 @@ class FieldCommit : public DBBaseResource,
using FieldCommitPtr = FieldCommit::Ptr;
class FieldElement : public DBBaseResource,
class FieldElement : public BaseResource,
public CollectionIdField,
public FieldIdField,
public NameField,
@ -385,7 +385,7 @@ class FieldElement : public DBBaseResource,
using FieldElementPtr = FieldElement::Ptr;
class CollectionCommit : public DBBaseResource,
class CollectionCommit : public BaseResource,
public CollectionIdField,
public SchemaIdField,
public MappingsField,
@ -407,7 +407,7 @@ class CollectionCommit : public DBBaseResource,
using CollectionCommitPtr = CollectionCommit::Ptr;
class Partition : public DBBaseResource,
class Partition : public BaseResource,
public NameField,
public CollectionIdField,
public IdField,
@ -428,7 +428,7 @@ class Partition : public DBBaseResource,
using PartitionPtr = Partition::Ptr;
class PartitionCommit : public DBBaseResource,
class PartitionCommit : public BaseResource,
public CollectionIdField,
public PartitionIdField,
public MappingsField,
@ -454,7 +454,7 @@ class PartitionCommit : public DBBaseResource,
using PartitionCommitPtr = PartitionCommit::Ptr;
class Segment : public DBBaseResource,
class Segment : public BaseResource,
public PartitionIdField,
public NumField,
public IdField,
@ -478,7 +478,7 @@ class Segment : public DBBaseResource,
using SegmentPtr = Segment::Ptr;
class SegmentCommit : public DBBaseResource,
class SegmentCommit : public BaseResource,
public SchemaIdField,
public PartitionIdField,
public SegmentIdField,
@ -505,7 +505,7 @@ class SegmentCommit : public DBBaseResource,
using SegmentCommitPtr = SegmentCommit::Ptr;
class SegmentFile : public DBBaseResource,
class SegmentFile : public BaseResource,
public PartitionIdField,
public SegmentIdField,
public FieldElementIdField,

View File

@ -44,9 +44,7 @@ class ScopedResource {
}
operator bool() const {
if (res_)
return true;
return false;
return (res_ != nullptr);
}
~ScopedResource();

View File

@ -207,7 +207,7 @@ class Store {
auto& resources = std::get<Collection::MapT>(resources_);
if (!collection.HasAssigned() && (name_ids_.find(collection.GetName()) != name_ids_.end()) &&
(resources[name_ids_[collection.GetName()]]->IsActive()) && !collection.IsDeactive()) {
return Status(SS_DUPLICATED_ERROR, "Duplcated");
return Status(SS_DUPLICATED_ERROR, "Duplicated");
}
auto c = std::make_shared<Collection>(collection);
auto& id = std::get<Index<Collection::MapT, MockResourcesT>::value>(ids_);

View File

@ -35,10 +35,10 @@ using LSN_TYPE = milvus::engine::snapshot::LSN_TYPE;
using MappingT = milvus::engine::snapshot::MappingT;
using CreateCollectionContext = milvus::engine::snapshot::CreateCollectionContext;
using SegmentFileContext = milvus::engine::snapshot::SegmentFileContext;
using OperationContext = milvus::engine::snapshot::OperationContext;
using PartitionContext = milvus::engine::snapshot::PartitionContext;
using BuildOperation = milvus::engine::snapshot::BuildOperation;
using MergeOperation = milvus::engine::snapshot::MergeOperation;
using OperationContext = milvus::engine::snapshot::OperationContext;
using PartitionContext = milvus::engine::snapshot::PartitionContext;
using BuildOperation = milvus::engine::snapshot::BuildOperation;
using MergeOperation = milvus::engine::snapshot::MergeOperation;
using CreateCollectionOperation = milvus::engine::snapshot::CreateCollectionOperation;
using NewSegmentOperation = milvus::engine::snapshot::NewSegmentOperation;
using DropPartitionOperation = milvus::engine::snapshot::DropPartitionOperation;
@ -124,7 +124,7 @@ TEST_F(SnapshotTest, ScopedResourceTest) {
{
// Test bool operator
decltype(scoped) other_scoped;
CollectionScopedT other_scoped;
ASSERT_EQ(other_scoped, false);
// Test operator=
other_scoped = scoped;
@ -153,8 +153,7 @@ TEST_F(SnapshotTest, ResourceHoldersTest) {
auto collection = CollectionsHolder::GetInstance().GetResource(collection_id, false);
auto prev_cnt = collection->RefCnt();
{
auto collection_2 = CollectionsHolder::GetInstance().GetResource(
collection_id, false);
auto collection_2 = CollectionsHolder::GetInstance().GetResource(collection_id, false);
ASSERT_EQ(collection->GetID(), collection_id);
ASSERT_EQ(collection->RefCnt(), prev_cnt);
}
@ -194,7 +193,7 @@ CreateCollection(const std::string& collection_name, const LSN_TYPE& lsn) {
TEST_F(SnapshotTest, CreateCollectionOperationTest) {
ScopedSnapshotT expect_null;
auto status = Snapshots::GetInstance().GetSnapshot(expect_null, 100000);
ASSERT_TRUE(!expect_null);
ASSERT_FALSE(expect_null);
std::string collection_name = "test_c1";
LSN_TYPE lsn = 1;
@ -203,7 +202,7 @@ TEST_F(SnapshotTest, CreateCollectionOperationTest) {
ScopedSnapshotT latest_ss;
status = Snapshots::GetInstance().GetSnapshot(latest_ss, "xxxx");
ASSERT_TRUE(!status.ok());
ASSERT_FALSE(status.ok());
status = Snapshots::GetInstance().GetSnapshot(latest_ss, collection_name);
ASSERT_TRUE(latest_ss);
@ -222,8 +221,8 @@ TEST_F(SnapshotTest, CreateCollectionOperationTest) {
status = sd_op->Push();
ASSERT_TRUE(status.ok());
ASSERT_TRUE(sd_op->GetStatus().ok());
ASSERT_TRUE(!sd_op_ctx.collection->IsActive());
ASSERT_TRUE(!latest_ss->GetCollection()->IsActive());
ASSERT_FALSE(sd_op_ctx.collection->IsActive());
ASSERT_FALSE(latest_ss->GetCollection()->IsActive());
Snapshots::GetInstance().Reset();
}
@ -244,18 +243,18 @@ TEST_F(SnapshotTest, DropCollectionTest) {
status = Snapshots::GetInstance().DropCollection(collection_name, lsn);
ASSERT_TRUE(status.ok());
status = Snapshots::GetInstance().GetSnapshot(lss, collection_name);
ASSERT_TRUE(!status.ok());
ASSERT_FALSE(status.ok());
auto ss_2 = CreateCollection(collection_name, ++lsn);
status = Snapshots::GetInstance().GetSnapshot(lss, collection_name);
ASSERT_TRUE(status.ok());
ASSERT_EQ(ss_2->GetID(), lss->GetID());
ASSERT_TRUE(prev_ss_id != ss_2->GetID());
ASSERT_TRUE(prev_c_id != ss_2->GetCollection()->GetID());
ASSERT_NE(prev_ss_id, ss_2->GetID());
ASSERT_NE(prev_c_id, ss_2->GetCollection()->GetID());
status = Snapshots::GetInstance().DropCollection(collection_name, ++lsn);
ASSERT_TRUE(status.ok());
status = Snapshots::GetInstance().DropCollection(collection_name, ++lsn);
ASSERT_TRUE(!status.ok());
ASSERT_FALSE(status.ok());
}
TEST_F(SnapshotTest, ConCurrentCollectionOperation) {
@ -269,13 +268,13 @@ TEST_F(SnapshotTest, ConCurrentCollectionOperation) {
ASSERT_TRUE(ss);
ASSERT_EQ(ss->GetName(), collection_name);
stale_ss_id = ss->GetID();
decltype(ss) a_ss;
ScopedSnapshotT a_ss;
status = Snapshots::GetInstance().GetSnapshot(a_ss, collection_name);
ASSERT_TRUE(status.ok());
std::this_thread::sleep_for(std::chrono::milliseconds(80));
ASSERT_TRUE(!ss->GetCollection()->IsActive());
ASSERT_FALSE(ss->GetCollection()->IsActive());
status = Snapshots::GetInstance().GetSnapshot(a_ss, collection_name);
ASSERT_TRUE(!status.ok());
ASSERT_FALSE(status.ok());
auto c_c = CollectionCommitsHolder::GetInstance().GetResource(stale_ss_id, false);
ASSERT_TRUE(c_c);
@ -287,12 +286,12 @@ TEST_F(SnapshotTest, ConCurrentCollectionOperation) {
ASSERT_TRUE(status.ok());
ScopedSnapshotT a_ss;
status = Snapshots::GetInstance().GetSnapshot(a_ss, collection_name);
ASSERT_TRUE(!status.ok());
ASSERT_FALSE(status.ok());
};
auto worker3 = [&] {
std::this_thread::sleep_for(std::chrono::milliseconds(20));
auto ss = CreateCollection(collection_name, ++lsn);
ASSERT_TRUE(!ss);
ASSERT_FALSE(ss);
std::this_thread::sleep_for(std::chrono::milliseconds(80));
ss = CreateCollection(collection_name, ++lsn);
ASSERT_TRUE(ss);
@ -306,12 +305,11 @@ TEST_F(SnapshotTest, ConCurrentCollectionOperation) {
t3.join();
auto c_c = CollectionCommitsHolder::GetInstance().GetResource(stale_ss_id, false);
ASSERT_TRUE(!c_c);
ASSERT_FALSE(c_c);
}
ScopedSnapshotT
CreatePartition(const std::string& collection_name, const PartitionContext& p_context,
const LSN_TYPE& lsn) {
CreatePartition(const std::string& collection_name, const PartitionContext& p_context, const LSN_TYPE& lsn) {
ScopedSnapshotT curr_ss;
ScopedSnapshotT ss;
auto status = Snapshots::GetInstance().GetSnapshot(ss, collection_name);
@ -365,17 +363,17 @@ TEST_F(SnapshotTest, PartitionTest) {
ASSERT_TRUE(status.ok());
ASSERT_TRUE(partition);
ASSERT_EQ(partition->GetName(), partition_name);
ASSERT_TRUE(!partition->IsActive());
ASSERT_FALSE(partition->IsActive());
ASSERT_TRUE(partition->HasAssigned());
status = op->Push();
ASSERT_TRUE(status.ok());
decltype(ss) curr_ss;
ScopedSnapshotT curr_ss;
status = op->GetSnapshot(curr_ss);
ASSERT_TRUE(status.ok());
ASSERT_TRUE(curr_ss);
ASSERT_EQ(curr_ss->GetName(), ss->GetName());
ASSERT_TRUE(curr_ss->GetID() > ss->GetID());
ASSERT_GT(curr_ss->GetID(), ss->GetID());
ASSERT_EQ(curr_ss->NumberOfPartitions(), 2);
p_ctx.lsn = ++lsn;
@ -383,7 +381,7 @@ TEST_F(SnapshotTest, PartitionTest) {
status = drop_op->Push();
ASSERT_TRUE(status.ok());
decltype(ss) latest_ss;
ScopedSnapshotT latest_ss;
status = drop_op->GetSnapshot(latest_ss);
ASSERT_TRUE(status.ok());
ASSERT_TRUE(latest_ss);
@ -395,7 +393,7 @@ TEST_F(SnapshotTest, PartitionTest) {
drop_op = std::make_shared<DropPartitionOperation>(p_ctx, latest_ss);
status = drop_op->Push();
std::cout << status.ToString() << std::endl;
ASSERT_TRUE(!status.ok());
ASSERT_FALSE(status.ok());
// TODO: Keep LSN in order
PartitionContext pp_ctx;
@ -424,13 +422,11 @@ TEST_F(SnapshotTest, PartitionTest) {
status = curr_ss->GetPartitionId(p_name_stream.str(), partition_id);
ASSERT_TRUE(status.ok());
status = Snapshots::GetInstance().DropPartition(
curr_ss->GetCollectionId(), partition_id, ++lsn);
status = Snapshots::GetInstance().DropPartition(curr_ss->GetCollectionId(), partition_id, ++lsn);
ASSERT_TRUE(status.ok());
status = Snapshots::GetInstance().GetSnapshot(
curr_ss, curr_ss->GetCollectionId());
status = Snapshots::GetInstance().GetSnapshot(curr_ss, curr_ss->GetCollectionId());
ASSERT_TRUE(status.ok());
ASSERT_EQ(curr_ss->NumberOfPartitions(), total_partition_num - i -1);
ASSERT_EQ(curr_ss->NumberOfPartitions(), total_partition_num - i - 1);
}
}
@ -455,11 +451,11 @@ TEST_F(SnapshotTest, PartitionTest) {
/* ASSERT_TRUE(status.ok()); */
/* ASSERT_TRUE(partition); */
/* ASSERT_EQ(partition->GetName(), partition_name); */
/* ASSERT_TRUE(!partition->IsActive()); */
/* ASSERT_FALSE(partition->IsActive()); */
/* ASSERT_TRUE(partition->HasAssigned()); */
/* status = cp_op->Push(); */
/* ASSERT_TRUE(!status.ok()); */
/* ASSERT_FALSE(status.ok()); */
/* } */
TEST_F(SnapshotTest, OperationTest) {
@ -478,23 +474,20 @@ TEST_F(SnapshotTest, OperationTest) {
ASSERT_TRUE(status.ok());
auto ss_id = ss->GetID();
lsn = ss->GetMaxLsn() + 1;
ASSERT_TRUE(status.ok());
// Check snapshot
{
auto collection_commit = CollectionCommitsHolder::GetInstance()
.GetResource(ss_id, false);
auto collection_commit = CollectionCommitsHolder::GetInstance().GetResource(ss_id, false);
/* snapshot::SegmentCommitsHolder::GetInstance().GetResource(prev_segment_commit->GetID()); */
ASSERT_TRUE(collection_commit);
to_string = collection_commit->ToString();
ASSERT_EQ(to_string, "");
ASSERT_TRUE(collection_commit->ToString().empty());
}
OperationContext merge_ctx;
std::set<ID_TYPE> stale_segment_commit_ids;
decltype(sf_context.segment_id) new_seg_id;
decltype(ss) new_ss;
ID_TYPE new_seg_id;
ScopedSnapshotT new_ss;
// Check build operation correctness
{
OperationContext context;
@ -506,11 +499,11 @@ TEST_F(SnapshotTest, OperationTest) {
ASSERT_TRUE(seg_file);
auto prev_segment_commit = ss->GetSegmentCommitBySegmentId(seg_file->GetSegmentId());
auto prev_segment_commit_mappings = prev_segment_commit->GetMappings();
ASSERT_NE(prev_segment_commit->ToString(), "");
ASSERT_FALSE(prev_segment_commit->ToString().empty());
build_op->Push();
status = build_op->GetSnapshot(ss);
ASSERT_TRUE(ss->GetID() > ss_id);
ASSERT_GT(ss->GetID(), ss_id);
auto segment_commit = ss->GetSegmentCommitBySegmentId(seg_file->GetSegmentId());
auto segment_commit_mappings = segment_commit->GetMappings();
@ -527,9 +520,8 @@ TEST_F(SnapshotTest, OperationTest) {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
// Check stale snapshot has been deleted from store
{
auto collection_commit = CollectionCommitsHolder::GetInstance()
.GetResource(ss_id, false);
ASSERT_TRUE(!collection_commit);
auto collection_commit = CollectionCommitsHolder::GetInstance().GetResource(ss_id, false);
ASSERT_FALSE(collection_commit);
}
ss_id = ss->GetID();
@ -542,7 +534,7 @@ TEST_F(SnapshotTest, OperationTest) {
SegmentPtr new_seg;
status = op->CommitNewSegment(new_seg);
ASSERT_TRUE(status.ok());
ASSERT_NE(new_seg->ToString(), "");
ASSERT_FALSE(new_seg->ToString().empty());
SegmentFilePtr seg_file;
status = op->CommitNewSegmentFile(sf_context, seg_file);
ASSERT_TRUE(status.ok());
@ -550,7 +542,7 @@ TEST_F(SnapshotTest, OperationTest) {
ASSERT_TRUE(status.ok());
status = op->GetSnapshot(ss);
ASSERT_TRUE(ss->GetID() > ss_id);
ASSERT_GT(ss->GetID(), ss_id);
ASSERT_TRUE(status.ok());
auto segment_commit = ss->GetSegmentCommitBySegmentId(seg_file->GetSegmentId());
@ -572,8 +564,8 @@ TEST_F(SnapshotTest, OperationTest) {
{
auto prev_partition_commit = ss->GetPartitionCommitByPartitionId(partition_id);
auto expect_null = ss->GetPartitionCommitByPartitionId(11111111);
ASSERT_TRUE(!expect_null);
ASSERT_NE(prev_partition_commit->ToString(), "");
ASSERT_FALSE(expect_null);
ASSERT_FALSE(prev_partition_commit->ToString().empty());
merge_ctx.lsn = ++lsn;
auto op = std::make_shared<MergeOperation>(merge_ctx, ss);
SegmentPtr new_seg;
@ -586,7 +578,7 @@ TEST_F(SnapshotTest, OperationTest) {
ASSERT_TRUE(status.ok());
std::cout << op->ToString() << std::endl;
status = op->GetSnapshot(ss);
ASSERT_TRUE(ss->GetID() > ss_id);
ASSERT_GT(ss->GetID(), ss_id);
ASSERT_TRUE(status.ok());
auto segment_commit = ss->GetSegmentCommitBySegmentId(new_seg->GetID());
@ -616,7 +608,7 @@ TEST_F(SnapshotTest, OperationTest) {
auto new_sf_context = sf_context;
new_sf_context.segment_id = new_seg_id;
status = build_op->CommitNewSegmentFile(new_sf_context, seg_file);
ASSERT_TRUE(!status.ok());
ASSERT_FALSE(status.ok());
}
// 1. Build start
@ -634,13 +626,12 @@ TEST_F(SnapshotTest, OperationTest) {
ASSERT_TRUE(status.ok());
std::cout << build_op->ToString() << std::endl;
auto status = Snapshots::GetInstance().DropCollection(ss->GetName(),
++lsn);
auto status = Snapshots::GetInstance().DropCollection(ss->GetName(), ++lsn);
ASSERT_TRUE(status.ok());
status = build_op->Push();
std::cout << status.ToString() << std::endl;
ASSERT_TRUE(!status.ok());
ASSERT_TRUE(!(build_op->GetStatus()).ok());
ASSERT_FALSE(status.ok());
ASSERT_FALSE(build_op->GetStatus().ok());
std::cout << build_op->ToString() << std::endl;
}
Snapshots::GetInstance().Reset();
@ -712,7 +703,7 @@ TEST_F(SnapshotTest, CompoundTest1) {
IDS_TYPE partitions = {ss->GetResources<Partition>().begin()->second->GetID()};
auto do_build = [&] (const ID_TYPE& seg_id) {
decltype(ss) latest_ss;
ScopedSnapshotT latest_ss;
auto status = Snapshots::GetInstance().GetSnapshot(latest_ss, collection_name);
ASSERT_TRUE(status.ok());
@ -751,7 +742,7 @@ TEST_F(SnapshotTest, CompoundTest1) {
if (seg_ids.size() == 0) {
return;
}
decltype(ss) latest_ss;
ScopedSnapshotT latest_ss;
auto status = Snapshots::GetInstance().GetSnapshot(latest_ss, collection_name);
ASSERT_TRUE(status.ok());
@ -824,7 +815,7 @@ TEST_F(SnapshotTest, CompoundTest1) {
// TODO: If any Compound Operation find larger Snapshot. This Operation should be rollback to latest
auto handler_worker = [&] {
auto loop_cnt = RandomInt(10, 20);
decltype(ss) latest_ss;
ScopedSnapshotT latest_ss;
auto create_new_segment = [&]() {
ID_TYPE partition_id;
@ -895,7 +886,7 @@ TEST_F(SnapshotTest, CompoundTest1) {
std::cout << "Exiting Merge Worker" << std::endl;
break;
}
decltype(ss) latest_ss;
ScopedSnapshotT latest_ss;
auto status = Snapshots::GetInstance().GetSnapshot(latest_ss, collection_name);
ASSERT_TRUE(status.ok());
auto seg = latest_ss->GetResource<Segment>(seg_id);
@ -980,7 +971,7 @@ TEST_F(SnapshotTest, CompoundTest1) {
merge_waiter.Wait();
decltype(ss) latest_ss;
ScopedSnapshotT latest_ss;
status = Snapshots::GetInstance().GetSnapshot(latest_ss, collection_name);
ASSERT_TRUE(status.ok());
auto expect_segments = all_segments;
@ -990,7 +981,7 @@ TEST_F(SnapshotTest, CompoundTest1) {
expect_segments.erase(id);
}
}
decltype(expect_segments) final_segments;
std::set<ID_TYPE> final_segments;
auto segments = latest_ss->GetResources<Segment>();
for (auto& kv : segments) {
final_segments.insert(kv.first);
@ -999,7 +990,7 @@ TEST_F(SnapshotTest, CompoundTest1) {
auto final_segment_file_cnt = latest_ss->GetResources<SegmentFile>().size();
decltype(final_segment_file_cnt) expect_segment_file_cnt;
size_t expect_segment_file_cnt;
expect_segment_file_cnt = expect_segments.size();
expect_segment_file_cnt += built_segs.size();
std::cout << latest_ss->ToString() << std::endl;
@ -1014,11 +1005,11 @@ TEST_F(SnapshotTest, CompoundTest1) {
TEST_F(SnapshotTest, CompoundTest2) {
milvus::Status status;
LSN_TYPE lsn = 0;
auto next_lsn = [&]() -> decltype(lsn)& {
auto next_lsn = [&]() -> LSN_TYPE& {
return ++lsn;
};
LSN_TYPE pid = 0;
auto next_pid = [&]() -> decltype(pid) {
auto next_pid = [&]() -> LSN_TYPE {
return ++pid;
};
std::string collection_name("c1");
@ -1056,7 +1047,7 @@ TEST_F(SnapshotTest, CompoundTest2) {
std::map<ID_TYPE, ID_TYPE> seg_p_map;
auto do_build = [&] (const ID_TYPE& seg_id, const ID_TYPE& p_id) {
decltype(ss) latest_ss;
ScopedSnapshotT latest_ss;
auto status = Snapshots::GetInstance().GetSnapshot(latest_ss, collection_name);
ASSERT_TRUE(status.ok());
@ -1112,7 +1103,7 @@ TEST_F(SnapshotTest, CompoundTest2) {
if (seg_ids.size() == 0) {
return;
}
decltype(ss) latest_ss;
ScopedSnapshotT latest_ss;
auto status = Snapshots::GetInstance().GetSnapshot(latest_ss, collection_name);
ASSERT_TRUE(status.ok());
@ -1201,7 +1192,7 @@ TEST_F(SnapshotTest, CompoundTest2) {
// TODO: If any Compound Operation find larger Snapshot. This Operation should be rollback to latest
auto handler_worker = [&] {
auto loop_cnt = RandomInt(30, 35);
decltype(ss) latest_ss;
ScopedSnapshotT latest_ss;
auto create_new_segment = [&]() {
ID_TYPE partition_id;
@ -1285,7 +1276,7 @@ TEST_F(SnapshotTest, CompoundTest2) {
if (partitions.size() <= 2) {
return;
}
decltype(ss) latest_ss;
ScopedSnapshotT latest_ss;
Snapshots::GetInstance().GetSnapshot(latest_ss, collection_name);
auto index = RandomInt(0, partitions.size() - 1);
auto pid = partitions[index];
@ -1318,7 +1309,7 @@ TEST_F(SnapshotTest, CompoundTest2) {
std::cout << "Exiting Merge Worker" << std::endl;
break;
}
decltype(ss) latest_ss;
ScopedSnapshotT latest_ss;
auto status = Snapshots::GetInstance().GetSnapshot(latest_ss, collection_name);
ASSERT_TRUE(status.ok());
auto seg = latest_ss->GetResource<Segment>(seg_id);
@ -1412,7 +1403,7 @@ TEST_F(SnapshotTest, CompoundTest2) {
merge_waiter.Wait();
decltype(ss) latest_ss;
ScopedSnapshotT latest_ss;
status = Snapshots::GetInstance().GetSnapshot(latest_ss, collection_name);
ASSERT_TRUE(status.ok());
@ -1433,7 +1424,7 @@ TEST_F(SnapshotTest, CompoundTest2) {
expect_segments.erase(seg_p.first);
}
decltype(expect_segments) final_segments;
std::set<ID_TYPE> final_segments;
auto segments = latest_ss->GetResources<Segment>();
for (auto& kv : segments) {
final_segments.insert(kv.first);