Sqoop, Spark, Scala, MySQL project

Hi Readers,

In this post I will explain how to import MySQL data into HDFS as Avro files using Sqoop, read that Avro data with Spark SQL, process it, export the result as a CSV file, and load that CSV back into MySQL.

Step 1:
Check whether the data resides in the MySQL tables. In this example I use the order and order_items tables.

Use sqoop import to import the order data into the HDFS location “/user/edureka_162051/order” and the order_items data into “/user/edureka_162051/orderitems”. I use Snappy compression and Avro as the data file format.

sqoop import \
--connect "jdbc:mysql://localhost:3306/retail_db" \
--username username \
--password password \
--table order \
--compress \
--compression-codec org.apache.hadoop.io.compress.SnappyCodec \
--as-avrodatafile \
--target-dir /user/edureka_162051/order

sqoop import \
--connect "jdbc:mysql://localhost:3306/retail_db" \
--username username \
--password password \
--table order_items \
--compress \
--compression-codec org.apache.hadoop.io.compress.SnappyCodec \
--as-avrodatafile \
--target-dir /user/edureka_162051/orderitems

Step 2:
Start the Spark shell using ‘spark2-shell --master yarn --packages com.databricks:spark-avro_2.11:3.2.0’. I added the spark-avro package dependency because the data I imported into HDFS with Sqoop is in Avro format.

Step 3:
In the Spark shell, import the Avro package and create an SQLContext object. Use that SQLContext to read the Avro files from HDFS, join the two DataFrames with groupBy aggregation, and register a temporary table so the same result can be produced with Spark SQL.

import com.databricks.spark.avro._;
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
var orderDF = sqlContext.read.avro("/user/edureka_162051/order")
var orderItemDF = sqlContext.read.avro("/user/edureka_162051/orderitems")
var joinedOrderDataDF = orderDF.join(orderItemDF,orderDF("order_id")===orderItemDF("order_item_order_id"))

import org.apache.spark.sql.functions._

var dataFrameResult = joinedOrderDataDF.
  groupBy(to_date(from_unixtime((col("order_date") / 1000).cast("bigint"))).alias("order_formatted_date"), col("order_status")).
  agg(sum("order_item_subtotal").cast("decimal(10,2)").alias("total_amount"), countDistinct("order_id").alias("total_orders")).
  orderBy(col("order_formatted_date").desc, col("order_status"), col("total_amount").desc, col("total_orders"))

joinedOrderDataDF.registerTempTable("order_joined")

var sqlResult = sqlContext.sql("select to_date(from_unixtime(cast(order_date/1000 as bigint))) as order_formatted_date, order_status, cast(sum(order_item_subtotal) as DECIMAL (10,2)) as total_amount, count(distinct(order_id)) as total_orders from order_joined group by to_date(from_unixtime(cast(order_date/1000 as bigint))), order_status order by order_formatted_date desc,order_status,total_amount desc, total_orders")
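Before writing anything out, you can sanity-check the aggregated result directly in the shell (a quick check I find useful; the column names are the aliases produced by the query above):

```scala
// Confirm the four expected columns and their types.
sqlResult.printSchema()

// Preview the first few aggregated rows.
sqlResult.show(10)
```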


Step 4:
If you want to write the DataFrame result / SQL result to HDFS as a gzip- or Snappy-compressed file, or as a plain CSV file, use the following steps.

To gzip :
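This snippet was left blank in the original post; here is a minimal sketch, assuming the dataFrameResult DataFrame from Step 3 and an example output path. RDD.saveAsTextFile accepts a Hadoop compression codec class as a second argument:

```scala
import org.apache.hadoop.io.compress.GzipCodec

// Write the result as comma-separated text, compressed with gzip.
// The output directory is an example; it must not already exist in HDFS.
dataFrameResult.map(x => x(0) + "," + x(1) + "," + x(2) + "," + x(3))
  .rdd
  .saveAsTextFile("/user/edureka_162051/dataframe-csv-gzip", classOf[GzipCodec])
```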


To Snappy :
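Likewise a sketch for Snappy, under the same assumptions (example output path; the native Snappy libraries must be available on the cluster):

```scala
import org.apache.hadoop.io.compress.SnappyCodec

// Same comma-separated output, compressed with Snappy instead of gzip.
dataFrameResult.map(x => x(0) + "," + x(1) + "," + x(2) + "," + x(3))
  .rdd
  .saveAsTextFile("/user/edureka_162051/dataframe-csv-snappy", classOf[SnappyCodec])
```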


To CSV file:

dataFrameResult.map(x=> x(0) + "," + x(1) + "," + x(2) + "," + x(3)).rdd.saveAsTextFile("/user/edureka_162051/dataframe-csv");
sqlResult.map(x=> x(0) + "," + x(1) + "," + x(2) + "," + x(3)).rdd.saveAsTextFile("/user/edureka_162051/sqlresult-csv");

Step 5:
Export the CSV file from the HDFS location ‘/user/edureka_162051/dataframe-csv’ to the MySQL database.
Create a MySQL table (here named result) with column types matching the data, then use sqoop export to load it. After that you can query the data in MySQL.

sqoop export \
--table result \
--connect "jdbc:mysql://localhost:3306/edureka_162051" \
--username username \
--password password \
--export-dir "/user/edureka_162051/dataframe-csv" \
--columns "order_date,order_status,total_amount,total_orders"


If you have any doubts or get stuck with issues, please comment. Feel free to share this page with your friends.

Follow me, Jose Praveen, for future notifications.
