From 9341b99a2710935f0ead2afd192f69ad52df90bf Mon Sep 17 00:00:00 2001 From: zhuwenxing Date: Thu, 8 Aug 2024 11:25:58 +0800 Subject: [PATCH] add cdc test Signed-off-by: zhuwenxing --- .../cdc/perf/milvus_cdc_recovery_perf_test.py | 87 ++++++++++++------- 1 file changed, 58 insertions(+), 29 deletions(-) diff --git a/tests/python_client/cdc/perf/milvus_cdc_recovery_perf_test.py b/tests/python_client/cdc/perf/milvus_cdc_recovery_perf_test.py index 87f4891185..a186ed6fd8 100644 --- a/tests/python_client/cdc/perf/milvus_cdc_recovery_perf_test.py +++ b/tests/python_client/cdc/perf/milvus_cdc_recovery_perf_test.py @@ -44,6 +44,11 @@ class MilvusCDCPerformance: 'sync_throughput': [], 'avg_latency': [] } + self.count_series_data = { + 'timestamp': [], + 'source_count': [], + 'target_count': [] + } def report_realtime_metrics(self): current_time = time.time() @@ -83,8 +88,15 @@ class MilvusCDCPerformance: time.sleep(interval) def plot_time_series_data(self): - fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 10)) - + fig, (ax0, ax1, ax2) = plt.subplots(3, 1, figsize=(12, 10)) + # Plot count + ax0.plot(self.count_series_data['timestamp'], self.count_series_data['source_count'], label='Source Count') + ax0.plot(self.count_series_data['timestamp'], self.count_series_data['target_count'], label='Target Count') + ax0.set_xlabel('Time (seconds)') + ax0.set_ylabel('Entity Count') + ax0.set_title('Source and Target Collection Count over Time') + ax0.legend() + ax0.grid(True) # Plot throughput ax1.plot(self.time_series_data['timestamp'], self.time_series_data['insert_throughput'], label='Insert Throughput') @@ -232,40 +244,57 @@ class MilvusCDCPerformance: time.sleep(0.01) # Query interval def continuous_count(self): + + def count_target(self): + try: + t0 = time.time() + results = self.target_collection.query( + expr="", + output_fields=["count(*)"], + timeout=5 + ) + tt = time.time() - t0 + self.target_count = results[0]['count(*)'] + except Exception as e: + logger.error(f"Target count failed: {e}") + self.target_count = self.last_target_count + + def count_source(self): + try: + t0 = time.time() + results = self.source_collection.query( + expr="", + output_fields=["count(*)"], + timeout=5 + ) + tt = time.time() - t0 + self.source_count = results[0]['count(*)'] + except Exception as e: + logger.error(f"Source count failed: {e}") + self.source_count = self.last_source_count previous_count = self.target_collection.query( expr="", output_fields=["count(*)"], )[0]['count(*)'] while not self.stop_query: try: - try: - t0 = time.time() - results = self.target_collection.query( - expr="", - output_fields=["count(*)"], - ) - tt = time.time() - t0 - target_count = results[0]['count(*)'] - except Exception as e: - logger.error(f"Count failed: {e}") - target_count = self.last_target_count - try: - t0 = time.time() - results = self.source_collection.query( - expr="", - output_fields=["count(*)"], - ) - tt = time.time() - t0 - source_count = results[0]['count(*)'] - except Exception as e: - logger.error(f"Count failed: {e}") - source_count = self.last_source_count - progress = (target_count / source_count) * 100 if source_count > 0 else 0 - self.source_count = source_count - self.target_count = target_count - self.sync_count = target_count - previous_count - logger.debug(f"sync progress {target_count}/{source_count} {progress:.2f}%") + thread1 = threading.Thread(target=count_target) + thread2 = threading.Thread(target=count_source) + + thread1.start() + thread2.start() + + thread1.join() + thread2.join() + + progress = (self.target_count / self.source_count) * 100 if self.source_count > 0 else 0 + + self.sync_count = self.target_count - previous_count + self.count_series_data['timestamp'].append(time.time()) + self.count_series_data['source_count'].append(self.source_count) + self.count_series_data['target_count'].append(self.target_count) + logger.debug(f"sync progress {self.target_count}/{self.source_count} {progress:.2f}%") except Exception as e: logger.error(f"Count failed: {e}") time.sleep(0.01) # Query interval