Overview
Apache Spark is a distributed computing engine designed to process large-scale data efficiently. It splits data into partitions, distributes the computation across multiple machines in parallel, and merges the results, enabling efficient data processing and analysis.
Application Scenarios
Large-scale Data Processing and Analysis
Spark can handle massive amounts of data and improves processing efficiency through parallel computing tasks. It is widely used in data processing and analysis in finance, telecommunications, healthcare, and other fields.
Streaming Data Processing
Spark Streaming processes data streams in real time by dividing them into small batches suitable for analysis and storage. This is very useful in real-time analytics scenarios such as online advertising and network security.
Machine Learning
Spark includes a machine learning library (MLlib) that supports various machine learning algorithms and model training, used in applications such as recommendation systems and image recognition.
Graph Computing
Spark's graph computing library (GraphX) supports various graph computing algorithms, suitable for scenarios such as social network analysis and recommendation systems.
This blog will introduce two examples of using the Spark computing engine to implement batch data writing into MatrixOne. One example involves migrating data from MySQL to MatrixOne, and the other involves writing Hive data into MatrixOne.
Preliminary Preparations
Hardware Environment
The hardware requirements for this practice are as follows:
Software Environment
The following software environment needs to be installed and deployed for this practice:
- MatrixOne has been installed and started.
- Download and install IntelliJ IDEA version 2022.2.1 or above.
- Download and install JDK 8 or higher.
- If data needs to be imported from Hive, Hadoop and Hive must be installed.
- Download and install MySQL Client 8.0.33.
Example: Migrate Data from MySQL to MatrixOne
Step One: Initialize the Project
- Start IDEA, click File > New > Project, choose Spring Initializr, and fill in the following configuration parameters:
    - Name: mo-spark-demo
    - Location: ~\Desktop
    - Language: Java
    - Type: Maven
    - Group: com.example
    - Artifact: matrixone-spark-demo
    - Package name: com.matrixone.demo
    - JDK: 1.8
- Add project dependencies by editing the contents of pom.xml in the project root directory as follows:

```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example.mo</groupId>
    <artifactId>mo-spark-demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <spark.version>3.2.1</spark.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-catalyst_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.codehaus.jackson</groupId>
            <artifactId>jackson-core-asl</artifactId>
            <version>1.9.13</version>
        </dependency>
        <dependency>
            <groupId>org.codehaus.jackson</groupId>
            <artifactId>jackson-mapper-asl</artifactId>
            <version>1.9.13</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.16</version>
        </dependency>
    </dependencies>
</project>
```
Step Two: Read MatrixOne Data
After connecting to MatrixOne using the MySQL client, create the necessary databases and tables for the demonstration.
- Create the database and table in MatrixOne, and import data:
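The exact table definition and sample data are not shown here; a minimal setup consistent with the rest of this example (the `test.person` table read below) might look like the following, where the inserted row is only a placeholder:

```sql
mysql -h 192.168.146.10 -P 6001 -u root -p

mysql> CREATE DATABASE IF NOT EXISTS test;
mysql> USE test;
mysql> CREATE TABLE `person` (`id` INT DEFAULT NULL, `name` VARCHAR(255) DEFAULT NULL, `birthday` DATE DEFAULT NULL);
mysql> INSERT INTO test.person VALUES (1, 'zhangsan', '2023-07-09');
```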
- Create a MoRead.java class in IDEA to read data from MatrixOne with Spark:

```java
package com.matrixone.spark;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;

import java.util.Properties;

/**
 * @author MatrixOne
 * @desc Read MatrixOne data
 */
public class MoRead {

    // parameters
    private static String master = "local[2]";
    private static String appName = "mo_spark_demo";

    private static String srcHost = "192.168.146.10";
    private static Integer srcPort = 6001;
    private static String srcUserName = "root";
    private static String srcPassword = "111";
    private static String srcDataBase = "test";
    private static String srcTable = "person";

    public static void main(String[] args) {
        SparkSession sparkSession = SparkSession.builder().appName(appName).master(master).getOrCreate();
        SQLContext sqlContext = new SQLContext(sparkSession);
        Properties properties = new Properties();
        properties.put("user", srcUserName);
        properties.put("password", srcPassword);
        Dataset<Row> dataset = sqlContext.read()
                .jdbc("jdbc:mysql://" + srcHost + ":" + srcPort + "/" + srcDataBase, srcTable, properties);
        dataset.show();
    }
}
```
- Run MoRead.main() in IDEA, and the execution result is as follows:
Step Three: Write MySQL Data into MatrixOne
Now you can start migrating MySQL data to MatrixOne using Spark.
- Prepare the MySQL data. On node3, use the MySQL client to connect to the local MySQL instance, create the required database and table, and insert data:

```sql
mysql -h127.0.0.1 -P3306 -uroot -proot

mysql> CREATE DATABASE motest;
mysql> USE motest;
mysql> CREATE TABLE `person` (`id` int DEFAULT NULL, `name` varchar(255) DEFAULT NULL, `birthday` date DEFAULT NULL);
mysql> INSERT INTO motest.person (id, name, birthday) VALUES(2, 'lisi', '2023-07-09'),(3, 'wangwu', '2023-07-13'),(4, 'zhaoliu', '2023-08-08');
```

- Since this example continues to use the `test` database from the previous example of reading MatrixOne data, empty the `person` table first:

```sql
mysql -h192.168.146.10 -P6001 -uroot -p111

mysql> TRUNCATE TABLE test.person;
```
- Write the code in IDEA. Create Person.java and Mysql2Mo.java classes to read MySQL data with Spark. The Mysql2Mo.java class code can be seen in the following example:

```java
package com.matrixone.spark;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;

import java.sql.SQLException;
import java.util.Properties;

/**
 * @author MatrixOne
 * @desc Read MySQL data and write it into MatrixOne
 */
public class Mysql2Mo {

    // parameters
    private static String master = "local[2]";
    private static String appName = "app_spark_demo";

    private static String srcHost = "127.0.0.1";
    private static Integer srcPort = 3306;
    private static String srcUserName = "root";
    private static String srcPassword = "root";
    private static String srcDataBase = "motest";
    private static String srcTable = "person";

    private static String destHost = "192.168.146.10";
    private static Integer destPort = 6001;
    private static String destUserName = "root";
    private static String destPassword = "111";
    private static String destDataBase = "test";
    private static String destTable = "person";

    public static void main(String[] args) throws SQLException {
        SparkSession sparkSession = SparkSession.builder().appName(appName).master(master).getOrCreate();
        SQLContext sqlContext = new SQLContext(sparkSession);
        Properties connectionProperties = new Properties();
        connectionProperties.put("user", srcUserName);
        connectionProperties.put("password", srcPassword);
        connectionProperties.put("driver", "com.mysql.cj.jdbc.Driver");

        // jdbc.url=jdbc:mysql://127.0.0.1:3306/database
        String url = "jdbc:mysql://" + srcHost + ":" + srcPort + "/" + srcDataBase + "?characterEncoding=utf-8&autoReconnect=true&zeroDateTimeBehavior=convertToNull&useSSL=false&serverTimezone=Asia/Shanghai";

        // read the source table with Spark JDBC
        System.out.println("read table person in database");
        Dataset<Row> rowDataset = sqlContext.read().jdbc(url, srcTable, connectionProperties).select("*");
        //show data
        //rowDataset.show();

        // keep rows with id > 2 and prefix the name column with "spark_"
        Dataset<Row> dataset = rowDataset.filter("id > 2")
                .map((MapFunction<Row, Row>) row -> RowFactory.create(row.getInt(0), "spark_" + row.getString(1), row.getDate(2)),
                        RowEncoder.apply(rowDataset.schema()));
        //show data
        //dataset.show();

        // write the transformed rows into MatrixOne
        Properties properties = new Properties();
        properties.put("user", destUserName);
        properties.put("password", destPassword);
        dataset.write()
                .mode(SaveMode.Append)
                .jdbc("jdbc:mysql://" + destHost + ":" + destPort + "/" + destDataBase, destTable, properties);
    }
}
```
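The filter-and-map step above can be illustrated without a Spark cluster. The following standalone sketch (using a hypothetical `PersonRow` class that stands in for a Spark `Row`; it is not part of the project) applies the same logic — keep rows with `id > 2` and prefix names with `spark_` — using plain Java streams:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Minimal stand-in for a Spark Row; hypothetical, for illustration only.
class PersonRow {
    final int id;
    final String name;
    final String birthday;

    PersonRow(int id, String name, String birthday) {
        this.id = id;
        this.name = name;
        this.birthday = birthday;
    }
}

public class TransformSketch {
    // Mirrors rowDataset.filter("id > 2").map(...): keep id > 2, prefix name with "spark_"
    static List<PersonRow> transform(List<PersonRow> rows) {
        return rows.stream()
                .filter(r -> r.id > 2)
                .map(r -> new PersonRow(r.id, "spark_" + r.name, r.birthday))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<PersonRow> src = Arrays.asList(
                new PersonRow(2, "lisi", "2023-07-09"),
                new PersonRow(3, "wangwu", "2023-07-13"),
                new PersonRow(4, "zhaoliu", "2023-08-08"));
        for (PersonRow r : transform(src)) {
            System.out.println(r.id + "," + r.name + "," + r.birthday);
        }
        // prints:
        // 3,spark_wangwu,2023-07-13
        // 4,spark_zhaoliu,2023-08-08
    }
}
```

The row with id 2 is dropped by the filter, which is why `lisi` does not appear in the final MatrixOne table.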
Step Four: View Results
Execute the following SQL in MatrixOne to see the results:

```sql
mysql> select * from test.person;
+------+---------------+------------+
| id   | name          | birthday   |
+------+---------------+------------+
|    3 | spark_wangwu  | 2023-07-12 |
|    4 | spark_zhaoliu | 2023-08-07 |
+------+---------------+------------+
2 rows in set (0.01 sec)
```

Note that the birthday values appear one day earlier than the data inserted into MySQL; this is likely caused by the time-zone settings in the source JDBC URL (serverTimezone=Asia/Shanghai) when dates are converted during the transfer.