1use core::panic;
2use std::cell::RefCell;
3use std::collections::HashMap;
4use std::fmt::{Debug, Display};
5use std::hash::{Hash, Hasher};
6use std::ops::Deref;
7use std::rc::Rc;
8
9#[cfg(feature = "build")]
10use dfir_lang::graph::FlatGraphBuilder;
11#[cfg(feature = "build")]
12use proc_macro2::Span;
13use proc_macro2::TokenStream;
14use quote::ToTokens;
15#[cfg(feature = "build")]
16use quote::quote;
17#[cfg(feature = "build")]
18use slotmap::{SecondaryMap, SparseSecondaryMap};
19#[cfg(feature = "build")]
20use syn::parse_quote;
21use syn::visit::{self, Visit};
22use syn::visit_mut::VisitMut;
23
24use crate::compile::builder::ExternalPortId;
25#[cfg(feature = "build")]
26use crate::compile::deploy_provider::{Deploy, Node, RegisterPort};
27use crate::location::dynamic::LocationId;
28use crate::location::{LocationKey, NetworkHint};
29
30pub mod backtrace;
31use backtrace::Backtrace;
32
33#[derive(Clone, Hash)]
37pub struct DebugExpr(pub Box<syn::Expr>);
38
39impl From<syn::Expr> for DebugExpr {
40 fn from(expr: syn::Expr) -> Self {
41 Self(Box::new(expr))
42 }
43}
44
45impl Deref for DebugExpr {
46 type Target = syn::Expr;
47
48 fn deref(&self) -> &Self::Target {
49 &self.0
50 }
51}
52
53impl ToTokens for DebugExpr {
54 fn to_tokens(&self, tokens: &mut TokenStream) {
55 self.0.to_tokens(tokens);
56 }
57}
58
59impl Debug for DebugExpr {
60 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
61 write!(f, "{}", self.0.to_token_stream())
62 }
63}
64
65impl Display for DebugExpr {
66 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
67 let original = self.0.as_ref().clone();
68 let simplified = simplify_q_macro(original);
69
70 write!(f, "q!({})", quote::quote!(#simplified))
73 }
74}
75
76fn simplify_q_macro(mut expr: syn::Expr) -> syn::Expr {
78 let mut simplifier = QMacroSimplifier::new();
81 simplifier.visit_expr_mut(&mut expr);
82
83 if let Some(simplified) = simplifier.simplified_result {
85 simplified
86 } else {
87 expr
88 }
89}
90
91#[derive(Default)]
93pub struct QMacroSimplifier {
94 pub simplified_result: Option<syn::Expr>,
95}
96
97impl QMacroSimplifier {
98 pub fn new() -> Self {
99 Self::default()
100 }
101}
102
103impl VisitMut for QMacroSimplifier {
104 fn visit_expr_mut(&mut self, expr: &mut syn::Expr) {
105 if self.simplified_result.is_some() {
107 return;
108 }
109
110 if let syn::Expr::Call(call) = expr && let syn::Expr::Path(path_expr) = call.func.as_ref()
111 && self.is_stageleft_runtime_support_call(&path_expr.path)
113 && let Some(closure) = self.extract_closure_from_args(&call.args)
115 {
116 self.simplified_result = Some(closure);
117 return;
118 }
119
120 syn::visit_mut::visit_expr_mut(self, expr);
123 }
124}
125
126impl QMacroSimplifier {
127 fn is_stageleft_runtime_support_call(&self, path: &syn::Path) -> bool {
128 if let Some(last_segment) = path.segments.last() {
130 let fn_name = last_segment.ident.to_string();
131 fn_name.contains("_type_hint")
133 && path.segments.len() > 2
134 && path.segments[0].ident == "stageleft"
135 && path.segments[1].ident == "runtime_support"
136 } else {
137 false
138 }
139 }
140
141 fn extract_closure_from_args(
142 &self,
143 args: &syn::punctuated::Punctuated<syn::Expr, syn::Token![,]>,
144 ) -> Option<syn::Expr> {
145 for arg in args {
147 if let syn::Expr::Closure(_) = arg {
148 return Some(arg.clone());
149 }
150 if let Some(closure_expr) = self.find_closure_in_expr(arg) {
152 return Some(closure_expr);
153 }
154 }
155 None
156 }
157
158 fn find_closure_in_expr(&self, expr: &syn::Expr) -> Option<syn::Expr> {
159 let mut visitor = ClosureFinder {
160 found_closure: None,
161 prefer_inner_blocks: true,
162 };
163 visitor.visit_expr(expr);
164 visitor.found_closure
165 }
166}
167
168struct ClosureFinder {
170 found_closure: Option<syn::Expr>,
171 prefer_inner_blocks: bool,
172}
173
174impl<'ast> Visit<'ast> for ClosureFinder {
175 fn visit_expr(&mut self, expr: &'ast syn::Expr) {
176 if self.found_closure.is_some() {
178 return;
179 }
180
181 match expr {
182 syn::Expr::Closure(_) => {
183 self.found_closure = Some(expr.clone());
184 }
185 syn::Expr::Block(block) if self.prefer_inner_blocks => {
186 for stmt in &block.block.stmts {
188 if let syn::Stmt::Expr(stmt_expr, _) = stmt
189 && let syn::Expr::Block(_) = stmt_expr
190 {
191 let mut inner_visitor = ClosureFinder {
193 found_closure: None,
194 prefer_inner_blocks: false, };
196 inner_visitor.visit_expr(stmt_expr);
197 if inner_visitor.found_closure.is_some() {
198 self.found_closure = Some(stmt_expr.clone());
200 return;
201 }
202 }
203 }
204
205 visit::visit_expr(self, expr);
207
208 if self.found_closure.is_some() {
211 }
213 }
214 _ => {
215 visit::visit_expr(self, expr);
217 }
218 }
219 }
220}
221
222#[derive(Clone, PartialEq, Eq, Hash)]
226pub struct DebugType(pub Box<syn::Type>);
227
228impl From<syn::Type> for DebugType {
229 fn from(t: syn::Type) -> Self {
230 Self(Box::new(t))
231 }
232}
233
234impl Deref for DebugType {
235 type Target = syn::Type;
236
237 fn deref(&self) -> &Self::Target {
238 &self.0
239 }
240}
241
242impl ToTokens for DebugType {
243 fn to_tokens(&self, tokens: &mut TokenStream) {
244 self.0.to_tokens(tokens);
245 }
246}
247
248impl Debug for DebugType {
249 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
250 write!(f, "{}", self.0.to_token_stream())
251 }
252}
253
254pub enum DebugInstantiate {
255 Building,
256 Finalized(Box<DebugInstantiateFinalized>),
257}
258
259#[cfg_attr(
260 not(feature = "build"),
261 expect(
262 dead_code,
263 reason = "sink, source unused without `feature = \"build\"`."
264 )
265)]
266pub struct DebugInstantiateFinalized {
267 sink: syn::Expr,
268 source: syn::Expr,
269 connect_fn: Option<Box<dyn FnOnce()>>,
270}
271
272impl From<DebugInstantiateFinalized> for DebugInstantiate {
273 fn from(f: DebugInstantiateFinalized) -> Self {
274 Self::Finalized(Box::new(f))
275 }
276}
277
278impl Debug for DebugInstantiate {
279 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
280 write!(f, "<network instantiate>")
281 }
282}
283
284impl Hash for DebugInstantiate {
285 fn hash<H: Hasher>(&self, _state: &mut H) {
286 }
288}
289
290impl Clone for DebugInstantiate {
291 fn clone(&self) -> Self {
292 match self {
293 DebugInstantiate::Building => DebugInstantiate::Building,
294 DebugInstantiate::Finalized(_) => {
295 panic!("DebugInstantiate::Finalized should not be cloned")
296 }
297 }
298 }
299}
300
301#[derive(Debug, Hash, Clone)]
303pub enum HydroSource {
304 Stream(DebugExpr),
305 ExternalNetwork(),
306 Iter(DebugExpr),
307 Spin(),
308 ClusterMembers(LocationId),
309}
310
311#[cfg(feature = "build")]
312pub trait DfirBuilder {
318 fn singleton_intermediates(&self) -> bool;
320
321 fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder;
323
324 fn batch(
325 &mut self,
326 in_ident: syn::Ident,
327 in_location: &LocationId,
328 in_kind: &CollectionKind,
329 out_ident: &syn::Ident,
330 out_location: &LocationId,
331 op_meta: &HydroIrOpMetadata,
332 );
333 fn yield_from_tick(
334 &mut self,
335 in_ident: syn::Ident,
336 in_location: &LocationId,
337 in_kind: &CollectionKind,
338 out_ident: &syn::Ident,
339 out_location: &LocationId,
340 );
341
342 fn begin_atomic(
343 &mut self,
344 in_ident: syn::Ident,
345 in_location: &LocationId,
346 in_kind: &CollectionKind,
347 out_ident: &syn::Ident,
348 out_location: &LocationId,
349 op_meta: &HydroIrOpMetadata,
350 );
351 fn end_atomic(
352 &mut self,
353 in_ident: syn::Ident,
354 in_location: &LocationId,
355 in_kind: &CollectionKind,
356 out_ident: &syn::Ident,
357 );
358
359 #[expect(clippy::too_many_arguments, reason = "TODO // internal")]
360 fn observe_nondet(
361 &mut self,
362 trusted: bool,
363 location: &LocationId,
364 in_ident: syn::Ident,
365 in_kind: &CollectionKind,
366 out_ident: &syn::Ident,
367 out_kind: &CollectionKind,
368 op_meta: &HydroIrOpMetadata,
369 );
370
371 #[expect(clippy::too_many_arguments, reason = "TODO")]
372 fn create_network(
373 &mut self,
374 from: &LocationId,
375 to: &LocationId,
376 input_ident: syn::Ident,
377 out_ident: &syn::Ident,
378 serialize: Option<&DebugExpr>,
379 sink: syn::Expr,
380 source: syn::Expr,
381 deserialize: Option<&DebugExpr>,
382 tag_id: usize,
383 );
384
385 fn create_external_source(
386 &mut self,
387 on: &LocationId,
388 source_expr: syn::Expr,
389 out_ident: &syn::Ident,
390 deserialize: Option<&DebugExpr>,
391 tag_id: usize,
392 );
393
394 fn create_external_output(
395 &mut self,
396 on: &LocationId,
397 sink_expr: syn::Expr,
398 input_ident: &syn::Ident,
399 serialize: Option<&DebugExpr>,
400 tag_id: usize,
401 );
402}
403
404#[cfg(feature = "build")]
405impl DfirBuilder for SecondaryMap<LocationKey, FlatGraphBuilder> {
406 fn singleton_intermediates(&self) -> bool {
407 false
408 }
409
410 fn get_dfir_mut(&mut self, location: &LocationId) -> &mut FlatGraphBuilder {
411 self.entry(location.root().key())
412 .expect("location was removed")
413 .or_default()
414 }
415
416 fn batch(
417 &mut self,
418 in_ident: syn::Ident,
419 in_location: &LocationId,
420 in_kind: &CollectionKind,
421 out_ident: &syn::Ident,
422 _out_location: &LocationId,
423 _op_meta: &HydroIrOpMetadata,
424 ) {
425 let builder = self.get_dfir_mut(in_location.root());
426 if in_kind.is_bounded()
427 && matches!(
428 in_kind,
429 CollectionKind::Singleton { .. }
430 | CollectionKind::Optional { .. }
431 | CollectionKind::KeyedSingleton { .. }
432 )
433 {
434 assert!(in_location.is_top_level());
435 builder.add_dfir(
436 parse_quote! {
437 #out_ident = #in_ident -> persist::<'static>();
438 },
439 None,
440 None,
441 );
442 } else {
443 builder.add_dfir(
444 parse_quote! {
445 #out_ident = #in_ident;
446 },
447 None,
448 None,
449 );
450 }
451 }
452
453 fn yield_from_tick(
454 &mut self,
455 in_ident: syn::Ident,
456 in_location: &LocationId,
457 _in_kind: &CollectionKind,
458 out_ident: &syn::Ident,
459 _out_location: &LocationId,
460 ) {
461 let builder = self.get_dfir_mut(in_location.root());
462 builder.add_dfir(
463 parse_quote! {
464 #out_ident = #in_ident;
465 },
466 None,
467 None,
468 );
469 }
470
471 fn begin_atomic(
472 &mut self,
473 in_ident: syn::Ident,
474 in_location: &LocationId,
475 _in_kind: &CollectionKind,
476 out_ident: &syn::Ident,
477 _out_location: &LocationId,
478 _op_meta: &HydroIrOpMetadata,
479 ) {
480 let builder = self.get_dfir_mut(in_location.root());
481 builder.add_dfir(
482 parse_quote! {
483 #out_ident = #in_ident;
484 },
485 None,
486 None,
487 );
488 }
489
490 fn end_atomic(
491 &mut self,
492 in_ident: syn::Ident,
493 in_location: &LocationId,
494 _in_kind: &CollectionKind,
495 out_ident: &syn::Ident,
496 ) {
497 let builder = self.get_dfir_mut(in_location.root());
498 builder.add_dfir(
499 parse_quote! {
500 #out_ident = #in_ident;
501 },
502 None,
503 None,
504 );
505 }
506
507 fn observe_nondet(
508 &mut self,
509 _trusted: bool,
510 location: &LocationId,
511 in_ident: syn::Ident,
512 _in_kind: &CollectionKind,
513 out_ident: &syn::Ident,
514 _out_kind: &CollectionKind,
515 _op_meta: &HydroIrOpMetadata,
516 ) {
517 let builder = self.get_dfir_mut(location);
518 builder.add_dfir(
519 parse_quote! {
520 #out_ident = #in_ident;
521 },
522 None,
523 None,
524 );
525 }
526
527 fn create_network(
528 &mut self,
529 from: &LocationId,
530 to: &LocationId,
531 input_ident: syn::Ident,
532 out_ident: &syn::Ident,
533 serialize: Option<&DebugExpr>,
534 sink: syn::Expr,
535 source: syn::Expr,
536 deserialize: Option<&DebugExpr>,
537 tag_id: usize,
538 ) {
539 let sender_builder = self.get_dfir_mut(from);
540 if let Some(serialize_pipeline) = serialize {
541 sender_builder.add_dfir(
542 parse_quote! {
543 #input_ident -> map(#serialize_pipeline) -> dest_sink(#sink);
544 },
545 None,
546 Some(&format!("send{}", tag_id)),
548 );
549 } else {
550 sender_builder.add_dfir(
551 parse_quote! {
552 #input_ident -> dest_sink(#sink);
553 },
554 None,
555 Some(&format!("send{}", tag_id)),
556 );
557 }
558
559 let receiver_builder = self.get_dfir_mut(to);
560 if let Some(deserialize_pipeline) = deserialize {
561 receiver_builder.add_dfir(
562 parse_quote! {
563 #out_ident = source_stream(#source) -> map(#deserialize_pipeline);
564 },
565 None,
566 Some(&format!("recv{}", tag_id)),
567 );
568 } else {
569 receiver_builder.add_dfir(
570 parse_quote! {
571 #out_ident = source_stream(#source);
572 },
573 None,
574 Some(&format!("recv{}", tag_id)),
575 );
576 }
577 }
578
579 fn create_external_source(
580 &mut self,
581 on: &LocationId,
582 source_expr: syn::Expr,
583 out_ident: &syn::Ident,
584 deserialize: Option<&DebugExpr>,
585 tag_id: usize,
586 ) {
587 let receiver_builder = self.get_dfir_mut(on);
588 if let Some(deserialize_pipeline) = deserialize {
589 receiver_builder.add_dfir(
590 parse_quote! {
591 #out_ident = source_stream(#source_expr) -> map(#deserialize_pipeline);
592 },
593 None,
594 Some(&format!("recv{}", tag_id)),
595 );
596 } else {
597 receiver_builder.add_dfir(
598 parse_quote! {
599 #out_ident = source_stream(#source_expr);
600 },
601 None,
602 Some(&format!("recv{}", tag_id)),
603 );
604 }
605 }
606
607 fn create_external_output(
608 &mut self,
609 on: &LocationId,
610 sink_expr: syn::Expr,
611 input_ident: &syn::Ident,
612 serialize: Option<&DebugExpr>,
613 tag_id: usize,
614 ) {
615 let sender_builder = self.get_dfir_mut(on);
616 if let Some(serialize_fn) = serialize {
617 sender_builder.add_dfir(
618 parse_quote! {
619 #input_ident -> map(#serialize_fn) -> dest_sink(#sink_expr);
620 },
621 None,
622 Some(&format!("send{}", tag_id)),
624 );
625 } else {
626 sender_builder.add_dfir(
627 parse_quote! {
628 #input_ident -> dest_sink(#sink_expr);
629 },
630 None,
631 Some(&format!("send{}", tag_id)),
632 );
633 }
634 }
635}
636
637#[cfg(feature = "build")]
638pub enum BuildersOrCallback<'a, L, N>
639where
640 L: FnMut(&mut HydroRoot, &mut usize),
641 N: FnMut(&mut HydroNode, &mut usize),
642{
643 Builders(&'a mut dyn DfirBuilder),
644 Callback(L, N),
645}
646
647#[derive(Debug, Hash)]
651pub enum HydroRoot {
652 ForEach {
653 f: DebugExpr,
654 input: Box<HydroNode>,
655 op_metadata: HydroIrOpMetadata,
656 },
657 SendExternal {
658 to_external_key: LocationKey,
659 to_port_id: ExternalPortId,
660 to_many: bool,
661 unpaired: bool,
662 serialize_fn: Option<DebugExpr>,
663 instantiate_fn: DebugInstantiate,
664 input: Box<HydroNode>,
665 op_metadata: HydroIrOpMetadata,
666 },
667 DestSink {
668 sink: DebugExpr,
669 input: Box<HydroNode>,
670 op_metadata: HydroIrOpMetadata,
671 },
672 CycleSink {
673 ident: syn::Ident,
674 input: Box<HydroNode>,
675 op_metadata: HydroIrOpMetadata,
676 },
677}
678
679impl HydroRoot {
680 #[cfg(feature = "build")]
681 pub fn compile_network<'a, D>(
682 &mut self,
683 extra_stmts: &mut SparseSecondaryMap<LocationKey, Vec<syn::Stmt>>,
684 seen_tees: &mut SeenTees,
685 processes: &SparseSecondaryMap<LocationKey, D::Process>,
686 clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
687 externals: &SparseSecondaryMap<LocationKey, D::External>,
688 ) where
689 D: Deploy<'a>,
690 {
691 let refcell_extra_stmts = RefCell::new(extra_stmts);
692 self.transform_bottom_up(
693 &mut |l| {
694 if let HydroRoot::SendExternal {
695 input,
696 to_external_key,
697 to_port_id,
698 to_many,
699 unpaired,
700 instantiate_fn,
701 ..
702 } = l
703 {
704 let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
705 DebugInstantiate::Building => {
706 let to_node = externals
707 .get(*to_external_key)
708 .unwrap_or_else(|| {
709 panic!("A external used in the graph was not instantiated: {}", to_external_key)
710 })
711 .clone();
712
713 match input.metadata().location_id.root() {
714 &LocationId::Process(process_key) => {
715 if *to_many {
716 (
717 (
718 D::e2o_many_sink(format!("{}_{}", *to_external_key, *to_port_id)),
719 parse_quote!(DUMMY),
720 ),
721 Box::new(|| {}) as Box<dyn FnOnce()>,
722 )
723 } else {
724 let from_node = processes
725 .get(process_key)
726 .unwrap_or_else(|| {
727 panic!("A process used in the graph was not instantiated: {}", process_key)
728 })
729 .clone();
730
731 let sink_port = from_node.next_port();
732 let source_port = to_node.next_port();
733
734 if *unpaired {
735 use stageleft::quote_type;
736 use tokio_util::codec::LengthDelimitedCodec;
737
738 to_node.register(*to_port_id, source_port.clone());
739
740 let _ = D::e2o_source(
741 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
742 &to_node, &source_port,
743 &from_node, &sink_port,
744 "e_type::<LengthDelimitedCodec>(),
745 format!("{}_{}", *to_external_key, *to_port_id)
746 );
747 }
748
749 (
750 (
751 D::o2e_sink(
752 &from_node,
753 &sink_port,
754 &to_node,
755 &source_port,
756 format!("{}_{}", *to_external_key, *to_port_id)
757 ),
758 parse_quote!(DUMMY),
759 ),
760 if *unpaired {
761 D::e2o_connect(
762 &to_node,
763 &source_port,
764 &from_node,
765 &sink_port,
766 *to_many,
767 NetworkHint::Auto,
768 )
769 } else {
770 Box::new(|| {}) as Box<dyn FnOnce()>
771 },
772 )
773 }
774 }
775 LocationId::Cluster(_) => todo!(),
776 _ => panic!()
777 }
778 },
779
780 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
781 };
782
783 *instantiate_fn = DebugInstantiateFinalized {
784 sink: sink_expr,
785 source: source_expr,
786 connect_fn: Some(connect_fn),
787 }
788 .into();
789 }
790 },
791 &mut |n| {
792 if let HydroNode::Network {
793 input,
794 instantiate_fn,
795 metadata,
796 ..
797 } = n
798 {
799 let (sink_expr, source_expr, connect_fn) = match instantiate_fn {
800 DebugInstantiate::Building => instantiate_network::<D>(
801 input.metadata().location_id.root(),
802 metadata.location_id.root(),
803 processes,
804 clusters,
805 ),
806
807 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
808 };
809
810 *instantiate_fn = DebugInstantiateFinalized {
811 sink: sink_expr,
812 source: source_expr,
813 connect_fn: Some(connect_fn),
814 }
815 .into();
816 } else if let HydroNode::ExternalInput {
817 from_external_key,
818 from_port_id,
819 from_many,
820 codec_type,
821 port_hint,
822 instantiate_fn,
823 metadata,
824 ..
825 } = n
826 {
827 let ((sink_expr, source_expr), connect_fn) = match instantiate_fn {
828 DebugInstantiate::Building => {
829 let from_node = externals
830 .get(*from_external_key)
831 .unwrap_or_else(|| {
832 panic!(
833 "A external used in the graph was not instantiated: {}",
834 from_external_key,
835 )
836 })
837 .clone();
838
839 match metadata.location_id.root() {
840 &LocationId::Process(process_key) => {
841 let to_node = processes
842 .get(process_key)
843 .unwrap_or_else(|| {
844 panic!("A process used in the graph was not instantiated: {}", process_key)
845 })
846 .clone();
847
848 let sink_port = from_node.next_port();
849 let source_port = to_node.next_port();
850
851 from_node.register(*from_port_id, sink_port.clone());
852
853 (
854 (
855 parse_quote!(DUMMY),
856 if *from_many {
857 D::e2o_many_source(
858 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
859 &to_node, &source_port,
860 codec_type.0.as_ref(),
861 format!("{}_{}", *from_external_key, *from_port_id)
862 )
863 } else {
864 D::e2o_source(
865 refcell_extra_stmts.borrow_mut().entry(process_key).expect("location was removed").or_default(),
866 &from_node, &sink_port,
867 &to_node, &source_port,
868 codec_type.0.as_ref(),
869 format!("{}_{}", *from_external_key, *from_port_id)
870 )
871 },
872 ),
873 D::e2o_connect(&from_node, &sink_port, &to_node, &source_port, *from_many, *port_hint),
874 )
875 }
876 LocationId::Cluster(_) => todo!(),
877 _ => panic!()
878 }
879 },
880
881 DebugInstantiate::Finalized(_) => panic!("network already finalized"),
882 };
883
884 *instantiate_fn = DebugInstantiateFinalized {
885 sink: sink_expr,
886 source: source_expr,
887 connect_fn: Some(connect_fn),
888 }
889 .into();
890 }
891 },
892 seen_tees,
893 false,
894 );
895 }
896
897 pub fn connect_network(&mut self, seen_tees: &mut SeenTees) {
898 self.transform_bottom_up(
899 &mut |l| {
900 if let HydroRoot::SendExternal { instantiate_fn, .. } = l {
901 match instantiate_fn {
902 DebugInstantiate::Building => panic!("network not built"),
903
904 DebugInstantiate::Finalized(finalized) => {
905 (finalized.connect_fn.take().unwrap())();
906 }
907 }
908 }
909 },
910 &mut |n| {
911 if let HydroNode::Network { instantiate_fn, .. }
912 | HydroNode::ExternalInput { instantiate_fn, .. } = n
913 {
914 match instantiate_fn {
915 DebugInstantiate::Building => panic!("network not built"),
916
917 DebugInstantiate::Finalized(finalized) => {
918 (finalized.connect_fn.take().unwrap())();
919 }
920 }
921 }
922 },
923 seen_tees,
924 false,
925 );
926 }
927
928 pub fn transform_bottom_up(
929 &mut self,
930 transform_root: &mut impl FnMut(&mut HydroRoot),
931 transform_node: &mut impl FnMut(&mut HydroNode),
932 seen_tees: &mut SeenTees,
933 check_well_formed: bool,
934 ) {
935 self.transform_children(
936 |n, s| n.transform_bottom_up(transform_node, s, check_well_formed),
937 seen_tees,
938 );
939
940 transform_root(self);
941 }
942
943 pub fn transform_children(
944 &mut self,
945 mut transform: impl FnMut(&mut HydroNode, &mut SeenTees),
946 seen_tees: &mut SeenTees,
947 ) {
948 match self {
949 HydroRoot::ForEach { input, .. }
950 | HydroRoot::SendExternal { input, .. }
951 | HydroRoot::DestSink { input, .. }
952 | HydroRoot::CycleSink { input, .. } => {
953 transform(input, seen_tees);
954 }
955 }
956 }
957
958 pub fn deep_clone(&self, seen_tees: &mut SeenTees) -> HydroRoot {
959 match self {
960 HydroRoot::ForEach {
961 f,
962 input,
963 op_metadata,
964 } => HydroRoot::ForEach {
965 f: f.clone(),
966 input: Box::new(input.deep_clone(seen_tees)),
967 op_metadata: op_metadata.clone(),
968 },
969 HydroRoot::SendExternal {
970 to_external_key,
971 to_port_id,
972 to_many,
973 unpaired,
974 serialize_fn,
975 instantiate_fn,
976 input,
977 op_metadata,
978 } => HydroRoot::SendExternal {
979 to_external_key: *to_external_key,
980 to_port_id: *to_port_id,
981 to_many: *to_many,
982 unpaired: *unpaired,
983 serialize_fn: serialize_fn.clone(),
984 instantiate_fn: instantiate_fn.clone(),
985 input: Box::new(input.deep_clone(seen_tees)),
986 op_metadata: op_metadata.clone(),
987 },
988 HydroRoot::DestSink {
989 sink,
990 input,
991 op_metadata,
992 } => HydroRoot::DestSink {
993 sink: sink.clone(),
994 input: Box::new(input.deep_clone(seen_tees)),
995 op_metadata: op_metadata.clone(),
996 },
997 HydroRoot::CycleSink {
998 ident,
999 input,
1000 op_metadata,
1001 } => HydroRoot::CycleSink {
1002 ident: ident.clone(),
1003 input: Box::new(input.deep_clone(seen_tees)),
1004 op_metadata: op_metadata.clone(),
1005 },
1006 }
1007 }
1008
1009 #[cfg(feature = "build")]
1010 pub fn emit<'a, D: Deploy<'a>>(
1011 &mut self,
1012 graph_builders: &mut dyn DfirBuilder,
1013 seen_tees: &mut SeenTees,
1014 built_tees: &mut HashMap<*const RefCell<HydroNode>, syn::Ident>,
1015 next_stmt_id: &mut usize,
1016 ) {
1017 self.emit_core::<D>(
1018 &mut BuildersOrCallback::<
1019 fn(&mut HydroRoot, &mut usize),
1020 fn(&mut HydroNode, &mut usize),
1021 >::Builders(graph_builders),
1022 seen_tees,
1023 built_tees,
1024 next_stmt_id,
1025 );
1026 }
1027
1028 #[cfg(feature = "build")]
1029 pub fn emit_core<'a, D: Deploy<'a>>(
1030 &mut self,
1031 builders_or_callback: &mut BuildersOrCallback<
1032 impl FnMut(&mut HydroRoot, &mut usize),
1033 impl FnMut(&mut HydroNode, &mut usize),
1034 >,
1035 seen_tees: &mut SeenTees,
1036 built_tees: &mut HashMap<*const RefCell<HydroNode>, syn::Ident>,
1037 next_stmt_id: &mut usize,
1038 ) {
1039 match self {
1040 HydroRoot::ForEach { f, input, .. } => {
1041 let input_ident =
1042 input.emit_core::<D>(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1043
1044 match builders_or_callback {
1045 BuildersOrCallback::Builders(graph_builders) => {
1046 graph_builders
1047 .get_dfir_mut(&input.metadata().location_id)
1048 .add_dfir(
1049 parse_quote! {
1050 #input_ident -> for_each(#f);
1051 },
1052 None,
1053 Some(&next_stmt_id.to_string()),
1054 );
1055 }
1056 BuildersOrCallback::Callback(leaf_callback, _) => {
1057 leaf_callback(self, next_stmt_id);
1058 }
1059 }
1060
1061 *next_stmt_id += 1;
1062 }
1063
1064 HydroRoot::SendExternal {
1065 serialize_fn,
1066 instantiate_fn,
1067 input,
1068 ..
1069 } => {
1070 let input_ident =
1071 input.emit_core::<D>(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1072
1073 match builders_or_callback {
1074 BuildersOrCallback::Builders(graph_builders) => {
1075 let (sink_expr, _) = match instantiate_fn {
1076 DebugInstantiate::Building => (
1077 syn::parse_quote!(DUMMY_SINK),
1078 syn::parse_quote!(DUMMY_SOURCE),
1079 ),
1080
1081 DebugInstantiate::Finalized(finalized) => {
1082 (finalized.sink.clone(), finalized.source.clone())
1083 }
1084 };
1085
1086 graph_builders.create_external_output(
1087 &input.metadata().location_id,
1088 sink_expr,
1089 &input_ident,
1090 serialize_fn.as_ref(),
1091 *next_stmt_id,
1092 );
1093 }
1094 BuildersOrCallback::Callback(leaf_callback, _) => {
1095 leaf_callback(self, next_stmt_id);
1096 }
1097 }
1098
1099 *next_stmt_id += 1;
1100 }
1101
1102 HydroRoot::DestSink { sink, input, .. } => {
1103 let input_ident =
1104 input.emit_core::<D>(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1105
1106 match builders_or_callback {
1107 BuildersOrCallback::Builders(graph_builders) => {
1108 graph_builders
1109 .get_dfir_mut(&input.metadata().location_id)
1110 .add_dfir(
1111 parse_quote! {
1112 #input_ident -> dest_sink(#sink);
1113 },
1114 None,
1115 Some(&next_stmt_id.to_string()),
1116 );
1117 }
1118 BuildersOrCallback::Callback(leaf_callback, _) => {
1119 leaf_callback(self, next_stmt_id);
1120 }
1121 }
1122
1123 *next_stmt_id += 1;
1124 }
1125
1126 HydroRoot::CycleSink { ident, input, .. } => {
1127 let input_ident =
1128 input.emit_core::<D>(builders_or_callback, seen_tees, built_tees, next_stmt_id);
1129
1130 match builders_or_callback {
1131 BuildersOrCallback::Builders(graph_builders) => {
1132 let elem_type: syn::Type = match &input.metadata().collection_kind {
1133 CollectionKind::KeyedSingleton {
1134 key_type,
1135 value_type,
1136 ..
1137 }
1138 | CollectionKind::KeyedStream {
1139 key_type,
1140 value_type,
1141 ..
1142 } => {
1143 parse_quote!((#key_type, #value_type))
1144 }
1145 CollectionKind::Stream { element_type, .. }
1146 | CollectionKind::Singleton { element_type, .. }
1147 | CollectionKind::Optional { element_type, .. } => {
1148 parse_quote!(#element_type)
1149 }
1150 };
1151
1152 graph_builders
1153 .get_dfir_mut(&input.metadata().location_id)
1154 .add_dfir(
1155 parse_quote! {
1156 #ident = #input_ident -> identity::<#elem_type>();
1157 },
1158 None,
1159 None,
1160 );
1161 }
1162 BuildersOrCallback::Callback(_, _) => {}
1164 }
1165 }
1166 }
1167 }
1168
1169 pub fn op_metadata(&self) -> &HydroIrOpMetadata {
1170 match self {
1171 HydroRoot::ForEach { op_metadata, .. }
1172 | HydroRoot::SendExternal { op_metadata, .. }
1173 | HydroRoot::DestSink { op_metadata, .. }
1174 | HydroRoot::CycleSink { op_metadata, .. } => op_metadata,
1175 }
1176 }
1177
1178 pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
1179 match self {
1180 HydroRoot::ForEach { op_metadata, .. }
1181 | HydroRoot::SendExternal { op_metadata, .. }
1182 | HydroRoot::DestSink { op_metadata, .. }
1183 | HydroRoot::CycleSink { op_metadata, .. } => op_metadata,
1184 }
1185 }
1186
1187 pub fn input(&self) -> &HydroNode {
1188 match self {
1189 HydroRoot::ForEach { input, .. }
1190 | HydroRoot::SendExternal { input, .. }
1191 | HydroRoot::DestSink { input, .. }
1192 | HydroRoot::CycleSink { input, .. } => input,
1193 }
1194 }
1195
1196 pub fn input_metadata(&self) -> &HydroIrMetadata {
1197 self.input().metadata()
1198 }
1199
1200 pub fn print_root(&self) -> String {
1201 match self {
1202 HydroRoot::ForEach { f, .. } => format!("ForEach({:?})", f),
1203 HydroRoot::SendExternal { .. } => "SendExternal".to_owned(),
1204 HydroRoot::DestSink { sink, .. } => format!("DestSink({:?})", sink),
1205 HydroRoot::CycleSink { ident, .. } => format!("CycleSink({:?})", ident),
1206 }
1207 }
1208
1209 pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
1210 match self {
1211 HydroRoot::ForEach { f, .. } | HydroRoot::DestSink { sink: f, .. } => {
1212 transform(f);
1213 }
1214 HydroRoot::SendExternal { .. } | HydroRoot::CycleSink { .. } => {}
1215 }
1216 }
1217}
1218
1219#[cfg(feature = "build")]
1220pub fn emit<'a, D: Deploy<'a>>(
1221 ir: &mut Vec<HydroRoot>,
1222) -> SecondaryMap<LocationKey, FlatGraphBuilder> {
1223 let mut builders = SecondaryMap::new();
1224 let mut seen_tees = HashMap::new();
1225 let mut built_tees = HashMap::new();
1226 let mut next_stmt_id = 0;
1227 for leaf in ir {
1228 leaf.emit::<D>(
1229 &mut builders,
1230 &mut seen_tees,
1231 &mut built_tees,
1232 &mut next_stmt_id,
1233 );
1234 }
1235 builders
1236}
1237
1238#[cfg(feature = "build")]
1239pub fn traverse_dfir<'a, D: Deploy<'a>>(
1240 ir: &mut [HydroRoot],
1241 transform_root: impl FnMut(&mut HydroRoot, &mut usize),
1242 transform_node: impl FnMut(&mut HydroNode, &mut usize),
1243) {
1244 let mut seen_tees = HashMap::new();
1245 let mut built_tees = HashMap::new();
1246 let mut next_stmt_id = 0;
1247 let mut callback = BuildersOrCallback::Callback(transform_root, transform_node);
1248 ir.iter_mut().for_each(|leaf| {
1249 leaf.emit_core::<D>(
1250 &mut callback,
1251 &mut seen_tees,
1252 &mut built_tees,
1253 &mut next_stmt_id,
1254 );
1255 });
1256}
1257
1258pub fn transform_bottom_up(
1259 ir: &mut [HydroRoot],
1260 transform_root: &mut impl FnMut(&mut HydroRoot),
1261 transform_node: &mut impl FnMut(&mut HydroNode),
1262 check_well_formed: bool,
1263) {
1264 let mut seen_tees = HashMap::new();
1265 ir.iter_mut().for_each(|leaf| {
1266 leaf.transform_bottom_up(
1267 transform_root,
1268 transform_node,
1269 &mut seen_tees,
1270 check_well_formed,
1271 );
1272 });
1273}
1274
1275pub fn deep_clone(ir: &[HydroRoot]) -> Vec<HydroRoot> {
1276 let mut seen_tees = HashMap::new();
1277 ir.iter()
1278 .map(|leaf| leaf.deep_clone(&mut seen_tees))
1279 .collect()
1280}
1281
1282type PrintedTees = RefCell<Option<(usize, HashMap<*const RefCell<HydroNode>, usize>)>>;
1283thread_local! {
1284 static PRINTED_TEES: PrintedTees = const { RefCell::new(None) };
1285}
1286
1287pub fn dbg_dedup_tee<T>(f: impl FnOnce() -> T) -> T {
1288 PRINTED_TEES.with(|printed_tees| {
1289 let mut printed_tees_mut = printed_tees.borrow_mut();
1290 *printed_tees_mut = Some((0, HashMap::new()));
1291 drop(printed_tees_mut);
1292
1293 let ret = f();
1294
1295 let mut printed_tees_mut = printed_tees.borrow_mut();
1296 *printed_tees_mut = None;
1297
1298 ret
1299 })
1300}
1301
1302pub struct TeeNode(pub Rc<RefCell<HydroNode>>);
1303
1304impl TeeNode {
1305 pub fn as_ptr(&self) -> *const RefCell<HydroNode> {
1306 Rc::as_ptr(&self.0)
1307 }
1308}
1309
1310impl Debug for TeeNode {
1311 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1312 PRINTED_TEES.with(|printed_tees| {
1313 let mut printed_tees_mut_borrow = printed_tees.borrow_mut();
1314 let printed_tees_mut = printed_tees_mut_borrow.as_mut();
1315
1316 if let Some(printed_tees_mut) = printed_tees_mut {
1317 if let Some(existing) = printed_tees_mut
1318 .1
1319 .get(&(self.0.as_ref() as *const RefCell<HydroNode>))
1320 {
1321 write!(f, "<tee {}>", existing)
1322 } else {
1323 let next_id = printed_tees_mut.0;
1324 printed_tees_mut.0 += 1;
1325 printed_tees_mut
1326 .1
1327 .insert(self.0.as_ref() as *const RefCell<HydroNode>, next_id);
1328 drop(printed_tees_mut_borrow);
1329 write!(f, "<tee {}>: ", next_id)?;
1330 Debug::fmt(&self.0.borrow(), f)
1331 }
1332 } else {
1333 drop(printed_tees_mut_borrow);
1334 write!(f, "<tee>: ")?;
1335 Debug::fmt(&self.0.borrow(), f)
1336 }
1337 })
1338 }
1339}
1340
1341impl Hash for TeeNode {
1342 fn hash<H: Hasher>(&self, state: &mut H) {
1343 self.0.borrow_mut().hash(state);
1344 }
1345}
1346
1347#[derive(Clone, PartialEq, Eq, Debug)]
1348pub enum BoundKind {
1349 Unbounded,
1350 Bounded,
1351}
1352
1353#[derive(Clone, PartialEq, Eq, Debug)]
1354pub enum StreamOrder {
1355 NoOrder,
1356 TotalOrder,
1357}
1358
1359#[derive(Clone, PartialEq, Eq, Debug)]
1360pub enum StreamRetry {
1361 AtLeastOnce,
1362 ExactlyOnce,
1363}
1364
1365#[derive(Clone, PartialEq, Eq, Debug)]
1366pub enum KeyedSingletonBoundKind {
1367 Unbounded,
1368 BoundedValue,
1369 Bounded,
1370}
1371
1372#[derive(Clone, PartialEq, Eq, Debug)]
1373pub enum CollectionKind {
1374 Stream {
1375 bound: BoundKind,
1376 order: StreamOrder,
1377 retry: StreamRetry,
1378 element_type: DebugType,
1379 },
1380 Singleton {
1381 bound: BoundKind,
1382 element_type: DebugType,
1383 },
1384 Optional {
1385 bound: BoundKind,
1386 element_type: DebugType,
1387 },
1388 KeyedStream {
1389 bound: BoundKind,
1390 value_order: StreamOrder,
1391 value_retry: StreamRetry,
1392 key_type: DebugType,
1393 value_type: DebugType,
1394 },
1395 KeyedSingleton {
1396 bound: KeyedSingletonBoundKind,
1397 key_type: DebugType,
1398 value_type: DebugType,
1399 },
1400}
1401
1402impl CollectionKind {
1403 pub fn is_bounded(&self) -> bool {
1404 matches!(
1405 self,
1406 CollectionKind::Stream {
1407 bound: BoundKind::Bounded,
1408 ..
1409 } | CollectionKind::Singleton {
1410 bound: BoundKind::Bounded,
1411 ..
1412 } | CollectionKind::Optional {
1413 bound: BoundKind::Bounded,
1414 ..
1415 } | CollectionKind::KeyedStream {
1416 bound: BoundKind::Bounded,
1417 ..
1418 } | CollectionKind::KeyedSingleton {
1419 bound: KeyedSingletonBoundKind::Bounded,
1420 ..
1421 }
1422 )
1423 }
1424}
1425
1426#[derive(Clone)]
1427pub struct HydroIrMetadata {
1428 pub location_id: LocationId,
1429 pub collection_kind: CollectionKind,
1430 pub cardinality: Option<usize>,
1431 pub tag: Option<String>,
1432 pub op: HydroIrOpMetadata,
1433}
1434
1435impl Hash for HydroIrMetadata {
1437 fn hash<H: Hasher>(&self, _: &mut H) {}
1438}
1439
1440impl PartialEq for HydroIrMetadata {
1441 fn eq(&self, _: &Self) -> bool {
1442 true
1443 }
1444}
1445
1446impl Eq for HydroIrMetadata {}
1447
1448impl Debug for HydroIrMetadata {
1449 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1450 f.debug_struct("HydroIrMetadata")
1451 .field("location_id", &self.location_id)
1452 .field("collection_kind", &self.collection_kind)
1453 .finish()
1454 }
1455}
1456
1457#[derive(Clone)]
1460pub struct HydroIrOpMetadata {
1461 pub backtrace: Backtrace,
1462 pub cpu_usage: Option<f64>,
1463 pub network_recv_cpu_usage: Option<f64>,
1464 pub id: Option<usize>,
1465}
1466
1467impl HydroIrOpMetadata {
1468 #[expect(
1469 clippy::new_without_default,
1470 reason = "explicit calls to new ensure correct backtrace bounds"
1471 )]
1472 pub fn new() -> HydroIrOpMetadata {
1473 Self::new_with_skip(1)
1474 }
1475
1476 fn new_with_skip(skip_count: usize) -> HydroIrOpMetadata {
1477 HydroIrOpMetadata {
1478 backtrace: Backtrace::get_backtrace(2 + skip_count),
1479 cpu_usage: None,
1480 network_recv_cpu_usage: None,
1481 id: None,
1482 }
1483 }
1484}
1485
1486impl Debug for HydroIrOpMetadata {
1487 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1488 f.debug_struct("HydroIrOpMetadata").finish()
1489 }
1490}
1491
1492impl Hash for HydroIrOpMetadata {
1493 fn hash<H: Hasher>(&self, _: &mut H) {}
1494}
1495
1496#[derive(Debug, Hash)]
1499pub enum HydroNode {
1500 Placeholder,
1501
1502 Cast {
1510 inner: Box<HydroNode>,
1511 metadata: HydroIrMetadata,
1512 },
1513
1514 ObserveNonDet {
1520 inner: Box<HydroNode>,
1521 trusted: bool, metadata: HydroIrMetadata,
1523 },
1524
1525 Source {
1526 source: HydroSource,
1527 metadata: HydroIrMetadata,
1528 },
1529
1530 SingletonSource {
1531 value: DebugExpr,
1532 metadata: HydroIrMetadata,
1533 },
1534
1535 CycleSource {
1536 ident: syn::Ident,
1537 metadata: HydroIrMetadata,
1538 },
1539
1540 Tee {
1541 inner: TeeNode,
1542 metadata: HydroIrMetadata,
1543 },
1544
1545 BeginAtomic {
1546 inner: Box<HydroNode>,
1547 metadata: HydroIrMetadata,
1548 },
1549
1550 EndAtomic {
1551 inner: Box<HydroNode>,
1552 metadata: HydroIrMetadata,
1553 },
1554
1555 Batch {
1556 inner: Box<HydroNode>,
1557 metadata: HydroIrMetadata,
1558 },
1559
1560 YieldConcat {
1561 inner: Box<HydroNode>,
1562 metadata: HydroIrMetadata,
1563 },
1564
1565 Chain {
1566 first: Box<HydroNode>,
1567 second: Box<HydroNode>,
1568 metadata: HydroIrMetadata,
1569 },
1570
1571 ChainFirst {
1572 first: Box<HydroNode>,
1573 second: Box<HydroNode>,
1574 metadata: HydroIrMetadata,
1575 },
1576
1577 CrossProduct {
1578 left: Box<HydroNode>,
1579 right: Box<HydroNode>,
1580 metadata: HydroIrMetadata,
1581 },
1582
1583 CrossSingleton {
1584 left: Box<HydroNode>,
1585 right: Box<HydroNode>,
1586 metadata: HydroIrMetadata,
1587 },
1588
1589 Join {
1590 left: Box<HydroNode>,
1591 right: Box<HydroNode>,
1592 metadata: HydroIrMetadata,
1593 },
1594
1595 Difference {
1596 pos: Box<HydroNode>,
1597 neg: Box<HydroNode>,
1598 metadata: HydroIrMetadata,
1599 },
1600
1601 AntiJoin {
1602 pos: Box<HydroNode>,
1603 neg: Box<HydroNode>,
1604 metadata: HydroIrMetadata,
1605 },
1606
1607 ResolveFutures {
1608 input: Box<HydroNode>,
1609 metadata: HydroIrMetadata,
1610 },
1611 ResolveFuturesOrdered {
1612 input: Box<HydroNode>,
1613 metadata: HydroIrMetadata,
1614 },
1615
1616 Map {
1617 f: DebugExpr,
1618 input: Box<HydroNode>,
1619 metadata: HydroIrMetadata,
1620 },
1621 FlatMap {
1622 f: DebugExpr,
1623 input: Box<HydroNode>,
1624 metadata: HydroIrMetadata,
1625 },
1626 Filter {
1627 f: DebugExpr,
1628 input: Box<HydroNode>,
1629 metadata: HydroIrMetadata,
1630 },
1631 FilterMap {
1632 f: DebugExpr,
1633 input: Box<HydroNode>,
1634 metadata: HydroIrMetadata,
1635 },
1636
1637 DeferTick {
1638 input: Box<HydroNode>,
1639 metadata: HydroIrMetadata,
1640 },
1641 Enumerate {
1642 input: Box<HydroNode>,
1643 metadata: HydroIrMetadata,
1644 },
1645 Inspect {
1646 f: DebugExpr,
1647 input: Box<HydroNode>,
1648 metadata: HydroIrMetadata,
1649 },
1650
1651 Unique {
1652 input: Box<HydroNode>,
1653 metadata: HydroIrMetadata,
1654 },
1655
1656 Sort {
1657 input: Box<HydroNode>,
1658 metadata: HydroIrMetadata,
1659 },
1660 Fold {
1661 init: DebugExpr,
1662 acc: DebugExpr,
1663 input: Box<HydroNode>,
1664 metadata: HydroIrMetadata,
1665 },
1666
1667 Scan {
1668 init: DebugExpr,
1669 acc: DebugExpr,
1670 input: Box<HydroNode>,
1671 metadata: HydroIrMetadata,
1672 },
1673 FoldKeyed {
1674 init: DebugExpr,
1675 acc: DebugExpr,
1676 input: Box<HydroNode>,
1677 metadata: HydroIrMetadata,
1678 },
1679
1680 Reduce {
1681 f: DebugExpr,
1682 input: Box<HydroNode>,
1683 metadata: HydroIrMetadata,
1684 },
1685 ReduceKeyed {
1686 f: DebugExpr,
1687 input: Box<HydroNode>,
1688 metadata: HydroIrMetadata,
1689 },
1690 ReduceKeyedWatermark {
1691 f: DebugExpr,
1692 input: Box<HydroNode>,
1693 watermark: Box<HydroNode>,
1694 metadata: HydroIrMetadata,
1695 },
1696
1697 Network {
1698 name: Option<String>,
1699 serialize_fn: Option<DebugExpr>,
1700 instantiate_fn: DebugInstantiate,
1701 deserialize_fn: Option<DebugExpr>,
1702 input: Box<HydroNode>,
1703 metadata: HydroIrMetadata,
1704 },
1705
1706 ExternalInput {
1707 from_external_key: LocationKey,
1708 from_port_id: ExternalPortId,
1709 from_many: bool,
1710 codec_type: DebugType,
1711 port_hint: NetworkHint,
1712 instantiate_fn: DebugInstantiate,
1713 deserialize_fn: Option<DebugExpr>,
1714 metadata: HydroIrMetadata,
1715 },
1716
1717 Counter {
1718 tag: String,
1719 duration: DebugExpr,
1720 prefix: String,
1721 input: Box<HydroNode>,
1722 metadata: HydroIrMetadata,
1723 },
1724}
1725
1726pub type SeenTees = HashMap<*const RefCell<HydroNode>, Rc<RefCell<HydroNode>>>;
1727pub type SeenTeeLocations = HashMap<*const RefCell<HydroNode>, LocationId>;
1728
1729impl HydroNode {
1730 pub fn transform_bottom_up(
1731 &mut self,
1732 transform: &mut impl FnMut(&mut HydroNode),
1733 seen_tees: &mut SeenTees,
1734 check_well_formed: bool,
1735 ) {
1736 self.transform_children(
1737 |n, s| n.transform_bottom_up(transform, s, check_well_formed),
1738 seen_tees,
1739 );
1740
1741 transform(self);
1742
1743 let self_location = self.metadata().location_id.root();
1744
1745 if check_well_formed {
1746 match &*self {
1747 HydroNode::Network { .. } => {}
1748 _ => {
1749 self.input_metadata().iter().for_each(|i| {
1750 if i.location_id.root() != self_location {
1751 panic!(
1752 "Mismatching IR locations, child: {:?} ({:?}) of: {:?} ({:?})",
1753 i,
1754 i.location_id.root(),
1755 self,
1756 self_location
1757 )
1758 }
1759 });
1760 }
1761 }
1762 }
1763 }
1764
1765 #[inline(always)]
1766 pub fn transform_children(
1767 &mut self,
1768 mut transform: impl FnMut(&mut HydroNode, &mut SeenTees),
1769 seen_tees: &mut SeenTees,
1770 ) {
1771 match self {
1772 HydroNode::Placeholder => {
1773 panic!();
1774 }
1775
1776 HydroNode::Source { .. }
1777 | HydroNode::SingletonSource { .. }
1778 | HydroNode::CycleSource { .. }
1779 | HydroNode::ExternalInput { .. } => {}
1780
1781 HydroNode::Tee { inner, .. } => {
1782 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
1783 *inner = TeeNode(transformed.clone());
1784 } else {
1785 let transformed_cell = Rc::new(RefCell::new(HydroNode::Placeholder));
1786 seen_tees.insert(inner.as_ptr(), transformed_cell.clone());
1787 let mut orig = inner.0.replace(HydroNode::Placeholder);
1788 transform(&mut orig, seen_tees);
1789 *transformed_cell.borrow_mut() = orig;
1790 *inner = TeeNode(transformed_cell);
1791 }
1792 }
1793
1794 HydroNode::Cast { inner, .. }
1795 | HydroNode::ObserveNonDet { inner, .. }
1796 | HydroNode::BeginAtomic { inner, .. }
1797 | HydroNode::EndAtomic { inner, .. }
1798 | HydroNode::Batch { inner, .. }
1799 | HydroNode::YieldConcat { inner, .. } => {
1800 transform(inner.as_mut(), seen_tees);
1801 }
1802
1803 HydroNode::Chain { first, second, .. } => {
1804 transform(first.as_mut(), seen_tees);
1805 transform(second.as_mut(), seen_tees);
1806 }
1807
1808 HydroNode::ChainFirst { first, second, .. } => {
1809 transform(first.as_mut(), seen_tees);
1810 transform(second.as_mut(), seen_tees);
1811 }
1812
1813 HydroNode::CrossSingleton { left, right, .. }
1814 | HydroNode::CrossProduct { left, right, .. }
1815 | HydroNode::Join { left, right, .. } => {
1816 transform(left.as_mut(), seen_tees);
1817 transform(right.as_mut(), seen_tees);
1818 }
1819
1820 HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
1821 transform(pos.as_mut(), seen_tees);
1822 transform(neg.as_mut(), seen_tees);
1823 }
1824
1825 HydroNode::ReduceKeyedWatermark {
1826 input, watermark, ..
1827 } => {
1828 transform(input.as_mut(), seen_tees);
1829 transform(watermark.as_mut(), seen_tees);
1830 }
1831
1832 HydroNode::Map { input, .. }
1833 | HydroNode::ResolveFutures { input, .. }
1834 | HydroNode::ResolveFuturesOrdered { input, .. }
1835 | HydroNode::FlatMap { input, .. }
1836 | HydroNode::Filter { input, .. }
1837 | HydroNode::FilterMap { input, .. }
1838 | HydroNode::Sort { input, .. }
1839 | HydroNode::DeferTick { input, .. }
1840 | HydroNode::Enumerate { input, .. }
1841 | HydroNode::Inspect { input, .. }
1842 | HydroNode::Unique { input, .. }
1843 | HydroNode::Network { input, .. }
1844 | HydroNode::Fold { input, .. }
1845 | HydroNode::Scan { input, .. }
1846 | HydroNode::FoldKeyed { input, .. }
1847 | HydroNode::Reduce { input, .. }
1848 | HydroNode::ReduceKeyed { input, .. }
1849 | HydroNode::Counter { input, .. } => {
1850 transform(input.as_mut(), seen_tees);
1851 }
1852 }
1853 }
1854
1855 pub fn deep_clone(&self, seen_tees: &mut SeenTees) -> HydroNode {
1856 match self {
1857 HydroNode::Placeholder => HydroNode::Placeholder,
1858 HydroNode::Cast { inner, metadata } => HydroNode::Cast {
1859 inner: Box::new(inner.deep_clone(seen_tees)),
1860 metadata: metadata.clone(),
1861 },
1862 HydroNode::ObserveNonDet {
1863 inner,
1864 trusted,
1865 metadata,
1866 } => HydroNode::ObserveNonDet {
1867 inner: Box::new(inner.deep_clone(seen_tees)),
1868 trusted: *trusted,
1869 metadata: metadata.clone(),
1870 },
1871 HydroNode::Source { source, metadata } => HydroNode::Source {
1872 source: source.clone(),
1873 metadata: metadata.clone(),
1874 },
1875 HydroNode::SingletonSource { value, metadata } => HydroNode::SingletonSource {
1876 value: value.clone(),
1877 metadata: metadata.clone(),
1878 },
1879 HydroNode::CycleSource { ident, metadata } => HydroNode::CycleSource {
1880 ident: ident.clone(),
1881 metadata: metadata.clone(),
1882 },
1883 HydroNode::Tee { inner, metadata } => {
1884 if let Some(transformed) = seen_tees.get(&inner.as_ptr()) {
1885 HydroNode::Tee {
1886 inner: TeeNode(transformed.clone()),
1887 metadata: metadata.clone(),
1888 }
1889 } else {
1890 let new_rc = Rc::new(RefCell::new(HydroNode::Placeholder));
1891 seen_tees.insert(inner.as_ptr(), new_rc.clone());
1892 let cloned = inner.0.borrow().deep_clone(seen_tees);
1893 *new_rc.borrow_mut() = cloned;
1894 HydroNode::Tee {
1895 inner: TeeNode(new_rc),
1896 metadata: metadata.clone(),
1897 }
1898 }
1899 }
1900 HydroNode::YieldConcat { inner, metadata } => HydroNode::YieldConcat {
1901 inner: Box::new(inner.deep_clone(seen_tees)),
1902 metadata: metadata.clone(),
1903 },
1904 HydroNode::BeginAtomic { inner, metadata } => HydroNode::BeginAtomic {
1905 inner: Box::new(inner.deep_clone(seen_tees)),
1906 metadata: metadata.clone(),
1907 },
1908 HydroNode::EndAtomic { inner, metadata } => HydroNode::EndAtomic {
1909 inner: Box::new(inner.deep_clone(seen_tees)),
1910 metadata: metadata.clone(),
1911 },
1912 HydroNode::Batch { inner, metadata } => HydroNode::Batch {
1913 inner: Box::new(inner.deep_clone(seen_tees)),
1914 metadata: metadata.clone(),
1915 },
1916 HydroNode::Chain {
1917 first,
1918 second,
1919 metadata,
1920 } => HydroNode::Chain {
1921 first: Box::new(first.deep_clone(seen_tees)),
1922 second: Box::new(second.deep_clone(seen_tees)),
1923 metadata: metadata.clone(),
1924 },
1925 HydroNode::ChainFirst {
1926 first,
1927 second,
1928 metadata,
1929 } => HydroNode::ChainFirst {
1930 first: Box::new(first.deep_clone(seen_tees)),
1931 second: Box::new(second.deep_clone(seen_tees)),
1932 metadata: metadata.clone(),
1933 },
1934 HydroNode::CrossProduct {
1935 left,
1936 right,
1937 metadata,
1938 } => HydroNode::CrossProduct {
1939 left: Box::new(left.deep_clone(seen_tees)),
1940 right: Box::new(right.deep_clone(seen_tees)),
1941 metadata: metadata.clone(),
1942 },
1943 HydroNode::CrossSingleton {
1944 left,
1945 right,
1946 metadata,
1947 } => HydroNode::CrossSingleton {
1948 left: Box::new(left.deep_clone(seen_tees)),
1949 right: Box::new(right.deep_clone(seen_tees)),
1950 metadata: metadata.clone(),
1951 },
1952 HydroNode::Join {
1953 left,
1954 right,
1955 metadata,
1956 } => HydroNode::Join {
1957 left: Box::new(left.deep_clone(seen_tees)),
1958 right: Box::new(right.deep_clone(seen_tees)),
1959 metadata: metadata.clone(),
1960 },
1961 HydroNode::Difference { pos, neg, metadata } => HydroNode::Difference {
1962 pos: Box::new(pos.deep_clone(seen_tees)),
1963 neg: Box::new(neg.deep_clone(seen_tees)),
1964 metadata: metadata.clone(),
1965 },
1966 HydroNode::AntiJoin { pos, neg, metadata } => HydroNode::AntiJoin {
1967 pos: Box::new(pos.deep_clone(seen_tees)),
1968 neg: Box::new(neg.deep_clone(seen_tees)),
1969 metadata: metadata.clone(),
1970 },
1971 HydroNode::ResolveFutures { input, metadata } => HydroNode::ResolveFutures {
1972 input: Box::new(input.deep_clone(seen_tees)),
1973 metadata: metadata.clone(),
1974 },
1975 HydroNode::ResolveFuturesOrdered { input, metadata } => {
1976 HydroNode::ResolveFuturesOrdered {
1977 input: Box::new(input.deep_clone(seen_tees)),
1978 metadata: metadata.clone(),
1979 }
1980 }
1981 HydroNode::Map { f, input, metadata } => HydroNode::Map {
1982 f: f.clone(),
1983 input: Box::new(input.deep_clone(seen_tees)),
1984 metadata: metadata.clone(),
1985 },
1986 HydroNode::FlatMap { f, input, metadata } => HydroNode::FlatMap {
1987 f: f.clone(),
1988 input: Box::new(input.deep_clone(seen_tees)),
1989 metadata: metadata.clone(),
1990 },
1991 HydroNode::Filter { f, input, metadata } => HydroNode::Filter {
1992 f: f.clone(),
1993 input: Box::new(input.deep_clone(seen_tees)),
1994 metadata: metadata.clone(),
1995 },
1996 HydroNode::FilterMap { f, input, metadata } => HydroNode::FilterMap {
1997 f: f.clone(),
1998 input: Box::new(input.deep_clone(seen_tees)),
1999 metadata: metadata.clone(),
2000 },
2001 HydroNode::DeferTick { input, metadata } => HydroNode::DeferTick {
2002 input: Box::new(input.deep_clone(seen_tees)),
2003 metadata: metadata.clone(),
2004 },
2005 HydroNode::Enumerate { input, metadata } => HydroNode::Enumerate {
2006 input: Box::new(input.deep_clone(seen_tees)),
2007 metadata: metadata.clone(),
2008 },
2009 HydroNode::Inspect { f, input, metadata } => HydroNode::Inspect {
2010 f: f.clone(),
2011 input: Box::new(input.deep_clone(seen_tees)),
2012 metadata: metadata.clone(),
2013 },
2014 HydroNode::Unique { input, metadata } => HydroNode::Unique {
2015 input: Box::new(input.deep_clone(seen_tees)),
2016 metadata: metadata.clone(),
2017 },
2018 HydroNode::Sort { input, metadata } => HydroNode::Sort {
2019 input: Box::new(input.deep_clone(seen_tees)),
2020 metadata: metadata.clone(),
2021 },
2022 HydroNode::Fold {
2023 init,
2024 acc,
2025 input,
2026 metadata,
2027 } => HydroNode::Fold {
2028 init: init.clone(),
2029 acc: acc.clone(),
2030 input: Box::new(input.deep_clone(seen_tees)),
2031 metadata: metadata.clone(),
2032 },
2033 HydroNode::Scan {
2034 init,
2035 acc,
2036 input,
2037 metadata,
2038 } => HydroNode::Scan {
2039 init: init.clone(),
2040 acc: acc.clone(),
2041 input: Box::new(input.deep_clone(seen_tees)),
2042 metadata: metadata.clone(),
2043 },
2044 HydroNode::FoldKeyed {
2045 init,
2046 acc,
2047 input,
2048 metadata,
2049 } => HydroNode::FoldKeyed {
2050 init: init.clone(),
2051 acc: acc.clone(),
2052 input: Box::new(input.deep_clone(seen_tees)),
2053 metadata: metadata.clone(),
2054 },
2055 HydroNode::ReduceKeyedWatermark {
2056 f,
2057 input,
2058 watermark,
2059 metadata,
2060 } => HydroNode::ReduceKeyedWatermark {
2061 f: f.clone(),
2062 input: Box::new(input.deep_clone(seen_tees)),
2063 watermark: Box::new(watermark.deep_clone(seen_tees)),
2064 metadata: metadata.clone(),
2065 },
2066 HydroNode::Reduce { f, input, metadata } => HydroNode::Reduce {
2067 f: f.clone(),
2068 input: Box::new(input.deep_clone(seen_tees)),
2069 metadata: metadata.clone(),
2070 },
2071 HydroNode::ReduceKeyed { f, input, metadata } => HydroNode::ReduceKeyed {
2072 f: f.clone(),
2073 input: Box::new(input.deep_clone(seen_tees)),
2074 metadata: metadata.clone(),
2075 },
2076 HydroNode::Network {
2077 name,
2078 serialize_fn,
2079 instantiate_fn,
2080 deserialize_fn,
2081 input,
2082 metadata,
2083 } => HydroNode::Network {
2084 name: name.clone(),
2085 serialize_fn: serialize_fn.clone(),
2086 instantiate_fn: instantiate_fn.clone(),
2087 deserialize_fn: deserialize_fn.clone(),
2088 input: Box::new(input.deep_clone(seen_tees)),
2089 metadata: metadata.clone(),
2090 },
2091 HydroNode::ExternalInput {
2092 from_external_key,
2093 from_port_id,
2094 from_many,
2095 codec_type,
2096 port_hint,
2097 instantiate_fn,
2098 deserialize_fn,
2099 metadata,
2100 } => HydroNode::ExternalInput {
2101 from_external_key: *from_external_key,
2102 from_port_id: *from_port_id,
2103 from_many: *from_many,
2104 codec_type: codec_type.clone(),
2105 port_hint: *port_hint,
2106 instantiate_fn: instantiate_fn.clone(),
2107 deserialize_fn: deserialize_fn.clone(),
2108 metadata: metadata.clone(),
2109 },
2110 HydroNode::Counter {
2111 tag,
2112 duration,
2113 prefix,
2114 input,
2115 metadata,
2116 } => HydroNode::Counter {
2117 tag: tag.clone(),
2118 duration: duration.clone(),
2119 prefix: prefix.clone(),
2120 input: Box::new(input.deep_clone(seen_tees)),
2121 metadata: metadata.clone(),
2122 },
2123 }
2124 }
2125
2126 #[cfg(feature = "build")]
2127 pub fn emit_core<'a, D: Deploy<'a>>(
2128 &mut self,
2129 builders_or_callback: &mut BuildersOrCallback<
2130 impl FnMut(&mut HydroRoot, &mut usize),
2131 impl FnMut(&mut HydroNode, &mut usize),
2132 >,
2133 seen_tees: &mut SeenTees,
2134 built_tees: &mut HashMap<*const RefCell<HydroNode>, syn::Ident>,
2135 next_stmt_id: &mut usize,
2136 ) -> syn::Ident {
2137 let mut ident_stack: Vec<syn::Ident> = Vec::new();
2138
2139 self.transform_bottom_up(
2140 &mut |node: &mut HydroNode| {
2141 let out_location = node.metadata().location_id.clone();
2142 match node {
2143 HydroNode::Placeholder => {
2144 panic!()
2145 }
2146
2147 HydroNode::Cast { .. } => {
2148 match builders_or_callback {
2151 BuildersOrCallback::Builders(_) => {}
2152 BuildersOrCallback::Callback(_, node_callback) => {
2153 node_callback(node, next_stmt_id);
2154 }
2155 }
2156
2157 *next_stmt_id += 1;
2158 }
2160
2161 HydroNode::ObserveNonDet {
2162 inner,
2163 trusted,
2164 metadata,
2165 ..
2166 } => {
2167 let inner_ident = ident_stack.pop().unwrap();
2168
2169 let observe_ident =
2170 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2171
2172 match builders_or_callback {
2173 BuildersOrCallback::Builders(graph_builders) => {
2174 graph_builders.observe_nondet(
2175 *trusted,
2176 &inner.metadata().location_id,
2177 inner_ident,
2178 &inner.metadata().collection_kind,
2179 &observe_ident,
2180 &metadata.collection_kind,
2181 &metadata.op,
2182 );
2183 }
2184 BuildersOrCallback::Callback(_, node_callback) => {
2185 node_callback(node, next_stmt_id);
2186 }
2187 }
2188
2189 *next_stmt_id += 1;
2190
2191 ident_stack.push(observe_ident);
2192 }
2193
2194 HydroNode::Batch {
2195 inner, metadata, ..
2196 } => {
2197 let inner_ident = ident_stack.pop().unwrap();
2198
2199 let batch_ident =
2200 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2201
2202 match builders_or_callback {
2203 BuildersOrCallback::Builders(graph_builders) => {
2204 graph_builders.batch(
2205 inner_ident,
2206 &inner.metadata().location_id,
2207 &inner.metadata().collection_kind,
2208 &batch_ident,
2209 &out_location,
2210 &metadata.op,
2211 );
2212 }
2213 BuildersOrCallback::Callback(_, node_callback) => {
2214 node_callback(node, next_stmt_id);
2215 }
2216 }
2217
2218 *next_stmt_id += 1;
2219
2220 ident_stack.push(batch_ident);
2221 }
2222
2223 HydroNode::YieldConcat { inner, .. } => {
2224 let inner_ident = ident_stack.pop().unwrap();
2225
2226 let yield_ident =
2227 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2228
2229 match builders_or_callback {
2230 BuildersOrCallback::Builders(graph_builders) => {
2231 graph_builders.yield_from_tick(
2232 inner_ident,
2233 &inner.metadata().location_id,
2234 &inner.metadata().collection_kind,
2235 &yield_ident,
2236 &out_location,
2237 );
2238 }
2239 BuildersOrCallback::Callback(_, node_callback) => {
2240 node_callback(node, next_stmt_id);
2241 }
2242 }
2243
2244 *next_stmt_id += 1;
2245
2246 ident_stack.push(yield_ident);
2247 }
2248
2249 HydroNode::BeginAtomic { inner, metadata } => {
2250 let inner_ident = ident_stack.pop().unwrap();
2251
2252 let begin_ident =
2253 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2254
2255 match builders_or_callback {
2256 BuildersOrCallback::Builders(graph_builders) => {
2257 graph_builders.begin_atomic(
2258 inner_ident,
2259 &inner.metadata().location_id,
2260 &inner.metadata().collection_kind,
2261 &begin_ident,
2262 &out_location,
2263 &metadata.op,
2264 );
2265 }
2266 BuildersOrCallback::Callback(_, node_callback) => {
2267 node_callback(node, next_stmt_id);
2268 }
2269 }
2270
2271 *next_stmt_id += 1;
2272
2273 ident_stack.push(begin_ident);
2274 }
2275
2276 HydroNode::EndAtomic { inner, .. } => {
2277 let inner_ident = ident_stack.pop().unwrap();
2278
2279 let end_ident =
2280 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2281
2282 match builders_or_callback {
2283 BuildersOrCallback::Builders(graph_builders) => {
2284 graph_builders.end_atomic(
2285 inner_ident,
2286 &inner.metadata().location_id,
2287 &inner.metadata().collection_kind,
2288 &end_ident,
2289 );
2290 }
2291 BuildersOrCallback::Callback(_, node_callback) => {
2292 node_callback(node, next_stmt_id);
2293 }
2294 }
2295
2296 *next_stmt_id += 1;
2297
2298 ident_stack.push(end_ident);
2299 }
2300
2301 HydroNode::Source {
2302 source, metadata, ..
2303 } => {
2304 if let HydroSource::ExternalNetwork() = source {
2305 ident_stack.push(syn::Ident::new("DUMMY", Span::call_site()));
2306 } else {
2307 let source_ident =
2308 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2309
2310 let source_stmt = match source {
2311 HydroSource::Stream(expr) => {
2312 debug_assert!(metadata.location_id.is_top_level());
2313 parse_quote! {
2314 #source_ident = source_stream(#expr);
2315 }
2316 }
2317
2318 HydroSource::ExternalNetwork() => {
2319 unreachable!()
2320 }
2321
2322 HydroSource::Iter(expr) => {
2323 if metadata.location_id.is_top_level() {
2324 parse_quote! {
2325 #source_ident = source_iter(#expr);
2326 }
2327 } else {
2328 parse_quote! {
2330 #source_ident = source_iter(#expr) -> persist::<'static>();
2331 }
2332 }
2333 }
2334
2335 HydroSource::Spin() => {
2336 debug_assert!(metadata.location_id.is_top_level());
2337 parse_quote! {
2338 #source_ident = spin();
2339 }
2340 }
2341
2342 HydroSource::ClusterMembers(location_id) => {
2343 debug_assert!(metadata.location_id.is_top_level());
2344
2345 let expr = stageleft::QuotedWithContext::splice_untyped_ctx(
2346 D::cluster_membership_stream(location_id),
2347 &(),
2348 );
2349
2350 parse_quote! {
2351 #source_ident = source_stream(#expr);
2352 }
2353 }
2354 };
2355
2356 match builders_or_callback {
2357 BuildersOrCallback::Builders(graph_builders) => {
2358 let builder = graph_builders.get_dfir_mut(&out_location);
2359 builder.add_dfir(source_stmt, None, Some(&next_stmt_id.to_string()));
2360 }
2361 BuildersOrCallback::Callback(_, node_callback) => {
2362 node_callback(node, next_stmt_id);
2363 }
2364 }
2365
2366 *next_stmt_id += 1;
2367
2368 ident_stack.push(source_ident);
2369 }
2370 }
2371
2372 HydroNode::SingletonSource { value, metadata } => {
2373 let source_ident =
2374 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2375
2376 match builders_or_callback {
2377 BuildersOrCallback::Builders(graph_builders) => {
2378 let builder = graph_builders.get_dfir_mut(&out_location);
2379
2380 if metadata.location_id.is_top_level()
2381 && metadata.collection_kind.is_bounded()
2382 {
2383 builder.add_dfir(
2384 parse_quote! {
2385 #source_ident = source_iter([#value]);
2386 },
2387 None,
2388 Some(&next_stmt_id.to_string()),
2389 );
2390 } else {
2391 builder.add_dfir(
2392 parse_quote! {
2393 #source_ident = source_iter([#value]) -> persist::<'static>();
2394 },
2395 None,
2396 Some(&next_stmt_id.to_string()),
2397 );
2398 }
2399 }
2400 BuildersOrCallback::Callback(_, node_callback) => {
2401 node_callback(node, next_stmt_id);
2402 }
2403 }
2404
2405 *next_stmt_id += 1;
2406
2407 ident_stack.push(source_ident);
2408 }
2409
2410 HydroNode::CycleSource { ident, .. } => {
2411 let ident = ident.clone();
2412
2413 match builders_or_callback {
2414 BuildersOrCallback::Builders(_) => {}
2415 BuildersOrCallback::Callback(_, node_callback) => {
2416 node_callback(node, next_stmt_id);
2417 }
2418 }
2419
2420 *next_stmt_id += 1;
2422
2423 ident_stack.push(ident);
2424 }
2425
2426 HydroNode::Tee { inner, .. } => {
2427 let ret_ident = if let Some(teed_from) =
2428 built_tees.get(&(inner.0.as_ref() as *const RefCell<HydroNode>))
2429 {
2430 match builders_or_callback {
2431 BuildersOrCallback::Builders(_) => {}
2432 BuildersOrCallback::Callback(_, node_callback) => {
2433 node_callback(node, next_stmt_id);
2434 }
2435 }
2436
2437 teed_from.clone()
2438 } else {
2439 let inner_ident = ident_stack.pop().unwrap();
2442
2443 let tee_ident =
2444 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2445
2446 built_tees.insert(
2447 inner.0.as_ref() as *const RefCell<HydroNode>,
2448 tee_ident.clone(),
2449 );
2450
2451 match builders_or_callback {
2452 BuildersOrCallback::Builders(graph_builders) => {
2453 let builder = graph_builders.get_dfir_mut(&out_location);
2454 builder.add_dfir(
2455 parse_quote! {
2456 #tee_ident = #inner_ident -> tee();
2457 },
2458 None,
2459 Some(&next_stmt_id.to_string()),
2460 );
2461 }
2462 BuildersOrCallback::Callback(_, node_callback) => {
2463 node_callback(node, next_stmt_id);
2464 }
2465 }
2466
2467 tee_ident
2468 };
2469
2470 *next_stmt_id += 1;
2474 ident_stack.push(ret_ident);
2475 }
2476
2477 HydroNode::Chain { .. } => {
2478 let second_ident = ident_stack.pop().unwrap();
2480 let first_ident = ident_stack.pop().unwrap();
2481
2482 let chain_ident =
2483 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2484
2485 match builders_or_callback {
2486 BuildersOrCallback::Builders(graph_builders) => {
2487 let builder = graph_builders.get_dfir_mut(&out_location);
2488 builder.add_dfir(
2489 parse_quote! {
2490 #chain_ident = chain();
2491 #first_ident -> [0]#chain_ident;
2492 #second_ident -> [1]#chain_ident;
2493 },
2494 None,
2495 Some(&next_stmt_id.to_string()),
2496 );
2497 }
2498 BuildersOrCallback::Callback(_, node_callback) => {
2499 node_callback(node, next_stmt_id);
2500 }
2501 }
2502
2503 *next_stmt_id += 1;
2504
2505 ident_stack.push(chain_ident);
2506 }
2507
2508 HydroNode::ChainFirst { .. } => {
2509 let second_ident = ident_stack.pop().unwrap();
2510 let first_ident = ident_stack.pop().unwrap();
2511
2512 let chain_ident =
2513 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2514
2515 match builders_or_callback {
2516 BuildersOrCallback::Builders(graph_builders) => {
2517 let builder = graph_builders.get_dfir_mut(&out_location);
2518 builder.add_dfir(
2519 parse_quote! {
2520 #chain_ident = chain_first_n(1);
2521 #first_ident -> [0]#chain_ident;
2522 #second_ident -> [1]#chain_ident;
2523 },
2524 None,
2525 Some(&next_stmt_id.to_string()),
2526 );
2527 }
2528 BuildersOrCallback::Callback(_, node_callback) => {
2529 node_callback(node, next_stmt_id);
2530 }
2531 }
2532
2533 *next_stmt_id += 1;
2534
2535 ident_stack.push(chain_ident);
2536 }
2537
2538 HydroNode::CrossSingleton { right, .. } => {
2539 let right_ident = ident_stack.pop().unwrap();
2540 let left_ident = ident_stack.pop().unwrap();
2541
2542 let cross_ident =
2543 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2544
2545 match builders_or_callback {
2546 BuildersOrCallback::Builders(graph_builders) => {
2547 let builder = graph_builders.get_dfir_mut(&out_location);
2548
2549 if right.metadata().location_id.is_top_level()
2550 && right.metadata().collection_kind.is_bounded()
2551 {
2552 builder.add_dfir(
2553 parse_quote! {
2554 #cross_ident = cross_singleton();
2555 #left_ident -> [input]#cross_ident;
2556 #right_ident -> persist::<'static>() -> [single]#cross_ident;
2557 },
2558 None,
2559 Some(&next_stmt_id.to_string()),
2560 );
2561 } else {
2562 builder.add_dfir(
2563 parse_quote! {
2564 #cross_ident = cross_singleton();
2565 #left_ident -> [input]#cross_ident;
2566 #right_ident -> [single]#cross_ident;
2567 },
2568 None,
2569 Some(&next_stmt_id.to_string()),
2570 );
2571 }
2572 }
2573 BuildersOrCallback::Callback(_, node_callback) => {
2574 node_callback(node, next_stmt_id);
2575 }
2576 }
2577
2578 *next_stmt_id += 1;
2579
2580 ident_stack.push(cross_ident);
2581 }
2582
2583 HydroNode::CrossProduct { .. } | HydroNode::Join { .. } => {
2584 let operator: syn::Ident = if matches!(node, HydroNode::CrossProduct { .. }) {
2585 parse_quote!(cross_join_multiset)
2586 } else {
2587 parse_quote!(join_multiset)
2588 };
2589
2590 let (HydroNode::CrossProduct { left, right, .. }
2591 | HydroNode::Join { left, right, .. }) = node
2592 else {
2593 unreachable!()
2594 };
2595
2596 let is_top_level = left.metadata().location_id.is_top_level()
2597 && right.metadata().location_id.is_top_level();
2598 let left_lifetime = if left.metadata().location_id.is_top_level() {
2599 quote!('static)
2600 } else {
2601 quote!('tick)
2602 };
2603
2604 let right_lifetime = if right.metadata().location_id.is_top_level() {
2605 quote!('static)
2606 } else {
2607 quote!('tick)
2608 };
2609
2610 let right_ident = ident_stack.pop().unwrap();
2611 let left_ident = ident_stack.pop().unwrap();
2612
2613 let stream_ident =
2614 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2615
2616 match builders_or_callback {
2617 BuildersOrCallback::Builders(graph_builders) => {
2618 let builder = graph_builders.get_dfir_mut(&out_location);
2619 builder.add_dfir(
2620 if is_top_level {
2621 parse_quote! {
2624 #stream_ident = #operator::<#left_lifetime, #right_lifetime>() -> multiset_delta();
2625 #left_ident -> [0]#stream_ident;
2626 #right_ident -> [1]#stream_ident;
2627 }
2628 } else {
2629 parse_quote! {
2630 #stream_ident = #operator::<#left_lifetime, #right_lifetime>();
2631 #left_ident -> [0]#stream_ident;
2632 #right_ident -> [1]#stream_ident;
2633 }
2634 }
2635 ,
2636 None,
2637 Some(&next_stmt_id.to_string()),
2638 );
2639 }
2640 BuildersOrCallback::Callback(_, node_callback) => {
2641 node_callback(node, next_stmt_id);
2642 }
2643 }
2644
2645 *next_stmt_id += 1;
2646
2647 ident_stack.push(stream_ident);
2648 }
2649
2650 HydroNode::Difference { .. } | HydroNode::AntiJoin { .. } => {
2651 let operator: syn::Ident = if matches!(node, HydroNode::Difference { .. }) {
2652 parse_quote!(difference)
2653 } else {
2654 parse_quote!(anti_join)
2655 };
2656
2657 let (HydroNode::Difference { neg, .. } | HydroNode::AntiJoin { neg, .. }) =
2658 node
2659 else {
2660 unreachable!()
2661 };
2662
2663 let neg_lifetime = if neg.metadata().location_id.is_top_level() {
2664 quote!('static)
2665 } else {
2666 quote!('tick)
2667 };
2668
2669 let neg_ident = ident_stack.pop().unwrap();
2670 let pos_ident = ident_stack.pop().unwrap();
2671
2672 let stream_ident =
2673 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2674
2675 match builders_or_callback {
2676 BuildersOrCallback::Builders(graph_builders) => {
2677 let builder = graph_builders.get_dfir_mut(&out_location);
2678 builder.add_dfir(
2679 parse_quote! {
2680 #stream_ident = #operator::<'tick, #neg_lifetime>();
2681 #pos_ident -> [pos]#stream_ident;
2682 #neg_ident -> [neg]#stream_ident;
2683 },
2684 None,
2685 Some(&next_stmt_id.to_string()),
2686 );
2687 }
2688 BuildersOrCallback::Callback(_, node_callback) => {
2689 node_callback(node, next_stmt_id);
2690 }
2691 }
2692
2693 *next_stmt_id += 1;
2694
2695 ident_stack.push(stream_ident);
2696 }
2697
2698 HydroNode::ResolveFutures { .. } => {
2699 let input_ident = ident_stack.pop().unwrap();
2700
2701 let futures_ident =
2702 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2703
2704 match builders_or_callback {
2705 BuildersOrCallback::Builders(graph_builders) => {
2706 let builder = graph_builders.get_dfir_mut(&out_location);
2707 builder.add_dfir(
2708 parse_quote! {
2709 #futures_ident = #input_ident -> resolve_futures();
2710 },
2711 None,
2712 Some(&next_stmt_id.to_string()),
2713 );
2714 }
2715 BuildersOrCallback::Callback(_, node_callback) => {
2716 node_callback(node, next_stmt_id);
2717 }
2718 }
2719
2720 *next_stmt_id += 1;
2721
2722 ident_stack.push(futures_ident);
2723 }
2724
2725 HydroNode::ResolveFuturesOrdered { .. } => {
2726 let input_ident = ident_stack.pop().unwrap();
2727
2728 let futures_ident =
2729 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2730
2731 match builders_or_callback {
2732 BuildersOrCallback::Builders(graph_builders) => {
2733 let builder = graph_builders.get_dfir_mut(&out_location);
2734 builder.add_dfir(
2735 parse_quote! {
2736 #futures_ident = #input_ident -> resolve_futures_ordered();
2737 },
2738 None,
2739 Some(&next_stmt_id.to_string()),
2740 );
2741 }
2742 BuildersOrCallback::Callback(_, node_callback) => {
2743 node_callback(node, next_stmt_id);
2744 }
2745 }
2746
2747 *next_stmt_id += 1;
2748
2749 ident_stack.push(futures_ident);
2750 }
2751
2752 HydroNode::Map { f, .. } => {
2753 let input_ident = ident_stack.pop().unwrap();
2754
2755 let map_ident =
2756 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2757
2758 match builders_or_callback {
2759 BuildersOrCallback::Builders(graph_builders) => {
2760 let builder = graph_builders.get_dfir_mut(&out_location);
2761 builder.add_dfir(
2762 parse_quote! {
2763 #map_ident = #input_ident -> map(#f);
2764 },
2765 None,
2766 Some(&next_stmt_id.to_string()),
2767 );
2768 }
2769 BuildersOrCallback::Callback(_, node_callback) => {
2770 node_callback(node, next_stmt_id);
2771 }
2772 }
2773
2774 *next_stmt_id += 1;
2775
2776 ident_stack.push(map_ident);
2777 }
2778
2779 HydroNode::FlatMap { f, .. } => {
2780 let input_ident = ident_stack.pop().unwrap();
2781
2782 let flat_map_ident =
2783 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2784
2785 match builders_or_callback {
2786 BuildersOrCallback::Builders(graph_builders) => {
2787 let builder = graph_builders.get_dfir_mut(&out_location);
2788 builder.add_dfir(
2789 parse_quote! {
2790 #flat_map_ident = #input_ident -> flat_map(#f);
2791 },
2792 None,
2793 Some(&next_stmt_id.to_string()),
2794 );
2795 }
2796 BuildersOrCallback::Callback(_, node_callback) => {
2797 node_callback(node, next_stmt_id);
2798 }
2799 }
2800
2801 *next_stmt_id += 1;
2802
2803 ident_stack.push(flat_map_ident);
2804 }
2805
2806 HydroNode::Filter { f, .. } => {
2807 let input_ident = ident_stack.pop().unwrap();
2808
2809 let filter_ident =
2810 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2811
2812 match builders_or_callback {
2813 BuildersOrCallback::Builders(graph_builders) => {
2814 let builder = graph_builders.get_dfir_mut(&out_location);
2815 builder.add_dfir(
2816 parse_quote! {
2817 #filter_ident = #input_ident -> filter(#f);
2818 },
2819 None,
2820 Some(&next_stmt_id.to_string()),
2821 );
2822 }
2823 BuildersOrCallback::Callback(_, node_callback) => {
2824 node_callback(node, next_stmt_id);
2825 }
2826 }
2827
2828 *next_stmt_id += 1;
2829
2830 ident_stack.push(filter_ident);
2831 }
2832
2833 HydroNode::FilterMap { f, .. } => {
2834 let input_ident = ident_stack.pop().unwrap();
2835
2836 let filter_map_ident =
2837 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2838
2839 match builders_or_callback {
2840 BuildersOrCallback::Builders(graph_builders) => {
2841 let builder = graph_builders.get_dfir_mut(&out_location);
2842 builder.add_dfir(
2843 parse_quote! {
2844 #filter_map_ident = #input_ident -> filter_map(#f);
2845 },
2846 None,
2847 Some(&next_stmt_id.to_string()),
2848 );
2849 }
2850 BuildersOrCallback::Callback(_, node_callback) => {
2851 node_callback(node, next_stmt_id);
2852 }
2853 }
2854
2855 *next_stmt_id += 1;
2856
2857 ident_stack.push(filter_map_ident);
2858 }
2859
2860 HydroNode::Sort { .. } => {
2861 let input_ident = ident_stack.pop().unwrap();
2862
2863 let sort_ident =
2864 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2865
2866 match builders_or_callback {
2867 BuildersOrCallback::Builders(graph_builders) => {
2868 let builder = graph_builders.get_dfir_mut(&out_location);
2869 builder.add_dfir(
2870 parse_quote! {
2871 #sort_ident = #input_ident -> sort();
2872 },
2873 None,
2874 Some(&next_stmt_id.to_string()),
2875 );
2876 }
2877 BuildersOrCallback::Callback(_, node_callback) => {
2878 node_callback(node, next_stmt_id);
2879 }
2880 }
2881
2882 *next_stmt_id += 1;
2883
2884 ident_stack.push(sort_ident);
2885 }
2886
2887 HydroNode::DeferTick { .. } => {
2888 let input_ident = ident_stack.pop().unwrap();
2889
2890 let defer_tick_ident =
2891 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2892
2893 match builders_or_callback {
2894 BuildersOrCallback::Builders(graph_builders) => {
2895 let builder = graph_builders.get_dfir_mut(&out_location);
2896 builder.add_dfir(
2897 parse_quote! {
2898 #defer_tick_ident = #input_ident -> defer_tick_lazy();
2899 },
2900 None,
2901 Some(&next_stmt_id.to_string()),
2902 );
2903 }
2904 BuildersOrCallback::Callback(_, node_callback) => {
2905 node_callback(node, next_stmt_id);
2906 }
2907 }
2908
2909 *next_stmt_id += 1;
2910
2911 ident_stack.push(defer_tick_ident);
2912 }
2913
2914 HydroNode::Enumerate { input, .. } => {
2915 let input_ident = ident_stack.pop().unwrap();
2916
2917 let enumerate_ident =
2918 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2919
2920 match builders_or_callback {
2921 BuildersOrCallback::Builders(graph_builders) => {
2922 let builder = graph_builders.get_dfir_mut(&out_location);
2923 let lifetime = if input.metadata().location_id.is_top_level() {
2924 quote!('static)
2925 } else {
2926 quote!('tick)
2927 };
2928 builder.add_dfir(
2929 parse_quote! {
2930 #enumerate_ident = #input_ident -> enumerate::<#lifetime>();
2931 },
2932 None,
2933 Some(&next_stmt_id.to_string()),
2934 );
2935 }
2936 BuildersOrCallback::Callback(_, node_callback) => {
2937 node_callback(node, next_stmt_id);
2938 }
2939 }
2940
2941 *next_stmt_id += 1;
2942
2943 ident_stack.push(enumerate_ident);
2944 }
2945
2946 HydroNode::Inspect { f, .. } => {
2947 let input_ident = ident_stack.pop().unwrap();
2948
2949 let inspect_ident =
2950 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2951
2952 match builders_or_callback {
2953 BuildersOrCallback::Builders(graph_builders) => {
2954 let builder = graph_builders.get_dfir_mut(&out_location);
2955 builder.add_dfir(
2956 parse_quote! {
2957 #inspect_ident = #input_ident -> inspect(#f);
2958 },
2959 None,
2960 Some(&next_stmt_id.to_string()),
2961 );
2962 }
2963 BuildersOrCallback::Callback(_, node_callback) => {
2964 node_callback(node, next_stmt_id);
2965 }
2966 }
2967
2968 *next_stmt_id += 1;
2969
2970 ident_stack.push(inspect_ident);
2971 }
2972
2973 HydroNode::Unique { input, .. } => {
2974 let input_ident = ident_stack.pop().unwrap();
2975
2976 let unique_ident =
2977 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
2978
2979 match builders_or_callback {
2980 BuildersOrCallback::Builders(graph_builders) => {
2981 let builder = graph_builders.get_dfir_mut(&out_location);
2982 let lifetime = if input.metadata().location_id.is_top_level() {
2983 quote!('static)
2984 } else {
2985 quote!('tick)
2986 };
2987
2988 builder.add_dfir(
2989 parse_quote! {
2990 #unique_ident = #input_ident -> unique::<#lifetime>();
2991 },
2992 None,
2993 Some(&next_stmt_id.to_string()),
2994 );
2995 }
2996 BuildersOrCallback::Callback(_, node_callback) => {
2997 node_callback(node, next_stmt_id);
2998 }
2999 }
3000
3001 *next_stmt_id += 1;
3002
3003 ident_stack.push(unique_ident);
3004 }
3005
3006 HydroNode::Fold { .. } | HydroNode::FoldKeyed { .. } | HydroNode::Scan { .. } => {
3007 let operator: syn::Ident = if let HydroNode::Fold { input, .. } = node {
3008 if input.metadata().location_id.is_top_level()
3009 && input.metadata().collection_kind.is_bounded()
3010 {
3011 parse_quote!(fold_no_replay)
3012 } else {
3013 parse_quote!(fold)
3014 }
3015 } else if matches!(node, HydroNode::Scan { .. }) {
3016 parse_quote!(scan)
3017 } else if let HydroNode::FoldKeyed { input, .. } = node {
3018 if input.metadata().location_id.is_top_level()
3019 && input.metadata().collection_kind.is_bounded()
3020 {
3021 todo!("Fold keyed on a top-level bounded collection is not yet supported")
3022 } else {
3023 parse_quote!(fold_keyed)
3024 }
3025 } else {
3026 unreachable!()
3027 };
3028
3029 let (HydroNode::Fold { input, .. }
3030 | HydroNode::FoldKeyed { input, .. }
3031 | HydroNode::Scan { input, .. }) = node
3032 else {
3033 unreachable!()
3034 };
3035
3036 let lifetime = if input.metadata().location_id.is_top_level() {
3037 quote!('static)
3038 } else {
3039 quote!('tick)
3040 };
3041
3042 let input_ident = ident_stack.pop().unwrap();
3043
3044 let (HydroNode::Fold { init, acc, .. }
3045 | HydroNode::FoldKeyed { init, acc, .. }
3046 | HydroNode::Scan { init, acc, .. }) = &*node
3047 else {
3048 unreachable!()
3049 };
3050
3051 let fold_ident =
3052 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3053
3054 match builders_or_callback {
3055 BuildersOrCallback::Builders(graph_builders) => {
3056 if matches!(node, HydroNode::Fold { .. })
3057 && node.metadata().location_id.is_top_level()
3058 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3059 && graph_builders.singleton_intermediates()
3060 && !node.metadata().collection_kind.is_bounded()
3061 {
3062 let builder = graph_builders.get_dfir_mut(&out_location);
3063
3064 let acc: syn::Expr = parse_quote!({
3065 let mut __inner = #acc;
3066 move |__state, __value| {
3067 __inner(__state, __value);
3068 Some(__state.clone())
3069 }
3070 });
3071
3072 builder.add_dfir(
3073 parse_quote! {
3074 source_iter([(#init)()]) -> [0]#fold_ident;
3075 #input_ident -> scan::<#lifetime>(#init, #acc) -> [1]#fold_ident;
3076 #fold_ident = chain();
3077 },
3078 None,
3079 Some(&next_stmt_id.to_string()),
3080 );
3081 } else if matches!(node, HydroNode::FoldKeyed { .. })
3082 && node.metadata().location_id.is_top_level()
3083 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3084 && graph_builders.singleton_intermediates()
3085 && !node.metadata().collection_kind.is_bounded()
3086 {
3087 let builder = graph_builders.get_dfir_mut(&out_location);
3088
3089 let acc: syn::Expr = parse_quote!({
3090 let mut __init = #init;
3091 let mut __inner = #acc;
3092 move |__state, __kv: (_, _)| {
3093 let __state = __state
3095 .entry(::std::clone::Clone::clone(&__kv.0))
3096 .or_insert_with(|| (__init)());
3097 __inner(__state, __kv.1);
3098 Some((__kv.0, ::std::clone::Clone::clone(&*__state)))
3099 }
3100 });
3101
3102 builder.add_dfir(
3103 parse_quote! {
3104 source_iter([(#init)()]) -> [0]#fold_ident;
3105 #fold_ident = #input_ident -> scan::<#lifetime>(|| ::std::collections::HashMap::new(), #acc);
3106 },
3107 None,
3108 Some(&next_stmt_id.to_string()),
3109 );
3110 } else {
3111 let builder = graph_builders.get_dfir_mut(&out_location);
3112 builder.add_dfir(
3113 parse_quote! {
3114 #fold_ident = #input_ident -> #operator::<#lifetime>(#init, #acc);
3115 },
3116 None,
3117 Some(&next_stmt_id.to_string()),
3118 );
3119 }
3120 }
3121 BuildersOrCallback::Callback(_, node_callback) => {
3122 node_callback(node, next_stmt_id);
3123 }
3124 }
3125
3126 *next_stmt_id += 1;
3127
3128 ident_stack.push(fold_ident);
3129 }
3130
3131 HydroNode::Reduce { .. } | HydroNode::ReduceKeyed { .. } => {
3132 let operator: syn::Ident = if let HydroNode::Reduce { input, .. } = node {
3133 if input.metadata().location_id.is_top_level()
3134 && input.metadata().collection_kind.is_bounded()
3135 {
3136 parse_quote!(reduce_no_replay)
3137 } else {
3138 parse_quote!(reduce)
3139 }
3140 } else if let HydroNode::ReduceKeyed { input, .. } = node {
3141 if input.metadata().location_id.is_top_level()
3142 && input.metadata().collection_kind.is_bounded()
3143 {
3144 todo!(
3145 "Calling keyed reduce on a top-level bounded collection is not supported"
3146 )
3147 } else {
3148 parse_quote!(reduce_keyed)
3149 }
3150 } else {
3151 unreachable!()
3152 };
3153
3154 let (HydroNode::Reduce { input, .. } | HydroNode::ReduceKeyed { input, .. }) = node
3155 else {
3156 unreachable!()
3157 };
3158
3159 let lifetime = if input.metadata().location_id.is_top_level() {
3160 quote!('static)
3161 } else {
3162 quote!('tick)
3163 };
3164
3165 let input_ident = ident_stack.pop().unwrap();
3166
3167 let (HydroNode::Reduce { f, .. } | HydroNode::ReduceKeyed { f, .. }) = &*node
3168 else {
3169 unreachable!()
3170 };
3171
3172 let reduce_ident =
3173 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3174
3175 match builders_or_callback {
3176 BuildersOrCallback::Builders(graph_builders) => {
3177 if matches!(node, HydroNode::Reduce { .. })
3178 && node.metadata().location_id.is_top_level()
3179 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3180 && graph_builders.singleton_intermediates()
3181 && !node.metadata().collection_kind.is_bounded()
3182 {
3183 todo!(
3184 "Reduce with optional intermediates is not yet supported in simulator"
3185 );
3186 } else if matches!(node, HydroNode::ReduceKeyed { .. })
3187 && node.metadata().location_id.is_top_level()
3188 && !(matches!(node.metadata().location_id, LocationId::Atomic(_)))
3189 && graph_builders.singleton_intermediates()
3190 && !node.metadata().collection_kind.is_bounded()
3191 {
3192 todo!(
3193 "Reduce keyed with optional intermediates is not yet supported in simulator"
3194 );
3195 } else {
3196 let builder = graph_builders.get_dfir_mut(&out_location);
3197 builder.add_dfir(
3198 parse_quote! {
3199 #reduce_ident = #input_ident -> #operator::<#lifetime>(#f);
3200 },
3201 None,
3202 Some(&next_stmt_id.to_string()),
3203 );
3204 }
3205 }
3206 BuildersOrCallback::Callback(_, node_callback) => {
3207 node_callback(node, next_stmt_id);
3208 }
3209 }
3210
3211 *next_stmt_id += 1;
3212
3213 ident_stack.push(reduce_ident);
3214 }
3215
3216 HydroNode::ReduceKeyedWatermark {
3217 f,
3218 input,
3219 metadata,
3220 ..
3221 } => {
3222 let lifetime = if input.metadata().location_id.is_top_level() {
3223 quote!('static)
3224 } else {
3225 quote!('tick)
3226 };
3227
3228 let watermark_ident = ident_stack.pop().unwrap();
3230 let input_ident = ident_stack.pop().unwrap();
3231
3232 let chain_ident = syn::Ident::new(
3233 &format!("reduce_keyed_watermark_chain_{}", *next_stmt_id),
3234 Span::call_site(),
3235 );
3236
3237 let fold_ident =
3238 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3239
3240 let agg_operator: syn::Ident = if input.metadata().location_id.is_top_level()
3241 && input.metadata().collection_kind.is_bounded()
3242 {
3243 parse_quote!(fold_no_replay)
3244 } else {
3245 parse_quote!(fold)
3246 };
3247
3248 match builders_or_callback {
3249 BuildersOrCallback::Builders(graph_builders) => {
3250 if metadata.location_id.is_top_level()
3251 && !(matches!(metadata.location_id, LocationId::Atomic(_)))
3252 && graph_builders.singleton_intermediates()
3253 && !metadata.collection_kind.is_bounded()
3254 {
3255 todo!(
3256 "Reduce keyed watermarked on a top-level bounded collection is not yet supported"
3257 )
3258 } else {
3259 let builder = graph_builders.get_dfir_mut(&out_location);
3260 builder.add_dfir(
3261 parse_quote! {
3262 #chain_ident = chain();
3263 #input_ident
3264 -> map(|x| (Some(x), None))
3265 -> [0]#chain_ident;
3266 #watermark_ident
3267 -> map(|watermark| (None, Some(watermark)))
3268 -> [1]#chain_ident;
3269
3270 #fold_ident = #chain_ident
3271 -> #agg_operator::<#lifetime>(|| (::std::collections::HashMap::new(), None), {
3272 let __reduce_keyed_fn = #f;
3273 move |(map, opt_curr_watermark), (opt_payload, opt_watermark)| {
3274 if let Some((k, v)) = opt_payload {
3275 if let Some(curr_watermark) = *opt_curr_watermark {
3276 if k <= curr_watermark {
3277 return;
3278 }
3279 }
3280 match map.entry(k) {
3281 ::std::collections::hash_map::Entry::Vacant(e) => {
3282 e.insert(v);
3283 }
3284 ::std::collections::hash_map::Entry::Occupied(mut e) => {
3285 __reduce_keyed_fn(e.get_mut(), v);
3286 }
3287 }
3288 } else {
3289 let watermark = opt_watermark.unwrap();
3290 if let Some(curr_watermark) = *opt_curr_watermark {
3291 if watermark <= curr_watermark {
3292 return;
3293 }
3294 }
3295 *opt_curr_watermark = opt_watermark;
3296 map.retain(|k, _| *k > watermark);
3297 }
3298 }
3299 })
3300 -> flat_map(|(map, _curr_watermark)| map);
3301 },
3302 None,
3303 Some(&next_stmt_id.to_string()),
3304 );
3305 }
3306 }
3307 BuildersOrCallback::Callback(_, node_callback) => {
3308 node_callback(node, next_stmt_id);
3309 }
3310 }
3311
3312 *next_stmt_id += 1;
3313
3314 ident_stack.push(fold_ident);
3315 }
3316
3317 HydroNode::Network {
3318 serialize_fn: serialize_pipeline,
3319 instantiate_fn,
3320 deserialize_fn: deserialize_pipeline,
3321 input,
3322 ..
3323 } => {
3324 let input_ident = ident_stack.pop().unwrap();
3325
3326 let receiver_stream_ident =
3327 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3328
3329 match builders_or_callback {
3330 BuildersOrCallback::Builders(graph_builders) => {
3331 let (sink_expr, source_expr) = match instantiate_fn {
3332 DebugInstantiate::Building => (
3333 syn::parse_quote!(DUMMY_SINK),
3334 syn::parse_quote!(DUMMY_SOURCE),
3335 ),
3336
3337 DebugInstantiate::Finalized(finalized) => {
3338 (finalized.sink.clone(), finalized.source.clone())
3339 }
3340 };
3341
3342 graph_builders.create_network(
3343 &input.metadata().location_id,
3344 &out_location,
3345 input_ident,
3346 &receiver_stream_ident,
3347 serialize_pipeline.as_ref(),
3348 sink_expr,
3349 source_expr,
3350 deserialize_pipeline.as_ref(),
3351 *next_stmt_id,
3352 );
3353 }
3354 BuildersOrCallback::Callback(_, node_callback) => {
3355 node_callback(node, next_stmt_id);
3356 }
3357 }
3358
3359 *next_stmt_id += 1;
3360
3361 ident_stack.push(receiver_stream_ident);
3362 }
3363
3364 HydroNode::ExternalInput {
3365 instantiate_fn,
3366 deserialize_fn: deserialize_pipeline,
3367 ..
3368 } => {
3369 let receiver_stream_ident =
3370 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3371
3372 match builders_or_callback {
3373 BuildersOrCallback::Builders(graph_builders) => {
3374 let (_, source_expr) = match instantiate_fn {
3375 DebugInstantiate::Building => (
3376 syn::parse_quote!(DUMMY_SINK),
3377 syn::parse_quote!(DUMMY_SOURCE),
3378 ),
3379
3380 DebugInstantiate::Finalized(finalized) => {
3381 (finalized.sink.clone(), finalized.source.clone())
3382 }
3383 };
3384
3385 graph_builders.create_external_source(
3386 &out_location,
3387 source_expr,
3388 &receiver_stream_ident,
3389 deserialize_pipeline.as_ref(),
3390 *next_stmt_id,
3391 );
3392 }
3393 BuildersOrCallback::Callback(_, node_callback) => {
3394 node_callback(node, next_stmt_id);
3395 }
3396 }
3397
3398 *next_stmt_id += 1;
3399
3400 ident_stack.push(receiver_stream_ident);
3401 }
3402
3403 HydroNode::Counter {
3404 tag,
3405 duration,
3406 prefix,
3407 ..
3408 } => {
3409 let input_ident = ident_stack.pop().unwrap();
3410
3411 let counter_ident =
3412 syn::Ident::new(&format!("stream_{}", *next_stmt_id), Span::call_site());
3413
3414 match builders_or_callback {
3415 BuildersOrCallback::Builders(graph_builders) => {
3416 let builder = graph_builders.get_dfir_mut(&out_location);
3417 builder.add_dfir(
3418 parse_quote! {
3419 #counter_ident = #input_ident -> _counter(#tag, #duration, #prefix);
3420 },
3421 None,
3422 Some(&next_stmt_id.to_string()),
3423 );
3424 }
3425 BuildersOrCallback::Callback(_, node_callback) => {
3426 node_callback(node, next_stmt_id);
3427 }
3428 }
3429
3430 *next_stmt_id += 1;
3431
3432 ident_stack.push(counter_ident);
3433 }
3434 }
3435 },
3436 seen_tees,
3437 false,
3438 );
3439
3440 ident_stack
3441 .pop()
3442 .expect("ident_stack should have exactly one element after traversal")
3443 }
3444
3445 pub fn visit_debug_expr(&mut self, mut transform: impl FnMut(&mut DebugExpr)) {
3446 match self {
3447 HydroNode::Placeholder => {
3448 panic!()
3449 }
3450 HydroNode::Cast { .. } | HydroNode::ObserveNonDet { .. } => {}
3451 HydroNode::Source { source, .. } => match source {
3452 HydroSource::Stream(expr) | HydroSource::Iter(expr) => transform(expr),
3453 HydroSource::ExternalNetwork()
3454 | HydroSource::Spin()
3455 | HydroSource::ClusterMembers(_) => {} },
3457 HydroNode::SingletonSource { value, .. } => {
3458 transform(value);
3459 }
3460 HydroNode::CycleSource { .. }
3461 | HydroNode::Tee { .. }
3462 | HydroNode::YieldConcat { .. }
3463 | HydroNode::BeginAtomic { .. }
3464 | HydroNode::EndAtomic { .. }
3465 | HydroNode::Batch { .. }
3466 | HydroNode::Chain { .. }
3467 | HydroNode::ChainFirst { .. }
3468 | HydroNode::CrossProduct { .. }
3469 | HydroNode::CrossSingleton { .. }
3470 | HydroNode::ResolveFutures { .. }
3471 | HydroNode::ResolveFuturesOrdered { .. }
3472 | HydroNode::Join { .. }
3473 | HydroNode::Difference { .. }
3474 | HydroNode::AntiJoin { .. }
3475 | HydroNode::DeferTick { .. }
3476 | HydroNode::Enumerate { .. }
3477 | HydroNode::Unique { .. }
3478 | HydroNode::Sort { .. } => {}
3479 HydroNode::Map { f, .. }
3480 | HydroNode::FlatMap { f, .. }
3481 | HydroNode::Filter { f, .. }
3482 | HydroNode::FilterMap { f, .. }
3483 | HydroNode::Inspect { f, .. }
3484 | HydroNode::Reduce { f, .. }
3485 | HydroNode::ReduceKeyed { f, .. }
3486 | HydroNode::ReduceKeyedWatermark { f, .. } => {
3487 transform(f);
3488 }
3489 HydroNode::Fold { init, acc, .. }
3490 | HydroNode::Scan { init, acc, .. }
3491 | HydroNode::FoldKeyed { init, acc, .. } => {
3492 transform(init);
3493 transform(acc);
3494 }
3495 HydroNode::Network {
3496 serialize_fn,
3497 deserialize_fn,
3498 ..
3499 } => {
3500 if let Some(serialize_fn) = serialize_fn {
3501 transform(serialize_fn);
3502 }
3503 if let Some(deserialize_fn) = deserialize_fn {
3504 transform(deserialize_fn);
3505 }
3506 }
3507 HydroNode::ExternalInput { deserialize_fn, .. } => {
3508 if let Some(deserialize_fn) = deserialize_fn {
3509 transform(deserialize_fn);
3510 }
3511 }
3512 HydroNode::Counter { duration, .. } => {
3513 transform(duration);
3514 }
3515 }
3516 }
3517
3518 pub fn op_metadata(&self) -> &HydroIrOpMetadata {
3519 &self.metadata().op
3520 }
3521
3522 pub fn metadata(&self) -> &HydroIrMetadata {
3523 match self {
3524 HydroNode::Placeholder => {
3525 panic!()
3526 }
3527 HydroNode::Cast { metadata, .. } => metadata,
3528 HydroNode::ObserveNonDet { metadata, .. } => metadata,
3529 HydroNode::Source { metadata, .. } => metadata,
3530 HydroNode::SingletonSource { metadata, .. } => metadata,
3531 HydroNode::CycleSource { metadata, .. } => metadata,
3532 HydroNode::Tee { metadata, .. } => metadata,
3533 HydroNode::YieldConcat { metadata, .. } => metadata,
3534 HydroNode::BeginAtomic { metadata, .. } => metadata,
3535 HydroNode::EndAtomic { metadata, .. } => metadata,
3536 HydroNode::Batch { metadata, .. } => metadata,
3537 HydroNode::Chain { metadata, .. } => metadata,
3538 HydroNode::ChainFirst { metadata, .. } => metadata,
3539 HydroNode::CrossProduct { metadata, .. } => metadata,
3540 HydroNode::CrossSingleton { metadata, .. } => metadata,
3541 HydroNode::Join { metadata, .. } => metadata,
3542 HydroNode::Difference { metadata, .. } => metadata,
3543 HydroNode::AntiJoin { metadata, .. } => metadata,
3544 HydroNode::ResolveFutures { metadata, .. } => metadata,
3545 HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
3546 HydroNode::Map { metadata, .. } => metadata,
3547 HydroNode::FlatMap { metadata, .. } => metadata,
3548 HydroNode::Filter { metadata, .. } => metadata,
3549 HydroNode::FilterMap { metadata, .. } => metadata,
3550 HydroNode::DeferTick { metadata, .. } => metadata,
3551 HydroNode::Enumerate { metadata, .. } => metadata,
3552 HydroNode::Inspect { metadata, .. } => metadata,
3553 HydroNode::Unique { metadata, .. } => metadata,
3554 HydroNode::Sort { metadata, .. } => metadata,
3555 HydroNode::Scan { metadata, .. } => metadata,
3556 HydroNode::Fold { metadata, .. } => metadata,
3557 HydroNode::FoldKeyed { metadata, .. } => metadata,
3558 HydroNode::Reduce { metadata, .. } => metadata,
3559 HydroNode::ReduceKeyed { metadata, .. } => metadata,
3560 HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
3561 HydroNode::ExternalInput { metadata, .. } => metadata,
3562 HydroNode::Network { metadata, .. } => metadata,
3563 HydroNode::Counter { metadata, .. } => metadata,
3564 }
3565 }
3566
3567 pub fn op_metadata_mut(&mut self) -> &mut HydroIrOpMetadata {
3568 &mut self.metadata_mut().op
3569 }
3570
3571 pub fn metadata_mut(&mut self) -> &mut HydroIrMetadata {
3572 match self {
3573 HydroNode::Placeholder => {
3574 panic!()
3575 }
3576 HydroNode::Cast { metadata, .. } => metadata,
3577 HydroNode::ObserveNonDet { metadata, .. } => metadata,
3578 HydroNode::Source { metadata, .. } => metadata,
3579 HydroNode::SingletonSource { metadata, .. } => metadata,
3580 HydroNode::CycleSource { metadata, .. } => metadata,
3581 HydroNode::Tee { metadata, .. } => metadata,
3582 HydroNode::YieldConcat { metadata, .. } => metadata,
3583 HydroNode::BeginAtomic { metadata, .. } => metadata,
3584 HydroNode::EndAtomic { metadata, .. } => metadata,
3585 HydroNode::Batch { metadata, .. } => metadata,
3586 HydroNode::Chain { metadata, .. } => metadata,
3587 HydroNode::ChainFirst { metadata, .. } => metadata,
3588 HydroNode::CrossProduct { metadata, .. } => metadata,
3589 HydroNode::CrossSingleton { metadata, .. } => metadata,
3590 HydroNode::Join { metadata, .. } => metadata,
3591 HydroNode::Difference { metadata, .. } => metadata,
3592 HydroNode::AntiJoin { metadata, .. } => metadata,
3593 HydroNode::ResolveFutures { metadata, .. } => metadata,
3594 HydroNode::ResolveFuturesOrdered { metadata, .. } => metadata,
3595 HydroNode::Map { metadata, .. } => metadata,
3596 HydroNode::FlatMap { metadata, .. } => metadata,
3597 HydroNode::Filter { metadata, .. } => metadata,
3598 HydroNode::FilterMap { metadata, .. } => metadata,
3599 HydroNode::DeferTick { metadata, .. } => metadata,
3600 HydroNode::Enumerate { metadata, .. } => metadata,
3601 HydroNode::Inspect { metadata, .. } => metadata,
3602 HydroNode::Unique { metadata, .. } => metadata,
3603 HydroNode::Sort { metadata, .. } => metadata,
3604 HydroNode::Scan { metadata, .. } => metadata,
3605 HydroNode::Fold { metadata, .. } => metadata,
3606 HydroNode::FoldKeyed { metadata, .. } => metadata,
3607 HydroNode::Reduce { metadata, .. } => metadata,
3608 HydroNode::ReduceKeyed { metadata, .. } => metadata,
3609 HydroNode::ReduceKeyedWatermark { metadata, .. } => metadata,
3610 HydroNode::ExternalInput { metadata, .. } => metadata,
3611 HydroNode::Network { metadata, .. } => metadata,
3612 HydroNode::Counter { metadata, .. } => metadata,
3613 }
3614 }
3615
3616 pub fn input(&self) -> Vec<&HydroNode> {
3617 match self {
3618 HydroNode::Placeholder => {
3619 panic!()
3620 }
3621 HydroNode::Source { .. }
3622 | HydroNode::SingletonSource { .. }
3623 | HydroNode::ExternalInput { .. }
3624 | HydroNode::CycleSource { .. }
3625 | HydroNode::Tee { .. } => {
3626 vec![]
3628 }
3629 HydroNode::Cast { inner, .. }
3630 | HydroNode::ObserveNonDet { inner, .. }
3631 | HydroNode::YieldConcat { inner, .. }
3632 | HydroNode::BeginAtomic { inner, .. }
3633 | HydroNode::EndAtomic { inner, .. }
3634 | HydroNode::Batch { inner, .. } => {
3635 vec![inner]
3636 }
3637 HydroNode::Chain { first, second, .. } => {
3638 vec![first, second]
3639 }
3640 HydroNode::ChainFirst { first, second, .. } => {
3641 vec![first, second]
3642 }
3643 HydroNode::CrossProduct { left, right, .. }
3644 | HydroNode::CrossSingleton { left, right, .. }
3645 | HydroNode::Join { left, right, .. } => {
3646 vec![left, right]
3647 }
3648 HydroNode::Difference { pos, neg, .. } | HydroNode::AntiJoin { pos, neg, .. } => {
3649 vec![pos, neg]
3650 }
3651 HydroNode::Map { input, .. }
3652 | HydroNode::FlatMap { input, .. }
3653 | HydroNode::Filter { input, .. }
3654 | HydroNode::FilterMap { input, .. }
3655 | HydroNode::Sort { input, .. }
3656 | HydroNode::DeferTick { input, .. }
3657 | HydroNode::Enumerate { input, .. }
3658 | HydroNode::Inspect { input, .. }
3659 | HydroNode::Unique { input, .. }
3660 | HydroNode::Network { input, .. }
3661 | HydroNode::Counter { input, .. }
3662 | HydroNode::ResolveFutures { input, .. }
3663 | HydroNode::ResolveFuturesOrdered { input, .. }
3664 | HydroNode::Fold { input, .. }
3665 | HydroNode::FoldKeyed { input, .. }
3666 | HydroNode::Reduce { input, .. }
3667 | HydroNode::ReduceKeyed { input, .. }
3668 | HydroNode::Scan { input, .. } => {
3669 vec![input]
3670 }
3671 HydroNode::ReduceKeyedWatermark {
3672 input, watermark, ..
3673 } => {
3674 vec![input, watermark]
3675 }
3676 }
3677 }
3678
3679 pub fn input_metadata(&self) -> Vec<&HydroIrMetadata> {
3680 self.input()
3681 .iter()
3682 .map(|input_node| input_node.metadata())
3683 .collect()
3684 }
3685
3686 pub fn print_root(&self) -> String {
3687 match self {
3688 HydroNode::Placeholder => {
3689 panic!()
3690 }
3691 HydroNode::Cast { .. } => "Cast()".to_owned(),
3692 HydroNode::ObserveNonDet { .. } => "ObserveNonDet()".to_owned(),
3693 HydroNode::Source { source, .. } => format!("Source({:?})", source),
3694 HydroNode::SingletonSource { value, .. } => format!("SingletonSource({:?})", value),
3695 HydroNode::CycleSource { ident, .. } => format!("CycleSource({})", ident),
3696 HydroNode::Tee { inner, .. } => format!("Tee({})", inner.0.borrow().print_root()),
3697 HydroNode::YieldConcat { .. } => "YieldConcat()".to_owned(),
3698 HydroNode::BeginAtomic { .. } => "BeginAtomic()".to_owned(),
3699 HydroNode::EndAtomic { .. } => "EndAtomic()".to_owned(),
3700 HydroNode::Batch { .. } => "Batch()".to_owned(),
3701 HydroNode::Chain { first, second, .. } => {
3702 format!("Chain({}, {})", first.print_root(), second.print_root())
3703 }
3704 HydroNode::ChainFirst { first, second, .. } => {
3705 format!(
3706 "ChainFirst({}, {})",
3707 first.print_root(),
3708 second.print_root()
3709 )
3710 }
3711 HydroNode::CrossProduct { left, right, .. } => {
3712 format!(
3713 "CrossProduct({}, {})",
3714 left.print_root(),
3715 right.print_root()
3716 )
3717 }
3718 HydroNode::CrossSingleton { left, right, .. } => {
3719 format!(
3720 "CrossSingleton({}, {})",
3721 left.print_root(),
3722 right.print_root()
3723 )
3724 }
3725 HydroNode::Join { left, right, .. } => {
3726 format!("Join({}, {})", left.print_root(), right.print_root())
3727 }
3728 HydroNode::Difference { pos, neg, .. } => {
3729 format!("Difference({}, {})", pos.print_root(), neg.print_root())
3730 }
3731 HydroNode::AntiJoin { pos, neg, .. } => {
3732 format!("AntiJoin({}, {})", pos.print_root(), neg.print_root())
3733 }
3734 HydroNode::ResolveFutures { .. } => "ResolveFutures()".to_owned(),
3735 HydroNode::ResolveFuturesOrdered { .. } => "ResolveFuturesOrdered()".to_owned(),
3736 HydroNode::Map { f, .. } => format!("Map({:?})", f),
3737 HydroNode::FlatMap { f, .. } => format!("FlatMap({:?})", f),
3738 HydroNode::Filter { f, .. } => format!("Filter({:?})", f),
3739 HydroNode::FilterMap { f, .. } => format!("FilterMap({:?})", f),
3740 HydroNode::DeferTick { .. } => "DeferTick()".to_owned(),
3741 HydroNode::Enumerate { .. } => "Enumerate()".to_owned(),
3742 HydroNode::Inspect { f, .. } => format!("Inspect({:?})", f),
3743 HydroNode::Unique { .. } => "Unique()".to_owned(),
3744 HydroNode::Sort { .. } => "Sort()".to_owned(),
3745 HydroNode::Fold { init, acc, .. } => format!("Fold({:?}, {:?})", init, acc),
3746 HydroNode::Scan { init, acc, .. } => format!("Scan({:?}, {:?})", init, acc),
3747 HydroNode::FoldKeyed { init, acc, .. } => format!("FoldKeyed({:?}, {:?})", init, acc),
3748 HydroNode::Reduce { f, .. } => format!("Reduce({:?})", f),
3749 HydroNode::ReduceKeyed { f, .. } => format!("ReduceKeyed({:?})", f),
3750 HydroNode::ReduceKeyedWatermark { f, .. } => format!("ReduceKeyedWatermark({:?})", f),
3751 HydroNode::Network { .. } => "Network()".to_owned(),
3752 HydroNode::ExternalInput { .. } => "ExternalInput()".to_owned(),
3753 HydroNode::Counter { tag, duration, .. } => {
3754 format!("Counter({:?}, {:?})", tag, duration)
3755 }
3756 }
3757 }
3758}
3759
3760#[cfg(feature = "build")]
3761fn instantiate_network<'a, D>(
3762 from_location: &LocationId,
3763 to_location: &LocationId,
3764 processes: &SparseSecondaryMap<LocationKey, D::Process>,
3765 clusters: &SparseSecondaryMap<LocationKey, D::Cluster>,
3766) -> (syn::Expr, syn::Expr, Box<dyn FnOnce()>)
3767where
3768 D: Deploy<'a>,
3769{
3770 let ((sink, source), connect_fn) = match (from_location, to_location) {
3771 (&LocationId::Process(from), &LocationId::Process(to)) => {
3772 let from_node = processes
3773 .get(from)
3774 .unwrap_or_else(|| {
3775 panic!("A process used in the graph was not instantiated: {}", from)
3776 })
3777 .clone();
3778 let to_node = processes
3779 .get(to)
3780 .unwrap_or_else(|| {
3781 panic!("A process used in the graph was not instantiated: {}", to)
3782 })
3783 .clone();
3784
3785 let sink_port = from_node.next_port();
3786 let source_port = to_node.next_port();
3787
3788 (
3789 D::o2o_sink_source(&from_node, &sink_port, &to_node, &source_port),
3790 D::o2o_connect(&from_node, &sink_port, &to_node, &source_port),
3791 )
3792 }
3793 (&LocationId::Process(from), &LocationId::Cluster(to)) => {
3794 let from_node = processes
3795 .get(from)
3796 .unwrap_or_else(|| {
3797 panic!("A process used in the graph was not instantiated: {}", from)
3798 })
3799 .clone();
3800 let to_node = clusters
3801 .get(to)
3802 .unwrap_or_else(|| {
3803 panic!("A cluster used in the graph was not instantiated: {}", to)
3804 })
3805 .clone();
3806
3807 let sink_port = from_node.next_port();
3808 let source_port = to_node.next_port();
3809
3810 (
3811 D::o2m_sink_source(&from_node, &sink_port, &to_node, &source_port),
3812 D::o2m_connect(&from_node, &sink_port, &to_node, &source_port),
3813 )
3814 }
3815 (&LocationId::Cluster(from), &LocationId::Process(to)) => {
3816 let from_node = clusters
3817 .get(from)
3818 .unwrap_or_else(|| {
3819 panic!("A cluster used in the graph was not instantiated: {}", from)
3820 })
3821 .clone();
3822 let to_node = processes
3823 .get(to)
3824 .unwrap_or_else(|| {
3825 panic!("A process used in the graph was not instantiated: {}", to)
3826 })
3827 .clone();
3828
3829 let sink_port = from_node.next_port();
3830 let source_port = to_node.next_port();
3831
3832 (
3833 D::m2o_sink_source(&from_node, &sink_port, &to_node, &source_port),
3834 D::m2o_connect(&from_node, &sink_port, &to_node, &source_port),
3835 )
3836 }
3837 (&LocationId::Cluster(from), &LocationId::Cluster(to)) => {
3838 let from_node = clusters
3839 .get(from)
3840 .unwrap_or_else(|| {
3841 panic!("A cluster used in the graph was not instantiated: {}", from)
3842 })
3843 .clone();
3844 let to_node = clusters
3845 .get(to)
3846 .unwrap_or_else(|| {
3847 panic!("A cluster used in the graph was not instantiated: {}", to)
3848 })
3849 .clone();
3850
3851 let sink_port = from_node.next_port();
3852 let source_port = to_node.next_port();
3853
3854 (
3855 D::m2m_sink_source(&from_node, &sink_port, &to_node, &source_port),
3856 D::m2m_connect(&from_node, &sink_port, &to_node, &source_port),
3857 )
3858 }
3859 (LocationId::Tick(_, _), _) => panic!(),
3860 (_, LocationId::Tick(_, _)) => panic!(),
3861 (LocationId::Atomic(_), _) => panic!(),
3862 (_, LocationId::Atomic(_)) => panic!(),
3863 };
3864 (sink, source, connect_fn)
3865}
3866
3867#[cfg(test)]
3868mod test {
3869 use std::mem::size_of;
3870
3871 use stageleft::{QuotedWithContext, q};
3872
3873 use super::*;
3874
3875 #[test]
3876 #[cfg_attr(
3877 not(feature = "build"),
3878 ignore = "expects inclusion of feature-gated fields"
3879 )]
3880 fn hydro_node_size() {
3881 assert_eq!(size_of::<HydroNode>(), 240);
3882 }
3883
3884 #[test]
3885 #[cfg_attr(
3886 not(feature = "build"),
3887 ignore = "expects inclusion of feature-gated fields"
3888 )]
3889 fn hydro_root_size() {
3890 assert_eq!(size_of::<HydroRoot>(), 136);
3891 }
3892
3893 #[test]
3894 fn test_simplify_q_macro_basic() {
3895 let simple_expr: syn::Expr = syn::parse_str("x + y").unwrap();
3897 let result = simplify_q_macro(simple_expr.clone());
3898 assert_eq!(result, simple_expr);
3899 }
3900
3901 #[test]
3902 fn test_simplify_q_macro_actual_stageleft_call() {
3903 let stageleft_call = q!(|x: usize| x + 1).splice_fn1_ctx(&());
3905 let result = simplify_q_macro(stageleft_call);
3906 hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
3909 }
3910
3911 #[test]
3912 fn test_closure_no_pipe_at_start() {
3913 let stageleft_call = q!({
3915 let foo = 123;
3916 move |b: usize| b + foo
3917 })
3918 .splice_fn1_ctx(&());
3919 let result = simplify_q_macro(stageleft_call);
3920 hydro_build_utils::assert_snapshot!(result.to_token_stream().to_string());
3921 }
3922}