milvus/internal/core/src/segcore/TimestampIndex.cpp
FluorineDog 255e3959af
support time travel (#5894)
* support time travel

Signed-off-by: fluorinedog <fluorinedog@gmail.com>

* lint

Signed-off-by: fluorinedog <fluorinedog@gmail.com>
2021-06-19 17:38:11 +08:00

105 lines
3.9 KiB
C++

// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#include <segcore/TimestampIndex.h>
namespace milvus::segcore {
void
TimestampIndex::set_length_meta(std::vector<int64_t> lengths) {
    // Store the per-slice row counts that build_with() reads later.
    // The argument is taken by value, so swapping hands its buffer to
    // lengths_ without copying; the previous contents are released when
    // the parameter goes out of scope.
    lengths_.swap(lengths);
}
void
TimestampIndex::build_with(const Timestamp* timestamps, int64_t size) {
auto num_slice = lengths_.size();
Assert(num_slice > 0);
std::vector<int64_t> prefix_sums;
int offset = 0;
prefix_sums.push_back(offset);
std::vector<Timestamp> timestamp_barriers;
Timestamp last_max_v = 0;
for (int slice_id = 0; slice_id < num_slice; ++slice_id) {
auto length = lengths_[slice_id];
auto [min_v, max_v] = std::minmax_element(timestamps + offset, timestamps + offset + length);
Assert(last_max_v <= *min_v);
offset += length;
prefix_sums.push_back(offset);
timestamp_barriers.push_back(*min_v);
last_max_v = *max_v;
}
timestamp_barriers.push_back(last_max_v);
Assert(std::is_sorted(timestamp_barriers.begin(), timestamp_barriers.end()));
Assert(offset == size);
auto min_ts = timestamp_barriers[0];
this->size_ = size;
this->start_locs_ = std::move(prefix_sums);
this->min_timestamp_ = min_ts;
this->max_timestamp_ = last_max_v;
this->timestamp_barriers_ = std::move(timestamp_barriers);
}
// Return the half-open row range [beg, end) whose rows may straddle
// `query_timestamp`, i.e. the one slice that build_with() could not classify
// wholesale.
//   - {size_, size_}: every timestamp is <= query_timestamp.
//   - {0, 0}: every timestamp is > query_timestamp.
//   - otherwise: the slice whose barrier (minimum timestamp) is the last one
//     <= query_timestamp.
std::pair<int64_t, int64_t>
TimestampIndex::get_active_range(Timestamp query_timestamp) const {
    if (query_timestamp >= max_timestamp_) {
        // most common case
        return {size_, size_};
    }
    if (query_timestamp < min_timestamp_) {
        return {0, 0};
    }
    // Here min_timestamp_ <= query_timestamp < max_timestamp_. Since
    // min_timestamp_ is the first barrier and max_timestamp_ the last,
    // upper_bound lands strictly after the first barrier and no later than
    // the last one.
    auto iter = std::upper_bound(timestamp_barriers_.begin(), timestamp_barriers_.end(), query_timestamp);
    int64_t block_id = static_cast<int64_t>(iter - timestamp_barriers_.begin()) - 1;
    // Bound-check against start_locs_, which is the container indexed below.
    Assert(0 <= block_id && block_id + 1 < static_cast<int64_t>(start_locs_.size()));
    return {start_locs_[block_id], start_locs_[block_id + 1]};
}
// Build a `size`-bit mask for `query_timestamp` given an active range
// [beg, end), presumably obtained from get_active_range():
//   - bits [0, beg) are set,
//   - bits [beg, end) are set iff timestamps[i] <= query_timestamp,
//   - bits [end, size) are cleared.
// `timestamps` must be readable for indices [beg, end).
boost::dynamic_bitset<>
TimestampIndex::GenerateBitset(Timestamp query_timestamp,
                               std::pair<int64_t, int64_t> active_range,
                               const Timestamp* timestamps,
                               int64_t size) {
    auto [beg, end] = active_range;
    Assert(beg < end);
    // The range must fit inside the mask; otherwise the loop below would write
    // past the bitset and read past `timestamps` (dynamic_bitset's
    // operator[] only checks bounds in debug builds).
    Assert(0 <= beg && end <= size);
    boost::dynamic_bitset<> bitset;
    bitset.reserve(size);
    bitset.resize(beg, true);
    bitset.resize(size, false);
    for (int64_t i = beg; i < end; ++i) {
        bitset[i] = timestamps[i] <= query_timestamp;
    }
    return bitset;
}
// Split `timestamps[0, size)` into consecutive slices suitable for
// TimestampIndex::set_length_meta() and return their lengths.
//
// A new slice may start at row i only when every timestamp before i is <=
// every timestamp from i onward, so slices never overlap in time. A cut is
// also made only once the current slice holds at least `min_slice_length`
// rows. The final slice is always emitted and may be shorter than that.
// For size == 0 the result is {0}: exactly one (empty) slice.
std::vector<int64_t>
GenerateFakeSlices(const Timestamp* timestamps, int64_t size, int min_slice_length) {
    assert(min_slice_length >= 1);

    // suffix_min[i] = min(timestamps[i..size)). Stored as Timestamp, not as
    // int64_t, so large unsigned timestamps are never narrowed.
    std::vector<Timestamp> suffix_min(size);
    Timestamp running_min = std::numeric_limits<Timestamp>::max();
    for (int64_t i = size - 1; i >= 0; --i) {
        running_min = std::min(running_min, timestamps[i]);
        suffix_min[i] = running_min;
    }

    std::vector<int64_t> results;
    // Maximum timestamp among rows already assigned to slices.
    Timestamp prefix_max = std::numeric_limits<Timestamp>::min();
    // 64-bit, like the emitted lengths; an int counter could overflow.
    int64_t slice_length = 0;
    for (int64_t i = 0; i < size; ++i) {
        if (prefix_max <= suffix_min[i] && slice_length >= min_slice_length) {
            // emit new slice
            results.push_back(slice_length);
            slice_length = 0;
        }
        prefix_max = std::max(prefix_max, timestamps[i]);
        ++slice_length;
    }
    results.push_back(slice_length);
    return results;
}
} // namespace milvus::segcore