#!/usr/bin/env python
import copy
import tables
from .Sparse3DMatrix import Sparse3DMatrix
import numpy as np
from scipy.sparse import lil_matrix, coo_matrix, csc_matrix, csr_matrix
[docs]def enum(**enums):
return type('Enum', (), enums)
[docs]class AlignmentPropertyMatrix(Sparse3DMatrix):
Axis = enum(LOCUS=0, HAPLOTYPE=1, READ=2, GROUP=3, HAPLOGROUP=4)
def __init__(self, other=None, \
h5file=None, datanode='/', metanode='/', shallow=False, \
shape=None, dtype=float, haplotype_names=None, locus_names=None, read_names=None, \
grpfile=None):
Sparse3DMatrix.__init__(self, other=other, h5file=h5file, datanode=datanode, shape=shape, dtype=dtype)
self.num_loci, self.num_haplotypes, self.num_reads = self.shape
self.num_groups = 0
self.count = None
self.hname = None
self.lname = None # locus name
self.rname = None # read name
self.lid = None # locus ID
self.rid = None # read ID
self.gname = None # group name
self.groups = None # groups in terms of locus IDs
if other is not None: # Use for copying from other existing AlignmentPropertyMatrix object
if other.count is not None:
self.count = copy.copy(other.count)
if not shallow:
self.__copy_names(other)
self.__copy_group_info(other)
elif h5file is not None: # Use for loading from a pytables file
h5fh = tables.open_file(h5file, 'r')
if h5fh.__contains__('%s' % (datanode + '/count')):
self.count = h5fh.get_node(datanode, 'count').read()
if not shallow:
self.hname = h5fh.get_node_attr(datanode, 'hname')
self.lname = h5fh.get_node(metanode, 'lname').read()
self.lid = dict(zip(self.lname, np.arange(self.num_loci)))
if h5fh.__contains__('%s' % (metanode + '/rname')):
self.rname = h5fh.get_node(metanode, 'rname').read()
self.rid = dict(zip(self.rname, np.arange(self.num_reads)))
h5fh.close()
elif shape is not None: # Use for initializing an empty matrix
if haplotype_names is not None:
if len(haplotype_names) == self.num_haplotypes:
self.hname = haplotype_names
else:
raise RuntimeError('The number of names does not match to the matrix shape.')
if locus_names is not None:
if len(locus_names) == self.num_loci:
self.lname = np.array(locus_names)
self.lid = dict(zip(self.lname, np.arange(self.num_loci)))
else:
raise RuntimeError('The number of names does not match to the matrix shape.')
if read_names is not None:
if len(read_names) == self.num_reads:
self.rname = np.array(read_names)
self.rid = dict(zip(self.rname, np.arange(self.num_reads)))
else:
raise RuntimeError('The number of names does not match to the matrix shape.')
if grpfile is not None:
self.__load_groups(grpfile)
def __load_groups(self, grpfile): # A group is a set of isoforms within a gene
if self.lid is not None:
self.gname = list()
self.groups = list()
with open(grpfile) as fh:
for curline in fh:
item = curline.rstrip().split("\t")
self.gname.append(item[0])
tid_list = [ self.lid[t] for t in item[1:] ]
self.groups.append(tid_list)
self.gname = np.array(self.gname)
self.num_groups = len(self.gname)
else:
raise RuntimeError('Locus IDs are not availalbe.')
load_groups = __load_groups
def __copy_names(self, other):
self.hname = other.hname
self.lname = copy.copy(other.lname)
self.rname = copy.copy(other.rname)
self.lid = copy.copy(other.lid)
self.rid = copy.copy(other.rid)
def __copy_group_info(self, other):
# if other.num_groups > 0:
if other.groups is not None and other.gname is not None:
self.groups = copy.deepcopy(other.groups)
self.gname = copy.copy(other.gname)
self.num_groups = other.num_groups
[docs] def copy(self, shallow=False):
dmat = Sparse3DMatrix.copy(self)
dmat.count = copy.copy(self.count)
dmat.num_loci, dmat.num_haplotypes, dmat.num_reads = dmat.shape
if not shallow:
dmat.__copy_names(self)
dmat.__copy_group_info(self)
return dmat
def _bundle_inline(self, reset=False): # Inline bundling method
if self.finalized:
if self.num_groups > 0 and self.groups is not None and self.gname is not None:
grp_conv_mat = lil_matrix((self.num_loci, self.num_groups))
for i in xrange(self.num_groups):
grp_conv_mat[self.groups[i], i] = 1.0
grp_conv_mat = grp_conv_mat.tocsc()
for hid in xrange(self.num_haplotypes):
self.data[hid] = self.data[hid] * grp_conv_mat # TODO: Is there any better way to save memory?
self.num_loci = self.num_groups
self.shape = (self.num_groups, self.num_haplotypes, self.num_reads)
self.lname = copy.copy(self.gname)
self.lid = dict(zip(self.gname, np.arange(self.num_groups)))
self.num_groups = 0
self.groups = None
self.gname = None
if reset:
self.reset()
else:
raise RuntimeError('No group information is available for bundling.')
else:
raise RuntimeError('The matrix is not finalized.')
[docs] def bundle(self, reset=False, shallow=False): # Copies the original matrix (Use lots of memory)
"""
Returns ``AlignmentPropertyMatrix`` object in which loci are bundled using grouping information.
:param reset: whether to reset the values at the loci
:param shallow: whether to copy all the meta data
"""
if self.finalized:
# if self.num_groups > 0:
if self.groups is not None and self.gname is not None:
grp_conv_mat = lil_matrix((self.num_loci, self.num_groups))
for i in xrange(self.num_groups):
grp_conv_mat[self.groups[i], i] = 1.0
grp_align = Sparse3DMatrix.__mul__(self, grp_conv_mat) # The core of the bundling
grp_align.num_loci = self.num_groups
grp_align.num_haplotypes = self.num_haplotypes
grp_align.num_reads = self.num_reads
grp_align.shape = (grp_align.num_loci, grp_align.num_haplotypes, grp_align.num_reads)
if not shallow:
grp_align.lname = copy.copy(self.gname)
grp_align.hname = self.hname
grp_align.rname = copy.copy(self.rname)
grp_align.lid = dict(zip(grp_align.lname, np.arange(grp_align.num_loci)))
grp_align.rid = copy.copy(self.rid)
if reset:
grp_align.reset()
return grp_align
else:
raise RuntimeError('No group information is available for bundling.')
else:
raise RuntimeError('The matrix is not finalized.')
#
# Binary Operators
#
def __add__(self, other):
dmat = Sparse3DMatrix.__add__(self, other)
dmat.num_loci, dmat.num_haplotypes, dmat.num_reads = self.shape
dmat.__copy_names(self)
dmat.__copy_group_info(self)
return dmat
def __sub__(self, other):
dmat = Sparse3DMatrix.__sub__(self, other)
dmat.num_loci, dmat.num_haplotypes, dmat.num_reads = self.shape
dmat.__copy_names(self)
dmat.__copy_group_info(self)
return dmat
def __mul__(self, other):
dmat = Sparse3DMatrix.__mul__(self, other)
dmat.num_loci, dmat.num_haplotypes, dmat.num_reads = dmat.shape
if isinstance(other, (np.ndarray, csc_matrix, csr_matrix, coo_matrix, lil_matrix)):
dmat.hname = self.hname
dmat.rname = copy.copy(self.rname)
dmat.rid = copy.copy(self.rid)
dmat.num_groups = 0
else:
dmat.__copy_names(self)
dmat.__copy_group_info(self)
return dmat
#
# Helper functions
#
[docs] def sum(self, axis):
if self.finalized:
if axis == self.Axis.LOCUS:
sum_mat = [] # sum along loci
for hid in xrange(self.num_haplotypes):
sum_mat.append(self.data[hid].sum(axis=1).A)
sum_mat = np.hstack(sum_mat)
elif axis == self.Axis.HAPLOTYPE: # sum along haplotypes
sum_mat = self.data[0]
for hid in xrange(1, self.num_haplotypes):
sum_mat = sum_mat + self.data[hid] # Unlike others, this sum_mat is still sparse matrix
elif axis == self.Axis.READ: # sum along reads
sum_mat = []
for hid in xrange(self.num_haplotypes):
if self.count is None:
sum_hap = self.data[hid].sum(axis=0).A
else:
hap_mat = self.data[hid].copy()
hap_mat.data *= self.count[hap_mat.indices]
sum_hap = hap_mat.sum(axis=0).A
sum_mat.append(sum_hap)
sum_mat = np.vstack(sum_mat)
else:
raise RuntimeError('The axis should be 0, 1, or 2.')
return sum_mat
else:
raise RuntimeError('The original matrix must be finalized.')
[docs] def normalize_reads(self, axis, grouping_mat=None):
"""
Read-wise normalization
:param axis: The dimension along which we want to normalize values
:param grouping_mat: An incidence matrix that specifies which isoforms are from a same gene
:return: Nothing (as the method performs in-place operations)
:rtype: None
"""
if self.finalized:
if axis == self.Axis.LOCUS: # Locus-wise normalization on each read
normalizer = self.sum(axis=self.Axis.HAPLOTYPE) # Sparse matrix of |reads| x |loci|
normalizer.eliminate_zeros()
for hid in xrange(self.num_haplotypes):
self.data[hid].eliminate_zeros() # Trying to avoid numerical problem (inf or nan)
self.data[hid] = np.divide(self.data[hid], normalizer) # element-wise division
elif axis == self.Axis.HAPLOTYPE: # haplotype-wise normalization on each read
for hid in xrange(self.num_haplotypes):
normalizer = self.data[hid].sum(axis=self.Axis.HAPLOTYPE) # 1-dim Sparse matrix of |reads| x 1
normalizer = normalizer.A.flatten()
self.data[hid].data /= normalizer[self.data[hid].indices]
elif axis == self.Axis.READ: # normalization each read as a whole
sum_mat = self.sum(axis=self.Axis.LOCUS)
normalizer = sum_mat.sum(axis=self.Axis.HAPLOTYPE)
normalizer = normalizer.ravel()
for hid in xrange(self.num_haplotypes):
self.data[hid].data /= normalizer[self.data[hid].indices]
elif axis == self.Axis.GROUP: # group-wise normalization on each read
if grouping_mat is None:
raise RuntimeError('Group information matrix is missing.')
normalizer = self.sum(axis=self.Axis.HAPLOTYPE) * grouping_mat
for hid in xrange(self.num_haplotypes):
self.data[hid].eliminate_zeros() # Trying to avoid numerical problem (inf or nan)
self.data[hid] = np.divide(self.data[hid], normalizer)
elif axis == self.Axis.HAPLOGROUP: # haplotype-wise & group-wise normalization on each read
if grouping_mat is None:
raise RuntimeError('Group information matrix is missing.')
for hid in xrange(self.num_haplotypes): # normalizer is different hap-by-hap
normalizer = self.data[hid] * grouping_mat # Sparse matrix of |reads| x |loci|
self.data[hid].eliminate_zeros() # Trying to avoid numerical problem (inf or nan)
self.data[hid] = np.divide(self.data[hid], normalizer)
else:
raise RuntimeError('The axis should be 0, 1, 2, or 3.')
else:
raise RuntimeError('The original matrix must be finalized.')
[docs] def pull_alignments_from(self, reads_to_use, shallow=False):
"""
Pull out alignments of certain reads
:param reads_to_use: numpy array of dtype=bool specifying which reads to use
:param shallow: whether to copy sparse 3D matrix only or not
:return: a new AlignmentPropertyMatrix object that particular reads are
"""
new_alnmat = self.copy(shallow=shallow)
for hid in xrange(self.num_haplotypes):
hdata = new_alnmat.data[hid]
hdata.data *= reads_to_use[hdata.indices]
hdata.eliminate_zeros()
if new_alnmat.count is not None:
new_alnmat.count[np.logical_not(reads_to_use)] = 0
return new_alnmat
[docs] def get_unique_reads(self, ignore_haplotype=False, shallow=False):
"""
Pull out alignments of uniquely-aligning reads
:param ignore_haplotype: whether to regard allelic multiread as uniquely-aligning read
:param shallow: whether to copy sparse 3D matrix only or not
:return: a new AlignmentPropertyMatrix object that particular reads are
"""
if self.finalized:
if ignore_haplotype:
summat = self.sum(axis=self.Axis.HAPLOTYPE)
nnz_per_read = np.diff(summat.tocsr().indptr)
unique_reads = np.logical_and(nnz_per_read > 0, nnz_per_read < 2)
else: # allelic multireads should be removed
alncnt_per_read = self.sum(axis=self.Axis.LOCUS).sum(axis=self.Axis.HAPLOTYPE)
unique_reads = np.logical_and(alncnt_per_read > 0, alncnt_per_read < 2)
return self.pull_alignments_from(unique_reads, shallow=shallow)
else:
raise RuntimeError('The matrix is not finalized.')
[docs] def count_unique_reads(self, ignore_haplotype=False):
if self.finalized:
unique_reads = self.get_unique_reads(ignore_haplotype=ignore_haplotype, shallow=True)
if ignore_haplotype:
numaln_per_read = unique_reads.sum(axis=self.Axis.HAPLOTYPE)
if self.count is None:
numaln_per_read.data = np.ones(numaln_per_read.nnz)
else:
numaln_per_read.data = self.count[numaln_per_read.indices]
return numaln_per_read.sum(axis=0).A.ravel() # An array of size |num_loci|
else:
return unique_reads.sum(axis=self.Axis.READ) # An array of size |num_haplotypes|x|num_loci|
else:
raise RuntimeError('The matrix is not finalized.')
[docs] def count_alignments(self):
if self.finalized:
return self.sum(axis=self.Axis.READ)
else:
raise RuntimeError('The matrix is not finalized.')
[docs] def report_alignment_counts(self, filename):
alignment_counts = self.count_alignments()
allelic_unique_counts = self.count_unique_reads(ignore_haplotype=False)
locus_unique_counts = self.count_unique_reads(ignore_haplotype=True)
cntdata = np.vstack((alignment_counts, allelic_unique_counts))
cntdata = np.vstack((cntdata, locus_unique_counts))
fhout = open(filename, 'w')
fhout.write("locus\t" + "\t".join(['aln_%s' % h for h in self.hname]) + "\t")
fhout.write("\t".join(['uniq_%s' % h for h in self.hname]) + "\t")
fhout.write("locus_uniq" + "\n")
for locus_id in xrange(self.num_loci):
fhout.write("\t".join([self.lname[locus_id]] + map(str, cntdata[:, locus_id].ravel())) + "\n")
fhout.close()
[docs] def combine(self, other, shallow=False):
if self.finalized and other.finalized:
dmat = Sparse3DMatrix.combine(self, other)
dmat.num_loci, dmat.num_haplotypes, dmat.num_reads = dmat.shape
if self.count is not None and other.count is not None:
dmat.count = np.concatenate((self.count, other.count))
if not shallow:
dmat.hname = self.hname
dmat.lname = copy.copy(self.lname)
dmat.rname = np.concatenate((self.rname, other.rname))
dmat.lid = copy.copy(self.lid)
dmat.rid = dict(zip(dmat.rname, np.arange(dmat.num_reads)))
dmat.__copy_group_info(self)
return dmat
else:
raise RuntimeError('Both matrices must be finalized.')
[docs] def save(self, h5file, title=None, index_dtype='uint32', data_dtype=float, incidence_only=True, complib='zlib', shallow=False):
Sparse3DMatrix.save(self, h5file=h5file, title=title, index_dtype=index_dtype, data_dtype=data_dtype, incidence_only=incidence_only, complib=complib)
h5fh = tables.open_file(h5file, 'a')
fil = tables.Filters(complevel=1, complib=complib)
if self.count is not None:
h5fh.create_carray(h5fh.root, 'count', obj=self.count, title='Equivalence Class Counts', filters=fil)
if not shallow:
h5fh.set_node_attr(h5fh.root, 'hname', self.hname)
h5fh.create_carray(h5fh.root, 'lname', obj=self.lname, title='Locus Names', filters=fil)
if self.rname is not None:
h5fh.create_carray(h5fh.root, 'rname', obj=self.rname, title='Read Names', filters=fil)
h5fh.flush()
h5fh.close()
[docs] def get_read_data(self, rid):
return self.get_cross_section(index=rid, axis=self.Axis.READ)
[docs] def print_read(self, rid):
"""
Prints nonzero rows of the read wanted
"""
if self.rname is not None:
print self.rname[rid]
print '--'
r = self.get_read_data(rid)
aligned_loci = np.unique(r.nonzero()[1])
for locus in aligned_loci:
nzvec = r[:, locus].todense().transpose()[0].A.flatten()
if self.lname is not None:
print self.lname[locus],
else:
print locus,
print nzvec
#
# For future use
#
[docs] def get_reads_aligned_to_locus(self, lid, hid=None):
ridset = set()
if hid is None:
for hid in xrange(self.num_haplotypes):
curset = set(np.nonzero(self.data[hid][:, lid])[0])
ridset = ridset.union(curset)
return sorted(list(ridset))
else:
return sorted(np.nonzero(self.data[hid][:, lid])[0])
if __name__ == "__main__":
pass # TODO: Put a simple usage example here