"""TensorFlow Memory Analysis"""
import logging
import statistics
from dataclasses import asdict
from typing import Any, Dict, List, Mapping, Optional, cast
import numpy as np
from stormlog.collective_attribution import (
CollectiveAttributionConfig,
CollectiveAttributionResult,
attribute_collective_memory,
resolve_collective_attribution_config,
)
from stormlog.gap_analysis import GapFinding, analyze_hidden_memory_gaps
try:
from stormlog.phases import (
PhaseAttribution,
PhaseReplayIndex,
phase_attribution_to_payload,
)
except ImportError: # pragma: no cover - phase package may land in another slice
PhaseAttribution = Any # type: ignore[assignment,misc]
PhaseReplayIndex = Any # type: ignore[assignment,misc]
def phase_attribution_to_payload(
attribution: PhaseAttribution | None,
) -> dict[str, Any] | None:
return None
from stormlog.telemetry import TelemetryEventV2
from .utils import format_memory
_GAP_REMEDIATION_BY_CLASSIFICATION: Dict[str, List[str]] = {
"transient_spike": [
"Investigate non-allocator memory consumers active during spikes "
"(cuDNN workspace, XLA temporaries, other frameworks).",
"Use tf.config.experimental.get_memory_info() around spike windows.",
"Consider setting tf.config.experimental.set_memory_growth(gpu, True).",
],
"persistent_drift": [
"Look for non-TensorFlow GPU allocations accumulating over time "
"(e.g. custom CUDA ops, third-party libraries).",
"Monitor nvidia-smi used memory alongside TF allocator counters.",
"If gap stabilises after warmup, it may be one-time runtime overhead.",
],
"fragmentation_like": [
"Enable memory growth with tf.config.experimental.set_memory_growth(gpu, True).",
"Reduce allocation churn by reusing tensors or using tf.function.",
"Consider setting a hard memory limit with "
"tf.config.set_logical_device_configuration().",
],
}
[docs]
class MemoryAnalyzer:
"""Advanced TensorFlow memory analysis and optimization."""
def __init__(
self,
sensitivity: float = 0.05,
collective_sensitivity: str = "medium",
collective_threshold_overrides: Optional[Mapping[str, Any]] = None,
) -> None:
self.sensitivity = sensitivity
self.collective_attribution_config: CollectiveAttributionConfig = (
resolve_collective_attribution_config(
collective_sensitivity,
collective_threshold_overrides,
)
)
# Hidden-memory gap analysis thresholds
self.thresholds = {
"gap_ratio_threshold": 0.05, # 5% of device total = significant gap
"gap_spike_zscore": 2.0, # z-score for transient spike detection
"gap_drift_r_squared": 0.6, # R-squared for persistent drift
"gap_fragmentation_ratio": 0.3, # reserved-allocated / reserved
}
[docs]
def detect_memory_leaks(self, tracking_results: Any) -> List[Dict[str, Any]]:
"""Detect potential memory leaks using statistical analysis."""
if (
not hasattr(tracking_results, "memory_usage")
or len(tracking_results.memory_usage) < 10
):
return []
memory_data = tracking_results.memory_usage
_timestamps = tracking_results.timestamps
leaks: List[Dict[str, Any]] = []
# Check for monotonic increase
increasing_samples = sum(
1 for i in range(1, len(memory_data)) if memory_data[i] > memory_data[i - 1]
)
increase_ratio = increasing_samples / (len(memory_data) - 1)
if increase_ratio > 0.7: # 70% of samples show increase
leaks.append(
{
"type": "monotonic_increase",
"severity": "high" if increase_ratio > 0.9 else "medium",
"description": f"Memory shows monotonic increase in {increase_ratio:.1%} of samples",
"growth_rate": (
tracking_results.memory_growth_rate
if hasattr(tracking_results, "memory_growth_rate")
else 0
),
}
)
# Check for sudden spikes
if len(memory_data) > 5:
mean_memory = statistics.mean(memory_data)
std_memory = statistics.stdev(memory_data)
spikes = [m for m in memory_data if m > mean_memory + 3 * std_memory]
if spikes:
leaks.append(
{
"type": "memory_spikes",
"severity": "medium",
"description": f"Detected {len(spikes)} memory spikes above 3σ",
"spike_values": spikes,
}
)
# Check for lack of cleanup
final_memory = memory_data[-5:] # Last 5 samples
initial_memory = memory_data[:5] # First 5 samples
if final_memory and initial_memory:
final_avg = statistics.mean(final_memory)
initial_avg = statistics.mean(initial_memory)
if final_avg > initial_avg * 1.5: # 50% increase from start
growth_factor = (
float("inf") if initial_avg <= 0 else final_avg / initial_avg
)
leaks.append(
{
"type": "insufficient_cleanup",
"severity": "medium",
"description": (
f"Final memory {final_avg:.1f}MB is "
f"{growth_factor:.1f}x initial memory"
),
"initial_avg": initial_avg,
"final_avg": final_avg,
}
)
return leaks
[docs]
def analyze_fragmentation(self, profile_result: Any) -> Dict[str, float]:
"""Analyze memory fragmentation patterns."""
if (
not hasattr(profile_result, "snapshots")
or len(profile_result.snapshots) < 2
):
return {"fragmentation_score": 0.0, "trend": 0.0}
fragmentation_scores: List[float] = []
for snapshot in profile_result.snapshots:
if snapshot.gpu_memory_reserved_mb > 0:
utilization = snapshot.gpu_memory_mb / snapshot.gpu_memory_reserved_mb
fragmentation = 1.0 - utilization
fragmentation_scores.append(fragmentation)
if not fragmentation_scores:
return {"fragmentation_score": 0.0, "trend": 0.0}
avg_fragmentation = statistics.mean(fragmentation_scores)
# Calculate trend
if len(fragmentation_scores) >= 10:
early = statistics.mean(fragmentation_scores[:5])
late = statistics.mean(fragmentation_scores[-5:])
trend = late - early
else:
trend = 0.0
return {
"fragmentation_score": avg_fragmentation,
"trend": trend,
"max_fragmentation": max(fragmentation_scores),
"min_fragmentation": min(fragmentation_scores),
}
[docs]
def detect_patterns(self, tracking_results: Any) -> List[Dict[str, Any]]:
"""Detect memory usage patterns."""
if (
not hasattr(tracking_results, "memory_usage")
or len(tracking_results.memory_usage) < 20
):
return []
memory_data = tracking_results.memory_usage
patterns: List[Dict[str, Any]] = []
# Detect periodic patterns
try:
# Simple autocorrelation for periodic detection
data_mean = statistics.mean(memory_data)
normalized_data = [x - data_mean for x in memory_data]
# Check for patterns in chunks
chunk_size = min(10, len(normalized_data) // 4)
if chunk_size > 2:
chunk_correlations = []
for i in range(0, len(normalized_data) - chunk_size, chunk_size):
chunk1 = normalized_data[i : i + chunk_size]
chunk2 = normalized_data[i + chunk_size : i + 2 * chunk_size]
if len(chunk2) == chunk_size:
correlation = np.corrcoef(chunk1, chunk2)[0, 1]
if not np.isnan(correlation):
chunk_correlations.append(correlation)
if chunk_correlations:
avg_correlation = statistics.mean(chunk_correlations)
if avg_correlation > 0.7:
patterns.append(
{
"type": "periodic_pattern",
"correlation": avg_correlation,
"description": f"Detected periodic memory pattern (correlation: {avg_correlation:.3f})",
}
)
except Exception as exc:
logging.debug("Periodic pattern detection failed: %s", exc)
# Detect step patterns
if len(memory_data) > 10:
step_increases = 0
for i in range(1, len(memory_data)):
if memory_data[i] > memory_data[i - 1] * 1.2: # 20% sudden increase
step_increases += 1
if step_increases > len(memory_data) * 0.1: # More than 10% of samples
patterns.append(
{
"type": "step_pattern",
"step_count": step_increases,
"description": f"Detected {step_increases} sudden memory increases (step pattern)",
}
)
return patterns
[docs]
def analyze_efficiency(self, profile_result: Any) -> float:
"""Analyze memory usage efficiency (0-10 scale)."""
if not hasattr(profile_result, "peak_memory_mb"):
return 0.0
score = 10.0
# Penalize high peak memory
if profile_result.peak_memory_mb > 8000: # > 8GB
score -= 3.0
elif profile_result.peak_memory_mb > 4000: # > 4GB
score -= 1.5
# Penalize high memory growth rate
if hasattr(profile_result, "memory_growth_rate"):
if profile_result.memory_growth_rate > 200: # > 200 MB/s
score -= 2.0
elif profile_result.memory_growth_rate > 100: # > 100 MB/s
score -= 1.0
# Penalize fragmentation
if hasattr(profile_result, "snapshots"):
frag_info = self.analyze_fragmentation(profile_result)
if frag_info["fragmentation_score"] > 0.5:
score -= 2.0
elif frag_info["fragmentation_score"] > 0.3:
score -= 1.0
# Penalize memory leaks
if hasattr(profile_result, "memory_usage"):
# Create a simple tracking result for leak detection
class SimpleTrackingResult:
def __init__(self, memory_usage: List[float]) -> None:
self.memory_usage = memory_usage
self.timestamps = list(range(len(memory_usage)))
self.memory_growth_rate = 0
simple_result = SimpleTrackingResult(
[s.gpu_memory_mb for s in profile_result.snapshots]
)
leaks = self.detect_memory_leaks(simple_result)
high_severity_leaks = [
leak for leak in leaks if leak.get("severity") == "high"
]
if high_severity_leaks:
score -= 3.0
elif leaks:
score -= 1.5
return max(0.0, min(10.0, score))
# ------------------------------------------------------------------
# Hidden-memory gap analysis (operates on TelemetryEventV2 series)
# ------------------------------------------------------------------
[docs]
def analyze_memory_gaps(
self,
events: List[TelemetryEventV2],
*,
phase_resolver: PhaseReplayIndex | None = None,
) -> List[GapFinding]:
"""Classify allocator-vs-device hidden memory gaps over time.
Args:
events: Chronologically ordered telemetry samples.
Returns:
Prioritized list of gap findings (severity desc, confidence desc).
"""
return analyze_hidden_memory_gaps(
events=events,
thresholds=self.thresholds,
format_memory=format_memory,
remediation_by_classification=_GAP_REMEDIATION_BY_CLASSIFICATION,
phase_resolver=phase_resolver,
)
[docs]
def analyze_collective_attribution(
self,
events: List[TelemetryEventV2],
*,
phase_resolver: PhaseReplayIndex | None = None,
) -> List[CollectiveAttributionResult]:
"""Attribute hidden-memory spikes to collective communication phases."""
return attribute_collective_memory(
events=events,
config=self.collective_attribution_config,
phase_resolver=phase_resolver,
)
[docs]
def score_optimization(
self,
profile_result: Any,
events: Optional[List[TelemetryEventV2]] = None,
) -> Dict[str, Any]:
"""Score optimization opportunities.
Args:
profile_result: TensorFlow profiling result object.
events: Optional telemetry event series for gap analysis.
When provided, the result includes a ``gap_analysis`` section.
"""
optimization_score: Dict[str, Any] = {
"overall_score": 0.0,
"categories": {},
"top_recommendations": [],
"priority_actions": [],
}
categories = cast(Dict[str, float], optimization_score["categories"])
priority_actions = cast(List[str], optimization_score["priority_actions"])
# Memory efficiency
efficiency_score = self.analyze_efficiency(profile_result)
categories["memory_efficiency"] = efficiency_score
# Fragmentation analysis
if hasattr(profile_result, "snapshots"):
frag_info = self.analyze_fragmentation(profile_result)
frag_score = max(0, 10 - frag_info["fragmentation_score"] * 10)
categories["fragmentation"] = frag_score
else:
frag_score = 5.0
# Performance correlation
perf_corr = self.correlate_with_performance(profile_result)
if perf_corr["function_efficiency"]:
avg_efficiency = statistics.mean(
[
func["efficiency_score"]
for func in perf_corr["function_efficiency"].values()
]
)
perf_score = avg_efficiency * 10
else:
perf_score = 5.0
categories["performance"] = perf_score
# Overall score
optimization_score["overall_score"] = statistics.mean(
[efficiency_score, frag_score, perf_score]
)
# Generate recommendations
if efficiency_score < 6:
priority_actions.append("Address memory efficiency issues")
if frag_score < 6:
priority_actions.append("Reduce memory fragmentation")
if perf_score < 6:
priority_actions.append("Optimize function performance")
# Top recommendations based on analysis
from .utils import suggest_optimizations
top_suggestions = suggest_optimizations(profile_result)
optimization_score["top_recommendations"] = top_suggestions[:5]
# Hidden-memory gap analysis (only when telemetry events are supplied).
if events is not None:
phase_resolver = (
PhaseReplayIndex.from_events(events)
if hasattr(PhaseReplayIndex, "from_events")
else None
)
gap_findings = self.analyze_memory_gaps(
events,
phase_resolver=phase_resolver,
)
collective_attribution = self.analyze_collective_attribution(
events,
phase_resolver=phase_resolver,
)
optimization_score["gap_analysis"] = [
_serialize_gap_finding(f) for f in gap_findings
]
optimization_score["collective_attribution"] = [
_serialize_collective_attribution(result)
for result in collective_attribution
]
return optimization_score
def _serialize_gap_finding(finding: GapFinding) -> dict[str, Any]:
payload = asdict(finding)
payload["phase_attribution"] = phase_attribution_to_payload(
getattr(finding, "phase_attribution", None)
)
return payload
def _serialize_collective_attribution(
result: CollectiveAttributionResult,
) -> dict[str, Any]:
payload = asdict(result)
payload["phase_attribution"] = phase_attribution_to_payload(
getattr(result, "phase_attribution", None)
)
return payload