# -*- coding: utf-8 -*-
"""
Created on Wed Jun 07 14:27:18 2017
@author: PatAd
"""
import os
import queue
import re
import subprocess
import threading
import multiprocessing
import sys
import signal
import time
import psutil
from multiprocessing import Value
import main
import pandas as pd
import discretization_quality_measure as dqm
import data_generator as dg
import objects as o
newRun = None
nbThreads = max(1, multiprocessing.cpu_count() // 2 - 1)  # at least one worker, even on few-core machines
# nbThreads = 1
onlyListTasks = False
showOutput = False
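
# init() runs once in every pool worker process (see the Pool initializer at
# the bottom) and publishes the two shared Value counters as module globals,
# so that worker() can update them.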
def init(args1, args2):
    ''' store the shared task counters for later use '''
    global nbTasksDone, written_tasks_counter
    nbTasksDone = args1
    written_tasks_counter = args2

# if "--continue" in sys.argv or "-c" in sys.argv:
#     newRun = False
# elif "--start" in sys.argv or "-s" in sys.argv:
#     newRun = True
# else:
#     raise RuntimeError("Missing information if starting or continuing experiments")
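
# Command-line flags:
#   --list / -l          only print the collected tasks, then exit
#   --verbose / -v       set showOutput = True
#   --threads N / -t N   use N worker processes instead of the default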
if "--list" in sys.argv or "-l" in sys.argv:
onlyListTasks = True
if "--verbose" in sys.argv or "-v" in sys.argv:
showOutput = True
if "--threads" in sys.argv:
nbThreads = int(sys.argv[sys.argv.index("--threads") + 1])
if "-t" in sys.argv:
nbThreads = int(sys.argv[sys.argv.index("-t") + 1])
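
# Shared task queue: the parent process fills it with experiment parameters
# and the pool workers drain it.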
items = multiprocessing.Queue()


class UnregisteredItem(Exception):
    pass
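
# The Loader below caches datasets and ideal discretizations in Manager dicts
# shared across worker processes; per-item RLocks ensure each file is parsed
# at most once. Note that the lock dicts themselves are plain dicts, so they
# reach the workers only by being inherited at fork time, i.e. all
# registrations must happen before the pool is created.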
with multiprocessing.Manager() as manager:
    class Loader:
        def __init__(self):
            self.dataset = manager.dict()
            self.ideal_discs = manager.dict()
            self.global_lock = multiprocessing.RLock()
            self.dataset_locks = {}  # manager.dict()
            self.ideal_disc_locks = {}  # manager.dict()

        def load_ideal_disc(self, name):
            if name not in self.ideal_disc_locks:
                raise UnregisteredItem('Attempt to load an unregistered ideal discretization: ', name)
            with self.ideal_disc_locks[name]:
                # parse the ideal cuts only once, then serve the cached copy
                if name not in self.ideal_discs:
                    self.ideal_discs[name] = dqm.parse_ideal_cuts(name)
                return self.ideal_discs[name]

        def load_dataset(self, path, delim):
            if path not in self.dataset_locks:
                raise UnregisteredItem('Attempt to load an unregistered dataset: ', path)
            with self.dataset_locks[path]:
                # read the CSV only once, then serve the cached copy
                if path not in self.dataset:
                    self.dataset[path] = pd.read_csv(path, delimiter=delim, header=None, na_values='?')
                return self.dataset[path]

        def register_dataset(self, path):
            with self.global_lock:
                if path not in self.dataset_locks:
                    self.dataset_locks[path] = multiprocessing.RLock()

        def register_ideal_disc(self, name):
            with self.global_lock:
                if name not in self.ideal_disc_locks:
                    self.ideal_disc_locks[name] = multiprocessing.RLock()

    loader = Loader()

    # todo items.put(WHATEVER PARAMETERS OF TASK)
    # params = dg.produce_all_data_generators()
    # for data_generator in params:
    #     items.put(data_generator)
    params = main.collect_experiment_params("logs_real")
    if len(params) == 0:
        print("no parameters collected!")
        sys.exit(0)
    for param in params:
        loader.register_dataset(param.data_file)
        loader.register_ideal_disc(param.experiment_name)
        items.put(param)
    if onlyListTasks:
        while not items.empty():
            para = items.get()
            print(para)
        # --list only prints the collected tasks; stop before launching workers
        sys.exit(0)
    nbTasksTotal = len(params)
    paramQueueLock = multiprocessing.RLock()
    runningMain = True
    results = multiprocessing.Queue()
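
    # Each worker loops: back off while memory pressure is high, pop one
    # parameter set, run the experiment, and push the result to the writer.
    # Caveat: runningMain is a plain bool, so flipping it in the parent after
    # the pool has forked is not visible inside the worker processes.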
    def worker(worker_id):
        global items, results, nbTasksTotal, runningMain, nbTasksDone
        print('Worker ID ', worker_id, ' is born')
        while True:
            # back off while the machine is under memory pressure
            # while psutil.virtual_memory().available < 10 * 1024**3:
            while psutil.virtual_memory().percent > 90:
                if not runningMain:
                    return
                print('sleep for 10 seconds')
                time.sleep(10)
            para = None
            print('Worker ID ', worker_id, ' awaits parameters')
            try:
                with paramQueueLock:
                    para = items.get(block=False)
            except queue.Empty:
                # no tasks left: let this worker terminate
                return
            print('Worker ID ', worker_id, 'is executing', para)
            # todo generate data sets
            # results.put(para.build())
            results.put(main.execute(para, loader))
            print('Worker ID ', worker_id, ' execution finished')
            with nbTasksDone.get_lock():
                if runningMain:
                    nbTasksDone.value += 1
                    print("Jobs done ", nbTasksDone.value, "/", nbTasksTotal)
    def datasetWriter():
        global results, nbTasksTotal, runningMain, nbTasksDone, written_tasks_counter
        if nbTasksTotal < 1:
            return
        while True:
            while True:
                try:
                    result = results.get(block=True, timeout=10)
                    # todo store
                    # dg.store(result)
                    main.store(result, loader)
                    with written_tasks_counter.get_lock():
                        written_tasks_counter.value += 1
                        print('Jobs written', written_tasks_counter.value, "/", nbTasksTotal)
                except queue.Empty:
                    break
                if results.empty() or not runningMain:
                    break
            with nbTasksDone.get_lock():
                if nbTasksDone.value == nbTasksTotal and results.empty():
                    print("All the results stored")
                    break
            if not runningMain:
                print("Stop writing")
                break
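
    # On Ctrl+C: clear the main loop flag and discard all pending work so the
    # writer thread and workers can wind down.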
    def receive_sig_int(signum, frame):
        global items, results, runningMain
        print("Received SigInt")
        runningMain = False
        # multiprocessing.Queue has no .mutex/.queue attributes (those belong
        # to queue.Queue), so drain both queues item by item instead
        for q in (items, results):
            try:
                while True:
                    q.get_nowait()
            except queue.Empty:
                pass
        print("Processed SigInt")
    signal.signal(signal.SIGINT, receive_sig_int)

    print('nbThreads', nbThreads)
    print('Tasks to do: ', nbTasksTotal)
    # create the shared counters before the writer thread and the pool use them
    nbTasksDone = Value('i', 0)
    written_tasks_counter = Value('i', 0)
    t = threading.Thread(target=datasetWriter)
    # threads.append(t)
    t.daemon = True
    t.start()
    nbProcesses = min(nbThreads, len(params))
    with multiprocessing.Pool(nbProcesses, initializer=init, initargs=(nbTasksDone, written_tasks_counter)) as pool:
        pool.map(worker, [i for i in range(nbProcesses)])
        # pool.map(datasetWriter, [0])
        pool.close()
        pool.join()
    print("Processing finished")
    t.join()
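
# Typical invocations (assuming this script is saved as run_experiments.py;
# the actual file name is not part of this snippet):
#   python run_experiments.py --list      # print the collected tasks and exit
#   python run_experiments.py -t 8 -v     # run with 8 worker processes, verbose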