Microbenchmarking Big Data Solutions on the JVM – Part 2

In part 1 the process used to microbenchmark a particular service interaction with a big data solution was introduced along with an initial performance hotspot listing. In that first part we eliminated a large amount of measurement overhead via the online and offline adaptive profiling capabilities of Satoris. In this new part the performance model will be further refined over a number of configuration steps, each building on the previous, with the primary purpose of deriving a smaller, simpler and more relevant model of the microbenchmark to facilitate more targeted code inspection as well as a better understanding of the nature of execution flow across method, class and package boundaries.

The focus in part 1 was on reducing instrumentation and in turn the measurement overhead. In part 2 the focus shifts to dynamic contextual measurement reduction in preparation for additional data collection to be introduced in part 3.

RECURSION ELIMINATION

Measurement of a particular named probe within the measurement scope of a same-named probe can occur when a method name is overloaded and chained, as parameter signatures are by default not used in the naming of a probe, or when the method is recursive by (call) nature. It is also possible for a method to be re-entrant, such as when a method is part of a standard library that glues various (data) flows together. Think stream processing, collection iteration, and so on. Satoris offers two metering extensions that eliminate such nested measurement: nonrecursive and nonreentrant. In this article we will focus on eliminating direct call recursion, where a probe (method) is called and metered within the immediate scope of another probe (method) with the exact same name. In doing so only the count changes (it is reduced); the execution cost is still assigned correctly to the probe.
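
To make this concrete, below is a small, purely hypothetical Scala sketch (not code from the benchmark or from Spark) showing the two common ways a probe ends up being metered within the scope of a same-named probe: direct call recursion and overloaded, chained method names.

object ProbeNestingSketch {

  // Direct (call) recursion: the factorial probe fires within the immediate
  // scope of a probe with the exact same name. The nonrecursive extension
  // suppresses the nested measurement while the cost stays on the probe.
  def factorial(n: Long): Long =
    if (n <= 1) 1 else n * factorial(n - 1)

  // Overloaded and chained: both methods map to the same probe name "sum"
  // because parameter signatures are not, by default, part of the probe name.
  def sum(values: Seq[Int]): Int = sum(values, 0)
  def sum(values: Seq[Int], offset: Int): Int = values.sum + offset
}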

satoris.nonrecursive.call.filtering.graph

Here is how this is achieved in the jxinsight.override.config file used by the profiling/monitoring agent.

j.s.p.nonrecursive.enabled=true

AUTOMATIC LABELING

In part 1 it was not possible to determine the overall execution time of the SparkTC.main method because the method was not deemed a hotspot. A probe only ever gets classified as a hotspot after it has breached a threshold a configurable number of times; the default value is 1,000. Because SparkTC.main is only ever executed once, it gets disabled during the refinement phase following the first iteration in the benchmark script. This can be addressed using the autolabel metering extension, which allows a probe label, normally applied by an extension during execution, to be associated with a probe on startup.

j.s.p.autolabel.enabled=true
j.s.p.autolabels=disabled,hotspot
j.s.p.autolabel.disabled.name.groups=org.apache.spark.util.Utils$.logUncaughtExceptions
j.s.p.autolabel.hotspot.name.groups=org.apache.spark.examples.SparkTC.main

In the above configuration the measurement of the Utils$.logUncaughtExceptions method has been eliminated by applying the disabled label to the probe. This was done because the method simply decorates and delegates a call to underlying task code though not necessarily directly into the org.apache.spark.* package.

Below is the 20th performance snapshot obtained following execution of the benchmark.sh script with the slices (partitions) parameter set to 2. The table is sorted on the cumulative inherent (self) time. The SparkTC.main method, previously eliminated by the adaptive profiling process, is now listed on the second row. From the cumulative totals for the SparkTC.main method we now know that all other probe metering within the performance model was performed in a thread other than the one that called SparkTC.main. Because MapOutputTracker.getServerStatuses has a higher value we also know that there are multiple metered threads executing in parallel. There are other ways this information could have been obtained and presented, such as a thread timeline, but even this simple table view reveals a lot on immediate inspection.

microbenchmarking.bigdata.spark.part2.t2.ht5.hit1

Now that we have the SparkTC.main method always instrumented across the refinements of the agent instrumentation set, we can test the performance of an individual run with different slices values. The metering model below shows that with a slices value of 1 the overall wall clock (running) time is half that of the previous benchmark run with slices set to 2. Less is sometimes More!

microbenchmarking.bigdata.spark.part2.t1.ht5.hit1

HOTSPOT TUNINGS

At this point you might be wondering whether there are other “costly” methods executed during the benchmark that have been eliminated because they only execute a small number of times. We can investigate this by changing the hotspot configuration: giving an initial scorecard balance of 5,000 to each probe, instead of the default of 1,000, and having the lower bound of the hotspot classification start at 5,000 instead of 2,000. Effectively each probe will be automatically classified as a hotspot on startup and will only lose this classification if it incurs one or more debits during the few method invocations that are metered.

satoris.hotspot.label.state.chart.graph

Here is the configuration that needs to be added to the jxinsight.override.config file.

j.s.p.hotspot.initial=5000
j.s.p.hotspot.lower=5000
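
To make the scorecard mechanics easier to follow, here is a minimal, purely illustrative Scala sketch of how an initial balance and a lower bound might interact. It is an assumption made for explanation only, not the Satoris implementation; the credit and debit sizes and the latency threshold are invented.

// Illustrative only: not the Satoris hotspot extension. Credit/debit sizes
// and the threshold are assumptions used to show how initial and lower relate.
final class HotspotScorecard(initial: Int, lower: Int, thresholdMicros: Long) {
  private var balance: Int = initial

  // Called once per metered invocation with the probe's inherent (self) time:
  // a credit when the invocation is above the threshold, a debit otherwise.
  def record(inherentMicros: Long): Unit =
    balance += (if (inherentMicros >= thresholdMicros) 1 else -1)

  // The probe holds the hotspot classification while its balance has not
  // fallen below the lower bound.
  def isHotspot: Boolean = balance >= lower
}

Under this sketch, with initial and lower both at 5,000 a probe starts out a hotspot and one or more net debits are enough to drop it, whereas with the defaults of 1,000 and 2,000 a probe must first earn a net credit of 1,000 before being classified a hotspot at all.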

With the above configuration changes, including the recursion and auto labeling settings, we get the following performance snapshot after re-executing the benchmark.sh script. Of particular note is the appearance of new probes with very low counts and high cumulative inherent totals.

microbenchmarking.bigdata.spark.part2.t2.ht5.hit1.i5k.l5k

Here is the same metering table this time ordered by the cumulative total instead of the cumulative inherent (self) total.

microbenchmarking.bigdata.spark.part2.t2.ht5.hit1.i5k.l5k.reversed

There is always a possibility that a probe (method) starts out executing fast but gets slower and slower as the benchmark warms up and more data is created and processed. To prevent an early disablement of such a probe (though generally most methods in a typical application get faster as the dynamic hotspot compiler optimizes code), the initial balance value can be set higher than the default of 1,000 whilst still keeping the default gap of 1,000 between initial and lower. The following configuration reverts to eliminating probes (methods) that are not executed frequently, while still giving probes a greater chance of staying the course during the adaptive profiling if they do in fact perform above the hotspot thresholds later in the run. The net increase in the hotspot scorecard for a probe to remain across performance profiles must be at least 1,000.

j.s.p.hotspot.initial=5000
j.s.p.hotspot.lower=6000
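
Reusing the illustrative HotspotScorecard sketch from the previous section (again an assumption about the mechanics, not the actual extension), the effect of the 1,000 gap can be shown in a few lines:

object ScorecardGapExample extends App {
  // A probe now starts 1,000 below the hotspot bound, so rarely executed
  // methods are eliminated again, while a probe that nets at least 1,000
  // credits later in the run can still earn the classification.
  val card = new HotspotScorecard(initial = 5000, lower = 6000, thresholdMicros = 10)
  (1 to 1200).foreach(_ => card.record(inherentMicros = 25)) // 1,200 "slow" invocations
  println(card.isHotspot) // true: a net credit of +1,200 crosses the lower bound
}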

Running the benchmark.sh script again gives the following metering table in the last of the 20 iterations. Utils$.copyStream seems to have benefited from this change, as it did from the previous one. The reason, not visible in the table section shown, is that the Utils$.copyStream method had a single metered invocation of 37 seconds. Ouch!

microbenchmarking.bigdata.spark.part2.t2.ht5.hit1.i5k.l6k

SUBSYSTEM BOUNDARIES

Another approach to discerning the execution essence of any software is to measure (meter) only at the call surface (entry point) of sub-system boundaries. A possible boundary would be one or more packages and classes. The tierpoint metering extension allows easy definition of such boundaries, limiting measurement (metering) to only those probes at the call ingress edge. Effectively only the entry point probe (method) is metered, with all call-nested probes belonging to the same boundary ignored unless another boundary is crossed between the callee and an (indirect) callee grouped within the same boundary. The result is that we go from hotspot probes (methods with high inherent costs) to surface points (methods representing a sub-system boundary interaction). With the hotspot adaptive profiling mechanism still in play (the default unless explicitly turned off), non-surface points are eliminated and the costs, in particular inherent (self) time, move up to the surface points. The tierpoint extension is extremely useful when dealing with the metering of third party libraries as well as complex systems with many sub-systems and components developed somewhat independently under the same overarching project.
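
Below is a Scala sketch, with made-up package names that merely mimic a layered application, illustrating which calls would and would not be metered under tierpoint semantics. It is an explanatory sketch only, not code from the benchmark.

// Hypothetical packages used only to illustrate boundary (tier) crossings.
package example.util {
  object Files {
    // Ingress into the "util" tier: metered as a surface point.
    def copy(src: String, dst: String): Unit = {
      checksum(src)                     // stays within the "util" tier: not metered
      example.storage.Store.write(dst)  // crosses into the "storage" tier: metered
    }
    def checksum(path: String): Long = path.length.toLong
  }
}

package example.storage {
  object Store {
    // Ingress into the "storage" tier: metered as a surface point.
    def write(path: String): Unit = flush(path) // same tier: not metered
    def flush(path: String): Unit = ()
  }
}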

The following configuration creates 10 metering tiers (boundaries). Most are defined with only one package namespace though multiple namespaces can be specified if need be.

j.s.p.tierpoint.enabled=true
j.s.p.tierpoints=storage,shuffle,scheduler,serializer,network,io,rdd,executor,util,*
j.s.p.tierpoint.storage.name.groups=org.apache.spark.storage
j.s.p.tierpoint.shuffle.name.groups=org.apache.spark.shuffle
j.s.p.tierpoint.scheduler.name.groups=org.apache.spark.scheduler
j.s.p.tierpoint.serializer.name.groups=org.apache.spark.serializer
j.s.p.tierpoint.network.name.groups=org.apache.spark.network
j.s.p.tierpoint.io.name.groups=org.apache.spark.io
j.s.p.tierpoint.rdd.name.groups=org.apache.spark.rdd
j.s.p.tierpoint.executor.name.groups=org.apache.spark.executor
j.s.p.tierpoint.util.name.groups=org.apache.spark.util
j.s.p.tierpoint.*.name.groups=org.apache.spark

Below is the 20th performance model (snapshot) following a complete rerun of the benchmark.sh script with the above configuration added. The number of hotspot probes has been reduced from 174 down to 67. The total number of metered calls dropped from 7 million down to 5 million. What is extremely nice about how this extension impacts metering is that the inherent (self) total for a probe now represents the time spent within the sub-system (boundary/tier) for that particular crossing, excluding nested crossings of the same boundary unless it is the same probe.

Note: With the tierpoint extension direct recursion is also eliminated so there is no need for the nonrecursive extension to be enabled as well.

microbenchmarking.bigdata.spark.part2.t2.ht5.hit1.i5k.l6k.tiers

The above performance model was generated with the tierpoint extension enabled throughout the 20 benchmark process runs. But we could have deferred the enablement of the tierpoint extension to a separate process run after the benchmark.sh script had already been run with the normal hotspot adaptive mechanism (as above). Doing so further refines the performance model by eliminating the metering of hotspot call chains within a sub-system. The resulting hotspots are not exactly surface (ingress) points, because the hotspot metering extension might have eliminated the actual sub-system surface (ingress) calls during the benchmark, especially if they acted as mere pass-through methods with little or no inherent execution cost. Here is such a performance model.

microbenchmarking.bigdata.spark.part2.t2.ht5.hit1.i5k.l6k.tiers.post 2

In Part 3 new data collection extensions are introduced in the playback of an episodic software memory. In Part 4 the many ways of visualizing the benchmark execution are given center stage.

APPENDIX A – SOME HOTSPOT CODE LISTINGS

org.apache.spark.MapOutputTracker.getServerStatuses

apache.spark.1.3.1.mapoutputtracker.getserverstatus.scala

org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses

apache.spark.1.3.1.mapoutputtracker.convertmapstatuses.scala

org.apache.spark.util.SizeEstimator$.estimate

apache.spark.1.3.1.sizeestimator.estimate.scala

org.apache.spark.storage.DiskBlockObjectWriter.open

apache.spark.1.3.1.diskblockobjectwriter.open.scala

org.apache.spark.shuffle.IndexShuffleBlockManager.writeIndexFile

apache.spark.1.3.1.indexshuffleblockmanager.writeindexfile.scala

org.apache.spark.shuffle.IndexShuffleBlockManager.getBlockData

apache.spark.1.3.1.indexshuffleblockmanager.getblockdata.scala

org.apache.spark.util.AkkaUtils$.askWithReply

apache.spark.1.3.1.akkautils.askwithreply.scala

org.apache.spark.util.Utils$.copyStream

apache.spark.1.3.1.utils.copystream.scala