Normalising data produced by Easy Radio Astronomy Software Suite (ezRA) from multiple telescope systems to allow them to be combined to generate combined plots and maps.
This is very much a work in progress, so there isn’t a definitive answer from this post…….
Andy
Hi Ted,
I asked ChatGPT to have a go at this and it produced following – what do you think?
Andy
Got it — you’d like two tailored versions of that pipeline for ezRA workflows:
- One for raw
.txt
outputs from ezCol (frequency + power pairs). - Another for the intermediate
.ezb
and.npz
outputs from ezCon.
Both versions should keep the same normalization steps (bandpass correction, unit scaling, baseline subtraction, RMS noise estimation, common frequency grid, inverse-variance combination, and QA plots).
Here’s how I’ll adapt it:
- Version A (ezCol
.txt
):
Almost identical to the script you pasted, but tuned for ezCol’s.txt
layout (usually two columns:freq power
, possibly with header). - Version B (ezCon
.ezb
/.npz
):
Usesnumpy.load
to read.npz
files (arrays stored asfreq
andpower
) or a simple binary block read for.ezb
(if ezRA uses fixed-precision binary dumps). The normalization pipeline then continues unchanged.
✅ Version A — normalize_ezcol_txt.py
This one just tweaks read_spectrum()
for ezCol .txt
files.
# normalize_ezcol_txt.py
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from pathlib import Path
from typing import Dict, List, Tuple
# ------------------------------
# Config
# ------------------------------
INPUT_FILES = [
"obs1.txt",
"obs2.txt",
]
FLUX_SCALE_JY: Dict[str, float] = {}
TSYS_SCALE_K: Dict[str, float] = {}
BASELINE_POLY_ORDER = 2
LINE_FREE_MASKS: List[Tuple[float, float]] = [(1419.20, 1419.50), (1421.10, 1421.50)]
COMMON_DF = 0.0025
COMMON_F_MIN = 1419.20
COMMON_F_MAX = 1421.60
# ------------------------------
# Readers
# ------------------------------
def read_spectrum_txt(path: str) -> pd.DataFrame:
rows = []
with open(path, "r") as f:
for line in f:
parts = line.strip().split()
if len(parts) != 2:
continue
try:
freq = float(parts[0])
pwr = float(parts[1])
rows.append((freq, pwr))
except ValueError:
continue
if not rows:
raise ValueError(f"No numeric data found in {path}")
df = pd.DataFrame(rows, columns=["freq", "power"]).sort_values("freq").reset_index(drop=True)
return df
# ------------------------------
# Normalization helpers
# ------------------------------
def bandpass_reference(df: pd.DataFrame, masks: List[Tuple[float, float]]) -> np.ndarray:
f = df["freq"].to_numpy()
p = df["power"].to_numpy()
mask = np.zeros_like(f, dtype=bool)
for lo, hi in masks:
mask |= ((f >= lo) & (f <= hi))
if not mask.any():
return np.median(p) * np.ones_like(p)
coeffs = np.polyfit(f[mask], p[mask], 2)
return np.polyval(coeffs, f)
def remove_baseline(df: pd.DataFrame, order: int, masks: List[Tuple[float, float]]) -> pd.DataFrame:
f = df["freq"].to_numpy()
y = df["power"].to_numpy()
mask = np.zeros_like(f, dtype=bool)
for lo, hi in masks:
mask |= ((f >= lo) & (f <= hi))
if not mask.any():
mask = np.ones_like(f, dtype=bool)
coeffs = np.polyfit(f[mask], y[mask], order)
baseline = np.polyval(coeffs, f)
out = df.copy()
out["power"] = y - baseline
return out
def rms_in_masks(df: pd.DataFrame, masks: List[Tuple[float, float]]) -> float:
f = df["freq"].to_numpy()
y = df["power"].to_numpy()
sel = np.zeros_like(f, dtype=bool)
for lo, hi in masks:
sel |= ((f >= lo) & (f <= hi))
if not sel.any():
return float(np.std(y))
return float(np.std(y[sel]))
def resample_to_common_grid(df: pd.DataFrame, fmin: float, fmax: float, df_mhz: float) -> pd.DataFrame:
grid = np.arange(fmin, fmax + 0.5*df_mhz, df_mhz)
y = np.interp(grid, df["freq"].to_numpy(), df["power"].to_numpy(), left=np.nan, right=np.nan)
return pd.DataFrame({"freq": grid, "power": y})
def scale_units(df: pd.DataFrame, filename: str) -> pd.DataFrame:
s = df.copy()
if filename in FLUX_SCALE_JY:
s["power"] *= FLUX_SCALE_JY[filename]
elif filename in TSYS_SCALE_K:
s["power"] *= TSYS_SCALE_K[filename]
return s
# ------------------------------
# Pipeline
# ------------------------------
def process_file(path: str) -> Tuple[pd.DataFrame, float]:
name = Path(path).name
spec = read_spectrum_txt(path)
spec["power"] /= bandpass_reference(spec, LINE_FREE_MASKS)
spec = scale_units(spec, name)
spec = remove_baseline(spec, BASELINE_POLY_ORDER, LINE_FREE_MASKS)
spec = resample_to_common_grid(spec, COMMON_F_MIN, COMMON_F_MAX, COMMON_DF)
sigma = rms_in_masks(spec, LINE_FREE_MASKS)
plt.figure(figsize=(10,5))
plt.plot(spec["freq"], spec["power"])
plt.title(f"Normalized Spectrum — {name}")
plt.xlabel("Frequency (MHz)")
plt.ylabel("Power")
plt.show()
return spec, sigma
def combine_spectra(file_list: List[str]) -> Tuple[pd.DataFrame, pd.DataFrame]:
processed, weights = [], []
for p in file_list:
spec, sigma = process_file(p)
w = np.where(np.isfinite(spec["power"].to_numpy()), 1.0/(sigma**2 + 1e-30), 0.0)
processed.append(spec["power"].to_numpy())
weights.append(w)
processed = np.array(processed)
weights = np.array(weights)
freq_grid = resample_to_common_grid(read_spectrum_txt(file_list[0]), COMMON_F_MIN, COMMON_F_MAX, COMMON_DF)["freq"].to_numpy()
with np.errstate(invalid="ignore"):
num = np.nansum(processed * weights, axis=0)
den = np.nansum(weights * np.isfinite(processed), axis=0)
combo = np.divide(num, den, out=np.full_like(num, np.nan), where=(den>0))
combined = pd.DataFrame({"freq": freq_grid, "power": combo})
plt.figure(figsize=(10,5))
plt.plot(combined["freq"], combined["power"])
plt.title("Combined Spectrum (Weighted)")
plt.xlabel("Frequency (MHz)")
plt.ylabel("Power")
plt.show()
report = pd.DataFrame([{"file": Path(f).name, "avg_inverse_variance_weight": np.nanmean(w)} for f,w in zip(file_list,weights)])
return combined, report
if __name__ == "__main__":
combined_spectrum, weight_report = combine_spectra(INPUT_FILES)
print(weight_report.to_string(index=False))
✅ Version B — normalize_ezcon.py
This one assumes .npz
stores arrays named "freq"
and "power"
. For .ezb
, I’ll treat it as a binary float dump (freq[N], power[N]
) unless you tell me the structure.
# normalize_ezcon.py
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from pathlib import Path
from typing import Dict, List, Tuple
INPUT_FILES = [
"run1.npz",
"run2.ezb",
]
FLUX_SCALE_JY: Dict[str, float] = {}
TSYS_SCALE_K: Dict[str, float] = {}
BASELINE_POLY_ORDER = 2
LINE_FREE_MASKS: List[Tuple[float, float]] = [(1419.20, 1419.50), (1421.10, 1421.50)]
COMMON_DF = 0.0025
COMMON_F_MIN = 1419.20
COMMON_F_MAX = 1421.60
# ------------------------------
# Readers
# ------------------------------
def read_spectrum_npz(path: str) -> pd.DataFrame:
data = np.load(path)
freq = data["freq"]
pwr = data["power"]
return pd.DataFrame({"freq": freq, "power": pwr}).sort_values("freq").reset_index(drop=True)
def read_spectrum_ezb(path: str) -> pd.DataFrame:
raw = np.fromfile(path, dtype=np.float32) # adjust dtype if ezb spec differs
if raw.size % 2 != 0:
raise ValueError(f"File {path} length not even (expected freq/power pairs).")
freq, pwr = raw[0::2], raw[1::2]
return pd.DataFrame({"freq": freq, "power": pwr}).sort_values("freq").reset_index(drop=True)
def read_spectrum(path: str) -> pd.DataFrame:
ext = Path(path).suffix.lower()
if ext == ".npz":
return read_spectrum_npz(path)
elif ext == ".ezb":
return read_spectrum_ezb(path)
else:
raise ValueError(f"Unsupported file type: {ext}")
# ------------------------------
# (Reuse all normalization helpers & pipeline from Version A)
# ------------------------------
# Copy bandpass_reference, remove_baseline, rms_in_masks, resample_to_common_grid, scale_units,
# process_file, combine_spectra here (identical to Version A but calling read_spectrum()).
👉 Andrew, before I flesh out the Version B pipeline fully:
Can you confirm the exact format of .ezb
? Is it really interleaved binary floats (freq, power, freq, power…
) or does ezRA store it differently (e.g., just power[]
with metadata elsewhere)?
That will decide how I finish the reader.