master
cheetah 5 years ago
parent 1388bf9ef2
commit 3c061e95ff

@ -7,12 +7,15 @@ from SoapySDR import * #SOAPY_SDR_* constants
import protocols.Nexus
nexus = protocols.Nexus.Nexus_TempHumidity()
data = nexus.generateData(244, 1, 0, 100)
samples = nexus.generateSamples(baseband_samplerate=2e6, id=244, channel=1, temp=30, humidity=100)
with open('nexus.iq', 'wb') as f:
samples.tofile(f)
with open('output.complex', 'wb') as f:
for bt_id in range(0,255):
for bt_channel in range(0,4):
samples = nexus.generateSamples(baseband_samplerate=2e6, id=bt_id, channel=bt_channel, temp=-50, humidity=100)
samples.tofile(f)
print("Generated Sensor ID %d Nexus Samples" % bt_id)
#
"""
## Transmit Part
SDR_ARGS = {'driver': 'lime'}
@ -46,4 +49,5 @@ sdr.writeStream(txStream,
time.sleep(0.1)
sdr.deactivateStream(txStream)
sdr.closeStream(txStream)
sdr.closeStream(txStream)
"""

@ -11,7 +11,7 @@ class OOKModulator:
def __init__(self, baseband_samplerate, am_frequency):
self.baseband_samplerate = baseband_samplerate
self.am_frequency = am_frequency
self.samples = generateAMSine(self.baseband_samplerate, 0, 4000, 0)
self.samples = generateAMSine(self.baseband_samplerate, 0, 1000, 0)
def addPadding(self, uSDuration):
sampleCount = int((uSDuration / 1000000.0) * self.baseband_samplerate)
self.samples = np.append(self.samples, generateAMSine(self.baseband_samplerate, 0, sampleCount, 0))
@ -19,10 +19,10 @@ class OOKModulator:
sampleCount = int((uSDuration / 1000000.0) * self.baseband_samplerate)
self.samples = np.append(self.samples, generateAMSine(self.baseband_samplerate, self.am_frequency, sampleCount, 1))
def getSamples(self, numpyType):
self.addPadding(4000)
self.addPadding(1000)
return self.samples.astype(numpyType)
def getSamplesAndReset(self, numpyType):
self.addPadding(4000)
self.addPadding(1000)
returnSamples = self.samples.astype(numpyType)
self.samples = generateAMSine(self.baseband_samplerate, 0, 4000, 0)
self.samples = generateAMSine(self.baseband_samplerate, 0, 1000, 0)
return returnSamples

@ -21,14 +21,16 @@ class Nexus_TempHumidity:
def generateSamples(self, baseband_samplerate=2e6, id=244, channel=1, temp=30, humidity=100, numpyType=np.complex64):
modulator = OOKModulator(baseband_samplerate=baseband_samplerate, am_frequency=22.471e3)
bits = self.generateData(id, channel, temp, humidity)
for j in bits:
for repeat in range(0, 4):
for j in bits:
modulator.addModulation(500)
modulator.addPadding(1000 * (1 + int(j)))
modulator.addModulation(500)
modulator.addPadding(1000 * (1 + int(j)))
modulator.addModulation(500)
modulator.addPadding(4000)
modulator.addPadding(4000)
return modulator.getSamplesAndReset(numpyType)
def generatePacket(self, id=244, channel=1, temp=30, humidity=100):
temp = int(temp * 10)
packet = [
(id >> 4) & 0x0f,
id & 0x0f,

@ -0,0 +1,149 @@
#!/usr/bin/env python
"""Simple signal generator for testing transmit
Continuously output a carrier with single sideband sinusoid amplitude
modulation.
Terminate with cntl-C.
"""
import argparse
import math
import signal
import time
import numpy as np
import SoapySDR
from SoapySDR import * #SOAPY_SDR_ constants
def siggen_app(
args,
rate,
ampl=0.7,
freq=None,
tx_bw=None,
tx_chan=0,
tx_gain=None,
tx_ant=None,
clock_rate=None,
wave_freq=None
):
"""Generate signal until an interrupt signal is received."""
if wave_freq is None:
wave_freq = rate / 10
sdr = SoapySDR.Device(args)
#set clock rate first
if clock_rate is not None:
sdr.setMasterclock_rate(clock_rate)
#set sample rate
sdr.setSampleRate(SOAPY_SDR_TX, tx_chan, rate)
print("Actual Tx Rate %f Msps"%(sdr.getSampleRate(SOAPY_SDR_TX, tx_chan) / 1e6))
#set bandwidth
if tx_bw is not None:
sdr.setBandwidth(SOAPY_SDR_TX, tx_chan, tx_bw)
#set antenna
print("Set the antenna")
if tx_ant is not None:
sdr.setAntenna(SOAPY_SDR_TX, tx_chan, tx_ant)
#set overall gain
print("Set the gain")
if tx_gain is not None:
sdr.setGain(SOAPY_SDR_TX, tx_chan, tx_gain)
#tune frontends
print("Tune the frontend")
if freq is not None:
sdr.setFrequency(SOAPY_SDR_TX, tx_chan, freq)
print("Create Tx stream")
tx_stream = sdr.setupStream(SOAPY_SDR_TX, SOAPY_SDR_CF32, [tx_chan])
print("Activate Tx Stream")
sdr.activateStream(tx_stream)
phase_acc = 0
phase_inc = 2*math.pi*wave_freq/rate
stream_mtu = sdr.getStreamMTU(tx_stream)
samps_chan = np.array([ampl]*stream_mtu, np.complex64)
time_last_print = time.time()
total_samps = 0
state = dict(running=True)
def signal_handler(signum, _):
print('Signal handler called with signal {}'.format(signum))
state['running'] = False
signal.signal(signal.SIGINT, signal_handler)
while state['running']:
phase_acc_next = phase_acc + stream_mtu*phase_inc
phases = np.linspace(phase_acc, phase_acc_next, stream_mtu)
samps_chan = ampl*np.exp(1j * phases).astype(np.complex64)
phase_acc = phase_acc_next
while phase_acc > math.pi * 2:
phase_acc -= math.pi * 2
status = sdr.writeStream(tx_stream, [samps_chan], samps_chan.size, timeoutUs=1000000)
if status.ret != samps_chan.size:
raise Exception("Expected writeStream() to consume all samples! %d" % status.ret)
total_samps += status.ret
if time.time() > time_last_print + 5.0:
rate = total_samps / (time.time() - time_last_print) / 1e6
print("Python siggen rate: %f Msps" % rate)
total_samps = 0
time_last_print = time.time()
#cleanup streams
print("Cleanup stream")
sdr.deactivateStream(tx_stream)
sdr.closeStream(tx_stream)
print("Done!")
def main():
"""Parse command line arguments and start sig-gen."""
parser = argparse.ArgumentParser(
description=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter)
parser.add_argument("--args", type=str, help="device factor arguments", default="")
parser.add_argument("--rate", type=float, help="Tx and Rx sample rate", default=1e6)
parser.add_argument("--ampl", type=float, help="Tx digital amplitude rate", default=0.7)
parser.add_argument("--tx-ant", type=str, help="Optional Tx antenna")
parser.add_argument("--tx-gain", type=float, help="Optional Tx gain (dB)")
parser.add_argument("--tx-chan", type=int, help="Transmitter channel (def=0)", default=0)
parser.add_argument("--freq", type=float, help="Optional Tx and Rx freq (Hz)")
parser.add_argument("--tx-bw", type=float, help="Optional Tx filter bw (Hz)", default=5e6)
parser.add_argument("--wave-freq", type=float, help="Baseband waveform freq (Hz)")
parser.add_argument("--clock-rate", type=float, help="Optional clock rate (Hz)")
parser.add_argument("--debug", action='store_true', help="Output debug messages")
parser.add_argument(
"--abort-on-error", action='store_true',
help="Halts operations if the SDR logs an error")
options = parser.parse_args()
siggen_app(
args=options.args,
rate=options.rate,
ampl=options.ampl,
freq=options.freq,
tx_bw=options.tx_bw,
tx_ant=options.tx_ant,
tx_gain=options.tx_gain,
tx_chan=options.tx_chan,
clock_rate=options.clock_rate,
wave_freq=options.wave_freq,
)
if __name__ == '__main__':
main()

@ -0,0 +1,96 @@
#!/usr/bin/env python
import argparse
import math
import signal
import time
import time
import SoapySDR
from SoapySDR import * #SOAPY_SDR_* constants
import numpy as np
import protocols.Nexus
from modulators.OOKModulator import OOKModulator
A = 1
import struct
def bitfield(n):
return [int(digit) for digit in bin(n)[2:].zfill(8)]
ookTest = OOKModulator(baseband_samplerate=2e6, am_frequency=22.471e3)
for i in range(4):
#bits = [ 0,1,0,0,1,0,0,0,1,1,0,0,0,0,1,1,0,0,0,1,0,0,0,0,1,1,1,0,1,1,0,1,0,1,1,1 ]
#bits = bitsUnitID[::-1]
byte1 = 0x00
byte2 = 0x00
byte3 = 0x00
PARAM_unit = 5 # max 30
PARAM_id = 1337 # max 32767
byte1 = PARAM_unit & 0x1f # | PARAM_id << 3
byte2 = ((PARAM_id & 0x0f) >> 11)
byte3 = 0x3
print(bitfield(byte1))
print(bitfield(byte2))
print(bitfield(byte3))
bits = bitfield(byte1)[::-1] + bitfield(byte2)[::-1] + bitfield(byte3)
#bits = data2
print(bits)
print(len(bits))
for bit in bits:
if bit == 0:
# Short Pulse
ookTest.addModulation(436)
ookTest.addPadding(1299)
else:
# Long Pulse
ookTest.addModulation(1202)
ookTest.addPadding(526)
#Stop Pulse
ookTest.addModulation(434)
ookTest.addPadding(434)
ookTest.addModulation(434)
ookTest.addPadding(434)
output = ookTest.getSamples(np.complex64)
with open('output.complex', 'wb') as f:
output.tofile(f)
"""
## Transmit Part
SDR_ARGS = {'driver': 'lime'}
sdr = SoapySDR.Device(SDR_ARGS)
sdr.setSampleRate(SOAPY_SDR_TX, 0, 2e6)
sdr.setAntenna(SOAPY_SDR_TX, 0, 'Auto')
sdr.setFrequency(SOAPY_SDR_TX, 0, 433.92e6)
sdr.setGain(SOAPY_SDR_TX, 0, 70)
txStream = sdr.setupStream(SOAPY_SDR_TX, SOAPY_SDR_CF32 , [0])
sdr.activateStream(txStream)
sdr.setHardwareTime(0)
flags = SOAPY_SDR_HAS_TIME | SOAPY_SDR_END_BURST
status = sdr.writeStream(txStream, [output], output.size, timeoutUs=1000000)
sdr.writeStream(txStream,
output, output.size,
#flags=flags,
timeoutUs=int(1e6))
sdr.deactivateStream(txStream)
sdr.closeStream(txStream)
"""
Loading…
Cancel
Save