hydro_lang/live_collections/
sliced.rs

1//! Utilities for transforming live collections via slicing.
2
3use super::boundedness::{Bounded, Unbounded};
4use crate::live_collections::boundedness::Boundedness;
5use crate::live_collections::keyed_singleton::BoundedValue;
6use crate::live_collections::stream::{Ordering, Retries};
7use crate::location::{Location, NoTick, Tick};
8use crate::nondet::NonDet;
9
10#[doc(hidden)]
11pub fn __sliced_wrap_invoke<A, B, O: Unslicable>(
12    a: A,
13    b: B,
14    f: impl FnOnce(A, B) -> O,
15) -> O::Unsliced {
16    let o_slice = f(a, b);
17    o_slice.unslice()
18}
19
// Internal driver behind `sliced!`. It peels the leading
// `let <name> = use[::<style>](<collection>, <nondet>);` statements off the input one at a time,
// accumulating them in the `@uses [...]` list, and then expands the remaining tokens as the body.
#[doc(hidden)]
#[macro_export]
macro_rules! __sliced_parse_uses__ {
    // Munch one `use` statement, append it to the accumulator, and recurse on what follows.
    (
        @uses [$($uses:tt)*]
        let $name:ident = use $(::$style:ident)?($expr:expr, $nondet:expr); $($rest:tt)*
    ) => {
        $crate::__sliced_parse_uses__!(
            @uses [$($uses)* { $name, ($($style)?), $expr, $nondet }]
            $($rest)*
        )
    };

    // No further `use` statement at the head of the input and at least one accumulated entry:
    // everything that remains is the body.
    (
        @uses [{ $first_name:ident, ($($first_style:ident)?), $first:expr, $nondet_first:expr } $({ $rest_name:ident, ($($rest_style:ident)?), $rest:expr, $nondet_expl:expr })*]
        $($body:tt)*
    ) => {
        {
            // Evaluate every non-determinism explanation so that each one is checked by the
            // compiler; only the first is forwarded to `slice` below.
            let _ = $nondet_first;
            $(let _ = $nondet_expl;)*

            // Tuple of the collections to slice. An entry written as `use::<style>(..)` is passed
            // through `style::<style>(..)`; a plain `use(..)` entry is just parenthesized.
            let __styled = (
                $($crate::live_collections::sliced::style::$first_style)?($first),
                $($($crate::live_collections::sliced::style::$rest_style)?($rest),)*
            );

            // Use the tick preferred by the collections if there is one; otherwise (lazily) take
            // the tick of the first collection's location.
            let __tick = $crate::live_collections::sliced::Slicable::preferred_tick(&__styled).unwrap_or_else(|| $crate::live_collections::sliced::Slicable::get_location(&__styled.0).tick());
            // One backtrace per sliced collection, each captured through `copy_span!` with the
            // corresponding collection expression (presumably so that it carries that
            // expression's span; confirm against `copy_span!`).
            let __backtraces = {
                use $crate::compile::ir::backtrace::__macro_get_backtrace;
                (
                    $crate::macro_support::copy_span::copy_span!($first, {
                        __macro_get_backtrace(1)
                    }),
                    $($crate::macro_support::copy_span::copy_span!($rest, {
                        __macro_get_backtrace(1)
                    }),)*
                )
            };
            let __sliced = $crate::live_collections::sliced::Slicable::slice(__styled, &__tick, __backtraces, $nondet_first);
            // Bind each slice to the name chosen in its `use` statement so the body can refer
            // to it.
            let (
                $first_name,
                $($rest_name,)*
            ) = __sliced;

            $crate::live_collections::sliced::Unslicable::unslice({
                $($body)*
            })
        }
    };

    // Nothing was accumulated and the input does not start with a well-formed `use` statement.
    // Without this arm the invocation matches no rule and the compiler reports an opaque
    // "no rules expected the token" error; emit an actionable message instead.
    (
        @uses []
        $($body:tt)*
    ) => {
        ::core::compile_error!(
            "`sliced!` must start with at least one `let <name> = use(<collection>, nondet!(...));` statement"
        )
    };
}
70
/// Transforms a live collection with a computation that reads a slice of one or more live
/// collections. This is useful for observing a snapshot of an asynchronously updated collection
/// while processing another collection, such as joining a stream with the latest value of a
/// singleton.
///
/// # Syntax
/// The body of `sliced!` reads like a block of statements. It begins with one or more `use`
/// statements, each naming a live collection to slice together with a non-determinism
/// explanation. A style may be given after `use` (e.g. `use::atomic`) to control how that
/// collection is sliced. Every `use` statement must come before the rest of the body.
///
/// ```rust,ignore
/// let stream = sliced! {
///     let name1 = use(collection1, nondet!(/** explanation */));
///     let name2 = use::atomic(collection2, nondet!(/** explanation */));
///
///     // arbitrary statements can follow
///     let intermediate = name1.map(...);
///     intermediate.cross_singleton(name2)
/// };
/// ```
///
/// # Example with two collections
/// ```rust
/// # #[cfg(feature = "deploy")] {
/// # use hydro_lang::prelude::*;
/// # use futures::StreamExt;
/// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
/// let singleton = process.singleton(q!(5));
/// let stream = process.source_iter(q!(vec![1, 2, 3]));
/// let out: Stream<(i32, i32), _> = sliced! {
///     let batch_of_req = use(stream, nondet!(/** test */));
///     let latest_singleton = use(singleton, nondet!(/** test */));
///
///     let mapped = batch_of_req.map(q!(|x| x * 2));
///     mapped.cross_singleton(latest_singleton)
/// };
/// # out
/// # }, |mut stream| async move {
/// # assert_eq!(stream.next().await.unwrap(), (2, 5));
/// # assert_eq!(stream.next().await.unwrap(), (4, 5));
/// # assert_eq!(stream.next().await.unwrap(), (6, 5));
/// # }));
/// # }
/// ```
#[macro_export]
macro_rules! __sliced__ {
    // Forward every token to the `use`-statement parser, starting with an empty accumulator.
    ($($input:tt)*) => {
        $crate::__sliced_parse_uses__!(
            @uses []
            $($input)*
        )
    };
}
124
125pub use crate::__sliced__ as sliced;
126
127/// Styles for use with the `sliced!` macro.
128pub mod style {
129    use super::Slicable;
130    use crate::live_collections::boundedness::{Bounded, Unbounded};
131    use crate::live_collections::keyed_singleton::BoundedValue;
132    use crate::live_collections::stream::{Ordering, Retries, Stream};
133    use crate::location::{Location, NoTick, Tick};
134    use crate::nondet::NonDet;
135
136    /// Marks a live collection to be treated atomically during slicing.
137    pub struct Atomic<T>(pub T);
138
139    /// Wraps a live collection to be treated atomically during slicing.
140    pub fn atomic<T>(t: T) -> Atomic<T> {
141        Atomic(t)
142    }
143
144    impl<'a, T, L: Location<'a> + NoTick, O: Ordering, R: Retries> Slicable<'a, L>
145        for Atomic<Stream<T, crate::location::Atomic<L>, Unbounded, O, R>>
146    {
147        type Slice = Stream<T, Tick<L>, Bounded, O, R>;
148        type Backtrace = crate::compile::ir::backtrace::Backtrace;
149
150        fn preferred_tick(&self) -> Option<Tick<L>> {
151            Some(self.0.location().tick().as_regular_tick())
152        }
153
154        fn get_location(&self) -> &L {
155            panic!("Atomic location has no accessible inner location")
156        }
157
158        fn slice(self, tick: &Tick<L>, backtrace: Self::Backtrace, nondet: NonDet) -> Self::Slice {
159            assert_eq!(
160                self.0.location().tick().as_regular_tick().id(),
161                tick.id(),
162                "Mismatched tick for atomic slicing"
163            );
164
165            let out = self.0.batch_atomic(nondet);
166            out.ir_node.borrow_mut().op_metadata_mut().backtrace = backtrace;
167            out
168        }
169    }
170
171    impl<'a, T, L: Location<'a> + NoTick> Slicable<'a, L>
172        for Atomic<crate::live_collections::Singleton<T, crate::location::Atomic<L>, Unbounded>>
173    {
174        type Slice = crate::live_collections::Singleton<T, Tick<L>, Bounded>;
175        type Backtrace = crate::compile::ir::backtrace::Backtrace;
176
177        fn preferred_tick(&self) -> Option<Tick<L>> {
178            Some(self.0.location().tick().as_regular_tick())
179        }
180
181        fn get_location(&self) -> &L {
182            panic!("Atomic location has no accessible inner location")
183        }
184
185        fn slice(self, tick: &Tick<L>, backtrace: Self::Backtrace, nondet: NonDet) -> Self::Slice {
186            assert_eq!(
187                self.0.location().tick().as_regular_tick().id(),
188                tick.id(),
189                "Mismatched tick for atomic slicing"
190            );
191
192            let out = self.0.snapshot_atomic(nondet);
193            out.ir_node.borrow_mut().op_metadata_mut().backtrace = backtrace;
194            out
195        }
196    }
197
198    impl<'a, T, L: Location<'a> + NoTick> Slicable<'a, L>
199        for Atomic<crate::live_collections::Optional<T, crate::location::Atomic<L>, Unbounded>>
200    {
201        type Slice = crate::live_collections::Optional<T, Tick<L>, Bounded>;
202        type Backtrace = crate::compile::ir::backtrace::Backtrace;
203
204        fn preferred_tick(&self) -> Option<Tick<L>> {
205            Some(self.0.location().tick().as_regular_tick())
206        }
207
208        fn get_location(&self) -> &L {
209            panic!("Atomic location has no accessible inner location")
210        }
211
212        fn slice(self, tick: &Tick<L>, backtrace: Self::Backtrace, nondet: NonDet) -> Self::Slice {
213            assert_eq!(
214                self.0.location().tick().as_regular_tick().id(),
215                tick.id(),
216                "Mismatched tick for atomic slicing"
217            );
218
219            let out = self.0.snapshot_atomic(nondet);
220            out.ir_node.borrow_mut().op_metadata_mut().backtrace = backtrace;
221            out
222        }
223    }
224
225    impl<'a, K, V, L: Location<'a> + NoTick> Slicable<'a, L>
226        for Atomic<
227            crate::live_collections::KeyedSingleton<K, V, crate::location::Atomic<L>, Unbounded>,
228        >
229    {
230        type Slice = crate::live_collections::KeyedSingleton<K, V, Tick<L>, Bounded>;
231        type Backtrace = crate::compile::ir::backtrace::Backtrace;
232
233        fn preferred_tick(&self) -> Option<Tick<L>> {
234            Some(self.0.location().tick().as_regular_tick())
235        }
236
237        fn get_location(&self) -> &L {
238            panic!("Atomic location has no accessible inner location")
239        }
240
241        fn slice(self, tick: &Tick<L>, backtrace: Self::Backtrace, nondet: NonDet) -> Self::Slice {
242            assert_eq!(
243                self.0.location().tick().as_regular_tick().id(),
244                tick.id(),
245                "Mismatched tick for atomic slicing"
246            );
247
248            let out = self.0.snapshot_atomic(nondet);
249            out.ir_node.borrow_mut().op_metadata_mut().backtrace = backtrace;
250            out
251        }
252    }
253
254    impl<'a, K, V, L: Location<'a> + NoTick> Slicable<'a, L>
255        for Atomic<
256            crate::live_collections::KeyedSingleton<K, V, crate::location::Atomic<L>, BoundedValue>,
257        >
258    {
259        type Slice = crate::live_collections::KeyedSingleton<K, V, Tick<L>, Bounded>;
260        type Backtrace = crate::compile::ir::backtrace::Backtrace;
261
262        fn preferred_tick(&self) -> Option<Tick<L>> {
263            Some(self.0.location().tick().as_regular_tick())
264        }
265
266        fn get_location(&self) -> &L {
267            panic!("Atomic location has no accessible inner location")
268        }
269
270        fn slice(self, tick: &Tick<L>, backtrace: Self::Backtrace, nondet: NonDet) -> Self::Slice {
271            assert_eq!(
272                self.0.location().tick().as_regular_tick().id(),
273                tick.id(),
274                "Mismatched tick for atomic slicing"
275            );
276
277            let out = self.0.batch_atomic(nondet);
278            out.ir_node.borrow_mut().op_metadata_mut().backtrace = backtrace;
279            out
280        }
281    }
282}
283
284/// A trait for live collections which can be sliced into bounded versions at a tick.
285pub trait Slicable<'a, L: Location<'a>> {
286    /// The sliced version of this live collection.
287    type Slice;
288
289    /// The type of backtrace associated with this slice.
290    type Backtrace;
291
292    /// Gets the preferred tick to slice at. Used for atomic slicing.
293    fn preferred_tick(&self) -> Option<Tick<L>>;
294
295    /// Gets the location associated with this live collection.
296    fn get_location(&self) -> &L;
297
298    /// Slices this live collection at the given tick.
299    ///
300    /// # Non-Determinism
301    /// Slicing a live collection may involve non-determinism, such as choosing which messages
302    /// to include in a batch. The provided `nondet` parameter should be used to explain the impact
303    /// of this non-determinism on the program's behavior.
304    fn slice(self, tick: &Tick<L>, backtrace: Self::Backtrace, nondet: NonDet) -> Self::Slice;
305}
306
/// Sliced live collections that can be returned from a slice and restored to their unsliced form.
pub trait Unslicable {
    /// The type this collection has once it is unsliced.
    type Unsliced;

    /// Consumes the sliced collection and returns its unsliced counterpart.
    fn unslice(self) -> Self::Unsliced;
}
315
316impl<'a, L: Location<'a>> Slicable<'a, L> for () {
317    type Slice = ();
318    type Backtrace = ();
319
320    fn get_location(&self) -> &L {
321        unreachable!()
322    }
323
324    fn preferred_tick(&self) -> Option<Tick<L>> {
325        None
326    }
327
328    fn slice(self, _tick: &Tick<L>, __backtrace: Self::Backtrace, _nondet: NonDet) -> Self::Slice {}
329}
330
331impl Unslicable for () {
332    type Unsliced = ();
333
334    fn unslice(self) -> Self::Unsliced {}
335}
336
// Implements `Slicable` for a tuple whose elements are all `Slicable`. Each element is described
// by a triple: its type parameter, the binding name used for its backtrace, and its tuple index.
macro_rules! impl_slicable_for_tuple {
    ($($T:ident, $T_bt:ident, $idx:tt),*) => {
        impl<'a, L: Location<'a>, $($T: Slicable<'a, L>),*> Slicable<'a, L> for ($($T,)*) {
            type Slice = ($($T::Slice,)*);
            type Backtrace = ($($T::Backtrace,)*);

            // Returns the first preferred tick reported by an element; panics if a later
            // element reports a tick with a different id.
            fn preferred_tick(&self) -> Option<Tick<L>> {
                let mut agreed: Option<Tick<L>> = None;
                $(
                    if let Some(candidate) = self.$idx.preferred_tick() {
                        match &agreed {
                            Some(existing) => {
                                if $crate::location::Location::id(existing) != $crate::location::Location::id(&candidate) {
                                    panic!("Mismatched preferred ticks for sliced collections")
                                }
                            }
                            None => agreed = Some(candidate),
                        }
                    }
                )*
                agreed
            }

            // The tuple's location is the location of its first element.
            fn get_location(&self) -> &L {
                self.0.get_location()
            }

            // Slices every element at the same tick with the same `nondet`, pairing each element
            // with the backtrace at the same position.
            #[expect(non_snake_case, reason = "macro codegen")]
            fn slice(self, tick: &Tick<L>, backtrace: Self::Backtrace, nondet: NonDet) -> Self::Slice {
                let ($($T,)*) = self;
                let ($($T_bt,)*) = backtrace;
                ($($T.slice(tick, $T_bt, nondet),)*)
            }
        }
    };
}

// Tuple implementations for arities 1 through 5; each is only compiled when the
// `stageleft_runtime` cfg is set.
#[cfg(stageleft_runtime)]
impl_slicable_for_tuple!(S1, S1_bt, 0);
#[cfg(stageleft_runtime)]
impl_slicable_for_tuple!(S1, S1_bt, 0, S2, S2_bt, 1);
#[cfg(stageleft_runtime)]
impl_slicable_for_tuple!(S1, S1_bt, 0, S2, S2_bt, 1, S3, S3_bt, 2);
#[cfg(stageleft_runtime)]
impl_slicable_for_tuple!(S1, S1_bt, 0, S2, S2_bt, 1, S3, S3_bt, 2, S4, S4_bt, 3);
#[cfg(stageleft_runtime)]
impl_slicable_for_tuple!(
    S1, S1_bt, 0, S2, S2_bt, 1, S3, S3_bt, 2, S4, S4_bt, 3, S5, S5_bt, 4
); // 5 slices ought to be enough for anyone
388
389impl<'a, T, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries> Slicable<'a, L>
390    for super::Stream<T, L, B, O, R>
391{
392    type Slice = super::Stream<T, Tick<L>, Bounded, O, R>;
393    type Backtrace = crate::compile::ir::backtrace::Backtrace;
394
395    fn get_location(&self) -> &L {
396        self.location()
397    }
398
399    fn preferred_tick(&self) -> Option<Tick<L>> {
400        None
401    }
402
403    fn slice(self, tick: &Tick<L>, backtrace: Self::Backtrace, nondet: NonDet) -> Self::Slice {
404        let out = self.batch(tick, nondet);
405        out.ir_node.borrow_mut().op_metadata_mut().backtrace = backtrace;
406        out
407    }
408}
409
410impl<'a, T, L: Location<'a>, O: Ordering, R: Retries> Unslicable
411    for super::Stream<T, Tick<L>, Bounded, O, R>
412{
413    type Unsliced = super::Stream<T, L, Unbounded, O, R>;
414
415    fn unslice(self) -> Self::Unsliced {
416        self.all_ticks()
417    }
418}
419
420impl<'a, T, L: Location<'a>, B: Boundedness> Slicable<'a, L> for super::Singleton<T, L, B> {
421    type Slice = super::Singleton<T, Tick<L>, Bounded>;
422    type Backtrace = crate::compile::ir::backtrace::Backtrace;
423
424    fn get_location(&self) -> &L {
425        self.location()
426    }
427
428    fn preferred_tick(&self) -> Option<Tick<L>> {
429        None
430    }
431
432    fn slice(self, tick: &Tick<L>, backtrace: Self::Backtrace, nondet: NonDet) -> Self::Slice {
433        let out = self.snapshot(tick, nondet);
434        out.ir_node.borrow_mut().op_metadata_mut().backtrace = backtrace;
435        out
436    }
437}
438
439impl<'a, T, L: Location<'a>> Unslicable for super::Singleton<T, Tick<L>, Bounded> {
440    type Unsliced = super::Singleton<T, L, Unbounded>;
441
442    fn unslice(self) -> Self::Unsliced {
443        self.latest()
444    }
445}
446
447impl<'a, T, L: Location<'a>> Slicable<'a, L> for super::Optional<T, L, Unbounded> {
448    type Slice = super::Optional<T, Tick<L>, Bounded>;
449    type Backtrace = crate::compile::ir::backtrace::Backtrace;
450
451    fn get_location(&self) -> &L {
452        self.location()
453    }
454
455    fn preferred_tick(&self) -> Option<Tick<L>> {
456        None
457    }
458
459    fn slice(self, tick: &Tick<L>, backtrace: Self::Backtrace, nondet: NonDet) -> Self::Slice {
460        let out = self.snapshot(tick, nondet);
461        out.ir_node.borrow_mut().op_metadata_mut().backtrace = backtrace;
462        out
463    }
464}
465
466impl<'a, T, L: Location<'a>> Unslicable for super::Optional<T, Tick<L>, Bounded> {
467    type Unsliced = super::Optional<T, L, Unbounded>;
468
469    fn unslice(self) -> Self::Unsliced {
470        self.latest()
471    }
472}
473
474impl<'a, K, V, L: Location<'a>, O: Ordering, R: Retries> Slicable<'a, L>
475    for super::KeyedStream<K, V, L, Unbounded, O, R>
476{
477    type Slice = super::KeyedStream<K, V, Tick<L>, Bounded, O, R>;
478    type Backtrace = crate::compile::ir::backtrace::Backtrace;
479
480    fn get_location(&self) -> &L {
481        self.location()
482    }
483
484    fn preferred_tick(&self) -> Option<Tick<L>> {
485        None
486    }
487
488    fn slice(self, tick: &Tick<L>, backtrace: Self::Backtrace, nondet: NonDet) -> Self::Slice {
489        let out = self.batch(tick, nondet);
490        out.ir_node.borrow_mut().op_metadata_mut().backtrace = backtrace;
491        out
492    }
493}
494
495impl<'a, K, V, L: Location<'a>, O: Ordering, R: Retries> Unslicable
496    for super::KeyedStream<K, V, Tick<L>, Bounded, O, R>
497{
498    type Unsliced = super::KeyedStream<K, V, L, Unbounded, O, R>;
499
500    fn unslice(self) -> Self::Unsliced {
501        self.all_ticks()
502    }
503}
504
505impl<'a, K, V, L: Location<'a>> Slicable<'a, L> for super::KeyedSingleton<K, V, L, Unbounded> {
506    type Slice = super::KeyedSingleton<K, V, Tick<L>, Bounded>;
507    type Backtrace = crate::compile::ir::backtrace::Backtrace;
508
509    fn get_location(&self) -> &L {
510        self.location()
511    }
512
513    fn preferred_tick(&self) -> Option<Tick<L>> {
514        None
515    }
516
517    fn slice(self, tick: &Tick<L>, backtrace: Self::Backtrace, nondet: NonDet) -> Self::Slice {
518        let out = self.snapshot(tick, nondet);
519        out.ir_node.borrow_mut().op_metadata_mut().backtrace = backtrace;
520        out
521    }
522}
523
524impl<'a, K, V, L: Location<'a> + NoTick> Slicable<'a, L>
525    for super::KeyedSingleton<K, V, L, BoundedValue>
526{
527    type Slice = super::KeyedSingleton<K, V, Tick<L>, Bounded>;
528    type Backtrace = crate::compile::ir::backtrace::Backtrace;
529
530    fn get_location(&self) -> &L {
531        self.location()
532    }
533
534    fn preferred_tick(&self) -> Option<Tick<L>> {
535        None
536    }
537
538    fn slice(self, tick: &Tick<L>, backtrace: Self::Backtrace, nondet: NonDet) -> Self::Slice {
539        let out = self.batch(tick, nondet);
540        out.ir_node.borrow_mut().op_metadata_mut().backtrace = backtrace;
541        out
542    }
543}