Thứ Bảy, 26 tháng 11, 2016

Tự xây dựng ứng dụng truy vấn dữ liệu với Flume



Hiện tôi đang nghiên cứu về Apache Flume. Đây là một công cụ rất mạnh để thu thập dữ liệu cho kho dữ liệu hadoop. Flume cũng cung cấp nhiều source sẵn có cho phép người dùng truy cập các nguồn dữ liệu khác nhau. Người dùng cũng có thể tự xây dựng thư viện truy cập tới nguồn dữ liệu khác.

Trong bài viết này, tôi sẽ trình bầy một ví dụ đơn giản lấy file trên máy. Trong quá trình lấy file, có thể tiền xử lý dữ liệu. Ví dụ dữ liệu đầu vào là file có nội dung

20

50

50

04

17

59

18

43

28

58

27

81

Sau khi Flume custom source đọc và đưa dữ liệu vào hadoop thì file có nội dung như sau:
20

2050

205050

20505004

2050500417

205050041759

20505004175918

2050500417591843

205050041759184328

20505004175918432858

Hướng dẫn này sử dụng maven để xây dựng project java

1. Cài đặt maven trong eclipse theo các bước sau

  1. Open Eclipse IDE

  2. Click Help -> Install New Software...

  3. Click Add button at top right corner

4.      At pop up: fill up Name as "M2Eclipse" and Location as "http://download.eclipse.org/technology/m2e/releases" or http://download.eclipse.org/technology/m2e/milestones/1.0

5.      Now click OK

2. Tạo một project trong eclipse sử dụng maven

Trong eclipse, new project, chọn loại là maven
Tiếp theo, maven hỏi bạn sử dụng template nào. Để đơn giản, bạn cứ chọn mặc định quickstart 1.1
Tiếp đến bạn cần khai báo
  • GroupId
  • ArtifactId
  • Version
Trong đó, GroupId là tên namespace của bạn
ArtifactId là tên ứng dụng
Cuối cung ấn Finish là xong bước tạo ứng dụng.

3. Khai báo thư viện cần sử dụng

Trong project, bạn mở file pom.xml và thêm vào đoạn code sau:

<dependencies>

    <dependency>

       <groupId>org.apache.flume</groupId>

       <artifactId>flume-ng-core</artifactId>

       <version>1.6.0</version>

    </dependency>

</dependencies>

Bây giờ file pom.xml của bạn sẽ có dạng tương tự:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 <modelVersion>4.0.0</modelVersion>
 <groupId>org.dinte.truongpm</groupId>
 <artifactId>FlumeCustomSource</artifactId>
 <version>0.0.1-SNAPSHOT</version>
 <packaging>jar</packaging>
 <name>FlumeCustomSource</name>
 <url>http://maven.apache.org</url>
 <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
 </properties>
 <dependencies>
 <dependency>
       <groupId>org.apache.flume</groupId>
       <artifactId>flume-ng-core</artifactId>
       <version>1.6.0</version>
    </dependency>
    <dependency>
     <groupId>junit</groupId>
     <artifactId>junit</artifactId>
     <version>3.8.1</version>
     <scope>test</scope>
    </dependency>
 </dependencies>
</project>
4. Mã nguồn của custom source
Trong project tạo mới một class. (Với tôi, đặt tên là MySource)
Nội dung class như sau:

import java.io.BufferedReader;

import java.io.FileReader;

import java.io.IOException;

import java.nio.charset.Charset;

import org.apache.flume.Context;

import org.apache.flume.Event;

import org.apache.flume.EventDeliveryException;

import org.apache.flume.PollableSource;

import org.apache.flume.conf.Configurable;

import org.apache.flume.event.EventBuilder;

import org.apache.flume.source.AbstractSource;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

public class MySource extends AbstractSource implements Configurable, PollableSource {

 private static final Logger logger = LoggerFactory.getLogger(MySource.class);

 private String myProp;

 BufferedReader br;

 Thread tailThread;

 @Override

 public void configure(Context context) {

   String myProp = context.getString("filepath", "defaultValue");

   logger.info("Path Property==============>" + myProp);

   this.myProp = myProp;

 }

 @Override

 public void start() {

 ConcatRunner t = new ConcatRunner();

 tailThread = new Thread(t);

 tailThread.start();

}

 @Override

 public void stop () {

 }

@Override

public Status process() throws EventDeliveryException {

// TODO Auto-generated method stub

return null;

}

private class ConcatRunner implements Runnable {

   @Override

   public void run() {

   Event e;

     String sCurrentLine;

   String finalFlumeString = "";

 

   try

     {

      br = new BufferedReader(new FileReader(myProp));

      while ((sCurrentLine = br.readLine()) != null) {

    System.out.println(sCurrentLine);

    finalFlumeString = finalFlumeString +  sCurrentLine ; // Concatinating String

    e = EventBuilder.withBody(finalFlumeString,

                Charset.forName("UTF-8"));

    getChannelProcessor().processEvent(e);

    Thread.sleep(3000);

   }

     }

     catch(Exception ex){

      System.out.println("Exception in Reading File" + ex.getMessage());       

     }

     try {

    if (br != null)br.close();

   } catch (IOException ex) {

    ex.printStackTrace();

   }

           }

} //ConcatRunner Over

}

5. Sử dụng maven để build project
Click chuột phải vào project, chọn Run As -> Maven install
Sau khi build thành công, copy file jar trong thư mục target của source code vào thư mục lib của flume
Thư mục lib ở trong bộ cài đặt của Flume là thư mục chứa các thư viện truy cập tới các nguồn dữ liệu khác nhau. Có thể tham khao thêm tại bài viết lấy dữ liệu từ twitter
6. Khai báo source trong Flume
Tạo file mysource.conf trong thư mục conf của flume có nội dung như sau:

a1.sources = r1

a1.channels = c1

a1.sinks = k1

#source

a1.sources.r1.type = org.dinte.truongpm.FlumeCustomSource.MySource

a1.sources.r1.restart = true

a1.sources.r1.filepath = /home/public_folder/input.txt

#sink

a1.sinks.k1.type = hdfs

a1.sinks.k1.hdfs.path = hdfs://localhost:9000/user/hadoop/twitter_data/

a1.sinks.k1.hdfs.filePrefix = events-

a1.sinks.k1.hdfs.round = true

a1.sinks.k1.hdfs.fileType = DataStream

#channel

a1.channels.c1.type = memory

#connect

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

Ở đây, bạn phải tạo file input.txt có nội dung như ở đầu bài viết làm dữ liệu đầu vào và khai báo lại đường dẫn file input.txt nhé!
6. Chạy thử ứng dụng
Sử dụng user hadoop và chạy thử ứng dụng bằng lệnh sau:

[hadoop@localhost Flume]$ bin/flume-ng agent -n a1 -f conf/mysource.conf

Chạy khoảng 30 s rồi sử dụng tổ hợp phím ctl+C để hủy. Vì chạy nhiều cũng chả để làm gì :D

7. Kiểm tra kết quả
Sử dụng user hadoop và chạy thử ứng dụng bằng lệnh sau:
[hadoop@localhost Flume]$ hdfs dfs -cat /user/hadoop/twitter_data/events-.1480065819255
Chú ý: events-.148… là file kết quả được sinh ra, để kiểm tra tên của file này, bạn có thể sử dụng trình duyệt theo đường dẫn: http://localhost:50070/explorer.html#/user/hadoop/twitter_data

Đường dẫn /user/hadoop/twtter_data là thư mục hadoop của tôi, bạn có thể khác

Nếu thành công sẽ cho ra kết quả như sau:
16/11/25 01:29:04 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
20
2050
205050
20505004
2050500417
Chúc các bạn thành công! Có vấn đề gì thì comment nhé!

Không có nhận xét nào :

Đăng nhận xét