This commit is contained in:
Kevin Jahaziel Leon Morales 2025-03-23 14:47:27 -07:00 committed by GitHub
commit 9b5b96cc95
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -0,0 +1,259 @@
"""
This code is still in development and is not yet ready for production use.
Kevin Leon @ Electronic Cats
Original Creation Date: Jan 30, 2025
This code is beerware; if you see me (or any other Electronic Cats
member) at the local, and you've found our code helpful,
please buy us a round!
Distributed as-is; no warranty is given.
"""
import sys
import serial
import threading
import argparse
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.animation as animation
START_OF_FRAME = "SCAN"
END_OF_FRAME = "END"
FREQ_FRAME_MARK = "FREQ"
DEFAULT_COLOR_MAP = "BuGn"
DEFAULT_RSSI_OFFSET = -11
SCAN_WIDTH = 33
DEFAULT_START_FREQ = 150
DEFAULT_STEP_PER_FREQ = 0.2
DEFAULT_END_FREQ = 960
DEFAULT_BAUDRATE = 115200
LIMIT_COUNT = 2
def LOG_INFO(message):
"""Function to log information."""
print(f"[INFO] {message}")
def LOG_ERROR(message):
"""Function to log error."""
print(f"\x1b[31;1m[ERROR] {message}\x1b[0m")
def LOG_WARNING(message):
"""Function to log warning."""
print(f"\x1b[33;1m[WARNING] {message}\x1b[0m")
class SpectrumScan:
def __init__(self):
self.device_uart = serial.Serial(timeout=2)
self.recv_running = False
self.no_bytes_count = 0
self.fig, self.ax = plt.subplots(figsize=(12, 6))
self.im = None
self.recv_worker = None
self.current_freq = DEFAULT_START_FREQ
self.start_freq = DEFAULT_START_FREQ
self.end_freq = DEFAULT_END_FREQ
self.rssi_offset = DEFAULT_RSSI_OFFSET
self.delta_freq = 0
self.data_matrix = np.zeros((SCAN_WIDTH, self.delta_freq))
self.parser = argparse.ArgumentParser(
formatter_class=argparse.RawTextHelpFormatter,
description="""
RadioLib SX126x_Spectrum_Scan plotter script. Displays output from SX126x_Spectrum_Scan example
as grayscale and
Depends on pyserial and matplotlib, install by:
'python3 -m pip install pyserial matplotlib'
Step-by-step guide on how to use the script:
1. Upload the SX126x_Spectrum_Scan example to your Arduino board with SX1262 connected.
2. Run the script with appropriate arguments.
3. Once the scan is complete, output files will be saved to out/
""",
)
self.fig.canvas.mpl_connect("close_event", self.on_close)
self.__load_parser()
def __load_parser(self):
self.parser.add_argument(
"port",
type=str,
help="COM port to connect to the device",
)
self.parser.add_argument(
"-b",
"--baudrate",
type=int,
help=f"COM port baudrate (defaults to {DEFAULT_BAUDRATE})",
default=DEFAULT_BAUDRATE,
)
self.parser.add_argument(
"--freqStart",
type=float,
help=f"Starting frequency in MHz (Default to {DEFAULT_START_FREQ})",
default=DEFAULT_START_FREQ,
)
self.parser.add_argument(
"--freqEnd",
type=float,
help=f"End frequency in MHz (Default to {DEFAULT_END_FREQ})",
default=DEFAULT_END_FREQ,
)
self.parser.add_argument(
"--offset",
type=int,
help=f"Default RSSI offset in dBm (defaults to {DEFAULT_RSSI_OFFSET})",
default=DEFAULT_RSSI_OFFSET,
)
def __data_dissector(self, plot_data):
if FREQ_FRAME_MARK in plot_data:
self.current_freq = float(plot_data.split(" ")[1])
if (
self.current_freq >= self.start_freq
and self.current_freq <= self.end_freq
):
if self.current_freq == self.start_freq:
self.data_matrix = np.zeros((SCAN_WIDTH, self.delta_freq))
return
if (START_OF_FRAME in plot_data) and (END_OF_FRAME in plot_data):
if (
self.current_freq >= self.start_freq
and self.current_freq <= self.end_freq
):
scan_line = plot_data[len(START_OF_FRAME) : -len(END_OF_FRAME)].split(
","
)[:-1]
data = list(map(int, scan_line))
index = int(
(self.current_freq - self.start_freq) / DEFAULT_STEP_PER_FREQ
)
self.data_matrix[:, index] = data
def on_close(self, event):
self.recv_running = False
def stop_task(self):
self.recv_running = False
if self.device_uart.is_open:
self.device_uart.close()
if threading.current_thread() is not self.recv_worker:
if self.recv_worker and self.recv_worker.is_alive():
self.recv_worker.join(timeout=2)
def recv_task(self):
with self.device_uart as com:
while self.recv_running:
if self.recv_worker.is_alive():
try:
bytestream = com.readline().decode("utf-8").strip()
if not self.recv_running:
break
if bytestream == "":
# Board connected but not transmitting any data
self.no_bytes_count += 1
if self.no_bytes_count > LIMIT_COUNT:
self.no_bytes_count = 0
LOG_WARNING("No data recived.")
continue
self.__data_dissector(bytestream)
except serial.SerialException as e:
LOG_WARNING(e)
continue
except UnicodeDecodeError as e:
LOG_WARNING("Please check the baud rate, as using a different value than the one set on the device may cause errors.")
LOG_ERROR(e)
continue
com.reset_input_buffer()
com.reset_output_buffer()
com.close()
self.stop_task()
def create_plot(self):
self.ax.set_ylabel("RSSI [dBm]")
self.ax.set_xlabel("Frequency (MHz)")
self.ax.set_aspect("auto")
self.fig.suptitle(
f"SX126x Spectral Scan (Frequency range: {self.start_freq}/{self.end_freq} MHz)"
)
self.fig.canvas.manager.set_window_title(
"PWNLabs/ElectroniCats - Spectral Scan"
)
self.im = self.ax.imshow(
self.data_matrix[:, : self.delta_freq],
cmap=DEFAULT_COLOR_MAP,
aspect="auto",
extent=[
self.start_freq,
self.end_freq,
-4 * (SCAN_WIDTH + 1),
self.rssi_offset,
],
)
self.fig.colorbar(self.im)
manager = plt.get_current_fig_manager()
try:
manager.window.attributes("-topmost", 1)
manager.window.attributes("-topmost", 0)
except AttributeError:
pass
def show_plot(self, i):
self.im.set_data(self.data_matrix)
self.ax.relim()
self.ax.autoscale_view()
def main(self):
self.recv_running = True
args = self.parser.parse_args()
if args.freqStart < DEFAULT_START_FREQ or args.freqStart > DEFAULT_END_FREQ:
LOG_WARNING("Frequency start out of range")
sys.exit(1)
if args.freqEnd < DEFAULT_START_FREQ or args.freqEnd > DEFAULT_END_FREQ:
LOG_WARNING("Frequency start out of range")
sys.exit(1)
if args.freqStart > args.freqEnd:
LOG_WARNING("Frequency start is greater than frequency end")
sys.exit(1)
if args.offset:
self.rssi_offset = args.offset
self.device_uart.port = args.port
self.device_uart.baudrate = args.baudrate
self.current_freq = args.freqStart
self.start_freq = args.freqStart
self.end_freq = args.freqEnd
try:
self.device_uart.open()
except serial.SerialException as e:
LOG_ERROR(e)
return
# Update the initial values with the args values
self.delta_freq = int((self.end_freq - self.start_freq) / DEFAULT_STEP_PER_FREQ)
self.data_matrix = np.zeros((SCAN_WIDTH, self.delta_freq))
# Start the recv task
self.recv_worker = threading.Thread(target=self.recv_task, daemon=True)
self.recv_worker.start()
self.create_plot()
# Do an animation with the data
ani = animation.FuncAnimation(
self.fig, self.show_plot, interval=100, cache_frame_data=False
)
plt.show()
sys.exit(0)
if __name__ == "__main__":
sc = SpectrumScan()
try:
sc.main()
except KeyboardInterrupt:
sc.stop_task()
sys.exit(0)