Merge branch 'develop' of https://egit.irs.uni-stuttgart.de/eive/eive-tmtc into meier/ploc

This commit is contained in:
2022-05-13 18:39:59 +02:00
39 changed files with 1146 additions and 671 deletions

View File

@ -1,3 +1,4 @@
from pus_tc.devs.gps import GpsOpCodes
from tmtccmd.config import (
add_op_code_entry,
add_service_op_code_entry,
@ -6,9 +7,283 @@ from tmtccmd.config import (
OpCodeDictKeys,
)
from config.definitions import CustomServiceList
from pus_tc.devs.heater import add_heater_cmds
from pus_tc.devs.reaction_wheels import add_rw_cmds
from pus_tc.devs.bpx_batt import BpxOpCodes
def get_eive_service_op_code_dict(service_op_code_dict: ServiceOpCodeDictT):
add_bpx_cmd_definitions(cmd_dict=service_op_code_dict)
add_core_controller_definitions(cmd_dict=service_op_code_dict)
add_pl_pcdu_cmds(cmd_dict=service_op_code_dict)
add_pcdu_cmds(cmd_dict=service_op_code_dict)
add_imtq_cmds(cmd_dict=service_op_code_dict)
add_rad_sens_cmds(cmd_dict=service_op_code_dict)
add_rw_cmds(cmd_dict=service_op_code_dict)
add_ploc_mpsoc_cmds(cmd_dict=service_op_code_dict)
add_ploc_supv_cmds(cmd_dict=service_op_code_dict)
add_system_cmds(cmd_dict=service_op_code_dict)
add_time_cmds(cmd_dict=service_op_code_dict)
add_syrlinks_cmds(cmd_dict=service_op_code_dict)
add_gps_cmds(cmd_dict=service_op_code_dict)
add_str_cmds(cmd_dict=service_op_code_dict)
add_ccsds_cmds(cmd_dict=service_op_code_dict)
add_pdec_cmds(cmd_dict=service_op_code_dict)
add_heater_cmds(cmd_dict=service_op_code_dict)
add_tmp_sens_cmds(cmd_dict=service_op_code_dict)
def add_tmp_sens_cmds(cmd_dict: ServiceOpCodeDictT):
op_code_dict = {
"0": ("TMP1075 Tests", {OpCodeDictKeys.TIMEOUT: 2.2}),
}
service_tuple = ("TMP1075 1", op_code_dict)
cmd_dict[CustomServiceList.TMP1075_1.value] = service_tuple
service_tuple = ("TMP1075 2", op_code_dict)
cmd_dict[CustomServiceList.TMP1075_2.value] = service_tuple
def add_pdec_cmds(cmd_dict: ServiceOpCodeDictT):
op_code_dict_srv_pdec_handler = {
"0": ("PDEC Handler: Print CLCW", {OpCodeDictKeys.TIMEOUT: 2.0}),
"1": ("PDEC Handler: Print PDEC monitor", {OpCodeDictKeys.TIMEOUT: 2.0}),
}
service_pdec_handler_tuple = ("PDEC Handler", op_code_dict_srv_pdec_handler)
cmd_dict[CustomServiceList.PDEC_HANDLER.value] = service_pdec_handler_tuple
def add_ccsds_cmds(cmd_dict: ServiceOpCodeDictT):
op_code_dict_srv_ccsds_handler = {
"0": ("CCSDS Handler: Set low rate", {OpCodeDictKeys.TIMEOUT: 2.0}),
"1": ("CCSDS Handler: Set high rate", {OpCodeDictKeys.TIMEOUT: 2.0}),
"2": ("CCSDS Handler: Enable transmitter", {OpCodeDictKeys.TIMEOUT: 2.0}),
"3": ("CCSDS Handler: Disable transmitter", {OpCodeDictKeys.TIMEOUT: 2.0}),
"4": ("CCSDS Handler: Set arbitrary bitrate", {OpCodeDictKeys.TIMEOUT: 2.0}),
"5": (
"CCSDS Handler: Enable tx clock manipulator",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"6": (
"CCSDS Handler: Disable tx clock manipulator",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"7": (
"CCSDS Handler: Update tx data on rising edge",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"8": (
"CCSDS Handler: Update tx data on falling edge",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
}
service_ccsds_handler_tuple = ("CCSDS Handler", op_code_dict_srv_ccsds_handler)
cmd_dict[CustomServiceList.CCSDS_HANDLER.value] = service_ccsds_handler_tuple
def add_gps_cmds(cmd_dict: ServiceOpCodeDictT):
op_code_dict = {
GpsOpCodes.RESET_GNSS.value: ("Reset GPS", {OpCodeDictKeys.TIMEOUT: 2.0})
}
service_tuple = ("GPS 0", op_code_dict)
cmd_dict[CustomServiceList.GPS_0.value] = service_tuple
cmd_dict[CustomServiceList.GPS_1.value] = service_tuple
def add_str_cmds(cmd_dict: ServiceOpCodeDictT):
op_code_dict_srv_star_tracker = {
"0": (
"Star Tracker: Mode On, Submode Bootloader",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"1": ("Star Tracker: Mode On, Submode Firmware", {OpCodeDictKeys.TIMEOUT: 2.0}),
"2": ("Star Tracker: Mode Normal", {OpCodeDictKeys.TIMEOUT: 2.0}),
"3": ("Star Tracker: Mode Off", {OpCodeDictKeys.TIMEOUT: 2.0}),
"4": ("Star Tracker: Mode Raw", {OpCodeDictKeys.TIMEOUT: 2.0}),
"5": ("Star Tracker: Ping", {OpCodeDictKeys.TIMEOUT: 5.0}),
"6": (
"Star Tracker: Switch to bootloader program",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"7": ("Star Tracker: Request temperature", {OpCodeDictKeys.TIMEOUT: 2.0}),
"8": ("Star Tracker: Request version", {OpCodeDictKeys.TIMEOUT: 2.0}),
"9": ("Star Tracker: Request interface", {OpCodeDictKeys.TIMEOUT: 2.0}),
"10": ("Star Tracker: Request power", {OpCodeDictKeys.TIMEOUT: 2.0}),
"11": (
"Star Tracker: Set subscription parameters",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"12": (
"Star Tracker: Boot image (requires bootloader mode)",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"13": ("Star Tracker: Request time", {OpCodeDictKeys.TIMEOUT: 2.0}),
"14": ("Star Tracker: Request solution", {OpCodeDictKeys.TIMEOUT: 2.0}),
"15": ("Star Tracker: Upload image", {OpCodeDictKeys.TIMEOUT: 2.0}),
"16": ("Star Tracker: Download image", {OpCodeDictKeys.TIMEOUT: 2.0}),
"17": ("Star Tracker: Set limit parameters", {OpCodeDictKeys.TIMEOUT: 2.0}),
"18": ("Star Tracker: Set tracking parameters", {OpCodeDictKeys.TIMEOUT: 2.0}),
"19": ("Star Tracker: Set mounting parameters", {OpCodeDictKeys.TIMEOUT: 2.0}),
"20": ("Star Tracker: Set camera parameters", {OpCodeDictKeys.TIMEOUT: 2.0}),
"22": (
"Star Tracker: Set centroiding parameters",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"23": ("Star Tracker: Set LISA parameters", {OpCodeDictKeys.TIMEOUT: 2.0}),
"24": ("Star Tracker: Set matching parameters", {OpCodeDictKeys.TIMEOUT: 2.0}),
"25": (
"Star Tracker: Set validation parameters",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"26": ("Star Tracker: Set algo parameters", {OpCodeDictKeys.TIMEOUT: 2.0}),
"27": ("Star Tracker: Take image", {OpCodeDictKeys.TIMEOUT: 2.0}),
"28": ("Star Tracker: Stop str helper", {OpCodeDictKeys.TIMEOUT: 2.0}),
"30": (
"Star Tracker: Set name of download image",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"31": ("Star Tracker: Request histogram", {OpCodeDictKeys.TIMEOUT: 2.0}),
"32": ("Star Tracker: Request contrast", {OpCodeDictKeys.TIMEOUT: 2.0}),
"33": ("Star Tracker: Set json filename", {OpCodeDictKeys.TIMEOUT: 2.0}),
"35": ("Star Tracker: Flash read", {OpCodeDictKeys.TIMEOUT: 2.0}),
"36": ("Star Tracker: Set flash read filename", {OpCodeDictKeys.TIMEOUT: 2.0}),
"37": ("Star Tracker: Get checksum", {OpCodeDictKeys.TIMEOUT: 2.0}),
"49": ("Star Tracker: Request camera parameter", {OpCodeDictKeys.TIMEOUT: 2.0}),
"50": ("Star Tracker: Request limits", {OpCodeDictKeys.TIMEOUT: 2.0}),
"51": (
"Star Tracker: Set image processor parameters",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"52": (
"Star Tracker: (EGSE only) Load camera ground config ",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"53": (
"Star Tracker: (EGSE only) Load camera flight config",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"54": (
"Star Tracker: Request log level parameters",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"55": (
"Star Tracker: Request mounting parameters",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"56": (
"Star Tracker: Request image processor parameters",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"57": (
"Star Tracker: Request centroiding parameters",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"58": ("Star Tracker: Request lisa parameters", {OpCodeDictKeys.TIMEOUT: 2.0}),
"59": (
"Star Tracker: Request matching parameters",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"60": (
"Star Tracker: Request tracking parameters",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"61": (
"Star Tracker: Request validation parameters",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"62": ("Star Tracker: Request algo parameters", {OpCodeDictKeys.TIMEOUT: 2.0}),
"63": (
"Star Tracker: Request subscription parameters",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"64": (
"Star Tracker: Request log subscription parameters",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"65": (
"Star Tracker: Request debug camera parameters",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"66": ("Star Tracker: Set log level parameters", {OpCodeDictKeys.TIMEOUT: 2.0}),
"67": (
"Star Tracker: Set log subscription parameters",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"68": (
"Star Tracker: Set debug camera parameters",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"69": ("Star Tracker: Firmware update", {OpCodeDictKeys.TIMEOUT: 2.0}),
"70": (
"Star Tracker: Disable timestamp generation",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"71": (
"Star Tracker: Enable timestamp generation",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
}
service_star_tracker_tuple = ("Star tracker", op_code_dict_srv_star_tracker)
cmd_dict[CustomServiceList.STAR_TRACKER.value] = service_star_tracker_tuple
def add_syrlinks_cmds(cmd_dict: ServiceOpCodeDictT):
op_code_dict_srv_syrlinks_handler = {
"0": ("Syrlinks Handler: Set mode off", {OpCodeDictKeys.TIMEOUT: 2.0}),
"1": ("Syrlinks Handler: Set mode on", {OpCodeDictKeys.TIMEOUT: 2.0}),
"2": ("Syrlinks Handler: Set mode normal", {OpCodeDictKeys.TIMEOUT: 2.0}),
"3": ("Syrlinks Handler: Set TX standby", {OpCodeDictKeys.TIMEOUT: 2.0}),
"4": ("Syrlinks Handler: Set TX modulation", {OpCodeDictKeys.TIMEOUT: 2.0}),
"5": ("Syrlinks Handler: Set TX carrier wave", {OpCodeDictKeys.TIMEOUT: 2.0}),
"6": ("Syrlinks Handler: Read TX status", {OpCodeDictKeys.TIMEOUT: 2.0}),
"7": ("Syrlinks Handler: Read TX waveform", {OpCodeDictKeys.TIMEOUT: 2.0}),
"8": (
"Syrlinks Handler: Read TX AGC value high byte ",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"9": (
"Syrlinks Handler: Read TX AGC value low byte ",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"12": (
"Syrlinks Handler: Write LCL config",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"13": (
"Syrlinks Handler: Read RX status registers",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"14": (
"Syrlinks Handler: Read LCL config register",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"15": (
"Syrlinks Handler: Set waveform OQPSK",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"16": (
"Syrlinks Handler: Set waveform BPSK",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"17": (
"Syrlinks Handler: Set second config",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"18": (
"Syrlinks Handler: Enable debug output",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"19": (
"Syrlinks Handler: Disable debug output",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
}
service_syrlinks_handler_tuple = (
"Syrlinks Handler",
op_code_dict_srv_syrlinks_handler,
)
cmd_dict[CustomServiceList.SYRLINKS.value] = service_syrlinks_handler_tuple
def add_bpx_cmd_definitions(cmd_dict: ServiceOpCodeDictT):
op_code_dict = dict()
add_op_code_entry(
@ -471,6 +746,12 @@ def add_pcdu_cmds(cmd_dict: ServiceOpCodeDictT):
info="PDU2 Device",
op_code_entry=op_code_dict,
)
op_code_dict = {
"0": ("ACU: Tests", {OpCodeDictKeys.TIMEOUT: 2.0}),
"51": ("ACU: Print channel statistics", {OpCodeDictKeys.TIMEOUT: 2.0}),
}
service_tuple = ("ACU Devices", op_code_dict)
cmd_dict[CustomServiceList.ACU.value] = service_tuple
def add_imtq_cmds(cmd_dict: ServiceOpCodeDictT):
@ -489,25 +770,6 @@ def add_imtq_cmds(cmd_dict: ServiceOpCodeDictT):
cmd_dict[CustomServiceList.IMTQ.value] = service_imtq_tuple
def add_rw_cmds(cmd_dict: ServiceOpCodeDictT):
op_code_dict_srv_rw = {
"0": ("Reaction Wheel: Run all commands", {OpCodeDictKeys.TIMEOUT: 2.0}),
"1": ("Reaction Wheel: Set speed", {OpCodeDictKeys.TIMEOUT: 2.0}),
"2": ("Reaction Wheel: Set mode on", {OpCodeDictKeys.TIMEOUT: 2.0}),
"3": ("Reaction Wheel: Set mode normal", {OpCodeDictKeys.TIMEOUT: 2.0}),
"4": ("Reaction Wheel: Set mode off", {OpCodeDictKeys.TIMEOUT: 2.0}),
"5": (
"Reaction Wheel: Send get-telemetry-command",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
}
service_rw_tuple = ("Reaction Wheel", op_code_dict_srv_rw)
cmd_dict[CustomServiceList.REACTION_WHEEL_1.value] = service_rw_tuple
cmd_dict[CustomServiceList.REACTION_WHEEL_2.value] = service_rw_tuple
cmd_dict[CustomServiceList.REACTION_WHEEL_3.value] = service_rw_tuple
cmd_dict[CustomServiceList.REACTION_WHEEL_4.value] = service_rw_tuple
def add_rad_sens_cmds(cmd_dict: ServiceOpCodeDictT):
op_code_dict_srv_rad_sensor = {
"0": ("Radiation Sensor: Set mode on", {OpCodeDictKeys.TIMEOUT: 2.0}),
@ -612,16 +874,28 @@ def add_ploc_supv_cmds(cmd_dict: ServiceOpCodeDictT):
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"42": ("PLOC Supervisor: Perform update", {OpCodeDictKeys.TIMEOUT: 2.0}),
"43": ("PLOC Supervisor: Terminate supervisor process", {OpCodeDictKeys.TIMEOUT: 2.0}),
"43": (
"PLOC Supervisor: Terminate supervisor process",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"44": ("PLOC Supervisor: Start MPSoC quiet", {OpCodeDictKeys.TIMEOUT: 2.0}),
"45": ("PLOC Supervisor: Set shutdown timeout", {OpCodeDictKeys.TIMEOUT: 2.0}),
"46": ("PLOC Supervisor: Factory flash", {OpCodeDictKeys.TIMEOUT: 2.0}),
"47": ("PLOC Supervisor: Enable auto TM", {OpCodeDictKeys.TIMEOUT: 2.0}),
"48": ("PLOC Supervisor: Disable auto TM", {OpCodeDictKeys.TIMEOUT: 2.0}),
"51": ("PLOC Supervisor: Logging request event buffers", {OpCodeDictKeys.TIMEOUT: 2.0}),
"52": ("PLOC Supervisor: Logging clear counters", {OpCodeDictKeys.TIMEOUT: 2.0}),
"51": (
"PLOC Supervisor: Logging request event buffers",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"52": (
"PLOC Supervisor: Logging clear counters",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"53": ("PLOC Supervisor: Logging set topic", {OpCodeDictKeys.TIMEOUT: 2.0}),
"54": ("PLOC Supervisor: Logging request counters", {OpCodeDictKeys.TIMEOUT: 2.0}),
"54": (
"PLOC Supervisor: Logging request counters",
{OpCodeDictKeys.TIMEOUT: 2.0},
),
"55": ("PLOC Supervisor: Request ADC Report", {OpCodeDictKeys.TIMEOUT: 2.0}),
"56": ("PLOC Supervisor: Reset PL", {OpCodeDictKeys.TIMEOUT: 2.0}),
"57": ("PLOC Supervisor: Enable NVMs", {OpCodeDictKeys.TIMEOUT: 2.0}),

View File

@ -1,7 +1,7 @@
from tmtccmd.tc.definitions import TcQueueT, QueueCommands
from config.object_ids import BPX_HANDLER_ID
from tmtccmd.tc.service_8_functional_cmd import generate_action_command
from tmtccmd.tc.service_3_housekeeping import generate_one_hk_command, make_sid
from tmtccmd.tc.pus_8_funccmd import generate_action_command
from tmtccmd.tc.pus_3_fsfw_hk import generate_one_hk_command, make_sid
class BpxSetIds:

View File

@ -2,7 +2,7 @@ import enum
from tmtccmd.config.definitions import QueueCommands
from tmtccmd.tc.definitions import TcQueueT
from tmtccmd.tc.service_8_functional_cmd import generate_action_command
from tmtccmd.tc.pus_8_funccmd import generate_action_command
from config.object_ids import GPS_HANDLER_1_ID, GPS_HANDLER_0_ID

View File

@ -1,59 +1,224 @@
# -*- coding: utf-8 -*-
"""
@file tmtcc_tc_heater.py
@brief Command sequence to test the HeaterHandler
"""Command sequence to test the HeaterHandler
@author J. Meier
@date 30.01.2021
"""
from tmtccmd.config.definitions import QueueCommands
import enum
from config.definitions import CustomServiceList
from config.object_ids import get_object_ids
from tmtccmd.utility.obj_id import ObjectId
from tmtccmd.config.definitions import QueueCommands, ServiceOpCodeDictT
from tmtccmd.tc.pus_201_fsfw_health import (
pack_set_health_cmd_data,
FsfwHealth,
Subservices,
)
from tmtccmd.tc.pus_8_funccmd import generate_action_command
from tmtccmd.config.globals import add_service_op_code_entry, add_op_code_entry
from tmtccmd.tc.packer import TcQueueT
from spacepackets.ecss.tc import PusTelecommand
class SwitchNumbers:
HEATER_0 = 0
HEATER_1 = 1
HEATER_2 = 2
HEATER_3 = 3
HEATER_4 = 4
HEATER_5 = 5
HEATER_6 = 6
HEATER_7 = 7
HEATER_0_OBC_BRD = 0
HEATER_1_PLOC_PROC_BRD = 1
HEATER_2_ACS_BRD = 2
HEATER_3_PCDU_PDU = 3
HEATER_4_CAMERA = 4
HEATER_5_STR = 5
HEATER_6_DRO = 6
HEATER_7_HPA = 7
NUMBER_OF_SWITCHES = 8
class ActionIds:
SWITCH_HEATER = bytearray([0x0, 0x0, 0x0, 0x0])
class OpCodes:
HEATER_CMD = ["0", "switch-cmd"]
HEATER_EXT_CTRL = ["1", "set-ext-ctrl"]
HEATER_FAULTY_CMD = ["2", "set-faulty"]
HEATER_HEALTHY_CMD = ["3", "set-healthy"]
def pack_heater_test_into(object_id: bytearray, tc_queue: TcQueueT):
tc_queue.appendleft((QueueCommands.PRINT, "Testing Heater Switching"))
class Info:
HEATER_CMD = "Heater Switch Command"
HEATER_EXT_CTRL = "Set to external control"
HEATER_FAULTY_CMD = "Set to faulty"
HEATER_HEALTHY_CMD = "Set to healthy"
heater_number = int(input("Type number of heater to switch [0-7]: "))
if heater_number >= SwitchNumbers.NUMBER_OF_SWITCHES or heater_number < 0:
print("Invalid heater switch number")
return
action = int(input("Turn switch on or off? (0 - off, 1 - on): "))
if action != 0 and action != 1:
print("Invalid action defined. Must be 0 (off) or 1 (on")
debug_string = "Switching heater " + str(heater_number)
tc_queue.appendleft((QueueCommands.PRINT, debug_string))
command = pack_switch_heater_command(object_id, heater_number, action)
command = PusTelecommand(service=8, subservice=128, ssc=300, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
# Needed in OBSW to differentiate between external and internal heater commands
COMMAND_SOURCE_PARAM_EXTERNAL = 1
class ActionIds(enum.IntEnum):
SWITCH_HEATER = 0
def add_heater_cmds(cmd_dict: ServiceOpCodeDictT):
op_code_dict = dict()
add_op_code_entry(
op_code_dict=op_code_dict, keys=OpCodes.HEATER_CMD, info=Info.HEATER_CMD
)
add_op_code_entry(
op_code_dict=op_code_dict,
keys=OpCodes.HEATER_HEALTHY_CMD,
info=Info.HEATER_HEALTHY_CMD,
)
add_op_code_entry(
op_code_dict=op_code_dict,
keys=OpCodes.HEATER_EXT_CTRL,
info=Info.HEATER_EXT_CTRL,
)
add_op_code_entry(
op_code_dict=op_code_dict,
keys=OpCodes.HEATER_FAULTY_CMD,
info=Info.HEATER_FAULTY_CMD,
)
add_service_op_code_entry(
srv_op_code_dict=cmd_dict,
name=CustomServiceList.HEATER.value,
info="Heater Device",
op_code_entry=op_code_dict,
)
def pack_heater_cmds(object_id: bytearray, op_code: str, tc_queue: TcQueueT):
if op_code in OpCodes.HEATER_CMD:
tc_queue.appendleft((QueueCommands.PRINT, "Heater Switching"))
heater_number = prompt_heater()
while True:
action = input("Turn switch on or off? (0 - off, 1 - on): ")
if not action.isdigit():
print("Switch action not valid")
continue
action = int(action)
if action != 0 and action != 1:
print("Invalid action defined. Must be 0 (off) or 1 (on")
continue
break
if action == 1:
act_str = "on"
else:
act_str = "off"
debug_string = f"Switching heater {heater_number} {act_str}"
tc_queue.appendleft((QueueCommands.PRINT, debug_string))
command = pack_switch_heater_command(object_id, heater_number, action)
tc_queue.appendleft(command.pack_command_tuple())
if op_code in OpCodes.HEATER_EXT_CTRL:
heater_number = prompt_heater()
obj_id = heater_idx_to_obj(heater_number)
health_cmd(
tc_queue=tc_queue,
object_id=obj_id,
health=FsfwHealth.EXTERNAL_CTRL,
health_str="External Control",
heater_idx=heater_number,
)
if op_code in OpCodes.HEATER_FAULTY_CMD:
heater_number = prompt_heater()
obj_id = heater_idx_to_obj(heater_number)
health_cmd(
tc_queue=tc_queue,
object_id=obj_id,
health=FsfwHealth.FAULTY,
health_str="Faulty",
heater_idx=heater_number,
)
if op_code in OpCodes.HEATER_HEALTHY_CMD:
heater_number = prompt_heater()
obj_id = heater_idx_to_obj(heater_number)
health_cmd(
tc_queue=tc_queue,
object_id=obj_id,
health=FsfwHealth.HEALTHY,
health_str="Healthy",
heater_idx=heater_number,
)
def heater_idx_to_obj(heater: int) -> ObjectId:
from config.object_ids import (
HEATER_0_OBC_BRD,
HEATER_1_PLOC_PROC_BRD,
HEATER_2_ACS_BRD,
HEATER_3_PCDU_BRD,
HEATER_4_CAMERA,
HEATER_5_STR,
HEATER_6_DRO,
HEATER_7_HPA,
)
obj_id_array = [
HEATER_0_OBC_BRD,
HEATER_1_PLOC_PROC_BRD,
HEATER_2_ACS_BRD,
HEATER_3_PCDU_BRD,
HEATER_4_CAMERA,
HEATER_5_STR,
HEATER_6_DRO,
HEATER_7_HPA,
]
obj_dict = get_object_ids()
obj_id_obj = obj_dict.get(obj_id_array[heater])
if obj_id_obj is None:
return ObjectId.from_bytes(obj_id_array[heater])
return obj_id_obj
def prompt_heater() -> int:
while True:
print("HEATER 0 | PLOC PROC Board")
print("HEATER 1 | PCDU Board")
print("HEATER 2 | ACS Board")
print("HEATER 3 | OBC Board")
print("HEATER 4 | CAMERA")
print("HEATER 5 | STR")
print("HEATER 6 | DRO")
print("HEATER 7 | HPA")
heater_number = input("Type number of heater to switch [0-7]: ")
if not heater_number.isdigit():
print("Heater number not a digit")
continue
heater_number = int(heater_number)
if heater_number >= SwitchNumbers.NUMBER_OF_SWITCHES or heater_number < 0:
print("Invalid heater switch number")
continue
break
return heater_number
def health_cmd(
tc_queue: TcQueueT,
heater_idx: int,
object_id: ObjectId,
health: FsfwHealth,
health_str: str,
):
tc_queue.appendleft(
(
QueueCommands.PRINT,
f"Setting Heater {heater_idx} {object_id} to {health_str}",
)
)
app_data = pack_set_health_cmd_data(object_id=object_id.as_bytes, health=health)
cmd = PusTelecommand(
service=201, subservice=Subservices.TC_SET_HEALTH, app_data=app_data
)
tc_queue.appendleft(cmd.pack_command_tuple())
def pack_switch_heater_command(
object_id: bytearray, switch_nr: int, switch_action: int
) -> bytearray:
"""Function to generate the command switch a heater
@param object_id The object id of the HeaterHandler object.
@param switch_nr The switch number identifying the heater to switch
@param switch_action Action to perform. 0 - Sets switch off, 1 - Sets switch on.
object_id: bytes, switch_nr: int, switch_action: int
) -> PusTelecommand:
"""Function to generate a heater switch command.
:param object_id: The object id of the HeaterHandler object.
:param switch_nr: The switch number identifying the heater to switch
:param switch_action: Action to perform. 0 - Sets switch off, 1 - Sets switch on.
"""
action_id = ActionIds.SWITCH_HEATER
command = bytearray()
command += object_id + action_id
command.append(switch_nr)
command.append(switch_action)
return command
command.append(COMMAND_SOURCE_PARAM_EXTERNAL)
return generate_action_command(
object_id=object_id, action_id=ActionIds.SWITCH_HEATER, app_data=command
)

View File

@ -9,7 +9,7 @@ from tmtccmd.config.definitions import QueueCommands
from tmtccmd.tc.packer import TcQueueT
from spacepackets.ecss.tc import PusTelecommand
from tmtccmd.tc.service_3_housekeeping import make_sid, generate_one_hk_command
from tmtccmd.tc.pus_3_fsfw_hk import make_sid, generate_one_hk_command
class ImtqSetIds:

View File

@ -7,7 +7,7 @@
"""
from tmtccmd.config.definitions import QueueCommands
from tmtccmd.tc.packer import TcQueueT
from tmtccmd.tc.service_3_housekeeping import (
from tmtccmd.tc.pus_3_fsfw_hk import (
generate_one_hk_command,
make_sid,
generate_one_diag_command,

View File

@ -5,7 +5,7 @@
"""
from tmtccmd.config.definitions import QueueCommands
from tmtccmd.tc.packer import TcQueueT
from tmtccmd.tc.service_3_housekeeping import (
from tmtccmd.tc.pus_3_fsfw_hk import (
generate_one_hk_command,
make_sid,
generate_one_diag_command,

View File

@ -8,7 +8,7 @@
"""
from tmtccmd.config.definitions import QueueCommands
from tmtccmd.tc.packer import TcQueueT
from tmtccmd.tc.service_3_housekeeping import (
from tmtccmd.tc.pus_3_fsfw_hk import (
generate_one_hk_command,
generate_one_diag_command,
make_sid,

View File

@ -14,7 +14,7 @@ from tmtccmd.logging import get_console_logger
from tmtccmd.tc.packer import TcQueueT
from spacepackets.ecss.tc import PusTelecommand
from utility.input_helper import InputHelper
from tmtccmd.tc.service_200_mode import pack_mode_data, Modes
from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes
LOGGER = get_console_logger()
@ -66,7 +66,6 @@ class PlocReplyIds(enum.IntEnum):
TM_CAM_CMD_RPT = 19
def pack_ploc_mpsoc_commands(
object_id: bytearray, tc_queue: TcQueueT, op_code: str
) -> TcQueueT:
@ -176,7 +175,11 @@ def pack_ploc_mpsoc_commands(
elif op_code == "16":
tc_queue.appendleft((QueueCommands.PRINT, "PLOC MPSoC: Tc cam command send"))
cam_cmd = input("Specify cam command string: ")
command = object_id + struct.pack("!I", CommandIds.TC_CAM_CMD_SEND) + bytearray(cam_cmd, 'utf-8')
command = (
object_id
+ struct.pack("!I", CommandIds.TC_CAM_CMD_SEND)
+ bytearray(cam_cmd, "utf-8")
)
command = PusTelecommand(service=8, subservice=128, ssc=32, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
elif op_code == "17":

View File

@ -12,7 +12,7 @@ from spacepackets.ecss.tc import PusTelecommand
from tmtccmd.config.definitions import QueueCommands
from tmtccmd.tc.packer import TcQueueT
from tmtccmd.logging import get_console_logger
from tmtccmd.tc.service_200_mode import pack_mode_data, Modes
from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes
from utility.input_helper import InputHelper
LOGGER = get_console_logger()
@ -32,8 +32,14 @@ MANUAL_INPUT = "1"
update_file_dict = {
MANUAL_INPUT: ["manual input", ""],
"2": ["/mnt/sd0/ploc/supervisor/update.bin", "/mnt/sd0/ploc/supervisor/update.bin"],
"3": ["/mnt/sd0/ploc/supervisor/update-large.bin", "/mnt/sd0/ploc/supervisor/update-large.bin"],
"4": ["/mnt/sd0/ploc/supervisor/update-small.bin", "/mnt/sd0/ploc/supervisor/update-small.bin"],
"3": [
"/mnt/sd0/ploc/supervisor/update-large.bin",
"/mnt/sd0/ploc/supervisor/update-large.bin",
],
"4": [
"/mnt/sd0/ploc/supervisor/update-small.bin",
"/mnt/sd0/ploc/supervisor/update-small.bin",
],
}
event_buffer_path_dict = {
@ -308,55 +314,51 @@ def pack_ploc_supv_commands(
command = PusTelecommand(service=8, subservice=128, ssc=57, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
elif op_code == "43":
tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Terminate supervisor process"))
command = object_id + struct.pack(
"!I", SupvActionIds.TERMINATE_SUPV_HELPER
tc_queue.appendleft(
(QueueCommands.PRINT, "PLOC Supervisor: Terminate supervisor process")
)
command = object_id + struct.pack("!I", SupvActionIds.TERMINATE_SUPV_HELPER)
command = PusTelecommand(service=8, subservice=128, ssc=58, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
elif op_code == "44":
tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Start MPSoC quiet"))
command = object_id + struct.pack(
"!I", SupvActionIds.START_MPSOC_QUIET
)
command = object_id + struct.pack("!I", SupvActionIds.START_MPSOC_QUIET)
command = PusTelecommand(service=8, subservice=128, ssc=59, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
elif op_code == "45":
tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Set shutdown timeout"))
tc_queue.appendleft(
(QueueCommands.PRINT, "PLOC Supervisor: Set shutdown timeout")
)
command = pack_set_shutdown_timeout_command(object_id)
command = PusTelecommand(service=8, subservice=128, ssc=60, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
elif op_code == "46":
tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Factory flash"))
command = object_id + struct.pack(
"!I", SupvActionIds.FACTORY_FLASH
)
command = object_id + struct.pack("!I", SupvActionIds.FACTORY_FLASH)
command = PusTelecommand(service=8, subservice=128, ssc=61, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
elif op_code == "47":
tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Enable auto TM"))
command = object_id + struct.pack(
"!I", SupvActionIds.ENABLE_AUTO_TM
)
command = object_id + struct.pack("!I", SupvActionIds.ENABLE_AUTO_TM)
command = PusTelecommand(service=8, subservice=128, ssc=62, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
elif op_code == "48":
tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Disable auto TM"))
command = object_id + struct.pack(
"!I", SupvActionIds.DISABLE_AUTO_TM
)
command = object_id + struct.pack("!I", SupvActionIds.DISABLE_AUTO_TM)
command = PusTelecommand(service=8, subservice=128, ssc=63, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
elif op_code == "51":
tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Logging request event buffers"))
tc_queue.appendleft(
(QueueCommands.PRINT, "PLOC Supervisor: Logging request event buffers")
)
command = pack_logging_buffer_request(object_id)
command = PusTelecommand(service=8, subservice=128, ssc=66, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
elif op_code == "52":
tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Logging clear counters"))
command = object_id + struct.pack(
"!I", SupvActionIds.LOGGING_CLEAR_COUNTERS
tc_queue.appendleft(
(QueueCommands.PRINT, "PLOC Supervisor: Logging clear counters")
)
command = object_id + struct.pack("!I", SupvActionIds.LOGGING_CLEAR_COUNTERS)
command = PusTelecommand(service=8, subservice=128, ssc=67, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
elif op_code == "53":
@ -365,18 +367,22 @@ def pack_ploc_supv_commands(
command = PusTelecommand(service=8, subservice=128, ssc=68, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
elif op_code == "54":
tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Logging request counters"))
command = object_id + struct.pack('!I', SupvActionIds.LOGGING_REQUEST_COUNTERS)
tc_queue.appendleft(
(QueueCommands.PRINT, "PLOC Supervisor: Logging request counters")
)
command = object_id + struct.pack("!I", SupvActionIds.LOGGING_REQUEST_COUNTERS)
command = PusTelecommand(service=8, subservice=128, ssc=69, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
elif op_code == "55":
tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Request ADC report"))
command = object_id + struct.pack('!I', SupvActionIds.REQUEST_ADC_REPORT)
tc_queue.appendleft(
(QueueCommands.PRINT, "PLOC Supervisor: Request ADC report")
)
command = object_id + struct.pack("!I", SupvActionIds.REQUEST_ADC_REPORT)
command = PusTelecommand(service=8, subservice=128, ssc=70, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
elif op_code == "56":
tc_queue.appendleft((QueueCommands.PRINT, "PLOC Supervisor: Reset PL"))
command = object_id + struct.pack('!I', SupvActionIds.RESET_PL)
command = object_id + struct.pack("!I", SupvActionIds.RESET_PL)
command = PusTelecommand(service=8, subservice=128, ssc=71, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
elif op_code == "57":
@ -580,30 +586,30 @@ def pack_update_command(object_id: bytearray) -> bytearray:
start_address = int(input("Specify start address: 0x"), 16)
update_file = get_update_file()
command += object_id
command += struct.pack('!I', SupvActionIds.PERFORM_UPDATE)
command += bytearray(update_file, 'utf-8')
command += struct.pack("!I", SupvActionIds.PERFORM_UPDATE)
command += bytearray(update_file, "utf-8")
# Adding null terminator
command += struct.pack('!B', 0)
command += struct.pack('!B', memory_id)
command += struct.pack('!I', start_address)
command += struct.pack("!B", 0)
command += struct.pack("!B", memory_id)
command += struct.pack("!I", start_address)
return command
def pack_set_shutdown_timeout_command(object_id: bytearray) -> bytearray:
command = bytearray()
command += object_id
command += struct.pack('!I', SupvActionIds.SET_SHUTDOWN_TIMEOUT)
command += struct.pack("!I", SupvActionIds.SET_SHUTDOWN_TIMEOUT)
timeout = int(input("Specify shutdown timeout (ms): "))
command += struct.pack('!I', timeout)
command += struct.pack("!I", timeout)
return command
def pack_logging_buffer_request(object_id: bytearray) -> bytearray:
command = bytearray()
command += object_id
command += struct.pack('!I', SupvActionIds.LOGGING_REQUEST_EVENT_BUFFERS)
command += struct.pack("!I", SupvActionIds.LOGGING_REQUEST_EVENT_BUFFERS)
path = get_event_buffer_path()
command += bytearray(path, 'utf-8')
command += bytearray(path, "utf-8")
return command
@ -628,9 +634,9 @@ def pack_read_gpio_cmd(object_id: bytearray) -> bytearray:
def pack_logging_set_topic(objetc_id: bytearray) -> bytearray:
command = objetc_id + struct.pack('!I', SupvActionIds.LOGGING_SET_TOPIC)
command = objetc_id + struct.pack("!I", SupvActionIds.LOGGING_SET_TOPIC)
tpc = int(input("Specify logging topic: "))
command += struct.pack('!B', tpc)
command += struct.pack("!B", tpc)
return command

View File

@ -3,8 +3,8 @@ from typing import Optional
from tmtccmd.config import QueueCommands
from tmtccmd.tc.definitions import TcQueueT
from tmtccmd.tc.service_200_mode import pack_mode_data, Modes, Subservices
from tmtccmd.tc.service_20_parameter import (
from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes, Subservices
from tmtccmd.tc.pus_20_params import (
pack_scalar_double_param_app_data,
pack_fsfw_load_param_cmd,
pack_boolean_parameter_app_data,
@ -234,6 +234,6 @@ def pack_pl_pcdu_mode_cmd(tc_queue: TcQueueT, info: str, mode: Modes, submode: i
)
mode_data = pack_mode_data(object_id=PL_PCDU_ID, mode=mode, submode=submode)
mode_cmd = PusTelecommand(
service=200, subservice=Subservices.COMMAND_MODE_COMMAND, app_data=mode_data
service=200, subservice=Subservices.TC_MODE_COMMAND, app_data=mode_data
)
tc_queue.appendleft(mode_cmd.pack_command_tuple())

View File

@ -11,7 +11,7 @@ from tmtccmd.config.definitions import QueueCommands
from tmtccmd.tc.packer import TcQueueT
from spacepackets.ecss.tc import PusTelecommand
from pus_tc.service_200_mode import pack_mode_data
from pus_tc.service_200_mode import pack_mode_data, Modes
class CommandIds:
@ -31,19 +31,19 @@ def pack_rad_sensor_test_into(object_id: bytearray, tc_queue: TcQueueT, op_code:
if op_code == "0":
tc_queue.appendleft((QueueCommands.PRINT, "Rad sensor: Switch to mode on"))
mode_data = pack_mode_data(object_id, 1, 0)
mode_data = pack_mode_data(object_id, Modes.ON, 0)
command = PusTelecommand(service=200, subservice=1, ssc=41, app_data=mode_data)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "1":
tc_queue.appendleft((QueueCommands.PRINT, "Rad sensor: Switch to mode normal"))
mode_data = pack_mode_data(object_id, 2, 0)
mode_data = pack_mode_data(object_id, Modes.NORMAL, 0)
command = PusTelecommand(service=200, subservice=1, ssc=42, app_data=mode_data)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "2":
tc_queue.appendleft((QueueCommands.PRINT, "Rad sensor: Switch to mode off"))
mode_data = pack_mode_data(object_id, 0, 0)
mode_data = pack_mode_data(object_id, Modes.OFF, 0)
command = PusTelecommand(service=200, subservice=1, ssc=42, app_data=mode_data)
tc_queue.appendleft(command.pack_command_tuple())

View File

@ -1,21 +1,58 @@
# -*- coding: utf-8 -*-
"""
@file reaction_wheels.py
"""reaction_wheels.py
@brief Tests for the reaction wheel handler
@author J. Meier
@date 20.06.2021
"""
import struct
from tmtccmd.config.definitions import QueueCommands
from tmtccmd.config.definitions import QueueCommands, ServiceOpCodeDictT
from tmtccmd.tc.pus_3_fsfw_hk import (
generate_one_hk_command,
generate_one_diag_command,
make_sid,
)
from tmtccmd.config.globals import add_op_code_entry, add_service_op_code_entry
from tmtccmd.tc.packer import TcQueueT
from spacepackets.ecss.tc import PusTelecommand
from pus_tc.service_200_mode import pack_mode_data
from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes, Subservices
from config.definitions import CustomServiceList
class OpCodesDevs:
SPEED = ["0", "speed"]
ON = ["1", "on"]
NML = ["2", "nml"]
OFF = ["3", "off"]
GET_STATUS = ["4", "status"]
GET_TM = ["5", "tm"]
class InfoDevs:
SPEED = "Set speed"
ON = "Set On"
NML = "Set Normal"
OFF = "Set Off"
GET_STATUS = "Get Status HK"
GET_TM = "Get TM HK"
class OpCodesAss:
ON = ["0", "on"]
NML = ["1", "nml"]
OFF = ["2", "off"]
class InfoAss:
ON = "Mode On: 3/4 RWs min. on"
NML = "Mode Normal: 3/4 RWs min. normal"
OFF = "Mode Off: All RWs off"
class RwSetIds:
STATUS_SET_ID = 4
TEMPERATURE_SET_ID = 8
LAST_RESET = 2
TM_SET = 9
class RwCommandIds:
@ -38,66 +75,138 @@ class RampTime:
MS_1000 = 1000
def pack_single_rw_test_into(
object_id: bytearray, tc_queue: TcQueueT, op_code: str
) -> TcQueueT:
tc_queue.appendleft(
(
QueueCommands.PRINT,
"Testing reaction wheel handler with object id: 0x" + object_id.hex(),
)
def add_rw_cmds(cmd_dict: ServiceOpCodeDictT):
op_code_dict = dict()
add_op_code_entry(
op_code_dict=op_code_dict, info=InfoDevs.SPEED, keys=OpCodesDevs.SPEED
)
add_op_code_entry(op_code_dict=op_code_dict, info=InfoDevs.ON, keys=OpCodesDevs.ON)
add_op_code_entry(
op_code_dict=op_code_dict, info=InfoDevs.OFF, keys=OpCodesDevs.OFF
)
add_op_code_entry(
op_code_dict=op_code_dict, info=InfoDevs.NML, keys=OpCodesDevs.NML
)
add_op_code_entry(
op_code_dict=op_code_dict, info=InfoDevs.GET_STATUS, keys=OpCodesDevs.GET_STATUS
)
add_op_code_entry(
op_code_dict=op_code_dict, info=InfoDevs.GET_TM, keys=OpCodesDevs.GET_TM
)
add_service_op_code_entry(
srv_op_code_dict=cmd_dict,
name=CustomServiceList.REACTION_WHEEL_1.value,
op_code_entry=op_code_dict,
info="Reaction Wheel 1",
)
add_service_op_code_entry(
srv_op_code_dict=cmd_dict,
name=CustomServiceList.REACTION_WHEEL_2.value,
op_code_entry=op_code_dict,
info="Reaction Wheel 2",
)
add_service_op_code_entry(
srv_op_code_dict=cmd_dict,
name=CustomServiceList.REACTION_WHEEL_3.value,
op_code_entry=op_code_dict,
info="Reaction Wheel 3",
)
add_service_op_code_entry(
srv_op_code_dict=cmd_dict,
name=CustomServiceList.REACTION_WHEEL_4.value,
op_code_entry=op_code_dict,
info="Reaction Wheel 4",
)
op_code_dict = dict()
add_op_code_entry(op_code_dict=op_code_dict, info=InfoAss.ON, keys=OpCodesAss.ON)
add_op_code_entry(op_code_dict=op_code_dict, info=InfoAss.NML, keys=OpCodesAss.NML)
add_op_code_entry(op_code_dict=op_code_dict, info=InfoAss.OFF, keys=OpCodesAss.OFF)
add_service_op_code_entry(
srv_op_code_dict=cmd_dict,
name=CustomServiceList.RW_ASSEMBLY.value,
op_code_entry=op_code_dict,
info="Reaction Wheel Assembly",
)
if op_code == "0" or op_code == "1":
def pack_single_rw_test_into(
object_id: bytes, rw_idx: int, tc_queue: TcQueueT, op_code: str
) -> TcQueueT:
if op_code in OpCodesDevs.SPEED:
speed = int(input("Specify speed [0.1 RPM]: "))
ramp_time = int(input("Specify ramp time [ms]: "))
tc_queue.appendleft((QueueCommands.PRINT, "Reaction Wheel: Set speed"))
tc_queue.appendleft((QueueCommands.PRINT, f"RW {rw_idx}: {InfoDevs.SPEED}"))
command = pack_set_speed_command(object_id, speed, ramp_time)
command = PusTelecommand(service=8, subservice=128, ssc=40, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "2":
tc_queue.appendleft((QueueCommands.PRINT, "Reaction Wheel: Switch to mode on"))
mode_data = pack_mode_data(object_id, 1, 0)
if op_code in OpCodesDevs.ON:
tc_queue.appendleft((QueueCommands.PRINT, f"RW {rw_idx}: {InfoDevs.ON}"))
mode_data = pack_mode_data(object_id, Modes.ON, 0)
command = PusTelecommand(service=200, subservice=1, ssc=41, app_data=mode_data)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "3":
tc_queue.appendleft(
(QueueCommands.PRINT, "Reaction Wheel: Switch to mode normal")
)
mode_data = pack_mode_data(object_id, 2, 0)
if op_code in OpCodesDevs.NML:
tc_queue.appendleft((QueueCommands.PRINT, f"RW {rw_idx}: {InfoDevs.NML}"))
mode_data = pack_mode_data(object_id, Modes.NORMAL, 0)
command = PusTelecommand(service=200, subservice=1, ssc=42, app_data=mode_data)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "4":
tc_queue.appendleft((QueueCommands.PRINT, "Reaction Wheel: Switch to mode off"))
mode_data = pack_mode_data(object_id, 0, 0)
if op_code in OpCodesDevs.OFF:
tc_queue.appendleft((QueueCommands.PRINT, f"RW {rw_idx}: {InfoDevs.OFF}"))
mode_data = pack_mode_data(object_id, Modes.OFF, 0)
command = PusTelecommand(service=200, subservice=1, ssc=43, app_data=mode_data)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "5":
tc_queue.appendleft(
(QueueCommands.PRINT, "Reaction Wheel: Send get-telemetry-command")
if op_code in OpCodesDevs.GET_TM:
tc_queue.appendleft((QueueCommands.PRINT, f"RW {rw_idx}: {InfoDevs.GET_TM}"))
command = generate_one_hk_command(
sid=make_sid(object_id=object_id, set_id=RwSetIds.TM_SET), ssc=0
)
tc_queue.appendleft(command.pack_command_tuple())
if op_code in OpCodesDevs.GET_STATUS:
tc_queue.appendleft(
(QueueCommands.PRINT, f"RW {rw_idx}: {InfoDevs.GET_STATUS}")
)
command = generate_one_diag_command(
sid=make_sid(object_id=object_id, set_id=RwSetIds.STATUS_SET_ID), ssc=0
)
command = object_id + RwCommandIds.GET_TM
command = PusTelecommand(service=8, subservice=128, ssc=44, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
return tc_queue
def pack_set_speed_command(
object_id: bytearray, speed: int, ramp_time: int
) -> bytearray:
def pack_rw_ass_cmds(tc_queue: TcQueueT, object_id: bytes, op_code: str):
if op_code in OpCodesAss.OFF:
data = pack_mode_data(object_id=object_id, mode=Modes.OFF, submode=0)
cmd = PusTelecommand(
service=200, subservice=Subservices.TC_MODE_COMMAND, app_data=data
)
tc_queue.appendleft(cmd.pack_command_tuple())
if op_code in OpCodesAss.ON:
data = pack_mode_data(object_id=object_id, mode=Modes.ON, submode=0)
cmd = PusTelecommand(
service=200, subservice=Subservices.TC_MODE_COMMAND, app_data=data
)
tc_queue.appendleft(cmd.pack_command_tuple())
if op_code in OpCodesAss.NML:
data = pack_mode_data(object_id=object_id, mode=Modes.NORMAL, submode=0)
cmd = PusTelecommand(
service=200, subservice=Subservices.TC_MODE_COMMAND, app_data=data
)
tc_queue.appendleft(cmd.pack_command_tuple())
def pack_set_speed_command(object_id: bytes, speed: int, ramp_time: int) -> bytearray:
"""With this function a command is packed to set the speed of a reaction wheel
@param object_id The object id of the reaction wheel handler.
@param speed Valid speeds are [-65000, -1000] and [1000, 65000]. Values are specified in 0.1 * RPM
@param ramp_time The time after which the reaction wheel will reached the commanded speed. Valid times are
10 - 10000 ms
:param object_id: The object id of the reaction wheel handler.
:param speed: Valid speeds are [-65000, -1000] and [1000, 65000]. Values are
specified in 0.1 * RPM
:param ramp_time: The time after which the reaction wheel will reached the commanded speed.
Valid times are 10 - 10000 ms
"""
command_id = RwCommandIds.SET_SPEED
command = bytearray()
command = object_id + command_id
command += object_id + command_id
command = command + struct.pack("!i", speed)
command = command + ramp_time.to_bytes(length=2, byteorder="big")
return command

View File

@ -11,7 +11,7 @@ from tmtccmd.config.definitions import QueueCommands
from tmtccmd.tc.packer import TcQueueT
from spacepackets.ecss.tc import PusTelecommand
from tmtccmd.tc.service_200_mode import pack_mode_data, Modes
from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes
from tmtccmd.logging import get_console_logger
from utility.input_helper import InputHelper

View File

@ -8,9 +8,9 @@
from tmtccmd.config.definitions import QueueCommands
from tmtccmd.tc.definitions import TcQueueT
from tmtccmd.tc.service_3_housekeeping import make_sid, generate_one_hk_command
from tmtccmd.tc.pus_3_fsfw_hk import make_sid, generate_one_hk_command
from spacepackets.ecss.tc import PusTelecommand
from tmtccmd.tc.service_200_mode import pack_mode_data, Modes
from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes
import struct
@ -64,17 +64,17 @@ def pack_syrlinks_command(
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "3":
tc_queue.appendleft((QueueCommands.PRINT, "syrlinks: Set TX mode standby"))
command = object_id + struct.pack('!I', CommandIds.SET_TX_MODE_STANDBY)
command = object_id + struct.pack("!I", CommandIds.SET_TX_MODE_STANDBY)
command = PusTelecommand(service=8, subservice=128, ssc=10, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "4":
tc_queue.appendleft((QueueCommands.PRINT, "syrlinks: Set TX mode modulation"))
command = object_id + struct.pack('!I', CommandIds.SET_TX_MODE_MODULATION)
command = object_id + struct.pack("!I", CommandIds.SET_TX_MODE_MODULATION)
command = PusTelecommand(service=8, subservice=128, ssc=11, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "5":
tc_queue.appendleft((QueueCommands.PRINT, "syrlinks: Set TX mode CW"))
command = object_id + struct.pack('!I', CommandIds.SET_TX_MODE_CW)
command = object_id + struct.pack("!I", CommandIds.SET_TX_MODE_CW)
command = PusTelecommand(service=8, subservice=128, ssc=12, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "6":
@ -89,26 +89,26 @@ def pack_syrlinks_command(
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "8":
tc_queue.appendleft((QueueCommands.PRINT, "Syrlinks: Read TX status"))
command = object_id + struct.pack('!I', CommandIds.READ_TX_STATUS)
command = object_id + struct.pack("!I", CommandIds.READ_TX_STATUS)
command = PusTelecommand(service=8, subservice=128, ssc=13, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "9":
tc_queue.appendleft((QueueCommands.PRINT, "Syrlinks: Read TX waveform"))
command = object_id + struct.pack('!I', CommandIds.READ_TX_WAVEFORM)
command = object_id + struct.pack("!I", CommandIds.READ_TX_WAVEFORM)
command = PusTelecommand(service=8, subservice=128, ssc=14, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "10":
tc_queue.appendleft(
(QueueCommands.PRINT, "Syrlinks: Read TX AGC value high byte")
)
command = object_id + struct.pack('!I', CommandIds.READ_TX_AGC_VALUE_HIGH_BYTE)
command = object_id + struct.pack("!I", CommandIds.READ_TX_AGC_VALUE_HIGH_BYTE)
command = PusTelecommand(service=8, subservice=128, ssc=15, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "11":
tc_queue.appendleft(
(QueueCommands.PRINT, "Syrlinks: Read TX AGC value low byte")
)
command = object_id + struct.pack('!I', CommandIds.READ_TX_AGC_VALUE_LOW_BYTE)
command = object_id + struct.pack("!I", CommandIds.READ_TX_AGC_VALUE_LOW_BYTE)
command = PusTelecommand(service=8, subservice=128, ssc=16, app_data=command)
tc_queue.appendleft(command.pack_command_tuple())
if op_code == "12":

View File

@ -8,7 +8,7 @@
from tmtccmd.config.definitions import QueueCommands
from spacepackets.ecss.tc import PusTelecommand
from tmtccmd.tc.packer import TcQueueT
from tmtccmd.tc.service_200_mode import pack_mode_data
from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes
from config.object_ids import TEST_DEVICE_ID
TEST_DEVICE_OBJ_ID = TEST_DEVICE_ID
@ -20,22 +20,22 @@ def pack_service200_test_into(tc_queue: TcQueueT) -> TcQueueT:
obj_id = TEST_DEVICE_OBJ_ID
# Set On Mode
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 200: Set Mode On"))
mode_data = pack_mode_data(obj_id, 1, 0)
mode_data = pack_mode_data(obj_id, Modes.ON, 0)
command = PusTelecommand(service=200, subservice=1, ssc=2000, app_data=mode_data)
tc_queue.appendleft(command.pack_command_tuple())
# Set Normal mode
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 200: Set Mode Normal"))
mode_data = pack_mode_data(obj_id, 2, 0)
mode_data = pack_mode_data(obj_id, Modes.NORMAL, 0)
command = PusTelecommand(service=200, subservice=1, ssc=2010, app_data=mode_data)
tc_queue.appendleft(command.pack_command_tuple())
# Set Raw Mode
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 200: Set Mode Raw"))
mode_data = pack_mode_data(obj_id, 3, 0)
mode_data = pack_mode_data(obj_id, Modes.RAW, 0)
command = PusTelecommand(service=200, subservice=1, ssc=2020, app_data=mode_data)
tc_queue.appendleft(command.pack_command_tuple())
# Set Off Mode
tc_queue.appendleft((QueueCommands.PRINT, "Testing Service 200: Set Mode Off"))
mode_data = pack_mode_data(obj_id, 0, 0)
mode_data = pack_mode_data(obj_id, Modes.OFF, 0)
command = PusTelecommand(service=200, subservice=1, ssc=2030, app_data=mode_data)
tc_queue.appendleft(command.pack_command_tuple())
tc_queue.appendleft((QueueCommands.EXPORT_LOG, "log/tmtc_log_service200.txt"))

View File

@ -1,6 +1,6 @@
import enum
from tmtccmd.tc.definitions import TcQueueT
from tmtccmd.tc.service_200_mode import Modes
from tmtccmd.tc.pus_200_fsfw_modes import Modes
from config.object_ids import ACS_BOARD_ASS_ID, SUS_BOARD_ASS_ID
from .common import command_assembly

View File

@ -1,6 +1,6 @@
from tmtccmd.tc.definitions import TcQueueT, QueueCommands
from spacepackets.ecss.tc import PusTelecommand
from tmtccmd.tc.service_200_mode import pack_mode_data, Modes, Subservices
from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes, Subservices
def command_assembly(
@ -13,6 +13,6 @@ def command_assembly(
submode=submode,
)
cmd = PusTelecommand(
service=200, subservice=Subservices.COMMAND_MODE_COMMAND, app_data=mode_data
service=200, subservice=Subservices.TC_MODE_COMMAND, app_data=mode_data
)
tc_queue.appendleft(cmd.pack_command_tuple())

View File

@ -2,9 +2,9 @@ import enum
from tmtccmd.config.definitions import QueueCommands
from tmtccmd.tc.definitions import TcQueueT
from tmtccmd.tc.service_8_functional_cmd import generate_action_command
from tmtccmd.tc.pus_8_funccmd import generate_action_command
from tmtccmd.logging import get_console_logger
from tmtccmd.tc.service_3_housekeeping import make_sid, generate_one_hk_command
from tmtccmd.tc.pus_3_fsfw_hk import make_sid, generate_one_hk_command
from config.object_ids import CORE_CONTROLLER_ID
LOGGER = get_console_logger()

View File

@ -1,5 +1,5 @@
from tmtccmd.tc.definitions import TcQueueT, QueueCommands
from tmtccmd.tc.service_200_mode import Modes
from tmtccmd.tc.pus_200_fsfw_modes import Modes
from .common import command_assembly
from config.object_ids import TCS_BOARD_ASS_ID

View File

@ -7,12 +7,12 @@ from typing import Union
from spacepackets.ecss import PusTelecommand
from tmtccmd.com_if.com_interface_base import CommunicationInterface
from tmtccmd.config.definitions import CoreServiceList
from tmtccmd.config.definitions import CoreServiceList, QueueCommands
from tmtccmd.logging import get_console_logger
from tmtccmd.logging.pus import log_raw_pus_tc
from tmtccmd.tc.definitions import TcQueueT
from tmtccmd.tc.service_5_event import pack_generic_service5_test_into
from tmtccmd.pus.service_17_test import pack_service_17_ping_command
from tmtccmd.tc.pus_5_event import pack_generic_service5_test_into
from tmtccmd.pus.pus_17_test import pack_service_17_ping_command
from tmtccmd.logging import get_current_time_string
from pus_tc.service_200_mode import pack_service200_test_into
@ -26,8 +26,8 @@ from pus_tc.devs.imtq import pack_imtq_test_into
from pus_tc.devs.tmp1075 import pack_tmp1075_test_into
from pus_tc.devs.ploc_mpsoc import pack_ploc_mpsoc_commands
from pus_tc.devs.ploc_supervisor import pack_ploc_supv_commands
from pus_tc.devs.heater import pack_heater_test_into
from pus_tc.devs.reaction_wheels import pack_single_rw_test_into
from pus_tc.devs.heater import pack_heater_cmds
from pus_tc.devs.reaction_wheels import pack_single_rw_test_into, pack_rw_ass_cmds
from pus_tc.devs.rad_sensor import pack_rad_sensor_test_into
from pus_tc.devs.ploc_upater import pack_ploc_updater_test_into
from pus_tc.devs.ploc_memory_dumper import pack_ploc_memory_dumper_cmd
@ -68,6 +68,7 @@ from config.object_ids import (
STR_IMG_HELPER_ID,
SYRLINKS_HANDLER_ID,
SOLAR_ARRAY_DEPLOYMENT_ID,
RW_ASSEMBLY,
)
@ -75,19 +76,22 @@ LOGGER = get_console_logger()
def pre_tc_send_cb(
packet: bytes,
queue_entry: Union[bytes, QueueCommands],
com_if: CommunicationInterface,
pus_info: Union[PusTelecommand, any],
queue_info: Union[PusTelecommand, any],
file_logger: logging.Logger,
):
log_raw_pus_tc(
packet=packet, srv_subservice=(pus_info.service, pus_info.subservice)
)
tc_info_string = f"Sent {pus_info}"
LOGGER.info(tc_info_string)
file_logger.info(f"{get_current_time_string(True)}: {tc_info_string}")
com_if.send(data=packet)
if isinstance(queue_entry, bytes) or isinstance(queue_entry, bytearray):
log_raw_pus_tc(
packet=queue_entry, srv_subservice=(queue_info.service, queue_info.subservice)
)
tc_info_string = f"Sent {queue_info}"
LOGGER.info(tc_info_string)
file_logger.info(f"{get_current_time_string(True)}: {tc_info_string}")
com_if.send(data=queue_entry)
elif isinstance(queue_entry, QueueCommands):
if queue_entry == QueueCommands.PRINT:
file_logger.info(queue_info)
def pack_service_queue_user(
service: Union[str, int], op_code: str, service_queue: TcQueueT
@ -134,7 +138,9 @@ def pack_service_queue_user(
)
if service == CustomServiceList.HEATER.value:
object_id = HEATER_ID
return pack_heater_test_into(object_id=object_id, tc_queue=service_queue)
return pack_heater_cmds(
object_id=object_id, tc_queue=service_queue, op_code=op_code
)
if service == CustomServiceList.IMTQ.value:
object_id = IMTQ_HANDLER_ID
return pack_imtq_test_into(
@ -146,24 +152,20 @@ def pack_service_queue_user(
object_id=object_id, tc_queue=service_queue, op_code=op_code
)
if service == CustomServiceList.REACTION_WHEEL_1.value:
object_id = RW1_ID
return pack_single_rw_test_into(
object_id=object_id, tc_queue=service_queue, op_code=op_code
object_id=RW1_ID, rw_idx=1, tc_queue=service_queue, op_code=op_code
)
if service == CustomServiceList.REACTION_WHEEL_2.value:
object_id = RW2_ID
return pack_single_rw_test_into(
object_id=object_id, tc_queue=service_queue, op_code=op_code
object_id=RW2_ID, rw_idx=2, tc_queue=service_queue, op_code=op_code
)
if service == CustomServiceList.REACTION_WHEEL_3.value:
object_id = RW3_ID
return pack_single_rw_test_into(
object_id=object_id, tc_queue=service_queue, op_code=op_code
object_id=RW3_ID, rw_idx=3, tc_queue=service_queue, op_code=op_code
)
if service == CustomServiceList.REACTION_WHEEL_4.value:
object_id = RW4_ID
return pack_single_rw_test_into(
object_id=object_id, tc_queue=service_queue, op_code=op_code
object_id=RW4_ID, rw_idx=4, tc_queue=service_queue, op_code=op_code
)
if service == CustomServiceList.RAD_SENSOR.value:
object_id = RAD_SENSOR_ID
@ -233,6 +235,10 @@ def pack_service_queue_user(
return pack_tcs_sys_commands(tc_queue=service_queue, op_code=op_code)
if service == CustomServiceList.TIME.value:
return pack_set_current_time_ascii_command(tc_queue=service_queue, ssc=0)
if service == CustomServiceList.RW_ASSEMBLY.value:
return pack_rw_ass_cmds(
tc_queue=service_queue, object_id=RW_ASSEMBLY, op_code=op_code
)
LOGGER.warning("Invalid Service !")