Skip to content
Permalink
master
Switch branches/tags

Name already in use

A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?
Go to file
 
 
Cannot retrieve contributors at this time
import numpy as np
import pandas as pd
import numba
##################################################################################################################
# Load data
# Example
def load_example_cov_data():
cov_data = np.array([
[10, 1, 5, 4],
[1, 10, 2, 6],
[5, 2, 10, 3],
[4, 6, 3, 10],
])
assert np.all(cov_data == cov_data.transpose())
return cov_data
def load_example_conn_matrix():
conn_matrix = np.array([
[1, 1, 0, 1],
[1, 1, 1, 0],
[0, 1, 1, 1],
[1, 0, 1, 1],
], dtype=np.bool)
assert np.all(conn_matrix == conn_matrix.transpose())
return conn_matrix
# Cytometry
protein_names = np.array([
"Raf",
"Mek",
"Plcg",
"PIP2",
"PIP3",
'Erk', # "p44/42",
'Akt', # "pakts473",
"PKA",
"PKC",
"P38",
"Jnk"
])
def load_cytometry_data():
data = pd.read_csv('sachs.data.txt', delim_whitespace=True, names=protein_names)
return data
def load_cytometry_cov_data():
cov_data = pd.read_csv('sachs.covmatrix.txt', delim_whitespace=True, names=protein_names)
cov_data.index = protein_names
return cov_data
def bidirectional_edges(edges):
opposite = edges.copy()
opposite.columns = opposite.columns[::-1]
return pd.concat([edges.sort_index(axis=1), opposite.sort_index(axis=1)], axis=0)
def edges_to_conn_matrix(names, edges):
edges = edges.copy()
edges['conn'] = np.ones(len(edges), dtype=np.bool)
conns = edges.set_index(['a', 'b'])['conn']
conns = conns.unstack().reindex(names, axis=0).reindex(names, axis=1).fillna(False)
return conns
def load_cytometry_conn_matrix():
edges = pd.DataFrame([
['Raf', 'Mek'],
['Mek', 'P38'],
['Mek', 'PKA'],
['Mek', 'Akt'],
['Mek', 'PIP2'],
['Plcg', 'P38'],
['Plcg', 'PKA'],
['Plcg', 'PIP2'],
['PIP2', 'Jnk'],
['PIP2', 'P38'],
['PIP2', 'PKA'],
['PIP2', 'Akt'],
['Akt', 'P38'],
['PKA', 'P38'],
['PKC', 'P38'],
['P38', 'Jnk'],
], columns=['a', 'b'])
edges = bidirectional_edges(edges)
conn_matrix = edges_to_conn_matrix(protein_names, edges)
assert np.all(conn_matrix == conn_matrix.transpose())
return conn_matrix
##################################################################################################################
# Estimate edge parameters when structure known
@numba.jit
def estimate_edge_parameters(cov_data, conn_matrix):
p = cov_data.shape[0]
# W
cov_estimate = cov_data.copy().astype(np.float)
converged = False
while not converged:
prev_cov_estimate = cov_estimate.copy()
for j in range(p):
# w 11
cov_estimate_rest, _, _ = split_parts(cov_estimate, j)
# s 12
_, cov_data_target, _ = split_parts(cov_data, j)
_, conn_target, _ = split_parts(conn_matrix, j)
# w 11 star
w_rest_connected = drop_row_and_col_by_mask(cov_estimate_rest, conn_target)
# beta star
regression_coef_connected = np.linalg.inv(w_rest_connected) @ cov_data_target[conn_target]
# beta hat
regression_coef = np.zeros(len(conn_target), dtype=np.float)
regression_coef[conn_target] = regression_coef_connected
# w 11 start
new_cov_estimate_target = cov_estimate_rest @ regression_coef
# we don't want to alter element jj
new_cov_estimate_target = np.insert(new_cov_estimate_target, j, cov_estimate[j, j])
cov_estimate[j, :] = new_cov_estimate_target
cov_estimate[:, j] = new_cov_estimate_target
converged = np.all(np.isclose(prev_cov_estimate, cov_estimate))
return cov_estimate
@numba.jit
def drop_row_and_col(a, i):
return np.delete(np.delete(a, i, axis=0), i, axis=1)
@numba.jit
def split_parts(a, i):
"""
:return:
11, 12, 22
"""
return (
drop_row_and_col(a, i),
np.delete(a[i, :], i),
a[i, i],
)
@numba.jit
def drop_row_and_col_by_mask(a, mask):
return a[mask].T[mask].T
##################################################################################################################
# Graphical lasso
@numba.jit
def estimate_graphical_lasso(empirical_covariance, reg_param):
p = empirical_covariance.shape[0]
W = empirical_covariance.copy().astype(np.float) + reg_param * np.identity(len(empirical_covariance))
converged = False
while not converged:
prev_W = W.copy()
for j in range(p):
w_11, w_12, w_22 = split_parts(W, j)
s_11, s_12, s_22 = split_parts(empirical_covariance, j)
beta_hat = cyclical_coordinate_descent(w_11, s_12, reg_param, p)
new_w_12 = w_11 @ beta_hat
new_w_12 = np.insert(new_w_12, j, W[j, j]) # we don't want to alter element jj
W[j, :] = new_w_12
W[:, j] = new_w_12
converged = np.all(np.isclose(prev_W, W))
return W
@numba.jit
def cyclical_coordinate_descent(v, s_12, reg_param, p):
beta_hat = np.zeros(s_12.shape, dtype=np.float)
converged = False
while not converged:
prev_beta_hat = beta_hat.copy()
for j in range(p - 1):
vs = mult_v_beta_hat(v, beta_hat, j, p)
x = s_12[j] - sum(vs)
beta_hat[j] = soft_threshold(x, reg_param) / v[j, j]
converged = np.all(np.isclose(prev_beta_hat, beta_hat))
return beta_hat
@numba.jit
def mult_v_beta_hat(v, beta_hat, j, p): # had to split this out due to numba bug
return [
v[k, j] * beta_hat[k]
for k in range(p - 1)
if k != j
]
@numba.jit
def soft_threshold(x, t):
return np.sign(x) * np.maximum(np.abs(x) - t, 0)
##################################################################################################################
# Other
def corr_from_cov(cov):
stds = np.sqrt(np.diag(cov))
corr = cov / np.outer(stds, stds)
return corr
if __name__ == '__main__':
pass