日本不卡不码高清免费观看,久久国产精品久久w女人spa,黄色aa久久,三上悠亚国产精品一区二区三区

您的位置:首頁(yè)技術(shù)文章
文章詳情頁(yè)

Spring Batch遠(yuǎn)程分區(qū)的本地Jar包模式的代碼詳解

瀏覽:71日期:2023-08-14 14:35:11

1 前言

Spring Batch遠(yuǎn)程分區(qū)對(duì)于大量數(shù)據(jù)的處理非常擅長(zhǎng),它的實(shí)現(xiàn)有多種方式,如本地Jar包模式、MQ模式、Kubernetes模式。這三種模式的如下:

(1)本地Jar包模式:分區(qū)處理的worker為一個(gè)Java進(jìn)程,從jar包啟動(dòng),通過(guò)jvm參數(shù)和數(shù)據(jù)庫(kù)傳遞參數(shù);官方提供示例代碼。

(2)MQ模式:worker是一個(gè)常駐進(jìn)程,Manager和Worker通過(guò)消息隊(duì)列來(lái)傳遞參數(shù);網(wǎng)上有不少相關(guān)示例代碼。

(3)Kubernetes模式:worker為K8s中的Pod,Manager直接啟動(dòng)Pod來(lái)處理;網(wǎng)上并沒(méi)有找到任何示例代碼。

本文將通過(guò)代碼來(lái)講解第一種模式(本地Jar包模式),其它后續(xù)再介紹。

Spring Batch遠(yuǎn)程分區(qū)的本地Jar包模式的代碼詳解

建議先看下面文章了解一下:

Spring Batch入門(mén):Spring Batch入門(mén)教程篇

Spring Batch并行處理介紹:詳解SpringBoot和SpringBatch 使用

2 代碼講解

本文代碼中,Manager和Worker是放在一起的,在同一個(gè)項(xiàng)目里,也只會(huì)打一個(gè)jar包而已;我們通過(guò)profile來(lái)區(qū)別是manager還是worker,也就是通過(guò)Spring Profile實(shí)現(xiàn)一份代碼,兩份邏輯。實(shí)際上也可以拆成兩份代碼,但放一起更方便測(cè)試,而且代碼量不大,就沒(méi)有必要了。

2.1 項(xiàng)目準(zhǔn)備

2.1.1 數(shù)據(jù)庫(kù)

首先我們需要準(zhǔn)備一個(gè)數(shù)據(jù)庫(kù),因?yàn)镸anager和Worker都需要同步狀態(tài)到DB上,不能直接使用嵌入式的內(nèi)存數(shù)據(jù)庫(kù)了,需要一個(gè)外部可共同訪問(wèn)的數(shù)據(jù)庫(kù)。這里我使用的是H2 Database,安裝可參考:把H2數(shù)據(jù)庫(kù)從jar包部署到Kubernetes,并解決Ingress不支持TCP的問(wèn)題。

2.1.2 引入依賴

maven引入依賴如下所示:

<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-batch</artifactId></dependency><dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-task</artifactId></dependency><dependency> <groupId>com.h2database</groupId> <artifactId>h2</artifactId> <scope>runtime</scope></dependency><dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-deployer-local</artifactId> <version>2.4.1</version></dependency><dependency> <groupId>org.springframework.batch</groupId> <artifactId>spring-batch-integration</artifactId></dependency>

spring-cloud-deployer-local用于部署和啟動(dòng)worker,非常關(guān)鍵;其它就是Spring Batch和Task相關(guān)的依賴;以及數(shù)據(jù)庫(kù)連接。

2.1.3 主類(lèi)入口

Springboot的主類(lèi)入口如下:

@EnableTask@SpringBootApplication@EnableBatchProcessingpublic class PkslowRemotePartitionJar { public static void main(String[] args) { SpringApplication.run(PkslowRemotePartitionJar.class, args); }}

在Springboot的基礎(chǔ)上,添加了Spring Batch和Spring Cloud Task的支持。

2.2 關(guān)鍵代碼編寫(xiě)

前面的數(shù)據(jù)庫(kù)搭建和其它代碼沒(méi)有太多可講的,接下來(lái)就開(kāi)始關(guān)鍵代碼的編寫(xiě)。

2.2.1 分區(qū)管理Partitioner

Partitioner是遠(yuǎn)程分區(qū)中的核心bean,它定義了分成多少個(gè)區(qū)、怎么分區(qū),要把什么變量傳遞給worker。它會(huì)返回一組<分區(qū)名,執(zhí)行上下文>的鍵值對(duì),即返回Map<String, ExecutionContext>。把要傳遞給worker的變量放在ExecutionContext中去,支持多種類(lèi)型的變量,如String、int、long等。實(shí)際上,我們不建議通過(guò)ExecutionContext來(lái)傳遞太多數(shù)據(jù);可以傳遞一些標(biāo)識(shí)或主鍵,然后worker自己去拿數(shù)據(jù)即可。

具體代碼如下:

private static final int GRID_SIZE = 4;@Beanpublic Partitioner partitioner() { return new Partitioner() { @Override public Map<String, ExecutionContext> partition(int gridSize) { Map<String, ExecutionContext> partitions = new HashMap<>(gridSize); for (int i = 0; i < GRID_SIZE; i++) { ExecutionContext executionContext = new ExecutionContext(); executionContext.put('partitionNumber', i); partitions.put('partition' + i, executionContext); } return partitions; } };}

上面分成4個(gè)區(qū),程序會(huì)啟動(dòng)4個(gè)worker來(lái)處理;給worker傳遞的參數(shù)是partitionNumber。

2.2.2 分區(qū)處理器PartitionHandler

PartitionHandler也是核心的bean,它決定了怎么去啟動(dòng)worker,給它們傳遞什么jvm參數(shù)(跟之前的ExecutionContext傳遞不一樣)。

@Beanpublic PartitionHandler partitionHandler(TaskLauncher taskLauncher, JobExplorer jobExplorer, TaskRepository taskRepository) throws Exception { Resource resource = this.resourceLoader.getResource(workerResource); DeployerPartitionHandler partitionHandler = new DeployerPartitionHandler(taskLauncher, jobExplorer, resource, 'workerStep', taskRepository); List<String> commandLineArgs = new ArrayList<>(3); commandLineArgs.add('--spring.profiles.active=worker'); commandLineArgs.add('--spring.cloud.task.initialize-enabled=false'); commandLineArgs.add('--spring.batch.initializer.enabled=false'); partitionHandler .setCommandLineArgsProvider(new PassThroughCommandLineArgsProvider(commandLineArgs)); partitionHandler .setEnvironmentVariablesProvider(new SimpleEnvironmentVariablesProvider(this.environment)); partitionHandler.setMaxWorkers(2); partitionHandler.setApplicationName('PkslowWorkerJob'); return partitionHandler;}

上面代碼中:

resource是worker的jar包地址,表示將啟動(dòng)該程序;

workerStep是worker將要執(zhí)行的step;

commandLineArgs定義了啟動(dòng)worker的jvm參數(shù),如--spring.profiles.active=worker;

environment是manager的系統(tǒng)環(huán)境變量,可以傳遞給worker,當(dāng)然也可以選擇不傳遞;

MaxWorkers是最多能同時(shí)啟動(dòng)多少個(gè)worker,類(lèi)似于線程池大小;設(shè)置為2,表示最多同時(shí)有2個(gè)worker來(lái)處理4個(gè)分區(qū)。

2.2.3 Manager和Worker的Batch定義

完成了分區(qū)相關(guān)的代碼,剩下的就只是如何定義Manager和Worker的業(yè)務(wù)代碼了。

Manager作為管理者,不用太多業(yè)務(wù)邏輯,代碼如下:

@Bean@Profile('!worker')public Job partitionedJob(PartitionHandler partitionHandler) throws Exception { Random random = new Random(); return this.jobBuilderFactory.get('partitionedJob' + random.nextInt()) .start(step1(partitionHandler)) .build();}@Beanpublic Step step1(PartitionHandler partitionHandler) throws Exception { return this.stepBuilderFactory.get('step1') .partitioner(workerStep().getName(), partitioner()) .step(workerStep()) .partitionHandler(partitionHandler) .build();}

Worker主要作用是處理數(shù)據(jù),是我們的業(yè)務(wù)代碼,這里就演示一下如何獲取Manager傳遞過(guò)來(lái)的partitionNumber:

@Beanpublic Step workerStep() { return this.stepBuilderFactory.get('workerStep') .tasklet(workerTasklet(null, null)) .build();}@Bean@StepScopepublic Tasklet workerTasklet(final @Value('#{stepExecutionContext[’partitionNumber’]}') Integer partitionNumber) { return new Tasklet() { @Override public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception { Thread.sleep(6000); //增加延時(shí),查看效果,通過(guò)jps:在jar情況下會(huì)新起java進(jìn)程 System.out.println('This tasklet ran partition: ' + partitionNumber); return RepeatStatus.FINISHED; } };}

通過(guò)表達(dá)式@Value('#{stepExecutionContext[’partitionNumber’]}') 獲取Manager傳遞過(guò)來(lái)的變量;注意要加注解@StepScope。

3 程序運(yùn)行

因?yàn)槲覀兎譃镸anager和Worker,但都是同一份代碼,所以我們先打包一個(gè)jar出來(lái),不然manager無(wú)法啟動(dòng)。配置數(shù)據(jù)庫(kù)和Worker的jar包地址如下:

spring.datasource.url=jdbc:h2:tcp://localhost:9092/testspring.datasource.username=pkslowspring.datasource.password=pkslowspring.datasource.driver-class-name=org.h2.Driverpkslow.worker.resource=file://pkslow/target/remote-partitioning-jar-1.0-SNAPSHOT.jar

執(zhí)行程序如下:

Spring Batch遠(yuǎn)程分區(qū)的本地Jar包模式的代碼詳解

可以看到啟動(dòng)了4次Java程序,還給出日志路徑。

通過(guò)jps命令查看,能看到一個(gè)Manager進(jìn)程,還有兩個(gè)worker進(jìn)程:

Spring Batch遠(yuǎn)程分區(qū)的本地Jar包模式的代碼詳解

4 復(fù)雜變量傳遞

前面講了Manager可以通過(guò)ExecutionContext傳遞變量,如簡(jiǎn)單的String、long等。但其實(shí)它也是可以傳遞復(fù)雜的Java對(duì)象的,但對(duì)應(yīng)的類(lèi)需要可序列化,如:

import java.io.Serializable;public class Person implements Serializable { private Integer age; private String name; private String webSite; //getter and setter}

Manager傳遞:

executionContext.put('person', new Person(0, 'pkslow', 'www.pkslow.com'));

Worker接收:

@Value('#{stepExecutionContext[’person’]}') Person person

5 總結(jié)

本文介紹了Spring Batch遠(yuǎn)程分區(qū)的本地Jar包模式,只能在一臺(tái)機(jī)器上運(yùn)行,所以也是無(wú)法真正發(fā)揮出遠(yuǎn)程分區(qū)的作用。但它對(duì)我們后續(xù)理解更復(fù)雜的模式是有很大幫助的;同時(shí),我們也可以使用本地模式進(jìn)行開(kāi)發(fā)測(cè)試,畢竟它只需要一個(gè)數(shù)據(jù)庫(kù)就行了,依賴很少。

標(biāo)簽: Spring
相關(guān)文章:
日本不卡不码高清免费观看,久久国产精品久久w女人spa,黄色aa久久,三上悠亚国产精品一区二区三区
日韩av字幕| 国产a久久精品一区二区三区| 97久久亚洲| 国产香蕉精品| 中文字幕日本一区二区| 蜜桃视频在线观看一区| 日韩一级欧洲| 岛国av在线播放| 久久av在线| 久久免费大视频| 日本午夜精品久久久| 蜜桃视频在线网站| 亚洲精品自拍| 国产精品av一区二区| 国产精品99久久免费| av不卡在线| 亚洲三级av| 国产极品模特精品一二| 欧美特黄一级| 久久激五月天综合精品| 欧美精品一区二区久久| 综合激情网站| 亚洲一区欧美激情| 丝袜a∨在线一区二区三区不卡| 精品三区视频| 日韩一区二区免费看| 亚洲主播在线| 日韩精品久久久久久久电影99爱| 9999国产精品| 国产在线看片免费视频在线观看| 神马午夜在线视频| 黄色不卡一区| 一本一道久久a久久| 国产精品宾馆| 四虎884aa成人精品最新| 婷婷综合在线| 欧美日韩视频| 欧美一区影院| 最近高清中文在线字幕在线观看1| 欧美不卡在线| 日本欧美在线看| 国产日韩1区| 日韩久久视频| 亚洲色图国产| 精品一区二区三区中文字幕在线| 亚洲天堂黄色| 久久激情五月激情| 99久久亚洲精品蜜臀| 亚洲精品乱码久久久久久蜜桃麻豆| 国产极品模特精品一二| re久久精品视频| 日韩精品一区二区三区中文字幕| 97se综合| 在线亚洲成人| 国产欧美69| 亚洲一区日韩在线| 日本午夜精品一区二区三区电影| 国产一区二区色噜噜| 午夜日本精品| 国产精品任我爽爆在线播放 | 亚洲成人av观看| 欧美伊人久久| 成人午夜精品| 91精品视频一区二区| 日韩国产一区二区| 午夜在线视频一区二区区别| 久久99久久人婷婷精品综合| 中文精品视频| 麻豆视频在线观看免费网站黄 | 亚洲精品欧洲| 色婷婷狠狠五月综合天色拍| 欧美日一区二区在线观看| 久久视频精品| 蜜桃久久久久| 国产欧美日本| 日韩激情视频网站| 亚洲香蕉网站| 欧美一区精品| 日韩视频一区| 成人福利av| 国产欧美日韩视频在线| 免费精品视频| 日韩精品一区二区三区免费观影| 国产日本精品| 性色av一区二区怡红| 日韩电影免费网站| 国产另类在线| 免费欧美在线视频| 欧美午夜精彩| 国产精品yjizz视频网| 国产精品亚洲片在线播放| 国产精品色网| 在线观看精品| 久久av网址| 久久激情五月婷婷| 亚洲无线观看| 久久xxxx| 国产一区亚洲| 日韩理论视频| 国内自拍视频一区二区三区| 欧美日韩va| 亚洲欧美久久精品| 亚洲综合精品| 影音国产精品| 激情欧美一区二区三区| 中文字幕人成乱码在线观看| 久久精品国产免费| 亚洲欧美日韩国产一区二区| 久久精选视频| 九九精品调教| 欧产日产国产精品视频| 久久尤物视频| 国产精品亚洲综合在线观看| 欧美日韩xxxx| 国产亚洲久久| 18国产精品| 91精品啪在线观看国产爱臀| 色8久久久久| 日韩中文字幕一区二区高清99| 免费精品视频最新在线| 午夜一级在线看亚洲| 国产视频一区免费看| 欧美在线资源| 88xx成人免费观看视频库| а√在线中文在线新版| 亚洲天堂资源| 日韩在线短视频| 久久视频一区| 久久在线免费| 激情偷拍久久| 国产日韩专区| 亚洲18在线| 国产日韩欧美一区在线| 国产极品一区| 国产不卡精品在线| 黄色在线网站噜噜噜| 日韩成人亚洲| 欧美中文字幕一区二区| 日韩免费福利视频| 久久精品123| 伊人精品在线| 男女男精品网站| 亚洲精品成a人ⅴ香蕉片| 国产亚洲精aa在线看| 嫩草伊人久久精品少妇av杨幂 | 国产精品4hu.www| 精品美女久久| 成人精品天堂一区二区三区| 亚洲深夜福利| 97成人超碰| 精品一区二区三区在线观看视频 | 国产日韩视频在线| 精品视频在线你懂得| 日韩欧美看国产| 国产亚洲毛片| 国产日韩一区| 久久青青视频| 影院欧美亚洲| 国产麻豆精品久久| 日韩免费高清| 一区二区国产在线观看| 欧美国产中文高清| 超碰在线99| 99亚洲视频| 国产欧美69| 三级小说欧洲区亚洲区| 免费人成黄页网站在线一区二区| 国产精品三p一区二区| 人人草在线视频| 亚洲视频二区| 国内精品亚洲| 午夜宅男久久久| 国产精品xxx| 在线日韩av| 日韩二区三区在线观看| 色婷婷综合网| 亚洲我射av| 国产欧美三级| 久久精品中文| 国产日韩在线观看视频| 欧美亚洲国产精品久久| 日韩va欧美va亚洲va久久| 伊伊综合在线| 亚洲人妖在线| 色在线视频观看| 免费日韩av| 久久久免费人体| 亚洲免费观看| 精品国产一区二区三区性色av| 欧美日韩国产亚洲一区| 久久99视频| 丝袜诱惑制服诱惑色一区在线观看| 国产日韩欧美三区| 五月天久久久| 精品亚洲精品| 日本不卡高清视频| 欧美日韩国产传媒| 国产精品日本一区二区三区在线| 国产综合欧美| 国产精品白丝久久av网站|