What is a practical way of generating low resolution images of Milky Way using interferometry with 2 x 150cm dishes at 1420MHz?
What is a practical way of generating low resolution images of Milky Way using interferometry with 2 x 150cm dishes at 1420MHz?
Following suggested process and Python script comes from ChatGPT:
Great question, Andrew. With only two 1.5 m dishes at 1420 MHz you can still make recognisably “blocky” Milky Way H I maps by doing single-baseline, Earth-rotation synthesis and stacking drift scans. Here’s a practical recipe that works with hobby gear (e.g., RSPduo or two locked SDRs).
Hardware/setup
- Dishes & LNA: 2 × 1.5 m offset dishes, low-loss feed + LNA at the feed (NF ≤ 0.6 dB), 1420 MHz band-pass (≈ 5–10 MHz wide).
- Baseline: Start short: B = 3–6 m. This keeps sensitivity to degree-scale structure (fringe spacing θ ≈ λ/B → 2.0–4.0° at 1420 MHz where λ≈0.21 m). Longer baselines (20–100 m) will over-resolve the extended H I.
- Clocking: Common 10 MHz + 1 PPS (GPSDO) to both receivers for coherent correlation.
- Pointing: Fix both dishes to the same az/el. No tracking needed—use drift scans.
- Polarisation: Match both (e.g., linear H).
- Bandwidth / rate: 2–4 MHz bandwidth into an FX correlator; channelise to 5–10 kHz for ~1–2 km s⁻¹ velocity resolution.
Calibration (quick but effective)
- Delay & phase: Observe a bright unresolved continuum source (Cyg A, Cas A) for 10–20 min near transit. Solve per-channel delay/phase; save solutions.
- Bandpass: Use the same scan for bandpass amplitude.
- Absolute gain (optional): Inject a weak broadband noise diode (on/off) into both chains to get K/Jy, or just work in relative units.
Observing plan (to build a 2D “low-res” map)
- Declination strips: Choose 3–5 elevations that correspond to different declinations crossing the Galactic plane (e.g., b≈0° spans). At each elevation, run a 3–5 hr drift centred near LST that puts the Galactic centre/plane through the beam.
- Earth-rotation synthesis: Over a few hours the projected baseline rotates in u-v a bit; with a short baseline this mostly gives you a 1-D spatial frequency along RA. Multiple declination strips give you coarse 2-D coverage.
- Repeat nights: Do the same set on 2–3 nights to improve SNR and fill LST gaps.
Data reduction (line imaging, simple & robust)
- Correlate: Cross-correlate complex voltages → complex visibilities Vs(ν, t). Apply delay/phase/bandpass solutions.
- Continuum removal: For each time/declination strip, estimate continuum using line-free channels and subtract to isolate H I line emission per channel.
- Grid visibilities: Grid amplitude (or real part after fringe-stopping) vs hour angle for each frequency channel.
- Make 1-D images per strip: Inverse-FT the gridded data along RA to get a 1-D brightness profile convolved with the dish beam and fringe response.
- Assemble 2-D map: Stack those 1-D strips by declination (simple nearest-neighbour or Gaussian gridding). Convolve the final cube to a common ~3–5° beam (your dish HPBW is ~10°; baseline gives ~2–4° fringes; smoothing tames artefacts).
- Velocity integration: Produce:
- Moment-0 (integrated intensity) over, say, −150 to +150 km s⁻¹.
- Channel maps (e.g., 10 km s⁻¹ slices) to show Galactic rotation features.
 
Practical numbers & tips
- Dish beam: HPBW ≈ 1.22 λ/D ≈ 1.22×0.21/1.5 ≈ 0.17 rad ≈ 10°. Your final pixel size of 2–3° is realistic.
- Baseline choice:
- 3 m → fringe spacing ≈ 4.0°
- 5 m → 2.4°
 Start at 4–5 m; shorten if the plane looks “stripy” (over-resolved).
 
- Integration: With 2–4 MHz total BW and 5–10 kHz channels, 1–2 s correlator dumps averaged to 10–30 s per sample works well.
- Fringe-stopping: For each pointing, “stop” the phase on the field centre to keep the Galactic emission mostly in the real part (simplifies the 1-D inversion).
- RFI: Use median filtering across frequency, flag channels with impulsive bursts, and excise known LTE/ISM bands.
- Absolute LSRK velocities: Apply Doppler correction (LSRK) so channels line up night-to-night.
Software outline (fits your existing stack)
- Front-end: GNU Radio (or SoapySDR) to acquire dual streams; polyphase filterbank (PFB) → complex spectra.
- FX correlator: Multiply one arm by the complex conjugate of the other per channel, average to desired dump time.
- Calibration: Apply saved per-channel delay/phase. Solve with a simple least-squares on the calibrator scan.
- Mapping: Python (NumPy/SciPy/Astropy) to grid visibilities, 1-D iFFT per strip, stack to 2-D, then smooth. Optional 1-D CLEAN along RA before stacking reduces ringing.
What you’ll get
- A coarse 2–5° resolution H I map of the Galactic plane showing the bright inner Galaxy and major spiral arm ridges in selected velocity slices—surprisingly recognisable, despite only one baseline.
If you want, I can sketch a ready-to-run correlator + mapper script tailored to your RSPduo setup at 1420.405 MHz, 3–5 m baseline, 2 MHz BW, 10 kHz channels, and output FITS moment maps.
Done! I dropped a ready-to-run Python correlator/mapper into the canvas below.
#!/usr/bin/env python3
“””
Single-baseline H I (1420.405 MHz) FX-correlator + drift-scan mapper
for two time-synchronous complex baseband recordings (e.g., from SDRplay RSPduo).
Usage examples
————–
1) Calibrate on a continuum source (solve per-channel delay/phase):
python hi_single_baseline.py calibrate \
–a a_calib.c64 –b b_calib.c64 \
–fs 2000000 –fc 1420405000 –chan 10000 \
–dump 10 \
–site-lat 52.682 –site-lon -1.825 –site-alt 110 \
–start “2025-10-20T21:00:00Z” \
–ra 19.991 –dec 40.733 \
–out calib_cygA
2) Correlate a science drift strip with optional application of calibration:
python hi_single_baseline.py correlate \
–a a_strip1.c64 –b b_strip1.c64 \
–fs 2000000 –fc 1420405000 –chan 10000 \
–dump 10 \
–site-lat 52.682 –site-lon -1.825 –site-alt 110 \
–start “2025-10-20T22:10:00Z” \
–ra 17.761 –dec -28.94 \
–baseline-e 5.0 –baseline-n 0.0 –baseline-u 0.0 \
–apply-cal calib_cygA.npz \
–out strip_dec_-29
3) Assemble several correlated strips into a coarse RA×Dec cube and make a
moment-0 FITS map:
python hi_single_baseline.py assemble \
–inputs strip_dec_-29_corr.npz strip_dec_-15_corr.npz strip_dec_+0_corr.npz \
–smooth-deg 3.0 –out milkyway_cube
I/O format
———-
– Input complex files are raw interleaved float32 IQ (complex64), little-endian,
contiguous (GNU Radio File Sink default for complex64).
– Outputs are NPZ (NumPy) with metadata + optional FITS for images.
Design notes
————
– FX correlator: channelize with an N-point FFT where N = Fs/Δf (e.g., 200 for 2 MHz/10 kHz),
multiply spectrum_A * conj(spectrum_B), average over dump.
– Fringe stopping: rotate visibilities by -2π f * τ_geom for the phase centre, per time and channel.
– Continuum removal: estimate a baseline from designated line-free channels and subtract.
– Imaging: produce 1-D RA brightness per strip by taking the real-part (after fringe-stop) vs time,
then regrid onto RA; finally stack strips by Dec to produce a coarse 2-D cube.
Dependencies
————
Python 3.9+, numpy, scipy, astropy, tqdm
“””
from __future__ import annotations
import argparse
import os
import sys
import math
import struct
from dataclasses import dataclass
from typing import Tuple, Optional, List
import numpy as np
from numpy.typing import NDArray
try:
from astropy.io import fits
from astropy.time import Time
from astropy.coordinates import EarthLocation, SkyCoord, AltAz
import astropy.units as u
ASTROPY_OK = True
except Exception:
ASTROPY_OK = False
try:
from tqdm import tqdm
except Exception:
def tqdm(x, **kwargs):
return x
# —————————— Utility ———————————
@dataclass
class Site:
lat_deg: float
lon_deg: float
alt_m: float
@dataclass
class BaselineENU:
E: float
N: float
U: float
@dataclass
class Meta:
fs: float
fc: float
chan_hz: float
dump_s: float
nchan: int
start_utc: str
ra_deg: float
dec_deg: float
site: Site
baseline: BaselineENU
# —————————— Low-level I/O —————————-
def read_c64_chunk(fh, ncomplex: int) -> Optional[NDArray[np.complex64]]:
buf = fh.read(ncomplex * 8)
if not buf or len(buf) < ncomplex*8:
return None
arr = np.frombuffer(buf, dtype=np.float32)
arr = arr.view(np.complex64)
return arr
# —————————— DSP helpers —————————–
def next_pow2(n: int) -> int:
return 1 << (n – 1).bit_length()
def pfb_fft_frames(x: NDArray[np.complex64], nfft: int, win: NDArray[np.float32]) -> NDArray[np.complex64]:
# Simple windowed FFT per contiguous block of nfft samples
nblocks = len(x) // nfft
if nblocks == 0:
return np.empty((0, nfft), np.complex64)
xb = x[: nblocks * nfft].reshape(nblocks, nfft)
xb = xb * win
X = np.fft.fftshift(np.fft.fft(xb, axis=1), axes=1)
return X.astype(np.complex64, copy=False)
def geom_delay_seconds(t_utc: float, site: Site, baseline: BaselineENU, ra_deg: float, dec_deg: float) -> float:
“””Return geometric delay (seconds) for baseline ENU to a source at (ra,dec).
We approximate using ENU dot s_hat in topocentric coordinates.
“””
if not ASTROPY_OK:
return 0.0
t = Time(t_utc, format=’unix’, scale=’utc’)
loc = EarthLocation(lat=site.lat_deg*u.deg, lon=site.lon_deg*u.deg, height=site.alt_m*u.m)
sc = SkyCoord(ra=ra_deg*u.deg, dec=dec_deg*u.deg, frame=’icrs’)
altaz = sc.transform_to(AltAz(obstime=t, location=loc))
# Direction cosines in ENU frame
az = altaz.az.to_value(u.rad)
el = altaz.alt.to_value(u.rad)
s_e = math.cos(el) * math.sin(az)
s_n = math.cos(el) * math.cos(az)
s_u = math.sin(el)
# Project baseline (meters) onto direction cosines and divide by c
c = 299792458.0
proj_m = baseline.E * s_e + baseline.N * s_n + baseline.U * s_u
return proj_m / c
def fringe_stop_phase(nu_hz: NDArray[np.float64], tau_s: float) -> NDArray[np.complex64]:
ph = -2.0 * np.pi * nu_hz * tau_s
return np.exp(1j * ph).astype(np.complex64)
# —————————— Calibration —————————–
def solve_delay_phase(calA: NDArray[np.complex64], calB: NDArray[np.complex64]) -> Tuple[NDArray[np.float32], NDArray[np.float32]]:
“””Solve per-channel relative delay/phase between A and B from cross-spectrum.
Returns (delay_ps, phase_rad) across channels.
“””
# Average over time frames
cs = np.mean(calA * np.conj(calB), axis=0)
# Unwrap phase across channels
ph = np.unwrap(np.angle(cs).astype(np.float64))
# Estimate slope (delay) via linear fit: phase = -2π ν τ + φ0
n = len(ph)
nu = np.linspace(-0.5, 0.5, n, endpoint=False) # normalized
A = np.vstack([nu, np.ones_like(nu)]).T
tau_norm, phi0 = np.linalg.lstsq(A, ph, rcond=None)[0] # phase ≈ tau_norm*nu + phi0
# Convert normalized slope to delay: dφ/dν = -2π τ ; here ν spans BW, so scale later in apply
return np.full(n, tau_norm, np.float32), np.full(n, phi0, np.float32)
def apply_calibration(XA: NDArray[np.complex64], XB: NDArray[np.complex64], tau_norm: NDArray[np.float32], phi0: NDArray[np.float32]) -> Tuple[NDArray[np.complex64], NDArray[np.complex64]]:
nchan = XA.shape[1]
nu_norm = np.linspace(-0.5, 0.5, nchan, endpoint=False).astype(np.float32)
ph = tau_norm * nu_norm + phi0
rot = np.exp(-1j * ph).astype(np.complex64)
# Rotate B to align with A
XB_cal = XB * rot
return XA, XB_cal
# —————————— Main operations ————————-
def do_calibrate(args):
meta = prepare_meta(args)
nfft = int(round(meta.fs / meta.chan_hz))
if meta.fs % meta.chan_hz != 0:
print(f”[warn] Fs/chan is not integer; using NFFT={nfft}”)
win = np.hanning(nfft).astype(np.float32)
dump_samp = int(round(meta.dump_s * meta.fs))
with open(args.a, ‘rb’) as fa, open(args.b, ‘rb’) as fb:
XA_all = []
XB_all = []
n_to_read = dump_samp # one dump is enough for solve; but read a few
max_frames = int(5 * meta.fs // dump_samp) + 1
frames = 0
while frames < max_frames:
a = read_c64_chunk(fa, n_to_read)
b = read_c64_chunk(fb, n_to_read)
if a is None or b is None:
break
XA = pfb_fft_frames(a, nfft, win)
XB = pfb_fft_frames(b, nfft, win)
m = min(len(XA), len(XB))
if m == 0:
break
XA_all.append(XA[:m])
XB_all.append(XB[:m])
frames += 1
if not XA_all:
raise RuntimeError(“No data read for calibration.”)
XA = np.vstack(XA_all)
XB = np.vstack(XB_all)
tau_norm, phi0 = solve_delay_phase(XA, XB)
np.savez_compressed(args.out + ‘.npz’, tau_norm=tau_norm, phi0=phi0, meta=meta.__dict__)
print(f”Saved calibration to {args.out}.npz”)
def prepare_meta(args) -> Meta:
nchan = int(round(args.fs / args.chan))
return Meta(
fs=float(args.fs),
fc=float(args.fc),
chan_hz=float(args.chan),
dump_s=float(args.dump),
nchan=nchan,
start_utc=args.start,
ra_deg=float(args.ra),
dec_deg=float(args.dec),
site=Site(args.site_lat, args.site_lon, args.site_alt),
baseline=BaselineENU(args.baseline_e, args.baseline_n, args.baseline_u),
)
def do_correlate(args):
meta = prepare_meta(args)
nfft = int(round(meta.fs / meta.chan_hz))
win = np.hanning(nfft).astype(np.float32)
dump_samp = int(round(meta.dump_s * meta.fs))
# Frequencies for FFT bins (Hz), fftshifted
freqs = (np.arange(nfft) – nfft/2) * (meta.fs / nfft) + meta.fc
# Load calibration if provided
tau_norm = phi0 = None
if args.apply_cal:
cal = np.load(args.apply_cal, allow_pickle=True)
tau_norm = cal[‘tau_norm’]
phi0 = cal[‘phi0’]
# Output containers
times_unix = []
vis = [] # complex visibility per dump and per channel
t0 = Time(meta.start_utc, format=’isot’, scale=’utc’).unix if ASTROPY_OK else 0.0
with open(args.a, ‘rb’) as fa, open(args.b, ‘rb’) as fb:
samp_read = 0
dump_idx = 0
while True:
a = read_c64_chunk(fa, dump_samp)
b = read_c64_chunk(fb, dump_samp)
if a is None or b is None:
break
XA = pfb_fft_frames(a, nfft, win)
XB = pfb_fft_frames(b, nfft, win)
m = min(len(XA), len(XB))
if m == 0:
break
# Average spectra over frames inside the dump window
XA_mean = np.mean(XA[:m], axis=0)
XB_mean = np.mean(XB[:m], axis=0)
if tau_norm is not None:
XA_mean, XB_mean = apply_calibration(XA_mean[None, :], XB_mean[None, :], tau_norm, phi0)
XA_mean = XA_mean[0]
XB_mean = XB_mean[0]
# Raw cross-spectrum (per channel)
cs = XA_mean * np.conj(XB_mean)
# Fringe stopping at phase centre
if ASTROPY_OK:
t_dump_center = t0 + (dump_idx + 0.5)*meta.dump_s
tau_g = geom_delay_seconds(t_dump_center, meta.site, meta.baseline, meta.ra_deg, meta.dec_deg)
rot = fringe_stop_phase(freqs, tau_g)
cs *= rot
vis.append(cs.astype(np.complex64))
times_unix.append(t0 + dump_idx*meta.dump_s)
dump_idx += 1
vis = np.vstack(vis) if vis else np.empty((0, nfft), np.complex64)
times_unix = np.array(times_unix, dtype=np.float64)
# Save raw correlated product
base = args.out + ‘_corr’
np.savez_compressed(base + ‘.npz’, vis=vis, times_unix=times_unix, freqs=freqs.astype(np.float64), meta=meta.__dict__)
print(f”Saved visibilities to {base}.npz shape={vis.shape}”)
# Continuum removal (simple): identify line-free channels from edges
edge = max(4, int(0.15 * nfft))
mask = np.zeros(nfft, dtype=bool)
mask[:edge] = True
mask[-edge:] = True
# baseline from masked channels per time
cont = np.median(vis[:, mask].real, axis=1, keepdims=True)
vis_line = vis.copy()
vis_line.real -= cont
np.savez_compressed(base + ‘_line.npz’, vis=vis_line, times_unix=times_unix, freqs=freqs.astype(np.float64), meta=meta.__dict__)
print(f”Saved continuum-subtracted vis to {base}_line.npz”)
# 1-D RA regridding for simple strip image (moment-0)
if ASTROPY_OK:
# Convert time to RA at phase centre: RA(t) ≈ RA0 + HA_rate * (t – t_mid)
tmid = 0.5*(times_unix[0] + times_unix[-1]) if len(times_unix) > 1 else times_unix[0]
# Earth rotates 15 deg/hr in HA; RA axis spacing per dump is ~15 deg/hr * dump_s/3600
dra_per_dump = 15.0 * (meta.dump_s / 3600.0) # degrees of HA per dump
ra_axis = (np.arange(len(times_unix)) – (len(times_unix)-1)/2.0) * dra_per_dump + meta.ra_deg
# Moment-0 over a velocity window (default full band)
mom0_real = np.sum(vis_line.real, axis=1)
# Save strip as FITS vector with RA axis
if ASTROPY_OK:
hdu = fits.PrimaryHDU(mom0_real.astype(np.float32))
hdr = hdu.header
hdr[‘CTYPE1’] = ‘RA—TAN’
hdr[‘CUNIT1’] = ‘deg’
hdr[‘CRVAL1’] = float(meta.ra_deg)
hdr[‘CRPIX1’] = (len(ra_axis)+1)/2.0
hdr[‘CDELT1’] = dra_per_dump
hdr[‘OBJECT’] = ‘HI_STRIP’
hdr[‘BUNIT’] = ‘arb’
hdr[‘COMMENT’] = ‘Moment-0 (sum over channels) of continuum-subtracted real(vis)’
fn = base + ‘_strip_m0.fits’
hdu.writeto(fn, overwrite=True)
print(f”Saved strip moment-0 FITS: {fn}”)
def do_assemble(args):
# Load several *_corr_line.npz strips and grid to Dec × RA
strips = []
ras_all = []
decs = []
for fn in args.inputs:
z = np.load(fn, allow_pickle=True)
vis = z[‘vis’]
times = z[‘times_unix’]
meta = z[‘meta’].item()
# RA axis for this strip
if not ASTROPY_OK:
raise RuntimeError(‘Astropy required for assemble step.’)
dump_s = float(meta[‘dump_s’])
ra0 = float(meta[‘ra_deg’])
dra_per_dump = 15.0 * (dump_s / 3600.0)
ra_axis = (np.arange(len(times)) – (len(times)-1)/2.0) * dra_per_dump + ra0
# Strip moment-0 vector
m0 = np.sum(vis.real, axis=1).astype(np.float32)
strips.append((ra_axis, m0))
ras_all.append(ra_axis)
decs.append(float(meta[‘dec_deg’]))
# Define common RA grid
ra_min = max([ra.min() for ra, _ in strips])
ra_max = min([ra.max() for ra, _ in strips])
n_ra = int((ra_max – ra_min) / 0.5) + 1 # 0.5 deg grid
ra_grid = np.linspace(ra_min, ra_max, n_ra, endpoint=True)
# Grid strips
decs_arr = np.array(decs, dtype=np.float32)
order = np.argsort(decs_arr)
decs_sorted = decs_arr[order]
img = np.zeros((len(strips), n_ra), dtype=np.float32)
for i, idx in enumerate(order):
ra_axis, m0 = strips[idx]
m0i = np.interp(ra_grid, ra_axis, m0, left=np.nan, right=np.nan)
# Simple nan->nearest fill
mask = np.isnan(m0i)
if np.any(mask):
m0i[mask] = np.interp(ra_grid[mask], ra_grid[~mask], m0i[~mask])
img[i, :] = m0i
# Optional smoothing across Dec
if args.smooth_deg > 0:
from scipy.ndimage import gaussian_filter
# Convert desired smoothing (deg) to pixels; assume strips ~constant spacing
if len(decs_sorted) > 1:
d_dec = np.median(np.diff(decs_sorted))
sigma_pix = max(0.0, args.smooth_deg / max(d_dec, 1e-6)) / 2.355
if sigma_pix > 0:
img = gaussian_filter(img, sigma=(sigma_pix, 0.0))
# Save FITS image (Dec × RA)
if ASTROPY_OK:
hdu = fits.PrimaryHDU(img.astype(np.float32))
hdr = hdu.header
hdr[‘CTYPE1’] = ‘RA—TAN’
hdr[‘CUNIT1’] = ‘deg’
hdr[‘CRVAL1’] = float(ra_grid[0])
hdr[‘CRPIX1’] = 1.0
hdr[‘CDELT1’] = float(ra_grid[1] – ra_grid[0]) if len(ra_grid) > 1 else 1.0
hdr[‘CTYPE2’] = ‘DEC–TAN’
hdr[‘CUNIT2’] = ‘deg’
hdr[‘CRVAL2’] = float(decs_sorted[0])
hdr[‘CRPIX2’] = 1.0
hdr[‘CDELT2’] = float(decs_sorted[1] – decs_sorted[0]) if len(decs_sorted) > 1 else 1.0
hdr[‘BUNIT’] = ‘arb’
hdr[‘OBJECT’] = ‘MW_HI_M0’
fn = args.out + ‘_m0.fits’
hdu.writeto(fn, overwrite=True)
print(f”Saved assembled moment-0 FITS map: {fn}”)
# —————————— CLI ————————————-
def build_parser():
p = argparse.ArgumentParser(description=’Single-baseline H I correlator and mapper’)
sub = p.add_subparsers(dest=’cmd’, required=True)
common = argparse.ArgumentParser(add_help=False)
common.add_argument(‘–a’, required=True, help=’Path to antenna A complex64 file’)
common.add_argument(‘–b’, required=True, help=’Path to antenna B complex64 file’)
common.add_argument(‘–fs’, type=float, default=2_000_000, help=’Sample rate (Hz), e.g. 2e6′)
common.add_argument(‘–fc’, type=float, default=1_420_405_000, help=’Center frequency (Hz)’)
common.add_argument(‘–chan’, type=float, default=10_000, help=’Channel width (Hz)’)
common.add_argument(‘–dump’, type=float, default=10.0, help=’Visibility dump time (s)’)
common.add_argument(‘–site-lat’, dest=’site_lat’, type=float, required=True, help=’Site latitude (deg)’)
common.add_argument(‘–site-lon’, dest=’site_lon’, type=float, required=True, help=’Site longitude (deg)’)
common.add_argument(‘–site-alt’, dest=’site_alt’, type=float, default=100.0, help=’Site altitude (m)’)
common.add_argument(‘–baseline-e’, type=float, default=5.0, help=’Baseline East (m)’)
common.add_argument(‘–baseline-n’, type=float, default=0.0, help=’Baseline North (m)’)
common.add_argument(‘–baseline-u’, type=float, default=0.0, help=’Baseline Up (m)’)
common.add_argument(‘–start’, required=True, help=’Recording start UTC, ISO e.g. 2025-10-20T22:10:00Z’)
common.add_argument(‘–ra’, type=float, required=True, help=’Phase-centre Right Ascension (deg)’)
common.add_argument(‘–dec’, type=float, required=True, help=’Phase-centre Declination (deg)’)
common.add_argument(‘–out’, required=True, help=’Output prefix (no extension)’)
pc = sub.add_parser(‘calibrate’, parents=[common], help=’Solve per-channel delay/phase on a bright calibrator’)
pr = sub.add_parser(‘correlate’, parents=[common], help=’Correlate and fringe-stop a drift strip’)
pr.add_argument(‘–apply-cal’, help=’Calibration npz from “calibrate” step’)
pa = sub.add_parser(‘assemble’, help=’Assemble multiple strips into a moment-0 FITS map’)
pa.add_argument(‘–inputs’, nargs=’+’, required=True, help=’List of *_line.npz strip files to assemble’)
pa.add_argument(‘–smooth-deg’, type=float, default=3.0, help=’Gaussian smoothing across Dec (deg FWHM)’)
pa.add_argument(‘–out’, required=True, help=’Output prefix for FITS map’)
return p
def main(argv=None):
argv = sys.argv[1:] if argv is None else argv
p = build_parser()
args = p.parse_args(argv)
if args.cmd == ‘calibrate’:
do_calibrate(args)
elif args.cmd == ‘correlate’:
do_correlate(args)
elif args.cmd == ‘assemble’:
do_assemble(args)
else:
p.error(‘Unknown command’)
if __name__ == ‘__main__’:
main()
The script above does the following:
- Ingests two complex64 IQ recordings (A/B) from your RSPduo flowgraph.
- Does FX correlation with 10 kHz channels over 2 MHz, dumps every N seconds.
- Fringe-stops at your chosen phase centre using site + ENU baseline.
- Saves calibrated visibilities and a continuum-subtracted set.
- Exports a 1-D strip FITS (moment-0) and can assemble several strips into a coarse RA×Dec moment-0 FITS map.
How to record the inputs in GNU Radio (quick recipe)
Use two parallel SDRplay RSPduo sources sharing the same LO/clock (master/slave mode), each feeding:
- Throttle disabled (hardware timed).
- Complex Float32 stream at Fs = 2.000 MS/s, Fc = 1420.405 MHz, same gain settings, same IF mode.
- Optional: 5–10 MHz RF BPF + digital low-pass/decimate to 2.0 MHz.
- (Optional) PFB Channelizer if you prefer pre-channelization, but the script already channelizes.
- File Sink (complex64) → a_YYYYmmddThhmmssZ.c64andb_YYYYmmddThhmmssZ.c64.
Tips:
- Enable external 10 MHz + 1 PPS (GPSDO) to lock both tuners.
- Start both file sinks at the same flowgraph time (one top-level flowgraph with two sources is easiest).
- Note the exact UTC start to pass as --start.
Minimal end-to-end run
- Calibrator scan (e.g., Cyg A near transit):
python hi_single_baseline.py calibrate \
  --a a_calib.c64 --b b_calib.c64 \
  --fs 2000000 --fc 1420405000 --chan 10000 --dump 10 \
  --site-lat 52.682 --site-lon -1.825 --site-alt 110 \
  --start "2025-10-30T20:15:00Z" \
  --ra 299.868 --dec 40.733 \
  --out calib_cygA
- Science drift strip (choose RA/Dec near your strip centre; baseline ~E=5 m, N=0):
python hi_single_baseline.py correlate \
  --a a_strip1.c64 --b b_strip1.c64 \
  --fs 2000000 --fc 1420405000 --chan 10000 --dump 10 \
  --site-lat 52.682 --site-lon -1.825 --site-alt 110 \
  --start "2025-10-30T22:10:00Z" \
  --ra 266.405 --dec -28.94 \
  --baseline-e 5.0 --baseline-n 0.0 --baseline-u 0.0 \
  --apply-cal calib_cygA.npz \
  --out strip_dec_-29
This writes:
- strip_dec_-29_corr.npz(raw vis)
- strip_dec_-29_corr_line.npz(continuum-subtracted)
- strip_dec_-29_corr_strip_m0.fits(1-D moment-0 along RA)
- Assemble multiple strips (repeat step 2 at a few declinations; then):
python hi_single_baseline.py assemble \
  --inputs strip_dec_-29_corr_line.npz strip_dec_-15_corr_line.npz strip_dec_+0_corr_line.npz \
  --smooth-deg 3.0 \
  --out milkyway_cube
This outputs milkyway_cube_m0.fits (coarse Milky Way H I map).