028-86922220

建站动态

根据您的个性需求进行定制 先人一步 抢占小程序红利时代

FlinkSQL如何连接Hive并写入/读取数据

这篇文章主要介绍Flink SQL如何连接Hive并写入/读取数据,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!

成都创新互联公司是一家专业提供广德企业网站建设,专注与成都网站建设、成都网站制作、HTML5建站、小程序制作等业务。10年已为广德众多企业、政府机构等服务。创新互联专业网站设计公司优惠进行中。

1. 添加依赖

    
        1.11.2
        2.11
    

    
        
            org.apache.flink
            flink-streaming-scala_${scala.version}
            ${flink.version}
        
        
            org.apache.flink
            flink-connector-kafka-0.11_${scala.version}
            ${flink.version}
        
        
            org.apache.flink
            flink-clients_${scala.version}
            ${flink.version}
        

        
            org.apache.flink
            flink-table-api-java-bridge_${scala.version}
            ${flink.version}
        

        
            org.apache.flink
            flink-table-planner-blink_${scala.version}
            ${flink.version}
        

        
        
            org.apache.flink
            flink-connector-hive_${scala.version}
            ${flink.version}
        

        
        
            org.apache.hive
            hive-exec
            2.1.1
        

        
            org.apache.flink
            flink-shaded-hadoop-2-uber
            2.6.5-7.0
        

        
            org.apache.flink
            flink-json
            ${flink.version}
        

        
            org.apache.flink
            flink-connector-elasticsearch7_${scala.version}
            ${flink.version}
        

        
            org.apache.flink
            flink-csv
            ${flink.version}
        

        
            com.fasterxml.jackson.core
            jackson-databind
            2.4.0
        

        
            com.fasterxml.jackson.core
            jackson-annotations
            2.4.0
        

        
            com.fasterxml.jackson.core
            jackson-core
            2.4.0
        
    

2. 创建blink版本的批处理Table执行环境

EnvironmentSettings bbSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inBatchMode()
                .build();
TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);

3. 连接文件系统,创建hive catalog,对表进行操作,类似于Spark on Hive,flink可以直接获取Hive的元数据,并使用flink进行计算。

        // 连接外部文件
        bbTableEnv.connect(new FileSystem().path("file:///E:/d.txt"))
                .withFormat(new Csv().fieldDelimiter(','))
                .withSchema(new Schema().field("id", DataTypes.STRING()))
                .createTemporaryTable("output");

        // 设置 hive 方言
        bbTableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        // 获取hive-site.xml目录
        String hiveConfDir = Thread.currentThread().getContextClassLoader().getResource("").getPath().substring(1);
        HiveCatalog hive = new HiveCatalog("hive", "warningplatform", hiveConfDir);
        bbTableEnv.registerCatalog("hive", hive);

        bbTableEnv.useCatalog("hive");
        bbTableEnv.useDatabase("warningplatform");

        bbTableEnv.executeSql("insert into  test select id from    default_catalog.default_database.output");

以上是“Flink SQL如何连接Hive并写入/读取数据”这篇文章的所有内容,感谢各位的阅读!希望分享的内容对大家有帮助,更多相关知识,欢迎关注创新互联行业资讯频道!


网站名称:FlinkSQL如何连接Hive并写入/读取数据
转载来于:http://www.tsicrk.com/article/iecpdg.html

其他资讯

让你的专属顾问为你服务

1.8630s