diff --git a/main.py b/main.py index 57f6670..058e70f 100644 --- a/main.py +++ b/main.py @@ -7,12 +7,15 @@ from SoapySDR import * #SOAPY_SDR_* constants import protocols.Nexus nexus = protocols.Nexus.Nexus_TempHumidity() -data = nexus.generateData(244, 1, 0, 100) -samples = nexus.generateSamples(baseband_samplerate=2e6, id=244, channel=1, temp=30, humidity=100) -with open('nexus.iq', 'wb') as f: - samples.tofile(f) - +with open('output.complex', 'wb') as f: + for bt_id in range(0,255): + for bt_channel in range(0,4): + samples = nexus.generateSamples(baseband_samplerate=2e6, id=bt_id, channel=bt_channel, temp=-50, humidity=100) + samples.tofile(f) + print("Generated Sensor ID %d Nexus Samples" % bt_id) + # +""" ## Transmit Part SDR_ARGS = {'driver': 'lime'} @@ -46,4 +49,5 @@ sdr.writeStream(txStream, time.sleep(0.1) sdr.deactivateStream(txStream) -sdr.closeStream(txStream) \ No newline at end of file +sdr.closeStream(txStream) +""" \ No newline at end of file diff --git a/modulators/OOKModulator.py b/modulators/OOKModulator.py index 1adf153..51e0b5a 100644 --- a/modulators/OOKModulator.py +++ b/modulators/OOKModulator.py @@ -11,7 +11,7 @@ class OOKModulator: def __init__(self, baseband_samplerate, am_frequency): self.baseband_samplerate = baseband_samplerate self.am_frequency = am_frequency - self.samples = generateAMSine(self.baseband_samplerate, 0, 4000, 0) + self.samples = generateAMSine(self.baseband_samplerate, 0, 1000, 0) def addPadding(self, uSDuration): sampleCount = int((uSDuration / 1000000.0) * self.baseband_samplerate) self.samples = np.append(self.samples, generateAMSine(self.baseband_samplerate, 0, sampleCount, 0)) @@ -19,10 +19,10 @@ class OOKModulator: sampleCount = int((uSDuration / 1000000.0) * self.baseband_samplerate) self.samples = np.append(self.samples, generateAMSine(self.baseband_samplerate, self.am_frequency, sampleCount, 1)) def getSamples(self, numpyType): - self.addPadding(4000) + self.addPadding(1000) return self.samples.astype(numpyType) def getSamplesAndReset(self, numpyType): - self.addPadding(4000) + self.addPadding(1000) returnSamples = self.samples.astype(numpyType) - self.samples = generateAMSine(self.baseband_samplerate, 0, 4000, 0) + self.samples = generateAMSine(self.baseband_samplerate, 0, 1000, 0) return returnSamples diff --git a/protocols/Nexus.py b/protocols/Nexus.py index 4de3e70..ae48f0b 100644 --- a/protocols/Nexus.py +++ b/protocols/Nexus.py @@ -21,14 +21,16 @@ class Nexus_TempHumidity: def generateSamples(self, baseband_samplerate=2e6, id=244, channel=1, temp=30, humidity=100, numpyType=np.complex64): modulator = OOKModulator(baseband_samplerate=baseband_samplerate, am_frequency=22.471e3) bits = self.generateData(id, channel, temp, humidity) - for j in bits: + for repeat in range(0, 4): + for j in bits: + modulator.addModulation(500) + modulator.addPadding(1000 * (1 + int(j))) modulator.addModulation(500) - modulator.addPadding(1000 * (1 + int(j))) - modulator.addModulation(500) - modulator.addPadding(4000) + modulator.addPadding(4000) return modulator.getSamplesAndReset(numpyType) def generatePacket(self, id=244, channel=1, temp=30, humidity=100): + temp = int(temp * 10) packet = [ (id >> 4) & 0x0f, id & 0x0f, diff --git a/siggen.py b/siggen.py new file mode 100644 index 0000000..16e17e2 --- /dev/null +++ b/siggen.py @@ -0,0 +1,149 @@ +#!/usr/bin/env python +"""Simple signal generator for testing transmit + +Continuously output a carrier with single sideband sinusoid amplitude +modulation. + +Terminate with cntl-C. +""" + +import argparse +import math +import signal +import time + +import numpy as np + +import SoapySDR +from SoapySDR import * #SOAPY_SDR_ constants + +def siggen_app( + args, + rate, + ampl=0.7, + freq=None, + tx_bw=None, + tx_chan=0, + tx_gain=None, + tx_ant=None, + clock_rate=None, + wave_freq=None +): + """Generate signal until an interrupt signal is received.""" + + if wave_freq is None: + wave_freq = rate / 10 + + sdr = SoapySDR.Device(args) + #set clock rate first + if clock_rate is not None: + sdr.setMasterclock_rate(clock_rate) + + #set sample rate + sdr.setSampleRate(SOAPY_SDR_TX, tx_chan, rate) + print("Actual Tx Rate %f Msps"%(sdr.getSampleRate(SOAPY_SDR_TX, tx_chan) / 1e6)) + + #set bandwidth + if tx_bw is not None: + sdr.setBandwidth(SOAPY_SDR_TX, tx_chan, tx_bw) + + #set antenna + print("Set the antenna") + if tx_ant is not None: + sdr.setAntenna(SOAPY_SDR_TX, tx_chan, tx_ant) + + #set overall gain + print("Set the gain") + if tx_gain is not None: + sdr.setGain(SOAPY_SDR_TX, tx_chan, tx_gain) + + #tune frontends + print("Tune the frontend") + if freq is not None: + sdr.setFrequency(SOAPY_SDR_TX, tx_chan, freq) + + + print("Create Tx stream") + tx_stream = sdr.setupStream(SOAPY_SDR_TX, SOAPY_SDR_CF32, [tx_chan]) + print("Activate Tx Stream") + sdr.activateStream(tx_stream) + phase_acc = 0 + phase_inc = 2*math.pi*wave_freq/rate + stream_mtu = sdr.getStreamMTU(tx_stream) + samps_chan = np.array([ampl]*stream_mtu, np.complex64) + + time_last_print = time.time() + total_samps = 0 + + state = dict(running=True) + + def signal_handler(signum, _): + print('Signal handler called with signal {}'.format(signum)) + state['running'] = False + + signal.signal(signal.SIGINT, signal_handler) + + while state['running']: + phase_acc_next = phase_acc + stream_mtu*phase_inc + phases = np.linspace(phase_acc, phase_acc_next, stream_mtu) + samps_chan = ampl*np.exp(1j * phases).astype(np.complex64) + phase_acc = phase_acc_next + while phase_acc > math.pi * 2: + phase_acc -= math.pi * 2 + + status = sdr.writeStream(tx_stream, [samps_chan], samps_chan.size, timeoutUs=1000000) + if status.ret != samps_chan.size: + raise Exception("Expected writeStream() to consume all samples! %d" % status.ret) + total_samps += status.ret + + if time.time() > time_last_print + 5.0: + rate = total_samps / (time.time() - time_last_print) / 1e6 + print("Python siggen rate: %f Msps" % rate) + total_samps = 0 + time_last_print = time.time() + + #cleanup streams + print("Cleanup stream") + sdr.deactivateStream(tx_stream) + sdr.closeStream(tx_stream) + print("Done!") + +def main(): + """Parse command line arguments and start sig-gen.""" + parser = argparse.ArgumentParser( + description=__doc__, + formatter_class=argparse.RawDescriptionHelpFormatter) + + parser.add_argument("--args", type=str, help="device factor arguments", default="") + parser.add_argument("--rate", type=float, help="Tx and Rx sample rate", default=1e6) + parser.add_argument("--ampl", type=float, help="Tx digital amplitude rate", default=0.7) + parser.add_argument("--tx-ant", type=str, help="Optional Tx antenna") + parser.add_argument("--tx-gain", type=float, help="Optional Tx gain (dB)") + parser.add_argument("--tx-chan", type=int, help="Transmitter channel (def=0)", default=0) + parser.add_argument("--freq", type=float, help="Optional Tx and Rx freq (Hz)") + parser.add_argument("--tx-bw", type=float, help="Optional Tx filter bw (Hz)", default=5e6) + parser.add_argument("--wave-freq", type=float, help="Baseband waveform freq (Hz)") + parser.add_argument("--clock-rate", type=float, help="Optional clock rate (Hz)") + parser.add_argument("--debug", action='store_true', help="Output debug messages") + parser.add_argument( + "--abort-on-error", action='store_true', + help="Halts operations if the SDR logs an error") + + options = parser.parse_args() + + + siggen_app( + args=options.args, + rate=options.rate, + ampl=options.ampl, + freq=options.freq, + tx_bw=options.tx_bw, + tx_ant=options.tx_ant, + tx_gain=options.tx_gain, + tx_chan=options.tx_chan, + clock_rate=options.clock_rate, + wave_freq=options.wave_freq, + ) + +if __name__ == '__main__': + main() diff --git a/smokealarmtest.py b/smokealarmtest.py new file mode 100644 index 0000000..3c1a120 --- /dev/null +++ b/smokealarmtest.py @@ -0,0 +1,96 @@ +#!/usr/bin/env python +import argparse +import math +import signal +import time +import time +import SoapySDR +from SoapySDR import * #SOAPY_SDR_* constants + + +import numpy as np + +import protocols.Nexus + + +from modulators.OOKModulator import OOKModulator + +A = 1 +import struct +def bitfield(n): + return [int(digit) for digit in bin(n)[2:].zfill(8)] + +ookTest = OOKModulator(baseband_samplerate=2e6, am_frequency=22.471e3) + + +for i in range(4): + #bits = [ 0,1,0,0,1,0,0,0,1,1,0,0,0,0,1,1,0,0,0,1,0,0,0,0,1,1,1,0,1,1,0,1,0,1,1,1 ] + #bits = bitsUnitID[::-1] + byte1 = 0x00 + byte2 = 0x00 + byte3 = 0x00 + + PARAM_unit = 5 # max 30 + PARAM_id = 1337 # max 32767 + + byte1 = PARAM_unit & 0x1f # | PARAM_id << 3 + byte2 = ((PARAM_id & 0x0f) >> 11) + byte3 = 0x3 + + + + + print(bitfield(byte1)) + print(bitfield(byte2)) + print(bitfield(byte3)) + bits = bitfield(byte1)[::-1] + bitfield(byte2)[::-1] + bitfield(byte3) + #bits = data2 + print(bits) + print(len(bits)) + + for bit in bits: + if bit == 0: + # Short Pulse + ookTest.addModulation(436) + ookTest.addPadding(1299) + else: + # Long Pulse + ookTest.addModulation(1202) + ookTest.addPadding(526) + #Stop Pulse + ookTest.addModulation(434) + ookTest.addPadding(434) + ookTest.addModulation(434) + ookTest.addPadding(434) + +output = ookTest.getSamples(np.complex64) + +with open('output.complex', 'wb') as f: + output.tofile(f) +""" +## Transmit Part +SDR_ARGS = {'driver': 'lime'} + +sdr = SoapySDR.Device(SDR_ARGS) + +sdr.setSampleRate(SOAPY_SDR_TX, 0, 2e6) +sdr.setAntenna(SOAPY_SDR_TX, 0, 'Auto') +sdr.setFrequency(SOAPY_SDR_TX, 0, 433.92e6) +sdr.setGain(SOAPY_SDR_TX, 0, 70) + +txStream = sdr.setupStream(SOAPY_SDR_TX, SOAPY_SDR_CF32 , [0]) +sdr.activateStream(txStream) +sdr.setHardwareTime(0) + +flags = SOAPY_SDR_HAS_TIME | SOAPY_SDR_END_BURST +status = sdr.writeStream(txStream, [output], output.size, timeoutUs=1000000) + +sdr.writeStream(txStream, + output, output.size, + #flags=flags, + timeoutUs=int(1e6)) + + +sdr.deactivateStream(txStream) +sdr.closeStream(txStream) +""" \ No newline at end of file diff --git a/testIntertek_Remotesocket.py b/testIntertek_Remotesocket copy.py similarity index 100% rename from testIntertek_Remotesocket.py rename to testIntertek_Remotesocket copy.py