erpc_analysis/
db_client.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
//! Provides the `Neo4jAnalysisClient`, a concrete implementation of the
//! `AnalysisDatabase` trait for interacting with a Neo4j database for
//! eRPC's analysis tasks.
//!
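//! This module includes:
//! - The client struct (`Neo4jAnalysisClient`) for executing queries.
//! - Methods for the analysis-related database operations: creating,
//!   checking and dropping GDS graph projections, computing graph-level and
//!   node-level degree metrics, and running weakly/strongly connected
//!   component analysis.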

use async_trait::async_trait;
use std::collections::HashMap;
use std::sync::Arc;

use log::{debug, error, info, warn};

use neo4rs::{Error as Neo4rsDriverError, Graph, Query, RowStream};

use crate::config::Neo4jConfig;
use crate::graph::projections::{
    build_gds_drop_cypher, build_gds_project_cypher,
};
use crate::models::metrics::{GraphMetrics, NodeMetrics};

use crate::db_trait::{
    AnalysisDatabase, AnalysisError, GraphProjectionParams,
};

use crate::models::partitions::{ComponentAnalysisResult, ConnectedComponent};

/// Neo4j-backed client used by eRPC's analysis tasks.
///
/// Holds a neo4rs `Graph` connection handle behind an `Arc`. Build one with
/// `Neo4jAnalysisClient::new`.
pub struct Neo4jAnalysisClient {
    /// Shared neo4rs connection handle used for every query.
    graph: Arc<Graph>,
}

impl Neo4jAnalysisClient {
    /// Creates a new `Neo4jAnalysisClient` instance.
    ///
    /// Establishes a connection to the Neo4j database using the provided
    /// configuration.
    ///
    /// # Arguments
    /// * `config` - A reference to `Neo4jConfig` containing the URI, username,
    ///   and password.
    ///
    /// # Errors
    /// Returns `AnalysisError::ConnectionFailed` if the connection cannot be
    /// established.
    pub async fn new(config: &Neo4jConfig) -> Result<Self, AnalysisError> {
        Graph::new(&config.uri, &config.username, &config.password)
            .await
            .map_err(|e: Neo4rsDriverError| {
                AnalysisError::ConnectionFailed(format!(
                    "Failed to connect to Neo4j at URI '{}': {}",
                    config.uri, e
                ))
            })
            .map(|graph_conn| Self {
                graph: Arc::new(graph_conn),
            })
    }
}

#[async_trait]
impl AnalysisDatabase for Neo4jAnalysisClient {
    /// Creates or recreates a GDS graph projection in Neo4j.
    ///
    /// This implementation first attempts to delete an existing projection
    /// with the same name to ensure idempotency, then creates a new one
    /// based on the provided parameters.
    ///
    /// # Arguments
    /// * `params` - A reference to `GraphProjectionParams` specifying the
    ///   projection name, node label, relationship types, and properties
    ///   to project.
    ///
    /// # Errors
    /// Returns `AnalysisError` if any step (deletion of old projection,
    /// creation of new projection) fails. This includes driver errors,
    /// query failures, or issues with GDS procedures
    async fn create_graph_projection(
        &self,
        params: &GraphProjectionParams,
    ) -> Result<(), AnalysisError> {
        // Attempt to delete existing projection first to ensure idempotency.
        // delete_graph_projection is designed to succeed even if the
        // projection doesn't exist.
        if let Err(e) =
            self.delete_graph_projection(&params.projection_name).await
        {
            warn!(
                "Attempt to delete old projection '{}' failed before \
                 creation: {:?}. Proceeding with creation attempt.",
                params.projection_name, e
            );
        }

        let project_query_cypher = build_gds_project_cypher(
            &params.projection_name,
            &params.node_label,
            &params.relationship_types,
            params.relationship_properties_to_project.as_deref(),
        );

        info!(
            "Executing GDS Projection for graph '{}' with node label '{}'",
            params.projection_name, params.node_label
        );
        debug!("GDS Projection Query: {}", project_query_cypher);
        let project_query = Query::new(project_query_cypher);

        let mut result_stream: RowStream = self
            .graph
            .execute(project_query)
            .await
            .map_err(AnalysisError::from)?;

        match result_stream.next().await {
            Ok(Some(row)) => {
                let projected_graph_name: String =
                    row.get::<String>("graphName").ok_or_else(|| {
                        AnalysisError::ProjectionCreationFailed {
                            projection_name: params.projection_name.clone(),
                            source_error:
                                "GDS projection report missing 'graphName'"
                                    .to_string(),
                        }
                    })?;

                let node_count: i64 =
                    row.get::<i64>("nodeCount").ok_or_else(|| {
                        AnalysisError::ProjectionCreationFailed {
                            projection_name: params.projection_name.clone(),
                            source_error:
                                "GDS projection report missing 'nodeCount'"
                                    .to_string(),
                        }
                    })?;

                let relationship_count: i64 =
                    row.get::<i64>("relationshipCount").ok_or_else(|| {
                        AnalysisError::ProjectionCreationFailed {
                            projection_name: params.projection_name.clone(),
                            source_error: "GDS projection report \
                                       missing 'relationshipCount'"
                                .to_string(),
                        }
                    })?;

                let project_millis: i64 =
                    row.get::<i64>("projectMillis").ok_or_else(|| {
                        AnalysisError::ProjectionCreationFailed {
                            projection_name: params.projection_name.clone(),
                            source_error:
                                "GDS projection report missing 'projectMillis'"
                                    .to_string(),
                        }
                    })?;

                info!(
                    "GDS Reported: Projected graph '{}' with {} nodes, \
                     {} relationships in {} ms.",
                    projected_graph_name,
                    node_count,
                    relationship_count,
                    project_millis
                );
                if projected_graph_name != params.projection_name {
                    warn!(
                        "Projected graph name from GDS ('{}') does not \
                         match requested name ('{}').",
                        projected_graph_name, params.projection_name
                    );
                }
            }
            Ok(None) => {
                return Err(AnalysisError::ProjectionCreationFailed {
                    projection_name: params.projection_name.clone(),
                    source_error: "GDS graph project query executed \
                                   but returned no rows."
                        .to_string(),
                });
            }
            Err(e) => {
                return Err(AnalysisError::ProjectionCreationFailed {
                    projection_name: params.projection_name.clone(),
                    source_error: format!(
                        "Failed to process result from GDS graph project \
                         query: {}",
                        e
                    ),
                });
            }
        }
        info!(
            "Graph projection command for '{}' completed.",
            params.projection_name
        );
        Ok(())
    }

    /// Deletes an existing GDS graph projection from Neo4j if it exists.
    ///
    /// This method first checks if the projection exists using
    /// `gds.graph.exists` (via `check_graph_projection_exists`).
    /// If it does, `gds.graph.drop` is called.
    ///
    /// The operation is idempotent and will succeed even if the
    /// projection does not initially exist.
    ///
    /// # Arguments
    /// * `projection_name` - The name of the GDS graph projection to delete.
    ///
    /// # Errors
    /// Returns `AnalysisError` if checking for existence or executing
    /// the drop query fails
    async fn delete_graph_projection(
        &self,
        projection_name: &str,
    ) -> Result<(), AnalysisError> {
        let exists =
            self.check_graph_projection_exists(projection_name).await?;

        if exists {
            info!(
                "GDS graph projection '{}' exists. Attempting to drop it.",
                projection_name
            );
            let drop_query_cypher = build_gds_drop_cypher(projection_name);
            let drop_query = Query::new(drop_query_cypher);

            match self.graph.execute(drop_query).await {
                Ok(mut drop_stream) => match drop_stream.next().await {
                    Ok(Some(row)) => {
                        let dropped_name_opt: Option<String> =
                            row.get::<String>("graphName");
                        info!(
                            "Successfully dropped GDS graph projection: '{}'. \
                             GDS returned: '{}'",
                            projection_name,
                            dropped_name_opt
                                .unwrap_or_else(|| "N/A".to_string())
                        );
                    }
                    Ok(None) => {
                        info!(
                            "GDS graph drop for '{}' returned no rows, \
                             assuming drop was successful.",
                            projection_name
                        );
                    }
                    Err(e) => {
                        warn!(
                            "Error processing result stream from GDS graph \
                             drop for '{}': {}. Assuming dropped.",
                            projection_name, e
                        );
                    }
                },
                Err(e_n4rs) => {
                    error!(
                        "Failed to execute GDS graph drop query for '{}': {}",
                        projection_name, e_n4rs
                    );
                    return Err(AnalysisError::ProjectionDropFailed {
                        projection_name: projection_name.to_string(),
                        source_error: format!(
                            "Failed to execute GDS graph drop query: {}",
                            e_n4rs
                        ),
                    });
                }
            }
        } else {
            info!(
                "GDS graph projection '{}' does not exist. No deletion \
                 needed.",
                projection_name
            );
        }
        Ok(())
    }

    /// Checks if a GDS graph projection with the given name exists in Neo4j.
    ///
    /// This method calls the `gds.graph.exists` procedure.
    ///
    /// # Arguments
    /// * `projection_name` - The name of the GDS graph projection to check.
    ///
    /// # Returns
    /// `Ok(true)` if the projection exists, `Ok(false)` otherwise.
    ///
    /// # Errors
    /// Returns `AnalysisError` if the query to `gds.graph.exists` fails
    /// or the result cannot be parsed correctly.
    async fn check_graph_projection_exists(
        &self,
        projection_name: &str,
    ) -> Result<bool, AnalysisError> {
        debug!(
            "Checking if GDS graph projection '{}' exists.",
            projection_name
        );
        let query_str = "CALL gds.graph.exists($projection_name) YIELD \
             graphName, exists RETURN exists";
        let query = Query::new(query_str.to_string())
            .param("projection_name", projection_name);

        let mut stream: RowStream = self
            .graph
            .execute(query)
            .await
            .map_err(AnalysisError::from)?;
        let row = stream
            .next()
            .await
            .map_err(AnalysisError::from)?
            .ok_or_else(|| {
                AnalysisError::QueryFailed(format!(
                    "gds.graph.exists query for '{}' returned no rows",
                    projection_name
                ))
            })?;

        let exists: bool = row.get::<bool>("exists").ok_or_else(|| {
            AnalysisError::QueryFailed(format!(
                "Failed to get 'exists' field from gds.graph.exists \
                 for '{}' or field was null",
                projection_name
            ))
        })?;

        debug!(
            "GDS graph projection '{}' exists status: {}",
            projection_name, exists
        );
        Ok(exists)
    }

    /// Calculates comprehensive graph metrics for a given GDS projection
    /// including basic counts (node count, relationship count), degree
    /// distribution, and degree statistics.
    async fn calculate_graph_metrics(
        &self,
        projection_name: &str,
    ) -> Result<GraphMetrics, AnalysisError> {
        info!(
            "Calculating graph metrics for GDS projection: '{}'",
            projection_name
        );

        // First get basic metrics (node count, relationship count) from GDS
        let query_str = "CALL gds.graph.list($projection_name) YIELD \
                                    graphName, nodeCount, relationshipCount \
                                    RETURN nodeCount, relationshipCount";
        let query = Query::new(query_str.to_string())
            .param("projection_name", projection_name);

        let mut stream: RowStream = self
            .graph
            .execute(query)
            .await
            .map_err(AnalysisError::from)?;

        if let Some(row) = stream.next().await.map_err(AnalysisError::from)? {
            let node_count: i64 =
                row.get::<i64>("nodeCount").ok_or_else(|| {
                    AnalysisError::QueryFailed(format!(
                    "Failed to get 'nodeCount' for projection '{}' or field \
                     was null",
                    projection_name
                ))
                })?;

            let relationship_count: i64 =
                row.get::<i64>("relationshipCount").ok_or_else(|| {
                    AnalysisError::QueryFailed(format!(
                        "Failed to get 'relationshipCount' for projection \
                         '{}' or field was null",
                        projection_name
                    ))
                })?;

            info!(
                "Basic metrics retrieved: {} nodes, {} relationships",
                node_count, relationship_count
            );

            // Calculate node degrees for analysis
            let node_degrees =
                self.calculate_node_degrees(projection_name).await?;

            if node_degrees.is_empty() {
                return Ok(GraphMetrics {
                    node_count: Some(node_count),
                    relationship_count: Some(relationship_count),
                    degree_distribution: Some(HashMap::new()),
                    average_degree: Some(0.0),
                    max_degree: Some(0),
                    min_degree: Some(0),
                });
            }

            // Calculate degree distribution and statistics
            let mut degree_distribution = HashMap::new();
            let mut total_degree_sum = 0i64;
            let mut max_degree = 0i64;
            let mut min_degree = i64::MAX;

            for node in &node_degrees {
                let degree = node.total_degree;
                *degree_distribution.entry(degree).or_insert(0) += 1;
                total_degree_sum += degree;
                max_degree = max_degree.max(degree);
                min_degree = min_degree.min(degree);
            }

            // Handle edge case where all nodes have degree 0
            if min_degree == i64::MAX {
                min_degree = 0;
            }

            let average_degree = if !node_degrees.is_empty() {
                total_degree_sum as f64 / node_degrees.len() as f64
            } else {
                0.0
            };

            info!(
                "Comprehensive metrics calculated: avg_degree={:.2}, \
                 max_degree={}, min_degree={}, distribution_size={}",
                average_degree,
                max_degree,
                min_degree,
                degree_distribution.len()
            );

            Ok(GraphMetrics {
                node_count: Some(node_count),
                relationship_count: Some(relationship_count),
                degree_distribution: Some(degree_distribution),
                average_degree: Some(average_degree),
                max_degree: Some(max_degree),
                min_degree: Some(min_degree),
            })
        } else {
            // If gds.graph.list stream is empty for the projection,
            // it implies the projection doesn't exist.
            Err(AnalysisError::ProjectionNotFound(
                projection_name.to_string(),
            ))
        }
    }

    /// Calculates node-level degree metrics for all nodes in a given
    /// GDS projection.
    /// Uses both Neo4j GDS library for total degree and direct Cypher
    /// for in/out degree calculation.
    async fn calculate_node_degrees(
        &self,
        projection_name: &str,
    ) -> Result<Vec<NodeMetrics>, AnalysisError> {
        info!(
            "Calculating node metrics for GDS projection: '{}'",
            projection_name
        );

        // Use direct Cypher query to calculate in-degree and out-degree
        // This approach counts actual relationships rather than using
        // GDS degree centrality
        let query_str = "
            MATCH (r:Relay)
            OPTIONAL MATCH (r)-[out:CIRCUIT_SUCCESS]->()
            OPTIONAL MATCH ()-[in:CIRCUIT_SUCCESS]->(r)
            RETURN r.fingerprint as fingerprint,
                   count(DISTINCT in) as in_degree,
                   count(DISTINCT out) as out_degree,
                   count(DISTINCT in) + count(DISTINCT out) as total_degree
            ORDER BY total_degree DESC
        ";

        let query = Query::new(query_str.to_string());
        let mut stream: RowStream = self
            .graph
            .execute(query)
            .await
            .map_err(AnalysisError::from)?;

        let mut node_metrics = Vec::new();

        while let Some(row) =
            stream.next().await.map_err(AnalysisError::from)?
        {
            let fingerprint: String =
                row.get::<String>("fingerprint").ok_or_else(|| {
                    AnalysisError::QueryFailed(
                        "Failed to get 'fingerprint' field from node degree \
                         calculation"
                            .to_string(),
                    )
                })?;

            let in_degree: i64 =
                row.get::<i64>("in_degree").ok_or_else(|| {
                    AnalysisError::QueryFailed(
                        "Failed to get 'in_degree' field from node degree \
                     calculation"
                            .to_string(),
                    )
                })?;

            let out_degree: i64 =
                row.get::<i64>("out_degree").ok_or_else(|| {
                    AnalysisError::QueryFailed(
                        "Failed to get 'out_degree' field from node degree \
                     calculation"
                            .to_string(),
                    )
                })?;

            let total_degree: i64 =
                row.get::<i64>("total_degree").ok_or_else(|| {
                    AnalysisError::QueryFailed(
                        "Failed to get 'total_degree' field from node degree \
                     calculation"
                            .to_string(),
                    )
                })?;

            node_metrics.push(NodeMetrics {
                fingerprint,
                in_degree,
                out_degree,
                total_degree,
            });
        }

        Ok(node_metrics)
    }

    /// Calculates weakly connected components using Neo4j GDS WCC algorithm.
    /// Returns analysis results containing components, sizes, and statistics.
    async fn calculate_weakly_connected_components(
        &self,
        projection_name: &str,
    ) -> Result<ComponentAnalysisResult, AnalysisError> {
        info!(
            "Calculating weakly connected components for projection: '{}'",
            projection_name
        );

        // Execute Neo4j GDS WCC algorithm
        let wcc_query = format!(
            "CALL gds.wcc.stream('{}')
             YIELD nodeId, componentId
             RETURN gds.util.asNode(nodeId).fingerprint AS relay_fingerprint,
                    componentId
             ORDER BY componentId, relay_fingerprint",
            projection_name
        );

        debug!("WCC Query: {}", wcc_query);
        let query = Query::new(wcc_query);
        let mut stream: RowStream =
            self.graph.execute(query).await.map_err(|e| {
                error!("Failed to execute WCC query: {:?}", e);
                AnalysisError::AlgorithmError(format!(
                    "WCC algorithm execution failed for projection '{}': {}",
                    projection_name, e
                ))
            })?;

        // Group results by component ID
        let mut component_map: HashMap<i64, Vec<String>> = HashMap::new();

        while let Some(row) =
            stream.next().await.map_err(AnalysisError::from)?
        {
            let relay_fingerprint: String =
                row.get::<String>("relay_fingerprint").ok_or_else(|| {
                    AnalysisError::QueryFailed(
                        "Failed to get 'relay_fingerprint' from WCC result"
                            .to_string(),
                    )
                })?;

            let component_id: i64 =
                row.get::<i64>("componentId").ok_or_else(|| {
                    AnalysisError::QueryFailed(
                        "Failed to get 'componentId' from WCC result"
                            .to_string(),
                    )
                })?;

            component_map
                .entry(component_id)
                .or_default()
                .push(relay_fingerprint);
        }

        // Convert to ConnectedComponent structs
        let mut components: Vec<ConnectedComponent> = component_map
            .into_iter()
            .map(|(component_id, relay_fingerprints)| {
                let size = relay_fingerprints.len();
                ConnectedComponent {
                    component_id,
                    relay_fingerprints,
                    size,
                }
            })
            .collect();

        // Sort components by size (largest first)
        components.sort_by(|a, b| b.size.cmp(&a.size));

        // Calculate statistics
        let total_components = components.len();
        let largest_component_size =
            components.first().map(|c| c.size).unwrap_or(0);
        let smallest_component_size =
            components.last().map(|c| c.size).unwrap_or(0);

        // Calculate size distribution
        let mut component_size_distribution = HashMap::new();
        for component in &components {
            *component_size_distribution
                .entry(component.size)
                .or_insert(0) += 1;
        }

        // Calculate isolation ratio (percentage of nodes in largest component)
        let total_nodes: usize = components.iter().map(|c| c.size).sum();
        let isolation_ratio = if total_nodes > 0 {
            (largest_component_size as f64 / total_nodes as f64) * 100.0
        } else {
            0.0
        };

        info!(
            "WCC analysis complete: {} components, largest: {}, \
             smallest: {}, isolation ratio: {:.2}%",
            total_components,
            largest_component_size,
            smallest_component_size,
            isolation_ratio
        );

        Ok(ComponentAnalysisResult {
            components,
            total_components: Some(total_components),
            largest_component_size: Some(largest_component_size),
            smallest_component_size: Some(smallest_component_size),
            component_size_distribution: Some(component_size_distribution),
            isolation_ratio: Some(isolation_ratio),
        })
    }

    /// Calculates strongly connected components using Neo4j GDS SCC algorithm.
    /// Returns analysis results containing components, sizes, and statistics.
    async fn calculate_strongly_connected_components(
        &self,
        projection_name: &str,
    ) -> Result<ComponentAnalysisResult, AnalysisError> {
        info!(
            "Calculating strongly connected components for projection: '{}'",
            projection_name
        );

        // Execute Neo4j GDS SCC algorithm
        let scc_query = format!(
            "CALL gds.scc.stream('{}')
             YIELD nodeId, componentId
             RETURN gds.util.asNode(nodeId).fingerprint AS relay_fingerprint,
                    componentId
             ORDER BY componentId, relay_fingerprint",
            projection_name
        );

        debug!("SCC Query: {}", scc_query);
        let query = Query::new(scc_query);
        let mut stream: RowStream =
            self.graph.execute(query).await.map_err(|e| {
                error!("Failed to execute SCC query: {:?}", e);
                AnalysisError::AlgorithmError(format!(
                    "SCC algorithm execution failed for projection '{}': {}",
                    projection_name, e
                ))
            })?;

        // Group results by component ID
        let mut component_map: HashMap<i64, Vec<String>> = HashMap::new();

        while let Some(row) =
            stream.next().await.map_err(AnalysisError::from)?
        {
            let relay_fingerprint: String =
                row.get::<String>("relay_fingerprint").ok_or_else(|| {
                    AnalysisError::QueryFailed(
                        "Failed to get 'relay_fingerprint' from SCC result"
                            .to_string(),
                    )
                })?;

            let component_id: i64 =
                row.get::<i64>("componentId").ok_or_else(|| {
                    AnalysisError::QueryFailed(
                        "Failed to get 'componentId' from SCC result"
                            .to_string(),
                    )
                })?;

            component_map
                .entry(component_id)
                .or_default()
                .push(relay_fingerprint);
        }

        // Convert to ConnectedComponent structs
        let mut components: Vec<ConnectedComponent> = component_map
            .into_iter()
            .map(|(component_id, relay_fingerprints)| {
                let size = relay_fingerprints.len();
                ConnectedComponent {
                    component_id,
                    relay_fingerprints,
                    size,
                }
            })
            .collect();

        // Sort components by size (largest first)
        components.sort_by(|a, b| b.size.cmp(&a.size));

        // Calculate statistics
        let total_components = components.len();
        let largest_component_size =
            components.first().map(|c| c.size).unwrap_or(0);
        let smallest_component_size =
            components.last().map(|c| c.size).unwrap_or(0);

        // Calculate size distribution
        let mut component_size_distribution = HashMap::new();
        for component in &components {
            *component_size_distribution
                .entry(component.size)
                .or_insert(0) += 1;
        }

        // Calculate isolation ratio (percentage of nodes in largest component)
        let total_nodes: usize = components.iter().map(|c| c.size).sum();
        let isolation_ratio = if total_nodes > 0 {
            (largest_component_size as f64 / total_nodes as f64) * 100.0
        } else {
            0.0
        };

        info!(
            "SCC analysis complete: {} components, largest: {}, \
             smallest: {}, isolation ratio: {:.2}%",
            total_components,
            largest_component_size,
            smallest_component_size,
            isolation_ratio
        );

        Ok(ComponentAnalysisResult {
            components,
            total_components: Some(total_components),
            largest_component_size: Some(largest_component_size),
            smallest_component_size: Some(smallest_component_size),
            component_size_distribution: Some(component_size_distribution),
            isolation_ratio: Some(isolation_ratio),
        })
    }
}