handle not load collection err

This commit is contained in:
xgc 2024-07-20 12:52:22 +08:00
parent 86686ccee4
commit a1bea858e5
5 changed files with 113 additions and 89 deletions

View File

@ -1,13 +1,21 @@
package org.dromara.milvus.plus.core.conditions; package org.dromara.milvus.plus.core.conditions;
import io.milvus.v2.client.MilvusClientV2;
import lombok.Data; import lombok.Data;
import lombok.EqualsAndHashCode; import lombok.EqualsAndHashCode;
import lombok.extern.slf4j.Slf4j;
import org.dromara.milvus.plus.cache.ConversionCache;
import org.dromara.milvus.plus.cache.MilvusCache;
import org.dromara.milvus.plus.converter.MilvusConverter;
import org.dromara.milvus.plus.model.vo.MilvusResp;
import java.util.Iterator; import java.util.Iterator;
import java.util.NoSuchElementException; import java.util.NoSuchElementException;
import java.util.function.Supplier;
@EqualsAndHashCode(callSuper = true) @EqualsAndHashCode(callSuper = true)
@Data @Data
@Slf4j
public abstract class AbstractChainWrapper<T> extends ConditionBuilder<T>{ public abstract class AbstractChainWrapper<T> extends ConditionBuilder<T>{
protected static class ArrayIterator<T> implements Iterator<T> { protected static class ArrayIterator<T> implements Iterator<T> {
private final T[] array; private final T[] array;
@ -27,4 +35,27 @@ public abstract class AbstractChainWrapper<T> extends ConditionBuilder<T>{
return array[index++]; return array[index++];
} }
} }
// 定义最大重试次数的常量
public static final int maxRetries = 2;
protected <R> MilvusResp<R> executeWithRetry(Supplier<MilvusResp<R>> action, String errorMessage, int maxRetries,Class entityType, MilvusClientV2 client) {
int attempt = 1;
while (true) {
try {
return action.get(); // 尝试执行操作
} catch (Exception e) {
if (e.getMessage() != null && e.getMessage().contains(errorMessage) && attempt < maxRetries) {
log.warn("Attempt {}: {} - attempting to retry.", attempt, errorMessage);
handleCollectionNotLoaded(entityType,client);
attempt++;
} else {
throw new RuntimeException(e); // 如果不是预期的错误或者重试次数达到上限则抛出异常
}
}
}
}
protected void handleCollectionNotLoaded(Class entityType, MilvusClientV2 client) {
ConversionCache cache = MilvusCache.milvusCache.get(entityType.getName());
MilvusConverter.loadStatus(cache.getMilvusEntity(), client);
}
} }

View File

@ -366,13 +366,21 @@ public class LambdaDeleteWrapper<T> extends AbstractChainWrapper<T> implements
* @return 搜索响应对象 * @return 搜索响应对象
*/ */
public MilvusResp<DeleteResp> remove() throws MilvusException { public MilvusResp<DeleteResp> remove() throws MilvusException {
DeleteReq deleteReq = build(); return executeWithRetry(
log.info("build remove param-->{}", JSON.toJSONString(deleteReq)); () -> {
DeleteResp delete = client.delete(deleteReq); DeleteReq deleteReq = build();
MilvusResp<DeleteResp> resp = new MilvusResp<>(); log.info("build remove param-->{}", JSON.toJSONString(deleteReq));
resp.setData(delete); DeleteResp delete = client.delete(deleteReq);
resp.setSuccess(true); MilvusResp<DeleteResp> resp = new MilvusResp<>();
return resp; resp.setData(delete);
resp.setSuccess(true);
return resp;
},
"collection not loaded",
maxRetries,
entityType,
client
);
} }
public MilvusResp<DeleteResp> removeById(Serializable... ids) throws MilvusException { public MilvusResp<DeleteResp> removeById(Serializable... ids) throws MilvusException {

View File

@ -76,22 +76,30 @@ public class LambdaInsertWrapper<T> extends AbstractChainWrapper<T> implements
private MilvusResp<InsertResp> insert(List<JSONObject> jsonObjects){ private MilvusResp<InsertResp> insert(List<JSONObject> jsonObjects){
JsonParser jsonParser = new JsonParser(); return executeWithRetry(
log.info("update data--->{}", JSON.toJSONString(jsonObjects)); () -> {
List<JsonObject> objects = jsonObjects.stream().map(v -> jsonParser.parse(v.toJSONString()).getAsJsonObject()).collect(Collectors.toList()); JsonParser jsonParser = new JsonParser();
InsertReq.InsertReqBuilder<?, ?> builder = InsertReq.builder() log.info("update data--->{}", JSON.toJSONString(jsonObjects));
.collectionName(collectionName) List<JsonObject> objects = jsonObjects.stream().map(v -> jsonParser.parse(v.toJSONString()).getAsJsonObject()).collect(Collectors.toList());
.data(objects); InsertReq.InsertReqBuilder<?, ?> builder = InsertReq.builder()
if(StringUtils.isNotEmpty(partitionName)){ .collectionName(collectionName)
builder.partitionName(partitionName); .data(objects);
} if(StringUtils.isNotEmpty(partitionName)){
InsertReq insertReq = builder builder.partitionName(partitionName);
.build(); }
InsertResp insert = client.insert(insertReq); InsertReq insertReq = builder
MilvusResp<InsertResp> resp = new MilvusResp<>(); .build();
resp.setData(insert); InsertResp insert = client.insert(insertReq);
resp.setSuccess(true); MilvusResp<InsertResp> resp = new MilvusResp<>();
return resp; resp.setData(insert);
resp.setSuccess(true);
return resp;
},
"collection not loaded",
maxRetries,
entityType,
client
);
} }
public MilvusResp<InsertResp> insert(T ... entity) throws MilvusException { public MilvusResp<InsertResp> insert(T ... entity) throws MilvusException {
Iterator<T> iterator = new ArrayIterator<>(entity); Iterator<T> iterator = new ArrayIterator<>(entity);

View File

@ -17,8 +17,6 @@ import lombok.EqualsAndHashCode;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.dromara.milvus.plus.cache.ConversionCache; import org.dromara.milvus.plus.cache.ConversionCache;
import org.dromara.milvus.plus.cache.MilvusCache;
import org.dromara.milvus.plus.converter.MilvusConverter;
import org.dromara.milvus.plus.converter.SearchRespConverter; import org.dromara.milvus.plus.converter.SearchRespConverter;
import org.dromara.milvus.plus.core.FieldFunction; import org.dromara.milvus.plus.core.FieldFunction;
import org.dromara.milvus.plus.model.vo.MilvusResp; import org.dromara.milvus.plus.model.vo.MilvusResp;
@ -525,56 +523,27 @@ public class LambdaQueryWrapper<T> extends AbstractChainWrapper<T> implements Wr
* *
* @return 搜索响应对象 * @return 搜索响应对象
*/ */
// public MilvusResp<List<MilvusResult<T>>> query() throws MilvusException {
// if (!vectors.isEmpty()) {
// SearchReq searchReq = buildSearch();
// log.info("build search param-->{}", JSON.toJSONString(searchReq));
// SearchResp search = client.search(searchReq);
// return SearchRespConverter.convertSearchRespToMilvusResp(search, entityType);
// } else {
// QueryReq queryReq = buildQuery();
// log.info("build query param-->{}", JSON.toJSONString(queryReq));
// QueryResp query = client.query(queryReq);
// return SearchRespConverter.convertGetRespToMilvusResp(query, entityType);
// }
// }
public MilvusResp<List<MilvusResult<T>>> query() throws MilvusException{ public MilvusResp<List<MilvusResult<T>>> query() throws MilvusException{
return query(1); return executeWithRetry(
() -> {
if (!vectors.isEmpty()) {
SearchReq searchReq = buildSearch();
log.info("Build search param--> {}", JSON.toJSONString(searchReq));
SearchResp searchResp = client.search(searchReq);
return SearchRespConverter.convertSearchRespToMilvusResp(searchResp, entityType);
} else {
QueryReq queryReq = buildQuery();
log.info("Build query param--> {}", JSON.toJSONString(queryReq));
QueryResp queryResp = client.query(queryReq);
return SearchRespConverter.convertGetRespToMilvusResp(queryResp, entityType);
}
},
"collection not loaded",
maxRetries,
entityType,
client
);
} }
private MilvusResp<List<MilvusResult<T>>> query(int attempt) throws MilvusException{
try {
return executeQuery(attempt);
} catch (Exception e) {
if (e.getMessage() != null && e.getMessage().contains("collection not loaded") && attempt < maxRetries) {
log.warn("Attempt {}: Collection not loaded, attempting to reload and retry.", attempt);
handleCollectionNotLoaded();
return query(attempt + 1);
} else {
throw new RuntimeException(e);
}
}
}
private MilvusResp<List<MilvusResult<T>>> executeQuery(int attempt) throws Exception {
if (!vectors.isEmpty()) {
SearchReq searchReq = buildSearch();
log.info("Attempt {}: Build search param--> {}", attempt, JSON.toJSONString(searchReq));
SearchResp searchResp = client.search(searchReq);
return SearchRespConverter.convertSearchRespToMilvusResp(searchResp, entityType);
} else {
QueryReq queryReq = buildQuery();
log.info("Attempt {}: Build query param--> {}", attempt, JSON.toJSONString(queryReq));
QueryResp queryResp = client.query(queryReq);
return SearchRespConverter.convertGetRespToMilvusResp(queryResp, entityType);
}
}
private void handleCollectionNotLoaded() {
ConversionCache cache = MilvusCache.milvusCache.get(entityType.getName());
MilvusConverter.loadStatus(cache.getMilvusEntity(), client);
}
// 定义最大重试次数的常量
private static final int maxRetries = 2;
public MilvusResp<List<MilvusResult<T>>> query(FieldFunction<T, ?>... outputFields) throws MilvusException { public MilvusResp<List<MilvusResult<T>>> query(FieldFunction<T, ?>... outputFields) throws MilvusException {
List<String> otf = new ArrayList<>(); List<String> otf = new ArrayList<>();

View File

@ -430,22 +430,30 @@ public class LambdaUpdateWrapper<T> extends AbstractChainWrapper<T> implements W
} }
private MilvusResp<UpsertResp> update(List<JSONObject> jsonObjects) { private MilvusResp<UpsertResp> update(List<JSONObject> jsonObjects) {
JsonParser jsonParser = new JsonParser(); return executeWithRetry(
log.info("update data--->{}", JSON.toJSONString(jsonObjects)); () -> {
List<JsonObject> objects = jsonObjects.stream().map(v -> jsonParser.parse(v.toJSONString()).getAsJsonObject()).collect(Collectors.toList()); JsonParser jsonParser = new JsonParser();
UpsertReq.UpsertReqBuilder<?, ?> builder = UpsertReq.builder() log.info("update data--->{}", JSON.toJSONString(jsonObjects));
.collectionName(collectionName) List<JsonObject> objects = jsonObjects.stream().map(v -> jsonParser.parse(v.toJSONString()).getAsJsonObject()).collect(Collectors.toList());
.data(objects); UpsertReq.UpsertReqBuilder<?, ?> builder = UpsertReq.builder()
if (StringUtils.isNotEmpty(partitionName)) { .collectionName(collectionName)
builder.partitionName(partitionName); .data(objects);
} if (StringUtils.isNotEmpty(partitionName)) {
UpsertReq upsertReq = builder builder.partitionName(partitionName);
.build(); }
UpsertResp upsert = client.upsert(upsertReq); UpsertReq upsertReq = builder
MilvusResp<UpsertResp> resp = new MilvusResp<>(); .build();
resp.setData(upsert); UpsertResp upsert = client.upsert(upsertReq);
resp.setSuccess(true); MilvusResp<UpsertResp> resp = new MilvusResp<>();
return resp; resp.setData(upsert);
resp.setSuccess(true);
return resp;
},
"collection not loaded",
maxRetries,
entityType,
client
);
} }
public MilvusResp<UpsertResp> updateById(T... entity) throws MilvusException { public MilvusResp<UpsertResp> updateById(T... entity) throws MilvusException {