#!/usr/bin/env python
# /// script
# requires-python = ">=3.12"
# dependencies = [
# "ephem",
# "matplotlib",
# "mutagen",
# "numpy",
# "pillow",
# "platformdirs",
# "requests",
# ]
# ///
import re
import platform
import subprocess
import sys
from urllib.parse import urlsplit
from PIL import Image
import ephem
import datetime
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
import numpy as np
import requests
import json
import shutil
from mutagen.oggvorbis import OggVorbis
import re
import os.path
import argparse
import pathlib
from matplotlib.offsetbox import AnchoredText
from matplotlib.gridspec import GridSpec
from platformdirs import user_cache_dir
from pathlib import Path
USE_XDG = True
if USE_XDG:
APP_NAME = 'ikhnos'
APP_AUTHOR = 'adamkalis'
CACHE_DIR = Path(user_cache_dir(APP_NAME, APP_AUTHOR))
else:
CACHE_DIR = Path('.')
DEMODDATA_FILENAME_PATTERN = r'data_(\d+)_(\d{4}-\d{2}-\d{2}T\d{2}-\d{2}-\d{2})'
DEMODDATA_FILENAME_TIME_FORMAT = '%Y-%m-%dT%H-%M-%S'
class NoAudioError(Exception):
pass
FREQ_RANGES = [24.0, 25, 29.0, 30.4, 33.2, 38.5, 50, 69.0, 77.0, 83.8, 100, 115.2, 160,174, 200, 230.4, 300]
def parse_args():
parser = argparse.ArgumentParser(description='Analyze observations')
parser.add_argument('observations', nargs='+', type=int,
help='One or more observation IDs from SatNOGS Network')
parser.add_argument('-t', '--tle-path', nargs='?', const='tle', default='tle', type=pathlib.Path,
help='Path of the TLE file, if not set the default value is "tle" in the current directory')
parser.add_argument('-f', '--frequency-offset', nargs='?', const=0.0, default=0.0, type=float,
help='Offset in KHz to add to the expected center frequency, this can be negative or positive number, default value: 0.0')
parser.add_argument('-r', '--frequency-range', nargs='?', const=24.0, default=24.0, type=float, choices=FREQ_RANGES,
help='The plus-minus KHz from the centered frequency as it is in the X axis in observation waterfall, default value: 24.0')
parser.add_argument('-d', '--observation-duration', nargs='?', default=0, type=int, help='Set observation duration')
parser.add_argument('-e', '--tle-epoch-threshold', nargs='?', const=5, default=25, type=int,
help='Set the threshold of the difference of observation start time and TLE epoch in days, default value: 5')
parser.add_argument('--store-tle', action='store_true', help='Append observation tle to the TLE file before plotting')
parser.add_argument('-C', '--keep-created-files', action='store_true', help='Keep files created by analysis')
parser.add_argument('-A', '--keep-audio', action='store_true', help='Keep downloaded audio file')
parser.add_argument('-W', '--keep-waterfall', action='store_true', help='Keep downloaded waterfall file')
parser.add_argument('--open', action='store_true', help='Open generated images with the default Image Viewer')
parser.add_argument('-v', '--verbose', action='store_true', help='Be more verbose')
args = parser.parse_args()
plt.switch_backend('Agg')
# Check tle file exists
if not args.tle_path.exists():
print(f"Error: TLE input file not found. No such file: '{args.tle_path}'")
sys.exit(1)
return args
def fetch_observation(observation_id):
observation_path = CACHE_DIR / f'{observation_id:d}.json'
if not observation_path.exists():
print(f"Requesting observation {observation_id:d}")
r = requests.get(f'https://network.satnogs.org/api/observations/{observation_id:d}/?format=json')
observation = json.loads(r.content.decode('utf8'))
with open(observation_path, 'w') as fp:
json.dump(observation, fp, indent=2)
else:
with open(observation_path) as fp:
observation = json.load(fp)
print("Downloading the observation page for getting frequency and tle")
observation_link = f"https://network.satnogs.org/observations/{observation_id:d}"
obs_html = requests.get(observation_link).content
tle_regex = r"
(1 .*)
(2 .*)"
tle_matches = re.search(tle_regex, obs_html.decode("utf-8"))
tle = [tle_matches.group(1), tle_matches.group(2)]
sat_name_regex = r"data-target=\"#SatelliteModal\" data-id=\".*\">\n.*- (.*)"
sat_name_matches = re.search(sat_name_regex, obs_html.decode("utf-8"))
sat_name = sat_name_matches[1]
freq_regex = r"(\d*\.\d*) (\w)Hz"
freq_matches = re.findall(freq_regex, obs_html.decode("utf-8")).pop()
if freq_matches[1] == 'G':
freq0 = float(freq_matches[0]) * 1000
elif freq_matches[1] == 'M':
freq0 = float(freq_matches[0])
else:
raise NotImplementedError
audio_path = CACHE_DIR / f'{observation_id:d}.ogg'
waterfall_path = CACHE_DIR / f'{observation_id:d}.png'
output_path = Path('.') / sat_name / f'{observation_id:d}'
# Check output path exists
if not output_path.exists():
output_path.mkdir(parents=True)
if not os.path.isfile(audio_path):
if observation['payload']:
print("Downloading " + observation['payload'] + ' for getting the right duration')
observation_ogg_data = requests.get(observation['payload'], stream=True).content
elif observation['archive_url']:
print("Downloading " + observation['archive_url'] + ' for getting the right duration')
observation_ogg_data = requests.get(observation['archive_url'], stream=True).content
else:
raise NoAudioError
with open(audio_path, 'wb') as out_file:
out_file.write(observation_ogg_data)
if not os.path.isfile(waterfall_path):
print("Downloading " + observation['waterfall'] + ' for the background image')
img_data = requests.get(observation['waterfall']).content
with open(waterfall_path, 'wb') as handler:
handler.write(img_data)
return observation, tle, freq0, sat_name
def plot_overlay(freq_limits, time_limits):
fig = plt.figure(figsize=(10 * 0.8,20))
ax = fig.add_subplot(111)
ax.set_ylabel("Time (seconds)")
ax.set_xlabel("Frequency (kHz)")
ax.set_xlim(*freq_limits)
ax.set_ylim(*time_limits)
ax.tick_params(axis='x', colors='red')
ax.tick_params(axis='y', colors='red')
return fig, ax
def plot_textbox(observation_id, ax, observation, tle_basename):
fig_label = f"SatNOGS Observation {observation_id:d}\nStation: {observation['ground_station']}-{observation['station_name']}\n{observation['start']}\n{observation['end']}\ntle: {tle_basename}"
at = AnchoredText(fig_label, prop=dict(size=15), frameon=True, loc='upper left')
at.patch.set_boxstyle("round,pad=0.,rounding_size=0.2")
ax.add_artist(at)
def plot_overlay_new(plot_metadata):
# Read plot metadata
figsize = plot_metadata['figsize']
gridspec = plot_metadata['gridspec']
xlim_kHz = plot_metadata['xlim_kHz']
ylim_s = plot_metadata['ylim_s']
ylim_num = plot_metadata['ylim_num']
# Generate figure and axes
gs = GridSpec(**gridspec)
fig = plt.figure(figsize=figsize)
ax_utc = fig.add_subplot(gs[0])
ax_utc.set_xlabel("Frequency (kHz)", color='red')
ax_utc.set_xlim(*xlim_kHz)
ax_seconds = ax_utc.twinx()
ax_seconds.set_ylim(*ylim_s)
ax_seconds.set_ylabel("Time (seconds)", color='red')
ax_utc.tick_params(axis='x', colors='red')
ax_utc.tick_params(axis='y', colors='red')
ax_seconds.tick_params(axis='y', colors='red')
# Add UTC time labels
ax_utc.set_ylim(*ylim_num)
ax_utc.set_ylabel('Time (UTC)', color='red')
ax_utc.yaxis_date()
ax_utc.yaxis.set_major_locator(mdates.MinuteLocator(interval=1))
ax_utc.yaxis.set_major_formatter(mdates.DateFormatter('%H:%M'))
for spine in ax_seconds.spines.values():
spine.set_color('red')
return fig, ax_seconds, ax_utc
def combine_images(background, overlay, output_filename):
background_img = Image.open(background)
background_img = background_img.convert("RGBA")
overlay_img = Image.open(overlay)
overlay_img = overlay_img.convert("RGBA")
ov = Image.new('RGBA',background_img.size, (0, 0, 0, 0))
ov.paste(overlay_img, (0,0))
background_img.paste(ov, (0,0), ov)
background_img.save(output_filename, "PNG")
background_img.close()
overlay_img.close()
def parse_demoddata_timestamps(observation):
"""
Parse timestamps from URL path of demoddata objects in observation metadata
"""
times = []
for item in observation['demoddata']:
filename = Path(urlsplit(item['payload_demod']).path).name
match = re.match(DEMODDATA_FILENAME_PATTERN, filename)
if match:
time_str = match.group(2)
time = datetime.datetime.strptime(time_str, DEMODDATA_FILENAME_TIME_FORMAT)
# time = time.astimezone(datetime.timezone.utc)
times.append(time)
return times
def run_ikhnos(observation_id, observation, tle, freq0, sat_name, args):
audio_path = CACHE_DIR / f'{observation_id:d}.ogg'
waterfall_path = CACHE_DIR / f'{observation_id:d}.png'
output_path = Path('.') / sat_name / f'{observation_id:d}'
# Read satnogs waterfall plot metadata (if available)
with Image.open(waterfall_path) as image:
plot_metadata = (
json.loads(image.info["satnogs:wf-plot"])
if "satnogs:wf-plot" in image.info.keys()
else None
)
tstart = observation['start'].replace('Z', '')
start = datetime.datetime.strptime(tstart, "%Y-%m-%dT%H:%M:%S")
ystart = datetime.datetime(start.year, 1, 1)
#+1 day in timedelta as TLE show the day and a fraction of it so Jan 1st is 1 not 0.
obs_epoch_day = ((start - ystart).total_seconds() + datetime.timedelta(days=1).total_seconds()) / datetime.timedelta(days=1).total_seconds()
if args.observation_duration == 0:
f = OggVorbis(audio_path)
nseconds = int(round(f.info.length))
else:
nseconds = args.observation_duration
tle_file = args.tle_path
tle_basename = os.path.basename(tle_file)
with open(tle_file) as f:
tle_lines = f.read().splitlines()
if not tle_lines:
print(f"ERROR: TLE file is empty. File does not contain any lines: '{tle_file}'")
sys.exit(1)
tles = []
for i in range(0, len(tle_lines), 3):
tles.append(tle_lines[i:i + 3])
tle_sets_read = 0
for tba_tle in tles:
overlay_path = CACHE_DIR / f"{observation_id:d} {tba_tle[0].replace('/','_')}_freq_diff.png"
epoch_regex = r".{20}\s*(\d*\.\d*)"
epoch_matches = re.search(epoch_regex, tba_tle[1])
epoch_day = epoch_matches.group(1)
obs_tle_epochs_diff = float(epoch_day) - obs_epoch_day
bef_after = 'B'
if obs_tle_epochs_diff > 0:
bef_after = 'A'
if abs(obs_tle_epochs_diff) > args.tle_epoch_threshold:
print("TLE too old/new: Epoch differs by more than {:.1f} days\n"
"from observation start time (threshold: {:.1f} days)".format(
obs_tle_epochs_diff,
args.tle_epoch_threshold))
continue
tle_sets_read += 1
print('%s - %s - %s' % (tle_sets_read, obs_tle_epochs_diff, bef_after))
# Read NORAD and COSPAR ID for this TLE
obj_regex = r"1 (.....). (........)"
[(norad_id, cospar_id)] = re.findall(obj_regex, tba_tle[1])
t, dfreq, dfreqt = propagate(observation, start, nseconds, freq0, tle, tba_tle)
if plot_metadata:
# "new" method: Create figure and axes from plot metadata
fig, ax, ax_utc = plot_overlay_new(plot_metadata)
else:
# "deprecated" method: Create figure and axes from collected information
fig, ax = plot_overlay(
freq_limits=(-args.frequency_range, args.frequency_range),
time_limits=(0, nseconds),
)
# Add label with observation_id
plot_textbox(observation_id, ax, observation, tle_basename)
# Plot line with propagation results
ax.plot(dfreq + args.frequency_offset, t, color='red', linewidth=1)
# Terrestrial transmission
ax.plot(dfreqt - 5 + args.frequency_offset, t, color='purple', linewidth=1)
if plot_metadata and observation['demoddata']:
timestamps = np.array(parse_demoddata_timestamps(observation))
x = np.zeros_like(timestamps) + 0.95 * ax_utc.get_xlim()[0]
y = timestamps
ax_utc.plot(x, y, marker='*', linestyle='', color='red', markersize=10)
# bbox_inches='tight' was always used before the client started to create metadata
bbox_inches = 'tight' if not plot_metadata else None
fig.savefig(overlay_path, bbox_inches=bbox_inches, transparent=True)
plt.close(fig)
combined_path = output_path / (tstart + '_' + str(observation_id) + '_' + norad_id + '_' + cospar_id.replace(' ', '') + '_'+ tba_tle[0].replace(' ','-').replace('/','_') + '_' + str(abs(obs_tle_epochs_diff)) + '_' + bef_after + '_r' + str(args.frequency_range) + '_f' + str(args.frequency_offset) + ".png")
combine_images(
background=waterfall_path,
overlay=overlay_path,
output_filename=combined_path,
)
print(f"Output written to '{combined_path}'")
if args.open and platform.system() == 'Linux':
subprocess.call(['xdg-open', combined_path])
if not args.keep_created_files:
os.remove(overlay_path)
if not args.keep_audio:
os.remove(audio_path)
if not args.keep_waterfall:
os.remove(waterfall_path)
def propagate(observation, start, nseconds, freq0, tle, tba_tle):
satellite1 = ephem.readtle('sat', tle[0], tle[1])
satellite2 = ephem.readtle(tba_tle[0], tba_tle[1], tba_tle[2])
observer = ephem.Observer()
observer.lat = str(observation['station_lat'])
observer.lon = str(observation['station_lng'])
observer.elevation = observation['station_alt']
times = [start+datetime.timedelta(seconds=s) for s in range(0, nseconds)]
v1 = []
v2 = []
vt = []
for t in times:
observer.date = t
satellite1.compute(observer)
satellite2.compute(observer)
v1.append(satellite1.range_velocity)
v2.append(satellite2.range_velocity)
vt.append(0)
freq1 = freq0*(1.0-np.array(v1)/299792458.0)
freq2 = freq0*(1.0-np.array(v2)/299792458.0)
freqt = freq0*(1.0-np.array(vt)/299792458.0)
dfreq = (freq2-freq1)*1000.0
dfreqt = (freqt-freq1)*1000.0
t = np.arange(nseconds)
return t, dfreq, dfreqt
def main():
args = parse_args()
for observation_id in args.observations:
try:
observation, tle, freq0, sat_name = fetch_observation(observation_id)
except NoAudioError:
print(f"Warning: No audio file, skipping observation {observation_id:d}")
continue
if args.verbose:
print(observation)
if args.store_tle:
with open(args.tle_path, 'a') as fp:
fp.write(f'TLE in Observation {observation_id}\n')
fp.write(observation['tle1'] + '\n')
fp.write(observation['tle2'] + '\n')
run_ikhnos(observation_id, observation, tle, freq0, sat_name, args)
if __name__ == '__main__':
main()