11 for (
auto &op : ops) {
12 for (
const auto &output_artifact : op->DeclareOutputs()) {
13 const auto &out_key = output_artifact.key();
14 if (producers_.contains(out_key)) {
15 Panic(
"duplicate producers for key: " + out_key);
17 producers_.insert({out_key, op.get()});
22 for (
auto &op : ops) {
23 adj_.try_emplace(op.get(), std::vector<Operation *>{});
25 for (
auto &op : ops) {
26 const auto inputs = op->DeclareInputs();
28 for (
const auto &input_artifact : inputs) {
29 if (
const auto &in_key = input_artifact.key(); producers_.contains(in_key)) {
30 auto *producer_op = producers_.at(in_key);
31 adj_.at(producer_op).push_back(op.get());
34 Panic(fmt::format(
"operation '{}' depends on non-existent artifact: '{}'", op->name(), in_key));
37 in_degree_.insert({op.get(), deps});
41 std::queue<Operation *> q;
42 for (
const auto &[op, degree] : in_degree_) {
50 sorted_.push_back(op);
51 for (
auto *neighbor : adj_.at(op)) {
52 if (--in_degree_[neighbor] == 0) {
57 if (sorted_.size() != ops.size()) {
58 Panic(
"cycle detected in pipeline dependencies");
64 for (
auto *op : sorted_) {
67 for (
auto &input_artifact : op->DeclareInputs()) {
68 const auto &key = input_artifact.key();
69 const auto val = artifacts.
TryAny(key);
70 if (!val.has_value()) {
71 Panic(fmt::format(
"operation '{}' missing input artifact: {}", op->name(), key));
73 inputs.Put(key, val.value());
77 auto result = op->Execute(inputs);
78 if (!result.has_value()) {
83 for (
auto outputs_map = result.value();
const auto &[key, value] : outputs_map) {
84 if (artifacts.Contains(key)) {
85 Panic(
"duplicate output artifact: " + key);
87 artifacts.Put(key, value);
std::optional< std::any > TryAny(const std::string &key) const
Pipeline(const std::vector< std::shared_ptr< Operation > > &ops)
std::expected< AnyMap, std::string > Run() const
void Panic(const StringViewSourceLoc &s) noexcept