eive-tmtc/eive_tmtc/tmtc/acs/reaction_wheels.py

412 lines
14 KiB
Python
Raw Normal View History

2021-06-25 12:07:16 +02:00
# -*- coding: utf-8 -*-
2022-05-05 01:21:57 +02:00
"""reaction_wheels.py
@brief Tests for the reaction wheel handler
@author J. Meier
@date 20.06.2021
"""
2023-02-17 20:00:46 +01:00
import enum
import struct
from typing import List, Tuple

from spacepackets.ecss.tc import PusTelecommand
from tmtccmd.config import TmtcDefinitionWrapper, OpCodeEntry
from tmtccmd.config.tmtc import tmtc_definitions_provider
from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.tc.pus_3_fsfw_hk import (
    generate_one_hk_command,
    generate_one_diag_command,
    make_sid,
    enable_periodic_hk_command_with_interval,
    disable_periodic_hk_command,
)
from tmtccmd.tc.pus_200_fsfw_mode import pack_mode_data, Mode, Subservice
from tmtccmd.util import ObjectIdU32
from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter

from eive_tmtc.config.definitions import CustomServiceList
from eive_tmtc.config.object_ids import RW1_ID, RW2_ID, RW3_ID, RW4_ID
from eive_tmtc.pus_tm.defs import PrintWrapper
2022-05-05 01:21:57 +02:00
2023-01-16 14:13:06 +01:00
class OpCodesDev:
    """Op code keys for commanding a single reaction wheel device.

    Every attribute is a list of alternative key strings (a numeric shortcut
    and a mnemonic); an op code matches a command when it is contained in the
    corresponding list.
    """

    SPEED = ["0", "speed"]
    ON = ["1", "on"]
    NML = ["2", "nml"]
    OFF = ["3", "off"]
    GET_STATUS = ["4", "status"]
    GET_TM = ["5", "tm"]
    ENABLE_STATUS_HK = ["6", "enable_status_hk"]
    DISABLE_STATUS_HK = ["7", "disable_status_hk"]
2022-05-05 01:21:57 +02:00
2023-01-16 14:13:06 +01:00
class InfoDev:
    """Human-readable descriptions of the single-device op codes.

    The attribute names mirror those of ``OpCodesDev``; the strings are used
    both as op code info text and inside queue log messages.
    """

    SPEED = "Set speed"
    ON = "Set On"
    NML = "Set Normal"
    OFF = "Set Off"
    GET_STATUS = "Get Status HK"
    GET_TM = "Get TM HK"
    ENABLE_STATUS_HK = "Enable Status HK"
    DISABLE_STATUS_HK = "Disable Status HK"
2022-05-05 01:21:57 +02:00
class OpCodesAss:
    """Op code keys for commanding the reaction wheel assembly.

    Every attribute is a list of alternative key strings (a numeric shortcut
    and a mnemonic) selecting the same assembly command.
    """

    ON = ["0", "on"]
    NML = ["1", "nml"]
    OFF = ["2", "off"]
    ALL_SPEED_UP = ["3", "speed_up"]
    ALL_SPEED_OFF = ["4", "speed_off"]
2022-05-05 01:21:57 +02:00
class InfoAss:
    """Human-readable descriptions of the reaction wheel assembly op codes.

    The attribute names mirror those of ``OpCodesAss``.
    """

    ON = "Mode On: 3/4 RWs min. on"
    NML = "Mode Normal: 3/4 RWs min. normal"
    OFF = "Mode Off: All RWs off"
    ALL_SPEED_UP = "Speed up consecutively"
    ALL_SPEED_OFF = "Speed down to 0"
2021-06-25 12:07:16 +02:00
2023-02-17 20:00:46 +01:00
class RwSetId(enum.IntEnum):
    """Numeric IDs of the reaction wheel housekeeping datasets.

    The values are combined with a handler object ID to form the SID used in
    housekeeping commands and are compared against the set ID of received
    housekeeping data.
    """

    STATUS_SET_ID = 4
    TEMPERATURE_SET_ID = 8
    LAST_RESET = 2
    TM_SET = 9
2021-06-25 12:07:16 +02:00
2023-01-16 14:13:06 +01:00
class RwCommandId:
    """Action command IDs of the reaction wheel handler.

    Each ID is stored as a 4-byte big-endian ``bytearray`` so that it can be
    appended directly behind the object ID in a telecommand's application data.
    """

    RESET_MCU = bytearray([0x0, 0x0, 0x0, 0x01])
    # Reads status information from reaction wheel into dataset with id 4
    GET_RW_STATUS = bytearray([0x0, 0x0, 0x0, 0x04])
    INIT_RW_CONTROLLER = bytearray([0x0, 0x0, 0x0, 0x05])
    SET_SPEED = bytearray([0x0, 0x0, 0x0, 0x06])
    # Reads temperature from reaction wheel into dataset with id 8
    GET_TEMPERATURE = bytearray([0x0, 0x0, 0x0, 0x08])
    GET_TM = bytearray([0x0, 0x0, 0x0, 0x09])
2021-06-25 12:07:16 +02:00
class SpeedDefinitions:
    """Predefined reaction wheel speed values.

    NOTE(review): the two constants do not follow one naming scheme. If the
    values are in units of 0.1 RPM (as the speed argument of
    ``pack_set_speed_command`` is), ``RPM_100 = 1000`` is consistent but
    ``RPM_5000 = 5000`` would be 500 RPM. Values are left untouched; confirm
    the intended unit before relying on either name.
    """

    RPM_100 = 1000
    RPM_5000 = 5000
class RampTime:
    """Predefined ramp time values; the ``MS_`` prefix denotes milliseconds."""

    MS_1000 = 1000
2021-06-25 12:07:16 +02:00
@tmtc_definitions_provider
def add_rw_cmds(defs: TmtcDefinitionWrapper):
    """Register the reaction wheel services and their op codes.

    One service is added per reaction wheel (1 to 4), all sharing the same
    device op code entry, followed by one service for the reaction wheel
    assembly with its own op code entry.

    :param defs: Definition wrapper the services are added to.
    """
    # Op codes shared by all four single-wheel services. The registration
    # order is kept as-is because it may determine the display order.
    dev_oce = OpCodeEntry()
    for info, keys in (
        (InfoDev.SPEED, OpCodesDev.SPEED),
        (InfoDev.ON, OpCodesDev.ON),
        (InfoDev.OFF, OpCodesDev.OFF),
        (InfoDev.NML, OpCodesDev.NML),
        (InfoDev.GET_STATUS, OpCodesDev.GET_STATUS),
        (InfoDev.GET_TM, OpCodesDev.GET_TM),
        (InfoDev.ENABLE_STATUS_HK, OpCodesDev.ENABLE_STATUS_HK),
        (InfoDev.DISABLE_STATUS_HK, OpCodesDev.DISABLE_STATUS_HK),
    ):
        dev_oce.add(info=info, keys=keys)

    for service, info in (
        (CustomServiceList.REACTION_WHEEL_1, "Reaction Wheel 1"),
        (CustomServiceList.REACTION_WHEEL_2, "Reaction Wheel 2"),
        (CustomServiceList.REACTION_WHEEL_3, "Reaction Wheel 3"),
        (CustomServiceList.REACTION_WHEEL_4, "Reaction Wheel 4"),
    ):
        defs.add_service(name=service.value, info=info, op_code_entry=dev_oce)

    # Op codes of the assembly service.
    ass_oce = OpCodeEntry()
    for info, keys in (
        (InfoAss.ON, OpCodesAss.ON),
        (InfoAss.NML, OpCodesAss.NML),
        (InfoAss.OFF, OpCodesAss.OFF),
        (InfoAss.ALL_SPEED_UP, OpCodesAss.ALL_SPEED_UP),
        (InfoAss.ALL_SPEED_OFF, OpCodesAss.ALL_SPEED_OFF),
    ):
        ass_oce.add(info=info, keys=keys)
    defs.add_service(
        name=CustomServiceList.RW_ASSEMBLY.value,
        info="Reaction Wheel Assembly",
        op_code_entry=ass_oce,
    )
2022-01-18 14:03:56 +01:00
def pack_single_rw_test_into(
    object_id: bytes, rw_idx: int, q: DefaultPusQueueHelper, op_code: str
):
    """Queue the telecommand(s) selected by ``op_code`` for one reaction wheel.

    A log entry naming the wheel and the action is queued before the
    telecommand(s). The speed and enable-HK op codes prompt the operator for
    additional values on the console.

    :param object_id: Object ID of the reaction wheel handler.
    :param rw_idx: Index of the wheel, only used in the log messages.
    :param q: Queue helper the log entries and telecommands are added to.
    :param op_code: Op code key; compared against the lists in ``OpCodesDev``.
        An op code matching none of them queues nothing.
    """
    if op_code in OpCodesDev.SPEED:
        speed, ramp_time = prompt_speed_ramp_time()
        q.add_log_cmd(
            f"RW {rw_idx}: {InfoDev.SPEED} with target "
            f"speed {speed / 10.0} RPM and {ramp_time} ms ramp time"
        )
        q.add_pus_tc(pack_set_speed_command(object_id, speed, ramp_time))

    # The three mode commands differ only in the target mode and log text.
    for keys, info, mode in (
        (OpCodesDev.ON, InfoDev.ON, Mode.ON),
        (OpCodesDev.NML, InfoDev.NML, Mode.NORMAL),
        (OpCodesDev.OFF, InfoDev.OFF, Mode.OFF),
    ):
        if op_code in keys:
            q.add_log_cmd(f"RW {rw_idx}: {info}")
            mode_data = pack_mode_data(object_id, mode, 0)
            # Named subservice instead of the bare literal 1, consistent with
            # the assembly mode commands in this module.
            q.add_pus_tc(
                PusTelecommand(
                    service=200,
                    subservice=Subservice.TC_MODE_COMMAND,
                    app_data=mode_data,
                )
            )

    if op_code in OpCodesDev.GET_TM:
        q.add_log_cmd(f"RW {rw_idx}: {InfoDev.GET_TM}")
        q.add_pus_tc(
            generate_one_hk_command(
                sid=make_sid(object_id=object_id, set_id=RwSetId.TM_SET)
            )
        )
    if op_code in OpCodesDev.GET_STATUS:
        q.add_log_cmd(f"RW {rw_idx}: {InfoDev.GET_STATUS}")
        q.add_pus_tc(
            generate_one_diag_command(
                sid=make_sid(object_id=object_id, set_id=RwSetId.STATUS_SET_ID)
            )
        )
    if op_code in OpCodesDev.ENABLE_STATUS_HK:
        q.add_log_cmd(f"RW {rw_idx}: {InfoDev.ENABLE_STATUS_HK}")
        interval = float(input("Please enter HK interval in floating point seconds: "))
        # The helper returns several telecommands; all of them are queued.
        # NOTE(review): the leading True presumably marks the set as a
        # diagnostic set - confirm against the tmtccmd API.
        cmds = enable_periodic_hk_command_with_interval(
            True, make_sid(object_id, RwSetId.STATUS_SET_ID), interval
        )
        for cmd in cmds:
            q.add_pus_tc(cmd)
    if op_code in OpCodesDev.DISABLE_STATUS_HK:
        q.add_log_cmd(f"RW {rw_idx}: {InfoDev.DISABLE_STATUS_HK}")
        q.add_pus_tc(
            disable_periodic_hk_command(
                True, make_sid(object_id, RwSetId.STATUS_SET_ID)
            )
        )
2021-06-25 12:07:16 +02:00
2022-08-08 16:32:18 +02:00
def pack_rw_ass_cmds(q: DefaultPusQueueHelper, object_id: bytes, op_code: str):
    """Queue the telecommand(s) selected by ``op_code`` for the RW assembly.

    Mode op codes queue a single mode command addressed to ``object_id``. The
    speed op codes prompt the operator on the console and queue one set-speed
    command for each of the four reaction wheels.

    :param q: Queue helper the telecommands are added to.
    :param object_id: Object ID of the reaction wheel assembly.
    :param op_code: Op code key; compared against the lists in ``OpCodesAss``.
        An op code matching none of them queues nothing.
    """
    # The three mode commands differ only in the target mode.
    for keys, mode in (
        (OpCodesAss.OFF, Mode.OFF),
        (OpCodesAss.ON, Mode.ON),
        (OpCodesAss.NML, Mode.NORMAL),
    ):
        if op_code in keys:
            data = pack_mode_data(object_id=object_id, mode=mode, submode=0)
            q.add_pus_tc(
                PusTelecommand(
                    service=200, subservice=Subservice.TC_MODE_COMMAND, app_data=data
                )
            )

    all_rw_ids = [RW1_ID, RW2_ID, RW3_ID, RW4_ID]
    if op_code in OpCodesAss.ALL_SPEED_UP:
        speed, ramp_time = prompt_speed_ramp_time()
        rw_speed_up_cmd_consec(q, all_rw_ids, speed, ramp_time)
    if op_code in OpCodesAss.ALL_SPEED_OFF:
        rw_speed_down_cmd_consec(q, all_rw_ids, prompt_ramp_time())
def prompt_speed_ramp_time() -> Tuple[int, int]:
    """Ask the operator for a target speed and a ramp time on the console.

    The speed is read first, then the ramp time. The entered values are only
    converted to ``int``; no range check is performed here.

    :return: Tuple ``(speed, ramp_time)``; per the prompts, the speed is in
        units of 0.1 RPM and the ramp time in milliseconds.
    :raises ValueError: If an entered value is not a valid integer.
    """
    speed = int(
        input("Specify speed [0.1 RPM, 0 or range [-65000, -1000] and [1000, 65000]: ")
    )
    return speed, prompt_ramp_time()
def prompt_ramp_time() -> int:
    """Ask the operator for a ramp time on the console.

    The entered value is only converted to ``int``; no range check is
    performed here.

    :return: The entered ramp time (milliseconds, per the prompt).
    :raises ValueError: If the entered value is not a valid integer.
    """
    return int(input("Specify ramp time [ms, range [10, 20000]]: "))
2022-05-05 01:21:57 +02:00
2022-05-25 11:10:57 +02:00
def pack_set_speed_command(
    object_id: bytes, speed: int, ramp_time_ms: int
) -> PusTelecommand:
    """Pack the telecommand which sets the speed of a reaction wheel.

    The application data consists of the object ID, the set-speed command ID,
    the speed as signed 32-bit big-endian integer and the ramp time as
    unsigned 16-bit big-endian integer.

    :param object_id: The object id of the reaction wheel handler.
    :param speed: Valid speeds are 0, [-65000, -1000] and [1000, 65000]. Values
        are specified in 0.1 * RPM.
    :param ramp_time_ms: The time after which the reaction wheel will reach the
        commanded speed. Valid times are 10 - 20000 ms; a value of 0 is
        accepted as well.
    :return: Telecommand with service 8 and subservice 128.
    :raises ValueError: If the speed or the ramp time is outside the valid
        range.
    """
    if speed > 0 and not 1000 <= speed <= 65000:
        raise ValueError(
            "Invalid RW speed specified. "
            "Allowed range is [1000, 65000] 0.1 * RPM"
        )
    if speed < 0 and not -65000 <= speed <= -1000:
        raise ValueError(
            "Invalid RW speed specified. "
            "Allowed range is [-65000, -1000] 0.1 * RPM"
        )
    # A ramp time of exactly 0 passes; everything else must lie in [10, 20000].
    if ramp_time_ms != 0 and not 10 <= ramp_time_ms <= 20000:
        raise ValueError("Invalid Ramp Speed time. Allowed range is [10-20000] ms")

    app_data = bytearray()
    app_data += object_id + RwCommandId.SET_SPEED
    app_data += struct.pack("!i", speed)
    app_data += ramp_time_ms.to_bytes(length=2, byteorder="big")
    return PusTelecommand(service=8, subservice=128, app_data=app_data)
2022-10-18 13:26:32 +02:00
def handle_rw_hk_data(
    printer: FsfwTmTcPrinter, object_id: ObjectIdU32, set_id: int, hk_data: bytes
):
    """Decode and log a reaction wheel housekeeping dataset.

    The status, last-reset and TM sets are supported. The dataset variables
    are unpacked from the start of ``hk_data`` (network byte order) and
    logged; the bytes following them are handed to the printer as validity
    buffer, exactly once per set and with the number of variables that were
    unpacked for that set. Any other set ID is ignored.

    :param printer: Printer used for logging and for the validity buffer.
    :param object_id: Object ID of the reporting reaction wheel; its name is
        used in the log messages.
    :param set_id: ID of the received dataset, see ``RwSetId``.
    :param hk_data: Raw housekeeping data.
    :raises struct.error: If ``hk_data`` is shorter than the dataset layout.
    """
    pw = PrintWrapper(printer)
    current_idx = 0
    # Number of dataset variables covered by the validity buffer.
    num_vars = 0
    if set_id == RwSetId.STATUS_SET_ID:
        pw.dlog(
            f"Received Status HK (ID {set_id}) from Reaction Wheel {object_id.name}"
        )
        fmt_str = "!IiiBB"
        inc_len = struct.calcsize(fmt_str)
        (temp, speed, ref_speed, state, clc_mode) = struct.unpack(
            fmt_str, hk_data[current_idx : current_idx + inc_len]
        )
        current_idx += inc_len
        num_vars = 5
        # Speeds are divided by 10 for display in rpm.
        speed_rpm = speed / 10.0
        ref_speed_rpm = ref_speed / 10.0
        pw.dlog(
            f"Temperature {temp} C | Speed {speed_rpm} rpm | Reference Speed {ref_speed_rpm} rpm"
        )
        pw.dlog(
            f"State {state}. 0: Error, 1: Idle, 2: Coasting, 3: Running, speed stable, "
            f"4: Running, speed changing"
        )
        pw.dlog(
            f"Current Limit Control mode {clc_mode}. 0: Low Current Mode (0.3 A), "
            f"1: High Current Mode (0.6 A)"
        )
    elif set_id == RwSetId.LAST_RESET:
        pw.dlog(
            f"Received Last Reset HK (ID {set_id}) from Reaction Wheel {object_id.name}"
        )
        fmt_str = "!BB"
        inc_len = struct.calcsize(fmt_str)
        (last_not_cleared_reset_status, current_reset_status) = struct.unpack(
            fmt_str, hk_data[current_idx : current_idx + inc_len]
        )
        current_idx += inc_len
        num_vars = 2
        pw.dlog(
            f"Last Non-Cleared (Cached) Reset Status {last_not_cleared_reset_status} | "
            f"Current Reset Status {current_reset_status}"
        )
    elif set_id == RwSetId.TM_SET:
        pw.dlog(f"Received TM HK (ID {set_id}) from Reaction Wheel {object_id.name}")
        fmt_str = "!BiffBBiiIIIIIIIIIIIIIIII"
        inc_len = struct.calcsize(fmt_str)
        (
            last_reset_status,
            mcu_temp,
            pressure_sens_temp,
            pressure,
            state,
            clc_mode,
            current_speed,
            ref_speed,
            num_invalid_crc_packets,
            num_invalid_len_packets,
            num_invalid_cmd_packets,
            num_of_cmd_executed_requests,
            num_of_cmd_replies,
            uart_num_of_bytes_written,
            uart_num_of_bytes_read,
            uart_num_parity_errors,
            uart_num_noise_errors,
            uart_num_frame_errors,
            uart_num_reg_overrun_errors,
            uart_total_num_errors,
            spi_num_bytes_written,
            spi_num_bytes_read,
            spi_num_reg_overrun_errors,
            spi_total_num_errors,
        ) = struct.unpack(fmt_str, hk_data[current_idx : current_idx + inc_len])
        # Advance past the variables; without this the validity buffer of the
        # TM set was never printed.
        current_idx += inc_len
        # NOTE(review): 24 variables are unpacked above, while the previous
        # catch-all validity print used 27 - confirm against the on-board
        # dataset definition.
        num_vars = 24
        pw.dlog(
            f"MCU Temperature {mcu_temp} | Pressure Sensore Temperature {pressure_sens_temp} C"
        )
        pw.dlog(f"Last Reset Status {last_reset_status}")
        pw.dlog(
            f"Current Limit Control mode {clc_mode}. 0: Low Current Mode (0.3 A), "
            f"1: High Current Mode (0.6 A)"
        )
        pw.dlog(f"Speed {current_speed} rpm | Reference Speed {ref_speed} rpm")
        pw.dlog(
            f"State {state}. 0: Error, 1: Idle, 2: Coasting, 3: Running, speed stable, "
            f"4: Running, speed changing"
        )
        pw.dlog("Number Of Invalid Packets:")
        pw.dlog("CRC | Length | CMD")
        pw.dlog(
            f"{num_invalid_crc_packets} | {num_invalid_len_packets} | {num_invalid_cmd_packets}"
        )
        pw.dlog(
            f"Num Of CMD Executed Requests {num_of_cmd_executed_requests} | "
            f"Num of CMD Replies {num_of_cmd_replies}"
        )
        pw.dlog("UART COM information:")
        pw.dlog(
            "NumBytesWritten | NumBytesRead | ParityErrs | NoiseErrs | FrameErrs | "
            "RegOverrunErrs | TotalErrs"
        )
        pw.dlog(
            f"{uart_num_of_bytes_written} | {uart_num_of_bytes_read} | {uart_num_parity_errors} | "
            f"{uart_num_noise_errors} | {uart_num_frame_errors} | {uart_num_reg_overrun_errors} | "
            f"{uart_total_num_errors}"
        )
        pw.dlog("SPI COM Info:")
        pw.dlog("NumBytesWritten | NumBytesRead | RegOverrunErrs | TotalErrs")
        pw.dlog(
            f"{spi_num_bytes_written} | {spi_num_bytes_read} | {spi_num_reg_overrun_errors} | "
            f"{spi_total_num_errors}"
        )
    # Only print a validity buffer when a known set was actually decoded.
    if current_idx > 0:
        printer.print_validity_buffer(
            validity_buffer=hk_data[current_idx:], num_vars=num_vars
        )
def rw_speed_up_cmd_consec(
    q: DefaultPusQueueHelper, obids: List[bytes], speed: int, ramp_time: int
):
    """Queue one set-speed command per reaction wheel, in the given order.

    :param q: Queue helper the telecommands are added to.
    :param obids: Object IDs of the reaction wheel handlers to command.
    :param speed: Target speed passed to ``pack_set_speed_command``.
    :param ramp_time: Ramp time passed to ``pack_set_speed_command``.
    :raises ValueError: Propagated from ``pack_set_speed_command`` for an
        invalid speed or ramp time.
    """
    for rw_id in obids:
        speed_cmd = pack_set_speed_command(
            object_id=rw_id, speed=speed, ramp_time_ms=ramp_time
        )
        q.add_pus_tc(speed_cmd)
def rw_speed_down_cmd_consec(
    q: DefaultPusQueueHelper, obids: List[bytes], ramp_time: int
):
    """Queue one set-speed command with speed 0 per reaction wheel, in order.

    :param q: Queue helper the telecommands are added to.
    :param obids: Object IDs of the reaction wheel handlers to command.
    :param ramp_time: Ramp time passed to ``pack_set_speed_command``.
    :raises ValueError: Propagated from ``pack_set_speed_command`` for an
        invalid ramp time.
    """
    for rw_id in obids:
        stop_cmd = pack_set_speed_command(
            object_id=rw_id, speed=0, ramp_time_ms=ramp_time
        )
        q.add_pus_tc(stop_cmd)