This is a slightly edited chapter from the book “Architecting Streaming Data Pipelines” that I started writing in 2019. I’ve never finished it, but I think the content may be interesting for the readers of this newsletter.
An Ingestion API that’s exposed to producers in the form of a discoverable service is a popular data ingestion technique. An HTTP API is a common implementation of an Ingestion API. Other protocols, such as gRPC, can also be used, but I focus on the HTTP API due to its popularity.
Historically, gathering clickstream data (events generated by web and mobile applications) was one of the primary use cases for streaming systems, as well as data pipelines in general. A little snippet of JavaScript added to every page of a website is enough to capture user activity as long as you have an Ingestion HTTP API available.
Why Build It?
Of course, you can use an off-the-shelf solution:
Products like Google Analytics or Segment provide SDKs that leverage an Ingestion HTTP API internally. However, they’re very opinionated and don’t allow customizations or direct access to the endpoint.
Confluent’s Kafka REST Proxy and Redpanda’s HTTP Proxy. You can deploy and manage them yourself, but they also don’t give much control over the request lifecycle and generally operate at a lower level. They also expose a lot of information about the cluster, so you may want to avoid exposing them publicly.
A custom Ingestion HTTP API can have flexible delivery semantics, handle different types of events, and allow data enrichment and metadata customization.
Overview
From the high-level design perspective, an Ingestion HTTP API has one or many endpoints to accept an event as a payload and publish it to a streaming platform like Apache Kafka.
When using REST, an Ingestion HTTP API endpoint may look like this:
POST /messages/{type}?producer={producer_name}
where type identifies the kind of message and producer_name identifies the specific producer that sent it.
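As a minimal sketch, the routing of such an endpoint might be parsed like this (the path shape follows the example above; everything else, including function names, is illustrative):

```python
from urllib.parse import urlparse, parse_qs

def parse_ingestion_path(path):
    """Extract the message type and producer name from a path shaped like
    /messages/{type}?producer={producer_name}."""
    parsed = urlparse(path)
    parts = [p for p in parsed.path.split("/") if p]
    if len(parts) != 2 or parts[0] != "messages":
        raise ValueError("unexpected path: " + path)
    message_type = parts[1]
    # producer is optional; default to None when the query parameter is absent
    producer = parse_qs(parsed.query).get("producer", [None])[0]
    return message_type, producer
```

In a real service this would be handled by the web framework's router, but the shape of the extracted values is the same.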
It’s important to highlight that the main goal of the Ingestion HTTP API is to write a message to a streaming platform as quickly and as reliably as possible. Streaming platforms like Apache Kafka offer very strong scalability and reliability characteristics, which makes them a perfect destination for ingested messages. The pull-based consumption model also makes it easy to tolerate ingestion traffic spikes: consumers simply lag for a bit and eventually catch up, so no back-pressure mechanism is needed.
Even though the concept of an Ingestion HTTP API looks very simple, the actual implementation usually contains quite a few complex trade-offs. One of the most important ones is a trade-off between synchronous and asynchronous writes.
In the synchronous write model, every HTTP request is processed synchronously: an acknowledgement from the streaming platform must be received before the HTTP response is sent. Let’s look at an example with a Kafka Producer:
In this scenario, the request is processed sequentially in the following steps:
An HTTP request is received by the HTTP API framework, parsed and validated. It may be enriched and then transformed into a message for the streaming platform.
The message is passed to the Kafka Producer.
Kafka Producer handles serialization (perhaps transforming a Java class into JSON) and sends a Kafka Message to the Kafka broker. A Kafka Producer uses the acks configuration parameter that dictates what should happen before the message is considered successfully sent:
with acks = 0, the message is sent to the broker and immediately considered to be delivered. This mode should rarely be used in production environments (but we’ll use it below!).
with acks = 1, the message is sent to the broker and only considered to be delivered when the broker (a leader for the partition) replies successfully.
acks = all is similar to acks = 1, but it also waits for acknowledgements from all in-sync replicas. This mode provides the highest level of durability. It’s also required for idempotent producers and exactly-once delivery semantics.
Depending on the acks configuration parameter, a successful acknowledgement is returned to the Kafka Producer.
Kafka Producer returns a successful acknowledgement to the HTTP handler.
An HTTP response is generated and sent by the HTTP handler.
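The synchronous flow above can be sketched as follows. The stub producer below stands in for a real Kafka client (whose send() similarly returns a future); the topic name, status codes, and timeout value are illustrative:

```python
import json
from concurrent.futures import ThreadPoolExecutor

class StubProducer:
    """Stands in for a real Kafka producer: send() returns a future that
    completes once the 'broker' (a background thread here) acknowledges."""
    def __init__(self):
        self._broker = ThreadPoolExecutor(max_workers=1)

    def send(self, topic, value):
        # A real producer would serialize the record, ship it to the partition
        # leader, and wait for acknowledgement per its `acks` setting.
        return self._broker.submit(lambda: {"topic": topic, "offset": 0})

def handle_post_sync(producer, topic, payload):
    """Synchronous write model: block on the broker acknowledgement
    before returning an HTTP status code."""
    message = json.dumps(payload).encode("utf-8")  # parse/validate/transform
    future = producer.send(topic, message)
    try:
        future.result(timeout=5.0)  # wait for the ack (steps 3-5 above)
        return 200
    except Exception:
        return 503  # the write did not succeed; the client may retry
```

The HTTP response status directly reflects the outcome of the write, which is exactly what makes this model durable but slow.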
The synchronous write model is great for durability but pretty bad for latency. It’s especially noticeable in scenarios with high data volume. As you can see, it involves a few network hops before returning an HTTP response, which also affects throughput.
Let’s now look at the asynchronous write model:
In this scenario, the request is not processed strictly sequentially; instead:
An HTTP request is received by the HTTP API framework, parsed and validated. It may be enriched and then transformed into a message for the streaming platform.
The message is passed to the Kafka Producer asynchronously, which means the caller doesn’t wait until the background Kafka Producer thread returns with the result.
An HTTP response is generated and sent by the HTTP handler, even though the actual write to a Kafka broker has probably not yet succeeded.
Kafka Producer handles serialization and delivery of a Kafka Message to a Kafka broker, following the rules based on the acks configuration parameter.
A successful acknowledgement is returned to the Kafka Producer.
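A rough sketch of this flow, with an in-memory queue and a background thread standing in for the real producer’s internal buffering (the 202 status code and names are illustrative):

```python
import json
import queue
import threading

class AsyncWriter:
    """Asynchronous write model: the HTTP handler enqueues the message and
    responds immediately; a background thread performs the actual send."""
    def __init__(self, send_fn):
        self._queue = queue.Queue()
        self._send = send_fn  # stands in for a real Kafka producer's send()
        threading.Thread(target=self._drain, daemon=True).start()

    def _drain(self):
        while True:
            topic, message = self._queue.get()
            try:
                self._send(topic, message)  # steps 4-5 happen after the response
            finally:
                self._queue.task_done()

    def handle_post(self, topic, payload):
        self._queue.put((topic, json.dumps(payload).encode("utf-8")))
        return 202  # Accepted: the write has not necessarily succeeded yet

    def flush(self):
        """Block until all enqueued messages have been handed to send_fn."""
        self._queue.join()
```

Note the response code: 202 Accepted honestly signals that the message was received but not necessarily delivered.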
In this scenario, a very low latency (a few milliseconds) can be achieved by not waiting on the acknowledgement from a Kafka broker (so, setting acks = 0). It allows for handling very high throughput without a lot of HTTP API instances. However, any problem with message delivery in step 4 could mean lost data. There are a few ways to reduce data loss:
Retries. Kafka Producer supports a practically unlimited number of retries, so as long as the Kafka cluster eventually becomes available and none of the HTTP API instances are terminated, the affected messages will eventually be delivered.
To make retries even more reliable and tolerate the loss of some HTTP API instances, it’s possible to temporarily accumulate undelivered messages in some kind of data store, like a local disk or distributed cache. Of course, in this case, the delivery guarantee would also depend on the availability of the data store.
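Both ideas can be sketched together: retry with backoff, and spill messages that still fail into a local buffer for later redelivery. Here the buffer is an in-memory deque purely for illustration; a real deployment would use a local disk or distributed cache, as described above:

```python
import time
from collections import deque

class RetryingSender:
    """Retry a failed send with exponential backoff; spill messages that
    still fail into a local buffer instead of dropping them."""
    def __init__(self, send_fn, max_attempts=3, backoff_s=0.01):
        self._send = send_fn
        self._max_attempts = max_attempts
        self._backoff_s = backoff_s
        self.spill = deque()  # undelivered messages awaiting redelivery

    def send(self, message):
        for attempt in range(self._max_attempts):
            try:
                self._send(message)
                return True
            except ConnectionError:
                # back off before retrying: 10ms, 20ms, 40ms, ...
                time.sleep(self._backoff_s * (2 ** attempt))
        self.spill.append(message)  # persist for a later retry, don't drop
        return False
```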
The asynchronous write model may look unreliable, but the combination of Kafka broker acknowledgements and Kafka producer retries can give very good results. In the end, it’s quite common to combine both models: handle most writes asynchronously and reserve the synchronous model for the most critical types of messages.
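The combined approach boils down to routing by message type. A trivial sketch (the set of critical types is entirely hypothetical):

```python
# Hypothetical examples of message types that justify the latency cost
# of a synchronous, fully acknowledged write.
CRITICAL_TYPES = {"payment", "signup"}

def choose_write_model(message_type):
    """Route critical message types through the synchronous path and
    high-volume, loss-tolerant ones (e.g. clickstream) through the async one."""
    return "sync" if message_type in CRITICAL_TYPES else "async"
```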
Data Enrichment
Data enrichment is another important concern to think about when building Ingestion HTTP APIs. The use cases can vary a lot - geolocation, tracing, joining with a dataset to fill missing fields, etc. However, not all enrichment should happen in the Ingestion HTTP API. Actually, as we’ll see in the coming chapters, the most useful forms of enrichment could involve streaming joins, something that could only happen downstream from the ingestion.
So, what are the enrichment types that can happen in the Ingestion HTTP API? There are a few of them:
Enriching with HTTP request data (IP address, user-agent, custom HTTP headers, etc.). This is probably the most common and the most straightforward one.
Enriching with external data via in-memory cache. Sometimes, it may be useful to join received messages with small datasets to add or replace some payload fields.
Enriching with external data via network calls. This can be used for the same goal as above, as well as for authorization, authentication or validation of requests.
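The first type is cheap because everything needed is already in the request. A sketch, with illustrative field names (the `_meta` key and its contents are assumptions, not a standard):

```python
import time

def enrich_with_request_data(payload, headers, remote_ip):
    """Enrich using data already present in the HTTP request:
    no network calls, so the latency impact is minimal."""
    enriched = dict(payload)  # leave the original payload fields untouched
    enriched["_meta"] = {
        "ip": remote_ip,
        "user_agent": headers.get("User-Agent"),
        "received_at": int(time.time()),  # server-side receive timestamp
    }
    return enriched
```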
Any kind of network call that’s executed during HTTP request processing can significantly affect latency, throughput and reliability. It’s pretty clear why it affects processing latency and, therefore, throughput - we saw how a few network hops have the same effect with the synchronous write model. But even if you don’t care much about latency or throughput, any network call adds a potential failure scenario. For example, if you need to authenticate every client before accepting a write, what should the Ingestion HTTP API do when the authentication endpoint is down? This question has no simple answer; every option comes with compromises.
So, doing any network calls during request processing is highly discouraged. For simple enrichment operations, it could be possible to keep the datasets cached in memory after fetching them from external endpoints or data stores in runtime. Other operations like validation can happen downstream from the Ingestion HTTP API in routing and processing layers.
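One way to sketch the in-memory caching idea: load the dataset at startup, refresh it in a background thread, and keep serving the stale copy if a refresh fails. The refresh interval and `fetch_fn` (standing in for the external endpoint or data store call) are illustrative:

```python
import threading
import time

class CachedDataset:
    """Keep a small enrichment dataset in memory and refresh it in the
    background, so request processing never makes a network call."""
    def __init__(self, fetch_fn, refresh_interval_s=60.0):
        self._fetch = fetch_fn
        self._interval = refresh_interval_s
        self._data = fetch_fn()  # initial load at startup, not per request
        self._lock = threading.Lock()
        threading.Thread(target=self._refresh_loop, daemon=True).start()

    def _refresh_loop(self):
        while True:
            time.sleep(self._interval)
            try:
                fresh = self._fetch()
            except Exception:
                continue  # keep serving the stale copy rather than failing requests
            with self._lock:
                self._data = fresh

    def get(self, key, default=None):
        with self._lock:
            return self._data.get(key, default)
```

The design choice here is deliberate: a refresh failure degrades data freshness, not request availability.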
Data enrichment raises an important question for any Ingestion HTTP API: what exactly should be written to the streaming platform? For example, if the HTTP API receives a JSON payload, should it be written to the streaming platform as is?
Message Structure
When writing request processing logic for the Ingestion HTTP API, it usually makes sense to keep the payload as is (so the enrichment process at this stage of the pipeline should generally be applied to metadata only!). The metadata can be represented as a set of headers or through the message envelope. Which approach should be chosen in the Ingestion HTTP API? Is there an alternative or a best practice?
I covered some of these in Approaches for Defining Message Metadata. But there are no simple answers here. In practice, the right solution would be driven by the requirements, including required latency and throughput. Based on these requirements, we can approach it in one of two ways:
Try to keep the originally received payload as is throughout the request processing lifecycle. Treat it as an array of bytes. In this case, there is no need to serialize/deserialize the payload, which notably decreases processing latency and increases throughput. In general, serialization/deserialization could be one of the slowest operations in data pipelines, so any chance to optimize it should be considered. Also, this approach means that the message metadata can be either passed as message headers or the payload itself can be constructed as an envelope. The latter can be problematic for the enrichment since it means deserializing, updating and serializing the envelope, which is something we want to avoid in this case.
Construct the message envelope inside the Ingestion HTTP API. This could mean paying heavy serialization cost, but sometimes it’s necessary. It offers more flexibility and the possibility to provide a layer of unification before writing to the streaming platform (for example, handling backward- and forward-compatibility when accepting requests from different versions of a client). This approach is also slightly less reliable: it requires using a highly defensive programming style to avoid potential null pointer exceptions and similar issues. And any exception could mean lost data in this case.
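The two approaches can be sketched side by side. Both functions below are illustrative (header names, envelope fields and JSON as the wire format are assumptions):

```python
import json
import time

def build_record_with_headers(raw_payload, message_type, producer):
    """Approach 1: keep the payload opaque bytes and attach metadata as
    message headers. No serialization round-trip for the payload itself."""
    headers = [
        ("type", message_type.encode("utf-8")),
        ("producer", producer.encode("utf-8")),
        ("received_at", str(int(time.time())).encode("utf-8")),
    ]
    return raw_payload, headers

def build_envelope(raw_payload, message_type, producer):
    """Approach 2: wrap the payload in an envelope. Requires deserializing
    and re-serializing, but yields one unified message shape downstream."""
    envelope = {
        "metadata": {
            "type": message_type,
            "producer": producer,
            "received_at": int(time.time()),
        },
        # Any parse error here risks data loss, hence the defensive style
        # this approach requires.
        "payload": json.loads(raw_payload),
    }
    return json.dumps(envelope).encode("utf-8")
```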
Ingestion API and Direct Publishing
Can an Ingestion API and direct event publishing (via a Kafka Producer) be used together? Or is one preferred over the other? Event publishing can actually mean a few different things depending on the context, so let’s look at a few different use cases.
Imagine that we need to design a data ingestion mechanism for a typical web application. You can approach it the following way:
The front-end side of the web application runs in a web browser and sends messages to the HTTP API, which forwards them to the Kafka cluster. An HTTP API is really the only option here: browsers can’t speak the Kafka protocol directly.
The back-end side of the application uses an SDK that, in the end, calls a Kafka Producer to send messages directly to the Kafka cluster. Connecting the back-end side directly to a streaming platform comes with pros and cons.
Pros:
Simplicity: just configure a Kafka Producer.
Low latency, minimum network hops.
Cons:
Each producer needs to keep a persistent TCP connection per broker. With tens or hundreds of brokers and thousands or tens of thousands of producers, this can become challenging; some tuning will be needed. Also, each time a broker is restarted (due to maintenance or a failure), all connections have to be re-established.
An alternative approach could standardize using the HTTP API in every case:
In this scenario, both the front-end part of the web application and the back-end one send messages to the Ingestion HTTP API, which forwards them to the Kafka cluster.
Pros:
Flexibility, especially with regards to maintenance: persistent TCP connections are kept between the HTTP API instances and the streaming platform. So, any kind of maintenance or failure of the brokers doesn’t affect the producers directly.
Possibility to have simple data enrichment. This is especially useful when the Ingestion API is exposed to third parties, and some kind of pre-processing is needed.
Cons:
For the back-end side, there is slightly increased latency and an extra component that can fail. Data ingestion is probably one of the most important things to get right when building a data pipeline, so eliminating every possible step that can fail is important. But the pros listed above can definitely outweigh this.
As you can see, an Ingestion API and an event publishing library can work together nicely. HTTP API becomes an implementation detail, simply how messages are delivered to the destination. Considering this, one can argue that an HTTP client on the front-end can also be turned into an SDK. It would be fairly limited, but it’s quite possible to get some useful features like schema management and validation working even in the limited browser environment.