From 5456d799650ef1c6223b571a7086fd45fe363f6b Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Wed, 13 Dec 2023 11:23:55 +0100 Subject: [PATCH] the endianness is actually really important here --- eive_tmtc/tmtc/core.py | 40 ++++++++++++++++++++++++++++------------ eive_tmtc/tmtc/test.py | 29 ++++++++++++++++++++++++++--- 2 files changed, 54 insertions(+), 15 deletions(-) diff --git a/eive_tmtc/tmtc/core.py b/eive_tmtc/tmtc/core.py index a8f5faf..94e6a8a 100644 --- a/eive_tmtc/tmtc/core.py +++ b/eive_tmtc/tmtc/core.py @@ -5,22 +5,25 @@ import struct from pathlib import Path from typing import Tuple -from eive_tmtc.pus_tm.defs import PrintWrapper - -from eive_tmtc.config.definitions import CustomServiceList from spacepackets.ecss import PusTelecommand from tmtccmd.config import TmtcDefinitionWrapper - -from tmtccmd.tmtc import DefaultPusQueueHelper -from tmtccmd.pus.s8_fsfw_action import create_action_cmd -from tmtccmd.pus.tc.s3_fsfw_hk import make_sid, generate_one_hk_command -from tmtccmd.pus.s20_fsfw_param import ( - create_scalar_u8_parameter, - create_load_param_cmd, -) from tmtccmd.config.tmtc import OpCodeEntry, tmtc_definitions_provider -from eive_tmtc.config.object_ids import CORE_CONTROLLER_ID from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter +from tmtccmd.pus.s8_fsfw_action import create_action_cmd +from tmtccmd.pus.s11_tc_sched import ( + create_enable_tc_sched_cmd, + create_disable_tc_sched_cmd, +) +from tmtccmd.pus.s20_fsfw_param import ( + create_load_param_cmd, + create_scalar_u8_parameter, +) +from tmtccmd.pus.tc.s3_fsfw_hk import generate_one_hk_command, make_sid +from tmtccmd.tmtc import DefaultPusQueueHelper + +from eive_tmtc.config.definitions import CustomServiceList +from eive_tmtc.config.object_ids import CORE_CONTROLLER_ID +from eive_tmtc.pus_tm.defs import PrintWrapper _LOGGER = logging.getLogger(__name__) @@ -66,6 +69,7 @@ class ActionId(enum.IntEnum): MV_HELPER = 53 RM_HELPER = 54 MKDIR_HELPER = 55 + ENABLE_SCHEDULER = 56 class ParamId(enum.IntEnum): @@ -115,6 +119,8 @@ class OpCode: RWD_SET_MAX_REBOOT_CNT = "rwd_max_cnt" AUTO_SWITCH_ENABLE = "auto_switch_enable" AUTO_SWITCH_DISABLE = "auto_switch_disable" + ENABLE_SCHEDULER = "enable_scheduler" + DISABLE_SCHEDULER = "disable_scheduler" class Info: @@ -142,6 +148,8 @@ class Info: MKDIR_HELPER = "Filesystem Directory Creation Helper" AUTO_SWITCH_ENABLE = "Enable Auto-Switch Feature with a specific target image" AUTO_SWITCH_DISABLE = "Disable Auto-Switch Feature" + ENABLE_SCHEDULER = "Enable scheduler" + DISABLE_SCHEDULER = "Disable scheduler" class Chip(enum.IntEnum): @@ -241,6 +249,8 @@ def add_core_controller_definitions(defs: TmtcDefinitionWrapper): oce.add(keys=OpCode.CP_HELPER, info=Info.CP_HELPER) oce.add(keys=OpCode.RM_HELPER, info=Info.RM_HELPER) oce.add(keys=OpCode.MKDIR_HELPER, info=Info.MKDIR_HELPER) + oce.add(keys=OpCode.ENABLE_SCHEDULER, info=Info.ENABLE_SCHEDULER) + oce.add(keys=OpCode.DISABLE_SCHEDULER, info=Info.DISABLE_SCHEDULER) defs.add_service(CustomServiceList.CORE.value, "Core Controller", oce) @@ -514,6 +524,12 @@ def pack_core_commands( # noqa C901 q.add_pus_tc( create_action_cmd(CORE_CONTROLLER_ID, ActionId.MKDIR_HELPER, user_data) ) + elif op_code == OpCode.ENABLE_SCHEDULER: + q.add_log_cmd(Info.ENABLE_SCHEDULER) + q.add_pus_tc(create_enable_tc_sched_cmd()) + elif op_code == OpCode.DISABLE_SCHEDULER: + q.add_log_cmd(Info.DISABLE_SCHEDULER) + q.add_pus_tc(create_disable_tc_sched_cmd()) else: _LOGGER.warning( f"Unknown operation code {op_code} for core controller commands" diff --git a/eive_tmtc/tmtc/test.py b/eive_tmtc/tmtc/test.py index c484a38..a5b36c8 100644 --- a/eive_tmtc/tmtc/test.py +++ b/eive_tmtc/tmtc/test.py @@ -1,10 +1,15 @@ -from spacepackets.ecss import PusTelecommand, PusService +import struct +import datetime +import math + +from spacepackets.ecss import PusService, PusTelecommand from tmtccmd.config import CoreServiceList from tmtccmd.config.tmtc import ( - tmtc_definitions_provider, - TmtcDefinitionWrapper, OpCodeEntry, + TmtcDefinitionWrapper, + tmtc_definitions_provider, ) +from tmtccmd.pus.s11_tc_sched import create_time_tagged_cmd from tmtccmd.pus.s17_test import create_service_17_ping_command from tmtccmd.tmtc import service_provider from tmtccmd.tmtc.decorator import ServiceProviderParams @@ -14,12 +19,14 @@ class OpCodes: PING = "ping" TRIGGER_EVENT = "trig_event" PING_WITH_DATA = "ping_with_data" + SCHEDULE_PING = "sched_ping" class Info: PING = "Simple Ping and Connection Test" TRIGGER_EVENT = "Trigger an event" PING_WITH_DATA = "Ping with data. Size of sent data is sent back" + SCHEDULE_PING = "Schedule a ping" @tmtc_definitions_provider @@ -28,6 +35,7 @@ def add_test_defs(defs: TmtcDefinitionWrapper): oce.add(keys=OpCodes.PING, info=Info.PING) oce.add(keys=OpCodes.TRIGGER_EVENT, info=Info.TRIGGER_EVENT) oce.add(keys=OpCodes.PING_WITH_DATA, info=Info.PING_WITH_DATA) + oce.add(keys=OpCodes.SCHEDULE_PING, info=Info.SCHEDULE_PING) defs.add_service( name=CoreServiceList.SERVICE_17_ALT, @@ -46,6 +54,21 @@ def pack_test_command(p: ServiceProviderParams): if info.op_code == OpCodes.TRIGGER_EVENT: q.add_log_cmd("Sending PUS TC Event Trigger [17, 128]") q.add_pus_tc(PusTelecommand(service=PusService.S17_TEST, subservice=128)) + if info.op_code == OpCodes.SCHEDULE_PING: + q.add_log_cmd("Sending scheduled PUS ping") + # Generate a UNIX timestamp 30 seconds in the future using the datetime API with a UTC timezone + now = datetime.datetime.now(tz=datetime.timezone.utc) + second_offset_to_now = input("Please specify offset to now in seconds: ") + now += datetime.timedelta(seconds=int(second_offset_to_now)) + unix_stamp = struct.pack("!I", math.floor(now.timestamp())) + print(f"Sending ping scheuled at {now}") + ping = PusTelecommand(service=PusService.S17_TEST, subservice=128) + q.add_pus_tc( + create_time_tagged_cmd( + release_time=unix_stamp, + tc_to_insert=ping, + ) + ) if info.op_code == OpCodes.PING_WITH_DATA: q.add_log_cmd("Sending Ping With Data, Size Reported Back [17, 129]") while True: