diff --git a/tests/python_client/cdc/perf/milvus_cdc_recovery_perf_test.py b/tests/python_client/cdc/perf/milvus_cdc_recovery_perf_test.py index 263bf97c96..b2cf4bd60d 100644 --- a/tests/python_client/cdc/perf/milvus_cdc_recovery_perf_test.py +++ b/tests/python_client/cdc/perf/milvus_cdc_recovery_perf_test.py @@ -18,7 +18,7 @@ logger.remove() logger.add(sink=sys.stdout, level="DEBUG") -def create_interactive_plot(data, count_data): +def create_interactive_plot(data, count_data, cdc_events): # Convert timestamps to datetime data['datetime'] = pd.to_datetime(data['timestamp'], unit='s') count_data['datetime'] = pd.to_datetime(count_data['timestamp'], unit='s') @@ -37,6 +37,11 @@ def create_interactive_plot(data, count_data): fig.add_trace(go.Scatter(x=count_data['datetime'], y=count_data['target_count'], mode='lines', name='Target Count'), row=1, col=1) + # Add CDC pause and resume events + for event in cdc_events: + event_time = pd.to_datetime(event['timestamp'], unit='s') + fig.add_vline(x=event_time, line_dash="dash", line_color="red" if event['action'] == 'pause' else "green", + annotation_text=f"CDC {event['action'].capitalize()}", row=1, col=1) # Plot 2: Insert and Sync Throughput fig.add_trace(go.Scatter(x=data['datetime'], y=data['insert_throughput'], @@ -114,6 +119,7 @@ class MilvusCDCPerformance: 'source_count': [], 'target_count': [] } + self.cdc_events = [] def report_realtime_metrics(self): current_time = time.time() @@ -158,7 +164,7 @@ class MilvusCDCPerformance: def plot_time_series_data(self): df = pd.DataFrame(self.time_series_data) count_df = pd.DataFrame(self.count_series_data) - fig = create_interactive_plot(df, count_df) + fig = create_interactive_plot(df, count_df, self.cdc_events) pio.write_html(fig, file='milvus_cdc_performance.html', auto_open=True) logger.info("Interactive performance plot saved as 'milvus_cdc_performance.html'") @@ -184,6 +190,7 @@ class MilvusCDCPerformance: result = response.json() logger.info(f"Pause CDC task {task_id} response: {result}") self.cdc_paused = True + self.cdc_events.append({'timestamp': time.time(), 'action': 'pause'}) # Record pause event logger.info("All CDC tasks paused") def resume_cdc_tasks(self): @@ -199,6 +206,7 @@ class MilvusCDCPerformance: result = response.json() logger.info(f"Resume CDC task {task_id} response: {result}") self.cdc_paused = False + self.cdc_events.append({'timestamp': time.time(), 'action': 'resume'}) # Record resume event logger.info("All CDC tasks resumed") def pause_and_resume_cdc_tasks(self, duration):