Recently, I’ve enjoyed interviewing Richard Artoul, WarpStream’s co-founder. I hope this interview will be interesting for people who are working on the new tools in the data streaming space.
Yaroslav Tkachenko:
Hi Richie, could you introduce yourself?
Richard Artoul:
Hey, I'm Richie. I'm the co-founder of WarpStream Labs, along with Ryan Worl. WarpStream is a Kafka protocol-compatible data streaming system, that is built directly on top of object storage. The main value of what we're doing is that WarpStream eliminates all local disks in the system. So you can deploy Kafka into your cloud account and point it at an object storage bucket. It's just a Go binary. You never have to worry about partition rebalancing or losing your disks, or durability or any of those things.
It's all just handled automatically. And it also eliminates interzone networking fees, which is the other big thing, especially for high-volume Kafka use cases, and ends up dominating the cost of the workload.
Yaroslav Tkachenko:
The last time I checked, the Kafka protocol actually includes more than 60 (!) different message types, and obviously, because of the architecture, not all of them should be implemented, but you still need to implement a big chunk of those. So, how do you even approach this kind of exercise? Do you simply implement features one by one, or do you look at end-to-end scenarios, e.g. implementing everything for a consumer or a producer... How do you do that?
Richard Artoul:
We try and do it very use case focused. It's actually funny because WarpStream started its life as a Kinesis protocol-compatible system, and we still have that protocol layer. We started with basic integration test passing, but then what we do is we run these continuous workloads in our staging and production environments. And we're working on adding new use cases in those workloads and figuring out which parts of the protocol are required to do that.
So, the way we started was: can we just write some data into this thing and store it? That required implementing the obvious one - the Producer API, but there's also the API Versions API, which just declares which part of the protocol you support and the Metadata API, so the clients know which brokers to talk to.
Once we had that, we thought: let's now have something that can write and read, but you have to specify which partitions, and it's not using consumer groups. So we got all of that working and then decided to improve our integration test to use consumer groups and deploy that into our workloads. And with consumer groups, there's a lot of protocol messages that go into that, and that was a lot of work. Once we had that, we decided to test it with a bunch of different clients. So we run continuous testing all the time in multiple environments with four different clients, and that usually uncovers some other stuff. And then there's just some reading of the protocol too.
But we really do try to focus on use cases, because, there just is so much to do, and we always want to be working towards something someone can actually use. And then, after we announced it, we had a bunch of people try it with different software like Kafka Streams and Flink. Some software depends on describing the broker configuration, which doesn't matter for WarpStream, but it doesn't work if you don't support that. So then we added support for faking some of the broker configurations that were needed.
Yaroslav Tkachenko:
You mentioned integration testing, which is probably a very big deal when you're building something that's API-compatible. So, how did you come up with those integration tests? Do you just write them by hand? Do you try to copy existing tests from Kafka itself? How do you think about different languages and different versions?
Richard Artoul:
Luckily, the Kafka protocol is very versioned, so you can declare in the protocol itself which features of the protocol you support and what versions of the protocol you support. We try and test with as many clients as we can. We've taken in a bunch of clients and wrapped them so we can run the same test repeatedly with different clients and make sure it works with all of them. The thing I think that we pay the most attention to, probably more than those raw protocol tests, is actually our correctness tests.
So we have these tests, which you can call property testing or fuzz testing, they basically run all the time. They're constantly writing data and consuming data. And then, we use these side data structures to track every message that we've written. On the other side, we can track that every message that was written actually gets pulled out and in the right order. We also inject a bunch of faults into the system. And we do it in a way so that we can perform deterministic replay with another tool. That allows us to have really high confidence that we're never losing data or never corrupting data.
Finally, there is a layer that is essentially translating the Kafka protocol into our internal APIs. And that's mostly handled with integration tests; we try and test everything. We have one test that literally uses all of those shell scripts that are in the Kafka GitHub repository. It just shells out to all of them and asserts that they print the same thing as real Kafka does.
Yaroslav Tkachenko:
That type of correctness testing reminded me of Jepsen. Was it inspired by it, maybe?
Richard Artoul:
Yeah, heavily! Ryan and I used FoundationDB for several years at Datadog when we were working on Husky. So we’re big fans of simulation testing and fuzz testing. I just think it's really hard to have any confidence in a system like this with writing tests by hand. You really have to blast them with data, introduce tons of concurrency, and introduce faults. The thing I always tell engineers I'm working with: if your software can't handle injected faults for a couple of minutes in an integration test, it has no chance of surviving in production.
And the way we do fault injection, we can actually enable that in our staging environment as well. So we run WarpStream agents with actual clients and consumers in our staging environments with fault injected there as well, which helps a lot.
Yaroslav Tkachenko:
You mentioned Kinesis, and on the website it says that you support both Kinesis and Kafka protocols. I did not know that you started with Kinesis, so maybe that's half of the answer, but did it increase the scope, and how do you internally reconcile Kinesis with Kafka? Do you have some kind of internal data model that maps to both?
Richard Artoul:
We have an internal data model, and it actually looks a little bit more like Kinesis. Well, in terms of the naming conventions. Internally in our system, we refer to things as shards, like Kinesis does, not partitions, mostly because that's what we started with. I think the reason that we started with Kinesis is because, in some ways, the protocol is simpler, it's an HTTP JSON API, so you can focus on the core storage engine without spending a bunch of time debugging the custom Kafka TCP protocol.
That helped us a lot in the beginning. But really, the reason we started with it is because we liked the Kinesis model of being able to split and merge shards. That's a thing that Kafka doesn't support today. I know they're talking about adding it, actually. Which could be really interesting. That's one of the options they're considering for partitionless topics. But we found that to be an elegant solution, and we wanted it to be something that we supported internally. So we started with that, but we were always planning to support Kafka. And the more and more we talked to people, it felt like the pain was the greatest with Kafka users. So, we decided to focus on that protocol. But you'd be surprised how similar they are.
Kinesis has this idea of splitting and merging shards, but if you don't ever do that, it's very easy to translate to the Kafka protocol. Now, of course, Kinesis has no concept of consumer groups. We had to go and add all that logic, which was really hard to get.
Yaroslav Tkachenko:
If I'm not mistaken, WarpStream uses some proprietary protocol for files, right? You're not using something like Parquet. So, how did you come up with that? Why not use something that already exists?
Richard Artoul:
I would say that we do a bunch of really interesting stuff in our cloud control plane that we don't talk about a lot yet. But our file format is not that interesting. I don't think there's any secret sauce there. It's basically a bunch of Protobuf headers followed by opaque byte sequences for the batches. We didn't do it because we thought we had this really great idea for the file format.
Ryan and I have eight years of experience building databases, and to us, if you're building a database, tight integration between the file format and the storage engine is really key for performance reasons.
Especially if we were writing it in Java, it would be one thing - because the Parquet libraries are really good. In Go, you end up spending all the time fighting the Parquet libraries or other things to get the performance that you want. And we just really wanted full control there. Especially because we do a ton of compaction, so we're interacting with the file format a lot. In the end, the file format is custom because we wanted full control over it.
We considered Parquet because we thought it'd be super cool if the file format was Parquet on S3, and you could just query it. But because we're running active compactions on that data set, you would never actually be able to query that data consistently anyway. Because the file you're reading might disappear, or you might see duplicates. If you're not going through the WarpStream API, all bets are off. But we really like the idea of Parquet and streaming Iceberg. So we will probably offer the ability to take any topic and also have it emitted into your S3 bucket in a stable way as Iceberg-compatible Parquet. And that would allow you to just point something like Trino or Athena to it and query it.
Yaroslav Tkachenko:
You mentioned compaction, and it's a very big deal because you end up creating a lot of small files. You don't currently support compacted topics. But that's something you're planning to do as far as I know. In the scenario where you already have an existing topic, and you want to enable compaction, would you go and apply it historically as well?
Richard Artoul:
So you have an existing topic, and you want to switch the retention policy to compacted, right?
Yaroslav Tkachenko:
Yes. What happens in this case? How do you think that can work with the model that you have?
Richard Artoul:
We've been looking into this a lot, and the guarantees provided, from what I can tell by the compacted topics, are not really that strong. It's mostly an optimization around infinite retention for a given key: please don't fill up my entire disk because I write the same key over and over again. And so I think there are two things there. One, because all the data is stored in S3, the storage is so cheap. So you don't have to rush to get rid of all the extra keys as fast as you would want to if it was running on local SSDs. That's part of it.
And because we've separated storage and compute, we have a lot of options for the compacted topics because we don't even have to run the compaction that cleans up the data on the same nodes that are ingesting the data or serving the data or doing the other types of compactions. So I think we have a lot of options there, and we'll be able to enable it after the fact.
Yaroslav Tkachenko:
So maybe using a special set of agents just for compaction?
Richard Artoul:
Yeah. I don't think that'll be the default because a lot of workloads really just aren't that big or aggressive. We really like the idea that you can deploy three agents in three AZs, and they can do writes, reads, compactions, and we'll do the scheduling for you to make sure nothing's overloading anything else. And that mostly works. But for really large use cases for people who really want strong isolation, we will allow you to configure the agents.
We'll have a couple of roles, and then you can choose the roles you want each deployment of agents to run.
Yaroslav Tkachenko:
It looks like, at least initially, WarpStream was designed to address the cost of networking, right? That's the biggest selling point. Since then, have you found another area where this architecture also shines?
Richard Artoul:
It just makes the operations so much easier. When we first started doing this, we thought it would only be for people with these giant Kafka workloads doing gigabytes per second. And most people don't even understand the networking problem. Why would you? It's so unobvious. And even trying to figure it out from your Kafka bill is hard. You can look at an AWS bill, and you see really high interzone networking. I know that's Kafka, but it's not really attributed in any sense. But yeah, the ease of operations, not having any local disks, and not dealing with partition rebalancing are huge.
We have one design partner we're working with who's actually running the system in production, and they're moving telemetry data through it.
Before this, they were using another Kafka protocol-compatible system that uses local storage. And they didn't want to figure out how to do the clustering. So they just ran one really big box, and it was over-provisioned. With WarpStream, they deploy a very small number of agents, and they autoscale on CPU, so when their CPU usage gets high, more Kubernetes pods are added. When CPU usage goes low, Kubernetes pods go away.
They're an observability provider. So they have these very spiky peaky workloads: for example, one of their customers has an outage, and suddenly there's a huge spam of error logs getting ingested, and then it goes away. So they've gone from having this really large over-provisioned single node thing to simple autoscaling, a setup that they don't have to worry about.
Yaroslav Tkachenko:
Finally, if you were to start this from scratch today, what advice would you give to yourself?
Richard Artoul:
I would have probably started with the Kafka protocol. I think Kinesis is cool, and I'm glad that we have it. I'm sure it'll eventually help someone. But all the gravity really is around the Kafka protocol. So I definitely would've done that. And then we spent a ton of time working out the details. I would say I would've gotten consumer groups right the first time, but part of the reason it was hard is because there's just a bunch of undocumented stuff in that protocol.
And the way we ended up sorting it out was going line by line through the consumer group protocol code in Kafka, literally just line by line. So, every hack, every weird thing, we included it. The first time we implemented consumer groups, we attacked it from a first principal's perspective. And I think for something that would work, e.g. if it was a server-side protocol. But because the clients participate in the protocol so much, you have to do it one-to-one and make it look exactly like Kafka. So I think we could have saved some time there for sure.
Yaroslav Tkachenko:
Great! Any closing words?
Richard Artoul:
We're in developer preview. If you wanna talk to us, we have a community Slack. So come join us there. And if you have large workloads with high volumes of telemetry data, we think that's a use case where WarpStream really shines right now. And we'd love to talk to you.