Fix minor metrics bugs and refactor route implementation

This commit is contained in:
jeff
2018-06-16 16:55:39 +08:00
committed by jeff
parent 28dbcc797b
commit 18932ed1d8
429 changed files with 32117 additions and 3988 deletions

View File

@@ -28,6 +28,10 @@ import (
"golang.org/x/net/http2/hpack"
)
var updateHeaderTblSize = func(e *hpack.Encoder, v uint32) {
e.SetMaxDynamicTableSizeLimit(v)
}
type itemNode struct {
it interface{}
next *itemNode
@@ -80,6 +84,13 @@ func (il *itemList) isEmpty() bool {
// the control buffer of transport. They represent different aspects of
// control tasks, e.g., flow control, settings, streaming resetting, etc.
// registerStream is used to register an incoming stream with loopy writer.
type registerStream struct {
streamID uint32
wq *writeQuota
}
// headerFrame is also used to register stream on the client-side.
type headerFrame struct {
streamID uint32
hf []hpack.HeaderField
@@ -361,44 +372,47 @@ func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimato
const minBatchSize = 1000
// run should be run in a separate goroutine.
func (l *loopyWriter) run() {
var (
it interface{}
err error
isEmpty bool
)
func (l *loopyWriter) run() (err error) {
defer func() {
errorf("transport: loopyWriter.run returning. Err: %v", err)
if err == ErrConnClosing {
// Don't log ErrConnClosing as error since it happens
// 1. When the connection is closed by some other known issue.
// 2. User closed the connection.
// 3. A graceful close of connection.
infof("transport: loopyWriter.run returning. %v", err)
err = nil
}
}()
for {
it, err = l.cbuf.get(true)
it, err := l.cbuf.get(true)
if err != nil {
return
return err
}
if err = l.handle(it); err != nil {
return
return err
}
if _, err = l.processData(); err != nil {
return
return err
}
gosched := true
hasdata:
for {
it, err = l.cbuf.get(false)
it, err := l.cbuf.get(false)
if err != nil {
return
return err
}
if it != nil {
if err = l.handle(it); err != nil {
return
return err
}
if _, err = l.processData(); err != nil {
return
return err
}
continue hasdata
}
if isEmpty, err = l.processData(); err != nil {
return
isEmpty, err := l.processData()
if err != nil {
return err
}
if !isEmpty {
continue hasdata
@@ -450,30 +464,39 @@ func (l *loopyWriter) incomingSettingsHandler(s *incomingSettings) error {
return l.framer.fr.WriteSettingsAck()
}
func (l *loopyWriter) registerStreamHandler(h *registerStream) error {
str := &outStream{
id: h.streamID,
state: empty,
itl: &itemList{},
wq: h.wq,
}
l.estdStreams[h.streamID] = str
return nil
}
func (l *loopyWriter) headerHandler(h *headerFrame) error {
if l.side == serverSide {
if h.endStream { // Case 1.A: Server wants to close stream.
// Make sure it's not a trailers only response.
if str, ok := l.estdStreams[h.streamID]; ok {
if str.state != empty { // either active or waiting on stream quota.
// add it str's list of items.
str.itl.enqueue(h)
return nil
}
}
if err := l.writeHeader(h.streamID, h.endStream, h.hf, h.onWrite); err != nil {
return err
}
return l.cleanupStreamHandler(h.cleanup)
str, ok := l.estdStreams[h.streamID]
if !ok {
warningf("transport: loopy doesn't recognize the stream: %d", h.streamID)
return nil
}
// Case 1.B: Server is responding back with headers.
str := &outStream{
state: empty,
itl: &itemList{},
wq: h.wq,
// Case 1.A: Server is responding back with headers.
if !h.endStream {
return l.writeHeader(h.streamID, h.endStream, h.hf, h.onWrite)
}
l.estdStreams[h.streamID] = str
return l.writeHeader(h.streamID, h.endStream, h.hf, h.onWrite)
// else: Case 1.B: Server wants to close stream.
if str.state != empty { // either active or waiting on stream quota.
// add it str's list of items.
str.itl.enqueue(h)
return nil
}
if err := l.writeHeader(h.streamID, h.endStream, h.hf, h.onWrite); err != nil {
return err
}
return l.cleanupStreamHandler(h.cleanup)
}
// Case 2: Client wants to originate stream.
str := &outStream{
@@ -632,6 +655,8 @@ func (l *loopyWriter) handle(i interface{}) error {
return l.outgoingSettingsHandler(i)
case *headerFrame:
return l.headerHandler(i)
case *registerStream:
return l.registerStreamHandler(i)
case *cleanupStream:
return l.cleanupStreamHandler(i)
case *incomingGoAway:
@@ -664,6 +689,8 @@ func (l *loopyWriter) applySettings(ss []http2.Setting) error {
}
}
}
case http2.SettingHeaderTableSize:
updateHeaderTblSize(l.hEnc, s.Val)
}
}
return nil