Go项目场景一

场景

在项目启动后有一个会开一个goroutine,里面有一个chan接收消息数据,如果接收到消息就将数据库的某些配置拉到本地进行配置下发

之前是设置定时器,每隔一段时间就自动给chan发送消息,现在向进行实时配置下发,就是某些API被调用后(对指定数据库消息进行变更的API)将给chan发送消息

问题

因为API调用时并发进行的,如果在短时间内有大量的特定API调用,则都会给chan发送消息,而对chan的处理只是单协程,那么之后的请求将会chan外堵塞,可能就达到不了实时下发的效果

对于发送消息,其实只要后面调用API的goroutine发送的消息即可,因为后面更新数据库的信息里肯定是包含了前面更新的信息

思考

目前对chan的处理只是单协程,能不能使用多goroutine去并发竞争chan,提高对chan的处理能力?

不能,因为chan中要更改一个全局map的变量,会加锁,即使使用多goroutine在发送消息给chan后也会进行堵塞

能不能使用一种机制,就是给发送消息给chan时,看是否有消息在chan外堵塞,有的话则将自己进行替换

因为之后更新的消息肯定是包含前面更新的消息,所以在之前调用API的goroutine和之后调用API的goroutine同时发送消息时,应该是该舍弃前面的消息保留后面的消息

原理上是可行的,但实际上操作起来很难,因为无法获知chan中是否有缓存,只能取出,并且只能取一个,而且取值操作在chan没有缓存时还会堵塞

并且还是在每次API调用操作时都会给chan发消息,根本上还是会造成chan的堵塞

由上面,想是否可以有一种机制,使得不是每次调API用都给chan发送消息,而是满足一定条件才会给chan发送消息

这样可以根本解决chan堵塞的问题,只要在每次发送前短暂的等待一会看是否有新的goroutine要发送消息,如果要则抛弃本次的消息发送

解决

定义实时配置下发的全局变量和结构体

1
2
3
4
5
var realtimeMachine *RealTimeMachine
type RealTimeMachine struct {
Random int64 //生成随机数
Num int //强制下发触发次数
}

思路:每次发送前随机生成64位的int数赋值给实时配置下发器的变量,然后设置一个短暂的等待时间,时间后看该变量是否发生改变,如果发生改变则放弃给chan发送消息,如果没有改变则给chan发送消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
func ImmediatelyUpdateApi() {
go func() {
xxxApi.ImmediatelyUpdateApi(loader, realtimeMachine, rand.Int63()) //随机生成64位int数
}()
}

func (xxxApi *xxxApixxx) ImmediatelyUpdateApi(loader Loader, machine *translate.RealTimeMachine, random int64) {
if err := xxxApi.Translate(loader); err != nil { //去数据库更新配置到本地变量
logger.Warn("timed get api is null")
return
}
if !xxxApi.CompareDiff() { //如果更新后本地变量没有发生变化,则不需要发送消息给chan了
return
}
machine.Random = random
machine.Num++
//虽然可能因为多次并发而达不到每十次强制触发一次,但是只要有运行多次强制触发一次的机制即可
if machine.Num == 10 {
machine.Num = 0
xxxApi.Chan <- "message"
return
}
//设置等待更新时间,时间内如果没有发生更改(没有新的goroutine发送消息)则发送消息
time.Sleep(time.Millisecond * 300)
if machine.Random == random {
xxxApi.Ch <- "message"
return
}
//如果发生更改则放弃本次消息发送
}

上面有一个计数强制发送的机制,是因为避免这种情况发生:

加入设置的是等待时间300ms,如果每300ms内都有新goroutine发送,那么就会一直放弃本次消息发送,迟迟无法更新配置信息

那么就需要设置一个强制发送的机制,就是累计指定次数,就必须发送消息一次,这样就会闭上上诉情况

虽然可能因为多次并发而达不到指定次数强制触发一次(如果要则需要上锁,那么就会影响性能),但是只要有运行多次强制触发一次的机制即可

下面进行了优化更新!

优化解决

1
2
3
4
5
var realtimeMachine *RealTimeMachine
type RealTimeMachine struct {
Random int64 //生成随机数
Tiem int64 //记录最大超时时间
}
  • 对野生goroutine进行兜底保护

    因为gin框架已经对每一个接口进行了兜底保护,即使发送panic也不会宕机(panic),但是如果在接口中使用了野生goroutine并且在里面发升了panic,那么整个程序都会崩溃!(包括主程序)

  • 使用最大超时时间代替强制发送次数机制,避免了因多次并发而达不到指定次数强制触发一次,使得发送时间更加平滑

  • 最小等待时间应该大于chan接收信息后函数处理的平均时间

    如果小于,则等待后的时间差还是可能最造成chan阻塞(比如处理10ms,等待2ms,那么如果每3ms发送一次信息,那么每次都会按时发送不会被丢弃并且也会阻塞在chan外(内部用了锁))

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
func ImmediatelyUpdateApi() {
go func() {
defer func(){ //兜底保护
if err:=recover();err!=nil{
logger("happen panic!")
}
}
xxxApi.ImmediatelyUpdateApi(loader, realtimeMachine, rand.Int63()) //随机生成64位int数
}()
}

func (xxxApi *xxxApixxx) ImmediatelyUpdateApi(loader Loader, machine *translate.RealTimeMachine, random int64) {
if err := xxxApi.Translate(loader); err != nil { //去数据库更新配置到本地变量
logger.Warn("timed get api is null")
return
}
if !xxxApi.CompareDiff() { //如果更新后本地变量没有发生变化,则不需要发送消息给chan了
return
}
machine.Random = random
now := time.Now().Unix()
if now-machine.Time < 1 { //设置最大超时时间,每隔1s的请求都会强制发送
machine.Time = now
envoyRemoteApi.Ch <- "envoy"
return
}
//设置最小等待更新时间,时间内如果没有发生更改(没有新的goroutine发送消息)则发送消息
time.Sleep(time.Millisecond * 300) //最小等待时间应该大于发送消息后函数的处理时间
if machine.Random == random {
xxxApi.Ch <- "message"
return
}
//如果发生更改则放弃本次消息发送
}

本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!