升级 sdk 2.4.2 并添加 一致性级别

This commit is contained in:
xgc 2024-07-20 00:56:12 +08:00
parent 4835bf5ebd
commit 86686ccee4
12 changed files with 193 additions and 45 deletions

View File

@ -28,7 +28,7 @@
<dependency>
<groupId>io.milvus</groupId>
<artifactId>milvus-sdk-java</artifactId>
<version>2.4.0</version>
<version>2.4.2</version>
<exclusions>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
@ -58,6 +58,12 @@
<version>${mica-auto.vaersion}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>2.0.47</version>
<scope>compile</scope>
</dependency>
</dependencies>

View File

@ -1,5 +1,8 @@
package org.dromara.milvus.plus.annotation;
import io.milvus.v2.common.ConsistencyLevel;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
@ -20,4 +23,6 @@ public @interface MilvusCollection {
*
*/
String[] alias() default {};
ConsistencyLevel level() default ConsistencyLevel.BOUNDED;
}

View File

@ -2,6 +2,7 @@ package org.dromara.milvus.plus.builder;
import io.milvus.exception.MilvusException;
import io.milvus.v2.client.MilvusClientV2;
import io.milvus.v2.common.ConsistencyLevel;
import io.milvus.v2.common.IndexParam;
import io.milvus.v2.service.collection.request.AddFieldReq;
import io.milvus.v2.service.collection.request.CreateCollectionReq;
@ -14,6 +15,7 @@ public class CollectionSchemaBuilder {
private final String collectionName;
private final MilvusClientV2 wrapper;
private final CreateCollectionReq.CollectionSchema schema;
private ConsistencyLevel consistencyLevel=ConsistencyLevel.BOUNDED;
public CollectionSchemaBuilder(String collectionName, MilvusClientV2 wrapper) {
this.collectionName = collectionName;
@ -31,11 +33,19 @@ public class CollectionSchemaBuilder {
}
return this;
}
public void addConsistencyLevel(ConsistencyLevel level){
this.consistencyLevel=level;
}
public CreateCollectionReq.FieldSchema getField(String fileName){
return schema.getField(fileName);
}
public void createSchema() throws MilvusException {
CreateCollectionReq req=CreateCollectionReq.builder().collectionName(this.collectionName).collectionSchema(this.schema).build();
CreateCollectionReq req=CreateCollectionReq.builder().
collectionName(this.collectionName).
collectionSchema(this.schema).
consistencyLevel(this.consistencyLevel)
.build();
wrapper.createCollection(req);
}
public void createIndex(java.util.List<IndexParam> indexParams) throws MilvusException {

View File

@ -3,6 +3,7 @@ package org.dromara.milvus.plus.converter;
import com.google.common.collect.Lists;
import io.milvus.v2.client.MilvusClientV2;
import io.milvus.v2.common.ConsistencyLevel;
import io.milvus.v2.common.IndexParam;
import io.milvus.v2.service.collection.request.AddFieldReq;
import io.milvus.v2.service.collection.request.GetLoadStateReq;
@ -22,6 +23,8 @@ import org.dromara.milvus.plus.model.MilvusEntity;
import org.springframework.util.CollectionUtils;
import java.lang.reflect.Field;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@ -66,6 +69,9 @@ public class MilvusConverter {
// 集合名称
String collectionName = collectionAnnotation.name();
milvus.setCollectionName(collectionName);
//一致性级别
ConsistencyLevel level = collectionAnnotation.level();
milvus.setConsistencyLevel(level);
// 集合别名
if (collectionAnnotation.alias().length > 0) {
milvus.setAlias(Arrays.asList(collectionAnnotation.alias()));
@ -105,7 +111,13 @@ public class MilvusConverter {
.filter(StringUtils::isNotEmpty).ifPresent(builder::description);
// 处理向量字段的维度
Optional.of(fieldAnnotation.dimension())
.filter(dimension -> dimension > 0).ifPresent(builder::dimension);
.filter(dimension -> dimension > 0).ifPresent(
dimension -> {
builder.dimension(dimension);
if (!isListFloat(field)) {
throw new IllegalArgumentException("Vector field type mismatch");
}
} );
// 数组字段的最大长度
Optional.of(fieldAnnotation.maxLength())
.filter(maxLength -> maxLength > 0).ifPresent(builder::maxLength);
@ -120,7 +132,6 @@ public class MilvusConverter {
// 设置Milvus字段和索引参数
milvus.setMilvusFields(milvusFields);
milvus.setIndexParams(indexParams);
// 缓存转换结果和集合信息
ConversionCache conversionCache = new ConversionCache();
conversionCache.setMilvusEntity(milvus);
@ -212,6 +223,7 @@ public class MilvusConverter {
milvusEntity.getCollectionName(), client
);
schemaBuilder.addField(milvusEntity.getMilvusFields().toArray(new AddFieldReq[0]));
schemaBuilder.addConsistencyLevel(milvusEntity.getConsistencyLevel());
log.info("-------create schema---------");
schemaBuilder.createSchema();
schemaBuilder.createIndex(indexParams);
@ -274,4 +286,36 @@ public class MilvusConverter {
client.loadPartitions(loadPartitionsReq);
log.info("load partition--{}", milvusEntity.getPartitionName());
}
/**
* 判断字段是否是 List<Float> 类型
*
* @param field 要检查的字段
* @return 如果字段是 List<Float> 类型返回 true否则返回 false
*/
public static boolean isListFloat(Field field) {
// 确保字段不是 null
if (field == null) {
return false;
}
// 获取字段的泛型类型
Type genericType = field.getGenericType();
// 检查是否是参数化类型
if (!(genericType instanceof ParameterizedType)) {
return false;
}
// 类型转换
ParameterizedType parameterizedType = (ParameterizedType) genericType;
// 获取参数化类型的原始类型
Type rawType = parameterizedType.getRawType();
// 检查原始类型是否是 List.class
if (!(rawType instanceof Class) || !List.class.isAssignableFrom((Class<?>) rawType)) {
return false;
}
// 获取类型参数
Type[] typeArguments = parameterizedType.getActualTypeArguments();
// 检查类型参数是否正好有一个并且是 Float.TYPE
return typeArguments.length == 1 && typeArguments[0] == Float.class;
}
}

View File

@ -1,6 +1,8 @@
package org.dromara.milvus.plus.converter;
import com.alibaba.fastjson2.JSONObject;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.JsonObject;
import io.milvus.v2.service.vector.response.GetResp;
import io.milvus.v2.service.vector.response.QueryResp;
import io.milvus.v2.service.vector.response.SearchResp;
@ -42,14 +44,19 @@ public class SearchRespConverter {
Map<String, Object> entityMap = new HashMap<>();
for (Map.Entry<String, Object> entry : searchResult.getEntity().entrySet()) {
String key = propertyCache.findKeyByValue(entry.getKey());
entityMap.put(key, entry.getValue());
Object value = entry.getValue();
if (value instanceof JsonObject) {
JSONObject v = JSONObject.parseObject(value.toString());
entityMap.put(key,v);
} else {
entityMap.put(key,value);
}
}
// 将转换后的Map转换为Java实体类T
T entity = objectMapper.convertValue(entityMap, entityType);
MilvusResult<T> tMilvusResult = new MilvusResult<>();
tMilvusResult.setId(searchResult.getId());
tMilvusResult.setDistance(searchResult.getDistance());
tMilvusResult.setDistance(searchResult.getScore());
tMilvusResult.setEntity(entity);
return tMilvusResult;
})
@ -107,7 +114,13 @@ public class SearchRespConverter {
// 通过属性缓存转换键名以适应Java实体的字段命名
for (Map.Entry<String, Object> entry : entityMap.entrySet()) {
String key = propertyCache.findKeyByValue(entry.getKey());
entityMap2.put(key,entry.getValue());
Object value = entry.getValue();
if (value instanceof JsonObject) {
JSONObject v = JSONObject.parseObject(value.toString());
entityMap2.put(key,v);
} else {
entityMap2.put(key,value);
}
}
// 使用转换工具将映射后的Map转换为指定类型的实体
T entity = objectMapper.convertValue(entityMap2, entityType);

View File

@ -203,8 +203,6 @@ public abstract class ConditionBuilder<T> {
return this;
}
public ConditionBuilder<T> eq(FieldFunction<T,?> fieldName, Object value) {
return addFilter(fieldName, "==", value);
}

View File

@ -2,6 +2,8 @@ package org.dromara.milvus.plus.core.conditions;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import io.milvus.exception.MilvusException;
import io.milvus.v2.client.MilvusClientV2;
import io.milvus.v2.service.vector.request.InsertReq;
@ -9,7 +11,6 @@ import io.milvus.v2.service.vector.response.InsertResp;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ClassUtils;
import org.apache.commons.lang3.StringUtils;
import org.dromara.milvus.plus.cache.CollectionToPrimaryCache;
import org.dromara.milvus.plus.cache.ConversionCache;
@ -20,6 +21,7 @@ import org.dromara.milvus.plus.model.vo.MilvusResp;
import org.dromara.milvus.plus.util.IdWorkerUtils;
import java.util.*;
import java.util.stream.Collectors;
/**
* 构建器内部类用于构建insert请求
@ -74,10 +76,12 @@ public class LambdaInsertWrapper<T> extends AbstractChainWrapper<T> implements
private MilvusResp<InsertResp> insert(List<JSONObject> jsonObjects){
log.info("insert data--->{}", JSON.toJSONString(jsonObjects));
JsonParser jsonParser = new JsonParser();
log.info("update data--->{}", JSON.toJSONString(jsonObjects));
List<JsonObject> objects = jsonObjects.stream().map(v -> jsonParser.parse(v.toJSONString()).getAsJsonObject()).collect(Collectors.toList());
InsertReq.InsertReqBuilder<?, ?> builder = InsertReq.builder()
.collectionName(collectionName)
.data(jsonObjects);
.data(objects);
if(StringUtils.isNotEmpty(partitionName)){
builder.partitionName(partitionName);
}
@ -105,11 +109,6 @@ public class LambdaInsertWrapper<T> extends AbstractChainWrapper<T> implements
for (Map.Entry<String, Object> entry : propertiesMap.entrySet()) {
String key = entry.getKey();
Object value = entry.getValue();
if (!ClassUtils.isPrimitiveOrWrapper(value.getClass()) && !(value instanceof String)&& !(value instanceof Collection)) {
//对象类型转成JSONObject
String jv = JSONObject.toJSONString(value);
value = JSONObject.parseObject(jv);
}
String tk = propertyCache.functionToPropertyMap.get(key);
jsonObject.put(tk,value);
}

View File

@ -1,12 +1,14 @@
package org.dromara.milvus.plus.core.conditions;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson2.JSON;
import io.milvus.exception.MilvusException;
import io.milvus.v2.client.MilvusClientV2;
import io.milvus.v2.common.ConsistencyLevel;
import io.milvus.v2.service.vector.request.GetReq;
import io.milvus.v2.service.vector.request.QueryReq;
import io.milvus.v2.service.vector.request.SearchReq;
import io.milvus.v2.service.vector.request.data.BaseVector;
import io.milvus.v2.service.vector.request.data.FloatVec;
import io.milvus.v2.service.vector.response.GetResp;
import io.milvus.v2.service.vector.response.QueryResp;
import io.milvus.v2.service.vector.response.SearchResp;
@ -15,6 +17,8 @@ import lombok.EqualsAndHashCode;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.dromara.milvus.plus.cache.ConversionCache;
import org.dromara.milvus.plus.cache.MilvusCache;
import org.dromara.milvus.plus.converter.MilvusConverter;
import org.dromara.milvus.plus.converter.SearchRespConverter;
import org.dromara.milvus.plus.core.FieldFunction;
import org.dromara.milvus.plus.model.vo.MilvusResp;
@ -41,7 +45,7 @@ public class LambdaQueryWrapper<T> extends AbstractChainWrapper<T> implements Wr
private String annsField;
private int topK;
private List<List<?>> vectors = new ArrayList<>();
private List<BaseVector> vectors = new ArrayList<>();
private long offset;
private long limit;
private int roundDecimal = -1;
@ -77,6 +81,10 @@ public class LambdaQueryWrapper<T> extends AbstractChainWrapper<T> implements Wr
this.partitionNames.addAll(Arrays.asList(partitionName));
return this;
}
public LambdaQueryWrapper<T> consistencyLevel(ConsistencyLevel level){
this.consistencyLevel=level;
return this;
}
public LambdaQueryWrapper<T> partition(FieldFunction<T, ?>... partitionName) {
Iterator<FieldFunction<T, ?>> iterator = new ArrayIterator<>(partitionName);
@ -402,16 +410,34 @@ public class LambdaQueryWrapper<T> extends AbstractChainWrapper<T> implements Wr
this.annsField=annsField.getFieldName(annsField);
return this;
}
public LambdaQueryWrapper<T> vector(List<?> vector) {
public LambdaQueryWrapper<T> vector(List<? extends Float> vector) {
BaseVector baseVector = new FloatVec((List<Float>) vector);
vectors.add(baseVector);
return this;
}
public LambdaQueryWrapper<T> vector(String annsField, List<? extends Float> vector) {
this.annsField=annsField;
BaseVector baseVector = new FloatVec((List<Float>) vector);
vectors.add(baseVector);
return this;
}
public LambdaQueryWrapper<T> vector(FieldFunction<T,?> annsField, List<? extends Float> vector) {
this.annsField=annsField.getFieldName(annsField);
BaseVector baseVector = new FloatVec((List<Float>) vector);
vectors.add(baseVector);
return this;
}
public LambdaQueryWrapper<T> vector(BaseVector vector) {
vectors.add(vector);
return this;
}
public LambdaQueryWrapper<T> vector(String annsField, List<?> vector) {
public LambdaQueryWrapper<T> vector(String annsField,BaseVector vector) {
this.annsField=annsField;
vectors.add(vector);
return this;
}
public LambdaQueryWrapper<T> vector(FieldFunction<T,?> annsField, List<?> vector) {
public LambdaQueryWrapper<T> vector(FieldFunction<T,?> annsField, BaseVector vector) {
this.annsField=annsField.getFieldName(annsField);
vectors.add(vector);
return this;
@ -431,10 +457,12 @@ public class LambdaQueryWrapper<T> extends AbstractChainWrapper<T> implements Wr
private SearchReq buildSearch() {
SearchReq.SearchReqBuilder<?, ?> builder = SearchReq.builder()
.collectionName(StringUtils.isNotBlank(collectionAlias) ? collectionAlias : collectionName);
if (annsField != null && !annsField.isEmpty()) {
builder.annsField(annsField);
}
if(consistencyLevel!=null){
builder.consistencyLevel(consistencyLevel);
}
if (!vectors.isEmpty()) {
builder.data(vectors);
}
@ -497,20 +525,57 @@ public class LambdaQueryWrapper<T> extends AbstractChainWrapper<T> implements Wr
*
* @return 搜索响应对象
*/
public MilvusResp<List<MilvusResult<T>>> query() throws MilvusException {
// public MilvusResp<List<MilvusResult<T>>> query() throws MilvusException {
// if (!vectors.isEmpty()) {
// SearchReq searchReq = buildSearch();
// log.info("build search param-->{}", JSON.toJSONString(searchReq));
// SearchResp search = client.search(searchReq);
// return SearchRespConverter.convertSearchRespToMilvusResp(search, entityType);
// } else {
// QueryReq queryReq = buildQuery();
// log.info("build query param-->{}", JSON.toJSONString(queryReq));
// QueryResp query = client.query(queryReq);
// return SearchRespConverter.convertGetRespToMilvusResp(query, entityType);
// }
// }
public MilvusResp<List<MilvusResult<T>>> query() throws MilvusException{
return query(1);
}
private MilvusResp<List<MilvusResult<T>>> query(int attempt) throws MilvusException{
try {
return executeQuery(attempt);
} catch (Exception e) {
if (e.getMessage() != null && e.getMessage().contains("collection not loaded") && attempt < maxRetries) {
log.warn("Attempt {}: Collection not loaded, attempting to reload and retry.", attempt);
handleCollectionNotLoaded();
return query(attempt + 1);
} else {
throw new RuntimeException(e);
}
}
}
private MilvusResp<List<MilvusResult<T>>> executeQuery(int attempt) throws Exception {
if (!vectors.isEmpty()) {
SearchReq searchReq = buildSearch();
log.info("build search param-->{}", JSON.toJSONString(searchReq));
SearchResp search = client.search(searchReq);
return SearchRespConverter.convertSearchRespToMilvusResp(search, entityType);
log.info("Attempt {}: Build search param--> {}", attempt, JSON.toJSONString(searchReq));
SearchResp searchResp = client.search(searchReq);
return SearchRespConverter.convertSearchRespToMilvusResp(searchResp, entityType);
} else {
QueryReq queryReq = buildQuery();
log.info("build query param-->{}", JSON.toJSONString(queryReq));
QueryResp query = client.query(queryReq);
return SearchRespConverter.convertGetRespToMilvusResp(query, entityType);
log.info("Attempt {}: Build query param--> {}", attempt, JSON.toJSONString(queryReq));
QueryResp queryResp = client.query(queryReq);
return SearchRespConverter.convertGetRespToMilvusResp(queryResp, entityType);
}
}
private void handleCollectionNotLoaded() {
ConversionCache cache = MilvusCache.milvusCache.get(entityType.getName());
MilvusConverter.loadStatus(cache.getMilvusEntity(), client);
}
// 定义最大重试次数的常量
private static final int maxRetries = 2;
public MilvusResp<List<MilvusResult<T>>> query(FieldFunction<T, ?>... outputFields) throws MilvusException {
List<String> otf = new ArrayList<>();
for (FieldFunction<T, ?> outputField : outputFields) {

View File

@ -2,6 +2,8 @@ package org.dromara.milvus.plus.core.conditions;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import io.milvus.exception.MilvusException;
import io.milvus.v2.client.MilvusClientV2;
import io.milvus.v2.service.vector.request.QueryReq;
@ -22,6 +24,7 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* 构建器内部类用于构建update请求
@ -427,10 +430,12 @@ public class LambdaUpdateWrapper<T> extends AbstractChainWrapper<T> implements W
}
private MilvusResp<UpsertResp> update(List<JSONObject> jsonObjects) {
JsonParser jsonParser = new JsonParser();
log.info("update data--->{}", JSON.toJSONString(jsonObjects));
List<JsonObject> objects = jsonObjects.stream().map(v -> jsonParser.parse(v.toJSONString()).getAsJsonObject()).collect(Collectors.toList());
UpsertReq.UpsertReqBuilder<?, ?> builder = UpsertReq.builder()
.collectionName(collectionName)
.data(jsonObjects);
.data(objects);
if (StringUtils.isNotEmpty(partitionName)) {
builder.partitionName(partitionName);
}

View File

@ -1,5 +1,6 @@
package org.dromara.milvus.plus.model;
import io.milvus.v2.common.ConsistencyLevel;
import io.milvus.v2.common.IndexParam;
import io.milvus.v2.service.collection.request.AddFieldReq;
import lombok.Data;
@ -16,5 +17,5 @@ public class MilvusEntity {
private List<IndexParam> indexParams;
private List<AddFieldReq> milvusFields;
private List<String> partitionName;
private ConsistencyLevel consistencyLevel;
}

View File

@ -1,10 +1,11 @@
package org.dromara.milvus.plus.service;
import com.alibaba.fastjson.JSONObject;
import com.google.gson.JsonObject;
import io.milvus.exception.MilvusException;
import io.milvus.v2.client.MilvusClientV2;
import io.milvus.v2.common.ConsistencyLevel;
import io.milvus.v2.service.vector.request.*;
import io.milvus.v2.service.vector.request.data.BaseVector;
import io.milvus.v2.service.vector.response.*;
import java.util.Collections;
@ -110,7 +111,7 @@ public interface IVecMService {
*/
default InsertResp insert(
String collectionName,
List<JSONObject> data,
List<JsonObject> data,
String partitionName
) throws MilvusException {
MilvusClientV2 client = getClient();
@ -129,7 +130,7 @@ public interface IVecMService {
* @return InsertResp对象包含插入实体的数量信息
* @throws MilvusException 如果操作过程中发生错误
*/
default InsertResp insert(String collectionName, JSONObject data) throws MilvusException {
default InsertResp insert(String collectionName, JsonObject data) throws MilvusException {
return insert(collectionName, Collections.singletonList(data), null);
}
@ -219,7 +220,7 @@ public interface IVecMService {
int topK,
String filter,
List<String> outputFields,
List<Object> data,
List<BaseVector> data,
long offset,
long limit,
int roundDecimal,
@ -258,7 +259,7 @@ public interface IVecMService {
* @return 搜索结果对象列表
* @throws MilvusException 如果操作过程中发生错误
*/
default SearchResp search(String collectionName, List<Object> data, int topK) throws MilvusException {
default SearchResp search(String collectionName, List<BaseVector> data, int topK) throws MilvusException {
return search(collectionName, Collections.emptyList(), null, topK, "", Collections.emptyList(), data, 0, 0, -1, Collections.emptyMap(), 0, 0, ConsistencyLevel.BOUNDED, false);
}
@ -273,7 +274,7 @@ public interface IVecMService {
default UpsertResp upsert(
String collectionName,
String partitionName,
List<JSONObject> data) throws MilvusException {
List<JsonObject> data) throws MilvusException {
MilvusClientV2 client = getClient();
UpsertReq upsertReq = UpsertReq.builder()
.collectionName(collectionName)
@ -290,7 +291,7 @@ public interface IVecMService {
* @return UpsertResp对象包含插入或更新实体的数量信息
* @throws MilvusException 如果操作过程中发生错误
*/
default UpsertResp upsert(String collectionName, JSONObject data) throws MilvusException {
default UpsertResp upsert(String collectionName, JsonObject data) throws MilvusException {
return upsert(collectionName, null, Collections.singletonList(data));
}

View File

@ -1,5 +1,6 @@
package org.dromara.milvus.demo.model;
import io.milvus.v2.common.ConsistencyLevel;
import io.milvus.v2.common.DataType;
import io.milvus.v2.common.IndexParam;
import lombok.Data;
@ -8,7 +9,7 @@ import org.dromara.milvus.plus.annotation.*;
import java.util.List;
@Data
@MilvusCollection(name = "face_collection")
@MilvusCollection(name = "face_collection",level = ConsistencyLevel.STRONG)
@MilvusPartition(name = {"face_001", "face_002"})
@GenerateMilvusMapper
public class Face {