Source code for bvalcalc.core.calculateB

import numpy as np
from bvalcalc.utils.dfe_helper import get_DFE_params
from scipy.integrate import quad
import sys

_params_cache: dict | None = None

[docs] def get_params(params_path: str | None = None): """ Loads DFE parameters from the provided population genetic parameters file. Parameters ---------- params_path: str Path to Params.py file """ global _params_cache if _params_cache is None: _params_cache = get_DFE_params(params_path) return _params_cache
[docs] def calculateB_linear(distance_to_element: int, length_of_element: int, params: dict | None = None): """ Calculate B due to purifying selection acting on a linked selected element of arbitrary length, assuming a constant crossover and gene conversion rate (analytical solution). Parameters ---------- distance_to_element: int Distance (bp) from the neutral site to the nearest edge of the selected element. length_of_element: int Length (bp) of the selected element. params : dict Required parameters from ``get_params()``, only kept as default (None) when being called by CLI, in which case parameters are sourced from the params file directly. """ with np.errstate(divide='ignore', invalid='ignore'): if params is None: params = get_params() r, u, g, k, t1, t1half, t2, t3, t4, f1, f2, f3, f0 = params["r"], params["u"], params["g"], params["k"], params["t1"], params["t1half"], params["t2"], params["t3"], params["t4"], params["f1"], params["f2"], params["f3"], params["f0"] C = (1.0 - np.exp(-2.0 * r * distance_to_element)) / 2.0 # cM U = length_of_element * u if g == 0: a = C # RECOMBINATION IN Y b = C + (r * length_of_element) # RECOMBINATION IN X elif g > 0: a, b = get_a_b_with_GC(C, distance_to_element, length_of_element) E_f1 = calculate_exponent(t1half, t2, U, a, b) E_f2 = calculate_exponent(t2, t3, U, a, b) E_f3 = calculate_exponent(t3, t4, U, a, b) E_bar = ( # Sum over the DFE f0 * 0.0 + f1 * ((t1half - t1) / (t2 - t1)) * 0.0 + f1 * ((t2 - t1half) / (t2 - t1)) * E_f1 + f2 * E_f2 + f3 * E_f3) B = np.exp(-1.0 * E_bar) return np.where(length_of_element == 0, 1.0, B)
def calculateB_recmap(distance_to_element, length_of_element, rec_distances = None, rec_lengths = None, gc_distances = None, gc_lengths = None, params = None): """ Calculate the B value WITH REC MAP for a single functional element at the focal site, summing over the DFE while consolidating the intermediate calculations. """ with np.errstate(divide='ignore', invalid='ignore'): if params is None: params = get_params() r, u, g, k, t1, t1half, t2, t3, t4, f1, f2, f3, f0 = params["r"], params["u"], params["g"], params["k"], params["t1"], params["t1half"], params["t2"], params["t3"], params["t4"], params["f1"], params["f2"], params["f3"], params["f0"] # rec_distances is the length of the element * rec rate in each spanned region. if rec_distances is not None: rec_adjusted_length_of_element = rec_lengths rec_adjusted_distance_to_element = rec_distances else: rec_adjusted_length_of_element = length_of_element rec_adjusted_distance_to_element = distance_to_element if gc_distances is not None: local_g = (gc_lengths + gc_distances)/(length_of_element + distance_to_element) * g else: local_g = g C = (1.0 - np.exp(-2.0 * r * rec_adjusted_distance_to_element)) / 2.0 # cM U = length_of_element * u if g == 0: a = C b = C + r * rec_adjusted_length_of_element # cM elif g > 0: a, b = get_a_b_with_GC_andMaps(C, y=distance_to_element, l=length_of_element, rec_l=rec_adjusted_length_of_element, local_g = local_g) E_f1 = calculate_exponent(t1half, t2, U, a, b) E_f2 = calculate_exponent(t2, t3, U, a, b) E_f3 = calculate_exponent(t3, t4, U, a, b) E_bar = ( # Sum over the DFE f0 * 0.0 + f1 * ((t1half - t1) / (t2 - t1)) * 0.0 + f1 * ((t2 - t1half) / (t2 - t1)) * E_f1 + f2 * E_f2 + f3 * E_f3) B = np.exp(-1.0 * E_bar) return np.where(length_of_element == 0, 1.0, B)
[docs] def calculateB_unlinked(unlinked_L: int, params: dict | None = None): """ Calculate B due to purifying selection at unlinked sites (numerical integration over DFE). Parameters ---------- unlinked_L : float Cumulative count of selected sites in unlinked regions. params : dict Required parameters from ``get_params()``, only kept as default (None) when being called by CLI, in which case parameters are sourced from the params file directly. """ if params is None: params = get_params() u, t1, t1half, t2, t3, t4, f0, f1, f2, f3 = params["u"], params["t1"], params["t1half"], params["t2"], params["t3"], params["t4"], params["f0"], params["f1"], params["f2"], params["f3"] f1_above_cutoff = f1 * ((t1half - t1) / (t2 - t1)) sum_f1 = (f1_above_cutoff / (t2 - t1half)) * (np.log((1 + t2) /(1 + t1half)) + (1 / (1 + t2)) - (1 / (1 + t1half))) sum_f2 = (f2 / (t3 - t2)) * (np.log((1 + t3) /(1 + t2)) + (1 / (1 + t3)) - (1 / (1 + t2))) sum_f3 = (f3 / (t4 - t3)) * (np.log((1 + t4) /(1 + t3)) + (1 / (1 + t4)) - (1 / (1 + t3))) unlinked_B = np.exp(-8 * u * 1.0 * unlinked_L * (sum_f1 + sum_f2 + sum_f3)) return unlinked_B
## Helper functions def calculate_exponent(t_start, t_end, U, a, b): """" Helper to calculate the exponent using "a" and "b" """ a, b, U = np.asarray(a), np.asarray(b), np.asarray(U) if U.size == 0: return 0 # If e.g. f1 proportion is 0, no need to calculate exponent E1 = ((U * a) / ((1 - a) * (a - b) * (t_end - t_start))) * np.log((a + (t_end * (1 - a))) / (a + (t_start * (1 - a)))) E2 = -1.0 * ((U * b) / ((1 - b) * (a - b) * (t_end - t_start))) * np.log((b + ((1 - b) * t_end)) / (b + ((1 - b) * t_start))) E = np.asarray(E1 + E2) rec_0_mask = np.isclose(a, b) # Get mask for where recombination rate = 0 within the gene if rec_0_mask.any(): # 4a) If a_arr is scalar (0‐d), compute limit once as scalar if a.ndim == 0: limit_factor = (1 / ((t_end - t_start)*(1-a)**2)) * ( # Calculate exponent with 0 recombination between gene and site, avoiding limits np.log((a + (1 - a) * t_end) / (a + (1 - a) * t_start)) + a / (a + (1 - a) * t_end) - a / (a + (1 - a) * t_start)) # Broadcast scalar limit_factor to all masked positions E[rec_0_mask] = U[rec_0_mask] * limit_factor # Get corresponding U for the numerator and plug back into E array to replace nan's else: # 4b) If a_arr is array, compute limit for each masked element ae = a[rec_0_mask] # array of a_i where a_i ≈ b_i limit_factor = (1 / ((t_end - t_start)*(1-ae)**2)) * ( # Calculate exponent with 0 recombination between gene and site, avoiding limits np.log((ae + (1 - ae) * t_end) / (ae + (1 - ae) * t_start)) + ae / (ae + (1 - ae) * t_end) - ae / (ae + (1 - ae) * t_start)) # Match array of limit_factor to corresponding positions in E (where rec_0_mask has True);'l;'l'' E[rec_0_mask] = U[rec_0_mask] * limit_factor # Get corresponding U for the numerator and plug back into E array to replace nan's return E def get_a_b_with_GC(C, y, l): with np.errstate(divide='ignore', invalid='ignore'): params = get_params() r, u, g, k, t1, t1half, t2, t3, t4, f1, f2, f3, f0 = params["r"], params["u"], params["g"], params["k"], params["t1"], params["t1half"], params["t2"], params["t3"], params["t4"], params["f1"], params["f2"], params["f3"], params["f0"] proportion_nogc_a = np.where(k < y + l, # When GC includes neutral site, this is proportion of the gene it includes np.maximum((0.5*(k-y)/l), 0), 1-(y + l)/(2 * k) ) proportion_nogc_b = np.where(k < y + l, # When GC includes gene site, this is probability the tract includes neutral site of interest 1/(2*k) * np.maximum(k-y+1,0) * np.maximum(k - y, 0) / l, # When overshooting not possible (k - y - 0.5 * l) / k) # When overshooting possible a = np.where(k < y, C + (2 * g * k), # Probability of GC on neutral site, where overlap with element not possible C + (2 * g * (y) + # When overlap possible this is probability gc is in neutral but doesn't include any of element g * (k - y) * # Probability gc is in neutral and includes some element (remaining probability from above) (1 - proportion_nogc_a) # Proportion of gene that gc breaks linkage with when it includes some element )) b = C + (r * l) + (2 * g * k) * (1 - (1-proportion_nogc_a)*proportion_nogc_b) #* prop k out return a, b def get_a_b_with_GC_andMaps(C, y, l, rec_l, local_g): params = get_params() r, u, g, k, t1, t1half, t2, t3, t4, f1, f2, f3, f0 = params["r"], params["u"], params["g"], params["k"], params["t1"], params["t1half"], params["t2"], params["t3"], params["t4"], params["f1"], params["f2"], params["f3"], params["f0"] with np.errstate(divide='ignore', invalid='ignore'): proportion_nogc_a = np.where(k < y + l, # When GC includes neutral site, this is proportion of the gene it includes np.maximum((0.5*(k-y)/l), 0), ((y) * (2 * k - (y + l)))/(2 * k * y) ) proportion_nogc_b = np.where(k < y + l, # When GC includes gene site, this is probability the tract includes neutral site of interest 1/(2*k) * np.maximum(k-y+1,0) * np.maximum(k - y, 0) / l, (k - y - 0.5 * l) / k) a = np.where(k < y, C + (2 * local_g * k), # Probability of GC on neutral site, where overlap with element not possible C + (2 * local_g * (y) + # When overlap possible this is probability gc is in neutral but doesn't include any of element local_g * (k - y) * # Probability gc is in neutral and includes some element (remaining probability from above) (1 - proportion_nogc_a) # Proportion of gene that gc breaks linkage with when it includes some element )) b = C + (r * rec_l) + (2 * local_g * k) * (1 - (1-proportion_nogc_a)*proportion_nogc_b) #* prop k out return a, b