Skip to content

Minnow

约 1301 个字 862 行代码 1 张图片 预计阅读时间 17 分钟

  • 初始的TCPSocket是在Linux TCP/IP栈上封装的一个类
  • lab4开始自己构建TCPPeer对端进行数据传输,直接使用网卡上的IP数据包进行TCP包的构建
  • lab5实现简易ARP协议,可以转换IP地址和MAC地址NetworkInterface
  • lab6实现了最长前缀匹配的网络层选路算法IP route
  • lab7实现了一个使用IProute、NetworkInterface的端到端的TCP报文传输

数据格式

IP包

  • IPv4数据包构建:不支持IP选项

    C++
    // IPv4 Internet datagram header (note: IP options are not supported)
    //
    // Plain struct holding the decoded header fields; every field has an
    // in-class default. Only the fixed 20-byte header is modelled, so LENGTH
    // is a compile-time constant.
    struct IPv4Header
    {
      static constexpr size_t LENGTH = 20;        // IPv4 header length, not including options
      static constexpr uint8_t DEFAULT_TTL = 128; // A reasonable default TTL value
      static constexpr uint8_t PROTO_TCP = 6;     // Protocol number for TCP
    
      // Size of the serialized header; always LENGTH because options are unsupported
      static constexpr uint64_t serialized_length() { return LENGTH; }
    
      /*
       *   0                   1                   2                   3
       *   0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
       *  +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
       *  |Version|  IHL  |Type of Service|          Total Length         |
       *  +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
       *  |         Identification        |Flags|      Fragment Offset    |
       *  +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
       *  |  Time to Live |    Protocol   |         Header Checksum       |
       *  +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
       *  |                       Source Address                          |
       *  +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
       *  |                    Destination Address                        |
       *  +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
       *  |                    Options                    |    Padding    |
       *  +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
       */
    
      // IPv4 Header fields
      uint8_t ver = 4;           // IP version
      uint8_t hlen = LENGTH / 4; // header length (multiples of 32 bits); 20 / 4 == 5 by default
      uint8_t tos = 0;           // type of service
      uint16_t len = 0;          // total length of packet
      uint16_t id = 0;           // identification number
      bool df = true;            // don't fragment flag
      bool mf = false;           // more fragments flag
      uint16_t offset = 0;       // fragment offset field
      uint8_t ttl = DEFAULT_TTL; // time to live field
      uint8_t proto = PROTO_TCP; // protocol field
      uint16_t cksum = 0;        // checksum field
      uint32_t src = 0;          // src address
      uint32_t dst = 0;          // dst address
    
      // Length of the payload
      uint16_t payload_length() const;
    
      // Pseudo-header's contribution to the TCP checksum
      uint32_t pseudo_checksum() const;
    
      // Set checksum to correct value
      void compute_checksum();
    
      // Return a string containing a header in human-readable format
      std::string to_string() const;
    
      // Read the header fields from / write them to the project's Parser and
      // Serializer helpers (both defined outside this excerpt).
      void parse( Parser& parser );
      void serialize( Serializer& serializer ) const;
    };
    

TCP报文

  • TCPMessage包含TCPSenderMessage和TCPReceiverMessage
C++
// One TCP message as exchanged between peers: the sender-side half and the
// receiver-side half bundled together. Member order matters because the
// struct is aggregate-initialized elsewhere as { sender, receiver }.
struct TCPMessage
{
  TCPSenderMessage sender {};     // seqno, SYN/FIN/RST flags and payload
  TCPReceiverMessage receiver {}; // ackno, window size and RST flag
};

// The part of a TCP message produced by a sender: sequence number, control
// flags and payload bytes.
struct TCPSenderMessage
{
  Wrap32 seqno { 0 }; // wrapped 32-bit sequence number of this message

  bool SYN {};            // set on the message that opens the stream
  std::string payload {}; // data bytes carried by the message
  bool FIN {};            // set on the message that ends the stream

  bool RST {}; // reset flag; does not count toward sequence_length()

  // How many sequence numbers does this segment use?
  // SYN and FIN each occupy one sequence number, plus one per payload byte.
  size_t sequence_length() const { return SYN + payload.size() + FIN; }
};

// The part of a TCP message produced by a receiver: acknowledgment number,
// advertised window and reset flag.
struct TCPReceiverMessage
{
  std::optional<Wrap32> ackno {}; // empty when there is no acknowledgment number to report
  uint16_t window_size {};        // advertised window size (16-bit field)
  bool RST {};                    // reset flag
};
  • 在IP上构建自己的TCP报文;将IP数据包通过TUN设备与网络上其他TCP进行通信

    C++
    // Adapter that converts between TCPMessage objects and IPv4 datagrams.
    // (Excerpt: only the two conversion entry points are shown here.)
    class TCPOverIPv4Adapter : public FdAdapterBase
    {
    public:
    // Extract the TCPMessage carried by ip_dgram; returns an empty optional
    // when the datagram is rejected by the checks in the implementation.
    std::optional<TCPMessage> unwrap_tcp_in_ip( const InternetDatagram& ip_dgram );
    
    // Wrap msg in an IPv4 datagram addressed according to the adapter's configuration.
    InternetDatagram wrap_tcp_in_ip( const TCPMessage& msg );
    };
    
    //! Filters an incoming IPv4 datagram and extracts the TCP message it carries.
    //! \param[in] ip_dgram is the datagram read from the network
    //! \returns the contained TCPMessage, or an empty optional if the datagram is
    //! not addressed to this connection or does not hold a valid TCP segment
    optional<TCPMessage> TCPOverIPv4Adapter::unwrap_tcp_in_ip( const InternetDatagram& ip_dgram )
    {
      // Once connected, both IP addresses must match the configured endpoints.
      // Note: it's valid to bind to address "0" (INADDR_ANY) and reply from actual address contacted
      if ( not listening() ) {
        if ( ip_dgram.header.dst != config().source.ipv4_numeric()
             or ip_dgram.header.src != config().destination.ipv4_numeric() ) {
          return {};
        }
      }

      // Only datagrams whose protocol field says TCP are considered.
      if ( ip_dgram.header.proto != IPv4Header::PROTO_TCP ) {
        return {};
      }

      // The payload has to parse as a TCP segment (checksum uses the IP pseudo-header).
      TCPSegment tcp_seg;
      const bool parsed = parse( tcp_seg, ip_dgram.payload, ip_dgram.header.pseudo_checksum() );
      if ( not parsed ) {
        return {};
      }

      // The destination port must be ours.
      if ( tcp_seg.udinfo.dst_port != config().source.port() ) {
        return {};
      }

      // While listening, only a SYN without RST is accepted; it fixes the peer
      // (and the local address it contacted) for the rest of the connection.
      if ( listening() ) {
        const bool opens_connection = tcp_seg.message.sender.SYN and not tcp_seg.message.sender.RST;
        if ( not opens_connection ) {
          return {};
        }
        config_mutable().source = Address { inet_ntoa( { htobe32( ip_dgram.header.dst ) } ), config().source.port() };
        config_mutable().destination
          = Address { inet_ntoa( { htobe32( ip_dgram.header.src ) } ), tcp_seg.udinfo.src_port };
        set_listening( false );
      }

      // The source port must belong to our peer.
      if ( tcp_seg.udinfo.src_port != config().destination.port() ) {
        return {};
      }

      return tcp_seg.message;
    }
    
    //! Builds the IPv4 datagram that carries a TCP message to the configured peer.
    //! \param[in] msg is the TCP message to wrap
    //! \returns an InternetDatagram with ports, addresses, length and both checksums filled in
    InternetDatagram TCPOverIPv4Adapter::wrap_tcp_in_ip( const TCPMessage& msg )
    {
      // Segment: the message plus our port pair.
      TCPSegment segment { .message = msg };
      segment.udinfo.src_port = config().source.port();
      segment.udinfo.dst_port = config().destination.port();

      // Datagram header: addresses and total length (IP header + 20-byte TCP header + payload).
      InternetDatagram datagram;
      auto& ip_header = datagram.header;
      ip_header.src = config().source.ipv4_numeric();
      ip_header.dst = config().destination.ipv4_numeric();
      ip_header.len = ip_header.hlen * 4 + 20 /* tcp header len */ + segment.message.sender.payload.size();

      // The TCP checksum is computed from the filled-in IP header, so it must
      // come after the header is set and before the segment is serialized.
      segment.compute_checksum( ip_header.pseudo_checksum() );
      ip_header.compute_checksum();
      datagram.payload = serialize( segment );

      return datagram;
    }
    

FD操作

  • 对FD的操作比如write和read,都是对系统调用的封装

TCP工作流程

  • 事件驱动模式进行TCP的监听,调用poll系统调用接收消息;设置回调,将网络上的IP数据包传递到我们设计的TCP上
  • TCPPeer构建测试程序,同样使用事件驱动,只是回调略有不同

初始化TCP

  • 设置poll需要监听的事件
    1. 收到数据包,将TCP报文传递给TCPPeer接收
    2. 读取pipe的字节传递给TCPPeer:用户在命令行输入字符串使用TCPSender发送到对端
    3. 从Reassembler中读取字节写入LocalStreamSocket:用户从reassembler中读取字符串,写入unix域套接字
C++
//! Construct the TCPPeer from `config` and register the three event-loop rules
//! that move data between the network adapter, the TCPPeer and the local
//! stream socket (`_thread_data`).
//! \param[in] config is forwarded to the TCPPeer constructor via `_tcp.emplace`
//! \note Each add_rule call passes, in order: a name, a file descriptor, a
//! direction, the callback, an "interest" predicate, and (for rules 2 and 3)
//! two further callbacks that appear to handle cancellation and errors —
//! confirm against the EventLoop declaration.
template<TCPDatagramAdapter AdaptT>
void TCPMinnowSocket<AdaptT>::_initialize_TCP( const TCPConfig& config )
{
  _tcp.emplace( config );

  // Set up the event loop

  // There are three events to handle:
  //
  // 1) Incoming datagram received (needs to be given to TCPPeer::receive method)
  //
  // 2) Outbound bytes received from local application via a write()
  //    call (needs to be read from the local stream socket and
  //    given to TCPPeer)
  //
  // 3) Incoming bytes reassembled by the Reassembler
  //    (needs to be read from the inbound_stream and written
  //    to the local stream socket back to the application)

  // rule 1: read from filtered packet stream and dump into TCPConnection
  _eventloop.add_rule(
    "receive TCP segment from the network",
    _datagram_adapter.fd(),
    Direction::In,
    [&] {
      // The adapter returns an empty optional for datagrams it rejects; those are skipped.
      if ( auto seg = _datagram_adapter.read() ) {
        _tcp->receive( std::move( seg.value() ), [&]( auto x ) { _datagram_adapter.write( x ); } );
      }

      // debugging output:
      // printed once (guarded by _fully_acked) after local EOF with nothing left in flight
      if ( _thread_data.eof() and _tcp.value().sender().sequence_numbers_in_flight() == 0 and not _fully_acked ) {
        std::cerr << "DEBUG: minnow outbound stream to " << _datagram_adapter.config().destination.to_string()
                  << " has been fully acknowledged.\n";
        _fully_acked = true;
      }
    },
    [&] { return _tcp->active(); } );

  // rule 2: read from pipe into outbound buffer
  _eventloop.add_rule(
    "push bytes to TCPPeer",
    _thread_data,
    Direction::In,
    [&] {
      // Size the buffer to the writer's free capacity so the read cannot exceed what push() accepts.
      std::string data;
      data.resize( _tcp->outbound_writer().available_capacity() );
      _thread_data.read( data );
      _tcp->outbound_writer().push( move( data ) );

      // Local application finished writing: close the outbound stream.
      if ( _thread_data.eof() ) {
        _tcp->outbound_writer().close();
        _outbound_shutdown = true;

        // debugging output:
        std::cerr << "DEBUG: minnow outbound stream to " << _datagram_adapter.config().destination.to_string()
                  << " finished (" << _tcp.value().sender().sequence_numbers_in_flight() << " seqno"
                  << ( _tcp.value().sender().sequence_numbers_in_flight() == 1 ? "" : "s" )
                  << " still in flight).\n";
      }

      // Let the peer transmit whatever the new bytes (or the close) made sendable.
      _tcp->push( [&]( auto x ) { _datagram_adapter.write( x ); } );
    },
    // Only interested while the connection is active, not yet shut down, and there is room to buffer.
    [&] {
      return ( _tcp->active() ) and ( not _outbound_shutdown )
             and ( _tcp->outbound_writer().available_capacity() > 0 );
    },
    [&] {
      _tcp->outbound_writer().close();
      _outbound_shutdown = true;
    },
    [&] {
      std::cerr << "DEBUG: minnow outbound stream had error.\n";
      _tcp->outbound_writer().set_error();
    } );

  // rule 3: read from inbound buffer into pipe
  _eventloop.add_rule(
    "read bytes from inbound stream",
    _thread_data,
    Direction::Out,
    [&] {
      Reader& inbound = _tcp->inbound_reader();
      // Write from the inbound_stream into
      // the pipe, handling the possibility of a partial
      // write (i.e., only pop what was actually written).
      if ( inbound.bytes_buffered() ) {
        const std::string_view buffer = inbound.peek();
        const auto bytes_written = _thread_data.write( buffer );
        inbound.pop( bytes_written );
      }

      // Stream ended (cleanly or with an error): shut down the write side once.
      if ( inbound.is_finished() or inbound.has_error() ) {
        _thread_data.shutdown( SHUT_WR );
        _inbound_shutdown = true;

        // debugging output:
        std::cerr << "DEBUG: minnow inbound stream from " << _datagram_adapter.config().destination.to_string()
                  << " finished " << ( inbound.has_error() ? "uncleanly.\n" : "cleanly.\n" );
      }
    },
    // Interested while bytes are pending, or when the end of stream still has to be signalled.
    [&] {
      return _tcp->inbound_reader().bytes_buffered()
             or ( ( _tcp->inbound_reader().is_finished() or _tcp->inbound_reader().has_error() )
                  and not _inbound_shutdown );
    },
    [&] {},
    [&] {
      std::cerr << "DEBUG: minnow inbound stream had error.\n";
      _tcp->inbound_reader().set_error();
    } );
}

connect

  • connect事件是向TCP对端写数据(SYN包)

    C++
    //! Actively open a connection: send the first segment and run the event
    //! loop until it is no longer the only thing in flight, then hand the
    //! connection to a background thread.
    //! \param[in] c_tcp configures the TCPPeer
    //! \param[in] c_ad configures the datagram adapter (addresses and ports)
    //! \throws std::runtime_error if a connection already exists, the peer
    //! cannot be initialized, or the first push does not put exactly one
    //! sequence number in flight
    template<TCPDatagramAdapter AdaptT>
    void TCPMinnowSocket<AdaptT>::connect( const TCPConfig& c_tcp, const FdAdapterConfig& c_ad )
    {
      if ( _tcp ) {
        throw std::runtime_error( "connect() with TCPConnection already initialized" );
      }

      _initialize_TCP( c_tcp );
      _datagram_adapter.config_mut() = c_ad;

      std::cerr << "DEBUG: minnow connecting to " << c_ad.destination.to_string() << "...\n";

      if ( not _tcp.has_value() ) {
        throw std::runtime_error( "TCPPeer not successfully initialized" );
      }

      // The first push on a fresh sender is expected to emit only the opening segment.
      const auto write_to_adapter = [&]( auto x ) { _datagram_adapter.write( x ); };
      _tcp->push( write_to_adapter );

      const auto only_first_segment_in_flight = [&] { return _tcp->sender().sequence_numbers_in_flight() == 1; };
      if ( not only_first_segment_in_flight() ) {
        throw std::runtime_error( "After TCPConnection::connect(), expected sequence_numbers_in_flight() == 1" );
      }

      // Keep servicing events for as long as that one sequence number stays in flight.
      _tcp_loop( only_first_segment_in_flight );

      if ( not _tcp->inbound_reader().has_error() ) {
        std::cerr << "DEBUG: minnow successfully connected to " << c_ad.destination.to_string() << ".\n";
      } else {
        std::cerr << "DEBUG: minnow error on connecting to " << c_ad.destination.to_string() << ".\n";
      }

      _tcp_thread = std::thread( &TCPMinnowSocket::_tcp_main, this );
    }
    

    listen_and_accept

    • listen_and_accept监听对端是否回复ack
    C++
    //! Passively open a connection: put the adapter in listening mode and run
    //! the event loop until the peer has an ackno and nothing of ours is left
    //! in flight, then hand the connection to a background thread.
    //! \param[in] c_tcp configures the TCPPeer
    //! \param[in] c_ad configures the datagram adapter
    //! \throws std::runtime_error if a connection already exists
    template<TCPDatagramAdapter AdaptT>
    void TCPMinnowSocket<AdaptT>::listen_and_accept( const TCPConfig& c_tcp, const FdAdapterConfig& c_ad )
    {
      if ( _tcp ) {
        throw std::runtime_error( "listen_and_accept() with TCPConnection already initialized" );
      }

      _initialize_TCP( c_tcp );

      _datagram_adapter.config_mut() = c_ad;
      _datagram_adapter.set_listening( true );

      std::cerr << "DEBUG: minnow listening for incoming connection...\n";

      // Keep looping while no ackno exists yet, or while something we sent is still unacknowledged.
      const auto handshake_pending = [&] {
        return ( not _tcp->has_ackno() ) or ( _tcp->sender().sequence_numbers_in_flight() );
      };
      _tcp_loop( handshake_pending );

      std::cerr << "DEBUG: minnow new connection from " << _datagram_adapter.config().destination.to_string() << ".\n";

      _tcp_thread = std::thread( &TCPMinnowSocket::_tcp_main, this );
    }
    

    执行主函数

  • 一直循环,真正结束的函数在_tcp_loop

  • _tcp_loop结束在poll等待事件结束

  • 出错

  • TCP连接的fd关闭
C++
//! Body of the background thread started by connect() / listen_and_accept():
//! runs the event loop until it ends, shuts the socket down in both
//! directions, reports how the connection finished, and destroys the TCPPeer.
//! \throws std::runtime_error if no TCPPeer exists; any exception raised while
//! running is logged and then rethrown unchanged
template<TCPDatagramAdapter AdaptT>
void TCPMinnowSocket<AdaptT>::_tcp_main()
{
  try {
    if ( not _tcp.has_value() ) {
      throw std::runtime_error( "no TCP" );
    }
    // The condition is always true, so the loop only ends from inside _tcp_loop
    // (event loop exit or abort).
    _tcp_loop( [] { return true; } );
    shutdown( SHUT_RDWR );
    if ( not _tcp.value().active() ) {
      std::cerr << "DEBUG: minnow TCP connection finished "
                << ( _tcp->inbound_reader().has_error() ? "uncleanly.\n" : "cleanly.\n" );
    }
    _tcp.reset();
  } catch ( const std::exception& e ) {
    std::cerr << "Exception in TCPConnection runner thread: " << e.what() << "\n";
    // Rethrow the original exception object. `throw e;` would copy-construct a
    // plain std::exception, slicing off the dynamic type and its message.
    throw;
  }
}

//! Run the event loop, ticking the TCPPeer and the adapter with the elapsed
//! time after every event (or timeout of TCP_TICK_MS).
//! \param[in] condition is a function returning true if loop should continue
//! \throws std::runtime_error if the TCPPeer does not exist while looping
template<TCPDatagramAdapter AdaptT>
void TCPMinnowSocket<AdaptT>::_tcp_loop( const std::function<bool()>& condition )
{
  auto last_tick = timestamp_ms();

  while ( condition() ) {
    const auto outcome = _eventloop.wait_next_event( TCP_TICK_MS );
    if ( outcome == EventLoop::Result::Exit or _abort ) {
      break;
    }

    if ( not _tcp.has_value() ) {
      throw std::runtime_error( "_tcp_loop entered before TCPPeer initialized" );
    }

    // An inactive peer gets no ticks, and the reference time is left untouched.
    if ( not _tcp.value().active() ) {
      continue;
    }

    const auto now = timestamp_ms();
    _tcp.value().tick( now - last_tick, [&]( auto x ) { _datagram_adapter.write( x ); } );
    _datagram_adapter.tick( now - last_tick );
    last_tick = now;
  }
}

TCPPeer

  • 接收TCPMessage,然后向对端发送消息
C++
// Excerpt of TCPPeer: how an incoming TCPMessage is processed and how a
// reply is sent back.
class TCPPeer
{
  // Wrap a TransmitFunction into the per-TCPSenderMessage callback that the
  // sender expects; each message is completed and sent via send().
  auto make_send( const auto& transmit )
  {
    return [&]( const TCPSenderMessage& sender_msg ) { send( sender_msg, transmit ); };
  }

public:
  // Process one incoming message: split it between receiver and sender, and
  // reply when required. Does nothing once the peer is no longer active.
  void receive( TCPMessage msg, const TransmitFunction& transmit )
  {
    if ( not active() ) {
      return;
    }

    // Record time in case this peer has to linger after streams finish.
    time_of_last_receipt_ = cumulative_time_;

    // Anything that occupies sequence space must be answered.
    if ( msg.sender.sequence_length() > 0 ) {
      need_send_ = true;
    }

    // A "keep-alive" carries the seqno just before our ackno and also needs a reply.
    // (N.B. orthodox TCP rules require a reply on any unacceptable segment.)
    // The ackno is sampled before the receiver sees this message.
    const auto ackno_before = receiver_.send().ackno;
    if ( ackno_before.has_value() and msg.sender.seqno + 1 == ackno_before.value() ) {
      need_send_ = true;
    }

    // If the inbound stream closed before the outbound stream finished, lingering is unnecessary.
    const bool inbound_closed_first = receiver_.writer().is_closed() and not sender_.reader().is_finished();
    if ( inbound_closed_first ) {
      linger_after_streams_finish_ = false;
    }

    // Sender half goes to our receiver, receiver half goes to our sender.
    receiver_.receive( std::move( msg.sender ) );
    sender_.receive( msg.receiver );

    // Send reply if needed.
    if ( need_send_ ) {
      send( sender_.make_empty_message(), transmit );
    }
  }


private:
  // Attach the receiver's current state to sender_message, transmit the
  // combined message, and clear the pending-reply flag.
  void send( const TCPSenderMessage& sender_message, const TransmitFunction& transmit )
  {
    transmit( TCPMessage { sender_message, receiver_.send() } );
    need_send_ = false;
  }
};

测试端

  • bidirectional_stream_copy触发rule1和rule2;触发_initialize_TCP的read pipe into outbound buffer;然后发送给对端数据报

  • 触发_initialize_TCP中的"receive TCP segment from the network"规则,进行消息的收发

  • rule1: read from stdin into outbound byte stream

  • rule2: read from outbound byte stream into socket

  • rule3: read from socket into inbound byte stream

  • rule4: read from inbound byte stream into stdout

C++
//! Copy bytes in both directions between stdin/stdout and `socket` until the
//! event loop reports that it should exit.
//! \param[in] socket is the connected socket to copy to and from
//! \param[in] peer_name is used only in the debugging messages
//! \note Two local ByteStreams (1 MiB each) decouple the four event-loop rules:
//! stdin -> _outbound -> socket, and socket -> _inbound -> stdout.
void bidirectional_stream_copy( Socket& socket, string_view peer_name )
{
  constexpr size_t buffer_size = 1048576;

  EventLoop _eventloop {};
  FileDescriptor _input { STDIN_FILENO };
  FileDescriptor _output { STDOUT_FILENO };
  ByteStream _outbound { buffer_size };
  ByteStream _inbound { buffer_size };
  bool _outbound_shutdown { false };
  bool _inbound_shutdown { false };

  // All three descriptors are driven by the event loop, so none of them may block.
  socket.set_blocking( false );
  _input.set_blocking( false );
  _output.set_blocking( false );

  // rule 1: read from stdin into outbound byte stream
  _eventloop.add_rule(
    "read from stdin into outbound byte stream",
    _input,
    Direction::In,
    [&] {
      // Buffer is sized to the free capacity so everything read can be pushed.
      string data;
      data.resize( _outbound.writer().available_capacity() );
      _input.read( data );
      _outbound.writer().push( move( data ) );
      if ( _input.eof() ) {
        _outbound.writer().close();
      }
    },
    // Interested only while neither stream has failed, there is room, and the writer is open.
    [&] {
      return !_outbound.has_error() and !_inbound.has_error() and ( _outbound.writer().available_capacity() > 0 )
             and !_outbound.writer().is_closed();
    },
    [&] { _outbound.writer().close(); },
    [&] {
      cerr << "DEBUG: Outbound stream had error from source.\n";
      _outbound.set_error();
      _inbound.set_error();
    } );

  // rule 2: read from outbound byte stream into socket
  _eventloop.add_rule(
    "read from outbound byte stream into socket",
    socket,
    Direction::Out,
    [&] {
      // Pop only as many bytes as the socket actually accepted.
      if ( _outbound.reader().bytes_buffered() ) {
        _outbound.reader().pop( socket.write( _outbound.reader().peek() ) );
      }
      // Once drained and closed, shut down the write side of the socket exactly once.
      if ( _outbound.reader().is_finished() ) {
        socket.shutdown( SHUT_WR );
        _outbound_shutdown = true;
        cerr << "DEBUG: Outbound stream to " << peer_name << " finished.\n";
      }
    },
    [&] {
      return _outbound.reader().bytes_buffered() or ( _outbound.reader().is_finished() and not _outbound_shutdown );
    },
    [&] { _outbound.writer().close(); },
    [&] {
      cerr << "DEBUG: Outbound stream had error from destination.\n";
      _outbound.set_error();
      _inbound.set_error();
    } );

  // rule 3: read from socket into inbound byte stream
  _eventloop.add_rule(
    "read from socket into inbound byte stream",
    socket,
    Direction::In,
    [&] {
      // Mirror of rule 1, with the socket as the source.
      string data;
      data.resize( _inbound.writer().available_capacity() );
      socket.read( data );
      _inbound.writer().push( move( data ) );
      if ( socket.eof() ) {
        _inbound.writer().close();
      }
    },
    [&] {
      return !_inbound.has_error() and !_outbound.has_error() and ( _inbound.writer().available_capacity() > 0 )
             and !_inbound.writer().is_closed();
    },
    [&] { _inbound.writer().close(); },
    [&] {
      cerr << "DEBUG: Inbound stream had error from source.\n";
      _outbound.set_error();
      _inbound.set_error();
    } );

  // rule 4: read from inbound byte stream into stdout
  _eventloop.add_rule(
    "read from inbound byte stream into stdout",
    _output,
    Direction::Out,
    [&] {
      // Mirror of rule 2, with stdout as the destination.
      if ( _inbound.reader().bytes_buffered() ) {
        _inbound.reader().pop( _output.write( _inbound.reader().peek() ) );
      }
      if ( _inbound.reader().is_finished() ) {
        _output.close();
        _inbound_shutdown = true;
        cerr << "DEBUG: Inbound stream from " << peer_name << " finished"
             << ( _inbound.has_error() ? " uncleanly.\n" : ".\n" );
      }
    },
    [&] {
      return _inbound.reader().bytes_buffered() or ( _inbound.reader().is_finished() and not _inbound_shutdown );
    },
    [&] { _inbound.writer().close(); },
    [&] {
      cerr << "DEBUG: Inbound stream had error from destination.\n";
      _outbound.set_error();
      _inbound.set_error();
    } );

  // loop until completion
  // (a timeout of -1 is passed; presumably this waits indefinitely — confirm against EventLoop)
  while ( true ) {
    if ( EventLoop::Result::Exit == _eventloop.wait_next_event( -1 ) ) {
      return;
    }
  }
}

IP包发送接收的设备支持

  • 构建可以通信的TCP,泛型Adapt需要满足write和read的约束;CS144TCPSocket注册了一个TUN(网络隧道)设备来发送和接收IP数据包。

  • write将我们构造的TCP包序列化后写入TUN设备

  • read读取IP包头,获取TCP报头和报文(unwrap_tcp_in_ip)
C++
//! Requirements on an adapter usable by TCPMinnowSocket: it can write a
//! TCPMessage (returning nothing) and read one (returning an optional that is
//! empty when no message is available).
template<class T>
concept TCPDatagramAdapter = requires( T a, TCPMessage seg ) {
  { a.write( seg ) } -> std::same_as<void>;
  { a.read() } -> std::same_as<std::optional<TCPMessage>>;
};

//! \brief A FD adapter for IPv4 datagrams read from and written to a TUN device
//! \note Provides the read()/write() pair required by the TCPDatagramAdapter concept.
class TCPOverIPv4OverTunFdAdapter : public TCPOverIPv4Adapter
{
private:
  TunFD _tun; //!< the TUN device, owned by this adapter

public:
  //! Construct from a TunFD
  //! \param[in] tun is moved into the adapter
  explicit TCPOverIPv4OverTunFdAdapter( TunFD&& tun ) : _tun( std::move( tun ) ) {}

  //! Attempts to read and parse an IPv4 datagram containing a TCP segment related to the current connection
  //! \returns an empty optional when parsing or filtering rejects the datagram
  std::optional<TCPMessage> read();

  //! Creates an IPv4 datagram from a TCP segment and writes it to the TUN device
  //! (wrap in IP, serialize, then write, all in one expression)
  void write( const TCPMessage& seg ) { _tun.write( serialize( wrap_tcp_in_ip( seg ) ) ); }

  //! Access the underlying TUN device
  //! (explicit: requires a cast at the call site)
  explicit operator TunFD&() { return _tun; }

  //! Access the underlying TUN device
  explicit operator const TunFD&() const { return _tun; }

  //! Access underlying file descriptor
  //! (returns the TunFD as a FileDescriptor reference)
  FileDescriptor& fd() { return _tun; }
};

//! Read one datagram from the TUN device and extract the TCP message in it.
//! \returns the message, or an empty optional if the bytes do not parse as an
//! IPv4 datagram or unwrap_tcp_in_ip() rejects the datagram
optional<TCPMessage> TCPOverIPv4OverTunFdAdapter::read()
{
  // Two read buffers: the first is pre-sized to exactly one IPv4 header.
  vector<string> raw( 2 );
  raw.front().resize( IPv4Header::LENGTH );
  _tun.read( raw );

  // Parse from a copy of the two buffers.
  const vector<string> buffers = { raw.at( 0 ), raw.at( 1 ) };
  InternetDatagram datagram;
  if ( not parse( datagram, buffers ) ) {
    return {};
  }
  return unwrap_tcp_in_ip( datagram );
}

TCP实现

  • 一个TCP的对等端需要实现的核心数据结构和功能如下
C++
// Excerpt of TCPPeer showing its core state: one TCPSender and one
// TCPReceiver built from a shared TCPConfig, plus thin passthrough methods.
class TCPPeer
{

public:
  // Store the configuration; sender_ and receiver_ are built from it by their
  // default member initializers below.
  explicit TCPPeer( const TCPConfig& cfg ) : cfg_( cfg ) {}

  // Stream endpoints exposed to the application: write outbound bytes, read inbound bytes.
  Writer& outbound_writer() { return sender_.writer(); }
  Reader& inbound_reader() { return receiver_.reader(); }

  /* Passthrough methods */
  // Ask the sender to transmit; each message it produces is routed through send().
  void push( const TransmitFunction& transmit ) { sender_.push( make_send( transmit ) ); }

  // Handle one incoming message (defined out of line).
  void receive( TCPMessage msg, const TransmitFunction& transmit );

  // Testing interface
  const TCPReceiver& receiver() const { return receiver_; }
  const TCPSender& sender() const { return sender_; }

private:
  // cfg_ is declared first, so it is initialized before the two members whose
  // initializers read it.
  TCPConfig cfg_;
  TCPSender sender_ { ByteStream { cfg_.send_capacity }, cfg_.isn, cfg_.rt_timeout };
  TCPReceiver receiver_ { Reassembler { ByteStream { cfg_.recv_capacity } } };

  // Combine a sender message with the receiver's state and transmit it (defined out of line).
  void send( const TCPSenderMessage& sender_message, const TransmitFunction& transmit );
};

重要数据结构

Byte_Stream

  • 实现字符串的先进先出;Writer写入字符串,Reader读出任意长度的字符
  • 为了性能,使用std::queue<std::string>,记录弹出字符在字符串中的索引实现peek()
C++
// Append `data` to the stream, keeping only as many leading bytes as the
// available capacity allows. A closed stream, a full stream, or empty data
// leaves the stream untouched.
void Writer::push( string data )
{
  if ( Writer::is_closed() or data.empty() ) {
    return;
  }

  const size_t room = Writer::available_capacity();
  if ( room == 0 ) {
    return;
  }

  // Drop whatever does not fit.
  if ( data.size() > room ) {
    data.resize( room );
  }

  const size_t accepted = data.size();
  pushed_ += accepted;
  buffered_ += accepted;
  data_.emplace( std::move( data ) );
}

// View of the unread part of the front chunk; an empty view when nothing is
// buffered. The view covers only the first chunk, not the whole stream.
string_view Reader::peek() const
{
  if ( data_.empty() ) {
    return string_view {};
  }
  const string_view front_chunk { data_.front() };
  return front_chunk.substr( read_index_ );
}

// Remove up to `len` bytes from the front of the stream.
// `len` is clamped to the number of buffered bytes: without the clamp, an
// oversized request would underflow `buffered_` and call front() on an empty
// queue, which is undefined behavior.
void Reader::pop( uint64_t len )
{
  if ( len > buffered_ ) {
    len = buffered_;
  }

  buffered_ -= len;
  poped_ += len;
  while ( len != 0U ) {
    // Bytes of the front chunk that have not been consumed yet.
    const uint64_t remaining_in_front = data_.front().size() - read_index_;
    if ( len < remaining_in_front ) {
      // The pop ends inside the front chunk: just advance the read offset.
      read_index_ += len;
      break;
    }
    // The front chunk is fully consumed: drop it and continue with the next one.
    data_.pop();
    read_index_ = 0;
    len -= remaining_in_front;
  }
}

Reassembler

  • 将乱序接收的字符串变为有序字节流,使用map进行乱序字符串的存储;重点在于处理重叠字串

重叠字串的处理
  • 找到第一个小于等于pos的迭代器,如果map中含有的字符串与要插入的字符串产生重叠,移除含有的字符串重叠部分,增加一个重叠部分的迭代器进行后续删除操作
  • 先找与待插入字符串末尾重叠的迭代器,防止迭代器失效
  • 再找与first_index重叠的迭代器
  • 移除[lower, upper)的迭代器,插入新的字符串

    C++
    // Make `pos` a segment boundary in buffer_ and return an iterator to the
    // first segment starting at or after `pos`. If an earlier segment extends
    // past `pos`, it is cut in two: its tail is re-inserted under key `pos`
    // and the original is shortened to end at `pos`.
    auto Reassembler::split( uint64_t pos ) noexcept
    {
      const auto next { buffer_.lower_bound( pos ) };

      // A segment already starts exactly at pos: nothing to cut.
      if ( next != buffer_.end() and next->first == pos ) {
        return next;
      }
      // No segment precedes pos (also covers an empty buffer, where begin() == end()).
      if ( next == buffer_.begin() ) {
        return next;
      }

      const auto before { prev( next ) };
      const bool straddles = before->first + size( before->second ) > pos;
      if ( not straddles ) {
        return next;
      }

      // Move the overlapping tail into its own node, then trim the original.
      const auto tail = buffer_.emplace_hint( next, pos, before->second.substr( pos - before->first ) );
      before->second.resize( pos - before->first );
      return tail;
    }
    // Excerpt from Reassembler::insert: view the mapped strings of the
    // segments in [lower, upper), subtract their sizes from reassemble_len_,
    // then replace that whole range with the new data.
    ranges::for_each( ranges::subrange( lower, upper ) | views::values,
                      [&]( const auto& str ) { reassemble_len_ -= str.size(); } );
    reassemble_len_ += size( data );
    buffer_.emplace_hint( buffer_.erase( lower, upper ), first_index, move( data ) );

C++
// Insert a substring of the stream that starts at stream index `first_index`.
// Bytes already written, or beyond the writer's available capacity, are
// discarded; the rest replaces any overlapping buffered data, and everything
// that has become contiguous with the output is pushed to the writer.
// `is_last_substring` marks `data` as ending the stream; the writer is closed
// once the recorded end index has been reached.
void Reassembler::insert( uint64_t first_index, string data, bool is_last_substring )
{
  // Your code here.
  // Close the output once the end index is known and everything up to it has been written.
  const auto try_close = [&]() noexcept -> void {
    if ( end_idx_.has_value() && end_idx_.value() == next_unassem_idx_ ) {
      output_.writer().close();
    }
  };

  // Empty data can still carry the end-of-stream marker.
  if ( data.empty() ) {
    if ( !end_idx_.has_value() && is_last_substring ) {
      end_idx_.emplace( first_index );
    }
    return try_close();
  }

  if ( writer().is_closed() || writer().available_capacity() == 0 ) {
    return;
  }

  uint64_t unassembled_index { next_unassem_idx_ };
  uint64_t unacceptable_index { unassembled_index + writer().available_capacity() };
  // Handle data that falls outside [unassembled_index, unacceptable_index)
  if ( first_index + size( data ) <= unassembled_index || first_index >= unacceptable_index ) {
    return; // Out of range
  }
  if ( first_index + size( data ) > unacceptable_index ) { // Remove unacceptable bytes
    data.resize( unacceptable_index - first_index );
    // The tail was cut off, so this can no longer be the final substring.
    is_last_substring = false;
  }
  if ( first_index < unassembled_index ) { // Remove popped/buffered bytes
    data.erase( 0, unassembled_index - first_index );
    first_index = unassembled_index;
  }

  if ( !end_idx_.has_value() and is_last_substring ) {
    end_idx_.emplace( first_index + size( data ) );
  }

  // Split the buffered segments at both ends of the new data.
  // The end position is split first, then the start position.
  // TODO: the removal of overlapped strings could be optimized
  auto upper { split( first_index + size( data ) ) };
  auto lower { split( first_index ) };
  // View the mapped strings of the segments in [lower, upper) and subtract their sizes from reassemble_len_
  ranges::for_each( ranges::subrange( lower, upper ) | views::values,
                    [&]( const auto& str ) { reassemble_len_ -= str.size(); } );
  reassemble_len_ += size( data );
  // Erase the overlapped segments and insert the new data at the position erase() returns.
  buffer_.emplace_hint( buffer_.erase( lower, upper ), first_index, move( data ) );

  // Push the segments that are now in order into the writer
  while ( !buffer_.empty() ) {
    auto&& [index, packet] { *buffer_.begin() };
    if ( index != next_unassem_idx_ ) {
      break;
    }

    reassemble_len_ -= size( packet );
    next_unassem_idx_ += size( packet );
    output_.writer().push( move( packet ) );
    buffer_.erase( buffer_.begin() );
  }
  return try_close();
}

TCPReceiver

  • 64bit转32bit ------- 32bit转64bit:要求最近且不溢出
C++
// Convert an absolute 64-bit sequence number into a wrapped 32-bit one
// relative to `zero_point`; only the low 32 bits of `n` are used.
Wrap32 Wrap32::wrap( uint64_t n, Wrap32 zero_point )
{
  const auto low_bits = static_cast<uint32_t>( n );
  return zero_point + low_bits;
}

// Convert this wrapped value back to the absolute 64-bit sequence number
// that is closest to `checkpoint`, without stepping outside the 64-bit range.
uint64_t Wrap32::unwrap( Wrap32 zero_point, uint64_t checkpoint ) const
{
  // Offset from the zero point, and the checkpoint's position within its own 2^32 block.
  const uint64_t offset { raw_value_ - zero_point.raw_value_ };
  const uint64_t checkpoint_low { checkpoint & MASK_LOW_32 };

  // First guess: same 2^32 block as the checkpoint.
  const uint64_t candidate { ( checkpoint & MASK_HIGH_32 ) | offset };
  const auto half_block = BASE / 2;

  // Candidate lies more than half a block above the checkpoint: the previous block is closer.
  const bool far_above = offset > checkpoint_low and ( offset - checkpoint_low ) > half_block;
  if ( far_above and candidate >= BASE ) {
    return candidate - BASE;
  }

  // Candidate lies more than half a block below the checkpoint: the next block is closer.
  const bool far_below = checkpoint_low > offset and ( checkpoint_low - offset ) > half_block;
  if ( far_below and candidate < MASK_HIGH_32 ) {
    return candidate + BASE;
  }

  return candidate;
}
  • receive方法将序列号转换为payload索引;send方法返回确认的ACK
C++
// Process one incoming sender message: handle RST, learn the zero point from
// the first SYN, translate the sequence number into a stream index and give
// the payload to the reassembler.
void TCPReceiver::receive( TCPSenderMessage message )
{
  if ( writer().has_error() ) {
    return;
  }
  if ( message.RST ) {
    reader().set_error();
    return;
  }
  // Nothing is accepted before the SYN that defines the zero point.
  if ( !zero_point_.has_value() ) {
    if ( !message.SYN ) {
      return;
    }
    zero_point_.emplace( message.seqno );
  }
  const uint64_t checkpoint { writer().bytes_pushed() + 1 /* SYN */ }; // abs_seqno for expecting payload
  const uint64_t absolute_seqno { message.seqno.unwrap( zero_point_.value(), checkpoint ) };
  // Absolute seqno 0 belongs to the SYN itself. A segment without SYN that
  // claims it carries no valid stream data, and `absolute_seqno - 1` would
  // wrap around to 2^64 - 1; an empty FIN at that index would record a bogus
  // end of stream in the reassembler. Drop such segments.
  if ( absolute_seqno == 0 and !message.SYN ) {
    return;
  }
  // A SYN occupies one sequence number, so payload in a SYN segment starts at stream index 0.
  const uint64_t stream_index { absolute_seqno + static_cast<uint64_t>( message.SYN ) - 1 /* SYN */ };
  reassembler_.insert( stream_index, move( message.payload ), message.FIN );
}

// Build the receiver's message for the peer: the ackno (once a zero point
// exists), the window size capped at what fits in 16 bits, and the error
// state as RST.
TCPReceiverMessage TCPReceiver::send() const
{
  // The available capacity may exceed the 16-bit window field; clamp it.
  const uint16_t window_size { writer().available_capacity() > UINT16_MAX
                                 ? static_cast<uint16_t>( UINT16_MAX )
                                 : static_cast<uint16_t>( writer().available_capacity() ) };

  // No zero point yet: there is nothing to acknowledge.
  if ( not zero_point_.has_value() ) {
    return { nullopt, window_size, writer().has_error() };
  }

  // Next needed seqno: bytes pushed, +1 for the SYN, +1 more once the stream is closed.
  const uint64_t closed_bonus { static_cast<uint64_t>( writer().is_closed() ) };
  const uint64_t next_needed { writer().bytes_pushed() + 1 + closed_bonus };
  return { Wrap32::wrap( next_needed, zero_point_.value() ), window_size, writer().has_error() };
}

TCPSender

Timer实现
  • 控制启动、关闭、重装 RTO、重置定时器;
  • 使用tick接口完成计时;
  • 按需使得 RTO翻倍。
C++
// Retransmission timer driven by explicit tick() calls.
// Time accumulates only while the timer is active; the timer counts as
// expired once the accumulated time reaches the current RTO.
class Timer
{
public:
  // Create a stopped timer with the given initial RTO in milliseconds.
  explicit Timer( uint64_t initial_RTO_ms ) : RTO_ms_( initial_RTO_ms ) {}

  // Whether the timer is currently running.
  [[nodiscard]] constexpr bool is_active() const noexcept { return is_active_; }

  // Whether the timer is running and at least one full RTO has elapsed.
  [[nodiscard]] constexpr bool is_expired() const noexcept { return is_active_ && running_time_ >= RTO_ms_; }

  // Clear the elapsed time; the active state and the RTO are kept.
  constexpr void reset() noexcept { running_time_ = 0; }

  // Double the RTO (exponential backoff).
  constexpr void exponential_back() noexcept { RTO_ms_ *= 2; }

  // Replace the RTO with `initial_RTO_ms` and clear the elapsed time.
  constexpr void reload( uint64_t initial_RTO_ms ) noexcept
  {
    RTO_ms_ = initial_RTO_ms;
    reset();
  }

  // Activate the timer, counting from zero.
  constexpr void start() noexcept
  {
    is_active_ = true;
    reset();
  }

  // Deactivate the timer and clear the elapsed time.
  constexpr void stop() noexcept
  {
    is_active_ = false;
    reset();
  }

  // Advance the timer by `ticks` milliseconds; ignored while inactive.
  // Returns *this so a query can be chained, e.g. tick( ms ).is_expired().
  constexpr Timer& tick( uint64_t ticks ) noexcept
  {
    running_time_ += is_active_ ? ticks : 0;
    return *this;
  }

private:
  uint64_t RTO_ms_;            // current retransmission timeout
  bool is_active_ {};          // false until start() is called
  uint64_t running_time_ {};   // time accumulated since the last reset
};
方法实现
  • push尽可能在一个窗口将reader缓存未发送的数据发送;receive确认当前跟踪的消息ack均小于传来的ack,不再跟踪消息;tick实现指数回退超时和重传
C++
void TCPSender::push( const TransmitFunction& transmit )
{
  // Your code here.
  // 尽可能多的在一个窗口发送数据
  while ( ( window_size_ == 0 ? 1 : window_size_ ) > total_outstanding_ ) {
    if ( FIN_sent_ )
      break;
    auto message { make_empty_message() };
    // 标记初始发送的SYN包
    if ( !SYN_sent_ ) {
      message.SYN = true;
      SYN_sent_ = true;
    }
    // window_size空余容量(包含该条信息的长度)
    const uint64_t remaining { ( window_size_ == 0 ? 1 : window_size_ ) - total_outstanding_ };
    // 还能增加payload的长度
    const size_t payload_len { min( TCPConfig::MAX_PAYLOAD_SIZE, remaining - message.sequence_length() ) };
    auto&& payload { message.payload };
    while ( reader().bytes_buffered() != 0U && payload.size() < payload_len ) {
      string_view view { reader().peek() };
      // 如果当前view长度小于payload_len,需要再次读取,所以这里使用循环
      view = view.substr( 0, payload_len - payload.size() );
      payload += view;
      input_.reader().pop( view.size() );
    }
    // 何时增加FIN标记
    if ( !FIN_sent_ && remaining > message.sequence_length() && reader().is_finished() ) {
      message.FIN = true;
      FIN_sent_ = true;
    }

    if ( message.sequence_length() == 0 )
      break;

    // 传输message
    transmit( message );
    if ( !timer_.is_active() ) {
      timer_.start();
    }
    next_seqno_ += message.sequence_length();
    total_outstanding_ += message.sequence_length();
    track_messages_.push( message );
  }
}

// Process the receiver's message: record the window, stop tracking every
// outstanding segment that is now fully acknowledged, and restart the
// retransmission timer when progress was made.
void TCPSender::receive( const TCPReceiverMessage& msg )
{
  // Error handling: ignore everything after an error; an RST sets the error.
  if ( input_.has_error() ) {
    return;
  }
  if ( msg.RST ) {
    input_.set_error();
    return;
  }

  window_size_ = msg.window_size;
  if ( not msg.ackno.has_value() ) {
    return;
  }

  // Absolute sequence number acknowledged by the peer.
  const uint64_t acked_up_to { msg.ackno->unwrap( isn_, next_seqno_ ) };
  if ( acked_up_to > next_seqno_ ) {
    return; // acknowledges something never sent: ignore
  }

  // Drop tracked segments from the front while they are completely acknowledged.
  bool progress { false };
  for ( ; not track_messages_.empty(); track_messages_.pop() ) {
    const auto length = track_messages_.front().sequence_length();
    if ( ack_seqno_ + length > acked_up_to ) {
      break; // only partially acknowledged: keep it
    }
    progress = true;
    ack_seqno_ += length;
    total_outstanding_ -= length;
  }

  if ( not progress ) {
    return;
  }

  // New data was acknowledged: restore the initial RTO and restart (or stop) the timer.
  total_retransmit_ = 0;
  timer_.reload( initial_RTO_ms_ );
  if ( track_messages_.empty() ) {
    timer_.stop();
  } else {
    timer_.start();
  }
}

// Advance the retransmission timer. When it expires, resend the oldest
// outstanding segment; unless the window is zero, also count the
// retransmission and double the RTO.
void TCPSender::tick( uint64_t ms_since_last_tick, const TransmitFunction& transmit )
{
  const bool expired = timer_.tick( ms_since_last_tick ).is_expired();
  if ( not expired ) {
    return;
  }
  if ( track_messages_.empty() ) {
    return;
  }

  transmit( track_messages_.front() );

  // With a zero window the segment is only a probe, so there is no backoff.
  if ( window_size_ != 0 ) {
    total_retransmit_ += 1;
    timer_.exponential_back();
  }
  timer_.reset();
}