From 9f35dee2dfcf642d958a12d4b8912904b1cbf79e Mon Sep 17 00:00:00 2001 From: FluorineDog Date: Sat, 21 Nov 2020 15:56:13 +0800 Subject: [PATCH] Support range executor Signed-off-by: FluorineDog --- .../core/src/segcore/SegmentSmallIndex.cpp | 1 - internal/core/src/segcore/SegmentSmallIndex.h | 4 +- internal/core/src/utils/tools.h | 2 +- internal/core/unittest/test_expr.cpp | 5 +- internal/master/collection_task_test.go | 4 +- internal/master/grpc_service.go | 20 +++---- internal/master/grpc_service_test.go | 2 +- internal/master/master.go | 60 ++++++++++--------- internal/master/meta_table.go | 10 ++-- internal/master/partition_task_test.go | 2 +- 10 files changed, 57 insertions(+), 53 deletions(-) diff --git a/internal/core/src/segcore/SegmentSmallIndex.cpp b/internal/core/src/segcore/SegmentSmallIndex.cpp index 5508e3c4d0..47248e75c9 100644 --- a/internal/core/src/segcore/SegmentSmallIndex.cpp +++ b/internal/core/src/segcore/SegmentSmallIndex.cpp @@ -433,7 +433,6 @@ SegmentSmallIndex::BuildIndex(IndexMetaPtr remote_index_meta) { #endif } - int64_t SegmentSmallIndex::GetMemoryUsageInBytes() { int64_t total_bytes = 0; diff --git a/internal/core/src/segcore/SegmentSmallIndex.h b/internal/core/src/segcore/SegmentSmallIndex.h index 27df2315b1..9bb3030e89 100644 --- a/internal/core/src/segcore/SegmentSmallIndex.h +++ b/internal/core/src/segcore/SegmentSmallIndex.h @@ -52,7 +52,7 @@ class SegmentSmallIndex : public SegmentBase { // TODO: originally, id should be put into data_chunk // TODO: Is it ok to put them the other side? Status - Insert(int64_t reserverd_offset, + Insert(int64_t reserved_offset, int64_t size, const int64_t* row_ids, const Timestamp* timestamps, @@ -85,7 +85,7 @@ class SegmentSmallIndex : public SegmentBase { // using IndexMode = knowhere::IndexMode; // using IndexConfig = knowhere::Config; // BuildIndex With Paramaters, must with Frozen State - // NOTE: index_params contains serveral policies for several index + // NOTE: index_params contains several policies for several index // TODO: currently, index has to be set at startup, and can't be modified // AddIndex and DropIndex will be added later Status diff --git a/internal/core/src/utils/tools.h b/internal/core/src/utils/tools.h index f309387310..2778a6dabf 100644 --- a/internal/core/src/utils/tools.h +++ b/internal/core/src/utils/tools.h @@ -16,4 +16,4 @@ upper_div(int64_t value, int64_t align) { return groups; } -} +} // namespace milvus diff --git a/internal/core/unittest/test_expr.cpp b/internal/core/unittest/test_expr.cpp index 0c677ec87e..8763cf4c17 100644 --- a/internal/core/unittest/test_expr.cpp +++ b/internal/core/unittest/test_expr.cpp @@ -177,7 +177,7 @@ TEST(Expr, TestRange) { auto seg_promote = dynamic_cast(seg.get()); ExecExprVisitor visitor(*seg_promote); - for (auto [clause, ref_func]: testcases) { + for (auto [clause, ref_func] : testcases) { auto loc = dsl_string_tmp.find("@@@@"); auto dsl_string = dsl_string_tmp; dsl_string.replace(loc, 4, clause); @@ -192,8 +192,7 @@ TEST(Expr, TestRange) { auto val = age_col[i]; auto ref = !ref_func(val); - ASSERT_EQ(ans, ref) << clause << "@" << i - << "!!" << val; + ASSERT_EQ(ans, ref) << clause << "@" << i << "!!" << val; } } } \ No newline at end of file diff --git a/internal/master/collection_task_test.go b/internal/master/collection_task_test.go index a48d4e6fa6..5bbe4e0961 100644 --- a/internal/master/collection_task_test.go +++ b/internal/master/collection_task_test.go @@ -187,7 +187,7 @@ func TestMaster_CollectionTask(t *testing.T) { assert.NotEqual(t, commonpb.ErrorCode_SUCCESS, listResp.Status.ErrorCode) // CreateCollection Test - collMeta, err := svr.metaTable.GetCollectionByName(sch.Name) + collMeta, err := svr.mt.GetCollectionByName(sch.Name) assert.Nil(t, err) t.Logf("collection id = %d", collMeta.ID) assert.Equal(t, collMeta.CreateTime, uint64(11)) @@ -298,7 +298,7 @@ func TestMaster_CollectionTask(t *testing.T) { assert.Nil(t, err) assert.Equal(t, st.ErrorCode, commonpb.ErrorCode_SUCCESS) - collMeta, err = svr.metaTable.GetCollectionByName(sch.Name) + collMeta, err = svr.mt.GetCollectionByName(sch.Name) assert.NotNil(t, err) // HasCollection "col1" is false diff --git a/internal/master/grpc_service.go b/internal/master/grpc_service.go index f4449001c2..54b261d301 100644 --- a/internal/master/grpc_service.go +++ b/internal/master/grpc_service.go @@ -16,7 +16,7 @@ func (s *Master) CreateCollection(ctx context.Context, in *internalpb.CreateColl req: in, baseTask: baseTask{ sch: s.scheduler, - mt: s.metaTable, + mt: s.mt, cv: make(chan error), }, } @@ -46,7 +46,7 @@ func (s *Master) DropCollection(ctx context.Context, in *internalpb.DropCollecti req: in, baseTask: baseTask{ sch: s.scheduler, - mt: s.metaTable, + mt: s.mt, cv: make(chan error), }, } @@ -76,7 +76,7 @@ func (s *Master) HasCollection(ctx context.Context, in *internalpb.HasCollection req: in, baseTask: baseTask{ sch: s.scheduler, - mt: s.metaTable, + mt: s.mt, cv: make(chan error), }, hasCollection: false, @@ -113,7 +113,7 @@ func (s *Master) DescribeCollection(ctx context.Context, in *internalpb.Describe req: in, baseTask: baseTask{ sch: s.scheduler, - mt: s.metaTable, + mt: s.mt, cv: make(chan error), }, description: nil, @@ -149,7 +149,7 @@ func (s *Master) ShowCollections(ctx context.Context, in *internalpb.ShowCollect req: in, baseTask: baseTask{ sch: s.scheduler, - mt: s.metaTable, + mt: s.mt, cv: make(chan error), }, stringListResponse: nil, @@ -187,7 +187,7 @@ func (s *Master) CreatePartition(ctx context.Context, in *internalpb.CreateParti req: in, baseTask: baseTask{ sch: s.scheduler, - mt: s.metaTable, + mt: s.mt, cv: make(chan error), }, } @@ -218,7 +218,7 @@ func (s *Master) DropPartition(ctx context.Context, in *internalpb.DropPartition req: in, baseTask: baseTask{ sch: s.scheduler, - mt: s.metaTable, + mt: s.mt, cv: make(chan error), }, } @@ -249,7 +249,7 @@ func (s *Master) HasPartition(ctx context.Context, in *internalpb.HasPartitionRe req: in, baseTask: baseTask{ sch: s.scheduler, - mt: s.metaTable, + mt: s.mt, cv: make(chan error), }, hasPartition: false, @@ -290,7 +290,7 @@ func (s *Master) DescribePartition(ctx context.Context, in *internalpb.DescribeP req: in, baseTask: baseTask{ sch: s.scheduler, - mt: s.metaTable, + mt: s.mt, cv: make(chan error), }, description: nil, @@ -328,7 +328,7 @@ func (s *Master) ShowPartitions(ctx context.Context, in *internalpb.ShowPartitio req: in, baseTask: baseTask{ sch: s.scheduler, - mt: s.metaTable, + mt: s.mt, cv: make(chan error), }, stringListResponse: nil, diff --git a/internal/master/grpc_service_test.go b/internal/master/grpc_service_test.go index 72541293ca..aed7600440 100644 --- a/internal/master/grpc_service_test.go +++ b/internal/master/grpc_service_test.go @@ -131,7 +131,7 @@ func TestMaster_CreateCollection(t *testing.T) { assert.Nil(t, err) assert.Equal(t, st.ErrorCode, commonpb.ErrorCode_SUCCESS) - collMeta, err := svr.metaTable.GetCollectionByName(sch.Name) + collMeta, err := svr.mt.GetCollectionByName(sch.Name) assert.Nil(t, err) t.Logf("collection id = %d", collMeta.ID) assert.Equal(t, collMeta.CreateTime, uint64(11)) diff --git a/internal/master/master.go b/internal/master/master.go index ebf5a69f73..9fdb164c4c 100644 --- a/internal/master/master.go +++ b/internal/master/master.go @@ -13,6 +13,7 @@ import ( "github.com/zilliztech/milvus-distributed/internal/errors" "github.com/zilliztech/milvus-distributed/internal/kv" ms "github.com/zilliztech/milvus-distributed/internal/msgstream" + "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" "github.com/zilliztech/milvus-distributed/internal/proto/masterpb" "github.com/zilliztech/milvus-distributed/internal/util/tsoutil" "github.com/zilliztech/milvus-distributed/internal/util/typeutil" @@ -71,22 +72,26 @@ type Master struct { //grpc server grpcServer *grpc.Server - grpcErr chan error - kvBase *kv.EtcdKV - scheduler *ddRequestScheduler - metaTable *metaTable - timesSyncMsgProducer *timeSyncMsgProducer + // chans + ssChan chan internalpb.SegmentStats + + grpcErr chan error + + kvBase *kv.EtcdKV + scheduler *ddRequestScheduler + mt *metaTable + tsmp *timeSyncMsgProducer // tso ticker - tsoTicker *time.Ticker + tsTicker *time.Ticker // Add callback functions at different stages startCallbacks []func() closeCallbacks []func() - segmentMgr *SegmentManager - segmentStatusMsg ms.MsgStream + segmentMgr *SegmentManager + statsMs ms.MsgStream //id allocator idAllocator *GlobalIDAllocator @@ -123,7 +128,7 @@ func CreateServer(ctx context.Context, opt *Option) (*Master, error) { } //timeSyncMsgProducer - tsMsgProducer, err := NewTimeSyncMsgProducer(ctx) + tsmp, err := NewTimeSyncMsgProducer(ctx) if err != nil { return nil, err } @@ -133,7 +138,7 @@ func CreateServer(ctx context.Context, opt *Option) (*Master, error) { pulsarProxyStream.Start() var proxyStream ms.MsgStream = pulsarProxyStream proxyTimeTickBarrier := newSoftTimeTickBarrier(ctx, &proxyStream, opt.ProxyIDs, opt.SoftTTBInterval) - tsMsgProducer.SetProxyTtBarrier(proxyTimeTickBarrier) + tsmp.SetProxyTtBarrier(proxyTimeTickBarrier) pulsarWriteStream := ms.NewPulsarMsgStream(ctx, 1024) //output stream pulsarWriteStream.SetPulsarClient(opt.PulsarAddr) @@ -141,17 +146,17 @@ func CreateServer(ctx context.Context, opt *Option) (*Master, error) { pulsarWriteStream.Start() var writeStream ms.MsgStream = pulsarWriteStream writeTimeTickBarrier := newHardTimeTickBarrier(ctx, &writeStream, opt.WriteIDs) - tsMsgProducer.SetWriteNodeTtBarrier(writeTimeTickBarrier) + tsmp.SetWriteNodeTtBarrier(writeTimeTickBarrier) pulsarDMStream := ms.NewPulsarMsgStream(ctx, 1024) //input stream pulsarDMStream.SetPulsarClient(opt.PulsarAddr) pulsarDMStream.CreatePulsarProducers(opt.PulsarDMChannels) - tsMsgProducer.SetDMSyncStream(pulsarDMStream) + tsmp.SetDMSyncStream(pulsarDMStream) pulsarK2SStream := ms.NewPulsarMsgStream(ctx, 1024) //input stream pulsarK2SStream.SetPulsarClient(opt.PulsarAddr) pulsarK2SStream.CreatePulsarProducers(opt.PulsarK2SChannels) - tsMsgProducer.SetK2sSyncStream(pulsarK2SStream) + tsmp.SetK2sSyncStream(pulsarK2SStream) // stats msg stream statsMs := ms.NewPulsarMsgStream(ctx, 1024) @@ -160,13 +165,14 @@ func CreateServer(ctx context.Context, opt *Option) (*Master, error) { statsMs.Start() m := &Master{ - ctx: ctx, - startTimestamp: time.Now().Unix(), - kvBase: newKVBase(opt.KVRootPath, opt.EtcdAddr), - metaTable: metakv, - timesSyncMsgProducer: tsMsgProducer, - grpcErr: make(chan error), - segmentStatusMsg: statsMs, + ctx: ctx, + startTimestamp: time.Now().Unix(), + kvBase: newKVBase(opt.KVRootPath, opt.EtcdAddr), + mt: metakv, + tsmp: tsmp, + ssChan: make(chan internalpb.SegmentStats, 10), + grpcErr: make(chan error), + statsMs: statsMs, } //init idAllocator @@ -264,7 +270,7 @@ func (s *Master) startServerLoop(ctx context.Context, grpcPort int64) error { //go s.Se s.serverLoopWg.Add(1) - if err := s.timesSyncMsgProducer.Start(); err != nil { + if err := s.tsmp.Start(); err != nil { return err } @@ -288,7 +294,7 @@ func (s *Master) startServerLoop(ctx context.Context, grpcPort int64) error { } func (s *Master) stopServerLoop() { - s.timesSyncMsgProducer.Close() + s.tsmp.Close() s.serverLoopWg.Done() if s.grpcServer != nil { @@ -334,13 +340,13 @@ func (s *Master) grpcLoop(grpcPort int64) { func (s *Master) tsLoop() { defer s.serverLoopWg.Done() - s.tsoTicker = time.NewTicker(UpdateTimestampStep) - defer s.tsoTicker.Stop() + s.tsTicker = time.NewTicker(UpdateTimestampStep) + defer s.tsTicker.Stop() ctx, cancel := context.WithCancel(s.serverLoopCtx) defer cancel() for { select { - case <-s.tsoTicker.C: + case <-s.tsTicker.C: if err := s.tsoAllocator.UpdateTSO(); err != nil { log.Println("failed to update timestamp", err) return @@ -386,13 +392,13 @@ func (s *Master) tasksExecutionLoop() { func (s *Master) segmentStatisticsLoop() { defer s.serverLoopWg.Done() - defer s.segmentStatusMsg.Close() + defer s.statsMs.Close() ctx, cancel := context.WithCancel(s.serverLoopCtx) defer cancel() for { select { - case msg := <-s.segmentStatusMsg.Chan(): + case msg := <-s.statsMs.Chan(): err := s.segmentMgr.HandleQueryNodeMsgPack(msg) if err != nil { log.Println(err) diff --git a/internal/master/meta_table.go b/internal/master/meta_table.go index 0f76b45d40..16aa026fca 100644 --- a/internal/master/meta_table.go +++ b/internal/master/meta_table.go @@ -105,7 +105,7 @@ func (mt *metaTable) reloadFromKV() error { return nil } -// metaTable.ddLock.Lock() before call this function +// mt.ddLock.Lock() before call this function func (mt *metaTable) saveCollectionMeta(coll *pb.CollectionMeta) error { collBytes, err := proto.Marshal(coll) if err != nil { @@ -116,7 +116,7 @@ func (mt *metaTable) saveCollectionMeta(coll *pb.CollectionMeta) error { return mt.client.Save("/collection/"+strconv.FormatInt(coll.ID, 10), string(collBytes)) } -// metaTable.ddLock.Lock() before call this function +// mt.ddLock.Lock() before call this function func (mt *metaTable) saveSegmentMeta(seg *pb.SegmentMeta) error { segBytes, err := proto.Marshal(seg) if err != nil { @@ -128,7 +128,7 @@ func (mt *metaTable) saveSegmentMeta(seg *pb.SegmentMeta) error { return mt.client.Save("/segment/"+strconv.FormatInt(seg.SegmentID, 10), string(segBytes)) } -// metaTable.ddLock.Lock() before call this function +// mt.ddLock.Lock() before call this function func (mt *metaTable) saveCollectionAndDeleteSegmentsMeta(coll *pb.CollectionMeta, segIDs []UniqueID) error { segIDStrs := make([]string, 0, len(segIDs)) for _, segID := range segIDs { @@ -156,7 +156,7 @@ func (mt *metaTable) saveCollectionAndDeleteSegmentsMeta(coll *pb.CollectionMeta return mt.client.MultiSaveAndRemove(kvs, segIDStrs) } -// metaTable.ddLock.Lock() before call this function +// mt.ddLock.Lock() before call this function func (mt *metaTable) saveCollectionsAndSegmentsMeta(coll *pb.CollectionMeta, seg *pb.SegmentMeta) error { kvs := make(map[string]string) collBytes, err := proto.Marshal(coll) @@ -179,7 +179,7 @@ func (mt *metaTable) saveCollectionsAndSegmentsMeta(coll *pb.CollectionMeta, seg return mt.client.MultiSave(kvs) } -// metaTable.ddLock.Lock() before call this function +// mt.ddLock.Lock() before call this function func (mt *metaTable) deleteCollectionsAndSegmentsMeta(collID UniqueID, segIDs []UniqueID) error { collIDStr := "/collection/" + strconv.FormatInt(collID, 10) diff --git a/internal/master/partition_task_test.go b/internal/master/partition_task_test.go index b861bc15d6..a635f192c1 100644 --- a/internal/master/partition_task_test.go +++ b/internal/master/partition_task_test.go @@ -168,7 +168,7 @@ func TestMaster_Partition(t *testing.T) { assert.NotNil(t, st) assert.Equal(t, commonpb.ErrorCode_SUCCESS, st.ErrorCode) - collMeta, err := svr.metaTable.GetCollectionByName(sch.Name) + collMeta, err := svr.mt.GetCollectionByName(sch.Name) assert.Nil(t, err) t.Logf("collection id = %d", collMeta.ID) assert.Equal(t, collMeta.CreateTime, uint64(1))