From d8351be4f7cea8853ae9af696e876051fdafcf8f Mon Sep 17 00:00:00 2001 From: Tatiana Dembelova Date: Tue, 10 Oct 2017 10:59:12 +0200 Subject: [PATCH] added concurrent data generation --- data_generator.py | 259 ++++++++++++++++++++++++++++++++--------- experiments_logging.py | 2 +- runExperiment.py | 158 +++++++++++++++++++++++++ 3 files changed, 362 insertions(+), 57 deletions(-) create mode 100755 runExperiment.py diff --git a/data_generator.py b/data_generator.py index 87beb1f..254fd89 100644 --- a/data_generator.py +++ b/data_generator.py @@ -1,67 +1,214 @@ import numpy as np +import pandas as pd +import random +import time +import os +import json import experiments_logging as l +RADIUS = 2 +CUBE_WIDTH = 1 +ROWS = 6000 +OVERLAP_PROBABILITY = 0.6 +# BASE = '/Users/tatyanadembelova/Documents/study/thesis/ipd_extended/' +BASE = '/local/tmp/ipd_extended_experiments/' -class DataGenerator: - def __init__(self, dims_count, radius): - self.feature_count = dims_count - self.dim_borders = [[-radius, radius] for d in range(dims_count)] - self.cubes = [] - self.generated_data = None - - def add_cube(self, points_count, cube_param=None): - if self.generated_data: - raise ValueError("the data is already generated!") - if cube_param is None: - cube_param = {} - label = 0 - else: - label = len(self.cubes) + 1 - cube = [] +class CubeParameters: + def __init__(self, rows, loc=None): + self.rows = rows + self.loc = loc + self.subspaces = [] + + +class CubesGenerator: + def __init__(self, feature_count, radius, file_name): + self.file_name = file_name + self.cube_parameters = [] + self.feature_count = feature_count + self.dim_borders = [[-radius, radius] for d in range(feature_count)] + self.subspaces = [] + self.perf_disc = [{d[1]} for d in self.dim_borders] + + def add_cube_parameter(self, cube_param): + if cube_param.loc is None: + cube_param.loc = {} + self.cube_parameters.append(cube_param) + location_params = cube_param.loc + s = list(location_params.keys()) + if s and not s in self.subspaces: + self.subspaces.append(s) for feat in range(self.feature_count): - if feat in cube_param.keys(): - dim_params = cube_param[feat] - if dim_params[0] < self.dim_borders[feat][0] \ - or dim_params[0] + dim_params[1] > self.dim_borders[feat][1]: - raise ValueError("The cube with params " + str(cube_param) + " does not fit in dim " + str(feat) + "!") + if feat in cube_param.loc.keys(): + dim_params = location_params[feat] + # perfect discretization + if dim_params[0] != -RADIUS: + self.perf_disc[feat].add(dim_params[0]) + self.perf_disc[feat].add(dim_params[0] + dim_params[1]) + + def build(self): + + cubes = [] + + for cube_parameter in self.cube_parameters: - column = np.random.uniform(0, dim_params[1], points_count) + np.ones(points_count) * dim_params[0] + location_params = cube_parameter.loc + points_count = cube_parameter.rows + + if len(location_params) == 0: + label = 0 else: - column = np.random.uniform(self.dim_borders[feat][0], self.dim_borders[feat][1], points_count) - cube.append(column) - class_labels = np.empty(points_count) - class_labels.fill(label) - cube.append(class_labels) - self.cubes.append(cube) + label = len(cubes) + 1 + cube = [] + for feat in range(self.feature_count): + if feat in location_params.keys(): + dim_params = location_params[feat] + if dim_params[0] < self.dim_borders[feat][0] \ + or dim_params[0] + dim_params[1] > self.dim_borders[feat][1]: + raise ValueError( + "The cube with params " + str(location_params) + " does not fit in dim " + str( + feat) + "!") - def build(self): 
- if not self.generated_data: - self.generated_data = np.concatenate([np.array(cube) for cube in self.cubes], axis=1).transpose() - return self.generated_data + column = np.random.uniform(0, dim_params[1], points_count) + np.ones(points_count) * dim_params[ + 0] + else: + column = np.random.uniform(self.dim_borders[feat][0], self.dim_borders[feat][1], points_count) + cube.append(column) + class_labels = np.empty(points_count) + class_labels.fill(label) + cube.append(class_labels) + cubes.append(cube) + generated_data = np.concatenate([np.array(cube) for cube in cubes], axis=1).transpose() + return generated_data, self.file_name + + def get_subspaces(self): + return self.subspaces + + def get_discs(self): + return [sorted(p) for p in self.perf_disc] + + +def generate_partition(rf, c): + arr = [i for i in range(rf)] + random.shuffle(arr) + min = 2 + pivot = 0 + partition = [] + for i in range(c - 1): + max = rf - pivot - (c - i - 1) * min + t = random.randint(min, max) + partition.append(arr[pivot: pivot + t]) + pivot += t + partition.append(arr[pivot:]) + assert len(partition) == c + return partition + + +def generate_overlap_partition(rf, c): + partition = generate_partition(rf, c) + additions = [] + for p in partition: + add = [] + # at most a half of the partition times of possibility of overlap + for l in range(int(len(p) / 2)): + if random.uniform(0, 1) < OVERLAP_PROBABILITY: + others = list({i for i in range(rf)} - set(p)) + rand = random.randint(0, rf - len(p) - 1) + add.append(others[rand]) + additions.append(add) + + for i, p in enumerate(partition): + for add in additions[i]: + p.append(add) + return partition + + +def produce_data_generator(rf, irf, c, type, name): + total_f = rf + irf + dg = CubesGenerator(total_f, RADIUS, name) + # same number of records for each of the cubes + background + cube_rows = int(ROWS / (c + 1)) + if type == 'c': + partition = [range(rf) for i in range(c)] + elif type == 'i': + partition = generate_partition(rf, c) + elif type == 'io': + partition = generate_overlap_partition(rf, c) + else: + raise ValueError("no such type!") + + for p in partition: + location = dict() + for j in p: + location[j] = (random.uniform(0, 1) * (RADIUS * 2 - 1) - RADIUS, CUBE_WIDTH) + dg.add_cube_parameter(CubeParameters(cube_rows, location)) + dg.add_cube_parameter(CubeParameters(cube_rows)) + return dg + + +def produce_all_data_generators(): + data_generators = [] + global basedir + + basedir = BASE + 'new_cubes/' + perf_disc_dir = BASE + 'ideal_disc/' + perf_subspaces_file = BASE + 'ideal_subspaces.json' + perf_subspaces = dict() + perf_discs = dict() + ## relevant features 2 - 30 + # for rf in range(10, 11): + # # irrelevant features 0 - 100: + # for irf in range(100, 101): + # # cubes 1 - 10 + # for c in range(3, 4): + # # cube types complete, incomplete, incomplete overlapping + # for type in ['i']: + + # relevant features 2 - 30 + for rf in range(2, 31): + # irrelevant features 0 - 100: + for irf in range(101): + # cubes 1 - 10 + for c in range(1, 11): + # cube types complete, incomplete, incomplete overlapping + for type in ['c', 'i', 'io']: + if c == 1 and type != 'c': + continue + if rf / c < 2 and type != 'c': + # if not (rf / c < 2 and type == 'c'): + continue + name = 'cubes_' + '{0:02d}'.format(rf) + '_' \ + + '{0:03d}'.format(irf) + '_' \ + + '{0:02d}'.format(c) + '_' \ + + type + '.csv' + # if os.path.exists(basedir + name) and os.path.exists( + # perf_disc_dir + 'cut_' + name.replace('csv', 'txt')): + # continue + + dg = produce_data_generator(rf, irf, c, 
type, name) + perf_discs[name] = dg.get_discs() + perf_subspaces[name] = dg.get_subspaces() + data_generators.append(dg) + for name in perf_discs: + write_cut_file(perf_disc_dir + 'cut_' + name.replace('csv', 'txt'), perf_discs[name]) + with open(perf_subspaces_file, 'w') as psf: + json.dump(perf_subspaces, psf) + return data_generators + + +def write_cut_file(name, disc_intervals): + with open(name, 'w') as out: + for i in range(len(disc_intervals)): + out.write('dimension ' + str(i) + ' (' + str(len(disc_intervals[i])) + ' bins)\n') + for break_point in disc_intervals[i]: + out.write(format(break_point, '.1f') + '\n') + out.write('-------------------------------------\n') + + +def store(data): + global basedir + name = data[1] + pd.DataFrame(data[0]).to_csv(basedir + name, sep=';', header=False, index=False, float_format='%.2f') if __name__ == '__main__': - dg = DataGenerator(30, 2) - rows = 3 - dg.add_cube(rows, {0: (-1.7, 1), 1: (-1.7, 1)}) - dg.add_cube(rows, {1: (-0.4, 1), 2: (-1, 1)}) - dg.add_cube(rows, {0: (0, 1), 1: (0, 1), 2: (0, 1)}) - dg.add_cube(rows) - dg.add_cube(rows) - print(dg.build()) - # 3d_cube_99.csv - - # 3d_2cubes_99.csv - # 3d_20cubes_99.csv - - # 3d_2incompletecubes_99.csv - # 3d_20incompletecubes_99.csv - - # 30d_cube_99.csv - - # 30d_2cubes_99.csv - # 30d_20cubes_99.csv - - # 30d_2incompletecubes_99.csv - # 30d_20incompletecubes_99.csv - # l.plot_data_3d(dg.generated_data) + print(generate_overlap_partition(7, 3)) diff --git a/experiments_logging.py b/experiments_logging.py index f44225f..194462c 100644 --- a/experiments_logging.py +++ b/experiments_logging.py @@ -128,7 +128,7 @@ def write_cut_file(name, disc_intervals): # rows = 20000 # data = np.concatenate((synthetic_cube_in_cube(rows, 2, 0), np.zeros((rows, 1))), axis=1) # data = pd.read_csv("synthetic_cases/blobs/3d_3_blobs_aligned.csv", delimiter=";", header=None, na_values='?') - data = pd.read_csv("synthetic_cases/cubes/3d_3_cubes_aligned_xor.csv", delimiter=";", header=None, na_values='?') + data = pd.read_csv("new_cubes/cubes_10_100_03_i.csv", delimiter=";", header=None, na_values='?') # data = pd.DataFrame(dg.cubes(4000)) plot_data_3d(data) diff --git a/runExperiment.py b/runExperiment.py new file mode 100755 index 0000000..911cd94 --- /dev/null +++ b/runExperiment.py @@ -0,0 +1,158 @@ +# -*- coding: utf-8 -*- +""" +Created on Wed Jun 07 14:27:18 2017 + +@author: PatAd +""" + +import os +import queue +import re +import subprocess +import threading +import multiprocessing +import sys +import signal +import time +import data_generator as dg +import psutil + +newRun = None +nbThreads = int(multiprocessing.cpu_count() / 2) +print('nbThreads', nbThreads) +# nbThreads = 1 +filterItems = re.compile(".*\.graph") +progressfile = "progress.txt" +folderData = "DATA" +onlyListTasks = False +showOutput = False + +scripts = {"noml": "./vogNoML2.bash", "nogf": "./vogNoGF2.bash", "all": "./voPy2.bash"} +# callingScript="./vogPy2.bash" +# callingScript="./vogNoML2.bash" +callingScript = "./vogNoGF2.bash" + +# if "--continue" in sys.argv or "-c" in sys.argv: +# newRun=False +# elif "--start" in sys.argv or "-s" in sys.argv: +# newRun = True +# else: +# raise RuntimeError("Missing information if starting or continuing experiments") + +if "--list" in sys.argv or "-l" in sys.argv: + onlyListTasks = True + +if "--verbose" in sys.argv or "-v" in sys.argv: + showOutput = True + +if "--threads" in sys.argv: + nbThreads = int(sys.argv[sys.argv.index("--threads") + 1]) +if "-t" in sys.argv: + nbThreads = 
int(sys.argv[sys.argv.index("-t") + 1]) + +if "--filter" in sys.argv: + filterItems = re.compile(sys.argv[sys.argv.index("--filter") + 1]) +if "-f" in sys.argv: + filterItems = re.compile(sys.argv[sys.argv.index("-f") + 1]) + +if "--operation" in sys.argv: + callingScript = sys.argv[sys.argv.index("--operation") + 1] +if "-o" in sys.argv: + callingScript = sys.argv[sys.argv.index("-o") + 1] +if callingScript in scripts: + callingScript = scripts[callingScript] + +items = queue.Queue() + +# todo items.put(WHATEVER PARAMETERS OF TASK) +data_generators = dg.produce_all_data_generators() +for data_generator in data_generators: + items.put(data_generator) + +if onlyListTasks: + while not items.empty(): + para = items.get() + print(para) + +nbTasksTotal = items.qsize() +nbTasksDone = 0 +counterLock = threading.RLock() +runningMain = True + +datasets = queue.Queue() + + +def worker(): + global items, datasets, counterLock, nbTasksTotal, nbTasksDone, runningMain + while True: + # while psutil.virtual_memory().available < 10 * 1024**3: + while psutil.virtual_memory().percent > 90: + if not runningMain: + return + print('sleep for 10 seconds') + time.sleep(10) + para = None + try: + para = items.get(block=False) + except queue.Empty: + return + + # todo generate data sets + + # datasets.put(#PUT DATASET) + datasets.put(para.build()) + + with counterLock: + if runningMain: + nbTasksDone += 1 + print("Jobs done ", nbTasksDone, "/", nbTasksTotal) + items.task_done() + + +def datasetWriter(): + global datasets, nbTasksDone, nbTasksTotal, runningMain + if nbTasksTotal < 1: + return + while True: + while True: + try: + dataset = datasets.get(block=True, timeout=10) + dg.store(dataset) + except queue.Empty: + break + + if datasets.empty() or not runningMain: + break + + with counterLock: + if nbTasksDone == nbTasksTotal and datasets.empty() or not runningMain: + break + + +def receive_sig_int(signum, frame): + global items, datasets, runningMain + print("Received SigInt") + runningMain = False + with items.mutex: + items.queue.clear() + with datasets.mutex: + datasets.queue.clear() + print("Processed SigInt") + + +signal.signal(signal.SIGINT, receive_sig_int) + +threads = [] +for i in range(nbThreads): + t = threading.Thread(target=worker) + threads.append(t) + t.daemon = True + t.start() + +t = threading.Thread(target=datasetWriter) +threads.append(t) +t.daemon = True +t.start() + +for t in threads: + t.join()