Minnow¶
约 1301 个字 862 行代码 1 张图片 预计阅读时间 17 分钟
- 初始的 `TCPSocket` 是在 Linux TCP/IP 栈上封装的一个类
- lab4:开始自己构建 `TCPPeer` 对端进行数据传输,直接使用网卡上的 IP 数据包进行 TCP 包的构建
- lab5:实现简易 ARP 协议,可以转换 IP 地址和 MAC 地址(`NetworkInterface`)
- lab6:实现了最长前缀匹配的网络层选路算法(IP route)
- lab7:实现了一个使用 IP route、`NetworkInterface` 的端到端的 TCP 报文传输
数据格式¶
IP包¶
-
IPv4数据包构建:不支持IP选项
C++// IPv4 Internet datagram header (note: IP options are not supported) struct IPv4Header { static constexpr size_t LENGTH = 20; // IPv4 header length, not including options static constexpr uint8_t DEFAULT_TTL = 128; // A reasonable default TTL value static constexpr uint8_t PROTO_TCP = 6; // Protocol number for TCP static constexpr uint64_t serialized_length() { return LENGTH; } /* * 0 1 2 3 * 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ * |Version| IHL |Type of Service| Total Length | * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ * | Identification |Flags| Fragment Offset | * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ * | Time to Live | Protocol | Header Checksum | * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ * | Source Address | * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ * | Destination Address | * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ * | Options | Padding | * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ */ // IPv4 Header fields uint8_t ver = 4; // IP version uint8_t hlen = LENGTH / 4; // header length (multiples of 32 bits) uint8_t tos = 0; // type of service uint16_t len = 0; // total length of packet uint16_t id = 0; // identification number bool df = true; // don't fragment flag bool mf = false; // more fragments flag uint16_t offset = 0; // fragment offset field uint8_t ttl = DEFAULT_TTL; // time to live field uint8_t proto = PROTO_TCP; // protocol field uint16_t cksum = 0; // checksum field uint32_t src = 0; // src address uint32_t dst = 0; // dst address // Length of the payload uint16_t payload_length() const; // Pseudo-header's contribution to the TCP checksum uint32_t pseudo_checksum() const; // Set checksum to correct value void compute_checksum(); // Return a string containing a header in human-readable format std::string 
to_string() const; void parse( Parser& parser ); void serialize( Serializer& serializer ) const; };
TCP报文¶
- TCPMessage包含TCPSenderMessage和TCPReceiverMessage
// A full TCP message as exchanged between peers: it bundles the sender-side
// half and the receiver-side half of one segment.
struct TCPMessage
{
// Sender-side part of the message (see TCPSenderMessage).
TCPSenderMessage sender {};
// Receiver-side part of the message (see TCPReceiverMessage).
TCPReceiverMessage receiver {};
};
// Sender half of a TCP message: a starting sequence number, the SYN/FIN/RST
// flags and the payload bytes.
struct TCPSenderMessage
{
  Wrap32 seqno { 0 };
  bool SYN {};
  std::string payload {};
  bool FIN {};
  bool RST {};

  // Number of sequence numbers this segment uses: one for SYN (if set),
  // one per payload byte, and one for FIN (if set).
  size_t sequence_length() const
  {
    size_t used = payload.size();
    if ( SYN ) {
      ++used;
    }
    if ( FIN ) {
      ++used;
    }
    return used;
  }
};
// Receiver half of a TCP message: an optional acknowledgment number, the
// advertised window size and the RST flag.
struct TCPReceiverMessage
{
// Acknowledgment number; empty when there is nothing to acknowledge yet.
std::optional<Wrap32> ackno {};
// Advertised window size (16-bit field).
uint16_t window_size {};
// Reset flag.
bool RST {};
};
-
在IP上构建自己的TCP报文;将IP数据包通过TUN设备与网络上其他TCP进行通信
C++class TCPOverIPv4Adapter : public FdAdapterBase { public: std::optional<TCPMessage> unwrap_tcp_in_ip( const InternetDatagram& ip_dgram ); InternetDatagram wrap_tcp_in_ip( const TCPMessage& msg ); }; optional<TCPMessage> TCPOverIPv4Adapter::unwrap_tcp_in_ip( const InternetDatagram& ip_dgram ) { // is the IPv4 datagram for us? // Note: it's valid to bind to address "0" (INADDR_ANY) and reply from actual address contacted if ( not listening() and ( ip_dgram.header.dst != config().source.ipv4_numeric() ) ) { return {}; } // is the IPv4 datagram from our peer? if ( not listening() and ( ip_dgram.header.src != config().destination.ipv4_numeric() ) ) { return {}; } // does the IPv4 datagram claim that its payload is a TCP segment? if ( ip_dgram.header.proto != IPv4Header::PROTO_TCP ) { return {}; } // is the payload a valid TCP segment? TCPSegment tcp_seg; if ( not parse( tcp_seg, ip_dgram.payload, ip_dgram.header.pseudo_checksum() ) ) { return {}; } // is the TCP segment for us? if ( tcp_seg.udinfo.dst_port != config().source.port() ) { return {}; } // should we target this source addr/port (and use its destination addr as our source) in reply? if ( listening() ) { if ( tcp_seg.message.sender.SYN and not tcp_seg.message.sender.RST ) { config_mutable().source = Address { inet_ntoa( { htobe32( ip_dgram.header.dst ) } ), config().source.port() }; config_mutable().destination = Address { inet_ntoa( { htobe32( ip_dgram.header.src ) } ), tcp_seg.udinfo.src_port }; set_listening( false ); } else { return {}; } } // is the TCP segment from our peer? if ( tcp_seg.udinfo.src_port != config().destination.port() ) { return {}; } return tcp_seg.message; } //! Takes a TCP segment, sets port numbers as necessary, and wraps it in an IPv4 datagram //! 
\param[in] seg is the TCP segment to convert InternetDatagram TCPOverIPv4Adapter::wrap_tcp_in_ip( const TCPMessage& msg ) { TCPSegment seg { .message = msg }; // set the port numbers in the TCP segment seg.udinfo.src_port = config().source.port(); seg.udinfo.dst_port = config().destination.port(); // create an Internet Datagram and set its addresses and length InternetDatagram ip_dgram; ip_dgram.header.src = config().source.ipv4_numeric(); ip_dgram.header.dst = config().destination.ipv4_numeric(); ip_dgram.header.len = ip_dgram.header.hlen * 4 + 20 /* tcp header len */ + seg.message.sender.payload.size(); // set payload, calculating TCP checksum using information from IP header seg.compute_checksum( ip_dgram.header.pseudo_checksum() ); ip_dgram.header.compute_checksum(); ip_dgram.payload = serialize( seg ); return ip_dgram; }
FD操作¶
- 对FD的操作比如write和read,都是对系统调用的封装
TCP工作流程¶
- 事件驱动模式进行TCP的监听,调用poll系统调用接收消息;设置回调,将网络上的IP数据包传递到我们设计的TCP上
- TCPPeer构建测试程序,同样使用事件驱动,只是回调略有不同
初始化TCP¶
- 设置poll需要监听的事件
1. 收到数据包,将TCP报文传递给TCPPeer接收
2. 读取pipe
的字节传递给TCPPeer:用户在命令行输入字符串使用TCPSender发送到对端
3. 从Reassembler
中读取字节写入LocalStreamSocket
:用户从reassembler中读取字符串,写入unix域套接字
//! Creates the TCPPeer from `config` and registers the three event-loop rules
//! that move data between the network, the TCPPeer and the local stream socket.
//! \param[in] config is the TCP configuration used to construct the peer
//! NOTE(review): the add_rule argument order appears to be
//! (name, fd, direction, callback, interest, cancel, error) -- confirm against EventLoop.
template<TCPDatagramAdapter AdaptT>
void TCPMinnowSocket<AdaptT>::_initialize_TCP( const TCPConfig& config )
{
_tcp.emplace( config );
// Set up the event loop
// There are three events to handle:
//
// 1) Incoming datagram received (needs to be given to TCPPeer::receive method)
//
// 2) Outbound bytes received from local application via a write()
// call (needs to be read from the local stream socket and
// given to TCPPeer)
//
// 3) Incoming bytes reassembled by the Reassembler
// (needs to be read from the inbound_stream and written
// to the local stream socket back to the application)
// rule 1: read from filtered packet stream and dump into TCPConnection
_eventloop.add_rule(
"receive TCP segment from the network",
_datagram_adapter.fd(),
Direction::In,
[&] {
// read() yields a message only when the datagram passed the adapter's filtering
if ( auto seg = _datagram_adapter.read() ) {
_tcp->receive( std::move( seg.value() ), [&]( auto x ) { _datagram_adapter.write( x ); } );
}
// debugging output:
// printed once (guarded by _fully_acked) when the local stream hit EOF and nothing is in flight
if ( _thread_data.eof() and _tcp.value().sender().sequence_numbers_in_flight() == 0 and not _fully_acked ) {
std::cerr << "DEBUG: minnow outbound stream to " << _datagram_adapter.config().destination.to_string()
<< " has been fully acknowledged.\n";
_fully_acked = true;
}
},
// this rule stays of interest only while the TCP peer is active
[&] { return _tcp->active(); } );
// rule 2: read from pipe into outbound buffer
_eventloop.add_rule(
"push bytes to TCPPeer",
_thread_data,
Direction::In,
[&] {
// read at most as many bytes as the outbound stream can currently accept
std::string data;
data.resize( _tcp->outbound_writer().available_capacity() );
_thread_data.read( data );
_tcp->outbound_writer().push( move( data ) );
if ( _thread_data.eof() ) {
_tcp->outbound_writer().close();
_outbound_shutdown = true;
// debugging output:
std::cerr << "DEBUG: minnow outbound stream to " << _datagram_adapter.config().destination.to_string()
<< " finished (" << _tcp.value().sender().sequence_numbers_in_flight() << " seqno"
<< ( _tcp.value().sender().sequence_numbers_in_flight() == 1 ? "" : "s" )
<< " still in flight).\n";
}
// let the peer transmit whatever was just pushed (and/or the close)
_tcp->push( [&]( auto x ) { _datagram_adapter.write( x ); } );
},
// interest: peer active, outbound side not shut down, and room left in the outbound stream
[&] {
return ( _tcp->active() ) and ( not _outbound_shutdown )
and ( _tcp->outbound_writer().available_capacity() > 0 );
},
// cancel callback: close the outbound stream and mark it shut down
[&] {
_tcp->outbound_writer().close();
_outbound_shutdown = true;
},
// error callback: flag the outbound stream as errored
[&] {
std::cerr << "DEBUG: minnow outbound stream had error.\n";
_tcp->outbound_writer().set_error();
} );
// rule 3: read from inbound buffer into pipe
_eventloop.add_rule(
"read bytes from inbound stream",
_thread_data,
Direction::Out,
[&] {
Reader& inbound = _tcp->inbound_reader();
// Write from the inbound_stream into
// the pipe, handling the possibility of a partial
// write (i.e., only pop what was actually written).
if ( inbound.bytes_buffered() ) {
const std::string_view buffer = inbound.peek();
const auto bytes_written = _thread_data.write( buffer );
inbound.pop( bytes_written );
}
// once the inbound stream is finished or errored, shut down the write side of the local socket
if ( inbound.is_finished() or inbound.has_error() ) {
_thread_data.shutdown( SHUT_WR );
_inbound_shutdown = true;
// debugging output:
std::cerr << "DEBUG: minnow inbound stream from " << _datagram_adapter.config().destination.to_string()
<< " finished " << ( inbound.has_error() ? "uncleanly.\n" : "cleanly.\n" );
}
},
// interest: bytes are waiting, or the stream ended/errored and the shutdown has not been done yet
[&] {
return _tcp->inbound_reader().bytes_buffered()
or ( ( _tcp->inbound_reader().is_finished() or _tcp->inbound_reader().has_error() )
and not _inbound_shutdown );
},
// cancel callback: nothing to do
[&] {},
// error callback: flag the inbound stream as errored
[&] {
std::cerr << "DEBUG: minnow inbound stream had error.\n";
_tcp->inbound_reader().set_error();
} );
}
connect¶
-
connect
事件是向TCP对端写数据(SYN包)
//! Actively opens a connection: sends the SYN, waits until it is no longer
//! outstanding, then starts the background TCP thread.
//! \param[in] c_tcp is the TCP configuration
//! \param[in] c_ad is the adapter configuration (source/destination addresses)
template<TCPDatagramAdapter AdaptT>
void TCPMinnowSocket<AdaptT>::connect( const TCPConfig& c_tcp, const FdAdapterConfig& c_ad )
{
  if ( _tcp ) {
    throw std::runtime_error( "connect() with TCPConnection already initialized" );
  }

  _initialize_TCP( c_tcp );
  _datagram_adapter.config_mut() = c_ad;

  std::cerr << "DEBUG: minnow connecting to " << c_ad.destination.to_string() << "...\n";

  if ( not _tcp.has_value() ) {
    throw std::runtime_error( "TCPPeer not successfully initialized" );
  }

  // The first push on a fresh peer is expected to emit exactly the SYN.
  _tcp->push( [&]( auto x ) { _datagram_adapter.write( x ); } );

  if ( _tcp->sender().sequence_numbers_in_flight() != 1 ) {
    throw std::runtime_error( "After TCPConnection::connect(), expected sequence_numbers_in_flight() == 1" );
  }

  // Run the event loop for as long as that single sequence number is still in flight.
  _tcp_loop( [&] { return _tcp->sender().sequence_numbers_in_flight() == 1; } );

  if ( _tcp->inbound_reader().has_error() ) {
    std::cerr << "DEBUG: minnow error on connecting to " << c_ad.destination.to_string() << ".\n";
  } else {
    std::cerr << "DEBUG: minnow successfully connected to " << c_ad.destination.to_string() << ".\n";
  }

  _tcp_thread = std::thread( &TCPMinnowSocket::_tcp_main, this );
}
listen_and_accept¶
listen_and_accept
监听对端是否回复ack
C++template<TCPDatagramAdapter AdaptT> void TCPMinnowSocket<AdaptT>::listen_and_accept( const TCPConfig& c_tcp, const FdAdapterConfig& c_ad ) { if ( _tcp ) { throw std::runtime_error( "listen_and_accept() with TCPConnection already initialized" ); } _initialize_TCP( c_tcp ); _datagram_adapter.config_mut() = c_ad; _datagram_adapter.set_listening( true ); std::cerr << "DEBUG: minnow listening for incoming connection...\n"; _tcp_loop( [&] { return ( not _tcp->has_ackno() ) or ( _tcp->sender().sequence_numbers_in_flight() ); } ); std::cerr << "DEBUG: minnow new connection from " << _datagram_adapter.config().destination.to_string() << ".\n"; _tcp_thread = std::thread( &TCPMinnowSocket::_tcp_main, this ); }
执行主函数¶
-
一直循环,真正结束的函数在
_tcp_loop
-
_tcp_loop
结束在poll等待事件结束 -
出错
- TCP连接的fd关闭
//! Body of the background TCP thread: runs the event loop until it exits,
//! then shuts the socket down and destroys the TCPPeer.
//! Any std::exception is logged and rethrown unchanged.
template<TCPDatagramAdapter AdaptT>
void TCPMinnowSocket<AdaptT>::_tcp_main()
{
  try {
    if ( not _tcp.has_value() ) {
      throw std::runtime_error( "no TCP" );
    }
    // Loop unconditionally; _tcp_loop itself returns on EventLoop exit or abort.
    _tcp_loop( [] { return true; } );
    shutdown( SHUT_RDWR );
    if ( not _tcp.value().active() ) {
      std::cerr << "DEBUG: minnow TCP connection finished "
                << ( _tcp->inbound_reader().has_error() ? "uncleanly.\n" : "cleanly.\n" );
    }
    _tcp.reset();
  } catch ( const std::exception& e ) {
    std::cerr << "Exception in TCPConnection runner thread: " << e.what() << "\n";
    // Rethrow the in-flight exception. `throw e;` would copy-construct a plain
    // std::exception from the reference and slice off the dynamic type.
    throw;
  }
}
//! Runs the event loop, ticking the TCPPeer and the adapter with the elapsed
//! time after each event, until `condition` is false, the loop exits, or abort is set.
//! \param[in] condition is a function returning true if loop should continue
template<TCPDatagramAdapter AdaptT>
void TCPMinnowSocket<AdaptT>::_tcp_loop( const std::function<bool()>& condition )
{
  auto last_tick = timestamp_ms();

  while ( condition() ) {
    const auto outcome = _eventloop.wait_next_event( TCP_TICK_MS );
    if ( outcome == EventLoop::Result::Exit or _abort ) {
      break;
    }

    if ( not _tcp.has_value() ) {
      throw std::runtime_error( "_tcp_loop entered before TCPPeer initialized" );
    }

    // An inactive peer is not ticked, and the reference time is left untouched.
    if ( not _tcp.value().active() ) {
      continue;
    }

    const auto now = timestamp_ms();
    const auto elapsed = now - last_tick;
    _tcp.value().tick( elapsed, [&]( auto x ) { _datagram_adapter.write( x ); } );
    _datagram_adapter.tick( elapsed );
    last_tick = now;
  }
}
TCPPeer¶
- 接收TCPMessage,然后向对端发送消息
// Excerpt of TCPPeer: receives a TCPMessage, feeds its halves to the local
// receiver and sender, and replies to the peer when needed.
class TCPPeer
{
// Wraps `transmit` into a callable taking a TCPSenderMessage, which forwards to send().
// The returned lambda holds `transmit` by reference, so it must not outlive it.
auto make_send( const auto& transmit )
{
return [&]( const TCPSenderMessage& x ) { send( x, transmit ); };
}
public:
// Processes one incoming message; does nothing if the peer is no longer active.
// `transmit` is used to send a reply when one is required.
void receive( TCPMessage msg, const TransmitFunction& transmit )
{
if ( not active() ) {
return;
}
// Record time in case this peer has to linger after streams finish.
time_of_last_receipt_ = cumulative_time_;
// If SenderMessage occupies a sequence number, make sure to reply.
need_send_ |= ( msg.sender.sequence_length() > 0 );
// If SenderMessage is a "keep-alive" (with intentionally invalid seqno), make sure to reply.
// (N.B. orthodox TCP rules require a reply on any unacceptable segment.)
// Note: our ackno is sampled before the message is handed to the receiver below.
const auto our_ackno = receiver_.send().ackno;
need_send_ |= ( our_ackno.has_value() and msg.sender.seqno + 1 == our_ackno.value() );
// Did the inbound stream finish before the outbound stream? If so, no need to linger after streams finish.
if ( receiver_.writer().is_closed() and not sender_.reader().is_finished() ) {
linger_after_streams_finish_ = false;
}
// Give incoming TCPSenderMessage to receiver.
receiver_.receive( std::move( msg.sender ) );
// Give incoming TCPReceiverMessage to sender.
sender_.receive( msg.receiver );
// Send reply if needed.
if ( need_send_ ) {
send( sender_.make_empty_message(), transmit );
}
}
private:
// Combines `sender_message` with the receiver's current state into one
// TCPMessage, transmits it, and clears the pending-reply flag.
void send( const TCPSenderMessage& sender_message, const TransmitFunction& transmit )
{
TCPMessage msg { sender_message, receiver_.send() };
transmit( std::move( msg ) );
need_send_ = false;
}
};
测试端¶
-
bidirectional_stream_copy
触发rule1和rule2;触发_initialize_TCP
的read pipe into outbound buffer;然后发送给对端数据报 -
触发
_initialize_TCP
的receive TCP segment from network
,进行消息的收发 -
rule1: read from stdin into outbound byte stream
-
rule2: read from outbound byte stream into socket
-
rule3: read from socket into inbound byte stream
-
rule4: read from inbound byte stream into stdout
// Copies bytes in both directions between stdin/stdout and `socket` until the
// event loop reports Exit:
//   stdin  -> _outbound byte stream -> socket
//   socket -> _inbound byte stream  -> stdout
// `peer_name` is used only in the debug messages.
// NOTE(review): the add_rule argument order appears to be
// (name, fd, direction, callback, interest, cancel, error) -- confirm against EventLoop.
void bidirectional_stream_copy( Socket& socket, string_view peer_name )
{
// capacity of each intermediate byte stream (1 MiB)
constexpr size_t buffer_size = 1048576;
EventLoop _eventloop {};
FileDescriptor _input { STDIN_FILENO };
FileDescriptor _output { STDOUT_FILENO };
ByteStream _outbound { buffer_size };
ByteStream _inbound { buffer_size };
// set once the corresponding end-of-stream handling has been performed
bool _outbound_shutdown { false };
bool _inbound_shutdown { false };
// all three descriptors are driven by the event loop, so make them non-blocking
socket.set_blocking( false );
_input.set_blocking( false );
_output.set_blocking( false );
// rule 1: read from stdin into outbound byte stream
_eventloop.add_rule(
"read from stdin into outbound byte stream",
_input,
Direction::In,
[&] {
// read at most as many bytes as the outbound stream can accept
string data;
data.resize( _outbound.writer().available_capacity() );
_input.read( data );
_outbound.writer().push( move( data ) );
if ( _input.eof() ) {
_outbound.writer().close();
}
},
// interest: no error on either stream, room available, and the writer still open
[&] {
return !_outbound.has_error() and !_inbound.has_error() and ( _outbound.writer().available_capacity() > 0 )
and !_outbound.writer().is_closed();
},
[&] { _outbound.writer().close(); },
[&] {
cerr << "DEBUG: Outbound stream had error from source.\n";
_outbound.set_error();
_inbound.set_error();
} );
// rule 2: read from outbound byte stream into socket
_eventloop.add_rule(
"read from outbound byte stream into socket",
socket,
Direction::Out,
[&] {
// pop only as many bytes as the socket actually accepted
if ( _outbound.reader().bytes_buffered() ) {
_outbound.reader().pop( socket.write( _outbound.reader().peek() ) );
}
if ( _outbound.reader().is_finished() ) {
socket.shutdown( SHUT_WR );
_outbound_shutdown = true;
cerr << "DEBUG: Outbound stream to " << peer_name << " finished.\n";
}
},
// interest: bytes are waiting, or the stream finished and the shutdown is still pending
[&] {
return _outbound.reader().bytes_buffered() or ( _outbound.reader().is_finished() and not _outbound_shutdown );
},
[&] { _outbound.writer().close(); },
[&] {
cerr << "DEBUG: Outbound stream had error from destination.\n";
_outbound.set_error();
_inbound.set_error();
} );
// rule 3: read from socket into inbound byte stream
_eventloop.add_rule(
"read from socket into inbound byte stream",
socket,
Direction::In,
[&] {
// read at most as many bytes as the inbound stream can accept
string data;
data.resize( _inbound.writer().available_capacity() );
socket.read( data );
_inbound.writer().push( move( data ) );
if ( socket.eof() ) {
_inbound.writer().close();
}
},
// interest: no error on either stream, room available, and the writer still open
[&] {
return !_inbound.has_error() and !_outbound.has_error() and ( _inbound.writer().available_capacity() > 0 )
and !_inbound.writer().is_closed();
},
[&] { _inbound.writer().close(); },
[&] {
cerr << "DEBUG: Inbound stream had error from source.\n";
_outbound.set_error();
_inbound.set_error();
} );
// rule 4: read from inbound byte stream into stdout
_eventloop.add_rule(
"read from inbound byte stream into stdout",
_output,
Direction::Out,
[&] {
// pop only as many bytes as stdout actually accepted
if ( _inbound.reader().bytes_buffered() ) {
_inbound.reader().pop( _output.write( _inbound.reader().peek() ) );
}
if ( _inbound.reader().is_finished() ) {
_output.close();
_inbound_shutdown = true;
cerr << "DEBUG: Inbound stream from " << peer_name << " finished"
<< ( _inbound.has_error() ? " uncleanly.\n" : ".\n" );
}
},
// interest: bytes are waiting, or the stream finished and stdout has not been closed yet
[&] {
return _inbound.reader().bytes_buffered() or ( _inbound.reader().is_finished() and not _inbound_shutdown );
},
[&] { _inbound.writer().close(); },
[&] {
cerr << "DEBUG: Inbound stream had error from destination.\n";
_outbound.set_error();
_inbound.set_error();
} );
// loop until completion
// (a timeout of -1 is passed; presumably "wait indefinitely" -- confirm against EventLoop)
while ( true ) {
if ( EventLoop::Result::Exit == _eventloop.wait_next_event( -1 ) ) {
return;
}
}
}
IP包发送接收的设备支持¶
-
构建可以通信的TCP,泛型Adapt需要满足write和read的约束;CS144TCPSocket注册了一个TUN(网络隧道)设备来发送和接收IP数据包。
-
write将我们构造的TCP包序列化后写入TUN设备
- read读取IP包头,获取TCP报头和报文(
unwrap_tcp_in_ip
)
//! Requirements on a datagram adapter usable by TCPMinnowSocket:
//! `write(TCPMessage)` must return void, and `read()` must return exactly
//! std::optional<TCPMessage>.
template<class T>
concept TCPDatagramAdapter = requires( T a, TCPMessage seg ) {
  { a.write( seg ) } -> std::same_as<void>;
  { a.read() } -> std::same_as<std::optional<TCPMessage>>;
};
//! \brief A FD adapter for IPv4 datagrams read from and written to a TUN device
class TCPOverIPv4OverTunFdAdapter : public TCPOverIPv4Adapter
{
public:
  //! Takes ownership of the given TunFD
  explicit TCPOverIPv4OverTunFdAdapter( TunFD&& tun ) : _tun( std::move( tun ) ) {}

  //! Tries to read one IPv4 datagram and extract the TCP message for the current connection
  std::optional<TCPMessage> read();

  //! Wraps the TCP message in an IPv4 datagram, serializes it, and writes it to the TUN device
  void write( const TCPMessage& seg ) { _tun.write( serialize( wrap_tcp_in_ip( seg ) ) ); }

  //! Explicit conversion to the underlying TUN device
  explicit operator TunFD&() { return _tun; }

  //! Explicit conversion to the underlying TUN device (read-only)
  explicit operator const TunFD&() const { return _tun; }

  //! The underlying TUN device viewed as a plain file descriptor
  FileDescriptor& fd() { return _tun; }

private:
  TunFD _tun;
};
//! Reads one datagram from the TUN device into two buffers (the first sized to
//! the IPv4 header), parses it, and unwraps the TCP message.
//! \returns the TCP message, or an empty optional if parsing or unwrapping fails
optional<TCPMessage> TCPOverIPv4OverTunFdAdapter::read()
{
  vector<string> raw( 2 );
  raw.front().resize( IPv4Header::LENGTH );
  _tun.read( raw );

  const vector<string> buffers = { raw.at( 0 ), raw.at( 1 ) };
  InternetDatagram ip_dgram;
  if ( not parse( ip_dgram, buffers ) ) {
    return {};
  }
  return unwrap_tcp_in_ip( ip_dgram );
}
TCP实现¶
- 一个TCP的对等端需要实现的核心数据结构和功能如下
// Excerpt of TCPPeer's declaration: one TCP endpoint, composed of a TCPSender
// and a TCPReceiver that are both built from a TCPConfig.
class TCPPeer
{
public:
// Stores a copy of the configuration; sender_ and receiver_ are initialized from it.
explicit TCPPeer( const TCPConfig& cfg ) : cfg_( cfg ) {}
// Writer end of the sender's stream (bytes to transmit to the peer).
Writer& outbound_writer() { return sender_.writer(); }
// Reader end of the receiver's stream (bytes received from the peer).
Reader& inbound_reader() { return receiver_.reader(); }
/* Passthrough methods */
// Asks the sender to push; each sender message goes out via `transmit`
// through the make_send() wrapper.
void push( const TransmitFunction& transmit ) { sender_.push( make_send( transmit ) ); }
void receive( TCPMessage msg, const TransmitFunction& transmit );
// Testing interface
const TCPReceiver& receiver() const { return receiver_; }
const TCPSender& sender() const { return sender_; }
private:
// cfg_ is declared first because the member initializers below read it.
TCPConfig cfg_;
TCPSender sender_ { ByteStream { cfg_.send_capacity }, cfg_.isn, cfg_.rt_timeout };
TCPReceiver receiver_ { Reassembler { ByteStream { cfg_.recv_capacity } } };
void send( const TCPSenderMessage& sender_message, const TransmitFunction& transmit );
};
重要数据结构¶
Byte_Stream¶
- 实现字符串的先进先出;
Writer
写入字符串,Reader
读出任意长度的字符 - 为了性能,使用
std::queue<std::string>
,记录弹出字符在字符串中的索引实现peek()
// Appends `data` to the stream, truncated to the available capacity.
// Nothing happens if the writer is closed, the stream is full, or `data` is empty.
void Writer::push( string data )
{
  const size_t incoming = data.size();
  const size_t room = Writer::available_capacity();

  const bool nothing_to_do = Writer::is_closed() || room == 0 || data.empty();
  if ( nothing_to_do ) {
    return;
  }

  // Keep only the prefix that fits.
  if ( incoming > room ) {
    data.resize( room );
  }

  const auto accepted = data.size();
  pushed_ += accepted;
  buffered_ += accepted;
  data_.emplace( std::move( data ) );
}
// Returns a view of the unread part of the front chunk, or an empty view
// when nothing is buffered. Only the front chunk is exposed, so the view may
// be shorter than the total number of buffered bytes.
string_view Reader::peek() const
{
  if ( data_.empty() ) {
    return string_view {};
  }
  const string_view front_chunk { data_.front() };
  return front_chunk.substr( read_index_ );
}
// Removes up to `len` bytes from the front of the stream.
// A request larger than the buffered byte count is clamped to what is
// buffered, so the counters cannot underflow and front() is never called on
// an empty queue.
void Reader::pop( uint64_t len )
{
  if ( len > buffered_ ) {
    len = buffered_;
  }
  buffered_ -= len;
  poped_ += len;
  while ( len != 0U && !data_.empty() ) {
    // Unread bytes remaining in the front chunk.
    const uint64_t size = data_.front().size() - read_index_;
    if ( len < size ) {
      // The pop ends inside this chunk: just advance the read offset.
      read_index_ += len;
      break;
    }
    // The whole remainder of the front chunk is consumed: move on to the next chunk.
    data_.pop();
    read_index_ = 0;
    len -= size;
  }
}
Reassembler¶
- 将乱序接收的字符串变为有序字节流,使用
map
进行乱序字符串的存储;重点在于处理重叠字串
重叠字串的处理¶
- 找到第一个小于等于
pos
的迭代器,如果map中含有的字符串与要插入的字符串产生重叠,移除含有的字符串重叠部分,增加一个重叠部分的迭代器进行后续删除操作 - 先找插入与字符串末尾重叠的迭代器,防止失效
- 再找与
first_index
重叠的迭代器 -
移除 [`lower`, `upper`) 范围内的元素,然后插入新的字符串
// Ensures that no buffered segment straddles `pos`, and returns an iterator
// to the first buffered segment whose start index is >= pos (or end()).
// If the segment just before `pos` extends past it, that segment is cut in
// two: its tail becomes a new entry keyed at `pos`, which is returned.
// NOTE(review): declared noexcept although emplace_hint/substr may allocate -- confirm this is intended.
auto Reassembler::split( uint64_t pos ) noexcept
{
auto it { buffer_.lower_bound( pos ) };
// A segment already starts exactly at pos: nothing to split.
if ( it != buffer_.end() and it->first == pos ) {
return it;
}
// No segment starts before pos, so nothing can overlap it.
if ( it == buffer_.begin() ) { // if buffer_.empty() then begin() == end()
return it;
}
// The previous segment starts before pos; check whether it reaches past pos.
if ( const auto pit { prev( it ) }; pit->first + size( pit->second ) > pos ) {
// Create a separate entry for the overlapping tail, then shrink the original to end at pos.
const auto res = buffer_.emplace_hint( it, pos, pit->second.substr( pos - pit->first ) );
pit->second.resize( pos - pit->first );
return res;
}
return it;
};
// Walk the values of the entries in [lower, upper) and subtract their lengths
// from the pending-byte count, then account for the new data, erase the
// overlapped entries and insert the new string in their place.
ranges::for_each( ranges::subrange( lower, upper ) | views::values,
                  [&]( const auto& str ) { reassemble_len_ -= str.size(); } );
reassemble_len_ += size( data );
buffer_.emplace_hint( buffer_.erase( lower, upper ), first_index, move( data ) );
// Inserts a substring that starts at stream index `first_index`.
// Bytes outside the window [next_unassem_idx_, next_unassem_idx_ + available capacity)
// are discarded, overlaps with already buffered segments are replaced by the
// new data, and every segment that has become contiguous is pushed to the output.
// `is_last_substring` marks that `data` ends the stream; the output is closed
// once everything up to that end index has been written.
void Reassembler::insert( uint64_t first_index, string data, bool is_last_substring )
{
// Closes the output once the recorded end index has been reached.
const auto try_close = [&]() noexcept -> void {
if ( end_idx_.has_value() && end_idx_.value() == next_unassem_idx_ ) {
output_.writer().close();
}
};
// Empty data can still carry the end-of-stream marker.
if ( data.empty() ) {
if ( !end_idx_.has_value() && is_last_substring ) {
end_idx_.emplace( first_index );
}
return try_close();
}
if ( writer().is_closed() || writer().available_capacity() == 0 ) {
return;
}
uint64_t unassembled_index { next_unassem_idx_ };
uint64_t unacceptable_index { unassembled_index + writer().available_capacity() };
// Handle data that falls outside [unassembled_index, unacceptable_index).
if ( first_index + size( data ) <= unassembled_index || first_index >= unacceptable_index ) {
return; // Out of range
}
if ( first_index + size( data ) > unacceptable_index ) { // Remove unacceptable bytes
data.resize( unacceptable_index - first_index );
// The real tail was cut off, so this can no longer be the last substring.
is_last_substring = false;
}
if ( first_index < unassembled_index ) { // Remove popped/buffered bytes
data.erase( 0, unassembled_index - first_index );
first_index = unassembled_index;
}
if ( !end_idx_.has_value() and is_last_substring ) {
end_idx_.emplace( first_index + size( data ) );
}
// Split existing segments at both ends of the new data.
// The end position is split first, then the start position.
// TODO: the removal of overlapped strings could be optimized
auto upper { split( first_index + size( data ) ) };
auto lower { split( first_index ) };
// Walk the values in [lower, upper) and subtract their lengths from the buffered-byte count.
ranges::for_each( ranges::subrange( lower, upper ) | views::values,
[&]( const auto& str ) { reassemble_len_ -= str.size(); } );
reassemble_len_ += size( data );
// Replace the overlapped entries with the new data.
buffer_.emplace_hint( buffer_.erase( lower, upper ), first_index, move( data ) );
// Push the segments that are now in order into the writer.
while ( !buffer_.empty() ) {
auto&& [index, packet] { *buffer_.begin() };
if ( index != next_unassem_idx_ ) {
break;
}
reassemble_len_ -= size( packet );
next_unassem_idx_ += size( packet );
output_.writer().push( move( packet ) );
buffer_.erase( buffer_.begin() );
}
return try_close();
}
TCPReceiver¶
- 64bit转32bit ------- 32bit转64bit:要求最近且不溢出
// Converts an absolute 64-bit sequence number into a Wrap32 relative to
// `zero_point`: only the low 32 bits of `n` are added to the zero point.
Wrap32 Wrap32::wrap( uint64_t n, Wrap32 zero_point )
{
  const auto low32 = static_cast<uint32_t>( n );
  return zero_point + low32;
}
// Converts this Wrap32 back to a 64-bit absolute sequence number, relative to
// `zero_point`. The result starts out in the same 2^32-sized block as
// `checkpoint` and is moved one block down or up when that brings it closer
// to the checkpoint, provided the move stays within the bounds checked below.
uint64_t Wrap32::unwrap( Wrap32 zero_point, uint64_t checkpoint ) const
{
  // Offset from the zero point (32-bit wraparound arithmetic).
  const uint64_t offset { this->raw_value_ - zero_point.raw_value_ };
  const uint64_t ckpt_low { checkpoint & MASK_LOW_32 };
  // Candidate: the checkpoint's high 32 bits combined with our low 32 bits.
  const uint64_t candidate { ( checkpoint & MASK_HIGH_32 ) | offset };

  const bool far_above = offset > ckpt_low and ( offset - ckpt_low ) > ( BASE / 2 );
  const bool far_below = ckpt_low > offset and ( ckpt_low - offset ) > ( BASE / 2 );

  if ( far_above and candidate >= BASE ) {
    return candidate - BASE;
  }
  if ( far_below and candidate < MASK_HIGH_32 ) {
    return candidate + BASE;
  }
  return candidate;
}
receive
方法将序列号转换为payload
索引;send
方法返回确认的ACK
// Handles one incoming sender message: records the ISN from the first SYN,
// converts the segment's seqno into a stream index and passes the payload to
// the reassembler. An RST puts the stream into the error state.
void TCPReceiver::receive( TCPSenderMessage message )
{
  if ( writer().has_error() ) {
    return;
  }
  if ( message.RST ) {
    reader().set_error();
    return;
  }
  // Until a SYN has been seen there is no zero point, so anything else is dropped.
  if ( !zero_point_.has_value() ) {
    if ( !message.SYN ) {
      return;
    }
    zero_point_.emplace( message.seqno );
  }
  const uint64_t checkpoint { writer().bytes_pushed() + 1 /* SYN */ }; // abs_seqno for expecting payload
  const uint64_t absolute_seqno { message.seqno.unwrap( zero_point_.value(), checkpoint ) };
  // Absolute seqno 0 is the SYN's own slot. A segment without SYN that claims
  // it carries no valid stream data, and the index computation below would
  // wrap around to 2^64 - 1, so reject it here.
  if ( absolute_seqno == 0 and not message.SYN ) {
    return;
  }
  // Stream indices do not count the SYN; when SYN is set the payload starts one seqno later.
  const uint64_t stream_index { absolute_seqno + static_cast<uint64_t>( message.SYN ) - 1 /* SYN */ };
  reassembler_.insert( stream_index, move( message.payload ), message.FIN );
}
// Builds the receiver's message to the peer: the window size (capped at
// UINT16_MAX), the RST flag mirroring the stream's error state, and, once a
// SYN has been received, the ackno.
TCPReceiverMessage TCPReceiver::send() const
{
  uint16_t window_size { static_cast<uint16_t>( UINT16_MAX ) };
  if ( not( writer().available_capacity() > UINT16_MAX ) ) {
    window_size = static_cast<uint16_t>( writer().available_capacity() );
  }

  if ( not zero_point_.has_value() ) {
    return { nullopt, window_size, writer().has_error() };
  }

  // Next expected absolute seqno: the pushed bytes, plus one for SYN, plus one for FIN once closed.
  const uint64_t next_abs_seqno { writer().bytes_pushed() + 1 + static_cast<uint64_t>( writer().is_closed() ) };
  return { Wrap32::wrap( next_abs_seqno, zero_point_.value() ), window_size, writer().has_error() };
}
TCPSender¶
Timer实现¶
- 控制启动、关闭、重装
RTO
、重置定时器; - 使用
tick
接口完成计时; - 按需使得
RTO
翻倍。
// Retransmission timer driven by explicit tick() calls.
// It keeps the current RTO, whether the timer is running, and how much time
// has accumulated since it was last reset. The constructor is constexpr so
// the class is a literal type and its constexpr members are usable in
// constant expressions; the observers are const so they work on const timers.
class Timer
{
public:
  // Creates a stopped timer with the given initial RTO (milliseconds).
  constexpr explicit Timer( uint64_t initial_RTO_ms ) noexcept : RTO_ms_( initial_RTO_ms ) {}

  // True while the timer is running.
  [[nodiscard]] constexpr bool is_active() const noexcept { return is_active_; }

  // True when the timer is running and the accumulated time has reached the RTO.
  [[nodiscard]] constexpr bool is_expired() const noexcept { return is_active_ && running_time_ >= RTO_ms_; }

  // Clears the accumulated time; the RTO and the running state are unchanged.
  constexpr void reset() noexcept { running_time_ = 0; }

  // Doubles the current RTO.
  constexpr void exponential_back() noexcept { RTO_ms_ *= 2; }

  // Sets the RTO to the given value and clears the accumulated time.
  constexpr void reload( uint64_t initail_RTO_ms ) noexcept
  {
    RTO_ms_ = initail_RTO_ms;
    reset();
  }

  // Starts (or restarts) the timer from zero.
  constexpr void start() noexcept
  {
    is_active_ = true;
    reset();
  }

  // Stops the timer and clears the accumulated time.
  constexpr void stop() noexcept
  {
    is_active_ = false;
    reset();
  }

  // Adds `ticks` to the accumulated time; ignored while the timer is stopped.
  // Returns *this so the call can be chained with is_expired().
  constexpr Timer& tick( uint64_t ticks ) noexcept
  {
    running_time_ += is_active_ ? ticks : 0;
    return *this;
  }

private:
  uint64_t RTO_ms_;
  bool is_active_ {};
  uint64_t running_time_ {};
};
方法实现¶
push
尽可能在一个窗口将reader
缓存未发送的数据发送;receive
确认当前跟踪的消息ack均小于传来的ack,不再跟踪消息;tick
实现指数回退超时和重传
// Sends as many segments as the receiver's window allows, reading payload
// from the outbound stream. A window size of 0 is treated as 1. Each
// transmitted segment is remembered in track_messages_ for retransmission,
// and the timer is started if it is not already running.
void TCPSender::push( const TransmitFunction& transmit )
{
// Send as much data as fits in the window
while ( ( window_size_ == 0 ? 1 : window_size_ ) > total_outstanding_ ) {
// Nothing more may be sent once FIN has gone out.
if ( FIN_sent_ )
break;
auto message { make_empty_message() };
// Mark the very first segment with SYN
if ( !SYN_sent_ ) {
message.SYN = true;
SYN_sent_ = true;
}
// Space left in the window (including the length of this message)
const uint64_t remaining { ( window_size_ == 0 ? 1 : window_size_ ) - total_outstanding_ };
// How much payload can still be added to this message
const size_t payload_len { min( TCPConfig::MAX_PAYLOAD_SIZE, remaining - message.sequence_length() ) };
auto&& payload { message.payload };
while ( reader().bytes_buffered() != 0U && payload.size() < payload_len ) {
string_view view { reader().peek() };
// peek() may return fewer bytes than payload_len, so keep reading in a loop
view = view.substr( 0, payload_len - payload.size() );
payload += view;
input_.reader().pop( view.size() );
}
// Add FIN only if the stream is finished and the window still has room for it
if ( !FIN_sent_ && remaining > message.sequence_length() && reader().is_finished() ) {
message.FIN = true;
FIN_sent_ = true;
}
// A message that occupies no sequence numbers is not sent.
if ( message.sequence_length() == 0 )
break;
// Transmit the message
transmit( message );
if ( !timer_.is_active() ) {
timer_.start();
}
next_seqno_ += message.sequence_length();
total_outstanding_ += message.sequence_length();
track_messages_.push( message );
}
}
// Processes the receiver's message: updates the window size, drops every
// tracked segment that is fully acknowledged and, if any segment was
// acknowledged, restores the initial RTO and restarts (or stops) the timer.
void TCPSender::receive( const TCPReceiverMessage& msg )
{
  // Error handling: ignore everything after an error; RST puts the stream in error.
  if ( input_.has_error() ) {
    return;
  }
  if ( msg.RST ) {
    input_.set_error();
    return;
  }

  window_size_ = msg.window_size;
  if ( !msg.ackno.has_value() ) {
    return;
  }

  // Absolute sequence number carried by the ack.
  const uint64_t acked_upto { msg.ackno->unwrap( isn_, next_seqno_ ) };
  if ( acked_upto > next_seqno_ ) {
    return; // acks something that has not been sent yet: ignore
  }

  // Pop tracked segments from the front while they are fully covered by the ack.
  bool progressed { false };
  for ( ; !track_messages_.empty(); track_messages_.pop() ) {
    const auto seg_len = track_messages_.front().sequence_length();
    if ( ack_seqno_ + seg_len > acked_upto ) {
      break; // only partially acknowledged: keep tracking it
    }
    progressed = true;
    ack_seqno_ += seg_len;
    total_outstanding_ -= seg_len;
  }

  if ( !progressed ) {
    return;
  }

  // New data was acknowledged: reset the retransmission state and the RTO.
  total_retransmit_ = 0;
  timer_.reload( initial_RTO_ms_ );
  if ( track_messages_.empty() ) {
    timer_.stop();
  } else {
    timer_.start();
  }
}
// Advances the timer by `ms_since_last_tick`. When it has expired and a
// segment is still outstanding, the oldest one is retransmitted; with a
// nonzero window the retransmission counter is bumped and the RTO doubled.
// The timer's accumulated time is then cleared.
void TCPSender::tick( uint64_t ms_since_last_tick, const TransmitFunction& transmit )
{
  const bool timed_out = timer_.tick( ms_since_last_tick ).is_expired();
  if ( !timed_out ) {
    return;
  }
  if ( track_messages_.empty() ) {
    return;
  }

  // Resend the earliest segment that is still unacknowledged.
  transmit( track_messages_.front() );

  // Back off only when the window is nonzero.
  if ( window_size_ != 0 ) {
    total_retransmit_ += 1;
    timer_.exponential_back();
  }
  timer_.reset();
}