HK level is a CLI argument now
EIVE/-/pipeline/head There was a failure building this commit Details

This commit is contained in:
Robin Müller 2023-07-10 16:07:01 +02:00
parent 069f84d220
commit fdb14cbdc5
Signed by: muellerr
GPG Key ID: 407F9B00F858F270
6 changed files with 28 additions and 14 deletions

View File

@ -14,6 +14,10 @@ list yields a list of all related PRs for each release.
- New TCS controller events
## Changed
- HK level can be specified as CLI argument now.
# [v5.1.0] 2023-06-28
## Added

View File

@ -45,14 +45,13 @@ _LOGGER = logging.getLogger(__name__)
FORWARD_SENSOR_TEMPS = False
# TODO: Transform this into a CLI argument
HK_OUTPUT_LEVEL = 1
def handle_hk_packet(
raw_tm: bytes,
obj_id_dict: ObjectIdDictT,
printer: FsfwTmTcPrinter,
hk_level: int
):
tm_packet = Service3FsfwTm.unpack(raw_telemetry=raw_tm, custom_hk_handling=False)
named_obj_id = obj_id_dict.get(tm_packet.object_id.as_bytes)
@ -73,9 +72,9 @@ def handle_hk_packet(
hk_data=hk_data,
)
try:
if HK_OUTPUT_LEVEL == 1:
if hk_level == 1:
pass
elif HK_OUTPUT_LEVEL > 1:
elif hk_level > 1:
handle_regular_hk_print(
printer=printer,
object_id=named_obj_id,

View File

@ -29,6 +29,7 @@ def pus_factory_hook( # noqa C901 : Complexity okay here
verif_wrapper: VerificationWrapper,
printer: FsfwTmTcPrinter,
raw_logger: RawTmtcTimedLogWrapper,
hk_level: int
):
if len(packet) < 8:
_LOGGER.warning("Detected packet shorter than 8 bytes!")
@ -48,7 +49,7 @@ def pus_factory_hook( # noqa C901 : Complexity okay here
if service == 1:
handle_service_1_fsfw_packet(wrapper=verif_wrapper, raw_tm=packet)
elif service == 3:
handle_hk_packet(printer=printer, raw_tm=packet, obj_id_dict=obj_id_dict)
handle_hk_packet(printer=printer, raw_tm=packet, obj_id_dict=obj_id_dict, hk_level=hk_level)
elif service == 5:
handle_event_packet(raw_tm=packet, pw=pw)
elif service == 8:

View File

@ -6,6 +6,7 @@ class CtrlSetId(enum.IntEnum):
DEVICE_SENSORS = 1
SUS_TEMP_SENSORS = 2
HEATER_INFO = 4
TCS_CTRL_INFO = 5
class TcsSubmode(enum.IntEnum):

View File

@ -29,9 +29,9 @@ classifiers = [
"Topic :: Scientific/Engineering"
]
dependencies = [
"tmtccmd ~= 5.0.0rc0",
# "tmtccmd ~= 5.0.0rc0",
"python-dateutil ~= 2.8",
# tmtccmd @ git+https://github.com/robamu-org/tmtccmd@<gitRev>#egg=tmtccmd
tmtccmd @ git+https://github.com/robamu-org/tmtccmd@1b110d321ef85#egg=tmtccmd
]
[project.urls]

View File

@ -166,16 +166,18 @@ class PusHandler(SpecificApidHandlerBase):
wrapper: VerificationWrapper,
printer: FsfwTmTcPrinter,
raw_logger: RawTmtcTimedLogWrapper,
hk_level: int
):
super().__init__(PUS_APID, None)
self.printer = printer
self.verif_wrapper = wrapper
self.raw_logger = raw_logger
self.hk_level = hk_level
def handle_tm(self, packet: bytes, _user_args: any):
# with open("tc.bin", "wb") as of:
# of.write(packet)
pus_factory_hook(packet, self.verif_wrapper, self.printer, self.raw_logger)
pus_factory_hook(packet, self.verif_wrapper, self.printer, self.raw_logger, self.hk_level)
class UnknownApidHandler(GenericApidHandlerBase):
@ -335,13 +337,18 @@ class TcHandler(TcHandlerBase):
_LOGGER.info("Finished CFDP queue")
def setup_params() -> SetupWrapper:
def setup_params() -> (SetupWrapper, int):
hook_obj = EiveHookObject(default_json_path())
params = SetupParams()
parser_wrapper = PreArgsParsingWrapper()
parser_wrapper.create_default_parent_parser()
parser_wrapper.create_default_parser()
parser_wrapper.add_def_proc_and_cfdp_as_subparsers()
tmtc_parser, cfdp_parser = parser_wrapper.add_def_proc_and_cfdp_as_subparsers()
tmtc_parser.add_argument(
"--hk",
help="HK output level",
default=2,
)
post_arg_parsing_wrapper = parser_wrapper.parse(
setup_params=params, hook_obj=hook_obj
)
@ -352,13 +359,14 @@ def setup_params() -> SetupWrapper:
post_arg_parsing_wrapper.set_params_with_prompts(proc_param_wrapper)
else:
post_arg_parsing_wrapper.set_params_without_prompts(proc_param_wrapper)
hk_level = int(post_arg_parsing_wrapper.args_raw.hk)
params.apid = PUS_APID
if params.com_if is None:
raise ValueError("could not determine a COM interface.")
setup_wrapper = SetupWrapper(
hook_obj=hook_obj, setup_params=params, proc_param_wrapper=proc_param_wrapper
)
return setup_wrapper
return setup_wrapper, hk_level
def setup_cfdp_handler() -> CfdpInCcsdsWrapper:
@ -400,12 +408,13 @@ def setup_tmtc_handlers(
printer: FsfwTmTcPrinter,
raw_logger: RawTmtcTimedLogWrapper,
gui: bool,
hk_level: int
) -> (CcsdsTmHandler, TcHandler):
cfdp_in_ccsds_wrapper = setup_cfdp_handler()
verification_wrapper = VerificationWrapper(
verificator, _LOGGER, printer.file_logger
)
pus_handler = PusHandler(verification_wrapper, printer, raw_logger)
pus_handler = PusHandler(verification_wrapper, printer, raw_logger, hk_level)
ccsds_handler = CustomCcsdsTmHandler(generic_handler=UnknownApidHandler(None))
ccsds_handler.add_apid_handler(pus_handler)
ccsds_handler.add_apid_handler(cfdp_in_ccsds_wrapper)
@ -444,7 +453,7 @@ def main(): # noqa C901: Complexity okay here.
# TODO: -V CLI argument to enable this?
_LOGGER.setLevel(_LOG_LEVEL)
try:
setup_wrapper = setup_params()
setup_wrapper, hk_level = setup_params()
except KeyboardInterrupt as e:
_LOGGER.info(f"{e}. Exiting")
sys.exit(0)
@ -457,7 +466,7 @@ def main(): # noqa C901: Complexity okay here.
)
pus_verificator = PusVerificator()
ccsds_handler, tc_handler = setup_tmtc_handlers(
pus_verificator, printer, raw_logger, setup_wrapper.params.use_gui
pus_verificator, printer, raw_logger, setup_wrapper.params.use_gui, hk_level
)
tmtccmd.setup(setup_wrapper)