hydro_lang/live_collections/
optional.rs

1//! Definitions for the [`Optional`] live collection.
2
3use std::cell::RefCell;
4use std::marker::PhantomData;
5use std::ops::Deref;
6use std::rc::Rc;
7
8use stageleft::{IntoQuotedMut, QuotedWithContext, q};
9use syn::parse_quote;
10
11use super::boundedness::{Bounded, Boundedness, Unbounded};
12use super::singleton::Singleton;
13use super::stream::{AtLeastOnce, ExactlyOnce, NoOrder, Stream, TotalOrder};
14use crate::compile::ir::{CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, TeeNode};
15#[cfg(stageleft_runtime)]
16use crate::forward_handle::{CycleCollection, ReceiverComplete};
17use crate::forward_handle::{ForwardRef, TickCycle};
18#[cfg(stageleft_runtime)]
19use crate::location::dynamic::{DynLocation, LocationId};
20use crate::location::tick::{Atomic, DeferTick, NoAtomic};
21use crate::location::{Location, NoTick, Tick, check_matching_location};
22use crate::nondet::{NonDet, nondet};
23
24/// A *nullable* Rust value that can asynchronously change over time.
25///
26/// Optionals are the live collection equivalent of [`Option`]. If the optional is [`Bounded`],
27/// the value is frozen and will not change. But if it is [`Unbounded`], the value will
28/// asynchronously change over time, including becoming present of uninhabited.
29///
30/// Optionals are used in many of the same places as [`Singleton`], but when the value may be
31/// nullable. For example, the first element of a [`Stream`] is exposed as an [`Optional`].
32///
33/// Type Parameters:
34/// - `Type`: the type of the value in this optional (when it is not null)
35/// - `Loc`: the [`Location`] where the optional is materialized
36/// - `Bound`: tracks whether the value is [`Bounded`] (fixed) or [`Unbounded`] (changing asynchronously)
37pub struct Optional<Type, Loc, Bound: Boundedness> {
38    pub(crate) location: Loc,
39    pub(crate) ir_node: RefCell<HydroNode>,
40
41    _phantom: PhantomData<(Type, Loc, Bound)>,
42}
43
44impl<'a, T, L> DeferTick for Optional<T, Tick<L>, Bounded>
45where
46    L: Location<'a>,
47{
48    fn defer_tick(self) -> Self {
49        Optional::defer_tick(self)
50    }
51}
52
53impl<'a, T, L> CycleCollection<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
54where
55    L: Location<'a>,
56{
57    type Location = Tick<L>;
58
59    fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
60        Optional::new(
61            location.clone(),
62            HydroNode::CycleSource {
63                ident,
64                metadata: location.new_node_metadata(Self::collection_kind()),
65            },
66        )
67    }
68}
69
70impl<'a, T, L> ReceiverComplete<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
71where
72    L: Location<'a>,
73{
74    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
75        assert_eq!(
76            Location::id(&self.location),
77            expected_location,
78            "locations do not match"
79        );
80        self.location
81            .flow_state()
82            .borrow_mut()
83            .push_root(HydroRoot::CycleSink {
84                ident,
85                input: Box::new(self.ir_node.into_inner()),
86                op_metadata: HydroIrOpMetadata::new(),
87            });
88    }
89}
90
91impl<'a, T, L> CycleCollection<'a, ForwardRef> for Optional<T, Tick<L>, Bounded>
92where
93    L: Location<'a>,
94{
95    type Location = Tick<L>;
96
97    fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
98        Optional::new(
99            location.clone(),
100            HydroNode::CycleSource {
101                ident,
102                metadata: location.new_node_metadata(Self::collection_kind()),
103            },
104        )
105    }
106}
107
108impl<'a, T, L> ReceiverComplete<'a, ForwardRef> for Optional<T, Tick<L>, Bounded>
109where
110    L: Location<'a>,
111{
112    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
113        assert_eq!(
114            Location::id(&self.location),
115            expected_location,
116            "locations do not match"
117        );
118        self.location
119            .flow_state()
120            .borrow_mut()
121            .push_root(HydroRoot::CycleSink {
122                ident,
123                input: Box::new(self.ir_node.into_inner()),
124                op_metadata: HydroIrOpMetadata::new(),
125            });
126    }
127}
128
129impl<'a, T, L, B: Boundedness> CycleCollection<'a, ForwardRef> for Optional<T, L, B>
130where
131    L: Location<'a> + NoTick,
132{
133    type Location = L;
134
135    fn create_source(ident: syn::Ident, location: L) -> Self {
136        Optional::new(
137            location.clone(),
138            HydroNode::CycleSource {
139                ident,
140                metadata: location.new_node_metadata(Self::collection_kind()),
141            },
142        )
143    }
144}
145
146impl<'a, T, L, B: Boundedness> ReceiverComplete<'a, ForwardRef> for Optional<T, L, B>
147where
148    L: Location<'a> + NoTick,
149{
150    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
151        assert_eq!(
152            Location::id(&self.location),
153            expected_location,
154            "locations do not match"
155        );
156        self.location
157            .flow_state()
158            .borrow_mut()
159            .push_root(HydroRoot::CycleSink {
160                ident,
161                input: Box::new(self.ir_node.into_inner()),
162                op_metadata: HydroIrOpMetadata::new(),
163            });
164    }
165}
166
167impl<'a, T, L> From<Optional<T, L, Bounded>> for Optional<T, L, Unbounded>
168where
169    L: Location<'a>,
170{
171    fn from(singleton: Optional<T, L, Bounded>) -> Self {
172        Optional::new(singleton.location, singleton.ir_node.into_inner())
173    }
174}
175
176impl<'a, T, L, B: Boundedness> From<Singleton<T, L, B>> for Optional<T, L, B>
177where
178    L: Location<'a>,
179{
180    fn from(singleton: Singleton<T, L, B>) -> Self {
181        Optional::new(
182            singleton.location.clone(),
183            HydroNode::Cast {
184                inner: Box::new(singleton.ir_node.into_inner()),
185                metadata: singleton
186                    .location
187                    .new_node_metadata(Self::collection_kind()),
188            },
189        )
190    }
191}
192
193#[cfg(stageleft_runtime)]
194fn zip_inside_tick<'a, T, O, L: Location<'a>, B: Boundedness>(
195    me: Optional<T, L, B>,
196    other: Optional<O, L, B>,
197) -> Optional<(T, O), L, B> {
198    check_matching_location(&me.location, &other.location);
199
200    Optional::new(
201        me.location.clone(),
202        HydroNode::CrossSingleton {
203            left: Box::new(me.ir_node.into_inner()),
204            right: Box::new(other.ir_node.into_inner()),
205            metadata: me
206                .location
207                .new_node_metadata(Optional::<(T, O), L, B>::collection_kind()),
208        },
209    )
210}
211
212#[cfg(stageleft_runtime)]
213fn or_inside_tick<'a, T, L: Location<'a>, B: Boundedness>(
214    me: Optional<T, L, B>,
215    other: Optional<T, L, B>,
216) -> Optional<T, L, B> {
217    check_matching_location(&me.location, &other.location);
218
219    Optional::new(
220        me.location.clone(),
221        HydroNode::ChainFirst {
222            first: Box::new(me.ir_node.into_inner()),
223            second: Box::new(other.ir_node.into_inner()),
224            metadata: me
225                .location
226                .new_node_metadata(Optional::<T, L, B>::collection_kind()),
227        },
228    )
229}
230
231impl<'a, T, L, B: Boundedness> Clone for Optional<T, L, B>
232where
233    T: Clone,
234    L: Location<'a>,
235{
236    fn clone(&self) -> Self {
237        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
238            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
239            *self.ir_node.borrow_mut() = HydroNode::Tee {
240                inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
241                metadata: self.location.new_node_metadata(Self::collection_kind()),
242            };
243        }
244
245        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
246            Optional {
247                location: self.location.clone(),
248                ir_node: HydroNode::Tee {
249                    inner: TeeNode(inner.0.clone()),
250                    metadata: metadata.clone(),
251                }
252                .into(),
253                _phantom: PhantomData,
254            }
255        } else {
256            unreachable!()
257        }
258    }
259}
260
261impl<'a, T, L, B: Boundedness> Optional<T, L, B>
262where
263    L: Location<'a>,
264{
265    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
266        debug_assert_eq!(ir_node.metadata().location_kind, Location::id(&location));
267        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
268        Optional {
269            location,
270            ir_node: RefCell::new(ir_node),
271            _phantom: PhantomData,
272        }
273    }
274
275    pub(crate) fn collection_kind() -> CollectionKind {
276        CollectionKind::Optional {
277            bound: B::BOUND_KIND,
278            element_type: stageleft::quote_type::<T>().into(),
279        }
280    }
281
282    /// Returns the [`Location`] where this optional is being materialized.
283    pub fn location(&self) -> &L {
284        &self.location
285    }
286
287    /// Transforms the optional value by applying a function `f` to it,
288    /// continuously as the input is updated.
289    ///
290    /// Whenever the optional is empty, the output optional is also empty.
291    ///
292    /// # Example
293    /// ```rust
294    /// # #[cfg(feature = "deploy")] {
295    /// # use hydro_lang::prelude::*;
296    /// # use futures::StreamExt;
297    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
298    /// let tick = process.tick();
299    /// let optional = tick.optional_first_tick(q!(1));
300    /// optional.map(q!(|v| v + 1)).all_ticks()
301    /// # }, |mut stream| async move {
302    /// // 2
303    /// # assert_eq!(stream.next().await.unwrap(), 2);
304    /// # }));
305    /// # }
306    /// ```
307    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
308    where
309        F: Fn(T) -> U + 'a,
310    {
311        let f = f.splice_fn1_ctx(&self.location).into();
312        Optional::new(
313            self.location.clone(),
314            HydroNode::Map {
315                f,
316                input: Box::new(self.ir_node.into_inner()),
317                metadata: self
318                    .location
319                    .new_node_metadata(Optional::<U, L, B>::collection_kind()),
320            },
321        )
322    }
323
324    /// Transforms the optional value by applying a function `f` to it and then flattening
325    /// the result into a stream, preserving the order of elements.
326    ///
327    /// If the optional is empty, the output stream is also empty. If the optional contains
328    /// a value, `f` is applied to produce an iterator, and all items from that iterator
329    /// are emitted in the output stream in deterministic order.
330    ///
331    /// The implementation of [`Iterator`] for the output type `I` must produce items in a
332    /// **deterministic** order. For example, `I` could be a `Vec`, but not a `HashSet`.
333    /// If the order is not deterministic, use [`Optional::flat_map_unordered`] instead.
334    ///
335    /// # Example
336    /// ```rust
337    /// # #[cfg(feature = "deploy")] {
338    /// # use hydro_lang::prelude::*;
339    /// # use futures::StreamExt;
340    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
341    /// let tick = process.tick();
342    /// let optional = tick.optional_first_tick(q!(vec![1, 2, 3]));
343    /// optional.flat_map_ordered(q!(|v| v)).all_ticks()
344    /// # }, |mut stream| async move {
345    /// // 1, 2, 3
346    /// # for w in vec![1, 2, 3] {
347    /// #     assert_eq!(stream.next().await.unwrap(), w);
348    /// # }
349    /// # }));
350    /// # }
351    /// ```
352    pub fn flat_map_ordered<U, I, F>(
353        self,
354        f: impl IntoQuotedMut<'a, F, L>,
355    ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
356    where
357        I: IntoIterator<Item = U>,
358        F: Fn(T) -> I + 'a,
359    {
360        let f = f.splice_fn1_ctx(&self.location).into();
361        Stream::new(
362            self.location.clone(),
363            HydroNode::FlatMap {
364                f,
365                input: Box::new(self.ir_node.into_inner()),
366                metadata: self.location.new_node_metadata(
367                    Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
368                ),
369            },
370        )
371    }
372
373    /// Like [`Optional::flat_map_ordered`], but allows the implementation of [`Iterator`]
374    /// for the output type `I` to produce items in any order.
375    ///
376    /// If the optional is empty, the output stream is also empty. If the optional contains
377    /// a value, `f` is applied to produce an iterator, and all items from that iterator
378    /// are emitted in the output stream in non-deterministic order.
379    ///
380    /// # Example
381    /// ```rust
382    /// # #[cfg(feature = "deploy")] {
383    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
384    /// # use futures::StreamExt;
385    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
386    /// let tick = process.tick();
387    /// let optional = tick.optional_first_tick(q!(
388    ///     std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
389    /// ));
390    /// optional.flat_map_unordered(q!(|v| v)).all_ticks()
391    /// # }, |mut stream| async move {
392    /// // 1, 2, 3, but in no particular order
393    /// # let mut results = Vec::new();
394    /// # for _ in 0..3 {
395    /// #     results.push(stream.next().await.unwrap());
396    /// # }
397    /// # results.sort();
398    /// # assert_eq!(results, vec![1, 2, 3]);
399    /// # }));
400    /// # }
401    /// ```
402    pub fn flat_map_unordered<U, I, F>(
403        self,
404        f: impl IntoQuotedMut<'a, F, L>,
405    ) -> Stream<U, L, B, NoOrder, ExactlyOnce>
406    where
407        I: IntoIterator<Item = U>,
408        F: Fn(T) -> I + 'a,
409    {
410        let f = f.splice_fn1_ctx(&self.location).into();
411        Stream::new(
412            self.location.clone(),
413            HydroNode::FlatMap {
414                f,
415                input: Box::new(self.ir_node.into_inner()),
416                metadata: self
417                    .location
418                    .new_node_metadata(Stream::<U, L, B, NoOrder, ExactlyOnce>::collection_kind()),
419            },
420        )
421    }
422
423    /// Flattens the optional value into a stream, preserving the order of elements.
424    ///
425    /// If the optional is empty, the output stream is also empty. If the optional contains
426    /// a value that implements [`IntoIterator`], all items from that iterator are emitted
427    /// in the output stream in deterministic order.
428    ///
429    /// The implementation of [`Iterator`] for the element type `T` must produce items in a
430    /// **deterministic** order. For example, `T` could be a `Vec`, but not a `HashSet`.
431    /// If the order is not deterministic, use [`Optional::flatten_unordered`] instead.
432    ///
433    /// # Example
434    /// ```rust
435    /// # #[cfg(feature = "deploy")] {
436    /// # use hydro_lang::prelude::*;
437    /// # use futures::StreamExt;
438    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
439    /// let tick = process.tick();
440    /// let optional = tick.optional_first_tick(q!(vec![1, 2, 3]));
441    /// optional.flatten_ordered().all_ticks()
442    /// # }, |mut stream| async move {
443    /// // 1, 2, 3
444    /// # for w in vec![1, 2, 3] {
445    /// #     assert_eq!(stream.next().await.unwrap(), w);
446    /// # }
447    /// # }));
448    /// # }
449    /// ```
450    pub fn flatten_ordered<U>(self) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
451    where
452        T: IntoIterator<Item = U>,
453    {
454        self.flat_map_ordered(q!(|v| v))
455    }
456
457    /// Like [`Optional::flatten_ordered`], but allows the implementation of [`Iterator`]
458    /// for the element type `T` to produce items in any order.
459    ///
460    /// If the optional is empty, the output stream is also empty. If the optional contains
461    /// a value that implements [`IntoIterator`], all items from that iterator are emitted
462    /// in the output stream in non-deterministic order.
463    ///
464    /// # Example
465    /// ```rust
466    /// # #[cfg(feature = "deploy")] {
467    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
468    /// # use futures::StreamExt;
469    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
470    /// let tick = process.tick();
471    /// let optional = tick.optional_first_tick(q!(
472    ///     std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
473    /// ));
474    /// optional.flatten_unordered().all_ticks()
475    /// # }, |mut stream| async move {
476    /// // 1, 2, 3, but in no particular order
477    /// # let mut results = Vec::new();
478    /// # for _ in 0..3 {
479    /// #     results.push(stream.next().await.unwrap());
480    /// # }
481    /// # results.sort();
482    /// # assert_eq!(results, vec![1, 2, 3]);
483    /// # }));
484    /// # }
485    /// ```
486    pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, ExactlyOnce>
487    where
488        T: IntoIterator<Item = U>,
489    {
490        self.flat_map_unordered(q!(|v| v))
491    }
492
493    /// Creates an optional containing only the value if it satisfies a predicate `f`.
494    ///
495    /// If the optional is empty, the output optional is also empty. If the optional contains
496    /// a value and the predicate returns `true`, the output optional contains the same value.
497    /// If the predicate returns `false`, the output optional is empty.
498    ///
499    /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
500    /// not modify or take ownership of the value. If you need to modify the value while filtering
501    /// use [`Optional::filter_map`] instead.
502    ///
503    /// # Example
504    /// ```rust
505    /// # #[cfg(feature = "deploy")] {
506    /// # use hydro_lang::prelude::*;
507    /// # use futures::StreamExt;
508    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
509    /// let tick = process.tick();
510    /// let optional = tick.optional_first_tick(q!(5));
511    /// optional.filter(q!(|&x| x > 3)).all_ticks()
512    /// # }, |mut stream| async move {
513    /// // 5
514    /// # assert_eq!(stream.next().await.unwrap(), 5);
515    /// # }));
516    /// # }
517    /// ```
518    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
519    where
520        F: Fn(&T) -> bool + 'a,
521    {
522        let f = f.splice_fn1_borrow_ctx(&self.location).into();
523        Optional::new(
524            self.location.clone(),
525            HydroNode::Filter {
526                f,
527                input: Box::new(self.ir_node.into_inner()),
528                metadata: self.location.new_node_metadata(Self::collection_kind()),
529            },
530        )
531    }
532
533    /// An operator that both filters and maps. It yields only the value if the supplied
534    /// closure `f` returns `Some(value)`.
535    ///
536    /// If the optional is empty, the output optional is also empty. If the optional contains
537    /// a value and the closure returns `Some(new_value)`, the output optional contains `new_value`.
538    /// If the closure returns `None`, the output optional is empty.
539    ///
540    /// # Example
541    /// ```rust
542    /// # #[cfg(feature = "deploy")] {
543    /// # use hydro_lang::prelude::*;
544    /// # use futures::StreamExt;
545    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
546    /// let tick = process.tick();
547    /// let optional = tick.optional_first_tick(q!("42"));
548    /// optional
549    ///     .filter_map(q!(|s| s.parse::<i32>().ok()))
550    ///     .all_ticks()
551    /// # }, |mut stream| async move {
552    /// // 42
553    /// # assert_eq!(stream.next().await.unwrap(), 42);
554    /// # }));
555    /// # }
556    /// ```
557    pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
558    where
559        F: Fn(T) -> Option<U> + 'a,
560    {
561        let f = f.splice_fn1_ctx(&self.location).into();
562        Optional::new(
563            self.location.clone(),
564            HydroNode::FilterMap {
565                f,
566                input: Box::new(self.ir_node.into_inner()),
567                metadata: self
568                    .location
569                    .new_node_metadata(Optional::<U, L, B>::collection_kind()),
570            },
571        )
572    }
573
574    /// Combines this singleton with another [`Singleton`] or [`Optional`] by tupling their values.
575    ///
576    /// If the other value is a [`Optional`], the output will be non-null only if the argument is
577    /// non-null. This is useful for combining several pieces of state together.
578    ///
579    /// # Example
580    /// ```rust
581    /// # #[cfg(feature = "deploy")] {
582    /// # use hydro_lang::prelude::*;
583    /// # use futures::StreamExt;
584    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
585    /// let tick = process.tick();
586    /// let numbers = process
587    ///   .source_iter(q!(vec![123, 456, 789]))
588    ///   .batch(&tick, nondet!(/** test */));
589    /// let min = numbers.clone().min(); // Optional
590    /// let max = numbers.max(); // Optional
591    /// min.zip(max).all_ticks()
592    /// # }, |mut stream| async move {
593    /// // [(123, 789)]
594    /// # for w in vec![(123, 789)] {
595    /// #     assert_eq!(stream.next().await.unwrap(), w);
596    /// # }
597    /// # }));
598    /// # }
599    /// ```
600    pub fn zip<O>(self, other: impl Into<Optional<O, L, B>>) -> Optional<(T, O), L, B>
601    where
602        O: Clone,
603    {
604        let other: Optional<O, L, B> = other.into();
605        check_matching_location(&self.location, &other.location);
606
607        if L::is_top_level()
608            && let Some(tick) = self.location.try_tick()
609        {
610            let out = zip_inside_tick(
611                self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
612                other.snapshot(&tick, nondet!(/** eventually stabilizes */)),
613            )
614            .latest();
615
616            Optional::new(out.location, out.ir_node.into_inner())
617        } else {
618            zip_inside_tick(self, other)
619        }
620    }
621
622    /// Passes through `self` when it has a value, otherwise passes through `other`.
623    ///
624    /// Like [`Option::or`], this is helpful for defining a fallback for an [`Optional`], when the
625    /// fallback itself is an [`Optional`]. If the fallback is a [`Singleton`], you can use
626    /// [`Optional::unwrap_or`] to ensure that the output is always non-null.
627    ///
628    /// If the inputs are [`Unbounded`], the output will be asynchronously updated as the contents
629    /// of the inputs change (including to/from null states).
630    ///
631    /// # Example
632    /// ```rust
633    /// # #[cfg(feature = "deploy")] {
634    /// # use hydro_lang::prelude::*;
635    /// # use futures::StreamExt;
636    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
637    /// let tick = process.tick();
638    /// // ticks are lazy by default, forces the second tick to run
639    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
640    ///
641    /// let some_first_tick = tick.optional_first_tick(q!(123));
642    /// let some_second_tick = tick.optional_first_tick(q!(456)).defer_tick();
643    /// some_first_tick.or(some_second_tick).all_ticks()
644    /// # }, |mut stream| async move {
645    /// // [123 /* first tick */, 456 /* second tick */]
646    /// # for w in vec![123, 456] {
647    /// #     assert_eq!(stream.next().await.unwrap(), w);
648    /// # }
649    /// # }));
650    /// # }
651    /// ```
652    pub fn or(self, other: Optional<T, L, B>) -> Optional<T, L, B> {
653        check_matching_location(&self.location, &other.location);
654
655        if L::is_top_level()
656            && let Some(tick) = self.location.try_tick()
657        {
658            let out = or_inside_tick(
659                self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
660                other.snapshot(&tick, nondet!(/** eventually stabilizes */)),
661            )
662            .latest();
663
664            Optional::new(out.location, out.ir_node.into_inner())
665        } else {
666            Optional::new(
667                self.location.clone(),
668                HydroNode::ChainFirst {
669                    first: Box::new(self.ir_node.into_inner()),
670                    second: Box::new(other.ir_node.into_inner()),
671                    metadata: self.location.new_node_metadata(Self::collection_kind()),
672                },
673            )
674        }
675    }
676
677    /// Gets the contents of `self` when it has a value, otherwise passes through `other`.
678    ///
679    /// Like [`Option::unwrap_or`], this is helpful for defining a fallback for an [`Optional`].
680    /// If the fallback is not always defined (an [`Optional`]), you can use [`Optional::or`].
681    ///
682    /// If the inputs are [`Unbounded`], the output will be asynchronously updated as the contents
683    /// of the inputs change (including to/from null states).
684    ///
685    /// # Example
686    /// ```rust
687    /// # #[cfg(feature = "deploy")] {
688    /// # use hydro_lang::prelude::*;
689    /// # use futures::StreamExt;
690    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
691    /// let tick = process.tick();
692    /// // ticks are lazy by default, forces the later ticks to run
693    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
694    ///
695    /// let some_first_tick = tick.optional_first_tick(q!(123));
696    /// some_first_tick
697    ///     .unwrap_or(tick.singleton(q!(456)))
698    ///     .all_ticks()
699    /// # }, |mut stream| async move {
700    /// // [123 /* first tick */, 456 /* second tick */, 456 /* third tick */, 456, ...]
701    /// # for w in vec![123, 456, 456, 456] {
702    /// #     assert_eq!(stream.next().await.unwrap(), w);
703    /// # }
704    /// # }));
705    /// # }
706    /// ```
707    pub fn unwrap_or(self, other: Singleton<T, L, B>) -> Singleton<T, L, B> {
708        let res_option = self.or(other.into());
709        Singleton::new(
710            res_option.location.clone(),
711            HydroNode::Cast {
712                inner: Box::new(res_option.ir_node.into_inner()),
713                metadata: res_option
714                    .location
715                    .new_node_metadata(Singleton::<T, L, B>::collection_kind()),
716            },
717        )
718    }
719
720    /// Converts this optional into a [`Singleton`] with a Rust [`Option`] as its contents.
721    ///
722    /// Useful for writing custom Rust code that needs to interact with both the null and non-null
723    /// states of the [`Optional`]. When possible, you should use the native APIs on [`Optional`]
724    /// so that Hydro can skip any computation on null values.
725    ///
726    /// # Example
727    /// ```rust
728    /// # #[cfg(feature = "deploy")] {
729    /// # use hydro_lang::prelude::*;
730    /// # use futures::StreamExt;
731    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
732    /// let tick = process.tick();
733    /// // ticks are lazy by default, forces the later ticks to run
734    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
735    ///
736    /// let some_first_tick = tick.optional_first_tick(q!(123));
737    /// some_first_tick.into_singleton().all_ticks()
738    /// # }, |mut stream| async move {
739    /// // [Some(123) /* first tick */, None /* second tick */, None /* third tick */, None, ...]
740    /// # for w in vec![Some(123), None, None, None] {
741    /// #     assert_eq!(stream.next().await.unwrap(), w);
742    /// # }
743    /// # }));
744    /// # }
745    /// ```
746    pub fn into_singleton(self) -> Singleton<Option<T>, L, B>
747    where
748        T: Clone,
749    {
750        let none: syn::Expr = parse_quote!(::std::option::Option::None);
751
752        let none_singleton = Singleton::new(
753            self.location.clone(),
754            HydroNode::SingletonSource {
755                value: none.into(),
756                metadata: self
757                    .location
758                    .new_node_metadata(Singleton::<Option<T>, L, B>::collection_kind()),
759            },
760        );
761
762        self.map(q!(|v| Some(v))).unwrap_or(none_singleton)
763    }
764
765    /// An operator which allows you to "name" a `HydroNode`.
766    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
767    pub fn ir_node_named(self, name: &str) -> Optional<T, L, B> {
768        {
769            let mut node = self.ir_node.borrow_mut();
770            let metadata = node.metadata_mut();
771            metadata.tag = Some(name.to_string());
772        }
773        self
774    }
775}
776
777impl<'a, T, L> Optional<T, L, Bounded>
778where
779    L: Location<'a>,
780{
781    /// Filters this optional, passing through the optional value if it is non-null **and** the
782    /// argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is null.
783    ///
784    /// Useful for conditionally processing, such as only emitting an optional's value outside
785    /// a tick if some other condition is satisfied.
786    ///
787    /// # Example
788    /// ```rust
789    /// # #[cfg(feature = "deploy")] {
790    /// # use hydro_lang::prelude::*;
791    /// # use futures::StreamExt;
792    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
793    /// let tick = process.tick();
794    /// // ticks are lazy by default, forces the second tick to run
795    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
796    ///
797    /// let batch_first_tick = process
798    ///   .source_iter(q!(vec![]))
799    ///   .batch(&tick, nondet!(/** test */));
800    /// let batch_second_tick = process
801    ///   .source_iter(q!(vec![456]))
802    ///   .batch(&tick, nondet!(/** test */))
803    ///   .defer_tick(); // appears on the second tick
804    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
805    /// batch_first_tick.chain(batch_second_tick).first()
806    ///   .filter_if_some(some_on_first_tick)
807    ///   .unwrap_or(tick.singleton(q!(789)))
808    ///   .all_ticks()
809    /// # }, |mut stream| async move {
810    /// // [789, 789]
811    /// # for w in vec![789, 789] {
812    /// #     assert_eq!(stream.next().await.unwrap(), w);
813    /// # }
814    /// # }));
815    /// # }
816    /// ```
817    pub fn filter_if_some<U>(self, signal: Optional<U, L, Bounded>) -> Optional<T, L, Bounded> {
818        self.zip(signal.map(q!(|_u| ()))).map(q!(|(d, _signal)| d))
819    }
820
821    /// Filters this optional, passing through the optional value if it is non-null **and** the
822    /// argument (a [`Bounded`] [`Optional`]`) is _null_, otherwise the output is null.
823    ///
824    /// Useful for conditionally processing, such as only emitting an optional's value outside
825    /// a tick if some other condition is satisfied.
826    ///
827    /// # Example
828    /// ```rust
829    /// # #[cfg(feature = "deploy")] {
830    /// # use hydro_lang::prelude::*;
831    /// # use futures::StreamExt;
832    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
833    /// let tick = process.tick();
834    /// // ticks are lazy by default, forces the second tick to run
835    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
836    ///
837    /// let batch_first_tick = process
838    ///   .source_iter(q!(vec![]))
839    ///   .batch(&tick, nondet!(/** test */));
840    /// let batch_second_tick = process
841    ///   .source_iter(q!(vec![456]))
842    ///   .batch(&tick, nondet!(/** test */))
843    ///   .defer_tick(); // appears on the second tick
844    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
845    /// batch_first_tick.chain(batch_second_tick).first()
846    ///   .filter_if_none(some_on_first_tick)
847    ///   .unwrap_or(tick.singleton(q!(789)))
848    ///   .all_ticks()
849    /// # }, |mut stream| async move {
850    /// // [789, 789]
851    /// # for w in vec![789, 456] {
852    /// #     assert_eq!(stream.next().await.unwrap(), w);
853    /// # }
854    /// # }));
855    /// # }
856    /// ```
857    pub fn filter_if_none<U>(self, other: Optional<U, L, Bounded>) -> Optional<T, L, Bounded> {
858        self.filter_if_some(
859            other
860                .map(q!(|_| ()))
861                .into_singleton()
862                .filter(q!(|o| o.is_none())),
863        )
864    }
865
866    /// If `self` is null, emits a null optional, but if it non-null, emits `value`.
867    ///
868    /// Useful for gating the release of a [`Singleton`] on a condition of the [`Optional`]
869    /// having a value, such as only releasing a piece of state if the node is the leader.
870    ///
871    /// # Example
872    /// ```rust
873    /// # #[cfg(feature = "deploy")] {
874    /// # use hydro_lang::prelude::*;
875    /// # use futures::StreamExt;
876    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
877    /// let tick = process.tick();
878    /// // ticks are lazy by default, forces the second tick to run
879    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
880    ///
881    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
882    /// some_on_first_tick
883    ///     .if_some_then(tick.singleton(q!(456)))
884    ///     .unwrap_or(tick.singleton(q!(123)))
885    /// # .all_ticks()
886    /// # }, |mut stream| async move {
887    /// // 456 (first tick) ~> 123 (second tick onwards)
888    /// # for w in vec![456, 123, 123] {
889    /// #     assert_eq!(stream.next().await.unwrap(), w);
890    /// # }
891    /// # }));
892    /// # }
893    /// ```
894    pub fn if_some_then<U>(self, value: Singleton<U, L, Bounded>) -> Optional<U, L, Bounded> {
895        value.filter_if_some(self)
896    }
897}
898
899impl<'a, T, L, B: Boundedness> Optional<T, Atomic<L>, B>
900where
901    L: Location<'a> + NoTick,
902{
903    /// Returns an optional value corresponding to the latest snapshot of the optional
904    /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include
905    /// at least all relevant data that contributed to the snapshot at tick `t`. Furthermore,
906    /// all snapshots of this optional into the atomic-associated tick will observe the
907    /// same value each tick.
908    ///
909    /// # Non-Determinism
910    /// Because this picks a snapshot of a optional whose value is continuously changing,
911    /// the output optional has a non-deterministic value since the snapshot can be at an
912    /// arbitrary point in time.
913    pub fn snapshot_atomic(self, _nondet: NonDet) -> Optional<T, Tick<L>, Bounded> {
914        Optional::new(
915            self.location.clone().tick,
916            HydroNode::Batch {
917                inner: Box::new(self.ir_node.into_inner()),
918                metadata: self
919                    .location
920                    .tick
921                    .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
922            },
923        )
924    }
925
926    /// Returns this optional back into a top-level, asynchronous execution context where updates
927    /// to the value will be asynchronously propagated.
928    pub fn end_atomic(self) -> Optional<T, L, B> {
929        Optional::new(
930            self.location.tick.l.clone(),
931            HydroNode::EndAtomic {
932                inner: Box::new(self.ir_node.into_inner()),
933                metadata: self
934                    .location
935                    .tick
936                    .l
937                    .new_node_metadata(Optional::<T, L, B>::collection_kind()),
938            },
939        )
940    }
941}
942
943impl<'a, T, L, B: Boundedness> Optional<T, L, B>
944where
945    L: Location<'a>,
946{
947    /// Shifts this optional into an atomic context, which guarantees that any downstream logic
948    /// will observe the same version of the value and will be executed synchronously before any
949    /// outputs are yielded (in [`Optional::end_atomic`]).
950    ///
951    /// This is useful to enforce local consistency constraints, such as ensuring that several readers
952    /// see a consistent version of local state (since otherwise each [`Optional::snapshot`] may pick
953    /// a different version).
954    ///
955    /// Entering an atomic section requires a [`Tick`] argument that declares where the optional will
956    /// be atomically processed. Snapshotting an optional into the _same_ [`Tick`] will preserve the
957    /// synchronous execution, and all such snapshots in the same [`Tick`] will have the same value.
958    pub fn atomic(self, tick: &Tick<L>) -> Optional<T, Atomic<L>, B> {
959        let out_location = Atomic { tick: tick.clone() };
960        Optional::new(
961            out_location.clone(),
962            HydroNode::BeginAtomic {
963                inner: Box::new(self.ir_node.into_inner()),
964                metadata: out_location
965                    .new_node_metadata(Optional::<T, Atomic<L>, B>::collection_kind()),
966            },
967        )
968    }
969
970    /// Given a tick, returns a optional value corresponding to a snapshot of the optional
971    /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
972    /// relevant data that contributed to the snapshot at tick `t`.
973    ///
974    /// # Non-Determinism
975    /// Because this picks a snapshot of a optional whose value is continuously changing,
976    /// the output optional has a non-deterministic value since the snapshot can be at an
977    /// arbitrary point in time.
978    pub fn snapshot(self, tick: &Tick<L>, _nondet: NonDet) -> Optional<T, Tick<L>, Bounded> {
979        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
980        Optional::new(
981            tick.clone(),
982            HydroNode::Batch {
983                inner: Box::new(self.ir_node.into_inner()),
984                metadata: tick
985                    .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
986            },
987        )
988    }
989
990    /// Eagerly samples the optional as fast as possible, returning a stream of snapshots
991    /// with order corresponding to increasing prefixes of data contributing to the optional.
992    ///
993    /// # Non-Determinism
994    /// At runtime, the optional will be arbitrarily sampled as fast as possible, but due
995    /// to non-deterministic batching and arrival of inputs, the output stream is
996    /// non-deterministic.
997    pub fn sample_eager(self, nondet: NonDet) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
998    where
999        L: NoTick,
1000    {
1001        let tick = self.location.tick();
1002        self.snapshot(&tick, nondet).all_ticks().weakest_retries()
1003    }
1004
1005    /// Given a time interval, returns a stream corresponding to snapshots of the optional
1006    /// value taken at various points in time. Because the input optional may be
1007    /// [`Unbounded`], there are no guarantees on what these snapshots are other than they
1008    /// represent the value of the optional given some prefix of the streams leading up to
1009    /// it.
1010    ///
1011    /// # Non-Determinism
1012    /// The output stream is non-deterministic in which elements are sampled, since this
1013    /// is controlled by a clock.
1014    pub fn sample_every(
1015        self,
1016        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
1017        nondet: NonDet,
1018    ) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
1019    where
1020        L: NoTick + NoAtomic,
1021    {
1022        let samples = self.location.source_interval(interval, nondet);
1023        let tick = self.location.tick();
1024
1025        self.snapshot(&tick, nondet)
1026            .filter_if_some(samples.batch(&tick, nondet).first())
1027            .all_ticks()
1028            .weakest_retries()
1029    }
1030}
1031
1032impl<'a, T, L> Optional<T, Tick<L>, Bounded>
1033where
1034    L: Location<'a>,
1035{
1036    /// Asynchronously yields the value of this singleton outside the tick as an unbounded stream,
1037    /// which will stream the value computed in _each_ tick as a separate stream element (skipping
1038    /// null values).
1039    ///
1040    /// Unlike [`Optional::latest`], the value computed in each tick is emitted separately,
1041    /// producing one element in the output for each (non-null) tick. This is useful for batched
1042    /// computations, where the results from each tick must be combined together.
1043    ///
1044    /// # Example
1045    /// ```rust
1046    /// # #[cfg(feature = "deploy")] {
1047    /// # use hydro_lang::prelude::*;
1048    /// # use futures::StreamExt;
1049    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1050    /// # let tick = process.tick();
1051    /// # // ticks are lazy by default, forces the second tick to run
1052    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1053    /// # let batch_first_tick = process
1054    /// #   .source_iter(q!(vec![]))
1055    /// #   .batch(&tick, nondet!(/** test */));
1056    /// # let batch_second_tick = process
1057    /// #   .source_iter(q!(vec![1, 2, 3]))
1058    /// #   .batch(&tick, nondet!(/** test */))
1059    /// #   .defer_tick(); // appears on the second tick
1060    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1061    /// input_batch // first tick: [], second tick: [1, 2, 3]
1062    ///     .max()
1063    ///     .all_ticks()
1064    /// # }, |mut stream| async move {
1065    /// // [3]
1066    /// # for w in vec![3] {
1067    /// #     assert_eq!(stream.next().await.unwrap(), w);
1068    /// # }
1069    /// # }));
1070    /// # }
1071    /// ```
1072    pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
1073        self.into_stream().all_ticks()
1074    }
1075
1076    /// Synchronously yields the value of this optional outside the tick as an unbounded stream,
1077    /// which will stream the value computed in _each_ tick as a separate stream element.
1078    ///
1079    /// Unlike [`Optional::all_ticks`], this preserves synchronous execution, as the output stream
1080    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1081    /// optional's [`Tick`] context.
1082    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
1083        self.into_stream().all_ticks_atomic()
1084    }
1085
1086    /// Asynchronously yields this optional outside the tick as an unbounded optional, which will
1087    /// be asynchronously updated with the latest value of the optional inside the tick, including
1088    /// whether the optional is null or not.
1089    ///
1090    /// This converts a bounded value _inside_ a tick into an asynchronous value outside the
1091    /// tick that tracks the inner value. This is useful for getting the value as of the
1092    /// "most recent" tick, but note that updates are propagated asynchronously outside the tick.
1093    ///
1094    /// # Example
1095    /// ```rust
1096    /// # #[cfg(feature = "deploy")] {
1097    /// # use hydro_lang::prelude::*;
1098    /// # use futures::StreamExt;
1099    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1100    /// # let tick = process.tick();
1101    /// # // ticks are lazy by default, forces the second tick to run
1102    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1103    /// # let batch_first_tick = process
1104    /// #   .source_iter(q!(vec![]))
1105    /// #   .batch(&tick, nondet!(/** test */));
1106    /// # let batch_second_tick = process
1107    /// #   .source_iter(q!(vec![1, 2, 3]))
1108    /// #   .batch(&tick, nondet!(/** test */))
1109    /// #   .defer_tick(); // appears on the second tick
1110    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1111    /// input_batch // first tick: [], second tick: [1, 2, 3]
1112    ///     .max()
1113    ///     .latest()
1114    /// # .into_singleton()
1115    /// # .sample_eager(nondet!(/** test */))
1116    /// # }, |mut stream| async move {
1117    /// // asynchronously changes from None ~> 3
1118    /// # for w in vec![None, Some(3)] {
1119    /// #     assert_eq!(stream.next().await.unwrap(), w);
1120    /// # }
1121    /// # }));
1122    /// # }
1123    /// ```
1124    pub fn latest(self) -> Optional<T, L, Unbounded> {
1125        Optional::new(
1126            self.location.outer().clone(),
1127            HydroNode::YieldConcat {
1128                inner: Box::new(self.ir_node.into_inner()),
1129                metadata: self
1130                    .location
1131                    .outer()
1132                    .new_node_metadata(Optional::<T, L, Unbounded>::collection_kind()),
1133            },
1134        )
1135    }
1136
1137    /// Synchronously yields this optional outside the tick as an unbounded optional, which will
1138    /// be updated with the latest value of the optional inside the tick.
1139    ///
1140    /// Unlike [`Optional::latest`], this preserves synchronous execution, as the output optional
1141    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1142    /// optional's [`Tick`] context.
1143    pub fn latest_atomic(self) -> Optional<T, Atomic<L>, Unbounded> {
1144        let out_location = Atomic {
1145            tick: self.location.clone(),
1146        };
1147
1148        Optional::new(
1149            out_location.clone(),
1150            HydroNode::YieldConcat {
1151                inner: Box::new(self.ir_node.into_inner()),
1152                metadata: out_location
1153                    .new_node_metadata(Optional::<T, Atomic<L>, Unbounded>::collection_kind()),
1154            },
1155        )
1156    }
1157
1158    /// Shifts the state in `self` to the **next tick**, so that the returned optional at tick `T`
1159    /// always has the state of `self` at tick `T - 1`.
1160    ///
1161    /// At tick `0`, the output optional is null, since there is no previous tick.
1162    ///
1163    /// This operator enables stateful iterative processing with ticks, by sending data from one
1164    /// tick to the next. For example, you can use it to compare state across consecutive batches.
1165    ///
1166    /// # Example
1167    /// ```rust
1168    /// # #[cfg(feature = "deploy")] {
1169    /// # use hydro_lang::prelude::*;
1170    /// # use futures::StreamExt;
1171    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1172    /// let tick = process.tick();
1173    /// // ticks are lazy by default, forces the second tick to run
1174    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1175    ///
1176    /// let batch_first_tick = process
1177    ///   .source_iter(q!(vec![1, 2]))
1178    ///   .batch(&tick, nondet!(/** test */));
1179    /// let batch_second_tick = process
1180    ///   .source_iter(q!(vec![3, 4]))
1181    ///   .batch(&tick, nondet!(/** test */))
1182    ///   .defer_tick(); // appears on the second tick
1183    /// let current_tick_sum = batch_first_tick.chain(batch_second_tick)
1184    ///   .reduce(q!(|state, v| *state += v));
1185    ///
1186    /// current_tick_sum.clone().into_singleton().zip(
1187    ///   current_tick_sum.defer_tick().into_singleton() // state from previous tick
1188    /// ).all_ticks()
1189    /// # }, |mut stream| async move {
1190    /// // [(Some(3), None) /* first tick */, (Some(7), Some(3)) /* second tick */]
1191    /// # for w in vec![(Some(3), None), (Some(7), Some(3))] {
1192    /// #     assert_eq!(stream.next().await.unwrap(), w);
1193    /// # }
1194    /// # }));
1195    /// # }
1196    /// ```
1197    pub fn defer_tick(self) -> Optional<T, Tick<L>, Bounded> {
1198        Optional::new(
1199            self.location.clone(),
1200            HydroNode::DeferTick {
1201                input: Box::new(self.ir_node.into_inner()),
1202                metadata: self.location.new_node_metadata(Self::collection_kind()),
1203            },
1204        )
1205    }
1206
1207    /// Converts this optional into a [`Stream`] containing a single element, the value, if it is
1208    /// non-null. Otherwise, the stream is empty.
1209    ///
1210    /// # Example
1211    /// ```rust
1212    /// # #[cfg(feature = "deploy")] {
1213    /// # use hydro_lang::prelude::*;
1214    /// # use futures::StreamExt;
1215    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1216    /// # let tick = process.tick();
1217    /// # // ticks are lazy by default, forces the second tick to run
1218    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1219    /// # let batch_first_tick = process
1220    /// #   .source_iter(q!(vec![]))
1221    /// #   .batch(&tick, nondet!(/** test */));
1222    /// # let batch_second_tick = process
1223    /// #   .source_iter(q!(vec![123, 456]))
1224    /// #   .batch(&tick, nondet!(/** test */))
1225    /// #   .defer_tick(); // appears on the second tick
1226    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1227    /// input_batch // first tick: [], second tick: [123, 456]
1228    ///     .clone()
1229    ///     .max()
1230    ///     .into_stream()
1231    ///     .chain(input_batch)
1232    ///     .all_ticks()
1233    /// # }, |mut stream| async move {
1234    /// // [456, 123, 456]
1235    /// # for w in vec![456, 123, 456] {
1236    /// #     assert_eq!(stream.next().await.unwrap(), w);
1237    /// # }
1238    /// # }));
1239    /// # }
1240    /// ```
1241    pub fn into_stream(self) -> Stream<T, Tick<L>, Bounded, TotalOrder, ExactlyOnce> {
1242        Stream::new(
1243            self.location.clone(),
1244            HydroNode::Cast {
1245                inner: Box::new(self.ir_node.into_inner()),
1246                metadata: self.location.new_node_metadata(Stream::<
1247                    T,
1248                    Tick<L>,
1249                    Bounded,
1250                    TotalOrder,
1251                    ExactlyOnce,
1252                >::collection_kind()),
1253            },
1254        )
1255    }
1256}
1257
1258#[cfg(feature = "deploy")]
1259#[cfg(test)]
1260mod tests {
1261    use futures::StreamExt;
1262    use hydro_deploy::Deployment;
1263    use stageleft::q;
1264
1265    use super::Optional;
1266    use crate::compile::builder::FlowBuilder;
1267    use crate::location::Location;
1268    use crate::nondet::nondet;
1269
1270    #[tokio::test]
1271    async fn optional_or_cardinality() {
1272        let mut deployment = Deployment::new();
1273
1274        let flow = FlowBuilder::new();
1275        let node = flow.process::<()>();
1276        let external = flow.external::<()>();
1277
1278        let node_tick = node.tick();
1279        let tick_singleton = node_tick.singleton(q!(123));
1280        let tick_optional_inhabited: Optional<_, _, _> = tick_singleton.into();
1281        let counts = tick_optional_inhabited
1282            .clone()
1283            .or(tick_optional_inhabited)
1284            .into_stream()
1285            .count()
1286            .all_ticks()
1287            .send_bincode_external(&external);
1288
1289        let nodes = flow
1290            .with_process(&node, deployment.Localhost())
1291            .with_external(&external, deployment.Localhost())
1292            .deploy(&mut deployment);
1293
1294        deployment.deploy().await.unwrap();
1295
1296        let mut external_out = nodes.connect(counts).await;
1297
1298        deployment.start().await.unwrap();
1299
1300        assert_eq!(external_out.next().await.unwrap(), 1);
1301    }
1302
1303    #[tokio::test]
1304    async fn into_singleton_top_level_none_cardinality() {
1305        let mut deployment = Deployment::new();
1306
1307        let flow = FlowBuilder::new();
1308        let node = flow.process::<()>();
1309        let external = flow.external::<()>();
1310
1311        let node_tick = node.tick();
1312        let top_level_none = node.singleton(q!(123)).filter(q!(|_| false));
1313        let into_singleton = top_level_none.into_singleton();
1314
1315        let tick_driver = node.spin();
1316
1317        let counts = into_singleton
1318            .snapshot(&node_tick, nondet!(/** test */))
1319            .into_stream()
1320            .count()
1321            .zip(tick_driver.batch(&node_tick, nondet!(/** test */)).count())
1322            .map(q!(|(c, _)| c))
1323            .all_ticks()
1324            .send_bincode_external(&external);
1325
1326        let nodes = flow
1327            .with_process(&node, deployment.Localhost())
1328            .with_external(&external, deployment.Localhost())
1329            .deploy(&mut deployment);
1330
1331        deployment.deploy().await.unwrap();
1332
1333        let mut external_out = nodes.connect(counts).await;
1334
1335        deployment.start().await.unwrap();
1336
1337        assert_eq!(external_out.next().await.unwrap(), 1);
1338        assert_eq!(external_out.next().await.unwrap(), 1);
1339        assert_eq!(external_out.next().await.unwrap(), 1);
1340    }
1341}