本篇內(nèi)容主要講解“如何將數(shù)據(jù)從Web服務(wù)處理到MongoDB中”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實(shí)用性強(qiáng)。下面就讓小編來帶大家學(xué)習(xí)“如何將數(shù)據(jù)從Web服務(wù)處理到MongoDB中”吧!
創(chuàng)新互聯(lián)建站長期為1000+客戶提供的網(wǎng)站建設(shè)服務(wù),團(tuán)隊(duì)從業(yè)經(jīng)驗(yàn)10年,關(guān)注不同地域、不同群體,并針對不同對象提供差異化的產(chǎn)品和服務(wù);打造開放共贏平臺,與合作伙伴共同營造健康的互聯(lián)網(wǎng)生態(tài)環(huán)境。為鳳城企業(yè)提供專業(yè)的成都網(wǎng)站建設(shè)、做網(wǎng)站,鳳城網(wǎng)站改版等技術(shù)服務(wù)。擁有10余年豐富建站經(jīng)驗(yàn)和眾多成功案例,為您定制開發(fā)。
如何創(chuàng)建一個(gè)使用Web服務(wù)數(shù)據(jù)并將其插入MongoDB數(shù)據(jù)庫的Spring Batch應(yīng)用程序。
閱讀本文的開發(fā)人員必須熟悉Spring Batch(示例)和MongoDB。
Mongo數(shù)據(jù)庫部署在MLab中。請按照本快速入門中的步驟操作。
批處理應(yīng)用程序部署在Heroku PaaS中。詳情 請看這里。
IDE STS或IntelliJ或Eclipse。
Java 8 JDK。
注意:批處理也可以在本地運(yùn)行。
全局場景步驟是:
從Web服務(wù)讀取數(shù)據(jù),在這種情況下:https://sunrise-sunset.org/api
獲取城市列表的坐標(biāo),然后調(diào)用API以讀取日出和日落日期時(shí)間。
2.處理數(shù)據(jù)并提取業(yè)務(wù)數(shù)據(jù)
收集數(shù)據(jù)的業(yè)務(wù)處理
3.在MongoDB中插入已處理的數(shù)據(jù)
將處理過的數(shù)據(jù)保存為mongo文檔
輸入:本地文件中JSON格式的城市數(shù)據(jù)列表,如下所示:
[
{
“名字”:“Danemark”,
“城市”:[
{
“名字”:“Copenhague”,
“l(fā)at”:55.676098,
“l(fā)ng”:12.568337,
“timeZone”:“CET”
},
{
“名字”:“奧胡斯”,
“l(fā)at”:56.162939,
“l(fā)ng”:10.203921,
“timeZone”:“CET”
},
{
“名字”:“歐登塞”,
“l(fā)at”:55.39594,
“l(fā)ng”:10.38831,
“timeZone”:“CET”
},
{
“名字”:“奧爾堡”,
“l(fā)at”:57.046707,
“l(fā)ng”:9.935932,
“timeZone”:“CET”
}
]
}
]
我們的場景從本地json文件獲取輸入數(shù)據(jù)。映射bean如下:
國豆:
導(dǎo)入 java。io??尚蛄谢?nbsp;;
導(dǎo)入 java。util。清單 ;
進(jìn)口 com。fastxml。杰克遜。注釋。JsonIgnoreProperties ;
@JsonIgnoreProperties(ignoreUnknown = true)
public class BCountry 實(shí)現(xiàn) Serializable {
private static final long serialVersionUID = 1L ;
私有 字符串 名稱 ;
私人 名單< BCity > 城市 ;
public BCountry(){
super();
}
public BCountry(String name,List < BCity > cities){
super();
這個(gè)。name = name ;
這個(gè)。城市 = 城市 ;
}
public BCountry(String name){
super();
這個(gè)。name = name ;
}
public String getName(){
返回 名稱 ;
}
public void setName(String name){
這個(gè)。name = name ;
}
public List < BCity > getCities(){
返回 城市 ;
}
public void setCities(List < BCity > cities){
這個(gè)。城市 = 城市 ;
}
@覆蓋
public int hashCode(){
final int prime = 31 ;
int result = 1 ;
結(jié)果 = 黃金 * 結(jié)果 +((城市 == 空)? 0:城市。的hashCode());
結(jié)果 = 黃金 * 結(jié)果 +((名稱 == 空)? 0:名稱。的hashCode());
返回 結(jié)果 ;
}
@覆蓋
public boolean equals(Object obj){
if(this == obj)
返回 true ;
if(obj == null)
返回 虛假 ;
如果(的getClass()!= OBJ。的getClass())
返回 虛假 ;
BCountry other =(BCountry)obj ;
if(cities == null){
如果(其他。城市 != 空)
返回 虛假 ;
} 否則 如果(!城市。平等(等。城市))
返回 虛假 ;
if(name == null){
如果(其他。名字 != 空)
返回 虛假 ;
} 否則 如果(!名字。平等(其它。名))
返回 虛假 ;
返回 true ;
}
@覆蓋
public String toString(){
返回 “BCountry [name =” + name + “,cities =” + cities + “]” ;
}
}
和城市豆:
導(dǎo)入 java。io??尚蛄谢?nbsp;;
進(jìn)口 com。fastxml。杰克遜。注釋。JsonIgnoreProperties ;
@JsonIgnoreProperties(ignoreUnknown = true)
公共 類 BCity 實(shí)現(xiàn) Serializable {
private static final long serialVersionUID = 1L ;
private String name,timeZone ;
私人 雙 拉特,lng ;
public BCity(){
super();
}
public BCity(String name,String timeZone,double lat,double lng){
super();
這個(gè)。name = name ;
這個(gè)。timeZone = timeZone ;
這個(gè)。lat = lat ;
這個(gè)。lng = lng ;
}
public String getName(){
返回 名稱 ;
}
public void setName(String name){
這個(gè)。name = name ;
}
public String getTimeZone(){
返回時(shí) 區(qū) ;
}
public void setTimeZone(String timeZone){
這個(gè)。timeZone = timeZone ;
}
public double getLat(){
返回 緯度 ;
}
public void setLat(double lat){
這個(gè)。lat = lat ;
}
public double getLng(){
返回 lng ;
}
public void setLng(double lng){
這個(gè)。lng = lng ;
}
@覆蓋
public String toString(){
返回 “BCity [name =” + name + “,timeZone =” + timeZone + “,lat =” + lat + “,lng =” + lng + “]” ;
}
@覆蓋
public int hashCode(){
final int prime = 31 ;
int result = 1 ;
長 溫度 ;
temp = Double。doubleToLongBits(lat);
result = prime * result +(int)(temp ^(temp >>> 32));
temp = Double。doubleToLongBits(lng);
result = prime * result +(int)(temp ^(temp >>> 32));
結(jié)果 = 黃金 * 結(jié)果 +((名稱 == 空)? 0:名稱。的hashCode());
結(jié)果 = 素 * 結(jié)果 +((的timeZone == 空)? 0:的timeZone。的hashCode());
返回 結(jié)果 ;
}
@覆蓋
public boolean equals(Object obj){
if(this == obj)
返回 true ;
if(obj == null)
返回 虛假 ;
如果(的getClass()!= OBJ。的getClass())
返回 虛假 ;
BCity other =(BCity)obj ;
如果(雙。doubleToLongBits的(LAT)!= 雙。doubleToLongBits的(其他的。LAT))
返回 虛假 ;
如果(雙。doubleToLongBits的(LNG)!= 雙。doubleToLongBits的(其他的。LNG))
返回 虛假 ;
if(name == null){
如果(其他。名字 != 空)
返回 虛假 ;
} 否則 如果(!名字。平等(其它。名))
返回 虛假 ;
if(timeZone == null){
如果(其他。的timeZone != 空)
返回 虛假 ;
} 否則 如果(!的timeZone。平等(其它。的timeZone))
返回 虛假 ;
返回 true ;
}
}
批量閱讀器實(shí)現(xiàn)@LineMapper。您可以使讀者適應(yīng)我們的數(shù)據(jù)源(示例):
導(dǎo)入 java。util。清單 ;
進(jìn)口 組織。彈簧框架。批次。項(xiàng)目。檔案。LineMapper ;
進(jìn)口 com。ahajri。批次。豆子。BCountry ;
進(jìn)口 com。fastxml。杰克遜。數(shù)據(jù)綁定。ObjectMapper ;
進(jìn)口 com。fastxml。杰克遜。數(shù)據(jù)綁定。類型。CollectionType ;
公共 類 BCountryJsonLineMapper 實(shí)現(xiàn)了 LineMapper < List < BCountry >> {
private final ObjectMapper mapper = new ObjectMapper();
@覆蓋
public List < BCountry > mapLine(String line,int lineNumber)throws Exception {
CollectionType collectionType = mapper。getTypeFactory()。constructCollectionType(列表。類,BCountry。類);
返回 映射器。readValue(line,collectionType);
}
}
處理數(shù)據(jù)批處理時(shí),檢查同一天某個(gè)城市的業(yè)務(wù)處理數(shù)據(jù)是否已保存在數(shù)據(jù)庫中。在MongoDB的搜索數(shù)據(jù)的方式就是在這個(gè)詳細(xì)的崗位。
ItemProcessor將@BCountry對象轉(zhuǎn)換為MongoDB Document對象。該過程詳述如下:
public class BCountryPrayTimeEventItemProcessor 實(shí)現(xiàn) ItemProcessor < List < BCountry >,List < Document >> {
private static final String EVENTS_COLLECTION_NAME = “event” ;
private static final Logger LOG = LoggerFactory。getLogger(BCountryPrayTimeEventItemProcessor。類);
@Autowired
private PrayTimeService prayTimeService ;
@Autowired
private CloudMongoService cloudMongoService ;
@覆蓋
public List < Document > 進(jìn)程(List < BCountry > items)拋出 Exception {
final List < Document > docs = new ArrayList <>();
物品。stream()。forEach(item - > {
final String countryName = item。getName();
項(xiàng)目。getCities()。stream()。forEach(c - > {
final Document prayTimeCityEventDoc = new Document();
//循環(huán)城市并為今天提取祈禱時(shí)間
final String cityName = c。getName();
final String cityTimeZone = c。getTimeZone();
final double lat = c。getLat();
final double lng = c。getLng();
final LocalDateTime nowOfCity = LocalDateTime?,F(xiàn)在(了zoneid。的(cityTimeZone));
final QueryParam [] queryParams = new QueryParam [ 5 ];
queryParams [ 0 ] = 新 QueryParam(“CITY_NAME” ,OperatorEnum。EQ。名稱(),的cityName);
queryParams [ 1 ] = 新 QueryParam(“EVENT_TYPE” ,OperatorEnum。EQ。名稱(),事件類型。PRAY_TIME。名稱());
queryParams [ 2 ] = 新 QueryParam(“月”,OperatorEnum。EQ。名稱(),nowOfCity。getMonthValue());
queryParams [ 3 ] = 新 QueryParam(“DAY_OF_MONTH” ,OperatorEnum。EQ。名稱(),nowOfCity。getDayOfMonth());
queryParams [ 4 ] = 新 QueryParam(“COUNTRY_NAME” ,OperatorEnum。EQ。名稱(),國家名稱);
List < Document > foundEvents = null ;
嘗試 {
foundEvents = cloudMongoService。搜索(EVENTS_COLLECTION_NAME,queryParams);
} catch(BusinessException e1){
記錄。錯(cuò)誤(“====>未找到城市祈禱時(shí)間” + 的cityName + “對” + nowOfCity。getDayOfMonth()+ “/”
+ nowOfCity。getMonthValue());
}
嘗試 {
如果(CollectionUtils。的isEmpty(foundEvents)){
//祈禱時(shí)間尚未創(chuàng)建
prayTimeCityEventDoc。put(“country_name”,countryName);
prayTimeCityEventDoc。put(“city_name”,cityName);
prayTimeCityEventDoc。把(“EVENT_TYPE” ,事件類型。PRAY_TIME。名稱());
prayTimeCityEventDoc。把(“復(fù)發(fā)”,RecurringEnum。YEARLY。名稱());
prayTimeCityEventDoc。把(“月”,nowOfCity。getMonthValue());
prayTimeCityEventDoc。把(“DAY_OF_MONTH” ,nowOfCity。getDayOfMonth());
prayTimeCityEventDoc。put(“l(fā)at”,lat);
prayTimeCityEventDoc。put(“l(fā)ng”,lng);
prayTimeCityEventDoc。把(“CREATION_DATE” ,HCDateUtils。convertToDateViaSqlTimestamp(nowOfCity));
final Map < String,Object > prayInfos = prayTimeService。getPrayTimeByLatLngDate(lat,lng,
日期。從(nowOfCity。atZone(了zoneid。的(cityTimeZone))。toInstant()),cityTimeZone);
prayTimeCityEventDoc。把(“pray_infos” ,文件。解析(新 GSON()的toJSON(prayInfos)));
docs。add(prayTimeCityEventDoc);
} else {
記錄。信息(字符串。格式(“====>祈禱的時(shí)間已經(jīng)存在的城市:%S,月:%d,日:%d” ,
cityName,nowOfCity。getMonthValue(),nowOfCity。getDayOfMonth()));
}
} catch(BusinessException e){
記錄。錯(cuò)誤(“計(jì)算祈禱時(shí)間時(shí)出問題:”,e);
拋出 新的 RuntimeException(e);
}
});
});
返回 文檔 ;
}
}
批量配置類:
@組態(tài)
@EnableBatchProcessing
@EnableScheduling
公共 類 BatchConfiguration {
private static final String SCANDINAVIAN_COUNTRIES_JSON_FILE = “scandinavian-countries.json” ;
private static final String EVENT_COLLECTION_NAME = “event_collection” ;
private static final Logger LOG = LoggerFactory。getLogger(BatchConfiguration。類);
@Autowired
private JobBuilderFactory jobBuilderFactory ;
@Autowired
private StepBuilderFactory stepBuilderFactory ;
@Autowired
私有的 MLabMongoService mlabMongoService ;
@豆
public ResourcelessTransactionManager transactionManager(){
返回 新的 ResourcelessTransactionManager();
}
@豆
public MapJobRepositoryFactoryBean mapJobRepositoryFactory(ResourcelessTransactionManager txManager)
拋出 異常 {
MapJobRepositoryFactoryBean factory = new MapJobRepositoryFactoryBean(txManager);
工廠。afterPropertiesSet();
返回 工廠 ;
}
@豆
public JobRepository jobRepository(MapJobRepositoryFactoryBean factory)拋出 異常 {
return(JobRepository)工廠。getObject();
}
private SimpleJobLauncher jobLauncher ;
@豆
public SimpleJobLauncher jobLauncher(JobRepository jobRepository){
jobLauncher。setJobRepository(jobRepository);
return jobLauncher ;
}
@PostConstruct
private void initJobLauncher(){
jobLauncher = new SimpleJobLauncher();
}
@豆
FlatFileItemReader < List < BCountry >> reader(){
FlatFileItemReader < List < BCountry >> reader = new FlatFileItemReader <>();
讀者。setName(“scandinaviandCountriesReader”);
讀者。setResource(new ClassPathResource(SCANDINAVIAN_COUNTRIES_JSON_FILE));
讀者。setLineMapper(new BCountryJsonLineMapper());
回報(bào) 讀者 ;
}
@豆
public ItemWriter < List < Document >> writer(){
返回 新的 ItemWriter < List < Document >>(){
@覆蓋
public void write(List <? extends List < Document >> items)拋出 Exception {
嘗試 {
如果(!CollectionUtils。的isEmpty(項(xiàng)目)&& 項(xiàng)目。大?。ǎ? 0){
List < Document > flatDocs = items。stream()。flatMap(List :: stream)。收集(收藏家。toList());
mlabMongoService。insertMany(EVENT_COLLECTION_NAME,flatDocs);
} else {
記錄。警告(“沒有事件可以救......”);
}
} catch(BusinessException e){
拋出 新的 RuntimeException(e);
}
}
};
}
@豆
public BCountryTimeEventItemProcessor processor(){
返回 新的 BCountryTimeEventItemProcessor();
}
@豆
public Job scandvTimeJob(){
返回 jobBuilderFactory。get(“scandvTimeJob”)。incrementmenter(new RunIdIncrementer())。流程(step1())。結(jié)束()
。build();
}
@豆
public Step step1(){
返回 stepBuilderFactory。得到(“step1”)。< List < BCountry >,List < Document >> chunk(10)。讀者(讀者())
。處理器(處理器())。作家(作家())。build();
}
// end :: jobstep []
//每天午夜15分鐘
@Scheduled(cron = “0 15 0 * * *”)
public void startScandvEventTimeJob()throws Exception {
記錄。info(“====>工作開始時(shí)間:” + 新 日期());
JobParameters param = new JobParametersBuilder()。addString(“作業(yè)ID” ,字符串。的valueOf(系統(tǒng)。的currentTimeMillis()))
。toJobParameters();
JobExecution 執(zhí)行 = jobLauncher。run(scandvPrayTimeJob(),param);
記錄。信息(“====>工作完成了狀態(tài):” + 執(zhí)行。的getStatus());
}
}
部署de Batch到Heroku:
git add。
git commit -m “Deploy Batch”
git push heroku master
注意:要禁用默認(rèn)批量啟動(dòng),請將此添加到application.yml
s p r i n g:
b a t c h:
j o b:
? ? 一個(gè)b 升é d:?F 一升小號?
到此,相信大家對“如何將數(shù)據(jù)從Web服務(wù)處理到MongoDB中”有了更深的了解,不妨來實(shí)際操作一番吧!這里是創(chuàng)新互聯(lián)網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!