If you like this blog post, do subscribe to my RSS feed

One Billion Row Challenge in Rust

First Published: 23/04/24

I was chair of the committee at Stanford for our university reports. We put out lots and lots of reports from all phases of the department through these years. We had a big mailing list. People were also trading their reports with us. We had to have a massive bookkeeping system just to keep the correspondence, so that the secretaries in charge of it could know who had paid for their reports, who we were sharing with. All this administrative type of work had to be done.
It seemed like just a small matter of programming to do this. I had a grad student who volunteered to do this as his master’s project; to write-up program that would take care of all of the administrative chores of the Stanford tech reports distribution. He turned in his term paper and I looked at it superficially and I gave him an A on it, and he graduated with his master’s degree.
A week later, the secretary called me up and said, “Don, we’re having a little trouble with this program. Can you take a look at it for us?” The program was running up at the AI lab, which I hadn’t visited very often. I went up there and took a look at the program. I got to page five of the program and I said, “Hmmm. This is interesting. Let me make a copy of this page. I’m going to show it to my class.” [It was] the first time I saw where you change one symbol on the page and you can make the program run 50 times faster. He had misunderstood a sorting algorithm. I thought this was great. Then I turned to the next page and he has a searching algorithm there for binary search. I said, “Oh, he made a very interesting error here. I’ll make a copy of this page so I can show my class next time I teach about the wrong way to do binary search.” Then I got to page eight or nine, and I realized that the way he had written his program was hopelessly wrong.
He had written a program that would only work on the test case that he had used in his report for the master’s thesis, that was based on a database of size three or something like this. If you increased the database to four, all the structures would break down. It was the most weird thing. I would never conceive of it in my life. He would assume that the whole database was being maintained by the text editor, and the text editor would generate an index, the way the thing did.
Anyway, it was completely hopeless. There was no way to fix the program. I thought I was going to spend the weekend and give it to the secretary on Monday and she could work on it. There was no way. I had to spend a month writing a program that summer -- I think it was probably ’75, ’76 -- to cover up for my terrible error of giving this guy an A without seeing it. The report that he had, made it look like his program was working. But it only worked on that one case.
- Donald Knuth in his Turing Award Interview

My introductory quotes are getting bigger and more crazy, aren't they? Anyway, in this blog post, we will be doing things a bit differently. It is not one of my usual computer sciency explorations; instead, we will be diving head-first into the One Billion Row Challenge.

A few months back, my friend called me to discuss this challenge, but I did not have the bandwidth at the time, so I am a bit late to the party. This challenge was introduced in January 2024 by Gunnar Morling and lasted an entire month. The original challenge was in Java and involved an innocently simple problem:

Your mission, should you decide to accept it, is deceptively simple: write a Java program for retrieving temperature measurement values from a text file and calculating the min, mean, and max temperature per weather station. There’s just one caveat: the file has 1,000,000,000 rows!

That’s it! Gunnar Morling’s blog post has further details on it, including some constraints he put in to make this a bigger challenge. However, in this blog post, we will be doing things a bit differently. It will be a no-holds-barred match with no constraints on language, external dependencies, or hardware. Anything goes!

Let us start by creating one billion temperature measurement values stored in a text file called measurements.txt. These are randomly generated by first building the 1brc project using Apache Maven and then running the ./create_measurements.sh shell script with 1000000000 as its argument. My measurements.txt file is roughly 13GB and looks something like this:

[rust] head measurements.txt
Lhasa;13.4
Detroit;18.9
Manila;50.5
Riga;10.3
Zagreb;0.0
Nassau;46.2
Tucson;19.7
Vancouver;10.0
San Salvador;34.7
Nouakchott;35.1

Note that there are only 413 distinct cities, and the city names are validly UTF-8 encoded.

I will be testing my solutions on a 2021 MacBook Pro. It comes with an Apple M1 Pro chip containing one processor with 10 cores - 8 performance and 2 efficiency. The memory is 32 GB. According to a 2021 investigation by AnandTech, the M1 Pro has:

When I first heard of this problem, I wondered what an absolutely naive solution might look like. My first instinct was to use Python with Pandas. I imagined using read_csv() with chunksize set to a million to read the text file in chunks or batches, and iterating over each row in the chunk using iterrows(), with a Python dictionary storing the calculated mean, max, and min for each unique location. Well, this solution was laughably naive and took a whopping 16 seconds just to parse a million rows, or around 5 hours in total.

Now, there is no need to box the data in a Series, which can be quite slow as it might introduce overheads such as data type checks and data copying. We can improve upon this by directly processing each chunk. Think about it, for each chunk, all we need to do is group by location and then perform max, min, sum, and count aggregations on the temperature column. Here is the code:

import pandas as pd
import time

class Measurements:
    def __init__(self) -> None:
        self.path = "./measurements.txt"

    def parse(self):
        size = 10**7
        results = pd.DataFrame()
        start_time = time.time()
        index = 1
        with pd.read_csv(
            self.path, chunksize=size, delimiter=";", names=["loc", "temp"]
        ) as reader:
            for chunk in reader:
                print(f"Chunk {index}")
                index += 1
                curr_df = chunk.groupby("loc")["temp"].agg(
                    ["sum", "max", "min", "count"]
                )
                if not results.empty:
                    results = results.join(curr_df, how="outer", rsuffix="_")
                    results["min"] = results[["min", "min_"]].min(axis=1)
                    results["max"] = results[["max", "max_"]].max(axis=1)
                    results["count"] = results[["count", "count_"]].sum(axis=1)
                    results["sum"] = results[["sum", "sum_"]].sum(axis=1)
                    results.drop(
                        ["sum_", "min_", "max_", "count_"], axis=1, inplace=True
                    )
                else:
                    results = curr_df
        results["mean"] = results["sum"] / results["count"]
        results.drop(["sum", "count"], axis=1, inplace=True)
        end_time = time.time()
        print(f"Result is: {results} in {end_time - start_time} seconds")

if __name__ == "__main__":
    Measurements().parse()

I bumped up the chunk size to ten million. For each chunk, except the first one, we perform an outer join with the previously consolidated chunk and calculate the new min, max, count, and sum, eventually dropping the unnecessary columns. These should not be super expensive operations, given that the number of distinct locations is only 413. Once all the chunks have been processed, we calculate the mean. All in all, this code takes between 157 and 160 seconds to run; however, we have reduced the time from 5 hours to roughly 3 minutes.

Now, to be completely honest, writing these solutions in Python was my excuse to compare the performance of Polars against Pandas. Here’s a very similar code I developed with Polars after a few iterations:

import polars as pl
import time

class Measurements:
    def __init__(self) -> None:
        self.path = "./measurements.txt"

    def parse(self):
        size = 10**7
        results = pl.DataFrame()
        start_time = time.time()
        df = pl.read_csv(
            self.path, separator=";", has_header=False, new_columns=["loc", "temp"]
        )
        for i in range(100):
            start = i * size
            chunk = df.slice(start, size)
            aggr_chunk = chunk.group_by("loc").agg(
                [
                    pl.col("temp").sum().alias("sum"),
                    pl.col("temp").min().alias("min"),
                    pl.col("temp").max().alias("max"),
                    pl.count("temp").alias("count"),
                ]
            )
            if results.is_empty():
                results = aggr_chunk
            else:
                results = results.join(aggr_chunk, how="outer", on=["loc"], suffix="_")
                results = results.with_columns(
                    pl.when(results["min"] < results["min_"])
                    .then(results["min"])
                    .otherwise(results["min_"])
                    .alias("min")
                )
                results = results.with_columns(
                    pl.when(results["max"] > results["max_"])
                    .then(results["max"])
                    .otherwise(results["max_"])
                    .alias("max")
                )
                results = results.with_columns(
                    (results["sum"] + results["sum_"]).alias("sum")
                )
                results = results.with_columns(
                    (results["count"] + results["count_"]).alias("count")
                )
                results = results.drop(["sum_", "min_", "max_", "count_", "loc_"])
        results = results.with_columns(
            (results["sum"] / results["count"]).alias("mean")
        )
        results = results.drop(["sum", "count"])
        end_time = time.time()
        print(f"Result is: {results} in {end_time - start_time} seconds")

if __name__ == "__main__":
    Measurements().parse()

Note that this stores the entire DataFrame in memory. In fact, it consumes around 22 GB of memory, certainly not for the faint-hearted! On a clean run, right after a device shutdown, this took about 35.6 seconds to run. Here is the metrics from vmstat:

[13:23:14][rust] vm_stat 2
Mach Virtual Memory Statistics: (page size of 16384 bytes)
    free   active   specul inactive throttle    wired  prgable   faults     copy    0fill reactive   purged file-backed anonymous cmprssed cmprssor  dcomprs   comprs  pageins  pageout  swapins swapouts
 1401313   287882   113712   182609        0    70389     8976       97        0       92        0        0      318338    265865        0        0        0        0        0        0        0        0
 1401247   281549   113701   182609        0    76838     7458     2440        0      422        0        0      318366    259493        0        0        0        0       26        0        0        0
 1344265   318681   114498   203016        0    75543     8783    56608      506    35544        0        0      338622    297573        0        0        0        0    19813        0        0        0
  864526   561256   114503   445900        0    69660    10150   480577        0   309768        0        0      509359    612300        0        0        0        0   170728        0        0        0
  385414   800484   114462   684676        0    70902    10173   481426      156   310140        0        1      679889    919733        0        0        0        0   170516        0        0        0
    2998   991647    60060   930271        0    70774    10128   437358        0   281834       22       36      780960   1201019        0        0        0        0   155482        0        0        0
    3072   818442      507   817018        0    72451     3565   228953        0   147344    35142     5430      756420    879547   436207   343369       46   437313    81402        0        0        0
    1068   537773     2340   533868        0    75359     2395   190609      158   122479     2914        6      823667    250314  1166425   905495      154   730383    67279        1        0        0
    3876   428142     5383   421634        0    76724        0   238523        0   151745    68897     2574      780247     74912  1463738  1120217     2400   300021    83512        4        0        0
    4016   394213     4842   387982        0    80536        0   176237      115   111582      107     2512      714570     72467  1572895  1184378     6170   115616    61976        4       12    31300
    7327   392760     6049   385243        0    78233        0   118316        4    74303      361     2412      710081     73971  1645602  1186302    12307    85324    34450        1    13392    72460
.....
    3635   640394    60240   578572        0    77763     2407  6529598   122408  2988856   114617    23555      483459    795747   938561   695469   763073  1705248  1090289       76    90672   115568
    4547   700072    44036   654893        0    77802     2679   559162        0   181567        0        0      428252    970749   768423   574576   170039        0      635       30    12525     4504
   17253   755314    10058   745255        0    74579     2698   522784      102   161677        0        2      394292   1116335   611535   453578   156895        0      117        0     6060        0
    5084   830593     2525   826709        0    79976     2788   526357        1   177334        0       58      366560   1293267   445089   311039   166437        0        0        3     5820        0
   30467   894219     2033   892185        0    77778     2814   534056      108   189506        0       10      345828   1442609   282009   159268   163070        0      338       11     1444        0
 1557493     4198     2999   376232        0    81118     2642   430053        3   118525        2        0      346932     36497   153880    33865   127518        0      775        0        4        0
 1555482     8312     3616   376241        0    78430     4040     2290        0      554        0        0      347720     40449   152923    33783      956        0      472        0       19        0
 1555036    10501     3727   376192        0    76702     4082     1100      102      422        0        0      347854     42566   152758    33766      170        0       32        0        0        0

As can be seen from vm_stat, the initial free memory pages are quite high, indicating plenty of available memory. Furthermore, there are low pageins/pageouts and swap activity, typical of an idle system. As the script starts (from around row 4 onwards), there is a significant drop in free memory, indicating that we are loading data into memory. This is further confirmed by the spike in pageins, quite possibly from our measurements.txt file.

During the script execution, there is a high number of active pages, which indicates that memory is heavily utilized throughout the script's execution. There is also some swapping activity, which might suggest that the system is beginning to write data back to disk. The fascinating part is the comprs and dcomprs columns, which might suggest that there is some activity in memory compression and decompression, perhaps for managing memory pressure.

If memory is a constraint, read_csv_batched can also be leveraged. The parse method then gets modified to something like:

def parse(self):
   	 results = pl.DataFrame()
   	 reader = pl.read_csv_batched(self.path, separator=';', batch_size=10**7, has_header=False, new_columns=['loc', 'temp'])
   	 for i in range(100):
   		 chunk = reader.next_batches(1)[0]

This modification took about 67 seconds to run. However, with a batch_size of approximately 100 million, I managed to reduce the runtime to 49.34 seconds while consuming around 4 GB of memory.

At this point, I was three hours into a Sunday afternoon and felt battle-ready to tackle this with Rust. I’ve been using Rust at my workplace, Neurelo, for about nine months now. In my open-source world, I’ve also created a few learning projects in Rust, such as a Chip-8 interpreter and a CLI-based air traffic control simulator.

As with Python, I quickly chalked up a naive solution in Rust.

use std::collections::HashMap;
use std::fs::File;
use std::io::{self, BufRead, BufReader};

#[derive(Debug)]
struct Weather {
    max: f32,
    min: f32,
    sum: f32,
    count: usize,
}

impl Weather {
    fn new(temp: f32) -> Self {
        Self {
            max: temp,
            min: temp,
            sum: temp,
            count: 1,
        }
    }

    fn update(&mut self, temp: f32) {
        self.max = f32::max(self.max, temp);
        self.min = f32::min(self.min, temp);
        self.sum += temp;
        self.count += 1;
    }
}

fn main() -> io::Result<()> {
    let file = File::open("../measurements.txt")?;
    let reader = BufReader::new(file);
    let mut data = HashMap::new();
    for line in reader.lines() {
        let line = line.unwrap();
        let parts: Vec<&str> = line.split(';').collect();

        let loc = parts[0].to_owned();
        let temp = parts[1].parse::<f32>().unwrap();

        data.entry(loc)
            .and_modify(|prev: &mut Weather| prev.update(temp))
            .or_insert(Weather::new(temp));
    }
    println!("Data: {:?}", data);
    Ok(())
}

Executing it with the time command, we see 112.63s user, 2.81s system, 99% CPU, totaling 1:56.25. Two minutes is not bad for a start; however, there are a number of optimizations we could potentially perform on this. And that is exactly what I did for the next one and a half days, chipping away at the rough edges and pushing the runtime down as much as possible. Here’s how my final code looked - my less than 100 LOC solution to the one billion row challenge.

use fxhash::{FxBuildHasher, FxHashMap};
use memmap2::Mmap;
use rayon::prelude::*;
use std::fs::File;
use std::io::{self};

#[cfg(not(target_env = "msvc"))]
use tikv_jemallocator::Jemalloc;

#[cfg(not(target_env = "msvc"))]
#[global_allocator]
static GLOBAL: Jemalloc = Jemalloc;

#[derive(Debug)]
struct Weather {
    max: f32,
    min: f32,
    sum: f32,
    mean: f32,
    count: f32,
}

impl Weather {
    fn new(temp: f32) -> Self {
        Self {
            max: temp,
            min: temp,
            sum: temp,
            mean: 0.0,
            count: 1.0,
        }
    }

    fn update(&mut self, temp: f32) {
        self.max = f32::max(self.max, temp);
        self.min = f32::min(self.min, temp);
        self.sum += temp;
        self.count += 1.0;
    }

    fn merge(&mut self, weather: &Weather) {
        self.max = f32::max(self.max, weather.max);
        self.min = f32::min(self.min, weather.min);
        self.sum += weather.sum;
        self.count += weather.count;
    }
}

fn main() -> io::Result<()> {
    let file = File::open("../measurements.txt")?;
    let buffer = unsafe { Mmap::map(&file) }.unwrap();

    let mut data = buffer
        .par_chunks(100_000_000)
        .map(|chunk| {
            let mut local_data = FxHashMap::with_capacity_and_hasher(512, FxBuildHasher::default());

            for line in chunk.split(|&b| b == b'\n') {
                let parts = line.split(|&b| b == b';').collect::<Vec<_>>();

                if parts.len() == 2 {
                    if let Ok(temp) = fast_float::parse::<f32, &[u8]>(parts[1]) {
                        let loc: std::borrow::Cow<'_, str> = String::from_utf8_lossy(parts[0]);
                        local_data
                            .entry(loc)
                            .and_modify(|prev: &mut Weather| prev.update(temp))
                            .or_insert(Weather::new(temp));
                    }
                }
            }

            local_data
        })
        .reduce(
            || FxHashMap::with_capacity_and_hasher(512, FxBuildHasher::default()),
            |mut a, b| {
                for (loc, weather) in b {
                    a.entry(loc)
                        .and_modify(|prev| prev.merge(&weather))
                        .or_insert(weather);
                }
                a
            },
        );

    data.iter_mut().for_each(|(_, weather)| {
        weather.mean = weather.sum / weather.count;
    });

    println!("Data: {:?} {:?}", data, data.len());
    Ok(())
}

This code takes between 5.5 to 6.5 seconds to run on my system. Here is the time from my most recent run: 53.06s user, 1.67s system, 924% CPU, totaling 5.917 seconds.

One of the biggest improvements I saw was by using Rayon, a data parallelism library similar to OpenMP. My earliest iteration with Rayon involved using reader.lines().collect::<Vec<_>>().into_par_iter() with a mutex lock on a shared HashMap. However, this approach meant that I would have to use locking on every line I was processing. Consequently, each thread in the parallel iterator had to wait for access to the shared HashMap, considerably reducing any gains I might have obtained from concurrent execution. It was terribly slow.

The way such issues are usually mitigated is by using a thread-local data structure (a HashMap in our scenario). That is, each thread builds its own local HashMap, and once the processing is completed, we merge these local HashMaps into a shared one - a map-reduce solution, if you will. This significantly reduces lock contention. My code above is effectively doing the same thing.

As for par_chunks, we can still leverage into_par_iter().chunks(x), however, as mentioned in their codebase, this can be inefficient as it involves allocating intermediate Vecs for the chunks. par_chunks mitigates this issue.

Now, the check if parts.len() == 2 might seem downright bizarre at first glance; however, there is a cheeky reason why that check’s in place. You see, my above solution is not one hundred percent correct! I’ve cut some corners - particularly, mmap is going to return the bytes from my measurements.txt file, and when performing chunking, there is a good chance that Rayon would be splitting chunks in a way that might divide a line into two parts—imagine splitting in the middle of a line, like 'Det' and 'roit;18.9'. Currently, I am handling these cases by ignoring them entirely! This approach reduces the accuracy slightly; however, it saves computation.

The chunk size of one hundred million is something I decided upon after quite a few trials and errors. Rayon also provides a way to tweak the number of threads in their pool using ThreadPoolBuilder::new().num_threads(8).build(). However, I found its default setting to be quite performant. Today, Rayon uses the standard library’s available_parallelism to determine such default values, which, on my M1, was 10 threads.

There are many other optimization nuggets in the code above. For our problem, and given my hardware, mmap provided a considerable improvement over BufReader. Using it means that the OS handles the data at a page level and that the data automatically becomes part of the OS's page cache. This setup means that repeated accesses to the data do not require disk I/O, which can be confirmed by using iostat 1. It shows very minor disk I/O activity. This helps to reduce the runtime after the first execution.

        Mach Virtual Memory Statistics: (page size of 16384 bytes)
        free   active   specul inactive throttle    wired  prgable   faults     copy    0fill reactive   purged file-backed anonymous cmprssed cmprssor  dcomprs   comprs  pageins  pageout  swapins swapouts
        7328   901688    20169   880396        0   122775     7064      461      110       90        0        0     1055964    746289   348729   123603        2        0        1        0        0        0
       11106   899711    20172   878385        0   122968     6906      171        0      110        0        0     1055967    742301   348729   123603        0        0        0        0        0        0
       11128   898929    20173   878198        0   123976     6563      629        0      422        0        0     1055968    741332   348729   123603        0        0        0        0        0        0
       11185   900230    20174   878014        0   122737     7064      345        1      250        0        0     1055969    742449   348727   123603        2        0        0        0        0        0
       10831   899898    20181   878439        0   122908     6924    49602       75      372        0       64     1055978    742540   348727   123603        1        0        9        0        0        0
       10744   899967     8544   890248        0   122837     7060   152319      115      193        0        0     1055978    742781   348727   123603        2        0        1        0        0        0
       10850   900166     5334   893648        0   122398     6912   152576        0       53        0      128     1055979    743169   348727   123603        0        0        0        0        0        0
       10571   900204     2497   896463        0   122507     6912   152520        0      133        1        0     1055980    743184   348727   123603        0        0        0        0        0        0
       10403   900285     2505   896470        0   122563     6919   152352       79      152        0        0     1055991    743269   348727   123603        0        0        9        0        0        0
       10276   899531     2515   895925        0   124009     6901   151377       40      382        0      192     1056003    741968   348727   123603        0        0        9        0        0        0
       10664   899493     2515   896109        0   123529     6751    35071      113      459        0        0     1056001    742116   348720   123603        9        0        1        0        0        0
       10680   900859     2516   896199        0   122073     6912      144        1      102        0        0     1056002    743572   348720   123603        0        0        0        0        0        0
       10599   899580     2517   896177        0   123374     6914      736        0      732        0        0     1056003    742271   348720   123603        0        0        0        0        0        0
       10580   899774     2525   896175        0   123198     6913      895        0      866        0        0     1056011    742463   348720   123603        0        0        0        0        0        0
      

Also, using vmstat, I observed that the number of page faults with mmap were considerably high, with single-digit pageins, and no pageouts. The high page faults might be due to minor page faults - that is, my process is trying to access a page that is not in memory but can be loaded from the page cache. The single-digit pageins essentially validate the iostat result, showing that we are observing very minor disk I/O activity. This means the number of pages actually being read into physical memory from the disk is quite low, thus triggering few major page faults. Moreover, the absence of pageouts is largely due to my 32GB of RAM! It confirms that my system is not experiencing any memory pressure that might warrant swapping memory pages out to disk.

Other optimizations, such as using jemalloc, a faster hashmap called FxHashMap, a faster float parser named fast_float, and processing on bytes, proved to be quite effective. At the compiler level, I leveraged lto = "fat", opt-level = 3, and RUSTFLAGS="-C target-cpu=native" to improve runtime. My manual experiments with SIMD were not quite effective. On aarch64, SIMD is supported through NEON technology, and Rust seems to enable the target-feature=+neon on my M1, which already considers some SIMD optimizations. However, I have a strong hunch that this can be further improved.

Another interesting observation I made was that setting codegen-units=1 resulted in a performance regression. I am quite baffled as to why this is the case, and it requires further investigation on my part.

And that's it, folks! I had a blast working on this problem. If you're interested in exploring other Rust solutions, I highly encourage you to check out some fascinating investigations by Ragnar Groot Koerkamp and Amine Dirhoussi. Happy exploring!