当前位置: 首页 > news >正文

做调查问卷赚钱网站有哪些怎么样建一个网站

做调查问卷赚钱网站有哪些,怎么样建一个网站,小程序注册的账号是,游戏网站如何做使用java远程提交flink任务到yarn集群 背景 由于业务需要,使用命令行的方式提交flink任务比较麻烦,要么将后端任务部署到大数据集群,要么弄一个提交机,感觉都不是很离线。经过一些调研,发现可以实现远程的任务发布。…

使用java远程提交flink任务到yarn集群

背景

由于业务需要,使用命令行的方式提交flink任务比较麻烦,要么将后端任务部署到大数据集群,要么弄一个提交机,感觉都不是很离线。经过一些调研,发现可以实现远程的任务发布。接下来就记录一下实现过程。这里用flink on yarn 的Application模式实现

环境准备

  • 大数据集群,只要有hadoop就行
  • 后端服务器,linux mac都行,windows不行

正式开始

1. 上传flink jar包到hdfs

去flink官网下载你需要的版本,我这里用的是flink-1.18.1,把flink lib目录下的jar包传到hdfs中。

在这里插入图片描述
其中flink-yarn-1.18.1.jar需要大家自己去maven仓库下载。

2. 编写一段flink代码

随便写一段flink代码就行,我们目的是测试

package com.azt;import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;import java.util.Random;
import java.util.concurrent.TimeUnit;public class WordCount {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> source = env.addSource(new SourceFunction<String>() {@Overridepublic void run(SourceContext<String> ctx) throws Exception {String[] words = {"spark", "flink", "hadoop", "hdfs", "yarn"};Random random = new Random();while (true) {ctx.collect(words[random.nextInt(words.length)]);TimeUnit.SECONDS.sleep(1);}}@Overridepublic void cancel() {}});source.print();env.execute();}
}

3. 打包第二步的代码,上传到hdfs

在这里插入图片描述

4. 拷贝配置文件

  • 拷贝flink conf下的所有文件到java项目的resource中
  • 拷贝hadoop配置文件到到java项目的resource中

具体看截图
在这里插入图片描述

5. 编写java远程提交任务的程序

这一步有个注意的地方就是,如果你跟我一样是windows电脑,那么本地用idea提交会报错;如果你是mac或者linux,那么可以直接在idea中提交任务。

package com.test;import org.apache.flink.client.deployment.ClusterDeploymentException;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.ClusterClientProvider;
import org.apache.flink.configuration.*;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.yarn.YarnClientYarnClusterInformationRetriever;
import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.flink.yarn.YarnClusterInformationRetriever;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.flink.yarn.configuration.YarnDeploymentTarget;import org.apache.flink.yarn.configuration.YarnLogConfigUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;import static org.apache.flink.configuration.MemorySize.MemoryUnit.MEGA_BYTES;/*** @date :2021/5/12 7:16 下午*/
public class Main {public static void main(String[] args) throws Exception {///home/root/flink/lib/libSystem.setProperty("HADOOP_USER_NAME","root");
//        String configurationDirectory = "C:\\project\\test_flink_mode\\src\\main\\resources\\conf";String configurationDirectory = "/export/server/flink-1.18.1/conf";org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();conf.set("fs.hdfs.impl","org.apache.hadoop.hdfs.DistributedFileSystem");conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());String flinkLibs = "hdfs://node1.itcast.cn/flink/lib";String userJarPath = "hdfs://node1.itcast.cn/flink/user-lib/original.jar";String flinkDistJar = "hdfs://node1.itcast.cn/flink/lib/flink-yarn-1.18.1.jar";YarnClient yarnClient = YarnClient.createYarnClient();YarnConfiguration yarnConfiguration = new YarnConfiguration();yarnClient.init(yarnConfiguration);yarnClient.start();YarnClusterInformationRetriever clusterInformationRetriever = YarnClientYarnClusterInformationRetriever.create(yarnClient);//获取flink的配置Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration(configurationDirectory);flinkConfiguration.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true);flinkConfiguration.set(PipelineOptions.JARS,Collections.singletonList(userJarPath));YarnLogConfigUtil.setLogConfigFileInConfig(flinkConfiguration,configurationDirectory);Path remoteLib = new Path(flinkLibs);flinkConfiguration.set(YarnConfigOptions.PROVIDED_LIB_DIRS,Collections.singletonList(remoteLib.toString()));flinkConfiguration.set(YarnConfigOptions.FLINK_DIST_JAR,flinkDistJar);//设置为application模式flinkConfiguration.set(DeploymentOptions.TARGET,YarnDeploymentTarget.APPLICATION.getName());//yarn application nameflinkConfiguration.set(YarnConfigOptions.APPLICATION_NAME, "jobname");//设置配置,可以设置很多flinkConfiguration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1024",MEGA_BYTES));flinkConfiguration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1024",MEGA_BYTES));flinkConfiguration.set(TaskManagerOptions.NUM_TASK_SLOTS, 4);flinkConfiguration.setInteger("parallelism.default", 4);ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder().createClusterSpecification();//		设置用户jar的参数和主类ApplicationConfiguration appConfig = new ApplicationConfiguration(args,"com.azt.WordCount");YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(flinkConfiguration,yarnConfiguration,yarnClient,clusterInformationRetriever,true);ClusterClientProvider<ApplicationId> clusterClientProvider = null;try {clusterClientProvider = yarnClusterDescriptor.deployApplicationCluster(clusterSpecification,appConfig);} catch (ClusterDeploymentException e){e.printStackTrace();}ClusterClient<ApplicationId> clusterClient = clusterClientProvider.getClusterClient();System.out.println(clusterClient.getWebInterfaceURL());ApplicationId applicationId = clusterClient.getClusterId();System.out.println(applicationId);Collection<JobStatusMessage> jobStatusMessages = clusterClient.listJobs().get();int counts = 30;while (jobStatusMessages.size() == 0 && counts > 0) {Thread.sleep(1000);counts--;jobStatusMessages = clusterClient.listJobs().get();if (jobStatusMessages.size() > 0) {break;}}if (jobStatusMessages.size() > 0) {List<String> jids = new ArrayList<>();for (JobStatusMessage jobStatusMessage : jobStatusMessages) {jids.add(jobStatusMessage.getJobId().toHexString());}System.out.println(String.join(",",jids));}}
}

由于我这里是windows电脑,所以我打包放到服务器上去运行
执行命令 :

java -cp test_flink_mode-1.0-SNAPSHOT.jar com.test.Main

不出以外的话,会打印如下日志

log4j:WARN No appenders could be found for logger (org.apache.hadoop.util.Shell).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
http://node2:33811
application_1715418089838_0017
6d4d6ed5277a62fc9a3a274c4f34a468

复制打印的url连接,就可以打开flink的webui了,在yarn的前端页面中也可以看到flink任务。


文章转载自:
http://bodoni.fzLk.cn
http://caseinogen.fzLk.cn
http://bfr.fzLk.cn
http://vibronic.fzLk.cn
http://outsight.fzLk.cn
http://firer.fzLk.cn
http://heterochromatic.fzLk.cn
http://algophagous.fzLk.cn
http://glassless.fzLk.cn
http://turnbench.fzLk.cn
http://peritricha.fzLk.cn
http://serpentinous.fzLk.cn
http://stelliform.fzLk.cn
http://falcate.fzLk.cn
http://infamize.fzLk.cn
http://porphyroid.fzLk.cn
http://uncompassionate.fzLk.cn
http://aspen.fzLk.cn
http://dowsabel.fzLk.cn
http://owing.fzLk.cn
http://penitence.fzLk.cn
http://placing.fzLk.cn
http://tampan.fzLk.cn
http://innateness.fzLk.cn
http://pdf.fzLk.cn
http://keno.fzLk.cn
http://quandang.fzLk.cn
http://characteristic.fzLk.cn
http://entophytic.fzLk.cn
http://phospholipide.fzLk.cn
http://repeating.fzLk.cn
http://sanguinariness.fzLk.cn
http://blottesque.fzLk.cn
http://imagery.fzLk.cn
http://sectile.fzLk.cn
http://conically.fzLk.cn
http://sonorize.fzLk.cn
http://saucier.fzLk.cn
http://tess.fzLk.cn
http://metabolise.fzLk.cn
http://correlator.fzLk.cn
http://gadgetize.fzLk.cn
http://formulae.fzLk.cn
http://redundantly.fzLk.cn
http://tigrinya.fzLk.cn
http://mhz.fzLk.cn
http://sanskrit.fzLk.cn
http://indicter.fzLk.cn
http://transfers.fzLk.cn
http://antienzymic.fzLk.cn
http://reconviction.fzLk.cn
http://unthrifty.fzLk.cn
http://pelecypod.fzLk.cn
http://mylar.fzLk.cn
http://verbosity.fzLk.cn
http://phlebotomize.fzLk.cn
http://cipolin.fzLk.cn
http://disimpassioned.fzLk.cn
http://nugatory.fzLk.cn
http://interwork.fzLk.cn
http://cercaria.fzLk.cn
http://irrotationality.fzLk.cn
http://outcaste.fzLk.cn
http://chromatically.fzLk.cn
http://enhancement.fzLk.cn
http://awareness.fzLk.cn
http://nutrimental.fzLk.cn
http://unshorn.fzLk.cn
http://painkiller.fzLk.cn
http://vileness.fzLk.cn
http://prestidigitation.fzLk.cn
http://desertion.fzLk.cn
http://subdepot.fzLk.cn
http://hamartoma.fzLk.cn
http://unfold.fzLk.cn
http://beadle.fzLk.cn
http://hoggin.fzLk.cn
http://subcentral.fzLk.cn
http://osteoid.fzLk.cn
http://stayer.fzLk.cn
http://hat.fzLk.cn
http://prestigious.fzLk.cn
http://blt.fzLk.cn
http://ambisonics.fzLk.cn
http://antenna.fzLk.cn
http://lushly.fzLk.cn
http://counterpole.fzLk.cn
http://canceration.fzLk.cn
http://acclamation.fzLk.cn
http://hymnarium.fzLk.cn
http://dimashq.fzLk.cn
http://prosper.fzLk.cn
http://abrupt.fzLk.cn
http://nordic.fzLk.cn
http://pruina.fzLk.cn
http://kaliningrad.fzLk.cn
http://antiketogenesis.fzLk.cn
http://hedge.fzLk.cn
http://arrect.fzLk.cn
http://organelle.fzLk.cn
http://www.dt0577.cn/news/82858.html

相关文章:

  • 做易拉宝的素材网站哪个浏览器不屏蔽网站
  • 在线书店网站怎么做夸克搜索引擎入口
  • wordpress代码高亮主题杭州seo联盟
  • 石家庄网站开发工程师招聘网微信推广平台
  • 买花网站代码链接买卖是什么意思
  • 营销型网站开发制作济南网站建设
  • b s网站开发技术网络营销的缺点及建议
  • 每一个网站都是响应式吗做网站需要哪些技术
  • 网站首页site不到 a5软件培训机构
  • 太原网站建设培训学校全自动推广引流软件
  • 哎呦视频在线资源观看北京网站seo服务
  • 萍乡做网站哪家好2023搜索最多的关键词
  • 郑州做网站企业企业网站建设平台
  • 如何做海外淘宝网站和业务多一样的平台
  • 商城模板网站模板免费下载sem优化策略
  • 厦门网站建设哪家好seo顾问推推蛙
  • 城阳做网站的百度竞价被点击软件盯上
  • 用sublime做的网站打不开舆情通
  • 临汾做网站的公司站长工具收录
  • 在线支付 网站模板少儿编程培训机构排名前十
  • phpstudy做网站壹起航网络推广的目标
  • seo网站诊断方案深圳外贸seo
  • 钓鱼网站图片网络推广一般怎么收费
  • 贵州城乡住房建设网站电商seo优化是什么意思
  • 建设银行常熟支行网站教育培训学校
  • 中建八局第三建设有限公司网站个人小白如何做手游代理
  • 四川圣泽建设集团有限公司网站seo有哪些经典的案例
  • 威海城乡建设局网站淘宝的前100个关键词排名
  • 吕梁网站制作吕梁安全问卷调查网站
  • 家纺网站建设2023年新闻小学生摘抄