spark整合Phoenix相关案例 Demo1: Demo2: Demo3: Demo4:(点个赞) 五、spark+phoenix 六、spark读取phoenix hbase table表数据到dataframe的三种方式比较



相关博文:
CSDN:dingyuanpu:Phoenix4.8整合Spark



Maven依赖:

<dependency>
  <groupId>org.apache.phoenix</groupId>
  <artifactId>phoenix-spark</artifactId>
  <version>${phoenix.version}</version>
  <scope>provided</scope>
</dependency>

spark 读取Phoenix hbase table表到 DataFrame的方式

方式一:spark read读取各数据库的通用方式

spark.read.format("org.apache.phoenix.spark").option("table","subject_score").option("zkUrl","master,slave1,slave2,slave3,slave4").load

方式二:spark.load

val df = sqlContext.load(
  "org.apache.phoenix.spark",
  Map("table" -> "TABLE1", "zkUrl" -> "phoenix-server:2181")
)

方式三:phoenixTableAsDataFrame(需要指定列名,留空就可以不指定列名)

val configuration = new Configuration()
// Can set Phoenix-specific settings, requires 'hbase.zookeeper.quorum'
val sc = new SparkContext("local", "phoenix-test")
val sqlContext = new SQLContext(sc)
// Load the columns 'ID' and 'COL1' from TABLE1 as a DataFrame
val df = sqlContext.phoenixTableAsDataFrame(
  "TABLE1", Array("ID", "COL1"), conf = configuration
)

方式四:phoenixTableAsRDD (需要指定列名,留空就可以不指定列名)

val sc = new SparkContext("local", "phoenix-test")
// Load the columns 'ID' and 'COL1' from TABLE1 as an RDD
val rdd: RDD[Map[String, AnyRef]] = sc.phoenixTableAsRDD(
  "TABLE1", Seq("ID", "COL1"), zkUrl = Some("phoenix-server:2181")
)

Demo2:

使用 phoenix 读取 HBase

方式一:

val df = sqlContext.read.format("jdbc")
  .option("driver", "org.apache.phoenix.jdbc.PhoenixDriver")
  .option("url", "jdbc:phoenix:localhost:2181")
  .option("dbtable", "US_POPULATION")
  .load()

方式二:

val df = sqlContext.load(
  "jdbc",
  Map("zkUrl" -> "localhost:2181", "url" -> "jdbc:phoenix:localhost:2181", "dbtable" -> "US_POPULATION", "driver" -> "org.apache.phoenix.jdbc.PhoenixDriver")
)

Demo3:

spark第八篇:与Phoenix整合

def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("phoenix-test").getOrCreate()
    // 第一种读取方法
    var df = spark.read.format("org.apache.phoenix.spark").option("table", "test1").option("zkUrl", "192.168.56.11:2181").load()
    df = df.filter("name not like 'hig%'")
      .filter("password like '%0%'")
    df.show()

    val configuration = new Configuration()
    configuration.set("hbase.zookeeper.quorum", "192.168.56.11:2181")
    // 第二种读取方法
    df = spark.sqlContext.phoenixTableAsDataFrame("test1", Array("ID", "INFO.NAME", "INFO.PASSWORD"), conf = configuration)
    df.show()

    //第一种输出方法
    df.write
      .format("org.apache.phoenix.spark")
      .mode("overwrite")
      .option("table", "test2")
      .option("zkUrl", "192.168.56.11:2181")
      .save()

    //第二种输出方法
    df.saveToPhoenix(Map("table" -> "test2", "zkUrl" -> "192.168.56.11:2181"))
  }

phoenixTableAsDataFrame()是org.apache.phoenix.spark.SparkSqlContextFunctions中的方法,saveToPhoenix()是org.apache.phoenix.spark.DataFrameFunctions中的方法,在phoenix-spark-4.10.0-HBase-1.2.jar中。使用这两个方法时必须 import org.apache.phoenix.spark._,否则编辑器识别不出语法,也不会自动import。

Demo4:(点个赞)

Phoenix4.8整合Spark

4.1在phoenix中建表

CREATE TABLE TABLE1 (ID BIGINT NOT NULL PRIMARY KEY, COL1 VARCHAR);
UPSERT INTO TABLE1 (ID, COL1) VALUES (1, 'test_row_1');
UPSERT INTO TABLE1 (ID, COL1) VALUES (2, 'test_row_2');

4.2启动spark-shelll

spark-shell  --jars /opt/phoenix4.8/phoenix-spark-4.8.0-HBase-1.1.jar,/opt/phoenix4.8/phoenix-4.8.0-HBase-1.1-client.jar

4.3使用DataSource API,load为DataFrame

import org.apache.phoenix.spark._
 
val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> "TABLE1", "zkUrl" -> "192.38.0.231:2181"))
df.filter(df("COL1") === "test_row_1" && df("ID") === 1L).select(df("ID")).show

如果spark2.x版本,使用如下方式(前提是修改了phoenix-spark模块的代码,使之兼容spark2.x,重新编译):

import org.apache.phoenix.spark._
 
val df = spark.read.format("org.apache.phoenix.spark").options(Map("table" -> "TABLE1", "zkUrl" -> "192.38.0.231:2181")).load
df.filter(df("COL1") === "test_row_1" && df("ID") === 1L).select(df("ID")).show

使用spark-sql方式如下

spark-sql --jars /opt/phoenix4.8/phoenix-spark-4.8.0-HBase-1.1.jar,/opt/phoenix4.8/phoenix-4.8.0-HBase-1.1-client.jar

CREATE TABLE spark_ph
USING org.apache.phoenix.spark
OPTIONS (
  table "TABLE1",
  zkUrl "192.38.0.231:2181"
);

4.3使用Configuration类,load为DataFrame

import org.apache.hadoop.conf.Configuration
import org.apache.phoenix.spark._
 
val configuration = new Configuration()
 
val df = sqlContext.phoenixTableAsDataFrame("TABLE1", Array("ID", "COL1"), conf = configuration)
 
df.show

4.4使用Zookeeper URL ,load为RDD

import org.apache.phoenix.spark._
 
//将ID和COL1两列,加载为一个RDD
val rdd = sc.phoenixTableAsRDD("TABLE1", Seq("ID", "COL1"), zkUrl = Some("192.38.0.231:2181"))
 
rdd.count()
 
val firstId = rdd.first()("ID").asInstanceOf[Long]
val firstCol = rdd.first()("COL1").asInstanceOf[String]

4.5通过Spark向Phoenix中写入数据(RDD方式)

phoenix中创建表output_test_table:
CREATE TABLE OUTPUT_TEST_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER);

import org.apache.phoenix.spark._
 
val sc = new SparkContext("local", "phoenix-test")
val dataSet = List((1L, "1", 1), (2L, "2", 2), (3L, "3", 3))
 
sc.parallelize(dataSet).saveToPhoenix("OUTPUT_TEST_TABLE", Seq("ID","COL1","COL2"), zkUrl = Some("192.38.0.231:2181"))

0: jdbc:phoenix:localhost> select * from output_test_table;

±----±------±------+
| ID | COL1 | COL2 |
±----±------±------+
| 1 | 1 | 1 |
| 2 | 2 | 2 |
| 3 | 3 | 3 |
±----±------±------+
3 rows selected (0.168 seconds)

4.6通过Spark向Phoenix中写入数据(DataFrame方式)

phoenix中创建两张表:

CREATE TABLE INPUT_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER);
upsert into input_table values(1,'col1',1);
upsert into input_table values(2,'col2',2);
CREATE TABLE OUTPUT_TABLE (id BIGINT NOT NULL PRIMARY KEY, col1 VARCHAR, col2 INTEGER);
import org.apache.phoenix.spark._
import org.apache.spark.sql.SaveMode
 
val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> "INPUT_TABLE", "zkUrl" -> "192.38.0.231:2181"))
 
df.save("org.apache.phoenix.spark", Map("table" -> "OUTPUT_TABLE", "zkUrl" -> "192.38.0.231:2181"))

spark2.x向phoenix加载数据:

df.write.format("org.apache.phoenix.spark").options( Map("table" -> "OUTPUT_TABLE", "zkUrl" -> "192.38.0.231:2181")).mode("overwrite").save

0: jdbc:phoenix:localhost> select * from output_table;
±----±------±------+
| ID | COL1 | COL2 |
±----±------±------+
| 1 | col1 | 1 |
| 2 | col2 | 2 |
±----±------±------+
2 rows selected (0.092 seconds)

五、spark+phoenix

5.1pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.huidian.spark</groupId>
    <artifactId>SparkPhoenix</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <spark.version>2.3.1</spark.version>
        <scala.version>2.11</scala.version>
        <phoenix.version>4.14.1</phoenix.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.phoenix</groupId>
            <artifactId>phoenix-core</artifactId>
            <version>4.14.1-HBase-1.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.phoenix</groupId>
            <artifactId>phoenix-spark</artifactId>
            <version>4.14.1-HBase-1.1</version>
            <!--<scope>provided</scope>-->
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.19</version>
                <configuration>
                    <skip>true</skip>
                </configuration>
            </plugin>

        </plugins>
    </build>
</project>

5.2main

package com.hdc.spark.phoenix

import org.apache.spark.sql.SparkSession

object SparkPhoenixDemo {

    def main(args: Array[String]): Unit = {

        val zookeeper = "hdc-data1,hdc-data2,hdc-data3:2181"
        val tbname = "TEST.TEST_PHOENIX"

        val spark = SparkSession.builder().master("local").appName("spark-phoenix-demo").config("spark.some.config.option", "some-value").getOrCreate()
          //方式1:(不推荐)部署时需要添加额外的PhoenixDriver包
//        val df = spark.read.format("jdbc").option("driver","org.apache.phoenix.jdbc.PhoenixDriver")
//                .option("url","jdbc:phoenix:"+zookeeper)
//                .option("dbtable",tbname)
//                .load()

        //方式2:
        val df = spark.read
                .format("org.apache.phoenix.spark")
                .option("table", tbname)
                .option("zkUrl", zookeeper)
                .load()
        df.show(10):
        spark.stop()
    }
}

六、spark读取phoenix hbase table表数据到dataframe的三种方式比较

原文地址:
spark读取phoenix hbase table表数据到dataframe的三种方式比较

6.1第一种:jdbc分区方式读取

var ds=spark.read.format("jdbc")
.option("url","jdbc:phoenix:master,slave1")
.option("dbtable", "itxw.table")
.option("zkUrl","master")
.option("partitionColumn", "id")
.option("lowerBound", "0")
.option("upperBound","15213372")
.option("numPartitions","500").load

这种方式和读取mysql数据表的方式是一样的,为了防止数据倾斜,最好获取maxid作为upperBound。这种方式是速度最快的了。

6.2第二种:官方提供的phoenixTableAsDataFrame的方法

import org.apache.phoenix.spark._
var ds=spark.sqlContext.phoenixTableAsDataFrame("itxw.table",Seq("PROJECT_NAME","PROJECT_ID"),Option("project_id='itxw.net'"),Option("master"),null)

phoenix官方提供的phoenixTableAsDataFrame这种方法,速度还是很可观的,需要指定列名,也可以不指定列名,也可以指定条件。

此外:

 val configuration = new Configuration()
 org.apache.phoenix.spark.toSparkSqlContextFunctions(spark.sqlContext).phoenixTableAsDataFrame("datacenter.quest_score",Seq(),Option(""),Option("master"),null,configuration)

上面的写法中import org.apache.phoenix.spark._,和这里按包调用时调用的不是同一个方法。

6.3第三种:spark read org.apache.phoenix.spark方法

var ds=spark.read.format("org.apache.phoenix.spark")
.option("table", "itxw.table")
.option("zkUrl",ConfigModel.hbaseZkurl).load

这种方法写法简单,貌似官方文档也提到这种写法,但速度不咋滴,估计和phoenix种select * 很慢是一个道理吧。