Zero-Copy I/O for Streaming Sockets in C++: Efficient Data Transfer with Pre-Allocated Buffers

Zero-Copy I/O for Streaming Sockets in C++: Efficient Data Transfer with Pre-Allocated Buffers

In network programming, efficient data transfer is crucial for achieving high-performance applications. One approach to optimizing data transfer is through zero-copy I/O, which eliminates unnecessary memory copies during input and output operations. In this context, a zero-copy approach is particularly useful when handling streaming sockets, where data is continuously transmitted in real-time. This article explores the concept of zero-copy I/O and provides a sample implementation in C++ using a pre-allocated buffer.

Understanding Zero-copy I/O:

Traditionally, when data is read from or written to a socket, it involves multiple memory copies. The data is first copied from the kernel buffer to the application buffer and then copied again from the application buffer to its final destination or source. These redundant memory copies can introduce significant overhead and impact performance.

Sample Implementation:

ZeroCopyInputStream and ZeroCopyOutputStream To demonstrate zero-copy I/O for streaming sockets, let's consider a sample implementation in C++ using a pre-allocated buffer. We will create two classes: ZeroCopyInputStream and ZeroCopyOutputStream.

ZeroCopyInputStream:

  • Responsible for reading data from the streaming socket using a pre-allocated buffer.

  • Manages the buffer's position and size to facilitate efficient reading operations.

  • Utilizes system calls like recv or other socket-specific functions to receive data directly into the buffer.

ZeroCopyOutputStream:

  • Handles writing data to the streaming socket using a pre-allocated buffer.

  • Manages the buffer's position and size to enable efficient writing operations.

  • Utilizes system calls like send or other socket-specific functions to directly transmit data from the buffer.

By using these classes, you can achieve zero-copy I/O operations for streaming sockets, eliminating unnecessary memory copies and optimizing data transfer efficiency.

Benefits:-

  1. Reduced memory copies: Eliminating intermediate memory copies minimizes data movement, leading to improved performance and reduced CPU overhead.

  2. Lower latency: By reducing the time required for copying data, zero-copy I/O can help decrease overall latency in data transfer.

  3. Efficient resource utilization: Zero-copy I/O allows for more efficient usage of memory, CPU, and network resources, enabling higher scalability and throughput.

Let's handle the streaming sockets with pre- allocated buffers:-

we can use the Zerocopyinputstream and zerocopycopyoutputstream classes from the protocol buffers library to handle the streaming sockets.

#include <iostream>
#include <google/protobuf/io/zero_copy_stream_impl.h>

// Function to simulate receiving data from a socket
void SimulateSocketReceive(char* buffer, int size) {
    // Simulating receiving data from a socket
    // Here, you would read data from the socket into the buffer
    // For demonstration purposes, let's fill the buffer with dummy data
    for (int i = 0; i < size; ++i) {
        buffer[i] = 'A' + (i % 26);
    }
}

// Function to simulate sending data over a socket
void SimulateSocketSend(const char* buffer, int size) {
    // Simulating sending data over a socket
    // Here, you would write data from the buffer to the socket
    // For demonstration purposes, let's print the data instead
    std::cout.write(buffer, size);
}

int main() {
    // Pre-allocated buffer
    const int kBufferSize = 1024;
    char buffer[kBufferSize];

    // Create ZeroCopyInputStream and ZeroCopyOutputStream objects
    google::protobuf::io::ArrayInputStream input_stream(buffer, kBufferSize);
    google::protobuf::io::ArrayOutputStream output_stream(buffer, kBufferSize);

    // Simulate receiving data from a socket into the buffer
    SimulateSocketReceive(buffer, kBufferSize);

    // Read data from the ZeroCopyInputStream
    char read_buffer[256];
    int read_size = 256;
    while (input_stream.Next(reinterpret_cast<void**>(&read_buffer), &read_size)) {
        // Process or use the read data
        // Here, we'll simulate sending the read data over a socket
        SimulateSocketSend(read_buffer, read_size);
    }

    return 0;
}
  • In the code above, we first include the necessary headers for working with ZeroCopyInputStream and ZeroCopyOutputStream from the Protocol Buffers library. Then, we define a function SimulateSocketReceive to simulate receiving data from a socket into a pre-allocated buffer.

  • In the main() function, we create an instance of ArrayInputStream using the pre-allocated buffer. This ArrayInputStream acts as a stream that reads data from the buffer. Similarly, we create an instance of ArrayOutputStream that writes data to the same buffer.

  • To simulate receiving data from a socket, we call the SimulateSocketReceive function, which fills the buffer with dummy data. Then, we iterate over the data in the buffer using the Next() function of input_stream, which provides a pointer to a portion of the buffer and the size of that portion.

  • Inside the loop, we simulate sending the read data over a socket by calling the SimulateSocketSend function. You can replace this function call with your own logic for sending the data over a socket.

  • This example demonstrates the basic usage of ZeroCopyInputStream and ZeroCopyOutputStream for handling streaming sockets with pre-allocated buffers

let's build the reader-stream header:

The provided code is a header file that defines a class called ZeroCopyNetworkReaderStream, which is a custom implementation of ZeroCopyInputStream from the Protocol Buffers library. Let's go through the code and explain its purpose and functionality:

#ifndef __ZERO_COPY_NETWORK_READER_STREAM_H__
#define __ZERO_COPY_NETWORK_READER_STREAM_H__

#include <google/protobuf/io/zero_copy_stream.h>

namespace protobuf = google::protobuf;

The code begins with include directives for the necessary headers, including google/protobuf/io/zero_copy_stream.h. It also defines a namespace alias protobuf for google::protobuf.

class ZeroCopyNetworkReaderStream : public protobuf::io::ZeroCopyInputStream {
public:
  ZeroCopyNetworkReaderStream(
    int fd,
    protobuf::uint32 totalMessageSize,
    protobuf::uint8* pDataBuffer,
    protobuf::uint32 bufferSize);

  virtual ~ZeroCopyNetworkReaderStream();

  virtual bool Next(const void** data, int* size);

  virtual void BackUp(int count);

  virtual bool Skip(int count);

  virtual protobuf::int64 ByteCount() const;

private:
  // Private member variables

private:
 GOOGLE_DISALLOW_EVIL_CONSTRUCTORS(ZeroCopyNetworkReaderStream);
};

The code defines the ZeroCopyNetworkReaderStream class, which inherits from protobuf::io::ZeroCopyInputStream. This class represents a stream that reads data from a network socket. It has several member functions and private member variables.

The public member functions of ZeroCopyNetworkReaderStream include:

  • The constructor ZeroCopyNetworkReaderStream takes an int fd representing the file descriptor of the network socket, a protobuf::uint32 totalMessageSize representing the total size of the message to be read, a protobuf::uint8* pDataBuffer representing the buffer to store the data, and a protobuf::uint32 bufferSize indicating the size of the buffer.

  • The destructor ~ZeroCopyNetworkReaderStream cleans up any resources held by the stream.

  • The Next function is an overridden method from ZeroCopyInputStream that retrieves the next portion of data from the stream. It takes a const void** data pointer that will be set to the data, and an int* size pointer that will be set to the size of the data. It returns true if there is more data, or false if the end of the stream is reached.

  • The BackUp function is an overridden method from ZeroCopyInputStream that moves the current position of the stream backward by count bytes. This is useful when you want to "unread" some data.

  • The Skip function is an overridden method from ZeroCopyInputStream that skips count bytes in the stream, effectively discarding that portion of data.

  • The ByteCount function is an overridden method from ZeroCopyInputStream that returns the number of bytes read from the stream so far.

The private member variables include:

  • m_fd holds the file descriptor of the network socket.

  • m_remainingMessageSizeBytes keeps track of the remaining bytes to be read from the socket.

  • m_pDataBuffer is a pointer to the buffer where the data is stored.

  • m_bufferSize represents the size of the buffer.

  • m_numBytesLastRead stores the number of bytes read during the last read operation.

  • m_backupPos maintains the position to which the stream has been backed up.

  • m_byteCount keeps track of the total number of bytes read from the stream.

GOOGLE_DISALLOW_EVIL_CONSTRUCTORS(ZeroCopyNetworkReaderStream);

This line uses a macro to disable the copy constructor and assignment operator, preventing the class from being copied or assigned. This is a common practice when a class manages resources that should not be shared.

#endif // __ZERO_COPY_NETWORK_READER_STREAM_H__

This line is a preprocessor directive that marks the end of the header file and ensures that the contents are only included once in a translation unit.

Overall, this header file provides the declaration of the ZeroCopyNetworkReaderStream class, which is intended to be used as a custom implementation of ZeroCopyInputStream for reading data from a network socket. The implementation details of the member functions are not provided in this header file, but they should be defined in a corresponding source file.

Let's implement a reader stream class:-

The provided code is the implementation file for the ZeroCopyNetworkReaderStream class, which was declared in the header file "ZeroCopyNetworkReaderStream.h". Let's go through the code and explain its functionality:

#include "ZeroCopyNetworkReaderStream.h"
#include <errno.h>
#include <sys/types.h>
#include <sys/socket.h>

The code includes the corresponding header file for the ZeroCopyNetworkReaderStream class and includes additional headers required for socket operations.

ZeroCopyNetworkReaderStream::ZeroCopyNetworkReaderStream(
  int fd,
  protobuf::uint32 totalMessageSize,
  protobuf::uint8* pDataBuffer,
  protobuf::uint32 bufferSize) :
  m_fd(fd),
  m_remainingMessageSizeBytes(totalMessageSize),
  m_pDataBuffer(pDataBuffer),
  m_bufferSize(bufferSize),
  m_numBytesLastRead(0),
  m_backupPos(m_numBytesLastRead),
  m_byteCount(0)
{
}

The constructor implementation initializes the member variables of the ZeroCopyNetworkReaderStream class using an initialization list. It takes the file descriptor fd of the network socket, the total message size totalMessageSize, a pointer pDataBuffer to the buffer for storing data, and the buffer size bufferSize.

ZeroCopyNetworkReaderStream::~ZeroCopyNetworkReaderStream()
{
}

The destructor implementation is empty, as there is no resource cleanup needed beyond the default behavior.

bool ZeroCopyNetworkReaderStream::Next(const void** data, int* size) {
  // Check if data and size pointers are valid
  if (data == 0 || size == 0) {
    return false;
  }

  *size = 0;

  // Check if the remaining message size is zero, indicating the end of the stream
  if (m_remainingMessageSizeBytes == 0) {
    return false;
  }

  // If a BackUp() call has been issued, return the backed-up data
  if (m_backupPos < m_numBytesLastRead) {
    *size = m_numBytesLastRead - m_backupPos;
    *data = &m_pDataBuffer[m_backupPos];
    // Set backup position to the end of the last read
    m_backupPos = m_numBytesLastRead;
    return true;
  }

  // Determine the number of bytes to read in the next iteration
  const protobuf::uint32 numBytesToRead = std::min(m_remainingMessageSizeBytes, m_bufferSize);

  // Receive data from the socket using recv() with MSG_WAITALL flag to ensure complete read
  ssize_t bytesRead = -1;
  do {
    bytesRead = recv(m_fd, m_pDataBuffer, numBytesToRead, MSG_WAITALL);
  } while (bytesRead == -1 && errno == EINTR);

  // Check the received data and handle possible scenarios
  if (bytesRead == 0) {
    // The peer has performed an orderly shutdown during normal receive
    return false;
  }
  if (bytesRead == -1) {
    // Failed to read message from the socket
    return false;
  }
  if (bytesRead != numBytesToRead) {
    // Partially read message
    return false;
  }

  // Set the data and size to the buffer and bytesRead values
  *data = m_pDataBuffer;
  *size = bytesRead;
  m_numBytesLastRead = m_backupPos = bytesRead;
  m_remainingMessageSizeBytes -= bytesRead;
  m_byteCount += bytesRead;

  return true;
}

The Next function implementation reads the next portion of data from the stream. It takes pointers to data and size as arguments. The function first checks if the pointers are valid and returns false if either of them is nullptr.

The function then checks if the remaining message size is zero, indicating the end of the stream, and returns false in that case.

If a BackUp() call has been issued previously, it returns the backed-up data by setting data to the backed-up position and size to the remaining size of the backed-up data.

If the above conditions are not met, the function determines the number of bytes to read in the next iteration based on the minimum of the remaining message size and the buffer size.

It uses the recv() function to receive data from the socket with the MSG_WAITALL flag, ensuring that a complete read of the specified number of bytes is performed. The function handles possible errors and scenarios where the socket is closed or partial data is read.

Finally, the function updates the data, size, and member variables accordingly, and returns true to indicate that more data is available.

void ZeroCopyNetworkReaderStream::BackUp(int count) {
  m_backupPos -= count;
}

The BackUp function implementation moves the backup position of the stream backward by the specified count bytes. It updates the m_backupPos member variable accordingly.

bool ZeroCopyNetworkReaderStream::Skip(int count) {
  // If a BackUp() call has been issued, skip the backed-up data
  if (m_backupPos < m_numBytesLastRead) {
    const protobuf::uint32 numBytesToSkipInBackup =
      std::min(m_numBytesLastRead - m_backupPos, static_cast<protobuf::uint32>(count));
    m_backupPos += numBytesToSkipInBackup;
    count -= numBytesToSkipInBackup;
  }

  // If count is still not zero, read and discard the remaining bytes from the stream
  if (count == 0) {
    return (m_remainingMessageSizeBytes > 0);
  }

  // Read and discard the remaining bytes from the stream
  char dumpBuf[128];
  protobuf::uint32 bytesToDump = std::min(m_remainingMessageSizeBytes, static_cast<protobuf::uint32>(count));
  while (bytesToDump > 0) {
    protobuf::uint32 bytesToDumpThisIteration =
      std::min(bytesToDump, static_cast<protobuf::uint32>(sizeof(dumpBuf)));
    ssize_t bytesRead = -1;
    do {
      bytesRead = recv(m_fd, dumpBuf, bytesToDumpThisIteration, MSG_WAITALL);
    } while (bytesRead == -1 && errno == EINTR);
    if (bytesRead == 0) {
      // The peer has performed an orderly shutdown during data dump
      return false;
    }
    if (bytesRead == -1) {
      // Failed to read data to dump
      return false;
    }

    bytesToDump -= bytesToDumpThisIteration;
    m_remainingMessageSizeBytes -= bytesToDumpThisIteration;
    m_byteCount += bytesToDumpThisIteration;
  }

  return (m_remainingMessageSizeBytes > 0);
}

The Skip function implementation skips count bytes in the stream. If a BackUp() call has been issued previously and there is backed-up data, it skips the backed-up data by updating the backup position and reducing the count accordingly.

If count is still not zero, indicating that more bytes need to be skipped, the function reads and discards the remaining bytes from the stream. It uses a

temporary buffer dumpBuf and recv() to read and discard the data.

The function updates the member variables and returns true if there is still remaining message size, indicating that more data is available.

protobuf::int64 ZeroCopyNetworkReaderStream::ByteCount() const {
  return m_byteCount;
}

The ByteCount function implementation returns the total number of bytes read from the stream so far, as stored in the m_byteCount member variable.

Overall, the implementation file provides the implementation of the member functions of the ZeroCopyNetworkReaderStream class, allowing it to handle reading data from a network socket and providing the necessary functionalities for navigating and managing the stream.

Let's build the writer-stream header

The provided code is a header file that defines a class called ZeroCopyNetworkWriterStream, which is a custom implementation of ZeroCopyOutputStream from the Protocol Buffers library. Let's go through the code and explain its purpose and functionality:

#ifndef __ZERO_COPY_NETWORK_WRITER_STREAM_H__
#define __ZERO_COPY_NETWORK_WRITER_STREAM_H__

#include <google/protobuf/io/zero_copy_stream.h>

namespace protobuf = google::protobuf;

The code begins with include directives for the necessary headers, including google/protobuf/io/zero_copy_stream.h. It also defines a namespace alias protobuf for google::protobuf.

class ZeroCopyNetworkWriterStream : public protobuf::io::ZeroCopyOutputStream {
public:
  ZeroCopyNetworkWriterStream(
    int fd,
    protobuf::uint8* pDataBuffer,
    protobuf::uint32 bufferSize);

  virtual ~ZeroCopyNetworkWriterStream();

  virtual bool Next(void** data, int* size);

  virtual void BackUp(int count);

  virtual protobuf::int64 ByteCount() const;

  bool Flush();

private:
  // Private member variables

private:
 GOOGLE_DISALLOW_EVIL_CONSTRUCTORS(ZeroCopyNetworkWriterStream);
};

The code defines the ZeroCopyNetworkWriterStream class, which inherits from protobuf::io::ZeroCopyOutputStream. This class represents a stream that writes data to a network socket. It has several member functions and private member variables.

The public member functions of ZeroCopyNetworkWriterStream include:

  • The constructor ZeroCopyNetworkWriterStream takes an int fd representing the file descriptor of the network socket, a protobuf::uint8* pDataBuffer representing the buffer for data to be written, and a protobuf::uint32 bufferSize indicating the size of the buffer.

  • The destructor ~ZeroCopyNetworkWriterStream cleans up any resources held by the stream.

  • The Next function is an overridden method from ZeroCopyOutputStream that provides the next available portion of the buffer where data can be written. It takes a void** data pointer that will be set to the data buffer, and an int* size pointer that will be set to the size of the buffer. It returns true if there is more buffer space available, or false if the buffer is full.

  • The BackUp function is an overridden method from ZeroCopyOutputStream that moves the current position of the stream backward by count bytes. This is useful when you want to "unwrite" some data.

  • The ByteCount function is an overridden method from ZeroCopyOutputStream that returns the number of bytes written to the stream so far.

  • The Flush function flushes any buffered data to the network socket and ensures that it is sent. It returns true if the flush is successful, or false if there was an error.

The private member variables are not specified in the code snippet.

GOOGLE_DISALLOW_EVIL_CONSTRUCTORS(ZeroCopyNetworkWriterStream);

This line uses a macro to disable the copy constructor and assignment operator, preventing the class from being copied or assigned. This is a common practice when a class manages resources that should not be shared.

#endif // __ZERO_COPY_NETWORK_WRITER_STREAM_H__

This line is a preprocessor directive that marks the end of the header file and ensures that the contents are only included once in a translation unit.

Overall, this header file provides the declaration of the ZeroCopyNetworkWriterStream class, which is intended to be used as a custom implementation of ZeroCopyOutputStream for writing data to a network socket. The implementation details of the member functions and private variables are not provided in this header file, but they should be defined in a corresponding source file.

Let's implement Writer stream Class:

The provided code is the implementation file for the ZeroCopyNetworkWriterStream class, which was declared in the header file "ZeroCopyNetworkWriterStream.h". Let's go through the code and explain its functionality:

#include "ZeroCopyNetworkWriterStream.h"
#include <sys/types.h>
#include <sys/socket.h>

The code includes the corresponding header file for the ZeroCopyNetworkWriterStream class and includes additional headers required for socket operations.

ZeroCopyNetworkWriterStream::ZeroCopyNetworkWriterStream(
  int fd,
  protobuf::uint8* pDataBuffer,
  protobuf::uint32 bufferSize) :
  m_fd(fd),
  m_pDataBuffer(pDataBuffer),
  m_bufferSize(bufferSize),
  m_numBytesToWrite(0),
  m_byteCount(0)
{
}

The constructor implementation initializes the member variables of the ZeroCopyNetworkWriterStream class using an initialization list. It takes the file descriptor fd of the network socket, a pointer pDataBuffer to the buffer for data to be written, and the buffer size bufferSize.

ZeroCopyNetworkWriterStream::~ZeroCopyNetworkWriterStream()
{
}

The destructor implementation is empty, as there is no resource cleanup needed beyond the default behavior.

bool ZeroCopyNetworkWriterStream::Next(void** data, int* size) {
  // If the buffer is full, flush the data
  if (m_numBytesToWrite == m_bufferSize) {
    bool bret = Flush();
    if (!bret) {
      return false;
    }
    return true;
  }

  // Set the data and size pointers to the remaining buffer space
  *size = m_bufferSize - m_numBytesToWrite;
  *data = &m_pDataBuffer[m_numBytesToWrite];
  m_numBytesToWrite += m_bufferSize - m_numBytesToWrite;

  return true;
}

The Next function implementation provides the next available portion of the buffer where data can be written. If the buffer is already full, it calls the Flush function to send the data over the network before providing the next buffer. The function sets the data pointer to the remaining buffer space and updates the size accordingly. It also updates the m_numBytesToWrite member variable.

void ZeroCopyNetworkWriterStream::BackUp(int count) {
  m_numBytesToWrite -= count;
}

The BackUp function implementation moves the position of the stream backward by the specified count bytes. It updates the m_numBytesToWrite member variable accordingly.

protobuf::int64 ZeroCopyNetworkWriterStream::ByteCount() const {
  return m_byteCount;
}

The ByteCount function implementation returns the total number of bytes written to the stream so far, as stored in the m_byteCount member variable.

bool ZeroCopyNetworkWriterStream::Flush() {
  // Send the buffered data over the network
  ssize_t writtenBytes = -1;
  do {
    writtenBytes = send(m_fd, m_pDataBuffer, m_numBytesToWrite, 0);
  } while (writtenBytes == -1 && errno == EINTR);

  // Check the written bytes and handle possible scenarios
  if (writtenBytes == 0) {
    // The peer has performed an orderly shutdown during data flushing
    return false;
  }
  if (writtenBytes == -1) {
    // Failed to write bytes to the network socket
    return false;
  }
  if (writtenBytes != m_numBytesToWrite) {
    // Unexpected error: sending seems to have succeeded but not all bytes were written
    return false;
  }

  // Update the byte count and return true to indicate successful flush
  m_byteCount += writtenBytes;

  return true;
}

The Flush function implementation flushes any buffered data to the network socket and ensures that it is sent. It uses the send() function to send the buffered data over the network. The function handles possible errors and scenarios where the socket is closed or not all bytes are written.

Finally, the function updates the m_byteCount member variable and returns true to indicate a successful flush.

Overall, the implementation file provides the implementation of the member functions of the ZeroCopyNetworkWriterStream class, allowing it to handle writing data to a network socket and providing the necessary functionalities for managing the stream and flushing the data.

Conclusion:

Zero-copy I/O provides an efficient mechanism for handling streaming sockets, enabling optimized data transfer by eliminating unnecessary memory copies. By utilizing pre-allocated buffers and appropriate system calls, you can achieve high-performance network applications with reduced latency and improved resource utilization. Implementing zero-copy I/O in C++ can significantly enhance the efficiency of your streaming socket-based solutions.

This is a flagship concept!

I hope this article will be helpfull👍