Porytiles
Loading...
Searching...
No Matches
pipeline.cpp
Go to the documentation of this file.
2
3#include <queue>
4
6
7namespace porytiles {
8
9Pipeline::Pipeline(const std::vector<std::shared_ptr<Operation>> &ops) {
10 // 1) Map each artifact key to the producer op that generates it
11 for (auto &op : ops) {
12 for (const auto &output_artifact : op->DeclareOutputs()) {
13 const auto &out_key = output_artifact.key();
14 if (producers_.contains(out_key)) {
15 Panic("duplicate producers for key: " + out_key);
16 }
17 producers_.insert({out_key, op.get()});
18 }
19 }
20
21 // 2) Build adjacency and compute in-degrees
22 for (auto &op : ops) {
23 adj_.try_emplace(op.get(), std::vector<Operation *>{});
24 }
25 for (auto &op : ops) {
26 const auto inputs = op->DeclareInputs();
27 int deps = 0;
28 for (const auto &input_artifact : inputs) {
29 if (const auto &in_key = input_artifact.key(); producers_.contains(in_key)) {
30 auto *producer_op = producers_.at(in_key);
31 adj_.at(producer_op).push_back(op.get());
32 deps++;
33 } else {
34 Panic(fmt::format("operation '{}' depends on non-existent artifact: '{}'", op->name(), in_key));
35 }
36 }
37 in_degree_.insert({op.get(), deps});
38 }
39
40 // 3) Kahn's algorithm
41 std::queue<Operation *> q;
42 for (const auto &[op, degree] : in_degree_) {
43 if (degree == 0) {
44 q.push(op);
45 }
46 }
47 while (!q.empty()) {
48 auto *op = q.front();
49 q.pop();
50 sorted_.push_back(op);
51 for (auto *neighbor : adj_.at(op)) {
52 if (--in_degree_[neighbor] == 0) {
53 q.push(neighbor);
54 }
55 }
56 }
57 if (sorted_.size() != ops.size()) {
58 Panic("cycle detected in pipeline dependencies");
59 }
60}
61
// Executes every operation in the order stored in sorted_ and returns the accumulated
// artifacts.
//
// For each operation, the artifacts named by DeclareInputs() are copied from the accumulated
// map into a fresh input map, the operation is executed, and its outputs are merged back
// into the accumulated map.
//
// Returns the first unsuccessful result from Execute() unchanged; operations after it are
// not run. Panics if a declared input has not been produced yet, or if an operation emits a
// key that is already present in the accumulated artifacts.
std::expected<AnyMap, std::string> Pipeline::Run() const {
    AnyMap artifacts{};
    for (auto *op : sorted_) {
        // Gather inputs for the operation
        AnyMap inputs{};
        for (const auto &input_artifact : op->DeclareInputs()) {
            const auto &key = input_artifact.key();
            const auto val = artifacts.TryAny(key);
            if (!val.has_value()) {
                Panic(fmt::format("operation '{}' missing input artifact: {}", op->name(), key));
            }
            inputs.Put(key, val.value());
        }

        // Execute the operation
        auto result = op->Execute(inputs);
        if (!result.has_value()) {
            return result;
        }

        // Merge outputs
        // Iterate the result's map in place; copying the whole map first (as before) only
        // added a deep copy of every output per operation.
        for (const auto &[key, value] : result.value()) {
            if (artifacts.Contains(key)) {
                Panic("duplicate output artifact: " + key);
            }
            artifacts.Put(key, value);
        }
    }
    return artifacts;
}
92
93} // namespace porytiles
std::optional< std::any > TryAny(const std::string &key) const
Definition any_map.hpp:63
Pipeline(const std::vector< std::shared_ptr< Operation > > &ops)
Definition pipeline.cpp:9
std::expected< AnyMap, std::string > Run() const
Definition pipeline.cpp:62
void Panic(const StringViewSourceLoc &s) noexcept
Definition panic.hpp:31