Skip to main content

DFIR Operators

In our previous examples we made use of some of DFIR's operators. Here we document each operator in more detail. Many of these operators are based on the Rust equivalents for iterators; see the Rust documentation.

Maps: _counter, enumerate, identity, inspect, map, resolve_futures, resolve_futures_blocking, resolve_futures_blocking_ordered, resolve_futures_ordered
Simple one-in-one-out operators.

Filters: filter, filter_map
One-in zero-or-one-out operators.

Flattens: flat_map, flatten
One-in multiple-out operators.

Folds: fold, prefix, reduce, scan
Operators which accumulate elements together.

Keyed Folds: fold_keyed, reduce_keyed
Operators which accumulate elements together by key.

Lattice Folds: lattice_fold, lattice_reduce
Folds based on lattice-merge.

Persistent Operators: multiset_delta, defer_signal, persist, persist_mut, persist_mut_keyed, sort, sort_by_key, state, state_by, unique
Persistent (stateful) operators.

Multi-Input Operators: anti_join, chain, chain_first_n, cross_join, cross_join_multiset, cross_singleton, difference, join, join_fused, join_fused_lhs, join_fused_rhs, join_multiset, lattice_bimorphism, union, zip, zip_longest
Operators with multiple inputs.

Multi-Output Operators: demux_enum, partition, tee, unzip
Operators with multiple outputs.

Sources: initialize, null, spin, source_file, source_interval, source_iter, source_json, source_stdin, source_stream, source_stream_serde
Operators which produce output elements (and consume no inputs).

Sinks: dest_file, dest_sink, dest_sink_serde, for_each, null
Operators which consume input elements (and produce no outputs).

Control Flow Operators: assert, assert_eq, next_iteration, next_stratum, defer_tick, defer_tick_lazy
Operators which affect control flow/scheduling.

Compiler Fusion Operators: _lattice_fold_batch, _lattice_join_fused_join
Operators which are necessary to implement certain optimizations and rewrite rules.

Windowing Operators: all_once, batch, repeat_n, prefix
Operators for windowing loop inputs.

Un-Windowing Operators: all_iterations
Operators for collecting loop outputs.


all_iterations

InputsSyntaxOutputsFlow
exactly 1-> all_iterations() ->exactly 1Streaming

all_once

InputsSyntaxOutputsFlow
exactly 1-> all_once() ->exactly 1Streaming

Given a bounded input stream, emits the entire stream in the first loop iteration.

Never causes additional loop iterations.

anti_join

InputsSyntaxOutputsFlow
exactly 2-> [<input_port>]anti_join() ->exactly 1Blocking

Input port names: pos (streaming), neg (blocking)

2 input streams the first of type (K, T), the second of type K, with output type (K, T)

For a given tick, computes the anti-join of the items in the input streams, returning items in the pos input that do not have matching keys in the neg input. NOTE this uses multiset semantics only on the positive side, so duplicated positive inputs will appear in the output either 0 times (if matched in neg) or as many times as they appear in the input (if not matched in neg)

source_iter(vec![("cat", 2), ("cat", 2), ("elephant", 3), ("elephant", 3)]) -> [pos]diff;
source_iter(vec!["dog", "cat", "gorilla"]) -> [neg]diff;
diff = anti_join() -> assert_eq([("elephant", 3), ("elephant", 3)]);

assert

InputsSyntaxOutputsFlow
exactly 1-> assert(A)at least 0 and at most 1Streaming

1 input stream, 1 optional output stream Arguments: a predicate function that will be applied to each item in the stream

If the predicate returns false for any input item then the operator will panic at runtime.

source_iter([1, 2, 3])
-> assert(|x| *x > 0)
-> assert_eq([1, 2, 3]);

assert_eq

InputsSyntaxOutputsFlow
exactly 1-> assert_eq(A)at least 0 and at most 1Streaming

1 input stream, 1 optional output stream Arguments: A Vector, Slice, or Array containing objects that will be compared to the input stream.

The input stream will be compared with the provided argument, element by element. If any elements do not match, assert_eq will panic. If the input stream produces more elements than are in the provided argument, assert_eq will panic.

The input stream is passed through assert_eq unchanged to the output stream.

assert_eq is mainly useful for testing and documenting the behavior of dfir code inline.

assert_eq will remember the stream position across ticks, see example.

unioned = union();

source_iter([1]) -> assert_eq([1]) -> unioned;
source_iter([2]) -> defer_tick() -> assert_eq([2]) -> unioned;

unioned -> assert_eq([1, 2]);

batch

InputsSyntaxOutputsFlow
exactly 1-> batch() ->exactly 1Streaming

Given an unbounded input stream, emits values arbitrarily split into batches over multiple iterations in the same order.

Will cause additional loop iterations as long as new values arrive.

chain

InputsSyntaxOutputsFlow
exactly 2-> [<input_port>]chain() ->exactly 1Streaming

2 input streams of the same type, 1 output stream of the same type

Chains together a pair of streams, with all the elements of the first emitted before the second.

Since chain has multiple input streams, it needs to be assigned to a variable to reference its multiple input ports across statements.

source_iter(vec!["hello", "world"]) -> [0]my_chain;
source_iter(vec!["stay", "gold"]) -> [1]my_chain;
my_chain = chain()
-> map(|x| x.to_uppercase())
-> assert_eq(["HELLO", "WORLD", "STAY", "GOLD"]);

chain_first_n

InputsSyntaxOutputsFlow
at least 2-> [<input_port>]chain_first_n(A) ->exactly 1Streaming

2 input streams of the same type, 1 output stream of the same type

Chains together a pair of streams, with all the elements of the first emitted before the second, emitting up to N elements where N is passed as an argument.

Since chain_first_n has multiple input streams, it needs to be assigned to a variable to reference its multiple input ports across statements.

source_iter(vec!["hello", "world"]) -> [0]my_chain;
source_iter(vec!["stay", "gold"]) -> [1]my_chain;
my_chain = chain_first_n(3)
-> map(|x| x.to_uppercase())
-> assert_eq(["HELLO", "WORLD", "STAY"]);

cross_join

InputsSyntaxOutputsFlow
exactly 2-> [<input_port>]cross_join() ->exactly 1Streaming

Input port names: 0 (streaming), 1 (streaming)

2 input streams of type S and T, 1 output stream of type (S, T)

Forms the cross-join (Cartesian product) of the items in the input streams, returning all tupled pairs.

source_iter(vec!["happy", "sad"]) -> [0]my_join;
source_iter(vec!["dog", "cat"]) -> [1]my_join;
my_join = cross_join() -> assert_eq([("happy", "dog"), ("sad", "dog"), ("happy", "cat"), ("sad", "cat")]);

cross_join can be provided with one or two generic lifetime persistence arguments in the same way as join, see join's documentation for more info.

let (input_send, input_recv) = dfir_rs::util::unbounded_channel::<&str>();
let mut flow = dfir_rs::dfir_syntax! {
my_join = cross_join::<'tick>();
source_iter(["hello", "bye"]) -> [0]my_join;
source_stream(input_recv) -> [1]my_join;
my_join -> for_each(|(s, t)| println!("({}, {})", s, t));
};
input_send.send("oakland").unwrap();
flow.run_tick();
input_send.send("san francisco").unwrap();
flow.run_tick();

Prints only "(hello, oakland)" and "(bye, oakland)". The source_iter is only included in the first tick, then forgotten, so when "san francisco" arrives on input [1] in the second tick, there is nothing for it to match with from input [0], and therefore it does appear in the output.

cross_join_multiset

InputsSyntaxOutputsFlow
exactly 2-> [<input_port>]cross_join_multiset() ->exactly 1Streaming

Input port names: 0 (streaming), 1 (streaming)

2 input streams of type S and T, 1 output stream of type (S, T)

Forms the multiset cross-join (Cartesian product) of the (possibly duplicated) items in the input streams, returning all tupled pairs regardless of duplicates.

source_iter(vec!["happy", "happy", "sad"]) -> [0]my_join;
source_iter(vec!["dog", "cat", "cat"]) -> [1]my_join;
my_join = cross_join_multiset() -> sort() -> assert_eq([
("happy", "cat"),
("happy", "cat"),
("happy", "cat"),
("happy", "cat"),
("happy", "dog"),
("happy", "dog"),
("sad", "cat"),
("sad", "cat"),
("sad", "dog"), ]);

cross_singleton

InputsSyntaxOutputsFlow
exactly 2-> [<input_port>]cross_singleton() ->exactly 1Blocking

Input port names: input (streaming), single (blocking)

2 input streams, 1 output stream, no arguments.

Operates like cross-join, but treats one of the inputs as a "singleton"-like stream, emitting ignoring everything after the first element. This operator blocks on the singleton input, and then joins it with all the elements in the other stream if an element is present. This operator is useful when a singleton input must be used to transform elements of a stream, since unlike cross-product it avoids cloning the stream of inputs. It is also useful for creating conditional branches, since the operator short circuits if the singleton input produces no values.

There are two inputs to cross_singleton, they are input and single. input is the input data flow, and single is the singleton input.

join = cross_singleton();

source_iter([1, 2, 3]) -> [input]join;
source_iter([0]) -> [single]join;

join -> assert_eq([(1, 0), (2, 0), (3, 0)]);

defer_signal

InputsSyntaxOutputsFlow
exactly 2-> [<input_port>]defer_signal() ->exactly 1Blocking

Input port names: input (blocking), signal (blocking)

2 input streams, 1 output stream, no arguments.

Defers streaming input and releases it downstream when a signal is delivered. The order of input is preserved. This allows for buffering data and delivering it at a later, chosen, tick.

There are two inputs to defer_signal, they are input and signal. input is the input data flow. Data that is delivered on this input is collected in order inside of the defer_signal operator. When anything is sent to signal the collected data is released downstream. The entire signal input is consumed each tick, so sending 5 things on signal will not release inputs on the next 5 consecutive ticks.

gate = defer_signal();

source_iter([1, 2, 3]) -> [input]gate;
source_iter([()]) -> [signal]gate;

gate -> assert_eq([1, 2, 3]);

defer_tick

InputsSyntaxOutputsFlow
exactly 1-> defer_tick() ->exactly 1Blocking

Buffers all input items and releases them in the next tick. the state of the current tick. For example, See the book discussion on time for details on ticks. A tick may be divided into multiple strata; see the next_stratum() operator.

defer_tick is sometimes needed to separate conflicting data across time, in order to preserve invariants. Consider the following example, which implements a flip-flop -- the invariant is that it emit one of true or false in a given tick (but never both!)

pub fn main() {
let mut df = dfir_rs::dfir_syntax! {
source_iter(vec!(true))
-> state;
state = union()
-> assert(|x| if context.current_tick().0 % 2 == 0 { *x == true } else { *x == false })
-> map(|x| !x)
-> defer_tick()
-> state;
};
for i in 1..100 {
println!("tick {}", i);
df.run_tick();
}
}

defer_tick can also be handy for comparing stream content across ticks. In the example below defer_tick() is used alongside difference() to filter out any items that arrive from inp in the current tick which match an item from inp in the previous tick.

// Outputs 1 2 3 4 5 6 (on separate lines).
let (input_send, input_recv) = dfir_rs::util::unbounded_channel::<usize>();
let mut flow = dfir_rs::dfir_syntax! {
inp = source_stream(input_recv) -> tee();
inp -> [pos]diff;
inp -> defer_tick() -> [neg]diff;
diff = difference() -> for_each(|x| println!("{}", x));
};

for x in [1, 2, 3, 4] {
input_send.send(x).unwrap();
}
flow.run_tick();

for x in [3, 4, 5, 6] {
input_send.send(x).unwrap();
}
flow.run_tick();

You can also supply a type parameter defer_tick::<MyType>() to specify what items flow through the the pipeline. This can be useful for helping the compiler infer types.

defer_tick_lazy

InputsSyntaxOutputsFlow
exactly 1-> defer_tick_lazy() ->exactly 1Blocking

See defer_tick This operator is identical to defer_tick except that it does not eagerly cause a new tick to be scheduled.

demux_enum

InputsSyntaxOutputsFlow
exactly 1-> demux_enum()any number ofStreaming

Output port names: Variadic, as specified in arguments.

dest_file

InputsSyntaxOutputsFlow
exactly 1-> dest_file(A, B)exactly 0Streaming

dest_sink

InputsSyntaxOutputsFlow
exactly 1-> dest_sink(A)exactly 0Streaming

dest_sink_serde

InputsSyntaxOutputsFlow
exactly 1-> dest_sink_serde(A)exactly 0Streaming

difference

InputsSyntaxOutputsFlow
exactly 2-> [<input_port>]difference() ->exactly 1Blocking

Input port names: pos (streaming), neg (blocking)

enumerate

InputsSyntaxOutputsFlow
exactly 1-> enumerate() ->exactly 1Streaming

filter

InputsSyntaxOutputsFlow
exactly 1-> filter(A) ->exactly 1Streaming

filter_map

InputsSyntaxOutputsFlow
exactly 1-> filter_map(A) ->exactly 1Streaming

flat_map

InputsSyntaxOutputsFlow
exactly 1-> flat_map(A) ->exactly 1Streaming

flatten

InputsSyntaxOutputsFlow
exactly 1-> flatten() ->exactly 1Streaming

fold

InputsSyntaxOutputsFlow
exactly 1-> fold(A, B)at least 0 and at most 1Blocking

fold_keyed

InputsSyntaxOutputsFlow
exactly 1-> fold_keyed(A, B) ->exactly 1Blocking

for_each

InputsSyntaxOutputsFlow
exactly 1-> for_each(A)exactly 0Streaming

identity

InputsSyntaxOutputsFlow
exactly 1-> identity() ->exactly 1Streaming

initialize

InputsSyntaxOutputsFlow
exactly 0initialize() ->exactly 1Streaming

inspect

InputsSyntaxOutputsFlow
exactly 1-> inspect(A)at least 0 and at most 1Streaming

join

InputsSyntaxOutputsFlow
exactly 2-> [<input_port>]join() ->exactly 1Streaming

Input port names: 0 (streaming), 1 (streaming)

join_fused

InputsSyntaxOutputsFlow
exactly 2-> [<input_port>]join_fused(A, B) ->exactly 1Blocking

Input port names: 0 (blocking), 1 (blocking)

join_fused_lhs

InputsSyntaxOutputsFlow
exactly 2-> [<input_port>]join_fused_lhs(A) ->exactly 1Blocking

Input port names: 0 (blocking), 1 (streaming)

join_fused_rhs

InputsSyntaxOutputsFlow
exactly 2-> [<input_port>]join_fused_rhs(A) ->exactly 1Blocking

Input port names: 0 (streaming), 1 (blocking)

join_multiset

InputsSyntaxOutputsFlow
exactly 2-> [<input_port>]join_multiset() ->exactly 1Streaming

Input port names: 0 (streaming), 1 (streaming)

lattice_bimorphism

InputsSyntaxOutputsFlow
exactly 2-> [<input_port>]lattice_bimorphism(A, B, C) ->exactly 1Streaming

Input port names: 0 (streaming), 1 (streaming)

lattice_fold

InputsSyntaxOutputsFlow
exactly 1-> lattice_fold(A) ->exactly 1Blocking

lattice_reduce

InputsSyntaxOutputsFlow
exactly 1-> lattice_reduce() ->exactly 1Blocking

map

InputsSyntaxOutputsFlow
exactly 1-> map(A) ->exactly 1Streaming

multiset_delta

InputsSyntaxOutputsFlow
exactly 1-> multiset_delta() ->exactly 1Streaming

next_iteration

InputsSyntaxOutputsFlow
exactly 1-> next_iteration() ->exactly 1Streaming

next_stratum

InputsSyntaxOutputsFlow
exactly 1-> next_stratum() ->exactly 1Blocking

null

InputsSyntaxOutputsFlow
at least 0 and at most 1null()at least 0 and at most 1Streaming

partition

InputsSyntaxOutputsFlow
exactly 1-> partition(A)[<output_port>] ->at least 2Streaming

Output port names: Variadic, as specified in arguments.

persist

InputsSyntaxOutputsFlow
exactly 1-> persist() ->exactly 1Streaming

persist_mut

InputsSyntaxOutputsFlow
exactly 1-> persist_mut() ->exactly 1Blocking

persist_mut_keyed

InputsSyntaxOutputsFlow
exactly 1-> persist_mut_keyed() ->exactly 1Blocking

prefix

InputsSyntaxOutputsFlow
exactly 1-> prefix()at least 0 and at most 1Streaming

reduce

InputsSyntaxOutputsFlow
exactly 1-> reduce(A)at least 0 and at most 1Blocking

reduce_keyed

InputsSyntaxOutputsFlow
exactly 1-> reduce_keyed(A) ->exactly 1Blocking

repeat_n

InputsSyntaxOutputsFlow
exactly 1-> repeat_n(A) ->exactly 1Streaming

resolve_futures

InputsSyntaxOutputsFlow
exactly 1-> resolve_futures() ->exactly 1Streaming

resolve_futures_blocking

InputsSyntaxOutputsFlow
exactly 1-> resolve_futures_blocking() ->exactly 1Streaming

resolve_futures_blocking_ordered

InputsSyntaxOutputsFlow
exactly 1-> resolve_futures_blocking_ordered() ->exactly 1Streaming

resolve_futures_ordered

InputsSyntaxOutputsFlow
exactly 1-> resolve_futures_ordered() ->exactly 1Streaming

scan

InputsSyntaxOutputsFlow
exactly 1-> scan(A, B) ->exactly 1Streaming

sort

InputsSyntaxOutputsFlow
exactly 1-> sort() ->exactly 1Blocking

sort_by_key

InputsSyntaxOutputsFlow
exactly 1-> sort_by_key(A) ->exactly 1Blocking

source_file

InputsSyntaxOutputsFlow
exactly 0source_file(A) ->exactly 1Streaming

source_interval

InputsSyntaxOutputsFlow
exactly 0source_interval(A) ->exactly 1Streaming

source_iter

InputsSyntaxOutputsFlow
exactly 0source_iter(A) ->exactly 1Streaming

source_json

InputsSyntaxOutputsFlow
exactly 0source_json(A) ->exactly 1Streaming

source_stdin

InputsSyntaxOutputsFlow
exactly 0source_stdin() ->exactly 1Streaming

source_stream

InputsSyntaxOutputsFlow
exactly 0source_stream(A) ->exactly 1Streaming

source_stream_serde

InputsSyntaxOutputsFlow
exactly 0source_stream_serde(A) ->exactly 1Streaming

spin

InputsSyntaxOutputsFlow
exactly 0spin() ->exactly 1Streaming

state

InputsSyntaxOutputsFlow
exactly 1-> state()at least 0 and at most 1Streaming

state_by

InputsSyntaxOutputsFlow
exactly 1-> state_by(A, B)at least 0 and at most 1Streaming

tee

InputsSyntaxOutputsFlow
exactly 1-> tee()[<output_port>] ->at least 2Streaming

union

InputsSyntaxOutputsFlow
at least 2-> [<input_port>]union() ->exactly 1Streaming

unique

InputsSyntaxOutputsFlow
exactly 1-> unique() ->exactly 1Streaming

unzip

InputsSyntaxOutputsFlow
exactly 1-> unzip()[<output_port>] ->exactly 2Streaming

Output port names: 0, 1

zip

InputsSyntaxOutputsFlow
exactly 2-> [<input_port>]zip() ->exactly 1Streaming

Input port names: 0 (streaming), 1 (streaming)

zip_longest

InputsSyntaxOutputsFlow
exactly 2-> [<input_port>]zip_longest() ->exactly 1Blocking

Input port names: 0 (blocking), 1 (blocking)

_counter

InputsSyntaxOutputsFlow
exactly 1-> _counter(A, B)at least 0 and at most 1Streaming

Arguments: A tag string and a Duration for how long to wait between printing. A third optional parameter controls the prefix used for logging (defaults to "_counter").

Counts the number of items passing through and prints to stdout whenever the stream trigger activates.

source_stream(dfir_rs::util::iter_batches_stream(0..=100_000, 1))
-> _counter("nums", std::time::Duration::from_millis(100));

stdout:

_counter(nums): 1
_counter(nums): 6202
_counter(nums): 12540
_counter(nums): 18876
_counter(nums): 25218
_counter(nums): 31557
_counter(nums): 37893
_counter(nums): 44220
_counter(nums): 50576
_counter(nums): 56909
_counter(nums): 63181
_counter(nums): 69549
_counter(nums): 75914
_counter(nums): 82263
_counter(nums): 88638
_counter(nums): 94980

_lattice_fold_batch

InputsSyntaxOutputsFlow
exactly 2-> [<input_port>]_lattice_fold_batch() ->exactly 1Blocking

Input port names: input (blocking), signal (blocking)

2 input streams, 1 output stream, no arguments.

Batches streaming input and releases it downstream when a signal is delivered. This allows for buffering data and delivering it later while also folding it into a single lattice data structure. This operator is similar to defer_signal in that it batches input and releases it when a signal is given. It is also similar to lattice_fold in that it folds the input into a single lattice. So, _lattice_fold_batch is a combination of both defer_signal and lattice_fold. This operator is useful when trying to combine a sequence of defer_signal and lattice_fold operators without unnecessary memory consumption.

There are two inputs to _lattice_fold_batch, they are input and signal. input is the input data flow. Data that is delivered on this input is collected in order inside of the _lattice_fold_batch operator. When anything is sent to signal the collected data is released downstream. The entire signal input is consumed each tick, so sending 5 things on signal will not release inputs on the next 5 consecutive ticks.

use lattices::set_union::SetUnionHashSet;
use lattices::set_union::SetUnionSingletonSet;

source_iter([1, 2, 3])
-> map(SetUnionSingletonSet::new_from)
-> [input]batcher;

source_iter([()])
-> [signal]batcher;

batcher = _lattice_fold_batch::<SetUnionHashSet<usize>>()
-> assert_eq([SetUnionHashSet::new_from([1, 2, 3])]);

_lattice_join_fused_join

InputsSyntaxOutputsFlow
exactly 2-> [<input_port>]_lattice_join_fused_join() ->exactly 1Blocking

Input port names: 0 (blocking), 1 (blocking)

2 input streams of type (K, V1) and (K, V2), 1 output stream of type (K, (V1', V2')) where V1, V2, V1', V2' are lattice types

Performs a fold_keyed with lattice-merge aggregate function on each input and then forms the equijoin of the resulting key/value pairs in the input streams by their first (key) attribute. Unlike join, the result is not a stream of tuples, it's a stream of MapUnionSingletonMap lattices. You can (non-monotonically) "reveal" these as tuples if desired via map; see the examples below.

You must specify the the accumulating lattice types, they cannot be inferred. The first type argument corresponds to the [0] input of the join, and the second to the [1] input. Type arguments are specified in dfir using the rust turbofish syntax ::<>, for example _lattice_join_fused_join::<Min<_>, Max<_>>() The accumulating lattice type is not necessarily the same type as the input, see the below example involving SetUnion for such a case.

Like join, _lattice_join_fused_join can also be provided with one or two generic lifetime persistence arguments, either 'tick or 'static, to specify how join data persists. With 'tick, pairs will only be joined with corresponding pairs within the same tick. With 'static, pairs will be remembered across ticks and will be joined with pairs arriving in later ticks. When not explicitly specified persistence defaults to `tick.

Like join, when two persistence arguments are supplied the first maps to port 0 and the second maps to port 1. When a single persistence argument is supplied, it is applied to both input ports. When no persistence arguments are applied it defaults to 'tick for both. It is important to specify all persistence arguments before any type arguments, otherwise the persistence arguments will be ignored.

The syntax is as follows:

_lattice_join_fused_join::<MaxRepr<usize>, MaxRepr<usize>>(); // Or
_lattice_join_fused_join::<'static, MaxRepr<usize>, MaxRepr<usize>>();

_lattice_join_fused_join::<'tick, MaxRepr<usize>, MaxRepr<usize>>();

_lattice_join_fused_join::<'static, 'tick, MaxRepr<usize>, MaxRepr<usize>>();

_lattice_join_fused_join::<'tick, 'static, MaxRepr<usize>, MaxRepr<usize>>();
// etc.

Examples

use dfir_rs::lattices::Min;
use dfir_rs::lattices::Max;

source_iter([("key", Min::new(1)), ("key", Min::new(2))]) -> [0]my_join;
source_iter([("key", Max::new(1)), ("key", Max::new(2))]) -> [1]my_join;

my_join = _lattice_join_fused_join::<'tick, Min<usize>, Max<usize>>()
-> map(|singleton_map| {
let lattices::collections::SingletonMap(k, v) = singleton_map.into_reveal();
(k, (v.into_reveal()))
})
-> assert_eq([("key", (Min::new(1), Max::new(2)))]);
use dfir_rs::lattices::set_union::SetUnionSingletonSet;
use dfir_rs::lattices::set_union::SetUnionHashSet;

source_iter([("key", SetUnionSingletonSet::new_from(0)), ("key", SetUnionSingletonSet::new_from(1))]) -> [0]my_join;
source_iter([("key", SetUnionHashSet::new_from([0])), ("key", SetUnionHashSet::new_from([1]))]) -> [1]my_join;

my_join = _lattice_join_fused_join::<'tick, SetUnionHashSet<usize>, SetUnionHashSet<usize>>()
-> map(|singleton_map| {
let lattices::collections::SingletonMap(k, v) = singleton_map.into_reveal();
(k, (v.into_reveal()))
})
-> assert_eq([("key", (SetUnionHashSet::new_from([0, 1]), SetUnionHashSet::new_from([0, 1])))]);