Source code for isweep.inference

# imports
import gzip
import numpy as np
import pandas as pd
from random import randint
from math import floor, exp, log, ceil
from numpy.random import exponential as Exp
from scipy.stats import norm, binom
from scipy.optimize import minimize, minimize_scalar
from .coalescent import walk_variant_backward, probability_quasi_geometric, probability_erlang_segments
from .utilities import bin_ibd_segments

# read .ibd file

[docs]def read_ibd_file(ibd_file, header = 1, include_length = 1): '''Create list of IBD segments Parameters ---------- ibd_file : str Name of text file with IBD segments header : int 0 for no header include_length : int 0 for no length Returns ------- list (ID1, ID2, cM length) pairs ''' segments = [] with gzip.open(ibd_file, 'rt') as f: if header != 0: f.readline() for line in f: row = line.strip().split('\t') id1 = str(row[0]) + '_' + str(row[1]) id2 = str(row[2]) + '_' + str(row[3]) if include_length != 0: length = float(row[7]) segments.append((id1, id2, length)) else: segments.append((id1, id2)) return segments
# goodness-of-fit statistics (estimate selection coeffficent)
[docs]def chi2_labeled_isweep(s, p0, Ne, n, obs1, obs0, ab, one_step_model = 'a', tau0 = 0, sv=-0.01, ploidy = 2): '''Chi-squared statistic for sweep model (labeled) Parameters ---------- s : float Selection coefficient p0 : float Variant frequency at generation 0 Ne : dict Effective population sizes n : int Pairs sample size obs1 : numpy.array Observed counts for ibd segment bins (labeled 1) obs0 : numpy.array Observed counts for ibd segment bins (labeled 0) ab : numpy.array Increasing floats in centiMorgans one_step_model : str 'm', 'a', 'd', or 'r' tau0 : int Generation at which neutrality begins sv: float Allele frequency of standing variation (Default -0.01 will assume de novo sweep) ploidy : int 1 for haploid or 2 for diploid Returns ------- float Goodness-of-fit statistic ''' # assertions assert len(obs1) == len(obs0) assert len(obs1) == (len(ab) - 1) # probabilities ps, Ns, xs = walk_variant_backward(s, p0, Ne, False, one_step_model, tau0, sv, ploidy) mass1 = (ps[0] ** 2) * probability_quasi_geometric(ps, Ns, ploidy) mass0 = ((1 - ps[0]) ** 2) * probability_quasi_geometric(1 - ps, Ns, ploidy) # expecteds E0 = probability_erlang_segments(ab, mass0) E1 = probability_erlang_segments(ab, mass1) E1 = n * E1 E0 = n * E0 # chi-squared goodness-of-fit OE1 = ((obs1 - E1) ** 2) / E1 OE0 = ((obs0 - E0) ** 2) / E0 return np.sum(OE1) + np.sum(OE0)
[docs]def chi2_isweep(s, p0, Ne, n, obs, ab, one_step_model = 'a', tau0 = 0, sv=-0.01, ploidy = 2): '''Chi-squared statistic for sweep model (unlabeled) Parameters ---------- s : float Selection coefficient p0 : float Variant frequency at generation 0 Ne : dict Effective population sizes n : int Pairs sample size obs : numpy.array Observed counts for ibd segment bins (unlabeled) ab : numpy.array Increasing floats in centiMorgans one_step_model : str 'm', 'a', 'd', or 'r' tau0 : int Generation at which neutrality begins sv: float Allele frequency of standing variation (Default -0.01 will assume de novo sweep) ploidy : int 1 for haploid or 2 for diploid Returns ------- float Goodness-of-fit statistic ''' assert len(obs) == (len(ab) - 1) ps, Ns, xs = walk_variant_backward(s, p0, Ne, False, one_step_model, tau0, sv, ploidy) mass1 = (ps[0] ** 2) * probability_quasi_geometric(ps, Ns, ploidy) mass0 = ((1 - ps[0]) ** 2) * probability_quasi_geometric(1 - ps, Ns, ploidy) E0 = probability_erlang_segments(ab, mass0) E1 = probability_erlang_segments(ab, mass1) E = E1 + E0 E = n * E OE = ((obs - E) ** 2) / E return np.sum(OE)
### bootstrap (study uncertainty) ###
[docs]def bootstrap_standard(val, boot, alpha1 = 0.025, alpha2 = 0.975): '''Implements the standard bootstrap interval estimator Parameters ---------- val : float Parameter estimate boot : numpy.array Bootstraps alpha1 : float Percentile alpha2 : float Percentile Returns ------- tuple (lower, middle, upper) interval estimator ''' za = norm.ppf(alpha1) zb = norm.ppf(alpha2) sd = np.std(boot) return (val + za * sd, val, val + zb * sd)
[docs]def bootstrap_standard_bc(val, boot, alpha1 = 0.025, alpha2 = 0.975): '''Implements the standard bootstrap interval estimator (w/ bias-correction) Parameters ---------- val : float Parameter estimate boot : numpy.array Bootstraps alpha1 : float Percentile alpha2 : float Percentile Returns ------- tuple (lower, middle, upper) interval estimator ''' za = norm.ppf(alpha1) zb = norm.ppf(alpha2) bc = np.mean(boot) - val sd = np.std(boot) return (val - bc + za * sd, val - bc, val - bc + zb * sd)
def bootstrap_percentile_bc(val, boot, alpha1 = 0.025, alpha2 = 0.975): '''Implement percentile-based interval estimator with bias correction Parameters ---------- val : float Parameter estimate boot : numpy.array Bootstraps alpha1 : float Percentile alpha2 : float Percentile Returns ------- tuple (lower, middle, upper) interval estimator ''' boot = np.array(boot) err = boot - val return ( val - np.quantile(err, alpha2), val - np.quantile(err, 0.5), val - np.quantile(err, alpha1) )
[docs]def bootstrap_percentile(val, boot, alpha1 = 0.025, alpha2 = 0.975): '''Implements percentile-based interval estimator Parameters ---------- val : float Parameter estimate boot : numpy.array Bootstraps alpha1 : float Percentile alpha2 : float Percentile Returns ------- tuple (lower, middle, upper) interval estimator ''' vl = np.quantile(boot, alpha1) vu = np.quantile(boot, alpha2) vm = np.quantile(boot, 0.5) return (vl, vm, vu)
##### time to event ##### def when_count(ct, s, p0, Ne, random_walk = True, one_step_model = 'a', tau0 = 0, sv=-0.01, ploidy=2): '''Report when variant count reaches set value Parameters ---------- ct : int Variant count s : float Selection coefficient p0 : float Variant frequency at generation 0 Ne : dict Effective population sizes random_walk : bool True for random walk one_step_model : str 'm', 'a', 'd', or 'r' tau0 : int Generation when neutrality begins sv: float Allele frequency of standing variation (Default -0.01 will assume de novo sweep) ploidy : int 1 for haploid or 2 for diploid Returns ------- int Generation time ''' var = walk_variant_backward(s, p0, Ne, random_walk, one_step_model, tau0, sv, ploidy)[2] idx = np.argmax(var <= ct) if idx == 0: # handle variant introduction return len(var) return idx def bootstrap_count(ct, B, boots, bootp, Ne, random_walk = True, one_step_model = 'a', tau0 = 0, sv=-0.01, ploidy=2): '''Parametric bootstrap for variant count generation time Parameters ---------- ct : int Variant count B : int Number of bootstraps boots : array-like NumPy array of bootstraps for selection coefficient bootp : array-like NumPy array of bootstraps for variant frequency Ne : dict Effective population sizes random_walk : bool True for random walk one_step_model : str 'm', 'a', 'd', or 'r' tau0 : int Generation when neutrality begins sv: float Allele frequency of standing variation (Default -0.01 will assume de novo sweep) ploidy : int 1 for haploid or 2 for diploid Returns ------- array-like NumPy array of bootstraps for variant count generation time ''' assert len(boots) == len(bootp) boott = [] for i in range(len(boots)): for b in range(B): boott.append(when_count(ct, boots[i], bootp[i], Ne, random_walk, one_step_model, tau0, sv, ploidy)) return boott
[docs]def when_freq(maf, s, p0, Ne, random_walk = True, one_step_model = 'a', tau0 = 0, sv=-0.01, ploidy=2): '''Report when variant frequency reaches set value Parameters ---------- maf : float Variant frequency s : float Selection coefficient p0 : float Variant frequency at generation 0 Ne : dict Effective population sizes random_walk : bool True for random walk one_step_model : str 'm', 'a', 'd', or 'r' tau0 : int Generation when neutrality begins sv: float Allele frequency of standing variation (Default -0.01 will assume de novo sweep) ploidy : int 1 for haploid or 2 for diploid Returns ------- int Generation time ''' var = walk_variant_backward(s, p0, Ne, random_walk, one_step_model, tau0, sv, ploidy)[0] idx = np.argmax(var <= maf) if idx == 0: # handle variant introduction return len(var) return idx
[docs]def bootstrap_freq(maf, B, boots, bootp, Ne, random_walk = True, one_step_model = 'a', tau0 = 0, sv=-0.01, ploidy=2): '''Parametric bootstrap for variant frequency generation time Parameters ---------- maf : float Variant frequency B : int Number of bootstraps boots : numpy.array NumPy array of bootstraps for selection coefficient bootp : numpy.array NumPy array of bootstraps for variant frequency Ne : dict Effective population sizes random_walk : bool True for random walk one_step_model : str 'm', 'a', 'd', or 'r' tau0 : int Generation when neutrality begins sv: float Allele frequency of standing variation (Default -0.01 will assume de novo sweep) ploidy : int 1 for haploid or 2 for diploid Returns ------- numpy.array NumPy array of bootstraps for variant frequency generation time ''' assert len(boots) == len(bootp) boott = [] for i in range(len(boots)): for b in range(B): boott.append(when_freq(maf, boots[i], bootp[i], Ne, random_walk, one_step_model, tau0, sv, ploidy)) return boott
### metrics (evaluate experimental results) ### # def check_envelope(x, a, b): # '''Check if value is in an interval # Parameters # ---------- # x : float # Parameter estimate # a : float # Left endpoint # b : float # Right endpoint # Returns # ------- # bool # If covered # ''' # if x >= a: # if x <= b: # return True # return False # def make_confusion_matrix(real, pred): # '''Form confusion matrix for statistical classification # Parameters # ---------- # real : array_like # Actual 1s and 0s # pred : array_like # Predicted 1s and 0s # Returns # ------- # pandas DataFrame object # 2 x 2 crosstab # ''' # data = {'Actual':real, 'Predicted':pred} # conf = pd.DataFrame(data, columns = ['Actual', 'Predicted']) # matr = pd.crosstab(conf['Actual'], # conf['Predicted'], # rownames=['Actual'], # colnames=['Predicted']) # return matr # def false_omission_rate(matrix): # '''Compute false omission rate # Parameters # ---------- # matrix : pandas DataFrame object # 2 x 2 cross tab # Returns # ------- # float # The false omission rate (https://en.wikipedia.org/wiki/Confusion_matrix) # ''' # tn = matrix[0][0] # fp = matrix[1][0] # fn = matrix[0][1] # tp = matrix[1][1] # return fn / (fn + tn) # def false_positive_rate(matrix): # '''Compute false positive rate # Parameters # ---------- # matrix : pandas DataFrame object # 2 x 2 cross tab # Returns # ------- # float # The false positive rate (https://en.wikipedia.org/wiki/Confusion_matrix) # ''' # tn = matrix[0][0] # fp = matrix[1][0] # fn = matrix[0][1] # tp = matrix[1][1] # return fp / (fp + tn) # def true_positive_rate(matrix): # '''Compute true positive rate # Parameters # ---------- # matrix : pandas DataFrame object # 2 x 2 cross tab # Returns # ------- # float # The true positive rate (https://en.wikipedia.org/wiki/Confusion_matrix) # ''' # tn = matrix[0][0] # fp = matrix[1][0] # fn = matrix[0][1] # tp = matrix[1][1] # return tp / (tp + fn)