Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
PHZ76 committed Apr 12, 2020
1 parent 548d927 commit b5b9832
Show file tree
Hide file tree
Showing 52 changed files with 1,657 additions and 1,617 deletions.
87 changes: 53 additions & 34 deletions src/net/Acceptor.cpp
Original file line number Diff line number Diff line change
@@ -1,53 +1,72 @@
#include "Acceptor.h"
#include "Acceptor.h"
#include "EventLoop.h"
#include "SocketUtil.h"
#include "Logger.h"

using namespace xop;

Acceptor::Acceptor(EventLoop* eventLoop, std::string ip, uint16_t port)
: _eventLoop(eventLoop)
, _tcpSocket(new TcpSocket)
Acceptor::Acceptor(EventLoop* eventLoop)
: event_loop_(eventLoop)
, tcp_socket_(new TcpSocket)
{
_tcpSocket->create();
_acceptChannel.reset(new Channel(_tcpSocket->fd()));
SocketUtil::setReuseAddr(_tcpSocket->fd());
SocketUtil::setReusePort(_tcpSocket->fd());
SocketUtil::setNonBlock(_tcpSocket->fd());
_tcpSocket->bind(ip, port);

}

Acceptor::~Acceptor()
{
_eventLoop->removeChannel(_acceptChannel);
_tcpSocket->close();

}

int Acceptor::listen()
int Acceptor::Listen(std::string ip, uint16_t port)
{
if (!_tcpSocket->listen(1024))
{
return -1;
}
_acceptChannel->setReadCallback([this]() { this->handleAccept(); });
_acceptChannel->enableReading();
_eventLoop->updateChannel(_acceptChannel);
return 0;
std::lock_guard<std::mutex> locker(mutex_);

if (tcp_socket_->GetSocket() > 0) {
tcp_socket_->Close();
}

SOCKET sockfd = tcp_socket_->Create();
channel_ptr_.reset(new Channel(sockfd));
SocketUtil::SetReuseAddr(sockfd);
SocketUtil::SetReusePort(sockfd);
SocketUtil::SetNonBlock(sockfd);

if (!tcp_socket_->Bind(ip, port)) {
return -1;
}

if (!tcp_socket_->Listen(1024)) {
return -1;
}

channel_ptr_->SetReadCallback([this]() { this->OnAccept(); });
channel_ptr_->EnableReading();
event_loop_->UpdateChannel(channel_ptr_);
return 0;
}

void Acceptor::handleAccept()
void Acceptor::Close()
{
SOCKET connfd = _tcpSocket->accept();
if (connfd > 0)
{
if (_newConnectionCallback)
{
_newConnectionCallback(connfd);
}
else
{
SocketUtil::close(connfd);
}
}
std::lock_guard<std::mutex> locker(mutex_);

if (tcp_socket_->GetSocket() > 0) {
event_loop_->RemoveChannel(channel_ptr_);
tcp_socket_->Close();
}
}

void Acceptor::OnAccept()
{
std::lock_guard<std::mutex> locker(mutex_);

SOCKET connfd = tcp_socket_->Accept();
if (connfd > 0) {
if (new_connection_callback_) {
new_connection_callback_(connfd);
}
else {
SocketUtil::Close(connfd);
}
}
}

32 changes: 15 additions & 17 deletions src/net/Acceptor.h
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
#ifndef XOP_ACCEPTOR_H
#ifndef XOP_ACCEPTOR_H
#define XOP_ACCEPTOR_H

#include <functional>
#include <memory>
#include <mutex>
#include "Channel.h"
#include "TcpSocket.h"

Expand All @@ -16,30 +17,27 @@ class EventLoop;
class Acceptor
{
public:
Acceptor(EventLoop* eventLoop, std::string ip, uint16_t port);
~Acceptor();
Acceptor(EventLoop* eventLoop);
~Acceptor();

void setNewConnectionCallback(const NewConnectionCallback& cb)
{ _newConnectionCallback = cb; }
void SetNewConnectionCallback(const NewConnectionCallback& cb)
{ new_connection_callback_ = cb; }

void setNewConnectionCallback(NewConnectionCallback&& cb)
{ _newConnectionCallback = cb; }

int listen();
int Listen(std::string ip, uint16_t port);
void Close();

private:
void handleAccept();

EventLoop* _eventLoop = nullptr;

std::shared_ptr<TcpSocket> _tcpSocket;
ChannelPtr _acceptChannel;
void OnAccept();

NewConnectionCallback _newConnectionCallback;
EventLoop* event_loop_ = nullptr;
std::mutex mutex_;
std::unique_ptr<TcpSocket> tcp_socket_;
ChannelPtr channel_ptr_;
NewConnectionCallback new_connection_callback_;
};

}

#endif //
#endif


126 changes: 60 additions & 66 deletions src/net/BufferReader.cpp
Original file line number Diff line number Diff line change
@@ -1,115 +1,109 @@
// PHZ
// PHZ
// 2018-5-15

#include "BufferReader.h"
#include "Socket.h"
#include <cstring>

using namespace xop;
uint32_t xop::readUint32BE(char* data)
uint32_t xop::ReadUint32BE(char* data)
{
uint8_t* p = (uint8_t*)data;
uint32_t value = (p[0] << 24) | (p[1] << 16) | (p[2] << 8) | p[3];
return value;
uint8_t* p = (uint8_t*)data;
uint32_t value = (p[0] << 24) | (p[1] << 16) | (p[2] << 8) | p[3];
return value;
}

uint32_t xop::readUint32LE(char* data)
uint32_t xop::ReadUint32LE(char* data)
{
uint8_t* p = (uint8_t*)data;
uint32_t value = (p[3] << 24) | (p[2] << 16) | (p[1] << 8) | p[0];
return value;
uint8_t* p = (uint8_t*)data;
uint32_t value = (p[3] << 24) | (p[2] << 16) | (p[1] << 8) | p[0];
return value;
}

uint32_t xop::readUint24BE(char* data)
uint32_t xop::ReadUint24BE(char* data)
{
uint8_t* p = (uint8_t*)data;
uint32_t value = (p[0] << 16) | (p[1] << 8) | p[2];
return value;
uint8_t* p = (uint8_t*)data;
uint32_t value = (p[0] << 16) | (p[1] << 8) | p[2];
return value;
}

uint32_t xop::readUint24LE(char* data)
uint32_t xop::ReadUint24LE(char* data)
{
uint8_t* p = (uint8_t*)data;
uint32_t value = (p[2] << 16) | (p[1] << 8) | p[0];
return value;
uint8_t* p = (uint8_t*)data;
uint32_t value = (p[2] << 16) | (p[1] << 8) | p[0];
return value;
}

uint16_t xop::readUint16BE(char* data)
uint16_t xop::ReadUint16BE(char* data)
{
uint8_t* p = (uint8_t*)data;
uint16_t value = (p[0] << 8) | p[1];
return value;
uint8_t* p = (uint8_t*)data;
uint16_t value = (p[0] << 8) | p[1];
return value;
}

uint16_t xop::readUint16LE(char* data)
uint16_t xop::ReadUint16LE(char* data)
{
uint8_t* p = (uint8_t*)data;
uint16_t value = (p[1] << 8) | p[0];
return value;
uint8_t* p = (uint8_t*)data;
uint16_t value = (p[1] << 8) | p[0];
return value;
}

const char BufferReader::kCRLF[] = "\r\n";

BufferReader::BufferReader(uint32_t initialSize)
: _buffer(new std::vector<char>(initialSize))
: buffer_(new std::vector<char>(initialSize))
{
_buffer->resize(initialSize);
buffer_->resize(initialSize);
}

BufferReader::~BufferReader()
{

}

int BufferReader::readFd(SOCKET sockfd)
int BufferReader::Read(SOCKET sockfd)
{
uint32_t size = writableBytes();
if(size < MAX_BYTES_PER_READ) // 重新调整BufferReader大小
{
uint32_t bufferReaderSize = (uint32_t)_buffer->size();
if(bufferReaderSize > MAX_BUFFER_SIZE)
{
return 0; // close
}
uint32_t size = WritableBytes();
if(size < MAX_BYTES_PER_READ) {
uint32_t bufferReaderSize = (uint32_t)buffer_->size();
if(bufferReaderSize > MAX_BUFFER_SIZE) {
return 0;
}

_buffer->resize(bufferReaderSize + MAX_BYTES_PER_READ);
}
buffer_->resize(bufferReaderSize + MAX_BYTES_PER_READ);
}

int bytesRead = ::recv(sockfd, beginWrite(), MAX_BYTES_PER_READ, 0);
if(bytesRead > 0)
{
_writerIndex += bytesRead;
}
int bytes_read = ::recv(sockfd, beginWrite(), MAX_BYTES_PER_READ, 0);
if(bytes_read > 0) {
writer_index_ += bytes_read;
}

return bytesRead;
return bytes_read;
}


uint32_t BufferReader::readAll(std::string& data)
uint32_t BufferReader::ReadAll(std::string& data)
{
uint32_t size = readableBytes();
if(size > 0)
{
data.assign(peek(), size);
_writerIndex = 0;
_readerIndex = 0;
}

return size;
uint32_t size = ReadableBytes();
if(size > 0) {
data.assign(Peek(), size);
writer_index_ = 0;
reader_index_ = 0;
}

return size;
}

uint32_t BufferReader::readUntilCrlf(std::string& data)
uint32_t BufferReader::ReadUntilCrlf(std::string& data)
{
const char* crlf = findLastCrlf();
if(crlf == nullptr)
{
return 0;
}

uint32_t size = (uint32_t)(crlf - peek() + 2);
data.assign(peek(), size);
retrieve(size);

return size;
const char* crlf = FindLastCrlf();
if(crlf == nullptr) {
return 0;
}

uint32_t size = (uint32_t)(crlf - Peek() + 2);
data.assign(Peek(), size);
Retrieve(size);
return size;
}

Loading

0 comments on commit b5b9832

Please sign in to comment.