Source code for symforce.opt.optimization_problem

# ----------------------------------------------------------------------------
# SymForce - Copyright 2022, Skydio, Inc.
# This source code is under the Apache 2.0 license found in the LICENSE file.
# ----------------------------------------------------------------------------

import itertools

import symforce.symbolic as sf
from symforce import logger
from symforce import ops
from symforce import typing as T
from symforce.codegen import codegen_config
from symforce.codegen.backends.cpp.cpp_config import CppConfig
from symforce.opt.factor import Factor
from symforce.opt.numeric_factor import NumericFactor
from symforce.opt.sub_problem import SubProblem
from symforce.python_util import dots_and_brackets_to_underscores
from symforce.values import Values

[docs]class OptimizationProblem: """ An optimization problem. Defined by a collection of :class:`.sub_problem.SubProblem`, each of which defines a set of inputs (variables in the :class:`Values <symforce.values.values.Values>`) and a set of residuals. SubProblems are generally expected to expose inputs that are used by other subproblems; these dependencies should be handled by the user while constructing the ``residual_blocks`` argument. Typical workflow is to construct a set of SubProblems (which should also construct each SubProblem Inputs), build the ``residual_blocks`` Values by calling :meth:`.sub_problem.SubProblem.build_residuals` on each subproblem with the appropriate arguments, and then pass the subproblems and ``residual_blocks`` to the :class:`OptimizationProblem` constructor. Args: subproblems: Mapping from subproblem names to subproblems residual_blocks: Values where each leaf is a :class:`.residual_block.ResidualBlock`, containing all the residuals for the problem. Typically created by calling :meth:`.sub_problem.SubProblem.build_residuals` on each subproblem. shared_inputs: If provided, an additional ``shared_inputs`` block to be added to the Values """ subproblems: T.Mapping[str, SubProblem] inputs: Values residual_blocks: Values residuals: Values extra_values: Values def __init__( self, subproblems: T.Mapping[str, SubProblem], residual_blocks: Values, shared_inputs: T.Optional[T.Dataclass] = None, ): self.subproblems = subproblems self.inputs = build_inputs(self.subproblems.values(), shared_inputs) self.residual_blocks = residual_blocks self.residuals, self.extra_values = self.split_residual_blocks(residual_blocks)
[docs] @staticmethod def split_residual_blocks(residual_blocks: Values) -> T.Tuple[Values, Values]: """ Split :attr:`residual_blocks` into ``residuals`` and ``extra_values`` """ residuals = Values() extra_values = Values() for key, residual_block in residual_blocks.items_recursive(): residuals[key] = residual_block.residual extra_values[key] = residual_block.extra_values return residuals, extra_values
[docs] def keys(self) -> T.List[str]: """ Compute the set of all keys specified by the subproblems """ return self.inputs.dataclasses_to_values().keys_recursive()
[docs] def optimized_keys(self) -> T.List[str]: """ Compute the set of optimized keys, as specified by the subproblems """ inputs = self.inputs.dataclasses_to_values() optimized_values = itertools.chain.from_iterable( subproblem.optimized_values() for subproblem in self.subproblems.values() ) content_addressable_inputs = { tuple(ops.StorageOps.to_storage(value)): key for key, value in inputs.items_recursive() } optimized_keys = [] for value in optimized_values: optimized_key = content_addressable_inputs[tuple(ops.StorageOps.to_storage(value))] if value != inputs[optimized_key]: raise TypeError( f"Variable returned by `optimized_values()` ({value}) in " + "subproblem does not match variable in `Inputs` of subproblem " + f"({inputs[optimized_key]}) for key {optimized_key}." ) optimized_keys.append(optimized_key) return optimized_keys
[docs] def generate( self, output_dir: T.Openable, namespace: str, name: str, sparse_linearization: bool = False, config: CppConfig = None, ) -> None: """ Generate everything needed to optimize ``self`` in C++. This currently assumes there is only one factor generated for the optimization problem. Args: output_dir: Directory in which to output the generated files. namespace: Namespace used in each generated file. name: Name of the generated factor. sparse_linearization: Whether the generated factors should use sparse jacobians/hessians config: C++ code configuration used with the linearization functions generated for each factor. """ if config is None: config = CppConfig() # Generate the C++ code for the residual linearization function factors = self.make_symbolic_factors(name=name, config=config) for factor in factors: output_data = factor.generate( optimized_keys=self.optimized_keys(), output_dir=output_dir, namespace=namespace, sparse_linearization=sparse_linearization, ) logger.debug( "Generated function `{}` in directory `{}`".format( output_data["name"], output_data["function_dir"] ) )
[docs] def make_symbolic_factors( self, name: str, config: T.Optional[codegen_config.CodegenConfig] = None, ) -> T.List[Factor]: """ Return a list of symbolic factors for this problem for analysis purposes. If the factors are to be passed to an :class:`symforce.opt.optimizer.Optimizer`, use :meth:`make_numeric_factors` instead. Args: name: Name of factors. Note that the generated linearization functions will have ``"_factor"`` appended to the function name (see ``Codegen._pick_name_for_function_with_derivatives`` for details). config: Language the factors will be generated in when :meth:`generate` is called. If not provided, uses the same default as the :class:`.factor.Factor` constructor. """ inputs = self.inputs.dataclasses_to_values() def compute_jacobians(keys: T.Iterable[str]) -> sf.Matrix: """ Functor that computes the jacobians of the residual with respect to a set of keys The set of keys is not known when make_symbolic_factors is called, because we may want to create a :class:`.numeric_factor.NumericFactor` which computes derivatives with respect to different sets of optimized variables. """ jacobians = [ residual_block.compute_jacobians( [inputs[key] for key in keys], residual_name=residual_key, key_names=keys ) for residual_key, residual_block in self.residual_blocks.items_recursive() ] return sf.Matrix.block_matrix(jacobians) return [ Factor.from_inputs_and_residual( keys=self.keys(), inputs=Values( **{ dots_and_brackets_to_underscores(key): value for key, value in inputs.items_recursive() } ), residual=sf.M(self.residuals.to_storage()), config=config, custom_jacobian_func=compute_jacobians, name=name, ) ]
[docs] def make_numeric_factors( self, name: str, optimized_keys: T.Sequence[str] = None ) -> T.List[NumericFactor]: """ Returns a list of `NumericFactor` for this problem, for example to pass to :class:`Optimizer <symforce.opt.optimizer.Optimizer>`. Args: name: Name of factors. Note that the generated linearization functions will have ``"_factor"`` appended to the function name (see ``Codegen._pick_name_for_function_with_derivatives`` for details). optimized_keys: List of keys to optimize with respect to. Defaults to the optimized keys specified by the subproblems of this optimization problem. """ if optimized_keys is None: optimized_keys = self.optimized_keys() numeric_factors = [] for factor in self.make_symbolic_factors(name): factor_optimized_keys = [ opt_key for opt_key in optimized_keys if opt_key in factor.keys ] numeric_factors.append(factor.to_numeric_factor(factor_optimized_keys)) return numeric_factors
[docs]def build_inputs( subproblems: T.Iterable[SubProblem], shared_inputs: T.Optional[T.Element] = None ) -> Values: """ Build the inputs Values for a set of subproblems. The resulting values is structured as:: Values(, ...,, shared_inputs=shared_inputs, ) Args: subproblems: Iterable of SubProblems shared_inputs: Optional additional shared inputs Returns: inputs: the combined Values """ inputs = Values() if shared_inputs is not None: inputs["shared_inputs"] = shared_inputs # Build inputs for subproblem in subproblems: if subproblem.inputs: inputs[] = subproblem.inputs return inputs