Pinterest Tiered Storage for Apache Kafka®️: A Broker-Decoupled Approach | by Pinterest Engineering | Pinterest Engineering Blog | Sep, 2024


Sep 17, 2024

Jeff Xiang | Senior Software Engineer, Logging Platform; Vahid Hashemian | Staff Software Engineer, Logging Platform

When it comes to PubSub solutions, few have achieved higher degrees of ubiquity, community support, and adoption than Apache Kafka®️, which has become the industry standard for data transportation at large scale. At Pinterest, petabytes of data are transported through PubSub pipelines every day, powering foundational systems such as AI training, content safety and relevance, and real-time ad bidding, bringing inspiration to hundreds of millions of Pinners worldwide. Given the continuous growth in PubSub-dependent use cases and organic data volume, it became paramount that PubSub storage must be scaled to meet growing storage demands while lowering the per-unit cost of storage.

Tiered Storage is a design pattern that addresses this problem by offloading data typically stored on broker disk to a cheaper remote storage, such as Amazon S3®️. This allows the brokers themselves to keep less data on expensive local disks, reducing the overall storage footprint and cost of PubSub clusters. MemQ is a PubSub solution that maximally employs this design pattern by keeping all data in object storage, eliminating the need for local disk storage and decoupling storage from compute.

KIP-405 adopts the Tiered Storage design pattern for open-source Kafka (available in Kafka 3.6.0+). It details a broker-coupled implementation, which natively integrates Tiered Storage functionality into the broker process itself.

At Pinterest, we productionalized and are now open-sourcing our implementation of Tiered Storage for Apache Kafka®️, which is decoupled from the broker. This brings the benefits of storage-compute decoupling from MemQ to Kafka and unlocks key advantages in flexibility, ease of adoption, cost reduction, and resource utilization when compared to the native implementation in KIP-405.

With 20+ production topics onboarded since May 2024, our broker-decoupled Tiered Storage implementation currently offloads ~200 TB of data every day from broker disk to a much cheaper object storage. In this blog, we share the approach we took and the learnings we gained.

Data sent through Kafka is temporarily stored on the broker's disk, replicated across followers for each partition, and removed once the data exceeds the configured retention time or size threshold. This means that the cost of Kafka storage is a function of data volume, retention, and replication factor. Data volume growth is typically organic in nature, while retention and replication factor are often rigid and necessary for non-negotiables such as fault tolerance and recovery. When faced with growth in any of those variables, horizontal or vertical scaling of Kafka clusters is required to support higher storage demands.

Traditionally, horizontal scaling of Kafka clusters involved adding more brokers to the cluster in order to increase the total storage capacity, while vertical scaling involved replacing existing brokers with new ones that have higher storage capacity. This meant that the total storage cost of scaling up Kafka clusters grew as a factor of the per-unit storage cost to store data on broker disk. This is evident from the following equations:

totalCost = costPerGB * totalGB * replicationFactor

totalGB = GBperSecond * retentionSeconds

Substituting the second equation into the first results in:

totalCost = costPerGB * GBperSecond * retentionSeconds * replicationFactor

As mentioned previously, we usually do not have control over throughput (GBperSecond), retention (retentionSeconds), or replication factor. Therefore, reducing the total cost of storage is most effectively achieved by lowering the per-unit storage cost (costPerGB). Tiered Storage achieves this by offloading data from expensive broker disks to a cheaper remote storage.
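As a purely illustrative calculation (the real numbers depend on your throughput, retention, and disk pricing), plugging in 1 GB/s of throughput, 3 days (259,200 seconds) of retention, a replication factor of 3, and a hypothetical broker disk cost of $0.10 per GB-month gives:

totalCost = $0.10/GB-month * 1 GB/s * 259,200 s * 3

          = $0.10/GB-month * 777,600 GB

          ≈ $77,760 per month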

The native Tiered Storage offering in Apache Kafka®️ 3.6.0+ incorporates its features into the broker process itself, resulting in an inseparable coupling between Tiered Storage and the broker. While this tight coupling allows the native implementation of Tiered Storage to access Kafka internal protocols and metadata for a highly coordinated design, it also comes with limitations in realizing the full potential of Tiered Storage. Most notably, integrating Tiered Storage into the broker process means that the broker is always in the active serving path during consumption. This leaves behind the opportunity to leverage the remote storage system as a secondary serving path.

To address this, we applied the MemQ design pattern to Tiered Storage for Apache Kafka®️ by decoupling Tiered Storage from the broker, allowing for direct consumption from the remote storage. This delegates the active serving path to the remote storage, freeing up resources on the Kafka cluster beyond just storage and dramatically reducing the cost of serving. It also provides a higher degree of flexibility in adopting Tiered Storage and applying feature updates. The table below illustrates several key advantages of a broker-decoupled approach compared against the native implementation.

Figure 1: Architecture Overview

The implementation of broker-decoupled Tiered Storage consists of three main components:

  1. Segment Uploader: A continuous process that runs on every broker and uploads finalized log segments to a remote storage system
  2. Tiered Storage Consumer: A consumer client capable of reading data from both remote storage and local broker disk
  3. Remote Storage: A storage system that should have a per-unit storage cost lower than that of broker disk and that supports the operations needed by the Segment Uploader and Tiered Storage Consumer

The diagram shown in Figure 1 depicts how the Segment Uploader and Tiered Storage Consumer interact with remote storage, as well as other relevant systems and processes. Let's dive deeper into each of these components.

Segment Uploader

Figure 2: Segment Uploader Architecture

The Segment Uploader is an independent process that runs on every broker as a sidecar in a Kafka cluster that has enabled Tiered Storage. Its primary responsibility is to upload finalized log segments for the partitions that the broker leads. Achieving this goal while maintaining decoupling from the broker process requires designing these critical mechanisms:

  • Log directory monitoring
  • Leadership change detection
  • Fault tolerance

Log Directory Monitoring

The Segment Uploader's function is to upload data residing on its local broker's disk to a remote storage system. Because it runs independently of the broker process, the Segment Uploader monitors the broker file system for indications that certain data files are ready for upload. Let's dive into how this monitoring mechanism works.

The Kafka broker process writes data to a local directory specified by the broker configuration log.dir. The directory structure and content of log.dir is maintained by the broker process and typically looks like the following:

<log.dir>
| - - - topicA-0
| - - - 00100.index
| - - - 00100.log
| - - - 00100.timeindex
| - - - 00200.index
| - - - 00200.log
| - - - 00200.timeindex
| - - - 00300.index ← active
| - - - 00300.log ← active
| - - - 00300.timeindex ← active
| - - - topicA-3
| - - - <omitted>
| - - - topicB-1
| - - - <omitted>
| - - - topicB-10
| - - - <omitted>

This directory contains all of the topic-partitions that this broker is a leader or follower for. The broker process writes data it receives from producer applications into log segment files (files ending in .log), as well as corresponding indexing metadata into .index and .timeindex files that allow for efficient lookups. The file names correspond to the earliest offset contained in the segment. Incoming data is written to the active log segment, which is the segment with the largest offset value.

The Segment Uploader places a file system watcher on the topic-partition directories that this broker is currently leading in order to monitor events on those directories. File system events indicate to the Segment Uploader when a log segment is finalized (i.e. it is rotated and no longer receiving new data) and ready for upload. To do this, the Segment Uploader maintains a set of active log segments for each topic-partition that it is watching and uploads the most recently rotated segment upon detecting a rotation event.

In the example above, suppose that the broker is a leader for topicA-0 (topicA, partition 0), and 00300 is the active segment for topicA-0. Let's also suppose that 00100 and 00200 are already uploaded to the remote storage, and that each log segment contains 100 offsets. The Kafka broker process will continue to write incoming data into 00300 until it reaches the configured size or time threshold specified by broker configurations. In most cases, segments are rotated via the size threshold, but time-based rotations can occur when the throughput is low enough that the size threshold cannot be reached before the topic's retention.ms configuration takes effect. Upon reaching either threshold, 00300 will be finalized and closed, and a new active segment (e.g. 00400) will be created. This triggers a file system event which notifies the Segment Uploader that 00300 is ready for upload. The Segment Uploader then enqueues all three files for 00300 for upload to remote storage and sets 00400 as the current active segment for topicA-0.

<log.dir>
| - - - topicA-0
| - - - 00100.index
| - - - 00100.log
| - - - 00100.timeindex
| - - - 00200.index
| - - - 00200.log
| - - - 00200.timeindex
| - - - 00300.index ← enqueue & upload
| - - - 00300.log ← enqueue & upload
| - - - 00300.timeindex ← enqueue & upload
| - - - 00400.index ← active
| - - - 00400.log ← active
| - - - 00400.timeindex ← active

Upon the successful completion of the uploads, the Segment Uploader will upload an offset.wm file whose content is the last successfully uploaded offset for that topic-partition. This mechanism commits the progress of the Segment Uploader and allows it to recover from restarts and interruptions by resuming uploads only for segments that came after the last committed offset.
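A minimal sketch of the directory-watching idea is shown below, assuming the Java NIO WatchService and a hypothetical enqueueForUpload() helper. The actual Segment Uploader is more involved (per-partition state, offset.wm commits, retries), but the core detection loop looks roughly like this:

import java.nio.file.*;
import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE;

public class SegmentWatchSketch {
    public static void watchPartitionDir(Path partitionDir) throws Exception {
        WatchService watcher = FileSystems.getDefault().newWatchService();
        // Watch for new files in e.g. <log.dir>/topicA-0
        partitionDir.register(watcher, ENTRY_CREATE);
        Path activeSegment = null; // e.g. 00300.log

        while (true) {
            WatchKey key = watcher.take(); // blocks until an event arrives
            for (WatchEvent<?> event : key.pollEvents()) {
                Path created = partitionDir.resolve((Path) event.context());
                if (!created.toString().endsWith(".log")) {
                    continue; // only new .log files signal a rotation
                }
                // A new .log file (e.g. 00400.log) means the previous active
                // segment (00300) was rotated and is now safe to upload.
                if (activeSegment != null) {
                    enqueueForUpload(activeSegment); // hypothetical: uploads .log/.index/.timeindex, then commits offset.wm
                }
                activeSegment = created;
            }
            key.reset();
        }
    }

    private static void enqueueForUpload(Path rotatedSegment) {
        // Placeholder for the real upload pipeline.
    }
}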

Leadership Change Detection

Decoupling the Segment Uploader from the broker process necessitates a reliable mechanism for discovering topic-partition leadership and detecting changes in real time. The Segment Uploader achieves this by monitoring the ZooKeeper endpoints for the Kafka cluster, which are updated and maintained by the Kafka cluster's controller.

Upon startup, the Segment Uploader bootstraps the current state by reading the /brokers/topics/<topic>/partitions/<partition>/state path in ZooKeeper, which informs the Segment Uploader about the topic-partitions that the broker is currently leading. The Segment Uploader correspondingly places a file system watch on those topic-partition subdirectories in log.dir.

Leadership changes are reflected in real time under the same ZooKeeper path. When a broker becomes the leader of a topic-partition, it must already be considered an in-sync replica (ISR), which means that it is fully caught up on replicating from the original leader. This also means that the Segment Uploader can be confident that when the Kafka cluster controller updates the leadership metadata in ZooKeeper, the Segment Uploader on the new leader can immediately place a watch on the topic-partition directory, while the Segment Uploader on the old leader can remove its watch.
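As a rough sketch of the bootstrap step, assuming the plain ZooKeeper Java client (the real Segment Uploader's handling of watches and JSON parsing is more complete), reading the partition state znode and checking leadership might look like:

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class LeadershipCheckSketch {
    // Returns true if brokerId currently leads the given topic-partition,
    // and leaves a watch on the state znode so changes trigger a callback.
    static boolean isLeader(ZooKeeper zk, String topic, int partition, int brokerId) throws Exception {
        String path = "/brokers/topics/" + topic + "/partitions/" + partition + "/state";
        Watcher onChange = (WatchedEvent event) -> {
            // Re-read the znode here to pick up leadership changes in real time.
        };
        byte[] payload = zk.getData(path, onChange, new Stat());
        // The znode holds JSON such as {"leader":3,"leader_epoch":7,"isr":[3,1,2],...};
        // a real implementation should use a JSON parser rather than string matching.
        return new String(payload).contains("\"leader\":" + brokerId);
    }
}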

In Apache Kafka®️ 3.3+, ZooKeeper is replaced by KRaft. As such, the Segment Uploader will need to monitor leadership via KRaft when deployed alongside the latest Kafka versions. Note that KRaft support in the Segment Uploader is currently under development.

Fault Tolerance

In Tiered Storage, missed uploads mean data loss. As such, the main objective of fault tolerance in the Segment Uploader is to guarantee the continuity of data in the remote storage by ensuring that there are no missed uploads. Since the Segment Uploader is decoupled from the broker process, designing it for fault tolerance requires special attention.

The most common risk factors that might compromise data integrity of the uploaded log segments come primarily from the following areas:

  • Transient upload failures
  • Broker or Segment Uploader unavailability
  • Unclean leader election
  • Log segment deletion due to retention

Transient upload failures are usually mitigated by the Segment Uploader's retry mechanism. Broker or Segment Uploader downtime can be recovered from via the last committed offset specified by offset.wm, as described in the previous section. Unclean leader election results in data loss at the broker level, so we accept this as a built-in risk when unclean leader election is enabled.

The most interesting problem is the last one: how can the Segment Uploader prevent missed uploads when log segment management and deletions are performed independently by the broker process?

The decoupling between the Segment Uploader and the broker process implies that the Segment Uploader must upload every rotated log segment before it is cleaned up by Kafka retention policies, and the mechanism to do so must rely on signals outside of the Kafka internal protocol. Doing so while maintaining decoupling from the broker requires careful consideration of how log segments are managed by the broker.

Preventing Missed Uploads Due to Log Segment Deletion

To better understand how the Segment Uploader prevents missed uploads due to log segment deletion, let's first explore how a log segment is managed by the Kafka broker's LogManager. A given log segment stored on local Kafka broker disk goes through four phases in its lifecycle: active, rotated, staged for deletion, and deleted. The timing of when a log segment transitions between these phases is determined by data throughput, as well as several configuration values at the topic and broker levels. The diagram below explains the phase transitions visually:

Figure 3: Log Segment Lifecycle & Timing

The broker configuration log.segment.bytes (default 1 GB) determines the size threshold for each log segment file. Once an active segment is filled to this threshold, the segment is rotated and a new active segment is created to accept subsequent writes. Thereafter, the rotated segment stays on the local broker's disk until the topic-level retention threshold of either retention.ms (time threshold) or retention.bytes (size threshold) is reached, whichever happens first. Upon reaching the retention threshold, the Kafka broker stages the segment for deletion by appending a ".deleted" suffix to the segment filename, and the segment remains in this state for the duration specified by the broker configuration log.segment.delete.delay.ms (default 60 seconds). Only after this does the segment get permanently deleted.
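For reference, these are all standard Kafka configurations; the values below are illustrative rather than prescriptive:

# broker-level configuration (server.properties)
log.segment.bytes=1073741824          # 1 GB size threshold per log segment
log.segment.delete.delay.ms=60000     # grace period before permanent deletion (default 60s)

# topic-level configuration
retention.ms=259200000                # time-based retention, e.g. 3 days
retention.bytes=-1                    # size-based retention disabled (default)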

A log segment is only eligible for upload after it is rotated and can no longer be modified. Once a log segment is deleted in the final phase, it is considered permanently lost. Therefore, the Segment Uploader must upload the log segment while it is in the rotated or staged for deletion phases. Under normal circumstances, the Segment Uploader will upload the most recently rotated log segment within one to two minutes of the segment's rotation, as it is immediately enqueued for upload upon rotation. As long as the topic retention is large enough for the Segment Uploader to have some buffer room for the upload, this typically does not present a problem. The diagram below compares the time sequences of the segment lifecycle and the triggering of uploads.

Figure 4: Log segment lifecycle vs. Segment Uploader uploads

This works well in practice provided that log segments are rotated on the size-based threshold defined by log.segment.bytes. However, in some circumstances, log segments might be rotated on a time-based threshold instead. This scenario occurs when a topic-partition receives so little traffic that it does not have enough data to fill up log.segment.bytes within the configured retention.ms. In this scenario, the broker rotates the segment and simultaneously stages it for deletion when the segment's last-modified timestamp is beyond the topic's retention.ms. This is illustrated in the following diagram:

Figure 5: Segment rotation due to time-based threshold

It is critical that the Segment Uploader is able to upload the segment during the time that it is in the staged for deletion phase, the duration of which is determined by the broker configuration log.segment.delete.delay.ms. Moreover, the segment filename upon rotation differs from the normal scenario due to the appended ".deleted" suffix, so attempting to upload the segment with the regular filename (without the suffix) will result in a failed upload. As such, the Segment Uploader retries the upload with a ".deleted" suffix upon encountering a FileNotFoundException. Additionally, the broker configuration log.segment.delete.delay.ms should be adjusted to a slightly higher value (e.g. 5 minutes) to give the Segment Uploader more buffer room to complete the upload.
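A simplified sketch of that fallback, assuming a hypothetical uploadFile() helper, could look like the following:

import java.io.FileNotFoundException;
import java.nio.file.Path;

public class DeletedSuffixFallbackSketch {
    // Upload a rotated segment file; if the broker has already staged it for
    // deletion, retry against the ".deleted"-suffixed filename before giving up.
    static void uploadWithFallback(Path segmentFile) throws Exception {
        try {
            uploadFile(segmentFile); // e.g. <log.dir>/topicA-0/00300.log
        } catch (FileNotFoundException e) {
            Path staged = segmentFile.resolveSibling(segmentFile.getFileName() + ".deleted");
            uploadFile(staged);      // e.g. 00300.log.deleted, still present for log.segment.delete.delay.ms
        }
    }

    private static void uploadFile(Path file) throws Exception {
        // Placeholder for the actual remote storage upload.
    }
}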

It is worth mentioning that the above scenario with low-volume topics is generally not a concern, because the benefits of Tiered Storage are most effectively achieved when applied to high-volume topics.

Tiered Storage Consumer

The Segment Uploader is only part of the story: data residing in remote storage is only useful if it can be read by consumer applications. Many advantages of a broker-decoupled approach (i.e. Pinterest Tiered Storage) are realized on the consumption side, such as bypassing the broker in the consumption path to save on compute and cross-AZ network transfer costs. Tiered Storage Consumer comes out-of-the-box with the capability of reading data from both remote storage and the broker in a manner fully transparent to the user, bringing the theoretical benefits of broker-decoupled Tiered Storage from concept to reality.

Consumer Architecture

Figure 6: How Tiered Storage Consumer works

Tiered Storage Consumer is a client library that wraps the native KafkaConsumer client. It delegates operations to either the native KafkaConsumer or the RemoteConsumer depending on the desired serving path and where the requested data is stored. Tiered Storage Consumer accepts native KafkaConsumer configurations, with some additional properties that users can supply to specify the desired behavior of Tiered Storage Consumer. Most notably, the user should specify the mode of consumption, which is one of the following (a construction sketch follows Figure 7):

  • Remote Only: Bypass the broker during consumption and read directly and only from remote storage.
  • Kafka Only: Only read from Kafka brokers and never from remote storage (this is the same as a regular KafkaConsumer).
  • Remote Preferred: If the requested offset exists in both remote and broker, read from remote.
  • Kafka Preferred: If the requested offset exists in both remote and broker, read from broker.
Figure 7: Consumption modes and serving path for requested offsets
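As an illustration only, wiring up a consumer might look like the sketch below. The class and property names (TieredStorageConsumer, tiered.storage.consumption.mode, storage.service.endpoint.provider.class) are hypothetical stand-ins; consult the open-source repository for the actual configuration keys.

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

Properties props = new Properties();
// Standard KafkaConsumer configs are passed through unchanged.
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "backfill-group");
// Hypothetical Tiered Storage additions: consumption mode and endpoint provider.
props.put("tiered.storage.consumption.mode", "REMOTE_PREFERRED");
props.put("storage.service.endpoint.provider.class", "com.example.MyS3EndpointProvider");

// Hypothetical wrapper class; it delegates to KafkaConsumer and RemoteConsumer internally.
TieredStorageConsumer<byte[], byte[]> consumer = new TieredStorageConsumer<>(props);
consumer.subscribe(Collections.singleton("topicA"));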

This section will focus on the design and behavior of Tiered Storage Consumer when reading from remote storage.

Reading From Remote Storage

Consumer Group Management

Tiered Storage Consumer leverages the existing functionality of Kafka's consumer group management, such as partition assignment (via subscription) and offset commits, by delegating these operations to the KafkaConsumer regardless of its consumption mode. This is the case even when consuming from remote storage. For example, a Tiered Storage Consumer in Remote Only mode can still participate in group management and offset commit mechanisms the same way that a regular KafkaConsumer does, even when the offsets are only available in the remote storage system and have been cleaned up on the broker.

Storage Endpoint Discovery

When reading from remote storage, Tiered Storage Consumer needs to know where the data resides for the particular topic-partition that the consumer is trying to read. The storage endpoint for any particular topic-partition is provided by the user-defined implementation of StorageServiceEndpointProvider, which should be shared between the Segment Uploader and the Tiered Storage Consumer. The StorageServiceEndpointProvider class name is provided in the Tiered Storage Consumer configurations, and it deterministically constructs a remote storage endpoint for a given topic-partition. In practice, the same implementation of StorageServiceEndpointProvider should be packaged into the classpaths of both the Segment Uploader and the consumer application to guarantee that constructed endpoints are consistent between the two.

Figure 8: StorageServiceEndpointProvider usage
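A minimal sketch of such a provider is shown below. The method name and return type are assumptions for illustration (the actual StorageServiceEndpointProvider interface in the open-source project may differ); the key point is that the mapping is a pure function of cluster, topic, and partition, so both sides compute the same endpoint.

// Hypothetical shape of a provider that maps a topic-partition to an S3 location.
public class S3EndpointProviderSketch {
    private final String bucket = "my-bucket";
    private final String prefix = "custom-prefix";
    private final String cluster = "kafkaCluster1";

    // Deterministic: both the Segment Uploader and the consumer compute the same endpoint.
    // A real provider would also inject the prefix entropy described later in this post.
    public String endpointFor(String topic, int partition) {
        return "s3://" + bucket + "/" + prefix + "/" + cluster + "/" + topic + "-" + partition + "/";
    }
}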

With the remote endpoints constructed upon subscribe() or assign() calls, a Tiered Storage Consumer in its consumption loop will delegate read responsibilities to either the KafkaConsumer or the RemoteConsumer, depending on the user-specified consumption mode and where the requested offsets reside. Reading from remote storage is possible in every consumption mode except Kafka Only.

Let's walk through the details of the Kafka Preferred and Remote Only consumption modes. Details for the other two consumption modes should be self-explanatory after understanding these two.

Kafka Preferred Consumption

When in Kafka Preferred consumption mode, Tiered Storage Consumer delegates read operations first to the native KafkaConsumer, then to the RemoteConsumer if the desired offsets do not exist on the Kafka broker. This mode allows consumer applications to read data in near real-time under normal circumstances, and to read earlier data from remote storage if the consumer is lagging beyond the earliest offsets on Kafka brokers.

Figure 9: Kafka Preferred consumption mode

When the requested offset is not on the Kafka broker, the KafkaConsumer's poll call throws a NoOffsetForPartitionException or OffsetOutOfRangeException. This is caught internally by the Tiered Storage Consumer in Kafka Preferred mode, which then delegates to the RemoteConsumer to try to find the data in remote storage. If the offset exists in remote storage, Tiered Storage Consumer returns those records to the application layer after fetching them directly from the remote storage system, skipping the broker when accessing the actual data.
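Conceptually, the delegation in Kafka Preferred mode follows a try/catch pattern like the sketch below; the RemoteConsumerStub and its fetch method are hypothetical placeholders for the library's internal RemoteConsumer.

import java.time.Duration;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.common.TopicPartition;

class KafkaPreferredSketch {
    private final KafkaConsumer<byte[], byte[]> kafkaConsumer;
    private final RemoteConsumerStub remoteConsumer; // hypothetical stand-in for RemoteConsumer

    KafkaPreferredSketch(KafkaConsumer<byte[], byte[]> kafkaConsumer, RemoteConsumerStub remoteConsumer) {
        this.kafkaConsumer = kafkaConsumer;
        this.remoteConsumer = remoteConsumer;
    }

    ConsumerRecords<byte[], byte[]> poll(Duration timeout) {
        try {
            // Serve from the broker while the requested offsets still exist on local disk.
            return kafkaConsumer.poll(timeout);
        } catch (NoOffsetForPartitionException | OffsetOutOfRangeException e) {
            // The offsets were cleaned up on the broker; fetch the corresponding
            // log segments directly from remote storage instead.
            return remoteConsumer.fetch(e.partitions(), timeout);
        }
    }

    interface RemoteConsumerStub { // hypothetical interface for illustration
        ConsumerRecords<byte[], byte[]> fetch(Set<TopicPartition> partitions, Duration timeout);
    }
}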

Remote Only Consumption: Skip the Broker

When in Remote Only consumption mode, Tiered Storage Consumer only delegates the read operations to the RemoteConsumer. The RemoteConsumer directly requests data from the remote storage based on the constructed endpoints for the assigned topic-partitions, allowing it to avoid contacting the Kafka cluster during the consumption loop except for offset commits and group rebalances / partition assignments, which still rely on Kafka's internal consumer group and offset management protocols.

Figure 10: Remote Only consumption mode

When skipping the broker in Remote Only consumption mode, the full set of benefits of broker-decoupled Tiered Storage is realized. By rerouting the serving path to the remote storage system instead of the broker, consumer applications can perform historical backfills and read older data from remote storage while leveraging only the compute resources of the remote storage system, freeing up those resources on the Kafka broker. At the same time, depending on the pricing model of the remote storage system, cross-AZ network transfer cost can be avoided (e.g. Amazon does not charge for bandwidth between S3 and EC2 in the same region). This pushes the benefits of adopting Tiered Storage beyond just storage cost savings.

Remote Storage

The choice of the remote storage system backing Tiered Storage is critical to the success of its adoption. For this reason, special attention should be paid to the following areas when evaluating a remote storage system:

  • Interface compatibility and support for operations necessary for Tiered Storage
  • Pricing and mechanisms of data storage, transfer, replication, and lifecycle management
  • Scalability and partitioning

Interface Compatibility

At a minimum, the remote storage system should support the following generic operations over the network:

void putObject(byte[] object, Path path);

List<String> listObjectNames(Path path);

byte[] getObject(Path path);

Some additional operations are generally needed for improved performance and scalability. These include, but are not limited to:

Future putObjectAsync(byte[] object, Path path, Callback callback);

InputStream getObjectInputStream(Path path);

Clearly, in-place updates and modifications to uploaded log segments are unnecessary. For this reason, object storage systems are usually preferred for their scalability and cost benefits. An eligible and compatible remote storage system can be added to Pinterest Tiered Storage as long as it is able to implement the relevant interfaces in the Segment Uploader and Tiered Storage Consumer modules.
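For example, the generic operations above map naturally onto Amazon S3®️. A rough sketch using the AWS SDK for Java v2, with the Path arguments rendered as S3 object keys (the exact wiring in the open-source modules may differ), looks like:

import java.util.List;
import java.util.stream.Collectors;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.S3Object;

class S3RemoteStorageSketch {
    private final S3Client s3 = S3Client.create();
    private final String bucket = "my-bucket";

    void putObject(byte[] object, String key) {
        // Upload a log segment (or index file) under the given key.
        s3.putObject(PutObjectRequest.builder().bucket(bucket).key(key).build(),
                RequestBody.fromBytes(object));
    }

    List<String> listObjectNames(String prefix) {
        // List uploaded segment names under a topic-partition prefix.
        return s3.listObjectsV2(ListObjectsV2Request.builder().bucket(bucket).prefix(prefix).build())
                .contents().stream().map(S3Object::key).collect(Collectors.toList());
    }

    byte[] getObject(String key) {
        // Fetch a segment's bytes for the Tiered Storage Consumer.
        return s3.getObjectAsBytes(GetObjectRequest.builder().bucket(bucket).key(key).build())
                .asByteArray();
    }
}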

Data Storage, Transfer, Replication, & Lifecycle Management

The purpose of adopting broker-decoupled Tiered Storage is to lower the cost and resource usage of storage and serving on the Kafka cluster. Therefore, it is important to understand the technical mechanisms and pricing model of the remote storage system when it comes to operations offloaded from the Kafka cluster to the remote storage. These include data storage, transfer, replication, and lifecycle management.

Most of the common remote storage systems with widespread industry adoption have publicly available documentation and pricing for each of those operations. It is important to note that the savings and benefits achievable with Tiered Storage adoption depend critically on a holistic evaluation of these aspects of the remote storage system, in tandem with existing case-specific factors such as data throughput, read / write patterns, desired availability and data consistency, locality and placement of services, etc.

Unlike the native implementation of Tiered Storage in KIP-405, the lifecycle management of data uploaded to remote storage in this broker-decoupled implementation is delegated to native mechanisms on the remote storage system. For this reason, the retention of uploaded data on remote storage should be configured according to the mechanisms available on the remote storage system of choice.

Scalability & Partitioning

Writing data to a remote storage system and serving consumption with its resources requires preparing for scale. The most common bottleneck on the remote storage system comes from compute resources, which is typically enforced via request rate limits. For example, Amazon S3®️ details its request rate limits for both reads and writes on a per partitioned prefix basis. For Tiered Storage to operate at large scale, it is essential that the remote storage is pre-partitioned in a thoughtful manner that evenly distributes request rates across the remote storage partitions in order to avoid hotspots and rate limiting errors.

Taking Amazon S3®️ as an example, partitioning is achieved via common prefixes between object keys. A bucket storing Tiered Storage log segments should ideally be pre-partitioned in a way that evenly distributes request rate load across partitions. To do so, the object keyspace must be designed to allow for prefix-based partitioning, where each prefix-partition receives request rates comparable to every other prefix-partition.

In Tiered Storage, log segments uploaded to S3 generally adhere to a keyspace that looks like the following examples:

topicA-0
- - - -
s3://my-bucket/custom-prefix/kafkaCluster1/topicA-0/00100.log
s3://my-bucket/custom-prefix/kafkaCluster1/topicA-0/00200.log
s3://my-bucket/custom-prefix/kafkaCluster1/topicA-0/00300.log

topicA-1
- - - -
s3://my-bucket/custom-prefix/kafkaCluster1/topicA-1/00150.log
s3://my-bucket/custom-prefix/kafkaCluster1/topicA-1/00300.log

topicB-0
- - - -
s3://my-bucket/custom-prefix/kafkaCluster1/topicB-0/01000.log
s3://my-bucket/custom-prefix/kafkaCluster1/topicB-0/02000.log

The Kafka cluster name, topic name, and Kafka partition ID are all part of the key. This is so that the Tiered Storage Consumer can reconstruct the prefix solely from these pieces of information when assigned to those topic-partitions. However, what if topicA receives much higher read and write traffic than topicB? The above keyspace scheme does not allow for S3 prefix partitioning in a way that evenly spreads out request rate load, and so the S3 prefix-partition custom-prefix/kafkaCluster1/topicA becomes a request rate hotspot.

The solution to this problem is to introduce prefix entropy into the keyspace in order to randomize the S3 prefix-partitions that host data across different topics and partitions. The concept of partitioning the remote storage via prefix entropy was introduced in MemQ and has been battle-tested in production for several years.

To support this in Tiered Storage, the Segment Uploader allows users to configure a value for ts.segment.uploader.s3.prefix.entropy.bits, which injects an N-digit MD5 binary hash into the object key. The hash is calculated from the combination of cluster name, topic name, and Kafka partition ID. Assuming that N=5, we get the following keys for the same examples above:

topicA-0
- - - -
s3://my-bucket/custom-prefix/01011/kafkaCluster1/topicA-0/00100.log
s3://my-bucket/custom-prefix/01011/kafkaCluster1/topicA-0/00200.log
s3://my-bucket/custom-prefix/01011/kafkaCluster1/topicA-0/00300.log

topicA-1
- - - -
s3://my-bucket/custom-prefix/01010/kafkaCluster1/topicA-1/00150.log
s3://my-bucket/custom-prefix/01010/kafkaCluster1/topicA-1/00300.log

topicB-0
- - - -
s3://my-bucket/custom-prefix/11100/kafkaCluster1/topicB-0/01000.log
s3://my-bucket/custom-prefix/11100/kafkaCluster1/topicB-0/02000.log

With this new keyspace, if topicA receives much higher request rates than other topics, the request load is spread evenly between different S3 prefix-partitions, assuming that each of its Kafka partitions receives relatively even read and write throughput. Applying this concept across a large number of Kafka clusters, topics, and partitions will statistically lead to an even distribution of request rates between S3 prefix-partitions.

When setting up a Tiered Storage Consumer, the user must supply the same N value as the Segment Uploader so that the consumer is able to reconstruct the correct key for each topic-partition it is assigned to (see the sketch after the prefix-partition listing below).

Prefix-partitioning my-bucket with N=5

Resulting 32 prefix-partitions:
custom-prefix/00000
custom-prefix/00001
custom-prefix/00010
custom-prefix/00011

custom-prefix/11111
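A sketch of how such an N-bit MD5 prefix could be derived is shown below. The exact input format and bit ordering used by the Segment Uploader are assumptions here, so treat this only as an illustration of the idea:

import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

class PrefixEntropySketch {
    // Derive an N-bit binary prefix (e.g. "01011" for N=5) from cluster, topic, and partition.
    static String prefixEntropy(String cluster, String topic, int partition, int nBits)
            throws NoSuchAlgorithmException {
        String input = cluster + "/" + topic + "-" + partition; // assumed input format
        byte[] digest = MessageDigest.getInstance("MD5").digest(input.getBytes(StandardCharsets.UTF_8));
        // Render the 128-bit digest as a binary string, left-padded so leading zero bits are kept.
        StringBuilder bits = new StringBuilder(new BigInteger(1, digest).toString(2));
        while (bits.length() < 128) {
            bits.insert(0, '0');
        }
        return bits.substring(0, nBits);
    }

    public static void main(String[] args) throws Exception {
        // Prints something like custom-prefix/01011/kafkaCluster1/topicA-0/
        String hash = prefixEntropy("kafkaCluster1", "topicA", 0, 5);
        System.out.println("custom-prefix/" + hash + "/kafkaCluster1/topicA-0/");
    }
}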

Decoupling from the broker means that Tiered Storage feature additions can be rolled out and applied without needing to upgrade broker versions. Here are some of the features that are currently planned:

  • Integration with PubSub Client, a backend-agnostic client library
  • Integration with Apache Flink®️ (via PubSub Client integration)
  • Support for additional remote storage systems (e.g. HDFS)
  • Support for Parquet log segment storage format to enable real-time analytics (dependent on adoption of KIP-1008)

Pinterest Tiered Storage for Apache Kafka®️ is now open-sourced on GitHub. Check it out here! Feedback and contributions are welcome and encouraged.

The current state of Pinterest Tiered Storage for Apache Kafka®️ would not have been possible without significant contributions and support from Ambud Sharma, Shardul Jewalikar, and the Logging Platform team. Special thanks to Ang Zhang and Chunyan Wang for continuous guidance, feedback, and support.

Apache®️, Apache Kafka®️, Kafka®️, Apache Flink®️, and Flink®️ are trademarks of the Apache Software Foundation (https://www.apache.org/).
Amazon®️, AWS®️, S3®️, and EC2®️ are trademarks of Amazon.com, Inc. or its affiliates.

To learn more about engineering at Pinterest, check out the rest of our Engineering Blog and visit our Pinterest Labs site. To explore and apply to open roles, visit our Careers page.