Note
This article relies on an open source library hosted on GitHub at: https://github.com/mspnp/spark-monitoring. The library supports Azure Databricks 10.x (Spark 3.2.x) and earlier. Azure Databricks 11.0 includes breaking changes to the logging systems that the spark-monitoring library integrates with. The work required to update the spark-monitoring library to support Azure Databricks 11.0 (Spark 3.3.0) and newer is not currently planned.
This article describes how to use monitoring dashboards to find performance bottlenecks in Spark jobs on Azure Databricks.
Azure Databricks is an Apache Spark–based analytics service that makes it easy to rapidly develop and deploy big data analytics. Monitoring and troubleshooting performance issues is critical when operating production Azure Databricks workloads. To identify common performance issues, it's helpful to use monitoring visualizations based on telemetry data.
Prerequisites
To set up the Grafana dashboards shown in this article:
Configure your Databricks cluster to send telemetry to a Log Analytics workspace, using the Azure Databricks Monitoring Library. For details, see the GitHub readme.
Deploy Grafana in a virtual machine. See Use dashboards to visualize Azure Databricks metrics.
The Grafana dashboard that is deployed includes a set of time-series visualizations. Each graph is a time-series plot of metrics related to an Apache Spark job, the stages of the job, and the tasks that make up each stage.
Azure Databricks performance overview
Azure Databricks is based on Apache Spark, a general-purpose distributed computing system. Application code, known as a job, executes on an Apache Spark cluster, coordinated by the cluster manager. In general, a job is the highest-level unit of computation. A job represents the complete operation performed by the Spark application. A typical operation includes reading data from a source, applying data transformations, and writing the results to storage or another destination.
Jobs are broken down into stages. The job advances through the stages sequentially, which means that later stages must wait for earlier stages to complete. Stages contain groups of identical tasks that can be executed in parallel on multiple nodes of the Spark cluster. Tasks are the most granular unit of execution taking place on a subset of the data.
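To make the job, stage, and task hierarchy concrete, the following minimal PySpark sketch (the paths and column names are hypothetical) shows a pipeline in which the write action triggers a job, and the groupBy shuffle boundary splits that job into stages whose tasks run in parallel, roughly one per partition.

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# Narrow transformations (read, filter) stay within one stage.
events = spark.read.parquet("/mnt/raw/events")          # hypothetical path
ok_events = events.filter(F.col("status") == "ok")

# A wide transformation (groupBy) needs a shuffle, so Spark adds another stage.
counts = ok_events.groupBy("region").count()

# The action triggers the job; each stage runs as parallel tasks,
# roughly one task per partition of that stage's input.
counts.write.mode("overwrite").parquet("/mnt/curated/event_counts")
```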
The next sections describe some dashboard visualizations that are useful for performance troubleshooting.
Job and stage latency
Job latency is the duration of a job execution from when it starts until it completes. It is shown as percentiles of a job execution per cluster and application ID, to allow the visualization of outliers. The following graph shows a job history where the 90th percentile reached 50 seconds, even though the 50th percentile was consistently around 10 seconds.
Investigate job execution by cluster and application, looking for spikes in latency. Once clusters and applications with high latency are identified, move on to investigate stage latency.
Stage latency is also shown as percentiles to allow the visualization of outliers. Stage latency is broken out by cluster, application, and stage name. Identify spikes in task latency in the graph to determine which tasks are holding back completion of the stage.
The cluster throughput graph shows the number of jobs, stages, and tasks completed per minute. This helps you to understand the workload in terms of the relative number of stages and tasks per job. Here you can see that the number of jobs per minute ranges between 2 and 6, while the number of stages is about 12 – 24 per minute.
Sum of task execution latency
This visualization shows the sum of task execution latency per host running on a cluster. Use this graph to detect tasks that run slowly due to the host slowing down on a cluster, or a misallocation of tasks per executor. In the following graph, most of the hosts have a sum of about 30 seconds. However, two of the hosts have sums that hover around 10 minutes. Either the hosts are running slow or the number of tasks per executor is misallocated.
The number of tasks per executor shows that two executors are assigned a disproportionate number of tasks, causing a bottleneck.
Task metrics per stage
The task metrics visualization gives the cost breakdown for a task execution. You can use it to see the relative time spent on tasks such as serialization and deserialization. This data might show opportunities to optimize — for example, by using broadcast variables to avoid shipping data. The task metrics also show the shuffle data size for a task, and the shuffle read and write times. If these values are high, it means that a lot of data is moving across the network.
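As a hedged illustration of the broadcast idea mentioned above (the table and column names are hypothetical), the following sketch joins a large table to a small lookup table by broadcasting the small side, so the large table's partitions are joined locally instead of being shuffled across the network:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

spark = SparkSession.builder.getOrCreate()

sales = spark.read.table("sales")            # large fact table (assumed to exist)
regions = spark.read.table("region_lookup")  # small dimension table (assumed to exist)

# broadcast() ships the small table to every executor once, so the large
# table's partitions never need to move across the network for the join.
joined = sales.join(broadcast(regions), on="region_id", how="left")
joined.write.mode("overwrite").saveAsTable("sales_by_region")  # hypothetical target
```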
Another task metric is the scheduler delay, which measures how long it takes to schedule a task. Ideally, this value should be low compared to the executor compute time, which is the time spent actually executing the task.
The following graph shows a scheduler delay time (3.7 s) that exceeds the executor compute time (1.1 s). That means more time is spent waiting for tasks to be scheduled than doing the actual work.
In this case, the problem was caused by having too many partitions, which caused a lot of overhead. Reducing the number of partitions lowered the scheduler delay time. The next graph shows that most of the time is spent executing the task.
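The following sketch shows the kind of change involved, assuming a Spark SQL workload; the partition count of 64 and the paths are illustrative rather than recommendations:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Lower the shuffle partition count (200 is the Spark default; 64 is illustrative).
spark.conf.set("spark.sql.shuffle.partitions", "64")

# For an existing DataFrame, coalesce() reduces the partition count without a
# full shuffle, cutting the number of tiny tasks the scheduler has to place.
df = spark.read.parquet("/mnt/raw/events")                  # hypothetical path
df = df.coalesce(64)
df.write.mode("overwrite").parquet("/mnt/curated/events")   # hypothetical path
```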
Streaming throughput and latency
Streaming throughput is directly related to structured streaming. There are two important metrics associated with streaming throughput: Input rows per second and processed rows per second. If input rows per second outpaces processed rows per second, it means the stream processing system is falling behind. Also, if the input data comes from Event Hubs or Kafka, then input rows per second should keep up with the data ingestion rate at the front end.
Two jobs can have similar cluster throughput but very different streaming metrics. The following screenshot shows two different workloads. They are similar in terms of cluster throughput (jobs, stages, and tasks per minute). But the second run processes 12,000 rows/sec versus 4,000 rows/sec.
Streaming throughput is often a better business metric than cluster throughput, because it measures the number of data records that are processed.
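If you want to inspect these rates directly from a notebook as well, each Structured Streaming micro-batch reports them in its progress object. The following sketch uses the built-in rate source and an in-memory sink purely for illustration; the query name is hypothetical:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

stream = (spark.readStream
          .format("rate")                    # built-in test source standing in for Event Hubs/Kafka
          .option("rowsPerSecond", 500)
          .load())

query = (stream.writeStream
         .format("memory")                   # in-memory sink, for illustration only
         .queryName("throughput_test")
         .start())

progress = query.lastProgress                # dict for the latest micro-batch, or None
if progress:
    print(progress["inputRowsPerSecond"], progress["processedRowsPerSecond"])
```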
Resource consumption per executor
These metrics help to understand the work that each executor performs.
Percentage metrics measure how much time an executor spends on various things, expressed as a ratio of time spent versus the overall executor compute time. The metrics are:
- % Serialize time
- % Deserialize time
- % CPU executor time
- % JVM time
These visualizations show how much each of these metrics contributes to overall executor processing.
Shuffle metrics track data shuffling across the executors:
- Shuffle I/O
- Shuffle memory
- File system usage
- Disk usage
Common performance bottlenecks
Two common performance bottlenecks in Spark are task stragglers and a non-optimal shuffle partition count.
Task stragglers
The stages in a job are executed sequentially, with earlier stages blocking later stages. If one task executes a shuffle partition more slowly than other tasks, all tasks in the cluster must wait for the slow task to catch up before the stage can end. This can happen for the following reasons:
A host or group of hosts are running slow. Symptoms: High task, stage, or job latency and low cluster throughput. The summation of task latencies per host won't be evenly distributed. However, resource consumption will be evenly distributed across executors.
Tasks have an expensive aggregation to execute (data skewing). Symptoms: High task latency, high stage latency, high job latency, or low cluster throughput, but the summation of latencies per host is evenly distributed. Resource consumption will be evenly distributed across executors.
If partitions are of unequal size, a larger partition may cause unbalanced task execution (partition skewing). Symptoms: Executor resource consumption is high compared to other executors running on the cluster. All tasks running on that executor will run slow and hold the stage execution in the pipeline. Those stages are said to be stage barriers.
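One common mitigation for the data and partition skew cases above is to salt the hot key so its rows are spread across several tasks. The following sketch assumes hypothetical table and column names and an illustrative salt width of 8:

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

clicks = spark.read.table("clicks")        # large table, skewed on user_id (assumed)
profiles = spark.read.table("profiles")    # smaller side of the join (assumed)

# Add a random salt to the skewed side so a single hot key spreads over 8 tasks.
salted_clicks = clicks.withColumn("salt", (F.rand() * 8).cast("long"))

# Replicate the other side across the same salt values so the join keys still match.
salts = spark.range(8).withColumnRenamed("id", "salt")
salted_profiles = profiles.crossJoin(salts)

joined = salted_clicks.join(salted_profiles, on=["user_id", "salt"])
```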
Non-optimal shuffle partition count
During a structured streaming query, the assignment of a task to an executor is a resource-intensive operation for the cluster. If the shuffle data isn't the optimal size, the amount of delay for a task will negatively impact throughput and latency. If there are too few partitions, the cores in the cluster will be underutilized which can result in processing inefficiency. Conversely, if there are too many partitions, there's a great deal of management overhead for a small number of tasks.
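On Spark 3.x runtimes, Adaptive Query Execution (AQE) can compensate for a non-optimal static partition count by coalescing small shuffle partitions and splitting skewed ones at runtime. A minimal sketch, assuming AQE is available on your Databricks runtime:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Let AQE coalesce small shuffle partitions and split skewed ones at runtime.
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
```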
Use the resource consumption metrics to troubleshoot partition skewing and misallocation of executors on the cluster. If a partition is skewed, executor resources will be elevated in comparison to other executors running on the cluster.
For example, the following graph shows that the memory used by shuffling on the first two executors is 90X bigger than the other executors:
Next steps
- Monitoring Azure Databricks in an Azure Log Analytics Workspace
- Learning path: Build and operate machine learning solutions with Azure Databricks
- Azure Databricks documentation
- Azure Monitor overview
- Monitoring Azure Databricks
- Send Azure Databricks application logs to Azure Monitor
- Use dashboards to visualize Azure Databricks metrics
- Modern analytics architecture with Azure Databricks
- Ingestion, ETL, and stream processing pipelines with Azure Databricks
FAQs
How can I improve my Azure Databricks performance? ›
Databricks recommendations for enhanced performance
The cost-based optimizer accelerates query performance by leveraging table statistics. You can auto optimize Delta tables using optimized writes and automatic file compaction; this is especially useful for long-running Structured Streaming jobs.
Cause. Provisioning an Azure VM typically takes 2-4 minutes, but if all the VMs in a cluster cannot be provisioned at the same time, cluster creation can be delayed. This is due to Azure Databricks having to reissue VM creation requests over a period of time.
Is Azure Databricks deprecated? ›Databricks Light 2.4 Extended Support will be supported through April 30, 2023. It uses Ubuntu 18.04.5 LTS instead of the deprecated Ubuntu 16.04.
How can you improve the performance of a database? ›
- 1: Check your database server.
- 2: Improve indexing strategies.
- 3: Identify access to database.
- 4: Evaluate connection capacity.
- 5: Optimize Queries.
- 6: Database Performance Resources.
- 1 — Join by broadcast. ...
- 2 — Replace Joins & Aggregations with Windows. ...
- 3 — Minimize Shuffles. ...
- 4 — Cache Properly. ...
- 5 — Break the Lineage — Checkpointing. ...
- 6 — Avoid using UDFs. ...
- 7 — Tackle with Skew Data — salting & repartition. ...
- 8 — Utilize Proper File Formats — Parquet.
...
If you have a large amount of data and only want to optimize a subset of it, you can specify an optional partition predicate using WHERE :
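For example, a hedged Python version of such a command (the table name and partition column are hypothetical) might look like this:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Compact only the most recent date partitions instead of the whole table.
spark.sql("OPTIMIZE events WHERE date >= '2023-01-01'")
```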
After an individual write, Databricks checks if files can further be compacted, and runs an OPTIMIZE job (with 128 MB file sizes instead of the 1 GB file size used in the standard OPTIMIZE) to further compact files for partitions that have the greatest number of small files.
What is lazy evaluation in Databricks? ›In Spark, Lazy Evaluation means that You can apply as many TRANSFORMATIONs as you want, but Spark will not start the execution of the process until an ACTION is called. 💡 So transformations are lazy but actions are eager.
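A minimal sketch of this behavior (the source path and column names are hypothetical):

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

df = spark.read.parquet("/mnt/raw/events")                # hypothetical path
errors = df.filter(F.col("status") == "error")            # transformation: lazy
projected = errors.select("timestamp", "message")         # transformation: lazy

projected.count()   # action: only now does Spark plan and execute the job
```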
How do I keep a Databricks cluster running? ›If you want to keep the cluster active all the time, you can either disable "Automatic termination" (if allowed) or create a notebook with a simple print or "%sql select 1" command and schedule it to run at regular intervals (avoid scheduling it to run forever).
Is Databricks faster than Spark? ›In conclusion, Databricks runs faster than AWS Spark in all the performance tests. For data reading, aggregation, and joining, Databricks is on average 30% faster than AWS, and we observed a significant runtime difference (Databricks being ~50% faster) in training machine learning models between the two platforms.
Why is my cluster not working in Databricks? ›
If the Azure Databricks cluster manager cannot confirm that the driver is ready within 5 minutes, then cluster launch fails. This can occur because JAR downloading is taking too much time.
Who are Databricks' competitors? ›See how Databricks compares to similar products. Databricks's top competitors include Onehouse, iguazio, and DataCanvas.
How is Azure Databricks different from Databricks? ›The simple answer is that when Databricks runs on a Microsoft cloud instance, it is called Azure Databricks. Azure Databricks is a jointly developed cloud data service from Microsoft and Databricks for data analytics, data engineering, data science, and machine learning.
Can Databricks replace Snowflake? ›For those wanting a top-class data warehouse, Snowflake may be sufficient. For those needing more robust ETL, data science, and machine learning features, Databricks is the winner.
How do you troubleshoot database performance issues? ›
- Ensure your TempDB database is configured optimally. ...
- Make sure you're running index maintenance frequently. ...
- Implement indexes that provide a benefit to your queries. ...
- Check your most expensive queries and stored procedures. ...
- Monitor your performance counters.
Data defragmentation is one of the best approaches to increasing database performance. Over time, with so much data constantly being written to and deleted from your database, your data can become fragmented.
What is the reason for DB slowness? ›Network issues, excessive network traffic, network bottlenecks, or database traffic.
Which partitions hinder Spark performance? ›The number of partitions used in Spark is configurable and having too few (causing less concurrency, data skewing and improper resource utilization) or too many (causing task scheduling to take more time than actual execution time) partitions is not good.
How to do performance tuning in Spark? ›
- Use DataFrame/Dataset over RDD.
- Use coalesce() over repartition()
- Use mapPartitions() over map()
- Use serialized data formats.
- Avoid UDFs (User Defined Functions).
- Caching data in memory.
- Reduce expensive Shuffle operations.
- Disable DEBUG & INFO Logging.
The only difference between cache() and persist() is that with cache() intermediate results are saved in memory only, while with persist() the intermediate results can be saved at 5 storage levels (MEMORY_ONLY, MEMORY_AND_DISK, MEMORY_ONLY_SER, MEMORY_AND_DISK_SER, DISK_ONLY).
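A short sketch of the two calls (the source path and filter values are hypothetical):

```python
from pyspark import StorageLevel
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet("/mnt/raw/events")    # hypothetical path

# cache() uses the default storage level.
ok = df.filter("status = 'ok'").cache()

# persist() lets you pick the storage level explicitly.
errors = df.filter("status = 'error'").persist(StorageLevel.MEMORY_AND_DISK)

ok.count()           # first action materializes the cached data; later actions reuse it
errors.count()
errors.unpersist()   # release the cached blocks when no longer needed
```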
What is the best partition size in Databricks? ›
The ideal size of each partition is around 100-200 MB. Smaller partitions increase the number of parallel running jobs, which can improve performance, but partitions that are too small cause overhead and increase GC time.
What is the difference between Bloom filter and ZOrder in Databricks? ›A Bloom filter is like looking for a needle in a haystack (with FPP), so it's more useful for strings. Z-Order is best with a couple of columns that are used for filters/joins. They can run independently of each other or work together. In the example, the Bloom filter is also used for filters.
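As a hedged example of the Z-Order side of this comparison (the table and column names are hypothetical):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Co-locate related rows so data skipping can prune files on these filter/join columns.
spark.sql("OPTIMIZE events ZORDER BY (eventType, deviceId)")
```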
Optimized Writes lowers the number of files output per write, whereas Auto Compaction will perform a more selective version of the OPTIMIZE SQL command after each write, and will bin pack partitions that have a large number of small files per partition (50 small files at time of writing).
What is the difference between optimize and auto optimize in Databricks? ›Auto compaction generates smaller files (128 MB) than OPTIMIZE (1 GB). Auto compaction greedily chooses a limited set of partitions that would best leverage compaction. The number of partitions selected will vary depending on the size of cluster it is launched on.
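One way to enable this behavior is through Delta table properties. A hedged example, assuming a Delta table named events:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Enable optimized writes and auto compaction for a specific Delta table.
spark.sql("""
  ALTER TABLE events SET TBLPROPERTIES (
    'delta.autoOptimize.optimizeWrite' = 'true',
    'delta.autoOptimize.autoCompact'   = 'true'
  )
""")
```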
How do you increase azure Databricks cluster VCPU cores limits? ›- Stop inactive clusters to free up CPU cores for use.
- Open an Azure support case with a request to increase the CPU core quota limit for your subscription.
The cost/benefit of using lazy evaluation decreases as the item being accessed becomes less likely to be accessed. Always using lazy evaluation also implies early optimization. This is a bad practice that often results in code which is much more complex and expensive than might otherwise be the case.
What is data skipping in Databricks? ›Data skipping information is collected automatically when you write data into a Delta table. Delta Lake on Azure Databricks takes advantage of this information (minimum and maximum values) at query time to provide faster queries. You do not need to configure data skipping; the feature is activated whenever applicable.
Which options use lazy evaluation to improve performance? ›Haskell is a good example of such a functional programming language whose fundamentals are based on Lazy Evaluation. Lazy evaluation is used in Unix map functions to improve their performance by loading only required pages from the disk.
How long is a Databricks cluster inactivity? ›If you enable the compliance security profile for your account or your workspace, long-running clusters are automatically restarted after 25 days. Databricks recommends that admins restart clusters manually during a scheduled maintenance window. This reduces the risk of an auto-restart disrupting a scheduled job.
What is the difference between job cluster and interactive cluster in Databricks? ›
Interactive clusters are used to analyze data collaboratively with interactive notebooks. Job clusters are used to run fast and robust automated workflows using the UI or API. So, while in development phase, you will mostly use interactive cluster.
What is the difference between job cluster and all purpose cluster in Databricks? ›Azure Databricks makes a distinction between all-purpose clusters and job clusters. You use all-purpose clusters to analyze data collaboratively using interactive notebooks. You use job clusters to run fast and robust automated jobs. You can create an all-purpose cluster using the UI, CLI, or REST API.
Does Databricks cache data? ›Azure Databricks uses disk caching to accelerate data reads by creating copies of remote Parquet data files in nodes' local storage using a fast intermediate data format. The data is cached automatically whenever a file has to be fetched from a remote location.
Why use Databricks instead of Spark? ›Databricks provides notebooks usable with your cluster. It is possible to configure standalone notebook instances to run code via a standalone Spark instance but Databricks handles the necessary configuration, making the task much easier.
What is Databricks' competitive advantage? ›Infinitely scalable and cost-effective.
This means Databricks can handle all types of data (structured, semi-structured and unstructured). It can also handle everything from AI to BI. In simple terms, Databricks can be your data lake as well as your data warehouse.
- Fix the query plan.
- Move to an SKU that has more memory.
- Increase storage size to get more IOPS.
How do you increase Azure Databricks cluster VCPU cores limits? ›
- Stop inactive clusters to free up CPU cores for use.
- Open an Azure support case with a request to increase the CPU core quota limit for your subscription.
- CREATE INDEX: Creates new indices that can improve the performance.
- DROP INDEX: Drops redundant and unused indices (>90 days)
- FORCE LAST GOOD PLAN: Identifies queries using the last known good execution plan.