#!/usr/bin/env python
import numpy as np
import time
from scipy.sparse import eye, lil_matrix
from .AlignmentPropertyMatrix import AlignmentPropertyMatrix as APM
[docs]class EMfactory:
"""
A class that coordinate Expectation-Maximization
"""
def __init__(self, alignments):
self.probability = alignments
self.allelic_expression = None
self.grp_conv_mat = None
self.t2t_mat = None
self.target_lengths = None
[docs] def prepare(self, pseudocount=0.0, lenfile=None, read_length=100):
"""
Initializes the probability of read origin according to the alignment profile
:param pseudocount: Uniform prior for allele specificity estimation
:return: Nothing (as it performs an in-place operations)
"""
if self.probability.num_groups > 0:
self.grp_conv_mat = lil_matrix((self.probability.num_loci, self.probability.num_groups))
for i in xrange(self.probability.num_groups):
self.grp_conv_mat[self.probability.groups[i], i] = 1.0
self.grp_conv_mat = self.grp_conv_mat.tocsc()
self.t2t_mat = eye(self.probability.num_loci, self.probability.num_loci)
self.t2t_mat = self.t2t_mat.tolil()
for tid_list in self.probability.groups:
for ii in xrange(len(tid_list)):
for jj in xrange(ii):
i = tid_list[ii]
j = tid_list[jj]
self.t2t_mat[i, j] = 1
self.t2t_mat[j, i] = 1
self.t2t_mat = self.t2t_mat.tocsc()
if lenfile is not None:
hid = dict(zip(self.probability.hname, np.arange(len(self.probability.hname))))
self.target_lengths = np.zeros((self.probability.num_loci, self.probability.num_haplotypes))
if self.probability.num_haplotypes > 1:
with open(lenfile) as fh:
for curline in fh:
item = curline.rstrip().split("\t")
locus, hap = item[0].split("_")
self.target_lengths[self.probability.lid[locus], hid[hap]] = max(float(item[1]) - read_length + 1.0, 1.0)
elif self.probability.num_haplotypes > 0:
with open(lenfile) as fh:
for curline in fh:
item = curline.rstrip().split("\t")
self.target_lengths[self.probability.lid[item[0]], 0] = max(float(item[1]) - read_length + 1.0, 1.0)
else:
raise RuntimeError('There is something wrong with your emase-format alignment file.')
self.target_lengths = self.target_lengths.transpose()
#self.target_lengths = self.target_lengths.transpose() / read_length # lengths in terms of read counts
if not np.all(self.target_lengths > 0.0):
raise RuntimeError('There exist transcripts missing length information.')
self.probability.normalize_reads(axis=APM.Axis.READ) # Initialize alignment probability matrix
self.allelic_expression = self.probability.sum(axis=APM.Axis.READ)
if self.target_lengths is not None: # allelic_expression will be at depth-level
self.allelic_expression = np.divide(self.allelic_expression, self.target_lengths)
if pseudocount > 0.0: # pseudocount is at depth-level
orig_allelic_expression_sum = self.allelic_expression.sum()
nzloci = np.nonzero(self.allelic_expression)[1]
self.allelic_expression[:, nzloci] += pseudocount
self.allelic_expression *= (orig_allelic_expression_sum / self.allelic_expression.sum()) # original depth scale
[docs] def reset(self, pseudocount=0.0):
"""
Initializes the probability of read origin according to the alignment profile
:param pseudocount: Uniform prior for allele specificity estimation
:return: Nothing (as it performs an in-place operations)
"""
self.probability.reset()
self.probability.normalize_reads(axis=APM.Axis.READ) # Initialize alignment probability matrix
self.allelic_expression = self.probability.sum(axis=APM.Axis.READ)
if self.target_lengths is not None: # allelic_expression will be at depth-level
self.allelic_expression = np.divide(self.allelic_expression, self.target_lengths)
if pseudocount > 0.0: # pseudocount is at depth-level
orig_allelic_expression_sum = self.allelic_expression.sum()
nzloci = np.nonzero(self.allelic_expression)[1]
self.allelic_expression[:, nzloci] += pseudocount
self.allelic_expression *= (orig_allelic_expression_sum / self.allelic_expression.sum()) # original depth scale
[docs] def get_allelic_expression(self, at_group_level=False):
if at_group_level:
return self.allelic_expression * self.grp_conv_mat
else:
return self.allelic_expression.copy()
[docs] def update_probability_at_read_level(self, model=3):
"""
Updates the probability of read origin at read level
:param model: Normalization model (1: Gene->Allele->Isoform, 2: Gene->Isoform->Allele, 3: Gene->Isoform*Allele, 4: Gene*Isoform*Allele)
:return: Nothing (as it performs in-place operations)
"""
self.probability.reset() # reset to alignment incidence matrix
if model == 1:
self.probability.multiply(self.allelic_expression, axis=APM.Axis.READ)
self.probability.normalize_reads(axis=APM.Axis.HAPLOGROUP, grouping_mat=self.t2t_mat)
haplogroup_sum_mat = self.allelic_expression * self.t2t_mat
self.probability.multiply(haplogroup_sum_mat, axis=APM.Axis.READ)
self.probability.normalize_reads(axis=APM.Axis.GROUP, grouping_mat=self.t2t_mat)
self.probability.multiply(haplogroup_sum_mat.sum(axis=0), axis=APM.Axis.HAPLOTYPE)
self.probability.normalize_reads(axis=APM.Axis.READ)
elif model == 2:
self.probability.multiply(self.allelic_expression, axis=APM.Axis.READ)
self.probability.normalize_reads(axis=APM.Axis.LOCUS)
self.probability.multiply(self.allelic_expression.sum(axis=0), axis=APM.Axis.HAPLOTYPE)
self.probability.normalize_reads(axis=APM.Axis.GROUP, grouping_mat=self.t2t_mat)
self.probability.multiply((self.allelic_expression * self.t2t_mat).sum(axis=0), axis=APM.Axis.HAPLOTYPE)
self.probability.normalize_reads(axis=APM.Axis.READ)
elif model == 3:
self.probability.multiply(self.allelic_expression, axis=APM.Axis.READ)
self.probability.normalize_reads(axis=APM.Axis.GROUP, grouping_mat=self.t2t_mat)
self.probability.multiply((self.allelic_expression * self.t2t_mat).sum(axis=0), axis=APM.Axis.HAPLOTYPE)
self.probability.normalize_reads(axis=APM.Axis.READ)
elif model == 4:
self.probability.multiply(self.allelic_expression, axis=APM.Axis.READ)
self.probability.normalize_reads(axis=APM.Axis.READ)
else:
raise RuntimeError('The read normalization model should be 1, 2, 3, or 4.')
[docs] def update_allelic_expression(self, model=3):
"""
A single EM step: Update probability at read level and then re-estimate allelic specific expression
:param model: Normalization model (1: Gene->Allele->Isoform, 2: Gene->Isoform->Allele, 3: Gene->Isoform*Allele, 4: Gene*Isoform*Allele)
:return: Nothing (as it performs in-place operations)
"""
self.update_probability_at_read_level(model)
self.allelic_expression = self.probability.sum(axis=APM.Axis.READ)
if self.target_lengths is not None:
self.allelic_expression = np.divide(self.allelic_expression, self.target_lengths)
[docs] def run(self, model, tol=0.001, max_iters=999, verbose=True):
"""
Runs EM iterations
:param model: Normalization model (1: Gene->Allele->Isoform, 2: Gene->Isoform->Allele, 3: Gene->Isoform*Allele, 4: Gene*Isoform*Allele)
:param tol: Tolerance for termination
:param max_iters: Maximum number of iterations until termination
:param verbose: Display information on how EM is running
:return: Nothing (as it performs in-place operations)
"""
orig_err_states = np.seterr(all='raise')
np.seterr(under='ignore')
if verbose:
print
print "Iter No Time (hh:mm:ss) Total change (TPM) "
print "------- --------------- ----------------------"
num_iters = 0
err_sum = 1000000.0
time0 = time.time()
target_err = 1000000.0 * tol
while err_sum > target_err and num_iters < max_iters:
prev_isoform_expression = self.get_allelic_expression().sum(axis=0)
prev_isoform_expression *= (1000000.0 / prev_isoform_expression.sum())
self.update_allelic_expression(model=model)
curr_isoform_expression = self.get_allelic_expression().sum(axis=0)
curr_isoform_expression *= (1000000.0 / curr_isoform_expression.sum())
err = np.abs(curr_isoform_expression - prev_isoform_expression)
err_sum = err.sum()
num_iters += 1
if verbose:
time1 = time.time()
delmin, s = divmod(int(time1 - time0), 60)
h, m = divmod(delmin, 60)
print " %5d %4d:%02d:%02d %9.1f / 1000000" % (num_iters, h, m, s, err_sum)
[docs] def report_read_counts(self, filename, grp_wise=False, reorder='as-is', notes=None):
"""
Exports expected read counts
:param filename: File name for output
:param grp_wise: whether the report is at isoform level or gene level
:param reorder: whether the report should be either 'decreasing' or 'increasing' order or just 'as-is'
:return: Nothing but the method writes a file
"""
expected_read_counts = self.probability.sum(axis=APM.Axis.READ)
if grp_wise:
lname = self.probability.gname
expected_read_counts = expected_read_counts * self.grp_conv_mat
else:
lname = self.probability.lname
total_read_counts = expected_read_counts.sum(axis=0)
if reorder == 'decreasing':
report_order = np.argsort(total_read_counts.flatten())
report_order = report_order[::-1]
elif reorder == 'increasing':
report_order = np.argsort(total_read_counts.flatten())
elif reorder == 'as-is':
report_order = np.arange(len(lname)) # report in the original locus order
cntdata = np.vstack((expected_read_counts, total_read_counts))
fhout = open(filename, 'w')
fhout.write("locus\t" + "\t".join(self.probability.hname) + "\ttotal")
if notes is not None:
fhout.write("\tnotes")
fhout.write("\n")
for locus_id in report_order:
lname_cur = lname[locus_id]
fhout.write("\t".join([lname_cur] + map(str, cntdata[:, locus_id].ravel())))
if notes is not None:
fhout.write("\t%s" % notes[lname_cur])
fhout.write("\n")
fhout.close()
[docs] def report_depths(self, filename, tpm=True, grp_wise=False, reorder='as-is', notes=None):
"""
Exports expected depths
:param filename: File name for output
:param grp_wise: whether the report is at isoform level or gene level
:param reorder: whether the report should be either 'decreasing' or 'increasing' order or just 'as-is'
:return: Nothing but the method writes a file
"""
if grp_wise:
lname = self.probability.gname
depths = self.allelic_expression * self.grp_conv_mat
else:
lname = self.probability.lname
depths = self.allelic_expression
if tpm:
depths *= (1000000.0 / depths.sum())
total_depths = depths.sum(axis=0)
if reorder == 'decreasing':
report_order = np.argsort(total_depths.flatten())
report_order = report_order[::-1]
elif reorder == 'increasing':
report_order = np.argsort(total_depths.flatten())
elif reorder == 'as-is':
report_order = np.arange(len(lname)) # report in the original locus order
cntdata = np.vstack((depths, total_depths))
fhout = open(filename, 'w')
fhout.write("locus\t" + "\t".join(self.probability.hname) + "\ttotal")
if notes is not None:
fhout.write("\tnotes")
fhout.write("\n")
for locus_id in report_order:
lname_cur = lname[locus_id]
fhout.write("\t".join([lname_cur] + map(str, cntdata[:, locus_id].ravel())))
if notes is not None:
fhout.write("\t%s" % notes[lname_cur])
fhout.write("\n")
fhout.close()
[docs] def export_posterior_probability(self, filename, title="Posterior Probability"):
"""
Writes the posterior probability of read origin
:param filename: File name for output
:param title: The title of the posterior probability matrix
:return: Nothing but the method writes a file in EMASE format (PyTables)
"""
self.probability.save(h5file=filename, title=title)
if __name__ == "__main__":
pass # TODO: Put some simple test