enhance: Support streaming read and write of vector index files (#43824)

issue: #42032

Signed-off-by: xianliang.li <xianliang.li@zilliz.com>
This commit is contained in:
foxspy 2025-08-15 23:41:43 +08:00 committed by GitHub
parent ebb10dfae0
commit 647c2bca2d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
23 changed files with 480 additions and 30 deletions

View File

@ -27,6 +27,7 @@
#include "common/Consts.h"
#include "common/RangeSearchHelper.h"
#include "indexbuilder/types.h"
#include "filemanager/FileManager.h"
namespace milvus::index {
@ -59,7 +60,7 @@ VectorDiskAnnIndex<T>::VectorDiskAnnIndex(
CheckCompatible(version);
local_chunk_manager->CreateDir(local_index_path_prefix);
auto diskann_index_pack =
knowhere::Pack(std::shared_ptr<knowhere::FileManager>(file_manager_));
knowhere::Pack(std::shared_ptr<milvus::FileManager>(file_manager_));
auto get_index_obj = knowhere::IndexFactory::Instance().Create<T>(
GetIndexType(), version, diskann_index_pack);
if (get_index_obj.has_value()) {
@ -96,8 +97,11 @@ VectorDiskAnnIndex<T>::Load(milvus::tracer::TraceContext ctx,
GetValueFromConfig<std::vector<std::string>>(config, "index_files");
AssertInfo(index_files.has_value(),
"index file paths is empty when load disk ann index data");
file_manager_->CacheIndexToDisk(index_files.value(),
config[milvus::LOAD_PRIORITY]);
// If index is loaded with stream, we don't need to cache index to disk
if (!index_.LoadIndexWithStream()) {
file_manager_->CacheIndexToDisk(index_files.value(),
config[milvus::LOAD_PRIORITY]);
}
read_file_span->End();
}

View File

@ -191,7 +191,6 @@ class DeletedRecord {
}
it++;
}
}
size_t

View File

@ -121,6 +121,13 @@ class ChunkManager {
*/
virtual std::string
GetRootPath() const = 0;
/**
* @brief Get the Bucket Name
* @return std::string
*/
virtual std::string
GetBucketName() const = 0;
};
using ChunkManagerPtr = std::shared_ptr<ChunkManager>;

View File

@ -47,6 +47,9 @@
#include "storage/Util.h"
#include "storage/FileWriter.h"
#include "storage/RemoteOutputStream.h"
#include "storage/RemoteInputStream.h"
namespace milvus::storage {
DiskFileManagerImpl::DiskFileManagerImpl(
const FileManagerContext& fileManagerContext)
@ -76,6 +79,12 @@ DiskFileManagerImpl::GetRemoteIndexPath(const std::string& file_name,
return remote_prefix + "/" + file_name + "_" + std::to_string(slice_num);
}
std::string
DiskFileManagerImpl::GetRemoteIndexPathV2(const std::string& file_name) const {
std::string remote_prefix = GetRemoteIndexFilePrefixV2();
return remote_prefix + "/" + file_name;
}
std::string
DiskFileManagerImpl::GetRemoteTextLogPath(const std::string& file_name,
int64_t slice_num) const {
@ -145,6 +154,38 @@ DiskFileManagerImpl::AddFileInternal(
return true;
} // namespace knowhere
std::shared_ptr<InputStream>
DiskFileManagerImpl::OpenInputStream(const std::string& filename) {
auto local_file_name = GetFileName(filename);
auto remote_file_path = GetRemoteIndexPathV2(local_file_name);
auto fs = milvus_storage::ArrowFileSystemSingleton::GetInstance()
.GetArrowFileSystem();
auto remote_file = fs->OpenInputFile(remote_file_path);
AssertInfo(remote_file.ok(), "failed to open remote file");
return std::static_pointer_cast<milvus::InputStream>(
std::make_shared<milvus::storage::RemoteInputStream>(
std::move(remote_file.ValueOrDie())));
}
std::shared_ptr<OutputStream>
DiskFileManagerImpl::OpenOutputStream(const std::string& filename) {
auto local_file_name = GetFileName(filename);
auto remote_file_path = GetRemoteIndexPathV2(local_file_name);
auto fs = milvus_storage::ArrowFileSystemSingleton::GetInstance()
.GetArrowFileSystem();
auto remote_stream = fs->OpenOutputStream(remote_file_path);
AssertInfo(remote_stream.ok(),
"failed to open remote stream, reason: {}",
remote_stream.status().ToString());
return std::make_shared<milvus::storage::RemoteOutputStream>(
std::move(remote_stream.ValueOrDie()));
}
bool
DiskFileManagerImpl::AddFile(const std::string& file) noexcept {
return AddFileInternal(file,
@ -153,6 +194,15 @@ DiskFileManagerImpl::AddFile(const std::string& file) noexcept {
});
}
bool
DiskFileManagerImpl::AddFileMeta(const FileMeta& file_meta) {
auto local_file_name = GetFileName(file_meta.file_path);
auto remote_file_path = GetRemoteIndexPathV2(local_file_name);
remote_paths_to_size_[remote_file_path] = file_meta.file_size;
return true;
}
bool
DiskFileManagerImpl::AddJsonKeyIndexLog(const std::string& file) noexcept {
return AddFileInternal(
@ -883,4 +933,9 @@ DiskFileManagerImpl::CacheRawDataToDisk<bfloat16>(const Config& config);
template std::string
DiskFileManagerImpl::CacheRawDataToDisk<bin1>(const Config& config);
std::string
DiskFileManagerImpl::GetRemoteIndexFilePrefixV2() const {
return FileManagerImpl::GetRemoteIndexFilePrefixV2();
}
} // namespace milvus::storage

View File

@ -43,12 +43,21 @@ class DiskFileManagerImpl : public FileManagerImpl {
bool
AddFile(const std::string& filename) noexcept override;
bool
AddFileMeta(const FileMeta& file_meta) override;
std::optional<bool>
IsExisted(const std::string& filename) noexcept override;
bool
RemoveFile(const std::string& filename) noexcept override;
std::shared_ptr<InputStream>
OpenInputStream(const std::string& filename) override;
std::shared_ptr<OutputStream>
OpenOutputStream(const std::string& filename) override;
public:
bool
AddTextLog(const std::string& filename) noexcept;
@ -164,6 +173,9 @@ class DiskFileManagerImpl : public FileManagerImpl {
std::string
GetFileName(const std::string& localfile);
std::string
GetRemoteIndexFilePrefixV2() const override;
private:
int64_t
GetIndexBuildId() {
@ -173,6 +185,14 @@ class DiskFileManagerImpl : public FileManagerImpl {
std::string
GetRemoteIndexPath(const std::string& file_name, int64_t slice_num) const;
/**
* @brief Get the Remote Index Path V2
* @param file_name; v2 will not split the file with slice_num
* @return std::string
*/
std::string
GetRemoteIndexPathV2(const std::string& file_name) const;
std::string
GetRemoteTextLogPath(const std::string& file_name, int64_t slice_num) const;

View File

@ -22,11 +22,11 @@
#include "common/Consts.h"
#include "boost/filesystem/path.hpp"
#include "knowhere/file_manager.h"
#include "log/Log.h"
#include "storage/ChunkManager.h"
#include "storage/Types.h"
#include "milvus-storage/filesystem/fs.h"
#include "filemanager/FileManager.h"
namespace milvus::storage {
@ -75,7 +75,7 @@ struct FileManagerContext {
return false;
#define FILEMANAGER_END }
class FileManagerImpl : public knowhere::FileManager {
class FileManagerImpl : public milvus::FileManager {
public:
explicit FileManagerImpl(const FieldDataMeta& field_mata,
IndexMeta index_meta)
@ -90,7 +90,7 @@ class FileManagerImpl : public knowhere::FileManager {
* @return false if any error, or return true.
*/
virtual bool
LoadFile(const std::string& filename) noexcept = 0;
LoadFile(const std::string& filename) override = 0;
/**
* @brief Add file to FileManager to manipulate it.
@ -99,7 +99,7 @@ class FileManagerImpl : public knowhere::FileManager {
* @return false if any error, or return true.
*/
virtual bool
AddFile(const std::string& filename) noexcept = 0;
AddFile(const std::string& filename) override = 0;
/**
* @brief Check if a file exists.
@ -108,7 +108,7 @@ class FileManagerImpl : public knowhere::FileManager {
* @return std::nullopt if any error, or return if the file exists.
*/
virtual std::optional<bool>
IsExisted(const std::string& filename) noexcept = 0;
IsExisted(const std::string& filename) override = 0;
/**
* @brief Delete a file from FileManager.
@ -117,7 +117,16 @@ class FileManagerImpl : public knowhere::FileManager {
* @return false if any error, or return true.
*/
virtual bool
RemoveFile(const std::string& filename) noexcept = 0;
RemoveFile(const std::string& filename) override = 0;
virtual bool
AddFileMeta(const FileMeta& file_meta) override = 0;
virtual std::shared_ptr<InputStream>
OpenInputStream(const std::string& filename) override = 0;
virtual std::shared_ptr<OutputStream>
OpenOutputStream(const std::string& filename) override = 0;
public:
virtual std::string
@ -159,6 +168,17 @@ class FileManagerImpl : public knowhere::FileManager {
std::to_string(field_meta_.segment_id);
}
virtual std::string
GetRemoteIndexFilePrefixV2() const {
boost::filesystem::path bucket = rcm_->GetBucketName();
std::string v1_prefix = GetRemoteIndexObjectPrefix();
if (bucket.empty()) {
return v1_prefix;
} else {
return (bucket / v1_prefix).string();
}
}
virtual std::string
GetRemoteTextLogPrefix() const {
boost::filesystem::path prefix = rcm_->GetRootPath();

View File

@ -133,6 +133,11 @@ class LocalChunkManager : public ChunkManager {
int64_t
GetSizeOfDir(const std::string& dir);
virtual std::string
GetBucketName() const {
return "";
}
private:
std::string path_prefix_;
};

View File

@ -84,6 +84,21 @@ MemFileManagerImpl::AddBinarySet(const BinarySet& binary_set,
return true;
}
std::shared_ptr<InputStream>
MemFileManagerImpl::OpenInputStream(const std::string& filename) {
return nullptr;
}
std::shared_ptr<OutputStream>
MemFileManagerImpl::OpenOutputStream(const std::string& filename) {
return nullptr;
}
bool
MemFileManagerImpl::AddFileMeta(const FileMeta& file_meta) {
return true;
}
bool
MemFileManagerImpl::AddFile(const BinarySet& binary_set) {
return AddBinarySet(binary_set, GetRemoteIndexObjectPrefix());

View File

@ -47,6 +47,15 @@ class MemFileManagerImpl : public FileManagerImpl {
virtual bool
RemoveFile(const std::string& filename) noexcept;
virtual bool
AddFileMeta(const FileMeta& file_meta) override;
virtual std::shared_ptr<InputStream>
OpenInputStream(const std::string& filename) override;
virtual std::shared_ptr<OutputStream>
OpenOutputStream(const std::string& filename) override;
public:
virtual std::string
GetName() const {

View File

@ -163,8 +163,8 @@ class MinioChunkManager : public ChunkManager {
return remote_root_path_;
}
inline std::string
GetBucketName() {
virtual std::string
GetBucketName() const {
return default_bucket_name_;
}

View File

@ -0,0 +1,66 @@
#include <unistd.h>
#include "RemoteInputStream.h"
#include "common/Consts.h"
#include "common/EasyAssert.h"
namespace milvus::storage {
RemoteInputStream::RemoteInputStream(
std::shared_ptr<arrow::io::RandomAccessFile>&& remote_file)
: remote_file_(std::move(remote_file)) {
auto status = remote_file_->GetSize();
AssertInfo(status.ok(), "Failed to get size of remote file");
file_size_ = static_cast<size_t>(status.ValueOrDie());
}
size_t
RemoteInputStream::Read(void* data, size_t size) {
auto status = remote_file_->Read(size, data);
AssertInfo(status.ok(), "Failed to read from input stream");
return static_cast<size_t>(status.ValueOrDie());
}
size_t
RemoteInputStream::Read(int fd, size_t size) {
size_t read_batch_size =
std::min(size, static_cast<size_t>(DEFAULT_INDEX_FILE_SLICE_SIZE));
size_t rest_size = size;
std::vector<uint8_t> data(read_batch_size);
while (rest_size > 0) {
size_t read_size = std::min(rest_size, read_batch_size);
auto status = remote_file_->Read(read_size, data.data());
AssertInfo(status.ok(), "Failed to read from input stream");
ssize_t ret = ::write(fd, data.data(), read_size);
AssertInfo(ret == static_cast<ssize_t>(read_size),
"Failed to write to file");
::fsync(fd);
rest_size -= read_size;
}
return size;
}
size_t
RemoteInputStream::Tell() const {
auto status = remote_file_->Tell();
AssertInfo(status.ok(), "Failed to tell input stream");
return static_cast<size_t>(status.ValueOrDie());
}
bool
RemoteInputStream::Eof() const {
return Tell() >= file_size_;
}
bool
RemoteInputStream::Seek(int64_t offset) {
auto status = remote_file_->Seek(offset);
return status.ok();
}
size_t
RemoteInputStream::Size() const {
return file_size_;
}
} // namespace milvus::storage

View File

@ -0,0 +1,49 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#pragma once
#include "filemanager/InputStream.h"
#include "milvus-storage/filesystem/fs.h"
namespace milvus::storage {
class RemoteInputStream : public milvus::InputStream {
public:
explicit RemoteInputStream(
std::shared_ptr<arrow::io::RandomAccessFile>&& remote_file);
~RemoteInputStream() override = default;
size_t
Size() const override;
size_t
Read(void* data, size_t size) override;
size_t
Read(int fd, size_t size) override;
size_t
Tell() const override;
bool
Eof() const override;
bool
Seek(int64_t offset) override;
private:
size_t file_size_;
std::shared_ptr<arrow::io::RandomAccessFile> remote_file_;
};
} // namespace milvus::storage

View File

@ -0,0 +1,47 @@
#include "RemoteOutputStream.h"
#include <cstddef>
#include <unistd.h>
#include "common/Consts.h"
#include "common/EasyAssert.h"
namespace milvus::storage {
RemoteOutputStream::RemoteOutputStream(
std::shared_ptr<arrow::io::OutputStream>&& output_stream)
: output_stream_(std::move(output_stream)) {
}
size_t
RemoteOutputStream::Tell() const {
auto status = output_stream_->Tell();
AssertInfo(status.ok(), "Failed to tell output stream");
return status.ValueOrDie();
}
size_t
RemoteOutputStream::Write(const void* data, size_t size) {
auto status = output_stream_->Write(data, size);
AssertInfo(status.ok(), "Failed to write to output stream");
return size;
}
size_t
RemoteOutputStream::Write(int fd, size_t size) {
size_t read_batch_size =
std::min(size, static_cast<size_t>(DEFAULT_INDEX_FILE_SLICE_SIZE));
size_t rest_size = size;
std::vector<uint8_t> data(read_batch_size);
while (rest_size > 0) {
size_t read_size = std::min(rest_size, read_batch_size);
auto read_status = ::read(fd, data.data(), read_size);
AssertInfo(read_status == static_cast<ssize_t>(read_size),
"Failed to read from file");
auto write_status = output_stream_->Write(data.data(), read_size);
AssertInfo(write_status.ok(), "Failed to write to output stream");
rest_size -= read_size;
}
return size;
}
} // namespace milvus::storage

View File

@ -0,0 +1,39 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#pragma once
#include "filemanager/OutputStream.h"
#include "milvus-storage/filesystem/fs.h"
namespace milvus::storage {
class RemoteOutputStream : public milvus::OutputStream {
public:
explicit RemoteOutputStream(
std::shared_ptr<arrow::io::OutputStream>&& output_stream);
~RemoteOutputStream() override = default;
size_t
Tell() const override;
size_t
Write(const void* data, size_t size) override;
size_t
Write(int fd, size_t size) override;
private:
std::shared_ptr<arrow::io::OutputStream> output_stream_;
};
} // namespace milvus::storage

View File

@ -24,6 +24,7 @@
#include <thread>
#include <vector>
#include <utility>
#include <cassert>
#include "SafeQueue.h"
#include "log/Log.h"

View File

@ -116,8 +116,8 @@ class AzureChunkManager : public ChunkManager {
return path_prefix_;
}
inline std::string
GetBucketName() {
virtual std::string
GetBucketName() const {
return default_bucket_name_;
}

View File

@ -108,8 +108,8 @@ class GcpNativeChunkManager : public ChunkManager {
}
public:
inline std::string
GetBucketName() {
virtual std::string
GetBucketName() const {
return default_bucket_name_;
}

View File

@ -79,6 +79,11 @@ class OpenDALChunkManager : public ChunkManager {
return remote_root_path_;
}
virtual std::string
GetBucketName() const {
return default_bucket_name_;
}
private:
std::string default_bucket_name_;
std::string remote_root_path_;

View File

@ -29,6 +29,7 @@ set(FETCHCONTENT_QUIET OFF)
# Find pthreads
set(THREADS_PREFER_PTHREAD_FLAG ON)
find_package(Threads REQUIRED)
add_subdirectory(milvus-common)
add_subdirectory(knowhere)
@ -46,4 +47,4 @@ if (LINUX)
endif()
add_subdirectory(milvus-storage)
add_subdirectory(milvus-common)

View File

@ -14,7 +14,7 @@
# Update KNOWHERE_VERSION for the first occurrence
milvus_add_pkg_config("knowhere")
set_property(DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR} PROPERTY INCLUDE_DIRECTORIES "")
set( KNOWHERE_VERSION v2.6.0 )
set( KNOWHERE_VERSION v2.6.1-rc )
set( GIT_REPOSITORY "https://github.com/zilliztech/knowhere.git")
message(STATUS "Knowhere repo: ${GIT_REPOSITORY}")

View File

@ -13,8 +13,8 @@
milvus_add_pkg_config("milvus-common")
set_property(DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR} PROPERTY INCLUDE_DIRECTORIES "")
set( MILVUS-COMMON-VERSION be8f319 )
set( GIT_REPOSITORY "https://github.com/zilliztech/milvus-common.git")
set( MILVUS-COMMON-VERSION 41fa9b1 )
set( GIT_REPOSITORY "https://github.com/zilliztech/milvus-common.git")
message(STATUS "milvus-common repo: ${GIT_REPOSITORY}")
message(STATUS "milvus-common version: ${MILVUS-COMMON-VERSION}")

View File

@ -118,6 +118,109 @@ TEST_F(DiskAnnFileManagerTest, AddFilePositiveParallel) {
}
}
TEST_F(DiskAnnFileManagerTest, ReadAndWriteWithStream) {
auto conf = milvus_storage::ArrowFileSystemConfig();
conf.storage_type = "local";
conf.root_path = "/tmp";
milvus_storage::ArrowFileSystemSingleton::GetInstance().Init(conf);
auto lcm = LocalChunkManagerSingleton::GetInstance().GetChunkManager();
std::string small_index_file_path =
"/tmp/diskann/index_files/1000/small_index_file";
std::string large_index_file_path =
"/tmp/diskann/index_files/1000/large_index_file";
auto exist = lcm->Exist(large_index_file_path);
std::string index_file_path = "/tmp/diskann/index_files/1000/index_file";
boost::filesystem::path localPath(index_file_path);
auto local_file_name = localPath.filename().string();
EXPECT_EQ(exist, false);
uint64_t large_index_size = 50 << 20;
lcm->CreateFile(large_index_file_path);
std::vector<uint8_t> large_data(large_index_size);
for (size_t i = 0; i < large_index_size; i++) {
large_data[i] = i % 255;
}
lcm->Write(large_index_file_path, large_data.data(), large_index_size);
uint64_t small_index_size = 10 << 20;
lcm->CreateFile(small_index_file_path);
std::vector<uint8_t> small_data(small_index_size);
for (size_t i = 0; i < small_index_size; i++) {
small_data[i] = i % 255;
}
lcm->Write(small_index_file_path, small_data.data(), small_index_size);
// collection_id: 1, partition_id: 2, segment_id: 3
// field_id: 100, index_build_id: 1000, index_version: 1
FieldDataMeta filed_data_meta = {1, 2, 3, 100};
IndexMeta index_meta = {3, 100, 1000, 1, "index"};
auto diskAnnFileManager = std::make_shared<DiskFileManagerImpl>(
storage::FileManagerContext(filed_data_meta, index_meta, cm_));
auto os = diskAnnFileManager->OpenOutputStream(index_file_path);
size_t write_offset = 0;
os->Write(large_index_size);
write_offset += sizeof(large_index_size);
EXPECT_EQ(os->Tell(), write_offset);
os->Write(large_data.data(), large_index_size);
write_offset += large_index_size;
EXPECT_EQ(os->Tell(), write_offset);
os->Write(small_index_size);
write_offset += sizeof(small_index_size);
EXPECT_EQ(os->Tell(), write_offset);
int fd = open(small_index_file_path.c_str(), O_RDONLY);
ASSERT_NE(fd, -1);
os->Write(fd, small_index_size);
write_offset += small_index_size;
close(fd);
EXPECT_EQ(os->Tell(), write_offset);
auto is = diskAnnFileManager->OpenInputStream(index_file_path);
size_t read_offset = 0;
size_t read_large_index_size;
is->Read(read_large_index_size);
read_offset += sizeof(read_large_index_size);
EXPECT_EQ(read_large_index_size, large_index_size);
EXPECT_EQ(is->Tell(), read_offset);
std::vector<uint8_t> read_large_data(read_large_index_size);
is->Read(read_large_data.data(), read_large_index_size);
EXPECT_EQ(
memcmp(
read_large_data.data(), large_data.data(), read_large_index_size),
0);
read_offset += read_large_index_size;
EXPECT_EQ(is->Tell(), read_offset);
size_t read_small_index_size;
is->Read(read_small_index_size);
read_offset += sizeof(read_small_index_size);
EXPECT_EQ(read_small_index_size, small_index_size);
EXPECT_EQ(is->Tell(), read_offset);
std::string small_index_file_path_read =
"/tmp/diskann/index_files/1000/small_index_file_read";
lcm->CreateFile(small_index_file_path_read);
int fd_read = open(small_index_file_path_read.c_str(), O_WRONLY);
ASSERT_NE(fd_read, -1);
is->Read(fd_read, small_index_size);
close(fd_read);
std::vector<uint8_t> read_small_data(read_small_index_size);
lcm->Read(small_index_file_path_read,
read_small_data.data(),
read_small_index_size);
EXPECT_EQ(
memcmp(
read_small_data.data(), small_data.data(), read_small_index_size),
0);
read_offset += read_small_index_size;
EXPECT_EQ(is->Tell(), read_offset);
lcm->Remove(small_index_file_path_read);
lcm->Remove(large_index_file_path);
lcm->Remove(small_index_file_path);
}
int
test_worker(string s) {
std::cout << s << std::endl;

View File

@ -742,7 +742,8 @@ TEST_P(TaskTest, Test_MultiNotEqualConvert) {
EXPECT_EQ(inputs.size(), 1);
EXPECT_STREQ(inputs[0]->name().c_str(), "PhyLogicalUnaryExpr");
EXPECT_EQ(inputs[0]->GetInputsRef().size(), 1);
EXPECT_STREQ(inputs[0]->GetInputsRef()[0]->name().c_str(), "PhyTermFilterExpr");
EXPECT_STREQ(inputs[0]->GetInputsRef()[0]->name().c_str(),
"PhyTermFilterExpr");
}
{
@ -852,12 +853,15 @@ TEST_P(TaskTest, Test_MultiNotEqualConvert) {
EXPECT_EQ(inputs.size(), 2);
EXPECT_STREQ(inputs[0]->name().c_str(), "PhyConjunctFilterExpr");
EXPECT_STREQ(inputs[1]->name().c_str(), "PhyLogicalUnaryExpr");
auto phy_expr1 = std::static_pointer_cast<milvus::exec::PhyLogicalUnaryExpr>(
inputs[1]);
auto phy_expr1 =
std::static_pointer_cast<milvus::exec::PhyLogicalUnaryExpr>(
inputs[1]);
EXPECT_EQ(phy_expr1->GetInputsRef().size(), 1);
EXPECT_STREQ(phy_expr1->GetInputsRef()[0]->name().c_str(), "PhyTermFilterExpr");
phy_expr = std::static_pointer_cast<milvus::exec::PhyConjunctFilterExpr>(
inputs[0]);
EXPECT_STREQ(phy_expr1->GetInputsRef()[0]->name().c_str(),
"PhyTermFilterExpr");
phy_expr =
std::static_pointer_cast<milvus::exec::PhyConjunctFilterExpr>(
inputs[0]);
inputs = phy_expr->GetInputsRef();
EXPECT_EQ(inputs.size(), 2);
}
@ -908,12 +912,13 @@ TEST_P(TaskTest, Test_MultiNotEqualConvert) {
auto inputs = phy_expr->GetInputsRef();
EXPECT_EQ(inputs.size(), 1);
EXPECT_STREQ(inputs[0]->name().c_str(), "PhyLogicalUnaryExpr");
auto phy_expr1 = std::static_pointer_cast<milvus::exec::PhyLogicalUnaryExpr>(
inputs[0]);
auto phy_expr1 =
std::static_pointer_cast<milvus::exec::PhyLogicalUnaryExpr>(
inputs[0]);
EXPECT_EQ(phy_expr1->GetInputsRef().size(), 1);
EXPECT_STREQ(phy_expr1->GetInputsRef()[0]->name().c_str(), "PhyTermFilterExpr");
EXPECT_STREQ(phy_expr1->GetInputsRef()[0]->name().c_str(),
"PhyTermFilterExpr");
}
}
TEST_P(TaskTest, Test_MultiInConvert) {