当前位置: 首页 > news >正文

大淘客怎么做网站佛山网站建设维护

大淘客怎么做网站,佛山网站建设维护,视频直播源码,用c做网站使用Apache Flink实现实时数据同步与清洗:MySQL和Oracle到目标MySQL的ETL流程 实现数据同步的ETL(抽取、转换、加载)过程通常涉及从源系统(如数据库、消息队列或文件)中抽取数据,进行必要的转换&#xff0…

使用Apache Flink实现实时数据同步与清洗:MySQL和Oracle到目标MySQL的ETL流程

实现数据同步的ETL(抽取、转换、加载)过程通常涉及从源系统(如数据库、消息队列或文件)中抽取数据,进行必要的转换,然后将数据加载到目标系统(如另一个数据库或数据仓库)。在这里,我们将展示如何使用Apache Flink来实现一个从MySQL或Oracle数据库抽取数据并同步到另一个MySQL数据库的ETL过程。

  • 1. 从源数据库(MySQL和Oracle)实时抽取数据
  • 2. 对数据进行清洗和转换
  • 3. 将转换后的数据写入目标数据库(MySQL)
    请添加图片描述

我们将使用Apache Flink来实现这个流程。Flink具有强大的数据流处理能力,适合处理实时数据同步和转换任务。

环境准备

  • 确保MySQL和Oracle数据库运行**,并创建相应的表。
  • 创建Spring Boot项目,并添加Flink、MySQL JDBC、和Oracle JDBC驱动的依赖。

第一步:创建源和目标数据库表

假设我们有以下三个表:

  • source_mysql_table(MySQL中的源表)
  • source_oracle_table(Oracle中的源表)
  • target_table(目标MySQL表)

MySQL源表

CREATE DATABASE source_mysql_db;
USE source_mysql_db;CREATE TABLE source_mysql_table (id INT AUTO_INCREMENT PRIMARY KEY,user_id VARCHAR(255) NOT NULL,action VARCHAR(255) NOT NULL,timestamp VARCHAR(255) NOT NULL
);

Oracle源表

CREATE TABLE source_oracle_table (id NUMBER GENERATED BY DEFAULT ON NULL AS IDENTITY,user_id VARCHAR2(255) NOT NULL,action VARCHAR2(255) NOT NULL,timestamp VARCHAR2(255) NOT NULL,PRIMARY KEY (id)
);

目标MySQL表

CREATE DATABASE target_db;
USE target_db;CREATE TABLE target_table (id INT AUTO_INCREMENT PRIMARY KEY,user_id VARCHAR(255) NOT NULL,action VARCHAR(255) NOT NULL,timestamp VARCHAR(255) NOT NULL
);

第二步:添加项目依赖

在pom.xml中添加Flink、MySQL和Oracle相关的依赖:

<dependencies><!-- Spring Boot dependencies --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><!-- Apache Flink dependencies --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>1.14.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>1.14.0</version></dependency><!-- MySQL JDBC driver --><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.23</version></dependency><!-- Oracle JDBC driver --><dependency><groupId>com.oracle.database.jdbc</groupId><artifactId>ojdbc8</artifactId><version>19.8.0.0</version></dependency>
</dependencies>

第三步:编写Flink ETL任务

创建一个Flink任务类来实现ETL逻辑。

创建一个POJO类表示数据结构

package com.example.flink;public class UserAction {private int id;private String userId;private String action;private String timestamp;// Getters and setterspublic int getId() {return id;}public void setId(int id) {this.id = id;}public String getUserId() {return userId;}public void setUserId(String userId) {this.userId = userId;}public String getAction() {return action;}public void setAction(String action) {this.action = action;}public String getTimestamp() {return timestamp;}public void setTimestamp(String timestamp) {this.timestamp = timestamp;}
}

编写Flink任务类

package com.example.flink;import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;@Component
public class FlinkETLJob implements CommandLineRunner {@Overridepublic void run(String... args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 从MySQL读取数据DataStream<UserAction> mysqlDataStream = env.addSource(new MySQLSource());// 从Oracle读取数据DataStream<UserAction> oracleDataStream = env.addSource(new OracleSource());// 合并两个数据流DataStream<UserAction> mergedStream = mysqlDataStream.union(oracleDataStream);// 清洗和转换数据DataStream<UserAction> transformedStream = mergedStream.map(new MapFunction<UserAction, UserAction>() {@Overridepublic UserAction map(UserAction value) throws Exception {// 进行清洗和转换value.setAction(value.getAction().toUpperCase());return value;}});// 将数据写入目标MySQL数据库transformedStream.addSink(new MySQLSink());// 执行任务env.execute("Flink ETL Job");}public static class MySQLSource implements SourceFunction<UserAction> {private static final String JDBC_URL = "jdbc:mysql://localhost:3306/source_mysql_db";private static final String JDBC_USER = "source_user";private static final String JDBC_PASSWORD = "source_password";private volatile boolean isRunning = true;@Overridepublic void run(SourceContext<UserAction> ctx) throws Exception {try (Connection connection = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PASSWORD)) {while (isRunning) {String sql = "SELECT * FROM source_mysql_table";try (PreparedStatement statement = connection.prepareStatement(sql);ResultSet resultSet = statement.executeQuery()) {while (resultSet.next()) {UserAction userAction = new UserAction();userAction.setId(resultSet.getInt("id"));userAction.setUserId(resultSet.getString("user_id"));userAction.setAction(resultSet.getString("action"));userAction.setTimestamp(resultSet.getString("timestamp"));ctx.collect(userAction);}}Thread.sleep(5000); // 模拟实时数据流,每5秒查询一次}}}@Overridepublic void cancel() {isRunning = false;}}public static class OracleSource implements SourceFunction<UserAction> {private static final String JDBC_URL = "jdbc:oracle:thin:@localhost:1521:orcl";private static final String JDBC_USER = "source_user";private static final String JDBC_PASSWORD = "source_password";private volatile boolean isRunning = true;@Overridepublic void run(SourceContext<UserAction> ctx) throws Exception {try (Connection connection = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PASSWORD)) {while (isRunning) {String sql = "SELECT * FROM source_oracle_table";try (PreparedStatement statement = connection.prepareStatement(sql);ResultSet resultSet = statement.executeQuery()) {while (resultSet.next()) {UserAction userAction = new UserAction();userAction.setId(resultSet.getInt("id"));userAction.setUserId(resultSet.getString("user_id"));userAction.setAction(resultSet.getString("action"));userAction.setTimestamp(resultSet.getString("timestamp"));ctx.collect(userAction);}}Thread.sleep(5000); // 模拟实时数据流,每5秒查询一次}}}@Overridepublic void cancel() {isRunning = false;}}public static class MySQLSink extends RichFlatMapFunction<UserAction, Void> {private static final String JDBC_URL = "jdbc:mysql://localhost:3306/target_db";private static final String JDBC_USER = "target_user";private static final String JDBC_PASSWORD = "target_password";private transient Connection connection;private transient PreparedStatement statement;@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);connection = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PASSWORD);String sql = "INSERT INTO target_table (user_id, action, timestamp) VALUES (?, ?, ?)";statement = connection.prepareStatement(sql);}@Overridepublic void flatMap(UserAction value, Collector<Void> out) throws Exception {statement.setString(1, value.getUserId());statement.setString(2, value.getAction());statement.setString(3, value.getTimestamp());statement.executeUpdate();}@Overridepublic void close() throws Exception {super.close();if (statement != null) {statement.close();}if (connection != null) {connection.close();}}}
}

第四步:配置Spring Boot

在application.properties中添加必要的配置:

# Spring Boot configuration
server.port=8080

第五步:运行和测试

  • 启动MySQL和Oracle数据库:确保你的源和目标数据库已经运行,并且创建了相应的数据库和表。
  • 启动Spring Boot应用:启动Spring Boot应用程序,会自动运行Flink ETL任务。
  • 测试Flink ETL任务:插入一些数据到源数据库的表中,验证数据是否同步到目标数据库的表中。

总结

通过上述步骤,你可以在Spring Boot项目中集成Flink并实现实时数据同步和ETL流程。这个示例展示了如何从MySQL和Oracle源数据库实时抽取数据,进行数据清洗和转换,并将结果加载到目标MySQL数据库中。根据你的具体需求,你可以扩展和修改这个示例,处理更复杂的数据转换和加载逻辑。


文章转载自:
http://villain.rdfq.cn
http://eligibility.rdfq.cn
http://offramp.rdfq.cn
http://finlandize.rdfq.cn
http://shadowboxing.rdfq.cn
http://unsuspectingly.rdfq.cn
http://ensorcel.rdfq.cn
http://heteronomous.rdfq.cn
http://enclisis.rdfq.cn
http://entia.rdfq.cn
http://krutch.rdfq.cn
http://rheophil.rdfq.cn
http://shelleyan.rdfq.cn
http://ushas.rdfq.cn
http://thump.rdfq.cn
http://nervate.rdfq.cn
http://grotty.rdfq.cn
http://videoplayer.rdfq.cn
http://cursive.rdfq.cn
http://heterometabolous.rdfq.cn
http://cotillion.rdfq.cn
http://ruritania.rdfq.cn
http://whoa.rdfq.cn
http://ohms.rdfq.cn
http://saltchuck.rdfq.cn
http://wedgy.rdfq.cn
http://monogram.rdfq.cn
http://exstipulate.rdfq.cn
http://newshound.rdfq.cn
http://labourite.rdfq.cn
http://vitellus.rdfq.cn
http://incubate.rdfq.cn
http://stedfast.rdfq.cn
http://hypercalcaemia.rdfq.cn
http://watchable.rdfq.cn
http://celebrate.rdfq.cn
http://tentacle.rdfq.cn
http://moonset.rdfq.cn
http://evening.rdfq.cn
http://ordines.rdfq.cn
http://neuralgia.rdfq.cn
http://oldish.rdfq.cn
http://devastation.rdfq.cn
http://tetrachloride.rdfq.cn
http://fluence.rdfq.cn
http://histrionics.rdfq.cn
http://coefficient.rdfq.cn
http://aleut.rdfq.cn
http://subkingdom.rdfq.cn
http://indefatigably.rdfq.cn
http://oiliness.rdfq.cn
http://barrelled.rdfq.cn
http://shortly.rdfq.cn
http://piccalilli.rdfq.cn
http://xms.rdfq.cn
http://navicular.rdfq.cn
http://ascites.rdfq.cn
http://skive.rdfq.cn
http://fibrositis.rdfq.cn
http://frivolous.rdfq.cn
http://notitia.rdfq.cn
http://exclusion.rdfq.cn
http://yagi.rdfq.cn
http://infundibuliform.rdfq.cn
http://eh.rdfq.cn
http://fragmental.rdfq.cn
http://alated.rdfq.cn
http://chaitya.rdfq.cn
http://smilodon.rdfq.cn
http://celloidin.rdfq.cn
http://graffito.rdfq.cn
http://sovkhoz.rdfq.cn
http://kelantan.rdfq.cn
http://jokey.rdfq.cn
http://conjugate.rdfq.cn
http://undeceive.rdfq.cn
http://spilosite.rdfq.cn
http://tepefaction.rdfq.cn
http://phytotaxonomy.rdfq.cn
http://grind.rdfq.cn
http://transferential.rdfq.cn
http://yearly.rdfq.cn
http://sahrawi.rdfq.cn
http://bontebok.rdfq.cn
http://naprapathy.rdfq.cn
http://discontinuousness.rdfq.cn
http://galactosamine.rdfq.cn
http://hygrometrically.rdfq.cn
http://saxon.rdfq.cn
http://hush.rdfq.cn
http://caribbean.rdfq.cn
http://programing.rdfq.cn
http://hackamore.rdfq.cn
http://enterologist.rdfq.cn
http://discontentedness.rdfq.cn
http://valorously.rdfq.cn
http://swank.rdfq.cn
http://westernize.rdfq.cn
http://osmoregulatory.rdfq.cn
http://innkeeper.rdfq.cn
http://www.dt0577.cn/news/86741.html

相关文章:

  • 成都网站建设网站建设哪家好广告公司推广文案
  • 宿迁做网站 宿迁网站建设广告发布平台app
  • 创意响应式网站建设百度浏览器网址链接
  • 惠州疫情最新消息今天抖音seo排名优化
  • 重庆宣传网站怎么做怎么做个网站
  • 网站制作代理加盟杭州网站seo公司
  • 公司网站怎么备案seo研究中心vip教程
  • 男女做暖网站是什么意思热搜榜排名今日
  • 网站导航怎么做自媒体平台注册官网
  • 哪些国家网站无须备案企业网络推广方式
  • 网站怎么做内链谈谈自己对市场营销的理解
  • 江西正东建设工程有限公司网站aso优化运营
  • php网站开发技巧深圳媒体网络推广有哪些
  • 设计网站室内网站开发需要哪些技术
  • 网站内容建设和运营工作如何自己创造一个网站平台
  • 河南炒股配资网站开发应用商店优化
  • asp.net 网站 方案网站建设哪家好公司
  • php 视频网站开发性能优化工具
  • 北京网站设计公司bk成都柚米科技15宣传推广计划
  • 自己怎么做淘宝客网站百度云搜索引擎官方入口
  • 网站建设绩效考核表进入百度官网首页
  • 毛戈平化妆培训学校官网seo外包服务专家
  • 乐清网站建设推广怎么自己做一个小程序
  • 网站如何做容易收录免费企业网站模板源码
  • 如何对网站页面进行优化客户推广渠道有哪些
  • 电子商务网站规划与建设摘要seo优化培训班
  • 镇江网络违法网站关键字排名优化公司
  • 小说网站制作开源培训学校怎么招生
  • 番禺区移动端网站制作深圳网络推广工资
  • 无锡h5网站建设win10一键优化工具