There's a band of ISM (Industrial, Scientific, and Medical) spectrum at roughly 915 MHz. ISM spectrum is a place where unlicensed devices can operate under FCC Part 15 restrictions (or Part 18 for actual ISM equipment). This is where all those "900MHz" phones and whatnot live.

There's potentially a lot of interesting stuff lurking in this band: running a simple flowgraph to watch the waterfall of it shows a bunch of tiny packets flying around. Unfortunately, it's really hard to figure out where/when they are, so you're left dumping the spectrum to a file, and then post-processing it. Doing that in GRC is a bit ugly, so I decided to pull the data into IPython and see what I could do over here.

Part of this is that we don't know what we're looking for, or where. To make it easier to work with, we'll capture a buuuunch of data, then find the packets within it iteratively. This keeps memory use in check: doing the whole thing at once runs off into Bad Places in terms of performance. Note that GNU Radio's native components don't have this issue, which is the main benefit of its architecture. However, I find writing a GNU Radio block to test my code a bit tedious and distracting from the actual goal. This file is a consolidated clean-up of the five(!) different notebooks I made in the process of figuring out reasonable ways of solving these problems; mixing in blocks probably would have just made the first versions even more scattershot.

(NB: the code below takes in a complex short integer input. If you're capturing in the usual complex format, you'll need to adjust the fetch_x() function to handle the appropriate input format. It'll probably be just a symbol size of 2*4 bytes, then fromfile("foo.sdr", dtype="complex64"), skipping the subsequent hoops I have to jump through to go from two signed ints to a reasonably-sized complex value.)
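To make the two input formats concrete, here's a minimal sketch of both decode paths on a fabricated three-sample buffer (the values are made up purely for illustration):

```python
import numpy as np

# Hypothetical 3-sample capture: interleaved signed 16-bit I/Q pairs,
# so each complex sample occupies 2*2 bytes.
raw_si16 = np.array([100, -200, 0, 300, -1, 1], dtype="int16").tobytes()

# The int16 path used below: deinterleave I and Q, then combine into complex64.
x = np.frombuffer(raw_si16, dtype="int16").astype("float32")
xc = (x[0::2] + 1.0j * x[1::2]).astype("complex64")

# The complex64 path: the file already holds I/Q as float32 pairs
# (symbol size 2*4 bytes), so a single frombuffer call suffices.
raw_c64 = xc.tobytes()
xc2 = np.frombuffer(raw_c64, dtype="complex64")
```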

In [430]:

```
%pylab inline
pylab.rcParams["figure.figsize"] = 14,4
import scipy
import scipy.signal
import gc
import os
import json
```

In [431]:

```
# We're going to need a wackypants way to get to the right places here
FILENAME = "912_snoop_si16.sdr"
SAMPLE_SIZE = 2*2  # make this 2*4 if you're using complex64 inputs
Fs = 4e6  # Sample rate of our input
Fc = 912e6  # Carrier frequency of the input; not really used, but good to know.

def fetch_x(sample_offset, sample_count):
    f = open(FILENAME, "rb")
    f.seek(SAMPLE_SIZE*sample_offset)
    b = f.read(SAMPLE_SIZE*sample_count)
    f.close()
    # If you're using complex64 input, just do
    # return np.frombuffer(b, dtype="complex64")
    x = np.frombuffer(b, dtype="int16")
    x = x.reshape(len(x)/2, 2)
    xc = (x[:,0] + 1.0j*x[:,1]).astype("complex64")
    del(x)
    gc.collect()
    return xc

s = os.stat(FILENAME)
file_length_samples = s.st_size / SAMPLE_SIZE
file_length_samples
```

Out[431]:

Before we get to the process of finding the actual packets, we need to create the structures that will contain the information about those packets.

The core unit here is the "Hit", which is just a frequency+bandwidth with sample_start+sample_length. That is: it's a bounding box on the waterfall plot of the input file. We can then look for places where these butt into each other, and merge them together. This is generally only going to happen when we have a partial hit for a given signal. We'll get those partials any time a packet happens to cross one of our sample buffer blocks.

There's also a HitList class, which is just an array of Hits that knows what to do when merging two Hits together, and has an algorithm to find merge candidates. This could be improved a bunch by moving to something like a quadtree for the initial pass, and stuffing them in as we go along, but, that's for later.

In [432]:

```
class Hit:
    """Container for a single packet "hit" in the data stream

    We use these to keep notes on where in the RF stream a packet might be found.
    The key variables here are:
    * Fc -- center frequency
    * bw -- bandwidth
    * Fs -- sample rate of the original data stream
    * sample_start -- offset in the original system's sample stream where it's found
    * sample_length -- length of the packet, in samples at the origin's sample rate
    """
    def __init__(self, Fs, Fc, bw, sample_start, sample_length, bw_fudge=None, sample_start_fudge=2000, sample_length_fudge=8000):
        """Create a new Hit entry

        * Fs -- sample rate of original system
        * Fc -- center frequency of packet
        * bw -- bandwidth of packet
        * sample_start -- initial sample position of packet (in Fs)
        * sample_length -- length of packet (in Fs)
        * bw_fudge -- amount to grow the bandwidth for purposes of capture and matching (default: +2x bw top and bottom, so a 5x total span)
        * sample_start_fudge -- amount to grow the sample span at its head
        * sample_length_fudge -- amount to grow the sample span at its tail
        """
        self.Fs = Fs
        self.Fc = Fc
        self.bw = bw
        self.bw_fudge = bw_fudge
        self.sample_start_fudge = sample_start_fudge
        self.sample_length_fudge = sample_length_fudge
        if bw_fudge is None:
            self.bw_fudge = 2*bw
        self.sample_start = sample_start
        self.sample_length = sample_length
    def f_low(self, fudge=True):
        "Low end of the frequency span for this Hit, including fudge factors"
        fudge = self.bw_fudge if fudge else 0
        return self.Fc - self.bw/2 - fudge
    def f_high(self, fudge=True):
        "High end of the frequency span for this Hit, including fudge factors"
        fudge = self.bw_fudge if fudge else 0
        return self.Fc + self.bw/2 + fudge
    def in_band(self, f):
        "Query if the given frequency is within the frequency span for this Hit"
        return self.between(self.f_low(), self.f_high(), f)
    def s_low(self, fudge=True):
        "Sample offset of the start of the packet in the origin stream (measured in source samples)"
        fudge = self.sample_start_fudge if fudge else 0
        return int(self.sample_start - fudge)
    def s_high(self, fudge=True):
        "Sample offset of the end of the packet in the origin stream (measured in source samples)"
        fudge = self.sample_length_fudge if fudge else 0
        return int(self.sample_start + self.sample_length + fudge)
    def s_length(self, fudge=True):
        "Length of the packet in source samples, including fudge factors"
        return self.s_high(fudge) - self.s_low(fudge)
    def in_time(self, s):
        "Query if the given sample number is within the time span for this Hit"
        return self.between(self.s_low(), self.s_high(), s)
    def between(self, a, b, q):
        "Helper method: check if q falls between a and b. Will reorder a/b internally if needed."
        if b < a:
            a, b = b, a
        return (a <= q) and (q <= b)
    def could_merge(self, h2):
        "Given another Hit h2, see if they overlap in both samples and frequency."
        f0_in = self.in_band(h2.f_low())
        f1_in = self.in_band(h2.f_high())
        s0_in = self.in_time(h2.s_low())
        s1_in = self.in_time(h2.s_high())
        return (f0_in or f1_in) and (s0_in or s1_in)
    def merge(self, h2):
        "Given another Hit h2, make a new Hit that subsumes both in time and frequency."
        f_low = min(self.f_low(False), h2.f_low(False))
        f_high = max(self.f_high(False), h2.f_high(False))
        s_low = min(self.s_low(False), h2.s_low(False))
        s_high = max(self.s_high(False), h2.s_high(False))
        return Hit(self.Fs, (f_low+f_high)/2.0, f_high-f_low, s_low, s_high-s_low, self.bw_fudge, self.sample_start_fudge, self.sample_length_fudge)
    def __str__(self):
        "Helper method."
        return "%f / %f, %d + %d" % (self.Fc, self.bw, self.sample_start, self.sample_length)

class HitList:
    """Container for a bunch of Hit entries, and coordinator of merging passes

    This is where all the Hits go; it coordinates the merge process for gluing Hits together into larger packets.
    """
    def __init__(self):
        self.hits = []
        self.merged = None  # Initialize to None as a sentinel for run_merge's bootstrap
    def mark(self, Fs, Fc, bw, sample_start, sample_length, **kwargs):
        h = Hit(Fs, Fc, bw, sample_start, sample_length, **kwargs)
        self.hits.append(h)
    def _do_merge(self, hits):
        "Helper method: runs a merge pass on the input array of Hits, returns a new array of Hits post-merge"
        merged = []
        for i, h_i in enumerate(hits):
            did_merge = False
            for m, h_m in enumerate(merged):
                if h_i.could_merge(h_m) or h_m.could_merge(h_i):
                    merged[m] = h_m.merge(h_i)
                    did_merge = True
                    break
            if not did_merge:
                merged.append(h_i)
        return merged
    def run_merge(self):
        "Run a merge pass over the current merged hits, further merging. Uses the raw input if no merge has been run yet."
        if self.merged is None:
            l0 = len(self.hits)
            self.merged = self._do_merge(self.hits)
        else:
            l0 = len(self.merged)
            self.merged = self._do_merge(self.merged)
        return (l0, len(self.merged))

hl = HitList()
```

This is where the first bit of magic happens: we find all the packets in the original file. This is done by buffering in blocks of data, identifying statistical outliers, and then running a state machine to extract adjacent outliers.

Once we have that list, we can start looking at each packet in detail.

To find the packets without loading the whole file into memory at the same time, we work through it in large-ish chunks, finding the Hits within each chunk. Then, after we've gone through the whole file, we run the merge algorithm over the HitList to get down to the minimal set of packets in the entire file.

To ensure we don't miss anything, we take buffers which overlap each other by a bit. This results in redundant Hits in a lot of cases, but that's fine: the merge algorithm will just take care of them.

To find candidate packets, we basically apply a simplistic AGC+squelch to each buffer of data. We first run specgram to get the spectrogram of the data. You can think of a spectrogram as a waterfall display turned into a two dimensional array of data. It's basically (frequency bin, time bin) => energy.

We sample a bunch of points of that data to find out what the distribution of energy across the (frequency, time) bins looks like. We build an empirical cumulative distribution function (ECDF) from this sample, so we have a rough idea where the Nth percentile falls. We then go through the data and threshold it as being at the 95th percentile or above, or below it. This gives us another 2-D array of (frequency,time) -> {above,below}.
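The sample-sort-threshold step can be sketched on synthetic data; the array shape, percentile, and the injected "packet" here are made up purely for illustration:

```python
import numpy as np

np.random.seed(0)
# Stand-in for specgram's output: (frequency bin, time bin) -> energy.
# Exponential noise, plus one artificially hot channel.
energy = np.random.exponential(size=(64, 500))
energy[20, 100:200] += 50.0  # a fake "packet" in frequency bin 20

# Sample N_sample random points and sort them: a cheap empirical CDF.
N_sample = 1000
idx = np.random.uniform(low=0, high=energy.size, size=N_sample).astype(int)
pt_sample = np.sort(energy.flatten()[idx])

# Threshold at the 95th percentile of the sampled distribution.
th = pt_sample[int(0.95 * N_sample)]
above = energy > th  # boolean (frequency, time) occupancy map

# The injected packet should be above threshold in nearly all its bins.
hit_rate = above[20, 100:200].mean()
```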

The 95th percentile here is entirely arbitrary, and may not be appropriate if you're trying to monitor a band where more than 5% of the bins are in use. You can try twiddling it around for your local conditions. Similarly, the sample size used to estimate the percentile is arbitrary, though you want it to be reasonably large. 1000 has served me well enough, but I've also used 10k in this project without really noticing any difference.

Also, the width of the frequency bins may matter. You want to have a rough idea what the minimum target bandwidth you're after is, as a very strong narrow signal will get washed out if your bins are much larger than it.

In [433]:

```
target_bw = 50000
max_bw = 200000
N_sample = 1000
p_cutoff = 0.95
nFFTs = int(2**(np.ceil(np.log2(Fs/target_bw*2))))  # We want buckets of at most target_bw/2 Hz
bin_bw = Fs/nFFTs
minrun = 10
samples_per_block = nFFTs*1000
samples_per_step = samples_per_block - 3*minrun*nFFTs

def run_step(block_no, hl):
    start0 = block_no*samples_per_step
    xc = fetch_x(start0, samples_per_block)
    a = specgram(xc, NFFT=nFFTs, Fs=Fs, noverlap=0, scale_by_freq=False); None
    clf()
    # Build a quick ECDF of this block, as a janky sort of AGC
    pt_sample = a[0].flatten()[ np.random.uniform(low=0, high=a[0].size, size=N_sample).astype("int") ]
    pt_sample.sort()
    # Find the p_cutoff'th percentile and use that as our threshold
    th = pt_sample[int(p_cutoff*N_sample)]
    # A quick transition-based state machine to find occupied channels
    states = np.zeros(nFFTs).astype("bool")
    starts = np.zeros(nFFTs)
    for i in range(a[0].shape[1]):
        r = (a[0][:,i] > th).astype("bool")
        starts[r & (~states)] = i
        term_is = (states & ~r).nonzero()[0]
        for j in term_is:
            if minrun <= i - starts[j]:
                hl.mark(Fs, 0.0+a[1][j]+bin_bw/2, bin_bw, start0 + nFFTs*starts[j], nFFTs*(i-starts[j]), bw_fudge=max_bw/2)
            starts[j] = 0
        states = r
    # Flush any channels still active at the end of the block
    i = a[0].shape[1]
    term_is = states.nonzero()[0]
    for j in term_is:
        if minrun <= i - starts[j]:
            hl.mark(Fs, 0.0+a[1][j]+bin_bw/2, bin_bw, start0 + nFFTs*starts[j], nFFTs*(i-starts[j]), bw_fudge=max_bw/2)
        starts[j] = 0
    gc.collect()
    return a[0].shape
```

In [434]:

```
# We run with this weird inside-out pattern to minimize
# possible memory leaks from the processing above. Without
# attention to detail here, you can wind up leaking out
# all the RAM in your system.
for block_no in range(file_length_samples/samples_per_step):
    run_step(block_no, hl)
    gc.collect()
    if 0 == block_no % 25:
        print len(hl.hits), file_length_samples, block_no*samples_per_step
len(hl.hits)
```

Out[434]:

Now we've got a bunch of possible packet locations. Unfortunately, we know they're likely to contain redundant Hits, both for packets that straddle the boundaries, as well as for packets whose carrier happened to fall on bin frequency boundaries, and therefore bounced around between bins.

We now run the merge operation, which boils down to "Make each Hit a little bigger in both time and frequency, then see if it overlaps with other packets that are also made a little bigger. When two overlap, make a new Hit that covers both of them and delete the two original Hits. Keep running this process until the number of Hits doesn't change when you run." In theory, you shouldn't see this number change after the first pass, but it's idempotent, so we run it multiple times to make sure we cover any bugs/thinkos in the implementation.
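The grow-overlap-union-repeat idea is easiest to see in one dimension. This is a hypothetical reduction of the Hit merge to plain intervals (the spans and fudge value are invented for illustration), driven to a fixed point the same way run_merge() is driven below:

```python
def merge_intervals(intervals, fudge=1.0):
    """One merge pass: grow each (lo, hi) span by `fudge` on both ends,
    union any pair that then overlaps, and return the survivors.
    Like _do_merge, a pass may miss transitive merges; the outer
    fixed-point loop picks those up."""
    merged = []
    for lo, hi in intervals:
        for m, (mlo, mhi) in enumerate(merged):
            if lo - fudge <= mhi + fudge and mlo - fudge <= hi + fudge:
                merged[m] = (min(mlo, lo), max(mhi, hi))
                break
        else:
            merged.append((lo, hi))
    return merged

# Run until the count stops changing, exactly as run_merge() is driven below.
spans = [(0, 5), (6, 9), (20, 22), (8, 12)]
n_before, n_after = len(spans), None
while n_before != n_after:
    n_before = len(spans)
    spans = merge_intervals(spans)
    n_after = len(spans)
```

The three overlapping spans collapse into one, while the isolated one survives.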

There are many better ways to do this, but this one is super easy to explain and very simple to code. A much better approach would be to build a quadtree structure. If you refactor it that way, could you also make it into a proper GNU Radio block while you're at it? Pull requests welcome.

The run_merge() method returns the length before running and the length afterwards, so we can monitor its progress from the outside.

In [435]:

```
hl.run_merge() # This will take a while; it's doing a lot of heavy lifting
```

Out[435]:

In [436]:

```
l0,l1 = hl.run_merge() # Hopefully this is the last merge step, but who knows.
(l0,l1)
```

Out[436]:

In [437]:

```
# And now we just let it run for a bit.
while l0 != l1:
    l0, l1 = hl.run_merge()
    print (l0, l1)
(l0,l1)
```

Out[437]:

We should now have some packets identified. Time to see what they look like!

In [438]:

```
# [ (h.Fc, h.s_low(), h.s_high()) for h in hl.merged[0:20] ]
```

In [439]:

```
# [ (h.Fc, h.s_low(), h.s_high()) for h in hl.hits[0:10] ]
```

Let's pull out the samples for the first packet and see what it looks like.

In [440]:

```
h = hl.merged[0]
y = fetch_x(h.s_low(), h.s_length())
specgram(y, NFFT=64, noverlap=63, Fs=Fs); None
title(str(h))
```

Out[440]:

In [484]:

```
def extract_channel(f_sample, f_center, f_bw, x):
    """Extract a channel of width f_bw at f_center, from f_sample

    Returns Fs,y,my_state: the new sample rate, an array of complex samples,
    and a dict of internals for debugging.
    """
    # Cribbing from the GNU Radio frequency-xlating FIR filter, we follow their block diagram
    my_state = {}  # cheating backchannel for debug
    # Clamp the bandwidth so the channel's edges stay inside Nyquist
    if (f_center + f_bw/2.0) > f_sample/2.0:
        f_bw = 2*(f_sample/2.0 - f_center) - 1000
    elif (f_center - f_bw/2.0) < -f_sample/2.0:
        f_bw = 2*(f_center + f_sample/2.0) - 1000
    # Create a LPF for our target bandwidth
    n_taps = 100  # XXX Total random shot in the dark
    r_cutoff = float(f_bw)/f_sample
    base_lpf = scipy.signal.remez(n_taps, [0, r_cutoff, r_cutoff*2.0, 0.5], [1, 0])
    # Shift this LPF up to our center frequency
    fwT0 = 2.0*np.pi*f_center/f_sample
    lpf = base_lpf * np.exp(1.0j*fwT0 * np.arange(0, n_taps))
    dec_rate = int(f_sample / (2*f_bw))
    x_filtered = scipy.signal.lfilter(lpf, 1.0, x)
    # Decimate, then mix the surviving channel down to baseband
    dec_is = np.arange(0, len(x_filtered), dec_rate)
    y = x_filtered[dec_is]
    y *= np.exp(-1.0j*fwT0*dec_is)
    my_state["n_taps"] = n_taps
    my_state["r_cutoff"] = r_cutoff
    my_state["lpf"] = lpf
    my_state["base_lpf"] = base_lpf
    my_state["fwT0"] = fwT0
    my_state["dec_rate"] = dec_rate
    # my_state["dec_is"] = dec_is
    # my_state["x_filtered"] = x_filtered
    return f_sample/dec_rate, y, my_state
```
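The tap-modulation trick at the heart of extract_channel can be checked in isolation: multiplying a low-pass prototype's taps by a complex exponential shifts its passband up to the target frequency. A small sketch, with arbitrary frequencies and tap count chosen just for this demonstration:

```python
import numpy as np
import scipy.signal

Fs = 4e6
f0 = 1e6  # where we want the shifted passband centered
n_taps = 100

# Low-pass prototype with passband [0, 0.05] (normalized), then shift
# its passband up to f0 by modulating the taps.
base_lpf = scipy.signal.remez(n_taps, [0, 0.05, 0.1, 0.5], [1, 0])
lpf = base_lpf * np.exp(1.0j * 2*np.pi*f0/Fs * np.arange(n_taps))

# A tone inside the shifted passband should pass; one far outside should not.
t = np.arange(4000) / Fs
tone_in = np.exp(1.0j*2*np.pi*f0*t)
tone_out = np.exp(1.0j*2*np.pi*(-f0)*t)

# Skip the filter's transient before measuring the output magnitude.
g_in = np.abs(scipy.signal.lfilter(lpf, 1.0, tone_in)[500:]).mean()
g_out = np.abs(scipy.signal.lfilter(lpf, 1.0, tone_out)[500:]).mean()
```

After this, g_in sits near unity gain while g_out is deep in the stopband.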

Now, let's see what that zeroth packet looks like when zoomed in.

In [442]:

```
def plot_h(h):
    y = fetch_x(h.s_low(), h.s_length())
    Fs_yp, yp, _ = extract_channel(Fs, h.Fc, h.bw, y)
    a = specgram(yp, Fs=Fs_yp, NFFT=32, noverlap=31)
    title("%d: %s" % (i, str(h)))
    return { "Fs_yp": Fs_yp, "yp": yp, "a": a }

i = 0
h = hl.merged[i]
plot_h(h)
None
```

To get a cleaner look at the packet, we need to put its center at baseband (zero Hz), and cut out the noise at the head and tail of the packet. This can be done similarly to the packet extraction itself, but a bit more simply.

First, we create another ECDF, but this time over the entire downsampled packet buffer. From there, we apply a threshold at the 98th percentile (again, an arbitrary choice), and then find the average frequency that's above the threshold. We also find the lengths of the head and tail stretches where no frequency is above the threshold. Note that these sample counts are in the sample rate of this packet's buffer, not the overall system rate (i.e. they're in units of Fs_yp, not Fs).

Once we have these three numbers, we can really focus in on our packet.
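The head/tail measurement reduces to finding the first and last True in the occupancy mask; here's a numpy sketch on a made-up mask (the bin counts are invented):

```python
import numpy as np

# Made-up per-time-bin occupancy: quiet head, packet, quiet tail.
times_in_use = np.array([False]*5 + [True]*20 + [False]*3)

# argmax on a boolean array returns the index of the first True, so it
# yields the head skip directly; reverse the array for the tail skip.
# (This assumes at least one bin is occupied; an all-quiet buffer would
# need a guard, since argmax on all-False returns 0.)
head_skip = int(np.argmax(times_in_use))
tail_skip = int(np.argmax(times_in_use[::-1]))

# These counts are in the packet buffer's decimated rate (Fs_yp);
# scale by Fs/Fs_yp before indexing the original capture.
```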

In [443]:

```
def plot_h(h):
    y = fetch_x(h.s_low(), h.s_length())
    Fs_yp, yp, _ = extract_channel(Fs, h.Fc, h.bw, y)
    a = specgram(yp, Fs=Fs_yp, NFFT=32, noverlap=31)
    clf()
    # ECDF over the whole downsampled buffer this time
    p_threshold = 0.98
    energies = a[0].flatten()
    energies.sort()
    threshold = energies[int(p_threshold*len(energies))]
    in_use = a[0] > threshold
    # Weighted mean frequency of the above-threshold bins
    mean_f = np.sum(np.sum(in_use, axis=1) * a[1])/np.sum(in_use)
    times_in_use = np.sum(in_use, axis=0) > 0
    # Count the quiet time bins at the head and tail
    head_skip = 0
    for t in times_in_use:
        if t:
            break
        head_skip += 1
    tail_skip = 0
    for t in times_in_use[::-1]:
        if t:
            break
        tail_skip += 1
    # Convert from this buffer's time bins back into source-stream samples
    head_skip_Fs = int(head_skip * Fs / Fs_yp)
    tail_skip_Fs = int(tail_skip * Fs / Fs_yp)
    # Re-fetch, trimming both ends and re-centering on the measured frequency
    y = fetch_x(h.s_low()+head_skip_Fs, h.s_length()-head_skip_Fs-tail_skip_Fs)
    Fs_yp, yp, _ = extract_channel(Fs, h.Fc+mean_f, h.bw, y)
    a = specgram(yp, Fs=Fs_yp, NFFT=32, noverlap=31)
    title("%d: %s" % (i, str(h)))
    return yp

i = 0
h = hl.merged[i]
yp = plot_h(h)
None
```