Minnow¶
- 初始的TCPSocket是在Linux TCP/IP栈上封装的一个类
- lab4开始自己构建TCPPeer与对端进行数据传输,直接使用网卡上的IP数据包进行TCP包的构建
- lab5实现简易ARP协议(NetworkInterface),可以在IP地址和MAC地址之间进行转换
- lab6实现了基于最长前缀匹配的网络层选路算法(IP router)
- lab7实现了一个使用IP router、NetworkInterface的端到端的TCP报文传输
数据格式¶
IP包¶
-  IPv4数据包构建:不支持IP选项
// IPv4 Internet datagram header. IP options are not supported, so the header
// is always LENGTH bytes long.
struct IPv4Header
{
  static constexpr size_t LENGTH = 20;        // header length in bytes, options excluded
  static constexpr uint8_t DEFAULT_TTL = 128; // default time-to-live
  static constexpr uint8_t PROTO_TCP = 6;     // protocol number for TCP

  static constexpr uint64_t serialized_length() { return LENGTH; }

  /*
   *   0                   1                   2                   3
   *   0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
   *  +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   *  |Version|  IHL  |Type of Service|          Total Length         |
   *  +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   *  |         Identification        |Flags|      Fragment Offset    |
   *  +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   *  |  Time to Live |    Protocol   |         Header Checksum       |
   *  +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   *  |                       Source Address                          |
   *  +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   *  |                    Destination Address                        |
   *  +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   *  |                    Options                    |    Padding    |
   *  +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   */

  // Header fields, in wire order
  uint8_t ver = 4;           // IP version
  uint8_t hlen = LENGTH / 4; // header length, in 32-bit words
  uint8_t tos = 0;           // type of service
  uint16_t len = 0;          // total length of the packet
  uint16_t id = 0;           // identification number
  bool df = true;            // "don't fragment" flag
  bool mf = false;           // "more fragments" flag
  uint16_t offset = 0;       // fragment offset
  uint8_t ttl = DEFAULT_TTL; // time to live
  uint8_t proto = PROTO_TCP; // payload protocol
  uint16_t cksum = 0;        // header checksum
  uint32_t src = 0;          // source address
  uint32_t dst = 0;          // destination address

  // Length of the payload
  uint16_t payload_length() const;

  // Contribution of the pseudo-header to the TCP checksum
  uint32_t pseudo_checksum() const;

  // Set the checksum field to its correct value
  void compute_checksum();

  // Human-readable rendering of the header
  std::string to_string() const;

  void parse( Parser& parser );
  void serialize( Serializer& serializer ) const;
};
TCP报文¶
- TCPMessage包含TCPSenderMessage和TCPReceiverMessage
// Bundles a TCPSenderMessage and a TCPReceiverMessage into a single value.
struct TCPMessage
{
  TCPSenderMessage sender {};     // sender-side fields (seqno, SYN, payload, FIN, RST)
  TCPReceiverMessage receiver {}; // receiver-side fields (ackno, window_size, RST)
};
// Sender-side portion of a TCP message: a sequence number, the SYN/FIN/RST
// flags, and the payload bytes.
struct TCPSenderMessage
{
  Wrap32 seqno { 0 };
  bool SYN {};
  std::string payload {};
  bool FIN {};
  bool RST {};

  // Number of sequence numbers this segment occupies: one per payload byte,
  // plus one for SYN and one for FIN when those flags are set.
  size_t sequence_length() const
  {
    const size_t flag_count = static_cast<size_t>( SYN ) + static_cast<size_t>( FIN );
    return flag_count + payload.size();
  }
};
// Receiver-side portion of a TCP message.
struct TCPReceiverMessage
{
  std::optional<Wrap32> ackno {}; // acknowledgment number; empty when there is none to report
  uint16_t window_size {};        // advertised window size
  bool RST {};                    // reset flag
};
-  在IP上构建自己的TCP报文;将IP数据包通过TUN设备与网络上其他TCP进行通信 C++class TCPOverIPv4Adapter : public FdAdapterBase { public: std::optional<TCPMessage> unwrap_tcp_in_ip( const InternetDatagram& ip_dgram ); InternetDatagram wrap_tcp_in_ip( const TCPMessage& msg ); }; optional<TCPMessage> TCPOverIPv4Adapter::unwrap_tcp_in_ip( const InternetDatagram& ip_dgram ) { // is the IPv4 datagram for us? // Note: it's valid to bind to address "0" (INADDR_ANY) and reply from actual address contacted if ( not listening() and ( ip_dgram.header.dst != config().source.ipv4_numeric() ) ) { return {}; } // is the IPv4 datagram from our peer? if ( not listening() and ( ip_dgram.header.src != config().destination.ipv4_numeric() ) ) { return {}; } // does the IPv4 datagram claim that its payload is a TCP segment? if ( ip_dgram.header.proto != IPv4Header::PROTO_TCP ) { return {}; } // is the payload a valid TCP segment? TCPSegment tcp_seg; if ( not parse( tcp_seg, ip_dgram.payload, ip_dgram.header.pseudo_checksum() ) ) { return {}; } // is the TCP segment for us? if ( tcp_seg.udinfo.dst_port != config().source.port() ) { return {}; } // should we target this source addr/port (and use its destination addr as our source) in reply? if ( listening() ) { if ( tcp_seg.message.sender.SYN and not tcp_seg.message.sender.RST ) { config_mutable().source = Address { inet_ntoa( { htobe32( ip_dgram.header.dst ) } ), config().source.port() }; config_mutable().destination = Address { inet_ntoa( { htobe32( ip_dgram.header.src ) } ), tcp_seg.udinfo.src_port }; set_listening( false ); } else { return {}; } } // is the TCP segment from our peer? if ( tcp_seg.udinfo.src_port != config().destination.port() ) { return {}; } return tcp_seg.message; } //! Takes a TCP segment, sets port numbers as necessary, and wraps it in an IPv4 datagram //! 
\param[in] seg is the TCP segment to convert InternetDatagram TCPOverIPv4Adapter::wrap_tcp_in_ip( const TCPMessage& msg ) { TCPSegment seg { .message = msg }; // set the port numbers in the TCP segment seg.udinfo.src_port = config().source.port(); seg.udinfo.dst_port = config().destination.port(); // create an Internet Datagram and set its addresses and length InternetDatagram ip_dgram; ip_dgram.header.src = config().source.ipv4_numeric(); ip_dgram.header.dst = config().destination.ipv4_numeric(); ip_dgram.header.len = ip_dgram.header.hlen * 4 + 20 /* tcp header len */ + seg.message.sender.payload.size(); // set payload, calculating TCP checksum using information from IP header seg.compute_checksum( ip_dgram.header.pseudo_checksum() ); ip_dgram.header.compute_checksum(); ip_dgram.payload = serialize( seg ); return ip_dgram; }
FD操作¶
- 对FD的操作比如write和read,都是对系统调用的封装
TCP工作流程¶
- 事件驱动模式进行TCP的监听,调用poll系统调用接收消息;设置回调,将网络上的IP数据包传递到我们设计的TCP上
- TCPPeer构建测试程序,同样使用事件驱动,只是回调略有不同
初始化TCP¶
- 设置poll需要监听的事件
 1. 收到数据包,将TCP报文传递给TCPPeer接收
 2. 读取pipe的字节传递给TCPPeer:用户在命令行输入字符串使用TCPSender发送到对端
 3. 从Reassembler中读取字节写入LocalStreamSocket:用户从reassembler中读取字符串,写入unix域套接字
//! Creates the TCPPeer from `config` and registers three rules on the event loop:
//! datagrams arriving from the datagram adapter, bytes arriving on _thread_data,
//! and reassembled inbound bytes waiting to be written back to _thread_data.
//! NOTE(review): the trailing lambdas passed to add_rule appear to be, in order, the
//! callback, the "interest" predicate, and then cancel / error handlers — confirm
//! against EventLoop::add_rule.
//! \param[in] config is the configuration used to construct the TCPPeer
template<TCPDatagramAdapter AdaptT>
void TCPMinnowSocket<AdaptT>::_initialize_TCP( const TCPConfig& config )
{
  _tcp.emplace( config );
  // Set up the event loop
  // There are three events to handle:
  //
  // 1) Incoming datagram received (needs to be given to TCPPeer::receive method)
  //
  // 2) Outbound bytes received from local application via a write()
  //    call (needs to be read from the local stream socket and
  //    given to TCPPeer)
  //
  // 3) Incoming bytes reassembled by the Reassembler
  //    (needs to be read from the inbound_stream and written
  //    to the local stream socket back to the application)
  // rule 1: read from filtered packet stream and dump into TCPConnection
  _eventloop.add_rule(
    "receive TCP segment from the network",
    _datagram_adapter.fd(),
    Direction::In,
    [&] {
      // The adapter returns an empty optional when nothing usable was read.
      if ( auto seg = _datagram_adapter.read() ) {
        _tcp->receive( std::move( seg.value() ), [&]( auto x ) { _datagram_adapter.write( x ); } );
      }
      // debugging output (printed once, guarded by _fully_acked):
      if ( _thread_data.eof() and _tcp.value().sender().sequence_numbers_in_flight() == 0 and not _fully_acked ) {
        std::cerr << "DEBUG: minnow outbound stream to " << _datagram_adapter.config().destination.to_string()
                  << " has been fully acknowledged.\n";
        _fully_acked = true;
      }
    },
    [&] { return _tcp->active(); } );
  // rule 2: read from pipe into outbound buffer
  _eventloop.add_rule(
    "push bytes to TCPPeer",
    _thread_data,
    Direction::In,
    [&] {
      // Size the buffer to the outbound writer's free capacity before reading into it.
      std::string data;
      data.resize( _tcp->outbound_writer().available_capacity() );
      _thread_data.read( data );
      _tcp->outbound_writer().push( move( data ) );
      // On EOF from the local side, close the outbound stream.
      if ( _thread_data.eof() ) {
        _tcp->outbound_writer().close();
        _outbound_shutdown = true;
        // debugging output:
        std::cerr << "DEBUG: minnow outbound stream to " << _datagram_adapter.config().destination.to_string()
                  << " finished (" << _tcp.value().sender().sequence_numbers_in_flight() << " seqno"
                  << ( _tcp.value().sender().sequence_numbers_in_flight() == 1 ? "" : "s" )
                  << " still in flight).\n";
      }
      // Let the peer transmit whatever it can.
      _tcp->push( [&]( auto x ) { _datagram_adapter.write( x ); } );
    },
    [&] {
      // Only read while the connection is active, not shut down, and there is room to buffer.
      return ( _tcp->active() ) and ( not _outbound_shutdown )
             and ( _tcp->outbound_writer().available_capacity() > 0 );
    },
    [&] {
      _tcp->outbound_writer().close();
      _outbound_shutdown = true;
    },
    [&] {
      std::cerr << "DEBUG: minnow outbound stream had error.\n";
      _tcp->outbound_writer().set_error();
    } );
  // rule 3: read from inbound buffer into pipe
  _eventloop.add_rule(
    "read bytes from inbound stream",
    _thread_data,
    Direction::Out,
    [&] {
      Reader& inbound = _tcp->inbound_reader();
      // Write from the inbound_stream into
      // the pipe, handling the possibility of a partial
      // write (i.e., only pop what was actually written).
      if ( inbound.bytes_buffered() ) {
        const std::string_view buffer = inbound.peek();
        const auto bytes_written = _thread_data.write( buffer );
        inbound.pop( bytes_written );
      }
      // When the inbound stream is finished or errored, shut down the write side of _thread_data.
      if ( inbound.is_finished() or inbound.has_error() ) {
        _thread_data.shutdown( SHUT_WR );
        _inbound_shutdown = true;
        // debugging output:
        std::cerr << "DEBUG: minnow inbound stream from " << _datagram_adapter.config().destination.to_string()
                  << " finished " << ( inbound.has_error() ? "uncleanly.\n" : "cleanly.\n" );
      }
    },
    [&] {
      // Interested while bytes are buffered, or while a finished/errored stream
      // has not yet been shut down.
      return _tcp->inbound_reader().bytes_buffered()
             or ( ( _tcp->inbound_reader().is_finished() or _tcp->inbound_reader().has_error() )
                  and not _inbound_shutdown );
    },
    [&] {},
    [&] {
      std::cerr << "DEBUG: minnow inbound stream had error.\n";
      _tcp->inbound_reader().set_error();
    } );
}
connect¶
-  connect事件是向TCP对端写数据(SYN包)
//! Initializes the TCP peer, sends the first segment, and runs the event loop
//! until exactly one sequence number is no longer in flight; then starts the TCP thread.
//! \param[in] c_tcp is the TCP configuration
//! \param[in] c_ad is the datagram adapter configuration (source/destination)
//! \throws std::runtime_error if the socket is already initialized, fails to
//!         initialize, or the first push does not leave one sequence number in flight
template<TCPDatagramAdapter AdaptT>
void TCPMinnowSocket<AdaptT>::connect( const TCPConfig& c_tcp, const FdAdapterConfig& c_ad )
{
  if ( _tcp ) {
    throw std::runtime_error( "connect() with TCPConnection already initialized" );
  }

  _initialize_TCP( c_tcp );
  _datagram_adapter.config_mut() = c_ad;

  std::cerr << "DEBUG: minnow connecting to " << c_ad.destination.to_string() << "...\n";

  if ( not _tcp ) {
    throw std::runtime_error( "TCPPeer not successfully initialized" );
  }

  // The first push emits the opening segment, which must occupy one sequence number.
  const auto send_segment = [&]( auto x ) { _datagram_adapter.write( x ); };
  _tcp->push( send_segment );

  if ( _tcp->sender().sequence_numbers_in_flight() != 1 ) {
    throw std::runtime_error( "After TCPConnection::connect(), expected sequence_numbers_in_flight() == 1" );
  }

  // Keep looping while that single sequence number is still outstanding.
  const auto awaiting_reply = [&] { return _tcp->sender().sequence_numbers_in_flight() == 1; };
  _tcp_loop( awaiting_reply );

  std::cerr << ( _tcp->inbound_reader().has_error() ? "DEBUG: minnow error on connecting to "
                                                    : "DEBUG: minnow successfully connected to " )
            << c_ad.destination.to_string() << ".\n";

  _tcp_thread = std::thread( &TCPMinnowSocket::_tcp_main, this );
}
listen_and_accept¶
- listen_and_accept监听对端是否回复ack
 C++template<TCPDatagramAdapter AdaptT> void TCPMinnowSocket<AdaptT>::listen_and_accept( const TCPConfig& c_tcp, const FdAdapterConfig& c_ad ) { if ( _tcp ) { throw std::runtime_error( "listen_and_accept() with TCPConnection already initialized" ); } _initialize_TCP( c_tcp ); _datagram_adapter.config_mut() = c_ad; _datagram_adapter.set_listening( true ); std::cerr << "DEBUG: minnow listening for incoming connection...\n"; _tcp_loop( [&] { return ( not _tcp->has_ackno() ) or ( _tcp->sender().sequence_numbers_in_flight() ); } ); std::cerr << "DEBUG: minnow new connection from " << _datagram_adapter.config().destination.to_string() << ".\n"; _tcp_thread = std::thread( &TCPMinnowSocket::_tcp_main, this ); }执行主函数¶
-  一直循环,真正结束的函数在 _tcp_loop
-  _tcp_loop结束在poll等待事件结束
-  出错 
- TCP连接的fd关闭
//! Body of the TCP runner thread: runs the event loop until it exits, shuts the
//! socket down in both directions, reports how the connection ended, and
//! destroys the TCP peer.
//! \throws rethrows any std::exception raised while running, after logging it
template<TCPDatagramAdapter AdaptT>
void TCPMinnowSocket<AdaptT>::_tcp_main()
{
  try {
    if ( not _tcp.has_value() ) {
      throw std::runtime_error( "no TCP" );
    }
    // No caller-imposed stop condition: the loop ends only via its own exit paths.
    _tcp_loop( [] { return true; } );
    shutdown( SHUT_RDWR );
    if ( not _tcp.value().active() ) {
      std::cerr << "DEBUG: minnow TCP connection finished "
                << ( _tcp->inbound_reader().has_error() ? "uncleanly.\n" : "cleanly.\n" );
    }
    _tcp.reset();
  } catch ( const std::exception& e ) {
    std::cerr << "Exception in TCPConnection runner thread: " << e.what() << "\n";
    // Rethrow the in-flight exception object. `throw e;` would throw a copy whose
    // type is std::exception, slicing off the derived type and its message.
    throw;
  }
}
//! Waits for event-loop events and ticks the TCP peer with the elapsed time
//! after each wake-up, for as long as `condition` holds.
//! \param[in] condition is a function returning true if loop should continue
//! \throws std::runtime_error if the TCP peer does not exist
template<TCPDatagramAdapter AdaptT>
void TCPMinnowSocket<AdaptT>::_tcp_loop( const std::function<bool()>& condition )
{
  const auto send_segment = [&]( auto x ) { _datagram_adapter.write( x ); };
  auto last_tick = timestamp_ms();

  while ( condition() ) {
    const auto outcome = _eventloop.wait_next_event( TCP_TICK_MS );
    if ( outcome == EventLoop::Result::Exit or _abort ) {
      break;
    }

    if ( not _tcp.has_value() ) {
      throw std::runtime_error( "_tcp_loop entered before TCPPeer initialized" );
    }

    // An inactive peer is not ticked, and the reference time is left untouched.
    if ( not _tcp.value().active() ) {
      continue;
    }

    const auto now = timestamp_ms();
    const auto elapsed = now - last_tick;
    _tcp.value().tick( elapsed, send_segment );
    _datagram_adapter.tick( elapsed );
    last_tick = now;
  }
}
TCPPeer¶
- 接收TCPMessage,然后向对端发送消息
// One endpoint of a TCP connection (excerpt): hands incoming messages to the
// sender and receiver halves and replies when a reply is owed.
class TCPPeer
{
  // Wraps `transmit` into a callback that takes only a TCPSenderMessage and
  // sends it through send(), which attaches the current receiver message.
  auto make_send( const auto& transmit )
  {
    return [&]( const TCPSenderMessage& sender_part ) { send( sender_part, transmit ); };
  }

public:
  // Processes one incoming message and transmits a reply if one is needed.
  void receive( TCPMessage msg, const TransmitFunction& transmit )
  {
    if ( not active() ) {
      return;
    }

    // Remember when we last heard from the peer, in case we must linger after the streams finish.
    time_of_last_receipt_ = cumulative_time_;

    // Anything that occupies sequence space must be answered.
    if ( msg.sender.sequence_length() > 0 ) {
      need_send_ = true;
    }

    // A "keep-alive" (seqno deliberately one below our ackno) must be answered too.
    // (N.B. orthodox TCP rules require a reply on any unacceptable segment.)
    const auto our_ackno = receiver_.send().ackno;
    if ( our_ackno.has_value() and msg.sender.seqno + 1 == our_ackno.value() ) {
      need_send_ = true;
    }

    // If the inbound stream ended before the outbound one, no lingering is needed afterwards.
    const bool inbound_closed_first = receiver_.writer().is_closed() and not sender_.reader().is_finished();
    if ( inbound_closed_first ) {
      linger_after_streams_finish_ = false;
    }

    // Sender half of the message goes to our receiver; receiver half goes to our sender.
    receiver_.receive( std::move( msg.sender ) );
    sender_.receive( msg.receiver );

    if ( need_send_ ) {
      send( sender_.make_empty_message(), transmit );
    }
  }

private:
  // Pairs `sender_message` with the receiver's current message, transmits the
  // result, and clears the pending-reply flag.
  void send( const TCPSenderMessage& sender_message, const TransmitFunction& transmit )
  {
    transmit( TCPMessage { sender_message, receiver_.send() } );
    need_send_ = false;
  }
};
测试端¶
-  bidirectional_stream_copy触发rule1和rule2;触发_initialize_TCP的read pipe into outbound buffer;然后发送给对端数据报
-  触发 _initialize_TCP的receive TCP segment from network,进行消息的收发
-  rule1: read from stdin into outbound byte stream 
-  rule2: read from outbound byte stream into socket 
-  rule3: read from socket into inbound byte stream 
-  rule4: read from inbound byte stream into stdout 
//! Copies bytes between stdin/stdout and `socket` in both directions using an
//! event loop with four rules, returning when the event loop reports Exit.
//! stdin feeds an outbound ByteStream that is drained into the socket; the
//! socket feeds an inbound ByteStream that is drained into stdout.
//! NOTE(review): the trailing lambdas passed to add_rule appear to be, in order, the
//! callback, the "interest" predicate, and then cancel / error handlers — confirm
//! against EventLoop::add_rule.
//! \param[in] socket is the socket to copy to and from (switched to non-blocking)
//! \param[in] peer_name is used only in the debug messages
void bidirectional_stream_copy( Socket& socket, string_view peer_name )
{
  constexpr size_t buffer_size = 1048576; // capacity of each intermediate ByteStream (1 MiB)
  EventLoop _eventloop {};
  FileDescriptor _input { STDIN_FILENO };
  FileDescriptor _output { STDOUT_FILENO };
  ByteStream _outbound { buffer_size };
  ByteStream _inbound { buffer_size };
  // Each flag records that the corresponding end-of-stream action has already run.
  bool _outbound_shutdown { false };
  bool _inbound_shutdown { false };
  socket.set_blocking( false );
  _input.set_blocking( false );
  _output.set_blocking( false );
  // rule 1: read from stdin into outbound byte stream
  _eventloop.add_rule(
    "read from stdin into outbound byte stream",
    _input,
    Direction::In,
    [&] {
      // Read at most as many bytes as the outbound stream can still accept.
      string data;
      data.resize( _outbound.writer().available_capacity() );
      _input.read( data );
      _outbound.writer().push( move( data ) );
      if ( _input.eof() ) {
        _outbound.writer().close();
      }
    },
    [&] {
      // Interested only while neither stream has failed and the outbound stream is open with room left.
      return !_outbound.has_error() and !_inbound.has_error() and ( _outbound.writer().available_capacity() > 0 )
             and !_outbound.writer().is_closed();
    },
    [&] { _outbound.writer().close(); },
    [&] {
      cerr << "DEBUG: Outbound stream had error from source.\n";
      _outbound.set_error();
      _inbound.set_error();
    } );
  // rule 2: read from outbound byte stream into socket
  _eventloop.add_rule(
    "read from outbound byte stream into socket",
    socket,
    Direction::Out,
    [&] {
      // Pop only as many bytes as the socket actually accepted.
      if ( _outbound.reader().bytes_buffered() ) {
        _outbound.reader().pop( socket.write( _outbound.reader().peek() ) );
      }
      // Once everything has been forwarded, shut down the socket's write side.
      if ( _outbound.reader().is_finished() ) {
        socket.shutdown( SHUT_WR );
        _outbound_shutdown = true;
        cerr << "DEBUG: Outbound stream to " << peer_name << " finished.\n";
      }
    },
    [&] {
      // Interested while bytes are pending, or while the finished stream still needs its shutdown.
      return _outbound.reader().bytes_buffered() or ( _outbound.reader().is_finished() and not _outbound_shutdown );
    },
    [&] { _outbound.writer().close(); },
    [&] {
      cerr << "DEBUG: Outbound stream had error from destination.\n";
      _outbound.set_error();
      _inbound.set_error();
    } );
  // rule 3: read from socket into inbound byte stream
  _eventloop.add_rule(
    "read from socket into inbound byte stream",
    socket,
    Direction::In,
    [&] {
      // Read at most as many bytes as the inbound stream can still accept.
      string data;
      data.resize( _inbound.writer().available_capacity() );
      socket.read( data );
      _inbound.writer().push( move( data ) );
      if ( socket.eof() ) {
        _inbound.writer().close();
      }
    },
    [&] {
      // Interested only while neither stream has failed and the inbound stream is open with room left.
      return !_inbound.has_error() and !_outbound.has_error() and ( _inbound.writer().available_capacity() > 0 )
             and !_inbound.writer().is_closed();
    },
    [&] { _inbound.writer().close(); },
    [&] {
      cerr << "DEBUG: Inbound stream had error from source.\n";
      _outbound.set_error();
      _inbound.set_error();
    } );
  // rule 4: read from inbound byte stream into stdout
  _eventloop.add_rule(
    "read from inbound byte stream into stdout",
    _output,
    Direction::Out,
    [&] {
      // Pop only as many bytes as stdout actually accepted.
      if ( _inbound.reader().bytes_buffered() ) {
        _inbound.reader().pop( _output.write( _inbound.reader().peek() ) );
      }
      // Once everything has been delivered, close stdout.
      if ( _inbound.reader().is_finished() ) {
        _output.close();
        _inbound_shutdown = true;
        cerr << "DEBUG: Inbound stream from " << peer_name << " finished"
             << ( _inbound.has_error() ? " uncleanly.\n" : ".\n" );
      }
    },
    [&] {
      // Interested while bytes are pending, or while the finished stream still needs its close.
      return _inbound.reader().bytes_buffered() or ( _inbound.reader().is_finished() and not _inbound_shutdown );
    },
    [&] { _inbound.writer().close(); },
    [&] {
      cerr << "DEBUG: Inbound stream had error from destination.\n";
      _outbound.set_error();
      _inbound.set_error();
    } );
  // loop until completion
  // NOTE(review): the -1 timeout presumably means "wait indefinitely" — confirm.
  while ( true ) {
    if ( EventLoop::Result::Exit == _eventloop.wait_next_event( -1 ) ) {
      return;
    }
  }
}
IP包发送接收的设备支持¶
-  构建可以通信的TCP,泛型Adapt需要满足write和read的约束;CS144TCPSocket注册了一个TUN(网络隧道)设备来发送和接收IP数据包。 
-  write将我们构造的TCP包序列化后写入TUN设备 
- read读取IP包头,获取TCP报头和报文(unwrap_tcp_in_ip)
//! Requirements on a datagram adapter: `write` accepts a TCPMessage and returns
//! void, and `read` returns an optional TCPMessage.
template<class T>
concept TCPDatagramAdapter = requires( T adapter, TCPMessage message ) {
  { adapter.write( message ) } -> std::same_as<void>;
  { adapter.read() } -> std::same_as<std::optional<TCPMessage>>;
};
//! \brief A FD adapter for IPv4 datagrams read from and written to a TUN device
//!
//! Owns the TunFD and provides the read()/write() pair required by the
//! TCPDatagramAdapter concept on top of TCPOverIPv4Adapter's wrap/unwrap helpers.
class TCPOverIPv4OverTunFdAdapter : public TCPOverIPv4Adapter
{
private:
  TunFD _tun; //!< the TUN device that datagrams are read from and written to
public:
  //! Construct from a TunFD, taking ownership of it by move
  explicit TCPOverIPv4OverTunFdAdapter( TunFD&& tun ) : _tun( std::move( tun ) ) {}
  //! Attempts to read and parse an IPv4 datagram containing a TCP segment related to the current connection
  //! \returns the TCP message, or an empty optional if nothing usable was read
  std::optional<TCPMessage> read();
  //! Creates an IPv4 datagram from a TCP segment, serializes it, and writes it to the TUN device
  void write( const TCPMessage& seg ) { _tun.write( serialize( wrap_tcp_in_ip( seg ) ) ); }
  //! Access the underlying TUN device (explicit conversion only)
  explicit operator TunFD&() { return _tun; }
  //! Access the underlying TUN device, read-only (explicit conversion only)
  explicit operator const TunFD&() const { return _tun; }
  //! Access underlying file descriptor
  FileDescriptor& fd() { return _tun; }
};
//! Reads one datagram from the TUN device into two buffers (the first sized to
//! the fixed IPv4 header length), parses it, and unwraps the TCP message.
//! \returns the TCP message, or an empty optional if the datagram does not parse
//!          or is rejected by unwrap_tcp_in_ip
optional<TCPMessage> TCPOverIPv4OverTunFdAdapter::read()
{
  vector<string> strs( 2 );
  strs.front().resize( IPv4Header::LENGTH );
  _tun.read( strs );

  // Parse straight from the buffers that were read into; building a second
  // vector from an initializer list would copy both strings for every datagram.
  InternetDatagram ip_dgram;
  if ( parse( ip_dgram, strs ) ) {
    return unwrap_tcp_in_ip( ip_dgram );
  }
  return {};
}
TCP实现¶
- 一个TCP的对等端需要实现的核心数据结构和功能如下
// One endpoint of a TCP connection (declaration excerpt): owns a TCPSender and
// a TCPReceiver, both built from the stored configuration.
class TCPPeer
{
public:
  // Stores the configuration; sender_ and receiver_ are initialized from it below.
  explicit TCPPeer( const TCPConfig& cfg ) : cfg_( cfg ) {}
  // Writer end of the sender's byte stream
  Writer& outbound_writer() { return sender_.writer(); }
  // Reader end of the receiver's byte stream
  Reader& inbound_reader() { return receiver_.reader(); }
  /* Passthrough methods */
  // Asks the sender to push, routing each message it produces through make_send( transmit ).
  void push( const TransmitFunction& transmit ) { sender_.push( make_send( transmit ) ); }
  void receive( TCPMessage msg, const TransmitFunction& transmit );
  // Testing interface
  const TCPReceiver& receiver() const { return receiver_; }
  const TCPSender& sender() const { return sender_; }
private:
  // Declared first so that it is initialized before the members that read it.
  TCPConfig cfg_;
  TCPSender sender_ { ByteStream { cfg_.send_capacity }, cfg_.isn, cfg_.rt_timeout };
  TCPReceiver receiver_ { Reassembler { ByteStream { cfg_.recv_capacity } } };
  void send( const TCPSenderMessage& sender_message, const TransmitFunction& transmit );
};
重要数据结构¶
Byte_Stream¶
- 实现字符串的先进先出;Writer写入字符串,Reader读出任意长度的字符
- 为了性能,使用std::queue<std::string>,记录弹出字符在字符串中的索引实现peek()
// Appends `data` to the stream, keeping only the prefix that fits in the
// remaining capacity. Does nothing if the stream is closed, full, or `data` is empty.
void Writer::push( string data )
{
  const size_t room = Writer::available_capacity();
  if ( Writer::is_closed() or room == 0 or data.empty() ) {
    return;
  }

  // Drop whatever does not fit.
  if ( data.size() > room ) {
    data.resize( room );
  }

  const size_t accepted = data.size();
  pushed_ += accepted;
  buffered_ += accepted;
  data_.emplace( std::move( data ) );
}
// Returns a view of the unread part of the front chunk, or an empty view when
// nothing is buffered. Only the front chunk is exposed, not the whole stream.
string_view Reader::peek() const
{
  if ( data_.empty() ) {
    return string_view {};
  }
  const string_view front_chunk { data_.front() };
  return front_chunk.substr( read_index_ );
}
// Removes up to `len` bytes from the front of the stream. A request larger than
// the number of buffered bytes is clamped, so the counters cannot underflow and
// the loop never touches an empty queue.
void Reader::pop( uint64_t len )
{
  if ( len > buffered_ ) {
    len = buffered_;
  }
  buffered_ -= len;
  poped_ += len;
  while ( len != 0U ) {
    // Unread bytes remaining in the front chunk.
    const uint64_t front_remaining = data_.front().size() - read_index_;
    if ( len < front_remaining ) {
      // The request ends inside the front chunk: just advance the read offset.
      read_index_ += len;
      break;
    }
    // The front chunk is fully consumed: drop it and move on to the next one.
    data_.pop();
    read_index_ = 0;
    len -= front_remaining;
  }
}
Reassembler¶
- 将乱序接收的字符串变为有序字节流,使用map进行乱序字符串的存储;重点在于处理重叠字串
重叠字串的处理¶
- 找到第一个小于等于pos的迭代器,如果map中含有的字符串与要插入的字符串产生重叠,移除含有的字符串重叠部分,增加一个重叠部分的迭代器进行后续删除操作
- 先找插入与字符串末尾重叠的迭代器,防止失效
- 再找与first_index重叠的迭代器
-  移除[lower, upper)的迭代器,插入新的字符串
 // Makes `pos` a segment boundary in buffer_: if a buffered segment straddles
 // `pos`, it is cut in two there. Returns the iterator to the first segment
 // starting at or after `pos` (buffer_.end() if there is none).
 auto Reassembler::split( uint64_t pos ) noexcept
 {
   auto next { buffer_.lower_bound( pos ) };

   // A segment already starts at pos, or nothing precedes pos (this also covers an empty buffer).
   const bool boundary_exists = next != buffer_.end() and next->first == pos;
   if ( boundary_exists or next == buffer_.begin() ) {
     return next;
   }

   // The preceding segment ends at or before pos: nothing straddles it.
   const auto before { prev( next ) };
   if ( before->first + size( before->second ) <= pos ) {
     return next;
   }

   // Move the part from pos onwards into its own entry, then truncate the original.
   const auto tail = buffer_.emplace_hint( next, pos, before->second.substr( pos - before->first ) );
   before->second.resize( pos - before->first );
   return tail;
 }
 // 对[lower, upper)的迭代器进行视图转换,然后更新reassemble的大小
 ranges::for_each( ranges::subrange( lower, upper ) | views::values,
 [&]( const auto& str ) { reassemble_len_ -= str.size(); } );
 reassemble_len_ += size(data);
 buffer_.emplace_hint(buffer_.erase(lower, upper), first_index, move(data));
// Inserts a substring of the stream that starts at `first_index`.
// The data is clipped to the window [next_unassem_idx_, next_unassem_idx_ + available capacity),
// stored in buffer_ (replacing any buffered bytes it overlaps), and every segment that
// has become contiguous with the assembled prefix is pushed to the writer.
// The output is closed once the recorded end index has been reached.
//   first_index        stream index of data's first byte
//   data               the substring's bytes
//   is_last_substring  true if this substring ends the stream
void Reassembler::insert( uint64_t first_index, string data, bool is_last_substring )
{
  // Your code here.
  // Closes the output when the end of the stream is known and everything before it is assembled.
  const auto try_close = [&]() noexcept -> void {
    if ( end_idx_.has_value() && end_idx_.value() == next_unassem_idx_ ) {
      output_.writer().close();
    }
  };
  // An empty substring carries no bytes; it can only mark where the stream ends.
  if ( data.empty() ) {
    if ( !end_idx_.has_value() && is_last_substring ) {
      end_idx_.emplace( first_index );
    }
    return try_close();
  }
  if ( writer().is_closed() || writer().available_capacity() == 0 ) {
    return;
  }
  uint64_t unassembled_index { next_unassem_idx_ };
  uint64_t unacceptable_index { unassembled_index + writer().available_capacity() };
  // Handle data that falls outside [unassembled_index, unacceptable_index)
  if ( first_index + size( data ) <= unassembled_index || first_index >= unacceptable_index ) {
    return; // Out of range
  }
  if ( first_index + size( data ) > unacceptable_index ) { // Remove unacceptable bytes
    data.resize( unacceptable_index - first_index );
    // The true end of the substring was cut off, so it no longer marks the end of the stream.
    is_last_substring = false;
  }
  if ( first_index < unassembled_index ) { // Remove popped/buffered bytes
    data.erase( 0, unassembled_index - first_index );
    first_index = unassembled_index;
  }
  if ( !end_idx_.has_value() and is_last_substring ) {
    end_idx_.emplace( first_index + size( data ) );
  }
  // Split the buffered segments at both ends of the new data
  // TODO: the removal of overlapped strings could be optimized
  auto upper { split( first_index + size( data ) ) };
  auto lower { split( first_index ) };
  // View the strings stored in [lower, upper) and subtract their sizes from reassemble_len_
  ranges::for_each( ranges::subrange( lower, upper ) | views::values,
                    [&]( const auto& str ) { reassemble_len_ -= str.size(); } );
  reassemble_len_ += size( data );
  // Erase the overlapped segments and insert the new data in their place.
  buffer_.emplace_hint( buffer_.erase( lower, upper ), first_index, move( data ) );
  // Push the segments that are now in order to the writer
  while ( !buffer_.empty() ) {
    auto&& [index, packet] { *buffer_.begin() };
    // Stop at the first gap.
    if ( index != next_unassem_idx_ ) {
      break;
    }
    reassemble_len_ -= size( packet );
    next_unassem_idx_ += size( packet );
    output_.writer().push( move( packet ) );
    buffer_.erase( buffer_.begin() );
  }
  return try_close();
}
TCPReceiver¶
- 64bit转32bit ------- 32bit转64bit:要求最近且不溢出
// Converts a 64-bit value into a Wrap32 relative to `zero_point`: the low
// 32 bits of `n` are added to the zero point.
Wrap32 Wrap32::wrap( uint64_t n, Wrap32 zero_point )
{
  const auto low_bits = static_cast<uint32_t>( n ); // keeps only n mod 2^32
  return zero_point + low_bits;
}
// Converts this Wrap32 back to a 64-bit value relative to `zero_point`.
// The candidate takes its high bits from `checkpoint` and is moved by one
// BASE up or down when its low part is more than BASE / 2 away from the
// checkpoint's low part and the move stays in range.
uint64_t Wrap32::unwrap( Wrap32 zero_point, uint64_t checkpoint ) const
{
  const uint64_t offset { this->raw_value_ - zero_point.raw_value_ };
  const uint64_t checkpoint_low { checkpoint & MASK_LOW_32 };
  const uint64_t candidate { ( checkpoint & MASK_HIGH_32 ) | offset };

  // Candidate is far above the checkpoint: step down, unless that would go below zero.
  const bool can_step_down = candidate >= BASE;
  if ( can_step_down and offset > checkpoint_low and ( offset - checkpoint_low ) > ( BASE / 2 ) ) {
    return candidate - BASE;
  }

  // Candidate is far below the checkpoint: step up, unless that would overflow.
  const bool can_step_up = candidate < MASK_HIGH_32;
  if ( can_step_up and checkpoint_low > offset and ( checkpoint_low - offset ) > ( BASE / 2 ) ) {
    return candidate + BASE;
  }

  return candidate;
}
- receive方法将序列号转换为payload索引;send方法返回确认的ACK
// Processes one incoming sender message: records the zero point from the first
// SYN, converts the sequence number into a stream index, and hands the payload
// to the reassembler. RST puts the stream into the error state.
void TCPReceiver::receive( TCPSenderMessage message )
{
  if ( writer().has_error() ) {
    return;
  }
  if ( message.RST ) {
    reader().set_error();
    return;
  }
  // Until a SYN has been seen there is no zero point, so everything else is ignored.
  if ( !zero_point_.has_value() ) {
    if ( !message.SYN ) {
      return;
    }
    zero_point_.emplace( message.seqno );
  }
  const uint64_t checkpoint { writer().bytes_pushed() + 1 /* SYN */ }; // abs_seqno of the next expected byte
  const uint64_t absolute_seqno { message.seqno.unwrap( zero_point_.value(), checkpoint ) };
  // Absolute seqno 0 belongs to the SYN itself. A segment without SYN that claims it
  // carries no valid stream bytes, and the subtraction below would wrap to UINT64_MAX;
  // an empty FIN there would then record a bogus end-of-stream index.
  if ( absolute_seqno == 0 and !message.SYN ) {
    return;
  }
  // With SYN set the payload starts one sequence number later; the -1 removes the SYN's own slot.
  const uint64_t stream_index { absolute_seqno + static_cast<uint64_t>( message.SYN ) - 1 /* SYN */ };
  reassembler_.insert( stream_index, move( message.payload ), message.FIN );
}
// Builds the receiver's message: the window size (capped at UINT16_MAX), the
// error flag as RST, and, once a zero point exists, the acknowledgment number.
TCPReceiverMessage TCPReceiver::send() const
{
  // The window field is 16 bits wide, so larger capacities are capped.
  const uint64_t capacity = writer().available_capacity();
  const auto window_size = static_cast<uint16_t>( capacity > UINT16_MAX ? UINT16_MAX : capacity );

  // No zero point yet: nothing can be acknowledged.
  if ( not zero_point_.has_value() ) {
    return { nullopt, window_size, writer().has_error() };
  }

  // One sequence number for the SYN, one per pushed byte, and one more once the stream is closed.
  const uint64_t ack_for_seqno { writer().bytes_pushed() + 1 + static_cast<uint64_t>( writer().is_closed() ) };
  return { Wrap32::wrap( ack_for_seqno, zero_point_.value() ), window_size, writer().has_error() };
}
TCPSender¶
Timer实现¶
- 控制启动、关闭、重装 RTO、重置定时器;
- 使用tick接口完成计时;
- 按需使得 RTO翻倍。
// Retransmission timer driven by tick(): it accumulates elapsed time while
// active and reports expiry once that time reaches the current RTO.
class Timer
{
public:
  // constexpr so that Timer is a literal type and its constexpr members are usable as such.
  constexpr explicit Timer( uint64_t initial_RTO_ms ) : RTO_ms_( initial_RTO_ms ) {}

  // True while the timer is running.
  [[nodiscard]] constexpr bool is_active() const noexcept { return is_active_; }

  // True if the timer is running and the accumulated time has reached the RTO.
  [[nodiscard]] constexpr bool is_expired() const noexcept { return is_active_ && running_time_ >= RTO_ms_; }

  // Clears the accumulated time without changing the running state or the RTO.
  constexpr void reset() noexcept { running_time_ = 0; }

  // Doubles the RTO.
  constexpr void exponential_back() noexcept { RTO_ms_ *= 2; }

  // Sets the RTO to the given value and clears the accumulated time.
  constexpr void reload( uint64_t initail_RTO_ms ) noexcept
  {
    RTO_ms_ = initail_RTO_ms;
    reset();
  }

  // Starts (or restarts) the timer from zero.
  constexpr void start() noexcept
  {
    is_active_ = true;
    reset();
  }

  // Stops the timer and clears the accumulated time.
  constexpr void stop() noexcept
  {
    is_active_ = false;
    reset();
  }

  // Adds `ticks` milliseconds to the accumulated time if the timer is active.
  // Returns *this so that a query can be chained onto the call.
  constexpr Timer& tick( uint64_t ticks ) noexcept
  {
    running_time_ += is_active_ ? ticks : 0;
    return *this;
  }

private:
  uint64_t RTO_ms_;          // current retransmission timeout, in milliseconds
  bool is_active_ {};        // whether the timer is running
  uint64_t running_time_ {}; // time accumulated since the last reset, in milliseconds
};
方法实现¶
- push尽可能在一个窗口内将reader中缓存的未发送数据发送出去;receive将已跟踪消息中被收到的ack完全确认的部分移出跟踪队列;tick实现指数回退超时和重传
void TCPSender::push( const TransmitFunction& transmit )
{
  // Your code here.
  // 尽可能多的在一个窗口发送数据
  while ( ( window_size_ == 0 ? 1 : window_size_ ) > total_outstanding_ ) {
    if ( FIN_sent_ )
      break;
    auto message { make_empty_message() };
    // 标记初始发送的SYN包
    if ( !SYN_sent_ ) {
      message.SYN = true;
      SYN_sent_ = true;
    }
    // window_size空余容量(包含该条信息的长度)
    const uint64_t remaining { ( window_size_ == 0 ? 1 : window_size_ ) - total_outstanding_ };
    // 还能增加payload的长度
    const size_t payload_len { min( TCPConfig::MAX_PAYLOAD_SIZE, remaining - message.sequence_length() ) };
    auto&& payload { message.payload };
    while ( reader().bytes_buffered() != 0U && payload.size() < payload_len ) {
      string_view view { reader().peek() };
      // 如果当前view长度小于payload_len,需要再次读取,所以这里使用循环
      view = view.substr( 0, payload_len - payload.size() );
      payload += view;
      input_.reader().pop( view.size() );
    }
    // 何时增加FIN标记
    if ( !FIN_sent_ && remaining > message.sequence_length() && reader().is_finished() ) {
      message.FIN = true;
      FIN_sent_ = true;
    }
    if ( message.sequence_length() == 0 )
      break;
    // 传输message
    transmit( message );
    if ( !timer_.is_active() ) {
      timer_.start();
    }
    next_seqno_ += message.sequence_length();
    total_outstanding_ += message.sequence_length();
    track_messages_.push( message );
  }
}
// Processes the receiver's message: records the window size, drops every
// tracked segment that the ackno fully covers, and on progress resets the
// retransmission state.
void TCPSender::receive( const TCPReceiverMessage& msg )
{
  // Error handling: an errored stream ignores everything; RST puts it into error.
  if ( input_.has_error() ) {
    return;
  }
  if ( msg.RST ) {
    input_.set_error();
    return;
  }

  window_size_ = msg.window_size;
  if ( not msg.ackno.has_value() ) {
    return;
  }

  // Absolute sequence number being acknowledged.
  const uint64_t acked_up_to { msg.ackno->unwrap( isn_, next_seqno_ ) };
  if ( acked_up_to > next_seqno_ ) {
    return; // acknowledges something never sent: ignore
  }

  // Drop fully acknowledged segments from the front of the queue.
  bool progress { false };
  for ( ; not track_messages_.empty(); track_messages_.pop() ) {
    const uint64_t length = track_messages_.front().sequence_length();
    if ( ack_seqno_ + length > acked_up_to ) {
      break; // only partially acknowledged: keep tracking it
    }
    progress = true;
    ack_seqno_ += length;
    total_outstanding_ -= length;
  }
  if ( not progress ) {
    return;
  }

  // New data was acknowledged: restore the initial RTO and restart the timer
  // if anything is still outstanding.
  total_retransmit_ = 0;
  timer_.reload( initial_RTO_ms_ );
  if ( track_messages_.empty() ) {
    timer_.stop();
  } else {
    timer_.start();
  }
}
// Advances the retransmission timer; on expiry retransmits the oldest tracked
// segment and, unless the window is zero, counts the retransmission and
// doubles the RTO.
void TCPSender::tick( uint64_t ms_since_last_tick, const TransmitFunction& transmit )
{
  const bool expired = timer_.tick( ms_since_last_tick ).is_expired();
  if ( not expired or track_messages_.empty() ) {
    return;
  }

  transmit( track_messages_.front() );

  // With a zero window the segment is only a probe, so no back-off is applied.
  if ( window_size_ != 0 ) {
    ++total_retransmit_;
    timer_.exponential_back();
  }
  timer_.reset();
}
