Source code for agis

# -*- coding: utf-8 -*-
"""
:File: agis.py

:purpose: Contains implementation of classes Calc_source and Agis

:Authors: LucaZampieri

.. note:
    - In this file, when there is a reference, unless explicitly stated otherwise,
    it refers to Lindegren main article:
    "The astronometric core solution for the Gaia mission - overview of models,
    algorithms, and software implementation" by L. Lindegren, U. Lammer,
    D. Hobbs, W. O'Mullane, U. Bastian, and J.Hernandez
    The reference is usually made in the following way; Ref. Paper [LUDW2011]_ eq. [1]
    - t (float),  time from J2000 [days]
    such that t_ep = 0

Description of what we are trying to achieve in this file
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
It is a simplification of what is in [LUDW2011]_ .
**If in doubt, check with the paper.**

The goal would be to minimize:

.. math::
    min_{s,a} Q = \sum_{l \\in AL} (R^{AL}_l)^2
        + \sum_{l \\in AC}  (R^{AC_l})^2

In the following we might forget about :math:`R^{AL}, R^{AC}` and just use
:math:`R_l` (for simplicity)

The necessary condition for optimality would be that the derivative is null. So
given that we want to optimize with respect to variable **x** we would have:

.. math::
    \\frac{dQ}{dx} = \sum_l 2 R_l \\frac{dR_l}{dx} = 0

Later we will see that x will correspond to the source parameters and the
coefficient describing the attitude. (i.e. :math:`x \in \{s, a\}`, **s** being
the the set of 5 source parameter and a being the coeffeicients of the splines
for each of the four quaternions parameters describing the attitude of the
satellite at each time)

This condition being sufficient if Q is convex.

Q being non-linear we are gonna linearize it. Assuming our initial guess is good
enough we can write the optimal set of parameters
:math:`x^* = x^{ref} + x`

We would get:

.. math::
    R_l(x^*) = R_l(x^{ref}) + \\frac{dR_l(x^{ref})}{dx}(x^*-x^{ref})
    = R_l(x^{ref}) + \\frac{dR(x^{ref})}{dx}  x

    Q(x)=[R_l(x^{ref})^2 + 2 R_l(x^{ref}) \\frac{dR(x^{ref})}{dx}x
          + (\\frac{dR_l(x^{ref}) }{dx}x)^2]

Thus at optimality we should have (:math:`\\frac{dQ}{dx}=0`):

.. math::
    [\\frac{dR_l(x^{ref})}{dx}]^T [\\frac{dR_l(x^{ref})}{dx}] x
    = - \\frac{dR_l(x^{ref})}{dx} R_l(x^{ref})

.. note::
    Maily for next contributors:

    - Chowleski factorization (:math:`LL^T`) is about twice as efficient as LU
      factorization (default used by numpy.linalg.solve/scipy) if the matrix is
      hermitian which is the case for us (we have a symmetric matrix)

"""
# # Imports
# Local modules
import frame_transformations as ft
import constants as const
from satellite import Satellite
from source import Source
from agis_functions import *

# global modules
import numpy as np
from scipy.interpolate import BSpline
from scipy import sparse as sps
import quaternion  # moble's quaternion (numpy compatible quaternions)


[docs]class Calc_source: """ Contains the calculated parameters per source """
[docs] def __init__(self, name=None, obs_times=[], source_params=None, mu_radial=None, mean_color=0, source=None): """ Data structure containing our computed parameters for the source in question. :param name: [string] the name of the source :param obs_times: [list or array] of the observed times for this source :param source_params: [list or array] alpha, delta, parallax, mu_alpha, mu_delta :param mu_radial: [float] radial velocity of the source (appart since we do not solve for radial velocity) :param mean_color: [float] mean color of the source if we want to use explore chromatic aberration :param source: [source] instead of most of the above parameters we can provide a source object instead and take the data from it. Manually providing the parameters will override the source parameter see :class:`source.Source` >>> calc_source = Calc_source('calc_sirio', [1, 2.45, 12], [1, 2, 3, 4, 5], 6) >>> calc_source = Calc_source(obs_times=[1, 2, 3], source=sirio) # where sirio is a source object """ if source is not None: name = 'Calc_' + source.name params = source.get_parameters() source_params = params[0:-1] mu_radial = params[-1] mean_color = source.mean_color self.name = name self.obs_times = obs_times # times at which it has been observed self.s_params = source_params # position at which it has been observed self.mu_radial = mu_radial # not considered an unknown of the problem self.s_old = [self.s_params] self.errors = [] self.mean_color = mean_color
def set_params(self, params): self.s_params = params self.s_old = [self.s_params]
[docs]class Agis:
[docs] def __init__(self, sat, calc_sources=[], real_sources=[], attitude_splines=None, verbose=False, spline_degree=3, attitude_regularisation_factor=0, updating='attitude', degree_error=0, double_telescope=False): """ :param sat: [Satellite] :param calc_sources: [list of Calc_sources] :param real_sources: [liste of Sources] :param attitude_splines: list of splines, only if we update attitude :param verbose: [bool] if True prints intermediate results :param spline_degree: degree of the splines :param attitude_regularization_factor: [float] lambda in the paper. Describes how much we take into account the regularization into our minimization problem :param updating: [string] defines what we are currently updating can be: `'source', 'scanned source' or 'attitude'` :param degree_error: :param double_telescope: [bool] if True uses two telescopes (gaia-like) """ # Raise an error if invalid initialization if len(real_sources) != len(calc_sources): raise ValueError('there must be the same number of real and calc sources') # Objects #: List of the sources objects self.real_sources = real_sources #: List of calculated sources with 1-1 correspondance to the real sources self.calc_sources = calc_sources #: Satellite object that we are using to solve the problem self.sat = sat # Constants #: Degree of the interpolating polynomial self.k = spline_degree #: Order of the spline (degree+1) self.M = self.k + 1 self.attitude_regularisation_factor = attitude_regularisation_factor self.verbose = verbose self.updating = updating self.consider_stellar_aberation = False # TODO: remove because obsolete? self.degree_error = degree_error # [only for source] deviation in vertical direction of the attitude self.double_telescope = double_telescope # bool indicating if we use the double_telescope config # Mutable: self.iter_counter = 0 self.N = 0 # not necessary self.attitude_der_matrix = None # sparse attitude derivative matrix self.attitude_reg_matrix = None # sparse attitude derivative matrix self.discretized_attitude = None # attitude evaluated at all the observed times # Setting observation times all_obs_times = [] #: [dict] containig the observed times assiciated with their source index self.time_dict = {} for source_index, calc_source in enumerate(self.calc_sources): all_obs_times += list(calc_source.obs_times) for t in calc_source.obs_times: self.time_dict[t] = source_index #: [array] containing the combined observation time of all the sources self.all_obs_times = np.sort(all_obs_times) # Set attitude if attitude_splines is not None: # Set everything for the attitude c, t, s = extract_coeffs_knots_from_splines(attitude_splines, self.k) self.c = c.copy() self.att_coeffs, self.att_knots, self.attitude_splines = (c.copy(), t, s) self.att_bases = get_basis_Bsplines(self.att_knots, self.att_coeffs[0], self.k, self.all_obs_times) self.N = self.att_coeffs.shape[1] # number of coeffs per parameter if (self.att_knots.shape[0] - self.att_coeffs.shape[1]) != self.M: raise ValueError('there should be M coeffs less than the number of knots')
# ### Generic functions for all kind of updating -----------------------------------
[docs] def reset_iterations(self): """ Resetting the iteration counter """ print('Not resetting everything! Call again the solver instead') self.iter_counter = 0 for calc_source in self.calc_sources: calc_source.s_old = [] calc_source.errors = []
[docs] def error_function(self): """ | Ref. Paper [LUDW2011]_ eq. [24] | Compute the error function Q .. math:: Q = \sum_{l \\in AL} (R^{AL})^2 + \sum_{l \\in AC} (R^{AC})^2 """ error = 0 for source_index, s in enumerate(self.calc_sources): if self.verbose: print('source: {}'.format(s.s_params)) for j, t_L in enumerate(s.obs_times): R_L_AL, R_L_AC = self.compute_R_L(source_index, t_L) error += (R_L_AL ** 2 + R_L_AC ** 2) return error / self.all_obs_times.shape[0] # /const.rad_per_mas
[docs] def get_field_angles(self, source_index, t): """ Uses functions :meth:`get_attitude_for_source`, :meth:`get_attitude`, :meth:`observed_field_angles`, :meth:`calculated_field_angles`, :meth:`satellite.Satellite.func_attitude` :param source_index: [int] index of the source :param t: [float] [days] time at which :returns: eta_obs, zeta_obs, eta_calc, zeta_calc """ # Set attitude, it depends if we wanna update only sources or also attitude params if self.updating == 'source': attitude = self.get_attitude_for_source(source_index, t) attitude_gaia = attitude elif self.updating == 'scanned source': attitude = self.sat.func_attitude(t) attitude_gaia = attitude elif self.updating == 'attitude': attitude = self.get_attitude(t) attitude_gaia = self.sat.func_attitude(t) else: raise ValueError('incorrect value for self.updating') eta_obs, zeta_obs = observed_field_angles(self.real_sources[source_index], attitude_gaia, self.sat, t, self.double_telescope) eta_calc, zeta_calc = calculated_field_angles(self.calc_sources[source_index], attitude, self.sat, t, self.double_telescope) return eta_obs, zeta_obs, eta_calc, zeta_calc
[docs] def deviate_field_angles_color_aberration(self, source_index, t, angles): """ | apply color aberration deviation to field angles (eta, zeta) .. note:: If we the color aberration is not given in the sources, nothing happens and the same angles are returned. """ # # WARNING: check also deviation in the source update eta_obs, zeta_obs, eta_calc, zeta_calc = angles # if self.degree_error != 0: f_color = self.real_sources[source_index].func_color(t) # # TODO: separate eta zeta m_color = self.real_sources[source_index].mean_color eta_obs, zeta_obs = compute_deviated_angles_color_aberration(eta_obs, zeta_obs, f_color, self.degree_error) eta_calc, zeta_calc = compute_deviated_angles_color_aberration(eta_calc, zeta_calc, m_color, self.degree_error) return eta_obs, zeta_obs, eta_calc, zeta_calc
[docs] def compute_R_L(self, source_index, t): """ | Ref. Paper [LUDW2011]_ eq. [25]-[26]. | :math:`R = eta_{obs} + zeta_{obs} - eta_{calc} - zeta_{calc}` :note: R_AL = R_eta, R_AC = R_zeta :returns: [tuple] R_eta, R_zeta """ # WARNING: maybe source is not in the field of vision of sat at time t! R_L_eta, R_L_zeta = (0, 0) angles = self.get_field_angles(source_index, t) eta_obs, zeta_obs, eta_calc, zeta_calc = self.deviate_field_angles_color_aberration(source_index, t, angles) R_L_eta = eta_obs - eta_calc # AL R_L_zeta = zeta_obs - zeta_calc # AC return (R_L_eta, R_L_zeta)
[docs] def iterate(self, num, use_sparse=False, verbosity=0): """ Do _num_ iterations """ if self.verbose is True: verbosity += 1 for i in range(num): self.iter_counter += 1 if verbosity > 0: print('***** Iteration: {} *****'.format(self.iter_counter)) if verbosity > 1: print('Error before iteration: {}'.format(self.error_function())) if self.updating == 'source' or self.updating == 'scanned source': self.update_S_block() elif self.updating == 'attitude': self.update_A_block(use_sparse) error = error_between_func_attitudes(self.all_obs_times, self.sat.func_attitude, self.get_attitude) if verbosity > 1: print('attitude error:', error) if verbosity > 0: print('Error after iteration: {}'.format(self.error_function()))
# ### End generic functions ################################################ # # ### Functions only for sources updating (source and scanned source)
[docs] def update_S_block(self): """ Performs the update of the source parameters """ for i, calc_source in enumerate(self.calc_sources): calc_source.s_old.append(calc_source.s_params.copy()) calc_source.errors.append(self.error_function()) self.update_block_S_i(i)
[docs] def update_block_S_i(self, source_index): """ | Ref. Paper [LUDW2011]_ eq. [57] | Uses function :meth:`block_S_error_rate_matrix` :param source_index: [int] Index of the source that will be updated :action: update source number *source_index* """ calc_source = self.calc_sources[source_index] A = self.block_S_error_rate_matrix(source_index) W = np.eye(len(calc_source.obs_times)) h = self.compute_h(source_index) LHS = A.transpose() @ W @ A RHS = A.transpose() @ W @ h d = np.linalg.solve(LHS, RHS) d = d.flatten() self.calc_sources[source_index].s_params[:] += d if self.verbose: print('dim A:{}\ndim W:{}\ndim h:{}\ndim d:{}' .format(A.shape, W.shape, h.shape, d.shape))
[docs] def compute_h(self, source_index): """ | Ref. Paper [LUDW2011]_ eq. [59] | Source update Right hand side | Uses functions :meth:`compute_R_L` .. warning:: here we sum along-scan and across-scan observations. It should be good for this update, but be carefull in future implementations :param source_index: [int] Index of the source that will be updated """ calc_source = self.calc_sources[source_index] h = np.zeros((len(calc_source.obs_times), 1)) for i, t_L in enumerate(calc_source.obs_times): R_L_AL, R_L_AC = self.compute_R_L(source_index, t_L) h[i, 0] = (R_L_AL + R_L_AC) return h
[docs] def block_S_error_rate_matrix(self, source_index): """ | Ref. Paper [LUDW2011]_ eq. [58] | error matrix for the block update S uses :meth:`compute_du_ds` and :meth:`dR_ds` :param source_index: [int] Index of the source that will be updated """ du_ds = self.compute_du_ds(source_index) dR_ds_AL, dR_ds_AC = self.dR_ds(source_index, du_ds) return - (dR_ds_AL + dR_ds_AC)
[docs] def dR_ds(self, source_index, du_ds): """ Ref. Paper eq. [71] Computes the derivative of the error (R_l) wrt the 5 astronomic parameters s_i transposed. :param source_index: [int] Index of the source that will be updated :param du_ds: [numpy array] derivative of the topocentric function wrt astronometric parameters :returns: [tuple of numpy arrays] with derivatives of observations in the Along-scan (AL) and Across-scan (AC) directions """ calc_source = self.calc_sources[source_index] dR_ds_AL = np.zeros((len(calc_source.obs_times), 5)) dR_ds_AC = np.zeros(dR_ds_AL.shape) # Iterate through the observation times of the source we are currently # updating for i, t_L in enumerate(calc_source.obs_times): if self.updating == 'source': attitude = self.get_attitude_for_source(source_index, t_L) elif self.updating == 'scanned source': attitude = self.sat.func_attitude(t_L) else: raise ValueError('not yet implemented for this kind of updating') # Set double_telescope to False to get phi phi, zeta = calculated_field_angles(calc_source, attitude, self.sat, i, double_telescope=False) phi, zeta = compute_deviated_angles_color_aberration(phi, zeta, calc_source.mean_color, self.degree_error) m, n, u = compute_mnu(phi, zeta) dR_ds_AL[i, :] = -m @ du_ds[:, :, i].transpose() * helpers.sec(zeta) dR_ds_AC[i, :] = -n @ du_ds[:, :, i].transpose() return (dR_ds_AL, dR_ds_AC)
[docs] def compute_du_ds(self, source_index): """ | Ref. Paper [LUDW2011]_ eq. [73] | Compute dũ_ds for a given source .. note:: - b_G(t) barycentric position of Gaia at the time of observation, also called barycentric ephemeris of the Gaia Satellite - t_B barycentric time (takes into account the Römer delay) - t_ep in the paper is not used since we assume t_ep=0 and start counting the time from J2000 Uses :meth:`CoMRS_to_SRS_for_source_derivatives`, :meth:`satellite.Satellite.ephemeris_bcrs` and :meth:`agis_functions.compute_du_dparallax` :param source_index: [int] Index of the source that will be updated :returns: du_ds - source derivatives (wrt astrometric parameters) in SRS """ # In this function consider all u as being ũ! (for notation we call them here u) # Values needed to compute the derivatives calc_source = self.calc_sources[source_index] n_i = len(calc_source.obs_times) # the number of observations du_ds = np.zeros((5, 3, n_i)) alpha = calc_source.s_params[0] delta = calc_source.s_params[1] p, q, r = ft.compute_pqr(alpha, delta) r.shape = (3, 1) # reshapes r # For each observation compute du/ds for j, t_l in enumerate(calc_source.obs_times): # t_l being the observation time b_G = self.sat.ephemeris_bcrs(t_l) t_B = t_l # + np.dot(r, b_G) / const.c tau = t_B - const.t_ep # Compute derivatives du_dalpha = p du_ddelta = q du_dparallax = compute_du_dparallax(r, b_G) du_dmualpha = p*tau du_dmudelta = q*tau CoMRS_derivatives = [du_dalpha, du_ddelta, du_dparallax, du_dmualpha, du_dmudelta] SRS_derivatives = self.CoMRS_to_SRS_for_source_derivatives(CoMRS_derivatives, calc_source, t_l, source_index) du_ds[:, :, j] = SRS_derivatives return du_ds
[docs] def CoMRS_to_SRS_for_source_derivatives(self, CoMRS_derivatives, calc_source, t_L, source_index): """ | Ref. Paper [LUDW2011]_ eq. [72] | rotate the frame from CoRMS (lmn) to SRS (xyz) for the given derivatives :param CoMRS_derivatives: [list] of deriatives [du_dalpha, du_ddelta, du_dparallax, du_dmualpha, du_dmudelta] :param calc_source: [Calc_source] :param t_L: [float] time of observation :param source_index: [int] """ SRS_derivatives = [] if self.updating == 'source': attitude = self.get_attitude_for_source(source_index, t_L) elif self.updating == 'scanned source': attitude = self.sat.func_attitude(t_L) else: # attitude = self.get_attitude(t_L) raise ValueError('Not yet implemented for this case') for derivative in CoMRS_derivatives: # TODO: remove these ugly for loop SRS_derivatives.append(ft.lmn_to_xyz(attitude, derivative)) return SRS_derivatives
[docs] def get_attitude_for_source(self, source_index, t): """ For only source updating with **color aberration**. Change if condition to decide which sources are affected by that aberration :param source_index: [int] Index of the source that will be updated :param t: [float] [days] time at which we want the attitude """ if source_index < 0: deviation = self.degree_error * const.rad_per_deg # number in degrees and converted in radians else: deviation = 0 return attitude_from_alpha_delta(self.real_sources[source_index], self.sat, t, deviation)
# ### End function only for source updating ################################ # # ### For attitude update --------------------------------------------------
[docs] def get_attitude(self, t, unit=True): """ Get attitude from the attitude coefficients at time *t*. If *unit* is True, the return normalized attitude. :param t: [float] time at which we desire the attitude :param unit: [bool] if true normalize the quaternion :returns: [Quaternion object] attitude """ s_w = self.attitude_splines[0] s_x = self.attitude_splines[1] s_y = self.attitude_splines[2] s_z = self.attitude_splines[3] attitude = np.quaternion(s_w(t), s_x(t), s_y(t), s_z(t)) if unit: attitude = attitude.normalized() return attitude
[docs] def actualise_splines(self): """ :action: Update the splines re-creating them from the new coefficients """ for i in range(self.attitude_splines.shape[0]): self.attitude_splines[i] = BSpline(self.att_knots, self.att_coeffs[i], k=self.k)
[docs] def update_A_block(self, use_sparse=False): # one """ | Ref. Paper [LUDW2011]_ Section 5.2 | Solve for the attitude """ if use_sparse is True: der_band, reg_band = self.compute_attitude_banded_derivative_and_regularisation_matrices() self.compute_sparses_matrices(der_band, reg_band) LHS = self.attitude_der_matrix + self.attitude_reg_matrix RHS = self.compute_attitude_RHS() d = sps.linalg.spsolve(LHS, RHS) else: LHS = self.compute_attitude_LHS() # LHS = self.N_aa RHS = self.compute_attitude_RHS() # RHS = self.h d = np.linalg.solve(LHS, RHS) # L = np.linalg.cholesky(LHS) # y = np.linalg.solve(L, RHS) # d = np.linalg.solve(L.T, y) # d = np.linalg.lstsq(LHS, RHS) # not what it is for self.d = d.reshape(self.att_coeffs.shape) c_update = d.reshape(self.att_coeffs.shape) for i in range(0, self.att_coeffs.shape[1]): c_update[0, i] = d[i*4] c_update[1, i] = d[i*4+1] c_update[2, i] = d[i*4+2] c_update[3, i] = d[i*4+3] self.c_update = c_update.copy() self.att_coeffs[:, :] += c_update[:, :].copy() self.actualise_splines() # Create the new splines
[docs] def update_A_block_bis(self): """ | updates the four components separately (wrong but not by much) | It should work if we update just some of the blocks (like one or two) """ LHS = self.compute_attitude_LHS() RHS = self.compute_attitude_RHS() for i in range(4): d = np.linalg.solve(LHS[i::4, i::4], RHS[i::4]) c_update = d.reshape(-1) self.att_coeffs[i] += c_update self.actualise_splines() # Create the new splines
[docs] def compute_attitude_LHS(self): """ Ref. Paper [LUDW2011]_ eq. [82] """ N_aa_dim = self.att_coeffs.shape[1] # *4 N_aa = np.zeros((N_aa_dim*4, N_aa_dim*4)) for n in range(0, N_aa_dim): # # TODO: take advantage of the symmetry for m in range(0, N_aa_dim): # # TODO: avoid doing the brute force version # for m in range(max((n-self.k), 0), min((n+self.k)+1, N_aa_dim+1)): N_aa[n*4:n*4+4, m*4:m*4+4] = self.compute_Naa_mn(m, n) self.N_aa = N_aa return N_aa
[docs] def compute_attitude_RHS(self): """ Ref. Paper [LUDW2011]_ eq. [82] """ N_aa_dim = self.att_coeffs.shape[1] RHS = np.zeros((N_aa_dim*4, 1)) for n in range(0, N_aa_dim): RHS[n*4:n*4+4] = self.compute_attitude_RHS_n(n) self.h = RHS.copy() return RHS
[docs] def get_source_index(self, t): """ get the index of the source corresponding to observation time t :param t: [float] observation time :return: [int] source index """ if t in self.time_dict: return self.time_dict[t] else: raise ValueError('time not in time_dict')
[docs] def compute_attitude_RHS_n(self, n_index): """ Ref. Paper [LUDW2011]_ eq. [82] :param n_index: :return: entry **n** of the Right Hand Side column vector for the attitude update """ rhs = np.zeros((4, 1)) time_support_spline_n = get_times_in_knot_interval(self.all_obs_times, self.att_knots, n_index, self.M) # Iterate through the support of spline_n for i, t_L in enumerate(time_support_spline_n): source_index = self.get_source_index(t_L) calc_source = self.calc_sources[source_index] attitude = self.get_attitude(t_L, unit=False) left_index = get_left_index(self.att_knots, t_L, M=self.M) obs_time_index = list(self.all_obs_times).index(t_L) # Get the regulation part: coeff_basis_sum = compute_coeff_basis_sum(self.att_coeffs, self.att_bases, left_index, self.M, obs_time_index) D_L = compute_attitude_deviation(coeff_basis_sum) dDL_da_n = compute_DL_da_i(coeff_basis_sum, self.att_bases, obs_time_index, n_index) # dDL_da_n = compute_DL_da_i_from_attitude(attitude, self.att_bases, obs_time_index, n_index) regularisation_part = self.attitude_regularisation_factor**2 * dDL_da_n * D_L # Get derivatives: dR_dq_AL, dR_dq_AC = compute_dR_dq(calc_source, self.sat, attitude, t_L) dR_da_n_AL = dR_da_i(dR_dq_AL, self.att_bases[n_index, obs_time_index]) dR_da_n_AC = dR_da_i(dR_dq_AC, self.att_bases[n_index, obs_time_index]) R_L_AL, R_L_AC = self.compute_R_L(source_index, t_L) rhs += regularisation_part + dR_da_n_AL * R_L_AL + dR_da_n_AC * R_L_AC return -rhs
[docs] def compute_Naa_mn(self, m_index, n_index): """ | Ref. Paper [LUDW2011]_ eq. [82] | compute dR/da (i.e. wrt coeffs) :param m_index: :param n_index: :return: entry [n, m] of the attitude normal matrix N_a """ Naa_mn = np.zeros((4, 4)) # Get times in the support of both spline_m and spline_n time_support_spline_m = get_times_in_knot_interval(self.all_obs_times, self.att_knots, m_index, self.M) time_support_spline_n = get_times_in_knot_interval(self.all_obs_times, self.att_knots, n_index, self.M) time_support_spline_mn = np.sort(helpers.get_lists_intersection(time_support_spline_m, time_support_spline_n)) # Iterate through all observation in the support of spline n and spline m for i, t_L in enumerate(time_support_spline_mn): # for i, t_L in enumerate(self.all_obs_times): calc_source = self.calc_sources[self.get_source_index(t_L)] attitude = self.get_attitude(t_L, unit=False) left_index = get_left_index(self.att_knots, t=t_L, M=self.M) obs_time_index = list(self.all_obs_times).index(t_L) # Compute the regulation part coeff_basis_sum = compute_coeff_basis_sum(self.att_coeffs, self.att_bases, left_index, self.M, obs_time_index) dDL_da_n = compute_DL_da_i(coeff_basis_sum, self.att_bases, obs_time_index, n_index) dDL_da_m = compute_DL_da_i(coeff_basis_sum, self.att_bases, obs_time_index, m_index) # dDL_da_n = compute_DL_da_i_from_attitude(attitude, self.att_bases, obs_time_index, n_index) # dDL_da_m = compute_DL_da_i_from_attitude(attitude, self.att_bases, obs_time_index, m_index) regularisation_part = self.attitude_regularisation_factor**2 * dDL_da_n @ dDL_da_m.T # Compute the original objective function part dR_dq_AL, dR_dq_AC = compute_dR_dq(calc_source, self.sat, attitude, t_L) dR_da_m_AL = dR_da_i(dR_dq_AL, self.att_bases[m_index, obs_time_index]) dR_da_m_AC = dR_da_i(dR_dq_AC, self.att_bases[m_index, obs_time_index]) dR_da_n_AL = dR_da_i(dR_dq_AL, self.att_bases[n_index, obs_time_index]) dR_da_n_AC = dR_da_i(dR_dq_AC, self.att_bases[n_index, obs_time_index]) Naa_mn += regularisation_part + dR_da_n_AL @ dR_da_m_AL.T + dR_da_n_AC @ dR_da_m_AC.T return Naa_mn
# ### Sparse implementation of attitude update-----
[docs] def compute_attitude_banded_derivative_and_regularisation_matrices(self): """ | Ref. Paper [LUDW2011]_ eq. [82] | Computes the bands of the banded matrices of the derivatives for the normal matrix and the band for the regularization of the normal matrix. :return: (der_band, reg_band) """ der_band = np.zeros((self.N*4, 16)) reg_band = np.zeros((self.N*4, 16)) for n in range(0, self.N): for i, m in enumerate(range(n, min(n+4, self.N))): der_band[n*4:n*4+4, i*4:i*4+4] = self.compute_matrix_der_mn(m, n) reg_band[n*4:n*4+4, i*4:i*4+4] = self.compute_matrix_reg_mn(m, n) return der_band, reg_band
[docs] def compute_sparses_matrices(self, der_band, reg_band): """ :action: Creates sparse banded matrices from bands and stores them in class properties :param der_band: band for the derivatives part of the normal attitude matrix :param reg_band: band for the regularization part of the normal attitude matrix """ self.attitude_der_matrix = helpers.get_sparse_diagonal_matrix_from_half_band(der_band) self.attitude_reg_matrix = helpers.get_sparse_diagonal_matrix_from_half_band(reg_band)
[docs] def compute_matrix_reg_mn(self, m_index, n_index): """ | Ref. Paper [LUDW2011]_ eq. [82] | compute :math:`\lambda^2*\\frac{dD_l}{da_m} * \\frac{dD_l}/{da_n^T}` (i.e. wrt coeffs) :param m_index: [int] :param n_index: [int] :return: reg_mn [n, m] entry of the regularisation part of the attitude normal matrix """ reg_mn = np.zeros((4, 4)) time_support_spline_m = get_times_in_knot_interval(self.all_obs_times, self.att_knots, m_index, self.M) time_support_spline_n = get_times_in_knot_interval(self.all_obs_times, self.att_knots, n_index, self.M) time_support_spline_mn = np.sort(helpers.get_lists_intersection(time_support_spline_m, time_support_spline_n)) for i, t_L in enumerate(time_support_spline_mn): left_index = get_left_index(self.att_knots, t=t_L, M=self.M) obs_time_index = list(self.all_obs_times).index(t_L) # Compute the regulation part coeff_basis_sum = compute_coeff_basis_sum(self.att_coeffs, self.att_bases, left_index, self.M, obs_time_index) dDL_da_n = compute_DL_da_i(coeff_basis_sum, self.att_bases, obs_time_index, n_index) dDL_da_m = compute_DL_da_i(coeff_basis_sum, self.att_bases, obs_time_index, m_index) reg_mn += self.attitude_regularisation_factor**2 * dDL_da_n @ dDL_da_m.T return reg_mn
[docs] def compute_matrix_der_mn(self, m_index, n_index): """ | Ref. Paper [LUDW2011]_ eq. [82] | Compute :math:`\\frac{dR_l}{da_n} \\frac{dR_l}{da_m^T}` (i.e. wrt coeffs) :param m_index: [int] :param n_index: [int] :return: der_mn, entry [n ,m] of the derivative parts of the attitude normal matrix """ der_mn = np.zeros((4, 4)) time_support_spline_m = get_times_in_knot_interval(self.all_obs_times, self.att_knots, m_index, self.M) time_support_spline_n = get_times_in_knot_interval(self.all_obs_times, self.att_knots, n_index, self.M) time_support_spline_mn = np.sort(helpers.get_lists_intersection(time_support_spline_m, time_support_spline_n)) for i, t_L in enumerate(time_support_spline_mn): # for i, t_L in enumerate(self.all_obs_times): calc_source = self.calc_sources[self.get_source_index(t_L)] attitude = self.get_attitude(t_L, unit=False) obs_time_index = list(self.all_obs_times).index(t_L) # Compute the original objective function part dR_dq_AL, dR_dq_AC = compute_dR_dq(calc_source, self.sat, attitude, t_L) dR_da_m_AL = dR_da_i(dR_dq_AL, self.att_bases[m_index, obs_time_index]) dR_da_m_AC = dR_da_i(dR_dq_AC, self.att_bases[m_index, obs_time_index]) dR_da_n_AL = dR_da_i(dR_dq_AL, self.att_bases[n_index, obs_time_index]) dR_da_n_AC = dR_da_i(dR_dq_AC, self.att_bases[n_index, obs_time_index]) der_mn += dR_da_n_AL @ dR_da_m_AL.T + dR_da_n_AC @ dR_da_m_AC.T return der_mn
# ### Implementation with moble's quaternion # This functions are not yet ready, but are here to be for a future # implementation of numpy-compatible implementation def compute_attitude_splines(self): s_w = self.attitude_splines[0] s_x = self.attitude_splines[1] s_y = self.attitude_splines[2] s_z = self.attitude_splines[3] splines_coeffs = np.array([s_w, s_x, s_y, s_z]).T self.discretized_attitude = quaternion.from_float_array(splines_coeffs) return self.discretized_attitude def get_attitude_from_attitude_array(self, t): raise ValueError('This function is not complete') pass def compute_stuff_for_source(self, s): raise ValueError('This function is not complete') alpha, delta, parallax, mu_alpha, mu_delta = calc_source.s_params[:] params = np.array([alpha, delta, parallax, mu_alpha, mu_delta, calc_source.mu_radial]) Cu = compute_topocentric_direction(params, sat, t) # u in CoMRS frame Su = ft.lmn_to_xyz(attitude, Cu) # u in SRS frame phi, zeta = compute_field_angles(Su, double_telescope=False) pass
if __name__ == '__main__': print('Executing agis.py as main file')