tcp异步框架学习
什么是同步异步
先讲讲同步,同步可以理解为串行,100个人要进入一个空间,可空间入口只有一个,这时候,人们只能排队进入,假设一个人进入需要费时1秒,100个就是100秒,人越多,效率就越低;这时候要解决效率的问题就需要用到异步了。 异步可以理解为并行,为每一个人开辟一个入口,这时候有n个人就会有n个入口,进入的时间永远就只需一秒,这样效率就大大提高了。
异步加锁的必要性
异步解决了进入效率的问题,但人们进入房间是需要做事情的,如果这个事件只是记忆某样东西,不对这个东西进行改变,这时候就没必要加锁,假如空间里面有一堆糖果,100个人里面有50个人的任务只是进来数当前糖果的,这时候就没必要加锁了,而另外50个人的任务是拿糖果,这就有必要加锁了。拿糖果这个任务可以分为两个步骤,先确认是否有糖果,有的话才拿,如果AB两个人通过要执行拿糖果的,AB都确认了有糖果,可是B比A手速要快,先一步把糖果全部拿走了,A再去拿的时候就拿不到了,这时候A的逻辑就会出问题了,他就会一直在那里拿糖果(可永远没有拿到的可能),但是加了锁就不一样了,锁相当于一个闸门,上锁后,只有一个人可以执行任务,其他人只能等待上个人执行完解锁后才能继续去抢锁执行任务。
线程池的概念
异步解决了效率的问题,使空间不再拥堵,但空间再大也会有上限,可是想要进入空间做事情的人是没有上限的,假设这个空间人气很高,一个庞大的人流量都想要进入这个空间做事,如果说为每个人都开辟一个通道,这时候这个空间将面临千疮百孔,濒临崩溃的局面。也许空间加糖果的例子可能还是有点抽象了,这里以银行为例子,银行相当于空间,每一个窗口相当于一个通道,事件相当于不同的业务,在人多的情况下,窗口越多,办理业务的效率也就越高,而对于银行来说,窗口也是有上限的,而且每开一个窗口都需要消耗一定资源(给窗口业务员发工资、窗口设备维护等等)。
线程池的具体设计
上面有提到过无限开启窗口的确定,就是资源消耗的递增,假设银行只有在每个月的最后一天人流量会暴增,为了这一天开启了很多个窗口来提高效率,可是其他的时间里,平均人流量很小,一到两个窗口就能应付,这时候其他的窗口就会无比空闲,假设你是一个精打细算的银行老板,你会容忍其他窗口的无所事事么,要知道维护这些空闲窗口是需要消耗的。 1. 设定一个合理的窗口数,不要为每一个人都开一个窗口,人多的话还是要进行排队,银行就是这样的。。。 2. 合理的窗口数虽然减少了开销,但是作为一个一毛不拔的我,还是觉得浪费了,毕竟在人流量极少的情况下,还是会有无所事事的窗口,这个时候就需要动态的增删窗口了;人流量小的时候,无所事事的窗口关闭,人流量多的时候,增加窗口,极多的时候,那就排队吧。
回归语言
任何事物都是讲究逻辑性的,语言也是符合上面两个例子的逻辑。空间、银行这些都可以看做我们的服务端,而来办理业务的人就是各种携带事件的tcp请求,而窗口就是routine(类型java线程,但比线程的创建消耗要小很多),业务就是你通过routine要异步处理的携带事件。
好的异步协程池
ants框架就是一个非常好用的,对上述所说的动态删减routine,控制routine上限,异步处理事件都有非常好的实现
以下就是个人参考ants框架的举例逻辑实现
// 主函数,模拟1000个事件,交给线程池异步处理,p就是协程池,限制上限10,通过push函数将任何丢到通道中
func main() {
for i := 0; i < 1000; i++ {
// time.Sleep(2 * time.Millisecond)
p.push(Msg{
from: uint64(i),
data: "haha",
})
}
fmt.Println("参与的worker数量:", len(p.workers))
time.Sleep(1 * time.Second)
}
//这里的push函数,导致的实际效果是每次是放一个事件进来等待空闲routine进行处理,一对多
//而ants作者的逻辑效果是一群事件在那里等待争抢空闲routine,多对多
//从逻辑性来开ants作者的逻辑效率会更高
type Pool struct {
max uint64
workers []*Worker //存放工作者
chl chan Msg
lock sync.Mutex
}
type Msg struct {
from uint64 //来自
data string // 消息内容
}
type Worker struct {
ID uint64 //id
pool *Pool //属于哪一个池
h chan Msg
recycleTime time.Time //回收时间
end chan bool
}
var p = newPool(10)
var max uint64
func newPool(i uint64) *Pool {
p := &Pool{
max: i,
workers: make([]*Worker, 0),
chl: make(chan Msg, 1024),
}
go p.handle(i)
return p
}
func (p *Pool) push(msg Msg) {
p.chl <- msg
}
// 这里算是核心业务了,handle对通道里的业务进行轮询,将拿到的业务异步交给协程池的worker处理
func (p *Pool) handle(bm uint64) {
for {
msg := <-p.chl
var w *Worker
if atomic.LoadUint64(&max) >= bm {
for {
l := len(p.workers) - 1
if l < 0 {
// p.lock.Unlock()
continue
} else {
p.lock.Lock()
ws := p.workers
w = ws[l]
ws[l] = nil
p.workers = ws[:l]
p.lock.Unlock()
break
}
}
} else if w == nil {
next := atomic.AddUint64(&max, uint64(1))
w = &Worker{
ID: next - 1,
pool: p,
end: make(chan bool, 1),
h: make(chan Msg, 1),
}
go w.run()
}
w.h <- msg
}
}
// 以下函数就是对worker的循环利用
func (w *Worker) run() {
for {
msg := <-w.h
w.handle(msg)
}
}
func (w *Worker) handle(msg Msg) {
fmt.Printf("工人:%v执行:%v的%v\n", w.ID, msg.from, msg.data)
w.pool.lock.Lock()
w.pool.putWorker(w)
w.pool.lock.Unlock()
}
func (p *Pool) putWorker(w *Worker) {
p.lock.Lock()
p.workers = append(p.workers, w)
p.lock.Unlock()
}
暂时还未实现动态删worker的逻辑,不过也很简单,单独开启一个协程通过定时器和通道来控制worker的死亡,具体逻辑看ants框架