Sunday, September 7, 2014

ETL: transformations with Flume and Morphline (II)

This article is the second part of the previous post.

Now we will focus on the Morphline configuration to complete our Flume ETL process and transform an XML file into a CSV.

Remember that you can download all the files here.

Detection of XML files and multiplexing


In the previous article we explained that we were going to perform different transformations depending on the type of the file: whether it is XML or not. The first Morphline operation therefore consists in detecting the type of the file.

Surely there are many ways to do this, and I don't claim mine is the most effective one. Indeed, my solution is pretty basic: it consists in checking the file name and detecting whether it has an .xml suffix.

The file name is retrieved with Flume this way:
mymorph.sources.mySpoolDir.basenameHeader=true
Doing so, in Morphline we will get a basename field that contains the file name.
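For reference, here is a minimal sketch of how the spooldir source and the Morphline interceptor can be wired together (it includes the basenameHeader line above; the spoolDir path matches the log excerpt shown later, while the morphlineFile path and morphlineId are assumptions and must point to your own Morphline configuration):
mymorph.sources = mySpoolDir
mymorph.sources.mySpoolDir.type = spooldir
mymorph.sources.mySpoolDir.spoolDir = /home/operador/tmp/flume
mymorph.sources.mySpoolDir.basenameHeader = true
# attach the Morphline configuration as a Flume interceptor
mymorph.sources.mySpoolDir.interceptors = morphlineInterceptor
mymorph.sources.mySpoolDir.interceptors.morphlineInterceptor.type = org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder
mymorph.sources.mySpoolDir.interceptors.morphlineInterceptor.morphlineFile = /etc/flume-ng/conf/morphline.conf
mymorph.sources.mySpoolDir.interceptors.morphlineInterceptor.morphlineId = morphline1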

The ideal solution would then be to check the suffix of the file with a function similar to the endsWith() we have in Java.
However, such a function does not exist in Morphline. What I did instead was to split the file name on the "." character (using the split Morphline command). Then I check whether the output array contains an "xml" element. This allows me to implement my if-then-else in Morphline:
{
     # tokenize the file name on "." so we can look for an "xml" token
     split {
         inputField : basename
         outputField : fileNameTokens
         separator : "."
         isRegex : false
     }
}

{
     if {
         conditions : [ 
             { contains { fileNameTokens: xml } } 
         ]   
         then : [ 
             # this is an XML file. Do a complex processing
             { ... }
         ]   
         else : [ 
# this is a regular file. Do nothing
             {
                 setValues {
                     file_type : regular
# fileNameTokens is an array with 2 values, so we need to delete it to avoid a Flume error (generating 2 records from 1 interceptor call)
                     fileNameTokens : []
                 }
             }
         ]
     }
}
In the previous block, I haven't shown the Morphline configuration that parses the XML files, because such parsing is a bit longer and we will see it in more detail later on.
In the else block (the one that deals with non-XML files), we can see that we don't perform any transformation. We only set the content of 2 fields: file_type and fileNameTokens.

fileNameTokens is a field that comes from the split operation (at the very beginning of our Morphline configuration file). If we process a file named "foo.txt", the split operation returns fileNameTokens=[foo, txt], that is to say, an array of 2 items. As I said before, the Flume/Morphline interceptor must not return several events for a single incoming event. Put another way: if we did not clear the fileNameTokens field, we would see in Flume's log file an error message similar to:
29 Aug 2014 20:03:05,419 ERROR [pool-5-thread-1] (org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run:256) - FATAL: Spool Directory source mySpoolDir: { spoolDir: /home/operador/tmp/flume }: Uncaught exception in SpoolDirectorySource thread. Restart or reconfigure Flume to continue processing.
org.apache.flume.FlumeException: org.apache.flume.sink.solr.morphline.MorphlineInterceptor$LocalMorphlineInterceptor must not generate more than one output value per record field
    at org.apache.flume.sink.solr.morphline.MorphlineInterceptor$LocalMorphlineInterceptor.toEvent(MorphlineInterceptor.java:181)
    at org.apache.flume.sink.solr.morphline.MorphlineInterceptor$LocalMorphlineInterceptor.intercept(MorphlineInterceptor.java:171)
    ...

To avoid this, the easiest way is to delete the content of that field:
fileNameTokens : []
The other operation we apply to a non-XML file is to create a new field, file_type, whose value is regular. This field will be used in the Flume multiplexing configuration:
mymorph.sources.mySpoolDir.selector.type = multiplexing
mymorph.sources.mySpoolDir.selector.header = file_type
mymorph.sources.mySpoolDir.selector.mapping.regular = MemChannelRegular
mymorph.sources.mySpoolDir.selector.mapping.xml = MemChannelXml
As you can see, a non-XML file will go through the MemChannelRegular channel. Conversely, when dealing with an XML file we will set the value "xml" in the file_type field from the Morphline configuration, so that the output channel will be MemChannelXml.

Let's finish configuring the multiplexing. The only thing we need now is to tell Flume the channel associated with each sink:
mymorph.sinks = hdfsRegular hdfsXml
mymorph.sinks.hdfsRegular.channel = MemChannelRegular
mymorph.sinks.hdfsXml.channel = MemChannelXml
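For completeness, the two channels themselves must also be declared and attached to the source. A minimal sketch, assuming plain in-memory channels (which the MemChannel* names suggest):
mymorph.channels = MemChannelRegular MemChannelXml
mymorph.channels.MemChannelRegular.type = memory
mymorph.channels.MemChannelXml.type = memory
mymorph.sources.mySpoolDir.channels = MemChannelRegular MemChannelXml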

Dealing with XML


The XML file we are going to process is the following one:

[Image: the XML data file to be parsed with Morphline]
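(The original file was shown as an image. The following is an equivalent reconstruction, derived from the XQuery transformation and the expected output presented later in this article; the exact formatting of the original may differ.)
<catalog>
    <product>
        <name>speedCar</name>
        <description>The sport car you have always dreamt of</description>
        <techDesc>
            <maxSpeed>255</maxSpeed>
            <numDoors>3</numDoors>
            <pvp>42999</pvp>
        </techDesc>
    </product>
    <product>
        <name>familyCar</name>
        <description>The car that will please both your wife and your children</description>
        <techDesc>
            <maxSpeed>140</maxSpeed>
            <numDoors>7</numDoors>
            <pvp>25995</pvp>
        </techDesc>
    </product>
    <product>
        <name>budgetCar</name>
        <description>A practical car with a nice price</description>
        <techDesc>
            <maxSpeed>135</maxSpeed>
            <numDoors>3</numDoors>
            <pvp>7999</pvp>
        </techDesc>
    </product>
</catalog>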

This file could, for example, represent the catalog of all the different cars a company sells.

By default, the spooldir source deserializer is of type LINE. That means that each line of my input file will produce a Flume event. This is not what we want in our case: what we would like is one Flume event per product XML block. Unfortunately, there is currently no such deserializer (maybe Flume will have one in the not too distant future).

The only deserializer we can use here is the BlobDeserializer. It allows us to treat each file as a whole, so that we can keep together the several XML lines that define the same (car) product.
A small warning: each input file will produce a single Flume event. Therefore, I must have a single event at the end of my Morphline process, even though there are several products in my XML file. What we'll do then is express the Morphline output as a single string, with one line per product:
speedCar|The sport car you have always dreamt of|255|3|42999
familyCar|The car that will please both your wife and your children|140|7|25995
budgetCar|A practical car with a nice price|135|3|7999

One problem with BlobDeserializer is that the whole input file is loaded into memory. Therefore, this solution won't work well if the input file is big (some gigabytes or more), since that would exhaust our Flume service's memory.
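As a side note, the BlobDeserializer lets us cap how much of each file it buffers. A hedged sketch (to my knowledge this property exists in the Flume morphline Solr sink, with a default of roughly 100 MB; anything beyond the cap is truncated):
mymorph.sources.mySpoolDir.deserializer.maxBlobLength = 100000000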

One last detail to take into account: by default, the text serializer of Flume adds a newline at the end of each event. This does not play well with the BlobDeserializer, so we need to change that default behaviour.

With all the previous considerations, our final Flume configuration for this part will be:
mymorph.sources.mySpoolDir.deserializer = org.apache.flume.sink.solr.morphline.BlobDeserializer$Builder
mymorph.sinks.hdfsRegular.serializer.appendNewline = false
mymorph.sinks.hdfsXml.serializer.appendNewline = false
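For context, the serializer properties above hang off the HDFS sinks. A minimal sketch of one of them, assuming an HDFS sink with the default text serializer (the destination path is hypothetical):
mymorph.sinks.hdfsXml.type = hdfs
mymorph.sinks.hdfsXml.hdfs.path = /user/flume/xml
# write plain text files instead of SequenceFiles
mymorph.sinks.hdfsXml.hdfs.fileType = DataStream
mymorph.sinks.hdfsXml.serializer = text
mymorph.sinks.hdfsXml.serializer.appendNewline = false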


XQueries transformations with Morphline


In the Morphline configuration block that deals with XML files, the most important part is the one that transforms the XML structure into "CSV-like" lines. (For those who don't know XQuery or XPath, I recommend first having a quick look at those technologies; otherwise, the following configuration will be quite difficult to understand.)
{
     xquery {
         fragments : [
         {
             fragmentPath : "/"
             queryString : """
             <myresult><message>
             {
                 let $nl := "&#10;"
                 for $prod in /catalog/product
                 return
                     concat(string($prod/name),"|",string($prod/description),"|",string($prod/techDesc/maxSpeed),
                     "|",string($prod/techDesc/numDoors),"|",string($prod/techDesc/pvp),$nl)
             }
              </message></myresult>
             """
         }
         ]
     }
}

With our sample XML file, the result of the previous XQuery transformation will be the following XML block:
<myresult><message>
   speedCar|The sport car you have always dreamt of|255|3|42999
   familyCar|The car that will please both your wife and your children|140|7|25995
   budgetCar|A practical car with a nice price|135|3|7999
</message></myresult>

The name of the myresult tag is not relevant: we won't use it. The inner tag, on the contrary, is more important, since its name defines the field that will contain the result of the XQuery transformation. That is to say: the message field will contain the 3 lines that define the cars.

The rest of the transformations defined in my Morphline file do some cleanup. For instance, all the body information of the Flume event must end up in an _attachment_body field, and that field must be a byte array.
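As an illustration, here is a sketch of what that cleanup can look like. setValues and toByteArray are standard Morphline commands, but the exact commands used in the downloadable files may differ:
{
     setValues {
         # route XML files to the MemChannelXml channel (see the multiplexing section)
         file_type : xml
         # move the XQuery result into the event body
         _attachment_body : "@{message}"
         # clear intermediate fields so the interceptor emits a single event
         message : []
         fileNameTokens : []
     }
}
{
     # the Flume event body must be a byte array, not a String
     toByteArray { field : _attachment_body }
}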

Conclusions

Throughout this article we have seen how to create a data transformation process with Flume and Morphline. In contrast with traditional Hadoop transformation processes, we don't need to develop anything: all the transformations are expressed through configuration files.
The other advantage of integrating the transformations directly into Flume is that we avoid launching MapReduce jobs that would introduce latency. Doing so, we make our data consumption much faster, and it becomes easier to build a real-time Big Data solution.
