14 for (
auto &op : ops) {
15 for (
const auto &output_operand : op->declare_outputs()) {
16 const auto &out_key = output_operand.key();
17 if (producers_.contains(out_key)) {
18 panic(fmt::format(
"duplicate producers for key: {}", out_key));
20 producers_.insert({out_key, op});
25 for (
auto &op : ops) {
26 adj_.try_emplace(op, std::vector<Operation *>{});
28 for (
auto &op : ops) {
29 const auto inputs = op->declare_inputs();
31 for (
const auto &input_operand : inputs) {
32 if (
const auto &in_key = input_operand.key(); producers_.contains(in_key)) {
33 auto *producer_op = producers_.at(in_key);
34 adj_.at(producer_op).push_back(op);
38 panic(fmt::format(
"operation '{}' depends on non-existent operand: '{}'", op->name(), in_key));
41 in_degree_.insert({op, deps});
45 std::queue<Operation *> q;
46 for (
const auto &[op, degree] : in_degree_) {
54 sorted_.push_back(op);
55 for (
auto *neighbor : adj_.at(op)) {
56 if (--in_degree_[neighbor] == 0) {
61 if (sorted_.size() != ops.size()) {
62 panic(
"cycle detected in pipeline dependencies");
69 for (
auto *op : sorted_) {
72 for (
auto &input_operand : op->declare_inputs()) {
73 const auto &key = input_operand.key();
74 const auto val = operand_pool.
get(key);
75 if (!val.has_value()) {
76 panic(fmt::format(
"operation '{}' missing input operand: {}", op->name(), key));
78 inputs.put(key, val.value());
82 auto result = op->apply(inputs);
83 if (!result.has_value()) {
88 auto output_bundle = result.value();
89 for (
const auto &[key, value] : output_bundle) {
90 if (operand_pool.contains(key)) {
91 panic(fmt::format(
"op '{}' output operand '{}' already present in operand pool", op->name(), key));
93 operand_pool.put(key, value);
A result type that maintains a chainable sequence of errors for debugging and error reporting.
A type-erased container for orchestration operands with runtime type checking.
std::optional< std::any > get(const std::string &key) const
Retrieves an operand value as std::any.
Pipeline(const std::vector< Operation * > &ops)
Constructs a pipeline from a collection of operations.
ChainableResult< void > run() const
Executes all operations in the pipeline in dependency order.
void panic(const StringViewSourceLoc &s)
Unconditionally terminates the program with a panic message.