"""
Core analysis functions for image quality quantification.
Author: Edwing Ulin-Briseno
Date: 2025-07-16
"""
import logging
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
import matplotlib.patches as patches
import matplotlib.pyplot as plt
import numpy as np
import numpy.typing as npt
import yacs.config
from .metrics import get_values
from .phantom import NemaPhantom
from .statistics import Estimator
from .utils import (
extract_canny_mask,
find_phantom_center,
find_phantom_center_cv2_threshold,
)
_logger = logging.getLogger(__name__)
def _propagate_ratio(
mu1: float,
mu2: float,
var1: float,
var2: float,
cov12: float = 0.0,
eps: float = 1e-12,
) -> float:
"""
First-order Taylor variance propagation for ratio mu1 / mu2.
Returns standard deviation.
"""
if abs(mu2) < eps:
return 0.0
grad1 = 1.0 / mu2
grad2 = -mu1 / (mu2**2)
var = grad1**2 * var1 + grad2**2 * var2 + 2.0 * grad1 * grad2 * cov12
return float(np.sqrt(max(var, 0.0)))
[docs]
def create_cylindrical_mask(
shape_zyx: Tuple[int, int, int],
center_zyx: Tuple[float, float, float],
radius_mm: float,
height_mm: float,
spacing_xyz: npt.NDArray[np.float64],
) -> npt.NDArray[np.bool_]:
"""Create a boolean mask for a cylinder aligned with the z-axis."""
center_z, center_y, center_x = center_zyx
radius_vox_x = radius_mm / spacing_xyz[0]
radius_vox_y = radius_mm / spacing_xyz[1]
half_height_vox_z = (height_mm / spacing_xyz[2]) / 2.0
z_min = max(0, int(np.floor(center_z - half_height_vox_z)))
z_max = min(shape_zyx[0] - 1, int(np.ceil(center_z + half_height_vox_z)))
yy, xx = np.ogrid[: shape_zyx[1], : shape_zyx[2]]
ellipse = ((xx - center_x) / radius_vox_x) ** 2 + (
(yy - center_y) / radius_vox_y
) ** 2 <= 1.0
mask = np.zeros(shape_zyx, dtype=bool)
mask[slice(z_min, z_max + 1), :, :] = ellipse
return mask
def _calculate_background_stats(
image_data: npt.NDArray[Any],
phantom: NemaPhantom,
slices_indices: List[int],
centers_offset: List[Tuple[int, int]],
save_visualizations: bool = False,
viz_dir: Optional[Path] = None,
) -> Dict[int, Dict[str, float]]:
"""
Internal function to calculate background mean (C_B) and std dev (SD_B)
"""
reference_sphere = None
for name, sphere_def in phantom.rois.items():
if "sphere" in name:
reference_sphere = sphere_def
break
if reference_sphere is None:
raise ValueError("No sphere ROI found in phantom")
pivot_point_yx = reference_sphere["center_vox"]
slices_dims_yx = (image_data.shape[1], image_data.shape[2])
bkg_counts_per_size: Dict[int, List[float]] = {}
for name, _ in phantom.rois.items():
if "sphere" in name:
sphere_roi = phantom.get_roi(name)
if sphere_roi:
diam_mm = int(round(sphere_roi["diameter"]))
if diam_mm not in bkg_counts_per_size:
bkg_counts_per_size[diam_mm] = []
if save_visualizations and viz_dir:
central_slice = image_data[slices_indices[len(slices_indices) // 2], :, :]
ref_radius = None
for name, _ in phantom.rois.items():
if "sphere" in name:
sphere_roi = phantom.get_roi(name)
if sphere_roi:
ref_radius = sphere_roi["radius_vox"]
break
if ref_radius:
save_background_visualization(
central_slice,
centers_offset,
pivot_point_yx,
ref_radius,
viz_dir,
slices_indices[len(slices_indices) // 2],
)
for y_offset, x_offset in centers_offset:
for name, _ in phantom.rois.items():
if "sphere" not in name:
continue
sphere_roi = phantom.get_roi(name)
if sphere_roi is None:
continue
roi_mask = extract_circular_mask_2d(
slices_dims_yx,
(pivot_point_yx[0] + y_offset, pivot_point_yx[1] + x_offset),
sphere_roi["radius_vox"],
)
for slice_idx in slices_indices:
if 0 <= slice_idx < image_data.shape[0]:
img_slice = image_data[slice_idx, :, :]
avg_count = (
np.mean(img_slice[roi_mask]) if np.sum(roi_mask) > 0 else 0.0
)
sphere_diam_mm = int(round(sphere_roi["diameter"]))
bkg_counts_per_size[sphere_diam_mm].append(float(avg_count))
bkg_stats = {}
for diam, counts_list in bkg_counts_per_size.items():
if len(counts_list) > 0:
bkg_stats[diam] = {
"C_B": float(np.mean(counts_list)),
"SD_B": float(np.std(counts_list)),
"n_B": len(counts_list),
}
else:
bkg_stats[diam] = {"C_B": 100.0, "SD_B": 0.0, "n_B": 0}
return bkg_stats
def _calculate_hot_sphere_counts_offset_zxy(
image_data: npt.NDArray[Any],
phantom: NemaPhantom,
central_slice_idx: int,
save_visualizations: bool = False,
viz_dir: Optional[Path] = None,
) -> Dict[str, Dict[str, float]]: # Changed return type
"""Internal function to calculate the mean counts (C_H) for each hot sphere."""
offsets_xy = [(dy, dx) for dy in range(-10, 11) for dx in range(-10, 11)]
offsets_z = list(range(-10, 11))
# offsets_xy = [(dy, dx) for dy in range(-1, 2) for dx in range(-1, 2)]
# offsets_z = list(range(-1, 2))
hot_sphere_counts = {}
z_dim, y_dim, x_dim = image_data.shape
for name, _ in phantom.rois.items():
if "sphere" not in name:
continue
sphere_roi = phantom.get_roi(name)
if sphere_roi is None:
continue
center_y, center_x = sphere_roi["center_vox"]
radius = sphere_roi["radius_vox"]
diameter = int(np.ceil(radius * 2))
mask_size = diameter if diameter % 2 == 1 else diameter + 1
base_mask = extract_circular_mask_2d(
(mask_size, mask_size),
(mask_size // 2, mask_size // 2),
radius,
)
mask_half = mask_size // 2
max_mean = -np.inf
best_std = 0.0
best_n = 1
best_mask = None
best_slice = None
best_offset = None
for dz in offsets_z:
z_idx = central_slice_idx + dz
if z_idx < 0 or z_idx >= z_dim:
continue
current_slice = image_data[z_idx]
for dy, dx in offsets_xy:
target_y = int(round(center_y + dy))
target_x = int(round(center_x + dx))
y_min = target_y - mask_half
y_max = target_y + mask_half + 1
x_min = target_x - mask_half
x_max = target_x + mask_half + 1
if y_min < 0 or x_min < 0 or y_max > y_dim or x_max > x_dim:
continue
shifted_region = current_slice[y_min:y_max, x_min:x_max]
values = shifted_region[base_mask]
if values.size == 0:
continue
mean_val = np.mean(values)
if mean_val > max_mean:
max_mean = mean_val
best_std = np.std(values, ddof=1)
best_n = values.size
best_mask = base_mask
best_slice = shifted_region
best_offset = (dz, dy, dx)
if max_mean == -np.inf:
_logger.warning(f"No valid ROI found for {name}")
hot_sphere_counts[name] = {
"mean": 0.0,
"std_H": 0.0,
"n_H": 1,
}
continue
if _logger.isEnabledFor(logging.DEBUG):
_logger.debug(
f"{name}: best offset {best_offset}, "
f"mean={max_mean:.3f}, std={best_std:.3f}, n={best_n}"
)
hot_sphere_counts[name] = {
"mean": float(max_mean),
"std_H": float(best_std),
"n_H": int(best_n),
}
if save_visualizations and viz_dir and best_slice is not None:
save_sphere_visualization(
best_slice,
name,
(mask_half, mask_half),
radius,
best_mask if best_mask is not None else np.zeros_like(best_slice, dtype=bool), # type: ignore[arg-type]
viz_dir,
central_slice_idx,
)
return hot_sphere_counts
def _calculate_crc_std(
image_data: npt.NDArray[Any],
phantom: NemaPhantom,
central_slice_idx: int,
measure_in_px: int,
uniform_region_mask: npt.NDArray[Any],
cfg: yacs.config.CfgNode,
) -> Dict[str, Dict[str, Any]]: # type: ignore[return-value]
"""
Calculate Recovery Coefficients following NEMA NU4-2008 standard.
Per NEMA NU4-2008:
- "Averaging the voxels along the axial axis and determining the maximum
average activity inside each VOI."
- "Five VOIs with a diameter TWICE the rod diameter and length of 10 mm."
Important: The phantom config specifies the rod diameter, but per NEMA standard
the VOI must be TWICE that diameter. This is critical for accurate RC calculation.
Algorithm:
1. Create VOI with diameter = 2 × rod diameter (radius = 2 × rod radius)
2. For each (y,x) position in the VOI, create an axial line profile (along z)
3. Find the (y,x) position with maximum average along z-axis
4. Extract that specific line profile
5. Compute mean and std dev from that line profile only
This differs from computing over entire 2D slices and reduces std dev by
focusing on axial uniformity at the peak location.
"""
mode = cfg.STATISTICS.MODE
eps = getattr(cfg.STATISTICS, "EPSILON", 1e-12)
uniform_values = image_data[uniform_region_mask]
uniform_est = Estimator.from_samples(uniform_values, mode=mode)
_logger.debug(
" RC params: center=%d, measure_in_px=%d, z_dim=%d",
central_slice_idx,
measure_in_px,
image_data.shape[0],
)
num_slices = [
i
for i in range(
central_slice_idx - measure_in_px, central_slice_idx + measure_in_px + 1
)
if 0 <= i < image_data.shape[0]
]
_logger.debug(f" RC slices in bounds: {num_slices}")
crc_results: Dict[str, Dict[str, Any]] = {}
for name, _ in phantom.rois.items():
sphere_roi = phantom.get_roi(name)
if sphere_roi is None:
continue
# NEMA NU4-2008: VOI diameter must be TWICE the rod diameter
# Config contains rod diameter, so VOI radius = 2 × rod radius
voi_radius_vox = sphere_roi["radius_vox"] * 2.0
y, x = sphere_roi["center_vox"]
# Create 2D mask for the ROD VOI
roi_mask = extract_circular_mask_2d(
(image_data.shape[1], image_data.shape[2]),
sphere_roi["center_vox"],
voi_radius_vox,
)
# Get (y,x) coordinates where mask is True
y_coords, x_coords = np.where(roi_mask)
if len(y_coords) == 0:
_logger.warning(f" No voxels found in ROI mask for {name}")
crc_results[voi_radius_vox / 2.0] = { # type: ignore[index]
"mean_signal": 0.0,
"std_signal": 0.0,
"uniform_mean": 0.0, # type: ignore[name-defined]
"uniform_std": 0.0, # type: ignore[name-defined]
"recovery_coeff": 0.0,
"percentage_STD_rc": 0.0,
"cError": 0.0,
}
continue
_logger.debug(
f" Calculating RC for {name}:"
f" rod_diameter={sphere_roi['diameter']}mm," # type: ignore[index]
f" voi_radius_vox={voi_radius_vox:.3f},"
f" mask_pixels={len(y_coords)}"
)
for name, _ in phantom.rois.items():
sphere_roi = phantom.get_roi(name)
if sphere_roi is None:
continue
voi_radius_vox = sphere_roi["radius_vox"] * 2.0
roi_mask = extract_circular_mask_2d(
(image_data.shape[1], image_data.shape[2]),
sphere_roi["center_vox"],
voi_radius_vox,
)
y_coords, x_coords = np.where(roi_mask)
if len(y_coords) == 0:
continue
max_avg = -np.inf
best_profile = None
for y, x in zip(y_coords, x_coords):
profile = [float(image_data[z, y, x]) for z in num_slices]
avg = np.mean(profile)
if avg > max_avg:
max_avg = float(avg)
best_profile = profile
if best_profile is None:
continue
rod_est = Estimator.from_samples(
np.array(best_profile),
mode=mode,
)
if abs(uniform_est.mean) < eps:
rc_est = Estimator(0.0, 0.0, 1)
else:
rc_est = rod_est.ratio(uniform_est)
# Calculate coefficient of variation for rod and uniform region (NEMA NU4 style)
cv_rod = (
np.std(best_profile) / np.mean(best_profile)
if abs(np.mean(best_profile)) > eps
else 0.0
)
cv_uniform = (
np.std(uniform_values) / np.mean(uniform_values)
if abs(np.mean(uniform_values)) > eps
else 0.0
)
# Combined %STD: propagate both rod and uniform variability
percentage_std_combined = 100.0 * np.sqrt(cv_rod**2 + cv_uniform**2)
crc_results[voi_radius_vox / 2.0] = { # type: ignore[index]
"mean_signal": rod_est.mean,
"std_signal": rod_est.std,
"uniform_mean": uniform_est.mean,
"uniform_std": uniform_est.std,
"recovery_coeff": rc_est.mean * 100.0,
"percentage_STD_rc": percentage_std_combined,
"cError": rc_est.std * 100.0,
}
return crc_results
def _calculate_spillover_ratio(
image_data: npt.NDArray[Any],
phantom: NemaPhantom,
uniform_region_mask: npt.NDArray[Any],
air_region_mask: npt.NDArray[Any],
water_region_mask: npt.NDArray[Any],
cfg: yacs.config.CfgNode,
) -> Dict[str, Dict[str, Any]]: # type: ignore[return-value]
"""Internal function to calculate the spillover ratio for lung inserts."""
mode = cfg.STATISTICS.MODE
eps = getattr(cfg.STATISTICS, "EPSILON", 1e-12)
uniform_values = image_data[uniform_region_mask]
uniform_est = Estimator.from_samples(uniform_values, mode=mode)
spillover_ratios = {}
for region_name, region_mask in [
("air", air_region_mask),
("water", water_region_mask),
]:
region_values = image_data[region_mask]
region_est = Estimator.from_samples(region_values, mode=mode)
if abs(uniform_est.mean) < eps:
sor_est = Estimator(0.0, 0.0, 1)
else:
sor_est = region_est.ratio(uniform_est)
# Calculate coefficient of variation using raw std (NEMA NU4 style, matching AMIDE)
cv_region = (
np.std(region_values) / np.mean(region_values)
if abs(np.mean(region_values)) > eps
else 0.0
)
cv_uniform = (
np.std(uniform_values) / np.mean(uniform_values)
if abs(np.mean(uniform_values)) > eps
else 0.0
)
# Combined %STD: propagate both region and uniform variability
percentage_std_combined = 100.0 * np.sqrt(cv_region**2 + cv_uniform**2)
_logger.debug(
f"ROI {region_name.capitalize()}:"
f" voxels={np.sum(region_mask)},"
f" mean={region_est.mean:.6f},"
f" std={region_est.std:.6f},"
f" perc_std={100 * region_est.std / region_est.mean if region_est.mean != 0 else 0:.2f}"
)
spillover_ratios[region_name] = {
"SOR": sor_est.mean * 100.0,
"SOR_error": sor_est.std * 100.0,
"%STD": percentage_std_combined,
}
return spillover_ratios
def _calculate_lung_insert_counts(
image_data: npt.NDArray[Any],
lung_inserts_centers: npt.NDArray[Any],
CB_37: float,
voxel: float,
) -> Dict[int, float]:
"""Internal function to calculate the mean counts (C_lung) for each axial slice within the permitted lung bounds."""
lung_insert = {}
for z, y, x in lung_inserts_centers:
axial_cut = image_data[z, :, :]
slice_dims_yx = axial_cut.shape
roi_mask = extract_circular_mask_2d(
(slice_dims_yx[0], slice_dims_yx[1]), (y, x), 15 / voxel
)
lung_insert[z] = (np.mean(axial_cut[roi_mask]) / CB_37) * 100
return lung_insert
[docs]
def calculate_weighted_cbr_from(results):
"""
Calculates weighted Contrast-to-Background Ratio (CBR) and Figure of Merit (FOM) from sphere results.
Orchestrates weighted metric calculation by processing individual sphere measurements and computing
diameter-weighted averages as defined in the NEMA NU 2-2018 standard for overall image quality assessment.
Parameters
----------
results : list of dict
List of calculated metrics for each sphere containing diameter_mm, percentaje_constrast_QH,
and background_variability_N keys.
Returns
-------
dict
Dictionary containing weighted_CBR, weighted_FOM, individual CBRs, FOMs, weights, and diameters.
Returns None values for weighted metrics if results list is empty.
Notes
-----
Author: EdAlita
Date: 2025-01-08 16:15:00
The weighting scheme uses inverse diameter weighting (1/d) normalized to sum to 1.0.
CBR is calculated as contrast/variability, FOM as contrast²/variability.
"""
if not results:
return {"weighted_CBR": None, "weighted_FOM": None}
diameters = [r["diameter_mm"] for r in results]
contrasts = [r["percentaje_constrast_QH"] for r in results]
variabilities = [r["background_variability_N"] for r in results]
weights = [1 / (d**2) for d in diameters]
total_weight = sum(weights)
weights = [w / total_weight for w in weights]
CBRs = [c / v if v != 0 else 0 for c, v in zip(contrasts, variabilities)]
FOMs = [(c**2) / v if v != 0 else 0 for c, v in zip(contrasts, variabilities)]
weighted_CBR = sum(w * cbr for w, cbr in zip(weights, CBRs))
weighted_FOM = sum(w * fom for w, fom in zip(weights, FOMs))
return {
"weighted_CBR": weighted_CBR,
"weighted_FOM": weighted_FOM,
"CBRs": CBRs,
"FOMs": FOMs,
"weigths": weights,
"diameters": diameters,
}
[docs]
def calculate_nema_metrics(
image_data: npt.NDArray[Any],
phantom: NemaPhantom,
cfg: yacs.config.CfgNode,
save_visualizations: bool = False,
visualizations_dir: str = "visualizations",
protocol: Optional[str] = "NU_2_2018",
) -> Tuple[List[Dict[str, Any]], Dict[int, float]]:
"""Calculate NEMA NU 2-2018 image quality metrics.
Orchestrates the complete NEMA image quality analysis pipeline:
1. Identifies background regions at multiple slice positions
2. Extracts hot sphere region counts
3. Computes Percent Contrast (Q_H) and Background Variability (N)
4. Analyzes lung insert counts for spillover ratio assessment
This is the primary function for quantifying image quality according to the
NEMA NU 2-2018 standard.
Parameters
----------
image_data : numpy.ndarray
3D PET image array with shape (z, y, x) in voxel units.
phantom : NemaPhantom
Initialized phantom model with ROI definitions and coordinate transforms.
cfg : yacs.config.CfgNode
Configuration object containing:
- ACTIVITY.RATIO: Hot/background activity ratio (a_H / a_B)
- ROIS.CENTRAL_SLICE: Central slice index in z-axis
- ROIS.BACKGROUND_OFFSET_YX: Background region offsets
- ROIS.ORIENTATION_YX: Orientation multipliers for background regions
- ROIS.SPACING: Voxel spacing in mm
save_visualizations : bool, optional
If True, saves ROI mask visualizations to disk. Default is False.
visualizations_dir : str, optional
Directory path for saving visualization images. Default is "visualizations".
Returns
-------
results : list[dict]
List of metric dictionaries for each hot sphere:
- diameter_mm: Sphere diameter in mm
- percentaje_constrast_QH: Percent contrast (%)
- background_variability_N: Background variability (%)
- avg_hot_counts_CH: Average hot sphere counts
- avg_bkg_counts_CB: Average background counts
- bkg_std_dev_SD: Background standard deviation
results_lung : dict[int, float]
Lung insert spillover ratios indexed by slice number.
Raises
------
ValueError
If activity ratio is not greater than 1.
Notes
-----
The function analyzes multiple slice positions (±10mm and ±20mm from central slice)
to account for axial variation in the phantom and image statistics.
References
----------
- NEMA NU 2-2018: Performance Measurements of Positron Emission Tomographs
Examples
--------
Calculate NEMA metrics for a loaded image:
>>> from nema_quant.phantom import NemaPhantom
>>> from nema_quant.config import CfgNode
>>> phantom = NemaPhantom(image_path='pet_image.nii.gz')
>>> cfg = get_default_config()
>>> metrics, lung_results = calculate_nema_metrics(
... image_data, phantom, cfg, save_visualizations=True
... )
>>> for m in metrics:
... print(f"Sphere {m['diameter_mm']}mm: Q_H={m['percentaje_constrast_QH']:.1f}%")
"""
viz_dir = None
if save_visualizations:
viz_dir = Path(visualizations_dir)
viz_dir.mkdir(parents=True, exist_ok=True)
_logger.info(f" Saving visualizations to: {viz_dir}")
activity_ratio = cfg.ACTIVITY.RATIO
if activity_ratio <= 0 or activity_ratio <= 1:
raise ValueError(
"Activity ratio (a_H / a_B) must be greater than 1 and"
"background activity must be positive."
)
central_slices_idx = cfg.ROIS.CENTRAL_SLICE
cm_in_z_vox = phantom._mm_to_voxels(10, 2)
slices_indices = sorted(
{
central_slices_idx,
int(round(central_slices_idx + cm_in_z_vox)),
int(round(central_slices_idx - cm_in_z_vox)),
int(round(central_slices_idx + 2 * cm_in_z_vox)),
int(round(central_slices_idx - 2 * cm_in_z_vox)),
}
)
background_stats = _calculate_background_stats(
image_data,
phantom,
slices_indices,
[
(y * cfg.ROIS.ORIENTATION_YX[0], x * cfg.ROIS.ORIENTATION_YX[1])
for y, x in cfg.ROIS.BACKGROUND_OFFSET_YX
],
save_visualizations=save_visualizations,
viz_dir=viz_dir,
)
hot_sphere_counts = _calculate_hot_sphere_counts_offset_zxy(
image_data,
phantom,
central_slices_idx,
save_visualizations=save_visualizations,
viz_dir=viz_dir,
)
results = []
activity_ratio_term = activity_ratio - 1.0
CB_37 = 0.0
eps = getattr(cfg.STATISTICS, "EPSILON", 1e-12)
mode = cfg.STATISTICS.MODE
sd_model = cfg.STATISTICS.SD_VARIANCE_MODEL
_logger.info(f"{'mm':^4} | {'RC [%]':^13} | {'BV [%]':^12}")
_logger.info(f"{'-'*4} | {'-'*13} | {'-'*12}")
for name, hot_data in hot_sphere_counts.items():
sphere_def = phantom.get_roi(name)
if sphere_def is None:
continue
sphere_diam_mm = int(round(sphere_def["diameter"]))
sphere_diam_mm = int(round(sphere_def["diameter"]))
hot_est = Estimator.from_mean_std(
mean=hot_data["mean"],
std=hot_data["std_H"],
n=int(hot_data["n_H"]),
mode=mode,
)
bkg_data = background_stats[sphere_diam_mm]
bkg_est = Estimator.from_mean_std(
mean=bkg_data["C_B"],
std=bkg_data["SD_B"],
n=int(bkg_data["n_B"]),
mode=mode,
)
if abs(bkg_est.mean) < eps:
_logger.warning(
f"Sphere {sphere_diam_mm}mm skipped: C_B too small ({bkg_est.mean:.3e})"
)
continue
ratio_est = hot_est.ratio(bkg_est)
qh_est = ratio_est.subtract_constant(1.0).scale(100.0 / activity_ratio_term)
# Build SD estimator
SD_B = bkg_data["SD_B"] # type: ignore[index]
n_B = max(int(bkg_data["n_B"]), 1) # type: ignore[arg-type]
C_B = bkg_est.mean
if sd_model == "poisson":
var_sd = C_B / (2.0 * n_B)
else:
var_sd = (SD_B**2) / (2.0 * n_B) if n_B > 1 else 0.0
sd_est = Estimator(SD_B, var_sd, n_B) # type: ignore[arg-type]
n_est = sd_est.ratio(bkg_est).scale(100.0)
# Save 37 mm background for lung insert
if sphere_diam_mm == 37:
CB_37 = bkg_est.mean
_logger.info(
f"{sphere_diam_mm:>4.1f} | "
f"{qh_est.mean:>5.2f} ± {qh_est.std:>4.2f}% | "
f"{n_est.mean:>5.2f} ± {n_est.std:>4.2f}%"
)
results.append(
{
"diameter_mm": sphere_diam_mm,
"percentaje_constrast_QH": qh_est.mean,
"percentaje_constrast_QH_error": qh_est.std,
"percentaje_constrast_QH_%STD": (
100 * qh_est.std / qh_est.mean if abs(qh_est.mean) > eps else 0.0
),
"background_variability_N": n_est.mean,
"background_variability_error": n_est.std,
"background_variability_%STD": (
100 * n_est.std / n_est.mean if abs(n_est.mean) > eps else 0.0
),
"avg_hot_counts_CH": hot_est.mean,
"avg_bkg_counts_CB": bkg_est.mean,
"bkg_std_dev_SD": SD_B,
}
)
if protocol == "NU_2_2018":
phantom_center_zyx = find_phantom_center(
image_data, threshold=(np.max(image_data) * 0.41)
)
if _logger.isEnabledFor(logging.DEBUG):
_logger.debug(f" Phantom Center found at (z,y,x) : {phantom_center_zyx}")
lung_insert_centers = extract_canny_mask(
image_data, cfg.ROIS.SPACING, int(phantom_center_zyx[0])
)
results_lung = _calculate_lung_insert_counts(
image_data, lung_insert_centers, CB_37, cfg.ROIS.SPACING
)
if _logger.isEnabledFor(logging.DEBUG):
_logger.debug(" Lung Insert Results")
for k, v in results_lung.items():
_logger.debug(f" Slice {int(k)}: {float(v):.3f}")
else:
results_lung = {}
return results, results_lung
[docs]
def calculate_nema_metrics_nu4_2008(
image_data: npt.NDArray[Any],
phantom: NemaPhantom,
cfg: yacs.config.CfgNode,
save_visualizations: bool = False,
visualizations_dir: str = "visualizations",
) -> Tuple[List[Dict[str, Any]], Dict[int, float], Dict[str, float]]:
"""Calculate NEMA NU 4-2008 image quality metrics.
This function is a placeholder for the implementation of NEMA NU 4-2008 metrics calculation.
The actual logic for processing the image data according to the NU 4-2008 standard needs to be implemented.
Parameters
----------
image_data : numpy.ndarray
3D PET image array with shape (z, y, x) in voxel units.
phantom : NemaPhantom
Initialized phantom model with ROI definitions and coordinate transforms.
cfg : yacs.config.CfgNode
Configuration object containing necessary parameters for NU 4-2008 analysis.
save_visualizations : bool, optional
If True, saves ROI mask visualizations to disk. Default is False.
visualizations_dir : str, optional
Directory path for saving visualization images. Default is "visualizations".
Returns
-------
Tuple[List[Dict[str, Any]], Dict[int, float], Dict[str, float]]
"""
mode = cfg.STATISTICS.MODE
eps = getattr(cfg.STATISTICS, "EPSILON", 1e-12)
dim_z, dim_y, dim_x = image_data.shape
_logger.debug(f" Image data dimensions (z,y,x): {dim_z}, {dim_y}, {dim_x}")
# Find phantom center in xy plane only (for centering cylindrical ROIs)
center_method = getattr(cfg.ROIS, "PHANTOM_CENTER_METHOD", "weighted_slices")
center_threshold = getattr(cfg.ROIS, "PHANTOM_CENTER_THRESHOLD_FRACTION", 0.41)
ce_z, ce_y, ce_x = find_phantom_center_cv2_threshold(
image_data,
threshold_fraction=center_threshold,
method=center_method,
)
phantom_center_z = int(ce_z)
phantom_center_x = int(ce_x)
phantom_center_y = int(ce_y)
rods_center_z = cfg.ROIS.CENTRAL_SLICE
_logger.debug(
f" Phantom center (z, x,y) found at: ({phantom_center_z}, {phantom_center_x}, {phantom_center_y})"
)
_logger.debug(
f" Using CENTRAL_SLICE z={rods_center_z} as reference (hot rods center)"
)
# Calculate uniform region center and bounds
uniform_center_z = rods_center_z - cfg.ROIS.ORIENTATION_Z * (
cfg.ROIS.UNIFORM_OFFSET_MM / cfg.ROIS.SPACING
)
half_height_vox = (cfg.ROIS.UNIFORM_HEIGHT_MM / cfg.ROIS.SPACING) / 2.0
uniform_z_min = int(np.floor(uniform_center_z - half_height_vox))
uniform_z_max = int(np.ceil(uniform_center_z + half_height_vox))
_logger.debug(
f" Uniform region: center_z={uniform_center_z:.1f}, "
f"range z={uniform_z_min}-{uniform_z_max}, "
f"offset={cfg.ROIS.UNIFORM_OFFSET_MM}mm, "
f"height={cfg.ROIS.UNIFORM_HEIGHT_MM}mm"
)
uniform_region_mask = create_cylindrical_mask(
shape_zyx=(image_data.shape[0], image_data.shape[1], image_data.shape[2]), # type: ignore[arg-type]
center_zyx=(
phantom_center_z
+ cfg.ROIS.ORIENTATION_Z * (cfg.ROIS.UNIFORM_OFFSET_MM / cfg.ROIS.SPACING),
phantom_center_y,
phantom_center_x,
),
radius_mm=cfg.ROIS.UNIFORM_RADIUS_MM,
height_mm=cfg.ROIS.UNIFORM_HEIGHT_MM,
spacing_xyz=np.array([cfg.ROIS.SPACING, cfg.ROIS.SPACING, cfg.ROIS.SPACING]), # type: ignore[arg-type]
)
uniform_values = image_data[uniform_region_mask]
uniform_est = Estimator.from_samples(uniform_values, mode=mode)
uniformity_results = {
"mean": uniform_est.mean,
"maximum": float(np.max(uniform_values)) if uniform_values.size > 0 else 0.0,
"minimum": float(np.min(uniform_values)) if uniform_values.size > 0 else 0.0,
"%STD": (
100 * np.std(uniform_values) / np.mean(uniform_values)
if abs(np.mean(uniform_values)) > eps
else 0.0
),
}
_logger.info("Uniformity Results:")
_logger.info(f"{'Mean':^10} | {'Maximum':^15} | {'Minimum':^15} | {'%STD':^7}")
_logger.info(f"{'-'*10} | {'-'*15} | {'-'*15} | {'-'*7}")
_logger.info(
f"{uniformity_results['mean']:>10.4f} | "
f"{uniformity_results['maximum']:>15.4f} | "
f"{uniformity_results['minimum']:>15.4f} | "
f"{uniformity_results['%STD']:>7.4f}"
)
air_region_mask = create_cylindrical_mask(
shape_zyx=(image_data.shape[0], image_data.shape[1], image_data.shape[2]), # type: ignore[arg-type]
center_zyx=(
phantom_center_z
- cfg.ROIS.ORIENTATION_Z * (cfg.ROIS.AIRWATER_OFFSET_MM / cfg.ROIS.SPACING),
phantom_center_y
- cfg.ROIS.ORIENTATION_YX[0]
* (cfg.ROIS.AIRWATER_SEPARATION_MM / cfg.ROIS.SPACING),
phantom_center_x,
),
radius_mm=cfg.ROIS.AIR_RADIUS_MM,
height_mm=cfg.ROIS.AIR_HEIGHT_MM,
spacing_xyz=np.array([cfg.ROIS.SPACING, cfg.ROIS.SPACING, cfg.ROIS.SPACING]), # type: ignore[arg-type]
)
water_region_mask = create_cylindrical_mask(
shape_zyx=(image_data.shape[0], image_data.shape[1], image_data.shape[2]), # type: ignore[arg-type]
center_zyx=(
phantom_center_z
- cfg.ROIS.ORIENTATION_Z * (cfg.ROIS.AIRWATER_OFFSET_MM / cfg.ROIS.SPACING),
phantom_center_y
+ cfg.ROIS.ORIENTATION_YX[0]
* (cfg.ROIS.AIRWATER_SEPARATION_MM / cfg.ROIS.SPACING),
phantom_center_x,
),
radius_mm=cfg.ROIS.WATER_RADIUS_MM,
height_mm=cfg.ROIS.WATER_HEIGHT_MM,
spacing_xyz=np.array([cfg.ROIS.SPACING, cfg.ROIS.SPACING, cfg.ROIS.SPACING]), # type: ignore[arg-type]
)
spillover_ratio = _calculate_spillover_ratio(
image_data=image_data,
phantom=phantom,
uniform_region_mask=uniform_region_mask,
air_region_mask=air_region_mask,
water_region_mask=water_region_mask,
cfg=cfg,
)
_logger.info("Spillover Ratios:")
_logger.info(f"{'Region':^10} | {'SOR ± Error':^15} | {'%STD':^5}")
_logger.info(f"{'-'*10} | {'-'*15} | {'-'*5}")
for region, metrics in spillover_ratio.items():
_logger.info(
f"{region.capitalize():<10} | "
f"{metrics['SOR']:>5.2f} ± {metrics['SOR_error']:<5.2f} | "
f"{metrics['%STD']:>5.2f}"
)
crc_results = _calculate_crc_std(
image_data=image_data,
phantom=phantom,
central_slice_idx=rods_center_z,
measure_in_px=10, # 10mm height / 0.5mm spacing = 20 voxels → ±10 slices = 21 total
uniform_region_mask=uniform_region_mask,
cfg=cfg,
)
_logger.info("RC Results:")
_logger.info(f"{'Diameter':^10} | {'RC ± Error':^15} | {'%STD':^7}")
_logger.info(f"{'-'*10} | {'-'*15} | {'-'*7}")
for region, metrics in crc_results.items():
_logger.info(
f"{region:>10.1f} | "
f"{metrics['recovery_coeff']:>5.2f} ± {metrics['cError']:<5.2f} | "
f"{metrics['percentage_STD_rc']:>5.2f}"
)
return crc_results, spillover_ratio, uniformity_results # type: ignore[return-value]
[docs]
def save_sphere_visualization(
image_slice: npt.NDArray[Any],
sphere_name: str,
center_yx: Tuple[float, float],
radius_vox: float,
roi_mask: npt.NDArray[Any],
output_dir: Path,
slice_idx: int,
) -> None:
"""
Saves a visualization of the sphere ROI and mask for debugging purposes.
Generates and stores an image showing the analyzed sphere, its center and radius, and the ROI mask overlay. Useful for verifying
ROI placement and mask accuracy.
Parameters
----------
image_slice : np.ndarray
2D image slice containing the sphere.
sphere_name : str
Name of the sphere being analyzed.
center_yx : Tuple[float, float]
Center coordinates (y, x) of the sphere.
radius_vox : float
Radius of the sphere in voxels.
roi_mask : np.ndarray
Boolean mask indicating the ROI.
output_dir : Path
Directory to save the visualization.
slice_idx : int
Index of the slice being analyzed.
Returns
-------
None
"""
fig, axes = plt.subplots(1, 3, figsize=(15, 5))
axes[0].imshow(
image_slice, cmap="gray", vmin=0, vmax=np.percentile(image_slice, 99)
)
circle = patches.Circle(
(center_yx[1], center_yx[0]),
radius_vox,
linewidth=2,
edgecolor="red",
facecolor="none",
)
axes[0].add_patch(circle)
axes[0].plot(center_yx[1], center_yx[0], "r+", markersize=10, markeredgewidth=2)
axes[0].set_title(f"{sphere_name}\nOriginal Image with ROI")
axes[0].axis("off")
axes[1].imshow(roi_mask, cmap="Reds", alpha=0.8)
axes[1].set_title(f"{sphere_name}\nROI Mask")
axes[1].axis("off")
masked_image = image_slice.copy()
masked_image[~roi_mask] = 0
axes[2].imshow(
masked_image, cmap="gray", vmin=0, vmax=np.percentile(image_slice, 99)
)
axes[2].set_title(f"{sphere_name}\nMasked Region Only")
axes[2].axis("off")
mean_counts = np.mean(image_slice[roi_mask])
std_counts = np.std(image_slice[roi_mask])
num_pixels = np.sum(roi_mask)
fig.suptitle(
f"Slice {slice_idx} - {sphere_name}\n"
f"Mean: {mean_counts:.2f}, Std: {std_counts:.2f}, Pixels: {num_pixels}",
fontsize=12,
)
output_dir.mkdir(parents=True, exist_ok=True)
output_file = output_dir / f"{sphere_name}_slice_{slice_idx}_visualization.png"
plt.savefig(output_file, dpi=150, bbox_inches="tight")
plt.close()
print(f"Saved sphere visualization: {output_file}")
[docs]
def save_background_visualization(
image_slice: npt.NDArray[Any],
centers_offset: List[Tuple[int, int]],
pivot_point_yx: Tuple[float, float],
radius_vox: float,
output_dir: Path,
slice_idx: int,
) -> None:
"""
Saves a visualization of the background ROIs for debugging purposes.
Generates and stores an image displaying background ROI locations, offsets, reference pivot point, and radii. Useful for verifying
placement and mask accuracy.
Parameters
----------
image_slice : np.ndarray
2D image slice.
centers_offset : List[Tuple[int, int]]
List of offset positions for background ROIs.
pivot_point_yx : Tuple[float, float]
Pivot point (y, x) of the reference sphere.
radius_vox : float
Radius for background ROIs in voxels.
output_dir : Path
Directory to save the visualization.
slice_idx : int
Index of the slice being analyzed.
Returns
-------
None
"""
fig, ax = plt.subplots(1, 1, figsize=(10, 10))
ax.imshow(image_slice, cmap="gray", vmin=0, vmax=np.percentile(image_slice, 99))
for i, (y_offset, x_offset) in enumerate(centers_offset):
center_y = pivot_point_yx[0] + y_offset
center_x = pivot_point_yx[1] + x_offset
circle = patches.Circle(
(center_x, center_y),
radius_vox,
linewidth=1.5,
edgecolor="cyan",
facecolor="none",
alpha=0.8,
)
ax.add_patch(circle)
ax.text(
center_x,
center_y,
str(i + 1),
ha="center",
va="center",
color="yellow",
fontweight="bold",
fontsize=8,
)
ax.plot(
pivot_point_yx[1], pivot_point_yx[0], "r+", markersize=15, markeredgewidth=3
)
ax.text(
pivot_point_yx[1] + 10,
pivot_point_yx[0],
"Pivot",
color="red",
fontweight="bold",
)
ax.set_title(
f"Background ROIs - Slice {slice_idx}\n{len(centers_offset)} ROIs shown"
)
ax.axis("off")
output_dir.mkdir(parents=True, exist_ok=True)
output_file = output_dir / f"background_rois_slice_{slice_idx}.png"
plt.savefig(output_file, dpi=150, bbox_inches="tight")
plt.close()
print(f"Saved background visualization: {output_file}")
[docs]
def calculate_advanced_metrics(
image_data: npt.NDArray[Any],
gt_data: npt.NDArray[Any],
measures: Tuple[str, ...],
cfg: yacs.config.CfgNode,
) -> Dict[str, Any]:
"""Compute advanced image-quality metrics against a reference mask.
Parameters
----------
image_data : npt.NDArray[Any]
3D reconstructed image volume.
gt_data : npt.NDArray[Any]
Ground-truth or reference mask aligned with `image_data`.
measures : Tuple[str, ...]
Metric names to compute (passed to `get_values()`).
cfg : yacs.config.CfgNode
Configuration containing `ROIS.SPACING` (voxel size in mm).
Returns
-------
Dict[str, Any]
Mapping of metric names to computed values.
Notes
-----
Logs each computed metric at INFO level.
"""
values = dict(
get_values(
image_data,
gt_data,
measures=measures,
voxelspacing=(cfg.ROIS.SPACING, cfg.ROIS.SPACING, cfg.ROIS.SPACING),
)
)
_logger.info("Advanced Metrics:")
for k, v in values.items():
_logger.info(f" {k}: {v:.7f}")
return values