diff --git a/internal/core/src/common/LoadInfo.h b/internal/core/src/common/LoadInfo.h index d2401563d5..ec6655af46 100644 --- a/internal/core/src/common/LoadInfo.h +++ b/internal/core/src/common/LoadInfo.h @@ -16,6 +16,7 @@ #include "knowhere/index/vector_index/VecIndex.h" struct LoadIndexInfo { + std::string field_name; int64_t field_id; std::map index_params; milvus::knowhere::VecIndexPtr index; diff --git a/internal/core/src/segcore/load_index_c.cpp b/internal/core/src/segcore/load_index_c.cpp index b58295b049..21d5637f1b 100644 --- a/internal/core/src/segcore/load_index_c.cpp +++ b/internal/core/src/segcore/load_index_c.cpp @@ -59,9 +59,11 @@ AppendIndexParam(CLoadIndexInfo c_load_index_info, const char* c_index_key, cons } CStatus -AppendFieldInfo(CLoadIndexInfo c_load_index_info, int64_t field_id) { +AppendFieldInfo(CLoadIndexInfo c_load_index_info, const char* c_field_name, int64_t field_id) { try { auto load_index_info = (LoadIndexInfo*)c_load_index_info; + std::string field_name(c_field_name); + load_index_info->field_name = field_name; load_index_info->field_id = field_id; auto status = CStatus(); @@ -95,6 +97,7 @@ AppendIndex(CLoadIndexInfo c_load_index_info, CBinarySet c_binary_set) { load_index_info->index = milvus::knowhere::VecIndexFactory::GetInstance().CreateVecIndex(index_params["index_type"], mode); load_index_info->index->Load(*binary_set); + auto status = CStatus(); status.error_code = Success; status.error_msg = ""; diff --git a/internal/core/src/segcore/load_index_c.h b/internal/core/src/segcore/load_index_c.h index 85df9c4079..0718d59406 100644 --- a/internal/core/src/segcore/load_index_c.h +++ b/internal/core/src/segcore/load_index_c.h @@ -33,7 +33,7 @@ CStatus AppendIndexParam(CLoadIndexInfo c_load_index_info, const char* index_key, const char* index_value); CStatus -AppendFieldInfo(CLoadIndexInfo c_load_index_info, int64_t field_id); +AppendFieldInfo(CLoadIndexInfo c_load_index_info, const char* field_name, int64_t field_id); CStatus AppendIndex(CLoadIndexInfo c_load_index_info, CBinarySet c_binary_set); diff --git a/internal/core/unittest/test_c_api.cpp b/internal/core/unittest/test_c_api.cpp index 2e972fc244..73a32d9391 100644 --- a/internal/core/unittest/test_c_api.cpp +++ b/internal/core/unittest/test_c_api.cpp @@ -781,7 +781,7 @@ TEST(CApiTest, LoadIndexInfo) { status = AppendIndexParam(c_load_index_info, index_param_key2.data(), index_param_value2.data()); assert(status.error_code == Success); std::string field_name = "field0"; - status = AppendFieldInfo(c_load_index_info, 0); + status = AppendFieldInfo(c_load_index_info, field_name.data(), 0); assert(status.error_code == Success); status = AppendIndex(c_load_index_info, c_binary_set); assert(status.error_code == Success); @@ -937,7 +937,7 @@ TEST(CApiTest, UpdateSegmentIndex_Without_Predicate) { AppendIndexParam(c_load_index_info, index_type_key.c_str(), index_type_value.c_str()); AppendIndexParam(c_load_index_info, index_mode_key.c_str(), index_mode_value.c_str()); AppendIndexParam(c_load_index_info, metric_type_key.c_str(), metric_type_value.c_str()); - AppendFieldInfo(c_load_index_info, 100); + AppendFieldInfo(c_load_index_info, "fakevec", 100); AppendIndex(c_load_index_info, (CBinarySet)&binary_set); status = UpdateSegmentIndex(segment, c_load_index_info); @@ -1074,7 +1074,7 @@ TEST(CApiTest, UpdateSegmentIndex_With_float_Predicate_Range) { AppendIndexParam(c_load_index_info, index_type_key.c_str(), index_type_value.c_str()); AppendIndexParam(c_load_index_info, index_mode_key.c_str(), index_mode_value.c_str()); AppendIndexParam(c_load_index_info, metric_type_key.c_str(), metric_type_value.c_str()); - AppendFieldInfo(c_load_index_info, 100); + AppendFieldInfo(c_load_index_info, "fakevec", 100); AppendIndex(c_load_index_info, (CBinarySet)&binary_set); status = UpdateSegmentIndex(segment, c_load_index_info); @@ -1211,7 +1211,7 @@ TEST(CApiTest, UpdateSegmentIndex_With_float_Predicate_Term) { AppendIndexParam(c_load_index_info, index_type_key.c_str(), index_type_value.c_str()); AppendIndexParam(c_load_index_info, index_mode_key.c_str(), index_mode_value.c_str()); AppendIndexParam(c_load_index_info, metric_type_key.c_str(), metric_type_value.c_str()); - AppendFieldInfo(c_load_index_info, 100); + AppendFieldInfo(c_load_index_info, "fakevec", 100); AppendIndex(c_load_index_info, (CBinarySet)&binary_set); status = UpdateSegmentIndex(segment, c_load_index_info); @@ -1350,7 +1350,7 @@ TEST(CApiTest, UpdateSegmentIndex_With_binary_Predicate_Range) { AppendIndexParam(c_load_index_info, index_type_key.c_str(), index_type_value.c_str()); AppendIndexParam(c_load_index_info, index_mode_key.c_str(), index_mode_value.c_str()); AppendIndexParam(c_load_index_info, metric_type_key.c_str(), metric_type_value.c_str()); - AppendFieldInfo(c_load_index_info, 100); + AppendFieldInfo(c_load_index_info, "fakevec", 100); AppendIndex(c_load_index_info, (CBinarySet)&binary_set); status = UpdateSegmentIndex(segment, c_load_index_info); @@ -1488,7 +1488,7 @@ TEST(CApiTest, UpdateSegmentIndex_With_binary_Predicate_Term) { AppendIndexParam(c_load_index_info, index_type_key.c_str(), index_type_value.c_str()); AppendIndexParam(c_load_index_info, index_mode_key.c_str(), index_mode_value.c_str()); AppendIndexParam(c_load_index_info, metric_type_key.c_str(), metric_type_value.c_str()); - AppendFieldInfo(c_load_index_info, 100); + AppendFieldInfo(c_load_index_info, "fakevec", 100); AppendIndex(c_load_index_info, (CBinarySet)&binary_set); status = UpdateSegmentIndex(segment, c_load_index_info); @@ -1665,7 +1665,7 @@ TEST(CApiTest, SealedSegment_search_float_Predicate_Range) { AppendIndexParam(c_load_index_info, index_type_key.c_str(), index_type_value.c_str()); AppendIndexParam(c_load_index_info, index_mode_key.c_str(), index_mode_value.c_str()); AppendIndexParam(c_load_index_info, metric_type_key.c_str(), metric_type_value.c_str()); - AppendFieldInfo(c_load_index_info, 100); + AppendFieldInfo(c_load_index_info, "fakevec", 100); AppendIndex(c_load_index_info, (CBinarySet)&binary_set); auto load_index_info = (LoadIndexInfo*)c_load_index_info; diff --git a/internal/core/unittest/test_sealed.cpp b/internal/core/unittest/test_sealed.cpp index 08ad60375e..0c4ffe5182 100644 --- a/internal/core/unittest/test_sealed.cpp +++ b/internal/core/unittest/test_sealed.cpp @@ -105,6 +105,7 @@ TEST(Sealed, without_predicate) { auto ref_result = QueryResultToJson(qr); LoadIndexInfo load_info; + load_info.field_name = "fakevec"; load_info.field_id = fake_id.get(); load_info.index = indexing; load_info.index_params["metric_type"] = "L2"; @@ -197,6 +198,7 @@ TEST(Sealed, with_predicate) { auto result = indexing->Query(query_dataset, conf, nullptr); LoadIndexInfo load_info; + load_info.field_name = "fakevec"; load_info.field_id = fake_id.get(); load_info.index = indexing; load_info.index_params["metric_type"] = "L2"; @@ -310,6 +312,7 @@ TEST(Sealed, LoadFieldData) { LoadIndexInfo vec_info; vec_info.field_id = fakevec_id.get(); + vec_info.field_name = "fakevec"; vec_info.index = indexing; vec_info.index_params["metric_type"] = milvus::knowhere::Metric::L2; segment->LoadIndex(vec_info); diff --git a/internal/dataservice/channel.go b/internal/dataservice/channel.go index d67b16c95a..f2a18c3c91 100644 --- a/internal/dataservice/channel.go +++ b/internal/dataservice/channel.go @@ -1,76 +1,38 @@ package dataservice import ( - "fmt" "strconv" "sync" ) type ( - channelGroup []string insertChannelManager struct { mu sync.RWMutex count int - channelGroups map[UniqueID][]channelGroup // collection id to channel ranges + channelGroups map[UniqueID][]string // collection id to channel ranges } ) -func (cr channelGroup) Contains(channelName string) bool { - for _, name := range cr { - if name == channelName { - return true - } - } - return false -} - func newInsertChannelManager() *insertChannelManager { return &insertChannelManager{ count: 0, - channelGroups: make(map[UniqueID][]channelGroup), + channelGroups: make(map[UniqueID][]string), } } -func (cm *insertChannelManager) GetChannels(collectionID UniqueID, groupNum int) ([]channelGroup, error) { +func (cm *insertChannelManager) GetChannels(collectionID UniqueID) ([]string, error) { cm.mu.Lock() defer cm.mu.Unlock() if _, ok := cm.channelGroups[collectionID]; ok { return cm.channelGroups[collectionID], nil } channels := Params.InsertChannelNumPerCollection - m, n := channels/int64(groupNum), channels%int64(groupNum) - cg := make([]channelGroup, 0) - var i, j int64 = 0, 0 - for i < channels { - var group []string - if j < n { - group = make([]string, m+1) - } else { - group = make([]string, m) - } - for k := 0; k < len(group); k++ { - group[k] = Params.InsertChannelPrefixName + strconv.Itoa(cm.count) - cm.count++ - } - i += int64(len(group)) - j++ - cg = append(cg, group) + cg := make([]string, channels) + var i int64 = 0 + for ; i < channels; i++ { + cg[i] = Params.InsertChannelPrefixName + strconv.Itoa(cm.count) + cm.count++ } cm.channelGroups[collectionID] = cg return cg, nil } - -func (cm *insertChannelManager) GetChannelGroup(collectionID UniqueID, channelName string) (channelGroup, error) { - cm.mu.RLock() - defer cm.mu.RUnlock() - _, ok := cm.channelGroups[collectionID] - if !ok { - return nil, fmt.Errorf("can not find collection %d", collectionID) - } - for _, cr := range cm.channelGroups[collectionID] { - if cr.Contains(channelName) { - return cr, nil - } - } - return nil, fmt.Errorf("channel name %s not found", channelName) -} diff --git a/internal/dataservice/channel_test.go b/internal/dataservice/channel_test.go index 70a1dab871..c38f442629 100644 --- a/internal/dataservice/channel_test.go +++ b/internal/dataservice/channel_test.go @@ -1,33 +1,21 @@ package dataservice import ( + "strconv" "testing" "github.com/stretchr/testify/assert" ) -func TestChannelAllocation(t *testing.T) { +func TestGetChannel(t *testing.T) { Params.Init() Params.InsertChannelNumPerCollection = 4 + Params.InsertChannelPrefixName = "channel" manager := newInsertChannelManager() - cases := []struct { - collectionID UniqueID - groupNum int - expectGroupNum int - }{ - {1, 4, 4}, - {1, 4, 4}, - {2, 1, 1}, - {3, 5, 4}, - } - for _, c := range cases { - channels, err := manager.GetChannels(c.collectionID, c.expectGroupNum) - assert.Nil(t, err) - assert.EqualValues(t, c.expectGroupNum, len(channels)) - total := 0 - for _, channel := range channels { - total += len(channel) - } - assert.EqualValues(t, Params.InsertChannelNumPerCollection, total) + channels, err := manager.GetChannels(1) + assert.Nil(t, err) + assert.EqualValues(t, Params.InsertChannelNumPerCollection, len(channels)) + for i := 0; i < len(channels); i++ { + assert.EqualValues(t, Params.InsertChannelPrefixName+strconv.Itoa(i), channels[i]) } } diff --git a/internal/dataservice/cluster.go b/internal/dataservice/cluster.go index 9964b34504..d0c92da48d 100644 --- a/internal/dataservice/cluster.go +++ b/internal/dataservice/cluster.go @@ -23,16 +23,18 @@ type ( channelNum int } dataNodeCluster struct { - mu sync.RWMutex - finishCh chan struct{} - nodes []*dataNode + mu sync.RWMutex + finishCh chan struct{} + nodes []*dataNode + watchedCollection map[UniqueID]bool } ) func newDataNodeCluster(finishCh chan struct{}) *dataNodeCluster { return &dataNodeCluster{ - finishCh: finishCh, - nodes: make([]*dataNode, 0), + finishCh: finishCh, + nodes: make([]*dataNode, 0), + watchedCollection: make(map[UniqueID]bool), } } @@ -49,7 +51,7 @@ func (c *dataNodeCluster) Register(dataNode *dataNode) { func (c *dataNodeCluster) checkDataNodeNotExist(ip string, port int64) bool { for _, node := range c.nodes { - if node.address.ip == ip || node.address.port == port { + if node.address.ip == ip && node.address.port == port { return false } } @@ -70,12 +72,25 @@ func (c *dataNodeCluster) GetNodeIDs() []int64 { return ret } -func (c *dataNodeCluster) WatchInsertChannels(groups []channelGroup) { +func (c *dataNodeCluster) WatchInsertChannels(collectionID UniqueID, channels []string) { c.mu.Lock() defer c.mu.Unlock() + if c.watchedCollection[collectionID] { + return + } sort.Slice(c.nodes, func(i, j int) bool { return c.nodes[i].channelNum < c.nodes[j].channelNum }) + var groups [][]string + if len(channels) < len(c.nodes) { + groups = make([][]string, len(channels)) + } else { + groups = make([][]string, len(c.nodes)) + } + length := len(groups) + for i, channel := range channels { + groups[i%length] = append(groups[i%length], channel) + } for i, group := range groups { - _, err := c.nodes[i%len(c.nodes)].client.WatchDmChannels(&datapb.WatchDmChannelRequest{ + resp, err := c.nodes[i].client.WatchDmChannels(&datapb.WatchDmChannelRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_kDescribeCollection, MsgID: -1, // todo @@ -88,7 +103,13 @@ func (c *dataNodeCluster) WatchInsertChannels(groups []channelGroup) { log.Println(err.Error()) continue } + if resp.ErrorCode != commonpb.ErrorCode_SUCCESS { + log.Println(resp.Reason) + continue + } + c.nodes[i].channelNum += len(group) } + c.watchedCollection[collectionID] = true } func (c *dataNodeCluster) GetDataNodeStates() ([]*internalpb2.ComponentInfo, error) { @@ -125,3 +146,12 @@ func (c *dataNodeCluster) ShutDownClients() { } } } + +// Clear only for test +func (c *dataNodeCluster) Clear() { + c.mu.Lock() + defer c.mu.Unlock() + c.finishCh = make(chan struct{}) + c.nodes = make([]*dataNode, 0) + c.watchedCollection = make(map[UniqueID]bool) +} diff --git a/internal/dataservice/cluster_test.go b/internal/dataservice/cluster_test.go new file mode 100644 index 0000000000..a8fbc60e3b --- /dev/null +++ b/internal/dataservice/cluster_test.go @@ -0,0 +1,42 @@ +package dataservice + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestWatchChannels(t *testing.T) { + Params.Init() + Params.DataNodeNum = 3 + cases := []struct { + collectionID UniqueID + channels []string + channelNums []int + }{ + {1, []string{"c1"}, []int{1, 0, 0}}, + {1, []string{"c1", "c2", "c3"}, []int{1, 1, 1}}, + {1, []string{"c1", "c2", "c3", "c4"}, []int{2, 1, 1}}, + {1, []string{"c1", "c2", "c3", "c4", "c5", "c6", "c7"}, []int{3, 2, 2}}, + } + + cluster := newDataNodeCluster(make(chan struct{})) + for _, c := range cases { + for i := 0; i < Params.DataNodeNum; i++ { + cluster.Register(&dataNode{ + id: int64(i), + address: struct { + ip string + port int64 + }{"localhost", int64(9999 + i)}, + client: newMockDataNodeClient(), + channelNum: 0, + }) + } + cluster.WatchInsertChannels(c.collectionID, c.channels) + for i := 0; i < len(cluster.nodes); i++ { + assert.EqualValues(t, c.channelNums[i], cluster.nodes[i].channelNum) + } + cluster.Clear() + } +} diff --git a/internal/dataservice/meta.go b/internal/dataservice/meta.go index f6ff1bb116..6a2c5ca06c 100644 --- a/internal/dataservice/meta.go +++ b/internal/dataservice/meta.go @@ -386,16 +386,16 @@ func (meta *meta) removeSegments(segIDs []UniqueID) error { return meta.client.MultiRemove(segmentPaths) } -func BuildSegment(collectionID UniqueID, partitionID UniqueID, segmentID UniqueID, channelRange []string) (*datapb.SegmentInfo, error) { +func BuildSegment(collectionID UniqueID, partitionID UniqueID, segmentID UniqueID, channelName string) (*datapb.SegmentInfo, error) { return &datapb.SegmentInfo{ - SegmentID: segmentID, - CollectionID: collectionID, - PartitionID: partitionID, - InsertChannels: channelRange, - OpenTime: 0, - SealedTime: 0, - NumRows: 0, - MemSize: 0, - State: datapb.SegmentState_SegmentGrowing, + SegmentID: segmentID, + CollectionID: collectionID, + PartitionID: partitionID, + InsertChannel: channelName, + OpenTime: 0, + SealedTime: 0, + NumRows: 0, + MemSize: 0, + State: datapb.SegmentState_SegmentGrowing, }, nil } diff --git a/internal/dataservice/meta_test.go b/internal/dataservice/meta_test.go index c03eb1a6c2..437ca3c41f 100644 --- a/internal/dataservice/meta_test.go +++ b/internal/dataservice/meta_test.go @@ -48,7 +48,7 @@ func TestSegment(t *testing.T) { assert.Nil(t, err) segID, err := mockAllocator.allocID() assert.Nil(t, err) - segmentInfo, err := BuildSegment(id, 100, segID, []string{"c1", "c2"}) + segmentInfo, err := BuildSegment(id, 100, segID, "c1") assert.Nil(t, err) err = meta.AddSegment(segmentInfo) assert.Nil(t, err) @@ -114,14 +114,14 @@ func TestGetCount(t *testing.T) { nums, err := meta.GetNumRowsOfCollection(id) assert.Nil(t, err) assert.EqualValues(t, 0, nums) - segment, err := BuildSegment(id, 100, segID, []string{"c1"}) + segment, err := BuildSegment(id, 100, segID, "c1") assert.Nil(t, err) segment.NumRows = 100 err = meta.AddSegment(segment) assert.Nil(t, err) segID, err = mockAllocator.allocID() assert.Nil(t, err) - segment, err = BuildSegment(id, 100, segID, []string{"c1"}) + segment, err = BuildSegment(id, 100, segID, "c1") assert.Nil(t, err) segment.NumRows = 300 err = meta.AddSegment(segment) diff --git a/internal/dataservice/mock.go b/internal/dataservice/mock.go index 2922d33abf..105d958c7d 100644 --- a/internal/dataservice/mock.go +++ b/internal/dataservice/mock.go @@ -4,6 +4,10 @@ import ( "sync/atomic" "time" + "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" + "github.com/zilliztech/milvus-distributed/internal/proto/datapb" + "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" + "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" memkv "github.com/zilliztech/milvus-distributed/internal/kv/mem" @@ -46,3 +50,27 @@ func newTestSchema() *schemapb.CollectionSchema { }, } } + +type mockDataNodeClient struct { +} + +func newMockDataNodeClient() *mockDataNodeClient { + return &mockDataNodeClient{} +} + +func (c *mockDataNodeClient) WatchDmChannels(in *datapb.WatchDmChannelRequest) (*commonpb.Status, error) { + return &commonpb.Status{ErrorCode: commonpb.ErrorCode_SUCCESS}, nil +} + +func (c *mockDataNodeClient) GetComponentStates(empty *commonpb.Empty) (*internalpb2.ComponentStates, error) { + // todo + return nil, nil +} + +func (c *mockDataNodeClient) FlushSegments(in *datapb.FlushSegRequest) (*commonpb.Status, error) { + return &commonpb.Status{ErrorCode: commonpb.ErrorCode_SUCCESS}, nil +} + +func (c *mockDataNodeClient) Stop() error { + return nil +} diff --git a/internal/dataservice/segment_allocator.go b/internal/dataservice/segment_allocator.go index f9d13892a3..d15d3b66a0 100644 --- a/internal/dataservice/segment_allocator.go +++ b/internal/dataservice/segment_allocator.go @@ -54,7 +54,7 @@ type ( sealed bool lastExpireTime Timestamp allocations []*allocation - channelGroup channelGroup + insertChannel string } allocation struct { rowNums int @@ -100,7 +100,7 @@ func (allocator *segmentAllocatorImpl) OpenSegment(segmentInfo *datapb.SegmentIn total: totalRows, sealed: false, lastExpireTime: 0, - channelGroup: segmentInfo.InsertChannels, + insertChannel: segmentInfo.InsertChannel, } return nil } @@ -112,7 +112,7 @@ func (allocator *segmentAllocatorImpl) AllocSegment(collectionID UniqueID, for _, segStatus := range allocator.segments { if segStatus.sealed || segStatus.collectionID != collectionID || segStatus.partitionID != partitionID || - !segStatus.channelGroup.Contains(channelName) { + segStatus.insertChannel != channelName { continue } var success bool diff --git a/internal/dataservice/segment_allocator_test.go b/internal/dataservice/segment_allocator_test.go index 03697593ae..d0e248edc0 100644 --- a/internal/dataservice/segment_allocator_test.go +++ b/internal/dataservice/segment_allocator_test.go @@ -1,11 +1,14 @@ package dataservice import ( + "log" "math" "strconv" "testing" "time" + "github.com/zilliztech/milvus-distributed/internal/util/tsoutil" + "github.com/stretchr/testify/assert" ) @@ -27,7 +30,7 @@ func TestAllocSegment(t *testing.T) { assert.Nil(t, err) id, err := mockAllocator.allocID() assert.Nil(t, err) - segmentInfo, err := BuildSegment(collID, 100, id, []string{"c1", "c2"}) + segmentInfo, err := BuildSegment(collID, 100, id, "c1") assert.Nil(t, err) err = meta.AddSegment(segmentInfo) assert.Nil(t, err) @@ -80,7 +83,7 @@ func TestSealSegment(t *testing.T) { for i := 0; i < 10; i++ { id, err := mockAllocator.allocID() assert.Nil(t, err) - segmentInfo, err := BuildSegment(collID, 100, id, []string{"c" + strconv.Itoa(i)}) + segmentInfo, err := BuildSegment(collID, 100, id, "c"+strconv.Itoa(i)) assert.Nil(t, err) err = meta.AddSegment(segmentInfo) assert.Nil(t, err) @@ -115,21 +118,32 @@ func TestExpireSegment(t *testing.T) { assert.Nil(t, err) id, err := mockAllocator.allocID() assert.Nil(t, err) - segmentInfo, err := BuildSegment(collID, 100, id, []string{"c1", "c2"}) + segmentInfo, err := BuildSegment(collID, 100, id, "c1") assert.Nil(t, err) err = meta.AddSegment(segmentInfo) assert.Nil(t, err) err = segAllocator.OpenSegment(segmentInfo) assert.Nil(t, err) - id1, _, _, err := segAllocator.AllocSegment(collID, 100, "c1", 10) + id1, _, et, err := segAllocator.AllocSegment(collID, 100, "c1", 10) + ts2, _ := tsoutil.ParseTS(et) + log.Printf("physical ts: %s", ts2.String()) assert.Nil(t, err) - time.Sleep(time.Duration(Params.SegIDAssignExpiration) * time.Millisecond) + ts, err := mockAllocator.allocTimestamp() assert.Nil(t, err) + t1, _ := tsoutil.ParseTS(ts) + log.Printf("before ts: %s", t1.String()) + time.Sleep(time.Duration(Params.SegIDAssignExpiration+1000) * time.Millisecond) + ts, err = mockAllocator.allocTimestamp() + assert.Nil(t, err) err = segAllocator.ExpireAllocations(ts) assert.Nil(t, err) expired, err := segAllocator.IsAllocationsExpired(id1, ts) + if et > ts { + tsPhy, _ := tsoutil.ParseTS(ts) + log.Printf("ts %s", tsPhy.String()) + } assert.Nil(t, err) assert.True(t, expired) assert.EqualValues(t, 0, len(segAllocator.segments[id1].allocations)) diff --git a/internal/dataservice/server.go b/internal/dataservice/server.go index a9a028c2da..a0f9349230 100644 --- a/internal/dataservice/server.go +++ b/internal/dataservice/server.go @@ -555,16 +555,11 @@ func (s *Server) AssignSegmentID(req *datapb.AssignSegIDRequest) (*datapb.Assign } func (s *Server) openNewSegment(collectionID UniqueID, partitionID UniqueID, channelName string) error { - group, err := s.insertChannelMgr.GetChannelGroup(collectionID, channelName) - if err != nil { - return err - } - id, err := s.allocator.allocID() if err != nil { return err } - segmentInfo, err := BuildSegment(collectionID, partitionID, id, group) + segmentInfo, err := BuildSegment(collectionID, partitionID, id, channelName) if err != nil { return err } @@ -683,16 +678,12 @@ func (s *Server) GetInsertChannels(req *datapb.InsertChannelRequest) ([]string, if !s.checkStateIsHealthy() { return nil, errors.New("server is initializing") } - channelGroups, err := s.insertChannelMgr.GetChannels(req.CollectionID, s.cluster.GetNumOfNodes()) + channels, err := s.insertChannelMgr.GetChannels(req.CollectionID) if err != nil { return nil, err } - channels := make([]string, 0) - for _, group := range channelGroups { - channels = append(channels, group...) - } - s.cluster.WatchInsertChannels(channelGroups) + s.cluster.WatchInsertChannels(req.CollectionID, channels) return channels, nil } diff --git a/internal/dataservice/watcher_test.go b/internal/dataservice/watcher_test.go index 85af18e530..e476f9f02a 100644 --- a/internal/dataservice/watcher_test.go +++ b/internal/dataservice/watcher_test.go @@ -52,7 +52,7 @@ func TestDataNodeTTWatcher(t *testing.T) { segID, err := allocator.allocID() segmentIDs[i] = segID assert.Nil(t, err) - segmentInfo, err := BuildSegment(id, 100, segID, []string{"channel" + strconv.Itoa(i)}) + segmentInfo, err := BuildSegment(id, 100, segID, "channel"+strconv.Itoa(i)) assert.Nil(t, err) err = meta.AddSegment(segmentInfo) assert.Nil(t, err) @@ -64,7 +64,7 @@ func TestDataNodeTTWatcher(t *testing.T) { } } - time.Sleep(time.Duration(Params.SegIDAssignExpiration) * time.Millisecond) + time.Sleep(time.Duration(Params.SegIDAssignExpiration+1000) * time.Millisecond) for i, c := range cases { if c.allocation && !c.expired { _, _, _, err := segAllocator.AllocSegment(id, 100, "channel"+strconv.Itoa(i), 100) diff --git a/internal/distributed/masterservice/masterservice_test.go b/internal/distributed/masterservice/masterservice_test.go index fca7ce4d02..f473ea5a18 100644 --- a/internal/distributed/masterservice/masterservice_test.go +++ b/internal/distributed/masterservice/masterservice_test.go @@ -98,7 +98,7 @@ func TestGrpcService(t *testing.T) { var binlogLock sync.Mutex binlogPathArray := make([]string, 0, 16) - core.BuildIndexReq = func(binlog []string, typeParams []*commonpb.KeyValuePair, indexParams []*commonpb.KeyValuePair, indexID typeutil.UniqueID, indexName string) (typeutil.UniqueID, error) { + core.BuildIndexReq = func(binlog []string, typeParams []*commonpb.KeyValuePair, indexParams []*commonpb.KeyValuePair) (typeutil.UniqueID, error) { binlogLock.Lock() defer binlogLock.Unlock() binlogPathArray = append(binlogPathArray, binlog...) diff --git a/internal/masterservice/master_service.go b/internal/masterservice/master_service.go index e164ce7ba9..0b62fd1482 100644 --- a/internal/masterservice/master_service.go +++ b/internal/masterservice/master_service.go @@ -152,7 +152,7 @@ type Core struct { GetBinlogFilePathsFromDataServiceReq func(segID typeutil.UniqueID, fieldID typeutil.UniqueID) ([]string, error) //TODO, call index builder's client to build index, return build id - BuildIndexReq func(binlog []string, typeParams []*commonpb.KeyValuePair, indexParams []*commonpb.KeyValuePair, indexID typeutil.UniqueID, indexName string) (typeutil.UniqueID, error) + BuildIndexReq func(binlog []string, typeParams []*commonpb.KeyValuePair, indexParams []*commonpb.KeyValuePair) (typeutil.UniqueID, error) //TODO, proxy service interface, notify proxy service to drop collection InvalidateCollectionMetaCache func(ts typeutil.Timestamp, dbName string, collectionName string) error @@ -671,13 +671,11 @@ func (c *Core) SetDataService(s DataServiceInterface) error { } func (c *Core) SetIndexService(s IndexServiceInterface) error { - c.BuildIndexReq = func(binlog []string, typeParams []*commonpb.KeyValuePair, indexParams []*commonpb.KeyValuePair, indexID typeutil.UniqueID, indexName string) (typeutil.UniqueID, error) { + c.BuildIndexReq = func(binlog []string, typeParams []*commonpb.KeyValuePair, indexParams []*commonpb.KeyValuePair) (typeutil.UniqueID, error) { rsp, err := s.BuildIndex(&indexpb.BuildIndexRequest{ DataPaths: binlog, TypeParams: typeParams, IndexParams: indexParams, - IndexID: indexID, - IndexName: indexName, }) if err != nil { return 0, err diff --git a/internal/masterservice/task.go b/internal/masterservice/task.go index 2d6648d4f1..3586151c28 100644 --- a/internal/masterservice/task.go +++ b/internal/masterservice/task.go @@ -628,7 +628,7 @@ func (t *CreateIndexTask) BuildIndex() error { }) } } - bldID, err = t.core.BuildIndexReq(binlogs, t.fieldSchema.TypeParams, t.indexParams, idxID, t.indexName) + bldID, err = t.core.BuildIndexReq(binlogs, t.fieldSchema.TypeParams, t.indexParams) if err != nil { return err } diff --git a/internal/proto/data_service.proto b/internal/proto/data_service.proto index a30b822e4d..60dcdd8f87 100644 --- a/internal/proto/data_service.proto +++ b/internal/proto/data_service.proto @@ -137,7 +137,7 @@ message SegmentInfo { int64 segmentID = 1; int64 collectionID = 2; int64 partitionID = 3; - repeated string insert_channels = 4; + string insert_channel = 4; uint64 open_time = 5; uint64 sealed_time = 6; uint64 flushed_time = 7; diff --git a/internal/proto/datapb/data_service.pb.go b/internal/proto/datapb/data_service.pb.go index 712fdfc410..432444baa3 100644 --- a/internal/proto/datapb/data_service.pb.go +++ b/internal/proto/datapb/data_service.pb.go @@ -1064,7 +1064,7 @@ type SegmentInfo struct { SegmentID int64 `protobuf:"varint,1,opt,name=segmentID,proto3" json:"segmentID,omitempty"` CollectionID int64 `protobuf:"varint,2,opt,name=collectionID,proto3" json:"collectionID,omitempty"` PartitionID int64 `protobuf:"varint,3,opt,name=partitionID,proto3" json:"partitionID,omitempty"` - InsertChannels []string `protobuf:"bytes,4,rep,name=insert_channels,json=insertChannels,proto3" json:"insert_channels,omitempty"` + InsertChannel string `protobuf:"bytes,4,opt,name=insert_channel,json=insertChannel,proto3" json:"insert_channel,omitempty"` OpenTime uint64 `protobuf:"varint,5,opt,name=open_time,json=openTime,proto3" json:"open_time,omitempty"` SealedTime uint64 `protobuf:"varint,6,opt,name=sealed_time,json=sealedTime,proto3" json:"sealed_time,omitempty"` FlushedTime uint64 `protobuf:"varint,7,opt,name=flushed_time,json=flushedTime,proto3" json:"flushed_time,omitempty"` @@ -1124,11 +1124,11 @@ func (m *SegmentInfo) GetPartitionID() int64 { return 0 } -func (m *SegmentInfo) GetInsertChannels() []string { +func (m *SegmentInfo) GetInsertChannel() string { if m != nil { - return m.InsertChannels + return m.InsertChannel } - return nil + return "" } func (m *SegmentInfo) GetOpenTime() uint64 { @@ -1732,109 +1732,109 @@ func init() { func init() { proto.RegisterFile("data_service.proto", fileDescriptor_3385cd32ad6cfe64) } var fileDescriptor_3385cd32ad6cfe64 = []byte{ - // 1631 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xc4, 0x58, 0x5d, 0x6f, 0x1b, 0x45, - 0x17, 0xce, 0x66, 0xed, 0xd8, 0x3e, 0x76, 0x1c, 0x77, 0xf2, 0x51, 0xd7, 0xed, 0xdb, 0xa6, 0xfb, - 0xaa, 0x4d, 0x5a, 0xbd, 0x6f, 0x82, 0x52, 0x41, 0x41, 0x48, 0x88, 0xa6, 0x4e, 0x23, 0xab, 0x4d, - 0x14, 0x8d, 0x0b, 0x15, 0xb9, 0xb1, 0xd6, 0xf6, 0xd4, 0x19, 0xf0, 0xee, 0x9a, 0x9d, 0x71, 0x93, - 0xe6, 0x06, 0xc4, 0x05, 0x08, 0x6e, 0xe0, 0x0a, 0x24, 0xb8, 0x41, 0x42, 0xfc, 0x0e, 0xfe, 0x02, - 0x3f, 0x86, 0x2b, 0xae, 0xd0, 0xce, 0xcc, 0x7e, 0xd9, 0x9b, 0xac, 0x71, 0xbf, 0xee, 0x3c, 0xc7, - 0xcf, 0x9c, 0x73, 0xe6, 0x7c, 0x3c, 0x67, 0x66, 0x01, 0x75, 0x4d, 0x6e, 0xb6, 0x18, 0x71, 0x9f, - 0xd1, 0x0e, 0xd9, 0x18, 0xb8, 0x0e, 0x77, 0xd0, 0x05, 0x8b, 0xf6, 0x9f, 0x0d, 0x99, 0x5c, 0x6d, - 0x78, 0x80, 0x5a, 0xa9, 0xe3, 0x58, 0x96, 0x63, 0x4b, 0x51, 0xad, 0x4c, 0x6d, 0x4e, 0x5c, 0xdb, - 0xec, 0xab, 0x75, 0x29, 0xba, 0xc1, 0xf8, 0x02, 0x16, 0x31, 0xe9, 0x51, 0xc6, 0x89, 0xbb, 0xef, - 0x74, 0x09, 0x26, 0x9f, 0x0f, 0x09, 0xe3, 0xe8, 0x2d, 0xc8, 0xb4, 0x4d, 0x46, 0xaa, 0xda, 0xaa, - 0xb6, 0x5e, 0xdc, 0xba, 0xb2, 0x11, 0x33, 0xa2, 0xd4, 0xef, 0xb1, 0xde, 0xb6, 0xc9, 0x08, 0x16, - 0x48, 0xf4, 0x0e, 0xe4, 0xcc, 0x6e, 0xd7, 0x25, 0x8c, 0x55, 0x67, 0xcf, 0xd9, 0x74, 0x4f, 0x62, - 0xb0, 0x0f, 0x36, 0xbe, 0xd7, 0x60, 0x29, 0xee, 0x01, 0x1b, 0x38, 0x36, 0x23, 0x68, 0x1b, 0x8a, - 0xd4, 0xa6, 0xbc, 0x35, 0x30, 0x5d, 0xd3, 0x62, 0xca, 0x93, 0xeb, 0x71, 0xa5, 0xc1, 0xd1, 0x1a, - 0x36, 0xe5, 0x07, 0x02, 0x88, 0x81, 0x06, 0xbf, 0xd1, 0x1d, 0x98, 0x63, 0xdc, 0xe4, 0x43, 0xdf, - 0xa7, 0xcb, 0x89, 0x3e, 0x35, 0x05, 0x04, 0x2b, 0xa8, 0xf1, 0xa7, 0x06, 0xa5, 0x26, 0xe9, 0x35, - 0xea, 0x7e, 0x30, 0x96, 0x20, 0xdb, 0x71, 0x86, 0x36, 0x17, 0x3e, 0xcc, 0x63, 0xb9, 0x40, 0xab, - 0x50, 0xec, 0x1c, 0x99, 0xb6, 0x4d, 0xfa, 0xfb, 0xa6, 0x45, 0x84, 0x81, 0x02, 0x8e, 0x8a, 0x90, - 0x01, 0xa5, 0x8e, 0xd3, 0xef, 0x93, 0x0e, 0xa7, 0x8e, 0xdd, 0xa8, 0x57, 0xf5, 0x55, 0x6d, 0x5d, - 0xc7, 0x31, 0x99, 0xa7, 0x65, 0x60, 0xba, 0x9c, 0x2a, 0x48, 0x46, 0x40, 0xa2, 0x22, 0x74, 0x19, - 0x0a, 0xde, 0x8e, 0x96, 0xed, 0x59, 0xc9, 0x0a, 0x2b, 0x79, 0x4f, 0x20, 0x4c, 0xdc, 0x80, 0x72, - 0x80, 0x95, 0x88, 0x39, 0x81, 0x98, 0x0f, 0xa4, 0x1e, 0xcc, 0xf8, 0x41, 0x03, 0x74, 0x8f, 0x31, - 0xda, 0xb3, 0x63, 0x07, 0x5b, 0x81, 0x39, 0xdb, 0xe9, 0x92, 0x46, 0x5d, 0x9c, 0x4c, 0xc7, 0x6a, - 0xe5, 0x99, 0x1c, 0x10, 0xe2, 0xb6, 0x5c, 0xa7, 0xef, 0x1f, 0x2c, 0xef, 0x09, 0xb0, 0xd3, 0x27, - 0x68, 0x07, 0xe6, 0x59, 0x44, 0x09, 0xab, 0xea, 0xab, 0xfa, 0x7a, 0x71, 0xeb, 0xda, 0xc6, 0x58, - 0x21, 0x6e, 0x44, 0x8d, 0xe1, 0xf8, 0x2e, 0xe3, 0x8f, 0x59, 0x58, 0x10, 0xff, 0x4b, 0xbf, 0x2c, - 0x62, 0x8b, 0x40, 0x0b, 0x90, 0x72, 0x47, 0x2e, 0x26, 0x08, 0x74, 0x90, 0x20, 0x3d, 0x9a, 0xa0, - 0xd1, 0xf0, 0x67, 0xd2, 0xc3, 0x9f, 0x1d, 0x0f, 0xff, 0x35, 0x28, 0x92, 0x93, 0x01, 0x75, 0x49, - 0x8b, 0x53, 0x15, 0xde, 0x0c, 0x06, 0x29, 0x7a, 0x4c, 0x2d, 0x12, 0xa9, 0xb1, 0xdc, 0xc4, 0x35, - 0x16, 0x4f, 0x6a, 0x3e, 0x35, 0xa9, 0x85, 0xa4, 0xa4, 0xfe, 0xac, 0xc1, 0x62, 0x2c, 0xa9, 0xaa, - 0x71, 0xf6, 0xa1, 0xc2, 0xe2, 0x81, 0xf5, 0xba, 0xc7, 0xcb, 0x91, 0x71, 0x56, 0x8e, 0x42, 0x28, - 0x1e, 0xdb, 0x3b, 0x5d, 0x13, 0x9d, 0x40, 0xe9, 0x41, 0x7f, 0xc8, 0x8e, 0xa6, 0x27, 0x14, 0x04, - 0x99, 0x6e, 0xbb, 0x51, 0x17, 0x46, 0x75, 0x2c, 0x7e, 0x4f, 0x92, 0x52, 0xe3, 0x57, 0x0d, 0x50, - 0xf3, 0xc8, 0x39, 0x6e, 0x92, 0x9e, 0x38, 0xd0, 0xd4, 0x0e, 0x8c, 0x1a, 0x9b, 0x4d, 0xaf, 0x1f, - 0x7d, 0xbc, 0x7e, 0xfc, 0x63, 0x64, 0xc2, 0x63, 0x18, 0x9f, 0xc2, 0x62, 0xcc, 0x43, 0x95, 0xb8, - 0xab, 0x00, 0x4c, 0x8a, 0x1a, 0x75, 0x99, 0x32, 0x1d, 0x47, 0x24, 0xd3, 0x25, 0xe2, 0x08, 0x96, - 0x94, 0x1d, 0xef, 0x0f, 0xc2, 0xa6, 0x8f, 0x47, 0xdc, 0xbd, 0xd9, 0x51, 0xf7, 0x8c, 0x9f, 0x74, - 0xa8, 0x44, 0x4d, 0x35, 0xec, 0xa7, 0x0e, 0xba, 0x02, 0x85, 0x00, 0xa2, 0xda, 0x3a, 0x14, 0xa0, - 0xb7, 0x21, 0xeb, 0xb9, 0x29, 0x9b, 0xba, 0x7c, 0x16, 0x87, 0x04, 0x1a, 0xb1, 0x44, 0x7b, 0x3d, - 0xd9, 0x71, 0x89, 0xc9, 0x55, 0x4f, 0xea, 0xb2, 0x27, 0xa5, 0x48, 0xf4, 0xe4, 0x35, 0x28, 0x32, - 0x62, 0xf6, 0x49, 0x57, 0x02, 0x32, 0x12, 0x20, 0x45, 0x02, 0x70, 0x1d, 0x4a, 0x4f, 0xbd, 0xf2, - 0xf4, 0x11, 0x59, 0x81, 0x28, 0x2a, 0x99, 0x80, 0x3c, 0x84, 0x05, 0xc6, 0x4d, 0x97, 0xb7, 0x06, - 0x0e, 0x13, 0xc9, 0x64, 0xd5, 0xb9, 0xa4, 0x2e, 0x0a, 0x66, 0xd0, 0x1e, 0xeb, 0x1d, 0x28, 0x28, - 0x2e, 0x8b, 0xad, 0xfe, 0x92, 0xa1, 0x5d, 0x98, 0x27, 0x76, 0x37, 0xa2, 0x2a, 0x37, 0xb1, 0xaa, - 0x12, 0xb1, 0xbb, 0xa1, 0xa2, 0xb0, 0x06, 0xf2, 0x93, 0xd7, 0xc0, 0xb7, 0x1a, 0x2c, 0x8f, 0x14, - 0x81, 0x2a, 0xb9, 0x50, 0x9d, 0x36, 0x39, 0x79, 0xbd, 0x2f, 0x37, 0x11, 0x59, 0x04, 0xc5, 0xad, - 0xff, 0xa6, 0xa4, 0xcd, 0x2b, 0x04, 0xac, 0xb6, 0x18, 0x14, 0x2e, 0x36, 0x6c, 0x46, 0x5c, 0xbe, - 0x4d, 0xed, 0xbe, 0xd3, 0x3b, 0x30, 0xf9, 0x0b, 0x70, 0x44, 0xac, 0xba, 0x66, 0x47, 0xaa, 0xcb, - 0xf8, 0x5d, 0x83, 0x4b, 0xa3, 0xb6, 0xc2, 0xa3, 0xd7, 0x20, 0xff, 0x94, 0x92, 0x7e, 0x37, 0xec, - 0xb5, 0x60, 0x8d, 0xee, 0x42, 0x76, 0xe0, 0x81, 0xd5, 0x01, 0xcf, 0xba, 0x75, 0x34, 0xb9, 0x4b, - 0xed, 0xde, 0x23, 0xca, 0x38, 0x96, 0xf8, 0x48, 0x3c, 0xf5, 0xc9, 0xd3, 0xf3, 0xa5, 0x06, 0x4b, - 0xd2, 0xcf, 0xfb, 0x72, 0xa8, 0xbd, 0x5a, 0xd2, 0x4c, 0xb8, 0x86, 0x18, 0x16, 0x2c, 0x3f, 0x31, - 0x79, 0xe7, 0xa8, 0x6e, 0xbd, 0xb0, 0x0b, 0x9e, 0xb9, 0x70, 0x36, 0xcb, 0x10, 0x16, 0x70, 0x4c, - 0x66, 0xfc, 0xa2, 0xc1, 0x82, 0x18, 0x0f, 0x4d, 0xd2, 0x7b, 0xed, 0x87, 0x1d, 0x21, 0xb2, 0xcc, - 0x18, 0x91, 0xfd, 0xad, 0x43, 0x51, 0xd5, 0xef, 0x04, 0x1c, 0xf6, 0x72, 0xc6, 0xc4, 0x1a, 0x2c, - 0x50, 0x51, 0x02, 0x2d, 0x15, 0x28, 0xe9, 0x58, 0x01, 0x97, 0x69, 0xb4, 0x32, 0xc4, 0xcd, 0xc1, - 0x19, 0x10, 0x3b, 0x4a, 0x5b, 0x79, 0x4f, 0x90, 0xc4, 0x7b, 0x73, 0xa9, 0xbc, 0x97, 0x1b, 0xe7, - 0xbd, 0x4b, 0x90, 0xb7, 0x87, 0x56, 0xcb, 0x75, 0x8e, 0x25, 0xc7, 0xe8, 0x38, 0x67, 0x0f, 0x2d, - 0xec, 0x1c, 0x33, 0xef, 0x2f, 0x8b, 0x58, 0x2d, 0x46, 0x4f, 0xe5, 0x95, 0x44, 0xc7, 0x39, 0x8b, - 0x58, 0x4d, 0x7a, 0x4a, 0x42, 0x26, 0x87, 0x7f, 0xc5, 0xe4, 0x0d, 0x28, 0xc7, 0x49, 0xb6, 0x5a, - 0x9c, 0x98, 0x18, 0xe7, 0x63, 0x1c, 0x8b, 0x76, 0xa0, 0x14, 0xa5, 0xd8, 0x6a, 0x69, 0x62, 0x45, - 0xc5, 0x08, 0xc3, 0x1a, 0x27, 0x00, 0xca, 0xd1, 0x3d, 0xd6, 0x9b, 0xa2, 0x28, 0xdf, 0x85, 0x9c, - 0xaa, 0x0d, 0x35, 0xa5, 0xaf, 0x9e, 0x1d, 0x0a, 0x41, 0x8c, 0x3e, 0xdc, 0xf8, 0x4a, 0x83, 0x95, - 0xfb, 0x41, 0xd5, 0x78, 0x61, 0x62, 0xaf, 0x9f, 0x08, 0xbe, 0xd1, 0xe0, 0xe2, 0x98, 0x13, 0x8a, - 0x31, 0xef, 0xca, 0x1c, 0xfb, 0xb7, 0xc9, 0xeb, 0x89, 0x6e, 0x3c, 0x24, 0xcf, 0x3f, 0x36, 0xfb, - 0x43, 0x72, 0x60, 0x52, 0x57, 0x66, 0x79, 0xca, 0x8b, 0xcb, 0x6f, 0x1a, 0x2c, 0x1f, 0xf8, 0x1d, - 0xf2, 0x66, 0xa2, 0x91, 0xfe, 0x3a, 0x33, 0xbe, 0xd6, 0x60, 0x65, 0xd4, 0xcb, 0x37, 0x12, 0xae, - 0x3d, 0x28, 0x3f, 0xf0, 0xc6, 0x97, 0xa0, 0xd5, 0x3d, 0xc2, 0x4d, 0x54, 0x85, 0x9c, 0x1a, 0x68, - 0x8a, 0xb4, 0xfc, 0xa5, 0xc7, 0x02, 0x6d, 0x31, 0x11, 0x5b, 0xe1, 0x94, 0x2b, 0xe0, 0x62, 0x3b, - 0x9c, 0x92, 0xc6, 0x77, 0x5a, 0x70, 0x99, 0x0b, 0x35, 0x9e, 0x4f, 0x84, 0xff, 0x01, 0xa0, 0xac, - 0xa5, 0xa8, 0x44, 0xb8, 0x9e, 0xc7, 0x05, 0xca, 0x1e, 0x48, 0x01, 0x7a, 0x0f, 0xe6, 0x84, 0x7d, - 0x56, 0xcd, 0x26, 0xc5, 0x43, 0xf4, 0x45, 0xfc, 0x04, 0x58, 0x6d, 0x30, 0x3e, 0x82, 0x52, 0xbd, - 0xfe, 0x28, 0xf4, 0x63, 0x34, 0x75, 0x5a, 0x42, 0xea, 0x26, 0x38, 0x63, 0xbc, 0xe1, 0xee, 0x7b, - 0x8f, 0xc6, 0xd7, 0xdf, 0x70, 0xdd, 0x68, 0xbf, 0x29, 0x1f, 0x5e, 0xe4, 0x72, 0x16, 0xbc, 0x85, - 0xa5, 0x23, 0x72, 0x71, 0xdb, 0x11, 0x9f, 0x34, 0x02, 0xfa, 0x45, 0x0b, 0xc1, 0x84, 0xdb, 0x77, - 0x6c, 0x52, 0x99, 0x41, 0x8b, 0xe2, 0x35, 0x2e, 0x05, 0x7c, 0xe7, 0x84, 0x32, 0x5e, 0xd1, 0x10, - 0x82, 0xb2, 0x12, 0xee, 0xba, 0xce, 0x31, 0xb5, 0x7b, 0x95, 0x59, 0x74, 0x01, 0xe6, 0x7d, 0x4d, - 0x62, 0xac, 0x54, 0xf4, 0x08, 0x4c, 0xe5, 0xba, 0x92, 0xd9, 0xfa, 0x0b, 0xa0, 0x58, 0x37, 0xb9, - 0xd9, 0x94, 0x1f, 0xab, 0x90, 0x09, 0xa5, 0xe8, 0x57, 0x1e, 0x74, 0x33, 0x21, 0xfb, 0x09, 0x1f, - 0xa2, 0x6a, 0x6b, 0xa9, 0x38, 0x19, 0x2c, 0x63, 0x06, 0xed, 0x42, 0x56, 0xd8, 0x47, 0x49, 0xc3, - 0x27, 0xfa, 0x18, 0xad, 0x9d, 0x17, 0x48, 0x63, 0x06, 0xb5, 0x61, 0x21, 0x78, 0x57, 0xab, 0xda, - 0xbe, 0x91, 0xa0, 0x72, 0xfc, 0x83, 0x4a, 0xed, 0x66, 0x1a, 0x2c, 0x70, 0xb6, 0x05, 0xa5, 0xc8, - 0x13, 0x90, 0x25, 0x1a, 0x18, 0x7f, 0xc5, 0x26, 0x1a, 0x48, 0x78, 0x4a, 0x1a, 0x33, 0xa8, 0x07, - 0x95, 0x5d, 0xc2, 0x63, 0xb7, 0x7e, 0xb4, 0x96, 0x32, 0x95, 0x7d, 0x86, 0xad, 0xad, 0xa7, 0x03, - 0x03, 0x43, 0x2e, 0x2c, 0xed, 0x12, 0x3e, 0x76, 0xcf, 0x46, 0xb7, 0x13, 0x74, 0x9c, 0x71, 0xf3, - 0xaf, 0xfd, 0x6f, 0x02, 0x6c, 0xd4, 0xa6, 0x09, 0x17, 0x02, 0x9b, 0xc1, 0xcd, 0x68, 0xed, 0x4c, - 0x25, 0xf1, 0x3b, 0x6d, 0x2d, 0xfd, 0x3a, 0x2f, 0x8e, 0x75, 0x71, 0x97, 0xf0, 0xf8, 0x28, 0xa4, - 0x8c, 0xd3, 0x0e, 0x43, 0xb7, 0x12, 0x0c, 0x25, 0x0f, 0xee, 0xda, 0xed, 0x49, 0xa0, 0xc1, 0xb1, - 0x1c, 0x58, 0xd9, 0x25, 0x3c, 0x36, 0x4e, 0x94, 0xc9, 0xa4, 0x84, 0x24, 0x0e, 0xc7, 0xda, 0xad, - 0x09, 0x90, 0x81, 0xc1, 0x43, 0x40, 0xe2, 0x90, 0xd6, 0xc0, 0xb1, 0xc3, 0x32, 0xa9, 0x25, 0xb6, - 0xc7, 0x8e, 0x35, 0xe0, 0xcf, 0x47, 0x0b, 0x30, 0x88, 0xdd, 0x88, 0x0e, 0x63, 0x06, 0x3d, 0x11, - 0xba, 0xbd, 0x2b, 0xe5, 0x63, 0xda, 0xf9, 0x4c, 0xa5, 0xe0, 0x5c, 0xdd, 0x23, 0xef, 0x48, 0xb5, - 0x90, 0x59, 0x89, 0x38, 0xfd, 0x89, 0x28, 0xb8, 0x30, 0x38, 0x2f, 0x51, 0xf5, 0x21, 0x2c, 0x87, - 0x4d, 0xe3, 0xdd, 0xce, 0x5e, 0xa2, 0xee, 0x0e, 0xe4, 0x45, 0xac, 0x87, 0x36, 0x4f, 0xa9, 0xa0, - 0xe8, 0x24, 0x4a, 0xa9, 0xa0, 0xd8, 0xc0, 0x30, 0x66, 0xb6, 0x7e, 0x9c, 0x85, 0xbc, 0x47, 0xbb, - 0x82, 0x63, 0x5f, 0x65, 0x76, 0x0f, 0x61, 0x21, 0xfe, 0x60, 0x4c, 0xae, 0xd1, 0xc4, 0x47, 0x65, - 0x1a, 0xff, 0x62, 0x98, 0xf7, 0x1f, 0x87, 0x92, 0x1c, 0x8d, 0xb3, 0x08, 0x3d, 0x7c, 0x3e, 0xa6, - 0xe8, 0xdc, 0xfe, 0xf0, 0xf0, 0x83, 0x1e, 0xe5, 0x47, 0xc3, 0xb6, 0xf7, 0xcf, 0xe6, 0x29, 0xed, - 0xf7, 0xe9, 0x29, 0x27, 0x9d, 0xa3, 0x4d, 0xb9, 0xeb, 0xff, 0x5d, 0xca, 0xb8, 0x4b, 0xdb, 0x43, - 0x4e, 0xba, 0x9b, 0xfe, 0xb1, 0x37, 0x85, 0xaa, 0x4d, 0xcf, 0xdc, 0xa0, 0xdd, 0x9e, 0x13, 0xab, - 0x3b, 0xff, 0x04, 0x00, 0x00, 0xff, 0xff, 0x41, 0xfa, 0x50, 0x93, 0x85, 0x19, 0x00, 0x00, + // 1627 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xc4, 0x58, 0x5b, 0x6f, 0x1b, 0x45, + 0x14, 0xce, 0x66, 0xed, 0xd8, 0x3e, 0xbe, 0xc4, 0x9d, 0x5c, 0xea, 0xba, 0xa5, 0x4d, 0x17, 0xb5, + 0x4d, 0x2b, 0x48, 0x50, 0x2a, 0x28, 0x08, 0x09, 0xd1, 0xd4, 0x69, 0x64, 0xb5, 0x89, 0xa2, 0x75, + 0xa1, 0x22, 0x2f, 0xd6, 0xda, 0x9e, 0x3a, 0x03, 0xde, 0x5d, 0xb3, 0x33, 0x6e, 0xd2, 0xbc, 0x80, + 0x78, 0x00, 0xc1, 0x0b, 0x3c, 0x81, 0x04, 0x2f, 0x48, 0x88, 0xdf, 0xc1, 0x5f, 0xe0, 0xc7, 0xf0, + 0xc0, 0x23, 0x9a, 0xcb, 0xde, 0xec, 0x4d, 0xd6, 0xa4, 0xb7, 0x37, 0xcf, 0xf1, 0x37, 0xe7, 0x9c, + 0x39, 0x97, 0xef, 0xcc, 0x2c, 0xa0, 0x9e, 0xc5, 0xac, 0x36, 0xc5, 0xde, 0x53, 0xd2, 0xc5, 0x6b, + 0x43, 0xcf, 0x65, 0x2e, 0x3a, 0x67, 0x93, 0xc1, 0xd3, 0x11, 0x95, 0xab, 0x35, 0x0e, 0xa8, 0x97, + 0xba, 0xae, 0x6d, 0xbb, 0x8e, 0x14, 0xd5, 0x2b, 0xc4, 0x61, 0xd8, 0x73, 0xac, 0x81, 0x5a, 0x97, + 0xa2, 0x1b, 0x8c, 0xaf, 0x60, 0xc1, 0xc4, 0x7d, 0x42, 0x19, 0xf6, 0x76, 0xdd, 0x1e, 0x36, 0xf1, + 0x97, 0x23, 0x4c, 0x19, 0x7a, 0x07, 0x32, 0x1d, 0x8b, 0xe2, 0x9a, 0xb6, 0xa2, 0xad, 0x16, 0x37, + 0x2e, 0xad, 0xc5, 0x8c, 0x28, 0xf5, 0x3b, 0xb4, 0xbf, 0x69, 0x51, 0x6c, 0x0a, 0x24, 0x7a, 0x0f, + 0x72, 0x56, 0xaf, 0xe7, 0x61, 0x4a, 0x6b, 0xb3, 0xa7, 0x6c, 0xba, 0x2b, 0x31, 0xa6, 0x0f, 0x36, + 0x7e, 0xd4, 0x60, 0x31, 0xee, 0x01, 0x1d, 0xba, 0x0e, 0xc5, 0x68, 0x13, 0x8a, 0xc4, 0x21, 0xac, + 0x3d, 0xb4, 0x3c, 0xcb, 0xa6, 0xca, 0x93, 0xab, 0x71, 0xa5, 0xc1, 0xd1, 0x9a, 0x0e, 0x61, 0x7b, + 0x02, 0x68, 0x02, 0x09, 0x7e, 0xa3, 0xdb, 0x30, 0x47, 0x99, 0xc5, 0x46, 0xbe, 0x4f, 0x17, 0x13, + 0x7d, 0x6a, 0x09, 0x88, 0xa9, 0xa0, 0xc6, 0xdf, 0x1a, 0x94, 0x5a, 0xb8, 0xdf, 0x6c, 0xf8, 0xc1, + 0x58, 0x84, 0x6c, 0xd7, 0x1d, 0x39, 0x4c, 0xf8, 0x50, 0x36, 0xe5, 0x02, 0xad, 0x40, 0xb1, 0x7b, + 0x60, 0x39, 0x0e, 0x1e, 0xec, 0x5a, 0x36, 0x16, 0x06, 0x0a, 0x66, 0x54, 0x84, 0x0c, 0x28, 0x75, + 0xdd, 0xc1, 0x00, 0x77, 0x19, 0x71, 0x9d, 0x66, 0xa3, 0xa6, 0xaf, 0x68, 0xab, 0xba, 0x19, 0x93, + 0x71, 0x2d, 0x43, 0xcb, 0x63, 0x44, 0x41, 0x32, 0x02, 0x12, 0x15, 0xa1, 0x8b, 0x50, 0xe0, 0x3b, + 0xda, 0x0e, 0xb7, 0x92, 0x15, 0x56, 0xf2, 0x5c, 0x20, 0x4c, 0x5c, 0x83, 0x4a, 0x80, 0x95, 0x88, + 0x39, 0x81, 0x28, 0x07, 0x52, 0x0e, 0x33, 0x7e, 0xd2, 0x00, 0xdd, 0xa5, 0x94, 0xf4, 0x9d, 0xd8, + 0xc1, 0x96, 0x61, 0xce, 0x71, 0x7b, 0xb8, 0xd9, 0x10, 0x27, 0xd3, 0x4d, 0xb5, 0xe2, 0x26, 0x87, + 0x18, 0x7b, 0x6d, 0xcf, 0x1d, 0xf8, 0x07, 0xcb, 0x73, 0x81, 0xe9, 0x0e, 0x30, 0xda, 0x82, 0x32, + 0x8d, 0x28, 0xa1, 0x35, 0x7d, 0x45, 0x5f, 0x2d, 0x6e, 0x5c, 0x59, 0x9b, 0x28, 0xc4, 0xb5, 0xa8, + 0x31, 0x33, 0xbe, 0xcb, 0xf8, 0x6b, 0x16, 0xe6, 0xc5, 0xff, 0xd2, 0x2f, 0x1b, 0x3b, 0x22, 0xd0, + 0x02, 0xa4, 0xdc, 0x91, 0x8b, 0x29, 0x02, 0x1d, 0x24, 0x48, 0x8f, 0x26, 0x68, 0x3c, 0xfc, 0x99, + 0xf4, 0xf0, 0x67, 0x27, 0xc3, 0x7f, 0x05, 0x8a, 0xf8, 0x68, 0x48, 0x3c, 0xdc, 0x66, 0x44, 0x85, + 0x37, 0x63, 0x82, 0x14, 0x3d, 0x22, 0x36, 0x8e, 0xd4, 0x58, 0x6e, 0xea, 0x1a, 0x8b, 0x27, 0x35, + 0x9f, 0x9a, 0xd4, 0x42, 0x52, 0x52, 0x7f, 0xd5, 0x60, 0x21, 0x96, 0x54, 0xd5, 0x38, 0xbb, 0x50, + 0xa5, 0xf1, 0xc0, 0xf2, 0xee, 0xe1, 0x39, 0x32, 0x4e, 0xca, 0x51, 0x08, 0x35, 0x27, 0xf6, 0x9e, + 0xad, 0x89, 0x8e, 0xa0, 0x74, 0x7f, 0x30, 0xa2, 0x07, 0x67, 0x27, 0x14, 0x04, 0x99, 0x5e, 0xa7, + 0xd9, 0x10, 0x46, 0x75, 0x53, 0xfc, 0x9e, 0x26, 0xa5, 0xc6, 0xef, 0x1a, 0xa0, 0xd6, 0x81, 0x7b, + 0xd8, 0xc2, 0x7d, 0x71, 0xa0, 0x33, 0x3b, 0x30, 0x6e, 0x6c, 0x36, 0xbd, 0x7e, 0xf4, 0xc9, 0xfa, + 0xf1, 0x8f, 0x91, 0x09, 0x8f, 0x61, 0x7c, 0x0e, 0x0b, 0x31, 0x0f, 0x55, 0xe2, 0x2e, 0x03, 0x50, + 0x29, 0x6a, 0x36, 0x64, 0xca, 0x74, 0x33, 0x22, 0x39, 0x5b, 0x22, 0x0e, 0x60, 0x51, 0xd9, 0xe1, + 0x7f, 0x60, 0x7a, 0xf6, 0x78, 0xc4, 0xdd, 0x9b, 0x1d, 0x77, 0xcf, 0xf8, 0x45, 0x87, 0x6a, 0xd4, + 0x54, 0xd3, 0x79, 0xe2, 0xa2, 0x4b, 0x50, 0x08, 0x20, 0xaa, 0xad, 0x43, 0x01, 0x7a, 0x17, 0xb2, + 0xdc, 0x4d, 0xd9, 0xd4, 0x95, 0x93, 0x38, 0x24, 0xd0, 0x68, 0x4a, 0x34, 0xef, 0xc9, 0xae, 0x87, + 0x2d, 0xa6, 0x7a, 0x52, 0x97, 0x3d, 0x29, 0x45, 0xa2, 0x27, 0xaf, 0x40, 0x91, 0x62, 0x6b, 0x80, + 0x7b, 0x12, 0x90, 0x91, 0x00, 0x29, 0x12, 0x80, 0xab, 0x50, 0x7a, 0xc2, 0xcb, 0xd3, 0x47, 0x64, + 0x05, 0xa2, 0xa8, 0x64, 0x02, 0xf2, 0x00, 0xe6, 0x29, 0xb3, 0x3c, 0xd6, 0x1e, 0xba, 0x54, 0x24, + 0x93, 0xd6, 0xe6, 0x92, 0xba, 0x28, 0x98, 0x41, 0x3b, 0xb4, 0xbf, 0xa7, 0xa0, 0x66, 0x45, 0x6c, + 0xf5, 0x97, 0x14, 0x6d, 0x43, 0x19, 0x3b, 0xbd, 0x88, 0xaa, 0xdc, 0xd4, 0xaa, 0x4a, 0xd8, 0xe9, + 0x85, 0x8a, 0xc2, 0x1a, 0xc8, 0x4f, 0x5f, 0x03, 0xdf, 0x6b, 0xb0, 0x34, 0x56, 0x04, 0xaa, 0xe4, + 0x42, 0x75, 0xda, 0xf4, 0xe4, 0xf5, 0xa1, 0xdc, 0x84, 0x65, 0x11, 0x14, 0x37, 0xde, 0x4c, 0x49, + 0x1b, 0x2f, 0x04, 0x53, 0x6d, 0x31, 0x08, 0x9c, 0x6f, 0x3a, 0x14, 0x7b, 0x6c, 0x93, 0x38, 0x03, + 0xb7, 0xbf, 0x67, 0xb1, 0xe7, 0xe0, 0x88, 0x58, 0x75, 0xcd, 0x8e, 0x55, 0x97, 0xf1, 0xa7, 0x06, + 0x17, 0xc6, 0x6d, 0x85, 0x47, 0xaf, 0x43, 0xfe, 0x09, 0xc1, 0x83, 0x5e, 0xd8, 0x6b, 0xc1, 0x1a, + 0xdd, 0x81, 0xec, 0x90, 0x83, 0xd5, 0x01, 0x4f, 0xba, 0x75, 0xb4, 0x98, 0x47, 0x9c, 0xfe, 0x43, + 0x42, 0x99, 0x29, 0xf1, 0x91, 0x78, 0xea, 0xd3, 0xa7, 0xe7, 0x6b, 0x0d, 0x16, 0xa5, 0x9f, 0xf7, + 0xe4, 0x50, 0x7b, 0xb9, 0xa4, 0x99, 0x70, 0x0d, 0x31, 0x6c, 0x58, 0x7a, 0x6c, 0xb1, 0xee, 0x41, + 0xc3, 0x7e, 0x6e, 0x17, 0xb8, 0xb9, 0x70, 0x36, 0xcb, 0x10, 0x16, 0xcc, 0x98, 0xcc, 0xf8, 0x4d, + 0x83, 0x79, 0x31, 0x1e, 0x5a, 0xb8, 0xff, 0xca, 0x0f, 0x3b, 0x46, 0x64, 0x99, 0x09, 0x22, 0xfb, + 0x57, 0x87, 0xa2, 0xaa, 0xdf, 0x29, 0x38, 0xec, 0xc5, 0x8c, 0x89, 0x6b, 0x50, 0x21, 0xa2, 0x04, + 0xda, 0x2a, 0x50, 0x82, 0xb4, 0x0a, 0x66, 0x99, 0x44, 0x0b, 0x83, 0xdf, 0x1b, 0xdc, 0x21, 0x76, + 0xa2, 0xa4, 0x95, 0xe7, 0x82, 0x24, 0xd6, 0x9b, 0x4b, 0x65, 0xbd, 0xdc, 0x24, 0xeb, 0x5d, 0x80, + 0xbc, 0x33, 0xb2, 0xdb, 0x9e, 0x7b, 0x28, 0x19, 0x46, 0x37, 0x73, 0xce, 0xc8, 0x36, 0xdd, 0x43, + 0xca, 0xff, 0xb2, 0xb1, 0xdd, 0xa6, 0xe4, 0x58, 0x5e, 0x48, 0x74, 0x33, 0x67, 0x63, 0xbb, 0x45, + 0x8e, 0x71, 0xc8, 0xe3, 0xf0, 0xbf, 0x78, 0xbc, 0x09, 0x95, 0x38, 0xc5, 0xd6, 0x8a, 0x53, 0xd3, + 0x62, 0x39, 0xc6, 0xb0, 0x68, 0x0b, 0x4a, 0x51, 0x82, 0xad, 0x95, 0xa6, 0x56, 0x54, 0x8c, 0xf0, + 0xab, 0x71, 0x04, 0xa0, 0x1c, 0xdd, 0xa1, 0xfd, 0x33, 0x94, 0xe4, 0xfb, 0x90, 0x53, 0x95, 0xa1, + 0x66, 0xf4, 0xe5, 0x93, 0x43, 0x21, 0x68, 0xd1, 0x87, 0x1b, 0xdf, 0x68, 0xb0, 0x7c, 0x2f, 0xa8, + 0x19, 0x1e, 0x26, 0xfa, 0xea, 0x69, 0xe0, 0x3b, 0x0d, 0xce, 0x4f, 0x38, 0xa1, 0xf8, 0xf2, 0x8e, + 0xcc, 0xb1, 0x7f, 0x97, 0xbc, 0x9a, 0xe8, 0xc6, 0x03, 0xfc, 0xec, 0x53, 0x6b, 0x30, 0xc2, 0x7b, + 0x16, 0xf1, 0x64, 0x96, 0xcf, 0x78, 0x6d, 0xf9, 0x43, 0x83, 0xa5, 0x3d, 0xbf, 0x3f, 0x5e, 0x4f, + 0x34, 0xd2, 0xdf, 0x66, 0xc6, 0xb7, 0x1a, 0x2c, 0x8f, 0x7b, 0xf9, 0x5a, 0xc2, 0xb5, 0x03, 0x95, + 0xfb, 0x7c, 0x78, 0x09, 0x52, 0xdd, 0xc1, 0xcc, 0x42, 0x35, 0xc8, 0xa9, 0x71, 0xa6, 0x28, 0xcb, + 0x5f, 0x72, 0x16, 0xe8, 0x88, 0x79, 0xd8, 0x0e, 0x67, 0x5c, 0xc1, 0x2c, 0x76, 0xc2, 0x19, 0x69, + 0xfc, 0xa0, 0x05, 0x57, 0xb9, 0x50, 0xe3, 0xe9, 0x34, 0xf8, 0x06, 0x00, 0xa1, 0x6d, 0x45, 0x25, + 0xc2, 0xf5, 0xbc, 0x59, 0x20, 0xf4, 0xbe, 0x14, 0xa0, 0x0f, 0x60, 0x4e, 0xd8, 0xa7, 0xb5, 0x6c, + 0x52, 0x3c, 0x44, 0x5f, 0xc4, 0x4f, 0x60, 0xaa, 0x0d, 0xc6, 0x27, 0x50, 0x6a, 0x34, 0x1e, 0x86, + 0x7e, 0x8c, 0xa7, 0x4e, 0x4b, 0x48, 0xdd, 0x14, 0x67, 0x8c, 0x37, 0xdc, 0x3d, 0xfe, 0x64, 0x7c, + 0xf5, 0x0d, 0xd7, 0x8b, 0xf6, 0x9b, 0xf2, 0xe1, 0x79, 0xae, 0x66, 0xc1, 0x4b, 0x58, 0x3a, 0x22, + 0x17, 0xb7, 0x5c, 0xf1, 0x41, 0x23, 0xa0, 0x5f, 0x34, 0x1f, 0xcc, 0xb7, 0x5d, 0xd7, 0xc1, 0xd5, + 0x19, 0xb4, 0x20, 0xde, 0xe2, 0x52, 0xc0, 0xb6, 0x8e, 0x08, 0x65, 0x55, 0x0d, 0x21, 0xa8, 0x28, + 0xe1, 0xb6, 0xe7, 0x1e, 0x12, 0xa7, 0x5f, 0x9d, 0x45, 0xe7, 0xa0, 0xec, 0x6b, 0x12, 0x63, 0xa5, + 0xaa, 0x47, 0x60, 0x2a, 0xd7, 0xd5, 0xcc, 0xc6, 0x3f, 0x00, 0xc5, 0x86, 0xc5, 0xac, 0x96, 0xfc, + 0x54, 0x85, 0x2c, 0x28, 0x45, 0xbf, 0xf1, 0xa0, 0xeb, 0x09, 0xd9, 0x4f, 0xf8, 0x0c, 0x55, 0xbf, + 0x91, 0x8a, 0x93, 0xc1, 0x32, 0x66, 0xd0, 0x36, 0x64, 0x85, 0x7d, 0x94, 0x34, 0x7c, 0xa2, 0x4f, + 0xd1, 0xfa, 0x69, 0x81, 0x34, 0x66, 0x50, 0x07, 0xe6, 0x83, 0x57, 0xb5, 0xaa, 0xed, 0x6b, 0x09, + 0x2a, 0x27, 0x3f, 0xa7, 0xd4, 0xaf, 0xa7, 0xc1, 0x02, 0x67, 0xdb, 0x50, 0x8a, 0x3c, 0x00, 0x69, + 0xa2, 0x81, 0xc9, 0x37, 0x6c, 0xa2, 0x81, 0x84, 0x87, 0xa4, 0x31, 0x83, 0xfa, 0x50, 0xdd, 0xc6, + 0x2c, 0x76, 0xe7, 0x47, 0x37, 0x52, 0xa6, 0xb2, 0xcf, 0xb0, 0xf5, 0xd5, 0x74, 0x60, 0x60, 0xc8, + 0x83, 0xc5, 0x6d, 0xcc, 0x26, 0x6e, 0xd9, 0xe8, 0x56, 0x82, 0x8e, 0x13, 0xee, 0xfd, 0xf5, 0xb7, + 0xa6, 0xc0, 0x46, 0x6d, 0x5a, 0x70, 0x2e, 0xb0, 0xa9, 0x2e, 0x46, 0xc9, 0xa7, 0x4b, 0xba, 0x54, + 0xd7, 0xd3, 0x2f, 0xf3, 0xe2, 0x58, 0xe7, 0xb7, 0x31, 0x8b, 0x8f, 0x42, 0x42, 0x19, 0xe9, 0x52, + 0x74, 0x33, 0xc1, 0x50, 0xf2, 0xe0, 0xae, 0xdf, 0x9a, 0x06, 0x1a, 0x1c, 0xcb, 0x85, 0xe5, 0x6d, + 0xcc, 0x62, 0xe3, 0x44, 0x99, 0x4c, 0x4a, 0x48, 0xe2, 0x70, 0xac, 0xdf, 0x9c, 0x02, 0x19, 0x18, + 0xdc, 0x07, 0x24, 0x0e, 0x69, 0x0f, 0x5d, 0x27, 0x2c, 0x93, 0x7a, 0x62, 0x7b, 0x6c, 0xd9, 0x43, + 0xf6, 0x6c, 0xbc, 0x00, 0x83, 0xd8, 0x8d, 0xe9, 0x30, 0x66, 0xd0, 0x63, 0xa1, 0x9b, 0x5f, 0x29, + 0x1f, 0x91, 0xee, 0x17, 0xfe, 0xf5, 0xf5, 0x34, 0xdd, 0x63, 0xaf, 0x48, 0xb5, 0x90, 0x59, 0x89, + 0x38, 0xfd, 0x99, 0x28, 0xb8, 0x30, 0x38, 0x2f, 0x50, 0xf5, 0x3e, 0x2c, 0x85, 0x4d, 0xc3, 0x6f, + 0x67, 0x2f, 0x50, 0x77, 0x17, 0xf2, 0x22, 0xd6, 0x23, 0x87, 0xa5, 0x54, 0x50, 0x74, 0x12, 0xa5, + 0x54, 0x50, 0x6c, 0x60, 0x18, 0x33, 0x1b, 0x3f, 0xcf, 0x42, 0x9e, 0xd3, 0xae, 0xe0, 0xd8, 0x97, + 0x99, 0xdd, 0x7d, 0x98, 0x8f, 0x3f, 0x17, 0x93, 0x6b, 0x34, 0xf1, 0x49, 0x99, 0xc6, 0xbf, 0x26, + 0x94, 0xfd, 0xa7, 0xa1, 0x24, 0x47, 0xe3, 0x24, 0x42, 0x0f, 0x1f, 0x8f, 0x29, 0x3a, 0x37, 0x3f, + 0xde, 0xff, 0xa8, 0x4f, 0xd8, 0xc1, 0xa8, 0xc3, 0xff, 0x59, 0x3f, 0x26, 0x83, 0x01, 0x39, 0x66, + 0xb8, 0x7b, 0xb0, 0x2e, 0x77, 0xbd, 0xdd, 0x23, 0x94, 0x79, 0xa4, 0x33, 0x62, 0xb8, 0xb7, 0xee, + 0x1f, 0x7b, 0x5d, 0xa8, 0x5a, 0xe7, 0xe6, 0x86, 0x9d, 0xce, 0x9c, 0x58, 0xdd, 0xfe, 0x2f, 0x00, + 0x00, 0xff, 0xff, 0x12, 0x7a, 0x41, 0x93, 0x83, 0x19, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. diff --git a/internal/querynode/collection_replica.go b/internal/querynode/collection_replica.go index d6ca26c9b5..901ce5db57 100644 --- a/internal/querynode/collection_replica.go +++ b/internal/querynode/collection_replica.go @@ -41,7 +41,7 @@ type collectionReplica interface { getCollectionByID(collectionID UniqueID) (*Collection, error) getCollectionByName(collectionName string) (*Collection, error) hasCollection(collectionID UniqueID) bool - getVecFieldsByCollectionID(collectionID UniqueID) ([]int64, error) + getVecFieldsByCollectionID(collectionID UniqueID) (map[int64]string, error) // partition // Partition tags in different collections are not unique, @@ -175,7 +175,7 @@ func (colReplica *collectionReplicaImpl) hasCollection(collectionID UniqueID) bo return false } -func (colReplica *collectionReplicaImpl) getVecFieldsByCollectionID(collectionID UniqueID) ([]int64, error) { +func (colReplica *collectionReplicaImpl) getVecFieldsByCollectionID(collectionID UniqueID) (map[int64]string, error) { colReplica.mu.RLock() defer colReplica.mu.RUnlock() @@ -184,10 +184,10 @@ func (colReplica *collectionReplicaImpl) getVecFieldsByCollectionID(collectionID return nil, err } - vecFields := make([]int64, 0) + vecFields := make(map[int64]string) for _, field := range col.Schema().Fields { if field.DataType == schemapb.DataType_VECTOR_BINARY || field.DataType == schemapb.DataType_VECTOR_FLOAT { - vecFields = append(vecFields, field.FieldID) + vecFields[field.FieldID] = field.Name } } diff --git a/internal/querynode/index.go b/internal/querynode/index.go index e0539990a7..8c8f84b17e 100644 --- a/internal/querynode/index.go +++ b/internal/querynode/index.go @@ -29,7 +29,7 @@ func (s *Segment) buildIndex(collection *Collection) commonpb.Status { return commonpb.Status{ErrorCode: commonpb.ErrorCode_SUCCESS} } -func (s *Segment) dropIndex(fieldID int64) commonpb.Status { +func (s *Segment) dropIndex(fieldName string) commonpb.Status { // WARN: Not support yet return commonpb.Status{ErrorCode: commonpb.ErrorCode_SUCCESS} diff --git a/internal/querynode/load_index_info.go b/internal/querynode/load_index_info.go index 37032ffe18..d56cca4f21 100644 --- a/internal/querynode/load_index_info.go +++ b/internal/querynode/load_index_info.go @@ -51,9 +51,10 @@ func (li *LoadIndexInfo) appendIndexParam(indexKey string, indexValue string) er return nil } -func (li *LoadIndexInfo) appendFieldInfo(fieldID int64) error { +func (li *LoadIndexInfo) appendFieldInfo(fieldName string, fieldID int64) error { + cFieldName := C.CString(fieldName) cFieldID := C.long(fieldID) - status := C.AppendFieldInfo(li.cLoadIndexInfo, cFieldID) + status := C.AppendFieldInfo(li.cLoadIndexInfo, cFieldName, cFieldID) errorCode := status.error_code if errorCode != 0 { diff --git a/internal/querynode/load_index_info_test.go b/internal/querynode/load_index_info_test.go index 9cfd4f14a9..95261c7002 100644 --- a/internal/querynode/load_index_info_test.go +++ b/internal/querynode/load_index_info_test.go @@ -1,64 +1,12 @@ package querynode import ( - "strconv" "testing" "github.com/stretchr/testify/assert" - "github.com/zilliztech/milvus-distributed/internal/indexnode" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" ) -func genIndexBinarySet() ([][]byte, error) { - const ( - msgLength = 1000 - DIM = 16 - ) - - indexParams := make(map[string]string) - indexParams["index_type"] = "IVF_PQ" - indexParams["index_mode"] = "cpu" - indexParams["dim"] = "16" - indexParams["k"] = "10" - indexParams["nlist"] = "100" - indexParams["nprobe"] = "10" - indexParams["m"] = "4" - indexParams["nbits"] = "8" - indexParams["metric_type"] = "L2" - indexParams["SLICE_SIZE"] = "4" - - typeParams := make(map[string]string) - typeParams["dim"] = strconv.Itoa(DIM) - var indexRowData []float32 - for n := 0; n < msgLength; n++ { - for i := 0; i < DIM; i++ { - indexRowData = append(indexRowData, float32(n*i)) - } - } - - index, err := indexnode.NewCIndex(typeParams, indexParams) - if err != nil { - return nil, err - } - - err = index.BuildFloatVecIndexWithoutIds(indexRowData) - if err != nil { - return nil, err - } - - // save index to minio - binarySet, err := index.Serialize() - if err != nil { - return nil, err - } - - bytesSet := make([][]byte, 0) - for i := range binarySet { - bytesSet = append(bytesSet, binarySet[i].Value) - } - return bytesSet, nil -} - func TestLoadIndexInfo(t *testing.T) { indexParams := make([]*commonpb.KeyValuePair, 0) indexParams = append(indexParams, &commonpb.KeyValuePair{ @@ -70,21 +18,19 @@ func TestLoadIndexInfo(t *testing.T) { Value: "cpu", }) - indexBytes, err := genIndexBinarySet() - assert.NoError(t, err) + indexBytes := make([][]byte, 0) + indexValue := make([]byte, 10) + indexBytes = append(indexBytes, indexValue) indexPaths := make([]string, 0) - indexPaths = append(indexPaths, "IVF") + indexPaths = append(indexPaths, "index-0") loadIndexInfo, err := newLoadIndexInfo() assert.Nil(t, err) for _, indexParam := range indexParams { - err = loadIndexInfo.appendIndexParam(indexParam.Key, indexParam.Value) - assert.NoError(t, err) + loadIndexInfo.appendIndexParam(indexParam.Key, indexParam.Value) } - err = loadIndexInfo.appendFieldInfo(0) - assert.NoError(t, err) - err = loadIndexInfo.appendIndex(indexBytes, indexPaths) - assert.NoError(t, err) + loadIndexInfo.appendFieldInfo("field0", 0) + loadIndexInfo.appendIndex(indexBytes, indexPaths) deleteLoadIndexInfo(loadIndexInfo) } diff --git a/internal/querynode/load_service.go b/internal/querynode/load_service.go index 1ce28cb54a..bec0170ef8 100644 --- a/internal/querynode/load_service.go +++ b/internal/querynode/load_service.go @@ -47,6 +47,7 @@ type loadService struct { type loadIndex struct { segmentID UniqueID fieldID int64 + fieldName string indexPaths []string } @@ -261,7 +262,7 @@ func (s *loadService) updateSegmentIndex(indexParams indexParam, bytesIndex [][] if err != nil { return err } - err = loadIndexInfo.appendFieldInfo(l.fieldID) + err = loadIndexInfo.appendFieldInfo(l.fieldName, l.fieldID) if err != nil { return err } @@ -421,9 +422,10 @@ func (s *loadService) loadIndexImmediate(segment *Segment, indexPaths []string) if err != nil { return err } - for _, id := range vecFieldIDs { + for id, name := range vecFieldIDs { l := &loadIndex{ segmentID: segment.ID(), + fieldName: name, fieldID: id, indexPaths: indexPaths, } @@ -447,9 +449,10 @@ func (s *loadService) loadIndexDelayed(collectionID, segmentID UniqueID, indexPa if err != nil { return err } - for _, id := range vecFieldIDs { + for id, name := range vecFieldIDs { l := &loadIndex{ segmentID: segmentID, + fieldName: name, fieldID: id, indexPaths: indexPaths, } @@ -484,18 +487,10 @@ func (s *loadService) getInsertBinlogPaths(segmentID UniqueID) ([]*internalpb2.S return pathResponse.Paths, pathResponse.FieldIDs, nil } -func (s *loadService) filterOutVectorFields(fieldIDs []int64, vectorFields []int64) []int64 { - containsFunc := func(s []int64, e int64) bool { - for _, a := range s { - if a == e { - return true - } - } - return false - } +func (s *loadService) filterOutVectorFields(fieldIDs []int64, vectorFields map[int64]string) []int64 { targetFields := make([]int64, 0) for _, id := range fieldIDs { - if !containsFunc(vectorFields, id) { + if _, ok := vectorFields[id]; !ok { targetFields = append(targetFields, id) } }