Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

A/E survival fractions joint fitter and ecal fix #591

Draft
wants to merge 12 commits into
base: main
Choose a base branch
from
234 changes: 176 additions & 58 deletions src/pygama/pargen/AoE_cal.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from __future__ import annotations

import copy
import logging
import re
from datetime import datetime
Expand Down Expand Up @@ -607,24 +608,107 @@ def get_peak_label(peak: float) -> str:
return "Tl FEP @"


def pass_pdf_gos(
x, x_lo, x_hi, n_sig, epsilon_sig, n_bkg, epsilon_bkg, mu, sigma, hstep1, hstep2
):
return gauss_on_step.pdf_ext(
x, x_lo, x_hi, n_sig * epsilon_sig, mu, sigma, n_bkg * epsilon_bkg, hstep1
)


def fail_pdf_gos(
x, x_lo, x_hi, n_sig, epsilon_sig, n_bkg, epsilon_bkg, mu, sigma, hstep1, hstep2
):
return gauss_on_step.pdf_ext(
x,
x_lo,
x_hi,
n_sig * (1 - epsilon_sig),
mu,
sigma,
n_bkg * (1 - epsilon_bkg),
hstep2,
)


def pass_pdf_hpge(
x,
x_lo,
x_hi,
n_sig,
epsilon_sig,
n_bkg,
epsilon_bkg,
mu,
sigma,
htail,
tau,
hstep1,
hstep2,
):
return hpge_peak.pdf_ext(
x,
x_lo,
x_hi,
n_sig * epsilon_sig,
mu,
sigma,
htail,
tau,
n_bkg * epsilon_bkg,
hstep1,
)


def fail_pdf_hpge(
x,
x_lo,
x_hi,
n_sig,
epsilon_sig,
n_bkg,
epsilon_bkg,
mu,
sigma,
htail,
tau,
hstep1,
hstep2,
):
return hpge_peak.pdf_ext(
x,
x_lo,
x_hi,
n_sig * (1 - epsilon_sig),
mu,
sigma,
htail,
tau,
n_bkg * (1 - epsilon_bkg),
hstep2,
)


def update_guess(func, parguess, energies):
if func == gauss_on_step:
if func == gauss_on_step or func == hpge_peak:

total_events = len(energies)
parguess["n_sig"] = len(
energies[
(energies > parguess["mu"] - 2 * parguess["sigma"])
& (energies < parguess["mu"] + 2 * parguess["sigma"])
]
)
parguess["n_bkg"] = total_events - parguess["n_sig"]
return parguess

if func == hpge_peak:
total_events = len(energies)
parguess["n_sig"] = len(
parguess["n_sig"] -= len(
energies[
(energies > parguess["mu"] - 2 * parguess["sigma"])
& (energies < parguess["mu"] + 2 * parguess["sigma"])
(energies > parguess["x_lo"])
& (energies < parguess["x_lo"] + 2 * parguess["sigma"])
]
)
parguess["n_sig"] -= len(
energies[
(energies > parguess["x_hi"] - 2 * parguess["sigma"])
& (energies < parguess["x_hi"])
]
)
parguess["n_bkg"] = total_events - parguess["n_sig"]
Expand All @@ -643,11 +727,12 @@ def get_survival_fraction(
eres_pars,
fit_range=None,
high_cut=None,
guess_pars_cut=None,
guess_pars_surv=None,
pars=None,
errs=None,
dt_mask=None,
mode="greater",
func=hpge_peak,
fix_step=False,
display=0,
):
if dt_mask is None:
Expand All @@ -672,7 +757,7 @@ def get_survival_fraction(
else:
raise ValueError("mode not recognised")

if guess_pars_cut is None or guess_pars_surv is None:
if pars is None or errs is None:
(pars, errs, cov, _, func, _, _, _) = pgc.unbinned_staged_energy_fit(
energy,
func,
Expand All @@ -682,46 +767,82 @@ def get_survival_fraction(
fit_range=fit_range,
)

guess_pars_cut = pars
guess_pars_surv = pars
guess_pars_surv = copy.deepcopy(pars)

# add update guess here for n_sig and n_bkg
guess_pars_cut = update_guess(func, guess_pars_cut, energy[(~nan_idxs) & (~idxs)])
(cut_pars, cut_errs, cut_cov, _, _, _, _, _) = pgc.unbinned_staged_energy_fit(
energy[(~nan_idxs) & (~idxs)],
func,
guess=guess_pars_cut,
guess_func=energy_guess,
bounds_func=get_bounds,
fixed_func=fix_all_but_nevents,
guess_kwargs={"peak": peak, "eres": eres_pars},
lock_guess=True,
allow_tail_drop=False,
fit_range=fit_range,
)
guess_pars_surv = update_guess(func, guess_pars_cut, energy[(~nan_idxs) & (idxs)])
(surv_pars, surv_errs, surv_cov, _, _, _, _, _) = pgc.unbinned_staged_energy_fit(
energy[(~nan_idxs) & (idxs)],
func,
guess=guess_pars_surv,
guess_func=energy_guess,
bounds_func=get_bounds,
fixed_func=fix_all_but_nevents,
guess_kwargs={"peak": peak, "eres": eres_pars},
lock_guess=True,
allow_tail_drop=False,
fit_range=fit_range,
)
guess_pars_surv = update_guess(func, guess_pars_surv, energy[(~nan_idxs) & (idxs)])

parguess = {
"x_lo": pars["x_lo"],
"x_hi": pars["x_hi"],
"mu": pars["mu"],
"sigma": pars["sigma"],
"hstep1": pars["hstep"],
"hstep2": pars["hstep"],
"n_sig": pars["n_sig"],
"n_bkg": pars["n_bkg"],
"epsilon_sig": guess_pars_surv["n_sig"] / pars["n_sig"],
"epsilon_bkg": guess_pars_surv["n_bkg"] / pars["n_bkg"],
}

ct_n = cut_pars["n_sig"]
ct_err = cut_errs["n_sig"]
surv_n = surv_pars["n_sig"]
surv_err = surv_errs["n_sig"]
bounds = {
"n_sig": (0, pars["n_sig"] + pars["n_bkg"]),
"epsilon_sig": (0, 1),
"n_bkg": (0, pars["n_bkg"] + pars["n_sig"]),
"epsilon_bkg": (0, 1),
"hstep1": (-1, 1),
"hstep2": (-1, 1),
}

pc_n = ct_n + surv_n
if func == hpge_peak:
parguess.update({"htail": pars["htail"], "tau": pars["tau"]})

if func == hpge_peak:
lh = cost.ExtendedUnbinnedNLL(
energy[(~nan_idxs) & (idxs)], pass_pdf_hpge
) + cost.ExtendedUnbinnedNLL(energy[(~nan_idxs) & (~idxs)], fail_pdf_hpge)
elif func == gauss_on_step:
lh = cost.ExtendedUnbinnedNLL(
energy[(~nan_idxs) & (idxs)], pass_pdf_gos
) + cost.ExtendedUnbinnedNLL(energy[(~nan_idxs) & (~idxs)], fail_pdf_gos)

else:
raise ValueError("Unknown func")

m = Minuit(lh, **parguess)
fixed = ["x_lo", "x_hi", "n_sig", "n_bkg", "mu", "sigma"] # "hstep"
if func == hpge_peak:
fixed += ["tau", "htail"]
if fix_step is True:
fixed += ["hstep1", "hstep2"]

m.fixed[fixed] = True
for arg, val in bounds.items():
m.limits[arg] = val

m.simplex().migrad()
m.hesse()

sf = m.values["epsilon_sig"] * 100
err = m.errors["epsilon_sig"] * 100

if display > 1:
fig, (ax1, ax2) = plt.subplots(1, 2)
bins = np.arange(1552, 1612, 1)
ax1.hist(energy[(~nan_idxs) & (idxs)], bins=bins, histtype="step")

ax2.hist(energy[(~nan_idxs) & (~idxs)], bins=bins, histtype="step")

if func == hpge_peak:
ax1.plot(bins, pass_pdf_hpge(bins, **m.values.to_dict())[1])
ax2.plot(bins, fail_pdf_hpge(bins, **m.values.to_dict())[1])
elif func == gauss_on_step:
ax1.plot(bins, pass_pdf_gos(bins, **m.values.to_dict())[1])
ax2.plot(bins, fail_pdf_gos(bins, **m.values.to_dict())[1])

plt.show()

sf = (surv_n / pc_n) * 100
err = sf * np.sqrt((ct_err / pc_n**2) ** 2 + (surv_err / pc_n**2) ** 2)
return sf, err, cut_pars, surv_pars
return sf, err, m.values, m.errors


def get_sf_sweep(
Expand Down Expand Up @@ -752,16 +873,14 @@ def get_sf_sweep(
cut_vals = np.linspace(cut_range[0], cut_range[1], n_samples)
out_df = pd.DataFrame()

(pars, _, _, _, func, _, _, _) = pgc.unbinned_staged_energy_fit(
(pars, errs, _, _, func, _, _, _) = pgc.unbinned_staged_energy_fit(
energy,
hpge_peak,
guess_func=energy_guess,
bounds_func=get_bounds,
guess_kwargs={"peak": peak, "eres": eres_pars},
fit_range=fit_range,
)
guess_pars_cut = pars
guess_pars_surv = pars

for cut_val in cut_vals:
try:
Expand All @@ -774,8 +893,7 @@ def get_sf_sweep(
fit_range=fit_range,
dt_mask=dt_mask,
mode=mode,
guess_pars_cut=guess_pars_cut,
guess_pars_surv=guess_pars_surv,
pars=pars,
func=func,
)
out_df = pd.concat(
Expand All @@ -788,7 +906,7 @@ def get_sf_sweep(
raise (e)
out_df.set_index("cut_val", inplace=True)
if final_cut_value is not None:
sf, sf_err, cut_pars, surv_pars = get_survival_fraction(
sf, sf_err, _, _ = get_survival_fraction(
energy,
cut_param,
final_cut_value,
Expand All @@ -797,8 +915,7 @@ def get_sf_sweep(
fit_range=fit_range,
dt_mask=dt_mask,
mode=mode,
guess_pars_cut=guess_pars_cut,
guess_pars_surv=guess_pars_surv,
pars=pars,
func=func,
)
else:
Expand Down Expand Up @@ -835,8 +952,9 @@ def compton_sf(cut_param, low_cut_val, high_cut_val=None, mode="greater", dt_mas

pc_n = ct_n + surv_n

sf = (surv_n / pc_n) * 100
err = sf * np.sqrt((ct_err / pc_n**2) ** 2 + (surv_err / pc_n**2) ** 2)
sf = surv_n / pc_n
err = 100 * sf * (1 - sf) * np.sqrt((ct_err / ct_n) ** 2 + (surv_err / surv_n) ** 2)
sf *= 100

return {
"low_cut": low_cut_val,
Expand Down
Loading