A Quick Performance Hotspot Analysis of Apache Kafka

I was asked to produce a quick preliminary performance analysis of Apache Kafka. I downloaded the Kafka 0.8.2-beta distribution from the Apache site and, before launching a single Kafka server on my workstation, exported the following environment variable.

export KAFKA_OPTS="-server -noverify -javaagent:/Applications/satoris-1.9/bundle/satoris-javaagent.jar -Djxinsight.home=/Applications/satoris-1.9/conf/apache-kafka"

I used the following jxinsight.override.config file, located in the jxinsight.home directory, to lower the hotspot metering extension thresholds.

# clock.time (μs) thresholds

I limited the instrumentation, which the Satoris agent performs dynamically at runtime, to the kafka.* codebase by adding the following to the jxinsight.aspectj.filters.config file.


To generate a workload I ran the following script with these command-line arguments, executing the command 5 times.

./bin/kafka-producer-perf-test.sh --broker-list=localhost:9092 --messages 10000000 --topic test --threads 10 --message-size 1000 --batch-size 100 --compression-codec 1
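For a sense of scale, the arithmetic below sizes one invocation of the script. It assumes --messages is the total across all 10 threads and that every batch is full; the constant names are just labels for the command-line values. With --compression-codec 1 (GZIP in Kafka 0.8), the bytes that actually reach the log will be smaller than this uncompressed figure.

```java
// Back-of-the-envelope sizing of one kafka-producer-perf-test.sh run.
// Assumption: --messages is the total across all threads, not per thread.
final class WorkloadMath {
    static final long MESSAGES = 10_000_000L;  // --messages
    static final int THREADS = 10;             // --threads
    static final int MESSAGE_SIZE = 1_000;     // --message-size (bytes)
    static final int BATCH_SIZE = 100;         // --batch-size
    static final int RUNS = 5;                 // the command was executed 5 times

    static long payloadBytesPerRun() { return MESSAGES * MESSAGE_SIZE; }

    static long batchesPerRun() { return MESSAGES / BATCH_SIZE; }

    static long messagesPerThread() { return MESSAGES / THREADS; }   // if split evenly

    static long payloadBytesAllRuns() { return payloadBytesPerRun() * RUNS; }

    public static void main(String[] args) {
        System.out.printf("payload per run: %d bytes (%.2f GiB)%n",
                payloadBytesPerRun(), payloadBytesPerRun() / (double) (1L << 30));
        System.out.println("batches per run: " + batchesPerRun());
        System.out.println("messages per thread: " + messagesPerThread());
        System.out.println("payload over all runs: " + payloadBytesAllRuns() + " bytes");
    }
}
```

That works out to 10,000,000,000 bytes (about 9.31 GiB) of uncompressed payload and 100,000 batches of roughly 100 KB each per run.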

Here are the results reported by the performance script.


Below are the results of the hotspot metering analysis. The Satoris agent automatically whittled the list of methods down from nearly 3,000 to just 40! The table is sorted by inherent (self) total, with kafka.log.Log.append the biggest contributor to execution time at an average of 9 ms per invocation. Ignoring the pollExpired probe, the next closest probe has an inherent total of 6 minutes, compared to 1 hour for Log.append. The cost profile of every other probe is dwarfed by Log.append.
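Inherent (self) time is the time spent in a method's own body, excluding the time attributed to the methods it calls; dividing it by the invocation count gives a per-call average. The sketch below shows that accounting over a hypothetical call-tree node (not Satoris's data model), with illustrative numbers rather than anything measured in these runs.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical call-tree node: inclusive time and invocation count for one
// method, plus the methods it called. Not the Satoris data model.
final class CallNode {
    final String method;
    final long totalMicros;   // inclusive: own body + callees
    final long invocations;
    final List<CallNode> children = new ArrayList<>();

    CallNode(String method, long totalMicros, long invocations) {
        this.method = method;
        this.totalMicros = totalMicros;
        this.invocations = invocations;
    }

    CallNode add(CallNode child) {
        children.add(child);
        return this;
    }

    // Inherent (self) time: inclusive time minus the time attributed to callees.
    long inherentMicros() {
        long callees = 0;
        for (CallNode c : children) callees += c.totalMicros;
        return totalMicros - callees;
    }

    double averageInherentMicros() {
        return (double) inherentMicros() / invocations;
    }

    public static void main(String[] args) {
        // Illustrative numbers only, not measurements from the runs above.
        CallNode parent = new CallNode("parent", 1_000, 10)
                .add(new CallNode("calleeA", 300, 10))
                .add(new CallNode("calleeB", 200, 10));
        System.out.println(parent.inherentMicros());        // 500
        System.out.println(parent.averageInherentMicros()); // 50.0
    }
}
```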


Here is the Scala source code for the Log.append method. Note the lock synchronized block and how much of the method's execution it covers, including a sub-optimal trace() call that involves a relatively expensive String.format call.
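A minimal Java sketch of the general pattern, with hypothetical class, flag, and counter names (this is not Kafka's code): the trace message is supplied lazily through a java.util.function.Supplier so String.format runs only when tracing is enabled, and the trace call sits after the synchronized block so it never lengthens the critical section. A Scala by-name parameter (msg: => String) gives the same deferral, so whether a particular trace() avoids the formatting cost depends on how it is declared.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical append path illustrating two things:
// 1) message formatting deferred behind a Supplier, so it only runs when tracing is on;
// 2) tracing done after the synchronized block, keeping the critical section short.
final class AppendLog {
    private final Object lock = new Object();
    private final boolean traceEnabled;
    private long nextOffset = 0;                           // guarded by lock
    final AtomicInteger formatCount = new AtomicInteger(); // counts String.format calls

    AppendLog(boolean traceEnabled) { this.traceEnabled = traceEnabled; }

    private void trace(Supplier<String> msg) {
        if (traceEnabled) {
            System.out.println(msg.get());                 // formatting happens only here
        }
    }

    long append(int messageCount) {
        long first;
        long next;
        synchronized (lock) {                              // only shared state under the lock
            first = nextOffset;
            nextOffset += messageCount;
            next = nextOffset;
        }
        trace(() -> {
            formatCount.incrementAndGet();
            return String.format("Appended %d messages, first offset: %d, next offset: %d",
                    messageCount, first, next);
        });
        return first;
    }

    public static void main(String[] args) {
        AppendLog quiet = new AppendLog(false);
        quiet.append(100);
        quiet.append(100);
        System.out.println("String.format calls with trace off: " + quiet.formatCount.get()); // 0
    }
}
```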


To determine whether threads were indeed building up at this point in the execution, I turned on the queue metering extension by adding the following to the jxinsight.override.config file in the directory pointed to by the jxinsight.home system property.


With the queue metering extension enabled, it is relatively easy to inspect the concurrent thread execution flowing through the Log.append method. In the timeline below, we can see the concurrency pegged at 8.


The Log.liftedTree1$1 method, called from Log.append, only ever has 1 thread executing it, which all but confirms that the monitor locked within Log.append is a shared instance. Log.liftedTree1$1 does not appear in the Scala source code; it is a synthetic method generated by the Scala compiler, presumably for the lock synchronized block (at least, that is what I suspect here).
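The Satoris queue metering internals aren't covered here, but the measurement itself is easy to approximate: count the threads currently inside a region and keep the peak. In the sketch below (hypothetical Gauge and SharedLockDemo classes), 8 threads call append(), which takes one monitor shared by all callers. A CyclicBarrier holds every thread inside append() until all 8 have arrived, purely so the demo's peak is deterministic. After that they contend on the shared lock, so the outer gauge peaks at 8 while the gauge inside the synchronized block never exceeds 1, the same shape as the Log.append and Log.liftedTree1$1 readings.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

// Concurrency gauge: how many threads are inside a region now, and the peak seen.
final class Gauge {
    private final AtomicInteger current = new AtomicInteger();
    private final AtomicInteger max = new AtomicInteger();

    void enter() { max.accumulateAndGet(current.incrementAndGet(), Math::max); }

    void exit() { current.decrementAndGet(); }

    int peak() { return max.get(); }
}

final class SharedLockDemo {
    final Object sharedLock = new Object();   // one monitor shared by every caller
    final Gauge outer = new Gauge();          // threads inside append()
    final Gauge inner = new Gauge();          // threads inside the synchronized block
    private final CyclicBarrier allArrived;
    long appended = 0;                        // guarded by sharedLock

    SharedLockDemo(int threads) { allArrived = new CyclicBarrier(threads); }

    void append() throws InterruptedException, BrokenBarrierException {
        outer.enter();
        try {
            allArrived.await();               // demo only: every thread is in append() at once
            synchronized (sharedLock) {
                inner.enter();
                try {
                    appended++;               // stand-in for the real work under the lock
                } finally {
                    inner.exit();
                }
            }
        } finally {
            outer.exit();
        }
    }

    static SharedLockDemo run(int threads) {
        SharedLockDemo demo = new SharedLockDemo(threads);
        List<Thread> workers = new ArrayList<>();
        for (int i = 0; i < threads; i++) {
            Thread t = new Thread(() -> {
                try {
                    demo.append();
                } catch (InterruptedException | BrokenBarrierException e) {
                    throw new IllegalStateException(e);
                }
            });
            workers.add(t);
            t.start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return demo;
    }

    public static void main(String[] args) {
        SharedLockDemo demo = run(8);
        System.out.println("peak in append(): " + demo.outer.peak());     // 8
        System.out.println("peak under the lock: " + demo.inner.peak()); // 1
    }
}
```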


Looking for a possible workaround to this rather obvious performance bottleneck, I came across the following setting in the server.properties file used to start the Kafka server instance.

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across the brokers.

I then changed this property (num.partitions) to 10, reset all data files and reran the benchmarks. Here are the results reported by the performance script. They look much better, but not amazing.
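In the 0.8.x code base each partition is backed by its own Log instance, and each Log guards appends with its own lock object, so raising the partition count gives appends to different partitions independent monitors. The sketch below models that with one plain Object monitor per partition (PartitionedLog is a hypothetical name). It assigns one thread per partition, an assumption made for the demo rather than how the producer actually spreads messages, and uses a CyclicBarrier only to hold every thread inside its lock at once so the measured peak is deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical partitioned log: one monitor per partition, like one lock per Log.
final class PartitionedLog {
    private final Object[] locks;
    private final long[] offsets;                          // offsets[p] guarded by locks[p]
    private final AtomicInteger insideLocks = new AtomicInteger();
    private final AtomicInteger peakInsideLocks = new AtomicInteger();

    PartitionedLog(int partitions) {
        locks = new Object[partitions];
        offsets = new long[partitions];
        for (int p = 0; p < partitions; p++) locks[p] = new Object();
    }

    // Appends under the monitor of a single partition. The optional barrier lets
    // the demo hold every thread inside its lock at once to prove they overlap.
    long append(int partition, int count, CyclicBarrier whileHolding) {
        synchronized (locks[partition]) {
            peakInsideLocks.accumulateAndGet(insideLocks.incrementAndGet(), Math::max);
            try {
                if (whileHolding != null) whileHolding.await();
                long first = offsets[partition];
                offsets[partition] += count;
                return first;
            } catch (InterruptedException | BrokenBarrierException e) {
                throw new IllegalStateException(e);
            } finally {
                insideLocks.decrementAndGet();
            }
        }
    }

    long nextOffset(int partition) {
        synchronized (locks[partition]) { return offsets[partition]; }
    }

    int peakConcurrentAppends() { return peakInsideLocks.get(); }

    // Demo: one thread per partition, each appending 100 messages to its own partition.
    static PartitionedLog runOneThreadPerPartition(int partitions) {
        PartitionedLog log = new PartitionedLog(partitions);
        CyclicBarrier barrier = new CyclicBarrier(partitions);
        List<Thread> workers = new ArrayList<>();
        for (int i = 0; i < partitions; i++) {
            final int partition = i;
            Thread t = new Thread(() -> log.append(partition, 100, barrier));
            workers.add(t);
            t.start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return log;
    }

    public static void main(String[] args) {
        PartitionedLog log = runOneThreadPerPartition(10);
        System.out.println("peak concurrent appends: " + log.peakConcurrentAppends()); // 10
    }
}
```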


In this run, the number of probes classified as hotspots has increased. The response time of Log.append has dropped from milliseconds to microseconds, and other probes start to surface that might be worth investigating as candidates for code (and resource consumption) tuning. Many appear related to ByteBuffer management and manipulation, which may well be outside our scope of influence, but some seem relatively heavyweight for what their names suggest, such as RequestChannel$Request.updateRequestMetrics. Spending a whole 11 microseconds per call on metric updates seems wasteful.
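The profile doesn't say what updateRequestMetrics does internally, but one general technique for keeping hot-path metric updates cheap under concurrency is to record into java.util.concurrent.atomic.LongAdder counters and aggregate only when the metric is read. The sketch below (a hypothetical RequestTimings class, not Kafka's metrics code) keeps a count and a total, which is enough for a mean but, unlike a histogram, not for percentiles.

```java
import java.util.concurrent.atomic.LongAdder;

// Hypothetical per-request timing recorder. LongAdder spreads concurrent updates
// across internal cells, so writers rarely contend; the cost moves to the reader,
// which sums the cells when a mean is requested.
final class RequestTimings {
    private final LongAdder count = new LongAdder();
    private final LongAdder totalNanos = new LongAdder();

    void record(long elapsedNanos) {   // hot path: two adds, no locks
        count.increment();
        totalNanos.add(elapsedNanos);
    }

    long count() { return count.sum(); }

    // Read path: sums the cells. Not an atomic snapshot while writers are active.
    double meanNanos() {
        long n = count.sum();
        return n == 0 ? 0.0 : (double) totalNanos.sum() / n;
    }

    public static void main(String[] args) throws InterruptedException {
        RequestTimings timings = new RequestTimings();
        Thread[] writers = new Thread[4];
        for (int i = 0; i < writers.length; i++) {
            writers[i] = new Thread(() -> {
                for (int j = 0; j < 1_000; j++) timings.record(2_000);
            });
            writers[i].start();
        }
        for (Thread t : writers) t.join();
        System.out.println(timings.count() + " samples, mean " + timings.meanNanos() + " ns");
    }
}
```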


Below are some of the methods the Satoris agent classified as hotspots, starting with ByteBufferMessageSet.decompress.


The execution cost in CompressionFactory looks to be largely related to object allocation.
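With --compression-codec 1 (GZIP), compressing and decompressing message sets goes through java.util.zip streams. The sketch below is a plain GZIP round trip, not Kafka's CompressionFactory, that shows where per-call allocation comes from: every new GZIPOutputStream or GZIPInputStream creates its own Deflater or Inflater plus an internal buffer, while the copy buffer passed in by the caller can be reused. GzipRoundTrip is a hypothetical name.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Plain java.util.zip GZIP round trip (not Kafka's CompressionFactory).
// Every call constructs a fresh GZIP stream, and each of those allocates its own
// Deflater/Inflater plus an internal buffer; only the copy buffer is reused.
final class GzipRoundTrip {
    static byte[] compress(byte[] payload) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (OutputStream gzip = new GZIPOutputStream(bytes)) {   // new Deflater per call
            gzip.write(payload);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();                                // final copy out
    }

    static byte[] decompress(byte[] compressed, byte[] copyBuffer) {
        if (copyBuffer.length == 0) throw new IllegalArgumentException("copy buffer must not be empty");
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (InputStream gzip = new GZIPInputStream(new ByteArrayInputStream(compressed))) { // new Inflater per call
            int n;
            while ((n = gzip.read(copyBuffer)) != -1) {
                out.write(copyBuffer, 0, n);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] message = new byte[1_000];          // one 1000-byte message
        Arrays.fill(message, (byte) 'x');
        byte[] copyBuffer = new byte[8 * 1024];    // reused across calls by the caller
        byte[] compressed = compress(message);
        byte[] restored = decompress(compressed, copyBuffer);
        System.out.println(compressed.length + " compressed bytes, round trip ok: "
                + Arrays.equals(message, restored));
    }
}
```

Deflater and Inflater both expose reset(), so a codec that keeps one instance per thread can avoid the per-call setup, but it would then have to write and parse the GZIP header and trailer itself, since GZIPOutputStream and GZIPInputStream always create their own instances.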


The ByteBufferMessageSet.create method shows more object allocation and I/O copying work.
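The sketch below contrasts two ways of getting message bytes into a ByteBuffer: staging them in a ByteArrayOutputStream (one copy out via toByteArray(), another into the ByteBuffer) versus writing straight into a pre-sized buffer. It is not a transcription of ByteBufferMessageSet.create; the ByteBufferOutputStream adapter and CopyPaths class are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;

// Hypothetical adapter: an OutputStream that writes straight into a ByteBuffer.
final class ByteBufferOutputStream extends OutputStream {
    private final ByteBuffer target;

    ByteBufferOutputStream(ByteBuffer target) { this.target = target; }

    @Override
    public void write(int b) { target.put((byte) b); }

    @Override
    public void write(byte[] b, int off, int len) { target.put(b, off, len); }
}

final class CopyPaths {
    // Staged path: write into a ByteArrayOutputStream, copy out with toByteArray(),
    // then copy again into a freshly allocated ByteBuffer.
    static ByteBuffer viaStaging(byte[][] messages) {
        ByteArrayOutputStream staging = new ByteArrayOutputStream();
        for (byte[] m : messages) staging.write(m, 0, m.length);
        byte[] copy = staging.toByteArray();                  // copy 1
        ByteBuffer buffer = ByteBuffer.allocate(copy.length);
        buffer.put(copy);                                     // copy 2
        buffer.flip();
        return buffer;
    }

    // Direct path: size the buffer up front, then write each message into it once.
    // Only possible when the final size is known before writing.
    static ByteBuffer direct(byte[][] messages) {
        int size = 0;
        for (byte[] m : messages) size += m.length;
        ByteBuffer buffer = ByteBuffer.allocate(size);
        ByteBufferOutputStream out = new ByteBufferOutputStream(buffer);
        for (byte[] m : messages) out.write(m, 0, m.length);
        buffer.flip();
        return buffer;
    }

    public static void main(String[] args) {
        byte[][] messages = { {1, 2, 3}, {4, 5}, {6} };
        System.out.println(viaStaging(messages).equals(direct(messages))); // true
    }
}
```

With compression enabled, the compressed size isn't known until compression finishes, which is one reason a staging buffer gets used at all; the direct path fits best when the output size can be computed in advance.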