HBase与Spark在CentOS上如何结合使用
导读:在 CentOS 上结合 Spark 与 HBase 的实用指南 一 环境准备与版本建议 操作系统与基础组件:建议使用 CentOS 7/8、JDK 8、Hadoop 3.x、Spark 3.x、HBase 2.x。示例环境:CentOS...
在 CentOS 上结合 Spark 与 HBase 的实用指南
一 环境准备与版本建议
- 操作系统与基础组件:建议使用 CentOS 7/8、JDK 8、Hadoop 3.x、Spark 3.x、HBase 2.x。示例环境:CentOS 7 + Hadoop 3.2.0 + Spark 3.1.2 + HBase 2.2.7 + Scala 2.12 + JDK 8。
- 网络与连通性:确保 Spark 节点与 HBase/ZooKeeper 网络互通,必要时配置 /etc/hosts 或使用内网域名;云上场景需将客户端 IP 加入 白名单 并优先使用 VPC 内网 访问以降低时延。
- 服务启动顺序建议:先启动 ZooKeeper → HDFS(离开安全模式)→ HBase;如需可视化或 Thrift 访问,再启动 HBase Thrift(默认端口 9090,部分发行版为 54001)。
二 依赖与配置
- 依赖管理(Maven 示例,适用于 Spark 3.x + HBase 2.2.x/2.4.x)
- 核心依赖:hbase-client、hbase-common、hbase-server、hbase-mapreduce(版本与集群保持一致)。
- 可选:若使用 Spark SQL 与 HBase 集成的高级封装,可引入 hadoop-hbase-spark(如 org.apache.hadoop.hbase.spark:hbase-spark:2.4.9)。
- 示例依赖片段:
< properties> < scala.version> 2.12.18< /scala.version> < spark.version> 3.1.2< /spark.version> < hbase.version> 2.2.7< /hbase.version> < /properties> < dependencies> < dependency> < groupId> org.apache.hbase< /groupId> < artifactId> hbase-client< /artifactId> < version> ${ hbase.version} < /version> < /dependency> < dependency> < groupId> org.apache.hbase< /groupId> < artifactId> hbase-common< /artifactId> < version> ${ hbase.version} < /version> < /dependency> < dependency> < groupId> org.apache.hbase< /groupId> < artifactId> hbase-server< /artifactId> < version> ${ hbase.version} < /version> < /dependency> < dependency> < groupId> org.apache.hbase< /groupId> < artifactId> hbase-mapreduce< /artifactId> < version> ${ hbase.version} < /jobConf> < /dependency> < !-- 可选:Spark SQL 与 HBase 集成 --> < dependency> < groupId> org.apache.hadoop.hbase.spark< /groupId> < artifactId> hbase-spark< /artifactId> < version> 2.4.9< /version> < /dependency> < /dependencies>
- 客户端配置
- 方式一:将 $HBASE_HOME/conf/hbase-site.xml 分发到 $SPARK_HOME/conf/,或在提交任务时用 –files 携带该文件。
- 方式二:在代码中显式设置连接参数(推荐在集群外显式指定,避免误连 localhost)。
- 常用关键项:
- hbase.zookeeper.quorum(ZooKeeper 主机列表)
- hbase.zookeeper.property.clientPort(默认 2181)
- 云上增强版需使用控制台提供的 Java API 访问地址(端口可能为 30020 等)。
三 读取与写入方式
- 方式 A:HBaseContext + bulkPut(推荐,适合批量写入与复杂作业)
- 适用场景:大规模批处理、需要多次读写 HBase 的作业。
- 核心步骤:创建 HBaseConfiguration → 构建 HBaseContext → 构造 Put 集合 → 调用 bulkPut。
- 示例(Scala):
import org.apache.hadoop.hbase.{ HBaseConfiguration, TableName} import org.apache.hadoop.hbase.client.Put import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.hbase.spark.HBaseContext import org.apache.spark.sql.SparkSession val spark = SparkSession.builder().appName("HBaseBulkPut").getOrCreate() val sc = spark.sparkContext val conf = HBaseConfiguration.create() conf.set("hbase.zookeeper.quorum", "zk1,zk2,zk3") conf.set("hbase.zookeeper.property.clientPort", "2181") val hbaseContext = new HBaseContext(sc, conf) val tableName = TableName.valueOf("my_table") val cf = "cf" val puts = (1 to 1000).map { i => val put = new Put(Bytes.toBytes(s"row$i")) put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("qual"), Bytes.toBytes(s"val$i")) put } hbaseContext.bulkPut(tableName, puts) spark.stop()
- 方式 B:newAPIHadoopRDD / saveAsNewAPIHadoopDataset(MapReduce 接口,适合简单 ETL)
- 适用场景:一次性导入/导出、与既有 MR 作业兼容。
- 示例(读取为 RDD[ImmutableBytesWritable, Result]):
import org.apache.hadoop.hbase.{ HBaseConfiguration, TableName} import org.apache.hadoop.hbase.client.Result import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapreduce.TableInputFormat import org.apache.hadoop.hbase.util.Bytes import org.apache.spark.sql.SparkSession val spark = SparkSession.builder().appName("HBaseMRRead").getOrCreate() val sc = spark.sparkContext val conf = HBaseConfiguration.create() conf.set("hbase.zookeeper.quorum", "zk1,zk2,zk3") conf.set(TableInputFormat.INPUT_TABLE, "my_table") val hbaseRDD = sc.newAPIHadoopRDD( conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result] ) hbaseRDD.foreach { case (_, result) => val v = Bytes.toString(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("qual"))) println(v) } spark.stop()
- 方式 C:Spark SQL + HBase-Spark(DataFrame 读写,便于 SQL 分析)
- 适用场景:需要以 DataFrame 进行 SQL 分析、与 Spark 生态无缝衔接。
- 示例(读取):
import org.apache.spark.sql.SparkSession import org.apache.hadoop.hbase.spark.HBaseContext import org.apache.hadoop.hbase.HBaseConfiguration val spark = SparkSession.builder() .appName("HBaseSparkSQLRead") .config("spark.sql.extensions", "org.apache.hadoop.hbase.spark.HBaseSparkSQL") .config("spark.sql.catalog.hbase", "org.apache.hadoop.hbase.spark.HBaseCatalog") .getOrCreate() val hbaseConf = HBaseConfiguration.create() hbaseConf.set("hbase.zookeeper.quorum", "zk1,zk2,zk3") hbaseConf.set("hbase.zookeeper.property.clientPort", "2181") // 注册 HBase 表为临时视图后使用 SQL spark.sql( s""" |CREATE TABLE hbase.${ "my_table"} ( | rowkey STRING, | cf ROW qual STRING |) |USING org.apache.hadoop.hbase.spark |OPTIONS ( | table '${ "my_table"} ', | zookeeperQuorum '${ "zk1,zk2,zk3:2181"} ' |) """.stripMargin) spark.sql("SELECT * FROM hbase.my_table LIMIT 10").show() spark.stop()
- 方式 D:Phoenix + Spark SQL(可选,适合二级索引与 SQL 友好场景)
- 适用场景:需要 二级索引、复杂 SQL、JDBC/ODBC 访问。
- 要点:在 Spark 中通过 Phoenix JDBC 或 spark-hbase-connector 读写 Phoenix 表(底层映射 HBase 表)。
四 提交任务与常见问题
- 提交命令示例(将 hbase-site.xml 随任务分发)
spark-submit \ --master yarn \ --deploy-mode cluster \ --class org.example.HBaseJob \ --files /opt/hbase/conf/hbase-site.xml \ --jars /opt/hbase/lib/hbase-client-2.2.7.jar,/opt/hbase/lib/hbase-common-2.2.7.jar,/opt/hbase/lib/hbase-server-2.2.7.jar \ /opt/apps/hbase-spark-demo.jar - 常见问题与排查
- 连接被拒绝或连到 localhost:未正确设置 hbase.zookeeper.quorum 或未分发 hbase-site.xml,导致默认连接 localhost:2181。请在代码或配置中显式指定 ZooKeeper 地址。
- 缺少依赖导致反复重连或类冲突:确保 HBase 相关 JAR 在 classpath 中,且版本与集群一致;避免不同版本 guava 等库冲突。
- 权限与网络:云上需将客户端 IP 加入 白名单;跨 VPC/公网访问需保证 同一地域/VPC 与相应端口放通。
- 表不存在:建议先在 HBase Shell 创建表与列族,再让 Spark 写入/读取。
- Thrift/可视化:如需通过 Thrift 访问 HBase(如 Python 客户端),需启动 HBase Thrift 服务(常见端口 9090/54001)。
声明:本文内容由网友自发贡献,本站不承担相应法律责任。对本内容有异议或投诉,请联系2913721942#qq.com核实处理,我们将尽快回复您,谢谢合作!
若转载请注明出处: HBase与Spark在CentOS上如何结合使用
本文地址: https://pptw.com/jishu/766019.html
