
Saturday, July 6, 2024

Producer Default Partitioner and how keys are hashed

 1. By default, your keys are hashed using the "murmur2" algorithm.


2. It is usually preferable not to override the behavior of the partitioner, but it is possible to do so (partitioner.class).


3. The formula is:

targetPartition = Utils.abs(Utils.murmur2(record.key())) % numPartitions;


4. This means that the same key always goes to the same partition (we already know this), but adding partitions to a topic changes numPartitions and therefore the result of the formula, so existing keys can land on different partitions.
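The formula above can be tried out without a broker. The following is a minimal sketch, not the Kafka client itself: the class name PartitionSketch and its methods are made up for illustration, and the murmur2 body is a transcription of the variant the Kafka client is understood to use, so treat its constants as an assumption. It masks the sign bit instead of calling abs, because Math.abs(Integer.MIN_VALUE) is still negative in Java.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration only; not part of the Kafka client API.
class PartitionSketch {

    // 32-bit murmur2 over a byte array (constants assumed to match Kafka's variant).
    static int murmur2(byte[] data) {
        int length = data.length;
        final int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = seed ^ length;
        int length4 = length / 4;
        for (int i = 0; i < length4; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff)
                    + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16)
                    + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // Mix in the 1 to 3 trailing bytes (intentional fall-through).
        switch (length % 4) {
            case 3:
                h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2:
                h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1:
                h ^= data[length & ~3] & 0xff;
                h *= m;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // Clears the sign bit so the modulo below can never be negative.
    static int toPositive(int number) {
        return number & 0x7fffffff;
    }

    // targetPartition = positive(murmur2(keyBytes)) % numPartitions
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return toPositive(murmur2(keyBytes)) % numPartitions;
    }

    public static void main(String[] args) {
        for (String key : new String[] {"user-1", "user-2", "user-3"}) {
            System.out.println(key + ": 3 partitions -> " + partitionFor(key, 3)
                    + ", 4 partitions -> " + partitionFor(key, 4));
        }
    }
}
```

Running main prints each key's target for a 3-partition and a 4-partition topic. Whenever the two numbers differ for a key, that key would move after the topic is expanded, which is why adding partitions breaks per-key ordering assumptions.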



High Throughput Producer Demo

1. We'll add snappy message compression in our producer

2. snappy is very helpful if your messages are text based, for example log lines or JSON documents

3. snappy has a good balance of CPU / compression ratio

4. We'll also increase batch.size to 32 KB and introduce a small delay through linger.ms (20 ms)
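The demo settings can be written down as producer properties. This is a minimal sketch using plain string keys so that it runs without the Kafka client on the classpath; the class and method names are hypothetical, and a real producer would also need bootstrap.servers and the key/value serializers before these properties are passed to a KafkaProducer.

```java
import java.util.Properties;

// Hypothetical holder class for illustration only.
class HighThroughputConfigSketch {

    static Properties highThroughputProps() {
        Properties props = new Properties();
        // Compress each batch with snappy (good CPU / ratio balance for text).
        props.setProperty("compression.type", "snappy");
        // Wait up to 20 ms so more records can join a batch.
        props.setProperty("linger.ms", "20");
        // Allow batches of up to 32 KB (32 * 1024 bytes) per partition.
        props.setProperty("batch.size", Integer.toString(32 * 1024));
        return props;
    }

    public static void main(String[] args) {
        highThroughputProps().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```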

linger.ms & batch.size

 1. By default, Kafka tries to send records as soon as possible

    - It will have up to 5 requests in flight, meaning up to 5 messages individually sent at the same time.

    - After this, if more messages have to be sent while others are in flight, Kafka is smart and will start batching them while they wait, so it can send them all at once.


2. This smart batching allows Kafka to increase throughput while maintaining very low latency.

3. Batches have a higher compression ratio, so efficiency is better


4. So how can we control the batching mechanism?


- linger.ms: Number of milliseconds a producer is willing to wait before sending a batch out. (default 0)

- By introducing some lag (for example linger.ms=5), we increase the chances of messages being sent together in a batch

- So at the expense of introducing a small delay, we can increase throughput, compression and efficiency of our producer.


- If a batch is full (see batch.size) before the end of the linger.ms period, it will be sent to Kafka right away!







- batch.size: Maximum number of bytes that will be included in a batch. The default is 16KB.

- Increasing the batch size to something like 32KB or 64KB can help increase the compression, throughput, and efficiency of requests

- Any message that is bigger than the batch size will not be batched

- A batch is allocated per partition, so make sure that you don't set it to a number that's too high, otherwise you'll waste memory!


- (Note: You can monitor the average batch size metric using Kafka Producer Metrics)
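How linger.ms and batch.size interact can be sketched with a toy model of a single partition's batch. This is not Kafka's internal accumulator: ToyBatcher, append, tick and sentBatchSizes are hypothetical names, time is passed in by hand, and the rules are simplified to the three stated above (send when the batch is full, send when the linger period ends, an oversized record travels alone).

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of one partition's batch; hypothetical, not Kafka's implementation.
class ToyBatcher {
    private final int batchSizeBytes;
    private final long lingerMs;
    private int currentRecords = 0;
    private int currentBytes = 0;
    private long firstAppendMs = -1;

    // Number of records in each batch that was "sent", in send order.
    final List<Integer> sentBatchSizes = new ArrayList<>();

    ToyBatcher(int batchSizeBytes, long lingerMs) {
        this.batchSizeBytes = batchSizeBytes;
        this.lingerMs = lingerMs;
    }

    // Adds a record at time nowMs (milliseconds on any monotonic clock).
    void append(byte[] record, long nowMs) {
        // A record that does not fit closes the open batch first.
        if (currentRecords > 0 && currentBytes + record.length > batchSizeBytes) {
            flush();
        }
        if (currentRecords == 0) {
            firstAppendMs = nowMs; // the linger clock starts with the first record
        }
        currentRecords++;
        currentBytes += record.length;
        // A full (or oversized) batch is sent right away, without waiting.
        if (currentBytes >= batchSizeBytes) {
            flush();
        }
    }

    // Called periodically; sends the open batch once linger.ms has elapsed.
    void tick(long nowMs) {
        if (currentRecords > 0 && nowMs - firstAppendMs >= lingerMs) {
            flush();
        }
    }

    private void flush() {
        sentBatchSizes.add(currentRecords);
        currentRecords = 0;
        currentBytes = 0;
        firstAppendMs = -1;
    }
}
```

With new ToyBatcher(100, 20), two 40-byte records appended at t=0 and t=6 leave together when tick(20) is called, while a 120-byte record is sent on its own as soon as it is appended.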












 






Friday, July 5, 2024

Message Compression Recommendations

 1. Find a compression algorithm that gives you the best performance for your specific data. Test all of them!


2. Always use compression in production and especially if you have high throughput


3. Consider tweaking linger.ms and batch.size to have bigger batches, and therefore more compression and higher throughput

Message Compression

1. Producers usually send data that is text-based, for example JSON data

2. In this case, it is important to apply compression to the producer.


3. Compression is enabled at the Producer level and doesn't require any configuration change in the Brokers or in the Consumers

4. "compression.type" can be 'none' (default), 'gzip', 'lz4', 'snappy', or 'zstd' (Kafka 2.1+)


5. Compression is more effective the bigger the batch of messages being sent to Kafka!

6. Benchmarks here: https://blog.cloudflare.com/squeezing-the-firehose/


7. The compressed batch has the following advantages:

    - Much smaller producer request size (compression ratio up to 4x!)

    - Faster to transfer data over the network => less latency

    - Better throughput

    - Better disk utilisation in Kafka (stored messages on disk are smaller)


8. Disadvantages (very minor):

    - Producers must commit some CPU cycles to compression

    - Consumers must commit some CPU cycles to decompression


9. Overall:

    - Consider testing snappy or lz4 for optimal speed / compression ratio
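The claim that bigger batches compress better can be checked with a small experiment. This is a minimal sketch under stated assumptions: the JDK's gzip stands in for whichever codec you choose (snappy and lz4 need third-party libraries), the JSON-like log messages are invented sample data, and the sizes ignore Kafka's own record batch framing. CompressionRatioSketch and its methods are hypothetical names.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

// Hypothetical experiment class; gzip is a stand-in for the producer's codec.
class CompressionRatioSketch {

    // Returns the gzip-compressed size of raw, in bytes.
    static int gzipSize(byte[] raw) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
            gz.write(raw);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.size();
    }

    // Invented sample data: one JSON-like log line per message.
    static String sampleMessage(int i) {
        return "{\"level\":\"INFO\",\"service\":\"checkout\","
                + "\"msg\":\"order created\",\"orderId\":" + i + "}";
    }

    // Uncompressed size divided by compressed size for a batch of messageCount messages.
    static double ratio(int messageCount) {
        StringBuilder batch = new StringBuilder();
        for (int i = 0; i < messageCount; i++) {
            batch.append(sampleMessage(i)).append('\n');
        }
        byte[] raw = batch.toString().getBytes(StandardCharsets.UTF_8);
        return (double) raw.length / gzipSize(raw);
    }

    public static void main(String[] args) {
        for (int n : new int[] {1, 10, 100, 1000}) {
            System.out.printf("%d message(s): ratio %.2f%n", n, ratio(n));
        }
    }
}
```

The exact ratios depend on the data and the codec, so the numbers are only meaningful relative to each other. Running the same loop over a sample of your own messages is a cheap way to follow the "test all of them" advice.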



Safe Producer Summary & Demo

 Kafka < 0.11

- acks = all (producer level)

    - Ensures data is properly replicated before an ack is received

- min.insync.replicas=2 (broker/topic level)

    - Ensures at least two brokers in the ISR have the data after an ack

- retries=MAX_INT (producer level)

    - Ensures transient errors are retried indefinitely

- max.in.flight.requests.per.connection=1 (producer level)

    - Ensures only one request is tried at any time, preventing message re-ordering in case of retries


Kafka >= 0.11

- enable.idempotence=true (producer level) + min.insync.replicas=2 (broker/topic level)

    - Implies acks=all, retries=MAX_INT, max.in.flight.requests.per.connection=5 (default)

    - while keeping ordering guarantees and improving performance!

- Running a "safe producer" might impact throughput and latency, always test for your use case
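The two variants of the summary can be written as producer properties. This is a minimal sketch with plain string keys so that it runs without the Kafka client; SafeProducerConfigSketch and its method names are hypothetical. Note that min.insync.replicas=2 is deliberately absent, since it is set on the broker or topic and not on the producer.

```java
import java.util.Properties;

// Hypothetical holder class for illustration only.
class SafeProducerConfigSketch {

    // Kafka < 0.11: every safety setting is spelled out by hand.
    static Properties legacySafeProps() {
        Properties props = new Properties();
        props.setProperty("acks", "all");
        props.setProperty("retries", Integer.toString(Integer.MAX_VALUE));
        // One request at a time, so a retry cannot overtake a later batch.
        props.setProperty("max.in.flight.requests.per.connection", "1");
        return props;
    }

    // Kafka >= 0.11: idempotence keeps ordering with up to 5 requests in flight.
    static Properties idempotentSafeProps() {
        Properties props = new Properties();
        props.setProperty("enable.idempotence", "true");
        // Stated explicitly for readability; the notes say idempotence implies them.
        props.setProperty("acks", "all");
        props.setProperty("retries", Integer.toString(Integer.MAX_VALUE));
        props.setProperty("max.in.flight.requests.per.connection", "5");
        return props;
    }

    public static void main(String[] args) {
        System.out.println("Kafka < 0.11:  " + legacySafeProps());
        System.out.println("Kafka >= 0.11: " + idempotentSafeProps());
    }
}
```

As with any producer configuration, bootstrap.servers and the serializers still have to be added before these properties are handed to a real producer.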