the endianness is actually really important here #266

Merged
meggert merged 1 commits from add-cmds-to-test-scheduling into main 2023-12-13 11:28:10 +01:00
2 changed files with 54 additions and 15 deletions

View File

@ -5,22 +5,25 @@ import struct
from pathlib import Path from pathlib import Path
from typing import Tuple from typing import Tuple
from eive_tmtc.pus_tm.defs import PrintWrapper
from eive_tmtc.config.definitions import CustomServiceList
from spacepackets.ecss import PusTelecommand from spacepackets.ecss import PusTelecommand
from tmtccmd.config import TmtcDefinitionWrapper from tmtccmd.config import TmtcDefinitionWrapper
from tmtccmd.tmtc import DefaultPusQueueHelper
from tmtccmd.pus.s8_fsfw_action import create_action_cmd
from tmtccmd.pus.tc.s3_fsfw_hk import make_sid, generate_one_hk_command
from tmtccmd.pus.s20_fsfw_param import (
create_scalar_u8_parameter,
create_load_param_cmd,
)
from tmtccmd.config.tmtc import OpCodeEntry, tmtc_definitions_provider from tmtccmd.config.tmtc import OpCodeEntry, tmtc_definitions_provider
from eive_tmtc.config.object_ids import CORE_CONTROLLER_ID
from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter
from tmtccmd.pus.s8_fsfw_action import create_action_cmd
from tmtccmd.pus.s11_tc_sched import (
create_enable_tc_sched_cmd,
create_disable_tc_sched_cmd,
)
from tmtccmd.pus.s20_fsfw_param import (
create_load_param_cmd,
create_scalar_u8_parameter,
)
from tmtccmd.pus.tc.s3_fsfw_hk import generate_one_hk_command, make_sid
from tmtccmd.tmtc import DefaultPusQueueHelper
from eive_tmtc.config.definitions import CustomServiceList
from eive_tmtc.config.object_ids import CORE_CONTROLLER_ID
from eive_tmtc.pus_tm.defs import PrintWrapper
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@ -66,6 +69,7 @@ class ActionId(enum.IntEnum):
MV_HELPER = 53 MV_HELPER = 53
RM_HELPER = 54 RM_HELPER = 54
MKDIR_HELPER = 55 MKDIR_HELPER = 55
ENABLE_SCHEDULER = 56
class ParamId(enum.IntEnum): class ParamId(enum.IntEnum):
@ -115,6 +119,8 @@ class OpCode:
RWD_SET_MAX_REBOOT_CNT = "rwd_max_cnt" RWD_SET_MAX_REBOOT_CNT = "rwd_max_cnt"
AUTO_SWITCH_ENABLE = "auto_switch_enable" AUTO_SWITCH_ENABLE = "auto_switch_enable"
AUTO_SWITCH_DISABLE = "auto_switch_disable" AUTO_SWITCH_DISABLE = "auto_switch_disable"
ENABLE_SCHEDULER = "enable_scheduler"
DISABLE_SCHEDULER = "disable_scheduler"
class Info: class Info:
@ -142,6 +148,8 @@ class Info:
MKDIR_HELPER = "Filesystem Directory Creation Helper" MKDIR_HELPER = "Filesystem Directory Creation Helper"
AUTO_SWITCH_ENABLE = "Enable Auto-Switch Feature with a specific target image" AUTO_SWITCH_ENABLE = "Enable Auto-Switch Feature with a specific target image"
AUTO_SWITCH_DISABLE = "Disable Auto-Switch Feature" AUTO_SWITCH_DISABLE = "Disable Auto-Switch Feature"
ENABLE_SCHEDULER = "Enable scheduler"
DISABLE_SCHEDULER = "Disable scheduler"
class Chip(enum.IntEnum): class Chip(enum.IntEnum):
@ -241,6 +249,8 @@ def add_core_controller_definitions(defs: TmtcDefinitionWrapper):
oce.add(keys=OpCode.CP_HELPER, info=Info.CP_HELPER) oce.add(keys=OpCode.CP_HELPER, info=Info.CP_HELPER)
oce.add(keys=OpCode.RM_HELPER, info=Info.RM_HELPER) oce.add(keys=OpCode.RM_HELPER, info=Info.RM_HELPER)
oce.add(keys=OpCode.MKDIR_HELPER, info=Info.MKDIR_HELPER) oce.add(keys=OpCode.MKDIR_HELPER, info=Info.MKDIR_HELPER)
oce.add(keys=OpCode.ENABLE_SCHEDULER, info=Info.ENABLE_SCHEDULER)
oce.add(keys=OpCode.DISABLE_SCHEDULER, info=Info.DISABLE_SCHEDULER)
defs.add_service(CustomServiceList.CORE.value, "Core Controller", oce) defs.add_service(CustomServiceList.CORE.value, "Core Controller", oce)
@ -514,6 +524,12 @@ def pack_core_commands( # noqa C901
q.add_pus_tc( q.add_pus_tc(
create_action_cmd(CORE_CONTROLLER_ID, ActionId.MKDIR_HELPER, user_data) create_action_cmd(CORE_CONTROLLER_ID, ActionId.MKDIR_HELPER, user_data)
) )
elif op_code == OpCode.ENABLE_SCHEDULER:
q.add_log_cmd(Info.ENABLE_SCHEDULER)
q.add_pus_tc(create_enable_tc_sched_cmd())
elif op_code == OpCode.DISABLE_SCHEDULER:
q.add_log_cmd(Info.DISABLE_SCHEDULER)
q.add_pus_tc(create_disable_tc_sched_cmd())
else: else:
_LOGGER.warning( _LOGGER.warning(
f"Unknown operation code {op_code} for core controller commands" f"Unknown operation code {op_code} for core controller commands"

View File

@ -1,10 +1,15 @@
from spacepackets.ecss import PusTelecommand, PusService import struct
import datetime
import math
from spacepackets.ecss import PusService, PusTelecommand
from tmtccmd.config import CoreServiceList from tmtccmd.config import CoreServiceList
from tmtccmd.config.tmtc import ( from tmtccmd.config.tmtc import (
tmtc_definitions_provider,
TmtcDefinitionWrapper,
OpCodeEntry, OpCodeEntry,
TmtcDefinitionWrapper,
tmtc_definitions_provider,
) )
from tmtccmd.pus.s11_tc_sched import create_time_tagged_cmd
from tmtccmd.pus.s17_test import create_service_17_ping_command from tmtccmd.pus.s17_test import create_service_17_ping_command
from tmtccmd.tmtc import service_provider from tmtccmd.tmtc import service_provider
from tmtccmd.tmtc.decorator import ServiceProviderParams from tmtccmd.tmtc.decorator import ServiceProviderParams
@ -14,12 +19,14 @@ class OpCodes:
PING = "ping" PING = "ping"
TRIGGER_EVENT = "trig_event" TRIGGER_EVENT = "trig_event"
PING_WITH_DATA = "ping_with_data" PING_WITH_DATA = "ping_with_data"
SCHEDULE_PING = "sched_ping"
class Info: class Info:
PING = "Simple Ping and Connection Test" PING = "Simple Ping and Connection Test"
TRIGGER_EVENT = "Trigger an event" TRIGGER_EVENT = "Trigger an event"
PING_WITH_DATA = "Ping with data. Size of sent data is sent back" PING_WITH_DATA = "Ping with data. Size of sent data is sent back"
SCHEDULE_PING = "Schedule a ping"
@tmtc_definitions_provider @tmtc_definitions_provider
@ -28,6 +35,7 @@ def add_test_defs(defs: TmtcDefinitionWrapper):
oce.add(keys=OpCodes.PING, info=Info.PING) oce.add(keys=OpCodes.PING, info=Info.PING)
oce.add(keys=OpCodes.TRIGGER_EVENT, info=Info.TRIGGER_EVENT) oce.add(keys=OpCodes.TRIGGER_EVENT, info=Info.TRIGGER_EVENT)
oce.add(keys=OpCodes.PING_WITH_DATA, info=Info.PING_WITH_DATA) oce.add(keys=OpCodes.PING_WITH_DATA, info=Info.PING_WITH_DATA)
oce.add(keys=OpCodes.SCHEDULE_PING, info=Info.SCHEDULE_PING)
defs.add_service( defs.add_service(
name=CoreServiceList.SERVICE_17_ALT, name=CoreServiceList.SERVICE_17_ALT,
@ -46,6 +54,21 @@ def pack_test_command(p: ServiceProviderParams):
if info.op_code == OpCodes.TRIGGER_EVENT: if info.op_code == OpCodes.TRIGGER_EVENT:
q.add_log_cmd("Sending PUS TC Event Trigger [17, 128]") q.add_log_cmd("Sending PUS TC Event Trigger [17, 128]")
q.add_pus_tc(PusTelecommand(service=PusService.S17_TEST, subservice=128)) q.add_pus_tc(PusTelecommand(service=PusService.S17_TEST, subservice=128))
if info.op_code == OpCodes.SCHEDULE_PING:
q.add_log_cmd("Sending scheduled PUS ping")
# Generate a UNIX timestamp 30 seconds in the future using the datetime API with a UTC timezone
now = datetime.datetime.now(tz=datetime.timezone.utc)
second_offset_to_now = input("Please specify offset to now in seconds: ")
now += datetime.timedelta(seconds=int(second_offset_to_now))
unix_stamp = struct.pack("!I", math.floor(now.timestamp()))
print(f"Sending ping scheuled at {now}")
ping = PusTelecommand(service=PusService.S17_TEST, subservice=128)
q.add_pus_tc(
create_time_tagged_cmd(
release_time=unix_stamp,
tc_to_insert=ping,
)
)
if info.op_code == OpCodes.PING_WITH_DATA: if info.op_code == OpCodes.PING_WITH_DATA:
q.add_log_cmd("Sending Ping With Data, Size Reported Back [17, 129]") q.add_log_cmd("Sending Ping With Data, Size Reported Back [17, 129]")
while True: while True: