Merge pull request 'the endianness is actually really important here' (#266) from add-cmds-to-test-scheduling into main
All checks were successful
EIVE/-/pipeline/head This commit looks good
All checks were successful
EIVE/-/pipeline/head This commit looks good
Reviewed-on: #266 Reviewed-by: Marius Eggert <eggertm@irs.uni-stuttgart.de>
This commit is contained in:
commit
2789453d57
@ -5,22 +5,25 @@ import struct
|
|||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Tuple
|
from typing import Tuple
|
||||||
|
|
||||||
from eive_tmtc.pus_tm.defs import PrintWrapper
|
|
||||||
|
|
||||||
from eive_tmtc.config.definitions import CustomServiceList
|
|
||||||
from spacepackets.ecss import PusTelecommand
|
from spacepackets.ecss import PusTelecommand
|
||||||
from tmtccmd.config import TmtcDefinitionWrapper
|
from tmtccmd.config import TmtcDefinitionWrapper
|
||||||
|
|
||||||
from tmtccmd.tmtc import DefaultPusQueueHelper
|
|
||||||
from tmtccmd.pus.s8_fsfw_action import create_action_cmd
|
|
||||||
from tmtccmd.pus.tc.s3_fsfw_hk import make_sid, generate_one_hk_command
|
|
||||||
from tmtccmd.pus.s20_fsfw_param import (
|
|
||||||
create_scalar_u8_parameter,
|
|
||||||
create_load_param_cmd,
|
|
||||||
)
|
|
||||||
from tmtccmd.config.tmtc import OpCodeEntry, tmtc_definitions_provider
|
from tmtccmd.config.tmtc import OpCodeEntry, tmtc_definitions_provider
|
||||||
from eive_tmtc.config.object_ids import CORE_CONTROLLER_ID
|
|
||||||
from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter
|
from tmtccmd.fsfw.tmtc_printer import FsfwTmTcPrinter
|
||||||
|
from tmtccmd.pus.s8_fsfw_action import create_action_cmd
|
||||||
|
from tmtccmd.pus.s11_tc_sched import (
|
||||||
|
create_enable_tc_sched_cmd,
|
||||||
|
create_disable_tc_sched_cmd,
|
||||||
|
)
|
||||||
|
from tmtccmd.pus.s20_fsfw_param import (
|
||||||
|
create_load_param_cmd,
|
||||||
|
create_scalar_u8_parameter,
|
||||||
|
)
|
||||||
|
from tmtccmd.pus.tc.s3_fsfw_hk import generate_one_hk_command, make_sid
|
||||||
|
from tmtccmd.tmtc import DefaultPusQueueHelper
|
||||||
|
|
||||||
|
from eive_tmtc.config.definitions import CustomServiceList
|
||||||
|
from eive_tmtc.config.object_ids import CORE_CONTROLLER_ID
|
||||||
|
from eive_tmtc.pus_tm.defs import PrintWrapper
|
||||||
|
|
||||||
_LOGGER = logging.getLogger(__name__)
|
_LOGGER = logging.getLogger(__name__)
|
||||||
|
|
||||||
@ -66,6 +69,7 @@ class ActionId(enum.IntEnum):
|
|||||||
MV_HELPER = 53
|
MV_HELPER = 53
|
||||||
RM_HELPER = 54
|
RM_HELPER = 54
|
||||||
MKDIR_HELPER = 55
|
MKDIR_HELPER = 55
|
||||||
|
ENABLE_SCHEDULER = 56
|
||||||
|
|
||||||
|
|
||||||
class ParamId(enum.IntEnum):
|
class ParamId(enum.IntEnum):
|
||||||
@ -115,6 +119,8 @@ class OpCode:
|
|||||||
RWD_SET_MAX_REBOOT_CNT = "rwd_max_cnt"
|
RWD_SET_MAX_REBOOT_CNT = "rwd_max_cnt"
|
||||||
AUTO_SWITCH_ENABLE = "auto_switch_enable"
|
AUTO_SWITCH_ENABLE = "auto_switch_enable"
|
||||||
AUTO_SWITCH_DISABLE = "auto_switch_disable"
|
AUTO_SWITCH_DISABLE = "auto_switch_disable"
|
||||||
|
ENABLE_SCHEDULER = "enable_scheduler"
|
||||||
|
DISABLE_SCHEDULER = "disable_scheduler"
|
||||||
|
|
||||||
|
|
||||||
class Info:
|
class Info:
|
||||||
@ -142,6 +148,8 @@ class Info:
|
|||||||
MKDIR_HELPER = "Filesystem Directory Creation Helper"
|
MKDIR_HELPER = "Filesystem Directory Creation Helper"
|
||||||
AUTO_SWITCH_ENABLE = "Enable Auto-Switch Feature with a specific target image"
|
AUTO_SWITCH_ENABLE = "Enable Auto-Switch Feature with a specific target image"
|
||||||
AUTO_SWITCH_DISABLE = "Disable Auto-Switch Feature"
|
AUTO_SWITCH_DISABLE = "Disable Auto-Switch Feature"
|
||||||
|
ENABLE_SCHEDULER = "Enable scheduler"
|
||||||
|
DISABLE_SCHEDULER = "Disable scheduler"
|
||||||
|
|
||||||
|
|
||||||
class Chip(enum.IntEnum):
|
class Chip(enum.IntEnum):
|
||||||
@ -241,6 +249,8 @@ def add_core_controller_definitions(defs: TmtcDefinitionWrapper):
|
|||||||
oce.add(keys=OpCode.CP_HELPER, info=Info.CP_HELPER)
|
oce.add(keys=OpCode.CP_HELPER, info=Info.CP_HELPER)
|
||||||
oce.add(keys=OpCode.RM_HELPER, info=Info.RM_HELPER)
|
oce.add(keys=OpCode.RM_HELPER, info=Info.RM_HELPER)
|
||||||
oce.add(keys=OpCode.MKDIR_HELPER, info=Info.MKDIR_HELPER)
|
oce.add(keys=OpCode.MKDIR_HELPER, info=Info.MKDIR_HELPER)
|
||||||
|
oce.add(keys=OpCode.ENABLE_SCHEDULER, info=Info.ENABLE_SCHEDULER)
|
||||||
|
oce.add(keys=OpCode.DISABLE_SCHEDULER, info=Info.DISABLE_SCHEDULER)
|
||||||
defs.add_service(CustomServiceList.CORE.value, "Core Controller", oce)
|
defs.add_service(CustomServiceList.CORE.value, "Core Controller", oce)
|
||||||
|
|
||||||
|
|
||||||
@ -514,6 +524,12 @@ def pack_core_commands( # noqa C901
|
|||||||
q.add_pus_tc(
|
q.add_pus_tc(
|
||||||
create_action_cmd(CORE_CONTROLLER_ID, ActionId.MKDIR_HELPER, user_data)
|
create_action_cmd(CORE_CONTROLLER_ID, ActionId.MKDIR_HELPER, user_data)
|
||||||
)
|
)
|
||||||
|
elif op_code == OpCode.ENABLE_SCHEDULER:
|
||||||
|
q.add_log_cmd(Info.ENABLE_SCHEDULER)
|
||||||
|
q.add_pus_tc(create_enable_tc_sched_cmd())
|
||||||
|
elif op_code == OpCode.DISABLE_SCHEDULER:
|
||||||
|
q.add_log_cmd(Info.DISABLE_SCHEDULER)
|
||||||
|
q.add_pus_tc(create_disable_tc_sched_cmd())
|
||||||
else:
|
else:
|
||||||
_LOGGER.warning(
|
_LOGGER.warning(
|
||||||
f"Unknown operation code {op_code} for core controller commands"
|
f"Unknown operation code {op_code} for core controller commands"
|
||||||
|
@ -1,10 +1,15 @@
|
|||||||
from spacepackets.ecss import PusTelecommand, PusService
|
import struct
|
||||||
|
import datetime
|
||||||
|
import math
|
||||||
|
|
||||||
|
from spacepackets.ecss import PusService, PusTelecommand
|
||||||
from tmtccmd.config import CoreServiceList
|
from tmtccmd.config import CoreServiceList
|
||||||
from tmtccmd.config.tmtc import (
|
from tmtccmd.config.tmtc import (
|
||||||
tmtc_definitions_provider,
|
|
||||||
TmtcDefinitionWrapper,
|
|
||||||
OpCodeEntry,
|
OpCodeEntry,
|
||||||
|
TmtcDefinitionWrapper,
|
||||||
|
tmtc_definitions_provider,
|
||||||
)
|
)
|
||||||
|
from tmtccmd.pus.s11_tc_sched import create_time_tagged_cmd
|
||||||
from tmtccmd.pus.s17_test import create_service_17_ping_command
|
from tmtccmd.pus.s17_test import create_service_17_ping_command
|
||||||
from tmtccmd.tmtc import service_provider
|
from tmtccmd.tmtc import service_provider
|
||||||
from tmtccmd.tmtc.decorator import ServiceProviderParams
|
from tmtccmd.tmtc.decorator import ServiceProviderParams
|
||||||
@ -14,12 +19,14 @@ class OpCodes:
|
|||||||
PING = "ping"
|
PING = "ping"
|
||||||
TRIGGER_EVENT = "trig_event"
|
TRIGGER_EVENT = "trig_event"
|
||||||
PING_WITH_DATA = "ping_with_data"
|
PING_WITH_DATA = "ping_with_data"
|
||||||
|
SCHEDULE_PING = "sched_ping"
|
||||||
|
|
||||||
|
|
||||||
class Info:
|
class Info:
|
||||||
PING = "Simple Ping and Connection Test"
|
PING = "Simple Ping and Connection Test"
|
||||||
TRIGGER_EVENT = "Trigger an event"
|
TRIGGER_EVENT = "Trigger an event"
|
||||||
PING_WITH_DATA = "Ping with data. Size of sent data is sent back"
|
PING_WITH_DATA = "Ping with data. Size of sent data is sent back"
|
||||||
|
SCHEDULE_PING = "Schedule a ping"
|
||||||
|
|
||||||
|
|
||||||
@tmtc_definitions_provider
|
@tmtc_definitions_provider
|
||||||
@ -28,6 +35,7 @@ def add_test_defs(defs: TmtcDefinitionWrapper):
|
|||||||
oce.add(keys=OpCodes.PING, info=Info.PING)
|
oce.add(keys=OpCodes.PING, info=Info.PING)
|
||||||
oce.add(keys=OpCodes.TRIGGER_EVENT, info=Info.TRIGGER_EVENT)
|
oce.add(keys=OpCodes.TRIGGER_EVENT, info=Info.TRIGGER_EVENT)
|
||||||
oce.add(keys=OpCodes.PING_WITH_DATA, info=Info.PING_WITH_DATA)
|
oce.add(keys=OpCodes.PING_WITH_DATA, info=Info.PING_WITH_DATA)
|
||||||
|
oce.add(keys=OpCodes.SCHEDULE_PING, info=Info.SCHEDULE_PING)
|
||||||
|
|
||||||
defs.add_service(
|
defs.add_service(
|
||||||
name=CoreServiceList.SERVICE_17_ALT,
|
name=CoreServiceList.SERVICE_17_ALT,
|
||||||
@ -46,6 +54,21 @@ def pack_test_command(p: ServiceProviderParams):
|
|||||||
if info.op_code == OpCodes.TRIGGER_EVENT:
|
if info.op_code == OpCodes.TRIGGER_EVENT:
|
||||||
q.add_log_cmd("Sending PUS TC Event Trigger [17, 128]")
|
q.add_log_cmd("Sending PUS TC Event Trigger [17, 128]")
|
||||||
q.add_pus_tc(PusTelecommand(service=PusService.S17_TEST, subservice=128))
|
q.add_pus_tc(PusTelecommand(service=PusService.S17_TEST, subservice=128))
|
||||||
|
if info.op_code == OpCodes.SCHEDULE_PING:
|
||||||
|
q.add_log_cmd("Sending scheduled PUS ping")
|
||||||
|
# Generate a UNIX timestamp 30 seconds in the future using the datetime API with a UTC timezone
|
||||||
|
now = datetime.datetime.now(tz=datetime.timezone.utc)
|
||||||
|
second_offset_to_now = input("Please specify offset to now in seconds: ")
|
||||||
|
now += datetime.timedelta(seconds=int(second_offset_to_now))
|
||||||
|
unix_stamp = struct.pack("!I", math.floor(now.timestamp()))
|
||||||
|
print(f"Sending ping scheuled at {now}")
|
||||||
|
ping = PusTelecommand(service=PusService.S17_TEST, subservice=128)
|
||||||
|
q.add_pus_tc(
|
||||||
|
create_time_tagged_cmd(
|
||||||
|
release_time=unix_stamp,
|
||||||
|
tc_to_insert=ping,
|
||||||
|
)
|
||||||
|
)
|
||||||
if info.op_code == OpCodes.PING_WITH_DATA:
|
if info.op_code == OpCodes.PING_WITH_DATA:
|
||||||
q.add_log_cmd("Sending Ping With Data, Size Reported Back [17, 129]")
|
q.add_log_cmd("Sending Ping With Data, Size Reported Back [17, 129]")
|
||||||
while True:
|
while True:
|
||||||
|
Loading…
Reference in New Issue
Block a user