# pylint: disable=missing-function-docstring
import logging as log
import os
from time import time
import numpy as np
from ADwin import ADwinError
from scipy import signal
from . import helpers, settings
class MockADwin:  # pylint: disable=too-many-instance-attributes
    """Mock class for testing and demonstration of nqontrol.

    The mock keeps the parameter arrays (``Par``/``FPar``) and the data
    fields (``Data_Double``/``Data_Long``) in plain Python lists and fills
    them with generated values when they are read.  All public methods take
    1-based indices, which are translated to 0-based list indices
    internally.
    """

    NUM_SERVOS = 8

    def __init__(self, DeviceNo=0, raiseError=False):
        """Set up the emulated parameter and data storage.

        Parameters
        ----------
        DeviceNo :
            Stored on the instance, not used otherwise by the mock.
        raiseError :
            When true, every :meth:`Set_Par` call raises ``ADwinError``.
            Since the constructor itself calls :meth:`Set_Par`, it raises
            as well in that case.
        """
        servos = self.NUM_SERVOS
        self.DeviceNo = DeviceNo
        self._par = [0] * 80
        self._fpar = [0] * 80
        self._running = [0] * 10
        # One independent list per data field.  ``[[]] * 200`` would put the
        # very same list object into all 200 slots, so writing to one
        # uninitialised field would silently change all the others.
        self._data_double = [[] for _ in range(200)]
        self._data_long = [[] for _ in range(200)]
        self.ADwindir = "mock"
        self._boot_time = time()
        self._last_read = 0
        self._n_last = 0
        self.reset_trigger = True
        self._raiseError = raiseError

        # gains (first NUM_SERVOS entries) followed by offsets
        self._data_double[settings.DATA_OFFSETGAIN - 1] = [1.0] * servos + [
            0.0
        ] * servos
        # init filter coefficients
        self._data_double[settings.DATA_FILTERCOEFFS - 1] = (
            [1.0, 0.0, 0.0, 0.0, 0.0]
            * settings.NUMBER_OF_FILTERS
            * settings.NUMBER_OF_SOS
        )
        self._data_long[settings.DATA_LAST_OUTPUT - 1] = [0] * servos
        self._data_long[settings.DATA_MONITORS - 1] = [0] * servos

        # autolock parameters: four consecutive sections of 8 entries each
        self._data_double[settings.DATA_LOCK - 1] = (
            [0.0] * 8  # thresholds
            + [0.0] * 8  # threshold breaks
            + [1.0] * 8  # amplitudes
            + [0x8000] * 8  # offsets
        )
        # helper field, exposes the lockIterator
        self._data_double[settings.DATA_LOCK_ITER - 1] = [0] * servos
        # helper field, exposes recent lock output as test value
        self._data_long[settings.DATA_LOCK_OUTPUT - 1] = [0] * servos
        self._data_double[settings.DATA_LOCK_STEPSIZE - 1] = [
            helpers.convertFrequency2Stepsize(settings.RAMP_FREQUENCY_MIN)
        ] * servos

        # internal field, direction of the lock step, either +1 or -1
        self._lockStepSign = [1] * servos
        self._lockPosition = [0.0] * servos
        self._lockBuffer = [0.0] * servos

        # time difference
        self.Set_Par(settings.PAR_TIMEDIFF, 3000)
        self.Set_Par(settings.PAR_FIFOSTEPSIZE, 1)

    def Boot(self, file_):  # pylint: disable=unused-argument
        """Emulate booting: only restarts the internal boot timestamp."""
        self._boot_time = time()

    def Workload(self):  # pylint: disable=no-self-use
        """Return a constant dummy workload value."""
        return 42

    def Process_Status(self, no):
        """Return 1 if process ``no`` (1-based) was started, else 0."""
        return self._running[no - 1]

    def Load_Process(self, process):  # pylint: disable=no-self-use
        """Check that the process file exists; nothing is actually loaded."""
        assert os.path.isfile(
            process
        ), f"The ADwin process file '{process}' has to exist."

    def Start_Process(self, no):
        """Mark process ``no`` (1-based) as running."""
        self._running[no - 1] = 1

    def Set_FPar(self, index, par):
        """Store ``par`` as float in float parameter ``index`` (1-based)."""
        self._fpar[index - 1] = float(par)

    def Set_Par(self, index, par):
        """Store ``par`` as int in integer parameter ``index`` (1-based).

        Raises
        ------
        ADwinError
            If the mock was created with ``raiseError=True``.
        """
        if self._raiseError:
            raise ADwinError("test", "Test Error", 13)
        # A non-zero write to parameter 3 restarts the fifo read timer.
        if index == 3 and par != 0:
            self._last_read = time()
        self._par[index - 1] = int(par)

    def Get_Par(self, index):
        """Refresh the emulated parameters, then return parameter ``index``."""
        self._par_special_functions()
        return self._par[index - 1]

    def Get_FPar(self, index):
        """Return float parameter ``index`` (1-based)."""
        self._fpar_special_functions()
        return self._fpar[index - 1]

    @staticmethod
    def _index_error(err, store, DataNo, Startindex):
        """Build the descriptive ``IndexError`` used by the data accessors.

        The length lookup is guarded: an out-of-range ``DataNo`` is the very
        reason the original error occurred, so looking it up again without a
        guard would raise from inside the error handler.
        """
        try:
            length = len(store[DataNo - 1])
        except IndexError:
            length = f"unavailable (there are only {len(store)} data fields)"
        return IndexError(
            f"An index error occured: {err}. The relevant indices were startindex {Startindex} and DataNo {DataNo}. The length of the target array was {length}."
        )

    def GetData_Double(self, DataNo, Startindex, Count):
        """Return ``Count`` values of double data field ``DataNo`` as a list.

        ``Startindex`` is 1-based.  Reading may first regenerate emulated
        values, see :meth:`_data_double_special_functions`.

        Raises
        ------
        IndexError
            If ``DataNo`` does not address an existing data field.
        """
        self._data_double_special_functions(DataNo)
        try:
            if DataNo == settings.DATA_LOCK:
                log.debug(
                    f"lock data values where accessed, are: {self._data_double[settings.DATA_LOCK - 1]}"
                )
            return list(
                self._data_double[DataNo - 1][Startindex - 1 : Startindex + Count - 1]
            )
        except IndexError as e:
            raise self._index_error(e, self._data_double, DataNo, Startindex) from e

    def _data_double_special_functions(self, no):
        """Regenerate the lock iterator values before they are read.

        Only acts when ``no`` is ``settings.DATA_LOCK_ITER``: servos that are
        neither searching nor locked get ``-2.0``, all others get a random
        value in ``[-1 - stepsize, 1 + stepsize]``.
        """
        if no != settings.DATA_LOCK_ITER:
            return
        lcr = self._par[settings.PAR_LCR - 1]
        for i in range(1, self.NUM_SERVOS + 1):
            indexoffset = i - 1
            search = helpers.readBit(lcr, indexoffset)
            locked = helpers.readBit(lcr, indexoffset + 8)
            if not search and not locked:
                self.SetData_Double([-2.0], settings.DATA_LOCK_ITER, i, 1)
            else:
                # lock stuff
                overshoot = self._data_double[settings.DATA_LOCK_STEPSIZE - 1][
                    indexoffset
                ]
                out = np.random.uniform(-1 - overshoot, 1 + overshoot)
                self.SetData_Double([out], settings.DATA_LOCK_ITER, i, 1)

    def SetData_Double(self, Data, DataNo, Startindex, Count):
        """Write ``Data`` into double data field ``DataNo``.

        The slice starting at the 1-based ``Startindex`` and spanning
        ``Count`` entries is replaced by ``Data``.

        Raises
        ------
        IndexError
            If ``DataNo`` does not address an existing data field.
        """
        try:
            self._data_double[DataNo - 1][
                Startindex - 1 : Startindex + Count - 1
            ] = Data
        except IndexError as e:
            raise self._index_error(e, self._data_double, DataNo, Startindex) from e

    def GetData_Long(self, DataNo, Startindex, Count):
        """Return ``Count`` values of long data field ``DataNo`` as a list.

        ``Startindex`` is 1-based.  Reading may first regenerate emulated
        values, see :meth:`_data_long_special_functions`.

        Raises
        ------
        IndexError
            If ``DataNo`` does not address an existing data field.
        """
        self._data_long_special_functions(DataNo)
        try:
            return list(
                self._data_long[DataNo - 1][Startindex - 1 : Startindex - 1 + Count]
            )
        except IndexError as e:
            raise self._index_error(e, self._data_long, DataNo, Startindex) from e

    def SetData_Long(self, Data, DataNo, Startindex, Count):
        """Write ``Data`` into long data field ``DataNo``.

        The slice starting at the 1-based ``Startindex`` and spanning
        ``Count`` entries is replaced by ``Data``.

        Raises
        ------
        IndexError
            If ``DataNo`` does not address an existing data field.
        """
        try:
            self._data_long[DataNo - 1][Startindex - 1 : Startindex + Count - 1] = Data
        except IndexError as e:
            raise self._index_error(e, self._data_long, DataNo, Startindex) from e

    def _data_long_special_functions(self, no):
        """Regenerate the "last output" values before they are read.

        Only acts when ``no`` is ``settings.DATA_LAST_OUTPUT``.
        """
        if no != settings.DATA_LAST_OUTPUT:
            return
        for i in range(1, self.NUM_SERVOS + 1):
            if self._isRamp(i):
                # some arbitrary number in the correct range, only for temp feedback
                out = np.random.randint(0, 2 ** 16)
            else:
                out = 2 ** 15
            self.SetData_Long([out], settings.DATA_LAST_OUTPUT, i, 1)

    def Fifo_Full(self, index):  # pylint: disable=unused-argument
        """Return the emulated number of entries waiting in the fifo.

        The count grows with the time since the last read, scaled by
        ``settings.SAMPLING_RATE`` and the value of parameter 6, and is
        capped at ``settings.FIFO_BUFFER_SIZE``.
        """
        diff = time() - self._last_read
        # NOTE(review): parameter 6 is presumably settings.PAR_FIFOSTEPSIZE
        # (set to 1 in __init__) - confirm against settings.
        fifoStepsize = self.Get_Par(6)
        n = int(diff * settings.SAMPLING_RATE / fifoStepsize) + self._n_last
        n = min(n, settings.FIFO_BUFFER_SIZE)
        self._n_last = n
        return int(n)

    def _isRamp(self, i):
        """Return whether the search bit of servo ``i`` (1-based) is set."""
        lcr = self._par[settings.PAR_LCR - 1]
        return bool(helpers.readBit(lcr, i - 1))

    def _readSwitches(self, channel):
        """Read the control bits of ``channel`` from parameter ``10 + channel``.

        Returns
        -------
        tuple
            ``(inputSw, offsetSw, auxSw, outputSw)`` taken from bits
            0, 2, 9 and 1 respectively.
        """
        c = self.Get_Par(10 + channel)
        inputSw = helpers.readBit(c, 0)
        outputSw = helpers.readBit(c, 1)
        offsetSw = helpers.readBit(c, 2)
        auxSw = helpers.readBit(c, 9)
        return inputSw, offsetSw, auxSw, outputSw

    @staticmethod
    def _limitOutput(output):
        """Clamp a numpy array in place to the 16 bit range and return it."""
        output[output > 0xFFFF] = 0xFFFF
        output[output < 0] = 0
        return output

    def _frequency(self, channel):
        """Return the ramp frequency derived from the lock step size of ``channel``."""
        return helpers.convertStepsize2Frequency(
            self._data_double[settings.DATA_LOCK_STEPSIZE - 1][channel - 1]
        )

    @staticmethod
    def _sawtooth(amp, freq, amount):
        """Return ``amount`` samples of a triangle wave between ``-amp`` and ``amp``.

        ``signal.sawtooth`` with a width of 0.5 rises and falls
        symmetrically between -1 and 1.  The time axis runs from 0 to
        ``amount / 1000`` and is shifted by 0.1 as a small phase offset.
        """
        t = np.linspace(0, amount / 1000 * 1, amount)
        return amp * (signal.sawtooth(2 * np.pi * freq * (t - 0.1), 0.5))

    def _constructOutput(self, amount, input_, aux):
        """Build the emulated output samples of the active channel.

        While the channel is searching for a lock a triangle ramp is
        produced, while it is locked the stored lock position is held.
        Otherwise the output follows the input/offset/output switches.
        The aux signal is added last if its switch is set.
        """
        channel = self.Get_Par(settings.PAR_ACTIVE_CHANNEL)
        inputSw, offsetSw, auxSw, outputSw = self._readSwitches(channel)

        # lock stuff
        lcr = self._par[settings.PAR_LCR - 1]
        lock = helpers.readBit(lcr, (channel - 1))
        locked = helpers.readBit(lcr, (channel - 1) + 8)
        amplitude = self._data_double[settings.DATA_LOCK - 1][channel - 1 + 16]
        offset = self._data_double[settings.DATA_LOCK - 1][channel - 1 + 24]

        if lock and not locked:
            # apply possible overflows on search start and search end (if iterator stepsize is bigger than +-1)
            log.debug(
                f"amplitude {amplitude} freq {self._frequency(channel)} amount {amount}"
            )
            output = self._sawtooth(amplitude, self._frequency(channel), amount)
            log.debug(f"constructed output of search state {output}")
            log.debug(
                f"minimum and maximum in volts {np.min(output)}, {np.max(output)}"
            )
            output = helpers.convertVolt2Float(output, signed=False) + offset - 0x8000
            log.debug(
                f"minimum and maximum as float {np.min(output)}, {np.max(output)}"
            )
            log.debug(f"offset in search state (as float) {offset}")
        elif locked:
            output = np.full(amount, self._lockPosition[channel - 1]).astype(int)
        else:
            # NOTE(review): the switch handling is assumed to apply only
            # outside of search/lock - confirm against the ADbasic process.
            output = np.full(amount, 0x8000).astype(int)
            if inputSw:
                output = input_
            if offsetSw:
                offset = self.GetData_Double(settings.DATA_OFFSETGAIN, channel + 8, 1)
                output = output + offset
            if outputSw:
                output = (output - 0x8000) * self.GetData_Double(
                    settings.DATA_OFFSETGAIN, channel, 1
                ) + 0x8000
            else:
                output = np.full(amount, 0x8000).astype(int)

        if auxSw:
            output = output + (aux - 0x8000)

        # NOTE(review): the result is returned without clamping.  An
        # unreachable ``return self._limitOutput(output)`` used to follow
        # this statement; decide whether clamping to 16 bit is wanted here.
        return output

    @staticmethod
    def _extract_value(par, offset=0):
        """Return the 8 bit value found ``offset`` bits into ``par``."""
        shifted = np.right_shift(par, offset)
        return np.bitwise_and(shifted, 0xFF)

    def GetFifo_Double(self, index, amount):
        """Return ``amount`` freshly generated entries of the fifo buffer.

        Every entry packs three values into one number: the input shifted
        left by 32 bit, the aux signal shifted left by 16 bit and the
        output added on top.

        Parameters
        ----------
        index :
            data field of the fifo, has to be ``settings.DATA_FIFO``
        amount :
            number of entries

        Returns
        -------
        numpy.ndarray
        """
        assert index == settings.DATA_FIFO, f"Index has to be {settings.DATA_FIFO}."
        amount = int(amount)

        # Creating random data.  An explicit 64 bit type keeps the 32 bit
        # shift below from overflowing where the default int is 32 bit wide.
        input_ = np.random.normal(0x8000, 10, size=amount).astype(np.int64)
        aux = np.random.normal(0x8000, 10, size=amount).astype(np.int64)
        output = self._constructOutput(amount, input_, aux)

        # concatenating bits
        crunch = np.left_shift(input_, 32)
        crunch = np.add(crunch, np.left_shift(aux, 16))
        crunch = np.add(crunch, output)

        # setting last readout time to current
        self._last_read = time()
        self._n_last = self._n_last - amount
        return crunch

    def Get_Processdelay(self, index):  # pylint: disable=no-self-use,unused-argument
        """Return ``1e9 / settings.SAMPLING_RATE``."""
        return 1e9 / settings.SAMPLING_RATE

    def _par_special_functions(self):
        """Refresh timestamp and trigger parameter and run the lock emulation."""
        self._par[0] = time() - self._boot_time  # Timestamp
        if self.reset_trigger:
            self._par[1] = 0  # Resetting trigger
        else:
            self._par[1] = 0xFFFF  # Not resetting trigger
        self._auto_lock()

    def _auto_lock(
        self,
    ):  # pylint: disable=too-many-statements, too-many-locals, too-many-branches
        """Emulate one cycle of the autolock state machine for every servo.

        The search, locked and relock bits are read from the lock control
        parameter, the "greater" and ramp mode bits from the general control
        parameter.  The emulated aux value is the constant ``0x8000``.
        """
        # locking emulation
        aux = 0x8000
        data = self._data_double[settings.DATA_LOCK - 1]
        lock_iter = self._data_double[settings.DATA_LOCK_ITER - 1]
        stepsize = self._data_double[settings.DATA_LOCK_STEPSIZE - 1]
        lcr_index = settings.PAR_LCR - 1

        for servo in range(1, self.NUM_SERVOS + 1):
            indexoffset = servo - 1
            control_index = 10 + indexoffset
            lcr = self._par[lcr_index]
            gcr = self._par[settings.PAR_GCR - 1]
            search = helpers.readBit(lcr, indexoffset)
            locked = helpers.readBit(lcr, indexoffset + 8)
            relock = helpers.readBit(lcr, indexoffset + 16)
            greater = helpers.readBit(gcr, indexoffset)
            rampmode = helpers.readBit(gcr, self.NUM_SERVOS + indexoffset)
            threshold = data[indexoffset]
            threshold_break = data[indexoffset + 8]
            amplitude = data[indexoffset + 16]
            offset = data[indexoffset + 24]

            if self._lockBuffer[indexoffset] > 0:
                # we dont really have a cycle counting down as on ADwin, where this line is just lockBuffer -= 1 basically
                self._lockBuffer[indexoffset] = 0

            if not search and not locked:
                # not searching, lock iter
                lock_iter[indexoffset] = -2.0

            if search:
                log.debug(
                    f"using aux {helpers.convertFloat2Volt(aux, self._par[settings.PAR_SENSITIVITY - 1] >> servo * 2 + 14 & 3, False)}"
                )
                # ensure locked is 0 when searching
                self._par[lcr_index] = helpers.clearBit(
                    self._par[lcr_index], indexoffset + 8
                )
                if lock_iter[indexoffset] == -2.0:
                    # the search has just started
                    lock_iter[indexoffset] = 0.0
                    # turn off input, output, aux
                    c = self._par[control_index]
                    c = helpers.clearBit(c, 0)  # input
                    c = helpers.clearBit(c, 1)  # output
                    c = helpers.clearBit(c, 9)  # aux
                    self._par[control_index] = c
                    # technically filter history is cleared here, but its not implemented on mockADwin

                # a lock is found
                if not rampmode and (
                    (greater and aux > threshold) or ((aux < threshold) and not greater)
                ):
                    log.warning("found lock ")
                    log.debug(
                        f"lcr {self._par[settings.PAR_LCR - 1]} gcr {self._par[settings.PAR_GCR - 1]} offset {indexoffset}"
                    )
                    # set locked
                    self._par[lcr_index] = helpers.setBit(
                        self._par[lcr_index], indexoffset + 8
                    )
                    # set not searching (search = 0)
                    self._par[lcr_index] = helpers.clearBit(
                        self._par[lcr_index], indexoffset
                    )
                    c = self._par[control_index]
                    c = helpers.setBit(c, 0)  # enable input
                    c = helpers.setBit(c, 1)  # enable output
                    # set the lockBuffer to 10000 cycles (on ADwin this is .1 seconds)
                    # please note this wont really work, we just reset this to 0 right away whenever we invoke this function
                    self._lockBuffer[indexoffset] = 10000
                    self._par[control_index] = c
                else:  # searching for lock
                    # turn around at the ends of the ramp
                    if lock_iter[indexoffset] <= -1:
                        self._lockStepSign[indexoffset] = 1
                    elif lock_iter[indexoffset] >= 1:
                        self._lockStepSign[indexoffset] = -1
                    # iter = iter + stepsign * step
                    lock_iter[indexoffset] = (
                        lock_iter[indexoffset]
                        + self._lockStepSign[indexoffset] * stepsize[indexoffset]
                    )
                    c = self._par[control_index]
                    c = helpers.clearBit(c, 0)  # input
                    c = helpers.clearBit(c, 1)  # output
                    self._par[control_index] = c

                # NOTE(review): assumed to be updated on every search cycle
                # - confirm against the ADbasic process.
                self._lockPosition[indexoffset] = (
                    amplitude * lock_iter[indexoffset] * 0x8000 / 10 + offset
                )

            if locked:
                # if lock fails, set locked to 0, but only if the buffer phase is already over
                # The threshold test is grouped explicitly: without the
                # parentheses ``and`` binds tighter than ``or`` and the
                # buffer check would only guard the "not greater" case.
                lock_lost = (greater and (aux < threshold_break)) or (
                    not greater and (aux > threshold_break)
                )
                if (lock_lost and self._lockBuffer[indexoffset] == 0) or rampmode:
                    log.warning("breaking lock")
                    if relock or rampmode:
                        # activate lock search again
                        self._par[lcr_index] = helpers.setBit(
                            self._par[lcr_index], indexoffset
                        )
                    else:
                        lock_iter[indexoffset] = -2.0
                    # clear locked bit
                    self._par[lcr_index] = helpers.clearBit(
                        self._par[lcr_index], indexoffset + 8
                    )

            # normally the actual output signal would be added, but in test mode we always get perfect lock, eh
            self._data_long[settings.DATA_LOCK_OUTPUT - 1][
                indexoffset
            ] = self._lockPosition[indexoffset]

    def _fpar_special_functions(self):
        """Hook for emulated float parameters; nothing to emulate so far."""