diff --git a/milvus-plus-core/src/main/java/org/dromara/milvus/plus/core/conditions/AbstractChainWrapper.java b/milvus-plus-core/src/main/java/org/dromara/milvus/plus/core/conditions/AbstractChainWrapper.java index a100e5c..5c8fbce 100644 --- a/milvus-plus-core/src/main/java/org/dromara/milvus/plus/core/conditions/AbstractChainWrapper.java +++ b/milvus-plus-core/src/main/java/org/dromara/milvus/plus/core/conditions/AbstractChainWrapper.java @@ -1,13 +1,21 @@ package org.dromara.milvus.plus.core.conditions; +import io.milvus.v2.client.MilvusClientV2; import lombok.Data; import lombok.EqualsAndHashCode; +import lombok.extern.slf4j.Slf4j; +import org.dromara.milvus.plus.cache.ConversionCache; +import org.dromara.milvus.plus.cache.MilvusCache; +import org.dromara.milvus.plus.converter.MilvusConverter; +import org.dromara.milvus.plus.model.vo.MilvusResp; import java.util.Iterator; import java.util.NoSuchElementException; +import java.util.function.Supplier; @EqualsAndHashCode(callSuper = true) @Data +@Slf4j public abstract class AbstractChainWrapper extends ConditionBuilder{ protected static class ArrayIterator implements Iterator { private final T[] array; @@ -27,4 +35,27 @@ public abstract class AbstractChainWrapper extends ConditionBuilder{ return array[index++]; } } + // 定义最大重试次数的常量 + public static final int maxRetries = 2; + protected MilvusResp executeWithRetry(Supplier> action, String errorMessage, int maxRetries,Class entityType, MilvusClientV2 client) { + int attempt = 1; + while (true) { + try { + return action.get(); // 尝试执行操作 + } catch (Exception e) { + if (e.getMessage() != null && e.getMessage().contains(errorMessage) && attempt < maxRetries) { + log.warn("Attempt {}: {} - attempting to retry.", attempt, errorMessage); + handleCollectionNotLoaded(entityType,client); + attempt++; + } else { + throw new RuntimeException(e); // 如果不是预期的错误或者重试次数达到上限,则抛出异常 + } + } + } + } + protected void handleCollectionNotLoaded(Class entityType, MilvusClientV2 client) { + ConversionCache cache = MilvusCache.milvusCache.get(entityType.getName()); + MilvusConverter.loadStatus(cache.getMilvusEntity(), client); + } + } diff --git a/milvus-plus-core/src/main/java/org/dromara/milvus/plus/core/conditions/LambdaDeleteWrapper.java b/milvus-plus-core/src/main/java/org/dromara/milvus/plus/core/conditions/LambdaDeleteWrapper.java index 95a5766..a951638 100644 --- a/milvus-plus-core/src/main/java/org/dromara/milvus/plus/core/conditions/LambdaDeleteWrapper.java +++ b/milvus-plus-core/src/main/java/org/dromara/milvus/plus/core/conditions/LambdaDeleteWrapper.java @@ -366,13 +366,21 @@ public class LambdaDeleteWrapper extends AbstractChainWrapper implements * @return 搜索响应对象 */ public MilvusResp remove() throws MilvusException { - DeleteReq deleteReq = build(); - log.info("build remove param-->{}", JSON.toJSONString(deleteReq)); - DeleteResp delete = client.delete(deleteReq); - MilvusResp resp = new MilvusResp<>(); - resp.setData(delete); - resp.setSuccess(true); - return resp; + return executeWithRetry( + () -> { + DeleteReq deleteReq = build(); + log.info("build remove param-->{}", JSON.toJSONString(deleteReq)); + DeleteResp delete = client.delete(deleteReq); + MilvusResp resp = new MilvusResp<>(); + resp.setData(delete); + resp.setSuccess(true); + return resp; + }, + "collection not loaded", + maxRetries, + entityType, + client + ); } public MilvusResp removeById(Serializable... ids) throws MilvusException { diff --git a/milvus-plus-core/src/main/java/org/dromara/milvus/plus/core/conditions/LambdaInsertWrapper.java b/milvus-plus-core/src/main/java/org/dromara/milvus/plus/core/conditions/LambdaInsertWrapper.java index ad2a4f7..4e6f057 100644 --- a/milvus-plus-core/src/main/java/org/dromara/milvus/plus/core/conditions/LambdaInsertWrapper.java +++ b/milvus-plus-core/src/main/java/org/dromara/milvus/plus/core/conditions/LambdaInsertWrapper.java @@ -76,22 +76,30 @@ public class LambdaInsertWrapper extends AbstractChainWrapper implements private MilvusResp insert(List jsonObjects){ - JsonParser jsonParser = new JsonParser(); - log.info("update data--->{}", JSON.toJSONString(jsonObjects)); - List objects = jsonObjects.stream().map(v -> jsonParser.parse(v.toJSONString()).getAsJsonObject()).collect(Collectors.toList()); - InsertReq.InsertReqBuilder builder = InsertReq.builder() - .collectionName(collectionName) - .data(objects); - if(StringUtils.isNotEmpty(partitionName)){ - builder.partitionName(partitionName); - } - InsertReq insertReq = builder - .build(); - InsertResp insert = client.insert(insertReq); - MilvusResp resp = new MilvusResp<>(); - resp.setData(insert); - resp.setSuccess(true); - return resp; + return executeWithRetry( + () -> { + JsonParser jsonParser = new JsonParser(); + log.info("update data--->{}", JSON.toJSONString(jsonObjects)); + List objects = jsonObjects.stream().map(v -> jsonParser.parse(v.toJSONString()).getAsJsonObject()).collect(Collectors.toList()); + InsertReq.InsertReqBuilder builder = InsertReq.builder() + .collectionName(collectionName) + .data(objects); + if(StringUtils.isNotEmpty(partitionName)){ + builder.partitionName(partitionName); + } + InsertReq insertReq = builder + .build(); + InsertResp insert = client.insert(insertReq); + MilvusResp resp = new MilvusResp<>(); + resp.setData(insert); + resp.setSuccess(true); + return resp; + }, + "collection not loaded", + maxRetries, + entityType, + client + ); } public MilvusResp insert(T ... entity) throws MilvusException { Iterator iterator = new ArrayIterator<>(entity); diff --git a/milvus-plus-core/src/main/java/org/dromara/milvus/plus/core/conditions/LambdaQueryWrapper.java b/milvus-plus-core/src/main/java/org/dromara/milvus/plus/core/conditions/LambdaQueryWrapper.java index abc5ba5..6722b20 100644 --- a/milvus-plus-core/src/main/java/org/dromara/milvus/plus/core/conditions/LambdaQueryWrapper.java +++ b/milvus-plus-core/src/main/java/org/dromara/milvus/plus/core/conditions/LambdaQueryWrapper.java @@ -17,8 +17,6 @@ import lombok.EqualsAndHashCode; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.dromara.milvus.plus.cache.ConversionCache; -import org.dromara.milvus.plus.cache.MilvusCache; -import org.dromara.milvus.plus.converter.MilvusConverter; import org.dromara.milvus.plus.converter.SearchRespConverter; import org.dromara.milvus.plus.core.FieldFunction; import org.dromara.milvus.plus.model.vo.MilvusResp; @@ -525,56 +523,27 @@ public class LambdaQueryWrapper extends AbstractChainWrapper implements Wr * * @return 搜索响应对象 */ -// public MilvusResp>> query() throws MilvusException { -// if (!vectors.isEmpty()) { -// SearchReq searchReq = buildSearch(); -// log.info("build search param-->{}", JSON.toJSONString(searchReq)); -// SearchResp search = client.search(searchReq); -// return SearchRespConverter.convertSearchRespToMilvusResp(search, entityType); -// } else { -// QueryReq queryReq = buildQuery(); -// log.info("build query param-->{}", JSON.toJSONString(queryReq)); -// QueryResp query = client.query(queryReq); -// return SearchRespConverter.convertGetRespToMilvusResp(query, entityType); -// } -// } public MilvusResp>> query() throws MilvusException{ - return query(1); + return executeWithRetry( + () -> { + if (!vectors.isEmpty()) { + SearchReq searchReq = buildSearch(); + log.info("Build search param--> {}", JSON.toJSONString(searchReq)); + SearchResp searchResp = client.search(searchReq); + return SearchRespConverter.convertSearchRespToMilvusResp(searchResp, entityType); + } else { + QueryReq queryReq = buildQuery(); + log.info("Build query param--> {}", JSON.toJSONString(queryReq)); + QueryResp queryResp = client.query(queryReq); + return SearchRespConverter.convertGetRespToMilvusResp(queryResp, entityType); + } + }, + "collection not loaded", + maxRetries, + entityType, + client + ); } - private MilvusResp>> query(int attempt) throws MilvusException{ - try { - return executeQuery(attempt); - } catch (Exception e) { - if (e.getMessage() != null && e.getMessage().contains("collection not loaded") && attempt < maxRetries) { - log.warn("Attempt {}: Collection not loaded, attempting to reload and retry.", attempt); - handleCollectionNotLoaded(); - return query(attempt + 1); - } else { - throw new RuntimeException(e); - } - } - } - private MilvusResp>> executeQuery(int attempt) throws Exception { - if (!vectors.isEmpty()) { - SearchReq searchReq = buildSearch(); - log.info("Attempt {}: Build search param--> {}", attempt, JSON.toJSONString(searchReq)); - SearchResp searchResp = client.search(searchReq); - return SearchRespConverter.convertSearchRespToMilvusResp(searchResp, entityType); - } else { - QueryReq queryReq = buildQuery(); - log.info("Attempt {}: Build query param--> {}", attempt, JSON.toJSONString(queryReq)); - QueryResp queryResp = client.query(queryReq); - return SearchRespConverter.convertGetRespToMilvusResp(queryResp, entityType); - } - } - - private void handleCollectionNotLoaded() { - ConversionCache cache = MilvusCache.milvusCache.get(entityType.getName()); - MilvusConverter.loadStatus(cache.getMilvusEntity(), client); - } - - // 定义最大重试次数的常量 - private static final int maxRetries = 2; public MilvusResp>> query(FieldFunction... outputFields) throws MilvusException { List otf = new ArrayList<>(); diff --git a/milvus-plus-core/src/main/java/org/dromara/milvus/plus/core/conditions/LambdaUpdateWrapper.java b/milvus-plus-core/src/main/java/org/dromara/milvus/plus/core/conditions/LambdaUpdateWrapper.java index 04eb83b..5e0107c 100644 --- a/milvus-plus-core/src/main/java/org/dromara/milvus/plus/core/conditions/LambdaUpdateWrapper.java +++ b/milvus-plus-core/src/main/java/org/dromara/milvus/plus/core/conditions/LambdaUpdateWrapper.java @@ -430,22 +430,30 @@ public class LambdaUpdateWrapper extends AbstractChainWrapper implements W } private MilvusResp update(List jsonObjects) { - JsonParser jsonParser = new JsonParser(); - log.info("update data--->{}", JSON.toJSONString(jsonObjects)); - List objects = jsonObjects.stream().map(v -> jsonParser.parse(v.toJSONString()).getAsJsonObject()).collect(Collectors.toList()); - UpsertReq.UpsertReqBuilder builder = UpsertReq.builder() - .collectionName(collectionName) - .data(objects); - if (StringUtils.isNotEmpty(partitionName)) { - builder.partitionName(partitionName); - } - UpsertReq upsertReq = builder - .build(); - UpsertResp upsert = client.upsert(upsertReq); - MilvusResp resp = new MilvusResp<>(); - resp.setData(upsert); - resp.setSuccess(true); - return resp; + return executeWithRetry( + () -> { + JsonParser jsonParser = new JsonParser(); + log.info("update data--->{}", JSON.toJSONString(jsonObjects)); + List objects = jsonObjects.stream().map(v -> jsonParser.parse(v.toJSONString()).getAsJsonObject()).collect(Collectors.toList()); + UpsertReq.UpsertReqBuilder builder = UpsertReq.builder() + .collectionName(collectionName) + .data(objects); + if (StringUtils.isNotEmpty(partitionName)) { + builder.partitionName(partitionName); + } + UpsertReq upsertReq = builder + .build(); + UpsertResp upsert = client.upsert(upsertReq); + MilvusResp resp = new MilvusResp<>(); + resp.setData(upsert); + resp.setSuccess(true); + return resp; + }, + "collection not loaded", + maxRetries, + entityType, + client + ); } public MilvusResp updateById(T... entity) throws MilvusException {