深度理解resp.body.close的奥秘http连接的

「本文已参与好文召集令活动,点击查看:后端、大前端双赛道投稿,2万元奖池等你挑战!

http连接的疑问

http请求是我们开发中最为常见的一个东西了,特别微服务中,由于服务的拆分,每个人可能负责某一块的业务,当A服务的某个业务依赖B服务的数据时,最常见的就是B服务提供一个接口了。golang提供的原生的httpclient还是非常强大的,但是如果在某些场景中,用的不对,可能会造成意想不到的问题。
先来看个问题:

func main() {
   for i := 0; i < 100; i++ {
      resp, err := http.Get("http://www.baidu.com")
      if err != nil {
         fmt.Println(err)
         return
      }
      _, err = ioutil.ReadAll(resp.Body)
   }
   fmt.Println("goroutine num is", runtime.NumGoroutine())
}
复制代码

第一次接触go,我有以下几个疑问:

  1. 100次循环,是不是就100个连接
  2. 如果是100个连接,那么打印的gouroutine是不是就是101(100个子goroutine+1个主goroutine)
  3. 如果此处连接是复用的,那么虽然是100个请求,是不是始终复用1个连接
  4. 如果是1个连接,那么打印的gouroutine是不是就是2(1个子goroutine+1个主goroutine)

带着以上的几点疑问,我测试了几个例子:

http 不带close

func main() {
   httpWithoutClose()
}

func httpWithoutClose() {
   for i := 0; i < 20; i++ {
      _, err := http.Get("http://www.baidu.com")
      if err != nil {
         fmt.Println(err)
         return
      }
   }
   fmt.Println("goroutine num is ", runtime.NumGoroutine())
}
复制代码

直接发起20个请求,且不读response的body,也不进行response的body的close。
结果: goroutine num is 41
竟然有41个goroutine,21个我倒是可以理解(起码20个请求+1个主goroutine),41个说明每个请求对应2个goroutine。通过阅读源码,发现大致请求的流程如图:

image.png
获取连接之后,会新增两个goroutine,readLoopwriteLoop,这样就可以理解通了,一个负责读,一个负责写。

http带close

func httpWithClose() {
   for i := 0; i < 20; i++ {
      resp, err := http.Get("http://www.baidu.com")
      if err != nil {
         fmt.Println(err)
         return
      }
      resp.Body.Close()
   }
   fmt.Println("goroutine num is", runtime.NumGoroutine())
}
复制代码

直接发起20个请求,且不读response的body,但是进行response的body的close。
结果: goroutine num is 1
说明close之后,回收了readLoopwriteLoop
以readLoop中的一段代码为例:

body := &bodyEOFSignal{
   body: resp.Body,
   earlyCloseFn: func() error {
      waitForBodyRead <- false
      <-eofc // will be closed by deferred call at the end of the function
      return nil

   },
   fn: func(err error) error {
      isEOF := err == io.EOF
      waitForBodyRead <- isEOF
      if isEOF {
         <-eofc // see comment above eofc declaration
      } else if err != nil {
         if cerr := pc.canceled(); cerr != nil {
            return cerr
         }
      }
      return err
   },
}
复制代码

earlyCloseFn 未读body就close的,会走此方法,可以发现向waitForBodyRead推入一个false
Fn正常的读body,当body读完之后,会向waitForBodyRead推入一个true
waitForBodyRead这个chan对接下来的goroutine的生死起着关键作用。
readLoop 本身是个循环:

alive := true
	for alive {
		 ......

		// Before looping back to the top of this function and peeking on
		// the bufio.Reader, wait for the caller goroutine to finish
		// reading the response body. (or for cancellation or death)
		select {
		case bodyEOF := <-waitForBodyRead:
			pc.t.setReqCanceler(rc.cancelKey, nil) // before pc might return to idle pool
			alive = alive &&
				bodyEOF && // false的话就退出循环
				!pc.sawEOF &&
				pc.wroteRequest() &&
				tryPutIdleConn(trace)
			if bodyEOF {
				eofc <- struct{}{}
			}
		case <-rc.req.Cancel:
			alive = false
			pc.t.CancelRequest(rc.req)
		case <-rc.req.Context().Done():
			alive = false
			pc.t.cancelRequest(rc.cancelKey, rc.req.Context().Err())
		case <-pc.closech:
			alive = false
		}

		testHookReadLoopBeforeNextRead()
	}
复制代码

只要alive=true,循环就会一直进行下去,当从bodyEOF := <-waitForBodyRead读出的是false,循环退出。然后 readLoop 执行defer函数:

defer func() {
   pc.close(closeErr) // 关闭自身的通道 closech
   pc.t.removeIdleConn(pc) // 回收连接
}()
复制代码

其中pc.close(closeErr),会关闭pc本身的通道closech,然后不是还有个writeLoop吗,writeLoop本身也是个循环,主要负责写的。

func (pc *persistConn) writeLoop() {
   defer close(pc.writeLoopDone)
   for {
      select {
      case wr := <-pc.writech:
         startBytesWritten := pc.nwrite
         err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra, pc.waitForContinue(wr.continueCh))
         if bre, ok := err.(requestBodyReadError); ok {
            err = bre.error
            // Errors reading from the user's
            // Request.Body are high priority.
            // Set it here before sending on the
            // channels below or calling
            // pc.close() which tears town
            // connections and causes other
            // errors.
            wr.req.setError(err)
         }
         if err == nil {
            err = pc.bw.Flush()
         }
         if err != nil {
            wr.req.Request.closeBody()
            if pc.nwrite == startBytesWritten {
               err = nothingWrittenError{err}
            }
         }
         pc.writeErrCh <- err // to the body reader, which might recycle us
         wr.ch <- err         // to the roundTrip function
         if err != nil {
            pc.close(err)
            return
         }
      case <-pc.closech: //收到消息后,退出
         return
      }
   }
}
复制代码

当收到pc.closech信号的时候,writeLoop也就退出了。

image.png
所以只剩一个主goroutine了。

http 不带close,但是有read

func httpWithoutCloseButRead() {
   for i := 0; i < 20; i++ {
      resp, err := http.Get("http://www.baidu.com")
      if err != nil {
         fmt.Println(err)
         return
      }
      _, err = ioutil.ReadAll(resp.Body)
   }
   fmt.Println("goroutine num is ", runtime.NumGoroutine())
}
复制代码

直接发起20个请求,读取body,但不close。
结果: goroutine num is 3
由上个例子我们知道,当body读取完之后,会向waitForBodyRead推入个true,在true的情况下,readLoop会一直循环,且会把当前的连接放入空闲列表中,供下次使用:

select {
case bodyEOF := <-waitForBodyRead:
	pc.t.setReqCanceler(rc.cancelKey, nil) // before pc might return to idle pool
	alive = alive &&
		bodyEOF &&
		!pc.sawEOF &&
		pc.wroteRequest() &&
		tryPutIdleConn(trace) //放入idle list,供下次使用
	if bodyEOF {
		eofc <- struct{}{}
	}
	....
复制代码

因为没有回收readLoopwirteLoop两个goroutine,且下一个请求可以复用连接,所以就是3个

http 带close 且 read

func main() {
   httpCloseAndRead()
}

func httpCloseAndRead() {
   for i := 0; i < 20; i++ {
      resp, err := http.Get("http://www.baidu.com")
      if err != nil {
         fmt.Println(err)
         return
      }
      _, err = ioutil.ReadAll(resp.Body)
      resp.Body.Close()
   }
   fmt.Println("goroutine num is ", runtime.NumGoroutine())
}
复制代码

http 不带close,但是有read的一样。

建议

任何http request的时候,一定要加close

func doRequest() {
   resp, err := http.Get("http://www.baidu.com")
   if err != nil {
      fmt.Println(err)
      return
   }
   defer resp.Body.Close() // 很重要
   _, err = ioutil.ReadAll(resp.Body)
}
复制代码

不加close,可能造成goroutine泄漏,加了一定不会。