This Week in Fluvio #3
Welcome to the third edition of This Week in Fluvio, our weekly newsletter for development updates to Fluvio open source. Fluvio is a distributed, programmable streaming platform written in Rust.
This week’s release has one big feature and several quality-of-life user experience improvements. The big feature is the arrival of our SmartStreams Aggregate feature, which allows users to write functions that combine records into a long-running accumulator state. Good examples of accumulator use-cases are calculating sums or averages of numeric data, or combining structural key-value data.
To quickly illustrate what a SmartStream Aggregate looks like, let’s take a look at the simplest example of just summing numbers together:
```rust
use fluvio_smartstream::{smartstream, Result, Record, RecordData};

#[smartstream(aggregate)]
pub fn aggregate(accumulator: RecordData, current: &Record) -> Result<RecordData> {
    // Parse the accumulator and current record as strings
    let accumulator_string = std::str::from_utf8(accumulator.as_ref())?;
    let current_string = std::str::from_utf8(current.value.as_ref())?;

    // Parse the strings into integers
    let accumulator_int = accumulator_string.parse::<i32>().unwrap_or(0);
    let current_int = current_string.parse::<i32>()?;

    // Take the sum of the two integers and return it as a string
    let sum = accumulator_int + current_int;
    Ok(sum.to_string().into())
}
```
Every aggregate function has two inputs: an “accumulator” and the current record from the stream. The function’s job is to combine these two values and produce a new accumulator value, which is then used when processing subsequent records in the stream. In this example, we simply parse the accumulator and the incoming record as integers, then add them together.
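To build intuition for how the accumulator evolves, here is a small standalone sketch in plain Rust (no `fluvio_smartstream` dependency) that folds the same summing logic over a few hard-coded record values. The values and the empty initial accumulator are illustrative assumptions, not output from a real stream:

```rust
fn main() {
    // Hypothetical record values arriving on the stream, oldest first
    let records = ["1", "2", "3", "4"];
    // The accumulator starts out empty, so the first parse falls back to 0
    let mut accumulator = String::new();

    for record in &records {
        let accumulator_int = accumulator.parse::<i32>().unwrap_or(0);
        let current_int = record.parse::<i32>().unwrap();
        accumulator = (accumulator_int + current_int).to_string();
        println!("record: {}, accumulator: {}", record, accumulator);
    }
    // Prints accumulator values 1, 3, 6, 10 as each record is processed
}
```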
When consuming records in Fluvio, we generally open a stream by asking for a particular topic and partition, and providing an Offset for the first record where we would like our stream to begin reading. Offsets may be specified in three ways: as an absolute index into the partition, as an offset relative to the beginning, or as an offset relative to the end. Relative offsets are first resolved to absolute offsets, which are then used to make the stream request.
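As a rough sketch of what that looks like with the Rust client (the topic name, partition, and error handling here are assumptions for illustration):

```rust
use fluvio::Offset;

async fn offset_example() -> anyhow::Result<()> {
    // Open a consumer for partition 0 of a hypothetical "my-topic" topic
    let consumer = fluvio::consumer("my-topic", 0).await?;

    // The three ways to specify where the stream begins:
    let _absolute = Offset::absolute(5)?;          // exactly at record index 5
    let _from_start = Offset::from_beginning(10);  // 10 records after the first
    let from_end = Offset::from_end(10);           // 10 records before the newest

    // Start streaming from "10 from the end"
    let _stream = consumer.stream(from_end).await?;
    Ok(())
}
```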
Prior to `0.9.3`, there was a bug in the client where relative offsets could overflow the actual size of the stream, which would cause the consumer to simply freeze and not yield any records or errors. An example that would cause this problem would be if you had a topic with 10 records in it, and you asked for a stream starting “20 from the end” of that topic. This would incorrectly resolve to an absolute offset of 10 - 20 = -10.
This bug has been fixed in `0.9.3`, with a new behavior where relative offsets that are too large simply “bottom out” at the start or end of the stream. That is, if you ask for “1000 from the end” in a stream of 100 elements, you’ll just start streaming from the start, and if you ask for “1000 from the beginning” of a stream of 100 elements, you’ll just start at the end, waiting for new records to arrive.
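In other words, resolution now behaves like saturating arithmetic. The snippet below only illustrates that semantics (it is not Fluvio’s actual implementation), using a hypothetical `resolve_from_end` helper:

```rust
/// Illustrative only: clamp a "from the end" offset so it can never resolve
/// to a negative absolute offset.
fn resolve_from_end(log_size: u64, from_end: u64) -> u64 {
    log_size.saturating_sub(from_end)
}

fn main() {
    // A topic with 10 records, asked for "20 from the end": bottoms out at 0
    assert_eq!(resolve_from_end(10, 20), 0);
    // A topic with 100 records, asked for "10 from the end": starts at 90
    assert_eq!(resolve_from_end(100, 10), 90);
}
```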
This is a nice usability improvement for Fluvio CLI users, who may now “tail” their streams. This acts similarly to the UNIX `tail` command, which reads the last 10 lines of a file. `fluvio consume --tail=X` will open a stream that begins X elements from the end, letting you quickly see the most recent records in your stream. By default, using `--tail` with no argument (no X) will give you the last 10 elements for some easy context over the latest records (just like UNIX `tail` does).
As a quick example, this is what happens when you use `--tail` on a stream with 20 sequential integers:
```bash
$ fluvio consume ints --tail
Consuming records starting 10 from the end of topic 'ints'
11
12
13
14
15
16
17
18
19
20
```
For the full list of changes, be sure to check out our CHANGELOG. If you have any questions or are interested in contributing, be sure to join our Discord channel and come say hello!
Until next week!