Uncategorized

What is a practical way of generating low resolution images of Milky Way using interferometry with 2 x 150cm dishes at 1420MHz?

What is a practical way of generating low resolution images of Milky Way using interferometry with 2 x 150cm dishes at 1420MHz?

Following suggested process and Python script comes from ChatGPT:

Great question, Andrew. With only two 1.5 m dishes at 1420 MHz you can still make recognisably “blocky” Milky Way H I maps by doing single-baseline, Earth-rotation synthesis and stacking drift scans. Here’s a practical recipe that works with hobby gear (e.g., RSPduo or two locked SDRs).

Hardware/setup

  • Dishes & LNA: 2 × 1.5 m offset dishes, low-loss feed + LNA at the feed (NF ≤ 0.6 dB), 1420 MHz band-pass (≈ 5–10 MHz wide).
  • Baseline: Start short: B = 3–6 m. This keeps sensitivity to degree-scale structure (fringe spacing θ ≈ λ/B → 2.0–4.0° at 1420 MHz where λ≈0.21 m). Longer baselines (20–100 m) will over-resolve the extended H I.
  • Clocking: Common 10 MHz + 1 PPS (GPSDO) to both receivers for coherent correlation.
  • Pointing: Fix both dishes to the same az/el. No tracking needed—use drift scans.
  • Polarisation: Match both (e.g., linear H).
  • Bandwidth / rate: 2–4 MHz bandwidth into an FX correlator; channelise to 5–10 kHz for ~1–2 km s⁻¹ velocity resolution.

Calibration (quick but effective)

  1. Delay & phase: Observe a bright unresolved continuum source (Cyg A, Cas A) for 10–20 min near transit. Solve per-channel delay/phase; save solutions.
  2. Bandpass: Use the same scan for bandpass amplitude.
  3. Absolute gain (optional): Inject a weak broadband noise diode (on/off) into both chains to get K/Jy, or just work in relative units.

Observing plan (to build a 2D “low-res” map)

  • Declination strips: Choose 3–5 elevations that correspond to different declinations crossing the Galactic plane (e.g., b≈0° spans). At each elevation, run a 3–5 hr drift centred near LST that puts the Galactic centre/plane through the beam.
  • Earth-rotation synthesis: Over a few hours the projected baseline rotates in u-v a bit; with a short baseline this mostly gives you a 1-D spatial frequency along RA. Multiple declination strips give you coarse 2-D coverage.
  • Repeat nights: Do the same set on 2–3 nights to improve SNR and fill LST gaps.

Data reduction (line imaging, simple & robust)

  1. Correlate: Cross-correlate complex voltages → complex visibilities Vs(ν, t). Apply delay/phase/bandpass solutions.
  2. Continuum removal: For each time/declination strip, estimate continuum using line-free channels and subtract to isolate H I line emission per channel.
  3. Grid visibilities: Grid amplitude (or real part after fringe-stopping) vs hour angle for each frequency channel.
  4. Make 1-D images per strip: Inverse-FT the gridded data along RA to get a 1-D brightness profile convolved with the dish beam and fringe response.
  5. Assemble 2-D map: Stack those 1-D strips by declination (simple nearest-neighbour or Gaussian gridding). Convolve the final cube to a common ~3–5° beam (your dish HPBW is ~10°; baseline gives ~2–4° fringes; smoothing tames artefacts).
  6. Velocity integration: Produce:
    • Moment-0 (integrated intensity) over, say, −150 to +150 km s⁻¹.
    • Channel maps (e.g., 10 km s⁻¹ slices) to show Galactic rotation features.

Practical numbers & tips

  • Dish beam: HPBW ≈ 1.22 λ/D ≈ 1.22×0.21/1.5 ≈ 0.17 rad ≈ 10°. Your final pixel size of 2–3° is realistic.
  • Baseline choice:
    • 3 m → fringe spacing ≈ 4.0°
    • 5 m → 2.4°
      Start at 4–5 m; shorten if the plane looks “stripy” (over-resolved).
  • Integration: With 2–4 MHz total BW and 5–10 kHz channels, 1–2 s correlator dumps averaged to 10–30 s per sample works well.
  • Fringe-stopping: For each pointing, “stop” the phase on the field centre to keep the Galactic emission mostly in the real part (simplifies the 1-D inversion).
  • RFI: Use median filtering across frequency, flag channels with impulsive bursts, and excise known LTE/ISM bands.
  • Absolute LSRK velocities: Apply Doppler correction (LSRK) so channels line up night-to-night.

Software outline (fits your existing stack)

  • Front-end: GNU Radio (or SoapySDR) to acquire dual streams; polyphase filterbank (PFB) → complex spectra.
  • FX correlator: Multiply one arm by the complex conjugate of the other per channel, average to desired dump time.
  • Calibration: Apply saved per-channel delay/phase. Solve with a simple least-squares on the calibrator scan.
  • Mapping: Python (NumPy/SciPy/Astropy) to grid visibilities, 1-D iFFT per strip, stack to 2-D, then smooth. Optional 1-D CLEAN along RA before stacking reduces ringing.

What you’ll get

  • coarse 2–5° resolution H I map of the Galactic plane showing the bright inner Galaxy and major spiral arm ridges in selected velocity slices—surprisingly recognisable, despite only one baseline.

If you want, I can sketch a ready-to-run correlator + mapper script tailored to your RSPduo setup at 1420.405 MHz, 3–5 m baseline, 2 MHz BW, 10 kHz channels, and output FITS moment maps.

Done! I dropped a ready-to-run Python correlator/mapper into the canvas below.

#!/usr/bin/env python3

“””

Single-baseline H I (1420.405 MHz) FX-correlator + drift-scan mapper

for two time-synchronous complex baseband recordings (e.g., from SDRplay RSPduo).

Usage examples

————–

1) Calibrate on a continuum source (solve per-channel delay/phase):

   python hi_single_baseline.py calibrate \

     –a a_calib.c64 –b b_calib.c64 \

     –fs 2000000 –fc 1420405000 –chan 10000 \

     –dump 10 \

     –site-lat 52.682 –site-lon -1.825 –site-alt 110 \

     –start “2025-10-20T21:00:00Z” \

     –ra 19.991 –dec 40.733 \

     –out calib_cygA

2) Correlate a science drift strip with optional application of calibration:

   python hi_single_baseline.py correlate \

     –a a_strip1.c64 –b b_strip1.c64 \

     –fs 2000000 –fc 1420405000 –chan 10000 \

     –dump 10 \

     –site-lat 52.682 –site-lon -1.825 –site-alt 110 \

     –start “2025-10-20T22:10:00Z” \

     –ra 17.761 –dec -28.94 \

     –baseline-e 5.0 –baseline-n 0.0 –baseline-u 0.0 \

     –apply-cal calib_cygA.npz \

     –out strip_dec_-29

3) Assemble several correlated strips into a coarse RA×Dec cube and make a

   moment-0 FITS map:

   python hi_single_baseline.py assemble \

     –inputs strip_dec_-29_corr.npz strip_dec_-15_corr.npz strip_dec_+0_corr.npz \

     –smooth-deg 3.0 –out milkyway_cube

I/O format

———-

– Input complex files are raw interleaved float32 IQ (complex64), little-endian,

  contiguous (GNU Radio File Sink default for complex64).

– Outputs are NPZ (NumPy) with metadata + optional FITS for images.

Design notes

————

– FX correlator: channelize with an N-point FFT where N = Fs/Δf (e.g., 200 for 2 MHz/10 kHz),

  multiply spectrum_A * conj(spectrum_B), average over dump.

– Fringe stopping: rotate visibilities by -2π f * τ_geom for the phase centre, per time and channel.

– Continuum removal: estimate a baseline from designated line-free channels and subtract.

– Imaging: produce 1-D RA brightness per strip by taking the real-part (after fringe-stop) vs time,

  then regrid onto RA; finally stack strips by Dec to produce a coarse 2-D cube.

Dependencies

————

Python 3.9+, numpy, scipy, astropy, tqdm

“””

from __future__ import annotations

import argparse

import os

import sys

import math

import struct

from dataclasses import dataclass

from typing import Tuple, Optional, List

import numpy as np

from numpy.typing import NDArray

try:

    from astropy.io import fits

    from astropy.time import Time

    from astropy.coordinates import EarthLocation, SkyCoord, AltAz

    import astropy.units as u

    ASTROPY_OK = True

except Exception:

    ASTROPY_OK = False

try:

    from tqdm import tqdm

except Exception:

    def tqdm(x, **kwargs):

        return x

# —————————— Utility ———————————

@dataclass

class Site:

    lat_deg: float

    lon_deg: float

    alt_m: float

@dataclass

class BaselineENU:

    E: float

    N: float

    U: float

@dataclass

class Meta:

    fs: float

    fc: float

    chan_hz: float

    dump_s: float

    nchan: int

    start_utc: str

    ra_deg: float

    dec_deg: float

    site: Site

    baseline: BaselineENU

# —————————— Low-level I/O —————————-

def read_c64_chunk(fh, ncomplex: int) -> Optional[NDArray[np.complex64]]:

    buf = fh.read(ncomplex * 8)

    if not buf or len(buf) < ncomplex*8:

        return None

    arr = np.frombuffer(buf, dtype=np.float32)

    arr = arr.view(np.complex64)

    return arr

# —————————— DSP helpers —————————–

def next_pow2(n: int) -> int:

    return 1 << (n – 1).bit_length()

def pfb_fft_frames(x: NDArray[np.complex64], nfft: int, win: NDArray[np.float32]) -> NDArray[np.complex64]:

    # Simple windowed FFT per contiguous block of nfft samples

    nblocks = len(x) // nfft

    if nblocks == 0:

        return np.empty((0, nfft), np.complex64)

    xb = x[: nblocks * nfft].reshape(nblocks, nfft)

    xb = xb * win

    X = np.fft.fftshift(np.fft.fft(xb, axis=1), axes=1)

    return X.astype(np.complex64, copy=False)

def geom_delay_seconds(t_utc: float, site: Site, baseline: BaselineENU, ra_deg: float, dec_deg: float) -> float:

    “””Return geometric delay (seconds) for baseline ENU to a source at (ra,dec).

    We approximate using ENU dot s_hat in topocentric coordinates.

    “””

    if not ASTROPY_OK:

        return 0.0

    t = Time(t_utc, format=’unix’, scale=’utc’)

    loc = EarthLocation(lat=site.lat_deg*u.deg, lon=site.lon_deg*u.deg, height=site.alt_m*u.m)

    sc = SkyCoord(ra=ra_deg*u.deg, dec=dec_deg*u.deg, frame=’icrs’)

    altaz = sc.transform_to(AltAz(obstime=t, location=loc))

    # Direction cosines in ENU frame

    az = altaz.az.to_value(u.rad)

    el = altaz.alt.to_value(u.rad)

    s_e = math.cos(el) * math.sin(az)

    s_n = math.cos(el) * math.cos(az)

    s_u = math.sin(el)

    # Project baseline (meters) onto direction cosines and divide by c

    c = 299792458.0

    proj_m = baseline.E * s_e + baseline.N * s_n + baseline.U * s_u

    return proj_m / c

def fringe_stop_phase(nu_hz: NDArray[np.float64], tau_s: float) -> NDArray[np.complex64]:

    ph = -2.0 * np.pi * nu_hz * tau_s

    return np.exp(1j * ph).astype(np.complex64)

# —————————— Calibration —————————–

def solve_delay_phase(calA: NDArray[np.complex64], calB: NDArray[np.complex64]) -> Tuple[NDArray[np.float32], NDArray[np.float32]]:

    “””Solve per-channel relative delay/phase between A and B from cross-spectrum.

    Returns (delay_ps, phase_rad) across channels.

    “””

    # Average over time frames

    cs = np.mean(calA * np.conj(calB), axis=0)

    # Unwrap phase across channels

    ph = np.unwrap(np.angle(cs).astype(np.float64))

    # Estimate slope (delay) via linear fit: phase = -2π ν τ + φ0

    n = len(ph)

    nu = np.linspace(-0.5, 0.5, n, endpoint=False)  # normalized

    A = np.vstack([nu, np.ones_like(nu)]).T

    tau_norm, phi0 = np.linalg.lstsq(A, ph, rcond=None)[0]  # phase ≈ tau_norm*nu + phi0

    # Convert normalized slope to delay: dφ/dν = -2π τ ; here ν spans BW, so scale later in apply

    return np.full(n, tau_norm, np.float32), np.full(n, phi0, np.float32)

def apply_calibration(XA: NDArray[np.complex64], XB: NDArray[np.complex64], tau_norm: NDArray[np.float32], phi0: NDArray[np.float32]) -> Tuple[NDArray[np.complex64], NDArray[np.complex64]]:

    nchan = XA.shape[1]

    nu_norm = np.linspace(-0.5, 0.5, nchan, endpoint=False).astype(np.float32)

    ph = tau_norm * nu_norm + phi0

    rot = np.exp(-1j * ph).astype(np.complex64)

    # Rotate B to align with A

    XB_cal = XB * rot

    return XA, XB_cal

# —————————— Main operations ————————-

def do_calibrate(args):

    meta = prepare_meta(args)

    nfft = int(round(meta.fs / meta.chan_hz))

    if meta.fs % meta.chan_hz != 0:

        print(f”[warn] Fs/chan is not integer; using NFFT={nfft}”)

    win = np.hanning(nfft).astype(np.float32)

    dump_samp = int(round(meta.dump_s * meta.fs))

    with open(args.a, ‘rb’) as fa, open(args.b, ‘rb’) as fb:

        XA_all = []

        XB_all = []

        n_to_read = dump_samp  # one dump is enough for solve; but read a few

        max_frames = int(5 * meta.fs // dump_samp) + 1

        frames = 0

        while frames < max_frames:

            a = read_c64_chunk(fa, n_to_read)

            b = read_c64_chunk(fb, n_to_read)

            if a is None or b is None:

                break

            XA = pfb_fft_frames(a, nfft, win)

            XB = pfb_fft_frames(b, nfft, win)

            m = min(len(XA), len(XB))

            if m == 0:

                break

            XA_all.append(XA[:m])

            XB_all.append(XB[:m])

            frames += 1

    if not XA_all:

        raise RuntimeError(“No data read for calibration.”)

    XA = np.vstack(XA_all)

    XB = np.vstack(XB_all)

    tau_norm, phi0 = solve_delay_phase(XA, XB)

    np.savez_compressed(args.out + ‘.npz’, tau_norm=tau_norm, phi0=phi0, meta=meta.__dict__)

    print(f”Saved calibration to {args.out}.npz”)

def prepare_meta(args) -> Meta:

    nchan = int(round(args.fs / args.chan))

    return Meta(

        fs=float(args.fs),

        fc=float(args.fc),

        chan_hz=float(args.chan),

        dump_s=float(args.dump),

        nchan=nchan,

        start_utc=args.start,

        ra_deg=float(args.ra),

        dec_deg=float(args.dec),

        site=Site(args.site_lat, args.site_lon, args.site_alt),

        baseline=BaselineENU(args.baseline_e, args.baseline_n, args.baseline_u),

    )

def do_correlate(args):

    meta = prepare_meta(args)

    nfft = int(round(meta.fs / meta.chan_hz))

    win = np.hanning(nfft).astype(np.float32)

    dump_samp = int(round(meta.dump_s * meta.fs))

    # Frequencies for FFT bins (Hz), fftshifted

    freqs = (np.arange(nfft) – nfft/2) * (meta.fs / nfft) + meta.fc

    # Load calibration if provided

    tau_norm = phi0 = None

    if args.apply_cal:

        cal = np.load(args.apply_cal, allow_pickle=True)

        tau_norm = cal[‘tau_norm’]

        phi0 = cal[‘phi0’]

    # Output containers

    times_unix = []

    vis = []  # complex visibility per dump and per channel

    t0 = Time(meta.start_utc, format=’isot’, scale=’utc’).unix if ASTROPY_OK else 0.0

    with open(args.a, ‘rb’) as fa, open(args.b, ‘rb’) as fb:

        samp_read = 0

        dump_idx = 0

        while True:

            a = read_c64_chunk(fa, dump_samp)

            b = read_c64_chunk(fb, dump_samp)

            if a is None or b is None:

                break

            XA = pfb_fft_frames(a, nfft, win)

            XB = pfb_fft_frames(b, nfft, win)

            m = min(len(XA), len(XB))

            if m == 0:

                break

            # Average spectra over frames inside the dump window

            XA_mean = np.mean(XA[:m], axis=0)

            XB_mean = np.mean(XB[:m], axis=0)

            if tau_norm is not None:

                XA_mean, XB_mean = apply_calibration(XA_mean[None, :], XB_mean[None, :], tau_norm, phi0)

                XA_mean = XA_mean[0]

                XB_mean = XB_mean[0]

            # Raw cross-spectrum (per channel)

            cs = XA_mean * np.conj(XB_mean)

            # Fringe stopping at phase centre

            if ASTROPY_OK:

                t_dump_center = t0 + (dump_idx + 0.5)*meta.dump_s

                tau_g = geom_delay_seconds(t_dump_center, meta.site, meta.baseline, meta.ra_deg, meta.dec_deg)

                rot = fringe_stop_phase(freqs, tau_g)

                cs *= rot

            vis.append(cs.astype(np.complex64))

            times_unix.append(t0 + dump_idx*meta.dump_s)

            dump_idx += 1

    vis = np.vstack(vis) if vis else np.empty((0, nfft), np.complex64)

    times_unix = np.array(times_unix, dtype=np.float64)

    # Save raw correlated product

    base = args.out + ‘_corr’

    np.savez_compressed(base + ‘.npz’, vis=vis, times_unix=times_unix, freqs=freqs.astype(np.float64), meta=meta.__dict__)

    print(f”Saved visibilities to {base}.npz  shape={vis.shape}”)

    # Continuum removal (simple): identify line-free channels from edges

    edge = max(4, int(0.15 * nfft))

    mask = np.zeros(nfft, dtype=bool)

    mask[:edge] = True

    mask[-edge:] = True

    # baseline from masked channels per time

    cont = np.median(vis[:, mask].real, axis=1, keepdims=True)

    vis_line = vis.copy()

    vis_line.real -= cont

    np.savez_compressed(base + ‘_line.npz’, vis=vis_line, times_unix=times_unix, freqs=freqs.astype(np.float64), meta=meta.__dict__)

    print(f”Saved continuum-subtracted vis to {base}_line.npz”)

    # 1-D RA regridding for simple strip image (moment-0)

    if ASTROPY_OK:

        # Convert time to RA at phase centre: RA(t) ≈ RA0 + HA_rate * (t – t_mid)

        tmid = 0.5*(times_unix[0] + times_unix[-1]) if len(times_unix) > 1 else times_unix[0]

        # Earth rotates 15 deg/hr in HA; RA axis spacing per dump is ~15 deg/hr * dump_s/3600

        dra_per_dump = 15.0 * (meta.dump_s / 3600.0)  # degrees of HA per dump

        ra_axis = (np.arange(len(times_unix)) – (len(times_unix)-1)/2.0) * dra_per_dump + meta.ra_deg

        # Moment-0 over a velocity window (default full band)

        mom0_real = np.sum(vis_line.real, axis=1)

        # Save strip as FITS vector with RA axis

        if ASTROPY_OK:

            hdu = fits.PrimaryHDU(mom0_real.astype(np.float32))

            hdr = hdu.header

            hdr[‘CTYPE1’] = ‘RA—TAN’

            hdr[‘CUNIT1’] = ‘deg’

            hdr[‘CRVAL1’] = float(meta.ra_deg)

            hdr[‘CRPIX1’] = (len(ra_axis)+1)/2.0

            hdr[‘CDELT1’] = dra_per_dump

            hdr[‘OBJECT’] = ‘HI_STRIP’

            hdr[‘BUNIT’] = ‘arb’

            hdr[‘COMMENT’] = ‘Moment-0 (sum over channels) of continuum-subtracted real(vis)’

            fn = base + ‘_strip_m0.fits’

            hdu.writeto(fn, overwrite=True)

            print(f”Saved strip moment-0 FITS: {fn}”)

def do_assemble(args):

    # Load several *_corr_line.npz strips and grid to Dec × RA

    strips = []

    ras_all = []

    decs = []

    for fn in args.inputs:

        z = np.load(fn, allow_pickle=True)

        vis = z[‘vis’]

        times = z[‘times_unix’]

        meta = z[‘meta’].item()

        # RA axis for this strip

        if not ASTROPY_OK:

            raise RuntimeError(‘Astropy required for assemble step.’)

        dump_s = float(meta[‘dump_s’])

        ra0 = float(meta[‘ra_deg’])

        dra_per_dump = 15.0 * (dump_s / 3600.0)

        ra_axis = (np.arange(len(times)) – (len(times)-1)/2.0) * dra_per_dump + ra0

        # Strip moment-0 vector

        m0 = np.sum(vis.real, axis=1).astype(np.float32)

        strips.append((ra_axis, m0))

        ras_all.append(ra_axis)

        decs.append(float(meta[‘dec_deg’]))

    # Define common RA grid

    ra_min = max([ra.min() for ra, _ in strips])

    ra_max = min([ra.max() for ra, _ in strips])

    n_ra = int((ra_max – ra_min) / 0.5) + 1  # 0.5 deg grid

    ra_grid = np.linspace(ra_min, ra_max, n_ra, endpoint=True)

    # Grid strips

    decs_arr = np.array(decs, dtype=np.float32)

    order = np.argsort(decs_arr)

    decs_sorted = decs_arr[order]

    img = np.zeros((len(strips), n_ra), dtype=np.float32)

    for i, idx in enumerate(order):

        ra_axis, m0 = strips[idx]

        m0i = np.interp(ra_grid, ra_axis, m0, left=np.nan, right=np.nan)

        # Simple nan->nearest fill

        mask = np.isnan(m0i)

        if np.any(mask):

            m0i[mask] = np.interp(ra_grid[mask], ra_grid[~mask], m0i[~mask])

        img[i, :] = m0i

    # Optional smoothing across Dec

    if args.smooth_deg > 0:

        from scipy.ndimage import gaussian_filter

        # Convert desired smoothing (deg) to pixels; assume strips ~constant spacing

        if len(decs_sorted) > 1:

            d_dec = np.median(np.diff(decs_sorted))

            sigma_pix = max(0.0, args.smooth_deg / max(d_dec, 1e-6)) / 2.355

            if sigma_pix > 0:

                img = gaussian_filter(img, sigma=(sigma_pix, 0.0))

    # Save FITS image (Dec × RA)

    if ASTROPY_OK:

        hdu = fits.PrimaryHDU(img.astype(np.float32))

        hdr = hdu.header

        hdr[‘CTYPE1’] = ‘RA—TAN’

        hdr[‘CUNIT1’] = ‘deg’

        hdr[‘CRVAL1’] = float(ra_grid[0])

        hdr[‘CRPIX1’] = 1.0

        hdr[‘CDELT1’] = float(ra_grid[1] – ra_grid[0]) if len(ra_grid) > 1 else 1.0

        hdr[‘CTYPE2’] = ‘DEC–TAN’

        hdr[‘CUNIT2’] = ‘deg’

        hdr[‘CRVAL2’] = float(decs_sorted[0])

        hdr[‘CRPIX2’] = 1.0

        hdr[‘CDELT2’] = float(decs_sorted[1] – decs_sorted[0]) if len(decs_sorted) > 1 else 1.0

        hdr[‘BUNIT’] = ‘arb’

        hdr[‘OBJECT’] = ‘MW_HI_M0’

        fn = args.out + ‘_m0.fits’

        hdu.writeto(fn, overwrite=True)

        print(f”Saved assembled moment-0 FITS map: {fn}”)

# —————————— CLI ————————————-

def build_parser():

    p = argparse.ArgumentParser(description=’Single-baseline H I correlator and mapper’)

    sub = p.add_subparsers(dest=’cmd’, required=True)

    common = argparse.ArgumentParser(add_help=False)

    common.add_argument(‘–a’, required=True, help=’Path to antenna A complex64 file’)

    common.add_argument(‘–b’, required=True, help=’Path to antenna B complex64 file’)

    common.add_argument(‘–fs’, type=float, default=2_000_000, help=’Sample rate (Hz), e.g. 2e6′)

    common.add_argument(‘–fc’, type=float, default=1_420_405_000, help=’Center frequency (Hz)’)

    common.add_argument(‘–chan’, type=float, default=10_000, help=’Channel width (Hz)’)

    common.add_argument(‘–dump’, type=float, default=10.0, help=’Visibility dump time (s)’)

    common.add_argument(‘–site-lat’, dest=’site_lat’, type=float, required=True, help=’Site latitude (deg)’)

    common.add_argument(‘–site-lon’, dest=’site_lon’, type=float, required=True, help=’Site longitude (deg)’)

    common.add_argument(‘–site-alt’, dest=’site_alt’, type=float, default=100.0, help=’Site altitude (m)’)

    common.add_argument(‘–baseline-e’, type=float, default=5.0, help=’Baseline East (m)’)

    common.add_argument(‘–baseline-n’, type=float, default=0.0, help=’Baseline North (m)’)

    common.add_argument(‘–baseline-u’, type=float, default=0.0, help=’Baseline Up (m)’)

    common.add_argument(‘–start’, required=True, help=’Recording start UTC, ISO e.g. 2025-10-20T22:10:00Z’)

    common.add_argument(‘–ra’, type=float, required=True, help=’Phase-centre Right Ascension (deg)’)

    common.add_argument(‘–dec’, type=float, required=True, help=’Phase-centre Declination (deg)’)

    common.add_argument(‘–out’, required=True, help=’Output prefix (no extension)’)

    pc = sub.add_parser(‘calibrate’, parents=[common], help=’Solve per-channel delay/phase on a bright calibrator’)

    pr = sub.add_parser(‘correlate’, parents=[common], help=’Correlate and fringe-stop a drift strip’)

    pr.add_argument(‘–apply-cal’, help=’Calibration npz from “calibrate” step’)

    pa = sub.add_parser(‘assemble’, help=’Assemble multiple strips into a moment-0 FITS map’)

    pa.add_argument(‘–inputs’, nargs=’+’, required=True, help=’List of *_line.npz strip files to assemble’)

    pa.add_argument(‘–smooth-deg’, type=float, default=3.0, help=’Gaussian smoothing across Dec (deg FWHM)’)

    pa.add_argument(‘–out’, required=True, help=’Output prefix for FITS map’)

    return p

def main(argv=None):

    argv = sys.argv[1:] if argv is None else argv

    p = build_parser()

    args = p.parse_args(argv)

    if args.cmd == ‘calibrate’:

        do_calibrate(args)

    elif args.cmd == ‘correlate’:

        do_correlate(args)

    elif args.cmd == ‘assemble’:

        do_assemble(args)

    else:

        p.error(‘Unknown command’)

if __name__ == ‘__main__’:

    main()

The script above does the following:

  • Ingests two complex64 IQ recordings (A/B) from your RSPduo flowgraph.
  • Does FX correlation with 10 kHz channels over 2 MHz, dumps every N seconds.
  • Fringe-stops at your chosen phase centre using site + ENU baseline.
  • Saves calibrated visibilities and a continuum-subtracted set.
  • Exports a 1-D strip FITS (moment-0) and can assemble several strips into a coarse RA×Dec moment-0 FITS map.

How to record the inputs in GNU Radio (quick recipe)

Use two parallel SDRplay RSPduo sources sharing the same LO/clock (master/slave mode), each feeding:

  • Throttle disabled (hardware timed).
  • Complex Float32 stream at Fs = 2.000 MS/sFc = 1420.405 MHz, same gain settings, same IF mode.
  • Optional: 5–10 MHz RF BPF + digital low-pass/decimate to 2.0 MHz.
  • (Optional) PFB Channelizer if you prefer pre-channelization, but the script already channelizes.
  • File Sink (complex64) → a_YYYYmmddThhmmssZ.c64 and b_YYYYmmddThhmmssZ.c64.

Tips:

  • Enable external 10 MHz + 1 PPS (GPSDO) to lock both tuners.
  • Start both file sinks at the same flowgraph time (one top-level flowgraph with two sources is easiest).
  • Note the exact UTC start to pass as --start.

Minimal end-to-end run

  1. Calibrator scan (e.g., Cyg A near transit):
python hi_single_baseline.py calibrate \
  --a a_calib.c64 --b b_calib.c64 \
  --fs 2000000 --fc 1420405000 --chan 10000 --dump 10 \
  --site-lat 52.682 --site-lon -1.825 --site-alt 110 \
  --start "2025-10-30T20:15:00Z" \
  --ra 299.868 --dec 40.733 \
  --out calib_cygA
  1. Science drift strip (choose RA/Dec near your strip centre; baseline ~E=5 m, N=0):
python hi_single_baseline.py correlate \
  --a a_strip1.c64 --b b_strip1.c64 \
  --fs 2000000 --fc 1420405000 --chan 10000 --dump 10 \
  --site-lat 52.682 --site-lon -1.825 --site-alt 110 \
  --start "2025-10-30T22:10:00Z" \
  --ra 266.405 --dec -28.94 \
  --baseline-e 5.0 --baseline-n 0.0 --baseline-u 0.0 \
  --apply-cal calib_cygA.npz \
  --out strip_dec_-29

This writes:

  • strip_dec_-29_corr.npz (raw vis)
  • strip_dec_-29_corr_line.npz (continuum-subtracted)
  • strip_dec_-29_corr_strip_m0.fits (1-D moment-0 along RA)
  1. Assemble multiple strips (repeat step 2 at a few declinations; then):
python hi_single_baseline.py assemble \
  --inputs strip_dec_-29_corr_line.npz strip_dec_-15_corr_line.npz strip_dec_+0_corr_line.npz \
  --smooth-deg 3.0 \
  --out milkyway_cube

This outputs milkyway_cube_m0.fits (coarse Milky Way H I map).

Leave a Reply

Your email address will not be published. Required fields are marked *

This site uses Akismet to reduce spam. Learn how your comment data is processed.