This is an initial investigation into the performance model (profile) of Google Cloud DataFlow, which provides a simple, powerful programming model for building both batch and streaming parallel data processing pipelines. The SDK itself does not ship with a benchmark suite for performance testing, but it does distribute a number of examples, including the infamous WordCount, listed partially below and used as the test case for the benchmarking reported here.
The above functions are applied (added) to the data processing pipeline as follows: read one or more input files, count the words extracted by a function, then write a formatted output of the results to a file. It is all relatively clear, concise and compact by Java standards. One would expect the most expensive operation to be the splitting of a text line into words, followed by the counting of the resulting tokens.
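The three stages described above can be sketched in plain Java without the SDK. This is only an illustration of the read, count and format steps under stated assumptions: the class and method names are hypothetical, the tokenizing pattern is an assumption, and none of it is the SDK's pipeline code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class WordCountSketch {

    // Split one line into word tokens; the pattern (letters and apostrophes) is an assumption.
    static List<String> extractWords(String line) {
        List<String> words = new ArrayList<>();
        for (String token : line.split("[^a-zA-Z']+")) {
            if (!token.isEmpty()) {   // a leading separator yields an empty first token
                words.add(token);
            }
        }
        return words;
    }

    // Count occurrences of each word across all lines (sorted by word for stable output).
    static Map<String, Long> countWords(List<String> lines) {
        Map<String, Long> counts = new TreeMap<>();
        for (String line : lines) {
            for (String word : extractWords(line)) {
                counts.merge(word, 1L, Long::sum);
            }
        }
        return counts;
    }

    // Format each (word, count) pair as one line of output text.
    static List<String> formatCounts(Map<String, Long> counts) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, Long> entry : counts.entrySet()) {
            out.add(entry.getKey() + ": " + entry.getValue());
        }
        return out;
    }
}
```

In this sketch the only work is tokenizing and counting, which is why one might expect those two steps to dominate a profile of the real pipeline.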
After downloading a relatively large text file (The Adventures of Sherlock Holmes), the following benchmark script was created to run the WordCount example with the Satoris agent installed into the runtime. The script executes the same benchmark test case 10 times with the Satoris Java instrumentation agent, each time adaptively refining the instrumentation set based on the previous performance snapshot (an exported profile). After the 10th iteration a final performance snapshot is captured, this time without any online adaptive strategy employed, followed by a timed non-instrumented benchmark run used to sanity check the agent metering overhead in the last instrumented run.
After an initial performance run it was determined that it would be best to switch from the default clock.time meter, metering in microseconds, to the clock.tick meter, metering in nanoseconds. It was also decided to instrument only the com.google.cloud.* package and its sub-packages. More importantly, the hotspot metering extension thresholds for both inclusive and exclusive (self/inherent) metering were set to the same extremely low value of 250 nanoseconds, down from defaults of 10 microseconds and 5 microseconds respectively. Because an actual metered method call, entering and exiting combined, costs 100 nanoseconds or more, there will be some perturbation of the performance measurements collected, especially for hotspots with a relatively low inherent share of the total. With this in mind, it is best to skip over those hotspot probes whose inherent (exclusive) total is less than 50% of the overall (inclusive) total in any of the listings presented below.
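The reading rule above, together with the roughly 100 nanosecond per-call metering cost, can be expressed as a small filter. This is a minimal sketch, not part of Satoris: the Probe class, its fields and both method names are hypothetical, and the overhead estimate simply multiplies the call count by an assumed fixed cost per metered call.

```java
import java.util.ArrayList;
import java.util.List;

public class HotspotFilter {

    // Hypothetical row of a hotspot listing; all times are in nanoseconds.
    static final class Probe {
        final String name;
        final long count;          // number of metered calls
        final long totalNanos;     // inclusive total
        final long inherentNanos;  // exclusive (self) total

        Probe(String name, long count, long totalNanos, long inherentNanos) {
            this.name = name;
            this.count = count;
            this.totalNanos = totalNanos;
            this.inherentNanos = inherentNanos;
        }
    }

    // Keep only probes whose inherent total is at least minRatio of the inclusive total.
    static List<Probe> keepReliable(List<Probe> probes, double minRatio) {
        List<Probe> kept = new ArrayList<>();
        for (Probe p : probes) {
            if (p.totalNanos > 0 && (double) p.inherentNanos / p.totalNanos >= minRatio) {
                kept.add(p);
            }
        }
        return kept;
    }

    // Rough share of the inclusive total attributable to metering, assuming a fixed cost per call.
    static double overheadShare(Probe p, long costPerCallNanos) {
        return p.totalNanos == 0 ? 0.0 : (double) (p.count * costPerCallNanos) / p.totalNanos;
    }
}
```

For example, a probe called 1,000 times with an inclusive total of 500,000 ns would, at an assumed 100 ns per call, carry about 20% metering overhead in that total.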
For the benchmark run the large text file was duplicated and both files were submitted to the WordCount benchmark. Below is the performance model following the 10th benchmark iteration. Surprisingly, a large number of the hotspots identified relate to (data) value decoding, encoding and copying (or cloning) all along the data pipeline, hidden completely from the developer. As with many other dataflow-like technologies, there is a heavy serialization cost to be paid as data is shuffled around and across stages and sinks.
One somewhat strange method is StringUtils.approximateSimpleName, which takes a Class parameter and returns a simplified string version of the class name. Why would this be executed 7 million times while counting the words in lines of text? This and other surprising hotspots all seem related to the concept and operation of PCollections in the DataFlow SDK.
“When you create or output pipeline data, you’ll need to specify how the elements in your
PCollections are encoded and decoded to and from byte strings. Byte strings are used for intermediate storage as well reading from sources and writing to sinks. The Dataflow SDKs use objects called
Coders to describe how the elements of a given
PCollection should be encoded and decoded.”
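To make the cost of such encoding concrete, here is a minimal plain-Java sketch of a round trip through a byte string. It is not the SDK's coder implementation: the class name, the method names and the length-prefixed UTF-8 layout are all assumptions chosen for illustration. The comments mark where allocations and copies occur for every element that passes through.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

public class CoderSketch {

    // Encode a string as a 4-byte length followed by its UTF-8 bytes (assumed layout).
    static byte[] encode(String value) {
        try {
            byte[] utf8 = value.getBytes(StandardCharsets.UTF_8);       // allocation: UTF-8 copy of the string
            ByteArrayOutputStream buffer = new ByteArrayOutputStream(); // allocation: growable buffer
            DataOutputStream out = new DataOutputStream(buffer);
            out.writeInt(utf8.length);
            out.write(utf8);                                            // copy into the buffer
            out.flush();
            return buffer.toByteArray();                                // allocation: copy out of the buffer
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Decode the byte string produced by encode back into a new String.
    static String decode(byte[] bytes) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
            byte[] utf8 = new byte[in.readInt()];                       // allocation: payload array
            in.readFully(utf8);                                         // copy out of the stream
            return new String(utf8, StandardCharsets.UTF_8);            // allocation: decoded string
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Multiplied by millions of words, even a round trip this small amounts to several allocations and copies per element, which is consistent with the kind of hotspots seen in the listings.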
A similar benchmark run was performed again, this time with configuration changes to the hotspot metering extension to eliminate all hotspots not exceeding 100K points on the balance scorecard maintained by that extension. Typically this means that a hotspot must have more than 50K calls exceeding 250 nanoseconds, in terms of both inclusive and exclusive (inherent/self) metering. In doing so the Satoris agent eliminated some previous top hotspots (we will come back to this later) but improved the reporting accuracy of the remaining inclusive clock metering measurements.
With the hotspot metering extension disabled in the final iteration, one would expect the timings to reduce marginally, but this is not altogether the case here, indicating some variation across benchmark iterations, possibly caused by different garbage collection cycle times. Object allocation might well be playing a big part in the code execution costs in the counting of words. There is the obvious splitting of a line into words (tokens), but could there be more going on under the hood within the data pipeline?
Some earlier hotspots, in particular those high on the sorted list, were lost in the last round of changes to the hotspot scorecard, possibly because some code paths became cheaper more quickly in the early stages of the benchmark iterations. Reducing both the inclusive and exclusive hotspot thresholds to 200 nanoseconds, down from 250 nanoseconds, reverts this to some degree, though at this extremely low threshold level the measurement (overhead) cost is going to more than dominate the performance model, save for those hotspot probes where the total and inherent total are the same: leaf call tree nodes (in terms of instrumentation).
Here is the final performance model with the hotspot extension disabled. The call counts remain unchanged as expected, but unlike the last time there does appear to be the expected marginal reduction across the board, as the metering engine has less work to do with all online runtime adaptations effectively deactivated. As with all previous performance snapshots, the execution hotspots pertain largely to (data) value encoding, decoding and copying, not the actual work of splitting lines and counting words.
APPENDIX – HOTSPOT CODE LISTINGS
com.google.cloud.dataflow.sdk.util.CoderUtils.encodeToByteArray #allocation #copying #threadlocal
com.google.cloud.dataflow.sdk.values.PValueBase.toString #allocation #concatenation
com.google.cloud.dataflow.sdk.coders.StringUtf8Coder.encode #allocation #copying
com.google.cloud.dataflow.sdk.util.StringUtils.approximateSimpleName #allocation #tokenization #replacement #looping #reflection
com.google.cloud.dataflow.sdk.util.TriggerRunner.processValue #copying #allocation
com.google.cloud.dataflow.sdk.util.ReduceFnRunner.processElement #allocation #looping
com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.ensureElementEncodable #allocation #concatenation