Real-time adjustments with Hadoop metrics

Krisztian Horvath, 15 October 2014

To understand the state of our Hadoop clusters at any time, we needed a scalable and flexible way to monitor our Hadoop nodes. After investigating the possible solutions we realized that none of them satisfied all our needs, so we built our own and recently open sourced it as Baywatch. Baywatch captures and visualizes real-time changes on Hadoop clusters, so we can understand the resource allocation needs of the submitted jobs and make adjustments accordingly. For planning ahead, viewing and comparing old and new metrics is just as important as analyzing real-time ones, and it also helps us find weaknesses and defects in our clusters.

To be able to do all of the above, Baywatch processes the metrics produced by the Hadoop daemons. This might already sound familiar: our other project, Periscope, consumes the same metrics in a different way to create alarms and cluster scaling activities. Combine these two components and you have a powerful tool: you can view your cluster's state and, based on that, make smart decisions to scale up or down, or simply set alarms. If you're thrilled to see it in action, we are at Strata and happy to show you a quick demo.

Hadoop metrics

So what are these metrics? As I mentioned earlier, metrics are collections of information about the Hadoop daemons; for example, the ResourceManager produces information about queue statuses, which we use in Periscope when we re-prioritize applications. To distinguish these metrics they are grouped into named contexts, e.g. jvm for Java virtual machine metrics and rpc for debugging RPC calls, but there are many more:

  • yarn
  • rpcdetailed
  • metricssystem
  • mapred
  • dfs
  • ugi
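
For example, a record produced in the jvm context and written by the FileSink (shown below) looks roughly like this; the host name and the values are purely illustrative:

  1413369000000 jvm.JvmMetrics: Context=jvm, Hostname=node-1, MemHeapUsedM=42.5, GcCount=7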

The Metrics2 framework is designed to collect and dispatch per-process metrics to monitor the overall status of the Hadoop system. Using sources and sinks is a common design in Hadoop-related technologies, and this is no exception: metrics sources are where the metrics are generated, and metrics sinks consume the records produced by the sources. The metrics system polls the sources periodically and passes the metrics records to the sinks.

It is really easy to implement new sinks and sources; for reference, here is the putMetrics method of the FileSink:

  @Override
  public void putMetrics(MetricsRecord record) {
    writer.print(record.timestamp());
    writer.print(" ");
    writer.print(record.context());
    writer.print(".");
    writer.print(record.name());
    String separator = ": ";
    for (MetricsTag tag : record.tags()) {
      writer.print(separator);
      separator = ", ";
      writer.print(tag.name());
      writer.print("=");
      writer.print(tag.value());
    }
    for (AbstractMetric metric : record.metrics()) {
      writer.print(separator);
      separator = ", ";
      writer.print(metric.name());
      writer.print("=");
      writer.print(metric.value());
    }
    writer.println();
  }

and here are the gauge declarations of the FairSchedulerQueueMetrics:

  @Metric("Fair share of memory in MB") MutableGaugeInt fairShareMB;
  @Metric("Fair share of CPU in vcores") MutableGaugeInt fairShareVCores;
  @Metric("Steady fair share of memory in MB") MutableGaugeInt steadyFairShareMB;
  @Metric("Steady fair share of CPU in vcores") MutableGaugeInt steadyFairShareVCores;
  @Metric("Minimum share of memory in MB") MutableGaugeInt minShareMB;
  @Metric("Minimum share of CPU in vcores") MutableGaugeInt minShareVCores;
  @Metric("Maximum share of memory in MB") MutableGaugeInt maxShareMB;
  @Metric("Maximum share of CPU in vcores") MutableGaugeInt maxShareVCores;

Hadoop comes with three sinks by default (a sketch of a custom one follows the list):

  • FileSink
  • GraphiteSink
  • GangliaSink30
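
If none of these fits your needs, a new sink only has to implement the MetricsSink interface; a minimal sketch (the StdoutSink name is ours, not something shipped with Hadoop) that simply prints every record:

import org.apache.commons.configuration.SubsetConfiguration;
import org.apache.hadoop.metrics2.AbstractMetric;
import org.apache.hadoop.metrics2.MetricsRecord;
import org.apache.hadoop.metrics2.MetricsSink;

// Hypothetical sink that writes every metrics record to standard output.
public class StdoutSink implements MetricsSink {

  @Override
  public void init(SubsetConfiguration conf) {
    // Sink specific properties (e.g. the filename of the FileSink) are read here.
  }

  @Override
  public void putMetrics(MetricsRecord record) {
    StringBuilder sb = new StringBuilder()
        .append(record.timestamp()).append(' ')
        .append(record.context()).append('.').append(record.name());
    for (AbstractMetric metric : record.metrics()) {
      sb.append(' ').append(metric.name()).append('=').append(metric.value());
    }
    System.out.println(sb);
  }

  @Override
  public void flush() {
    // Nothing is buffered in this sketch.
  }
}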

Configuration

The Metrics2 framework uses PropertiesConfiguration, so the metrics sinks need to be defined in a configuration file, hadoop-metrics2.properties. The declarations should be familiar to those who have used Apache Flume before. Here is an example taken from our Ambari Docker image:

*.sink.logstash.class=org.apache.hadoop.metrics2.sink.FileSink
namenode.sink.logstash.filename=/var/log/hadoop-metrics/namenode-metrics.out
secondarynamenode.sink.logstash.filename=/var/log/hadoop-metrics/secondarynamenode-metrics.out
datanode.sink.logstash.filename=/var/log/hadoop-metrics/datanode-metrics.out
resourcemanager.sink.logstash.filename=/var/log/hadoop-metrics/resourcemanager-metrics.out
nodemanager.sink.logstash.filename=/var/log/hadoop-metrics/nodemanager-metrics.out
maptask.sink.logstash.filename=/var/log/hadoop-metrics/maptask-metrics.out
reducetask.sink.logstash.filename=/var/log/hadoop-metrics/reducetask-metrics.out
mrappmaster.sink.logstash.filename=/var/log/hadoop-metrics/mrappmaster-metrics.out
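
The same file also controls how often the sources are polled and can declare further sinks; as a rough sketch (the 10 second period and the Graphite host are placeholders, and the property names assume the GraphiteSink shipped with Hadoop):

*.period=10
resourcemanager.sink.graphite.class=org.apache.hadoop.metrics2.sink.GraphiteSink
resourcemanager.sink.graphite.server_host=graphite.example.com
resourcemanager.sink.graphite.server_port=2003
resourcemanager.sink.graphite.metrics_prefix=hadoop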

Hadoop WebServices

There is another way to obtain these metrics without any configuration, and this is what Periscope leverages: the web services provided by Hadoop. JAX-RS is used to define the mappings; for example, the ResourceManager queue related metrics are collected under the /ws/v1/cluster mapping:

  @GET
  @Path("/scheduler")
  @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
  public SchedulerTypeInfo getSchedulerInfo() {
    init();
    ResourceScheduler rs = rm.getResourceScheduler();
    SchedulerInfo sinfo;
    if (rs instanceof CapacityScheduler) {
      CapacityScheduler cs = (CapacityScheduler) rs;
      CSQueue root = cs.getRootQueue();
      sinfo = new CapacitySchedulerInfo(root);
    } else if (rs instanceof FairScheduler) {
      FairScheduler fs = (FairScheduler) rs;
      sinfo = new FairSchedulerInfo(fs);
    } else if (rs instanceof FifoScheduler) {
      sinfo = new FifoSchedulerInfo(this.rm);
    } else {
      throw new NotFoundException("Unknown scheduler configured");
    }
    return new SchedulerTypeInfo(sinfo);
  }
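
To give a rough idea of the polling side (the ResourceManager address is a placeholder and the returned JSON is simply printed instead of being parsed), a minimal client could look like this:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

// Hypothetical polling client: fetches the scheduler metrics once;
// in practice this would run periodically and parse the JSON response.
public class SchedulerMetricsPoller {

  public static void main(String[] args) throws Exception {
    URL url = new URL("http://resourcemanager.example.com:8088/ws/v1/cluster/scheduler");
    HttpURLConnection connection = (HttpURLConnection) url.openConnection();
    connection.setRequestProperty("Accept", "application/json");
    try (BufferedReader reader =
             new BufferedReader(new InputStreamReader(connection.getInputStream()))) {
      String line;
      while ((line = reader.readLine()) != null) {
        System.out.println(line); // raw JSON describing the scheduler queues
      }
    } finally {
      connection.disconnect();
    }
  }
}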

The only difference is that your application has to poll, while with the metrics sinks you can create forwarders to push events, just like we did with Baywatch. Which one to use depends on your needs.

Summary and resources

As you can see, using Baywatch and Periscope you can monitor and scale your cluster based on the configured policies, all open sourced and available on our GitHub page.

For updates follow us on LinkedIn, Twitter or Facebook.
