# ============================================================================= # Copyright (C) 2026 # # This file is part of pyfa. # # pyfa is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # pyfa is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with pyfa. If not, see . # ============================================================================= import math import random from eos.const import FittingModuleState, FittingSlot _RACK_SUFFIXES = { FittingSlot.HIGH: "Hi", FittingSlot.MED: "Med", FittingSlot.LOW: "Low", } # Cache: (fit_id, rack_slot, max_time_s, iterations) -> list of burnout time samples _burnout_samples_cache = {} def clear_burnout_samples_cache(fit_id=None): if fit_id is None: _burnout_samples_cache.clear() return to_drop = [k for k in _burnout_samples_cache if k[0] == fit_id] for k in to_drop: del _burnout_samples_cache[k] def _get_rack_suffix(rack_slot): return _RACK_SUFFIXES[rack_slot] def iter_rack_modules(fit, rack_slot): for mod in fit.modules: if mod.isEmpty: continue if mod.slot == rack_slot: yield mod def get_rack_heat_value(fit, rack_slot, time_s): """ Deterministic rack heat H(t) for a given rack and time, in [0, 1]. """ rack_suffix = _get_rack_suffix(rack_slot) ship = fit.ship heat_capacity = ship.getModifiedItemAttr(f"heatCapacity{rack_suffix}") heat_generation_multiplier = ship.getModifiedItemAttr("heatGenerationMultiplier") if heat_capacity is None or heat_generation_multiplier is None: raise ValueError("Missing heat attributes on ship for rack heat calculation") # Sum heat absorption over all overheated modules in this rack sum_absorption = 0.0 for mod in iter_rack_modules(fit, rack_slot): if mod.state >= FittingModuleState.OVERHEATED: sum_absorption += mod.getModifiedItemAttr("heatAbsorbtionRateModifier") argument = -time_s * heat_generation_multiplier * sum_absorption # Guard against numeric issues try: exp_term = math.exp(argument) except OverflowError: exp_term = 0.0 if argument < 0 else float("inf") heat = heat_capacity / 100.0 - exp_term return heat def _count_online_modules_by_rack(fit): counts = { FittingSlot.HIGH: 0, FittingSlot.MED: 0, FittingSlot.LOW: 0, } for mod in fit.modules: if mod.isEmpty: continue if mod.state >= FittingModuleState.ONLINE and mod.slot in counts: counts[mod.slot] += 1 return counts def _get_total_slot_count(fit): total = 0 for slot_type in (FittingSlot.HIGH, FittingSlot.MED, FittingSlot.LOW, FittingSlot.RIG): total += fit.getNumSlots(slot_type) return total def _get_base_module_hp(mod): hp = mod.getModifiedItemAttr("hp") return float(hp) def _get_heat_damage(mod): dmg = mod.getModifiedItemAttr("heatDamage") return float(dmg) def _get_cycle_time_s(mod): cycle_params = mod.getCycleParameters() if cycle_params is None: return None avg_time_ms = cycle_params.averageTime if not math.isfinite(avg_time_ms) or avg_time_ms <= 0: return None return avg_time_ms / 1000.0 def has_burnout_samples(fit, rack_slot, max_time_s, iterations): cache_key = (getattr(fit, "ID", None), int(rack_slot), max_time_s, iterations) return cache_key in _burnout_samples_cache def get_first_burnout_samples(fit, rack_slot, max_time_s, iterations, progress_cb=None): """ Monte Carlo simulation of time until the first module in the given rack burns out. Returns a list of burnout times (seconds). If no burnout happens before max_time_s, the sample is set to max_time_s for that run. """ if max_time_s <= 0 or iterations <= 0: raise ValueError("max_time_s and iterations must be positive.") cache_key = (getattr(fit, "ID", None), int(rack_slot), max_time_s, iterations) if cache_key in _burnout_samples_cache: return list(_burnout_samples_cache[cache_key]) rack_suffix = _get_rack_suffix(rack_slot) ship = fit.ship heat_capacity = ship.getModifiedItemAttr(f"heatCapacity{rack_suffix}") heat_generation_multiplier = ship.getModifiedItemAttr("heatGenerationMultiplier") heat_attenuation = ship.getModifiedItemAttr(f"heatAttenuation{rack_suffix}") if ( heat_capacity is None or heat_generation_multiplier is None or heat_generation_multiplier <= 0 or heat_attenuation is None ): raise ValueError("Missing heat attributes on ship for burnout simulation") rack_modules = list(iter_rack_modules(fit, rack_slot)) if not rack_modules: raise ValueError("No modules in this rack.") overheated_indices = [ idx for idx, mod in enumerate(rack_modules) if mod.state >= FittingModuleState.OVERHEATED ] if not overheated_indices: raise ValueError( "No overheated modules in this rack. Overheat at least one module in this rack to see the first-burnout CDF." ) total_slots = _get_total_slot_count(fit) if total_slots <= 0: raise ValueError("Ship has no high/mid/low/rig slots.") base_online_counts = _count_online_modules_by_rack(fit) base_hp = [_get_base_module_hp(mod) for mod in rack_modules] heat_damage = [_get_heat_damage(mod) for mod in rack_modules] heat_absorption = [ mod.getModifiedItemAttr("heatAbsorbtionRateModifier") for mod in rack_modules ] cycle_times = [_get_cycle_time_s(mod) if idx in overheated_indices else None for idx, mod in enumerate(rack_modules)] eligible_targets = [ mod.state >= FittingModuleState.ONLINE for mod in rack_modules ] positions = list(range(len(rack_modules))) samples = [] for i in range(iterations): hp = list(base_hp) dead = [hp_val <= 0 for hp_val in hp] online_counts = dict(base_online_counts) next_times = [None] * len(rack_modules) for idx in overheated_indices: if not dead[idx] and cycle_times[idx] is not None: next_times[idx] = cycle_times[idx] sample_time = max_time_s while True: # Find next event time candidates = [t for t in next_times if t is not None] if not candidates: break current_time = min(candidates) if current_time > max_time_s: break # Dynamic sum of heat absorption from still-active overheated modules sum_absorption = 0.0 for idx in overheated_indices: if not dead[idx] and cycle_times[idx] is not None: sum_absorption += heat_absorption[idx] if sum_absorption <= 0: break argument = -current_time * heat_generation_multiplier * sum_absorption try: exp_term = math.exp(argument) except OverflowError: exp_term = 0.0 if argument < 0 else float("inf") heat = heat_capacity / 100.0 - exp_term if heat <= 0: break numerator = ( online_counts[FittingSlot.HIGH] + online_counts[FittingSlot.MED] + online_counts[FittingSlot.LOW] ) slot_factor = numerator / float(total_slots) if slot_factor <= 0: break # Sources that complete a cycle at this time event_sources = [ idx for idx in overheated_indices if not dead[idx] and next_times[idx] is not None and abs(next_times[idx] - current_time) <= 1e-9 ] if not event_sources: # No actual events despite candidates, advance all timers and continue for idx, next_time in enumerate(next_times): if next_time is not None and cycle_times[idx] is not None: next_times[idx] = next_time + cycle_times[idx] continue burn_time = None for src_idx in event_sources: dmg = heat_damage[src_idx] if dmg <= 0: continue src_pos = positions[src_idx] for tgt_idx, tgt_hp in enumerate(hp): if dead[tgt_idx] or not eligible_targets[tgt_idx]: continue distance = abs(positions[tgt_idx] - src_pos) attenuation_factor = heat_attenuation ** distance probability = heat * slot_factor * attenuation_factor if probability <= 0: continue if probability >= 1.0 or random.random() < probability: new_hp = tgt_hp - dmg hp[tgt_idx] = new_hp if new_hp <= 0 and not dead[tgt_idx]: dead[tgt_idx] = True if rack_modules[tgt_idx].slot in online_counts and rack_modules[ tgt_idx ].state >= FittingModuleState.ONLINE: online_counts[rack_modules[tgt_idx].slot] -= 1 if tgt_idx in overheated_indices: next_times[tgt_idx] = None if burn_time is None or current_time < burn_time: burn_time = current_time if burn_time is not None: sample_time = burn_time break # Advance timers for all sources that fired at this time for src_idx in event_sources: if not dead[src_idx] and cycle_times[src_idx] is not None: next_times[src_idx] = current_time + cycle_times[src_idx] samples.append(sample_time) if progress_cb is not None: # progress_cb should return True to continue, False to cancel if not progress_cb(i + 1): break _burnout_samples_cache[cache_key] = samples return samples