Permalink
Cannot retrieve contributors at this time
Name already in use
A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?
ipd_extended/interaction_distance.py
Go to fileThis commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
96 lines (74 sloc)
3.24 KB
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import math | |
import numpy as np | |
import uds | |
from constants import CorrelationMeasure, ID_THRESHOLD_QUANTILE, ID_SLIDING_WINDOW | |
def compute_IDs(bin_map, curr, data, dist_bins, dim_maxes):
    """Compute interaction distances using every dimension except ``curr``.

    Args:
        bin_map: Per-row bin labels, forwarded unchanged to ``_compute_IDs``.
        curr: Key of the dimension to exclude from the computation.
        data: Data set; it is copied, so the caller's object keeps ``curr``.
        dist_bins: Ordered bin labels, forwarded unchanged.
        dim_maxes: Per-dimension maxima, forwarded unchanged.

    Returns:
        Whatever ``_compute_IDs`` returns for the reduced data.
    """
    remaining_dims = data.copy()
    # Drop the current dimension from the copy only (flagged upstream as a
    # possible performance hot spot).
    remaining_dims.pop(curr)
    return _compute_IDs(bin_map, remaining_dims, dim_maxes, dist_bins)
def compute_IDs_extended(bin_map, curr, data, dist_bins, dim_maxes, cor_measure, k):
    """Compute interaction distances restricted to a correlated subspace.

    The subspace is obtained from ``uds.find_correlated_subspace(data, curr, k)``
    and the data is narrowed to those columns before the distances are computed.

    Args:
        bin_map: Per-row bin labels, forwarded to ``_compute_IDs``.
        curr: Current dimension, passed to the subspace search.
        data: Data set; a copy is sliced, the original is left untouched.
        dist_bins: Ordered bin labels, forwarded to ``_compute_IDs``.
        dim_maxes: Per-dimension maxima, forwarded to ``_compute_IDs``.
        cor_measure: Correlation measure selector; only
            ``CorrelationMeasure.UDS`` is supported.
        k: Passed through to ``uds.find_correlated_subspace``.

    Returns:
        Whatever ``_compute_IDs`` returns for the selected subspace.

    Raises:
        ValueError: If ``cor_measure`` is not ``CorrelationMeasure.UDS``.
    """
    if cor_measure != CorrelationMeasure.UDS:
        # The exception used to be constructed but never raised, which let
        # execution continue and fail later on the undefined ``subspace``.
        raise ValueError('No implementation!')

    subspace = uds.find_correlated_subspace(data, curr, k)
    subspace_data = data.copy().loc[:, subspace]
    return _compute_IDs(bin_map, subspace_data, dim_maxes, dist_bins)
def _compute_IDs(bin_map, data, dim_maxes, dist_bins):
    """Compute the interaction distance between each pair of consecutive bins.

    For every bin an "inner" measure is computed: the sum over all point pairs
    of the per-dimension products of ``compute_ID_elem`` values, divided by the
    squared number of points. For every bin after the first an "inter" measure
    against the preceding bin is computed the same way, scaled by
    ``2 / (points * previous_points)``. The distance for bins ``c`` and
    ``c + 1`` is ``inner[c] - inter[c] + inner[c + 1]``.

    Args:
        bin_map: Per-row bin labels, compared against each entry of
            ``dist_bins`` to select the rows of ``data``.
        data: Frame indexed through ``.loc``; iterating it yields the
            dimension keys.
        dim_maxes: Mapping from dimension key to that dimension's maximum.
        dist_bins: Ordered bin labels.

    Returns:
        ``np.ndarray`` with one value per consecutive bin pair
        (``len(dist_bins) - 1`` entries when there is at least one bin).
    """
    within_scores = []
    between_scores = []

    for position, label in enumerate(dist_bins):
        has_previous = position > 0

        current = data.loc[bin_map == label]
        n_current = current.shape[0]

        previous = None
        n_previous = None
        cross_product = None
        if has_previous:
            previous = data.loc[bin_map == dist_bins[position - 1]]
            n_previous = previous.shape[0]
            cross_product = np.ones([n_current, n_previous])

        self_product = np.ones([n_current, n_current])

        # Multiply the per-dimension element matrices together.
        for column in current:
            self_term = compute_ID_elem(current[column], current[column], dim_maxes[column])
            self_product = np.multiply(self_product, self_term)

            if has_previous:
                cross_term = compute_ID_elem(current[column], previous[column], dim_maxes[column])
                cross_product = np.multiply(cross_product, cross_term)

        within_scores.append(np.sum(self_product) / n_current ** 2)

        if has_previous:
            between_scores.append(2 * np.sum(cross_product) / (n_current * n_previous))

    # Entry idx of between_scores belongs to bins idx and idx + 1.
    distances = [
        within_scores[idx] - between + within_scores[idx + 1]
        for idx, between in enumerate(between_scores)
    ]
    return np.array(distances)
def compute_ID_elem(bin1, bin2, dim_max):
    """Return the matrix of ``dim_max - max(a, b)`` for all cross pairs.

    Args:
        bin1: One-dimensional values of the first bin (array-like, e.g. a
            NumPy array or a pandas Series).
        bin2: One-dimensional values of the second bin (array-like).
        dim_max: Maximum value of the dimension the bins belong to.

    Returns:
        ``np.ndarray`` of dtype float with shape ``(len(bin1), len(bin2))``
        whose entry ``[i, j]`` is ``dim_max - max(bin1[i], bin2[j])``.
    """
    # Work on plain ndarrays: pandas objects do not reliably support
    # ``ufunc.outer`` (recent pandas versions raise NotImplementedError),
    # and only the values, not the index, matter here.
    first = np.asarray(bin1)
    second = np.asarray(bin2)

    # max_i array
    max_array = np.full([first.shape[0], second.shape[0]], dim_max, dtype=float)

    # max_i - max(R^i_{j_1}, R^i_{j_2})
    # np.transpose is a no-op for one-dimensional input; it is kept so that
    # behaviour for any other input shape is unchanged.
    outer_max = np.maximum.outer(first, np.transpose(second))
    return max_array - outer_max
def compute_ID_threshold(IDs):
    """Pick the threshold at the ``ID_THRESHOLD_QUANTILE`` position of ``IDs``.

    A sorted copy of ``IDs`` is indexed at
    ``int(len(IDs) * ID_THRESHOLD_QUANTILE) - 1``; the input is not modified.

    NOTE(review): the product is truncated with ``int`` (the former
    ``math.ceil`` was applied after truncation and therefore had no effect).
    When the truncated product is 0 the index is -1, i.e. the largest value is
    returned. This mirrors the original IPD behaviour and was already flagged
    as possibly wrong -- confirm before changing.

    Args:
        IDs: Sequence of interaction distances supporting ``copy`` and
            in-place ``sort``.

    Returns:
        The element of the sorted distances at the computed position.
    """
    ordered = IDs.copy()
    ordered.sort()
    position = int(len(ordered) * ID_THRESHOLD_QUANTILE) - 1
    return ordered[position]
def compute_max_ID_threshold(IDs):
    """Return the largest interaction distance without modifying ``IDs``.

    Args:
        IDs: Sequence of interaction distances supporting ``copy`` and
            in-place ``sort``.

    Returns:
        The maximum of the sorted copy of ``IDs``.
    """
    ordered = IDs.copy()
    ordered.sort()
    largest = max(ordered)
    return largest
def compute_sliding_count(IDs, ID_threshold):
    """Count, per sliding window, how many distances exceed the threshold.

    Windows have length ``ID_SLIDING_WINDOW`` and advance one position at a
    time. The window ending at index ``end`` covers
    ``IDs[end - ID_SLIDING_WINDOW:end]`` for ``end`` in
    ``range(ID_SLIDING_WINDOW, len(IDs))``.

    NOTE(review): because ``end`` stops at ``len(IDs) - 1``, the final element
    of ``IDs`` is never part of any window. This is kept as-is since it
    determines the length of the result -- confirm whether it is intended.

    Args:
        IDs: Sliceable sequence of interaction distances.
        ID_threshold: Values strictly greater than this are counted.

    Returns:
        List of counts, one per window; empty when ``IDs`` has no more than
        ``ID_SLIDING_WINDOW`` elements (including when it is empty).
    """
    window = ID_SLIDING_WINDOW
    # ``end`` never drops below ``window``, so the slice start is always
    # ``end - window`` and needs no clamping to 0.
    return [
        sum(1 for value in IDs[end - window:end] if value > ID_threshold)
        for end in range(window, len(IDs))
    ]