新聞中心
在現(xiàn)代數(shù)據(jù)處理過程中,大規(guī)模數(shù)據(jù)的處理已經(jīng)成為了大數(shù)據(jù)領(lǐng)域的主要任務(wù)之一。Spark是當(dāng)前比較流行的大數(shù)據(jù)處理框架之一,可以用于構(gòu)建分布式數(shù)據(jù)處理系統(tǒng),具有可擴(kuò)展性和性能等優(yōu)點(diǎn)。在現(xiàn)實(shí)生活中,我們經(jīng)常會(huì)遇到需要將一列數(shù)據(jù)轉(zhuǎn)換成一條數(shù)據(jù)庫記錄的情況,這時(shí)我們可以使用Spark來實(shí)現(xiàn)這個(gè)任務(wù)。

在保亭黎族等地區(qū),都構(gòu)建了全面的區(qū)域性戰(zhàn)略布局,加強(qiáng)發(fā)展的系統(tǒng)性、市場前瞻性、產(chǎn)品創(chuàng)新能力,以專注、極致的服務(wù)理念,為客戶提供成都網(wǎng)站建設(shè)、成都做網(wǎng)站 網(wǎng)站設(shè)計(jì)制作按需求定制制作,公司網(wǎng)站建設(shè),企業(yè)網(wǎng)站建設(shè),成都品牌網(wǎng)站建設(shè),營銷型網(wǎng)站建設(shè),外貿(mào)網(wǎng)站建設(shè),保亭黎族網(wǎng)站建設(shè)費(fèi)用合理。
Spark是一個(gè)分布式計(jì)算框架,能夠?qū)Υ笠?guī)模的數(shù)據(jù)進(jìn)行分布式處理。它的特點(diǎn)是高效、可擴(kuò)展、易于使用,并能夠處理多種類型的數(shù)據(jù)。實(shí)現(xiàn)將一列數(shù)據(jù)轉(zhuǎn)換成一條數(shù)據(jù)記錄的過程,也可以稱之為“行轉(zhuǎn)列”操作,Spark提供了map、flatMap、reduce等操作,可以輕松地實(shí)現(xiàn)這個(gè)功能。
在具體實(shí)現(xiàn)中,我們可以使用Spark的分布式計(jì)算框架來實(shí)現(xiàn)將一列數(shù)據(jù)轉(zhuǎn)換成一條數(shù)據(jù)庫記錄。整個(gè)過程可以分為以下幾個(gè)步驟:
1. 數(shù)據(jù)預(yù)處理:需要對原始數(shù)據(jù)做一些預(yù)處理,比如去除頭部和尾部的空格,轉(zhuǎn)換編碼格式等。預(yù)處理數(shù)據(jù)可以使后續(xù)的處理步驟更加高效和準(zhǔn)確。
2. 讀取數(shù)據(jù):Spark支持從多種數(shù)據(jù)源中讀取數(shù)據(jù),包括文本文件、壓縮文件、數(shù)據(jù)庫等。在讀取數(shù)據(jù)之前,需要確定數(shù)據(jù)的文件格式和數(shù)據(jù)編碼格式。
3. 轉(zhuǎn)換數(shù)據(jù):接下來,需要對讀取到的數(shù)據(jù)進(jìn)行轉(zhuǎn)換操作。通常情況下,用逗號(hào)或制表符分隔的單行數(shù)據(jù)中,每個(gè)字段都是一組重復(fù)數(shù)據(jù),包括日期、地點(diǎn)、名稱等信息。我們需要將這些信息轉(zhuǎn)換成一條數(shù)據(jù)庫記錄。
4. 過濾數(shù)據(jù):在數(shù)據(jù)轉(zhuǎn)換過程中,需要根據(jù)實(shí)際業(yè)務(wù)需求過濾不符合條件的數(shù)據(jù)。比如可以使用map和filter操作,根據(jù)指定關(guān)鍵字篩選出需要的數(shù)據(jù)。
5. 寫入數(shù)據(jù):將轉(zhuǎn)換后的數(shù)據(jù)寫入數(shù)據(jù)庫中。Spark可以通過JDBC或ODBC驅(qū)動(dòng)來向數(shù)據(jù)庫插入數(shù)據(jù)。
對于上述幾個(gè)步驟,我們可以使用如下偽代碼來實(shí)現(xiàn)將一列數(shù)據(jù)轉(zhuǎn)換成一條數(shù)據(jù)庫記錄:
“`python
# 定義讀取數(shù)據(jù)路徑
path = “data.csv”
# 讀取數(shù)據(jù)
data = spark.read.csv(path)
# 預(yù)處理數(shù)據(jù)
data = data.map(lambda line: line.strip().encode(‘utf-8’))
# 轉(zhuǎn)換數(shù)據(jù)
data = data.flatMap(lambda line: line.split(‘,’))
# 過濾數(shù)據(jù)
data = data.filter(lambda line: ‘keyword’ in line)
# 寫入數(shù)據(jù)庫
data.write.format(“jdbc”) \
.option(“url”, “jdbc:mysql://host:port/database”) \
.option(“driver”, “com.mysql.jdbc.Driver”) \
.option(“dbtable”, “tablename”) \
.option(“user”, “username”) \
.option(“password”, “password”) \
.mode(“append”) \
.save()
“`
在上述偽代碼中,我們使用Spark讀取了數(shù)據(jù)文件data.csv,對讀取到的數(shù)據(jù)做了預(yù)處理和轉(zhuǎn)換操作,并過濾出符合條件的數(shù)據(jù),最后將數(shù)據(jù)插入到MySQL數(shù)據(jù)庫中。
來說,使用Spark實(shí)現(xiàn)將一列數(shù)據(jù)轉(zhuǎn)換成一條數(shù)據(jù)庫記錄功能,需要經(jīng)過數(shù)據(jù)預(yù)處理、數(shù)據(jù)讀取、數(shù)據(jù)轉(zhuǎn)換、數(shù)據(jù)過濾和數(shù)據(jù)寫入等多個(gè)步驟。使用Spark的分布式計(jì)算框架結(jié)合JDBC或ODBC驅(qū)動(dòng),可以輕松地實(shí)現(xiàn)這個(gè)功能,并且支持多種數(shù)據(jù)源和數(shù)據(jù)格式。
相關(guān)問題拓展閱讀:
- Spark SQL(十):Hive On Spark
- 創(chuàng)建sparksqltable代碼
Spark SQL(十):Hive On Spark
Hive是目前大數(shù)據(jù)領(lǐng)域,事實(shí)上的SQL標(biāo)準(zhǔn)。其底層默認(rèn)是基于MapReduce實(shí)現(xiàn)的,但是由于MapReduce速度實(shí)在比較慢,因此這幾年,陸續(xù)出來了新的SQL查詢引擎,包括Spark SQL,Hive On Tez,Hive On Spark等。
Spark SQL與Hive On Spark是不一樣的。Spark SQL是Spark自己研發(fā)出來的針對各種數(shù)據(jù)源,包括Hive、ON、Parquet、JDBC、RDD等都可以執(zhí)行查詢的,一套基于Spark計(jì)算引擎的查詢引擎。因此它是Spark的一個(gè)項(xiàng)目,只不過提供了逗閉針對Hive執(zhí)行查詢的工功能而已,適合在一些使用Spark技術(shù)棧的大數(shù)據(jù)應(yīng)用類系統(tǒng)中使用。
而Hive On Spark,是Hive的一個(gè)項(xiàng)目,它是將Spark作為底層的查詢引擎(不通過MapReduce作為唯一的查詢引擎)。Hive On Spark,只適用于Hive,在可預(yù)見的未來,很有可能Hive默認(rèn)的底層引擎就從MapReduce切換為Spark了;適合于將原有早粗的Hive數(shù)據(jù)倉庫以及數(shù)據(jù)統(tǒng)計(jì)分析替山睜裂換為Spark引擎,作為全公司通用的大數(shù)據(jù)統(tǒng)計(jì)分析引擎。
Hive On Spark做了一些優(yōu)化:
1、Map Join
Spark SQL默認(rèn)對join是支持使用broadcast機(jī)制將小表廣播到各個(gè)節(jié)點(diǎn)上,以進(jìn)行join的。但是問題是,這會(huì)給Driver和Worker帶來很大的內(nèi)存開銷。因?yàn)閺V播的數(shù)據(jù)要一直保留在Driver內(nèi)存中。所以目前采取的是,類似乎MapReduce的Distributed Cache機(jī)制,即提高HDFS replica factor的復(fù)制因子,以讓數(shù)據(jù)在每個(gè)計(jì)算節(jié)點(diǎn)上都有一個(gè)備份,從而可以在本地進(jìn)行數(shù)據(jù)讀取。
2、Cache Table
對于某些需要對一張表執(zhí)行多次操作的場景,Hive On Spark內(nèi)部做了優(yōu)化,即將要多次操作的表cache到內(nèi)存中,以便于提升性能。但是這里要注意,并不是對所有的情況都會(huì)自動(dòng)進(jìn)行cache。所以說,Hive On Spark還有很多不完善的地方。
Hive QL語句 =>
語法分析 => AST =>
生成邏輯執(zhí)行計(jì)劃 => Operator Tree =>
優(yōu)化邏輯執(zhí)行計(jì)劃 => Optimized Operator Tree =>
生成物理執(zhí)行計(jì)劃 => Task Tree =>
優(yōu)化物理執(zhí)行計(jì)劃 => Optimized Task Tree =>
執(zhí)行優(yōu)化后的Optimized Task Tree
創(chuàng)建sparksqltable代碼
SQLContext具體的執(zhí)行過程告友如下:
(1)SQL | HQL語句經(jīng)過SqlParse解析成UnresolvedLogicalPlan。
(2)使用yzer結(jié)合數(shù)據(jù)字典(catalog)進(jìn)行綁定,生成resolvedLogicalPlan,在這個(gè)過程中,Catalog提取出SchemRDD,并注冊類似case class的對象,然后把表注冊進(jìn)內(nèi)存中。
(3)Analyzed Logical Plan經(jīng)過Catalyst Optimizer優(yōu)化器優(yōu)化處理后,生成Optimized Logical Plan,該過程完成以后,以下的部分在Spark core中完成。
(4)Optimized Logical Plan的結(jié)果交給SparkPlanner,然后SparkPlanner處理后交給PhysicalPlan,經(jīng)過該過程后生成Spark Plan。
(5)使用SparkPlan將LogicalPlan轉(zhuǎn)換成PhysicalPlan。
(6)使用prepareForExecution()將PhysicalPlan轉(zhuǎn)換成可執(zhí)行物理計(jì)滾友搜劃。
(7)使用execute()執(zhí)行可執(zhí)行物理計(jì)劃。
(8)生成DataFrame。
登錄后復(fù)制
在整個(gè)運(yùn)行過程中涉及到多個(gè)SparkSQL的組件,如SqlParse、yzer、optimizer、SparkPlan等等
某電商平臺(tái),需要對訂單數(shù)據(jù)進(jìn)行分析,已知訂單數(shù)據(jù)包括兩個(gè)文件,分別為訂單數(shù)據(jù)orders和訂單明細(xì)數(shù)據(jù)order_items,orders記錄了用戶購買商品的訂單ID,訂單號(hào),用戶ID及下單日期。order_items記錄了商品ID,訂單ID以及明細(xì)ID。它們的結(jié)構(gòu)與關(guān)系如下圖所示:
orders表:(order_id,order_number,buyer_id,create_dt)
訂單大歷ID 訂單號(hào)用戶ID 下單日期
-15 04:58:21
-15 04:45:31
-15 03:12:23
-15 02:37:32
-15 02:18:56
-15 01:33:46
-15 01:04:41
-15 01:02:20
-15 00:38:02
-15 00:18:43
order_items表:(item_id,order_id,goods_id )
明細(xì)ID 訂單ID 商品ID
登錄后復(fù)制
?
創(chuàng)建orders表和order_items表,并統(tǒng)計(jì)該電商網(wǎng)站都有哪些用戶購買了什么商品。
操作
在spark-shell下,使用case class方式定義RDD,創(chuàng)建orders表
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
case class Orders(order_id:String,order_number:String,buyer_id:String,create_dt:String)
val dforders = sc.textFile(“/myspark5/orders”).map(_.split(‘\t’)).map(line=>Orders(line(0),line(1),line(2),line(3))).toDF()
dforders.registerTempTable(“orders”)
登錄后復(fù)制
驗(yàn)證創(chuàng)建的表是否成功。
sqlContext.sql(“show tables”).map(t=>”tableName is:”+t(0)).collect().foreach(println)
sqlContext.sql(“select order_id,buyer_id from orders”).collect
登錄后復(fù)制
在Spark Shell下,使用applyScheme方式定義RDD,創(chuàng)建order_items表。
import org.apache.spark.sql._
import org.apache.spark.sql.types._
val rddorder_items = sc.textFile(“/myspark5/order_items”)
val roworder_items = rddorder_items.map(_.split(“\t”)).map( p=>Row(p(0),p(1),p(2) ) )
val schemaorder_items = “item_id order_id goods_id”
val schema = StructType(schemaorder_items.split(” “).map(fieldName=>StructField(fieldName,StringType,true)) )
val dforder_items = sqlContext.applySchema(roworder_items, schema)
dforder_items.registerTempTable(“order_items”)
登錄后復(fù)制
驗(yàn)證創(chuàng)建表是否成功
sqlContext.sql(“show tables”).map(t=>”tableName is:”+t(0)).collect().foreach(println)
sqlContext.sql(“select order_id,goods_id from order_items “).collect
登錄后復(fù)制
將order表及order_items表進(jìn)行join操作,統(tǒng)計(jì)該電商網(wǎng)站,都有哪些用戶購買了什么商品
sqlContext.sql(“select orders.buyer_id, order_items.goods_id from order_items join orders on order_items.order_id=orders.order_id “).collect
登錄后復(fù)制
Spark SQL
spark-sql
創(chuàng)建表orders及表order_items。
create table orders (order_id string,order_number string,buyer_id string,create_dt string)
row format delimited fields terminated by ‘\t’ stored as textfile;
create table order_items(item_id string,order_id string,goods_id string)
row format delimited fields terminated by ‘\t’ stored as textfile;
登錄后復(fù)制
查看已創(chuàng)建的表。
show tables;
登錄后復(fù)制
表名后的false意思是該表不是臨時(shí)表。
將HDFS中/myspark5下的orders表和order_items表中數(shù)據(jù)加載進(jìn)剛創(chuàng)建的兩個(gè)表中。
load data inpath ‘/myspark5/orders’ into table orders;
load data inpath ‘/myspark5/order_items’ into table order_items;
登錄后復(fù)制
14.驗(yàn)證數(shù)據(jù)是否加載成功。
select * from orders;
select * from order_items;
登錄后復(fù)制
15.處理文件,將order表及order_items表進(jìn)行join操作,統(tǒng)計(jì)該電商網(wǎng)站,都有哪些用戶購買了什么商品。
spark 一列變一條數(shù)據(jù)庫的介紹就聊到這里吧,感謝你花時(shí)間閱讀本站內(nèi)容,更多關(guān)于spark 一列變一條數(shù)據(jù)庫,用Spark實(shí)現(xiàn)一列數(shù)據(jù)變成一條數(shù)據(jù)庫記錄,Spark SQL(十):Hive On Spark,創(chuàng)建sparksqltable代碼的信息別忘了在本站進(jìn)行查找喔。
香港服務(wù)器選創(chuàng)新互聯(lián),2H2G首月10元開通。
創(chuàng)新互聯(lián)(www.cdcxhl.com)互聯(lián)網(wǎng)服務(wù)提供商,擁有超過10年的服務(wù)器租用、服務(wù)器托管、云服務(wù)器、虛擬主機(jī)、網(wǎng)站系統(tǒng)開發(fā)經(jīng)驗(yàn)。專業(yè)提供云主機(jī)、虛擬主機(jī)、域名注冊、VPS主機(jī)、云服務(wù)器、香港云服務(wù)器、免備案服務(wù)器等。
名稱欄目:用Spark實(shí)現(xiàn)一列數(shù)據(jù)變成一條數(shù)據(jù)庫記錄(spark一列變一條數(shù)據(jù)庫)
轉(zhuǎn)載來源:http://fisionsoft.com.cn/article/djcsjeg.html


咨詢
建站咨詢
