trying new tc service decorator

This commit is contained in:
Robin Müller 2022-08-12 22:33:16 +02:00
parent 3fa700804e
commit b9cdfdf6c8
No known key found for this signature in database
GPG Key ID: 9C287E88FED11DF3
9 changed files with 52 additions and 24 deletions

View File

@ -13,9 +13,9 @@
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/tmtcc.py" />
<option name="PARAMETERS" value="-s bpx -o hk -d 6 --hk" />
<option name="PARAMETERS" value="-s bpx -o hk -d 6" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="true" />
<option name="EMULATE_TERMINAL" value="false" />
<option name="MODULE_MODE" value="false" />
<option name="REDIRECT_INPUT" value="false" />
<option name="INPUT_FILE" value="" />

2
deps/tmtccmd vendored

@ -1 +1 @@
Subproject commit 92e9b3d05c01c836eadbacd729e375f42d336f95
Subproject commit 5e153e556716d340641563f2a18478a664fa4b92

View File

@ -1,5 +1,7 @@
from config.definitions import CustomServiceList
from config.object_ids import BPX_HANDLER_ID
from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd import DefaultProcedureInfo
from tmtccmd.tc import DefaultPusQueueHelper, service_provider
from tmtccmd.tc.pus_8_funccmd import generate_action_command
from tmtccmd.tc.pus_3_fsfw_hk import generate_one_hk_command, make_sid
@ -24,7 +26,10 @@ class BpxOpCodes:
REBOOT = ["4", "reboot"]
def pack_bpx_commands(q: DefaultPusQueueHelper, op_code: str):
@service_provider(CustomServiceList.BPX_BATTERY.value)
def pack_bpx_commands(
_info: DefaultProcedureInfo, q: DefaultPusQueueHelper, op_code: str
):
if op_code in BpxOpCodes.HK:
q.add_log_cmd("Requesting BPX battery HK set")
sid = make_sid(object_id=BPX_HANDLER_ID, set_id=BpxSetIds.GET_HK_SET)

View File

@ -7,8 +7,23 @@
"""
from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.tc.pus_3_fsfw_hk import generate_one_hk_command, make_sid
from gomspace.gomspace_common import *
from gomspace.gomspace_common import (
GsInfo,
GomspaceOpCodes,
TableEntry,
Channel,
pack_set_param_command,
TableIds,
pack_get_param_command,
pack_gnd_wdt_reset_command,
pack_ping_command,
GomspaceDeviceActionIds,
pack_reboot_command,
SetIds,
)
from config.object_ids import P60_DOCK_HANDLER
from tmtccmd.tc.pus_8_funccmd import generate_action_command
from tmtccmd.util import ObjectIdU32
class P60OpCodes:
@ -85,7 +100,7 @@ class P60DockHkTable:
def pack_p60dock_cmds(object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_code: str):
objb = object_id.as_bytes
if op_code in P60OpCodes.STACK_3V3_ON:
q.add_log_cmd(GsInfo.STACK_3V3_ON)
q.add_log_cmd(P60Info.STACK_3V3_ON)
q.add_pus_tc(
pack_set_param_command(
objb,
@ -95,7 +110,7 @@ def pack_p60dock_cmds(object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_code:
)
)
if op_code in P60OpCodes.STACK_3V3_OFF:
q.add_log_cmd(GsInfo.STACK_3V3_OFF)
q.add_log_cmd(P60Info.STACK_3V3_OFF)
q.add_pus_tc(
pack_set_param_command(
objb,
@ -105,7 +120,7 @@ def pack_p60dock_cmds(object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_code:
)
)
if op_code in P60OpCodes.STACK_5V_ON:
q.add_log_cmd(GsInfo.STACK_5V_ON)
q.add_log_cmd(P60Info.STACK_5V_ON)
q.add_pus_tc(
pack_set_param_command(
objb,
@ -115,7 +130,7 @@ def pack_p60dock_cmds(object_id: ObjectIdU32, q: DefaultPusQueueHelper, op_code:
)
)
if op_code in P60OpCodes.STACK_5V_OFF:
q.add_log_cmd(GsInfo.STACK_5V_OFF)
q.add_log_cmd(P60Info.STACK_5V_OFF)
q.add_pus_tc(
pack_set_param_command(
objb,

View File

@ -2,8 +2,9 @@ import enum
import json
from config.definitions import CustomServiceList
from tmtccmd import DefaultProcedureInfo
from tmtccmd.config.tmtc import tmtc_definitions_provider
from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.tc import DefaultPusQueueHelper, service_provider
from tmtccmd.tc.pus_8_funccmd import generate_action_command
from tmtccmd.config import OpCodeEntry, TmtcDefinitionWrapper
from config.object_ids import SCEX_HANDLER_ID
@ -62,7 +63,8 @@ def add_scex_cmds(defs: TmtcDefinitionWrapper):
)
def pack_scex_cmds(q: DefaultPusQueueHelper, op_code: str):
@service_provider(CustomServiceList.SCEX.value)
def pack_scex_cmds(_info: DefaultProcedureInfo, q: DefaultPusQueueHelper, op_code: str):
if op_code in OpCodes.PING:
q.add_log_cmd(Info.PING)
app_data = bytes([0])

View File

@ -3,7 +3,6 @@
from typing import cast
from pus_tc.devs.common_power import pack_power_commands
from pus_tc.devs.power import add_power_cmd_defs
from pus_tc.devs.rtd import pack_rtd_commands
from pus_tc.devs.scex import pack_scex_cmds
from pus_tc.system.controllers import (
@ -13,17 +12,17 @@ from pus_tc.system.controllers import (
from tmtccmd import DefaultProcedureInfo
from tmtccmd.config import CoreServiceList
from tmtccmd.logging import get_console_logger
from tmtccmd.tc import FeedWrapper, DefaultPusQueueHelper
from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.tc.decorator import route_to_registered_service_handlers
from tmtccmd.tc.pus_5_event import (
pack_generic_service_5_test_into,
)
from tmtccmd.pus.pus_17_test import pack_service_17_ping_command
from pus_tc.service_200_mode import pack_service200_test_into
from pus_tc.service_200_mode import pack_service_200_test_into
from pus_tc.devs.p60dock import pack_p60dock_cmds
from pus_tc.devs.pdu2 import pack_pdu2_commands
from pus_tc.devs.pdu1 import pack_pdu1_commands
from pus_tc.devs.bpx_batt import pack_bpx_commands
from pus_tc.devs.acu import pack_acu_commands
from pus_tc.devs.solar_array_deployment import pack_solar_array_deployment_test_into
from pus_tc.devs.imtq import pack_imtq_test_into
@ -89,7 +88,7 @@ def handle_default_procedure(
if service == CoreServiceList.SERVICE_17.value:
return queue_helper.add_pus_tc(pack_service_17_ping_command())
if service == CoreServiceList.SERVICE_200.value:
return pack_service200_test_into(q=queue_helper)
return pack_service_200_test_into(q=queue_helper)
if service == CustomServiceList.P60DOCK.value:
object_id = cast(ObjectIdU32, obj_id_man.get(P60_DOCK_HANDLER))
return pack_p60dock_cmds(object_id=object_id, q=queue_helper, op_code=op_code)
@ -104,8 +103,8 @@ def handle_default_procedure(
if service == CustomServiceList.ACU.value:
object_id = cast(ObjectIdU32, obj_id_man.get(ACU_HANDLER_ID))
return pack_acu_commands(object_id=object_id, q=queue_helper, op_code=op_code)
if service == CustomServiceList.BPX_BATTERY.value:
return pack_bpx_commands(q=queue_helper, op_code=op_code)
# if service == CustomServiceList.BPX_BATTERY.value:
# return pack_bpx_commands(q=queue_helper, op_code=op_code)
if service == CustomServiceList.TCS.value:
return pack_tcs_sys_commands(q=queue_helper, op_code=op_code)
if service == CustomServiceList.TMP1075_1.value:
@ -216,4 +215,5 @@ def handle_default_procedure(
)
if service == CustomServiceList.SCEX.value:
return pack_scex_cmds(q=queue_helper, op_code=op_code)
LOGGER.warning(f"Invalid Service {service}")
if not route_to_registered_service_handlers(info, queue_helper, op_code):
LOGGER.warning(f"Invalid Service {service}")

View File

@ -13,7 +13,7 @@ from config.object_ids import TEST_DEVICE_ID
TEST_DEVICE_OBJ_ID = TEST_DEVICE_ID
def pack_service200_test_into(q: DefaultPusQueueHelper):
def pack_service_200_test_into(q: DefaultPusQueueHelper):
q.add_log_cmd("Testing Service 200")
# Object ID: Dummy Device
obj_id = TEST_DEVICE_OBJ_ID

View File

@ -42,6 +42,7 @@ from pus_tc.devs.reaction_wheels import pack_rw_ass_cmds, pack_set_speed_command
class OpCodes:
"""FT: Functional Test"""
TV_SETUP_TCS_FT_ON = ["s", "tcs-ft-on"]
RESET_SCHED = ["reset-sched", "rs"]
HEATER = ["heater"]
@ -712,7 +713,7 @@ def enable_listen_to_hk_for_x_seconds(
q.add_pus_tc(cmd)
generate_time_tagged_cmd(
struct.pack("!I", int(round(time.time() + interval_seconds))),
gen_disable_listen_to_hk_for_x_seconds(q, diag, device, sid)
gen_disable_listen_to_hk_for_x_seconds(q, diag, device, sid),
)

View File

@ -1,9 +1,11 @@
from datetime import datetime
from config.definitions import CustomServiceList
from spacepackets.ecss import PusTelecommand
from tmtccmd import DefaultProcedureInfo
from tmtccmd.logging import get_console_logger
from tmtccmd.tc import DefaultPusQueueHelper
from tmtccmd.tc import DefaultPusQueueHelper, service_provider
LOGGER = get_console_logger()
@ -16,7 +18,10 @@ class Info:
SET_CURRENT_TIME = "Setting current time in ASCII format"
def pack_set_current_time_ascii_command(q: DefaultPusQueueHelper):
@service_provider(CustomServiceList.TIME.value)
def pack_set_current_time_ascii_command(
_info: DefaultProcedureInfo, q: DefaultPusQueueHelper, _op_code: str
):
time_test_current_time = datetime.utcnow().isoformat() + "Z" + "\0"
current_time_ascii = time_test_current_time.encode("ascii")
LOGGER.info(f"Current time in ASCII format: {current_time_ascii}")