diff --git a/tests/python_client/cdc/perf/milvus_cdc_recovery_perf_test.py b/tests/python_client/cdc/perf/milvus_cdc_recovery_perf_test.py index 8a9b4b11e4..8371cc6616 100644 --- a/tests/python_client/cdc/perf/milvus_cdc_recovery_perf_test.py +++ b/tests/python_client/cdc/perf/milvus_cdc_recovery_perf_test.py @@ -20,6 +20,8 @@ class MilvusCDCPerformance: self.target_collection = None self.insert_count = 0 self.sync_count = 0 + self.source_count = 0 + self.target_count = 0 self.insert_lock = threading.Lock() self.sync_lock = threading.Lock() self.latest_insert_ts = 0 @@ -32,8 +34,8 @@ class MilvusCDCPerformance: } self.start_time = None self.last_report_time = None - self.last_insert_count = 0 - self.last_sync_count = 0 + self.last_source_count = 0 + self.last_target_count = 0 # New attributes for time series data self.time_series_data = { @@ -47,13 +49,13 @@ class MilvusCDCPerformance: current_time = time.time() if self.last_report_time is None: self.last_report_time = current_time - self.last_insert_count = self.insert_count - self.last_sync_count = self.sync_count + self.last_source_count = self.source_count + self.last_target_count = self.target_count return time_diff = current_time - self.last_report_time - insert_diff = self.insert_count - self.last_insert_count - sync_diff = self.sync_count - self.last_sync_count + insert_diff = self.source_count - self.last_source_count + sync_diff = self.target_count - self.last_target_count insert_throughput = insert_diff / time_diff sync_throughput = sync_diff / time_diff @@ -72,8 +74,8 @@ class MilvusCDCPerformance: self.time_series_data['avg_latency'].append(avg_latency) self.last_report_time = current_time - self.last_insert_count = self.insert_count - self.last_sync_count = self.sync_count + self.last_source_count = self.source_count + self.last_target_count = self.target_count def continuous_monitoring(self, interval=5): while not self.stop_query: @@ -236,22 +238,32 @@ class MilvusCDCPerformance: )[0]['count(*)'] while not self.stop_query: try: - t0 = time.time() - results = self.target_collection.query( - expr="", - output_fields=["count(*)"], - ) - tt = time.time() - t0 - target_count = results[0]['count(*)'] - t0 = time.time() - results = self.source_collection.query( - expr="", - output_fields=["count(*)"], - ) - tt = time.time() - t0 - source_count = results[0]['count(*)'] + try: + t0 = time.time() + results = self.target_collection.query( + expr="", + output_fields=["count(*)"], + ) + tt = time.time() - t0 + target_count = results[0]['count(*)'] + except Exception as e: + logger.error(f"Count failed: {e}") + target_count = self.last_target_count + try: + t0 = time.time() + results = self.source_collection.query( + expr="", + output_fields=["count(*)"], + ) + tt = time.time() - t0 + source_count = results[0]['count(*)'] + except Exception as e: + logger.error(f"Count failed: {e}") + source_count = self.last_source_count progress = (target_count / source_count) * 100 if source_count > 0 else 0 + self.source_count = source_count + self.target_count = target_count self.sync_count = target_count - previous_count logger.debug(f"sync progress {target_count}/{source_count} {progress:.2f}%") except Exception as e: