fix application bug

This commit is contained in:
Jeff
2019-05-13 11:19:18 +08:00
committed by zryfish
parent 996d6fe4c5
commit 5462f51e65
717 changed files with 87703 additions and 53426 deletions

View File

@@ -16,7 +16,7 @@ type receiveStreamI interface {
ReceiveStream
handleStreamFrame(*wire.StreamFrame) error
handleRstStreamFrame(*wire.RstStreamFrame) error
handleResetStreamFrame(*wire.ResetStreamFrame) error
closeForShutdown(error)
getWindowUpdate() protocol.ByteCount
}
@@ -28,8 +28,9 @@ type receiveStream struct {
sender streamSender
frameQueue *frameSorter
readOffset protocol.ByteCount
frameQueue *frameSorter
readOffset protocol.ByteCount
finalOffset protocol.ByteCount
currentFrame []byte
currentFrameIsLast bool // is the currentFrame the last frame on this stream
@@ -42,7 +43,7 @@ type receiveStream struct {
closedForShutdown bool // set when CloseForShutdown() is called
finRead bool // set once we read a frame with a FinBit
canceledRead bool // set when CancelRead() is called
resetRemotely bool // set when HandleRstStreamFrame() is called
resetRemotely bool // set when HandleResetStreamFrame() is called
readChan chan struct{}
deadline time.Time
@@ -66,6 +67,7 @@ func newReceiveStream(
flowController: flowController,
frameQueue: newFrameSorter(),
readChan: make(chan struct{}, 1),
finalOffset: protocol.MaxByteCount,
version: version,
}
}
@@ -76,17 +78,17 @@ func (s *receiveStream) StreamID() protocol.StreamID {
// Read implements io.Reader. It is not thread safe!
func (s *receiveStream) Read(p []byte) (int, error) {
s.mutex.Lock()
completed, n, err := s.readImpl(p)
s.mutex.Unlock()
if completed {
s.sender.onStreamCompleted(s.streamID)
s.streamCompleted()
}
return n, err
}
func (s *receiveStream) readImpl(p []byte) (bool /*stream completed */, int, error) {
s.mutex.Lock()
defer s.mutex.Unlock()
if s.finRead {
return false, 0, io.EOF
}
@@ -168,14 +170,10 @@ func (s *receiveStream) readImpl(p []byte) (bool /*stream completed */, int, err
s.readOffset += protocol.ByteCount(m)
s.mutex.Lock()
// when a RST_STREAM was received, the was already informed about the final byteOffset for this stream
// when a RESET_STREAM was received, the was already informed about the final byteOffset for this stream
if !s.resetRemotely {
s.flowController.AddBytesRead(protocol.ByteCount(m))
}
// increase the flow control window, if necessary
if s.streamID != s.version.CryptoStreamID() {
s.flowController.MaybeQueueWindowUpdate()
}
if s.readPosInFrame >= len(s.currentFrame) && s.currentFrameIsLast {
s.finRead = true
@@ -186,73 +184,87 @@ func (s *receiveStream) readImpl(p []byte) (bool /*stream completed */, int, err
}
func (s *receiveStream) dequeueNextFrame() {
s.currentFrame, s.currentFrameIsLast = s.frameQueue.Pop()
var offset protocol.ByteCount
offset, s.currentFrame = s.frameQueue.Pop()
s.currentFrameIsLast = offset+protocol.ByteCount(len(s.currentFrame)) >= s.finalOffset
s.readPosInFrame = 0
}
func (s *receiveStream) CancelRead(errorCode protocol.ApplicationErrorCode) error {
func (s *receiveStream) CancelRead(errorCode protocol.ApplicationErrorCode) {
s.mutex.Lock()
defer s.mutex.Unlock()
completed := s.cancelReadImpl(errorCode)
s.mutex.Unlock()
if s.finRead {
return nil
if completed {
s.streamCompleted()
}
if s.canceledRead {
return nil
}
func (s *receiveStream) cancelReadImpl(errorCode protocol.ApplicationErrorCode) bool /* completed */ {
if s.finRead || s.canceledRead || s.resetRemotely {
return false
}
s.canceledRead = true
s.cancelReadErr = fmt.Errorf("Read on stream %d canceled with error code %d", s.streamID, errorCode)
s.signalRead()
if s.version.UsesIETFFrameFormat() {
s.sender.queueControlFrame(&wire.StopSendingFrame{
StreamID: s.streamID,
ErrorCode: errorCode,
})
}
return nil
s.sender.queueControlFrame(&wire.StopSendingFrame{
StreamID: s.streamID,
ErrorCode: errorCode,
})
// We're done with this stream if the final offset was already received.
return s.finalOffset != protocol.MaxByteCount
}
func (s *receiveStream) handleStreamFrame(frame *wire.StreamFrame) error {
maxOffset := frame.Offset + frame.DataLen()
if err := s.flowController.UpdateHighestReceived(maxOffset, frame.FinBit); err != nil {
return err
}
s.mutex.Lock()
defer s.mutex.Unlock()
if err := s.frameQueue.Push(frame.Data, frame.Offset, frame.FinBit); err != nil {
return err
}
s.signalRead()
return nil
}
completed, err := s.handleStreamFrameImpl(frame)
s.mutex.Unlock()
func (s *receiveStream) handleRstStreamFrame(frame *wire.RstStreamFrame) error {
completed, err := s.handleRstStreamFrameImpl(frame)
if completed {
s.sender.onStreamCompleted(s.streamID)
s.streamCompleted()
}
return err
}
func (s *receiveStream) handleRstStreamFrameImpl(frame *wire.RstStreamFrame) (bool /*completed */, error) {
s.mutex.Lock()
defer s.mutex.Unlock()
func (s *receiveStream) handleStreamFrameImpl(frame *wire.StreamFrame) (bool /* completed */, error) {
maxOffset := frame.Offset + frame.DataLen()
if err := s.flowController.UpdateHighestReceived(maxOffset, frame.FinBit); err != nil {
return false, err
}
if frame.FinBit {
s.finalOffset = maxOffset
}
if s.canceledRead {
return frame.FinBit, nil
}
if err := s.frameQueue.Push(frame.Data, frame.Offset); err != nil {
return false, err
}
s.signalRead()
return false, nil
}
func (s *receiveStream) handleResetStreamFrame(frame *wire.ResetStreamFrame) error {
s.mutex.Lock()
completed, err := s.handleResetStreamFrameImpl(frame)
s.mutex.Unlock()
if completed {
s.streamCompleted()
}
return err
}
func (s *receiveStream) handleResetStreamFrameImpl(frame *wire.ResetStreamFrame) (bool /*completed */, error) {
if s.closedForShutdown {
return false, nil
}
if err := s.flowController.UpdateHighestReceived(frame.ByteOffset, true); err != nil {
return false, err
}
// In gQUIC, error code 0 has a special meaning.
// The peer will reliably continue transmitting, but is not interested in reading from the stream.
// We should therefore just continue reading from the stream, until we encounter the FIN bit.
if !s.version.UsesIETFFrameFormat() && frame.ErrorCode == 0 {
return false, nil
}
s.finalOffset = frame.ByteOffset
// ignore duplicate RST_STREAM frames for this stream (after checking their final offset)
// ignore duplicate RESET_STREAM frames for this stream (after checking their final offset)
if s.resetRemotely {
return false, nil
}
@@ -269,16 +281,6 @@ func (s *receiveStream) CloseRemote(offset protocol.ByteCount) {
s.handleStreamFrame(&wire.StreamFrame{FinBit: true, Offset: offset})
}
func (s *receiveStream) onClose(offset protocol.ByteCount) {
if s.canceledRead && !s.version.UsesIETFFrameFormat() {
s.sender.queueControlFrame(&wire.RstStreamFrame{
StreamID: s.streamID,
ByteOffset: offset,
ErrorCode: 0,
})
}
}
func (s *receiveStream) SetReadDeadline(t time.Time) error {
s.mutex.Lock()
s.deadline = t
@@ -289,7 +291,7 @@ func (s *receiveStream) SetReadDeadline(t time.Time) error {
// CloseForShutdown closes a stream abruptly.
// It makes Read unblock (and return the error) immediately.
// The peer will NOT be informed about this: the stream is closed without sending a FIN or RST.
// The peer will NOT be informed about this: the stream is closed without sending a FIN or RESET.
func (s *receiveStream) closeForShutdown(err error) {
s.mutex.Lock()
s.closedForShutdown = true
@@ -302,6 +304,17 @@ func (s *receiveStream) getWindowUpdate() protocol.ByteCount {
return s.flowController.GetWindowUpdate()
}
func (s *receiveStream) streamCompleted() {
s.mutex.Lock()
finRead := s.finRead
s.mutex.Unlock()
if !finRead {
s.flowController.Abandon()
}
s.sender.onStreamCompleted(s.streamID)
}
// signalRead performs a non-blocking send on the readChan
func (s *receiveStream) signalRead() {
select {