hydro_lang/live_collections/optional.rs
1//! Definitions for the [`Optional`] live collection.
2
3use std::cell::RefCell;
4use std::marker::PhantomData;
5use std::ops::Deref;
6use std::rc::Rc;
7
8use stageleft::{IntoQuotedMut, QuotedWithContext, q};
9use syn::parse_quote;
10
11use super::boundedness::{Bounded, Boundedness, Unbounded};
12use super::singleton::Singleton;
13use super::stream::{AtLeastOnce, ExactlyOnce, NoOrder, Stream, TotalOrder};
14use crate::compile::ir::{CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, TeeNode};
15#[cfg(stageleft_runtime)]
16use crate::forward_handle::{CycleCollection, ReceiverComplete};
17use crate::forward_handle::{ForwardRef, TickCycle};
18#[cfg(stageleft_runtime)]
19use crate::location::dynamic::{DynLocation, LocationId};
20use crate::location::tick::{Atomic, DeferTick, NoAtomic};
21use crate::location::{Location, NoTick, Tick, check_matching_location};
22use crate::nondet::{NonDet, nondet};
23
24/// A *nullable* Rust value that can asynchronously change over time.
25///
26/// Optionals are the live collection equivalent of [`Option`]. If the optional is [`Bounded`],
27/// the value is frozen and will not change. But if it is [`Unbounded`], the value will
28/// asynchronously change over time, including becoming present of uninhabited.
29///
30/// Optionals are used in many of the same places as [`Singleton`], but when the value may be
31/// nullable. For example, the first element of a [`Stream`] is exposed as an [`Optional`].
32///
33/// Type Parameters:
34/// - `Type`: the type of the value in this optional (when it is not null)
35/// - `Loc`: the [`Location`] where the optional is materialized
36/// - `Bound`: tracks whether the value is [`Bounded`] (fixed) or [`Unbounded`] (changing asynchronously)
37pub struct Optional<Type, Loc, Bound: Boundedness> {
38 pub(crate) location: Loc,
39 pub(crate) ir_node: RefCell<HydroNode>,
40
41 _phantom: PhantomData<(Type, Loc, Bound)>,
42}
43
44impl<'a, T, L> DeferTick for Optional<T, Tick<L>, Bounded>
45where
46 L: Location<'a>,
47{
48 fn defer_tick(self) -> Self {
49 Optional::defer_tick(self)
50 }
51}
52
53impl<'a, T, L> CycleCollection<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
54where
55 L: Location<'a>,
56{
57 type Location = Tick<L>;
58
59 fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
60 Optional::new(
61 location.clone(),
62 HydroNode::CycleSource {
63 ident,
64 metadata: location.new_node_metadata(Self::collection_kind()),
65 },
66 )
67 }
68}
69
70impl<'a, T, L> ReceiverComplete<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
71where
72 L: Location<'a>,
73{
74 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
75 assert_eq!(
76 Location::id(&self.location),
77 expected_location,
78 "locations do not match"
79 );
80 self.location
81 .flow_state()
82 .borrow_mut()
83 .push_root(HydroRoot::CycleSink {
84 ident,
85 input: Box::new(self.ir_node.into_inner()),
86 op_metadata: HydroIrOpMetadata::new(),
87 });
88 }
89}
90
91impl<'a, T, L> CycleCollection<'a, ForwardRef> for Optional<T, Tick<L>, Bounded>
92where
93 L: Location<'a>,
94{
95 type Location = Tick<L>;
96
97 fn create_source(ident: syn::Ident, location: Tick<L>) -> Self {
98 Optional::new(
99 location.clone(),
100 HydroNode::CycleSource {
101 ident,
102 metadata: location.new_node_metadata(Self::collection_kind()),
103 },
104 )
105 }
106}
107
108impl<'a, T, L> ReceiverComplete<'a, ForwardRef> for Optional<T, Tick<L>, Bounded>
109where
110 L: Location<'a>,
111{
112 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
113 assert_eq!(
114 Location::id(&self.location),
115 expected_location,
116 "locations do not match"
117 );
118 self.location
119 .flow_state()
120 .borrow_mut()
121 .push_root(HydroRoot::CycleSink {
122 ident,
123 input: Box::new(self.ir_node.into_inner()),
124 op_metadata: HydroIrOpMetadata::new(),
125 });
126 }
127}
128
129impl<'a, T, L, B: Boundedness> CycleCollection<'a, ForwardRef> for Optional<T, L, B>
130where
131 L: Location<'a> + NoTick,
132{
133 type Location = L;
134
135 fn create_source(ident: syn::Ident, location: L) -> Self {
136 Optional::new(
137 location.clone(),
138 HydroNode::CycleSource {
139 ident,
140 metadata: location.new_node_metadata(Self::collection_kind()),
141 },
142 )
143 }
144}
145
146impl<'a, T, L, B: Boundedness> ReceiverComplete<'a, ForwardRef> for Optional<T, L, B>
147where
148 L: Location<'a> + NoTick,
149{
150 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
151 assert_eq!(
152 Location::id(&self.location),
153 expected_location,
154 "locations do not match"
155 );
156 self.location
157 .flow_state()
158 .borrow_mut()
159 .push_root(HydroRoot::CycleSink {
160 ident,
161 input: Box::new(self.ir_node.into_inner()),
162 op_metadata: HydroIrOpMetadata::new(),
163 });
164 }
165}
166
167impl<'a, T, L> From<Optional<T, L, Bounded>> for Optional<T, L, Unbounded>
168where
169 L: Location<'a>,
170{
171 fn from(singleton: Optional<T, L, Bounded>) -> Self {
172 Optional::new(singleton.location, singleton.ir_node.into_inner())
173 }
174}
175
176impl<'a, T, L, B: Boundedness> From<Singleton<T, L, B>> for Optional<T, L, B>
177where
178 L: Location<'a>,
179{
180 fn from(singleton: Singleton<T, L, B>) -> Self {
181 Optional::new(
182 singleton.location.clone(),
183 HydroNode::Cast {
184 inner: Box::new(singleton.ir_node.into_inner()),
185 metadata: singleton
186 .location
187 .new_node_metadata(Self::collection_kind()),
188 },
189 )
190 }
191}
192
193#[cfg(stageleft_runtime)]
194fn zip_inside_tick<'a, T, O, L: Location<'a>, B: Boundedness>(
195 me: Optional<T, L, B>,
196 other: Optional<O, L, B>,
197) -> Optional<(T, O), L, B> {
198 check_matching_location(&me.location, &other.location);
199
200 Optional::new(
201 me.location.clone(),
202 HydroNode::CrossSingleton {
203 left: Box::new(me.ir_node.into_inner()),
204 right: Box::new(other.ir_node.into_inner()),
205 metadata: me
206 .location
207 .new_node_metadata(Optional::<(T, O), L, B>::collection_kind()),
208 },
209 )
210}
211
212#[cfg(stageleft_runtime)]
213fn or_inside_tick<'a, T, L: Location<'a>, B: Boundedness>(
214 me: Optional<T, L, B>,
215 other: Optional<T, L, B>,
216) -> Optional<T, L, B> {
217 check_matching_location(&me.location, &other.location);
218
219 Optional::new(
220 me.location.clone(),
221 HydroNode::ChainFirst {
222 first: Box::new(me.ir_node.into_inner()),
223 second: Box::new(other.ir_node.into_inner()),
224 metadata: me
225 .location
226 .new_node_metadata(Optional::<T, L, B>::collection_kind()),
227 },
228 )
229}
230
231impl<'a, T, L, B: Boundedness> Clone for Optional<T, L, B>
232where
233 T: Clone,
234 L: Location<'a>,
235{
236 fn clone(&self) -> Self {
237 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
238 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
239 *self.ir_node.borrow_mut() = HydroNode::Tee {
240 inner: TeeNode(Rc::new(RefCell::new(orig_ir_node))),
241 metadata: self.location.new_node_metadata(Self::collection_kind()),
242 };
243 }
244
245 if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
246 Optional {
247 location: self.location.clone(),
248 ir_node: HydroNode::Tee {
249 inner: TeeNode(inner.0.clone()),
250 metadata: metadata.clone(),
251 }
252 .into(),
253 _phantom: PhantomData,
254 }
255 } else {
256 unreachable!()
257 }
258 }
259}
260
261impl<'a, T, L, B: Boundedness> Optional<T, L, B>
262where
263 L: Location<'a>,
264{
265 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
266 debug_assert_eq!(ir_node.metadata().location_kind, Location::id(&location));
267 debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
268 Optional {
269 location,
270 ir_node: RefCell::new(ir_node),
271 _phantom: PhantomData,
272 }
273 }
274
275 pub(crate) fn collection_kind() -> CollectionKind {
276 CollectionKind::Optional {
277 bound: B::BOUND_KIND,
278 element_type: stageleft::quote_type::<T>().into(),
279 }
280 }
281
282 /// Returns the [`Location`] where this optional is being materialized.
283 pub fn location(&self) -> &L {
284 &self.location
285 }
286
287 /// Transforms the optional value by applying a function `f` to it,
288 /// continuously as the input is updated.
289 ///
290 /// Whenever the optional is empty, the output optional is also empty.
291 ///
292 /// # Example
293 /// ```rust
294 /// # #[cfg(feature = "deploy")] {
295 /// # use hydro_lang::prelude::*;
296 /// # use futures::StreamExt;
297 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
298 /// let tick = process.tick();
299 /// let optional = tick.optional_first_tick(q!(1));
300 /// optional.map(q!(|v| v + 1)).all_ticks()
301 /// # }, |mut stream| async move {
302 /// // 2
303 /// # assert_eq!(stream.next().await.unwrap(), 2);
304 /// # }));
305 /// # }
306 /// ```
307 pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
308 where
309 F: Fn(T) -> U + 'a,
310 {
311 let f = f.splice_fn1_ctx(&self.location).into();
312 Optional::new(
313 self.location.clone(),
314 HydroNode::Map {
315 f,
316 input: Box::new(self.ir_node.into_inner()),
317 metadata: self
318 .location
319 .new_node_metadata(Optional::<U, L, B>::collection_kind()),
320 },
321 )
322 }
323
324 /// Transforms the optional value by applying a function `f` to it and then flattening
325 /// the result into a stream, preserving the order of elements.
326 ///
327 /// If the optional is empty, the output stream is also empty. If the optional contains
328 /// a value, `f` is applied to produce an iterator, and all items from that iterator
329 /// are emitted in the output stream in deterministic order.
330 ///
331 /// The implementation of [`Iterator`] for the output type `I` must produce items in a
332 /// **deterministic** order. For example, `I` could be a `Vec`, but not a `HashSet`.
333 /// If the order is not deterministic, use [`Optional::flat_map_unordered`] instead.
334 ///
335 /// # Example
336 /// ```rust
337 /// # #[cfg(feature = "deploy")] {
338 /// # use hydro_lang::prelude::*;
339 /// # use futures::StreamExt;
340 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
341 /// let tick = process.tick();
342 /// let optional = tick.optional_first_tick(q!(vec![1, 2, 3]));
343 /// optional.flat_map_ordered(q!(|v| v)).all_ticks()
344 /// # }, |mut stream| async move {
345 /// // 1, 2, 3
346 /// # for w in vec![1, 2, 3] {
347 /// # assert_eq!(stream.next().await.unwrap(), w);
348 /// # }
349 /// # }));
350 /// # }
351 /// ```
352 pub fn flat_map_ordered<U, I, F>(
353 self,
354 f: impl IntoQuotedMut<'a, F, L>,
355 ) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
356 where
357 I: IntoIterator<Item = U>,
358 F: Fn(T) -> I + 'a,
359 {
360 let f = f.splice_fn1_ctx(&self.location).into();
361 Stream::new(
362 self.location.clone(),
363 HydroNode::FlatMap {
364 f,
365 input: Box::new(self.ir_node.into_inner()),
366 metadata: self.location.new_node_metadata(
367 Stream::<U, L, B, TotalOrder, ExactlyOnce>::collection_kind(),
368 ),
369 },
370 )
371 }
372
373 /// Like [`Optional::flat_map_ordered`], but allows the implementation of [`Iterator`]
374 /// for the output type `I` to produce items in any order.
375 ///
376 /// If the optional is empty, the output stream is also empty. If the optional contains
377 /// a value, `f` is applied to produce an iterator, and all items from that iterator
378 /// are emitted in the output stream in non-deterministic order.
379 ///
380 /// # Example
381 /// ```rust
382 /// # #[cfg(feature = "deploy")] {
383 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
384 /// # use futures::StreamExt;
385 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
386 /// let tick = process.tick();
387 /// let optional = tick.optional_first_tick(q!(
388 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
389 /// ));
390 /// optional.flat_map_unordered(q!(|v| v)).all_ticks()
391 /// # }, |mut stream| async move {
392 /// // 1, 2, 3, but in no particular order
393 /// # let mut results = Vec::new();
394 /// # for _ in 0..3 {
395 /// # results.push(stream.next().await.unwrap());
396 /// # }
397 /// # results.sort();
398 /// # assert_eq!(results, vec![1, 2, 3]);
399 /// # }));
400 /// # }
401 /// ```
402 pub fn flat_map_unordered<U, I, F>(
403 self,
404 f: impl IntoQuotedMut<'a, F, L>,
405 ) -> Stream<U, L, B, NoOrder, ExactlyOnce>
406 where
407 I: IntoIterator<Item = U>,
408 F: Fn(T) -> I + 'a,
409 {
410 let f = f.splice_fn1_ctx(&self.location).into();
411 Stream::new(
412 self.location.clone(),
413 HydroNode::FlatMap {
414 f,
415 input: Box::new(self.ir_node.into_inner()),
416 metadata: self
417 .location
418 .new_node_metadata(Stream::<U, L, B, NoOrder, ExactlyOnce>::collection_kind()),
419 },
420 )
421 }
422
423 /// Flattens the optional value into a stream, preserving the order of elements.
424 ///
425 /// If the optional is empty, the output stream is also empty. If the optional contains
426 /// a value that implements [`IntoIterator`], all items from that iterator are emitted
427 /// in the output stream in deterministic order.
428 ///
429 /// The implementation of [`Iterator`] for the element type `T` must produce items in a
430 /// **deterministic** order. For example, `T` could be a `Vec`, but not a `HashSet`.
431 /// If the order is not deterministic, use [`Optional::flatten_unordered`] instead.
432 ///
433 /// # Example
434 /// ```rust
435 /// # #[cfg(feature = "deploy")] {
436 /// # use hydro_lang::prelude::*;
437 /// # use futures::StreamExt;
438 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
439 /// let tick = process.tick();
440 /// let optional = tick.optional_first_tick(q!(vec![1, 2, 3]));
441 /// optional.flatten_ordered().all_ticks()
442 /// # }, |mut stream| async move {
443 /// // 1, 2, 3
444 /// # for w in vec![1, 2, 3] {
445 /// # assert_eq!(stream.next().await.unwrap(), w);
446 /// # }
447 /// # }));
448 /// # }
449 /// ```
450 pub fn flatten_ordered<U>(self) -> Stream<U, L, B, TotalOrder, ExactlyOnce>
451 where
452 T: IntoIterator<Item = U>,
453 {
454 self.flat_map_ordered(q!(|v| v))
455 }
456
457 /// Like [`Optional::flatten_ordered`], but allows the implementation of [`Iterator`]
458 /// for the element type `T` to produce items in any order.
459 ///
460 /// If the optional is empty, the output stream is also empty. If the optional contains
461 /// a value that implements [`IntoIterator`], all items from that iterator are emitted
462 /// in the output stream in non-deterministic order.
463 ///
464 /// # Example
465 /// ```rust
466 /// # #[cfg(feature = "deploy")] {
467 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
468 /// # use futures::StreamExt;
469 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, NoOrder, ExactlyOnce>(|process| {
470 /// let tick = process.tick();
471 /// let optional = tick.optional_first_tick(q!(
472 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
473 /// ));
474 /// optional.flatten_unordered().all_ticks()
475 /// # }, |mut stream| async move {
476 /// // 1, 2, 3, but in no particular order
477 /// # let mut results = Vec::new();
478 /// # for _ in 0..3 {
479 /// # results.push(stream.next().await.unwrap());
480 /// # }
481 /// # results.sort();
482 /// # assert_eq!(results, vec![1, 2, 3]);
483 /// # }));
484 /// # }
485 /// ```
486 pub fn flatten_unordered<U>(self) -> Stream<U, L, B, NoOrder, ExactlyOnce>
487 where
488 T: IntoIterator<Item = U>,
489 {
490 self.flat_map_unordered(q!(|v| v))
491 }
492
493 /// Creates an optional containing only the value if it satisfies a predicate `f`.
494 ///
495 /// If the optional is empty, the output optional is also empty. If the optional contains
496 /// a value and the predicate returns `true`, the output optional contains the same value.
497 /// If the predicate returns `false`, the output optional is empty.
498 ///
499 /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
500 /// not modify or take ownership of the value. If you need to modify the value while filtering
501 /// use [`Optional::filter_map`] instead.
502 ///
503 /// # Example
504 /// ```rust
505 /// # #[cfg(feature = "deploy")] {
506 /// # use hydro_lang::prelude::*;
507 /// # use futures::StreamExt;
508 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
509 /// let tick = process.tick();
510 /// let optional = tick.optional_first_tick(q!(5));
511 /// optional.filter(q!(|&x| x > 3)).all_ticks()
512 /// # }, |mut stream| async move {
513 /// // 5
514 /// # assert_eq!(stream.next().await.unwrap(), 5);
515 /// # }));
516 /// # }
517 /// ```
518 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
519 where
520 F: Fn(&T) -> bool + 'a,
521 {
522 let f = f.splice_fn1_borrow_ctx(&self.location).into();
523 Optional::new(
524 self.location.clone(),
525 HydroNode::Filter {
526 f,
527 input: Box::new(self.ir_node.into_inner()),
528 metadata: self.location.new_node_metadata(Self::collection_kind()),
529 },
530 )
531 }
532
533 /// An operator that both filters and maps. It yields only the value if the supplied
534 /// closure `f` returns `Some(value)`.
535 ///
536 /// If the optional is empty, the output optional is also empty. If the optional contains
537 /// a value and the closure returns `Some(new_value)`, the output optional contains `new_value`.
538 /// If the closure returns `None`, the output optional is empty.
539 ///
540 /// # Example
541 /// ```rust
542 /// # #[cfg(feature = "deploy")] {
543 /// # use hydro_lang::prelude::*;
544 /// # use futures::StreamExt;
545 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
546 /// let tick = process.tick();
547 /// let optional = tick.optional_first_tick(q!("42"));
548 /// optional
549 /// .filter_map(q!(|s| s.parse::<i32>().ok()))
550 /// .all_ticks()
551 /// # }, |mut stream| async move {
552 /// // 42
553 /// # assert_eq!(stream.next().await.unwrap(), 42);
554 /// # }));
555 /// # }
556 /// ```
557 pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
558 where
559 F: Fn(T) -> Option<U> + 'a,
560 {
561 let f = f.splice_fn1_ctx(&self.location).into();
562 Optional::new(
563 self.location.clone(),
564 HydroNode::FilterMap {
565 f,
566 input: Box::new(self.ir_node.into_inner()),
567 metadata: self
568 .location
569 .new_node_metadata(Optional::<U, L, B>::collection_kind()),
570 },
571 )
572 }
573
574 /// Combines this singleton with another [`Singleton`] or [`Optional`] by tupling their values.
575 ///
576 /// If the other value is a [`Optional`], the output will be non-null only if the argument is
577 /// non-null. This is useful for combining several pieces of state together.
578 ///
579 /// # Example
580 /// ```rust
581 /// # #[cfg(feature = "deploy")] {
582 /// # use hydro_lang::prelude::*;
583 /// # use futures::StreamExt;
584 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
585 /// let tick = process.tick();
586 /// let numbers = process
587 /// .source_iter(q!(vec![123, 456, 789]))
588 /// .batch(&tick, nondet!(/** test */));
589 /// let min = numbers.clone().min(); // Optional
590 /// let max = numbers.max(); // Optional
591 /// min.zip(max).all_ticks()
592 /// # }, |mut stream| async move {
593 /// // [(123, 789)]
594 /// # for w in vec![(123, 789)] {
595 /// # assert_eq!(stream.next().await.unwrap(), w);
596 /// # }
597 /// # }));
598 /// # }
599 /// ```
600 pub fn zip<O>(self, other: impl Into<Optional<O, L, B>>) -> Optional<(T, O), L, B>
601 where
602 O: Clone,
603 {
604 let other: Optional<O, L, B> = other.into();
605 check_matching_location(&self.location, &other.location);
606
607 if L::is_top_level()
608 && let Some(tick) = self.location.try_tick()
609 {
610 let out = zip_inside_tick(
611 self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
612 other.snapshot(&tick, nondet!(/** eventually stabilizes */)),
613 )
614 .latest();
615
616 Optional::new(out.location, out.ir_node.into_inner())
617 } else {
618 zip_inside_tick(self, other)
619 }
620 }
621
622 /// Passes through `self` when it has a value, otherwise passes through `other`.
623 ///
624 /// Like [`Option::or`], this is helpful for defining a fallback for an [`Optional`], when the
625 /// fallback itself is an [`Optional`]. If the fallback is a [`Singleton`], you can use
626 /// [`Optional::unwrap_or`] to ensure that the output is always non-null.
627 ///
628 /// If the inputs are [`Unbounded`], the output will be asynchronously updated as the contents
629 /// of the inputs change (including to/from null states).
630 ///
631 /// # Example
632 /// ```rust
633 /// # #[cfg(feature = "deploy")] {
634 /// # use hydro_lang::prelude::*;
635 /// # use futures::StreamExt;
636 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
637 /// let tick = process.tick();
638 /// // ticks are lazy by default, forces the second tick to run
639 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
640 ///
641 /// let some_first_tick = tick.optional_first_tick(q!(123));
642 /// let some_second_tick = tick.optional_first_tick(q!(456)).defer_tick();
643 /// some_first_tick.or(some_second_tick).all_ticks()
644 /// # }, |mut stream| async move {
645 /// // [123 /* first tick */, 456 /* second tick */]
646 /// # for w in vec![123, 456] {
647 /// # assert_eq!(stream.next().await.unwrap(), w);
648 /// # }
649 /// # }));
650 /// # }
651 /// ```
652 pub fn or(self, other: Optional<T, L, B>) -> Optional<T, L, B> {
653 check_matching_location(&self.location, &other.location);
654
655 if L::is_top_level()
656 && let Some(tick) = self.location.try_tick()
657 {
658 let out = or_inside_tick(
659 self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
660 other.snapshot(&tick, nondet!(/** eventually stabilizes */)),
661 )
662 .latest();
663
664 Optional::new(out.location, out.ir_node.into_inner())
665 } else {
666 Optional::new(
667 self.location.clone(),
668 HydroNode::ChainFirst {
669 first: Box::new(self.ir_node.into_inner()),
670 second: Box::new(other.ir_node.into_inner()),
671 metadata: self.location.new_node_metadata(Self::collection_kind()),
672 },
673 )
674 }
675 }
676
677 /// Gets the contents of `self` when it has a value, otherwise passes through `other`.
678 ///
679 /// Like [`Option::unwrap_or`], this is helpful for defining a fallback for an [`Optional`].
680 /// If the fallback is not always defined (an [`Optional`]), you can use [`Optional::or`].
681 ///
682 /// If the inputs are [`Unbounded`], the output will be asynchronously updated as the contents
683 /// of the inputs change (including to/from null states).
684 ///
685 /// # Example
686 /// ```rust
687 /// # #[cfg(feature = "deploy")] {
688 /// # use hydro_lang::prelude::*;
689 /// # use futures::StreamExt;
690 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
691 /// let tick = process.tick();
692 /// // ticks are lazy by default, forces the later ticks to run
693 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
694 ///
695 /// let some_first_tick = tick.optional_first_tick(q!(123));
696 /// some_first_tick
697 /// .unwrap_or(tick.singleton(q!(456)))
698 /// .all_ticks()
699 /// # }, |mut stream| async move {
700 /// // [123 /* first tick */, 456 /* second tick */, 456 /* third tick */, 456, ...]
701 /// # for w in vec![123, 456, 456, 456] {
702 /// # assert_eq!(stream.next().await.unwrap(), w);
703 /// # }
704 /// # }));
705 /// # }
706 /// ```
707 pub fn unwrap_or(self, other: Singleton<T, L, B>) -> Singleton<T, L, B> {
708 let res_option = self.or(other.into());
709 Singleton::new(
710 res_option.location.clone(),
711 HydroNode::Cast {
712 inner: Box::new(res_option.ir_node.into_inner()),
713 metadata: res_option
714 .location
715 .new_node_metadata(Singleton::<T, L, B>::collection_kind()),
716 },
717 )
718 }
719
720 /// Converts this optional into a [`Singleton`] with a Rust [`Option`] as its contents.
721 ///
722 /// Useful for writing custom Rust code that needs to interact with both the null and non-null
723 /// states of the [`Optional`]. When possible, you should use the native APIs on [`Optional`]
724 /// so that Hydro can skip any computation on null values.
725 ///
726 /// # Example
727 /// ```rust
728 /// # #[cfg(feature = "deploy")] {
729 /// # use hydro_lang::prelude::*;
730 /// # use futures::StreamExt;
731 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
732 /// let tick = process.tick();
733 /// // ticks are lazy by default, forces the later ticks to run
734 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
735 ///
736 /// let some_first_tick = tick.optional_first_tick(q!(123));
737 /// some_first_tick.into_singleton().all_ticks()
738 /// # }, |mut stream| async move {
739 /// // [Some(123) /* first tick */, None /* second tick */, None /* third tick */, None, ...]
740 /// # for w in vec![Some(123), None, None, None] {
741 /// # assert_eq!(stream.next().await.unwrap(), w);
742 /// # }
743 /// # }));
744 /// # }
745 /// ```
746 pub fn into_singleton(self) -> Singleton<Option<T>, L, B>
747 where
748 T: Clone,
749 {
750 let none: syn::Expr = parse_quote!(::std::option::Option::None);
751
752 let none_singleton = Singleton::new(
753 self.location.clone(),
754 HydroNode::SingletonSource {
755 value: none.into(),
756 metadata: self
757 .location
758 .new_node_metadata(Singleton::<Option<T>, L, B>::collection_kind()),
759 },
760 );
761
762 self.map(q!(|v| Some(v))).unwrap_or(none_singleton)
763 }
764
765 /// An operator which allows you to "name" a `HydroNode`.
766 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
767 pub fn ir_node_named(self, name: &str) -> Optional<T, L, B> {
768 {
769 let mut node = self.ir_node.borrow_mut();
770 let metadata = node.metadata_mut();
771 metadata.tag = Some(name.to_string());
772 }
773 self
774 }
775}
776
777impl<'a, T, L> Optional<T, L, Bounded>
778where
779 L: Location<'a>,
780{
781 /// Filters this optional, passing through the optional value if it is non-null **and** the
782 /// argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is null.
783 ///
784 /// Useful for conditionally processing, such as only emitting an optional's value outside
785 /// a tick if some other condition is satisfied.
786 ///
787 /// # Example
788 /// ```rust
789 /// # #[cfg(feature = "deploy")] {
790 /// # use hydro_lang::prelude::*;
791 /// # use futures::StreamExt;
792 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
793 /// let tick = process.tick();
794 /// // ticks are lazy by default, forces the second tick to run
795 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
796 ///
797 /// let batch_first_tick = process
798 /// .source_iter(q!(vec![]))
799 /// .batch(&tick, nondet!(/** test */));
800 /// let batch_second_tick = process
801 /// .source_iter(q!(vec![456]))
802 /// .batch(&tick, nondet!(/** test */))
803 /// .defer_tick(); // appears on the second tick
804 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
805 /// batch_first_tick.chain(batch_second_tick).first()
806 /// .filter_if_some(some_on_first_tick)
807 /// .unwrap_or(tick.singleton(q!(789)))
808 /// .all_ticks()
809 /// # }, |mut stream| async move {
810 /// // [789, 789]
811 /// # for w in vec![789, 789] {
812 /// # assert_eq!(stream.next().await.unwrap(), w);
813 /// # }
814 /// # }));
815 /// # }
816 /// ```
817 pub fn filter_if_some<U>(self, signal: Optional<U, L, Bounded>) -> Optional<T, L, Bounded> {
818 self.zip(signal.map(q!(|_u| ()))).map(q!(|(d, _signal)| d))
819 }
820
821 /// Filters this optional, passing through the optional value if it is non-null **and** the
822 /// argument (a [`Bounded`] [`Optional`]`) is _null_, otherwise the output is null.
823 ///
824 /// Useful for conditionally processing, such as only emitting an optional's value outside
825 /// a tick if some other condition is satisfied.
826 ///
827 /// # Example
828 /// ```rust
829 /// # #[cfg(feature = "deploy")] {
830 /// # use hydro_lang::prelude::*;
831 /// # use futures::StreamExt;
832 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
833 /// let tick = process.tick();
834 /// // ticks are lazy by default, forces the second tick to run
835 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
836 ///
837 /// let batch_first_tick = process
838 /// .source_iter(q!(vec![]))
839 /// .batch(&tick, nondet!(/** test */));
840 /// let batch_second_tick = process
841 /// .source_iter(q!(vec![456]))
842 /// .batch(&tick, nondet!(/** test */))
843 /// .defer_tick(); // appears on the second tick
844 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
845 /// batch_first_tick.chain(batch_second_tick).first()
846 /// .filter_if_none(some_on_first_tick)
847 /// .unwrap_or(tick.singleton(q!(789)))
848 /// .all_ticks()
849 /// # }, |mut stream| async move {
850 /// // [789, 789]
851 /// # for w in vec![789, 456] {
852 /// # assert_eq!(stream.next().await.unwrap(), w);
853 /// # }
854 /// # }));
855 /// # }
856 /// ```
857 pub fn filter_if_none<U>(self, other: Optional<U, L, Bounded>) -> Optional<T, L, Bounded> {
858 self.filter_if_some(
859 other
860 .map(q!(|_| ()))
861 .into_singleton()
862 .filter(q!(|o| o.is_none())),
863 )
864 }
865
866 /// If `self` is null, emits a null optional, but if it non-null, emits `value`.
867 ///
868 /// Useful for gating the release of a [`Singleton`] on a condition of the [`Optional`]
869 /// having a value, such as only releasing a piece of state if the node is the leader.
870 ///
871 /// # Example
872 /// ```rust
873 /// # #[cfg(feature = "deploy")] {
874 /// # use hydro_lang::prelude::*;
875 /// # use futures::StreamExt;
876 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
877 /// let tick = process.tick();
878 /// // ticks are lazy by default, forces the second tick to run
879 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
880 ///
881 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
882 /// some_on_first_tick
883 /// .if_some_then(tick.singleton(q!(456)))
884 /// .unwrap_or(tick.singleton(q!(123)))
885 /// # .all_ticks()
886 /// # }, |mut stream| async move {
887 /// // 456 (first tick) ~> 123 (second tick onwards)
888 /// # for w in vec![456, 123, 123] {
889 /// # assert_eq!(stream.next().await.unwrap(), w);
890 /// # }
891 /// # }));
892 /// # }
893 /// ```
894 pub fn if_some_then<U>(self, value: Singleton<U, L, Bounded>) -> Optional<U, L, Bounded> {
895 value.filter_if_some(self)
896 }
897}
898
899impl<'a, T, L, B: Boundedness> Optional<T, Atomic<L>, B>
900where
901 L: Location<'a> + NoTick,
902{
903 /// Returns an optional value corresponding to the latest snapshot of the optional
904 /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include
905 /// at least all relevant data that contributed to the snapshot at tick `t`. Furthermore,
906 /// all snapshots of this optional into the atomic-associated tick will observe the
907 /// same value each tick.
908 ///
909 /// # Non-Determinism
910 /// Because this picks a snapshot of a optional whose value is continuously changing,
911 /// the output optional has a non-deterministic value since the snapshot can be at an
912 /// arbitrary point in time.
913 pub fn snapshot_atomic(self, _nondet: NonDet) -> Optional<T, Tick<L>, Bounded> {
914 Optional::new(
915 self.location.clone().tick,
916 HydroNode::Batch {
917 inner: Box::new(self.ir_node.into_inner()),
918 metadata: self
919 .location
920 .tick
921 .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
922 },
923 )
924 }
925
926 /// Returns this optional back into a top-level, asynchronous execution context where updates
927 /// to the value will be asynchronously propagated.
928 pub fn end_atomic(self) -> Optional<T, L, B> {
929 Optional::new(
930 self.location.tick.l.clone(),
931 HydroNode::EndAtomic {
932 inner: Box::new(self.ir_node.into_inner()),
933 metadata: self
934 .location
935 .tick
936 .l
937 .new_node_metadata(Optional::<T, L, B>::collection_kind()),
938 },
939 )
940 }
941}
942
943impl<'a, T, L, B: Boundedness> Optional<T, L, B>
944where
945 L: Location<'a>,
946{
947 /// Shifts this optional into an atomic context, which guarantees that any downstream logic
948 /// will observe the same version of the value and will be executed synchronously before any
949 /// outputs are yielded (in [`Optional::end_atomic`]).
950 ///
951 /// This is useful to enforce local consistency constraints, such as ensuring that several readers
952 /// see a consistent version of local state (since otherwise each [`Optional::snapshot`] may pick
953 /// a different version).
954 ///
955 /// Entering an atomic section requires a [`Tick`] argument that declares where the optional will
956 /// be atomically processed. Snapshotting an optional into the _same_ [`Tick`] will preserve the
957 /// synchronous execution, and all such snapshots in the same [`Tick`] will have the same value.
958 pub fn atomic(self, tick: &Tick<L>) -> Optional<T, Atomic<L>, B> {
959 let out_location = Atomic { tick: tick.clone() };
960 Optional::new(
961 out_location.clone(),
962 HydroNode::BeginAtomic {
963 inner: Box::new(self.ir_node.into_inner()),
964 metadata: out_location
965 .new_node_metadata(Optional::<T, Atomic<L>, B>::collection_kind()),
966 },
967 )
968 }
969
970 /// Given a tick, returns a optional value corresponding to a snapshot of the optional
971 /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
972 /// relevant data that contributed to the snapshot at tick `t`.
973 ///
974 /// # Non-Determinism
975 /// Because this picks a snapshot of a optional whose value is continuously changing,
976 /// the output optional has a non-deterministic value since the snapshot can be at an
977 /// arbitrary point in time.
978 pub fn snapshot(self, tick: &Tick<L>, _nondet: NonDet) -> Optional<T, Tick<L>, Bounded> {
979 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
980 Optional::new(
981 tick.clone(),
982 HydroNode::Batch {
983 inner: Box::new(self.ir_node.into_inner()),
984 metadata: tick
985 .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
986 },
987 )
988 }
989
990 /// Eagerly samples the optional as fast as possible, returning a stream of snapshots
991 /// with order corresponding to increasing prefixes of data contributing to the optional.
992 ///
993 /// # Non-Determinism
994 /// At runtime, the optional will be arbitrarily sampled as fast as possible, but due
995 /// to non-deterministic batching and arrival of inputs, the output stream is
996 /// non-deterministic.
997 pub fn sample_eager(self, nondet: NonDet) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
998 where
999 L: NoTick,
1000 {
1001 let tick = self.location.tick();
1002 self.snapshot(&tick, nondet).all_ticks().weakest_retries()
1003 }
1004
1005 /// Given a time interval, returns a stream corresponding to snapshots of the optional
1006 /// value taken at various points in time. Because the input optional may be
1007 /// [`Unbounded`], there are no guarantees on what these snapshots are other than they
1008 /// represent the value of the optional given some prefix of the streams leading up to
1009 /// it.
1010 ///
1011 /// # Non-Determinism
1012 /// The output stream is non-deterministic in which elements are sampled, since this
1013 /// is controlled by a clock.
1014 pub fn sample_every(
1015 self,
1016 interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
1017 nondet: NonDet,
1018 ) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
1019 where
1020 L: NoTick + NoAtomic,
1021 {
1022 let samples = self.location.source_interval(interval, nondet);
1023 let tick = self.location.tick();
1024
1025 self.snapshot(&tick, nondet)
1026 .filter_if_some(samples.batch(&tick, nondet).first())
1027 .all_ticks()
1028 .weakest_retries()
1029 }
1030}
1031
1032impl<'a, T, L> Optional<T, Tick<L>, Bounded>
1033where
1034 L: Location<'a>,
1035{
1036 /// Asynchronously yields the value of this singleton outside the tick as an unbounded stream,
1037 /// which will stream the value computed in _each_ tick as a separate stream element (skipping
1038 /// null values).
1039 ///
1040 /// Unlike [`Optional::latest`], the value computed in each tick is emitted separately,
1041 /// producing one element in the output for each (non-null) tick. This is useful for batched
1042 /// computations, where the results from each tick must be combined together.
1043 ///
1044 /// # Example
1045 /// ```rust
1046 /// # #[cfg(feature = "deploy")] {
1047 /// # use hydro_lang::prelude::*;
1048 /// # use futures::StreamExt;
1049 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1050 /// # let tick = process.tick();
1051 /// # // ticks are lazy by default, forces the second tick to run
1052 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1053 /// # let batch_first_tick = process
1054 /// # .source_iter(q!(vec![]))
1055 /// # .batch(&tick, nondet!(/** test */));
1056 /// # let batch_second_tick = process
1057 /// # .source_iter(q!(vec![1, 2, 3]))
1058 /// # .batch(&tick, nondet!(/** test */))
1059 /// # .defer_tick(); // appears on the second tick
1060 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1061 /// input_batch // first tick: [], second tick: [1, 2, 3]
1062 /// .max()
1063 /// .all_ticks()
1064 /// # }, |mut stream| async move {
1065 /// // [3]
1066 /// # for w in vec![3] {
1067 /// # assert_eq!(stream.next().await.unwrap(), w);
1068 /// # }
1069 /// # }));
1070 /// # }
1071 /// ```
1072 pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
1073 self.into_stream().all_ticks()
1074 }
1075
1076 /// Synchronously yields the value of this optional outside the tick as an unbounded stream,
1077 /// which will stream the value computed in _each_ tick as a separate stream element.
1078 ///
1079 /// Unlike [`Optional::all_ticks`], this preserves synchronous execution, as the output stream
1080 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1081 /// optional's [`Tick`] context.
1082 pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
1083 self.into_stream().all_ticks_atomic()
1084 }
1085
1086 /// Asynchronously yields this optional outside the tick as an unbounded optional, which will
1087 /// be asynchronously updated with the latest value of the optional inside the tick, including
1088 /// whether the optional is null or not.
1089 ///
1090 /// This converts a bounded value _inside_ a tick into an asynchronous value outside the
1091 /// tick that tracks the inner value. This is useful for getting the value as of the
1092 /// "most recent" tick, but note that updates are propagated asynchronously outside the tick.
1093 ///
1094 /// # Example
1095 /// ```rust
1096 /// # #[cfg(feature = "deploy")] {
1097 /// # use hydro_lang::prelude::*;
1098 /// # use futures::StreamExt;
1099 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1100 /// # let tick = process.tick();
1101 /// # // ticks are lazy by default, forces the second tick to run
1102 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1103 /// # let batch_first_tick = process
1104 /// # .source_iter(q!(vec![]))
1105 /// # .batch(&tick, nondet!(/** test */));
1106 /// # let batch_second_tick = process
1107 /// # .source_iter(q!(vec![1, 2, 3]))
1108 /// # .batch(&tick, nondet!(/** test */))
1109 /// # .defer_tick(); // appears on the second tick
1110 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1111 /// input_batch // first tick: [], second tick: [1, 2, 3]
1112 /// .max()
1113 /// .latest()
1114 /// # .into_singleton()
1115 /// # .sample_eager(nondet!(/** test */))
1116 /// # }, |mut stream| async move {
1117 /// // asynchronously changes from None ~> 3
1118 /// # for w in vec![None, Some(3)] {
1119 /// # assert_eq!(stream.next().await.unwrap(), w);
1120 /// # }
1121 /// # }));
1122 /// # }
1123 /// ```
1124 pub fn latest(self) -> Optional<T, L, Unbounded> {
1125 Optional::new(
1126 self.location.outer().clone(),
1127 HydroNode::YieldConcat {
1128 inner: Box::new(self.ir_node.into_inner()),
1129 metadata: self
1130 .location
1131 .outer()
1132 .new_node_metadata(Optional::<T, L, Unbounded>::collection_kind()),
1133 },
1134 )
1135 }
1136
1137 /// Synchronously yields this optional outside the tick as an unbounded optional, which will
1138 /// be updated with the latest value of the optional inside the tick.
1139 ///
1140 /// Unlike [`Optional::latest`], this preserves synchronous execution, as the output optional
1141 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1142 /// optional's [`Tick`] context.
1143 pub fn latest_atomic(self) -> Optional<T, Atomic<L>, Unbounded> {
1144 let out_location = Atomic {
1145 tick: self.location.clone(),
1146 };
1147
1148 Optional::new(
1149 out_location.clone(),
1150 HydroNode::YieldConcat {
1151 inner: Box::new(self.ir_node.into_inner()),
1152 metadata: out_location
1153 .new_node_metadata(Optional::<T, Atomic<L>, Unbounded>::collection_kind()),
1154 },
1155 )
1156 }
1157
1158 /// Shifts the state in `self` to the **next tick**, so that the returned optional at tick `T`
1159 /// always has the state of `self` at tick `T - 1`.
1160 ///
1161 /// At tick `0`, the output optional is null, since there is no previous tick.
1162 ///
1163 /// This operator enables stateful iterative processing with ticks, by sending data from one
1164 /// tick to the next. For example, you can use it to compare state across consecutive batches.
1165 ///
1166 /// # Example
1167 /// ```rust
1168 /// # #[cfg(feature = "deploy")] {
1169 /// # use hydro_lang::prelude::*;
1170 /// # use futures::StreamExt;
1171 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1172 /// let tick = process.tick();
1173 /// // ticks are lazy by default, forces the second tick to run
1174 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1175 ///
1176 /// let batch_first_tick = process
1177 /// .source_iter(q!(vec![1, 2]))
1178 /// .batch(&tick, nondet!(/** test */));
1179 /// let batch_second_tick = process
1180 /// .source_iter(q!(vec![3, 4]))
1181 /// .batch(&tick, nondet!(/** test */))
1182 /// .defer_tick(); // appears on the second tick
1183 /// let current_tick_sum = batch_first_tick.chain(batch_second_tick)
1184 /// .reduce(q!(|state, v| *state += v));
1185 ///
1186 /// current_tick_sum.clone().into_singleton().zip(
1187 /// current_tick_sum.defer_tick().into_singleton() // state from previous tick
1188 /// ).all_ticks()
1189 /// # }, |mut stream| async move {
1190 /// // [(Some(3), None) /* first tick */, (Some(7), Some(3)) /* second tick */]
1191 /// # for w in vec![(Some(3), None), (Some(7), Some(3))] {
1192 /// # assert_eq!(stream.next().await.unwrap(), w);
1193 /// # }
1194 /// # }));
1195 /// # }
1196 /// ```
1197 pub fn defer_tick(self) -> Optional<T, Tick<L>, Bounded> {
1198 Optional::new(
1199 self.location.clone(),
1200 HydroNode::DeferTick {
1201 input: Box::new(self.ir_node.into_inner()),
1202 metadata: self.location.new_node_metadata(Self::collection_kind()),
1203 },
1204 )
1205 }
1206
1207 /// Converts this optional into a [`Stream`] containing a single element, the value, if it is
1208 /// non-null. Otherwise, the stream is empty.
1209 ///
1210 /// # Example
1211 /// ```rust
1212 /// # #[cfg(feature = "deploy")] {
1213 /// # use hydro_lang::prelude::*;
1214 /// # use futures::StreamExt;
1215 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1216 /// # let tick = process.tick();
1217 /// # // ticks are lazy by default, forces the second tick to run
1218 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1219 /// # let batch_first_tick = process
1220 /// # .source_iter(q!(vec![]))
1221 /// # .batch(&tick, nondet!(/** test */));
1222 /// # let batch_second_tick = process
1223 /// # .source_iter(q!(vec![123, 456]))
1224 /// # .batch(&tick, nondet!(/** test */))
1225 /// # .defer_tick(); // appears on the second tick
1226 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1227 /// input_batch // first tick: [], second tick: [123, 456]
1228 /// .clone()
1229 /// .max()
1230 /// .into_stream()
1231 /// .chain(input_batch)
1232 /// .all_ticks()
1233 /// # }, |mut stream| async move {
1234 /// // [456, 123, 456]
1235 /// # for w in vec![456, 123, 456] {
1236 /// # assert_eq!(stream.next().await.unwrap(), w);
1237 /// # }
1238 /// # }));
1239 /// # }
1240 /// ```
1241 pub fn into_stream(self) -> Stream<T, Tick<L>, Bounded, TotalOrder, ExactlyOnce> {
1242 Stream::new(
1243 self.location.clone(),
1244 HydroNode::Cast {
1245 inner: Box::new(self.ir_node.into_inner()),
1246 metadata: self.location.new_node_metadata(Stream::<
1247 T,
1248 Tick<L>,
1249 Bounded,
1250 TotalOrder,
1251 ExactlyOnce,
1252 >::collection_kind()),
1253 },
1254 )
1255 }
1256}
1257
1258#[cfg(feature = "deploy")]
1259#[cfg(test)]
1260mod tests {
1261 use futures::StreamExt;
1262 use hydro_deploy::Deployment;
1263 use stageleft::q;
1264
1265 use super::Optional;
1266 use crate::compile::builder::FlowBuilder;
1267 use crate::location::Location;
1268 use crate::nondet::nondet;
1269
1270 #[tokio::test]
1271 async fn optional_or_cardinality() {
1272 let mut deployment = Deployment::new();
1273
1274 let flow = FlowBuilder::new();
1275 let node = flow.process::<()>();
1276 let external = flow.external::<()>();
1277
1278 let node_tick = node.tick();
1279 let tick_singleton = node_tick.singleton(q!(123));
1280 let tick_optional_inhabited: Optional<_, _, _> = tick_singleton.into();
1281 let counts = tick_optional_inhabited
1282 .clone()
1283 .or(tick_optional_inhabited)
1284 .into_stream()
1285 .count()
1286 .all_ticks()
1287 .send_bincode_external(&external);
1288
1289 let nodes = flow
1290 .with_process(&node, deployment.Localhost())
1291 .with_external(&external, deployment.Localhost())
1292 .deploy(&mut deployment);
1293
1294 deployment.deploy().await.unwrap();
1295
1296 let mut external_out = nodes.connect(counts).await;
1297
1298 deployment.start().await.unwrap();
1299
1300 assert_eq!(external_out.next().await.unwrap(), 1);
1301 }
1302
1303 #[tokio::test]
1304 async fn into_singleton_top_level_none_cardinality() {
1305 let mut deployment = Deployment::new();
1306
1307 let flow = FlowBuilder::new();
1308 let node = flow.process::<()>();
1309 let external = flow.external::<()>();
1310
1311 let node_tick = node.tick();
1312 let top_level_none = node.singleton(q!(123)).filter(q!(|_| false));
1313 let into_singleton = top_level_none.into_singleton();
1314
1315 let tick_driver = node.spin();
1316
1317 let counts = into_singleton
1318 .snapshot(&node_tick, nondet!(/** test */))
1319 .into_stream()
1320 .count()
1321 .zip(tick_driver.batch(&node_tick, nondet!(/** test */)).count())
1322 .map(q!(|(c, _)| c))
1323 .all_ticks()
1324 .send_bincode_external(&external);
1325
1326 let nodes = flow
1327 .with_process(&node, deployment.Localhost())
1328 .with_external(&external, deployment.Localhost())
1329 .deploy(&mut deployment);
1330
1331 deployment.deploy().await.unwrap();
1332
1333 let mut external_out = nodes.connect(counts).await;
1334
1335 deployment.start().await.unwrap();
1336
1337 assert_eq!(external_out.next().await.unwrap(), 1);
1338 assert_eq!(external_out.next().await.unwrap(), 1);
1339 assert_eq!(external_out.next().await.unwrap(), 1);
1340 }
1341}