# -*- coding: utf-8 -*-
"""
@file   ploc_supervisor.py
@brief  Tests for commanding the supervisor of the PLOC.
        The supervisor is programmed by Thales.
@author J. Meier
@date   10.07.2021
"""
import struct

from spacepackets.ecss.tc import PusTelecommand
from tmtccmd.logging import get_console_logger
from tmtccmd.tc import QueueHelper
from tmtccmd.tc.pus_200_fsfw_modes import pack_mode_data, Modes
from tmtccmd.util import ObjectIdU32

from utility.input_helper import InputHelper

LOGGER = get_console_logger()

# Latchup alert IDs offered to the operator by get_latchup_id().
latchup_id_dict = {
    "0": "0.85V",
    "1": "1.8V",
    "2": "MISC",
    "3": "3.3V",
    "4": "NVM_4XO",
    "5": "MISSION",
    "6": "SAFECOTS",
}

# Key which lets the operator type a path instead of picking a preset.
MANUAL_INPUT = "1"

# Selection key -> [text shown to the operator, path placed in the command].
update_file_dict = {
    MANUAL_INPUT: ["manual input", ""],
    "2": ["/mnt/sd0/ploc/supervisor/update.bin", "/mnt/sd0/ploc/supervisor/update.bin"],
    "3": [
        "/mnt/sd0/ploc/supervisor/update-large.bin",
        "/mnt/sd0/ploc/supervisor/update-large.bin",
    ],
    "4": [
        "/mnt/sd0/ploc/supervisor/update-small.bin",
        "/mnt/sd0/ploc/supervisor/update-small.bin",
    ],
    "5": [
        "/mnt/sd0/ploc/supervisor/mpsoc-uart-working.bin",
        "/mnt/sd0/ploc/supervisor/mpsoc-uart-working.bin",
    ],
}

# Selection key -> [text shown to the operator, path placed in the command].
event_buffer_path_dict = {
    MANUAL_INPUT: ["manual input", ""],
    "2": ["/mnt/sd0/ploc/supervisor", "/mnt/sd0/ploc/supervisor"],
}


class SupvActionIds:
    """Action IDs which are packed as big-endian uint32 behind the object ID."""

    HK_REPORT = 1
    START_MPSOC = 3
    # Misspelled name ("SHUTWOWN") is kept because it is part of the public interface.
    SHUTWOWN_MPSOC = 4
    SEL_MPSOC_BOOT_IMAGE = 5
    SET_BOOT_TIMEOUT = 6
    SET_MAX_RESTART_TRIES = 7
    RESET_MPSOC = 8
    SET_TIME_REF = 9
    DISABLE_HK = 10
    GET_BOOT_STATUS_REPORT = 11
    UPDATE_AVAILABLE = 12
    ENABLE_LATCHUP_ALERT = 15
    DISABLE_LATCHUP_ALERT = 16
    SET_ALERT_LIMIT = 18
    SET_ADC_SWEEP_PERIOD = 20
    SET_ADC_ENABLED_CHANNELS = 21
    SET_ADC_WINDOW_AND_STRIDE = 22
    SET_ADC_THRESHOLD = 23
    GET_LATCHUP_STATUS_REPORT = 24
    COPY_ADC_DATA_TO_MRAM = 25
    SELECT_NVM = 27
    RUN_AUTO_EM_TESTS = 28
    WIPE_MRAM = 29
    DUMP_MRAM = 30
    SET_GPIO = 34
    READ_GPIO = 35
    RESTART_SUPERVISOR = 36
    FACTORY_RESET_CLEAR_ALL = 37
    LOGGING_REQUEST_COUNTERS = 38
    UPDATE_IMAGE_DATA = 39
    FACTORY_RESET_CLEAR_MIRROR = 40
    FACTORY_RESET_CLEAR_CIRCULAR = 41
    START_MPSOC_QUIET = 45
    SET_SHUTDOWN_TIMEOUT = 46
    FACTORY_FLASH = 47
    PERFORM_UPDATE = 48
    TERMINATE_SUPV_HELPER = 49
    ENABLE_AUTO_TM = 50
    DISABLE_AUTO_TM = 51
    LOGGING_REQUEST_EVENT_BUFFERS = 54
    LOGGING_CLEAR_COUNTERS = 55
    LOGGING_SET_TOPIC = 56
    REQUEST_ADC_REPORT = 57
    RESET_PL = 58
    ENABLE_NVMS = 59
    CONTINUE_UPDATE = 60


class SupvHkIds:
    """Housekeeping set IDs of the PLOC supervisor handler."""

    HK_REPORT = 52
    BOOT_STATUS_REPORT = 53


# Op code -> (log message, mode). Sent as PUS service 200, subservice 1.
_MODE_CMDS = {
    "0": ("PLOC Supervisor: Set mode off", Modes.OFF),
    "1": ("PLOC Supervisor: Set mode on", Modes.ON),
    "2": ("PLOC Supervisor: Mode Normal", Modes.NORMAL),
}

# Op code -> (log message, action ID) for action commands without parameters.
# The application data is just the object ID followed by the action ID.
_SIMPLE_ACTION_CMDS = {
    "3": ("PLOC Supervisor: TC Get Hk Report", SupvActionIds.HK_REPORT),
    "5": ("PLOC Supervisor: Start MPSoC", SupvActionIds.START_MPSOC),
    "6": ("PLOC Supervisor: Shutdown MPSoC", SupvActionIds.SHUTWOWN_MPSOC),
    "9": ("PLOC Supervisor: Reset MPSoC", SupvActionIds.RESET_MPSOC),
    "10": ("PLOC Supervisor: Set time reference", SupvActionIds.SET_TIME_REF),
    "12": ("PLOC Supervisor: Disable HK", SupvActionIds.DISABLE_HK),
    "13": (
        "PLOC Supervisor: Request boot status report",
        SupvActionIds.GET_BOOT_STATUS_REPORT,
    ),
    "26": (
        "PLOC Supervisor: Request latchup status report",
        SupvActionIds.GET_LATCHUP_STATUS_REPORT,
    ),
    "27": (
        "PLOC Supervisor: Copy ADC data to MRAM",
        SupvActionIds.COPY_ADC_DATA_TO_MRAM,
    ),
    "37": ("PLOC Supervisor: Restart supervisor", SupvActionIds.RESTART_SUPERVISOR),
    "38": (
        "PLOC Supervisor: Factory reset clear all",
        SupvActionIds.FACTORY_RESET_CLEAR_ALL,
    ),
    "39": (
        "PLOC Supervisor: Factory reset clear mirror entries",
        SupvActionIds.FACTORY_RESET_CLEAR_MIRROR,
    ),
    "40": (
        "PLOC Supervisor: Factory reset clear circular entries",
        SupvActionIds.FACTORY_RESET_CLEAR_CIRCULAR,
    ),
    "43": (
        "PLOC Supervisor: Terminate supervisor process",
        SupvActionIds.TERMINATE_SUPV_HELPER,
    ),
    "44": ("PLOC Supervisor: Start MPSoC quiet", SupvActionIds.START_MPSOC_QUIET),
    "46": ("PLOC Supervisor: Factory flash", SupvActionIds.FACTORY_FLASH),
    "47": ("PLOC Supervisor: Enable auto TM", SupvActionIds.ENABLE_AUTO_TM),
    "48": ("PLOC Supervisor: Disable auto TM", SupvActionIds.DISABLE_AUTO_TM),
    "52": (
        "PLOC Supervisor: Logging clear counters",
        SupvActionIds.LOGGING_CLEAR_COUNTERS,
    ),
    "54": (
        "PLOC Supervisor: Logging request counters",
        SupvActionIds.LOGGING_REQUEST_COUNTERS,
    ),
    "55": ("PLOC Supervisor: Request ADC report", SupvActionIds.REQUEST_ADC_REPORT),
    "56": ("PLOC Supervisor: Reset PL", SupvActionIds.RESET_PL),
    "58": ("PLOC Supervisor: Continue update", SupvActionIds.CONTINUE_UPDATE),
}


def _prompt_int(prompt: str, base: int = 10) -> int:
    """Ask the operator for an integer on stdin and parse it with the given base."""
    return int(input(prompt), base)


def _add_action_cmd(q: QueueHelper, app_data) -> None:
    """Queue a PUS service 8, subservice 128 telecommand with the given application data."""
    q.add_pus_tc(PusTelecommand(service=8, subservice=128, app_data=app_data))


def pack_ploc_supv_commands(object_id: ObjectIdU32, q: QueueHelper, op_code: str):
    """
    @brief  Adds the log entries and the telecommand belonging to an op code to the queue.
    @details
    The op code is looked up in three tables: mode commands (_MODE_CMDS), action commands
    without parameters (_SIMPLE_ACTION_CMDS) and action commands whose parameters are
    requested from the operator on stdin (_INTERACTIVE_ACTION_CMDS, defined at the end of
    this file). The log entry is always queued before the operator is prompted.
    An op code found in none of the tables queues no telecommand; a warning is logged.
    @param object_id    The object id of the PLOC supervisor handler.
    @param q            Queue helper receiving the log entries and telecommands.
    @param op_code      Op code string selecting the command.
    """
    q.add_log_cmd(f"Testing PLOC Supervisor with object id: {object_id.as_hex_string}")
    obyt = object_id.as_bytes

    mode_entry = _MODE_CMDS.get(op_code)
    if mode_entry is not None:
        log_msg, mode = mode_entry
        q.add_log_cmd(log_msg)
        app_data = pack_mode_data(obyt, mode, 0)
        q.add_pus_tc(PusTelecommand(service=200, subservice=1, app_data=app_data))
        return

    simple_entry = _SIMPLE_ACTION_CMDS.get(op_code)
    if simple_entry is not None:
        log_msg, action_id = simple_entry
        q.add_log_cmd(log_msg)
        _add_action_cmd(q, obyt + struct.pack("!I", action_id))
        return

    interactive_entry = _INTERACTIVE_ACTION_CMDS.get(op_code)
    if interactive_entry is not None:
        log_msg, packer = interactive_entry
        q.add_log_cmd(log_msg)
        # The packer may block on stdin, so it runs after the log entry was queued.
        _add_action_cmd(q, packer(obyt))
        return

    LOGGER.warning(f"PLOC Supervisor: Unknown op code {op_code}, no command queued")


def pack_sel_boot_image_cmd(
    object_id: bytes, mem: int, bp0: int, bp1: int, bp2: int
) -> bytearray:
    """
    @brief  This function can be used to generate the command to select the image from
            which the MPSoC will boot.
    @param object_id    The object id of the PLOC supervisor handler.
    @param mem          The memory from which the MPSoC shall boot (NVM0 - 0, NVM1 - 1)
    @param bp0          Partition pin 0
    @param bp1          Partition pin 1
    @param bp2          Partition pin 2
    """
    # "!" selects network byte order without padding, so one format string yields
    # the same bytes as packing each field separately.
    return bytearray(
        object_id
        + struct.pack(
            "!IBBBB", SupvActionIds.SEL_MPSOC_BOOT_IMAGE, mem, bp0, bp1, bp2
        )
    )


def _prompt_sel_boot_image_cmd(object_id: bytes) -> bytearray:
    """Ask the operator for memory and partition pins and pack the boot image command."""
    mem = _prompt_int("MEM (NVM0 - 0 or NVM1 - 1): ")
    bp0 = _prompt_int("BP0 (0 or 1): ")
    bp1 = _prompt_int("BP1 (0 or 1): ")
    bp2 = _prompt_int("BP2 (0 or 1): ")
    return pack_sel_boot_image_cmd(object_id, mem, bp0, bp1, bp2)


def _pack_set_max_restart_tries_cmd(object_id: bytes) -> bytes:
    """Ask the operator for the maximum restart tries (packed as uint8) and pack the command."""
    restart_tries = _prompt_int("Specify maximum restart tries: ")
    return object_id + struct.pack(
        "!IB", SupvActionIds.SET_MAX_RESTART_TRIES, restart_tries
    )


def _pack_set_boot_timeout_cmd(object_id: bytes) -> bytes:
    """Ask the operator for the boot timeout (packed as uint32) and pack the command."""
    boot_timeout = _prompt_int("Specify boot timeout [ms]: ")
    return object_id + struct.pack("!II", SupvActionIds.SET_BOOT_TIMEOUT, boot_timeout)


def _pack_enable_nvms_cmd(object_id: bytes) -> bytes:
    """Ask the operator for the two NVM enable flags (one byte each) and pack the command."""
    nvm01 = _prompt_int("Enable (1) or disable(0) NVM 0 and 1: ")
    nvm3 = _prompt_int("Enable (1) or disable(0) NVM 3: ")
    return object_id + struct.pack("!IBB", SupvActionIds.ENABLE_NVMS, nvm01, nvm3)


def pack_update_available_cmd(object_id: bytes) -> bytearray:
    """
    @brief  This function packs the update available command.
    @note   All image parameters are hard-coded in this function.
    @param object_id    The object id of the PLOC supervisor handler.
    """
    image_select = 1
    image_partition = 0
    image_size = 222
    image_crc = 0x0
    number_of_packets = 150
    return bytearray(
        object_id
        + struct.pack(
            "!IBBIII",
            SupvActionIds.UPDATE_AVAILABLE,
            image_select,
            image_partition,
            image_size,
            image_crc,
            number_of_packets,
        )
    )


def pack_watchdogs_enable_cmd(object_id: bytes) -> bytearray:
    """
    @brief  This function packs the command to enable or disable watchdogs on the PLOC.
    @param object_id    The object id of the PLOC supervisor handler.
    @note   Enable = 1, Disable = 0
    """
    watchdog_ps = 1
    watchdog_pl = 1
    watchdog_int = 0
    # NOTE(review): SupvActionIds defines no WATCHDOGS_ENABLE member in this file, so
    # this call raises AttributeError. Confirm the action ID or remove this function.
    action_id = SupvActionIds.WATCHDOGS_ENABLE
    return bytearray(
        object_id
        + struct.pack("!IBBB", action_id, watchdog_ps, watchdog_pl, watchdog_int)
    )


def pack_watchdog_config_timeout_cmd(object_id: bytearray) -> bytearray:
    """
    @brief  This function packs the command set the timeout of one of the three watchdogs
            of the PLOC.
    @param object_id    The object id of the PLOC supervisor handler.
    """
    watchdog = _prompt_int("Specify watchdog (0 - PS, 1 - PL, 2 - INT):")
    timeout = _prompt_int("Specify timeout (1000 ms - 360000 ms):")
    # NOTE(review): SupvActionIds defines no WATCHDOGS_CONFIG_TIMEOUT member in this
    # file, so this call raises AttributeError. Confirm the action ID or remove this
    # function.
    action_id = SupvActionIds.WATCHDOGS_CONFIG_TIMEOUT
    return bytearray(object_id + struct.pack("!IBI", action_id, watchdog, timeout))


def pack_lachtup_alert_cmd(object_id: bytes, state: bool) -> bytearray:
    """
    @brief  This function packs the command to enable or disable a certain latchup alert.
            The latchup ID is requested from the operator.
    @param object_id    The object id of the PLOC supervisor handler.
    @param state        True - enable latchup alert, False - disable latchup alert
    """
    latchup_id = get_latchup_id()
    if state:
        action_id = SupvActionIds.ENABLE_LATCHUP_ALERT
    else:
        action_id = SupvActionIds.DISABLE_LATCHUP_ALERT
    return bytearray(object_id + struct.pack("!IB", action_id, latchup_id))


def pack_auto_calibrate_alert_cmd(object_id: bytearray) -> bytearray:
    """
    @brief  This function packs the command to auto calibrate a latchup alert.
    @param object_id    The object id of the PLOC supervisor handler.
    """
    latchup_id = get_latchup_id()
    mg = _prompt_int("Specify MG: ")
    # NOTE(review): SupvActionIds defines no AUTO_CALIBRATE_ALERT member in this file,
    # so this call raises AttributeError. Confirm the action ID or remove this function.
    action_id = SupvActionIds.AUTO_CALIBRATE_ALERT
    return bytearray(object_id + struct.pack("!IBI", action_id, latchup_id, mg))


def get_latchup_id() -> int:
    """
    @brief  Prints the table of latchup IDs and asks the operator to specify one.
    @return The integer entered by the operator. It is not checked against the table.
    """
    key_column_width = 10
    description_column_width = 50
    # The 3 accounts for the " | " between both columns.
    separator_width = key_column_width + description_column_width + 3
    header_key = "Latchup ID".ljust(key_column_width)
    header_description = "Description".ljust(description_column_width)
    print(f"{header_key} | {header_description}")
    print(separator_width * "-")
    for key, description in latchup_id_dict.items():
        key_string = key.ljust(key_column_width)
        description_string = description.ljust(description_column_width)
        print(f"{key_string} | {description_string}")
    return int(input("Specify latchup ID: "))


def pack_set_alert_limit_cmd(object_id: bytes) -> bytearray:
    """
    @brief  This function packs the command to set the limit of a latchup alert.
            Latchup ID and dutycycle are requested from the operator.
    @param object_id    The object id of the PLOC supervisor handler.
    """
    latchup_id = get_latchup_id()
    dutycycle = _prompt_int("Specify dutycycle: ")
    return bytearray(
        object_id
        + struct.pack("!IBI", SupvActionIds.SET_ALERT_LIMIT, latchup_id, dutycycle)
    )


def pack_set_adc_enabled_channels_cmd(object_id: bytes) -> bytearray:
    """
    @brief  This function packs the command to enable or disable channels of the ADC.
            The channel value is entered in hex and packed as uint16.
    @param object_id    The object id of the PLOC supervisor handler.
    """
    ch = _prompt_int("Specify ch: 0x", 16)
    return bytearray(
        object_id + struct.pack("!IH", SupvActionIds.SET_ADC_ENABLED_CHANNELS, ch)
    )


def pack_set_adc_window_and_stride_cmd(object_id: bytes) -> bytearray:
    """
    @brief  This function packs the command to set the ADC window size and striding step
            size. Both are requested from the operator and packed as uint16.
    @param object_id    The object id of the PLOC supervisor handler.
    """
    window_size = _prompt_int("Specify window size: ")
    striding_step_size = _prompt_int("Specify striding step size: ")
    return bytearray(
        object_id
        + struct.pack(
            "!IHH",
            SupvActionIds.SET_ADC_WINDOW_AND_STRIDE,
            window_size,
            striding_step_size,
        )
    )


def pack_set_adc_threshold_cmd(object_id: bytes) -> bytearray:
    """
    @brief  This function packs the command to set the ADC threshold. The threshold is
            requested from the operator and packed as uint32.
    @param object_id    The object id of the PLOC supervisor handler.
    """
    threshold = _prompt_int("Specify threshold: ")
    return bytearray(
        object_id + struct.pack("!II", SupvActionIds.SET_ADC_THRESHOLD, threshold)
    )


def pack_select_nvm_cmd(object_id: bytes) -> bytearray:
    """
    @brief  This function packs the select NVM command. The memory is requested from the
            operator and packed as uint8.
    @param object_id    The object id of the PLOC supervisor handler.
    """
    mem = _prompt_int("Specify NVM (0 - NVM0, 1 - MVM1): ")
    return bytearray(object_id + struct.pack("!IB", SupvActionIds.SELECT_NVM, mem))


def pack_auto_em_tests_cmd(object_id: bytes) -> bytearray:
    """
    @brief  This function packs the command to run the auto EM tests. The test selection
            is requested from the operator and packed as uint8.
    @param object_id    The object id of the PLOC supervisor handler.
    """
    test = _prompt_int("Specify test (1 - complete, 2 - short): ")
    return bytearray(
        object_id + struct.pack("!IB", SupvActionIds.RUN_AUTO_EM_TESTS, test)
    )


def pack_mram_wipe_cmd(object_id: bytes) -> bytearray:
    """
    @brief  This function packs the command to wipe the MRAM. Start and stop address are
            entered in hex and packed as uint32.
    @param object_id    The object id of the PLOC supervisor handler.
    """
    start = _prompt_int("Start address: 0x", 16)
    stop = _prompt_int("Stop address: 0x", 16)
    return bytearray(
        object_id + struct.pack("!III", SupvActionIds.WIPE_MRAM, start, stop)
    )


def pack_update_command(object_id: bytes) -> bytearray:
    """
    @brief  This function packs the perform update command.
    @details
    Layout: object ID, action ID (uint32), UTF-8 file name, null terminator,
    memory ID (uint8), start address (uint32).
    @param object_id    The object id of the PLOC supervisor handler.
    """
    memory_id = _prompt_int("Specify memory ID: ")
    start_address = _prompt_int("Specify start address: 0x", 16)
    update_file = get_update_file()
    command = bytearray(object_id)
    command += struct.pack("!I", SupvActionIds.PERFORM_UPDATE)
    command += update_file.encode("utf-8")
    # The null terminator marks the end of the file name.
    command += struct.pack("!BBI", 0, memory_id, start_address)
    return command


def pack_set_shutdown_timeout_command(object_id: bytes) -> bytearray:
    """
    @brief  This function packs the command to set the shutdown timeout. The timeout is
            requested from the operator and packed as uint32.
    @param object_id    The object id of the PLOC supervisor handler.
    """
    timeout = _prompt_int("Specify shutdown timeout (ms): ")
    return bytearray(
        object_id + struct.pack("!II", SupvActionIds.SET_SHUTDOWN_TIMEOUT, timeout)
    )


def pack_logging_buffer_request(object_id: bytes) -> bytearray:
    """
    @brief  This function packs the command to request the logging event buffers. The
            path is selected by the operator and appended as UTF-8.
    @param object_id    The object id of the PLOC supervisor handler.
    """
    path = get_event_buffer_path()
    command = bytearray(object_id)
    command += struct.pack("!I", SupvActionIds.LOGGING_REQUEST_EVENT_BUFFERS)
    # NOTE(review): unlike pack_update_command, no null terminator is appended after
    # the path. Confirm that the handler expects an unterminated string.
    command += path.encode("utf-8")
    return command


def pack_set_gpio_cmd(object_id: bytes) -> bytearray:
    """
    @brief  This function packs the set GPIO command. Port, pin and value are entered in
            hex and packed as one byte each.
    @param object_id    The object id of the PLOC supervisor handler.
    """
    port = _prompt_int("Specify port: 0x", 16)
    pin = _prompt_int("Specify pin: 0x", 16)
    val = _prompt_int("Specify val: 0x", 16)
    return bytearray(
        object_id + struct.pack("!IBBB", SupvActionIds.SET_GPIO, port, pin, val)
    )


def pack_read_gpio_cmd(object_id: bytes) -> bytearray:
    """
    @brief  This function packs the read GPIO command. Port and pin are entered in hex
            and packed as one byte each.
    @param object_id    The object id of the PLOC supervisor handler.
    """
    port = _prompt_int("Specify port: 0x", 16)
    pin = _prompt_int("Specify pin: 0x", 16)
    return bytearray(
        object_id + struct.pack("!IBB", SupvActionIds.READ_GPIO, port, pin)
    )


def pack_logging_set_topic(object_id: bytes) -> bytearray:
    """
    @brief  This function packs the logging set topic command. The topic is requested
            from the operator and packed as uint8.
    @param object_id    The object id of the PLOC supervisor handler.
    """
    tpc = _prompt_int("Specify logging topic: ")
    return bytearray(
        object_id + struct.pack("!IB", SupvActionIds.LOGGING_SET_TOPIC, tpc)
    )


def _select_path(options: dict, log_msg: str, manual_prompt: str) -> str:
    """
    Let the operator pick a key of options via InputHelper. MANUAL_INPUT makes the
    operator type the path, any other key returns the second element of its entry.
    """
    LOGGER.info(log_msg)
    key = InputHelper(options).get_key()
    if key == MANUAL_INPUT:
        return input(manual_prompt)
    return options[key][1]


def get_update_file() -> str:
    """
    @brief  Asks the operator to select an update file from update_file_dict or to
            enter one manually.
    @return The selected or entered file name.
    """
    return _select_path(
        update_file_dict,
        "Specify update file ",
        "Ploc Supervisor: Specify absolute name of update file: ",
    )


def get_event_buffer_path() -> str:
    """
    @brief  Asks the operator to select the path where the event buffer file is stored
            from event_buffer_path_dict or to enter one manually.
    @return The selected or entered path.
    """
    return _select_path(
        event_buffer_path_dict,
        "Specify path where to store event buffer file ",
        "Ploc Supervisor: Specify path: ",
    )


# Op code -> (log message, packer) for action commands whose parameters are requested
# from the operator. Each packer takes the object ID bytes and returns the application
# data. Defined last because it references the packer functions above.
_INTERACTIVE_ACTION_CMDS = {
    "7": ("PLOC Supervisor: Select MPSoC boot image", _prompt_sel_boot_image_cmd),
    "8": ("PLOC Supervisor: Set max restart tries", _pack_set_max_restart_tries_cmd),
    "11": ("PLOC Supervisor: Set boot timeout", _pack_set_boot_timeout_cmd),
    "17": (
        "PLOC Supervisor: Enable latchup alert",
        lambda obyt: pack_lachtup_alert_cmd(obyt, True),
    ),
    "18": (
        "PLOC Supervisor: Disable latchup alert",
        lambda obyt: pack_lachtup_alert_cmd(obyt, False),
    ),
    "20": ("PLOC Supervisor: Set alert limit", pack_set_alert_limit_cmd),
    "23": (
        "PLOC Supervisor: Set ADC enabled channels",
        pack_set_adc_enabled_channels_cmd,
    ),
    "24": (
        "PLOC Supervisor: Set ADC window and stride",
        pack_set_adc_window_and_stride_cmd,
    ),
    "25": ("PLOC Supervisor: Set ADC threshold", pack_set_adc_threshold_cmd),
    "30": ("PLOC Supervisor: Run auto EM tests", pack_auto_em_tests_cmd),
    "31": ("PLOC Supervisor: Wipe MRAM", pack_mram_wipe_cmd),
    "35": ("PLOC Supervisor: Set GPIO command", pack_set_gpio_cmd),
    "36": ("PLOC Supervisor: Read GPIO command", pack_read_gpio_cmd),
    "42": ("PLOC Supervisor: Perform update", pack_update_command),
    "45": (
        "PLOC Supervisor: Set shutdown timeout",
        pack_set_shutdown_timeout_command,
    ),
    "51": (
        "PLOC Supervisor: Logging request event buffers",
        pack_logging_buffer_request,
    ),
    "53": ("PLOC Supervisor: Logging set topic", pack_logging_set_topic),
    "57": ("PLOC Supervisor: Enable NVMs", _pack_enable_nvms_cmd),
}