I was chair of the committee at Stanford for our university reports. We put out lots and lots of reports from all phases of the department through these years. We had a big mailing list. People were also trading their reports with us. We had to have a massive bookkeeping system just to keep the correspondence, so that the secretaries in charge of it could know who had paid for their reports, who we were sharing with. All this administrative type of work had to be done.
It seemed like just a small matter of programming to do this. I had a grad student who volunteered to do this as his master’s project; to write up a program that would take care of all of the administrative chores of the Stanford tech reports distribution. He turned in his term paper and I looked at it superficially and I gave him an A on it, and he graduated with his master’s degree.
A week later, the secretary called me up and said, “Don, we’re having a little trouble with this program. Can you take a look at it for us?” The program was running up at the AI lab, which I hadn’t visited very often. I went up there and took a look at the program. I got to page five of the program and I said, “Hmmm. This is interesting. Let me make a copy of this page. I’m going to show it to my class.” [It was] the first time I saw where you change one symbol on the page and you can make the program run 50 times faster. He had misunderstood a sorting algorithm. I thought this was great. Then I turned to the next page and he has a searching algorithm there for binary search. I said, “Oh, he made a very interesting error here. I’ll make a copy of this page so I can show my class next time I teach about the wrong way to do binary search.” Then I got to page eight or nine, and I realized that the way he had written his program was hopelessly wrong.
He had written a program that would only work on the test case that he had used in his report for the master’s thesis, that was based on a database of size three or something like this. If you increased the database to four, all the structures would break down. It was the most weird thing. I would never conceive of it in my life. He would assume that the whole database was being maintained by the text editor, and the text editor would generate an index, the way the thing did.
Anyway, it was completely hopeless. There was no way to fix the program. I thought I was going to spend the weekend and give it to the secretary on Monday and she could work on it. There was no way. I had to spend a month writing a program that summer -- I think it was probably ’75, ’76 -- to cover up for my terrible error of giving this guy an A without seeing it. The report that he had, made it look like his program was working. But it only worked on that one case.
- Donald Knuth in his Turing Award Interview
My introductory quotes are getting bigger and more crazy, aren't they? Anyway, in this blog post, we will be doing things a bit differently. It is not one of my usual computer sciency explorations; instead, we will be diving head-first into the One Billion Row Challenge.
A few months back, my friend called me to discuss this challenge, but I did not have the bandwidth at the time, so I am a bit late to the party. This challenge was introduced in January 2024 by Gunnar Morling and lasted an entire month. The original challenge was in Java and involved an innocently simple problem:
Your mission, should you decide to accept it, is deceptively simple: write a Java program for retrieving temperature measurement values from a text file and calculating the min, mean, and max temperature per weather station. There’s just one caveat: the file has 1,000,000,000 rows!
That’s it! Gunnar Morling’s blog post has further details on it, including some constraints he put in place to make this a bigger challenge. Here, though, it will be a no-holds-barred match with no constraints on language, external dependencies, or hardware. Anything goes!
Let us start by creating one billion temperature measurement values stored in a text file called measurements.txt. These are randomly generated by first building the 1brc project using Apache Maven and then running the ./create_measurements.sh shell script with 1000000000 as its argument. My measurements.txt file is roughly 13GB and looks something like this:
[rust] head measurements.txt
Lhasa;13.4
Detroit;18.9
Manila;50.5
Riga;10.3
Zagreb;0.0
Nassau;46.2
Tucson;19.7
Vancouver;10.0
San Salvador;34.7
Nouakchott;35.1
Note that there are only 413 distinct cities, and the city names are valid UTF-8.
I will be testing my solutions on a 2021 MacBook Pro. It comes with an Apple M1 Pro chip containing one processor with 10 cores - 8 performance and 2 efficiency. The memory is 32 GB. A 2021 investigation by AnandTech goes into further detail on the M1 Pro's capabilities.
When I first heard of this problem, I wondered what an absolutely naive solution might look like. My first instinct was to use Python with Pandas. I imagined using read_csv() with chunksize set to a million to read the text file in chunks or batches, and iterating over each row in the chunk using iterrows(), with a Python dictionary storing the calculated mean, max, and min for each unique location. Well, this solution was laughably naive and took a whopping 16 seconds just to parse a million rows, or around 5 hours in total.
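For reference, that naive attempt looked roughly like the sketch below. It is an approximation rather than my exact code (variable names like stats are placeholders), but it captures the shape: read in chunks, then walk each chunk row by row with iterrows().

import pandas as pd

# Sketch of the naive approach described above; names are placeholders.
stats = {}  # location -> [min, max, sum, count]

with pd.read_csv(
    "./measurements.txt", chunksize=10**6, delimiter=";", names=["loc", "temp"]
) as reader:
    for chunk in reader:
        # iterrows() boxes every row into a Series, which is what makes this so slow
        for _, row in chunk.iterrows():
            loc, temp = row["loc"], row["temp"]
            if loc in stats:
                s = stats[loc]
                s[0] = min(s[0], temp)
                s[1] = max(s[1], temp)
                s[2] += temp
                s[3] += 1
            else:
                stats[loc] = [temp, temp, temp, 1]

# min, mean, max per location
results = {loc: (s[0], s[2] / s[3], s[1]) for loc, s in stats.items()}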
Now, there is no need to box the data in a Series, which can be quite slow as it might introduce overheads such as data type checks and data copying. We can improve upon this by directly processing each chunk. Think about it: for each chunk, all we need to do is group by location and then perform max, min, sum, and count aggregations on the temperature column. Here is the code:
import pandas as pd
import time


class Measurements:
    def __init__(self) -> None:
        self.path = "./measurements.txt"

    def parse(self):
        size = 10**7
        results = pd.DataFrame()
        start_time = time.time()
        index = 1
        with pd.read_csv(
            self.path, chunksize=size, delimiter=";", names=["loc", "temp"]
        ) as reader:
            for chunk in reader:
                print(f"Chunk {index}")
                index += 1
                curr_df = chunk.groupby("loc")["temp"].agg(
                    ["sum", "max", "min", "count"]
                )
                if not results.empty:
                    results = results.join(curr_df, how="outer", rsuffix="_")
                    results["min"] = results[["min", "min_"]].min(axis=1)
                    results["max"] = results[["max", "max_"]].max(axis=1)
                    results["count"] = results[["count", "count_"]].sum(axis=1)
                    results["sum"] = results[["sum", "sum_"]].sum(axis=1)
                    results.drop(
                        ["sum_", "min_", "max_", "count_"], axis=1, inplace=True
                    )
                else:
                    results = curr_df
        results["mean"] = results["sum"] / results["count"]
        results.drop(["sum", "count"], axis=1, inplace=True)
        end_time = time.time()
        print(f"Result is: {results} in {end_time - start_time} seconds")


if __name__ == "__main__":
    Measurements().parse()
I bumped up the chunk size to ten million. For each chunk, except the first one, we perform an outer join with the previously consolidated chunk and calculate the new min, max, count, and sum, eventually dropping the unnecessary columns. These should not be super expensive operations, given that the number of distinct locations is only 413. Once all the chunks have been processed, we calculate the mean. All in all, this code takes between 157 and 160 seconds to run; however, we have reduced the time from 5 hours to roughly 3 minutes.
Now, to be completely honest, writing these solutions in Python was my excuse to compare the performance of Polars against Pandas. Here is the very similar code I developed with Polars after a few iterations:
import polars as pl
import time


class Measurements:
    def __init__(self) -> None:
        self.path = "./measurements.txt"

    def parse(self):
        size = 10**7
        results = pl.DataFrame()
        start_time = time.time()
        df = pl.read_csv(
            self.path, separator=";", has_header=False, new_columns=["loc", "temp"]
        )
        for i in range(100):
            start = i * size
            chunk = df.slice(start, size)
            aggr_chunk = chunk.group_by("loc").agg(
                [
                    pl.col("temp").sum().alias("sum"),
                    pl.col("temp").min().alias("min"),
                    pl.col("temp").max().alias("max"),
                    pl.count("temp").alias("count"),
                ]
            )
            if results.is_empty():
                results = aggr_chunk
            else:
                results = results.join(aggr_chunk, how="outer", on=["loc"], suffix="_")
                results = results.with_columns(
                    pl.when(results["min"] < results["min_"])
                    .then(results["min"])
                    .otherwise(results["min_"])
                    .alias("min")
                )
                results = results.with_columns(
                    pl.when(results["max"] > results["max_"])
                    .then(results["max"])
                    .otherwise(results["max_"])
                    .alias("max")
                )
                results = results.with_columns(
                    (results["sum"] + results["sum_"]).alias("sum")
                )
                results = results.with_columns(
                    (results["count"] + results["count_"]).alias("count")
                )
                results = results.drop(["sum_", "min_", "max_", "count_", "loc_"])
        results = results.with_columns(
            (results["sum"] / results["count"]).alias("mean")
        )
        results = results.drop(["sum", "count"])
        end_time = time.time()
        print(f"Result is: {results} in {end_time - start_time} seconds")


if __name__ == "__main__":
    Measurements().parse()
Note that this stores the entire DataFrame in memory. In fact, it consumes around 22 GB of memory, certainly not for the faint-hearted! On a clean run, right after a device shutdown, this took about 35.6 seconds to run. Here are the metrics from vm_stat:
[13:23:14][rust] vm_stat 2
Mach Virtual Memory Statistics: (page size of 16384 bytes)
free active specul inactive throttle wired prgable faults copy 0fill reactive purged file-backed anonymous cmprssed cmprssor dcomprs comprs pageins pageout swapins swapouts
1401313 287882 113712 182609 0 70389 8976 97 0 92 0 0 318338 265865 0 0 0 0 0 0 0 0
1401247 281549 113701 182609 0 76838 7458 2440 0 422 0 0 318366 259493 0 0 0 0 26 0 0 0
1344265 318681 114498 203016 0 75543 8783 56608 506 35544 0 0 338622 297573 0 0 0 0 19813 0 0 0
864526 561256 114503 445900 0 69660 10150 480577 0 309768 0 0 509359 612300 0 0 0 0 170728 0 0 0
385414 800484 114462 684676 0 70902 10173 481426 156 310140 0 1 679889 919733 0 0 0 0 170516 0 0 0
2998 991647 60060 930271 0 70774 10128 437358 0 281834 22 36 780960 1201019 0 0 0 0 155482 0 0 0
3072 818442 507 817018 0 72451 3565 228953 0 147344 35142 5430 756420 879547 436207 343369 46 437313 81402 0 0 0
1068 537773 2340 533868 0 75359 2395 190609 158 122479 2914 6 823667 250314 1166425 905495 154 730383 67279 1 0 0
3876 428142 5383 421634 0 76724 0 238523 0 151745 68897 2574 780247 74912 1463738 1120217 2400 300021 83512 4 0 0
4016 394213 4842 387982 0 80536 0 176237 115 111582 107 2512 714570 72467 1572895 1184378 6170 115616 61976 4 12 31300
7327 392760 6049 385243 0 78233 0 118316 4 74303 361 2412 710081 73971 1645602 1186302 12307 85324 34450 1 13392 72460
.....
3635 640394 60240 578572 0 77763 2407 6529598 122408 2988856 114617 23555 483459 795747 938561 695469 763073 1705248 1090289 76 90672 115568
4547 700072 44036 654893 0 77802 2679 559162 0 181567 0 0 428252 970749 768423 574576 170039 0 635 30 12525 4504
17253 755314 10058 745255 0 74579 2698 522784 102 161677 0 2 394292 1116335 611535 453578 156895 0 117 0 6060 0
5084 830593 2525 826709 0 79976 2788 526357 1 177334 0 58 366560 1293267 445089 311039 166437 0 0 3 5820 0
30467 894219 2033 892185 0 77778 2814 534056 108 189506 0 10 345828 1442609 282009 159268 163070 0 338 11 1444 0
1557493 4198 2999 376232 0 81118 2642 430053 3 118525 2 0 346932 36497 153880 33865 127518 0 775 0 4 0
1555482 8312 3616 376241 0 78430 4040 2290 0 554 0 0 347720 40449 152923 33783 956 0 472 0 19 0
1555036 10501 3727 376192 0 76702 4082 1100 102 422 0 0 347854 42566 152758 33766 170 0 32 0 0 0
As can be seen from vm_stat, the initial free memory pages are quite high, indicating plenty of available memory. Furthermore, there is low pageins/pageouts and swap activity, typical of an idle system. As the script starts (from around row 4 onwards), there is a significant drop in free memory, indicating that we are loading data into memory. This is further confirmed by the spike in pageins, quite possibly from our measurements.txt file.
During the script's execution, there is a high number of active pages, which indicates that memory is heavily utilized throughout the run. There is also some swapping activity, which might suggest that the system is beginning to write data back to disk. The fascinating part is the comprs and dcomprs columns, which might suggest some memory compression and decompression activity, perhaps for managing memory pressure.
If memory is a constraint, read_csv_batched can also be leveraged. The parse method then gets modified to something like:
def parse(self):
    results = pl.DataFrame()
    reader = pl.read_csv_batched(
        self.path, separator=';', batch_size=10**7, has_header=False, new_columns=['loc', 'temp']
    )
    for i in range(100):
        chunk = reader.next_batches(1)[0]
        # ...the per-chunk aggregation and merging stay the same as before
This modification took about 67 seconds to run. However, with a batch_size of approximately 100 million, I managed to reduce the runtime to 49.34 seconds while consuming around 4 GB of memory.
At this point, I was three hours into a Sunday afternoon and felt battle-ready to tackle this with Rust. I’ve been using Rust at my workplace, Neurelo, for about nine months now. In my open-source world, I’ve also created a few learning projects in Rust, such as a Chip-8 interpreter and a CLI-based air traffic control simulator.
As with Python, I quickly put together a naive solution in Rust.
use std::collections::HashMap;
use std::fs::File;
use std::io::{self, BufRead, BufReader};

#[derive(Debug)]
struct Weather {
    max: f32,
    min: f32,
    sum: f32,
    count: usize,
}

impl Weather {
    fn new(temp: f32) -> Self {
        Self {
            max: temp,
            min: temp,
            sum: temp,
            count: 1,
        }
    }

    fn update(&mut self, temp: f32) {
        self.max = f32::max(self.max, temp);
        self.min = f32::min(self.min, temp);
        self.sum += temp;
        self.count += 1;
    }
}

fn main() -> io::Result<()> {
    let file = File::open("../measurements.txt")?;
    let reader = BufReader::new(file);
    let mut data = HashMap::new();

    for line in reader.lines() {
        let line = line.unwrap();
        let parts: Vec<&str> = line.split(';').collect();
        let loc = parts[0].to_owned();
        let temp = parts[1].parse::<f32>().unwrap();
        data.entry(loc)
            .and_modify(|prev: &mut Weather| prev.update(temp))
            .or_insert(Weather::new(temp));
    }

    println!("Data: {:?}", data);
    Ok(())
}
Executing it with the time command, we see 112.63s user, 2.81s system, 99% CPU, totaling 1:56.25. Two minutes is not bad for a start; however, there are a number of optimizations we could potentially perform on this. And that is exactly what I did for the next one and a half days, chipping away at the rough edges and pushing the runtime down as much as possible. Here’s how my final code looked - my less-than-100-LOC solution to the One Billion Row Challenge.
use fxhash::{FxBuildHasher, FxHashMap};
use memmap2::Mmap;
use rayon::prelude::*;
use std::fs::File;
use std::io::{self};

#[cfg(not(target_env = "msvc"))]
use tikv_jemallocator::Jemalloc;

#[cfg(not(target_env = "msvc"))]
#[global_allocator]
static GLOBAL: Jemalloc = Jemalloc;

#[derive(Debug)]
struct Weather {
    max: f32,
    min: f32,
    sum: f32,
    mean: f32,
    count: f32,
}

impl Weather {
    fn new(temp: f32) -> Self {
        Self {
            max: temp,
            min: temp,
            sum: temp,
            mean: 0.0,
            count: 1.0,
        }
    }

    fn update(&mut self, temp: f32) {
        self.max = f32::max(self.max, temp);
        self.min = f32::min(self.min, temp);
        self.sum += temp;
        self.count += 1.0;
    }

    fn merge(&mut self, weather: &Weather) {
        self.max = f32::max(self.max, weather.max);
        self.min = f32::min(self.min, weather.min);
        self.sum += weather.sum;
        self.count += weather.count;
    }
}

fn main() -> io::Result<()> {
    let file = File::open("../measurements.txt")?;
    let buffer = unsafe { Mmap::map(&file) }.unwrap();

    let mut data = buffer
        .par_chunks(100_000_000)
        .map(|chunk| {
            let mut local_data =
                FxHashMap::with_capacity_and_hasher(512, FxBuildHasher::default());
            for line in chunk.split(|&b| b == b'\n') {
                let parts = line.split(|&b| b == b';').collect::<Vec<_>>();
                if parts.len() == 2 {
                    if let Ok(temp) = fast_float::parse::<f32, &[u8]>(parts[1]) {
                        let loc: std::borrow::Cow<'_, str> = String::from_utf8_lossy(parts[0]);
                        local_data
                            .entry(loc)
                            .and_modify(|prev: &mut Weather| prev.update(temp))
                            .or_insert(Weather::new(temp));
                    }
                }
            }
            local_data
        })
        .reduce(
            || FxHashMap::with_capacity_and_hasher(512, FxBuildHasher::default()),
            |mut a, b| {
                for (loc, weather) in b {
                    a.entry(loc)
                        .and_modify(|prev| prev.merge(&weather))
                        .or_insert(weather);
                }
                a
            },
        );

    data.iter_mut().for_each(|(_, weather)| {
        weather.mean = weather.sum / weather.count;
    });

    println!("Data: {:?} {:?}", data, data.len());
    Ok(())
}
This code takes between 5.5 and 6.5 seconds to run on my system. Here is the time from my most recent run: 53.06s user, 1.67s system, 924% CPU, totaling 5.917 seconds.
One of the biggest improvements came from using Rayon, a data parallelism library similar to OpenMP. My earliest iteration with Rayon involved using reader.lines().collect::<Vec<_>>().into_par_iter() with a mutex lock on a shared HashMap. However, this approach meant taking the lock for every line I was processing. Consequently, each thread in the parallel iterator had to wait for access to the shared HashMap, considerably reducing any gains I might have obtained from concurrent execution. It was terribly slow.
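That lock-heavy iteration looked roughly like the sketch below. It is an approximation (the original version is not shown here, and it uses a plain tuple instead of the Weather struct), but it demonstrates why things crawled: the Mutex is taken once per line.

use rayon::prelude::*;
use std::collections::HashMap;
use std::fs::File;
use std::io::{self, BufRead, BufReader};
use std::sync::Mutex;

fn main() -> io::Result<()> {
    let file = File::open("../measurements.txt")?;
    let reader = BufReader::new(file);
    // Shared map behind a single Mutex: (min, max, sum, count) per location.
    let data: Mutex<HashMap<String, (f32, f32, f32, usize)>> = Mutex::new(HashMap::new());

    reader
        .lines()
        .collect::<Vec<_>>()
        .into_par_iter()
        .for_each(|line| {
            let line = line.unwrap();
            if let Some((loc, temp)) = line.split_once(';') {
                let temp: f32 = temp.parse().unwrap();
                // Every line contends for this lock, so threads mostly wait on each other.
                let mut map = data.lock().unwrap();
                let entry = map.entry(loc.to_owned()).or_insert((temp, temp, 0.0, 0));
                entry.0 = entry.0.min(temp); // min
                entry.1 = entry.1.max(temp); // max
                entry.2 += temp;             // sum
                entry.3 += 1;                // count
            }
        });

    println!("Locations: {}", data.lock().unwrap().len());
    Ok(())
}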
The way such issues are usually mitigated is by using a thread-local data structure (a HashMap in our scenario). That is, each thread builds its own local HashMap, and once the processing is completed, we merge these local HashMaps into a shared one - a map-reduce solution, if you will. This significantly reduces lock contention. My code above is effectively doing the same thing.
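In its most generic form, that pattern maps onto Rayon's fold and reduce adapters. The sketch below is not the code I ended up with (my final solution maps over par_chunks of the mmapped bytes instead), but it shows the accumulate-locally-then-merge idea with no locks at all:

use rayon::prelude::*;
use std::collections::HashMap;

// (min, max, sum, count) per location, accumulated per Rayon task and merged at the end.
fn aggregate(lines: &[&str]) -> HashMap<String, (f32, f32, f32, usize)> {
    lines
        .par_iter()
        .fold(HashMap::new, |mut acc, line| {
            if let Some((loc, temp)) = line.split_once(';') {
                if let Ok(t) = temp.parse::<f32>() {
                    let e = acc.entry(loc.to_string()).or_insert((t, t, 0.0, 0));
                    e.0 = e.0.min(t);
                    e.1 = e.1.max(t);
                    e.2 += t;
                    e.3 += 1;
                }
            }
            acc
        })
        .reduce(HashMap::new, |mut a, b| {
            // Merge the per-thread partial maps into one.
            for (loc, (min, max, sum, count)) in b {
                let e = a.entry(loc).or_insert((min, max, 0.0, 0));
                e.0 = e.0.min(min);
                e.1 = e.1.max(max);
                e.2 += sum;
                e.3 += count;
            }
            a
        })
}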
As for par_chunks, we can still leverage into_par_iter().chunks(x); however, as mentioned in the Rayon codebase, this can be inefficient, as it involves allocating intermediate Vecs for the chunks. par_chunks mitigates this issue.
Now, the check if parts.len() == 2 might seem downright bizarre at first glance; however, there is a cheeky reason why that check’s in place. You see, my above solution is not one hundred percent correct! I’ve cut some corners - particularly, mmap is going to return the bytes from my measurements.txt file, and when performing chunking, there is a good chance that Rayon would be splitting chunks in a way that might divide a line into two parts - imagine splitting in the middle of a line, like 'Det' and 'roit;18.9'. Currently, I am handling these cases by ignoring them entirely! This approach reduces the accuracy slightly; however, it saves computation.
The chunk size of one hundred million is something I decided upon after quite a few trials and errors. Rayon also provides a way to tweak the number of threads in its pool using ThreadPoolBuilder::new().num_threads(8).build(). However, I found its default setting to be quite performant. Today, Rayon uses the standard library’s available_parallelism to determine such default values, which, on my M1, was 10 threads.
There are many other optimization nuggets in the code above. For our problem, and given my hardware, mmap provided a considerable improvement over BufReader. Using it means that the OS handles the data at a page level and that the data automatically becomes part of the OS's page cache. This setup means that repeated accesses to the data do not require disk I/O, which can be confirmed by using iostat 1. It shows very minor disk I/O activity. This helps to reduce the runtime after the first execution.
Mach Virtual Memory Statistics: (page size of 16384 bytes)
free active specul inactive throttle wired prgable faults copy 0fill reactive purged file-backed anonymous cmprssed cmprssor dcomprs comprs pageins pageout swapins swapouts
7328 901688 20169 880396 0 122775 7064 461 110 90 0 0 1055964 746289 348729 123603 2 0 1 0 0 0
11106 899711 20172 878385 0 122968 6906 171 0 110 0 0 1055967 742301 348729 123603 0 0 0 0 0 0
11128 898929 20173 878198 0 123976 6563 629 0 422 0 0 1055968 741332 348729 123603 0 0 0 0 0 0
11185 900230 20174 878014 0 122737 7064 345 1 250 0 0 1055969 742449 348727 123603 2 0 0 0 0 0
10831 899898 20181 878439 0 122908 6924 49602 75 372 0 64 1055978 742540 348727 123603 1 0 9 0 0 0
10744 899967 8544 890248 0 122837 7060 152319 115 193 0 0 1055978 742781 348727 123603 2 0 1 0 0 0
10850 900166 5334 893648 0 122398 6912 152576 0 53 0 128 1055979 743169 348727 123603 0 0 0 0 0 0
10571 900204 2497 896463 0 122507 6912 152520 0 133 1 0 1055980 743184 348727 123603 0 0 0 0 0 0
10403 900285 2505 896470 0 122563 6919 152352 79 152 0 0 1055991 743269 348727 123603 0 0 9 0 0 0
10276 899531 2515 895925 0 124009 6901 151377 40 382 0 192 1056003 741968 348727 123603 0 0 9 0 0 0
10664 899493 2515 896109 0 123529 6751 35071 113 459 0 0 1056001 742116 348720 123603 9 0 1 0 0 0
10680 900859 2516 896199 0 122073 6912 144 1 102 0 0 1056002 743572 348720 123603 0 0 0 0 0 0
10599 899580 2517 896177 0 123374 6914 736 0 732 0 0 1056003 742271 348720 123603 0 0 0 0 0 0
10580 899774 2525 896175 0 123198 6913 895 0 866 0 0 1056011 742463 348720 123603 0 0 0 0 0 0
Also, using vm_stat, I observed that the number of page faults with mmap was considerably high, with single-digit pageins and no pageouts. The high page faults might be minor page faults - that is, my process is trying to access a page that is not yet mapped into its address space but can be served from the page cache. The single-digit pageins essentially validate the iostat result, showing that we are observing very minor disk I/O activity. This means the number of pages actually being read into physical memory from the disk is quite low, thus triggering few major page faults. Moreover, the absence of pageouts is largely due to my 32GB of RAM! It confirms that my system is not experiencing any memory pressure that might warrant swapping memory pages out to disk.
Other optimizations, such as using jemalloc, a faster hashmap called FxHashMap, a faster float parser named fast_float, and processing on raw bytes, proved to be quite effective. At the compiler level, I leveraged lto = "fat", opt-level = 3, and RUSTFLAGS="-C target-cpu=native" to improve runtime. My manual experiments with SIMD were not particularly effective. On aarch64, SIMD is supported through NEON technology, and Rust seems to enable target-feature=+neon on my M1, which already brings in some SIMD optimizations. However, I have a strong hunch that this can be further improved.
Another interesting observation I made was that setting codegen-units=1 resulted in a performance regression. I am quite baffled as to why this is the case, and it requires further investigation on my part.
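For reference, the compiler-level settings above amount to a release profile along these lines (a sketch; everything not mentioned is left at its default), with the build invoked as RUSTFLAGS="-C target-cpu=native" cargo build --release:

# Cargo.toml (sketch): the release profile implied by the settings above.
# codegen-units is deliberately left at its default, since forcing it to 1
# regressed performance in my tests.
[profile.release]
lto = "fat"
opt-level = 3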
And that's it, folks! I had a blast working on this problem. If you're interested in exploring other Rust solutions, I highly encourage you to check out some fascinating investigations by Ragnar Groot Koerkamp and Amine Dirhoussi. Happy exploring!