You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

408 lines
13 KiB
Python

import ctypes
import struct
"""
** 11.09.2019 : Added Numeric Pager support by cuddlycheetah (github.com/cuddlycheetah)
** 14.10.2019 : Added Repeating Transmission + Single Preamble Mode (github.com/cuddlycheetah)
** 22.10.2019 : Made the Original (from rpitx) to a Python Encoder. (github.com/cuddlycheetah)
"""
#The sync word exists at the start of every batch.
#A batch is 16 words, a sync word occurs every 16 data words.
SYNC=0x7CD215D8
#The idle word is used as padding before the address word, at the end
#of a message to indicate that the message is finished. Interestingly, the
#idle word does not have a valid CRC code, the sync word does.
IDLE=0x7A89C197
#One frame consists of a pair of two words
FRAME_SIZE = 2
#One batch consists of 8 frames, 16 words
BATCH_SIZE = 16
#The preamble comes before a message, is a series of alternating
#1,0,1,0... bits, at least 576 bits. It exists to allow the receiver
#to synchronize with the transmitter
PREAMBLE_LENGTH = 576
#These bits appear as the first bit of a word, for an address word and
#one for a data word
FLAG_ADDRESS = 0x000000
FLAG_MESSAGE = 0x100000
#The last two bits of an address word's data represent the data type
#0x3 for text, 0x0 for numeric.
FLAG_TEXT_DATA = 0x3
FLAG_NUMERIC_DATA = 0x0
#Each data word can contain 20 bits of text information. Each character is
#7 bits wide, encoded. The bit order of the characters is reversed from
#the normal bit order; the most significant bit of a word corresponds to the
#least significant bit of a character it is encoding. The characters are split
#across the words of a message to ensure maximal usage of all bits.
TEXT_BITS_PER_WORD = 20
#As mentioned above, are 7 bit ASCII encoded
TEXT_BITS_PER_CHAR = 7
NUMERIC_BITS_PER_WORD = 20
NUMERIC_BITS_PER_DIGIT = 4
#Length of CRC codes in bits
CRC_BITS=10
#The CRC generator polynomial
CRC_GENERATOR=0b11101101001
'''*
* Calculate the CRC error checking code for the given word.
* Messages use a 10 bit CRC computed from the 21 data bits.
* This is calculated through a binary polynomial long division, returning
* the remainder.
* See https:#en.wikipedia.org/wiki/Cyclic_redundancy_check#Computation
* for more information.
'''
def crc(inputMsg):
#Align MSB of denominatorerator with MSB of message
denominator = CRC_GENERATOR << 20
#Message is right-padded with zeroes to the message length + crc length
msg = inputMsg << CRC_BITS
#We iterate until denominator has been right-shifted back to it's original value.
for column in range(20 + 1):
#Bit for the column we're aligned to
msgBit = (msg >> (30 - column)) & 1
#If the current bit is zero, don't modify the message self iteration
if msgBit != 0:
#While we would normally subtract in long division, XOR here.
msg ^= denominator
#Shift the denominator over to align with the next column
denominator >>= 1
#At self point 'msg' contains the CRC value we've calculated
return msg & 0x3FF
'''*
* Calculates the even parity bit for a message.
* If the number of bits in the message is even, 0, return 1.
'''
def parity(x):
#Our parity bit
p = 0
#We xor p with each bit of the input value. This works because
#xoring two one-bits will cancel out and leave a zero bit. Thus
#xoring any even number of one bits will result in zero, xoring
#any odd number of one bits will result in one.
for i in range(32):
p ^= (x & 1)
x >>= 1
return p
'''*
* Encodes a 21-bit message by calculating and adding a CRC code and parity bit.
'''
def encodeCodeword(msg):
fullCRC = (msg << CRC_BITS) | crc(msg)
p = parity(fullCRC)
return (fullCRC << 1) | p
'''*
* ASCII encode a null-terminated string as a series of codewords, written
* to (*out). Returns the number of codewords written. Caller should ensure
* that enough memory is allocated in (*out) to contain the message
*
* initial_offset indicates which word in the current batch the function is
* beginning at, that it can insert SYNC words at appropriate locations.
'''
def encodeASCII(initial_offset, text, buff):
#Number of words written to *out
numWordsWritten = 0
#Data for the current word we're writing
currentWord = 0
#Nnumber of bits we've written so far to the current word
currentNumBits = 0
#Position of current word in the current batch
wordPosition = initial_offset
for c in text:
#Encode the character bits backwards
for i in range(TEXT_BITS_PER_CHAR):
currentWord <<= 1
currentWord |= (ord(c) >> i) & 1
currentNumBits+=1
if currentNumBits == TEXT_BITS_PER_WORD:
#Add the MESSAGE flag to our current word and encode it.
buff.append(encodeCodeword(currentWord | FLAG_MESSAGE))
currentWord = 0
currentNumBits = 0
numWordsWritten+=1
wordPosition+=1
if wordPosition == BATCH_SIZE:
#We've filled a full batch, to insert a SYNC word
#and start a one.
buff.append(SYNC)
numWordsWritten+=1
wordPosition = 0
#Write remainder of message
if currentNumBits > 0:
#Pad out the word to 20 bits with zeroes
currentWord <<= 20 - currentNumBits
buff.append(encodeCodeword(currentWord | FLAG_MESSAGE))
numWordsWritten+=1
wordPosition+=1
if wordPosition == BATCH_SIZE:
#We've filled a full batch, to insert a SYNC word
#and start a one.
buff.append(SYNC)
numWordsWritten+=1
wordPosition = 0
return numWordsWritten
# Char Translationtable
mirrorTab = [0x00, 0x08, 0x04, 0x0c, 0x02, 0x0a, 0x06, 0x0e, 0x01, 0x09]
def encodeDigit(ch):
if ord(ch) >= ord('0') and ord(ch) <= ord('9'):
return mirrorTab[ord(ch) - ord('0')]
elif ch == ' ':
return 0x03
elif ch == 'u' or ch == 'U':
return 0x0d
elif ch == '-' or ch == '_':
return 0x0b
elif ch == '(' or ch == '[':
return 0x0f
elif ch == ')' or ch == ']':
return 0x07
else:
return 0x05
def encodeNumeric(initial_offset, text, buff):
#Number of words written to *out
numWordsWritten = 0
#Data for the current word we're writing
currentWord = 0
#Nnumber of bits we've written so far to the current word
currentNumBits = 0
#Position of current word in the current batch
wordPosition = initial_offset
for c in text:
#Encode the digit bits backwards
for i in range(NUMERIC_BITS_PER_DIGIT):
currentWord <<= 1
digit = encodeDigit(c)
digit = ((digit & 1) << 3) | ((digit & 2) << 1) | ((digit & 4) >> 1) | ((digit & 8) >> 3)
currentWord |= (digit >> i) & 1
currentNumBits+=1
if currentNumBits == NUMERIC_BITS_PER_WORD:
#Add the MESSAGE flag to our current word and encode it.
buff.append(encodeCodeword(currentWord | FLAG_MESSAGE))
currentWord = 0
currentNumBits = 0
numWordsWritten+=1
wordPosition+=1
if wordPosition == BATCH_SIZE:
#We've filled a full batch, to insert a SYNC word
#and start a one.
buff.append(SYNC)
numWordsWritten+=1
wordPosition = 0
#Write remainder of message
if currentNumBits > 0:
#Pad out the word to 20 bits with zeroes
currentWord <<= 20 - currentNumBits
buff.append(encodeCodeword(currentWord | FLAG_MESSAGE))
numWordsWritten+=1
wordPosition+=1
if wordPosition == BATCH_SIZE:
#We've filled a full batch, to insert a SYNC word
#and start a one.
buff.append(SYNC)
numWordsWritten+=1
wordPosition = 0
return numWordsWritten
'''*
* An address of 21 bits, only 18 of those bits are encoded in the address
* word itself. The remaining 3 bits are derived from which frame in the batch
* is the address word. This calculates the number of words (not framesnot )
* which must precede the address word so that it is in the right spot. These
* words will be filled with the idle value.
'''
def addressOffset(address):
return (address & 0x7) * FRAME_SIZE
'''*
* Calculates the length in words of a text POCSAG message, the address
* and the number of characters to be transmitted.
'''
def textMessageLength(repeatIndex, address, numChars):
numWords = 0
#Padding before address word.
numWords += addressOffset(address)
#Address word itself
numWords+=1
#numChars * 7 bits per character / 20 bits per word, up
numWords += (numChars * TEXT_BITS_PER_CHAR + (TEXT_BITS_PER_WORD - 1)) / TEXT_BITS_PER_WORD
#Idle word representing end of message
numWords+=1
#Pad out last batch with idles
numWords += BATCH_SIZE - (numWords % BATCH_SIZE)
#Batches consist of 16 words each and are preceded by a sync word.
#So we add one word for every 16 message words
numWords += numWords / BATCH_SIZE
#Preamble of 576 alternating 1,0,1, bits before the message
#Even though self comes first, add it to the length last so it
#doesn't affect the other word-based calculations
if repeatIndex == 0:
numWords += PREAMBLE_LENGTH / 32
return numWords
'''*
* Calculates the length in words of a numeric POCSAG message, the address
* and the number of characters to be transmitted.
'''
def numericMessageLength(repeatIndex, address, numChars):
numWords = 0
#Padding before address word.
numWords += addressOffset(address)
#Address word itself
numWords+=1
#numChars * 7 bits per character / 20 bits per word, up
numWords += (numChars * NUMERIC_BITS_PER_DIGIT + (NUMERIC_BITS_PER_WORD - 1)) / NUMERIC_BITS_PER_WORD
#Idle word representing end of message
numWords+=1
#Pad out last batch with idles
numWords += BATCH_SIZE - (numWords % BATCH_SIZE)
#Batches consist of 16 words each and are preceded by a sync word.
#So we add one word for every 16 message words
numWords += numWords / BATCH_SIZE
#Preamble of 576 alternating 1,0,1, bits before the message
#Even though self comes first, add it to the length last so it
#doesn't affect the other word-based calculations
if repeatIndex == 0:
numWords += PREAMBLE_LENGTH / 32
return numWords
def encodeTransmission(numeric, repeatIndex, address, fb, message, buff):
out=0
#Encode preamble
#Alternating 1,0,1, bits for 576 bits, for receiver to synchronize
#with transmitter
if repeatIndex == 0:
for i in range(PREAMBLE_LENGTH / 32):
buff.append(0xAAAAAAAA)
out+=1
start = out
#Sync
buff.append(SYNC)
out+=1
#Write out padding before adderss word
prefixLength = addressOffset(address)
for i in range(prefixLength):
buff.append(IDLE)
out+=1
#Write address word.
#The last two bits of word's data contain the message type (function bits)
#The 3 least significant bits are dropped, those are encoded by the
#word's location.
buff.append(encodeCodeword(((address >> 3) << 2) | fb))
out+=1
#Encode the message itself
if numeric == True:
out += encodeNumeric(addressOffset(address) + 1, message, buff)
else:
out += encodeASCII(addressOffset(address) + 1, message, buff)
#Finally, an IDLE word indicating the end of the message
buff.append(IDLE)
out+=1
#Pad out the last batch with IDLE to write multiple of BATCH_SIZE + 1
#words (+ 1 is there because of the SYNC words)
written = out - start
padding = (BATCH_SIZE + 1) - written % (BATCH_SIZE + 1)
for i in range(padding):
buff.append(IDLE)
out+=1
def parseAddress(address):
if address.find('A') > 0:
return [address[:address.index('A')], 0]
elif address.find('B') > 0:
return [address[:address.index('B')], 1]
elif address.find('C') > 0:
return [address[:address.index('C')], 2]
elif address.find('D') > 0:
return [address[:address.index('D')], 3]
else:
return [address, 3]
def encodeTXBatch(messages, repeatNum = 2, inverted = False):
transmission = []
pocsagData = []
msgIndex = 0
for message in messages:
msgNumeric = message[0]
msgAddrParsed = parseAddress(message[1])
msgAddress = int(msgAddrParsed[0])
msgBits = msgAddrParsed[1]
msgText = message[2]
for repeat in range(repeatNum):
encodeTransmission(msgNumeric, msgIndex, msgAddress, msgBits, msgText, transmission)
msgIndex += 1
for word in transmission:
uint = ctypes.c_uint32(word).value
if not inverted:
uint = ~uint
pocsagData.append(int((uint>>24) & 0xFF))
pocsagData.append(int((uint>>16) & 0xFF))
pocsagData.append(int((uint>>8) & 0xFF))
pocsagData.append(int(uint & 0xFF))
return pocsagData