Source code for pinefarm.external.nnlojet.nnpdf_interface

"""
Read up all important information form an NNPDF dataset.

The selector generation is very flaky and is (mostly) up to the user for now.

Some examples:

```yaml
    - histogram with selectors
    histograms:
        name: test
        observable: ptz
        bins: [10, 20, 40]
        extra_selectors:
            - "reject abs_ylp min = 1.37 max = 1.52"
            - "reject abs_ylm min = 1.37 max = 1.52"
```
"""

from copy import deepcopy

import numpy as np
from nnpdf_data import load_dataset_metadata
from ruamel.yaml import YAML, CommentedMap

# set-up the yaml reader
yaml = YAML(pure=True)
yaml.default_flow_style = False
yaml.indent(sequence=4, offset=2, mapping=4)

HISTOGRAM_VARIABLES = {"y", "etay", "eta", "pT", "pT2", "M2"}


def _legacy_nnpdf_translation(df, proc_type):
    """When reading variables with k1/k2/k3 tries to figure out to which variables it corresponds."""
    from validphys.filters import KIN_LABEL  # pylint: disable=E0401

    new_vars = list(KIN_LABEL[proc_type])
    # Reorganize a bit the names to avoid extra problems
    if "M_ll" in new_vars:
        new_vars[new_vars.index("M_ll2")] = "M2"
    df.columns = df.columns.set_levels(new_vars, level=0)


def _df_to_bins(dataframe):
    """NNPDF kin dataframe to list of bins.

    Convert a dataframe containing min/mid/max for some kin variable
    into a list of bins as NNLOJET understands it.
    """
    # If the NNPDF dataset has been implemented recently
    # we will have min/max
    # otherwise we have only mid and have to trick this
    if np.allclose(dataframe["min"], dataframe["max"]):
        # Fake min/max and hope for the best
        mid_points = dataframe["mid"].values
        shifts = np.diff(mid_points) / 2.0
        bins = mid_points[:-1] + shifts
        # Assume that the shift in the first and last points
        # is the same as the second and next-to-last
        fpo = [mid_points[0] - shifts[0]]
        lpo = [mid_points[-1] + shifts[-1]]
        return np.concatenate([fpo, bins, lpo])

    # Couple maxs and mins, assuming no overlap between bins...
    all_bins = np.concatenate([dataframe["min"], dataframe["max"]])
    return np.unique(all_bins)


def _1d_histogram(kin_df, hist_var):
    """Prepare the histogram for a 1d distribution."""
    histo_bins = _df_to_bins(kin_df[hist_var])

    if hist_var == "pT2":
        histo_bins = np.sqrt(histo_bins)
        hist_var = "pT"

    if hist_var == "M2":
        histo_bins = np.sqrt(histo_bins)
        hist_var = "M"

    # Don't do more than 3 decimals
    histo_bins = np.round(histo_bins, decimals=3).tolist()

    return {"name": hist_var, "observable": hist_var, "bins": histo_bins}


def _nnlojet_observable(observable, process):
    """Try to automatically understand the NNLOJET observables given the NNPDF process and obs."""
    observable = observable.lower()
    if observable in ("eta", "y", "etay"):
        if process.upper().startswith("Z"):
            return "yz"
        if process.upper().startswith("WP") and not process.upper().endswith("J"):
            return "ylp"
        if process.upper().startswith("WM") and not process.upper().endswith("J"):
            return "ylm"
    if observable == "pt":
        if process.upper().startswith("Z"):
            return "ptz"
        if process.upper().startswith("W"):
            return "ptw"
    if observable == "m" and process.upper().startswith("Z"):
        return "mll"
    if observable == "m2":
        print("\033[91m [WARNING] \033[0m Changed M2 to M in the selectors")
        return "mll"

    raise ValueError(f"Observable {observable} not recognized for process {process}")


def _generate_metadata(arxiv, hepdata, nnpdf_name, output):
    """Generate a minimal ``metadata.txt`` file."""
    empty_fields = [
        "description",
        "x1_label",
        "x1_label_tex",
        "x2_label",
        "x2_label_tex",
        "x2_unit",
        "y_label",
        "y_label_tex",
        "y_unit",
    ]
    ret = f"""arxiv={arxiv}
hepdata={hepdata}
nnpdf_id={nnpdf_name}
"""
    ret += "=\n".join(empty_fields) + "="
    output.write_text(ret)


[docs] def select_selectors(experiment, process): """A selection of default selectors to be selected depending on the selected experiment. The experiment defines the cuts to be applied to each variable. The process defines the name of the variables in NNLOJET """ cuts = { "rapidity": (None, None), "pt": (20.0, None), "inv_mass": (None, None), "mt": (None, None), } variables = {"rapidity": [], "pt": [], "inv_mass": [], "mt": []} if experiment == "LHCB": cuts["rapidity"] = (2.0, 4.5) cuts["inv_mass"] = (60.0, 120.0) elif experiment == "ATLAS": cuts["rapidity"] = (0.0, 2.5) cuts["inv_mass"] = (66.0, 116.0) cuts["pt"] = (25.0, None) cuts["mt"] = (50.0, None) elif experiment == "CMS": cuts["rapidity"] = (0.0, 2.4) cuts["inv_mass"] = (60.0, 120.0) cuts["pt"] = (35.0, None) else: raise NotImplementedError(f"Selectors for {experiment=} not implemented") if process.startswith("Z"): variables["rapidity"] += ["yz", "abs_ylp", "abs_ylm"] variables["inv_mass"].append("mll") variables["pt"].append("ptl2") elif process.startswith("W"): w_sign = process[1].lower() variables["rapidity"].append(f"abs_yl{w_sign}") variables["pt"].append(f"ptl{w_sign}") variables["mt"].append("mt") selector_options = [] for cut_type, (cut_min, cut_max) in cuts.items(): if cut_min is None and cut_max is None: continue for variable in variables[cut_type]: tmp = {"observable": variable} if cut_min is not None: tmp["min"] = cut_min if cut_max is not None: tmp["max"] = cut_max selector_options.append(tmp) return selector_options
def _generate_nnlojet_pinecard(runname, process, energy, experiment, histograms): """Generate a pinecard for NNLOJET runs from an NNPDF dataset.""" selectors = select_selectors(experiment, process) histograms = deepcopy(histograms) if process.startswith("Z0"): process = process.replace("Z0", "Z") # Digest the histogram variable for histo in histograms: ob = histo["observable"] histo["observable"] = _nnlojet_observable(ob, process) ret = { "runname": runname, "process": {"proc": process, "sqrts": energy}, "pdf": "NNPDF40_nnlo_as_01180", "techcut": 1e-7, "histograms": histograms, "multi_channel": 0, "channels": { "LO": "LO", "R": "R", "V": "V", "RR": "RR", "RV": "RV", "VV": "VV", }, "parameters": { "MASS[Z]": 91.1876, "MASS[W]": 80.379, "WIDTH[Z]": 2.4952, "WIDTH[W]": 2.085, "SCHEME[alpha]": "Gmu", "GF": "1.1663787d-5", }, "selectors": selectors, } return ret
[docs] def generate_pinecard_from_nnpdf( nnpdf_dataset, scale="etz", output_path=".", observables=None ): """Generate a NNLOJET pinecard from an NNPDF dataset. Takes as input an NNPDF dataset, which will be loaded with the nnpdf_data package. If a list of observables is provided, only those in the list will be loaded from the dataframe. """ metadata = load_dataset_metadata(nnpdf_dataset) kin_df = metadata.load_kinematics(drop_minmax=False) if observables is not None: # If a list of observables is provided, select them from the dataframe # list in case they enter as a tuple kin_df = kin_df.loc[:, list(observables)] # Change some variable names: kin_df.rename(columns={"m_ll2": "M2"}, inplace=True) output_runcards = [] # Put all the information we might need in nicely organized variables process = metadata.process energy = metadata.cm_energy experiment = metadata.experiment # Now use the kinematic information to generate histograms kin_variables = kin_df.columns.get_level_values(0) # Is it legacy? if "k1" in kin_variables: # Time to translate! _legacy_nnpdf_translation(kin_df, metadata.process_type) kin_variables = kin_df.columns.get_level_values(0) hist_vars = list(HISTOGRAM_VARIABLES.intersection(kin_variables)) # Now preprocess the variables according to the process type # Drell Yan if process in ("WPWM", "Z0", "DY"): # Special case: is a histogram binned by invariant mass? if "M2" in hist_vars: if len(kin_df["M2"]["mid"].unique()) == 1: hist_vars.remove("M2") # Create the histogram depending on whether this is a 1D or 2D distribution (or total) histograms = None if len(hist_vars) == 1: histograms = [_1d_histogram(kin_df, hist_vars[0])] elif len(hist_vars) == 2: # Let's see whether we know how to do this 2D distribution if "M2" in hist_vars: svar = "M2" elif "y" in hist_vars: svar = "y" else: raise NotImplementedError(f"Don't know how to do this 2D: {hist_vars}") hist_vars.remove(svar) # 2D distributions can only be done when min-max is available, otherwise it's a mess bounds_df = kin_df[svar] if svar.endswith("2"): bounds_df[["min", "max"]] = bounds_df[["min", "max"]].apply( lambda x: np.sqrt(x) ) bounds = bounds_df.drop_duplicates().values.tolist() nnlojet_var = _nnlojet_observable(svar, process) another_v = hist_vars[0] histograms = [] for i, (bin_min, mid_val, bin_max) in enumerate(bounds): idx = kin_df[svar]["mid"] == mid_val tmp = _1d_histogram(kin_df[idx], another_v) tmp["name"] = f"{another_v}_bin_{i}" tmp["extra_selectors"] = [ { "observable": f"{nnlojet_var}", "min": bin_min, "max": bin_max, } ] histograms.append(tmp) elif len(hist_vars) == 0 and metadata.process_type == "INC": # inclusive cross section, just create a big enough histogram histograms = [{"observable": "y", "bins": [-10.0, 10.0], "name": "tot"}] else: raise NotImplementedError( "3D distributions not implemented or process not recognized" ) is_normalized = metadata.theory.operation.lower() == "ratio" if is_normalized: print( "\033[91m [WARNING] \033[0m This dataset is probably normalized, you might be missing the runcard for the fiducial cross section" ) # Prepare the metadata for the data hepdata = metadata._parent.hepdata.url arxiv = metadata._parent.arXiv.url.split("/")[-1] tables = metadata.tables if not hepdata.startswith("https:"): # Try to autoguess the doi hepdata = f"https://doi.org/{hepdata}" data_comment = f"arXiv number: {arxiv}, hepdata entry: {hepdata} (tables: {tables})" print( f"\033[91m [WARNING] \033[0m Remember to update he selection cuts in the runcard, see {hepdata}" ) # For some NNPDF datasets, different processes/energies might be grouped together processes = [process] if process.startswith("WPWM"): processes = [process.replace("WP", ""), process.replace("WM", "")] if process == "DY": processes = ["Z0", "WP", "WM"] parent_folder = output_path.parent base_name = output_path.name output_runcards = [] for proc in processes: runname = nnpdf_dataset.replace(process, proc) ret = _generate_nnlojet_pinecard(runname, proc, energy, experiment, histograms) ret["scales"] = {"mur": scale, "muf": scale} # Beautify before dumping data = CommentedMap(ret) for key in ret: data.yaml_set_comment_before_after_key(key, before="\n") data.yaml_set_start_comment(data_comment) opath = parent_folder / base_name.replace(process, proc) opath.mkdir(exist_ok=True, parents=True) runcard_name = (opath / runname.upper()).with_suffix(".yaml") yaml.dump(data, runcard_name) output_runcards.append(runcard_name) _generate_metadata( arxiv, hepdata, nnpdf_dataset, runcard_name.with_name("metadata.txt") ) return output_runcards