# ----------------------------------------------------------------------------
# SymForce - Copyright 2022, Skydio, Inc.
# This source code is under the Apache 2.0 license found in the LICENSE file.
# ----------------------------------------------------------------------------
import itertools
import symforce.symbolic as sf
from symforce import logger
from symforce import ops
from symforce import typing as T
from symforce.codegen import codegen_config
from symforce.codegen.backends.cpp.cpp_config import CppConfig
from symforce.opt.factor import Factor
from symforce.opt.numeric_factor import NumericFactor
from symforce.opt.sub_problem import SubProblem
from symforce.python_util import dots_and_brackets_to_underscores
from symforce.values import Values
[docs]class OptimizationProblem:
"""
An optimization problem.
Defined by a collection of :class:`.sub_problem.SubProblem`, each of which defines a set of
inputs (variables in the :class:`Values <symforce.values.values.Values>`) and a set of
residuals. SubProblems are generally expected to expose inputs that are used by other
subproblems; these dependencies should be handled by the user while constructing the
``residual_blocks`` argument. Typical workflow is to construct a set of SubProblems (which
should also construct each SubProblem Inputs), build the ``residual_blocks`` Values by calling
:meth:`.sub_problem.SubProblem.build_residuals` on each subproblem with the appropriate
arguments, and then pass the subproblems and ``residual_blocks`` to the
:class:`OptimizationProblem` constructor.
Args:
subproblems: Mapping from subproblem names to subproblems
residual_blocks: Values where each leaf is a :class:`.residual_block.ResidualBlock`,
containing all the residuals for the problem. Typically created by calling
:meth:`.sub_problem.SubProblem.build_residuals` on each subproblem.
shared_inputs: If provided, an additional ``shared_inputs`` block to be added to the Values
"""
subproblems: T.Mapping[str, SubProblem]
inputs: Values
residual_blocks: Values
residuals: Values
extra_values: Values
def __init__(
self,
subproblems: T.Mapping[str, SubProblem],
residual_blocks: Values,
shared_inputs: T.Optional[T.Dataclass] = None,
):
self.subproblems = subproblems
self.inputs = build_inputs(self.subproblems.values(), shared_inputs)
self.residual_blocks = residual_blocks
self.residuals, self.extra_values = self.split_residual_blocks(residual_blocks)
[docs] @staticmethod
def split_residual_blocks(residual_blocks: Values) -> T.Tuple[Values, Values]:
"""
Split :attr:`residual_blocks` into ``residuals`` and ``extra_values``
"""
residuals = Values()
extra_values = Values()
for key, residual_block in residual_blocks.items_recursive():
residuals[key] = residual_block.residual
extra_values[key] = residual_block.extra_values
return residuals, extra_values
[docs] def keys(self) -> T.List[str]:
"""
Compute the set of all keys specified by the subproblems
"""
return self.inputs.dataclasses_to_values().keys_recursive()
[docs] def optimized_keys(self) -> T.List[str]:
"""
Compute the set of optimized keys, as specified by the subproblems
"""
inputs = self.inputs.dataclasses_to_values()
optimized_values = itertools.chain.from_iterable(
subproblem.optimized_values() for subproblem in self.subproblems.values()
)
content_addressable_inputs = {
tuple(ops.StorageOps.to_storage(value)): key for key, value in inputs.items_recursive()
}
optimized_keys = []
for value in optimized_values:
optimized_key = content_addressable_inputs[tuple(ops.StorageOps.to_storage(value))]
if value != inputs[optimized_key]:
raise TypeError(
f"Variable returned by `optimized_values()` ({value}) in "
+ "subproblem does not match variable in `Inputs` of subproblem "
+ f"({inputs[optimized_key]}) for key {optimized_key}."
)
optimized_keys.append(optimized_key)
return optimized_keys
[docs] def generate(
self,
output_dir: T.Openable,
namespace: str,
name: str,
sparse_linearization: bool = False,
config: T.Optional[CppConfig] = None,
) -> None:
"""
Generate everything needed to optimize ``self`` in C++. This currently assumes there is
only one factor generated for the optimization problem.
Args:
output_dir: Directory in which to output the generated files.
namespace: Namespace used in each generated file.
name: Name of the generated factor.
sparse_linearization: Whether the generated factors should use sparse jacobians/hessians
config: C++ code configuration used with the linearization functions generated for each
factor.
"""
if config is None:
config = CppConfig()
# Generate the C++ code for the residual linearization function
factors = self.make_symbolic_factors(name=name, config=config)
for factor in factors:
output_data = factor.generate(
optimized_keys=self.optimized_keys(),
output_dir=output_dir,
namespace=namespace,
sparse_linearization=sparse_linearization,
)
logger.debug(
"Generated function `{}` in directory `{}`".format(
output_data["name"], output_data["function_dir"]
)
)
[docs] def make_symbolic_factors(
self,
name: str,
config: T.Optional[codegen_config.CodegenConfig] = None,
) -> T.List[Factor]:
"""
Return a list of symbolic factors for this problem for analysis purposes. If the factors
are to be passed to an :class:`symforce.opt.optimizer.Optimizer`, use
:meth:`make_numeric_factors` instead.
Args:
name: Name of factors. Note that the generated linearization functions will have
``"_factor"`` appended to the function name (see
``Codegen._pick_name_for_function_with_derivatives`` for details).
config: Language the factors will be generated in when :meth:`generate` is called. If
not provided, uses the same default as the :class:`.factor.Factor` constructor.
"""
inputs = self.inputs.dataclasses_to_values()
def compute_jacobians(keys: T.Iterable[str]) -> sf.Matrix:
"""
Functor that computes the jacobians of the residual with respect to a set of keys
The set of keys is not known when make_symbolic_factors is called, because we may want
to create a :class:`.numeric_factor.NumericFactor` which computes derivatives with
respect to different sets of optimized variables.
"""
jacobians = [
residual_block.compute_jacobians(
[inputs[key] for key in keys], residual_name=residual_key, key_names=keys
)
for residual_key, residual_block in self.residual_blocks.items_recursive()
]
return sf.Matrix.block_matrix(jacobians)
return [
Factor.from_inputs_and_residual(
keys=self.keys(),
inputs=Values(
**{
dots_and_brackets_to_underscores(key): value
for key, value in inputs.items_recursive()
}
),
residual=sf.M(self.residuals.to_storage()),
config=config,
custom_jacobian_func=compute_jacobians,
name=name,
)
]
[docs] def make_numeric_factors(
self, name: str, optimized_keys: T.Optional[T.Sequence[str]] = None
) -> T.List[NumericFactor]:
"""
Returns a list of `NumericFactor` for this problem, for example to pass to
:class:`Optimizer <symforce.opt.optimizer.Optimizer>`.
Args:
name: Name of factors. Note that the generated linearization functions will have
``"_factor"`` appended to the function name (see
``Codegen._pick_name_for_function_with_derivatives`` for details).
optimized_keys: List of keys to optimize with respect to. Defaults to the optimized keys
specified by the subproblems of this optimization problem.
"""
if optimized_keys is None:
optimized_keys = self.optimized_keys()
numeric_factors = []
for factor in self.make_symbolic_factors(name):
factor_optimized_keys = [
opt_key for opt_key in optimized_keys if opt_key in factor.keys
]
numeric_factors.append(factor.to_numeric_factor(factor_optimized_keys))
return numeric_factors