Skip to content
Permalink
master
Switch branches/tags

Name already in use

A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?
Go to file
 
 
Cannot retrieve contributors at this time
executable file 888 lines (728 sloc) 30.4 KB
#!/usr/bin/env python3.6
# built with python 3.6, make sure you use this version if you have problems running it or get weird errors
# import pythons builtins
import logging.handlers
import sys
import os
import glob
import traceback
import json
import time
import datetime
import queue
from logging.handlers import RotatingFileHandler
from typing import MutableSequence, Union, List, Callable, Optional
# import third party modules
import ntplib
from pyroute2 import IPDB, NetlinkError
# import own modules
from mr_keyboard.src.constants import *
from mr_keyboard.src.settings_parser import SettingsParser
from mr_keyboard.src.optoboard import Optoboard
from mr_keyboard.src.callback import CallbackList
from mr_keyboard.src.midi_handler import MidiHandler
# Give the program's custom log levels a name so the logging module can refer to them by text.
for _level, _level_name in (
        (DATA, 'DATA'),
        (DEBUG_V, 'DEBUG_V'),
        (DEBUG_VV, 'DEBUG_VV'),
        (DEBUG_VVV, 'DEBUG_VVV'),
        (DEBUG_VVVV, 'DEBUG_VVVV'),
):
    logging.addLevelName(_level, _level_name)
# do not leave the loop helpers behind in the module namespace
del _level, _level_name
class MRKeyboard:
""" One class to rule them all. Starts and controls the rest of the program. """
def __init__(
        self,
        flags: Optional[str] = None,
        logger: Optional[logging.Logger] = None,
        settings: Optional[SettingsDict] = None
):
    """
    Load the settings, set up the loggers and initialise the basic state. The 'real' init happens in start().

    :param flags: the CLI flags you want to use, ignored when settings are passed
    :param logger: your own logger if you want to configure it yourself
    :param settings: your own settings dict. This has to have at least the content of the one provided by
        SettingsParser
    """
    if settings is None:
        # no settings were passed, let the parser build them (the parser is only kept in this case)
        self._settings_parser = SettingsParser()
        self.settings = self._settings_parser.get_settings(flags)
    else:
        self.settings = settings

    # use the passed logger or define it ourselves
    log_settings = self.settings['log']
    if logger is not None:
        self.log = logger.getChild('mr-keyboard')
    else:
        # NOTE(review): the source this was reconstructed from had lost its indentation. The handler setup is
        # assumed to apply only to a logger we create ourselves, and the high verbosity file only when file
        # logging is enabled - confirm against the intended behaviour.
        self.log = logging.getLogger('mr-keyboard')
        # let (almost) everything pass the logger itself, the handlers do the filtering by level
        self.log.setLevel(1)
        self.attach_console_logger(self.log, log_settings['format'], log_settings['level'])
        # create the file handler for the logs
        if not log_settings['file_disable']:
            self.attach_file_handler_to_logger_from_settings(self.log)
            if log_settings['verbose_enable']:
                # create the file handler for the high verbosity log
                self.attach_file_handler_to_logger_from_settings(self.log, True)

    # isolate logger: the handlers are moved to a QueueListener and the logger only writes into the queue
    # TODO: test this!
    self.log_queue_listener: Optional[logging.handlers.QueueListener] = None
    if log_settings['isolate']:
        self.log.log(DEBUG_V, 'isolating log handlers')
        log_queue = queue.Queue()
        # hand the current handlers to the listener directly instead of passing a placeholder and patching it
        self.log_queue_listener = logging.handlers.QueueListener(
            log_queue,
            *self.log.handlers,
            respect_handler_level=True
        )
        self.log.handlers = []
        self.log.addHandler(logging.handlers.QueueHandler(log_queue))
        self.log_queue_listener.start()
        self.log.log(DEBUG_V, 'log handlers isolated')

    # object wide variables
    self._run = True
    self.midi: Optional[MidiHandler] = None
    self.boards: Dict[str, Optoboard] = {}
    self._last_cycle_time = None  # timestamp of the end of the last cycle
    self.total_cycles = 0
    self.start_time = None
    self.utc_offset = 2208988800.  # the offset of time.time() to the UTC timestamp we get from the NTP server
    self.clean = True  # nothing to clean up yet, start() sets this to False

    # the callback lists, they stay None when the callbacks are disabled
    self.callbacks_start: Optional[CallbackList] = None
    self.callbacks_loop: Optional[CallbackList] = None
    self.callback_note_event: Optional[CallbackList] = None
    self.callbacks_cleanup: Optional[CallbackList] = None

    # set the callbacks if they aren't disabled
    if self.settings['misc']['disable_callbacks']:
        self.log.log(WARNING, 'callbacks are disabled')
    else:
        # the callback list called at the end of start()
        self.callbacks_start = CallbackList(self.create_child_logger('callback_list_start'), DEBUG_V)
        # the callback list called once per cycle of the main loop
        self.callbacks_loop = CallbackList(self.create_child_logger('callback_list_loop'), DEBUG_VVV, DEBUG_V)
        # the callback list for note events
        # this one gets called with the following keyword arguments:
        # 'pressed'   True if note on, False if note off
        # 'note'      midi code of the note
        # 'velocity'  the velocity of the note (127 on note off)
        # 'timestamp' a UTC timestamp of the event
        self.callback_note_event = CallbackList(self.create_child_logger('callback_list_note_event'), DEBUG_VVV)
        # the callback list called during cleanup()
        self.callbacks_cleanup = CallbackList(self.create_child_logger('callback_list_cleanup'), DEBUG_V)
@staticmethod
def attach_console_logger(logger: logging.Logger, format_str: str, level: int) -> logging.StreamHandler:
"""
Attach a new stream handler to the supplied logger. It will log to stdout (usually the terminal).
:param logger: the logger
:param format_str: the format string used in the formatter for the logger
:param level: the log level the logger will be set to
:return: the log handler
"""
# create a formatter and the console handler for the log
formatter = logging.Formatter(format_str)
handler = logging.StreamHandler(sys.stdout)
handler.setLevel(level)
handler.setFormatter(formatter)
logger.addHandler(handler)
return handler
@staticmethod
def attach_file_handler_to_logger(
logger: logging.Logger,
path: str,
filename: str,
mode: str,
max_kbytes: int,
backup_count: int,
format_str: str
) -> RotatingFileHandler:
"""
Create a file handler and attach it to the logger.
:param logger: the logger the file handler should be attached to
:param path: the folder where the file will be
:param filename: the name of the logfile
:param mode: the write mode we use to open the file
:param max_kbytes: the max size of a file before we start another one
:param backup_count: how many older logfiles we keep
:param format_str: the format string we use in the logfile
:return: the file handler we attached
"""
# generate a path and create the folders if they don't exist
out_path = os.path.join(os.path.curdir, path)
if not os.path.isdir(out_path):
os.makedirs(out_path)
# create the log handler
# the RotatingFileHandler has an number of nice options like forcing the rollover, see the docs
log_handler_file: RotatingFileHandler = logging.handlers.RotatingFileHandler(
filename=os.path.join(out_path, filename),
mode=mode,
maxBytes=max_kbytes * 1024,
backupCount=backup_count)
log_formatter = logging.Formatter(format_str)
# set the formatter and add the handler
log_handler_file.setFormatter(log_formatter)
logger.addHandler(log_handler_file)
return log_handler_file
def attach_file_handler_to_logger_from_settings(self, logger: logging.Logger, high_verbosity=False) -> RotatingFileHandler:
"""
Attach a file handler to the given logger with the configurations taken from the settings dict.
:param logger: The logger you want to a add a file handler to
:param high_verbosity: should this logger be of the highest verbosity, this loads the corresponding settings
:return: the file handler we attached
"""
# use a different format string if desired
if self.settings['log']['file_format'] is None:
format_str = self.settings['log']['format']
else:
format_str = self.settings['log']['file_format']
# generate what differs between the normal and high verbosity file handler
filename = self.settings['log']['file_name']
if filename is None:
filename = time.strftime('%Y-%m-%d_%H-%M-%S')
if high_verbosity:
filename += self.settings['log']['verbose_file_suffix']
filename += self.settings['log']['file_ending']
if not high_verbosity or self.settings['log']['verbose_file_max_kbytes'] is None:
max_kbytes = self.settings['log']['file_max_kbytes']
else:
max_kbytes = self.settings['log']['verbose_file_max_kbytes']
if not high_verbosity or self.settings['log']['verbose_file_max_files'] is None:
backup_count = self.settings['log']['file_max_files']
else:
backup_count = self.settings['log']['verbose_file_max_files']
# attach the file handler
file_handler = self.attach_file_handler_to_logger(
logger=logger,
path=self.settings['log']['file_path'],
filename=filename,
mode=self.settings['log']['file_mode'],
max_kbytes=max_kbytes,
backup_count=backup_count,
format_str=format_str
)
if not high_verbosity:
file_handler.setLevel(self.settings['log']['level'])
else:
file_handler.setLevel(0)
return file_handler
def create_child_logger(self, name: str) -> logging.Logger:
"""
Convenience function to create a descendant logger from the one we use in Main.
:param name: The name of the logger. It will be appended to the name of the main logger
:return: the newly created logger
"""
logger = self.log.getChild(name)
logger.getEffectiveLevel()
return logger
def start(self):
    """
    Instantiate all the objects we need and do everything we need done prior to starting the main loop.

    Sets up the network, queries the NTP server, connects to the boards matching the search pattern, applies the
    calibration, creates the MIDI handler and finally calls the start callbacks. On a fatal problem this runs
    cleanup() and returns early.
    This should only be called using a wrapper to catch (and log) occurring errors.
    """
    self.log.log(DEBUG, 'self.settings read, starting init')
    self.log.log(DEBUG_V, 'the current self.settings: ' + str(self.settings))
    # set the cleanup routine to actually run when called from now on
    self.clean = False

    self.log.log(DEBUG_V, 'checking/setting the local network config')
    self._init_network()
    if self.clean:
        # _init_network() gave up and already ran cleanup(); do not open any boards after that, they would
        # never be closed again because cleanup() skips itself once the object is marked as clean
        return

    self.log.log(DEBUG_V, 'connecting to the NTP server')
    # connect to the NTP server, update the offset
    if self._update_ntp_offset(deactivate_if_unavailable=True):
        self.log.log(DEBUG, 'NTP is ready')

    # search for the devices matching the pattern
    devices = glob.glob(self.settings['boards']['search_pattern'])
    self.log.log(DEBUG_V, 'found the following boards: ' + str(devices))

    # instantiate the boards we found
    faulty_init = False  # a flag to see if every board started how it should
    blacklist = self.settings['boards']['blacklist']
    for device in devices:
        # skip blacklisted boards
        if device in blacklist:
            self.log.log(DEBUG, 'excluded ' + device + ' because it is on the blacklist')
            continue
        # the following comment stops pycharm from nagging about the 'except' being too broad
        # noinspection PyBroadException
        try:
            self.boards[device] = Optoboard(self.settings, self.log, device, self.utc_offset)
            self.boards[device].start()
        except Exception as error:
            # log the error messages and do a simple console print for better visibility
            print('----------ERROR----------')
            self.log.log(ERROR, 'fatal error during init of board ' + device)
            self.log.log(ERROR, str(type(error)) + ': ' + str(error))
            self.log.log(ERROR, 'stacktrace:\n' + ''.join(traceback.format_tb(error.__traceback__)))
            # only remember the failure so the remaining boards still get their chance to initialise
            faulty_init = True

    # delayed cleanup (if we had a problem) to see what boards are initialising correctly
    if faulty_init:
        self.log.log(ERROR, 'an error happened during init, terminating')
        self.cleanup()
        return
    if not self.boards and self.settings['keyboard']['require']:
        self.log.log(ERROR, 'no boards connected, terminating')
        self.cleanup()
        return
    self.log.log(DEBUG, 'connected to the boards')

    # load and apply the calibration
    if self.settings['calibration']['active']:
        # do not load the old calibration data if we explicitly activated the calibration mode
        self.log.log(INFO, 'calibration active, old calibration data will not be loaded!')
    else:
        self.log.log(DEBUG, 'applying calibration')
        self._read_calibration()

    # display a warning if velocity is disabled
    if not self.settings['midi']['velocity_enable']:
        self.log.log(WARNING, 'velocity is disabled')

    # set up midi
    if self.settings['midi']['enable']:
        # suppress the warning from pycharm we get because of the conditional imports
        # noinspection PyUnboundLocalVariable
        self.midi = MidiHandler(self.settings, self)

    # call the init callbacks
    if self.callbacks_start is not None:
        self.callbacks_start.call()
    self.log.log(DEBUG, 'init done')
def _init_network(self):
    """
    Initialise the network components.

    Checks that the configured network interface exists (asking the user to pick one if it doesn't), tries to
    replace the IP addresses of that interface with the configured one and corrects the NTP server address if it
    points to the local machine. Runs cleanup() and returns if there is no network interface at all.
    """
    # query pyroute2.IPDB for the available network interfaces
    # integer keys and the loopback device 'lo' are left out
    with IPDB(mode='implicit') as ipdb:
        interfaces = [key for key in ipdb.interfaces.keys() if type(key) is not int and key != 'lo']

    if not interfaces:
        self.log.log(ERROR, 'no network interfaces found, terminating')
        self.cleanup()
        return
    if self.settings['local_machine']['ifname'] not in interfaces:
        # the configured interface does not exist, prompt the user to select one
        self.log.log(INFO, 'no usable default network interface set, prompting the user to select one')
        self.settings['local_machine']['ifname'] = self.prompt_selection(
            interfaces,
            'Select a network interface. The correct one will likely start with "eth" or "enp".',
            'Write one of those in "settings.ini -> [local_machine] -> ifname" to set it as default.',
            add_none=False
        )

    # try to set the IP of the machine
    try:
        with IPDB(mode='implicit') as ipdb:
            with ipdb.interfaces[self.settings['local_machine']['ifname']] as interface:
                # set the name of the interface
                interface.ifname = self.settings['local_machine']['ifname']
                # remove every ip address that is currently bound to the interface
                # NOTE: this may be a bad idea if the same network interface is used to connect to something else
                for bound_address in interface.ipaddr:
                    interface.del_ip(*bound_address)
                # add our own IP address
                interface.add_ip(self.settings['local_machine']['ip'], 24)
    except NetlinkError as e:
        # only the error telling us about the operation not being permitted is tolerated, re-raise anything else
        if e.args[1] != 'Operation not permitted':
            raise e
        self.log.log(
            WARNING,
            'unable to set the local machines IP, trying to go on anyway. Starting the program as root should help.')

    # if the NTP server is running on the local machine: set the ntp_ip to the external IP of this machine
    if self.settings['time_sync']['ntp_ip'] in ['localhost', '127.0.0.1']:
        self.log.log(DEBUG_V, 'correcting the IP address of the NTP server')
        self.settings['time_sync']['ntp_ip'] = self.get_external_ip()
        self.log.log(DEBUG_V, 'NTP server IP set to ' + self.settings['time_sync']['ntp_ip'])
def loop(self):
    """
    The main loop of the program. This should only be called using a wrapper to catch (and log) occurring errors.

    Includes some things that aren't looped but have to be done immediately before starting the loop. Every cycle
    collects the updates of all boards, logs them, forwards them to MIDI and the note event callbacks, calls the
    loop callbacks and then sleeps if the cycle was shorter than allowed by 'cycles_frequency_max'.
    The loop runs until self._run is set to False, which is done by cleanup().
    """
    # test if we should run the program at all
    if not self._run:
        self.log.log(DEBUG_V, 'the main loop is disabled so we won\'t start the loop')
        return

    # try to sync the time again
    self._update_ntp_offset()

    # display a hint if the calibration is active
    if self.midi is not None and self.settings['calibration']['active']:
        self.log.log(WARNING, 'calibration mode is active! the primary midi output will be muted for {}s'.format(
            self.settings['midi']['primary_mute_duration']
        ))
        self.midi.mute_primary(self.settings['midi']['primary_mute_duration'])

    # calculate the minimum length of a cycle
    cycle_duration_min = 1. / self.settings['misc']['cycles_frequency_max']

    self.log.log(DEBUG_V, 'updating the last known value of the boards internal timer and starting the TCP stream')
    for board in self.boards.values():
        board.update_last_known_c3 = True
        board.eth_start()

    # set timestamps
    self.start_time = time.time()
    self._last_cycle_time = time.time()
    self.log.log(INFO, 'starting the main loop')
    self.log.log(INFO, 'the offset to UTC is {}'.format(self.utc_offset))

    # the only way this loop should be exited is via self.cleanup()
    while self._run:
        # collect the updates of every board
        updates = []
        for board in self.boards.values():
            updates += board.update()

        # process the updates
        for update in updates:
            if update['pressed']:
                # key pressed
                self.log.log(DATA, 'key {} pressed (velocity: {:3d}), UTC timestamp: {:.{:d}f}'.format(
                    update['note'],
                    update['velocity'],
                    update['timestamp'],
                    self.settings['log']['timestamp_precision']
                ))
                if self.midi is not None:
                    self.midi.note_on(update['note'], update['velocity'])
            else:
                # key released
                self.log.log(DATA, 'key {} released, UTC timestamp: {:.{:d}f}'.format(
                    update['note'],
                    update['timestamp'],
                    self.settings['log']['timestamp_precision']
                ))
                if self.midi is not None:
                    self.midi.note_off(update['note'])
            if self.callback_note_event is not None:
                # updates without a velocity are reported with the maximum velocity
                velocity = update['velocity'] if 'velocity' in update else 127
                self.callback_note_event.call(
                    pressed=update['pressed'],
                    note=update['note'],
                    velocity=velocity,
                    timestamp=update['timestamp']
                )

        # handle midi events
        if self.midi is not None:
            self.midi.update()

        # call the callbacks for the loop
        if self.callbacks_loop is not None:
            self.callbacks_loop.call()

        # sleep for the rest of the cycle if the work was done faster than the minimum cycle length
        work_duration = time.time() - self._last_cycle_time
        if work_duration < cycle_duration_min:
            time.sleep(cycle_duration_min - work_duration)

        # take the timestamp after the sleep so the sleep counts for this cycle. Taking it before the sleep made
        # the next cycle look long enough to skip its own sleep, so only every second cycle was throttled
        now = time.time()
        cycle_duration = now - self._last_cycle_time
        self._last_cycle_time = now

        # calculate cycles per second and log them (guard against a zero duration on coarse clocks)
        cycle_frequency = 1. / cycle_duration if cycle_duration > 0 else float('inf')
        perf_log_level = INFO if self.settings['misc']['show_performance'] else DEBUG_VVV
        self.log.log(perf_log_level, 'finished a cycle in {:4f}s, {:.3f} cycles per second'.format(
            cycle_duration,
            cycle_frequency
        ))

        # show a warning if the cycle frequency is below the limit
        if cycle_frequency < self.settings['misc']['cycles_frequency_min']:
            self.log.log(WARNING, 'cycle frequency too low: ' + str(cycle_frequency) + 'Hz')
        self.total_cycles += 1
def sync_time(self):
    """
    Synchronise the time of all boards again.

    For every board the ethernet connection is closed, the NTP offset is refreshed, the board time is updated and
    the connection is started again. Afterwards the receive buffers of all boards are flushed.
    """
    self.log.log(INFO, 'syncing the time of the boards, the keyboard will not be usable until this is done')

    for board in self.boards.values():
        # close the ethernet connection
        board.eth_close()
        # get rid of the message from the board telling us about the sudden disconnect
        board.serial_command()
        # refresh the offset right before the board gets synced
        self._update_ntp_offset()
        # do the time sync
        board.update_time()
        # restart the ethernet connection
        board.eth_start()

    # flush the receive buffers of the boards and set the flag to update counter3 again
    for board in self.boards.values():
        board.eth_flush()
        board.update_last_known_c3 = True

    self.log.log(
        INFO,
        'time sync done but better wait one more second in case we had to use the fallback time sync'
    )
def cleanup(self):
    """
    Clean up everything after we're done: close MIDI and the boards, call the cleanup callbacks, flush the logs.

    This does not kill the program! You may want to use a return statement after the function call if it happens
    in the middle of a function. Calling it again after a completed cleanup only logs a message.
    """
    # avoid running the cleanup multiple times, mostly because of the cleanup callbacks
    if self.clean:
        self.log.log(DEBUG_V, 'cleanup already done, skipping it')
        return

    # print some general statistics on this run (only available if the main loop was started)
    if self.start_time is not None:
        runtime = time.time() - self.start_time
        self.log.log(DEBUG_V, '{:d} cycles in total in {:.3f} seconds, on average {:.3f} cycles per second'.format(
            self.total_cycles,
            runtime,
            self.total_cycles / runtime
        ))
    self.log.log(DEBUG_V, 'starting cleanup')

    # close the midi connection
    if self.midi is not None:
        self.midi.close()

    # close the boards and remove them from the known boards
    for board in self.boards.values():
        board.close()
    self.boards.clear()

    # call the callbacks for the cleanup
    if self.callbacks_cleanup is not None:
        self.callbacks_cleanup.call()
    self.log.log(DEBUG, 'cleanup done, setting the program to not run anymore')

    # flush the log handlers to ensure what we wrote in them doesn't get lost
    for handler in self.log.handlers:
        handler.flush()
    # stop the QueueListener if used
    if self.log_queue_listener is not None:
        self.log_queue_listener.stop()

    # reset the state: no start time, cleaned up, and the main loop has to stop
    # (the latter matters when this was called from somewhere in the running program, maybe because of an error)
    self.start_time = None
    self.clean = True
    self._run = False
#
# --------- UTILITY ---------
#
def _read_calibration(self):
    """
    Read the calibration data from file and pass it to the boards.

    Structure of the data:
    {<board_id>: {<channel_id>: [<min_value>, <max_value>]} }
    The index of a Dict can only be a string in JSON so the board ids are converted to strings for the lookup.
    The calibration mode gets activated if the file is missing or unreadable or if the data for a board is
    missing or incomplete.
    """
    calibration_settings = self.settings['calibration']
    path = os.path.join(os.curdir, calibration_settings['file_folder'], calibration_settings['file_name'])

    # just activate the calibration if the file does not exist
    if not os.path.exists(path):
        self.log.log(WARNING, 'the calibration file does not exist. activating calibration mode')
        self.settings['calibration']['active'] = True
        return

    with open(path, mode='r') as f:
        try:
            cal_data = json.load(f)
        except json.JSONDecodeError:
            self.log.log(WARNING, 'unable to decode calibration data, activating calibration mode')
            self.settings['calibration']['active'] = True
            return

    for board in self.boards.values():
        board_key = str(board.id)
        if board_key not in cal_data:
            self.log.log(
                WARNING,
                'calibration data for board ' + board_key + ' missing, activating calibration mode'
            )
            self.settings['calibration']['active'] = True
            continue
        # set_calibration() tells us whether the data was complete for this board
        if not board.set_calibration(cal_data[board_key]):
            self.log.log(
                WARNING,
                'calibration data for board ' + board_key + ' incomplete, activating calibration mode'
            )
            self.settings['calibration']['active'] = True

    self.log.log(DEBUG_V, 'calibration data read from file')
def write_calibration(self):
"""
Get the calibration data from the boards and serialise it into a JSON file.
This overwrites any previously saved data and includes only the boards present in this run.
"""
# don't save calibration data when we didn't fully start the program
if self.start_time is None:
return
# build the path for the containing folder
path = os.path.join(os.curdir, self.settings['calibration']['file_folder'])
# make sure the folder exists
if not os.path.exists(path):
os.makedirs(path)
# add the filename to the path
path = os.path.join(path, self.settings['calibration']['file_name'])
# collect the calibration data
calibration = {}
for addr, board in self.boards.items():
calibration[board.id] = board.get_calibration()
# read the old data from the file
try:
with open(path, mode='r') as f:
try:
data_old = json.load(f)
except json.JSONDecodeError:
data_old = {}
except FileNotFoundError:
data_old = {}
# write the old data in calibration (only where we don't have newer data)
for board, channels in data_old.items():
if int(board) not in calibration:
calibration[board] = channels
else:
for channel, min_max in channels.items():
if int(channel) not in calibration[int(board)]:
calibration[board][channel] = min_max
# write the data in the file
with open(path, mode='w') as f:
json.dump(calibration, f)
self.log.log(DEBUG_V, 'calibration data written to file')
def prompt_selection(
self,
options: MutableSequence,
header: str = 'select one of the options',
footer: Optional[str] = None,
add_none: bool = True,
aliases: Optional[List[str]] = None
) -> Any:
"""
Prompt the user to choose between several options via console.
The first option will always be None (if not disabled). It will also be returned if the list is empty or the
user aborts with Ctrl+C.
Do not use this prompt in the main loop as it will block the loop from executing leading to buffer overflows
on the boards.
:param options: an enumerable object, its items will be converted to strings to display the choices
:param header: a string to display as the header of the prompt
:param footer: None or a string to display after the options but before the input
:param add_none: add None as first element of the list?
:param aliases: a list of aliases to display instead of the options themselves
:return: None or the selected item from options
"""
# sanity check
if len(options) == 0:
return None
if aliases is not None:
assert len(aliases) >= len(options)
# add None as the first element
if add_none:
options.insert(0, None)
if aliases is not None:
aliases.insert(0, 'None')
# Flush the log handlers (including the one for the console) so we don't get interrupted
for handler in self.log.handlers:
handler.flush()
# write our prompt
sys.stderr.write('\n')
sys.stderr.write('--- ### {} ###\n'.format(header))
sys.stderr.write('--- You can cancel the selection with Ctrl+C.\n')
for n, item in enumerate(options):
if aliases is None:
# display the content of this option
sys.stderr.write('--- {:2d} – {}\n'.format(n, item))
else:
# display the alias for this option
sys.stderr.write('--- {:2d} – {}\n'.format(n, aliases[n]))
if footer is not None:
sys.stderr.write('--- {}\n'.format(footer))
sys.stderr.write('\n')
# redirect stdout to stderr to have the output from input() on stderr as well
sys.stdout = sys.stderr
# evaluate our input and give the user some extra attempts if needed
try:
mistakes = 2
selection = input('--- enter the number of your selection: ')
while True:
try:
selection = int(selection)
if 0 <= selection < len(options):
# valid input received, restore sys.stdout and return the result
sys.stdout = sys.__stdout__
return options[selection]
except ValueError:
pass
sys.stderr.write('--- invalid input!\n')
if mistakes <= 0:
sys.stdout = sys.__stdout__
return None
selection = input('--- try again ({}): '.format(mistakes))
mistakes -= 1
except KeyboardInterrupt:
sys.stdout = sys.__stdout__
return None
def get_external_ip(self):
    """
    Return the IP we expose to the other devices in the same local network.

    This is the IPv4 address bound to the configured interface. If there is more than one the first one is used,
    if there is none the statically configured IP is returned (or None if that isn't set either).
    """
    local_machine = self.settings['local_machine']
    with IPDB(mode='implicit') as ipdb:
        # get all IPv4 addresses bound to the interface
        addrs = ipdb.interfaces[local_machine['ifname']].ipaddr.ipv4
        address_count = len(addrs)

        # the expected case: exactly one address, return the actual IP of it
        if address_count == 1:
            return addrs[0]['address']

        self.log.log(
            WARNING,
            'there are {:d} IPs (IPv4) bound to the interface, exactly only one is expected'.format(address_count)
        )
        if address_count > 1:
            self.log.log(WARNING, 'using the first one')
            return addrs[0]['address']
        if local_machine['ip']:
            self.log.log(WARNING, 'using the statically set one')
            return local_machine['ip']
        self.log.log(WARNING, 'no correction possible, returning None')
        return None
def _update_ntp_offset(self, deactivate_if_unavailable=False):
# do nothing if the NTP server is disabled
if self.settings['time_sync']['ntp_ip'] is None:
return False
try:
ntp_client = ntplib.NTPClient()
ntp_response = ntp_client.request(self.settings['time_sync']['ntp_ip'])
self.utc_offset = ntp_response.tx_timestamp - time.time()
self.log.log(
DEBUG_V,
'query to the NTP server was successful, NTP timestamp: {:f}. offset of time.time() to UTC: {:f}'.format(
ntp_response.tx_timestamp,
self.utc_offset
)
)
return True
except ntplib.NTPException as e:
if self.settings['time_sync']['ntp_require']:
# we absolutely need the NTP server, raise the exception again
raise e
# write a warning
self.log.log(WARNING, 'could not connect to the NTP server, message: ' + str(e))
# deactivate the queries to the NTP server
if deactivate_if_unavailable:
self.settings['time_sync']['ntp_ip'] = None
self.log.log(
INFO,
'queries to the NTP server have been deactivated, the timestamps will not have the current UTC time '
'but will be accurate relative to the other timestamps during this run')
return False
def safely_call(self, func: Callable, *args, **kwargs):
    """
    A wrapper for a function to catch any exception and log it. Ctrl + C is handled separately and not logged
    as an error.

    Call it like a normal function but the first argument has to be the function you actually want to call.
    Don't overuse this, this is only intended to be used on the start() and loop() in this class.
    Usage: safely_call(function, positional_arg_1, positional_arg_2, kw1=arg, kw2=arg)
    In every error case cleanup() is called before returning.

    :param func: the function you want to call safely
    :param args: the positional arguments passed on to func
    :param kwargs: the keyword arguments passed on to func
    :return: this returns None on errors or what the called function returned
    """
    # the following comment stops pycharm from nagging about the 'except' being too broad
    # noinspection PyBroadException
    try:
        self.log.log(DEBUG_VV, 'safely calling {} (args: {}, kwargs: {})'.format(func, args, kwargs))
        return func(*args, **kwargs)
    except KeyboardInterrupt:
        # CTRL + C pressed, write calibration, cleanup and terminate
        try:
            if self.settings['calibration']['active']:
                self.write_calibration()
        finally:
            # the cleanup has to happen even if saving the calibration fails, otherwise the boards stay open
            self.cleanup()
        return None
    except Exception as error:
        # log the error messages and do a simple console print for better visibility
        print('----------ERROR----------')
        self.log.log(ERROR, str(type(error)) + ': ' + str(error))
        self.log.log(ERROR, 'stacktrace:\n' + ''.join(traceback.format_tb(error.__traceback__)))
        self.cleanup()
        return None
if __name__ == '__main__':
    # run the keyboard with the default settings: first the init, then the main loop
    # both go through safely_call() so errors get logged and cleaned up
    main = MRKeyboard()
    for stage in (main.start, main.loop):
        main.safely_call(stage)