項(xiàng)目使用告警系統(tǒng)的邏輯是將實(shí)時(shí)數(shù)據(jù)保存到本地?cái)?shù)據(jù)庫(kù)再使用定時(shí)任務(wù)做判斷,然后產(chǎn)生告警數(shù)據(jù)。這種方式存在告警的延時(shí)實(shí)在是太高了。數(shù)據(jù)從產(chǎn)生到保存,從保存到判斷都會(huì)存在時(shí)間差,按照保存數(shù)據(jù)定時(shí)5分鐘一次,定時(shí)任務(wù)5分鐘一次。最高會(huì)產(chǎn)生10分鐘的誤差,這種告警就沒(méi)什么意義了。
讓客戶滿意是我們工作的目標(biāo),不斷超越客戶的期望值來(lái)自于我們對(duì)這個(gè)行業(yè)的熱愛(ài)。我們立志把好的技術(shù)通過(guò)有效、簡(jiǎn)單的方式提供給客戶,將通過(guò)不懈努力成為客戶在信息化領(lǐng)域值得信任、有價(jià)值的長(zhǎng)期合作伙伴,公司提供的服務(wù)項(xiàng)目有:域名注冊(cè)、虛擬主機(jī)、營(yíng)銷軟件、網(wǎng)站建設(shè)、鹽湖網(wǎng)站維護(hù)、網(wǎng)站推廣。demo設(shè)計(jì)為了簡(jiǎn)單的還原業(yè)務(wù)場(chǎng)景,做了簡(jiǎn)單的demo假設(shè)
實(shí)現(xiàn)一個(gè)對(duì)于學(xué)生成績(jī)?cè)u(píng)價(jià)的實(shí)時(shí)處理程序
數(shù)學(xué)成績(jī),基準(zhǔn)范圍是90-140,超出告警
物理成績(jī),基準(zhǔn)范圍是60-95,超出告警
使用windows環(huán)境演示
準(zhǔn)備工作
1、安裝jdk
2、安裝zookeeper
解壓壓縮包
zoo_sample.cfg將它重命名為zoo.cfg
修改配置 dataDir=D://tools//apache-zookeeper-3.5.10-bin//data
配置環(huán)境變量
3、安裝kafka
解壓壓縮包
修改config/server.properties
log.dirs=D://tools//kafka_2.11-2.1.0//log
flink程序代碼pom
org.apache.flink flink-java1.13.0 org.apache.flink flink-streaming-java_2.121.13.0 org.apache.flink flink-clients_2.121.13.0 org.apache.flink flink-connector-kafka_2.121.13.0 org.projectlombok lombok1.18.12 provided com.alibaba fastjson1.2.62 org.apache.flink flink-connector-kafka_2.111.10.0
主程序
public class StreamAlertDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(3);
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
FlinkKafkaConsumerkafkaConsumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties);
DataStreamSourceinputDataStream = env.addSource(kafkaConsumer);
DataStreamresultStream = inputDataStream.flatMap(new AlertFlatMapper());
resultStream.print().setParallelism(4);
resultStream.addSink(new FlinkKafkaProducer<>("demo",new SimpleStringSchema(),properties));
env.execute();
}
}
主程序,配置告警規(guī)則后期可以使用推送或者拉去方式獲取數(shù)據(jù)
public class RuleMap {
private RuleMap(){}
public final static MapinitialRuleMap;
private static List ruleList = new ArrayList<>();
private static ListruleStringList = new ArrayList<>(Arrays.asList(
"{\"target\":\"MathVal\",\"type\":\"0\",\"criticalVal\":90,\"descInfo\":\"You Math score is too low\"}",
"{\"target\":\"MathVal\",\"type\":\"2\",\"criticalVal\":140,\"descInfo\":\"You Math score is too high\"}",
"{\"target\":\"PhysicsVal\",\"type\":\"0\",\"criticalVal\":60,\"descInfo\":\"You Physics score is too low\"}",
"{\"target\":\"PhysicsVal\",\"type\":\"2\",\"criticalVal\":95,\"descInfo\":\"You Physics score is too high\"}"));
static {
for (String i : ruleStringList) {
ruleList.add(JSON.parseObject(i, AlertRule.class));
}
initialRuleMap = ruleList.stream().collect(Collectors.groupingBy(AlertRule::getTarget));
}
}
AlertFlatMapper,處理告警邏輯
public class AlertFlatMapper implements FlatMapFunction{
@Override
public void flatMap(String inVal, Collectorout) throws Exception {
Achievement user = JSON.parseObject(inVal, Achievement.class);
MapinitialRuleMap = RuleMap.initialRuleMap;
List resList = new ArrayList<>();
List mathRule = initialRuleMap.get("MathVal");
for (AlertRule rule : mathRule) {
if (checkVal(user.getMathVal(), rule.getCriticalVal(), rule.getType())) {
resList.add(new AlertInfo(user.getName(), rule.getDescInfo()));
}
}
List physicsRule = initialRuleMap.get("PhysicsVal");
for (AlertRule rule : physicsRule) {
if (checkVal(user.getPhysicsVal(), rule.getCriticalVal(), rule.getType())) {
resList.add(new AlertInfo(user.getName(), rule.getDescInfo()));
}
}
String result = JSON.toJSONString(resList);
out.collect(result);
}
private static boolean checkVal(Integer actVal, Integer targetVal, Integer type) {
switch (type) {
case 0:
return actVal< targetVal;
case 1:
return actVal.equals(targetVal);
case 2:
return actVal >targetVal;
default:
return false;
}
}
}
三個(gè)實(shí)體類
@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
public class Achievement implements Serializable {
private static final long serialVersionUID = -1L;
private String name;
private Integer mathVal;
private Integer physicsVal;
}
@Data
@AllArgsConstructor
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
public class AlertInfo implements Serializable {
private static final long serialVersionUID = -1L;
private String name;
private String descInfo;
}
@Data
@AllArgsConstructor
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
public class AlertRule implements Serializable {
private static final long serialVersionUID = -1L;
private String target;
//0小于 1等于 2大于
private Integer type;
private Integer criticalVal;
private String descInfo;
}
項(xiàng)目演示創(chuàng)建kafka生產(chǎn)者 test
.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test
創(chuàng)建kafka消費(fèi)者 demo
.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic demo --from-beginning
啟動(dòng)flink應(yīng)用
給topic test發(fā)送消息
{"name":"liu","MathVal":45,"PhysicsVal":76}
消費(fèi)topic demo
告警系統(tǒng)架構(gòu)你是否還在尋找穩(wěn)定的海外服務(wù)器提供商?創(chuàng)新互聯(lián)www.cdcxhl.cn海外機(jī)房具備T級(jí)流量清洗系統(tǒng)配攻擊溯源,準(zhǔn)確流量調(diào)度確保服務(wù)器高可用性,企業(yè)級(jí)服務(wù)器適合批量采購(gòu),新人活動(dòng)首月15元起,快前往官網(wǎng)查看詳情吧