Sniffing the ISM band at 912MHz

There's a band of ISM (Industry, Science, and Medicine) spectrum at 915ish MHz. ISM spectrum is a place where unlicensed devices can operate under FCC part 15 restrictions (or part 18 if they're licenced). This is where all those "900MHz" phones and whatnot live.

There's potentially a lot of interesting stuff lurking in this band: running a simple flowgraph to watch the waterfall of it shows a bunch of tiny packets flying around. Unfortunately, it's really hard to figure out where/when they are, so you're left dumping the spectrum to a file, and then post-processing it. Doing that in GRC is a bit ugly, so I decided to pull the data into IPython and see what I could do over here.

Part of this is that we don't know what we're looking for, or where. To make it easier to work with, we'll capture a buuuunch of data, then find the packets within it iteratively. This is very memory-conserving: doing the whole thing at once runs off into Bad Places in terms of performance. Note that GNU Radio's native components don't have this issue, which is the main benefit of its architecture. However, I find writing a GNU Radio block to test my code a bit tedious/distracting from the actual goal. This file is a great clean-up of the five(!) different notebooks I made in the process of figuring out reasonable ways of solving these problems; mixing in blocks probably would have just made it even more scattershot in the first versions.

(NB: the code below takes in a complex short integer input. If you're capturing in the usual complex format, you'll need to adjust the fetch_x() function to handle the appropriate input format. It'll probably be just a symbol size of 2*4 bytes, then fromfile("foo.sdr", dtype="complex64"), skipping the subsequent hoops I have to jump through to go from two signed ints to a reasonably-sized complex value.)

In [430]:
%pylab inline

pylab.rcParams["figure.figsize"] = 14,4

import scipy
import scipy.signal

import gc

import os
import json
Populating the interactive namespace from numpy and matplotlib

Data access helper function

To work from disk, we're going to do a bit of our own crappy buffering. This function is how you request a span of samples from the input file. It'll return them as a complex np.array.

In [431]:
# We're going to need a wackypants way to get to the right places here

FILENAME="912_snoop_si16.sdr"
SAMPLE_SIZE = 2*2 # make this 2*4 if you're using complex inputs

Fs = 4e6   # Sample rate of our input
Fc = 912e6 # Carrier frequency of the input; not really used, but good to know.



def fetch_x(sample_offset, sample_count):
    f = file(FILENAME)
    f.seek(SAMPLE_SIZE*sample_offset)
    b = f.read(SAMPLE_SIZE*sample_count)
    f.close()

    # If you're using complex input, just do 
    # return np.frombuffer(b, dtype="complex64")
    x = np.frombuffer(b, dtype="int16")
    x = x.reshape(len(x)/2,2)

    xc = (x[:,0] + 1.0j*x[:,1]).astype("complex64")
    del(x)
    gc.collect()
    return xc

s = os.stat(FILENAME)
file_length_samples = s.st_size / SAMPLE_SIZE
file_length_samples
Out[431]:
120777916

Data structures

Before we get to the process of finding the actual packets, we need to create the structures that will contain the information about those packets.

The core unit here is the "Hit", which is just a frequency+bandwidth with sample_start+sample_length. That is: it's a bounding box on the waterfall plot of the input file. We can then look for places where these butt into each other, and merge them together. This is generally only going to happen when we have a partial hit for a given signal. We'll get those partials any time a packet happens to cross one of our sample buffer blocks.

There's also a HitList class, which is just an array of Hits that knows what to do when merging two Hits together, and has an algorithm to find merge candidates. This could be improved a bunch by moving to something like a quadtree for the initial pass, and stuffing them in as we go along, but, that's for later.

In [432]:
class Hit:
    """Container for a single packet "hit" in the data stream
    
    We use these to keep notes on where in the RF stream a packet might be found.
    
    They key variables here are:
    * Fc -- center frequency
    * bw -- bandwidth
    * Fs -- sample rate of the original data stream
    * sample_start -- offset in the original system's sample stream where it's found
    * sample_end -- length of the sample in the origin's sample rate
    """
    def __init__(self, Fs, Fc, bw, sample_start, sample_length, bw_fudge=None, sample_start_fudge=2000, sample_length_fudge=8000):
        """Create a new Hit entry
         * Fs -- sample rate of original system
         * Fc -- center frequency of packet
         * bw -- bandwidth of packet
         * sample_start -- initial sample position of packet (in Fs)
         * sample_length -- length of packet (in Fs)
         * bw_fudge -- amount to grow the bandwidth for purposes of capture and matching (default: +2x top and bottom, so 5x)
         * sample_start_fudge -- amount to grow the sample span at its head
         * sample_length_fudge -- amount to grow the sample span at its tail
        """
        self.Fs = Fs
        self.Fc = Fc
        self.bw = bw
        
        self.bw_fudge = bw_fudge
        
        self.sample_start_fudge = sample_start_fudge
        self.sample_length_fudge = sample_length_fudge
        
        if None == bw_fudge:
            #self.bw += 4*bw
            self.bw_fudge = 2*bw
        else:
            #self.bw += bw_fudge
            pass
        
        self.sample_start = sample_start # - sample_start_fudge
        self.sample_length = sample_length # + sample_length_fudge
  
    def f_low(self, fudge=True):
        "Low end of the frequency span for this Hit, including fudge factors"
        fudge = self.bw_fudge if fudge else 0
        return self.Fc - self.bw/2 - fudge
    
    def f_high(self, fudge=True):
        "High end of the frequency span for this Hit, including fudge factors"
        fudge = self.bw_fudge if fudge else 0
        return self.Fc + self.bw/2 + fudge
    
    def in_band(self, f):
        "Query if the given frequency is within the frequency span for this Hit"
        return self.between(self.f_low(), self.f_high(), f)
    
    def s_low(self, fudge=True):
        "Sample offset of the start of the packet in the origin stream (measured in source samples)"
        fudge = self.sample_start_fudge if fudge else 0
        return int(self.sample_start - fudge)
    
    def s_high(self, fudge=True):
        "Sample offset of the end of the packet in the origin stream (measured in source samples)"
        fudge = self.sample_length_fudge if fudge else 0
        return int(self.sample_start+self.sample_length + fudge)
    
    def s_length(self, fudge=True):
        return self.s_high(fudge) - self.s_low(fudge)
    
    def in_time(self, s):
        "Query if the given sample number is within the frequency span for this Hit"
        return self.between(self.s_low(), self.s_high(), s)
    
    def between(self, a, b, q):
        "Helper method: check if q falls between a and b. Will reorder a/b internally if needed."
        if b < a:
            t = a
            a = b
            b = t
            
        return ( (a <= q) and (q <= b) )
    
    def could_merge(self, h2):
        "Given another Hit h2, see if they overlap in both samples and frequency."
        f0_in = self.in_band(h2.f_low())
        f1_in = self.in_band(h2.f_high())
        
        s0_in = self.in_time(h2.s_low())
        s1_in = self.in_time(h2.s_high())
        
        if (f0_in or f1_in) and (s0_in or s1_in):
            return True
        
        return False
    
    def merge(self, h2):
        "Given another Hit h2, find a new Hit that subsumes both in time and freqency."
        f_low = min(self.f_low(False), h2.f_low(False))
        f_high = max(self.f_high(False), h2.f_high(False))
        
        s_low = min(self.s_low(False), h2.s_low(False))
        s_high = max(self.s_high(False), h2.s_high(False))
    
        f_bw = f_high - f_low
        s_length = s_high - s_low
        
        return Hit(self.Fs, (f_low+f_high)/2.0, f_bw, s_low, s_high-s_low, self.bw_fudge, self.sample_start_fudge, self.sample_length_fudge)
    
    def __str__(self):
        "Helper method."
        return "%f / %f, %d + %d" % (self.Fc, self.bw, self.sample_start, self.sample_length)
    
    
class HitList:
    """Container for a bunch of Hit entries, and coordinator of merging passes
    
    This is where all the Hits go, and coordinates the merge process for gluing Hits together into larger packets.
    
    """
    def __init__(self):
        self.hits = []        
        self.merged = None # Initialize to None as a sentry for run_merge's bootstrap
        
    def mark(self, Fs, Fc, bw, sample_start, sample_length, **kwargs):
        h = Hit(Fs, Fc, bw, sample_start, sample_length, **kwargs)
      
        self.hits.append(h)
    
    def _do_merge(self, hits):
        "Helper method: runs a merge pass on the input array of Hits, returns new array of Hits post-merge"
        merged = []
        
        for i,h_i in enumerate(hits):
            did_merge = False
                        
            for m,h_m in enumerate(merged):
                if h_i.could_merge(h_m) or h_m.could_merge(h_i):
                    merged[m] = h_m.merge(h_i)
                    did_merge = True
                    break
                    
            if not did_merge:
                merged.append(h_i)
        return merged
    
    def run_merge(self):
        "Run a merge pass over the current merged hits, further merging.  Uses input if no merge has been run."
        if None == self.merged:
            l0 = len(self.hits)
            self.merged = self._do_merge(self.hits)
        else:
            l0 = len(self.merged)
            self.merged = self._do_merge(self.merged)
        return (l0, len(self.merged))
    
hl = HitList()

Find packets

This is where the first bit of magic happens: we find all the packets in the original file. This is done by buffering in blocks of data, identifying statistical outliers, and then running a state machine to extract adjacent outliers.

Once we have that list, we can start looking at each packet in detail.

Buffering

To find the packets without loading the whole file into memory at the same time, we work through it in large-ish chunks, finding the Hits within each chunk. Then, after we've gone through the whole file, we run the merge algorithm over the HitList to get down to the minimal set of packets in the entire file.

To ensure we don't miss anything, we take buffers which overlap each other by a bit. This results in redundant Hits in a lot of cases, but that's fine: the merge algorithm will just take care of them.

Finding packet candidates with statistics

To find candidate packets, we basically apply a simplistic AGC+squelch to each buffer of data. We first run specgram to get the spectrogram of the data. You can think of a spectrogram as a waterfall display turned into a two dimensional array of data. It's basically (frequency bin, time bin) => energy.

We sample a bunch of points of that data to find out what the distribution of energy for each (frequency,time) bin is. We build an Empirical Cumulative Density Function (ECDF) of the data from this sample, so we have a rough idea where the Nth percentile falls. We then go through the data and threshold it as being 98th percentile or above, or below 98th percentile. This gives us another 2-D array of (frequency,time) -> {above,below}

The 98th percentile here is entirely arbitrary, and may not be appropriate if you're trying to monitor a band where more than 2% of the frequencies are in use. You can try twiddling it around for our local conditions. Similarly, the sample size used to determine the 98th percentile is arbitrary, though you want it to be reasonably large. 1000 has served me well enough, but I've also used 10k in this project, without really noticing any differences.

Also, the width of the frequency bins may matter. You want to have a rough idea what the minimum target bandwidth you're after is, as a very strong narrow signal will get washed out if your bins are much larger than it.

Assembling possible hits

After we've identified individual bins that may contain packets, we walk across this array of (frequency,time) bins. We move across it left-to-right, moving forward in time with each step. When we see a frequency row go from below-threshold to above-threshold, we note the time when that row went high. Then, when we later on see that same row go from above-threshold to below-threshold, we can figure out how long that frequency bin was above the threshold. If it's above some minimum length, we feel very confident that this is a strong signal that stands out from the background noise, and isn't just random noise. I'm using a relatively long run length here, which gets rid of marginal signals that float around at the 98th percentile.

In [433]:
target_bw = 50000
max_bw = 200000
N_sample = 1000
p_cutoff = 0.95

nFFTs = int(2**(np.ceil(np.log2(Fs/target_bw*2)))) # We want buckets of at most target_bw/2 Hz

bin_bw = Fs/nFFTs

minrun = 10

samples_per_block = nFFTs*1000
samples_per_step = samples_per_block - 3 * minrun*nFFTs


def run_step(block_no, hl):
    start0 = block_no*samples_per_step
    xc = fetch_x(start0, samples_per_block)
    
    a = specgram(xc, NFFT=nFFTs, Fs=Fs, noverlap=0, scale_by_freq=False); None
    clf()
    # Build a quick ECDF of this block, as a janky sort of AGC
    pt_sample = a[0].flatten()[ np.random.uniform(low=0, high=a[0].size, size=N_sample).astype("int") ]
    pt_sample.sort()
    
    # Find the p_cutoff'th percentile and use that as our threshold
    th = pt_sample[int(p_cutoff*N_sample)]
 
    # A quick transition-based state machine to find occupied channels
    states = np.zeros(nFFTs).astype("bool")
    starts = np.zeros(nFFTs)

    retval = []

    for i in range(a[0].shape[1]):
        r = (a[0][:,i] > th).astype("bool")
        starts[r & (~states)] = i
        term_is = (states & ~r).nonzero()[0]

        for j in term_is:
            if minrun <= i - starts[j]:
                hl.mark(Fs, 0.0+a[1][j]+bin_bw/2, bin_bw, start0 + nFFTs*starts[j], nFFTs*(i-starts[j]), bw_fudge=max_bw/2)
            starts[j] = 0

        states = r
        
    i = a[0].shape[1]
    term_is = states.nonzero()[0]
    for j in term_is:
        if minrun <= i - starts[j]:
            hl.mark(Fs, 0.0+a[1][j]+bin_bw/2, bin_bw, start0 + nFFTs*starts[j], nFFTs*(i-starts[j]), bw_fudge=max_bw/2)
        starts[j] = 0   

    gc.collect()

    return a[0].shape
    
In [434]:
# We run with this weird inside-out pattern to minimize
# possible memory leaks from the processing above.  Without
# attention to detail here, you can wind up leaking out
# all the RAM in your system.

for block_no in range(file_length_samples/samples_per_step):
    run_step(block_no, hl)
    gc.collect()
    if 0 == block_no % 25:
        print len(hl.hits), file_length_samples, block_no*samples_per_step
    
    
len(hl.hits)
8 120777916 0
517 120777916 6208000
790 120777916 12416000
1121 120777916 18624000
1594 120777916 24832000
1935 120777916 31040000
2371 120777916 37248000
2671 120777916 43456000
2988 120777916 49664000
3442 120777916 55872000
3749 120777916 62080000
4151 120777916 68288000
4466 120777916 74496000
4754 120777916 80704000
5097 120777916 86912000
5323 120777916 93120000
5584 120777916 99328000
5974 120777916 105536000
6261 120777916 111744000
6650 120777916 117952000

Out[434]:
6870
<matplotlib.figure.Figure at 0x7f12c2ec0610>

Merge

Now we've got a bunch of possible packet locations. Unfortunately, we know they're likely to contain redundant Hits, both for packets that straddle the boundaries, as well as for packets whose carrier happened to fall on bin frequency boundaries, and therefore bounced around between bins.

We now run the merge operation, which boils down to "Make each Hit a little bigger in both time and frequency, then see if it overlaps with other packets that are also made a little bigger. When two overlap, make a new Hit that covers both of them and delete the two original Hits. Keep running this process until the number of Hits doesn't change when you run." In theory, you shouldn't see this number change after the first pass, but it's idempotent, so we run it multiple times to make sure we cover any bugs/thinkos in the implementation.

There are so many better ways to do this, but this way is super easy to explain and very simple to code. A much better way to do this is to build a quadtree structure. If you refactor this to do that, could you also make it into a proper gnuradio block while you're at it? Pull requests welcome.

The run_merge() method returns the length before running and the length afterwards, so we can monitor its progress from the outside.

In [435]:
hl.run_merge() # This will take a while; it's doing a lot of heavy lifting
Out[435]:
(6870, 1034)
In [436]:
l0,l1 = hl.run_merge() # Hopefully this is the last merge step, but who knows.
(l0,l1)
Out[436]:
(1034, 942)
In [437]:
# And now we just let it run for a bit.
while l0 != l1:
    l0,l1 = hl.run_merge()
    print (l0,l1)
(l0,l1)
(942, 933)
(933, 933)

Out[437]:
(933, 933)

Packets identified

We should now have some packets identified. Time to see what they look like!

In [438]:
# [ (h.Fc, h.s_low(), h.s_high()) for h in hl.merged[0:20] ]
In [439]:
# [ (h.Fc, h.s_low(), h.s_high()) for h in hl.hits[0:10] ]

Let's pull out the samples for the first packet and see what it looks like.

In [440]:
h = hl.merged[0]

y = fetch_x(h.s_low(), h.s_length())
specgram(y, NFFT=64, noverlap=63, Fs=Fs); None
title(str(h))
Out[440]:
<matplotlib.text.Text at 0x7f12c29b4e10>

Okay, this looks reasonable, but we need to focus in on it. To do that, we're going to use channel extraction to pull out just that bit of bandwidth.

Channel Extraction

Let's add in our channel extractor from previous work. This lets us get to smaller bands around the channel of interest. This just pulls out the band around a given center frequency, and samples it down to a lower bandwidth to make it easier to work with.

In [484]:
def extract_channel(f_sample, f_center, f_bw, x):
    """Extract a channel of width f_bw at f_center, from f_sample
    
    Returns Fs,y, the new sample rate and an array of complex samples
    """
    
    # Cribbing from the GNURadio xlate, we use their block diagram
    
    my_state = {} # cheating backchannel for debug
    
    if   (f_center + f_bw/2.0) > f_sample / 2.0:
        f_bw = f_sample/2.0 - f_center - 1000
    elif (f_center - f_bw/2.0) < -f_sample/2.0:
        f_bw = f_center-f_sample/2 - 100
    
    # Create a LPF for our target bandwidth
    n_taps = 100 # XXX Total random shot in the dark    
    r_cutoff = float(f_bw)/f_sample    
    base_lpf = scipy.signal.remez(n_taps, [0, r_cutoff, r_cutoff*2.0, 0.5], [1,0])
    
    # Shift this LPF up to our center frequency
    fwT0 = 2.0*np.pi*f_center/f_sample
    
    lpf = base_lpf * np.exp(1.0j*fwT0 * np.arange(0, n_taps))
    
    dec_rate = int(f_sample / (2*f_bw))
    
    x_filtered = scipy.signal.lfilter(lpf, 1.0, x)
    
    dec_is = np.arange(0, len(x_filtered), dec_rate)
    y = x_filtered[dec_is]
    y *= np.exp(-1.0j*fwT0*dec_is)
    
    my_state["n_taps"] = n_taps
    my_state["r_cutoff"] = r_cutoff
    my_state["lpf"] = lpf
    my_state["base_lpf"] = base_lpf
    my_state["fwT0"] = fwT0
    my_state["dec_rate"] = dec_rate
    # my_state["dec_is"] = dec_is
    # my_state["x_filtered"] = x_filtered
    
    return f_sample/dec_rate, y, my_state

Now, let's see what that zero'th packet looks like when zoomed in

In [442]:
def plot_h(h):
    y = fetch_x(h.s_low(), h.s_length())
    Fs_yp, yp, _ = extract_channel(Fs, h.Fc, h.bw, y)
    a = specgram(yp, Fs=Fs_yp, NFFT=32, noverlap=31)
    title("%d: %s" % (i,str(h)))
    return { "Fs_yp": Fs_yp, "yp": yp, "a": a }

i = 0
h = hl.merged[i]

plot_h(h)
None

Decoding

Now, let's decode the packets. The goal here is to get to a bitstream that's appropriate for the signal of interest for each packet, which is tricky.

Frequency lock and time trimming

To do this, we need to put the center of our packet at baseband (zero Hz), and cut out the noise at the head and tail of the packet. This can be done similarly to the packet extraction itself, but a bit simpler.

First, we create another ECDF, but this time for the entire downsampled packet buffer. From there, we apply a threshold of 98% (98% is arbitrary, like it was last time), and then find the average frequency that's above the threshold. We also find the length of the head and tail times where no frequency is above the threshold. Note that these sample counts are going to be in the sample rate of this packet's buffer, not the overall system rate (ie they're in units of Fs_yp, not Fs).

Once we have these three numbers, we can really focus in on our packet.

In [443]:
def plot_h(h):
    y = fetch_x(h.s_low(), h.s_length())
    Fs_yp, yp, _ = extract_channel(Fs, h.Fc, h.bw, y)
    a = specgram(yp, Fs=Fs_yp, NFFT=32, noverlap=31)
    clf()
    
    p_threshold = 0.98
    
    energies = a[0].flatten()
    energies.sort()
    
    threshold = energies[int(p_threshold*len(energies))]
    
    in_use = a[0] > threshold
    
    # Weighted mean
    mean_f = np.sum(np.sum(in_use, axis=1) * a[1])/np.sum(in_use)
    
    times_in_use = np.sum(in_use, axis=0) > 0
    head_skip = 0
    for t in times_in_use:
        if t:
            break
        head_skip += 1
        
    
    tail_skip = 0
    for t in times_in_use[::-1]:
        if t:
            break
        tail_skip += 1
    
    
    head_skip_Fs = int(head_skip * Fs / Fs_yp)
    tail_skip_Fs = int(tail_skip * Fs / Fs_yp)

    y = fetch_x(h.s_low()+head_skip_Fs, h.s_length()-tail_skip_Fs)
    Fs_yp, yp, _ = extract_channel(Fs, h.Fc+mean_f, h.bw, y)
    a = specgram(yp, Fs=Fs_yp, NFFT=32, noverlap=31)
    
    title("%d: %s" % (i,str(h)))
    return yp

i = 0
h = hl.merged[i]

yp = plot_h(h)
None