Monday, September 1, 2014

ETL: transformations with Flume and Morphline

(this article was translated from http://www.formhadoop.es/etl-flume-morphlines/)

Flume is usually used in Hadoop to extract data from a data source (server logs, the Internet, etc.) and load it into Hadoop (normally into HDFS). However, it is rarely used to perform transformation tasks. Such transformations are usually done after loading the data into HDFS, through Java MapReduce jobs or through other Hadoop tools such as Hive, Pig or Shark.

That's a bit of a pity, since Flume gives you the possibility to perform transformations through interceptors. If we combine them with Morphline, an open-source library developed by Cloudera, we obtain a powerful combination that allows us to write interesting, easy-to-define (everything is specified in configuration files, no coding is necessary) and real-time transformations.

In this series of two articles, we are going to see how to use interceptors and Morphline to identify XML files and transform them into CSV format (which is easier to process).

The Flume version used here is 1.5.0. This version is included in the Cloudera distribution CDH 5.1.0.
For the sake of brevity, in this article I will only show some snippets of the configuration files. To download the complete files, just click here.


Morphline: installation for Flume


Morphline is developed by Cloudera, just like Flume. Flume ships with both a sink (MorphlineSolrSink) and an interceptor (the Morphline interceptor). However, many of the libraries needed for full Morphline integration are missing from the Flume distribution. For that reason, first of all we must install the Morphline libraries we need into Flume's classpath.

To get those libraries, we download Flume's source code, unzip it, go into the folder containing the pom of the Morphline sink, and use a Maven command to download all the dependencies:
wget http://ftp.cixug.es/apache/flume/1.5.0/apache-flume-1.5.0-src.tar.gz
tar xvfz apache-flume-1.5.0-src.tar.gz apache-flume-1.5.0-src/flume-ng-sinks/flume-ng-morphline-solr-sink
cd apache-flume-1.5.0-src/flume-ng-sinks/flume-ng-morphline-solr-sink
mvn dependency:copy-dependencies
The downloaded jars can be found in the target/dependency/ folder. In our example, we only need 5 libraries (note that if you use more features of Morphline, you may need to copy more files):
kite-morphlines-core-0.12.0.jar
metrics-healthchecks-3.0.1.jar
config-1.0.2.jar
kite-morphlines-saxon-0.12.0.jar
Saxon-HE-9.5.1-4.jar
We copy those jars into Flume's library folder: /usr/lib/flume-ng/lib/.

Overview of the Flume workflow


Our Flume workflow will analyze every new file that appears in a folder (source: spooldir). Then, through a Morphline interceptor, we will detect whether it is an XML file or another file format:
  • if it isn't an XML file, the Morphline job will be pretty simple, since it will only be used to drive the Flume multiplexing.
  • if it is an XML file, we will apply more transformations with Morphline, in order to turn the data into CSV format.
Finally, the data will be loaded into an HDFS folder of our Hadoop cluster. This folder will depend on the type of the input file (XML or not). Now that we know the whole workflow and that we have to do multiplexing, I can show you the very beginning of Flume's configuration file, flume.conf (remember that you can find the whole file here):
mymorph.sources = mySpoolDir
mymorph.channels = MemChannelRegular MemChannelXml
mymorph.sinks = hdfsRegular hdfsXml
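Since we have one source feeding two channels, the source needs a multiplexing channel selector that routes each event according to a header the Morphline interceptor sets. Here is a hedged sketch of how that wiring could look; the spool directory path, the header name (format) and its values are illustrative assumptions, not taken from the original flume.conf:

```properties
# Sketch only: the spoolDir path and the "format" header name/values
# are assumptions for illustration.
mymorph.sources.mySpoolDir.type = spooldir
mymorph.sources.mySpoolDir.spoolDir = /var/flume/spool
mymorph.sources.mySpoolDir.channels = MemChannelRegular MemChannelXml
mymorph.sources.mySpoolDir.selector.type = multiplexing
mymorph.sources.mySpoolDir.selector.header = format
mymorph.sources.mySpoolDir.selector.mapping.xml = MemChannelXml
mymorph.sources.mySpoolDir.selector.default = MemChannelRegular
```

With this selector, any event whose format header equals xml goes to MemChannelXml, and everything else falls through to MemChannelRegular.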


Definition of the Flume interceptors


Interceptors are a kind of plugin that can be attached to the configuration of a source. When an event leaves a source, if a list of interceptors is attached to that source, the event goes through all the transformations defined by those interceptors. Finally, the event is put into the Flume channel.
In our example, we only need 2 interceptors:
mymorph.sources.mySpoolDir.interceptors = i1 i2
mymorph.sources.mySpoolDir.interceptors.i1.type = org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder
mymorph.sources.mySpoolDir.interceptors.i1.morphlineFile = /etc/flume-ng/conf/morphline.conf
mymorph.sources.mySpoolDir.interceptors.i1.morphlineId = morphline1
mymorph.sources.mySpoolDir.interceptors.i2.type = timestamp

The timestamp interceptor is the easier of the two to understand: it adds a timestamp header (in milliseconds) to the event's metadata. This header is needed because the HDFS paths of the sinks we defined are dynamic: they depend on the moment the event was captured:
mymorph.sinks.hdfsRegular.hdfs.path = hdfs://formhadoop.local:8020/user/flume/spooldir/regular/%Y/%m/%d/%H/
mymorph.sinks.hdfsXml.hdfs.path = hdfs://formhadoop.local:8020/user/flume/spooldir/xml/%Y/%m/%d/%H/
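Beyond the path, each HDFS sink needs a few more settings (type, channel, file format, roll policy). The values below are a minimal illustrative sketch, not the original article's configuration:

```properties
# Illustrative sink settings; fileType and rollInterval values
# are assumptions, not from the original flume.conf.
mymorph.sinks.hdfsRegular.type = hdfs
mymorph.sinks.hdfsRegular.channel = MemChannelRegular
mymorph.sinks.hdfsRegular.hdfs.fileType = DataStream
mymorph.sinks.hdfsRegular.hdfs.rollInterval = 300
```

Note that the %Y/%m/%d/%H escape sequences in hdfs.path are resolved from the event's timestamp header, which is exactly why the timestamp interceptor above is required.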

The Morphline interceptor is more complex. Most of its configuration is defined in a separate configuration file (morphline.conf), which we will explain in the rest of this article.

A few words about Morphline



Morphline is a framework developed by Cloudera to simplify ETL processes. Through a configuration file, it is possible to use the many transformations already defined in Morphline's API. If we can't find the transformation we need, we can embed Java code inside the configuration file or develop a new Morphline plugin.
In our example, we won't need any Java code. But before we start looking at that configuration file, I ought to tell you 3 important things:

  • it is very easy to make typos in the configuration file. Morphline uses the HOCON format, and more than once I forgot a parenthesis or a brace, generating a Java error when starting Flume. To avoid such mistakes, I do recommend using a text editor that recognizes the HOCON format. For instance, with vim you can use this project.
  • as usual, looking at the logs while testing is a good idea. Flume's logs can be found in /var/log/flume-ng/flume.log. During the test phase, we will also add the line "log4j.logger.org.kitesdk=TRACE" to the /etc/flume-ng/conf/log4j.properties file.
  • Flume imposes a restriction on Morphline: an input event must not generate more than 1 output event. In other words, at the end of the Morphline workflow we must not have any field containing an array of 2 elements or more. We'll see later how to handle that restriction.
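To give a feel for the HOCON syntax mentioned above, here is a minimal morphline.conf skeleton. The commands shown (readLine and logTrace, both from the standard Kite Morphlines command set) are placeholders for illustration; the real transformation chain is covered in the next article:

```hocon
# Minimal illustrative skeleton; the actual commands used in this
# workflow are described in the next article.
morphlines : [
  {
    id : morphline1
    importCommands : ["org.kitesdk.**"]
    commands : [
      { readLine { charset : UTF-8 } }
      { logTrace { format : "output record: {}", args : ["@{}"] } }
    ]
  }
]
```

The id field (morphline1) is what the interceptor's morphlineId property refers to, so one file can hold several named morphlines.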
In the next article, we will dive into the Morphline's configuration.
