Sub-microsecond Code Profiling of Google Cloud DataFlow

This is an initial investigation into the performance model (profile) of Google Cloud DataFlow which provides a simple, powerful programming model for building both batch and streaming parallel data processing pipelines. The SDK itself does not ship with an actual benchmark suite for performance testing purposes but it does distribute a number of examples including the infamous WordCount listed partially below and used as the test case for the benchmarking reporting here.

google.dataflow.code.wordcount.functions

The above functions are applied (added) to the data pipeline processing as follows: Read one or more input files then count the words extracted by a function after which write a formatted output of the results to a file. It is all relatively clear, concise and compact by Java standards. Clearly, the most expensive operation would be the splitting up of a text line into words followed subsequently by the counting of such tokenization.

google.dataflow.code.wordcount.main

After downloading a relatively large text file (The Adventures of Sherlock Holmes) the following benchmark scripts was created to run the WordCount example with the Satoris agent installed into the runtime. The script itself executes the same benchmark test case 10 times with the Satoris Java instrumentation agent each time adaptively refining the instrumentation set based on the previous performance snapshot (an exported profile). After the 10th iteration, a final performance snapshot is captured this time without any online adaptive strategy employed followed by a timed non-instrumented benchmark test run used to sanity check the agent metering overhead in the last instrumented run.

google.dataflow.benchmark.script

After an initial performance run it was determined that it would be best to switch away from the default clock.time meter, metering in microseconds, to the clock.tick meter, metering in nanoseconds. It was also decided to only instrument the com.google.cloud.* package and its sub-packages. More importantly, the hotspot metering extension threshold for both inclusive and exclusive (self/inherent) metering were set to the same extremely low value of 250 nanoseconds from a default of 10 microseconds and 5 microseconds respectively. Knowing that an actual metered method call, both entering and exiting, will have a (combined) cost of 100 nanoseconds or more this does mean that there will be some perturbation of the performance measurement collected especially for hotspots with a relatively low inherent total percentage. With this in mind, It is best to skip over those hotspot probes having an inherent (exclusive) total that is less than 50% of the overall (inclusive) total in any the listing presented below.

For the benchmark run the large text file was duplicated and then both files submitted to the WordCount benchmarking. Below is the performance model following the 10th benchmark iteration. Surprisingly a large number of the hotspots identified relate to (data) value decoding, encoding and copying (or cloning) all along the data pipeline, hidden completely from the developer. As with many other data-flow like technologies, there is a heavy serialization cost to be paid as data is shuffled around and across stages and sinks.

google.dataflow.1abc.txt.250.250.5k.6k.#10

When you create or output pipeline data, you’ll need to specify how the elements in your PCollections are encoded and decoded to and from byte strings. One somewhat strange method is StringUtils.approximateSimpleName which takes a Class parameter and returns a simplified string version of the class name. Why would this be executed 7 million times in the counting of words in a line feed? This and other surprising hotspots seem all related to the concept and operation of Coders and PCollections in the DataFlow SDK.

The Dataflow SDKs use objects called Coders to describe how the elements of a given PCollection should be encoded and decoded. A similar benchmark run was performed again this time with configuration changes to the hotspot metering extension to eliminate all hotspots not exceeding 100K points on a scorecard maintained by the hotspot metering extension. Typically this means that a hotspot must have more than 50K calls exceeding 250 nanoseconds, both in terms of inclusive and exclusive (inherent/self) metering. In doing so the Satoris agent eliminated some previous top hotspots (we will come back to this later) but improved the reporting accuracy of the remaining inclusive clock metering measurements.

google.dataflow.1abc.txt.250.250.10k.100k.#10

With the hotspot metering extension disabled in the final iteration, one would expect the timings to marginally reduce but this is not altogether the case here indicating some variation across benchmark iterations possibly caused by different garbage collection cycle times. Object allocation might well be playing a big part in the code execution costs in the counting of words. There’s the obvious splitting of a line into words (tokens) but could there be more going on under hood within the data pipeline?

google.dataflow.1abc.txt.250.250.10k.100k.#final

Some earlier hotspots, in particular, those high on the sorted list, were lost in the last round of changes to the hotspot scorecard caused possibly by some code paths becoming cheaper faster in the early stages of the benchmark iterations. Reducing both inclusive and exclusive hotspot thresholds to 200 nanoseconds, down from 250 nanoseconds, reverts this to some degree though at this extreme low threshold level the measurement (overhead) cost is going to more than dominate the performance model save for those hotspot probes where the total and inherent total are the same – leaf call tree nodes (in terms of instrumentation).

google.dataflow.1abc.txt.200.200.10k.100k.#10

Here is the final performance model with the hotspot extension disabled. The call counts remain unchanged as expected but unlike the last time there does appear to be an expected marginal reduction across the board as the metering engine has less work to doing with all online runtime adaptations effectively deactivated. As with all previous performance snapshots, the execution hotspots pertain largely to (data) value encoding, decoding and copying not the actual work of splitting lines and counting words.

google.dataflow.1abc.txt.200.200.10k.100k.#final

Appendix A – Hotspot Code Listings

com.google.cloud.dataflow.sdk.util.CoderUtils.encodeToByteArray #allocation #coping #threadlocal

google.cloud.dataflow.code.coderutils.encodetobytearray

com.google.cloud.dataflow.sdk.util.CoderUtils.decodeFromByteArray #allocation

google.cloud.dataflow.code.coderutils.decodefrombytearray

com.google.cloud.dataflow.sdk.values.PValueBase.toString #allocation #concatenation

google.cloud.dataflow.code.pvaluebase.tostring

com.google.cloud.dataflow.sdk.coders.StringUtf8Coder.encode #allocation #copying

google.cloud.dataflow.code.stringutf8coder.encode

com.google.cloud.dataflow.sdk.util.StringUtils.approximateSimpleName #allocation #tokenization #replacement #looping #reflection

google.cloud.dataflow.code.stringutils.approximatesimplename

com.google.cloud.dataflow.sdk.util.WindowedValue$FullWindowedValueCoder.encode #allocation

google.cloud.dataflow.code.stringutils.windowedvalue.encode.decode

com.google.cloud.dataflow.sdk.util.TriggerRunner.processValue #copying #allocation

google.cloud.dataflow.code.triggerrunner.processvalue

com.google.cloud.dataflow.sdk.coders.KvCoder.decode #allocation

google.cloud.dataflow.code.kvcoder.decode

com.google.cloud.dataflow.sdk.util.ReduceFnRunner.processElement #allocation #looping

google.cloud.dataflow.code.reducefnrunner.processelement

com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.ensureElementEncodable #allocation #concatenation

google.cloud.dataflow.code.directpipelinerunner$evaluator.ensureelementencodable