Things to consider while running Google Cloud Dataproc

As I mentioned earlier, migration from on-premises clusters to the cloud remains one of the most in-demand trends in Big Data. It is no coincidence that almost every SaaS/PaaS provider offers a ready-to-use or flexibly customizable and scalable cloud product based on Apache Hadoop and Apache Spark, along with other components for Big Data storage and analysis. Pricing for these products is usually based on a pay-as-you-go model, so you are only billed while your code is actually running.

One such service is Google Cloud Dataproc.

It's not a marketing post in any way, just sharing my experience.

GCP Dataproc

Cloud Dataproc is a managed cluster service running on the Google Cloud Platform (GCP). It provides automatic configuration, scaling, and cluster monitoring, as well as frequently updated, fully managed versions of popular tools such as Apache Spark, Apache Hadoop, and others. Cloud Dataproc, of course, has built-in integration with other GCP services such as Cloud Storage, BigQuery, Bigtable, Stackdriver Logging, and Stackdriver Monitoring. If you have used AWS, this is the equivalent of their EMR (Elastic MapReduce) service.

The key advantage of Cloud Dataproc is that you can migrate code that you created using Apache Spark for on-premises deployment or for any other cloud provider to GCP without any modification.

Unlike a classic Hadoop deployment, where HDFS storage and the various compute engines live together on the same cluster nodes, Cloud Dataproc decouples storage and compute. Say an external application produces logs that you want to analyze: you store them in Cloud Storage (GCS), Cloud Dataproc reads them from GCS for processing, and then writes the results back to GCS, BigQuery, or Bigtable.

While Cloud Dataproc has a lot of advantages, it also has plenty of pitfalls that inexperienced engineers can run into. In this post, we'll take a look at some of the things you should take into account when building pipelines or architectures on top of Cloud Dataproc.

Cluster duration

Before you start creating Cloud Dataproc clusters in GCP, you need to decide how long these clusters are going to live. To be a more distinguished gentleman you can call it cluster duration. Cluster duration is the period of time, measured in minutes, between cluster creation and deletion. Yes, very distinguished, uhmm I see.

Various factors must be taken into account when choosing the cluster duration.

Persistent cluster

You can create a persistent cluster in the cloud that runs 24/7 but this is usually not a good idea. Consider using persistent clusters for the following situations:

  • Frequent runs of processing jobs. To be precise, if the number of jobs per day multiplied by the time to set up a cluster (including data load time and data processing time) adds up to more than 24 hours, it is advantageous to keep the cluster up and running after the previous job completes.
  • Processing jobs have dependent input and output. Although it is possible to share data between two independent clusters (via GCS, for example), it may be advantageous to keep the results of the previous job on HDFS for processing by downstream jobs.

One more thing: when using a persistent cluster, it makes sense to use GCS as the destination for job output logging and for the Spark event logs. This way we avoid the risk of running out of disk space on the nodes because of huge log files.

Transient cluster

A very common principle for creating clusters is using one cluster per job, also known as transient clusters. You create a cluster for one particular job, and after the job is finished you simply delete the cluster. This gives you a lot of flexibility to tailor each cluster to the specifics of the job it runs.

One tip for Spark users in this case is to set spark.dynamicAllocation.enabled=false, because there is nobody to share the cluster with, so the job can simply claim all of it.
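
For instance, the property can be set per job at submit time. A minimal sketch, where the cluster name, region, job class, jar path, and executor count are all placeholders for illustration:

# Submit a Spark job with dynamic allocation disabled;
# spark.executor.instances is sized to fill the whole cluster (placeholder value).
gcloud dataproc jobs submit spark \
  --cluster=my-transient-cluster \
  --region=us-central1 \
  --class=com.example.MyJob \
  --jars=gs://my-bucket/jars/my-job.jar \
  --properties=spark.dynamicAllocation.enabled=false,spark.executor.instances=20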

Hint: to avoid paying for inactive clusters, you can use scheduled Cloud Dataproc deletion.
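
Scheduled deletion is configured at cluster creation time; for example (the cluster name, region, and timeout values are illustrative):

# Delete the cluster after 30 minutes of idleness, or 6 hours after creation,
# whichever comes first.
gcloud dataproc clusters create my-transient-cluster \
  --region=us-central1 \
  --max-idle=30m \
  --max-age=6h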

Use preemptible workers

If your computational tasks are flexible enough to run on a large number of short-lived instances, you can save up to 70% by using preemptible instances. A single instance lives at most 24 hours (often less), can be killed at any time, and its availability depends on supply and demand.
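
In Dataproc, preemptible VMs are added as secondary workers alongside the regular workers. A sketch, where the cluster name, region, and worker counts are placeholders (check the current gcloud reference for the exact secondary-worker flags):

# 2 regular workers host HDFS and the core daemons; 10 preemptible secondary
# workers provide cheap extra compute (counts are illustrative).
gcloud dataproc clusters create my-cluster \
  --region=us-central1 \
  --num-workers=2 \
  --num-secondary-workers=10 \
  --secondary-worker-type=preemptible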

It is important to note that when a preemption happens, nodes disappear from the cluster within just a few seconds. From YARN's point of view, preemption is a temporary outage of one or more nodes. In the case of Apache Spark, we can use preemptible virtual machines thanks to Spark's fault-tolerance mechanisms: if a node dies, all of the executor tasks allocated to that particular node are marked as "failed", the tasks are rescheduled on another executor, and the overall job continues.

Do not think that preemptions do not affect performance at all. On the contrary, they can affect it greatly. If preemptible virtual machines get preempted while a job is running, the job's progress is set back: not only do in-progress tasks fail, but shuffle data from finished tasks is lost. For a computationally intensive job, recomputing the lost RDD partitions is expensive in terms of time and money. In addition, if a large preemption wave hits the cluster and kills many machines at once, rescheduling that many tasks may not succeed, and the job may be aborted due to a stage failure.

Since tasks are likely to fail when using preemptible virtual machines, you will probably need to increase the number of allowed attempts with the configurations below (an example of setting them at cluster creation follows the list).

YARN:

  • yarn.resourcemanager.am.max-attempts (Default: 2)

Map Reduce:

  • mapreduce.map.maxattempts (Default: 4)
  • mapreduce.reduce.maxattempts (Default: 4)

Spark:

  • spark.task.maxFailures (Default: 4)
  • spark.stage.maxConsecutiveAttempts (Default: 4)
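
These settings can be baked into the cluster via the --properties flag, using the yarn:, mapred:, and spark: prefixes to target the corresponding configuration files. A sketch where the cluster name, region, and the retry values themselves are illustrative:

# Raise retry limits to tolerate preemptions (values are just examples).
gcloud dataproc clusters create my-cluster \
  --region=us-central1 \
  --properties='yarn:yarn.resourcemanager.am.max-attempts=4,mapred:mapreduce.map.maxattempts=8,mapred:mapreduce.reduce.maxattempts=8,spark:spark.task.maxFailures=8,spark:spark.stage.maxConsecutiveAttempts=8'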

Note that preemptible virtual machines are not used for HDFS storage. If HDFS blocks were kept on them, it is pretty likely that your data would become unavailable. That being said, if you use GCS for storage, you don't need to worry about the on-cluster HDFS.

Size of a cluster

Understanding the workload is key to determining cluster size and the size of individual virtual machines. Running prototypes and benchmarking against real data and real jobs is absolutely essential for deciding on cluster sizes. Fortunately, the ephemeral nature of the cloud makes it easy to size a cluster for a specific task instead of trying to anticipate and purchase hardware in advance, and to resize the cluster as needed, although how freely you can do that depends on the cluster duration you've chosen.

There are many options for configuring cluster size because it ultimately comes down to the virtual machine types you use: you can favor more memory, more processors, or an overall balance. Which option to choose depends on the specific pipelines that will be running on the cluster.

If I choose a worker node that gives me about 4 GB of available cache space per executor, then with 40 GB of data I would need ten such containers. You can put a larger partition of data on a single executor if you do things like heavy grouping and sorting within partitions. CPU-intensive workloads obviously require more CPU power. So you can do a little math on how many workers you need, based on how big you think one worker should be. This tool can help for Spark applications. Advanced engineers can calculate the cluster size they need themselves, based on the size of the data being processed.

The desired concurrency also affects how many processing cores each worker needs for these workloads.

But one of the biggest issues is cost. Getting more CPU by using bigger worker nodes can be more expensive than getting the same total number of cores from many smaller worker nodes.

The dumbest but most intuitive way (and the one I use all the time) is to just do a binary search until you are satisfied with the processing time. For example, you could start with a cluster of 500 8-core nodes and, if processing takes too long, increase the cluster size to 600-750 nodes and see whether the processing time decreases as you expect; you can repeat this until you are satisfied with the runtime or until it no longer improves.

Although we know that clusters can easily grow or shrink, it is still useful to do some napkin math before deploying them, especially if those clusters are going to be transient.
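
Once the napkin math is done, the result translates directly into the machine-type and worker-count flags at creation time. A sketch loosely based on the 40 GB example above, where the machine types, counts, and disk size are assumptions rather than recommendations:

# 10 workers with 4 vCPUs / 15 GB RAM each (n1-standard-4), roughly enough for
# one 4 GB executor per node in the example above; adjust to your own workload.
gcloud dataproc clusters create my-cluster \
  --region=us-central1 \
  --master-machine-type=n1-standard-4 \
  --worker-machine-type=n1-standard-4 \
  --num-workers=10 \
  --worker-boot-disk-size=500GB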

Autoscaling

One of the features you can take advantage of with Cloud Dataproc after submitting jobs is autoscaling: the number of nodes in the cluster adjusts to meet the requirements of your application.

With autoscaling, there is no need to manually monitor when the cluster is overloaded or underloaded. An autoscalable cluster is a great way to have a small but affordable long-running cluster that can quickly turn into a large-scale data processing machine as soon as the workload demands it. In Dataproc, autoscaling is based on the difference between pending and available YARN memory: if more memory is needed, the cluster scales up; if there is unused memory, it can scale down.

There are several parameters to pay attention to here (a sample policy is shown after the list).

  • The scaleUpFactor is the most important parameter when we think about performance. A scaleUpFactor or scaleDownFactor of 1.0 means autoscaling will scale so that pending/available memory is 0 (perfect utilization). Depending on the job profile, it is useless to meet all the estimated resource requirements at once, so a high value of this parameter is not synonymous with better performance.
  • Scaling down is simple in theory but very difficult in practice. Removing nodes during job execution has a significant negative impact on performance. The key to minimizing this kind of problem is to use graceful decommissioning via the gracefulDecommissionTimeout parameter.
  • The cooldownPeriod should not be neglected, because it affects how much overhead the autoscaling logic contributes over the cluster's lifetime. For example, a very short cooldownPeriod combined with a load that varies a lot over short periods results in poor performance due to constant resizing operations that make the cluster unstable.
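
A sample autoscaling policy might look like the following; the limits, factors, names, and region are illustrative, not recommendations:

# Create the policy from a YAML definition and attach it to a cluster.
cat > autoscaling-policy.yaml <<'EOF'
workerConfig:
  minInstances: 2
  maxInstances: 10
secondaryWorkerConfig:
  maxInstances: 50
basicAlgorithm:
  cooldownPeriod: 5m
  yarnConfig:
    scaleUpFactor: 0.5
    scaleDownFactor: 1.0
    gracefulDecommissionTimeout: 1h
EOF
gcloud dataproc autoscaling-policies import my-policy \
  --source=autoscaling-policy.yaml --region=us-central1
gcloud dataproc clusters create my-cluster \
  --region=us-central1 --autoscaling-policy=my-policy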

Note that autoscaling does not support Spark Structured Streaming, a streaming engine built on top of Spark SQL. It also does not scale a cluster down to zero, so it is not suitable for mostly idle or underutilized clusters. If you rely heavily on autoscaling for your application, it may be time to consider moving your workload to Dataflow, which autoscales more efficiently and does so by default.

Limitations

There are several significant limitations related to the sizing of Dataproc clusters.

  • You cannot change the machine type of an existing cluster; Dataproc does not support this. Therefore, you need to choose the size of the individual machines in advance.
  • Dataproc does not limit the number of nodes in a cluster, but other software may have limitations. I have not run into these problems personally, but I have heard about them from colleagues. For example, YARN clusters are known to run at around 10k nodes, so exceeding that number may not work for Apache Spark on YARN, which Dataproc runs on.
  • You also need to consider GCE limitations, such as various quotas (CPU, RAM, Disk, external IP, etc.) and QPS limits, and make sure you have enough headroom for a large cluster.
  • When choosing an option, it is also important to consider the storage implications of your choice. There is a limit to the total amount of persistent disk you can attach to each virtual machine. Most instance types have a limit of 64 TB; whether this is good or bad depends on your application.

Access

When moving Hadoop workloads to the cloud, teams tend to rely only on the security controls supported on their clusters. This becomes problematic when clusters are ephemeral. To properly secure clusters, you need to control their permissions on other cloud services (such as Cloud Storage) and control who has permission to create clusters in the first place.

Cloud authentication and authorization vary greatly from company to company, and a full discussion is beyond the scope of this post. However, the main point of this advice is that you should use the controls available in GCP Identity and Access Management (IAM) as much as possible.

In addition to Cloud IAM, Cloud Dataproc has a Kerberos optional component, which is often used to secure the cluster itself or to plug into existing Active Directory-based identity sources. Using OS Login in Compute Engine can also simplify managing SSH access to Cloud Dataproc clusters: when OS Login is enabled, IAM permissions are automatically mapped to a Linux identity, and you no longer need to manage SSH keys yourself.
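
As a rough sketch, both can be switched on at cluster creation; the flags below are to the best of my knowledge (verify against the current gcloud reference), and the metadata key is the standard Compute Engine OS Login one:

# Enable the Kerberos optional component and OS Login-based SSH access
# (flag spellings should be checked against the current gcloud docs).
gcloud dataproc clusters create my-secure-cluster \
  --region=us-central1 \
  --enable-kerberos \
  --metadata=enable-oslogin=true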

Cloud Dataproc divides permissions into two categories: clusters and jobs. Cluster permissions are for administrators who create clusters, and job permissions are for developers who submit code to the cluster. Granular IAM can also be used to restrict which actions a user can perform on which specific cluster(s).
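
For example, granting a developer the Dataproc Editor role at the project level could look like this; the project and user are placeholders, and finer-grained splits between cluster and job permissions require custom roles or per-cluster IAM policies:

# Grant project-level Dataproc Editor access (placeholder project and user).
gcloud projects add-iam-policy-binding my-project \
  --member=user:developer@example.com \
  --role=roles/dataproc.editor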

Configuration

GCP history

Errors happen, and in the Big Data world they are something you can't avoid, so it is always worth planning for what happens when you encounter them. Therefore, as I mentioned before, it is better to keep cluster logs outside the cluster so you can analyze what went wrong and reproduce the sequence of events. In Cloud Dataproc, you can achieve this by pointing the MapReduce and Spark job history servers at Cloud Storage: override the MapReduce done directory and intermediate done directory, as well as the Spark event log directories, to Cloud Storage locations as shown below:

mapreduce.jobhistory.done-dir=gs://history-bucket/done-dir
mapreduce.jobhistory.intermediate-done-dir=gs://history-bucket/intermediate-done-dir
spark.eventLog.dir=gs://history-bucket/spark-history
spark.history.fs.logDirectory=gs://history-bucket/spark-history
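
These can be supplied when the cluster is created through the --properties flag, using the mapred: and spark: prefixes; the bucket name is a placeholder:

# Persist MapReduce and Spark history outside the cluster.
gcloud dataproc clusters create my-cluster \
  --region=us-central1 \
  --properties='mapred:mapreduce.jobhistory.done-dir=gs://history-bucket/done-dir,mapred:mapreduce.jobhistory.intermediate-done-dir=gs://history-bucket/intermediate-done-dir,spark:spark.eventLog.dir=gs://history-bucket/spark-history,spark:spark.history.fs.logDirectory=gs://history-bucket/spark-history'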

When creating a Cloud Dataproc cluster, there are several ways to treat the cluster as code. This lets you preserve the cluster definition even when the cluster itself is not running.

The first way to store a cluster as code is to specify initialization actions: executable files or scripts that Cloud Dataproc runs on all nodes of the cluster as soon as they are created. Initialization actions often install job dependencies, such as Python packages, so that when you need to tear down a long-running cluster or recreate it to upgrade to the latest Cloud Dataproc version, you can automatically recreate the environment. Take a look at the examples of initialization actions in the GitHub repository.
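
In practice this is just another flag at creation time; the bucket and script name below are placeholders for your own GCS objects:

# Run a custom bootstrap script on every node right after it is created.
gcloud dataproc clusters create my-cluster \
  --region=us-central1 \
  --initialization-actions=gs://my-bucket/scripts/install-deps.sh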

Alternatively, you can start by creating a Cloud Dataproc custom image that contains everything that is installed on each cluster node. You can use this image for different shapes and sizes of clusters without having to write scripts for installation and configuration.

Finally, to capture the cluster configuration itself, you can export a running cluster's configuration to a YAML file. The same YAML can then be used as input to the import command, which creates a new cluster with the same configuration.
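
A quick sketch of that round trip, with placeholder cluster names and region:

# Save the running cluster's configuration, then recreate it later from the file.
gcloud dataproc clusters export my-cluster \
  --region=us-central1 --destination=cluster.yaml
gcloud dataproc clusters import my-new-cluster \
  --region=us-central1 --source=cluster.yaml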

Locality

Data location may have a significant impact on overall cluster performance. It makes sense to deploy clusters near the systems that produce or consume the data. This affects not only latency but also cost: traffic gets more expensive once it leaves the boundaries of a zone, a region, or the Google Cloud ecosystem as a whole. You need to make sure that your data's region and your cluster's zone are physically close.

When using Cloud Dataproc, you can choose not to specify a zone and let the Cloud Dataproc Auto Zone feature select a zone in the region of your choice. While this handy feature can optimize cluster placement, it cannot predict the location of the data the cluster will access.
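
In gcloud terms, Auto Zone simply means passing a region without an explicit zone; the names below are placeholders:

# Explicit zone placement, e.g. to sit next to the data:
gcloud dataproc clusters create my-cluster --region=us-central1 --zone=us-central1-b
# Auto Zone placement: omit --zone and let Dataproc pick a zone within the region.
gcloud dataproc clusters create my-cluster --region=us-central1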

Make sure that you do not have network rules that route Cloud Storage traffic through a number of VPN gateways before it eventually reaches your cluster. There are large network pipes between Cloud Storage and Compute Engine, so you don't want to throttle your bandwidth by funneling traffic into a bottleneck.


More? Well, there you go:

Spark Tips. Partition Tuning

Spark. Anatomy of Spark application

Spark Partitions