From 25858984cf8143e48417c6bd5e43b3dad40cce52 Mon Sep 17 00:00:00 2001
From: Tatiana Dembelova
Date: Thu, 12 Oct 2017 23:52:54 +0200
Subject: [PATCH] concurrent execution of main.py

---
 commands.txt           |   2 +
 constants.py           |  42 ++-
 data_generator.py      |  76 +++---
 experiments_logging.py |  62 +++--
 main.py                | 575 ++++++++++++++++++++++++++++-------------
 runExperiment.py       | 217 +++++++++-------
 util.py                |  13 +-
 7 files changed, 633 insertions(+), 354 deletions(-)

diff --git a/commands.txt b/commands.txt
index 88f7bbd..9ee0fef 100644
--- a/commands.txt
+++ b/commands.txt
@@ -43,3 +43,5 @@ for pid in $(ps aux | grep 'python' | grep -v grep | grep -v USER | awk '{print
 for f in *_*_T*.csv; do mv $f "${f/*_*_T/T}" ;done
 for f in *_*_CJS*.csv; do mv $f "${f/*_*_C/C}" ;done
 for f in *_*_CJS*.csv; do echo mv $f "${f/*_*_C/C}" ;done
+
+rsync -av --exclude 'data*' /Users/tatyanadembelova/Documents/study/thesis/code-fic/ tdembelo@push.mmci.uni-saarland.de:/home/tdembelo/code-fic/
\ No newline at end of file
diff --git a/constants.py b/constants.py
index 6f2eb1e..341bc46 100644
--- a/constants.py
+++ b/constants.py
@@ -1,15 +1,21 @@
 from enum import Enum
+import socket
 
 
 class Method(Enum):
     PERFECT = 8
     TRIVIAL = 0
-    ORIGINAL = 1
-    GREEDY_TOPK = 2
-    HET_GREEDY_TOPK = 3
-    BEST_FIRST = 4
-    BEAM_SEARCH = 5
-    HET_BEAM_SEARCH = 6
-    PREDEFINED = 7
+    # ORIGINAL = 1  # full ipd
+    SM_GREEDY_TOPK = 2
+    SM_HET_GREEDY_TOPK = 3
+    SM_BEST_FIRST = 4
+    SM_BEAM_SEARCH = 5
+    SM_HET_BEAM_SEARCH = 6
+    # PREDEFINED = 7  # subspaces up to optimal
+
+    PREDEFINED_SUBSPACESETS = 9
+    PREDEFINED_OPTIMAL_SUBSPACESET = 10
+    PREDEFINED_OPTIMAL_SUBSPACESET_AND_IRRELEVANT = 11
+    FULL = 12
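+    # values must stay unique: Enum turns a duplicate value (e.g. a second
+    # member set to 11) into a silent alias of
+    # PREDEFINED_OPTIMAL_SUBSPACESET_AND_IRRELEVANT, which would break the
+    # equality checks on Method in main.compute_distances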
 
 
 class CorrelationMeasure(Enum):
@@ -23,7 +29,7 @@ class DistanceMeasure(Enum):
     CJS = 2
 
 
-ID_THRESHOLD_QUANTILE = 0.80
+ID_THRESHOLD_QUANTILE = 0.3
 ID_SLIDING_WINDOW = 40
 
 NORMALIZATION_RADIUS = 1
@@ -41,7 +47,19 @@ class DistanceMeasure(Enum):
     CLUMP = 2
     MAXMAX = 5
 
-SLIM_DATA_DIR = "/Users/tatyanadembelova/Documents/study/thesis/code-fic/data/"
-SLIM_BIN = "/Users/tatyanadembelova/Documents/study/thesis/code-fic/branches/slim/trunk/fic"
-SLIM_COMPRESS_CONF = "/Users/tatyanadembelova/Documents/study/thesis/code-fic/branches/slim/trunk/compress.conf"
-SLIM_CONVERT_CONF = "/Users/tatyanadembelova/Documents/study/thesis/code-fic/branches/slim/trunk/convertdb.conf"
\ No newline at end of file
+SUBSPACE_SET_STEP = 2
+
+# todo change later
+IRRELEVANT_FEATURES = 3
+
+BASE = '/local/tmp/ipd_extended_experiments2/' if socket.gethostname() == 'push' \
+    else '/Users/tatyanadembelova/Documents/study/thesis/ipd_extended/'
+DATA_DIR = BASE + 'new_cubes/'
+PERFECT_DISCRETIZATIONS_DIR = BASE + 'ideal_disc/'
+PERFECT_SUBSPACES_JSON = BASE + 'ideal_subspaces.json'
+
+SLIM_BASE = ("/Users/tatyanadembelova/Documents/study/thesis/" if socket.gethostname() != 'push' else BASE) + "code-fic/"
+SLIM_DATA_DIR = SLIM_BASE + "data/"
+SLIM_BIN = SLIM_BASE + "branches/slim/trunk/fic"
+SLIM_COMPRESS_CONF = SLIM_BASE + "branches/slim/trunk/compress.conf"
+SLIM_CONVERT_CONF = SLIM_BASE + "branches/slim/trunk/convertdb.conf"
\ No newline at end of file
diff --git a/data_generator.py b/data_generator.py
index 254fd89..7d06e9e 100644
--- a/data_generator.py
+++ b/data_generator.py
@@ -4,14 +4,14 @@
 import time
 import os
 import json
+import constants as cst
 import experiments_logging as l
+import socket
 
 RADIUS = 2
 CUBE_WIDTH = 1
 ROWS = 6000
 OVERLAP_PROBABILITY = 0.6
-# BASE = '/Users/tatyanadembelova/Documents/study/thesis/ipd_extended/'
-BASE = '/local/tmp/ipd_extended_experiments/'
 
 
 class CubeParameters:
     def __init__(self, rows, loc=None):
@@ -148,46 +148,42 @@ def produce_data_generator(rf, irf, c, type, name):
 
 def produce_all_data_generators():
     data_generators = []
     global basedir
+    basedir = cst.DATA_DIR
+    if not os.path.exists(basedir):
+        os.mkdir(basedir)
+    perf_disc_dir = cst.PERFECT_DISCRETIZATIONS_DIR
+    if not os.path.exists(perf_disc_dir):
+        os.mkdir(perf_disc_dir)
+    perf_subspaces_file = cst.PERFECT_SUBSPACES_JSON
-    basedir = BASE + 'new_cubes/'
-    perf_disc_dir = BASE + 'ideal_disc/'
-    perf_subspaces_file = BASE + 'ideal_subspaces.json'
     perf_subspaces = dict()
     perf_discs = dict()
-    ## relevant features 2 - 30
-    # for rf in range(10, 11):
-    #     # irrelevant features 0 - 100:
-    #     for irf in range(100, 101):
-    #         # cubes 1 - 10
-    #         for c in range(3, 4):
-    #             # cube types complete, incomplete, incomplete overlapping
-    #             for type in ['i']:
+    # relevant features 2 - 30
+    for rf in range(2, 3):
+        # cubes 1 - 10
+        for c in range(3, 4):
+            # cube types complete, incomplete, incomplete overlapping
+            for type in ['c']:
-    # relevant features 2 - 30
-    for rf in range(2, 31):
-        # irrelevant features 0 - 100:
-        for irf in range(101):
-            # cubes 1 - 10
-            for c in range(1, 11):
-                # cube types complete, incomplete, incomplete overlapping
-                for type in ['c', 'i', 'io']:
-                    if c == 1 and type != 'c':
-                        continue
-                    if rf / c < 2 and type != 'c':
-                        # if not (rf / c < 2 and type == 'c'):
-                        continue
-                    name = 'cubes_' + '{0:02d}'.format(rf) + '_' \
-                           + '{0:03d}'.format(irf) + '_' \
-                           + '{0:02d}'.format(c) + '_' \
-                           + type + '.csv'
-                    # if os.path.exists(basedir + name) and os.path.exists(
-                    #         perf_disc_dir + 'cut_' + name.replace('csv', 'txt')):
-                    #     continue
-
-                    dg = produce_data_generator(rf, irf, c, type, name)
-                    perf_discs[name] = dg.get_discs()
-                    perf_subspaces[name] = dg.get_subspaces()
-                    data_generators.append(dg)
+                # for rf in range(2, 31):
+                #     # cubes 1 - 10
+                #     for c in range(1, 11):
+                #         # cube types complete, incomplete, incomplete overlapping
+                #         for type in ['c', 'i', 'io']:
+                if (c == 1 or rf / c < 2) and type != 'c':
+                    continue
+                name = 'cubes_' + '{0:02d}'.format(rf) + '_' \
+                       + '{0:02d}'.format(c) + '_' \
+                       + type + '.csv'
+                # if os.path.exists(basedir + name) and os.path.exists(
+                #         perf_disc_dir + 'cut_' + name.replace('csv', 'txt')):
+                #     continue
+
+                dg = produce_data_generator(rf, cst.IRRELEVANT_FEATURES, c, type, name)
+                perf_discs[name] = dg.get_discs()
+                perf_subspaces[name] = dg.get_subspaces()
+                data_generators.append(dg)
     for name in perf_discs:
         write_cut_file(perf_disc_dir + 'cut_' + name.replace('csv', 'txt'), perf_discs[name])
     with open(perf_subspaces_file, 'w') as psf:
@@ -211,4 +207,8 @@ def store(data):
 
 
 if __name__ == '__main__':
-    print(generate_overlap_partition(7, 3))
+    # print(generate_overlap_partition(7, 3))
+    generators = produce_all_data_generators()
+    for g in generators:
+        store(g.build())
diff --git a/experiments_logging.py b/experiments_logging.py
index 194462c..b3762fd 100644
--- a/experiments_logging.py
+++ b/experiments_logging.py
@@ -97,31 +97,43 @@ def save_plot_data_2d(f, data):
     plt.clf()
 
 
-def write_out_file(problem, name, disc_intervals, disc_points, class_labels):
-    with open(name, 'w') as out:
-        out.write('@relation ' + util.get_escaped_name(problem) + "\n\n")
-        counter = [1]
-        for i in range(len(disc_intervals)):
-            out.write(
-                '@attribute dim' + str(i) + ' {' + ','.join([str(j + counter[-1]) for j in disc_intervals[i]]) + '}\n')
-            counter.append(counter[-1] + len(disc_intervals[i]))
-        out.write('@attribute class {' + ','.join(['"' + str(i) + '"' for i in class_labels.unique()]) + '}\n\n')
-        out.write('@data\n')
-
-        for i in range(len(disc_points[0])):
-            for j in range(len(disc_points)):
-                out.write(str(disc_points[j][i] + counter[j]))
-                out.write(',')
-            out.write('"' + str(class_labels[i]) + '"\n')
-
-
-def write_cut_file(name, disc_intervals):
-    with open(name, 'w') as out:
-        for i in range(len(disc_intervals)):
-            out.write('dimension ' + str(i) + ' (' + str(len(disc_intervals[i])) + ' bins)\n')
-            for bin in disc_intervals[i]:
-                out.write(str(disc_intervals[i][bin][1]) + '\n')
-            out.write('-------------------------------------\n')
+def write_out_file(problem, disc_intervals, disc_points, class_labels):
+    lines = ['@relation ' + util.get_escaped_name(problem) + "\n\n"]
+    counter = [1]
+    for i in range(len(disc_intervals)):
+        lines.append(
+            '@attribute dim' + str(i) + ' {' + ','.join([str(j + counter[-1]) for j in disc_intervals[i]]) + '}\n')
+        counter.append(counter[-1] + len(disc_intervals[i]))
+    lines.append('@attribute class {' + ','.join(['"' + str(i) + '"' for i in class_labels.unique()]) + '}\n\n')
+    lines.append('@data\n')
+
+    for i in range(len(disc_points[0])):
+        for j in range(len(disc_points)):
+            lines.append(str(disc_points[j][i] + counter[j]))
+            lines.append(',')
+        lines.append('"' + str(class_labels[i]) + '"\n')
+    return lines
+
+
+def write_outdat_file(disc_intervals, disc_points, class_labels, relevant_features):
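+    # one row per data point: space-separated global bin ids for the first
+    # `relevant_features` dimensions, followed by the class label, e.g.
+    # "1 5 9 0" (hypothetical bin ids); main.store() writes these lines to a
+    # .dat file under SLIM_DATA_DIR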
+    lines = []
+    counter = [1]
+    for i in range(len(disc_intervals)):
+        counter.append(counter[-1] + len(disc_intervals[i]))
+    for i in range(len(disc_points[0])):
+        line = ' '.join([str(disc_points[j][i] + counter[j]) for j in range(relevant_features)])
+        lines.append(line + " " + str(class_labels[i]) + '\n')
+    return lines
+
+
+def write_cut_file(disc_intervals):
+    lines = []
+    for i in range(len(disc_intervals)):
+        lines.append('dimension ' + str(i) + ' (' + str(len(disc_intervals[i])) + ' bins)\n')
+        for bin in disc_intervals[i]:
+            lines.append(str(disc_intervals[i][bin][1]) + '\n')
+        lines.append('-------------------------------------\n')
+    return lines
 
 
 if __name__ == '__main__':
diff --git a/main.py b/main.py
index 5f5e622..6bd9931 100644
--- a/main.py
+++ b/main.py
@@ -18,10 +18,13 @@ import subspace_mining as sm
 import util
 from correlation_measures.binning import Binning
-from experiments_logging import write_out_file, write_cut_file
+import experiments_logging as el
 from merging import dynamic_merging
 import cjs
 import discretization_quality_measure as dq
+import json
+import random
+import traceback
 
 
 # ------------------------------------------------------
@@ -35,9 +38,7 @@ def find_disc_macro_id(disc_macro_intervals, point):
 
 def write(log, *args):
     join = ' '.join([str(a) for a in args])
-    if not log:
-        print(join)
-    else:
+    if log:
         log.write(join)
         log.write('\n')
 
@@ -72,12 +73,12 @@ def compute_distances(bin_map, curr, data, dim_maxes,
                       k=cst.MAX_SUBSPACE_SIZE,
                       delta=cst.HETEROGENEOUS_THRESHOLD,
                       beam_width=cst.BEAM_WIDTH,
-                      subspace=None,
+                      subspace_map=None,
                       log=None):
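+    # dispatch: FULL measures distances against all remaining dimensions (no
+    # mining), SM_* methods mine a subspace for the current dimension first,
+    # and PREDEFINED_* methods look it up in subspace_map; the second element
+    # of the returned tuple is the mining runtime in seconds (0 when nothing
+    # is mined)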
-    if method == cst.Method.ORIGINAL:
-        return id.compute_IDs(bin_map, curr, data, dim_maxes) if distance_measure == cst.DistanceMeasure.ID \
-            else cjs.compute_CJSs(bin_map, curr, data, dim_maxes)
-    if method is not cst.Method.PREDEFINED:
+    if method == cst.Method.FULL:
+        return (id.compute_IDs(bin_map, curr, data, dim_maxes) if distance_measure == cst.DistanceMeasure.ID
+                else cjs.compute_CJSs(bin_map, curr, data, dim_maxes)), 0
+    if method.name.startswith("SM"):
         subspace_mining_start = time.time()
-        if method == cst.Method.GREEDY_TOPK:
+        if method == cst.Method.SM_GREEDY_TOPK:
             curr_subspace = sm.greedy_topk(data, curr, k, cor_measure)
@@ -92,24 +93,25 @@
         else:
             raise ValueError("there is no such method!")
         subspace_mining_end = time.time()
-        write(log, 'subspace mining runtime:', subspace_mining_end - subspace_mining_start, 'seconds')
+        sm_runtime = subspace_mining_end - subspace_mining_start
+        write(log, 'subspace mining runtime:', sm_runtime, 'seconds')
     else:
-        if curr in subspace:
-            curr_subspace = subspace.copy()
-            curr_subspace.remove(curr)
+        sm_runtime = 0
+        if curr in subspace_map:
+            curr_subspace = subspace_map[curr]
         else:
             curr_subspace = []
 
     data = data.copy().loc[:, curr_subspace]
     dim_maxes = dim_maxes[curr_subspace]
-    return id.compute_IDs1(bin_map, data, dim_maxes) if distance_measure == cst.DistanceMeasure.ID \
-        else cjs.compute_CJSs1(bin_map, data, dim_maxes)
+    return (id.compute_IDs1(bin_map, data, dim_maxes) if distance_measure == cst.DistanceMeasure.ID
+            else cjs.compute_CJSs1(bin_map, data, dim_maxes)), sm_runtime
 
 
-def compute_optimal_discretization(data, method=cst.Method.ORIGINAL, cor_measure=None,
-                                   distance_measure=cst.DistanceMeasure.ID,
-                                   subspace=None,
-                                   log=None):
+def compute_IPD(data, rel_features_count, method=cst.Method.PREDEFINED_OPTIMAL_SUBSPACESET, cor_measure=None,
+                distance_measure=cst.DistanceMeasure.ID,
+                subspace_set=None,
+                log=None):
     start = time.time()
     dim_count = data.shape[1]
@@ -134,8 +136,10 @@ def compute_optimal_discretization(data, method=cst.Method.ORIGINAL, cor_measure
 
     disc_macro_intervals = []
     disc_points = []
+    subspace_map = get_map_from_subspace_set(subspace_set)
     distancez = []
     # iterate over all the dimensions
+    full_sm_runtime = 0
     for curr in range(dim_count):
         binning = Binning(norm_data, curr, init_bins_count)
         bin_map = binning.equal_frequency_binning_by_rank()
@@ -143,8 +147,8 @@ def compute_optimal_discretization(data, method=cst.Method.ORIGINAL, cor_measure
 
         # -----------------------------INTERACTION DISTANCES----------------------------------
 
-        distances = compute_distances(bin_map, curr, norm_data, dim_maxes, cor_measure, method,
-                                      distance_measure, subspace=subspace, log=log)
+        distances, sm_runtime = compute_distances(bin_map, curr, norm_data, dim_maxes, cor_measure, method,
+                                                  distance_measure, subspace_map=subspace_map, log=log)
         # todo python361
         # distancez.append([[data.loc[rank_data[rank_data[curr] == math.floor(dist_bins[i].right)].index.tolist()[0], curr] for i in
         #                   range(len(distances))], distances])
@@ -153,6 +157,7 @@
                                       == math.floor(float(re.search(', (-*\d+\.*\d*e*[+-]*\d*)', dist_bins[i]).group(1)))]
                           .index.tolist()[0], curr] for i in range(len(distances))], distances])
 
+        full_sm_runtime += sm_runtime
         ID_threshold = id.compute_ID_threshold(distances)
         # todo ext compute sliding average and count ID peaks above the avg (in a sliding window)
@@ -197,8 +202,10 @@
         disc_macro_intervals.append(curr_macro_intervals)
         disc_points.append(curr_macro_points)
     end = time.time()
-    write(log, 'full runtime:', end - start, 'seconds')
-    return disc_macro_intervals, disc_points, distancez
+    runtime = end - start
+    write(log, 'full runtime:', runtime, 'seconds')
+    # todo sm_runtime!
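+    # return order: macro intervals per dim, discretized points per dim,
+    # distance curves, initial bin count, total runtime, subspace mining runtime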
+    return disc_macro_intervals, disc_points, distancez, init_bins_count, runtime, full_sm_runtime
 
 
 def compute_trivial_discretization(data, log=None):
@@ -239,8 +246,9 @@ def compute_trivial_discretization(data, log=None):
         disc_macro_intervals.append(curr_macro_intervals)
         disc_points.append(curr_macro_points)
     end = time.time()
-    write(log, end - start, 'seconds')
-    return disc_macro_intervals, disc_points, distancez
+    runtime = end - start
+    write(log, runtime, 'seconds')
+    return disc_macro_intervals, disc_points, distancez, init_bins_count, runtime
 
 
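+# cut files carry, per dimension, a header "dimension <i> (<n> bins)" followed
+# by one right bin border per line -- the format emitted by el.write_cut_file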
 def read_discretization(disc_file):
@@ -285,7 +293,7 @@ def compute_perfect_discretization(problem, data, log=None):
         disc_points.append(macro_points)
     end = time.time()
     write(log, end - start, 'seconds')
-    return all_intervals, disc_points
+    return all_intervals, disc_points, end - start
 
 
 def get_discretized_points(curr, data, discretization, dist_bins, rank_data):
@@ -340,109 +348,123 @@ def _compute_subspaces(dims, sets):
         sets.extend(new_set)
 
 
-def compute_subspaces(data_file_name):
-    dims_count = util.parse_relevant_features(data_file_name)
-    if dims_count == 2:
-        return [[0, 1]]
-
-    dims = [i for i in range(dims_count)]
-
-    sets = []
-
-    _compute_subspaces(dims, sets)
-
-    result = [s for s in sets if len(s) > 1]
-
-    return result
-
-
-if __name__ == "__main__":
-    print(str(compute_subspaces("/Users/tatyanadembelova/Documents/study/thesis/ipd_extended/synthetic_cases/cubes/4d_3_cubes_xor.csv")))
-    exit(0)
-    if len(sys.argv) == 1:
-        print(
-            'Usage: main.py '
-            '-b=<logs base dir> '
-            '-f=<data file> '
-            '-d=<delimiter> '
-            '-c=<columns count> '
-            '-m=<[original|greedy_topk|trivial|...]> '
-            '-cor=<[uds]> '
-            '-dist=<[id, cjs]> '
-            '-t=<threshold> '
-            '-s[=<subspace>] '
-            '-r=<rows count> ')
-        command = '-b=logs -f=synthetic_cases/synthetic_3d_parity_problem.csv -d=; -dist=ID'
-        print('Running default: ', command)
-        command_list = command.split(' ')
-    else:
-        command_list = sys.argv[1:]
-
-    file_arg = list(filter(lambda x: x.startswith("-f="), command_list))
-    if not file_arg:
-        raise ValueError('No data file provided!')
-    base_dir_arg = list(filter(lambda x: x.startswith("-b="), command_list))
-    if not base_dir_arg:
-        raise ValueError('No logs base dir provided!')
-    notime = len(list(filter(lambda x: x.startswith("-notime"), command_list))) != 0
-    delim_arg = list(filter(lambda x: x.startswith("-d="), command_list))
-    columns_arg = list(filter(lambda x: x.startswith("-c="), command_list))
-    rows_arg = list(filter(lambda x: x.startswith("-r="), command_list))
-    method_arg = list(filter(lambda x: x.startswith("-m="), command_list))
-    corr_measure_arg = list(filter(lambda x: x.startswith("-cor="), command_list))
-    distance_measure_arg = list(filter(lambda x: x.startswith("-dist="), command_list))
-    threshold_arg = list(filter(lambda x: x.startswith("-t="), command_list))
-    subspace_arg = list(filter(lambda x: x.startswith("-s"), command_list))
-
-    data_file = file_arg[0].replace('-f=', '')
-    # defining prefix for the output files
-    data_file_name = util.get_file_name(data_file)
-    base_dir = base_dir_arg[0].replace('-b=', '')
-
-    if delim_arg:
-        delimiter = delim_arg[0].replace('-d=', '')
-    else:
-        print('using default delimiter ;')
-        delimiter = ';'
-    columns = int(columns_arg[0].replace('-c=', '')) if columns_arg else None
-    rows = int(rows_arg[0].replace('-r=', '')) if rows_arg else None
-    if method_arg:
-        method = cst.Method[method_arg[0].replace('-m=', '').upper()]
-    else:
-        print('using default method ORIGINAL')
-        method = cst.Method.ORIGINAL
-
-    dims_count = util.parse_relevant_features(data_file, delimiter)
-    subspaces = [[]]
-    if subspace_arg:
-        if subspace_arg[0].startswith('-s='):
-            if notime:
-                raise ValueError('A danger of overwriting of previously run experiment!')
-            subspaces = [[int(s) for s in subspace_arg[0].replace('-s=', '').split(',')]]
-        else:
-            subspaces = compute_subspaces(data_file_name)
-
-    if method == cst.Method.PREDEFINED and len(subspaces[0]) == 0:
-        raise ValueError("no subspace is defined!")
-    if len(subspaces[0]) > 0 and method is not cst.Method.PREDEFINED:
-        raise ValueError("Are you sure you mean predefined subspace method?")
-
-    cor_measure = cst.CorrelationMeasure[corr_measure_arg[0].replace('-cor=', '').upper()] if corr_measure_arg \
-        else None
-    if method is not cst.Method.ORIGINAL and method is not cst.Method.TRIVIAL and method is not cst.Method.PERFECT and method is not cst.Method.PREDEFINED and cor_measure is None:
-        raise ValueError('A correlation measure should be given!')
-    if distance_measure_arg:
-        distance_measure = cst.DistanceMeasure[distance_measure_arg[0].replace('-dist=', '').upper()]
-    else:
-        print('using default distance measure ID')
-        distance_measure = cst.DistanceMeasure.ID
-    if threshold_arg:
-        cst.ID_THRESHOLD_QUANTILE = float(threshold_arg[0].replace('-t=', ''))
+# # todo return list of dictionaries
+# def compute_subspace_sets(data_file_name, method):
+#     dims_count = util.parse_relevant_features(data_file_name)
+#     if dims_count == 2:
+#         return [[0, 1]]
+#
+#     dims = [i for i in range(dims_count)]
+#
+#     sets = []
+#
+#     _compute_subspaces(dims, sets)
+#
+#     result = [s for s in sets if len(s) > 1]
+#
+#     return result
+
+ideal = None
+with open(cst.PERFECT_SUBSPACES_JSON, "r") as f:
+    ideal = json.load(f)
+
+
+# todo return list of dictionaries
+def get_ideal_subspace_set(data_file_name):
+    # todo naive implementation
+    return ideal.get(data_file_name)
+
+
+def get_map_from_subspace_set(subspace_set):
+    subspace_map = dict()
+    for subspace in subspace_set:
+        for dim in subspace:
+            if dim not in subspace_map:
+                subspace_map[dim] = set()
+            subspace_map[dim] = subspace_map[dim].union(set(subspace) - {dim})
+    return subspace_map
+
+
+def compute_subspace_sets(data_file_name, method):
+    rel_features = util.parse_relevant_features(data_file_name)
+    ideal_subspace_set = get_ideal_subspace_set(data_file_name)
+
+    if method is cst.Method.PREDEFINED_OPTIMAL_SUBSPACESET \
+            or method is cst.Method.PREDEFINED_OPTIMAL_SUBSPACESET_AND_IRRELEVANT:
+        # ideal_map = get_map_from_subspace_set(ideal_subspace_set, rel_features)
+
+        if method is cst.Method.PREDEFINED_OPTIMAL_SUBSPACESET:
+            return [ideal_subspace_set]
+
+        if method is cst.Method.PREDEFINED_OPTIMAL_SUBSPACESET_AND_IRRELEVANT:
+            redundant_subspace_sets = []
+            for irr in range(rel_features + 1, cst.IRRELEVANT_FEATURES + rel_features + 1, cst.SUBSPACE_SET_STEP):
+                rss = [ideal_subspace + [rf for rf in range(rel_features, irr)] for ideal_subspace in
+                       ideal_subspace_set]
+                redundant_subspace_sets.append(rss)
+            return redundant_subspace_sets
+
+    elif method is cst.Method.PREDEFINED_SUBSPACESETS:
+        subspace_sets = []
+        init_subset = []
+        dim_map = {i: [] for i in range(rel_features)}
+        dims = []
+        # if the ideal_subspace_set is already minimal, there are no other subspace sets
+        if sum([len(s) == 2 for s in ideal_subspace_set]) == len(ideal_subspace_set):
+            return subspace_sets
+
+        for e, ideal_subspace in enumerate(ideal_subspace_set):
+            # every subspace starts from its first 2 dims
+            init_subset.append(ideal_subspace[:2])
+            # dims left for consideration
+            for i in ideal_subspace[2:]:
+                dim_map[i].append(e)
+                dims.append(i)
+        subspace_sets.append(init_subset)
+
+        last = init_subset
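+        # e.g. (hypothetical) an ideal set [[0, 1, 2], [3, 4, 5]] yields the
+        # minimal set [[0, 1], [3, 4]]; the loop below then re-adds the
+        # left-out dimensions one at a time, keeping every SUBSPACE_SET_STEP-th
+        # intermediate set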
+        # 2 is the minimal number of interacting dimensions
+        for i in range(rel_features - len(ideal_subspace_set) * 2 - 1):
+            d = random.choice(dims)
+            dims.remove(d)
+            # assign d to the first subspace it is still pending for; pop(0)
+            # keeps the assignment and the pending list consistent
+            subspace = dim_map[d].pop(0)
+            if not dim_map[d]:
+                del dim_map[d]
+            subset = [ss.copy() for ss in last]
+            subset[subspace].append(d)
+            if i % cst.SUBSPACE_SET_STEP == 0 or i == rel_features - len(ideal_subspace_set) * 2 - 1:
+                subspace_sets.append(subset)
+            last = subset
+        return subspace_sets
     else:
-        print('using default ID_THRESHOLD_QUANTILE = ', str(cst.ID_THRESHOLD_QUANTILE))
+        raise ValueError("wrong method!")
+
+
+def execute(param, loader=None):
+    assert type(param) == RunParams
+    base_dir = param.base_dir
+    experiment_name = param.experiment_name
+    method = param.method
+    data_file = param.data_file
+    delim = param.delim
+    rows = param.rows
+    columns = param.columns
+    subspace_set = param.subspace_set
+    distance_measure = param.distance_measure
+    cor_measure = param.cor_measure
+
+    # todo should not change the constant! fix later
+    # threshold = param.threshold
+    # cst.ID_THRESHOLD_QUANTILE = threshold
 
     # reading data from the file with delimiter and NaN values as "?"
-    data = pd.read_csv(data_file, delimiter=delimiter, header=None, na_values='?')
+    if loader is None:
+        data = pd.read_csv(data_file, delimiter=delim, header=None, na_values='?')
+    else:
+        data = loader.load_dataset(data_file, delim)
 
     # drop a data point if it contains inconsistent data
     data = data.dropna(axis=0, how='any')
@@ -455,57 +477,254 @@
     class_labels = data.pop(data.shape[1] - 1)
 
+    print('executing', experiment_name)
+    # log_file = dir + "log.txt"
+
+    try:
+        relevant_features = util.parse_relevant_features(experiment_name)
+        # with open(log_file, 'w') as log:
+        if method is cst.Method.TRIVIAL:
+            disc_intervals, disc_points, distances, init_bins_count, runtime = compute_trivial_discretization(data)
+            sm_runtime = 0
+        elif method is cst.Method.PERFECT:
+            disc_intervals, disc_points, runtime = compute_perfect_discretization(
+                re.search('(.+?_.+?_.+?_.+?)_', experiment_name).group(1), data)
+            sm_runtime = 0
+            init_bins_count = 0
+        else:
+            # if subspace_set:
+            #     write(log, 'subspace set:', str(subspace_set))
+            disc_intervals, disc_points, distances, init_bins_count, runtime, sm_runtime = compute_IPD(data,
+                                                                                                       relevant_features,
+                                                                                                       method,
+                                                                                                       cor_measure,
+                                                                                                       distance_measure,
+                                                                                                       subspace_set)
+
+        # plot_distances(dir, distances, disc_intervals)
+        # output file for classification measurements
+        outdat_file_content = el.write_outdat_file(disc_intervals, disc_points, class_labels, relevant_features)
+        # output file for compression measurements
+        # slim_dat_content = dq.prepare_slim_dat(base_dir, experiment_name)
+
+        cut_file_content = el.write_cut_file(disc_intervals)
+        return Result(base_dir, experiment_name, outdat_file_content, cut_file_content, runtime, sm_runtime,
+                      init_bins_count)
+    except:
+        print("Error in " + experiment_name + ":", sys.exc_info()[0], sys.exc_info()[1])
+        traceback.print_exception(sys.exc_info()[0], sys.exc_info()[1], sys.exc_info()[2],
+                                  limit=2, file=sys.stdout)
+        # print("deleting the directory of the failed experiment")
+        # shutil.rmtree(dir)
+        # raise
+        return None
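+
+
+# hypothetical end-to-end sketch of the prepare/execute/store pipeline:
+#     for p in prepare("logs_test", cst.DATA_DIR + "cubes_02_03_c.csv",
+#                      cst.Method.PREDEFINED_OPTIMAL_SUBSPACESET):
+#         store(execute(p))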
+
+
+class Result:
+    def __init__(self, base_dir, experiment_name, outdat_file_content, cut_file_content, runtime, sm_runtime,
+                 initial_bin_count):
+        self.sm_runtime = sm_runtime
+        self.initial_bin_count = initial_bin_count
+        self.runtime = runtime
+        self.base_dir = base_dir
+        self.experiment_name = experiment_name
+        self.cut_file_content = cut_file_content
+        self.outdat_file_content = outdat_file_content
+        self.dir = base_dir + '/' + experiment_name + "/"
+
+
+def store(result):
+    if not result:
+        return
+    assert type(result) is Result
+    print('storing experiment', result.experiment_name)
+
+    if not os.path.exists(result.base_dir):
+        os.makedirs(result.base_dir)
+
+    if not os.path.exists(result.dir):
+        os.makedirs(result.dir)
+
+    with open(result.dir + "log.txt", "w") as f:
+        f.write("initial bins count: " + str(result.initial_bin_count) + "\n")
+        f.write("runtime " + str(result.runtime) + " seconds\n")
+        f.write("sm runtime " + str(result.sm_runtime) + " seconds\n")
+    if not os.path.exists(cst.SLIM_DATA_DIR + result.experiment_name):
+        os.makedirs(cst.SLIM_DATA_DIR + result.experiment_name)
+
+    with open(cst.SLIM_DATA_DIR + result.experiment_name + "/" + result.experiment_name + ".dat", "w") as f:
+        f.writelines(result.outdat_file_content)
+
+    with open(result.dir + cst.FILE_DATA_CUTS, "w") as f:
+        f.writelines(result.cut_file_content)
+
+
+class RunParams:
+    def __init__(self, base_dir, experiment_name, method, data_file, delim, columns, rows, distance_measure, threshold,
+                 cor_measure, subspace_set):
+        self.cor_measure = cor_measure
+        self.threshold = threshold
+        self.distance_measure = distance_measure
+        self.subspace_set = subspace_set
+        self.columns = columns
+        self.rows = rows
+        self.delim = delim
+        self.data_file = data_file
+        self.method = method
+        self.experiment_name = experiment_name
+        self.base_dir = base_dir
+
+
+def prepare(base_dir, data_file, method, time_mark=False, delim=";", columns=None, rows=None,
+            distance_measure=cst.DistanceMeasure.ID,
+            cor_measure=None, threshold=cst.ID_THRESHOLD_QUANTILE):
+    params = []
+    # # defining prefix for the output files
+    data_file_name = util.get_file_name(data_file)
+    if method.name.startswith("PREDEFINED"):
+        subspace_sets = compute_subspace_sets(data_file_name, method)
+        if not subspace_sets:
+            return params
+    else:
+        subspace_sets = None
+
+    base_dir = cst.BASE + base_dir
     if not os.path.exists(base_dir):
         os.makedirs(base_dir)
-    counter = 0
-    for subspace in subspaces:
-        if method is cst.Method.PERFECT or method is cst.Method.TRIVIAL:
-            name = method.name + "_" + data_file_name + ("_" + str(columns) if columns else "") + (
-                "_" + str(rows) if rows else "")
-            time_name = (util.now() if not notime else "") + ("_" if not notime else "") + name
-            dir = base_dir + '/' + time_name + "/"
-        else:
-            if method is cst.Method.PREDEFINED and len(subspace) != dims_count:
-                counter += 1
-            name = distance_measure.name + (
-                ("_" if not notime else "") + cor_measure.name if cor_measure else "") + "_" + (
-                'ORIGINAL' if len(subspace) == dims_count and method is cst.Method.PREDEFINED else method.name) + (
-                "_s" + str(counter) if method == cst.Method.PREDEFINED and len(
-                    subspace) != dims_count else "") + "_" + str(
-                cst.ID_THRESHOLD_QUANTILE) + "_" + data_file_name + ("_" + str(columns) if columns else "") + (
-                "_" + str(rows) if rows else "")
-            time_name = (util.now() if not notime else "") + (
-                "_" if not notime else "") + name
-            dir = base_dir + '/' + time_name + "/"
-        os.makedirs(dir)
-
-        print('output files are:', dir + '*')
-        log_file = dir + "log.txt"
-
-        try:
-            with open(log_file, 'w') as log:
-                if method is cst.Method.TRIVIAL:
-                    disc_intervals, disc_points, distances = compute_trivial_discretization(data, log)
-                elif method is cst.Method.PERFECT:
-                    disc_intervals, disc_points = compute_perfect_discretization(
-                        re.search('(.*?)(?:_\d*)?(?:\.csv)', data_file_name).group(1), data, log)
-                else:
-                    if method == cst.Method.PREDEFINED:
-                        write(log, 'subspace:', str(subspace))
-                    disc_intervals, disc_points, distances = compute_optimal_discretization(data, method,
-                                                                                            cor_measure,
-                                                                                            distance_measure,
-                                                                                            subspace,
-                                                                                            log)
-
-                plot_distances(dir, distances, disc_intervals)
-                # output file for classification measurements
-                write_out_file(name, dir + cst.FILE_DATA_OUTPUT, disc_intervals, disc_points, class_labels)
-                # output file for compression measurements
-                dq.prepare_compression1(base_dir, time_name)
-
-                write_cut_file(dir + cst.FILE_DATA_CUTS, disc_intervals)
-        except:
-            print("Error in " + dir + ":", sys.exc_info()[0])
-            # print("deleting the directory of the failed experiment")
-            # shutil.rmtree(dir)
-            raise
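+    # experiment names follow <dataset>[_<columns>c][_<rows>r]_<METHODNAME>,
+    # with an _s<N> / _i<N> suffix per subspace-set variant, e.g. (hypothetical)
+    # cubes_02_03_c_PREDEFINEDSUBSPACESETS_s1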
+    # full, trivial, SM methods
+    if not method.name.startswith("PREDEFINED"):
+        experiment_name = data_file_name.replace(".csv", "") + ("_" + str(columns) + "c" if columns else "") + (
+            "_" + str(rows) + "r" if rows else "") + "_" \
+                          + method.name.replace("_", "")
+        timed_name = (util.now() if time_mark else "") + ("_" if time_mark else "") + experiment_name
+        params.append(
+            RunParams(base_dir, timed_name, method, data_file, delim, columns, rows, distance_measure, threshold,
+                      cor_measure, None))
+        print("prepared parameters for", experiment_name)
+
+    # predefined subspace sets
+    else:
+        assert subspace_sets is not None
+        counter = 1
+        for subspace_set in subspace_sets:
+            experiment_name = data_file_name.replace(".csv", "") + ("_" + str(columns) + "c" if columns else "") + (
+                "_" + str(rows) + "r" if rows else "") + "_" \
+                              + method.name.replace("_", "") \
+                              + ("_s" + str(counter) if method is cst.Method.PREDEFINED_SUBSPACESETS else "") \
+                              + ("_i" + str(
+                counter) if method is cst.Method.PREDEFINED_OPTIMAL_SUBSPACESET_AND_IRRELEVANT else "")
+            counter += 1
+            timed_name = (util.now() + "_" if time_mark else "") + experiment_name
+            params.append(
+                RunParams(base_dir, timed_name, method, data_file, delim, columns, rows, distance_measure, threshold,
+                          cor_measure, subspace_set))
+            print("prepared parameters for", experiment_name)
+    return params
+
+
+def collect_params(base_dir):
+    params = []
+    # relevant features 2 - 30
+    for rf in range(2, 31):
+        # cubes 1 - 10
+        for c in range(1, 11):
+            # cube types complete, incomplete, incomplete overlapping
+            for type in ['c', 'i', 'io']:
+                # for rf in range(2, 3):
+                #     # cubes 1 - 10
+                #     for c in range(3, 4):
+                #         # cube types complete, incomplete, incomplete overlapping
+                #         for type in ['c']:
+                if (c == 1 or rf / c < 2) and type != 'c':
+                    continue
+                filepath = cst.DATA_DIR + 'cubes_' + '{0:02d}'.format(rf) + '_' \
+                           + '{0:02d}'.format(c) + '_' \
+                           + type + '.csv'
+                for method in [cst.Method.PREDEFINED_SUBSPACESETS,
+                               cst.Method.PREDEFINED_OPTIMAL_SUBSPACESET_AND_IRRELEVANT,
+                               cst.Method.PREDEFINED_OPTIMAL_SUBSPACESET
+                               ]:
+                    params.extend(prepare(base_dir, filepath, method))
+    return params
+
+
+if __name__ == "__main__":
+    params = collect_params("logs_test")
+    # print(compute_subspace_sets("cubes_10_03_i.csv", cst.Method.PREDEFINED_SUBSPACESETS))
+    # exit(1)
+    # if len(sys.argv) == 1:
+    #     # print(
+    #     #     'Usage: main.py '
+    #     #     '-b=<logs base dir> '
+    #     #     '-f=<data file> '
+    #     #     '-d=<delimiter> '
+    #     #     '-c=<columns count> '
+    #     #     '-m=<[original|greedy_topk|trivial|...]> '
+    #     #     '-cor=<[uds]> '
+    #     #     '-dist=<[id, cjs]> '
+    #     #     '-t=<threshold> '
+    #     #     '-s[=<subspace>] '
+    #     #     '-r=<rows count> ')
+    #     # command = '-b=logs -f=synthetic_cases/synthetic_3d_parity_problem.csv -d=; -dist=ID'
+    #     # print('Running default: ', command)
+    #     # command_list = command.split(' ')
+    #     raise ValueError("no arguments passed!")
+    # else:
+    #     command_list = sys.argv[1:]
+    #
+    # file_arg = list(filter(lambda x: x.startswith("-f="), command_list))
+    # if not file_arg:
+    #     raise ValueError('No data file provided!')
+    # base_dir_arg = list(filter(lambda x: x.startswith("-b="), command_list))
+    # if not base_dir_arg:
+    #     raise ValueError('No logs base dir provided!')
+    # time_mark = len(list(filter(lambda x: x.startswith("-time"), command_list))) != 0
+    # delim_arg = list(filter(lambda x: x.startswith("-d="), command_list))
+    # columns_arg = list(filter(lambda x: x.startswith("-c="), command_list))
+    # rows_arg = list(filter(lambda x: x.startswith("-r="), command_list))
+    # method_arg = list(filter(lambda x: x.startswith("-m="), command_list))
+    # corr_measure_arg = list(filter(lambda x: x.startswith("-cor="), command_list))
+    # distance_measure_arg = list(filter(lambda x: x.startswith("-dist="), command_list))
+    # threshold_arg = list(filter(lambda x: x.startswith("-t="), command_list))
+    #
+    # data_file = file_arg[0].replace('-f=', '')
+    # base_dir = base_dir_arg[0].replace('-b=', '')
+    #
+    # if delim_arg:
+    #     delimiter = delim_arg[0].replace('-d=', '')
+    # else:
+    #     print('using default delimiter ;')
+    #     delimiter = ';'
+    # columns = int(columns_arg[0].replace('-c=', '')) if columns_arg else None
+    # rows = int(rows_arg[0].replace('-r=', '')) if rows_arg else None
+    # if method_arg:
+    #     method = cst.Method[method_arg[0].replace('-m=', '').upper()]
+    # else:
+    #     print('using default method PREDEFINED_OPTIMAL_SUBSPACESET')
+    #     method = cst.Method.PREDEFINED_OPTIMAL_SUBSPACESET
+    #
+    # cor_measure = cst.CorrelationMeasure[corr_measure_arg[0].replace('-cor=', '').upper()] if corr_measure_arg \
+    #     else None
+    # if method.name.startswith("SM") and cor_measure is None:
+    #     raise ValueError('A correlation measure should be given!')
+    #
+    # if distance_measure_arg:
+    #     distance_measure = cst.DistanceMeasure[distance_measure_arg[0].replace('-dist=', '').upper()]
+    #     print('using distance measure ' + distance_measure.name)
+    # else:
+    #     distance_measure = cst.DistanceMeasure.ID
+    #     print('using default distance measure ID')
+    # if threshold_arg:
+    #     threshold = float(threshold_arg[0].replace('-t=', ''))
+    #     print('using ID_THRESHOLD_QUANTILE = ', str(threshold))
+    # else:
+    #     threshold = cst.ID_THRESHOLD_QUANTILE
+    #     print('using default ID_THRESHOLD_QUANTILE = ', str(threshold))
+    #
+    # params = prepare(base_dir, data_file, method, time_mark, delimiter, columns, rows, distance_measure, cor_measure,
+    #                  threshold)
+
+    for p in params:
+        result = execute(p)
+        store(result)
diff --git a/runExperiment.py b/runExperiment.py
index 911cd94..bf548f0 100755
--- a/runExperiment.py
+++ b/runExperiment.py
@@ -14,23 +14,17 @@
 import sys
 import signal
 import time
-import data_generator as dg
 import psutil
+import main
+import pandas as pd
 
 newRun = None
 
 nbThreads = int(multiprocessing.cpu_count() / 2)
-print('nbThreads', nbThreads)
+
 # nbThreads = 1
-filterItems = re.compile(".*\.graph")
-progressfile = "progress.txt"
-folderData = "DATA"
 onlyListTasks = False
 showOutput = False
-scripts = {"noml": "./vogNoML2.bash", "nogf": "./vogNoGF2.bash", "all": "./voPy2.bash"}
-# callingScript="./vogPy2.bash"
-# callingScript="./vogNoML2.bash"
-callingScript = "./vogNoGF2.bash"
 
 # if "--continue" in sys.argv or "-c" in sys.argv:
 #     newRun=False
@@ -50,109 +44,150 @@ if "-t" in sys.argv:
     nbThreads = int(sys.argv[sys.argv.index("-t") + 1])
 
-if "--filter" in sys.argv:
-    filterItems = re.compile(sys.argv[sys.argv.index("--filter") + 1])
-if "-f" in sys.argv:
-    filterItems = re.compile(sys.argv[sys.argv.index("-f") + 1])
-
-if "--operation" in sys.argv:
-    callingScript = sys.argv[sys.argv.index("--operation") + 1]
-if "-o" in sys.argv:
-    callingScript = sys.argv[sys.argv.index("-o") + 1]
-if callingScript in scripts:
-    callingScript = scripts[callingScript]
-items = queue.Queue()
+items = multiprocessing.Queue()
 
 # todo items.put(WHATEVER PARAMETERS OF TASK)
-data_generators = dg.produce_all_data_generators()
-for data_generator in data_generators:
-    items.put(data_generator)
+# data_generators = dg.produce_all_data_generators()
+# for data_generator in data_generators:
+#     items.put(data_generator)
 
-if onlyListTasks:
-    while not items.empty():
-        para = items.get()
-        print(para)
+class UnregisteredDataset(Exception):
+    pass
 
-nbTasksTotal = items.qsize()
-nbTasksDone = 0
-counterLock = threading.RLock()
-runningMain = True
+with multiprocessing.Manager() as manager:
+    class Loader():
+        def __init__(self):
+            self.dataset = manager.dict()
-datasets = queue.Queue()
+            self.global_lock = multiprocessing.RLock()
+            self.dataset_locks = {}  # manager.dict()
 
+        def load_dataset(self, path, delim):
+            if path not in self.dataset_locks:
+                raise UnregisteredDataset('Unregistered dataset shall be loaded ', path)
 
-def worker():
-    global items, datasets, counterLock, nbTasksTotal, nbTasksDone, runningMain
-    while True:
-        # while psutil.virtual_memory().available < 10 * 1024**3:
-        while psutil.virtual_memory().percent > 90:
-            if not runningMain:
-                return
-            print('sleep for 10 seconds')
-            time.sleep(10)
-        para = None
-        try:
-            para = items.get(block=False)
-        except queue.Empty:
-            return
-
-        # todo generate data sets
+            with self.dataset_locks[path]:
+                if path not in self.dataset:
+                    self.dataset[path] = pd.read_csv(path, delimiter=delim, header=None, na_values='?')
+            return self.dataset[path]
 
+        def register_dataset(self, path):
+            with self.global_lock:
+                if path not in self.dataset_locks:
+                    self.dataset_locks[path] = multiprocessing.RLock()
 
-        # datasets.put(#PUT DATASET)
-        datasets.put(para.build())
-        with counterLock:
-            if runningMain:
-                nbTasksDone += 1
-                print("Jobs done ", nbTasksDone, "/", nbTasksTotal)
-        items.task_done()
+    loader = Loader()
 
+    params = main.collect_params("logs_test")
+    for param in params:
+        loader.register_dataset(param.data_file)
+        items.put(param)
 
-def datasetWriter():
-    global datasets, nbTasksDone, nbTasksTotal, runningMain
-    if nbTasksTotal < 1:
-        return
-    while True:
-        while True:
-            try:
-                dataset = datasets.get(block=True, timeout=10)
-                dg.store(dataset)
-            except queue.Empty:
-                break
-
-        if datasets.empty() or not runningMain:
-            break
-
-        with counterLock:
-            if nbTasksDone == nbTasksTotal and datasets.empty() or not runningMain:
-                break
+    if onlyListTasks:
+        while not items.empty():
+            para = items.get()
+            print(para)
 
+    nbTasksTotal = items.qsize()
+    nbTasksDone = 0
+    counterLock = multiprocessing.RLock()
+    paramQueueLock = multiprocessing.RLock()
+    runningMain = True
+    datasets = multiprocessing.Queue()
 
-def receive_sig_int(signum, frame):
-    global items, datasets, runningMain
-    print("Received SigInt")
-    runningMain = False
-    with items.mutex:
-        items.queue.clear()
-    with datasets.mutex:
-        datasets.queue.clear()
-    print("Processed SigInt")
-
-signal.signal(signal.SIGINT, receive_sig_int)
-threads = []
-for i in range(nbThreads):
-    t = threading.Thread(target=worker)
-    threads.append(t)
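+    # producer/consumer layout: worker processes pull RunParams from `items`
+    # and push Result objects onto `datasets`; a single writer thread in the
+    # parent process drains `datasets`, so only one place writes result files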
+    def worker(worker_id):
+        global items, datasets, counterLock, nbTasksTotal, nbTasksDone, runningMain
+        print('Worker ID ', worker_id, ' is born')
+        while True:
+            # while psutil.virtual_memory().available < 10 * 1024**3:
+            while psutil.virtual_memory().percent > 90:
+                if not runningMain:
+                    return
+                print('sleep for 10 seconds')
+                time.sleep(10)
+            para = None
+            print('Worker ID ', worker_id, ' awaits parameters')
+            try:
+                with paramQueueLock:
+                    para = items.get(block=False)
+            except queue.Empty:
+                return
+            print('thread ', threading.get_ident(), ' / started', para)
+            # todo generate data sets
+
+            # datasets.put(para.build())
+            datasets.put(main.execute(para, loader))
+            print('Worker ID ', worker_id, ' execution finished')
+            with counterLock:
+                if runningMain:
+                    nbTasksDone += 1
+                    print("Jobs done ", nbTasksDone, "/", nbTasksTotal)
+            # items.task_done()
+
+
+    def datasetWriter():
+        global datasets, nbTasksDone, nbTasksTotal, runningMain
+        if nbTasksTotal < 1:
+            return
+        while True:
+            while True:
+                try:
+                    result = datasets.get(block=True, timeout=10)
+                    # dg.store(dataset)
+                    main.store(result)
+                except queue.Empty:
+                    break
+
+            if datasets.empty() or not runningMain:
+                break
+
+            with counterLock:
+                if nbTasksDone == nbTasksTotal and datasets.empty() or not runningMain:
+                    break
+
+
+    def receive_sig_int(signum, frame):
+        global items, datasets, runningMain
+        print("Received SigInt")
+        runningMain = False
+        # multiprocessing.Queue has no .mutex/.queue to clear; drain both instead
+        try:
+            while True:
+                items.get_nowait()
+        except queue.Empty:
+            pass
+        try:
+            while True:
+                datasets.get_nowait()
+        except queue.Empty:
+            pass
+        print("Processed SigInt")
+
+
+    signal.signal(signal.SIGINT, receive_sig_int)
+
+    print('nbThreads', nbThreads)
+    print('Tasks to do: ', nbTasksTotal)
+    # threads = []
+    # for i in range(nbThreads):
+    #     t = threading.Thread(target=worker)
+    #     threads.append(t)
+    #     t.daemon = True
+    #     t.start()
+
+    t = threading.Thread(target=datasetWriter)
+    # # threads.append(t)
     t.daemon = True
     t.start()
 
-t = threading.Thread(target=datasetWriter)
-threads.append(t)
-t.daemon = True
-t.start()
+    nbProcesses = min(nbThreads, len(params))
+    with multiprocessing.Pool(nbProcesses) as pool:
+        pool.map(worker, [i for i in range(nbProcesses)])
+        # pool.map(datasetWriter, [0])
+        pool.close()
+        pool.join()
+    print("Writing")
 
-for t in threads:
-    t.join()
+# for t in threads:
+#     t.join()
diff --git a/util.py b/util.py
index c4cbe7b..65df0f5 100644
--- a/util.py
+++ b/util.py
@@ -13,17 +13,10 @@ def get_escaped_name(problem):
     return problem.replace("-", "_").replace(".", "")
 
 
-def parse_relevant_features(data_file, delim=";"):
+def parse_relevant_features(data_file):
     data_file_name = get_file_name(data_file)
-    search = re.search('(\d)d', data_file_name)
+    search = re.search('cubes_(\d+)_', data_file_name)
     if not search:
-        print('file name ' + data_file_name + " does not contain dimensions count")
-        print("checking dimensions count in the data file")
-        with open(data_file, "r") as f:
-            line = f.readline()
-            if len(line) == 0:
-                raise ValueError("File " + data_file + " does not contain valid data!")
-            return len(line.split(delim)) - 1
-
+        raise ValueError("wrong file format!")
     dims_count = int(search.group(1))
     return dims_count