go-zero docker-compose 搭建課件服務(九):http統(tǒng)一返回和集成日志服務
蘭陵ssl適用于網(wǎng)站、小程序/APP、API接口等需要進行數(shù)據(jù)傳輸應用場景,ssl證書未來市場廣闊!成為成都創(chuàng)新互聯(lián)的ssl證書銷售渠道,可以享受市場價格4-6折優(yōu)惠!如果有意向歡迎電話聯(lián)系或者加微信:18982081108(備注:SSL證書合作)期待與您的合作!
https://github.com/liuyuede123/go-zero-courseware
一般返回中會有code
,message
,data
。當請求成功的時候code
返回0或者200,message
返回success,data
為要獲取的數(shù)據(jù);當請求失敗的時候code
返回自定義的錯誤碼,message
返回展示給前端的錯誤信息,data
為空。
我們將封裝一個錯誤返回的函數(shù),應用到api handler的返回
在user服務中創(chuàng)建了common文件夾,里面存一些公用的方法,創(chuàng)建response/response.go
package response
import (
"go-zero-courseware/user/common/xerr"
"net/http"
"github.com/pkg/errors"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/rest/httpx"
"google.golang.org/grpc/status"
)
type Response struct {
Code uint32 `json:"code"`
Message string `json:"message"`
Data interface{} `json:"data"`
}
//http返回
func HttpResult(r *http.Request, w http.ResponseWriter, resp interface{}, err error) {
if err == nil {
//成功返回
r := &Response{
Code: 0,
Message: "success",
Data: resp,
}
httpx.WriteJson(w, http.StatusOK, r)
} else {
//錯誤返回
errcode := uint32(500)
errmsg := "服務器錯誤"
causeErr := errors.Cause(err) // err類型
if e, ok := causeErr.(*xerr.CodeError); ok { //自定義錯誤類型
//自定義CodeError
errcode = e.GetErrCode()
errmsg = e.GetErrMsg()
} else {
if gstatus, ok := status.FromError(causeErr); ok { // grpc err錯誤
grpcCode := uint32(gstatus.Code())
errcode = grpcCode
errmsg = gstatus.Message()
}
}
logx.WithContext(r.Context()).Errorf("【API-ERR】 : %+v ", err)
httpx.WriteJson(w, http.StatusBadRequest, &Response{
Code: errcode,
Message: errmsg,
Data: nil,
})
}
}
創(chuàng)建xerr/errors.go文件,定義CodeError結構體
package xerr
import (
"fmt"
)
/**
常用通用固定錯誤
*/
type CodeError struct {
errCode uint32
errMsg string
}
//返回給前端的錯誤碼
func (e *CodeError) GetErrCode() uint32 {
return e.errCode
}
//返回給前端顯示端錯誤信息
func (e *CodeError) GetErrMsg() string {
return e.errMsg
}
func (e *CodeError) Error() string {
return fmt.Sprintf("ErrCode:%d,ErrMsg:%s", e.errCode, e.errMsg)
}
func NewErrCodeMsg(errCode uint32, errMsg string) *CodeError {
return &CodeError{errCode: errCode, errMsg: errMsg}
}
由于api一般調(diào)用的rpc的請求,獲取到的錯誤無法展示給前端使用,我們會使用自定義的錯誤類型。當讓rpc中的錯誤也可能是前端直接可以展示的錯誤,或者是數(shù)據(jù)庫的某個異常拋出的錯誤,如果想?yún)^(qū)分這些錯誤,可以自己定義業(yè)務端code和message做下區(qū)分就行。這里我們統(tǒng)一api服務中處理。
當api或者rpc中有一些未知錯誤拋出的時候我們需要寫入到日志中,包括具體的錯誤信息和堆棧信息。這些后續(xù)放到日志服務ELK中可以方便查看。
修改userinfohandler.go、userloginhandler.go、userregisterhandler.go中的返回
...
response.HttpResult(r, w, resp, err)
修改userinfologic.go
...
func (l *UserInfoLogic) UserInfo(req *types.UserInfoRequest) (resp *types.UserInfoResponse, err error) {
info, err := l.svcCtx.UserRpc.UserInfo(l.ctx, &userclient.UserInfoRequest{
Id: req.Id,
})
if err != nil {
// 自定義的錯誤返回
return nil, xerr.NewErrCodeMsg(500, "用戶查詢失敗")
}
return &types.UserInfoResponse{
Id: info.Id,
Username: info.Username,
LoginName: info.LoginName,
Sex: info.Sex,
}, nil
}
修改userloginlogic.go
...
func (l *UserLoginLogic) UserLogin(req *types.LoginRequest) (resp *types.LoginResponse, err error) {
login, err := l.svcCtx.UserRpc.Login(l.ctx, &userclient.LoginRequest{
LoginName: req.LoginName,
Password: req.Password,
})
if err != nil {
return nil, xerr.NewErrCodeMsg(500, "用戶登錄失敗")
}
now := time.Now().Unix()
login.Token, err = l.getJwtToken(l.svcCtx.Config.Auth.AccessSecret, now, l.svcCtx.Config.Auth.AccessExpire, int64(login.Id))
if err != nil {
// 返回錯誤信息,并打印堆棧信息到日志
return nil, errors.Wrapf(xerr.NewErrCodeMsg(5000, "token生成失敗"), "loginName: %s,err:%v", req, err)
}
return &types.LoginResponse{
Id: login.Id,
Token: login.Token,
}, nil
}
...
修改userregisterlogic.go
...
func (l *UserRegisterLogic) UserRegister(req *types.RegisterRequest) (resp *types.RegisterResponse, err error) {
_, err = l.svcCtx.UserRpc.Register(l.ctx, &userclient.RegisterRequest{
LoginName: req.LoginName,
Username: req.Username,
Password: req.Password,
Sex: req.Sex,
})
if err != nil {
// 自定義的錯誤返回
return nil, xerr.NewErrCodeMsg(5000, "注冊用戶失敗")
}
return &types.RegisterResponse{}, nil
}
關于errors.Wrapf
第一個參數(shù)是錯誤信息,第二個是格式化之后的錯誤信息字符串,args是fromat中的動態(tài)參數(shù)。最終還是返回我們傳入的error,但是會把堆棧信息也打印出來。這個為后面的日志服務做鋪墊
func Wrapf(err error, format string, args ...interface{}) error {
if err == nil {
return nil
}
err = &withMessage{
cause: err,
msg: fmt.Sprintf(format, args...),
}
return &withStack{
err,
callers(),
}
}
關于鑒權
對于鑒權,如果鑒權失敗,之前是直接返回401狀態(tài)碼,但是我們想同樣的返回錯誤信息和message
此時就需要自定義一個鑒權失敗的回調(diào)函數(shù)
我們在response.go中增加一個鑒權失敗的回調(diào)函數(shù)
...
func JwtUnauthorizedResult(w http.ResponseWriter, r *http.Request, err error) {
httpx.WriteJson(w, http.StatusUnauthorized, &Response{401, "鑒權失敗", nil})
}
然后在api入口程序user.go中修改代碼如下
...
func main() {
flag.Parse()
var c config.Config
conf.MustLoad(*configFile, &c)
// 此處加入鑒權失敗的回調(diào)
server := rest.MustNewServer(c.RestConf, rest.WithUnauthorizedCallback(response.JwtUnauthorizedResult))
defer server.Stop()
ctx := svc.NewServiceContext(c)
handler.RegisterHandlers(server, ctx)
fmt.Printf("Starting server at %s:%d...\n", c.Host, c.Port)
server.Start()
}
然后我們再看下user的rpc服務
這里我們會引入一個攔截器。什么是攔截器?
定義:UnaryServerInterceptor 提供了一個鉤子來攔截服務器上一元 RPC 的執(zhí)行。 信息包含攔截器可以操作的這個 RPC 的所有信息。 處理程序是包裝器服務方法實現(xiàn)。 攔截器負責調(diào)用處理程序完成 RPC。
其實就是攔截handler做一些返回前和返回后的處理
我們需要在common中新增一個攔截器方法,新建文件rpcserver/rpcserver.go
package rpcserver
import (
"context"
"github.com/pkg/errors"
"github.com/zeromicro/go-zero/core/logx"
"go-zero-courseware/user/common/xerr"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
func LoggerInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
resp, err = handler(ctx, req)
if err != nil {
causeErr := errors.Cause(err) // err類型
if e, ok := causeErr.(*xerr.CodeError); ok { //自定義錯誤類型
logx.WithContext(ctx).Errorf("【RPC-SRV-ERR】 %+v", err)
//轉成grpc err
err = status.Error(codes.Code(e.GetErrCode()), e.GetErrMsg())
} else {
logx.WithContext(ctx).Errorf("【RPC-SRV-ERR】 %+v", err)
}
}
return resp, err
}
然后在入口文件user.go中添加一個攔截器
...
s.AddUnaryInterceptors(rpcserver.LoggerInterceptor)
...
課件服務和上面類似,這里就不一一添加修改了
我們需要搭建一個ELK體系的服務,流程圖如下:
將會用到以下服務:
服務名 | 端口號 | |
---|---|---|
elasticsearch | 9200 | |
kibana | 5601 | |
go-stash | ||
filebeat | ||
zookeeper | 2181 | |
kafka | 9092 |
docker-compose如下:
user服務中我們引入了日志地址,到我們的宿主機上。之所以這樣做,是因為在mac系統(tǒng)上docker的日志文件路徑和linux上的不一致。找了半天也沒在mac上找到容器的日志。所以用戶服務中的日志會寫到文件中然后同步到宿主機的data/log目錄下。
還有就是filebeat日志中,我們會從宿主機上的日志同步到filebeat指定目錄。然后filebeat會同步到kafka
version: '3.5'
# 網(wǎng)絡配置
networks:
backend:
driver: bridge
# 服務容器配置
services:
etcd: # 自定義容器名稱
build:
context: etcd # 指定構建使用的 Dockerfile 文件
environment:
- TZ=Asia/Shanghai
- ALLOW_NONE_AUTHENTICATION=yes
- ETCD_ADVERTISE_CLIENT_URLS=http://etcd:2379
ports: # 設置端口映射
- "2379:2379"
networks:
- backend
restart: always
etcd-manage:
build:
context: etcd-manage
environment:
- TZ=Asia/Shanghai
ports:
- "7000:8080" # 設置容器8080端口映射指定宿主機端口,用于宿主機訪問可視化web
depends_on: # 依賴容器
- etcd # 在 etcd 服務容器啟動后啟動
networks:
- backend
restart: always
courseware-rpc: # 自定義容器名稱
build:
context: courseware # 指定構建使用的 Dockerfile 文件
dockerfile: rpc/Dockerfile
environment: # 設置環(huán)境變量
- TZ=Asia/Shanghai
privileged: true
ports: # 設置端口映射
- "9400:9400" # 課件服務rpc端口
stdin_open: true # 打開標準輸入,可以接受外部輸入
tty: true
networks:
- backend
restart: always # 指定容器退出后的重啟策略為始終重啟
courseware-api: # 自定義容器名稱
build:
context: courseware # 指定構建使用的 Dockerfile 文件
dockerfile: api/Dockerfile
environment: # 設置環(huán)境變量
- TZ=Asia/Shanghai
privileged: true
ports: # 設置端口映射
- "8400:8400" # 課件服務api端口
stdin_open: true # 打開標準輸入,可以接受外部輸入
tty: true
networks:
- backend
restart: always # 指定容器退出后的重啟策略為始終重啟
user-rpc: # 自定義容器名稱
build:
context: user # 指定構建使用的 Dockerfile 文件
dockerfile: rpc/Dockerfile
environment: # 設置環(huán)境變量
- TZ=Asia/Shanghai
privileged: true
volumes:
- ./data/log/user-rpc:/var/log/go-zero/user-rpc # 日志的映射地址
ports: # 設置端口映射
- "9300:9300" # 課件服務rpc端口
stdin_open: true # 打開標準輸入,可以接受外部輸入
tty: true
networks:
- backend
restart: always # 指定容器退出后的重啟策略為始終重啟
user-api: # 自定義容器名稱
build:
context: user # 指定構建使用的 Dockerfile 文件
dockerfile: api/Dockerfile
environment: # 設置環(huán)境變量
- TZ=Asia/Shanghai
privileged: true
volumes:
- ./data/log/user-api:/var/log/go-zero/user-api
ports: # 設置端口映射
- "8300:8300" # 課件服務api端口
stdin_open: true # 打開標準輸入,可以接受外部輸入
tty: true
networks:
- backend
restart: always # 指定容器退出后的重啟策略為始終重啟
elasticsearch:
build:
context: ./elasticsearch
environment:
- TZ=Asia/Shanghai
- discovery.type=single-node
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
privileged: true
ports:
- "9200:9200"
networks:
- backend
restart: always
prometheus:
build:
context: ./prometheus
environment:
- TZ=Asia/Shanghai
privileged: true
volumes:
- ./prometheus/prometheus.yml:/opt/bitnami/prometheus/conf/prometheus.yml # 將 prometheus 配置文件掛載到容器里
- ./prometheus/target.json:/opt/bitnami/prometheus/conf/targets.json # 將 prometheus 配置文件掛載到容器里
ports:
- "9090:9090" # 設置容器9090端口映射指定宿主機端口,用于宿主機訪問可視化web
networks:
- backend
restart: always
grafana:
build:
context: ./grafana
environment:
- TZ=Asia/Shanghai
privileged: true
ports:
- "3000:3000"
networks:
- backend
restart: always
jaeger:
build:
context: ./jaeger
environment:
- TZ=Asia/Shanghai
- SPAN_STORAGE_TYPE=elasticsearch
- ES_SERVER_URLS=http://elasticsearch:9200
- LOG_LEVEL=debug
privileged: true
ports:
- "6831:6831/udp"
- "6832:6832/udp"
- "5778:5778"
- ":"
- "4317:4317"
- "4318:4318"
- ":"
- ":"
- ":"
- "9411:9411"
networks:
- backend
restart: always
kibana:
build:
context: ./kibana
environment:
- elasticsearch.hosts=http://elasticsearch:9200
- TZ=Asia/Shanghai
privileged: true
ports:
- "5601:5601"
networks:
- backend
restart: always
depends_on:
- elasticsearch
go-stash:
build:
context: ./go-stash
environment:
- TZ=Asia/Shanghai
privileged: true
volumes:
- ./go-stash/go-stash.yml:/app/etc/config.yaml
networks:
- backend
restart: always
depends_on:
- elasticsearch
- kafka
filebeat:
build:
context: ./filebeat
environment:
- TZ=Asia/Shanghai
entrypoint: "filebeat -e -strict.perms=false"
privileged: true
volumes:
- ./filebeat/filebeat.yml:/usr/share/filebeat/filebeat.yml
- ./data/log:/var/lib/docker/containers # 宿主機上的日志同步到filebeat指定目錄
networks:
- backend
restart: always
depends_on:
- kafka
zookeeper:
build:
context: ./zookeeper
environment:
- TZ=Asia/Shanghai
privileged: true
networks:
- backend
ports:
- "2181:2181"
restart: always
kafka:
build:
context: ./kafka
ports:
- "9092:9092"
environment:
- KAFKA_ADVERTISED_HOST_NAME=kafka
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_AUTO_CREATE_TOPICS_ENABLE=false
- TZ=Asia/Shanghai
- ALLOW_PLAINTEXT_LISTENER=yes
restart: always
privileged: true
networks:
- backend
depends_on:
- zookeeper
(項目根目錄下自行創(chuàng)建對應的Dokcerfile)
filebeat需要引入配置文件filebeat.yml如下:
其中filebeat需要從宿主機同步數(shù)據(jù),就是上面用戶服務中生成的日志文件,會同步到filebeat的對應文件中
拉取過來的文件會輸出到kafka指定的topic中,我們這里定義的是courseware-log
filebeat.inputs:
- type: log
enabled: true
paths:
- /var/lib/docker/containers/*/*.log # 此為宿主機同步過來的日志文件
filebeat.config:
modules:
path: ${path.config}/modules.d/*.yml
reload.enabled: false
processors:
- add_cloud_metadata: ~
- add_docker_metadata: ~
output.kafka:
enabled: true
hosts: ["kafka:9092"]
#要提前創(chuàng)建topic
topic: "courseware-log"
partition.hash:
reachable_only: true
compression: gzip
max_message_bytes:
required_acks: 1
用戶服務中也需要修改etc下的user.yaml配置,增加日志的配置,輸出到data/log目錄下
Log:
Mode: file
Path: /var/log/go-zero/user-api
Level: error
Log:
Mode: file
Path: /var/log/go-zero/user-rpc
Level: error
我們啟動下相關服務,請求下user-api的接口
然后回到項目中查看data/log中是否生成相關日志
日志正常輸出,再到filebeat服務中,查看文件是否同步上去:
# 進入容器
docker exec -it 231bf79f3d5e21cea153bd94bfee0e3c67a693e727d0b660 /bin/sh
# 查看目錄
cd /var/lib/docker/containers
ls
user-api user-rpc
然后我們再到kafka的容器中
# 進入到容器
docker exec -it cb764aeb86e8296a805e47c85f65ac5334c3edfe36e7a39a81ca1bad67f /bin/sh
# 到bin目錄下
cd /opt/bitnami/kafka/bin
# 可以看到這些調(diào)試腳本
$ ls
connect-distributed.sh kafka-cluster.sh kafka-consumer-perf-test.sh kafka-get-offsets.sh kafka-producer-perf-test.sh kafka-server-stop.sh kafka-verifiable-consumer.sh zookeeper-server-start.sh
connect-mirror-maker.sh kafka-configs.sh kafka-delegation-tokens.sh kafka-leader-election.sh kafka-reassign-partitions.sh kafka-storage.sh kafka-verifiable-producer.sh zookeeper-server-stop.sh
connect-standalone.sh kafka-console-consumer.sh kafka-delete-records.sh kafka-log-dirs.sh kafka-replica-verification.sh kafka-streams-application-reset.sh trogdor.sh zookeeper-shell.sh
kafka-acls.sh kafka-console-producer.sh kafka-dump-log.sh kafka-metadata-shell.sh kafka-run-class.sh kafka-topics.sh windows
kafka-broker-api-versions.sh kafka-consumer-groups.sh kafka-features.sh kafka-mirror-maker.sh kafka-server-start.sh kafka-transactions.sh zookeeper-security-migration.sh
$
先看下有沒有創(chuàng)建courseware-log
的topic,如果沒有就創(chuàng)建一個
$ ./kafka-topics.sh --bootstrap-server kafka:9092 --list
__consumer_offsets
courseware-log
# 沒有就創(chuàng)建,創(chuàng)建的命令。最新版的kafka不需要指定zookeeper
./kafka-topics.sh --create --bootstrap-server kafka:9092 --replication-factor 1 --partitions 1 --topic courseware-log
# 建錯了刪除用這個
./kafka-topics.sh --delete --bootstrap-server kafka:9092 --topic courseware-log
# 發(fā)布消息用這個
./kafka-console-producer.sh --broker-list kafka:9092 --topic courseware-log
# 消費用這個
./kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic courseware-log --from-beginning
我們執(zhí)行消費腳本看下日志會不會過來。
現(xiàn)在還沒有日志進來,我們請求一下接口讓接口報錯,可以看到日志開始消費了
到這里日志已經(jīng)流轉到kafka中了。
下面是go-stash從kafka拉取日志處理并保存到elasticsearch的流程:
go-stash需要引入配置文件go-stash.yml,內(nèi)容如下:
參數(shù)可參考github go-stash
Clusters:
- Input:
Kafka:
Name: go-stash
Brokers:
- "kafka:9092"
Topics:
- courseware-log
Group: pro
Consumers: 16
Filters:
- Action: drop
Conditions:
- Key: k8s_container_name
Value: "-rpc"
Type: contains
- Key: level
Value: info
Type: match
Op: and
- Action: remove_field
Fields:
# - message
- _source
- _type
- _score
- _id
- "@version"
- topic
- index
- beat
- docker_container
- offset
- prospector
- source
- stream
- "@metadata"
- Action: transfer
Field: message
Target: data
Output:
ElasticSearch:
Hosts:
- "http://elasticsearch:9200"
Index: "courseware-{{yyyy-MM-dd}}"
問題:
但是這里mac上又遇到一個問題就是對接go-stash時mac上的docker中會報錯
2022/09/08 21:51:10 {"@timestamp":"2022-09-08T21:51:10.346+08:00","level":"error","content":"cpu_linux.go:29 open cpuacct.usage_percpu: no such file or directory"}
具體可以看這里https://github.com/zeromicro/go-zero/issues/311 還沒有找到好的解決辦法。
后續(xù):
之后又重啟了下docker發(fā)現(xiàn)問題解決了,同步到es生效了。
接下來我們請求下用戶服務的接口,到es查看,索引已經(jīng)創(chuàng)建,錯誤信息已經(jīng)寫進去了
然后我們訪問http://127.0.0.1:5601/進到kibana后臺,點擊Discover,并創(chuàng)建索引
搜索到課件服務的索引后點擊下一步
選擇@timestamp,點擊創(chuàng)建
重新點擊Discover之后可以看到課件的日志服務創(chuàng)建完成