Iterative Performance Benchmarking of Apache Kafka – Part 1

This article demonstrates how iterative performance benchmarking analysis can be applied to the latest release of Apache Kafka – a highly optimized publish-subscribe messaging technology built around the concept of a distributed commit log. The first task is to create a script that can repeatedly execute a benchmark and, between consecutive benchmark executions, refine the instrumentation set used by the Satoris JVM agent based on the hotspot analysis from the previous iteration. The following script repeats a built-in message producer performance test 20 times, refining the instrumentation each time. For a finely tuned system and a focused benchmark, the number of iterations can be much lower. Generally, the agent will narrow down the most significant hotspots within the first few iterations, with subsequent runs serving only to eliminate borderline cases that, when removed, further increase the accuracy of the resulting performance hotspot model.

kafka.0.10.benchmark
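For orientation, a minimal sketch of such an iteration loop is given below. The topic name, record count, record size and broker address are assumptions made here for illustration; the actual script is the one shown above, and the Satoris-specific refinement step is only indicated by a comment since its exact mechanism depends on how the agent is configured.

    #!/usr/bin/env bash
    # Repeat the built-in producer performance test a fixed number of times.
    KAFKA=${KAFKA:-/opt/kafka}
    ITERATIONS=20

    for i in $(seq 1 $ITERATIONS); do
      echo "benchmark iteration $i"
      "${KAFKA}/bin/kafka-producer-perf-test.sh" \
        --topic benchmark \
        --num-records 1000000 \
        --record-size 100 \
        --throughput 100000 \
        --producer-props bootstrap.servers=localhost:9092
      # Between iterations the hotspot snapshot produced by the Satoris agent
      # on the broker is carried forward as the refined instrumentation set
      # for the next run (the mechanism depends on the agent configuration).
    done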

In keeping with the standard Apache Kafka configuration, below is the performance model from the very first benchmark iteration. The table is sorted by Total Inherent (Self). Shown are the probes, Java or Scala methods, still being measured at the end of a single process iteration; the Satoris agent includes an extremely advanced adaptive instrumentation and measurement metering engine that automatically eliminates non-hotspots during the dynamic execution of the benchmark under observation. All that is needed is for the software performance engineer to set suitable hotspot thresholds and let the agent do the rest. Here the hotspot threshold was 5 microseconds for inclusive wall clock time and 1 microsecond for the corresponding inherent (self/exclusive) wall clock time.
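As a rough illustration of how such thresholds might be expressed, a configuration fragment along the following lines could be used. The property keys shown are assumptions rather than documented Satoris settings and will vary by agent release; only the values correspond to the thresholds described above.

    # Illustrative only: key names are assumed, values are in microseconds
    j.s.p.hotspot.threshold=5
    j.s.p.hotspot.threshold.inherent=1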

kafka.0.10.p1.r1

After 20 benchmark iterations, the above list has been refined further, though not substantially, indicating the effectiveness of the Satoris online adaptive profiling mechanism. A quick scan and comparison of the tables shows a number of methods with lower average times, such as KafkaApis.handle, which dropped from 64 microseconds to 54 microseconds. This drop of 10 microseconds can be attributed largely to the instrumentation and measurement overhead incurred, and subsequently eliminated, in the early period of the first iteration, when much of the code base was still instrumented and being measured. The discarded measurement cannot be observed here because Satoris performance snapshots do not by default export the collected measurement data for probes that have been dynamically disabled during adaptive profiling.

kafka.0.10.p1.r20

The above performance model was obtained with the default configuration settings for Apache Kafka. What if the number of partitions is increased? Does the model change significantly? The following performance hotspot model was obtained after a complete rerun of the benchmark script with the num.partitions property in ${KAFKA}/config/server.properties changed to 10. With this new setting there is a change in the inherent (self/exclusive) clock time totals, though most probes (methods) do reappear, only in a slightly different ordering. There are also some new hotspot probes, such as those related to the updating of channel request (monitoring) metrics.
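The change amounts to a single line in the broker configuration file referenced above:

    # ${KAFKA}/config/server.properties
    num.partitions=10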

kafka.0.10.p10.r20

Changing the num.partitions property to 20 as well as increasing the --throughput parameter to 200000 in the benchmark script produces the following performance hotspot model, again after another complete rerun of the benchmark script. A quick scan down the probe name listing whilst eyeing the inherent average column shows a high degree of similarity with the previous performance hotspot model. The visual pattern at the top of the table is another quick way of checking whether we are still looking at the same system – for the most part.
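For completeness, the corresponding changes, reusing the assumed topic and record settings from the earlier sketch:

    # ${KAFKA}/config/server.properties
    num.partitions=20

    # changed producer benchmark invocation
    "${KAFKA}/bin/kafka-producer-perf-test.sh" \
      --topic benchmark \
      --num-records 1000000 \
      --record-size 100 \
      --throughput 200000 \
      --producer-props bootstrap.servers=localhost:9092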

kafka.0.10.p20.r20

With the num.partitions property now at 25 and the --throughput parameter set to 250000, the counts across the board have dropped further (possibly due to some sort of queued batching), but by and large, excluding the top 4 rows in the table, which relate largely to I/O and networking activity, the inherent totals are very similar to the previous run. Much like in the previous models, the largest portion of the exclusive time is attributed to Log.append, followed by replication and partitioning related activities. Kafka is built around a log, so there is nothing new here other than, most importantly, the accurate quantification of how the execution cost model is broken down across subsystems and their activities, excluding uninstrumented external third-party libraries, including those distributed with the SDKs and runtimes for Scala and Java.

kafka.0.10.p25.r20

Sometimes the dynamic instrumentation and measurement adaptation performed by the Satoris agent can be a little too aggressive during the warmup phase of a stress test. This can be remedied by increasing the initial scorecard allowance used by the hotspot extension, which is enabled by default, giving all probes more time to settle into normal execution patterns after the JIT compiler has kicked in sufficiently. Below is a revised performance hotspot model with the initial allowance changed from 1000 to 6000. One particular new hotspot probe identified with this setting is ResponseSend.serialize – a ByteBuffer allocation site that probably executed cheaply during the warmup phase when there was less memory/GC pressure.
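For reference, the kind of override involved might look as follows; the key name is an assumption about the hotspot extension's configuration rather than a documented setting, while the value reflects the change described above.

    # Illustrative only: key name is assumed
    j.s.p.hotspot.initial=6000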

kafka.0.10.p25.i6k.l5k.r20

Up to now the need to collect the caller-to-callee call chain, sometimes called a trace, has not been warranted, because the agent adaptively and intelligently trims the many irrelevant ingress, transit and egress nodes typical of an enterprise system; the end result is that the probes (methods) still being instrumented and measured are the most effective points at which to start any in-depth source code inspection and code tuning. What can be useful is to pick a particular execution trace (flow), an entry point, and then only measure (meter) from within the execution scope of this point. Below is a revised performance model following another rerun of the benchmark, but this time with the Satoris entrypoint extension enabled and configured to regard kafka.server.KafkaApis.handle as the single entry point in the system. All other threads and instrumented methods not executing with the handle method on the call stack are ignored by the measurement and collection engine.
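A sketch of what enabling such an entry point might look like in configuration terms is given below; the key names are assumptions rather than documented Satoris settings, whereas the class and method are as stated above.

    # Illustrative only: key names are assumed
    j.s.p.entrypoint.enabled=true
    j.s.p.entrypoint.filter=kafka.server.KafkaApis.handle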

kafka.0.10.p25.i6k.l5k.entrypoint.r20

The source code for many of the performance hotspots identified above is listed in Appendix A along with some code classification tagging. You are very welcome to perform a similar benchmark with an alternative tool of your choice and report your own findings, which will be linked here and compared.

Appendix A – Hotspot Code Listings

kafka.log.Log.append #large #synchronized #logging #metrics #io #copying #scala

kafka.0.10.code.log.append

kafka.server.ReplicaManager.appendMessages #logging #allocation #copying #scala

kafka.0.10.code.replicamanager.appendmessages

kafka.server.ReplicaManager.appendToLocalLog #logging #metrics #allocation #scala

kafka.0.10.code.replicamanager.appendtolocallog

kafka.server.KafkaApis.handleProducerRequest #large #convoluted #allocation #scala

kafka.0.10.code.kafkaapis.handleproducerrequest.append

kafka.cluster.Partition.appendMessagesToLeader #allocation #io #scala

kafka.0.10.code.partition.appendmessagestoleader

kafka.network.RequestChannel$Request.updateRequestMetrics #metrics #scala

kafka.0.10.code.requestchannel.updaterequestmetrics

org.apache.kafka.common.requests.ProduceResponse.initCommonFields #allocation #java

kafka.0.10.code.produceresponse.initcommonfields

kafka.server.DelayedProduce.tryComplete #logging #allocation #scala

kafka.0.10.code.delayedproduce.trycomplete

kafka.server.ClientQuotaManager.recordAndMaybeThrottle #logging #control #scala

kafka.0.10.code.clientquoutamanager.recordandmaybethrottle

org.apache.kafka.common.network.Selector.poll #nio #java

kafka.0.10.code.selector.poll

org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel #nio #java

kafka.0.10.code.networkreceive.read

org.apache.kafka.common.requests.ResponseSend.serialize #allocation #nio

kafka.0.10.code.responseend.serialize