286 lines
10 KiB
Python
286 lines
10 KiB
Python
# =============================================================================
|
|
# Copyright (C) 2026
|
|
#
|
|
# This file is part of pyfa.
|
|
#
|
|
# pyfa is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# pyfa is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with pyfa. If not, see <http://www.gnu.org/licenses/>.
|
|
# =============================================================================
|
|
|
|
|
|
import math
|
|
import random
|
|
|
|
from eos.const import FittingModuleState, FittingSlot
|
|
|
|
|
|
_RACK_SUFFIXES = {
|
|
FittingSlot.HIGH: "Hi",
|
|
FittingSlot.MED: "Med",
|
|
FittingSlot.LOW: "Low",
|
|
}
|
|
|
|
# Cache: (fit_id, rack_slot, max_time_s, iterations) -> list of burnout time samples
|
|
_burnout_samples_cache = {}
|
|
|
|
|
|
def clear_burnout_samples_cache(fit_id=None):
|
|
if fit_id is None:
|
|
_burnout_samples_cache.clear()
|
|
return
|
|
to_drop = [k for k in _burnout_samples_cache if k[0] == fit_id]
|
|
for k in to_drop:
|
|
del _burnout_samples_cache[k]
|
|
|
|
|
|
def _get_rack_suffix(rack_slot):
|
|
return _RACK_SUFFIXES[rack_slot]
|
|
|
|
|
|
def iter_rack_modules(fit, rack_slot):
|
|
for mod in fit.modules:
|
|
if mod.isEmpty:
|
|
continue
|
|
if mod.slot == rack_slot:
|
|
yield mod
|
|
|
|
|
|
def get_rack_heat_value(fit, rack_slot, time_s):
|
|
"""
|
|
Deterministic rack heat H(t) for a given rack and time, in [0, 1].
|
|
"""
|
|
rack_suffix = _get_rack_suffix(rack_slot)
|
|
ship = fit.ship
|
|
heat_capacity = ship.getModifiedItemAttr(f"heatCapacity{rack_suffix}")
|
|
heat_generation_multiplier = ship.getModifiedItemAttr("heatGenerationMultiplier")
|
|
if heat_capacity is None or heat_generation_multiplier is None:
|
|
raise ValueError("Missing heat attributes on ship for rack heat calculation")
|
|
# Sum heat absorption over all overheated modules in this rack
|
|
sum_absorption = 0.0
|
|
for mod in iter_rack_modules(fit, rack_slot):
|
|
if mod.state >= FittingModuleState.OVERHEATED:
|
|
sum_absorption += mod.getModifiedItemAttr("heatAbsorbtionRateModifier")
|
|
argument = -time_s * heat_generation_multiplier * sum_absorption
|
|
# Guard against numeric issues
|
|
try:
|
|
exp_term = math.exp(argument)
|
|
except OverflowError:
|
|
exp_term = 0.0 if argument < 0 else float("inf")
|
|
heat = heat_capacity / 100.0 - exp_term
|
|
return heat
|
|
|
|
|
|
def _count_online_modules_by_rack(fit):
|
|
counts = {
|
|
FittingSlot.HIGH: 0,
|
|
FittingSlot.MED: 0,
|
|
FittingSlot.LOW: 0,
|
|
}
|
|
for mod in fit.modules:
|
|
if mod.isEmpty:
|
|
continue
|
|
if mod.state >= FittingModuleState.ONLINE and mod.slot in counts:
|
|
counts[mod.slot] += 1
|
|
return counts
|
|
|
|
|
|
def _get_total_slot_count(fit):
|
|
total = 0
|
|
for slot_type in (FittingSlot.HIGH, FittingSlot.MED, FittingSlot.LOW, FittingSlot.RIG):
|
|
total += fit.getNumSlots(slot_type)
|
|
return total
|
|
|
|
|
|
def _get_base_module_hp(mod):
|
|
hp = mod.getModifiedItemAttr("hp")
|
|
return float(hp)
|
|
|
|
|
|
def _get_heat_damage(mod):
|
|
dmg = mod.getModifiedItemAttr("heatDamage")
|
|
return float(dmg)
|
|
|
|
|
|
def _get_cycle_time_s(mod):
|
|
cycle_params = mod.getCycleParameters()
|
|
if cycle_params is None:
|
|
return None
|
|
avg_time_ms = cycle_params.averageTime
|
|
if not math.isfinite(avg_time_ms) or avg_time_ms <= 0:
|
|
return None
|
|
return avg_time_ms / 1000.0
|
|
|
|
|
|
def get_first_burnout_samples(fit, rack_slot, max_time_s, iterations):
|
|
"""
|
|
Monte Carlo simulation of time until the first module in the given rack burns out.
|
|
Returns a list of burnout times (seconds). If no burnout happens before max_time_s,
|
|
the sample is set to max_time_s for that run.
|
|
"""
|
|
if max_time_s <= 0 or iterations <= 0:
|
|
raise ValueError("max_time_s and iterations must be positive.")
|
|
|
|
cache_key = (getattr(fit, "ID", None), int(rack_slot), max_time_s, iterations)
|
|
if cache_key in _burnout_samples_cache:
|
|
return list(_burnout_samples_cache[cache_key])
|
|
|
|
rack_suffix = _get_rack_suffix(rack_slot)
|
|
ship = fit.ship
|
|
heat_capacity = ship.getModifiedItemAttr(f"heatCapacity{rack_suffix}")
|
|
heat_generation_multiplier = ship.getModifiedItemAttr("heatGenerationMultiplier")
|
|
heat_attenuation = ship.getModifiedItemAttr(f"heatAttenuation{rack_suffix}")
|
|
if (
|
|
heat_capacity is None
|
|
or heat_generation_multiplier is None
|
|
or heat_generation_multiplier <= 0
|
|
or heat_attenuation is None
|
|
):
|
|
raise ValueError("Missing heat attributes on ship for burnout simulation")
|
|
|
|
rack_modules = list(iter_rack_modules(fit, rack_slot))
|
|
if not rack_modules:
|
|
raise ValueError("No modules in this rack.")
|
|
|
|
overheated_indices = [
|
|
idx for idx, mod in enumerate(rack_modules) if mod.state >= FittingModuleState.OVERHEATED
|
|
]
|
|
if not overheated_indices:
|
|
raise ValueError(
|
|
"No overheated modules in this rack. Overheat at least one module in this rack to see the first-burnout CDF."
|
|
)
|
|
|
|
total_slots = _get_total_slot_count(fit)
|
|
if total_slots <= 0:
|
|
raise ValueError("Ship has no high/mid/low/rig slots.")
|
|
base_online_counts = _count_online_modules_by_rack(fit)
|
|
|
|
base_hp = [_get_base_module_hp(mod) for mod in rack_modules]
|
|
heat_damage = [_get_heat_damage(mod) for mod in rack_modules]
|
|
heat_absorption = [
|
|
mod.getModifiedItemAttr("heatAbsorbtionRateModifier") for mod in rack_modules
|
|
]
|
|
cycle_times = [_get_cycle_time_s(mod) if idx in overheated_indices else None
|
|
for idx, mod in enumerate(rack_modules)]
|
|
eligible_targets = [
|
|
mod.state >= FittingModuleState.ONLINE for mod in rack_modules
|
|
]
|
|
positions = list(range(len(rack_modules)))
|
|
|
|
samples = []
|
|
|
|
for _ in range(iterations):
|
|
hp = list(base_hp)
|
|
dead = [hp_val <= 0 for hp_val in hp]
|
|
online_counts = dict(base_online_counts)
|
|
next_times = [None] * len(rack_modules)
|
|
|
|
for idx in overheated_indices:
|
|
if not dead[idx] and cycle_times[idx] is not None:
|
|
next_times[idx] = cycle_times[idx]
|
|
|
|
sample_time = max_time_s
|
|
|
|
while True:
|
|
# Find next event time
|
|
candidates = [t for t in next_times if t is not None]
|
|
if not candidates:
|
|
break
|
|
current_time = min(candidates)
|
|
if current_time > max_time_s:
|
|
break
|
|
|
|
# Dynamic sum of heat absorption from still-active overheated modules
|
|
sum_absorption = 0.0
|
|
for idx in overheated_indices:
|
|
if not dead[idx] and cycle_times[idx] is not None:
|
|
sum_absorption += heat_absorption[idx]
|
|
if sum_absorption <= 0:
|
|
break
|
|
|
|
argument = -current_time * heat_generation_multiplier * sum_absorption
|
|
try:
|
|
exp_term = math.exp(argument)
|
|
except OverflowError:
|
|
exp_term = 0.0 if argument < 0 else float("inf")
|
|
heat = heat_capacity / 100.0 - exp_term
|
|
if heat <= 0:
|
|
break
|
|
|
|
numerator = (
|
|
online_counts[FittingSlot.HIGH]
|
|
+ online_counts[FittingSlot.MED]
|
|
+ online_counts[FittingSlot.LOW]
|
|
)
|
|
slot_factor = numerator / float(total_slots)
|
|
if slot_factor <= 0:
|
|
break
|
|
|
|
# Sources that complete a cycle at this time
|
|
event_sources = [
|
|
idx
|
|
for idx in overheated_indices
|
|
if not dead[idx]
|
|
and next_times[idx] is not None
|
|
and abs(next_times[idx] - current_time) <= 1e-9
|
|
]
|
|
if not event_sources:
|
|
# No actual events despite candidates, advance all timers and continue
|
|
for idx, next_time in enumerate(next_times):
|
|
if next_time is not None and cycle_times[idx] is not None:
|
|
next_times[idx] = next_time + cycle_times[idx]
|
|
continue
|
|
|
|
burn_time = None
|
|
|
|
for src_idx in event_sources:
|
|
dmg = heat_damage[src_idx]
|
|
if dmg <= 0:
|
|
continue
|
|
src_pos = positions[src_idx]
|
|
for tgt_idx, tgt_hp in enumerate(hp):
|
|
if dead[tgt_idx] or not eligible_targets[tgt_idx]:
|
|
continue
|
|
distance = abs(positions[tgt_idx] - src_pos)
|
|
attenuation_factor = heat_attenuation ** distance
|
|
probability = heat * slot_factor * attenuation_factor
|
|
if probability <= 0:
|
|
continue
|
|
if probability >= 1.0 or random.random() < probability:
|
|
new_hp = tgt_hp - dmg
|
|
hp[tgt_idx] = new_hp
|
|
if new_hp <= 0 and not dead[tgt_idx]:
|
|
dead[tgt_idx] = True
|
|
if rack_modules[tgt_idx].slot in online_counts and rack_modules[
|
|
tgt_idx
|
|
].state >= FittingModuleState.ONLINE:
|
|
online_counts[rack_modules[tgt_idx].slot] -= 1
|
|
if tgt_idx in overheated_indices:
|
|
next_times[tgt_idx] = None
|
|
if burn_time is None or current_time < burn_time:
|
|
burn_time = current_time
|
|
|
|
if burn_time is not None:
|
|
sample_time = burn_time
|
|
break
|
|
|
|
# Advance timers for all sources that fired at this time
|
|
for src_idx in event_sources:
|
|
if not dead[src_idx] and cycle_times[src_idx] is not None:
|
|
next_times[src_idx] = current_time + cycle_times[src_idx]
|
|
|
|
samples.append(sample_time)
|
|
|
|
_burnout_samples_cache[cache_key] = samples
|
|
return samples
|
|
|