You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

150 lines
4.6 KiB
Python

#!/usr/bin/env python
"""Simple signal generator for testing transmit
Continuously output a carrier with single sideband sinusoid amplitude
modulation.
Terminate with Ctrl-C.
"""
import argparse
import math
import signal
import time
import numpy as np
import SoapySDR
from SoapySDR import * #SOAPY_SDR_ constants
def siggen_app(
        args,
        rate,
        ampl=0.7,
        freq=None,
        tx_bw=None,
        tx_chan=0,
        tx_gain=None,
        tx_ant=None,
        clock_rate=None,
        wave_freq=None
):
    """Generate signal until an interrupt signal is received.

    Opens the device described by ``args``, applies the requested Tx
    settings, then writes a complex sinusoid to the Tx stream in
    MTU-sized buffers until SIGINT arrives.  The achieved throughput is
    printed roughly every five seconds.

    Args:
        args: Device arguments handed to ``SoapySDR.Device``.
        rate: Tx sample rate passed to ``setSampleRate``.
        ampl: Scale factor applied to the unit-magnitude sinusoid.
        freq: Optional Tx frequency; the frontend is not tuned when None.
        tx_bw: Optional Tx bandwidth; left unchanged when None.
        tx_chan: Tx channel index used for every per-channel call.
        tx_gain: Optional overall Tx gain; left unchanged when None.
        tx_ant: Optional Tx antenna name; left unchanged when None.
        clock_rate: Optional master clock rate; left unchanged when None.
        wave_freq: Baseband sinusoid frequency; defaults to ``rate / 10``.

    Raises:
        Exception: If ``writeStream()`` consumes fewer samples than the
            buffer holds.
    """
    if wave_freq is None:
        wave_freq = rate / 10

    sdr = SoapySDR.Device(args)

    # set clock rate first
    if clock_rate is not None:
        sdr.setMasterClockRate(clock_rate)

    # set sample rate
    sdr.setSampleRate(SOAPY_SDR_TX, tx_chan, rate)
    print("Actual Tx Rate %f Msps" % (sdr.getSampleRate(SOAPY_SDR_TX, tx_chan) / 1e6))

    # set bandwidth
    if tx_bw is not None:
        sdr.setBandwidth(SOAPY_SDR_TX, tx_chan, tx_bw)

    # set antenna
    print("Set the antenna")
    if tx_ant is not None:
        sdr.setAntenna(SOAPY_SDR_TX, tx_chan, tx_ant)

    # set overall gain
    print("Set the gain")
    if tx_gain is not None:
        sdr.setGain(SOAPY_SDR_TX, tx_chan, tx_gain)

    # tune frontends
    print("Tune the frontend")
    if freq is not None:
        sdr.setFrequency(SOAPY_SDR_TX, tx_chan, freq)

    print("Create Tx stream")
    tx_stream = sdr.setupStream(SOAPY_SDR_TX, SOAPY_SDR_CF32, [tx_chan])

    # A mutable container lets the nested handler flip the flag without
    # needing ``nonlocal`` (keeps the script usable on older interpreters).
    state = dict(running=True)

    def signal_handler(signum, _):
        print('Signal handler called with signal {}'.format(signum))
        state['running'] = False

    stream_active = False
    handler_installed = False
    previous_handler = None
    try:
        print("Activate Tx Stream")
        sdr.activateStream(tx_stream)
        stream_active = True

        two_pi = 2 * math.pi
        phase_acc = 0.0
        phase_inc = two_pi * wave_freq / rate
        stream_mtu = sdr.getStreamMTU(tx_stream)
        # Per-sample phase offsets within one buffer: 0, inc, 2*inc, ...
        # The buffer deliberately stops one step short of the next buffer's
        # starting phase so consecutive buffers join without a repeated
        # sample or a stretched step.
        sample_offsets = phase_inc * np.arange(stream_mtu)

        time_last_print = time.time()
        total_samps = 0

        previous_handler = signal.signal(signal.SIGINT, signal_handler)
        handler_installed = True

        while state['running']:
            phases = phase_acc + sample_offsets
            # Cast after scaling so the buffer is complex64 (matching the
            # CF32 stream format) whatever numeric type ``ampl`` has.
            samps_chan = (ampl * np.exp(1j * phases)).astype(np.complex64)
            # Keep the accumulator in [0, 2*pi) to preserve float precision;
            # the modulo also handles negative increments.
            phase_acc = (phase_acc + stream_mtu * phase_inc) % two_pi

            status = sdr.writeStream(
                tx_stream, [samps_chan], samps_chan.size, timeoutUs=1000000)
            if status.ret != samps_chan.size:
                raise Exception(
                    "Expected writeStream() to consume all samples! %d" % status.ret)
            total_samps += status.ret

            now = time.time()
            if now > time_last_print + 5.0:
                # Use a separate name so the ``rate`` argument is not clobbered.
                measured_rate = total_samps / (now - time_last_print) / 1e6
                print("Python siggen rate: %f Msps" % measured_rate)
                total_samps = 0
                time_last_print = time.time()
    finally:
        # Put back whatever SIGINT handler was active before this call.
        # ``signal.signal`` returns None when the previous handler was not
        # installed from Python; that value cannot be re-installed.
        if handler_installed and previous_handler is not None:
            signal.signal(signal.SIGINT, previous_handler)

        # cleanup streams, even when the loop above raised
        print("Cleanup stream")
        if stream_active:
            sdr.deactivateStream(tx_stream)
        sdr.closeStream(tx_stream)

    print("Done!")
def main():
    """Parse command line arguments and start sig-gen.

    Besides forwarding the radio settings to ``siggen_app``, this applies
    the two logging switches before the device is opened:

    * ``--abort-on-error`` registers a SoapySDR log handler that prints
      every message and raises ``RuntimeError`` for messages at error
      severity or worse.
    * ``--debug`` raises the SoapySDR log verbosity to debug level.
    """
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter)

    parser.add_argument("--args", type=str, help="device factor arguments", default="")
    parser.add_argument("--rate", type=float, help="Tx and Rx sample rate", default=1e6)
    parser.add_argument("--ampl", type=float, help="Tx digital amplitude rate", default=0.7)
    parser.add_argument("--tx-ant", type=str, help="Optional Tx antenna")
    parser.add_argument("--tx-gain", type=float, help="Optional Tx gain (dB)")
    parser.add_argument("--tx-chan", type=int, help="Transmitter channel (def=0)", default=0)
    parser.add_argument("--freq", type=float, help="Optional Tx and Rx freq (Hz)")
    parser.add_argument("--tx-bw", type=float, help="Optional Tx filter bw (Hz)", default=5e6)
    parser.add_argument("--wave-freq", type=float, help="Baseband waveform freq (Hz)")
    parser.add_argument("--clock-rate", type=float, help="Optional clock rate (Hz)")
    parser.add_argument("--debug", action='store_true', help="Output debug messages")
    parser.add_argument(
        "--abort-on-error", action='store_true',
        help="Halts operations if the SDR logs an error")

    options = parser.parse_args()

    if options.abort_on_error:
        def abort_on_error_handler(log_level, message):
            """Print a SoapySDR log message; raise if it is an error."""
            log_text = "[SoapySDR log level {}] {}".format(log_level, message)
            # The custom handler replaces the default one, so the message
            # must be echoed here or it would be lost.
            print(log_text)
            # SoapySDR log levels are numerically smaller for more severe
            # messages, so "<= ERROR" means error, critical, or fatal.
            if log_level <= SoapySDR.SOAPY_SDR_ERROR:
                raise RuntimeError(log_text)

        SoapySDR.registerLogHandler(abort_on_error_handler)

    if options.debug:
        SoapySDR.setLogLevel(SoapySDR.SOAPY_SDR_DEBUG)

    siggen_app(
        args=options.args,
        rate=options.rate,
        ampl=options.ampl,
        freq=options.freq,
        tx_bw=options.tx_bw,
        tx_ant=options.tx_ant,
        tx_gain=options.tx_gain,
        tx_chan=options.tx_chan,
        clock_rate=options.clock_rate,
        wave_freq=options.wave_freq,
    )
# Run the signal generator only when executed as a script, not on import.
if __name__ == "__main__":
    main()