Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix TLE line identification #21

Merged
merged 4 commits into from
Mar 19, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 58 additions & 26 deletions pygac/gac_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ class GACReader(object):
scan_freq = 2.0/1000.0
"""Scanning frequency (scanlines per millisecond)"""

def __init__(self):
def __init__(self, tle_thresh=7):
self.tle_thresh = tle_thresh
self.head = None
self.scans = None
self.spacecraft_name = None
Expand Down Expand Up @@ -174,7 +175,7 @@ def lineno2msec(self, scan_line_number):
return (scan_line_number - 1) / self.scan_freq

def compute_lonlat(self, utcs=None, clock_drift_adjust=True):
tle1, tle2 = self.get_tle_lines()
tle1, tle2 = self.get_tle_lines(threshold=self.tle_thresh)

scan_points = np.arange(3.5, 2048, 5)

Expand Down Expand Up @@ -254,6 +255,26 @@ def get_calibrated_channels(self):
self.spacecraft_name)
return channels

@staticmethod
def tle2datetime64(times):
"""Convert TLE timestamps to numpy.datetime64

Args:
times (float): TLE timestamps as %y%j.1234, e.g. 18001.25
"""
# Convert %y%j.12345 to %Y%j.12345 (valid for 1950-2049)
times = np.where(times > 50000, times + 1900000, times + 2000000)

# Convert float to datetime64
doys = (times % 1000).astype('int') - 1
years = (times // 1000).astype('int')
msecs = np.rint(24 * 3600 * 1000 * (times % 1))
times64 = (years - 1970).astype('datetime64[Y]').astype('datetime64[ms]')
times64 += doys.astype('timedelta64[D]')
times64 += msecs.astype('timedelta64[ms]')

return times64

def get_tle_file(self):
conf = ConfigParser.ConfigParser()
try:
Expand All @@ -273,45 +294,56 @@ def get_tle_file(self):
with open(tle_filename, 'r') as fp_:
return fp_.readlines()

def get_tle_lines(self):
def get_tle_lines(self, threshold):
"""Find closest two line elements (TLEs) for the current orbit

Raises:
IndexError, if the closest TLE is more than <threshold> days apart
"""
if self.tle_lines is not None:
return self.tle_lines
tle_data = self.get_tle_file()
tm = self.times[0]
sdate = (int(tm.strftime("%Y%j")) +
(tm.hour +
(tm.minute +
(tm.second +
tm.microsecond / 1000000.0) / 60.0) / 60.0) / 24.0)

dates = np.array([float(line[18:32]) for line in tle_data[::2]])
dates = np.where(dates > 50000, dates + 1900000, dates + 2000000)

tle_data = self.get_tle_file()
sdate = np.datetime64(self.times[0], '[ms]')
dates = self.tle2datetime64(
np.array([float(line[18:32]) for line in tle_data[::2]]))

# Find index "iindex" such that dates[iindex-1] < sdate <= dates[iindex]
# Notes:
# 1. If sdate < dates[0] then iindex = 0
# 2. If sdate > dates[-1] then iindex = len(dates), beyond the right boundary!
iindex = np.searchsorted(dates, sdate)

if ((iindex == 0 and abs(sdate - dates[0]) > 7) or
(iindex == len(dates) - 1 and abs(sdate - dates[-1]) > 7)):
raise IndexError(
"Can't find tle data for %s on the %s" %
(self.spacecraft_name, tm.isoformat()))

if abs(sdate - dates[iindex - 1]) < abs(sdate - dates[iindex]):
if iindex in (0, len(dates)):
if iindex == len(dates):
# Reset index if beyond the right boundary (see note 2. above)
iindex -= 1
elif abs(sdate - dates[iindex - 1]) < abs(sdate - dates[iindex]):
# Choose the closest of the two surrounding dates
iindex -= 1

if abs(sdate - dates[iindex]) > 3:
LOG.warning("Found TLE data for %f that is %f days appart",
sdate, abs(sdate - dates[iindex]))
# Make sure the TLE we found is within the threshold
delta_days = abs(sdate - dates[iindex]) / np.timedelta64(1, 'D')
if delta_days > threshold:
raise IndexError(
"Can't find tle data for %s within +/- %d days around %s" %
(self.spacecraft_name, threshold, sdate))

if delta_days > 3:
LOG.warning("Found TLE data for %s that is %f days appart",
sdate, delta_days)
else:
LOG.debug("Found TLE data for %f that is %f days appart",
sdate, abs(sdate - dates[iindex]))
LOG.debug("Found TLE data for %s that is %f days appart",
sdate, delta_days)

# Select TLE data
tle1 = tle_data[iindex * 2]
tle2 = tle_data[iindex * 2 + 1]
self.tle_lines = tle1, tle2
return tle1, tle2

def get_angles(self):
tle1, tle2 = self.get_tle_lines()
tle1, tle2 = self.get_tle_lines(threshold=self.tle_thresh)
orb = Orbital(self.spacecrafts_orbital[self.spacecraft_id],
line1=tle1, line2=tle2)

Expand Down
69 changes: 50 additions & 19 deletions pygac/tests/test_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,11 @@

import datetime
import unittest
import mock
import numpy as np
from pygac.gac_reader import GACReader


class GACReaderDummy(GACReader):
def _get_times(self):
pass

def get_header_timestamp(self):
pass

def read(self, filename):
pass

@property
def tsm_affected_intervals(self):
pass


class TestGacReader(unittest.TestCase):
"""Test the common GAC Reader"""

Expand All @@ -52,9 +38,10 @@ def test_to_datetime64(self):
msg='Conversion from (year, jday, msec) to datetime64 '
'is not correct')

def test_midnight_scanline(self):
@mock.patch.multiple('pygac.gac_reader.GACReader', __abstractmethods__=set())
def test_midnight_scanline(self, *mocks):
"""Test midnight scanline computation"""
reader = GACReaderDummy()
reader = GACReader()

# Define test cases...
# ... midnight scanline exists
Expand All @@ -72,15 +59,59 @@ def test_midnight_scanline(self):
self.assertEqual(reader.get_midnight_scanline(), scanline,
msg='Incorrect midnight scanline')

def test_miss_lines(self):
reader = GACReaderDummy()
@mock.patch.multiple('pygac.gac_reader.GACReader', __abstractmethods__=set())
def test_miss_lines(self, *mocks):
"""Test detection of missing scanlines"""
reader = GACReader()
lines = [2, 4, 5, 6, 10, 11, 12]
miss_lines_ref = [1, 3, 7, 8, 9]
reader.scans = np.zeros(len(lines), dtype=[('scan_line_number', 'i2')])
reader.scans['scan_line_number'] = lines
self.assertTrue((reader.get_miss_lines() == miss_lines_ref).all(),
msg='Missing scanlines not detected correctly')

def test_tle2datetime64(self, *mocks):
"""Test conversion from TLE timestamps to datetime64"""
dates = np.array([70365.1234, 18001.25])
dates64_exp = [np.datetime64(datetime.datetime(1970, 12, 31, 2, 57, 41, 760000), '[ms]'),
np.datetime64(datetime.datetime(2018, 1, 1, 6, 0), '[ms]')]
dates64 = GACReader.tle2datetime64(dates)
self.assertTrue(np.all(dates64 == dates64_exp))

@mock.patch.multiple('pygac.gac_reader.GACReader', __abstractmethods__=set())
@mock.patch('pygac.gac_reader.GACReader.get_tle_file')
def test_get_tle_lines(self, get_tle_file, *mocks):
"""Test identification of closest TLE lines"""
tle_data = ['1 38771U 12049A 18363.63219793 -.00000013 00000-0 14176-4 0 9991\r\n',
'2 38771 98.7297 60.1350 0002062 95.9284 25.0713 14.21477560325906\r\n',
'1 38771U 12049A 18364.62426010 -.00000015 00000-0 13136-4 0 9990\r\n', # 2018-12-30 14:58
'2 38771 98.7295 61.1159 0002062 94.5796 60.2561 14.21477636326047\r\n',
'1 38771U 12049A 18364.94649306 -.00000018 00000-0 12040-4 0 9996\r\n', # 2018-12-30 22:42
'2 38771 98.7295 61.4345 0002060 94.1226 268.7521 14.21477633326092\r\n',
'1 38771U 12049A 18365.81382142 -.00000015 00000-0 13273-4 0 9991\r\n',
'2 38771 98.7294 62.2921 0002057 92.7653 26.0030 14.21477711326215\r\n']

expected = {
datetime.datetime(2018, 12, 20, 12, 0): None,
datetime.datetime(2018, 12, 28, 12, 0): 0,
datetime.datetime(2018, 12, 30, 16, 0): 2,
datetime.datetime(2018, 12, 30, 20, 0): 4,
datetime.datetime(2019, 1, 1, 12, 0): 6,
datetime.datetime(2019, 1, 8, 12, 0): None
}

get_tle_file.return_value = tle_data
reader = GACReader()
for time, tle_idx in expected.items():
reader.times = [time]
reader.tle_lines = None
if tle_idx is None:
self.assertRaises(IndexError, reader.get_tle_lines, threshold=7)
else:
tle1, tle2 = reader.get_tle_lines(threshold=7)
self.assertEqual(tle1, tle_data[tle_idx])
self.assertEqual(tle2, tle_data[tle_idx + 1])


def suite():
"""The suite for test_reader"""
Expand Down