mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
Support range executor
Signed-off-by: FluorineDog <guilin.gou@zilliz.com>
This commit is contained in:
parent
73e3ff09f5
commit
9f35dee2df
@ -433,7 +433,6 @@ SegmentSmallIndex::BuildIndex(IndexMetaPtr remote_index_meta) {
|
||||
#endif
|
||||
}
|
||||
|
||||
|
||||
int64_t
|
||||
SegmentSmallIndex::GetMemoryUsageInBytes() {
|
||||
int64_t total_bytes = 0;
|
||||
|
||||
@ -52,7 +52,7 @@ class SegmentSmallIndex : public SegmentBase {
|
||||
// TODO: originally, id should be put into data_chunk
|
||||
// TODO: Is it ok to put them the other side?
|
||||
Status
|
||||
Insert(int64_t reserverd_offset,
|
||||
Insert(int64_t reserved_offset,
|
||||
int64_t size,
|
||||
const int64_t* row_ids,
|
||||
const Timestamp* timestamps,
|
||||
@ -85,7 +85,7 @@ class SegmentSmallIndex : public SegmentBase {
|
||||
// using IndexMode = knowhere::IndexMode;
|
||||
// using IndexConfig = knowhere::Config;
|
||||
// BuildIndex With Paramaters, must with Frozen State
|
||||
// NOTE: index_params contains serveral policies for several index
|
||||
// NOTE: index_params contains several policies for several index
|
||||
// TODO: currently, index has to be set at startup, and can't be modified
|
||||
// AddIndex and DropIndex will be added later
|
||||
Status
|
||||
|
||||
@ -16,4 +16,4 @@ upper_div(int64_t value, int64_t align) {
|
||||
return groups;
|
||||
}
|
||||
|
||||
}
|
||||
} // namespace milvus
|
||||
|
||||
@ -177,7 +177,7 @@ TEST(Expr, TestRange) {
|
||||
|
||||
auto seg_promote = dynamic_cast<SegmentSmallIndex*>(seg.get());
|
||||
ExecExprVisitor visitor(*seg_promote);
|
||||
for (auto [clause, ref_func]: testcases) {
|
||||
for (auto [clause, ref_func] : testcases) {
|
||||
auto loc = dsl_string_tmp.find("@@@@");
|
||||
auto dsl_string = dsl_string_tmp;
|
||||
dsl_string.replace(loc, 4, clause);
|
||||
@ -192,8 +192,7 @@ TEST(Expr, TestRange) {
|
||||
|
||||
auto val = age_col[i];
|
||||
auto ref = !ref_func(val);
|
||||
ASSERT_EQ(ans, ref) << clause << "@" << i
|
||||
<< "!!" << val;
|
||||
ASSERT_EQ(ans, ref) << clause << "@" << i << "!!" << val;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -187,7 +187,7 @@ func TestMaster_CollectionTask(t *testing.T) {
|
||||
assert.NotEqual(t, commonpb.ErrorCode_SUCCESS, listResp.Status.ErrorCode)
|
||||
|
||||
// CreateCollection Test
|
||||
collMeta, err := svr.metaTable.GetCollectionByName(sch.Name)
|
||||
collMeta, err := svr.mt.GetCollectionByName(sch.Name)
|
||||
assert.Nil(t, err)
|
||||
t.Logf("collection id = %d", collMeta.ID)
|
||||
assert.Equal(t, collMeta.CreateTime, uint64(11))
|
||||
@ -298,7 +298,7 @@ func TestMaster_CollectionTask(t *testing.T) {
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, st.ErrorCode, commonpb.ErrorCode_SUCCESS)
|
||||
|
||||
collMeta, err = svr.metaTable.GetCollectionByName(sch.Name)
|
||||
collMeta, err = svr.mt.GetCollectionByName(sch.Name)
|
||||
assert.NotNil(t, err)
|
||||
|
||||
// HasCollection "col1" is false
|
||||
|
||||
@ -16,7 +16,7 @@ func (s *Master) CreateCollection(ctx context.Context, in *internalpb.CreateColl
|
||||
req: in,
|
||||
baseTask: baseTask{
|
||||
sch: s.scheduler,
|
||||
mt: s.metaTable,
|
||||
mt: s.mt,
|
||||
cv: make(chan error),
|
||||
},
|
||||
}
|
||||
@ -46,7 +46,7 @@ func (s *Master) DropCollection(ctx context.Context, in *internalpb.DropCollecti
|
||||
req: in,
|
||||
baseTask: baseTask{
|
||||
sch: s.scheduler,
|
||||
mt: s.metaTable,
|
||||
mt: s.mt,
|
||||
cv: make(chan error),
|
||||
},
|
||||
}
|
||||
@ -76,7 +76,7 @@ func (s *Master) HasCollection(ctx context.Context, in *internalpb.HasCollection
|
||||
req: in,
|
||||
baseTask: baseTask{
|
||||
sch: s.scheduler,
|
||||
mt: s.metaTable,
|
||||
mt: s.mt,
|
||||
cv: make(chan error),
|
||||
},
|
||||
hasCollection: false,
|
||||
@ -113,7 +113,7 @@ func (s *Master) DescribeCollection(ctx context.Context, in *internalpb.Describe
|
||||
req: in,
|
||||
baseTask: baseTask{
|
||||
sch: s.scheduler,
|
||||
mt: s.metaTable,
|
||||
mt: s.mt,
|
||||
cv: make(chan error),
|
||||
},
|
||||
description: nil,
|
||||
@ -149,7 +149,7 @@ func (s *Master) ShowCollections(ctx context.Context, in *internalpb.ShowCollect
|
||||
req: in,
|
||||
baseTask: baseTask{
|
||||
sch: s.scheduler,
|
||||
mt: s.metaTable,
|
||||
mt: s.mt,
|
||||
cv: make(chan error),
|
||||
},
|
||||
stringListResponse: nil,
|
||||
@ -187,7 +187,7 @@ func (s *Master) CreatePartition(ctx context.Context, in *internalpb.CreateParti
|
||||
req: in,
|
||||
baseTask: baseTask{
|
||||
sch: s.scheduler,
|
||||
mt: s.metaTable,
|
||||
mt: s.mt,
|
||||
cv: make(chan error),
|
||||
},
|
||||
}
|
||||
@ -218,7 +218,7 @@ func (s *Master) DropPartition(ctx context.Context, in *internalpb.DropPartition
|
||||
req: in,
|
||||
baseTask: baseTask{
|
||||
sch: s.scheduler,
|
||||
mt: s.metaTable,
|
||||
mt: s.mt,
|
||||
cv: make(chan error),
|
||||
},
|
||||
}
|
||||
@ -249,7 +249,7 @@ func (s *Master) HasPartition(ctx context.Context, in *internalpb.HasPartitionRe
|
||||
req: in,
|
||||
baseTask: baseTask{
|
||||
sch: s.scheduler,
|
||||
mt: s.metaTable,
|
||||
mt: s.mt,
|
||||
cv: make(chan error),
|
||||
},
|
||||
hasPartition: false,
|
||||
@ -290,7 +290,7 @@ func (s *Master) DescribePartition(ctx context.Context, in *internalpb.DescribeP
|
||||
req: in,
|
||||
baseTask: baseTask{
|
||||
sch: s.scheduler,
|
||||
mt: s.metaTable,
|
||||
mt: s.mt,
|
||||
cv: make(chan error),
|
||||
},
|
||||
description: nil,
|
||||
@ -328,7 +328,7 @@ func (s *Master) ShowPartitions(ctx context.Context, in *internalpb.ShowPartitio
|
||||
req: in,
|
||||
baseTask: baseTask{
|
||||
sch: s.scheduler,
|
||||
mt: s.metaTable,
|
||||
mt: s.mt,
|
||||
cv: make(chan error),
|
||||
},
|
||||
stringListResponse: nil,
|
||||
|
||||
@ -131,7 +131,7 @@ func TestMaster_CreateCollection(t *testing.T) {
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, st.ErrorCode, commonpb.ErrorCode_SUCCESS)
|
||||
|
||||
collMeta, err := svr.metaTable.GetCollectionByName(sch.Name)
|
||||
collMeta, err := svr.mt.GetCollectionByName(sch.Name)
|
||||
assert.Nil(t, err)
|
||||
t.Logf("collection id = %d", collMeta.ID)
|
||||
assert.Equal(t, collMeta.CreateTime, uint64(11))
|
||||
|
||||
@ -13,6 +13,7 @@ import (
|
||||
"github.com/zilliztech/milvus-distributed/internal/errors"
|
||||
"github.com/zilliztech/milvus-distributed/internal/kv"
|
||||
ms "github.com/zilliztech/milvus-distributed/internal/msgstream"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
|
||||
"github.com/zilliztech/milvus-distributed/internal/proto/masterpb"
|
||||
"github.com/zilliztech/milvus-distributed/internal/util/tsoutil"
|
||||
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
|
||||
@ -71,22 +72,26 @@ type Master struct {
|
||||
|
||||
//grpc server
|
||||
grpcServer *grpc.Server
|
||||
grpcErr chan error
|
||||
|
||||
kvBase *kv.EtcdKV
|
||||
scheduler *ddRequestScheduler
|
||||
metaTable *metaTable
|
||||
timesSyncMsgProducer *timeSyncMsgProducer
|
||||
// chans
|
||||
ssChan chan internalpb.SegmentStats
|
||||
|
||||
grpcErr chan error
|
||||
|
||||
kvBase *kv.EtcdKV
|
||||
scheduler *ddRequestScheduler
|
||||
mt *metaTable
|
||||
tsmp *timeSyncMsgProducer
|
||||
|
||||
// tso ticker
|
||||
tsoTicker *time.Ticker
|
||||
tsTicker *time.Ticker
|
||||
|
||||
// Add callback functions at different stages
|
||||
startCallbacks []func()
|
||||
closeCallbacks []func()
|
||||
|
||||
segmentMgr *SegmentManager
|
||||
segmentStatusMsg ms.MsgStream
|
||||
segmentMgr *SegmentManager
|
||||
statsMs ms.MsgStream
|
||||
|
||||
//id allocator
|
||||
idAllocator *GlobalIDAllocator
|
||||
@ -123,7 +128,7 @@ func CreateServer(ctx context.Context, opt *Option) (*Master, error) {
|
||||
}
|
||||
|
||||
//timeSyncMsgProducer
|
||||
tsMsgProducer, err := NewTimeSyncMsgProducer(ctx)
|
||||
tsmp, err := NewTimeSyncMsgProducer(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -133,7 +138,7 @@ func CreateServer(ctx context.Context, opt *Option) (*Master, error) {
|
||||
pulsarProxyStream.Start()
|
||||
var proxyStream ms.MsgStream = pulsarProxyStream
|
||||
proxyTimeTickBarrier := newSoftTimeTickBarrier(ctx, &proxyStream, opt.ProxyIDs, opt.SoftTTBInterval)
|
||||
tsMsgProducer.SetProxyTtBarrier(proxyTimeTickBarrier)
|
||||
tsmp.SetProxyTtBarrier(proxyTimeTickBarrier)
|
||||
|
||||
pulsarWriteStream := ms.NewPulsarMsgStream(ctx, 1024) //output stream
|
||||
pulsarWriteStream.SetPulsarClient(opt.PulsarAddr)
|
||||
@ -141,17 +146,17 @@ func CreateServer(ctx context.Context, opt *Option) (*Master, error) {
|
||||
pulsarWriteStream.Start()
|
||||
var writeStream ms.MsgStream = pulsarWriteStream
|
||||
writeTimeTickBarrier := newHardTimeTickBarrier(ctx, &writeStream, opt.WriteIDs)
|
||||
tsMsgProducer.SetWriteNodeTtBarrier(writeTimeTickBarrier)
|
||||
tsmp.SetWriteNodeTtBarrier(writeTimeTickBarrier)
|
||||
|
||||
pulsarDMStream := ms.NewPulsarMsgStream(ctx, 1024) //input stream
|
||||
pulsarDMStream.SetPulsarClient(opt.PulsarAddr)
|
||||
pulsarDMStream.CreatePulsarProducers(opt.PulsarDMChannels)
|
||||
tsMsgProducer.SetDMSyncStream(pulsarDMStream)
|
||||
tsmp.SetDMSyncStream(pulsarDMStream)
|
||||
|
||||
pulsarK2SStream := ms.NewPulsarMsgStream(ctx, 1024) //input stream
|
||||
pulsarK2SStream.SetPulsarClient(opt.PulsarAddr)
|
||||
pulsarK2SStream.CreatePulsarProducers(opt.PulsarK2SChannels)
|
||||
tsMsgProducer.SetK2sSyncStream(pulsarK2SStream)
|
||||
tsmp.SetK2sSyncStream(pulsarK2SStream)
|
||||
|
||||
// stats msg stream
|
||||
statsMs := ms.NewPulsarMsgStream(ctx, 1024)
|
||||
@ -160,13 +165,14 @@ func CreateServer(ctx context.Context, opt *Option) (*Master, error) {
|
||||
statsMs.Start()
|
||||
|
||||
m := &Master{
|
||||
ctx: ctx,
|
||||
startTimestamp: time.Now().Unix(),
|
||||
kvBase: newKVBase(opt.KVRootPath, opt.EtcdAddr),
|
||||
metaTable: metakv,
|
||||
timesSyncMsgProducer: tsMsgProducer,
|
||||
grpcErr: make(chan error),
|
||||
segmentStatusMsg: statsMs,
|
||||
ctx: ctx,
|
||||
startTimestamp: time.Now().Unix(),
|
||||
kvBase: newKVBase(opt.KVRootPath, opt.EtcdAddr),
|
||||
mt: metakv,
|
||||
tsmp: tsmp,
|
||||
ssChan: make(chan internalpb.SegmentStats, 10),
|
||||
grpcErr: make(chan error),
|
||||
statsMs: statsMs,
|
||||
}
|
||||
|
||||
//init idAllocator
|
||||
@ -264,7 +270,7 @@ func (s *Master) startServerLoop(ctx context.Context, grpcPort int64) error {
|
||||
//go s.Se
|
||||
|
||||
s.serverLoopWg.Add(1)
|
||||
if err := s.timesSyncMsgProducer.Start(); err != nil {
|
||||
if err := s.tsmp.Start(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -288,7 +294,7 @@ func (s *Master) startServerLoop(ctx context.Context, grpcPort int64) error {
|
||||
}
|
||||
|
||||
func (s *Master) stopServerLoop() {
|
||||
s.timesSyncMsgProducer.Close()
|
||||
s.tsmp.Close()
|
||||
s.serverLoopWg.Done()
|
||||
|
||||
if s.grpcServer != nil {
|
||||
@ -334,13 +340,13 @@ func (s *Master) grpcLoop(grpcPort int64) {
|
||||
|
||||
func (s *Master) tsLoop() {
|
||||
defer s.serverLoopWg.Done()
|
||||
s.tsoTicker = time.NewTicker(UpdateTimestampStep)
|
||||
defer s.tsoTicker.Stop()
|
||||
s.tsTicker = time.NewTicker(UpdateTimestampStep)
|
||||
defer s.tsTicker.Stop()
|
||||
ctx, cancel := context.WithCancel(s.serverLoopCtx)
|
||||
defer cancel()
|
||||
for {
|
||||
select {
|
||||
case <-s.tsoTicker.C:
|
||||
case <-s.tsTicker.C:
|
||||
if err := s.tsoAllocator.UpdateTSO(); err != nil {
|
||||
log.Println("failed to update timestamp", err)
|
||||
return
|
||||
@ -386,13 +392,13 @@ func (s *Master) tasksExecutionLoop() {
|
||||
|
||||
func (s *Master) segmentStatisticsLoop() {
|
||||
defer s.serverLoopWg.Done()
|
||||
defer s.segmentStatusMsg.Close()
|
||||
defer s.statsMs.Close()
|
||||
ctx, cancel := context.WithCancel(s.serverLoopCtx)
|
||||
defer cancel()
|
||||
|
||||
for {
|
||||
select {
|
||||
case msg := <-s.segmentStatusMsg.Chan():
|
||||
case msg := <-s.statsMs.Chan():
|
||||
err := s.segmentMgr.HandleQueryNodeMsgPack(msg)
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
|
||||
@ -105,7 +105,7 @@ func (mt *metaTable) reloadFromKV() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// metaTable.ddLock.Lock() before call this function
|
||||
// mt.ddLock.Lock() before call this function
|
||||
func (mt *metaTable) saveCollectionMeta(coll *pb.CollectionMeta) error {
|
||||
collBytes, err := proto.Marshal(coll)
|
||||
if err != nil {
|
||||
@ -116,7 +116,7 @@ func (mt *metaTable) saveCollectionMeta(coll *pb.CollectionMeta) error {
|
||||
return mt.client.Save("/collection/"+strconv.FormatInt(coll.ID, 10), string(collBytes))
|
||||
}
|
||||
|
||||
// metaTable.ddLock.Lock() before call this function
|
||||
// mt.ddLock.Lock() before call this function
|
||||
func (mt *metaTable) saveSegmentMeta(seg *pb.SegmentMeta) error {
|
||||
segBytes, err := proto.Marshal(seg)
|
||||
if err != nil {
|
||||
@ -128,7 +128,7 @@ func (mt *metaTable) saveSegmentMeta(seg *pb.SegmentMeta) error {
|
||||
return mt.client.Save("/segment/"+strconv.FormatInt(seg.SegmentID, 10), string(segBytes))
|
||||
}
|
||||
|
||||
// metaTable.ddLock.Lock() before call this function
|
||||
// mt.ddLock.Lock() before call this function
|
||||
func (mt *metaTable) saveCollectionAndDeleteSegmentsMeta(coll *pb.CollectionMeta, segIDs []UniqueID) error {
|
||||
segIDStrs := make([]string, 0, len(segIDs))
|
||||
for _, segID := range segIDs {
|
||||
@ -156,7 +156,7 @@ func (mt *metaTable) saveCollectionAndDeleteSegmentsMeta(coll *pb.CollectionMeta
|
||||
return mt.client.MultiSaveAndRemove(kvs, segIDStrs)
|
||||
}
|
||||
|
||||
// metaTable.ddLock.Lock() before call this function
|
||||
// mt.ddLock.Lock() before call this function
|
||||
func (mt *metaTable) saveCollectionsAndSegmentsMeta(coll *pb.CollectionMeta, seg *pb.SegmentMeta) error {
|
||||
kvs := make(map[string]string)
|
||||
collBytes, err := proto.Marshal(coll)
|
||||
@ -179,7 +179,7 @@ func (mt *metaTable) saveCollectionsAndSegmentsMeta(coll *pb.CollectionMeta, seg
|
||||
return mt.client.MultiSave(kvs)
|
||||
}
|
||||
|
||||
// metaTable.ddLock.Lock() before call this function
|
||||
// mt.ddLock.Lock() before call this function
|
||||
func (mt *metaTable) deleteCollectionsAndSegmentsMeta(collID UniqueID, segIDs []UniqueID) error {
|
||||
collIDStr := "/collection/" + strconv.FormatInt(collID, 10)
|
||||
|
||||
|
||||
@ -168,7 +168,7 @@ func TestMaster_Partition(t *testing.T) {
|
||||
assert.NotNil(t, st)
|
||||
assert.Equal(t, commonpb.ErrorCode_SUCCESS, st.ErrorCode)
|
||||
|
||||
collMeta, err := svr.metaTable.GetCollectionByName(sch.Name)
|
||||
collMeta, err := svr.mt.GetCollectionByName(sch.Name)
|
||||
assert.Nil(t, err)
|
||||
t.Logf("collection id = %d", collMeta.ID)
|
||||
assert.Equal(t, collMeta.CreateTime, uint64(1))
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user