Hiện tôi đang nghiên cứu về Apache Flume. Đây là một công cụ rất mạnh để thu thập dữ liệu cho kho dữ liệu hadoop. Flume cũng cung cấp nhiều source sẵn có cho phép người dùng truy cập các nguồn dữ liệu khác nhau. Người dùng cũng có thể tự xây dựng thư viện truy cập tới nguồn dữ liệu khác.
20
50
50
04
17
59
18
43
28
58
27
81
Sau khi Flume custom source đọc và đưa dữ liệu vào hadoop thì file có nội dung như sau:
20
2050
205050
20505004
2050500417
205050041759
20505004175918
2050500417591843
205050041759184328
20505004175918432858
Hướng dẫn này sử dụng maven để xây dựng project java
1. Cài đặt maven trong eclipse theo các bước sau
Open Eclipse IDE
Click Help -> Install New Software...
Click Add button at top right corner
4. At pop up: fill up Name as "M2Eclipse" and Location as "http://download.eclipse.org/technology/m2e/releases" or http://download.eclipse.org/technology/m2e/milestones/1.0
5. Now click OK
2. Tạo một project trong eclipse sử dụng maven
Trong eclipse, new project, chọn loại là maven
Tiếp theo, maven hỏi bạn sử dụng template nào. Để đơn giản, bạn cứ chọn mặc định quickstart 1.1
Tiếp đến bạn cần khai báo
- GroupId
- ArtifactId
- Version
Trong đó, GroupId là tên namespace của bạn
ArtifactId là tên ứng dụng
Cuối cung ấn Finish là xong bước tạo ứng dụng.
3. Khai báo thư viện cần sử dụng
Trong project, bạn mở file pom.xml và thêm vào đoạn code sau:
<dependencies>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.6.0</version>
</dependency>
</dependencies>
Bây giờ file pom.xml của bạn sẽ có dạng tương tự:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.dinte.truongpm</groupId>
<artifactId>FlumeCustomSource</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>FlumeCustomSource</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.6.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
4. Mã nguồn của custom source
Trong project tạo mới một class. (Với tôi, đặt tên là MySource)
Nội dung class như sau:
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.nio.charset.Charset;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MySource extends AbstractSource implements Configurable, PollableSource {
private static final Logger logger = LoggerFactory.getLogger(MySource.class);
private String myProp;
BufferedReader br;
Thread tailThread;
@Override
public void configure(Context context) {
String myProp = context.getString("filepath", "defaultValue");
logger.info("Path Property==============>" + myProp);
this.myProp = myProp;
}
@Override
public void start() {
ConcatRunner t = new ConcatRunner();
tailThread = new Thread(t);
tailThread.start();
}
@Override
public void stop () {
}
@Override
public Status process() throws EventDeliveryException {
// TODO Auto-generated method stub
return null;
}
private class ConcatRunner implements Runnable {
@Override
public void run() {
Event e;
String sCurrentLine;
String finalFlumeString = "";
try
{
br = new BufferedReader(new FileReader(myProp));
while ((sCurrentLine = br.readLine()) != null) {
System.out.println(sCurrentLine);
finalFlumeString = finalFlumeString + sCurrentLine ; // Concatinating String
e = EventBuilder.withBody(finalFlumeString,
Charset.forName("UTF-8"));
getChannelProcessor().processEvent(e);
Thread.sleep(3000);
}
}
catch(Exception ex){
System.out.println("Exception in Reading File" + ex.getMessage());
}
try {
if (br != null)br.close();
} catch (IOException ex) {
ex.printStackTrace();
}
}
} //ConcatRunner Over
}
5. Sử dụng maven để build project
Click chuột phải vào project, chọn Run As -> Maven install
Sau khi build thành công, copy file jar trong thư mục target của source code vào thư mục lib của flume
Thư mục lib ở trong bộ cài đặt của Flume là thư mục chứa các thư viện truy cập tới các nguồn dữ liệu khác nhau. Có thể tham khao thêm tại bài viết lấy dữ liệu từ twitter
6. Khai báo source trong Flume
Tạo file mysource.conf trong thư mục conf của flume có nội dung như sau:
a1.sources = r1
a1.channels = c1
a1.sinks = k1
#source
a1.sources.r1.type = org.dinte.truongpm.FlumeCustomSource.MySource
a1.sources.r1.restart = true
a1.sources.r1.filepath = /home/public_folder/input.txt
#sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://localhost:9000/user/hadoop/twitter_data/
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.fileType = DataStream
#channel
a1.channels.c1.type = memory
#connect
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Ở đây, bạn phải tạo file input.txt có nội dung như ở đầu bài viết làm dữ liệu đầu vào và khai báo lại đường dẫn file input.txt nhé!
6. Chạy thử ứng dụng
Sử dụng user hadoop và chạy thử ứng dụng bằng lệnh sau:
[hadoop@localhost Flume]$ bin/flume-ng agent -n a1 -f conf/mysource.conf
Chạy khoảng 30 s rồi sử dụng tổ hợp phím ctl+C để hủy. Vì chạy nhiều cũng chả để làm gì :D
7. Kiểm tra kết quả
Sử dụng user hadoop và chạy thử ứng dụng bằng lệnh sau:
[hadoop@localhost Flume]$ hdfs dfs -cat /user/hadoop/twitter_data/events-.1480065819255
Chú ý: events-.148… là file kết quả được sinh ra, để kiểm tra tên của file này, bạn có thể sử dụng trình duyệt theo đường dẫn: http://localhost:50070/explorer.html#/user/hadoop/twitter_data
Đường dẫn /user/hadoop/twtter_data là thư mục hadoop của tôi, bạn có thể khác
Nếu thành công sẽ cho ra kết quả như sau:
16/11/25 01:29:04 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
20
2050
205050
20505004
2050500417
Chúc các bạn thành công! Có vấn đề gì thì comment nhé!
Không có nhận xét nào :
Đăng nhận xét