(this article was translated from http://www.formhadoop.es/etl-flume-morphlines/)
Flume is usually used in Hadoop to extract data from a data source (server logs, the Internet) and load it into Hadoop (normally HDFS). However, it is rarely used to perform transformation tasks. These are usually done after the data has been loaded into HDFS, through Java MapReduce jobs or through other Hadoop tools such as Hive, Pig or Shark.
That's a bit of a pity, since Flume gives you the possibility to perform transformations through interceptors. If we combine them with Morphline, an open-source library developed by Cloudera, we obtain a powerful combination that gives us interesting, easy-to-write (everything is defined in configuration files, no coding is necessary), real-time transformations.
In this two-article series, we are going to see how to use interceptors and Morphline to identify XML files and transform them into CSV format (which is easier to process).
The Flume version used here is 1.5.0, which is included in Cloudera's CDH 5.1.0.
For the sake of brevity, this article only shows some snippets of the configuration files. To download the complete files, just click here.
Morphline: installation for Flume
Morphline is developed by Cloudera, just like Flume. Flume provides both a sink (MorphlineSolrSink) and an interceptor (the Morphline interceptor). However, many libraries required for full Morphline integration are missing from the Flume distribution. For that reason, we must first install the Morphline libraries we need into Flume's classpath.
To get those libraries, we download Flume's source code, unzip it, go into the Morphline sink's module and use Maven to download all of its dependencies:
wget http://ftp.cixug.es/apache/flume/1.5.0/apache-flume-1.5.0-src.tar.gz
tar xvfz apache-flume-1.5.0-src.tar.gz apache-flume-1.5.0-src/flume-ng-sinks/flume-ng-morphline-solr-sink
cd apache-flume-1.5.0-src/flume-ng-sinks/flume-ng-morphline-solr-sink
mvn dependency:copy-dependencies
The downloaded jars can be found in the target/dependency/ folder. In our example, we only need 5 libraries (note that if you use more Morphline features, you may need to copy more files):

- kite-morphlines-core-0.12.0.jar
- metrics-healthchecks-3.0.1.jar
- config-1.0.2.jar
- kite-morphlines-saxon-0.12.0.jar
- Saxon-HE-9.5.1-4.jar

We copy those jars into Flume's folder: /usr/lib/flume-ng/lib/.
Overview of the Flume workflow
Our Flume workflow will analyze every new file that appears in a folder (source: spooldir). Then, through a Morphline interceptor, we will detect whether it is an XML file or another file format:
- if it isn't an XML file, the Morphline job is pretty simple, since it is only used to drive Flume's multiplexing.
- if it is an XML file, we apply further Morphline transformations in order to turn the data into CSV format.
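To make the goal concrete, here is a plain-Python sketch of the kind of XML-to-CSV flattening the Morphline workflow will end up doing. The record layout (orders with an id and an amount) is invented for illustration only; the real schema is handled in the Morphline configuration:

```python
import csv
import io
import xml.etree.ElementTree as ET

# Hypothetical XML event body; the real input schema is defined by
# whatever files land in the spool directory.
xml_event = ("<orders><order><id>1</id><amount>9.5</amount></order>"
             "<order><id>2</id><amount>3.0</amount></order></orders>")

def xml_to_csv(xml_text):
    """Flatten each <order> element into one CSV line."""
    root = ET.fromstring(xml_text)
    out = io.StringIO()
    writer = csv.writer(out)
    for order in root.iter("order"):
        writer.writerow([order.findtext("id"), order.findtext("amount")])
    return out.getvalue()

print(xml_to_csv(xml_event))
```

Each repeating XML element becomes one CSV row, which is exactly the shape that is easier to process downstream.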
mymorph.sources = mySpoolDir
mymorph.channels = MemChannelRegular MemChannelXml
mymorph.sinks = hdfsRegular hdfsXml
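The snippet above only declares the components. A sketch of the rest of the source wiring could look as follows (the spool directory path, the selector header name and its values are assumptions on my part; the header must match whatever the morphline sets):

```
mymorph.sources.mySpoolDir.type = spooldir
mymorph.sources.mySpoolDir.spoolDir = /var/flume/incoming
mymorph.sources.mySpoolDir.channels = MemChannelRegular MemChannelXml

# Route events to one channel or the other based on a header
# set by the Morphline interceptor (hypothetical header/values).
mymorph.sources.mySpoolDir.selector.type = multiplexing
mymorph.sources.mySpoolDir.selector.header = flume.selector.header
mymorph.sources.mySpoolDir.selector.mapping.xml = MemChannelXml
mymorph.sources.mySpoolDir.selector.default = MemChannelRegular
```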
Definition of the Flume interceptors
Interceptors are a kind of plugin that can be added to a source's configuration. When an event leaves a source, if a list of interceptors is attached to that source, the event goes through all the transformations those interceptors define. Finally, the event is put into the Flume channel.
In our example, we only need 2 interceptors:
mymorph.sources.mySpoolDir.interceptors = i1 i2
mymorph.sources.mySpoolDir.interceptors.i1.type = org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder
mymorph.sources.mySpoolDir.interceptors.i1.morphlineFile = /etc/flume-ng/conf/morphline.conf
mymorph.sources.mySpoolDir.interceptors.i1.morphlineId = morphline1
mymorph.sources.mySpoolDir.interceptors.i2.type = timestamp
The timestamp interceptor is the easier one to understand: it adds a timestamp (in milliseconds) header to the event. That header is needed because the HDFS paths of the sinks we defined are dynamic: they depend on the moment the event was captured:
mymorph.sinks.hdfsRegular.hdfs.path = hdfs://formhadoop.local:8020/user/flume/spooldir/regular/%Y/%m/%d/%H/
mymorph.sinks.hdfsXml.hdfs.path = hdfs://formhadoop.local:8020/user/flume/spooldir/xml/%Y/%m/%d/%H/
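Those %Y/%m/%d/%H escape sequences are expanded by the HDFS sink from the event's timestamp header. The expansion can be mimicked in Python (an illustrative sketch; the real sink formats dates according to its configured time zone, here we assume UTC):

```python
from datetime import datetime, timezone

# The timestamp interceptor stores epoch milliseconds in the event's
# "timestamp" header; the HDFS sink expands %Y/%m/%d/%H from it.
def expand_path(template, ts_millis):
    dt = datetime.fromtimestamp(ts_millis / 1000, tz=timezone.utc)
    return (template.replace("%Y", f"{dt.year:04d}")
                    .replace("%m", f"{dt.month:02d}")
                    .replace("%d", f"{dt.day:02d}")
                    .replace("%H", f"{dt.hour:02d}"))

print(expand_path("/user/flume/spooldir/xml/%Y/%m/%d/%H/", 0))
# epoch 0 (UTC) -> /user/flume/spooldir/xml/1970/01/01/00/
```

One directory per hour keeps the HDFS layout easy to partition on later.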
The Morphline interceptor is more complex. Most of its configuration is defined in a configuration file (morphline.conf).
We will explain that file in the rest of this article.
A few words about Morphline
Morphline is a framework developed by Cloudera to make ETL processes easier. Through a configuration file, it is possible to use the many transformations already defined in Morphline's API. If we can't find the transformation we need, we can embed Java code inside the configuration file or develop a new Morphline plugin.
In our example, we won't need any Java code. But before we start looking at that configuration file, I ought to tell you 3 important things:
- it is very easy to make typos in the configuration file. Morphline uses the HOCON format, and more than once I forgot a parenthesis or a brace, generating a Java error when starting Flume. To avoid such mistakes, I recommend using a text editor that recognizes the HOCON format. For instance, with vim you can use that project.
- as usual, looking at the logs while testing is a good idea. Flume's logs can be found in /var/log/flume-ng/flume.log. And during the test phase, we will add the line "log4j.logger.org.kitesdk=TRACE" to the /etc/flume-ng/conf/log4j.properties file.
- Flume imposes one restriction on Morphline: an input event must not generate more than 1 output event. In other words, at the end of the Morphline workflow we must not have any field containing an array of 2 or more elements. We'll see later how to handle this restriction.
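To close this first part, here is a minimal, hypothetical sketch of the general shape of a morphline.conf. The commands below only tag every event for a default Flume multiplexing branch and log it; the real XML detection and XML-to-CSV logic is the subject of the next article:

```
morphlines : [
  {
    id : morphline1
    importCommands : ["org.kitesdk.**"]
    commands : [
      # Hypothetical routing: mark every event for the "regular"
      # branch; part 2 replaces this with real XML detection.
      { setValues { "flume.selector.header" : "regular" } }

      # Trace each record (visible with log4j.logger.org.kitesdk=TRACE)
      { logDebug { format : "output record: {}", args : ["@{}"] } }
    ]
  }
]
```

The morphlineId referenced in the interceptor configuration (morphline1) must match the id declared here.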