Skip to main content

hydro_lang/sim/
flow.rs

1//! Entrypoint for compiling and running Hydro simulations.
2
3use std::cell::RefCell;
4use std::collections::HashMap;
5use std::panic::RefUnwindSafe;
6use std::rc::Rc;
7
8use libloading::Library;
9use slotmap::SparseSecondaryMap;
10
11use super::builder::SimBuilder;
12use super::compiled::{CompiledSim, CompiledSimInstance};
13use super::graph::{SimDeploy, SimExternal, SimNode, compile_sim, create_sim_graph_trybuild};
14use crate::compile::ir::HydroRoot;
15use crate::location::LocationKey;
16use crate::prelude::Cluster;
17use crate::sim::graph::SimExternalPortRegistry;
18use crate::staging_util::Invariant;
19
20/// A not-yet-compiled simulator for a Hydro program.
21pub struct SimFlow<'a> {
22    pub(crate) ir: Vec<HydroRoot>,
23
24    /// SimNode for each Process.
25    pub(crate) processes: SparseSecondaryMap<LocationKey, SimNode>,
26    /// SimNode for each Cluster.
27    pub(crate) clusters: SparseSecondaryMap<LocationKey, SimNode>,
28    /// SimExternal for each External.
29    pub(crate) externals: SparseSecondaryMap<LocationKey, SimExternal>,
30
31    /// Max size of each cluster.
32    pub(crate) cluster_max_sizes: SparseSecondaryMap<LocationKey, usize>,
33    /// Handle to state handling `external`s' ports.
34    pub(crate) externals_port_registry: Rc<RefCell<SimExternalPortRegistry>>,
35
36    pub(crate) _phantom: Invariant<'a>,
37}
38
39impl<'a> SimFlow<'a> {
40    /// Sets the maximum size of the given cluster in the simulation.
41    pub fn with_cluster_size<C>(mut self, cluster: &Cluster<'a, C>, max_size: usize) -> Self {
42        self.cluster_max_sizes.insert(cluster.key, max_size);
43        self
44    }
45
46    /// Executes the given closure with a single instance of the compiled simulation.
47    pub fn with_instance<T>(self, thunk: impl FnOnce(CompiledSimInstance) -> T) -> T {
48        self.compiled().with_instance(thunk)
49    }
50
51    /// Uses a fuzzing strategy to explore possible executions of the simulation. The provided
52    /// closure will be repeatedly executed with instances of the Hydro program where the
53    /// batching boundaries, order of messages, and retries are varied.
54    ///
55    /// During development, you should run the test that invokes this function with the `cargo sim`
56    /// command, which will use `libfuzzer` to intelligently explore the execution space. If a
57    /// failure is found, a minimized test case will be produced in a `sim-failures` directory.
58    /// When running the test with `cargo test` (such as in CI), if a reproducer is found it will
59    /// be executed, and if no reproducer is found a small number of random executions will be
60    /// performed.
61    pub fn fuzz(self, thunk: impl AsyncFn() + RefUnwindSafe) {
62        self.compiled().fuzz(thunk)
63    }
64
65    /// Exhaustively searches all possible executions of the simulation. The provided
66    /// closure will be repeatedly executed with instances of the Hydro program where the
67    /// batching boundaries, order of messages, and retries are varied.
68    ///
69    /// Exhaustive searching is feasible when the inputs to the Hydro program are finite and there
70    /// are no dataflow loops that generate infinite messages. Exhaustive searching provides a
71    /// stronger guarantee of correctness than fuzzing, but may take a long time to complete.
72    /// Because no fuzzer is involved, you can run exhaustive tests with `cargo test`.
73    ///
74    /// Returns the number of distinct executions explored.
75    pub fn exhaustive(self, thunk: impl AsyncFnMut() + RefUnwindSafe) -> usize {
76        self.compiled().exhaustive(thunk)
77    }
78
79    /// Compiles the simulation into a dynamically loadable library, and returns a handle to it.
80    pub fn compiled(mut self) -> CompiledSim {
81        use std::collections::BTreeMap;
82
83        use dfir_lang::graph::{eliminate_extra_unions_tees, partition_graph};
84
85        let mut sim_emit = SimBuilder {
86            process_graphs: BTreeMap::new(),
87            cluster_graphs: BTreeMap::new(),
88            process_tick_dfirs: BTreeMap::new(),
89            cluster_tick_dfirs: BTreeMap::new(),
90            extra_stmts_global: vec![],
91            extra_stmts_cluster: BTreeMap::new(),
92            next_hoff_id: 0,
93        };
94
95        // Ensure the default (0) external is always present.
96        self.externals.insert(
97            LocationKey::FIRST,
98            SimExternal {
99                shared_inner: self.externals_port_registry.clone(),
100            },
101        );
102
103        let mut seen_tees_instantiate: HashMap<_, _> = HashMap::new();
104        self.ir.iter_mut().for_each(|leaf| {
105            leaf.compile_network::<SimDeploy>(
106                &mut SparseSecondaryMap::new(),
107                &mut seen_tees_instantiate,
108                &self.processes,
109                &self.clusters,
110                &self.externals,
111            );
112        });
113
114        let mut seen_tees = HashMap::new();
115        let mut built_tees = HashMap::new();
116        let mut next_stmt_id = 0;
117        for leaf in &mut self.ir {
118            leaf.emit::<SimDeploy>(
119                &mut sim_emit,
120                &mut seen_tees,
121                &mut built_tees,
122                &mut next_stmt_id,
123            );
124        }
125
126        let process_graphs = sim_emit
127            .process_graphs
128            .into_iter()
129            .map(|(l, g)| {
130                let (mut flat_graph, _, _) = g.build();
131                eliminate_extra_unions_tees(&mut flat_graph);
132                (
133                    l,
134                    partition_graph(flat_graph).expect("Failed to partition (cycle detected)."),
135                )
136            })
137            .collect::<BTreeMap<_, _>>();
138
139        let cluster_graphs = sim_emit
140            .cluster_graphs
141            .into_iter()
142            .map(|(l, g)| {
143                let (mut flat_graph, _, _) = g.build();
144                eliminate_extra_unions_tees(&mut flat_graph);
145                (
146                    l,
147                    partition_graph(flat_graph).expect("Failed to partition (cycle detected)."),
148                )
149            })
150            .collect::<BTreeMap<_, _>>();
151
152        let process_tick_graphs = sim_emit
153            .process_tick_dfirs
154            .into_iter()
155            .map(|(l, g)| {
156                let (mut flat_graph, _, _) = g.build();
157                eliminate_extra_unions_tees(&mut flat_graph);
158                (
159                    l,
160                    partition_graph(flat_graph).expect("Failed to partition (cycle detected)."),
161                )
162            })
163            .collect::<BTreeMap<_, _>>();
164
165        let cluster_tick_graphs = sim_emit
166            .cluster_tick_dfirs
167            .into_iter()
168            .map(|(l, g)| {
169                let (mut flat_graph, _, _) = g.build();
170                eliminate_extra_unions_tees(&mut flat_graph);
171                (
172                    l,
173                    partition_graph(flat_graph).expect("Failed to partition (cycle detected)."),
174                )
175            })
176            .collect::<BTreeMap<_, _>>();
177
178        #[expect(
179            clippy::disallowed_methods,
180            reason = "nondeterministic iteration order, fine for checks"
181        )]
182        for c in self.clusters.keys() {
183            assert!(
184                self.cluster_max_sizes.contains_key(c),
185                "Cluster {:?} missing max size; call with_cluster_size() before compiled()",
186                c
187            );
188        }
189
190        let (bin, trybuild) = create_sim_graph_trybuild(
191            process_graphs,
192            cluster_graphs,
193            self.cluster_max_sizes,
194            process_tick_graphs,
195            cluster_tick_graphs,
196            sim_emit.extra_stmts_global,
197            sim_emit.extra_stmts_cluster,
198        );
199
200        let out = compile_sim(bin, trybuild).unwrap();
201        let lib = unsafe { Library::new(&out).unwrap() };
202
203        CompiledSim {
204            _path: out,
205            lib,
206            externals_port_registry: self.externals_port_registry.take(),
207        }
208    }
209}