mirror of
https://gitee.com/milvus-io/milvus.git
synced 2026-01-07 19:31:51 +08:00
Remove incorrect formatting, unused conditions (#19239)
Signed-off-by: yah01 <yang.cen@zilliz.com> Signed-off-by: yah01 <yang.cen@zilliz.com>
This commit is contained in:
parent
86769e598c
commit
0306f71b49
@ -66,9 +66,9 @@ func (h *ServerHandler) GetVChanPositions(channel *channel, partitionID UniqueID
|
||||
zap.Any("numOfSegments", len(segments)),
|
||||
)
|
||||
var (
|
||||
flushedIds = make(typeutil.UniqueSet)
|
||||
unflushedIds = make(typeutil.UniqueSet)
|
||||
droppedIds = make(typeutil.UniqueSet)
|
||||
indexedIDs = make(typeutil.UniqueSet)
|
||||
unIndexedIDs = make(typeutil.UniqueSet)
|
||||
droppedIDs = make(typeutil.UniqueSet)
|
||||
seekPosition *internalpb.MsgPosition
|
||||
)
|
||||
for _, s := range segments {
|
||||
@ -78,26 +78,25 @@ func (h *ServerHandler) GetVChanPositions(channel *channel, partitionID UniqueID
|
||||
}
|
||||
segmentInfos[s.GetID()] = s
|
||||
if s.GetState() == commonpb.SegmentState_Dropped {
|
||||
droppedIds.Insert(s.GetID())
|
||||
droppedIDs.Insert(s.GetID())
|
||||
} else if indexed.Contain(s.GetID()) {
|
||||
flushedIds.Insert(s.GetID())
|
||||
indexedIDs.Insert(s.GetID())
|
||||
} else {
|
||||
unflushedIds.Insert(s.GetID())
|
||||
unIndexedIDs.Insert(s.GetID())
|
||||
}
|
||||
}
|
||||
for id := range unflushedIds {
|
||||
for id := range unIndexedIDs {
|
||||
// Indexed segments are compacted to a raw segment,
|
||||
// replace it with the indexed ones
|
||||
if !indexed.Contain(id) &&
|
||||
len(segmentInfos[id].GetCompactionFrom()) > 0 &&
|
||||
if len(segmentInfos[id].GetCompactionFrom()) > 0 &&
|
||||
indexed.Contain(segmentInfos[id].GetCompactionFrom()...) {
|
||||
flushedIds.Insert(segmentInfos[id].GetCompactionFrom()...)
|
||||
unflushedIds.Remove(id)
|
||||
droppedIds.Remove(segmentInfos[id].GetCompactionFrom()...)
|
||||
unIndexedIDs.Remove(id)
|
||||
indexedIDs.Insert(segmentInfos[id].GetCompactionFrom()...)
|
||||
droppedIDs.Remove(segmentInfos[id].GetCompactionFrom()...)
|
||||
}
|
||||
}
|
||||
|
||||
for id := range flushedIds {
|
||||
for id := range indexedIDs {
|
||||
var segmentPosition *internalpb.MsgPosition
|
||||
segment := segmentInfos[id]
|
||||
if segment.GetDmlPosition() != nil {
|
||||
@ -110,7 +109,7 @@ func (h *ServerHandler) GetVChanPositions(channel *channel, partitionID UniqueID
|
||||
seekPosition = segmentPosition
|
||||
}
|
||||
}
|
||||
for id := range unflushedIds {
|
||||
for id := range unIndexedIDs {
|
||||
var segmentPosition *internalpb.MsgPosition
|
||||
segment := segmentInfos[id]
|
||||
if segment.GetDmlPosition() != nil {
|
||||
@ -141,9 +140,9 @@ func (h *ServerHandler) GetVChanPositions(channel *channel, partitionID UniqueID
|
||||
CollectionID: channel.CollectionID,
|
||||
ChannelName: channel.Name,
|
||||
SeekPosition: seekPosition,
|
||||
FlushedSegmentIds: flushedIds.Collect(),
|
||||
UnflushedSegmentIds: unflushedIds.Collect(),
|
||||
DroppedSegmentIds: droppedIds.Collect(),
|
||||
FlushedSegmentIds: indexedIDs.Collect(),
|
||||
UnflushedSegmentIds: unIndexedIDs.Collect(),
|
||||
DroppedSegmentIds: droppedIDs.Collect(),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -591,7 +591,7 @@ func (s *Server) GetRecoveryInfo(ctx context.Context, req *datapb.GetRecoveryInf
|
||||
zap.Int64("collectionID", collectionID),
|
||||
zap.Int64("partitionID", partitionID),
|
||||
)
|
||||
log.Info("receive get recovery info request")
|
||||
log.Info("get recovery info request received")
|
||||
resp := &datapb.GetRecoveryInfoResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
||||
@ -706,7 +706,7 @@ func (s *Server) GetRecoveryInfo(ctx context.Context, req *datapb.GetRecoveryInf
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
// GetFlushedSegments returns all segment matches provided criterion and in State Flushed
|
||||
// GetFlushedSegments returns all segment matches provided criterion and in state Flushed or Dropped (compacted but not GCed yet)
|
||||
// If requested partition id < 0, ignores the partition id filter
|
||||
func (s *Server) GetFlushedSegments(ctx context.Context, req *datapb.GetFlushedSegmentsRequest) (*datapb.GetFlushedSegmentsResponse, error) {
|
||||
resp := &datapb.GetFlushedSegmentsResponse{
|
||||
|
||||
@ -101,8 +101,7 @@ class TestCompactionParams(TestcaseBase):
|
||||
start = time()
|
||||
while time() - start < cost:
|
||||
collection_w.load()
|
||||
segment_info = self.utility_wrap.get_query_segment_info(collection_w.name)[
|
||||
0]
|
||||
segment_info = self.utility_wrap.get_query_segment_info(collection_w.name)[0]
|
||||
if len(segment_info) == 1:
|
||||
break
|
||||
sleep(1.0)
|
||||
@ -327,8 +326,7 @@ class TestCompactionParams(TestcaseBase):
|
||||
collection_w.load()
|
||||
replicas = collection_w.get_replicas()[0]
|
||||
replica_num = len(replicas.groups)
|
||||
segment_info = self.utility_wrap.get_query_segment_info(collection_w.name)[
|
||||
0]
|
||||
segment_info = self.utility_wrap.get_query_segment_info(collection_w.name)[0]
|
||||
assert len(segment_info) == 1*replica_num
|
||||
|
||||
@pytest.mark.skip(reason="TODO")
|
||||
@ -532,8 +530,7 @@ class TestCompactionOperation(TestcaseBase):
|
||||
start = time()
|
||||
while True:
|
||||
sleep(5)
|
||||
segment_info = self.utility_wrap.get_query_segment_info(collection_w.name)[
|
||||
0]
|
||||
segment_info = self.utility_wrap.get_query_segment_info(collection_w.name)[0]
|
||||
if len(segment_info) != 0 and segment_info[0].segmentID == c_plans.plans[0].target:
|
||||
log.debug(segment_info)
|
||||
break
|
||||
@ -809,8 +806,7 @@ class TestCompactionOperation(TestcaseBase):
|
||||
cost = 60
|
||||
while True:
|
||||
sleep(5)
|
||||
segments_info = self.utility_wrap.get_query_segment_info(collection_w.name)[
|
||||
0]
|
||||
segments_info = self.utility_wrap.get_query_segment_info(collection_w.name)[0]
|
||||
|
||||
# verify segments reaches threshold, auto-merge ten segments into one
|
||||
if len(segments_info) == 1:
|
||||
@ -880,8 +876,7 @@ class TestCompactionOperation(TestcaseBase):
|
||||
cost = 60
|
||||
while True:
|
||||
sleep(5)
|
||||
segments_info = self.utility_wrap.get_query_segment_info(collection_w.name)[
|
||||
0]
|
||||
segments_info = self.utility_wrap.get_query_segment_info(collection_w.name)[0]
|
||||
|
||||
# verify segments reaches threshold, auto-merge ten segments into one
|
||||
if len(segments_info) == 1:
|
||||
@ -921,8 +916,7 @@ class TestCompactionOperation(TestcaseBase):
|
||||
start = time()
|
||||
while True:
|
||||
sleep(5)
|
||||
segments_info = self.utility_wrap.get_query_segment_info(collection_w.name)[
|
||||
0]
|
||||
segments_info = self.utility_wrap.get_query_segment_info(collection_w.name)[0]
|
||||
|
||||
# verify segments reaches threshold, auto-merge ten segments into one
|
||||
if len(segments_info) == 1:
|
||||
@ -995,8 +989,7 @@ class TestCompactionOperation(TestcaseBase):
|
||||
start = time()
|
||||
while True:
|
||||
sleep(5)
|
||||
segments_info = self.utility_wrap.get_query_segment_info(collection_w.name)[
|
||||
0]
|
||||
segments_info = self.utility_wrap.get_query_segment_info(collection_w.name)[0]
|
||||
|
||||
# verify segments reaches threshold, auto-merge ten segments into one
|
||||
if len(segments_info) == 1*replica_num:
|
||||
@ -1023,8 +1016,7 @@ class TestCompactionOperation(TestcaseBase):
|
||||
collection_w.load()
|
||||
replicas = collection_w.get_replicas()[0]
|
||||
replica_num = len(replicas.groups)
|
||||
segments_info = self.utility_wrap.get_query_segment_info(collection_w.name)[
|
||||
0]
|
||||
segments_info = self.utility_wrap.get_query_segment_info(collection_w.name)[0]
|
||||
assert len(segments_info) == less_threshold*replica_num
|
||||
|
||||
@pytest.mark.skip(reason="Todo")
|
||||
@ -1206,8 +1198,7 @@ class TestCompactionOperation(TestcaseBase):
|
||||
collection_w.load()
|
||||
replicas = collection_w.get_replicas()[0]
|
||||
replica_num = len(replicas.groups)
|
||||
seg_info = self.utility_wrap.get_query_segment_info(collection_w.name)[
|
||||
0]
|
||||
seg_info = self.utility_wrap.get_query_segment_info(collection_w.name)[0]
|
||||
assert len(seg_info) == 2*replica_num
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L2)
|
||||
@ -1235,8 +1226,7 @@ class TestCompactionOperation(TestcaseBase):
|
||||
collection_w.load()
|
||||
replicas = collection_w.get_replicas()[0]
|
||||
replica_num = len(replicas.groups)
|
||||
seg_info = self.utility_wrap.get_query_segment_info(collection_w.name)[
|
||||
0]
|
||||
seg_info = self.utility_wrap.get_query_segment_info(collection_w.name)[0]
|
||||
assert len(seg_info) == 1*replica_num
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L2)
|
||||
|
||||
@ -43,8 +43,7 @@ class TestDeleteParams(TestcaseBase):
|
||||
expected: Query result is empty
|
||||
"""
|
||||
# init collection with default_nb default data
|
||||
collection_w, _, _, ids = self.init_collection_general(
|
||||
prefix, insert_data=True, is_binary=is_binary)[0:4]
|
||||
collection_w, _, _, ids = self.init_collection_general(prefix, insert_data=True, is_binary=is_binary)[0:4]
|
||||
expr = f'{ct.default_int64_field_name} in {ids[:half_nb]}'
|
||||
|
||||
# delete half of data
|
||||
@ -64,16 +63,14 @@ class TestDeleteParams(TestcaseBase):
|
||||
expected: raise exception
|
||||
"""
|
||||
# init collection with tmp_nb default data
|
||||
collection_w = self.init_collection_general(
|
||||
prefix, nb=tmp_nb, insert_data=True)[0]
|
||||
collection_w = self.init_collection_general(prefix, nb=tmp_nb, insert_data=True)[0]
|
||||
|
||||
# remove connection and delete
|
||||
self.connection_wrap.remove_connection(ct.default_alias)
|
||||
res_list, _ = self.connection_wrap.list_connections()
|
||||
assert ct.default_alias not in res_list
|
||||
error = {ct.err_code: 0, ct.err_msg: "should create connect first"}
|
||||
collection_w.delete(
|
||||
expr=tmp_expr, check_task=CheckTasks.err_res, check_items=error)
|
||||
collection_w.delete(expr=tmp_expr, check_task=CheckTasks.err_res, check_items=error)
|
||||
|
||||
# Not Milvus Exception
|
||||
@pytest.mark.tags(CaseLabel.L2)
|
||||
@ -84,11 +81,9 @@ class TestDeleteParams(TestcaseBase):
|
||||
expected: raise exception
|
||||
"""
|
||||
# init collection with tmp_nb default data
|
||||
collection_w = self.init_collection_general(
|
||||
prefix, nb=tmp_nb, insert_data=True)[0]
|
||||
collection_w = self.init_collection_general(prefix, nb=tmp_nb, insert_data=True)[0]
|
||||
error = {ct.err_code: 0, ct.err_msg: "expr cannot be None"}
|
||||
collection_w.delete(
|
||||
expr=None, check_task=CheckTasks.err_res, check_items=error)
|
||||
collection_w.delete(expr=None, check_task=CheckTasks.err_res, check_items=error)
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L2)
|
||||
@pytest.mark.parametrize("expr", [1, [], ()])
|
||||
@ -99,11 +94,9 @@ class TestDeleteParams(TestcaseBase):
|
||||
expected: raise exception
|
||||
"""
|
||||
# init collection with tmp_nb default data
|
||||
collection_w = self.init_collection_general(
|
||||
prefix, nb=tmp_nb, insert_data=True)[0]
|
||||
collection_w = self.init_collection_general(prefix, nb=tmp_nb, insert_data=True)[0]
|
||||
error = {ct.err_code: 0, ct.err_msg: f"expr value {expr} is illegal"}
|
||||
collection_w.delete(
|
||||
expr, check_task=CheckTasks.err_res, check_items=error)
|
||||
collection_w.delete(expr, check_task=CheckTasks.err_res, check_items=error)
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L2)
|
||||
@pytest.mark.parametrize("expr", ["12-s", "中文"])
|
||||
@ -114,12 +107,10 @@ class TestDeleteParams(TestcaseBase):
|
||||
expected: Raise exception
|
||||
"""
|
||||
# init collection with tmp_nb default data
|
||||
collection_w = self.init_collection_general(
|
||||
prefix, nb=tmp_nb, insert_data=True)[0]
|
||||
collection_w = self.init_collection_general(prefix, nb=tmp_nb, insert_data=True)[0]
|
||||
error = {ct.err_code: 1,
|
||||
ct.err_msg: f"failed to create expr plan, expr = {expr}"}
|
||||
collection_w.delete(
|
||||
expr, check_task=CheckTasks.err_res, check_items=error)
|
||||
collection_w.delete(expr, check_task=CheckTasks.err_res, check_items=error)
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L2)
|
||||
def test_delete_expr_empty_value(self):
|
||||
@ -129,8 +120,7 @@ class TestDeleteParams(TestcaseBase):
|
||||
expected: assert num entities
|
||||
"""
|
||||
# init collection with tmp_nb default data
|
||||
collection_w = self.init_collection_general(
|
||||
prefix, nb=tmp_nb, insert_data=True)[0]
|
||||
collection_w = self.init_collection_general(prefix, nb=tmp_nb, insert_data=True)[0]
|
||||
expr = f'{ct.default_int64_field_name} in {[]}'
|
||||
|
||||
# delete empty entities
|
||||
@ -144,8 +134,7 @@ class TestDeleteParams(TestcaseBase):
|
||||
expected: Describe num entities by one
|
||||
"""
|
||||
# init collection with tmp_nb default data
|
||||
collection_w = self.init_collection_general(
|
||||
prefix, nb=tmp_nb, insert_data=True)[0]
|
||||
collection_w = self.init_collection_general(prefix, nb=tmp_nb, insert_data=True)[0]
|
||||
expr = f'{ct.default_int64_field_name} in {[0]}'
|
||||
del_res, _ = collection_w.delete(expr)
|
||||
assert del_res.delete_count == 1
|
||||
@ -159,8 +148,7 @@ class TestDeleteParams(TestcaseBase):
|
||||
expected: num entities unchanged and deleted data will not be queried
|
||||
"""
|
||||
# init collection with default_nb default data
|
||||
collection_w, _, _, ids = self.init_collection_general(
|
||||
prefix, insert_data=True)[0:4]
|
||||
collection_w, _, _, ids = self.init_collection_general(prefix, insert_data=True)[0:4]
|
||||
expr = f'{ct.default_int64_field_name} in {ids}'
|
||||
del_res, _ = collection_w.delete(expr)
|
||||
|
||||
@ -178,8 +166,7 @@ class TestDeleteParams(TestcaseBase):
|
||||
expected: No exception
|
||||
"""
|
||||
# init collection with tmp_nb default data
|
||||
collection_w = self.init_collection_general(
|
||||
prefix, nb=tmp_nb, insert_data=True)[0]
|
||||
collection_w = self.init_collection_general(prefix, nb=tmp_nb, insert_data=True)[0]
|
||||
|
||||
# No exception
|
||||
expr = f'{ct.default_int64_field_name} in {[tmp_nb]}'
|
||||
@ -195,8 +182,7 @@ class TestDeleteParams(TestcaseBase):
|
||||
expected: delete existed id, ignore non-existed id
|
||||
"""
|
||||
# init collection with tmp_nb default data
|
||||
collection_w = self.init_collection_general(
|
||||
prefix, nb=tmp_nb, insert_data=True)[0]
|
||||
collection_w = self.init_collection_general(prefix, nb=tmp_nb, insert_data=True)[0]
|
||||
expr = f'{ct.default_int64_field_name} in {[0, tmp_nb]}'
|
||||
collection_w.delete(expr=expr)[0]
|
||||
collection_w.query(expr, check_task=CheckTasks.check_query_empty)
|
||||
@ -209,14 +195,12 @@ class TestDeleteParams(TestcaseBase):
|
||||
expected: raise exception
|
||||
"""
|
||||
# init collection with tmp_nb default data
|
||||
collection_w = self.init_collection_general(
|
||||
prefix, nb=tmp_nb, insert_data=True)[0]
|
||||
collection_w = self.init_collection_general(prefix, nb=tmp_nb, insert_data=True)[0]
|
||||
expr = f'{ct.default_int64_field_name} in {[0.0, 1.0]}'
|
||||
|
||||
# Bad exception message
|
||||
error = {ct.err_code: 1, ct.err_msg: "failed to create expr plan,"}
|
||||
collection_w.delete(
|
||||
expr=expr, check_task=CheckTasks.err_res, check_items=error)
|
||||
collection_w.delete(expr=expr, check_task=CheckTasks.err_res, check_items=error)
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L2)
|
||||
def test_delete_expr_mix_values(self):
|
||||
@ -226,14 +210,12 @@ class TestDeleteParams(TestcaseBase):
|
||||
expected: raise exception
|
||||
"""
|
||||
# init collection with tmp_nb default data
|
||||
collection_w = self.init_collection_general(
|
||||
prefix, nb=tmp_nb, insert_data=True)[0]
|
||||
collection_w = self.init_collection_general(prefix, nb=tmp_nb, insert_data=True)[0]
|
||||
expr = f'{ct.default_int64_field_name} in {[0, 1.0]}'
|
||||
|
||||
# Bad exception message
|
||||
error = {ct.err_code: 1, ct.err_msg: "failed to create expr plan"}
|
||||
collection_w.delete(
|
||||
expr=expr, check_task=CheckTasks.err_res, check_items=error)
|
||||
collection_w.delete(expr=expr, check_task=CheckTasks.err_res, check_items=error)
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L0)
|
||||
def test_delete_partition(self):
|
||||
@ -243,8 +225,7 @@ class TestDeleteParams(TestcaseBase):
|
||||
expected: verify partition entities are deleted
|
||||
"""
|
||||
# init collection and partition
|
||||
collection_w = self.init_collection_wrap(
|
||||
name=cf.gen_unique_str(prefix))
|
||||
collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix))
|
||||
partition_w = self.init_partition_wrap(collection_wrap=collection_w)
|
||||
|
||||
# load collection and insert data to partition
|
||||
@ -253,13 +234,11 @@ class TestDeleteParams(TestcaseBase):
|
||||
partition_w.insert(df)
|
||||
|
||||
# delete ids from partition
|
||||
del_res, _ = collection_w.delete(
|
||||
tmp_expr, partition_name=partition_w.name)
|
||||
del_res, _ = collection_w.delete(tmp_expr, partition_name=partition_w.name)
|
||||
assert del_res.delete_count == 1
|
||||
|
||||
# query with deleted id and query with existed id
|
||||
collection_w.query(
|
||||
tmp_expr, check_task=CheckTasks.check_query_empty, partition_names=[partition_w.name])
|
||||
collection_w.query(tmp_expr, check_task=CheckTasks.check_query_empty, partition_names=[partition_w.name])
|
||||
res = df.iloc[1:2, :1].to_dict('records')
|
||||
collection_w.query(f'{ct.default_int64_field_name} in [1]',
|
||||
check_task=CheckTasks.check_query_results, check_items={exp_res: res})
|
||||
@ -272,10 +251,8 @@ class TestDeleteParams(TestcaseBase):
|
||||
expected: assert delete successfully
|
||||
"""
|
||||
# create, insert with flush, load collection
|
||||
collection_w = self.init_collection_general(
|
||||
prefix, nb=tmp_nb, insert_data=True)[0]
|
||||
del_res, _ = collection_w.delete(
|
||||
tmp_expr, partition_name=ct.default_partition_name)
|
||||
collection_w = self.init_collection_general(prefix, nb=tmp_nb, insert_data=True)[0]
|
||||
del_res, _ = collection_w.delete(tmp_expr, partition_name=ct.default_partition_name)
|
||||
assert del_res.delete_count == 1
|
||||
collection_w.num_entities
|
||||
collection_w.query(tmp_expr, check_task=CheckTasks.check_query_empty)
|
||||
@ -289,8 +266,7 @@ class TestDeleteParams(TestcaseBase):
|
||||
expected: Raise exception
|
||||
"""
|
||||
# create, insert with flush, load collection
|
||||
collection_w = self.init_collection_general(
|
||||
prefix, nb=tmp_nb, insert_data=True)[0]
|
||||
collection_w = self.init_collection_general(prefix, nb=tmp_nb, insert_data=True)[0]
|
||||
|
||||
error = {ct.err_code: 0,
|
||||
ct.err_msg: f"partition_name value {partition_name} is illegal"}
|
||||
@ -325,8 +301,7 @@ class TestDeleteOperation(TestcaseBase):
|
||||
expected: No exception for second deletion
|
||||
"""
|
||||
# init collection with nb default data
|
||||
collection_w = self.init_collection_general(
|
||||
prefix, nb=tmp_nb, insert_data=True)[0]
|
||||
collection_w = self.init_collection_general(prefix, nb=tmp_nb, insert_data=True)[0]
|
||||
|
||||
# assert delete successfully and no exception
|
||||
collection_w.delete(expr=tmp_expr)
|
||||
@ -345,14 +320,12 @@ class TestDeleteOperation(TestcaseBase):
|
||||
expected: assert index and deleted id not in search result
|
||||
"""
|
||||
# create collection, insert tmp_nb, flush and load
|
||||
collection_w, vectors = self.init_collection_general(
|
||||
prefix, insert_data=True)[0:2]
|
||||
collection_w, vectors = self.init_collection_general(prefix, insert_data=True)[0:2]
|
||||
|
||||
# create index
|
||||
index_params = {"index_type": "IVF_SQ8",
|
||||
"metric_type": "L2", "params": {"nlist": 64}}
|
||||
collection_w.create_index(
|
||||
ct.default_float_vec_field_name, index_params)
|
||||
collection_w.create_index(ct.default_float_vec_field_name, index_params)
|
||||
assert collection_w.has_index()[0]
|
||||
collection_w.release()
|
||||
collection_w.load()
|
||||
@ -378,8 +351,7 @@ class TestDeleteOperation(TestcaseBase):
|
||||
expected: Empty search result
|
||||
"""
|
||||
# init collection and insert data without flush
|
||||
collection_w = self.init_collection_wrap(
|
||||
name=cf.gen_unique_str(prefix))
|
||||
collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix))
|
||||
df = cf.gen_default_dataframe_data()
|
||||
insert_res, _ = collection_w.insert(df)
|
||||
|
||||
@ -391,8 +363,7 @@ class TestDeleteOperation(TestcaseBase):
|
||||
# create index
|
||||
index_params = {"index_type": "IVF_SQ8",
|
||||
"metric_type": "L2", "params": {"nlist": 64}}
|
||||
collection_w.create_index(
|
||||
ct.default_float_vec_field_name, index_params)
|
||||
collection_w.create_index(ct.default_float_vec_field_name, index_params)
|
||||
assert collection_w.has_index()[0]
|
||||
|
||||
collection_w.load()
|
||||
@ -401,8 +372,7 @@ class TestDeleteOperation(TestcaseBase):
|
||||
ct.default_search_params, ct.default_limit)
|
||||
log.debug(search_res[0].ids)
|
||||
# assert search results not contains deleted ids
|
||||
inter = set(
|
||||
insert_res.primary_keys[:ct.default_nb // 2]).intersection(set(search_res[0].ids))
|
||||
inter = set(insert_res.primary_keys[:ct.default_nb // 2]).intersection(set(search_res[0].ids))
|
||||
log.debug(inter)
|
||||
assert len(inter) == 0
|
||||
|
||||
@ -418,8 +388,7 @@ class TestDeleteOperation(TestcaseBase):
|
||||
expected: Empty query result
|
||||
"""
|
||||
# init collection and insert data without flush
|
||||
collection_w = self.init_collection_wrap(
|
||||
name=cf.gen_unique_str(prefix))
|
||||
collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix))
|
||||
df = cf.gen_default_dataframe_data(tmp_nb)
|
||||
collection_w.insert(df)
|
||||
|
||||
@ -435,8 +404,7 @@ class TestDeleteOperation(TestcaseBase):
|
||||
# insert id tmp_nb and delete id 0 and tmp_nb
|
||||
df_new = cf.gen_default_dataframe_data(nb=1, start=tmp_nb)
|
||||
collection_w.insert(df_new)
|
||||
collection_w.delete(
|
||||
expr=f'{ct.default_int64_field_name} in {[tmp_nb]}')
|
||||
collection_w.delete(expr=f'{ct.default_int64_field_name} in {[tmp_nb]}')
|
||||
|
||||
# query with id 0 and tmp_nb
|
||||
collection_w.query(expr=f'{ct.default_int64_field_name} in {[0, tmp_nb]}',
|
||||
@ -450,8 +418,7 @@ class TestDeleteOperation(TestcaseBase):
|
||||
expected: deleted entity is not in the search result
|
||||
"""
|
||||
# init collection with nb default data
|
||||
collection_w, _, _, ids = self.init_collection_general(
|
||||
prefix, insert_data=True)[0:4]
|
||||
collection_w, _, _, ids = self.init_collection_general(prefix, insert_data=True)[0:4]
|
||||
entity, _ = collection_w.query(tmp_expr, output_fields=["%"])
|
||||
search_res, _ = collection_w.search([entity[0][ct.default_float_vec_field_name]],
|
||||
ct.default_float_vec_field_name,
|
||||
@ -481,8 +448,7 @@ class TestDeleteOperation(TestcaseBase):
|
||||
expected: delete one entity
|
||||
"""
|
||||
# init collection with nb default data
|
||||
collection_w = self.init_collection_general(
|
||||
prefix, nb=tmp_nb, insert_data=True)[0]
|
||||
collection_w = self.init_collection_general(prefix, nb=tmp_nb, insert_data=True)[0]
|
||||
expr = f'{ct.default_int64_field_name} in {[0, 0, 0]}'
|
||||
del_res, _ = collection_w.delete(expr)
|
||||
assert del_res.delete_count == 3
|
||||
@ -498,8 +464,7 @@ class TestDeleteOperation(TestcaseBase):
|
||||
expected: currently only delete one entity, query get one entity
|
||||
todo delete all entities
|
||||
"""
|
||||
collection_w = self.init_collection_wrap(
|
||||
name=cf.gen_unique_str(prefix))
|
||||
collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix))
|
||||
df = cf.gen_default_dataframe_data(nb=tmp_nb)
|
||||
df[ct.default_int64_field_name] = 0
|
||||
collection_w.insert(df)
|
||||
@ -525,8 +490,7 @@ class TestDeleteOperation(TestcaseBase):
|
||||
expected: No exception
|
||||
"""
|
||||
# init collection and partition
|
||||
collection_w = self.init_collection_wrap(
|
||||
name=cf.gen_unique_str(prefix))
|
||||
collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix))
|
||||
partition_w = self.init_partition_wrap(collection_wrap=collection_w)
|
||||
|
||||
collection_w.delete(tmp_expr, partition_name=partition_w.name)
|
||||
@ -539,8 +503,7 @@ class TestDeleteOperation(TestcaseBase):
|
||||
expected: raise exception
|
||||
"""
|
||||
# init collection with tmp_nb data
|
||||
collection_w = self.init_collection_general(
|
||||
prefix, nb=tmp_nb, insert_data=True)[0]
|
||||
collection_w = self.init_collection_general(prefix, nb=tmp_nb, insert_data=True)[0]
|
||||
|
||||
# raise exception
|
||||
error = {ct.err_code: 1,
|
||||
@ -559,8 +522,7 @@ class TestDeleteOperation(TestcaseBase):
|
||||
"""
|
||||
half = tmp_nb // 2
|
||||
# create, insert, flush, load
|
||||
collection_w, partition_w, _, _ = self.insert_entities_into_two_partitions_in_half(
|
||||
half)
|
||||
collection_w, partition_w, _, _ = self.insert_entities_into_two_partitions_in_half(half)
|
||||
|
||||
# delete entities from another partition
|
||||
expr = f'{ct.default_int64_field_name} in {[0]}'
|
||||
@ -581,8 +543,7 @@ class TestDeleteOperation(TestcaseBase):
|
||||
expected: The data only in partition_1 will be deleted
|
||||
"""
|
||||
# init collection and partition
|
||||
collection_w = self.init_collection_wrap(
|
||||
name=cf.gen_unique_str(prefix))
|
||||
collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix))
|
||||
partition_w = self.init_partition_wrap(collection_wrap=collection_w)
|
||||
|
||||
# insert same data into partition_w and default partition
|
||||
@ -609,8 +570,7 @@ class TestDeleteOperation(TestcaseBase):
|
||||
expected: versify delete successfully
|
||||
"""
|
||||
# init an auto_id collection and insert tmp_nb data
|
||||
collection_w, _, _, ids = self.init_collection_general(
|
||||
prefix, nb=tmp_nb, insert_data=True, auto_id=True)[0:4]
|
||||
collection_w, _, _, ids = self.init_collection_general(prefix, nb=tmp_nb, insert_data=True, auto_id=True)[0:4]
|
||||
|
||||
# delete with insert ids
|
||||
expr = f'{ct.default_int64_field_name} in {[ids[0]]}'
|
||||
@ -630,8 +590,7 @@ class TestDeleteOperation(TestcaseBase):
|
||||
expected: Raise exception
|
||||
"""
|
||||
# create collection, insert data without flush
|
||||
collection_w = self.init_collection_wrap(
|
||||
name=cf.gen_unique_str(prefix))
|
||||
collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix))
|
||||
df = cf.gen_default_dataframe_data(tmp_nb)
|
||||
collection_w.insert(df)
|
||||
assert collection_w.num_entities == tmp_nb
|
||||
@ -643,8 +602,7 @@ class TestDeleteOperation(TestcaseBase):
|
||||
# query without loading and raise exception
|
||||
error = {ct.err_code: 1,
|
||||
ct.err_msg: f"collection {collection_w.name} was not loaded into memory"}
|
||||
collection_w.query(
|
||||
expr=tmp_expr, check_task=CheckTasks.err_res, check_items=error)
|
||||
collection_w.query(expr=tmp_expr, check_task=CheckTasks.err_res, check_items=error)
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L1)
|
||||
def test_delete_sealed_segment_without_flush(self):
|
||||
@ -656,8 +614,7 @@ class TestDeleteOperation(TestcaseBase):
|
||||
expected: No query result
|
||||
"""
|
||||
# create collection, insert data without flush
|
||||
collection_w = self.init_collection_wrap(
|
||||
name=cf.gen_unique_str(prefix))
|
||||
collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix))
|
||||
df = cf.gen_default_dataframe_data(tmp_nb)
|
||||
collection_w.insert(df)
|
||||
assert collection_w.num_entities == tmp_nb
|
||||
@ -681,8 +638,7 @@ class TestDeleteOperation(TestcaseBase):
|
||||
expected: No query result
|
||||
"""
|
||||
# create collection
|
||||
collection_w = self.init_collection_wrap(
|
||||
name=cf.gen_unique_str(prefix))
|
||||
collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix))
|
||||
# load collection and the queryNode watch the insertChannel
|
||||
collection_w.load()
|
||||
# insert data
|
||||
@ -705,8 +661,7 @@ class TestDeleteOperation(TestcaseBase):
|
||||
expected: Delete successfully and no query result
|
||||
"""
|
||||
# create collection and insert flush data
|
||||
collection_w = self.init_collection_wrap(
|
||||
name=cf.gen_unique_str(prefix))
|
||||
collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix))
|
||||
df = cf.gen_default_dataframe_data(tmp_nb)
|
||||
collection_w.insert(df)
|
||||
assert collection_w.num_entities == tmp_nb
|
||||
@ -729,8 +684,7 @@ class TestDeleteOperation(TestcaseBase):
|
||||
expected: No query result
|
||||
"""
|
||||
# create collection
|
||||
collection_w = self.init_collection_wrap(
|
||||
name=cf.gen_unique_str(prefix))
|
||||
collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix))
|
||||
# insert and flush data
|
||||
df = cf.gen_default_dataframe_data(tmp_nb)
|
||||
collection_w.insert(df)
|
||||
@ -760,8 +714,7 @@ class TestDeleteOperation(TestcaseBase):
|
||||
expected: No query result
|
||||
"""
|
||||
# create collection
|
||||
collection_w = self.init_collection_wrap(
|
||||
name=cf.gen_unique_str(prefix))
|
||||
collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix))
|
||||
# insert and flush data
|
||||
df = cf.gen_default_dataframe_data(tmp_nb)
|
||||
collection_w.insert(df)
|
||||
@ -786,8 +739,7 @@ class TestDeleteOperation(TestcaseBase):
|
||||
expected: Empty query result
|
||||
"""
|
||||
# create collection
|
||||
collection_w = self.init_collection_wrap(
|
||||
name=cf.gen_unique_str(prefix))
|
||||
collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix))
|
||||
# insert without flush
|
||||
df = cf.gen_default_dataframe_data(tmp_nb)
|
||||
collection_w.insert(df)
|
||||
@ -815,8 +767,7 @@ class TestDeleteOperation(TestcaseBase):
|
||||
expected: Verify that the query gets the newly inserted entity
|
||||
"""
|
||||
# init collection and insert data without flush
|
||||
collection_w = self.init_collection_wrap(
|
||||
name=cf.gen_unique_str(prefix))
|
||||
collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix))
|
||||
collection_w.load()
|
||||
df = cf.gen_default_dataframe_data(tmp_nb)
|
||||
collection_w.insert(df)
|
||||
@ -825,8 +776,7 @@ class TestDeleteOperation(TestcaseBase):
|
||||
del_res, _ = collection_w.delete(tmp_expr)
|
||||
log.debug(f'to_query:{to_query}')
|
||||
if to_query:
|
||||
collection_w.query(
|
||||
tmp_expr, check_task=CheckTasks.check_query_empty)
|
||||
collection_w.query(tmp_expr, check_task=CheckTasks.check_query_empty)
|
||||
|
||||
# insert entity with primary key 0
|
||||
df_new = cf.gen_default_dataframe_data(1)
|
||||
@ -857,8 +807,7 @@ class TestDeleteOperation(TestcaseBase):
|
||||
expected: Verify that the query gets the newly inserted entity
|
||||
"""
|
||||
# init collection and insert data without flush
|
||||
collection_w = self.init_collection_wrap(
|
||||
name=cf.gen_unique_str(prefix))
|
||||
collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix))
|
||||
|
||||
# insert
|
||||
df = cf.gen_default_dataframe_data(1000)
|
||||
@ -870,14 +819,12 @@ class TestDeleteOperation(TestcaseBase):
|
||||
res = df.iloc[:1, :1].to_dict('records')
|
||||
collection_w.search(data=[df[ct.default_float_vec_field_name][0]], anns_field=ct.default_float_vec_field_name,
|
||||
param=default_search_params, limit=1)
|
||||
collection_w.query(
|
||||
tmp_expr, check_task=CheckTasks.check_query_results, check_items={'exp_res': res})
|
||||
collection_w.query(tmp_expr, check_task=CheckTasks.check_query_results, check_items={'exp_res': res})
|
||||
|
||||
# delete
|
||||
collection_w.delete(tmp_expr)
|
||||
if to_query:
|
||||
collection_w.query(
|
||||
tmp_expr, check_task=CheckTasks.check_query_empty)
|
||||
collection_w.query(tmp_expr, check_task=CheckTasks.check_query_empty)
|
||||
|
||||
# re-insert
|
||||
df_new = cf.gen_default_dataframe_data(nb=1)
|
||||
@ -901,8 +848,7 @@ class TestDeleteOperation(TestcaseBase):
|
||||
expected: No exception
|
||||
"""
|
||||
# init an auto_id collection and insert tmp_nb data, flush and load
|
||||
collection_w, _, _, ids = self.init_collection_general(
|
||||
prefix, nb=tmp_nb, insert_data=True, auto_id=True)[0:4]
|
||||
collection_w, _, _, ids = self.init_collection_general(prefix, nb=tmp_nb, insert_data=True, auto_id=True)[0:4]
|
||||
|
||||
for del_id in ids:
|
||||
expr = f'{ct.default_int64_field_name} in {[del_id]}'
|
||||
@ -921,8 +867,7 @@ class TestDeleteOperation(TestcaseBase):
|
||||
expected: No exception
|
||||
"""
|
||||
# init an auto_id collection and insert tmp_nb data
|
||||
collection_w, _, _, ids = self.init_collection_general(
|
||||
prefix, nb=tmp_nb, insert_data=True, auto_id=True)[0:4]
|
||||
collection_w, _, _, ids = self.init_collection_general(prefix, nb=tmp_nb, insert_data=True, auto_id=True)[0:4]
|
||||
|
||||
batch = 10
|
||||
for i in range(tmp_nb // batch):
|
||||
@ -950,8 +895,7 @@ class TestDeleteOperation(TestcaseBase):
|
||||
expected: Empty query result
|
||||
"""
|
||||
# init collection and insert data without flush
|
||||
collection_w = self.init_collection_wrap(
|
||||
name=cf.gen_unique_str(prefix), shards_num=1)
|
||||
collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix), shards_num=1)
|
||||
df = cf.gen_default_dataframe_data(tmp_nb)
|
||||
collection_w.insert(df)
|
||||
|
||||
@ -990,8 +934,7 @@ class TestDeleteOperation(TestcaseBase):
|
||||
expected: Empty query result
|
||||
"""
|
||||
# init collection and insert data without flush
|
||||
collection_w = self.init_collection_wrap(
|
||||
name=cf.gen_unique_str(prefix), shards_num=1)
|
||||
collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix), shards_num=1)
|
||||
df = cf.gen_default_dataframe_data(tmp_nb)
|
||||
collection_w.insert(df)
|
||||
|
||||
@ -1020,12 +963,10 @@ class TestDeleteOperation(TestcaseBase):
|
||||
expected: Delete successfully, query get empty result
|
||||
"""
|
||||
# init collection and load
|
||||
collection_w = self.init_collection_wrap(
|
||||
name=cf.gen_unique_str(prefix), shards_num=1)
|
||||
collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix), shards_num=1)
|
||||
index_params = {"index_type": "IVF_SQ8",
|
||||
"metric_type": "L2", "params": {"nlist": 64}}
|
||||
collection_w.create_index(
|
||||
ct.default_float_vec_field_name, index_params)
|
||||
collection_w.create_index(ct.default_float_vec_field_name, index_params)
|
||||
collection_w.load()
|
||||
|
||||
# insert data and delete id 0
|
||||
@ -1039,8 +980,7 @@ class TestDeleteOperation(TestcaseBase):
|
||||
# wait for the handoff to complete
|
||||
while True:
|
||||
time.sleep(0.5)
|
||||
segment_infos = self.utility_wrap.get_query_segment_info(collection_w.name)[
|
||||
0]
|
||||
segment_infos = self.utility_wrap.get_query_segment_info(collection_w.name)[0]
|
||||
if len(segment_infos) > 0 and segment_infos[0].state == SegmentState.Sealed:
|
||||
break
|
||||
# query deleted id
|
||||
@ -1071,8 +1011,7 @@ class TestDeleteOperation(TestcaseBase):
|
||||
insert_res, _ = collection_w.insert(df)
|
||||
collection_w.load()
|
||||
|
||||
tt = self.utility_wrap.mkts_from_hybridts(
|
||||
insert_res.timestamp, milliseconds=0.)
|
||||
tt = self.utility_wrap.mkts_from_hybridts(insert_res.timestamp, milliseconds=0.)
|
||||
|
||||
res_before, _ = collection_w.search(df[ct.default_float_vec_field_name][:1].to_list(),
|
||||
ct.default_float_vec_field_name,
|
||||
@ -1099,8 +1038,7 @@ class TestDeleteOperation(TestcaseBase):
|
||||
expected: Verify result
|
||||
"""
|
||||
# create collection, insert multi times, each with tmp_nb entities
|
||||
collection_w = self.init_collection_wrap(
|
||||
name=cf.gen_unique_str(prefix))
|
||||
collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix))
|
||||
multi = 3
|
||||
for i in range(multi):
|
||||
start = i * tmp_nb
|
||||
@ -1132,15 +1070,13 @@ class TestDeleteOperation(TestcaseBase):
|
||||
expected:
|
||||
"""
|
||||
# init collection and insert data without flush
|
||||
collection_w = self.init_collection_wrap(
|
||||
name=cf.gen_unique_str(prefix), shards_num=2)
|
||||
collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix), shards_num=2)
|
||||
# insert 3000 entities into 3 segments
|
||||
segment_num = 3
|
||||
segment_per_count = 2000
|
||||
ids = []
|
||||
for i in range(segment_num):
|
||||
df = cf.gen_default_dataframe_data(
|
||||
nb=segment_per_count, start=(i * segment_per_count))
|
||||
df = cf.gen_default_dataframe_data(nb=segment_per_count, start=(i * segment_per_count))
|
||||
res, _ = collection_w.insert(df)
|
||||
assert collection_w.num_entities == (i + 1) * segment_per_count
|
||||
ids.extend(res.primary_keys)
|
||||
@ -1167,8 +1103,7 @@ class TestDeleteString(TestcaseBase):
|
||||
"""
|
||||
# init collection with nb default data
|
||||
collection_w = \
|
||||
self.init_collection_general(
|
||||
prefix, nb=tmp_nb, insert_data=True, primary_field=ct.default_string_field_name)[0]
|
||||
self.init_collection_general(prefix, nb=tmp_nb, insert_data=True, primary_field=ct.default_string_field_name)[0]
|
||||
|
||||
# assert delete successfully and no exception
|
||||
collection_w.delete(expr=default_string_expr)
|
||||
@ -1193,11 +1128,9 @@ class TestDeleteString(TestcaseBase):
|
||||
# create index
|
||||
index_params_one = {"index_type": "IVF_SQ8",
|
||||
"metric_type": "L2", "params": {"nlist": 64}}
|
||||
collection_w.create_index(
|
||||
ct.default_float_vec_field_name, index_params_one, index_name=index_name1)
|
||||
collection_w.create_index(ct.default_float_vec_field_name, index_params_one, index_name=index_name1)
|
||||
index_params_two = {}
|
||||
collection_w.create_index(
|
||||
ct.default_string_field_name, index_params=index_params_two, index_name=index_name2)
|
||||
collection_w.create_index(ct.default_string_field_name, index_params=index_params_two, index_name=index_name2)
|
||||
assert collection_w.has_index(index_name=index_name2)
|
||||
|
||||
collection_w.release()
|
||||
@ -1226,8 +1159,7 @@ class TestDeleteString(TestcaseBase):
|
||||
"""
|
||||
# init collection and insert data without flush
|
||||
schema = cf.gen_string_pk_default_collection_schema()
|
||||
collection_w = self.init_collection_wrap(
|
||||
name=cf.gen_unique_str(prefix), schema=schema)
|
||||
collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix), schema=schema)
|
||||
df = cf.gen_default_dataframe_data()
|
||||
insert_res, _ = collection_w.insert(df)
|
||||
|
||||
@ -1240,8 +1172,7 @@ class TestDeleteString(TestcaseBase):
|
||||
# create index
|
||||
index_params = {"index_type": "IVF_SQ8",
|
||||
"metric_type": "L2", "params": {"nlist": 64}}
|
||||
collection_w.create_index(
|
||||
ct.default_float_vec_field_name, index_params)
|
||||
collection_w.create_index(ct.default_float_vec_field_name, index_params)
|
||||
assert collection_w.has_index()[0]
|
||||
|
||||
collection_w.load()
|
||||
@ -1250,8 +1181,7 @@ class TestDeleteString(TestcaseBase):
|
||||
ct.default_search_params, ct.default_limit)
|
||||
log.debug(search_res[0].ids)
|
||||
# assert search results not contains deleted ids
|
||||
inter = set(
|
||||
insert_res.primary_keys[:ct.default_nb // 2]).intersection(set(search_res[0].ids))
|
||||
inter = set(insert_res.primary_keys[:ct.default_nb // 2]).intersection(set(search_res[0].ids))
|
||||
log.debug(inter)
|
||||
assert len(inter) == 0
|
||||
|
||||
@ -1268,8 +1198,7 @@ class TestDeleteString(TestcaseBase):
|
||||
"""
|
||||
# init collection and insert data without flush
|
||||
schema = cf.gen_string_pk_default_collection_schema()
|
||||
collection_w = self.init_collection_wrap(
|
||||
name=cf.gen_unique_str(prefix), schema=schema)
|
||||
collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix), schema=schema)
|
||||
df = cf.gen_default_dataframe_data(tmp_nb)
|
||||
collection_w.insert(df)
|
||||
|
||||
@ -1286,8 +1215,7 @@ class TestDeleteString(TestcaseBase):
|
||||
# insert id tmp_nb and delete id 0 and tmp_nb
|
||||
df_new = cf.gen_default_dataframe_data(nb=1, start=tmp_nb)
|
||||
collection_w.insert(df_new)
|
||||
collection_w.delete(
|
||||
expr=f'{ct.default_string_field_name} in ["tmp_nb"]')
|
||||
collection_w.delete(expr=f'{ct.default_string_field_name} in ["tmp_nb"]')
|
||||
|
||||
# query with id 0 and tmp_nb
|
||||
collection_w.query(expr=f'{ct.default_string_field_name} in ["0", "tmp_nb"]',
|
||||
@ -1303,8 +1231,7 @@ class TestDeleteString(TestcaseBase):
|
||||
# init collection with nb default data
|
||||
collection_w, _, _, ids = self.init_collection_general(prefix, insert_data=True,
|
||||
primary_field=ct.default_string_field_name)[0:4]
|
||||
entity, _ = collection_w.query(
|
||||
default_string_expr, output_fields=["%"])
|
||||
entity, _ = collection_w.query(default_string_expr, output_fields=["%"])
|
||||
search_res, _ = collection_w.search([entity[0][ct.default_float_vec_field_name]],
|
||||
ct.default_float_vec_field_name,
|
||||
ct.default_search_params, ct.default_limit)
|
||||
@ -1335,8 +1262,7 @@ class TestDeleteString(TestcaseBase):
|
||||
"""
|
||||
# init collection with nb default data
|
||||
collection_w = \
|
||||
self.init_collection_general(
|
||||
prefix, nb=tmp_nb, insert_data=True, primary_field=ct.default_string_field_name)[0]
|
||||
self.init_collection_general(prefix, nb=tmp_nb, insert_data=True, primary_field=ct.default_string_field_name)[0]
|
||||
expr = f'{ct.default_string_field_name} in ["0", "0", "0"]'
|
||||
del_res, _ = collection_w.delete(expr)
|
||||
assert del_res.delete_count == 3
|
||||
@ -1353,8 +1279,7 @@ class TestDeleteString(TestcaseBase):
|
||||
todo delete all entities
|
||||
"""
|
||||
schema = cf.gen_string_pk_default_collection_schema()
|
||||
collection_w = self.init_collection_wrap(
|
||||
name=cf.gen_unique_str(prefix), schema=schema)
|
||||
collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix), schema=schema)
|
||||
df = cf.gen_default_dataframe_data(nb=tmp_nb)
|
||||
df[ct.default_string_field_name] = "0"
|
||||
collection_w.insert(df)
|
||||
@ -1383,8 +1308,7 @@ class TestDeleteString(TestcaseBase):
|
||||
"""
|
||||
# init collection and partition
|
||||
schema = cf.gen_string_pk_default_collection_schema()
|
||||
collection_w = self.init_collection_wrap(
|
||||
name=cf.gen_unique_str(prefix), schema=schema)
|
||||
collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix), schema=schema)
|
||||
partition_w = self.init_partition_wrap(collection_wrap=collection_w)
|
||||
|
||||
# insert same data into partition_w and default partition
|
||||
@ -1415,8 +1339,7 @@ class TestDeleteString(TestcaseBase):
|
||||
"""
|
||||
# create collection, insert data without flush
|
||||
schema = cf.gen_string_pk_default_collection_schema()
|
||||
collection_w = self.init_collection_wrap(
|
||||
name=cf.gen_unique_str(prefix), schema=schema)
|
||||
collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix), schema=schema)
|
||||
df = cf.gen_default_dataframe_data(tmp_nb)
|
||||
collection_w.insert(df)
|
||||
assert collection_w.num_entities == tmp_nb
|
||||
@ -1442,8 +1365,7 @@ class TestDeleteString(TestcaseBase):
|
||||
"""
|
||||
# create collection
|
||||
schema = cf.gen_string_pk_default_collection_schema()
|
||||
collection_w = self.init_collection_wrap(
|
||||
name=cf.gen_unique_str(prefix), schema=schema)
|
||||
collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix), schema=schema)
|
||||
# load collection and the queryNode watch the insertChannel
|
||||
collection_w.load()
|
||||
# insert data
|
||||
@ -1468,8 +1390,7 @@ class TestDeleteString(TestcaseBase):
|
||||
"""
|
||||
# create collection and insert flush data
|
||||
schema = cf.gen_string_pk_default_collection_schema()
|
||||
collection_w = self.init_collection_wrap(
|
||||
name=cf.gen_unique_str(prefix), schema=schema)
|
||||
collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix), schema=schema)
|
||||
df = cf.gen_default_dataframe_data(tmp_nb)
|
||||
collection_w.insert(df)
|
||||
assert collection_w.num_entities == tmp_nb
|
||||
@ -1494,8 +1415,7 @@ class TestDeleteString(TestcaseBase):
|
||||
"""
|
||||
# create collection
|
||||
schema = cf.gen_string_pk_default_collection_schema()
|
||||
collection_w = self.init_collection_wrap(
|
||||
name=cf.gen_unique_str(prefix), schema=schema)
|
||||
collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix), schema=schema)
|
||||
# insert and flush data
|
||||
df = cf.gen_default_dataframe_data(tmp_nb)
|
||||
collection_w.insert(df)
|
||||
@ -1522,8 +1442,7 @@ class TestDeleteString(TestcaseBase):
|
||||
"""
|
||||
# create collection
|
||||
schema = cf.gen_string_pk_default_collection_schema()
|
||||
collection_w = self.init_collection_wrap(
|
||||
name=cf.gen_unique_str(prefix), schema=schema)
|
||||
collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix), schema=schema)
|
||||
# insert without flush
|
||||
df = cf.gen_default_dataframe_data(tmp_nb)
|
||||
collection_w.insert(df)
|
||||
@ -1600,8 +1519,7 @@ class TestDeleteString(TestcaseBase):
|
||||
"""
|
||||
# init collection and insert data without flush
|
||||
schema = cf.gen_string_pk_default_collection_schema()
|
||||
collection_w = self.init_collection_wrap(
|
||||
name=cf.gen_unique_str(prefix), schema=schema, shards_num=1)
|
||||
collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix), schema=schema, shards_num=1)
|
||||
df = cf.gen_default_dataframe_data(tmp_nb)
|
||||
collection_w.insert(df)
|
||||
|
||||
@ -1644,8 +1562,7 @@ class TestDeleteString(TestcaseBase):
|
||||
"""
|
||||
# init collection and insert data without flush
|
||||
schema = cf.gen_string_pk_default_collection_schema()
|
||||
collection_w = self.init_collection_wrap(
|
||||
name=cf.gen_unique_str(prefix), schema=schema, shards_num=1)
|
||||
collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix), schema=schema, shards_num=1)
|
||||
df = cf.gen_default_dataframe_data(tmp_nb)
|
||||
collection_w.insert(df)
|
||||
|
||||
@ -1676,12 +1593,10 @@ class TestDeleteString(TestcaseBase):
|
||||
"""
|
||||
# init collection and load
|
||||
schema = cf.gen_string_pk_default_collection_schema()
|
||||
collection_w = self.init_collection_wrap(
|
||||
name=cf.gen_unique_str(prefix), schema=schema, shards_num=1)
|
||||
collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix), schema=schema, shards_num=1)
|
||||
index_params = {"index_type": "IVF_SQ8",
|
||||
"metric_type": "L2", "params": {"nlist": 64}}
|
||||
collection_w.create_index(
|
||||
ct.default_float_vec_field_name, index_params)
|
||||
collection_w.create_index(ct.default_float_vec_field_name, index_params)
|
||||
collection_w.load()
|
||||
|
||||
# insert data and delete id 0
|
||||
@ -1695,8 +1610,7 @@ class TestDeleteString(TestcaseBase):
|
||||
# wait for the handoff to complete
|
||||
while True:
|
||||
time.sleep(0.5)
|
||||
segment_infos = self.utility_wrap.get_query_segment_info(collection_w.name)[
|
||||
0]
|
||||
segment_infos = self.utility_wrap.get_query_segment_info(collection_w.name)[0]
|
||||
if len(segment_infos) > 0 and segment_infos[0].state == SegmentState.Sealed:
|
||||
break
|
||||
# query deleted id
|
||||
@ -1713,14 +1627,12 @@ class TestDeleteString(TestcaseBase):
|
||||
expected: search successfully
|
||||
"""
|
||||
schema = cf.gen_string_pk_default_collection_schema()
|
||||
collection_w = self.init_collection_wrap(
|
||||
cf.gen_unique_str(prefix), schema=schema)
|
||||
collection_w = self.init_collection_wrap(cf.gen_unique_str(prefix), schema=schema)
|
||||
df = cf.gen_default_dataframe_data(tmp_nb)
|
||||
insert_res, _ = collection_w.insert(df)
|
||||
collection_w.load()
|
||||
|
||||
tt = self.utility_wrap.mkts_from_hybridts(
|
||||
insert_res.timestamp, milliseconds=0.)
|
||||
tt = self.utility_wrap.mkts_from_hybridts(insert_res.timestamp, milliseconds=0.)
|
||||
|
||||
res_before, _ = collection_w.search(df[ct.default_float_vec_field_name][:1].to_list(),
|
||||
ct.default_float_vec_field_name,
|
||||
@ -1749,8 +1661,7 @@ class TestDeleteString(TestcaseBase):
|
||||
"""
|
||||
# create collection, insert multi times, each with tmp_nb entities
|
||||
schema = cf.gen_string_pk_default_collection_schema()
|
||||
collection_w = self.init_collection_wrap(
|
||||
cf.gen_unique_str(prefix), schema=schema)
|
||||
collection_w = self.init_collection_wrap(cf.gen_unique_str(prefix), schema=schema)
|
||||
multi = 3
|
||||
for i in range(multi):
|
||||
start = i * tmp_nb
|
||||
@ -1781,8 +1692,7 @@ class TestDeleteString(TestcaseBase):
|
||||
expected: Raise exception
|
||||
"""
|
||||
collection_w = \
|
||||
self.init_collection_general(
|
||||
prefix, nb=tmp_nb, insert_data=True, primary_field=ct.default_string_field_name)[0]
|
||||
self.init_collection_general(prefix, nb=tmp_nb, insert_data=True, primary_field=ct.default_string_field_name)[0]
|
||||
collection_w.load()
|
||||
error = {ct.err_code: 0,
|
||||
ct.err_msg: f"failed to create expr plan, expr = {default_invaild_string_exp}"}
|
||||
@ -1803,8 +1713,7 @@ class TestDeleteString(TestcaseBase):
|
||||
"""
|
||||
# init collection and insert data without flush
|
||||
schema = cf.gen_string_pk_default_collection_schema()
|
||||
collection_w = self.init_collection_wrap(
|
||||
cf.gen_unique_str(prefix), schema=schema)
|
||||
collection_w = self.init_collection_wrap(cf.gen_unique_str(prefix), schema=schema)
|
||||
|
||||
# insert
|
||||
df = cf.gen_default_dataframe_data(1000)
|
||||
@ -1817,8 +1726,7 @@ class TestDeleteString(TestcaseBase):
|
||||
default_search_params = {"metric_type": "L2", "params": {"nprobe": 16}}
|
||||
collection_w.search(data=[df[ct.default_float_vec_field_name][0]], anns_field=ct.default_float_vec_field_name,
|
||||
param=default_search_params, limit=1)
|
||||
collection_w.query(
|
||||
default_string_expr, check_task=CheckTasks.check_query_results, check_items={'exp_res': res})
|
||||
collection_w.query(default_string_expr, check_task=CheckTasks.check_query_results, check_items={'exp_res': res})
|
||||
|
||||
# delete
|
||||
collection_w.delete(default_string_expr)
|
||||
|
||||
@ -1423,13 +1423,11 @@ class TestUtilityAdvanced(TestcaseBase):
|
||||
@pytest.mark.tags(CaseLabel.L1)
|
||||
def test_get_sealed_query_segment_info(self):
|
||||
"""
|
||||
target: test getting sealed query segment info of collection with data
|
||||
target: test getting sealed query segment info of collection without index
|
||||
method: init a collection, insert data, flush, load, and get query segment info
|
||||
expected:
|
||||
1. length of segment is greater than 0
|
||||
2. the sum num_rows of each segment is equal to num of entities
|
||||
1. length of segment is equal to 0
|
||||
"""
|
||||
pytest.skip("QueryCoord treat all segments without index as growing segments")
|
||||
c_name = cf.gen_unique_str(prefix)
|
||||
collection_w = self.init_collection_wrap(name=c_name)
|
||||
nb = 3000
|
||||
@ -1438,15 +1436,7 @@ class TestUtilityAdvanced(TestcaseBase):
|
||||
collection_w.num_entities
|
||||
collection_w.load()
|
||||
res, _ = self.utility_wrap.get_query_segment_info(c_name)
|
||||
assert len(res) > 0
|
||||
segment_ids = []
|
||||
cnt = 0
|
||||
for r in res:
|
||||
log.info(f"segmentID {r.segmentID}: state: {r.state}; num_rows: {r.num_rows} ")
|
||||
if r.segmentID not in segment_ids:
|
||||
segment_ids.append(r.segmentID)
|
||||
cnt += r.num_rows
|
||||
assert cnt == nb
|
||||
assert len(res) == 0
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L1)
|
||||
def test_get_sealed_query_segment_info_after_create_index(self):
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user