Porytiles
Loading...
Searching...
No Matches
pipeline.cpp
Go to the documentation of this file.
2
3#include <queue>
4
8
9namespace porytiles2 {
10
11Pipeline::Pipeline(const std::vector<Operation *> &ops)
12{
13 // 1) Map each operand key to the producer op that generates it
14 for (auto &op : ops) {
15 for (const auto &output_operand : op->declare_outputs()) {
16 const auto &out_key = output_operand.key();
17 if (producers_.contains(out_key)) {
18 panic(fmt::format("duplicate producers for key: {}", out_key));
19 }
20 producers_.insert({out_key, op});
21 }
22 }
23
24 // 2) Build adjacency and compute in-degrees
25 for (auto &op : ops) {
26 adj_.try_emplace(op, std::vector<Operation *>{});
27 }
28 for (auto &op : ops) {
29 const auto inputs = op->declare_inputs();
30 int deps = 0;
31 for (const auto &input_operand : inputs) {
32 if (const auto &in_key = input_operand.key(); producers_.contains(in_key)) {
33 auto *producer_op = producers_.at(in_key);
34 adj_.at(producer_op).push_back(op);
35 deps++;
36 }
37 else {
38 panic(fmt::format("operation '{}' depends on non-existent operand: '{}'", op->name(), in_key));
39 }
40 }
41 in_degree_.insert({op, deps});
42 }
43
44 // 3) Kahn's algorithm
45 std::queue<Operation *> q;
46 for (const auto &[op, degree] : in_degree_) {
47 if (degree == 0) {
48 q.push(op);
49 }
50 }
51 while (!q.empty()) {
52 auto *op = q.front();
53 q.pop();
54 sorted_.push_back(op);
55 for (auto *neighbor : adj_.at(op)) {
56 if (--in_degree_[neighbor] == 0) {
57 q.push(neighbor);
58 }
59 }
60 }
61 if (sorted_.size() != ops.size()) {
62 panic("cycle detected in pipeline dependencies");
63 }
64}
65
67{
68 OperandBundle operand_pool{};
69 for (auto *op : sorted_) {
70 // Gather inputs for the operation
71 OperandBundle inputs{};
72 for (auto &input_operand : op->declare_inputs()) {
73 const auto &key = input_operand.key();
74 const auto val = operand_pool.get(key);
75 if (!val.has_value()) {
76 panic(fmt::format("operation '{}' missing input operand: {}", op->name(), key));
77 }
78 inputs.put(key, val.value());
79 }
80
81 // Execute the operation
82 auto result = op->apply(inputs);
83 if (!result.has_value()) {
84 return ChainableResult<void>{FormattableError{fmt::format("operation '{}' failed", op->name())}, result};
85 }
86
87 // Merge outputs
88 auto output_bundle = result.value();
89 for (const auto &[key, value] : output_bundle) {
90 if (operand_pool.contains(key)) {
91 panic(fmt::format("op '{}' output operand '{}' already present in operand pool", op->name(), key));
92 }
93 operand_pool.put(key, value);
94 }
95 }
96 return {};
97}
98
99} // namespace porytiles2
A result type that maintains a chainable sequence of errors for debugging and error reporting.
General-purpose error implementation with formatted message support.
Definition error.hpp:117
A type-erased container for orchestration operands with runtime type checking.
std::optional< std::any > get(const std::string &key) const
Retrieves an operand value as std::any.
Pipeline(const std::vector< Operation * > &ops)
Constructs a pipeline from a collection of operations.
Definition pipeline.cpp:11
ChainableResult< void > run() const
Executes all operations in the pipeline in dependency order.
Definition pipeline.cpp:66
void panic(const StringViewSourceLoc &s)
Unconditionally terminates the program with a panic message.
Definition panic.hpp:53