Merge remote-tracking branch 'origin/main' into irini

This commit is contained in:
Robin Müller 2022-08-16 18:52:09 +02:00
commit 631dc9ea9c
20 changed files with 269 additions and 28 deletions

View File

@ -13,9 +13,9 @@
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtcc.py" />
<option name="PARAMETERS" value="-s bpx -o hk -d 6 --hk" />
<option name="PARAMETERS" value="-s bpx -o hk -d 6" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="true" />
<option name="EMULATE_TERMINAL" value="false" />
<option name="MODULE_MODE" value="false" />
<option name="REDIRECT_INPUT" value="false" />
<option name="INPUT_FILE" value="" />

View File

@ -0,0 +1,24 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="MGM Disable HK" type="PythonConfigurationType" factoryName="Python" folderName="ACS">
<module name="tmtc" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
<envs>
<env name="PYTHONUNBUFFERED" value="1" />
</envs>
<option name="SDK_HOME" value="" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$" />
<option name="IS_MODULE_SDK" value="true" />
<option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtcc.py" />
<option name="PARAMETERS" value="-s acs-ctrl -o disable-mgm-hk" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="true" />
<option name="MODULE_MODE" value="false" />
<option name="REDIRECT_INPUT" value="false" />
<option name="INPUT_FILE" value="" />
<method v="2" />
</configuration>
</component>

View File

@ -0,0 +1,24 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="MGM Enable HK" type="PythonConfigurationType" factoryName="Python" folderName="ACS">
<module name="tmtc" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
<envs>
<env name="PYTHONUNBUFFERED" value="1" />
</envs>
<option name="SDK_HOME" value="" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$" />
<option name="IS_MODULE_SDK" value="true" />
<option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtcc.py" />
<option name="PARAMETERS" value="-s acs-ctrl -o enable-mgm-hk -l" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="true" />
<option name="MODULE_MODE" value="false" />
<option name="REDIRECT_INPUT" value="false" />
<option name="INPUT_FILE" value="" />
<method v="2" />
</configuration>
</component>

24
.run/MGM HK Once.run.xml Normal file
View File

@ -0,0 +1,24 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="MGM HK Once" type="PythonConfigurationType" factoryName="Python" folderName="ACS">
<module name="tmtc" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
<envs>
<env name="PYTHONUNBUFFERED" value="1" />
</envs>
<option name="SDK_HOME" value="" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$" />
<option name="IS_MODULE_SDK" value="true" />
<option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtcc.py" />
<option name="PARAMETERS" value="-s acs-ctrl -o req-mgm-hk -d 5" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="true" />
<option name="MODULE_MODE" value="false" />
<option name="REDIRECT_INPUT" value="false" />
<option name="INPUT_FILE" value="" />
<method v="2" />
</configuration>
</component>

View File

@ -3,6 +3,9 @@
<module name="tmtc" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
<envs>
<env name="PYTHONUNBUFFERED" value="1" />
</envs>
<option name="SDK_HOME" value="" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$" />
<option name="IS_MODULE_SDK" value="true" />
@ -12,7 +15,7 @@
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtcc.py" />
<option name="PARAMETERS" value="" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="false" />
<option name="EMULATE_TERMINAL" value="true" />
<option name="MODULE_MODE" value="false" />
<option name="REDIRECT_INPUT" value="false" />
<option name="INPUT_FILE" value="" />

View File

@ -14,6 +14,7 @@ list yields a list of all related PRs for each release.
- Major Update for `tmtccmd` and `spacepackets` dependencies which improves user API significantly.
PR: https://egit.irs.uni-stuttgart.de/eive/eive-tmtc/pulls/102
- Add commands to request MGM HK or enable/disable periodic HK for it
# [v1.12.0] 05.07.2022

View File

@ -43,6 +43,7 @@ class CustomServiceList(enum.Enum):
PDEC_HANDLER = "pdec_handler"
STR_IMG_HELPER = "str_img_helper"
SYRLINKS = "syrlinks"
ACS_CTRL = "acs-ctrl"
ACS_ASS = "acs-ass"
SUS_ASS = "sus-ass"
TCS = "tcs"

View File

@ -129,7 +129,7 @@ RW_ASSEMBLY = bytes([0x73, 0x00, 0x00, 0x04])
# Controllers
TCS_CONTROLLER = bytes([0x43, 0x40, 0x00, 0x01])
ACS_CONTROLLER = bytes([0x43, 0x40, 0x00, 0x02])
ACS_CONTROLLER = bytes([0x43, 0x00, 0x00, 0x02])
CORE_CONTROLLER_ID = bytes([0x43, 0x00, 0x00, 0x03])

2
deps/tmtccmd vendored

@ -1 +1 @@
Subproject commit 92e9b3d05c01c836eadbacd729e375f42d336f95
Subproject commit 403c5fd2b1cd28af8de2b39f308b633892e660e5

View File

@ -1,5 +1,7 @@
from config.definitions import CustomServiceList
from config.object_ids import BPX_HANDLER_ID
from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd import DefaultProcedureInfo
from tmtccmd.tc import DefaultPusQueueHelper, service_provider
from tmtccmd.tc.pus_8_funccmd import generate_action_command
from tmtccmd.tc.pus_3_fsfw_hk import generate_one_hk_command, make_sid
@ -24,7 +26,10 @@ class BpxOpCodes:
REBOOT = ["4", "reboot"]
def pack_bpx_commands(q: DefaultPusQueueHelper, op_code: str):
@service_provider(CustomServiceList.BPX_BATTERY.value)
def pack_bpx_commands(
_info: DefaultProcedureInfo, q: DefaultPusQueueHelper, op_code: str
):
if op_code in BpxOpCodes.HK:
q.add_log_cmd("Requesting BPX battery HK set")
sid = make_sid(object_id=BPX_HANDLER_ID, set_id=BpxSetIds.GET_HK_SET)

View File

@ -7,8 +7,23 @@
"""
from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.tc.pus_3_fsfw_hk import generate_one_hk_command, make_sid
from gomspace.gomspace_common import *
from gomspace.gomspace_common import (
GsInfo,
GomspaceOpCodes,
TableEntry,
Channel,
pack_set_param_command,
TableIds,
pack_get_param_command,
pack_gnd_wdt_reset_command,
pack_ping_command,
GomspaceDeviceActionIds,
pack_reboot_command,
SetIds,
)
from config.object_ids import P60_DOCK_HANDLER
from tmtccmd.tc.pus_8_funccmd import generate_action_command
from tmtccmd.util import ObjectIdU32
class P60OpCodes:
@ -85,7 +100,7 @@ class P60DockHkTable:
def pack_p60dock_cmds(object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_code: str):
objb = object_id.as_bytes
if op_code in P60OpCodes.STACK_3V3_ON:
q.add_log_cmd(GsInfo.STACK_3V3_ON)
q.add_log_cmd(P60Info.STACK_3V3_ON)
q.add_pus_tc(
pack_set_param_command(
objb,
@ -95,7 +110,7 @@ def pack_p60dock_cmds(object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_code:
)
)
if op_code in P60OpCodes.STACK_3V3_OFF:
q.add_log_cmd(GsInfo.STACK_3V3_OFF)
q.add_log_cmd(P60Info.STACK_3V3_OFF)
q.add_pus_tc(
pack_set_param_command(
objb,
@ -105,7 +120,7 @@ def pack_p60dock_cmds(object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_code:
)
)
if op_code in P60OpCodes.STACK_5V_ON:
q.add_log_cmd(GsInfo.STACK_5V_ON)
q.add_log_cmd(P60Info.STACK_5V_ON)
q.add_pus_tc(
pack_set_param_command(
objb,
@ -115,7 +130,7 @@ def pack_p60dock_cmds(object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_code:
)
)
if op_code in P60OpCodes.STACK_5V_OFF:
q.add_log_cmd(GsInfo.STACK_5V_OFF)
q.add_log_cmd(P60Info.STACK_5V_OFF)
q.add_pus_tc(
pack_set_param_command(
objb,

View File

@ -2,8 +2,9 @@ import enum
import json
from config.definitions import CustomServiceList
from tmtccmd import DefaultProcedureInfo
from tmtccmd.config.tmtc import tmtc_definitions_provider
from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.tc import DefaultPusQueueHelper, service_provider
from tmtccmd.tc.pus_8_funccmd import generate_action_command
from tmtccmd.config import OpCodeEntry, TmtcDefinitionWrapper
from config.object_ids import SCEX_HANDLER_ID
@ -62,7 +63,8 @@ def add_scex_cmds(defs: TmtcDefinitionWrapper):
)
def pack_scex_cmds(q: DefaultPusQueueHelper, op_code: str):
@service_provider(CustomServiceList.SCEX.value)
def pack_scex_cmds(_info: DefaultProcedureInfo, q: DefaultPusQueueHelper, op_code: str):
if op_code in OpCodes.PING:
q.add_log_cmd(Info.PING)
app_data = bytes([0])

View File

@ -3,7 +3,6 @@
from typing import cast
from pus_tc.devs.common_power import pack_power_commands
from pus_tc.devs.power import add_power_cmd_defs
from pus_tc.devs.rtd import pack_rtd_commands
from pus_tc.devs.scex import pack_scex_cmds
from pus_tc.system.controllers import (
@ -13,17 +12,17 @@ from pus_tc.system.controllers import (
from tmtccmd import DefaultProcedureInfo
from tmtccmd.config import CoreServiceList
from tmtccmd.logging import get_console_logger
from tmtccmd.tc import FeedWrapper, DefaultPusQueueHelper
from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.tc.decorator import route_to_registered_service_handlers
from tmtccmd.tc.pus_5_event import (
pack_generic_service_5_test_into,
)
from tmtccmd.pus.pus_17_test import pack_service_17_ping_command
from pus_tc.service_200_mode import pack_service200_test_into
from pus_tc.service_200_mode import pack_service_200_test_into
from pus_tc.devs.p60dock import pack_p60dock_cmds
from pus_tc.devs.pdu2 import pack_pdu2_commands
from pus_tc.devs.pdu1 import pack_pdu1_commands
from pus_tc.devs.bpx_batt import pack_bpx_commands
from pus_tc.devs.acu import pack_acu_commands
from pus_tc.devs.solar_array_deployment import pack_solar_array_deployment_test_into
from pus_tc.devs.imtq import pack_imtq_test_into
@ -89,7 +88,7 @@ def handle_default_procedure(
if service == CoreServiceList.SERVICE_17.value:
return queue_helper.add_pus_tc(pack_service_17_ping_command())
if service == CoreServiceList.SERVICE_200.value:
return pack_service200_test_into(q=queue_helper)
return pack_service_200_test_into(q=queue_helper)
if service == CustomServiceList.P60DOCK.value:
object_id = cast(ObjectIdU32, obj_id_man.get(P60_DOCK_HANDLER))
return pack_p60dock_cmds(object_id=object_id, q=queue_helper, op_code=op_code)
@ -104,8 +103,8 @@ def handle_default_procedure(
if service == CustomServiceList.ACU.value:
object_id = cast(ObjectIdU32, obj_id_man.get(ACU_HANDLER_ID))
return pack_acu_commands(object_id=object_id, q=queue_helper, op_code=op_code)
if service == CustomServiceList.BPX_BATTERY.value:
return pack_bpx_commands(q=queue_helper, op_code=op_code)
# if service == CustomServiceList.BPX_BATTERY.value:
# return pack_bpx_commands(q=queue_helper, op_code=op_code)
if service == CustomServiceList.TCS.value:
return pack_tcs_sys_commands(q=queue_helper, op_code=op_code)
if service == CustomServiceList.TMP1075_1.value:
@ -216,4 +215,5 @@ def handle_default_procedure(
)
if service == CustomServiceList.SCEX.value:
return pack_scex_cmds(q=queue_helper, op_code=op_code)
if not route_to_registered_service_handlers(info, queue_helper, op_code):
LOGGER.warning(f"Invalid Service {service}")

View File

@ -13,7 +13,7 @@ from config.object_ids import TEST_DEVICE_ID
TEST_DEVICE_OBJ_ID = TEST_DEVICE_ID
def pack_service200_test_into(q: DefaultPusQueueHelper):
def pack_service_200_test_into(q: DefaultPusQueueHelper):
q.add_log_cmd("Testing Service 200")
# Object ID: Dummy Device
obj_id = TEST_DEVICE_OBJ_ID

View File

@ -0,0 +1 @@
from . import acs_ctrl

125
pus_tc/system/acs_ctrl.py Normal file
View File

@ -0,0 +1,125 @@
import enum
import struct
from config.definitions import CustomServiceList
from config.object_ids import ACS_CONTROLLER
from pus_tm.defs import PrintWrapper
from tmtccmd import DefaultProcedureInfo, get_console_logger
from tmtccmd.config.tmtc import (
tmtc_definitions_provider,
TmtcDefinitionWrapper,
OpCodeEntry,
)
from tmtccmd.tc import DefaultPusQueueHelper, service_provider
from tmtccmd.tc.pus_3_fsfw_hk import generate_one_hk_command, make_sid, \
enable_periodic_hk_command_with_interval, disable_periodic_hk_command
from tmtccmd.util.tmtc_printer import FsfwTmTcPrinter
LOGGER = get_console_logger()
class SetIds(enum.IntEnum):
MGM_SET = 0
class OpCodes:
REQUEST_MGM_HK = ["0", "req-mgm-hk"]
ENABLE_MGM_HK = ["1", "enable-mgm-hk"]
DISABLE_MGM_HK = ["1", "disable-mgm-hk"]
class Info:
REQUEST_MGM_HK = "Request MGM HK once"
ENABLE_MGM_HK = "Enable MGM HK data generation"
DISABLE_MGM_HK = "Disable MGM HK data generation"
@tmtc_definitions_provider
def acs_cmd_defs(defs: TmtcDefinitionWrapper):
oce = OpCodeEntry()
oce.add(keys=OpCodes.REQUEST_MGM_HK, info=Info.REQUEST_MGM_HK)
oce.add(keys=OpCodes.ENABLE_MGM_HK, info=Info.ENABLE_MGM_HK)
oce.add(keys=OpCodes.DISABLE_MGM_HK, info=Info.DISABLE_MGM_HK)
defs.add_service(
name=CustomServiceList.ACS_CTRL.value, info="ACS Controller", op_code_entry=oce
)
@service_provider(CustomServiceList.ACS_CTRL.value)
def pack_acs_ctrl_command(
_info: DefaultProcedureInfo, q: DefaultPusQueueHelper, op_code: str
):
sid = make_sid(ACS_CONTROLLER, SetIds.MGM_SET)
if op_code in OpCodes.REQUEST_MGM_HK:
q.add_log_cmd(Info.REQUEST_MGM_HK)
q.add_pus_tc(generate_one_hk_command(sid))
elif op_code in OpCodes.ENABLE_MGM_HK:
q.add_log_cmd(Info.ENABLE_MGM_HK)
cmd_tuple = enable_periodic_hk_command_with_interval(False, sid, 2.0)
q.add_pus_tc(cmd_tuple[0])
q.add_pus_tc(cmd_tuple[1])
elif op_code in OpCodes.DISABLE_MGM_HK:
q.add_log_cmd(Info.DISABLE_MGM_HK)
q.add_pus_tc(disable_periodic_hk_command(False, sid))
else:
LOGGER.info(f"Unknown op code {op_code}")
def handle_acs_ctrl_mgm_data(printer: FsfwTmTcPrinter, hk_data: bytes):
current_idx = 0
pw = PrintWrapper(printer)
if len(hk_data) < 61:
pw.dlog(
f"ACS CTRL HK: MGM HK data with length {len(hk_data)} shorter than expected 61 bytes"
)
pw.dlog(f"Raw Data: {hk_data.hex(sep=',')}")
return
def unpack_float_tuple(idx: int) -> (tuple, int):
f_tuple = struct.unpack(
float_tuple_fmt_str,
hk_data[idx : idx + struct.calcsize(float_tuple_fmt_str)],
)
idx += struct.calcsize(float_tuple_fmt_str)
return f_tuple, idx
float_tuple_fmt_str = "!fff"
mgm_0_lis3_floats_ut, current_idx = unpack_float_tuple(current_idx)
mgm_1_rm3100_floats_ut, current_idx = unpack_float_tuple(current_idx)
mgm_2_lis3_floats_ut, current_idx = unpack_float_tuple(current_idx)
mgm_3_rm3100_floats_ut, current_idx = unpack_float_tuple(current_idx)
i32_tuple_fmt = "!iii"
imtq_mgm_nt = struct.unpack(
i32_tuple_fmt,
hk_data[current_idx : current_idx + struct.calcsize(i32_tuple_fmt)],
)
imtq_mgm_ut = tuple(val / 1000.0 for val in imtq_mgm_nt)
current_idx += struct.calcsize(i32_tuple_fmt)
pw.dlog("ACS CTRL HK: MGM values [X,Y,Z] in floating point uT: ")
mgm_lists = [
mgm_0_lis3_floats_ut,
mgm_1_rm3100_floats_ut,
mgm_2_lis3_floats_ut,
mgm_3_rm3100_floats_ut,
imtq_mgm_ut,
]
formatted_list = []
# Reserve 8 decimal digits, use precision 3
float_str_fmt = "[{:8.3f}, {:8.3f}, {:8.3f}]"
for mgm_entry in mgm_lists[0:4]:
formatted_list.append(float_str_fmt.format(*mgm_entry))
formatted_list.append(hk_data[current_idx])
formatted_list.append(float_str_fmt.format(*mgm_lists[4]))
print_str_list = [
"ACS Board MGM 0 LIS3MDL",
"ACS Board MGM 1 RM3100",
"ACS Board MGM 2 LIS3MDL",
"ACS Board MGM 3 RM3100",
"IMTQ Actuation Status:",
"IMTQ MGM:",
]
for entry in zip(print_str_list, formatted_list):
pw.dlog(f"{entry[0].ljust(28)}: {entry[1]}")
current_idx += 1
assert current_idx == 61

View File

@ -42,6 +42,7 @@ from pus_tc.devs.reaction_wheels import pack_rw_ass_cmds, pack_set_speed_command
class OpCodes:
"""FT: Functional Test"""
TV_SETUP_TCS_FT_ON = ["s", "tcs-ft-on"]
RESET_SCHED = ["reset-sched", "rs"]
HEATER = ["heater"]
@ -712,7 +713,7 @@ def enable_listen_to_hk_for_x_seconds(
q.add_pus_tc(cmd)
generate_time_tagged_cmd(
struct.pack("!I", int(round(time.time() + interval_seconds))),
gen_disable_listen_to_hk_for_x_seconds(q, diag, device, sid)
gen_disable_listen_to_hk_for_x_seconds(q, diag, device, sid),
)

View File

@ -1,9 +1,11 @@
from datetime import datetime
from config.definitions import CustomServiceList
from spacepackets.ecss import PusTelecommand
from tmtccmd import DefaultProcedureInfo
from tmtccmd.logging import get_console_logger
from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.tc import DefaultPusQueueHelper, service_provider
LOGGER = get_console_logger()
@ -16,7 +18,10 @@ class Info:
SET_CURRENT_TIME = "Setting current time in ASCII format"
def pack_set_current_time_ascii_command(q: DefaultPusQueueHelper):
@service_provider(CustomServiceList.TIME.value)
def pack_set_current_time_ascii_command(
_info: DefaultProcedureInfo, q: DefaultPusQueueHelper, _op_code: str
):
time_test_current_time = datetime.utcnow().isoformat() + "Z" + "\0"
current_time_ascii = time_test_current_time.encode("ascii")
LOGGER.info(f"Current time in ASCII format: {current_time_ascii}")

View File

@ -1,5 +1,5 @@
"""HK Handling for EIVE OBSW"""
from pus_tc.system.acs_ctrl import handle_acs_ctrl_mgm_data
from pus_tm.devs.plpcdu import handle_plpcdu_hk
from pus_tm.devs.rad_sensor import handle_rad_sensor_data
from pus_tm.devs.sus import handle_sus_hk
@ -160,6 +160,8 @@ def handle_regular_hk_print(
handle_thermal_controller_hk_data(
object_id=object_id, printer=printer, set_id=set_id, hk_data=hk_data
)
elif objb == obj_ids.ACS_CONTROLLER:
handle_acs_ctrl_mgm_data(printer, hk_data)
else:
LOGGER.info(
f"Service 3 TM: Parsing for object {object_id} and set ID {set_id} "

View File

@ -61,6 +61,11 @@ from pus_tc.procedure_packer import handle_default_procedure
LOGGER = get_console_logger()
# Put rotating file logger parameters here for quick changes
ROTATING_TIMED_LOGGER_INTERVAL_WHEN = TimedLogWhen.PER_MINUTE
ROTATING_TIMED_LOGGER_INTERVAL = 30
class PusHandler(SpecificApidHandlerBase):
def __init__(
self,
@ -198,7 +203,10 @@ def main():
sys.exit(0)
tmtc_logger = RegularTmtcLogWrapper()
printer = FsfwTmTcPrinter(tmtc_logger.logger)
raw_logger = RawTmtcTimedLogWrapper(when=TimedLogWhen.PER_HOUR, interval=2)
raw_logger = RawTmtcTimedLogWrapper(
when=ROTATING_TIMED_LOGGER_INTERVAL_WHEN,
interval=ROTATING_TIMED_LOGGER_INTERVAL
)
pus_verificator = PusVerificator()
ccsds_handler, tc_handler = setup_tmtc(
pus_verificator, printer, raw_logger, setup_wrapper.params.use_gui