update cdc test

Signed-off-by: zhuwenxing <wenxing.zhu@zilliz.com>
This commit is contained in:
zhuwenxing 2024-08-06 15:58:40 +08:00
parent de8a487331
commit 764330c042

View File

@ -20,6 +20,8 @@ class MilvusCDCPerformance:
self.target_collection = None
self.insert_count = 0
self.sync_count = 0
self.source_count = 0
self.target_count = 0
self.insert_lock = threading.Lock()
self.sync_lock = threading.Lock()
self.latest_insert_ts = 0
@ -32,8 +34,8 @@ class MilvusCDCPerformance:
}
self.start_time = None
self.last_report_time = None
self.last_insert_count = 0
self.last_sync_count = 0
self.last_source_count = 0
self.last_target_count = 0
# New attributes for time series data
self.time_series_data = {
@ -47,13 +49,13 @@ class MilvusCDCPerformance:
current_time = time.time()
if self.last_report_time is None:
self.last_report_time = current_time
self.last_insert_count = self.insert_count
self.last_sync_count = self.sync_count
self.last_source_count = self.source_count
self.last_target_count = self.target_count
return
time_diff = current_time - self.last_report_time
insert_diff = self.insert_count - self.last_insert_count
sync_diff = self.sync_count - self.last_sync_count
insert_diff = self.source_count - self.last_source_count
sync_diff = self.target_count - self.last_target_count
insert_throughput = insert_diff / time_diff
sync_throughput = sync_diff / time_diff
@ -72,8 +74,8 @@ class MilvusCDCPerformance:
self.time_series_data['avg_latency'].append(avg_latency)
self.last_report_time = current_time
self.last_insert_count = self.insert_count
self.last_sync_count = self.sync_count
self.last_source_count = self.source_count
self.last_target_count = self.target_count
def continuous_monitoring(self, interval=5):
while not self.stop_query:
@ -236,22 +238,32 @@ class MilvusCDCPerformance:
)[0]['count(*)']
while not self.stop_query:
try:
t0 = time.time()
results = self.target_collection.query(
expr="",
output_fields=["count(*)"],
)
tt = time.time() - t0
target_count = results[0]['count(*)']
t0 = time.time()
results = self.source_collection.query(
expr="",
output_fields=["count(*)"],
)
tt = time.time() - t0
source_count = results[0]['count(*)']
try:
t0 = time.time()
results = self.target_collection.query(
expr="",
output_fields=["count(*)"],
)
tt = time.time() - t0
target_count = results[0]['count(*)']
except Exception as e:
logger.error(f"Count failed: {e}")
target_count = self.last_target_count
try:
t0 = time.time()
results = self.source_collection.query(
expr="",
output_fields=["count(*)"],
)
tt = time.time() - t0
source_count = results[0]['count(*)']
except Exception as e:
logger.error(f"Count failed: {e}")
source_count = self.last_source_count
progress = (target_count / source_count) * 100 if source_count > 0 else 0
self.source_count = source_count
self.target_count = target_count
self.sync_count = target_count - previous_count
logger.debug(f"sync progress {target_count}/{source_count} {progress:.2f}%")
except Exception as e: