Radio observation in hydrogen line (1420MHz)

Normalising data produced by Easy Radio Astronomy Software Suite (ezRA) from multiple telescope systems to allow them to be combined to generate combined plots and maps.

This is very much a work in progress, so there isn’t a definitive answer from this post…….

Andy

Hi Ted,

I asked ChatGPT to have a go at this and it produced following – what do you think?

Andy

Got it — you’d like two tailored versions of that pipeline for ezRA workflows:

  1. One for raw .txt outputs from ezCol (frequency + power pairs).
  2. Another for the intermediate .ezb and .npz outputs from ezCon.

Both versions should keep the same normalization steps (bandpass correction, unit scaling, baseline subtraction, RMS noise estimation, common frequency grid, inverse-variance combination, and QA plots).

Here’s how I’ll adapt it:

  • Version A (ezCol .txt):
    Almost identical to the script you pasted, but tuned for ezCol’s .txt layout (usually two columns: freq power, possibly with header).
  • Version B (ezCon .ezb / .npz):
    Uses numpy.load to read .npz files (arrays stored as freq and power) or a simple binary block read for .ezb (if ezRA uses fixed-precision binary dumps). The normalization pipeline then continues unchanged.

✅ Version A — normalize_ezcol_txt.py

This one just tweaks read_spectrum() for ezCol .txt files.

# normalize_ezcol_txt.py

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from pathlib import Path
from typing import Dict, List, Tuple

# ------------------------------
# Config
# ------------------------------
INPUT_FILES = [
    "obs1.txt",
    "obs2.txt",
]

FLUX_SCALE_JY: Dict[str, float] = {}
TSYS_SCALE_K: Dict[str, float] = {}

BASELINE_POLY_ORDER = 2
LINE_FREE_MASKS: List[Tuple[float, float]] = [(1419.20, 1419.50), (1421.10, 1421.50)]
COMMON_DF = 0.0025
COMMON_F_MIN = 1419.20
COMMON_F_MAX = 1421.60

# ------------------------------
# Readers
# ------------------------------
def read_spectrum_txt(path: str) -> pd.DataFrame:
    rows = []
    with open(path, "r") as f:
        for line in f:
            parts = line.strip().split()
            if len(parts) != 2:
                continue
            try:
                freq = float(parts[0])
                pwr = float(parts[1])
                rows.append((freq, pwr))
            except ValueError:
                continue
    if not rows:
        raise ValueError(f"No numeric data found in {path}")
    df = pd.DataFrame(rows, columns=["freq", "power"]).sort_values("freq").reset_index(drop=True)
    return df

# ------------------------------
# Normalization helpers
# ------------------------------
def bandpass_reference(df: pd.DataFrame, masks: List[Tuple[float, float]]) -> np.ndarray:
    f = df["freq"].to_numpy()
    p = df["power"].to_numpy()
    mask = np.zeros_like(f, dtype=bool)
    for lo, hi in masks:
        mask |= ((f >= lo) & (f <= hi))
    if not mask.any():
        return np.median(p) * np.ones_like(p)
    coeffs = np.polyfit(f[mask], p[mask], 2)
    return np.polyval(coeffs, f)

def remove_baseline(df: pd.DataFrame, order: int, masks: List[Tuple[float, float]]) -> pd.DataFrame:
    f = df["freq"].to_numpy()
    y = df["power"].to_numpy()
    mask = np.zeros_like(f, dtype=bool)
    for lo, hi in masks:
        mask |= ((f >= lo) & (f <= hi))
    if not mask.any():
        mask = np.ones_like(f, dtype=bool)
    coeffs = np.polyfit(f[mask], y[mask], order)
    baseline = np.polyval(coeffs, f)
    out = df.copy()
    out["power"] = y - baseline
    return out

def rms_in_masks(df: pd.DataFrame, masks: List[Tuple[float, float]]) -> float:
    f = df["freq"].to_numpy()
    y = df["power"].to_numpy()
    sel = np.zeros_like(f, dtype=bool)
    for lo, hi in masks:
        sel |= ((f >= lo) & (f <= hi))
    if not sel.any():
        return float(np.std(y))
    return float(np.std(y[sel]))

def resample_to_common_grid(df: pd.DataFrame, fmin: float, fmax: float, df_mhz: float) -> pd.DataFrame:
    grid = np.arange(fmin, fmax + 0.5*df_mhz, df_mhz)
    y = np.interp(grid, df["freq"].to_numpy(), df["power"].to_numpy(), left=np.nan, right=np.nan)
    return pd.DataFrame({"freq": grid, "power": y})

def scale_units(df: pd.DataFrame, filename: str) -> pd.DataFrame:
    s = df.copy()
    if filename in FLUX_SCALE_JY:
        s["power"] *= FLUX_SCALE_JY[filename]
    elif filename in TSYS_SCALE_K:
        s["power"] *= TSYS_SCALE_K[filename]
    return s

# ------------------------------
# Pipeline
# ------------------------------
def process_file(path: str) -> Tuple[pd.DataFrame, float]:
    name = Path(path).name
    spec = read_spectrum_txt(path)
    spec["power"] /= bandpass_reference(spec, LINE_FREE_MASKS)
    spec = scale_units(spec, name)
    spec = remove_baseline(spec, BASELINE_POLY_ORDER, LINE_FREE_MASKS)
    spec = resample_to_common_grid(spec, COMMON_F_MIN, COMMON_F_MAX, COMMON_DF)
    sigma = rms_in_masks(spec, LINE_FREE_MASKS)
    plt.figure(figsize=(10,5))
    plt.plot(spec["freq"], spec["power"])
    plt.title(f"Normalized Spectrum — {name}")
    plt.xlabel("Frequency (MHz)")
    plt.ylabel("Power")
    plt.show()
    return spec, sigma

def combine_spectra(file_list: List[str]) -> Tuple[pd.DataFrame, pd.DataFrame]:
    processed, weights = [], []
    for p in file_list:
        spec, sigma = process_file(p)
        w = np.where(np.isfinite(spec["power"].to_numpy()), 1.0/(sigma**2 + 1e-30), 0.0)
        processed.append(spec["power"].to_numpy())
        weights.append(w)
    processed = np.array(processed)
    weights = np.array(weights)
    freq_grid = resample_to_common_grid(read_spectrum_txt(file_list[0]), COMMON_F_MIN, COMMON_F_MAX, COMMON_DF)["freq"].to_numpy()
    with np.errstate(invalid="ignore"):
        num = np.nansum(processed * weights, axis=0)
        den = np.nansum(weights * np.isfinite(processed), axis=0)
        combo = np.divide(num, den, out=np.full_like(num, np.nan), where=(den>0))
    combined = pd.DataFrame({"freq": freq_grid, "power": combo})
    plt.figure(figsize=(10,5))
    plt.plot(combined["freq"], combined["power"])
    plt.title("Combined Spectrum (Weighted)")
    plt.xlabel("Frequency (MHz)")
    plt.ylabel("Power")
    plt.show()
    report = pd.DataFrame([{"file": Path(f).name, "avg_inverse_variance_weight": np.nanmean(w)} for f,w in zip(file_list,weights)])
    return combined, report

if __name__ == "__main__":
    combined_spectrum, weight_report = combine_spectra(INPUT_FILES)
    print(weight_report.to_string(index=False))

✅ Version B — normalize_ezcon.py

This one assumes .npz stores arrays named "freq" and "power". For .ezb, I’ll treat it as a binary float dump (freq[N], power[N]) unless you tell me the structure.

# normalize_ezcon.py

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from pathlib import Path
from typing import Dict, List, Tuple

INPUT_FILES = [
    "run1.npz",
    "run2.ezb",
]

FLUX_SCALE_JY: Dict[str, float] = {}
TSYS_SCALE_K: Dict[str, float] = {}

BASELINE_POLY_ORDER = 2
LINE_FREE_MASKS: List[Tuple[float, float]] = [(1419.20, 1419.50), (1421.10, 1421.50)]
COMMON_DF = 0.0025
COMMON_F_MIN = 1419.20
COMMON_F_MAX = 1421.60

# ------------------------------
# Readers
# ------------------------------
def read_spectrum_npz(path: str) -> pd.DataFrame:
    data = np.load(path)
    freq = data["freq"]
    pwr = data["power"]
    return pd.DataFrame({"freq": freq, "power": pwr}).sort_values("freq").reset_index(drop=True)

def read_spectrum_ezb(path: str) -> pd.DataFrame:
    raw = np.fromfile(path, dtype=np.float32)  # adjust dtype if ezb spec differs
    if raw.size % 2 != 0:
        raise ValueError(f"File {path} length not even (expected freq/power pairs).")
    freq, pwr = raw[0::2], raw[1::2]
    return pd.DataFrame({"freq": freq, "power": pwr}).sort_values("freq").reset_index(drop=True)

def read_spectrum(path: str) -> pd.DataFrame:
    ext = Path(path).suffix.lower()
    if ext == ".npz":
        return read_spectrum_npz(path)
    elif ext == ".ezb":
        return read_spectrum_ezb(path)
    else:
        raise ValueError(f"Unsupported file type: {ext}")

# ------------------------------
# (Reuse all normalization helpers & pipeline from Version A)
# ------------------------------
# Copy bandpass_reference, remove_baseline, rms_in_masks, resample_to_common_grid, scale_units, 
# process_file, combine_spectra here (identical to Version A but calling read_spectrum()).

👉 Andrew, before I flesh out the Version B pipeline fully:
Can you confirm the exact format of .ezb? Is it really interleaved binary floats (freq, power, freq, power…) or does ezRA store it differently (e.g., just power[] with metadata elsewhere)?

That will decide how I finish the reader.

Leave a Reply

Your email address will not be published. Required fields are marked *

This site uses Akismet to reduce spam. Learn how your comment data is processed.