use std::collections::HashMap;
use std::sync::Arc;
use anyhow::Result;
use log::{error, info, warn};
use crate::algorithms::ComponentAnalyzer;
use crate::args::Args;
use crate::db_trait::{
AnalysisDatabase, AnalysisError, GraphProjectionParams,
};
use crate::models::metrics::{GraphMetrics, NodeMetrics};
pub struct TaskHandler {
db_client: Arc<dyn AnalysisDatabase>,
args: Args,
}
impl TaskHandler {
pub fn new(db_client: Arc<dyn AnalysisDatabase>, args: Args) -> Self {
Self { db_client, args }
}
pub async fn execute(&self) -> Result<(), Box<dyn std::error::Error>> {
match &self.args.task {
Some(task) => self.execute_specific_task(task).await,
None => self.execute_default_analysis().await,
}
}
async fn execute_specific_task(
&self,
task: &str,
) -> Result<(), Box<dyn std::error::Error>> {
match task {
"projection-create" => self.handle_projection_create().await,
"projection-delete" => self.handle_projection_delete().await,
"projection-exists" => self.handle_projection_exists().await,
"metrics-basic" => self.handle_metrics_basic().await,
"metrics-degrees" => self.handle_metrics_degrees().await,
"metrics-distribution" => self.handle_metrics_distribution().await,
"components-analysis" => self.handle_components_analysis().await,
"info-database" => self.handle_info_database().await,
"help" => self.handle_help().await,
_ => {
error!("Unknown task: {}", task);
println!("{}", Args::task_help());
std::process::exit(1);
}
}
}
async fn execute_default_analysis(
&self,
) -> Result<(), Box<dyn std::error::Error>> {
info!("Running default analysis workflow...");
let proj_params = self.create_default_projection_params();
info!(
"Creating/updating GDS graph projection: '{}' with node \
label: '{}'",
proj_params.projection_name, proj_params.node_label
);
self.db_client
.create_graph_projection(&proj_params)
.await
.map_err(|e: AnalysisError| {
error!(
"Failed to create GDS graph projection '{}': {:?}",
proj_params.projection_name, e
);
Box::new(e) as Box<dyn std::error::Error>
})?;
info!(
"Successfully created/updated GDS graph projection: '{}'",
proj_params.projection_name
);
self.calculate_and_display_metrics(&proj_params.projection_name)
.await?;
self.handle_components_analysis().await?;
Ok(())
}
async fn handle_projection_create(
&self,
) -> Result<(), Box<dyn std::error::Error>> {
let proj_params = self.create_projection_params("tor_erpc_projection");
if !self.args.force
&& self
.db_client
.check_graph_projection_exists(&proj_params.projection_name)
.await?
{
println!(
"Projection '{}' already exists. Use --force to recreate.",
proj_params.projection_name
);
return Ok(());
}
info!(
"Creating graph projection: '{}'",
proj_params.projection_name
);
self.db_client.create_graph_projection(&proj_params).await?;
println!(
"✅ Successfully created projection: '{}'",
proj_params.projection_name
);
Ok(())
}
async fn handle_projection_delete(
&self,
) -> Result<(), Box<dyn std::error::Error>> {
info!("Deleting graph projection: tor_erpc_projection");
self.db_client
.delete_graph_projection("tor_erpc_projection")
.await?;
println!("✅ Successfully deleted projection: tor_erpc_projection");
Ok(())
}
async fn handle_projection_exists(
&self,
) -> Result<(), Box<dyn std::error::Error>> {
let exists = self
.db_client
.check_graph_projection_exists("tor_erpc_projection")
.await?;
if exists {
println!("✅ Projection: tor_erpc_projection exists");
} else {
println!("❌ Projection: tor_erpc_projection does not exist");
}
Ok(())
}
async fn handle_metrics_basic(
&self,
) -> Result<(), Box<dyn std::error::Error>> {
info!("Calculating basic metrics for projection: tor_erpc_projection");
let metrics = self
.db_client
.calculate_graph_metrics("tor_erpc_projection")
.await
.map_err(|e| {
error!("Failed to calculate graph metrics: {:?}", e);
e
})?;
info!("Successfully calculated metrics, displaying results...");
self.display_basic_metrics(&metrics)?;
Ok(())
}
async fn handle_metrics_degrees(
&self,
) -> Result<(), Box<dyn std::error::Error>> {
let node_degrees = self
.db_client
.calculate_node_degrees("tor_erpc_projection")
.await
.map_err(|e| {
error!("Failed to calculate node degrees: {:?}", e);
e
})?;
self.display_degree_metrics(&node_degrees)?;
Ok(())
}
async fn handle_metrics_distribution(
&self,
) -> Result<(), Box<dyn std::error::Error>> {
let metrics = self
.db_client
.calculate_graph_metrics("tor_erpc_projection")
.await
.map_err(|e| {
error!(
"Failed to calculate graph metrics for distribution: {:?}",
e
);
e
})?;
self.display_degree_distribution(&metrics)?;
Ok(())
}
async fn handle_info_database(
&self,
) -> Result<(), Box<dyn std::error::Error>> {
let connection_status = match self
.db_client
.check_graph_projection_exists("test_connection")
.await
{
Ok(_) => "Connected",
Err(_) => "Connection Failed",
};
println!("Database Information:");
println!(" Type: Neo4j");
println!(" Status: {}", connection_status);
Ok(())
}
async fn handle_help(&self) -> Result<(), Box<dyn std::error::Error>> {
println!("{}", Args::task_help());
Ok(())
}
async fn handle_components_analysis(
&self,
) -> Result<(), Box<dyn std::error::Error>> {
info!(
"Creating partition detection projection (SUCCESS edges only)..."
);
let partition_params = self.create_partition_detection_params(
"tor_erpc_connectivity_analysis",
);
self.db_client
.create_graph_projection(&partition_params)
.await
.map_err(|e| {
error!(
"Failed to create partition detection projection: {:?}",
e
);
e
})?;
info!("Running weak connectivity analysis using WCC algorithm...");
self.run_wcc_analysis(&partition_params.projection_name)
.await?;
info!("Running strong connectivity analysis using SCC algorithm...");
self.run_scc_analysis(&partition_params.projection_name)
.await?;
Ok(())
}
async fn run_wcc_analysis(
&self,
projection_name: &str,
) -> Result<(), Box<dyn std::error::Error>> {
let analyzer = ComponentAnalyzer::new(Arc::clone(&self.db_client));
let result = analyzer
.analyze_weakly_connected_components(projection_name)
.await?;
analyzer.display_weak_connectivity_analysis(&result)?;
Ok(())
}
async fn run_scc_analysis(
&self,
projection_name: &str,
) -> Result<(), Box<dyn std::error::Error>> {
let analyzer = ComponentAnalyzer::new(Arc::clone(&self.db_client));
let result = analyzer
.analyze_strongly_connected_components(projection_name)
.await?;
analyzer.display_strong_connectivity_analysis(&result)?;
Ok(())
}
fn create_default_projection_params(&self) -> GraphProjectionParams {
let mut rel_types_map = HashMap::new();
rel_types_map
.insert("CIRCUIT_SUCCESS".to_string(), "NATURAL".to_string());
rel_types_map
.insert("CIRCUIT_FAILURE".to_string(), "NATURAL".to_string());
GraphProjectionParams {
projection_name: "tor_erpc_projection".to_string(),
node_label: "Relay".to_string(),
relationship_types: rel_types_map,
relationship_properties_to_project: None,
}
}
fn create_projection_params(&self, name: &str) -> GraphProjectionParams {
let mut rel_types_map = HashMap::new();
rel_types_map
.insert("CIRCUIT_SUCCESS".to_string(), "NATURAL".to_string());
rel_types_map
.insert("CIRCUIT_FAILURE".to_string(), "NATURAL".to_string());
GraphProjectionParams {
projection_name: name.to_string(),
node_label: "Relay".to_string(),
relationship_types: rel_types_map,
relationship_properties_to_project: None,
}
}
fn create_partition_detection_params(
&self,
name: &str,
) -> GraphProjectionParams {
let mut rel_types_map = HashMap::new();
rel_types_map
.insert("CIRCUIT_SUCCESS".to_string(), "NATURAL".to_string());
GraphProjectionParams {
projection_name: name.to_string(),
node_label: "Relay".to_string(),
relationship_types: rel_types_map,
relationship_properties_to_project: None,
}
}
async fn calculate_and_display_metrics(
&self,
projection_name: &str,
) -> Result<(), Box<dyn std::error::Error>> {
info!("=== Starting Graph Metrics Calculation ===");
let graph_metrics = self
.db_client
.calculate_graph_metrics(projection_name)
.await
.map_err(|e: AnalysisError| {
error!("Failed to calculate graph metrics: {:?}", e);
Box::new(e) as Box<dyn std::error::Error>
})?;
self.display_basic_metrics(&graph_metrics)?;
self.display_degree_distribution(&graph_metrics)?;
let node_degrees = self
.db_client
.calculate_node_degrees(projection_name)
.await
.map_err(|e: AnalysisError| {
error!("Failed to calculate node degrees: {:?}", e);
Box::new(e) as Box<dyn std::error::Error>
})?;
self.display_degree_metrics(&node_degrees)?;
info!("=== Graph Metrics Calculation Complete ===");
Ok(())
}
fn display_basic_metrics(
&self,
metrics: &GraphMetrics,
) -> Result<(), Box<dyn std::error::Error>> {
info!("Basic Graph Metrics:");
info!(" Nodes: {}", metrics.node_count.unwrap_or(0));
info!(
" Relationships: {}",
metrics.relationship_count.unwrap_or(0)
);
info!(
" Average degree: {:.2}",
metrics.average_degree.unwrap_or(0.0)
);
info!(" Maximum degree: {}", metrics.max_degree.unwrap_or(0));
info!(" Minimum degree: {}", metrics.min_degree.unwrap_or(0));
Ok(())
}
fn display_degree_metrics(
&self,
node_degrees: &[NodeMetrics],
) -> Result<(), Box<dyn std::error::Error>> {
let mut sorted_degrees = node_degrees.to_vec();
sorted_degrees.sort_by(|a, b| b.total_degree.cmp(&a.total_degree));
info!("Top 10 Most Connected Relays:");
for (i, node) in sorted_degrees.iter().take(10).enumerate() {
info!(
" {}. {} - Total: {}, In: {}, Out: {}",
i + 1,
&node.fingerprint,
node.total_degree,
node.in_degree,
node.out_degree
);
}
Ok(())
}
fn display_degree_distribution(
&self,
metrics: &GraphMetrics,
) -> Result<(), Box<dyn std::error::Error>> {
if let Some(degree_dist) = &metrics.degree_distribution {
info!(
"Degree Distribution ({} unique values):",
degree_dist.len()
);
let mut dist_vec: Vec<(i64, i64)> =
degree_dist.iter().map(|(&k, &v)| (k, v)).collect();
dist_vec.sort_by(|a, b| b.1.cmp(&a.1)); info!(" Top 5 Most Common Degrees:");
for (i, (degree, count)) in dist_vec.iter().take(5).enumerate() {
info!(
" {}. {} relays have degree {}",
i + 1,
count,
degree
);
}
} else {
warn!("No degree distribution data available");
}
Ok(())
}
}