Spark BullkLoad(hive写hbase)

发布于 2022-01-18  313 次阅读


Spark BullkLoad

bulkload说明

推荐阅读:Hbase BulkLoad

版本信息

  • scala: 2.11
  • spark: 2.4.8
  • hbase: 1.2.0

pom文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <groupId>com</groupId>
    <artifactId>bulkload</artifactId>
    <version>1.0-SNAPSHOT</version>
    
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.4.8</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.4.8</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>2.4.8</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-mapreduce</artifactId>
            <version>2.3.2</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.32</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Java Compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>

            <!-- Scala Compiler -->
            <plugin>
                <artifactId>scala-maven-plugin</artifactId>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
                <groupId>net.alchim31.maven</groupId>
                <version>3.4.2</version>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>BulkLoad</mainClass>
                                </transformer>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

参考代码

hive -> hbase

/**
 *
 * @author 7e2hj
 * @date 2022-01-04
 * @version 1.0
 */
object BulkLoad {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("yarn")
      .appName("BulkLoad")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()
    val sc = spark.sparkContext
    val hBaseConf = getHBaseConf() //自定义的方法
    val hBaseConn = getHBaseConn(hBaseConf) //自定义的方法
    val admin = hBaseConn.getAdmin
    val fileSystem = FileSystem.get(new Configuration())
    val permission = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)

    //获取输出路径
    val filePath = new Path("hFilePath")
    // 如果存放 HFile文件的路径已经存在,就删除掉
    if (fileSystem.exists(filePath)) {
      fileSystem.delete(filePath, true)
      print("删除hdfs上存在的路径")
    }

    val hTableName: TableName = TableName.valueOf("hBaseNameSpace", "hBaseTableName")
    val hTable = hBaseConn.getTable(hTableName)
    //从hive表读取数据
    val sql = "select * from table"
    val df: DataFrame = spark.sql(sql)
    val data: RDD[(Array[Byte], (Array[Byte], Array[Byte], Array[Byte]))] =
      df.rdd.map(row => {
        val rowKey = row.getString(0)
        val familyName = row.getString(1)
        val columnName = row.getString(2)
        val str = row.getString(3)
        (Bytes.toBytes(rowKey), (Bytes.toBytes(familyName), Bytes.toBytes(columnName), Bytes.toBytes(str)))
      })
    println("hive数据计算完成")

    //表不存在则建Hbase表
    createOrNot(admin, hTableName, familyName)

    //设置job
    val job = Job.getInstance(hBaseConf)
    job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setMapOutputValueClass(classOf[KeyValue])
    HFileOutputFormat2.configureIncrementalLoadMap(job, hTable)

    //生成hfile
    data
      .map(cell => {
        val keyValue = new KeyValue(cell._1, cell._2._1, cell._2._2, cell._2._3)
        (new ImmutableBytesWritable(cell._1), keyValue)
      })
      .sortBy(x=>(x._1,x._2.getKeyString),ascending = true)
      .saveAsNewAPIHadoopFile("hFilePath",
        classOf[ImmutableBytesWritable],
        classOf[KeyValue],
        classOf[HFileOutputFormat2],
        job.getConfiguration)
    println("Hfile生成完成")

    //修改目录及文件权限
    if (fileSystem.exists(filePath)) {
      setPermission(fileSystem, filePath, permission)
    }

    //将Hfile移动到HBase
    val bulkLoader = new LoadIncrementalHFiles(hBaseConf)
    val regionLocator = hBaseConn.getRegionLocator(hTableName)
    bulkLoader.doBulkLoad(filePath, admin, hTable, regionLocator)
    println("Bulk Load完成")

    admin.close()
    hBaseConn.close()
    fileSystem.close()
    sc.stop()
  }

  def setPermission(fs: FileSystem, path: Path, permission: FsPermission): Unit = {
    try
      if (fs.getFileStatus(path).isDirectory) {
        fs.setPermission(path, permission)
        val fileStatus: Array[FileStatus] = fs.listStatus(path)
        for (i <- fileStatus.indices) {
          val status = fileStatus(i)
          setPermission(fs, status.getPath, permission)
        }
      }
      else fs.setPermission(path, permission)
    catch {
      case ex: Exception =>
        ex.printStackTrace()
    }
  }

  def createOrNot(admin: Admin, tableName: TableName, familyName: String) = {
    if (!admin.tableExists(tableName)) {
      val tableDesc = new HTableDescriptor(tableName)
      tableDesc.addFamily(new HColumnDescriptor(familyName.getBytes()))
      admin.createTable(tableDesc)
    }
  }

}

常见问题

排序问题

报错:Caused by: java.io.IOException: Added a key not lexically larger than previous

原因: spark写hfile时是按照rowkey+列族+列名进行排序的,因此在写入数据时要做到整体有序

权限问题

报错:Caused by: org.apache.hadoop.security.AccessControlException: Permission denied: user=hbase, access=WRITE, inode="..."

原因: 执行生成Hfile文件的用户时hadoop用户,移动Hfile进HBase的用户是hbase

解决方案:

  1. 使用hbase用户执行
  2. 在生成hfile后将文件与目录的权限修改
  3. 关闭hdfs权限设置

参考

spark实现BulkLoad批量加载方式导入Hbase数据 - 夜空中最亮的仔 - 博客园 (cnblogs.com)