flume 读取数据存入mysql

2017-01-06 07:56:59来源:CSDN作者:u012373815人点击

本文需求是用flume 监控文件,然后将文件的内容存放在mysql数据库中。

本文结构

1.mysql 表设计

2. MysqlSink编写

3.conf 配置

4. 打包测试


1.mysql 表设计

首先说明本文的 event 格式:event 的内容形如 "exec tail$i , yang",其中 "exec tail$i" 为数据 content,"yang" 为 createBy,中间用 "," 隔开。

然后创建数据库和表 如图所示(数据库名字为sinktest ,表名为mysqltest ,id 是自增的。)

这里写图片描述


2. MysqlSink编写

2.1 pom.xml

首先新建maven 项目,pom.xml 文件如下:

<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the flumeDemo project (custom Flume sink that writes to MySQL). -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.us</groupId>
    <artifactId>flumeDemo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <!-- Compile for Java 8. -->
        <maven.compiler.target>1.8</maven.compiler.target>
        <maven.compiler.source>1.8</maven.compiler.source>
        <!-- Single place to pin the version shared by both Flume artifacts below. -->
        <version.flume>1.7.0</version.flume>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>${version.flume}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-configuration</artifactId>
            <version>${version.flume}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>6.0.5</version>
        </dependency>
    </dependencies>
</project>

2.2 Java bean

对应我们的事件和数据库表,建立 Java bean 对象 Info.java

package com.us.flume;/** * Created by yangyibo on 17/1/5. */public class Info {    private String content;    private String createBy;    public String getContent() {        return content;    }    public void setContent(String content) {        this.content = content;    }    public String getCreateBy() {        return createBy;    }    public void setCreateBy(String createBy) {        this.createBy = createBy;    }}

2.3 自定义Sink编写

MysqlSink.java

package com.us.flume;import com.google.common.base.Preconditions;import com.google.common.base.Throwables;import com.google.common.collect.Lists;import org.apache.flume.*;import org.apache.flume.conf.Configurable;import org.apache.flume.sink.AbstractSink;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.sql.Connection;import java.sql.DriverManager;import java.sql.PreparedStatement;import java.sql.SQLException;import java.util.List;/** * Created by yangyibo on 17/1/5. */public class MysqlSink extends AbstractSink implements Configurable {    private Logger LOG = LoggerFactory.getLogger(MysqlSink.class);    private String hostname;    private String port;    private String databaseName;    private String tableName;    private String user;    private String password;    private PreparedStatement preparedStatement;    private Connection conn;    private int batchSize;    public MysqlSink() {        LOG.info("MysqlSink start...");    }    public void configure(Context context) {        hostname = context.getString("hostname");        Preconditions.checkNotNull(hostname, "hostname must be set!!");        port = context.getString("port");        Preconditions.checkNotNull(port, "port must be set!!");        databaseName = context.getString("databaseName");        Preconditions.checkNotNull(databaseName, "databaseName must be set!!");        tableName = context.getString("tableName");        Preconditions.checkNotNull(tableName, "tableName must be set!!");        user = context.getString("user");        Preconditions.checkNotNull(user, "user must be set!!");        password = context.getString("password");        Preconditions.checkNotNull(password, "password must be set!!");        batchSize = context.getInteger("batchSize", 100);        Preconditions.checkNotNull(batchSize > 0, "batchSize must be a positive number!!");    }    @Override    public void start() {        super.start();        try {            //调用Class.forName()方法加载驱动程序            
Class.forName("com.mysql.jdbc.Driver");        } catch (ClassNotFoundException e) {            e.printStackTrace();        }        String url = "jdbc:mysql://" + hostname + ":" + port + "/" + databaseName;        //调用DriverManager对象的getConnection()方法,获得一个Connection对象        try {            conn = DriverManager.getConnection(url, user, password);            conn.setAutoCommit(false);            //创建一个Statement对象            preparedStatement = conn.prepareStatement("insert into " + tableName +                    " (content,create_by) values (?,?)");        } catch (SQLException e) {            e.printStackTrace();            System.exit(1);        }    }    @Override    public void stop() {        super.stop();        if (preparedStatement != null) {            try {                preparedStatement.close();            } catch (SQLException e) {                e.printStackTrace();            }        }        if (conn != null) {            try {                conn.close();            } catch (SQLException e) {                e.printStackTrace();            }        }    }    public Status process() throws EventDeliveryException {        Status result = Status.READY;        Channel channel = getChannel();        Transaction transaction = channel.getTransaction();        Event event;        String content;        List<Info> infos = Lists.newArrayList();        transaction.begin();        try {            for (int i = 0; i < batchSize; i++) {                event = channel.take();                if (event != null) {//对事件进行处理                    //event 的 body 为   "exec tail$i , abel"                    content = new String(event.getBody());                    Info info=new Info();                    if (content.contains(",")) {                        //存储 event 的 content                        info.setContent(content.substring(0, content.indexOf(",")));                        //存储 event 的 create  +1 是要减去那个 ","                        
info.setCreateBy(content.substring(content.indexOf(",")+1));                    }else{                        info.setContent(content);                    }                    infos.add(info);                } else {                    result = Status.BACKOFF;                    break;                }            }            if (infos.size() > 0) {                preparedStatement.clearBatch();                for (Info temp : infos) {                    preparedStatement.setString(1, temp.getContent());                    preparedStatement.setString(2, temp.getCreateBy());                    preparedStatement.addBatch();                }                preparedStatement.executeBatch();                conn.commit();            }            transaction.commit();        } catch (Exception e) {            try {                transaction.rollback();            } catch (Exception e2) {                LOG.error("Exception in rollback. Rollback might not have been" +                        "successful.", e2);            }            LOG.error("Failed to commit transaction." +                    "Transaction rolled back.", e);            Throwables.propagate(e);        } finally {            transaction.close();        }        return result;    }}

3.conf 配置

在flume的conf 文件夹下新建配置文件 mysqlSink.conf 内容如下:

注意:localhost 为 mysql 数据库所在的服务器 IP;/opt/apps/logs/tail.log 为我要监控的文件;com.us.flume.MysqlSink 是本文第2步中自定义 sink(MysqlSink)的全限定类名。
# Flume agent "agent1": tails a log file and writes each line to MySQL through the custom MysqlSink.
agent1.sources = source1
agent1.sinks = mysqlSink
agent1.channels = channel1

# Describe/configure source1
agent1.sources.source1.type = exec
agent1.sources.source1.command = tail -F /opt/apps/logs/tail.log
agent1.sources.source1.channels = channel1

# Describe mysqlSink
agent1.sinks.mysqlSink.type =com.us.flume.MysqlSink
agent1.sinks.mysqlSink.hostname=localhost
agent1.sinks.mysqlSink.port=3306
agent1.sinks.mysqlSink.databaseName=sinktest
agent1.sinks.mysqlSink.tableName=mysqltest
agent1.sinks.mysqlSink.user=root
agent1.sinks.mysqlSink.password=xxxxxx
agent1.sinks.mysqlSink.channel = channel1

# Use a channel which buffers events in memory
agent1.channels.channel1.type = memory
agent1.channels.channel1.capacity = 1000
# Must be at least the sink's batchSize (MysqlSink defaults to 100).
agent1.channels.channel1.transactionCapacity = 100

4. 打包测试

4.1 打包

将步骤2 的项目打成 jar 包,放到 flume 的 lib 文件夹下,并将 mysql-connector-java 的 jar 包也放入 flume 的 lib 文件夹下。

注意:(使用 IDEA 打包,在 Create JAR from Modules 时,Main Class 空着不用填写,因为我们没有可运行类)

4.2 启动测试

在 flume 的 bin 目录下执行启动命令

# Start the agent named agent1 (-n) with the given conf directory (-c) and agent config file (-f),
# logging at INFO level to the console.
./flume-ng agent -c /opt/apps/flume/conf -f /opt/apps/flume/conf/mysqlSink.conf -n agent1 -Dflume.root.logger=INFO,console

打开一个新终端,向我们监控的文件发送我们定义的事件。

# Append ten test lines ("exec tail1 , yang" ... "exec tail10 , yang") to the monitored file.
for i in {1..10};do echo "exec tail$i , yang" >> /opt/apps/logs/tail.log;done;

此时打开数据库可以看到我们的事件已经插入到数据库了。

(由于测试多次,所以 id 不是从 1 开始了。)
这里写图片描述

最新文章

123

最新摄影

微信扫一扫

第七城市微信公众平台