Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
PHZ76 committed Aug 30, 2019
1 parent 6e1279c commit b8eb38f
Show file tree
Hide file tree
Showing 13 changed files with 502 additions and 328 deletions.
528 changes: 264 additions & 264 deletions example/rtmp_server.cpp

Large diffs are not rendered by default.

92 changes: 58 additions & 34 deletions src/net/TcpConnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,58 +35,74 @@ TcpConnection::~TcpConnection()

void TcpConnection::send(std::shared_ptr<char> data, uint32_t size)
{
if (_isClosed)
return;
if (_isClosed)
return;

{
std::lock_guard<std::mutex> lock(_mutex);
_writeBufferPtr->append(data, size);
}

_writeBufferPtr->append(data, size);
this->handleWrite();
return;
}

void TcpConnection::send(const char *data, uint32_t size)
{
if (_isClosed)
return;
if (_isClosed)
return;

{
std::lock_guard<std::mutex> lock(_mutex);
_writeBufferPtr->append(data, size);
}

_writeBufferPtr->append(data, size);
this->handleWrite();
return;
}

void TcpConnection::close()
void TcpConnection::disconnect()
{
if (_isClosed)
return;

this->handleClose();
std::lock_guard<std::mutex> lock(_mutex);
this->close();
}

void TcpConnection::handleRead()
{
if (_isClosed)
return;
{
std::lock_guard<std::mutex> lock(_mutex);

int ret = _readBufferPtr->readFd(_channelPtr->fd());
if (ret <= 0)
{
this->handleClose();
return;
}
if (_isClosed)
return;

int ret = _readBufferPtr->readFd(_channelPtr->fd());
if (ret <= 0)
{
this->close();
return;
}
}

if (_readCB)
{
bool ret = _readCB(shared_from_this(), *_readBufferPtr);
if (false == ret)
{
this->handleClose();
std::lock_guard<std::mutex> lock(_mutex);
this->close();
}
}
}

void TcpConnection::handleWrite()
{
if (_isClosed)
return;
if (_isClosed)
return;

if (!_mutex.try_lock())
{
return;
}

int ret = 0;
bool empty = false;
Expand All @@ -95,7 +111,8 @@ void TcpConnection::handleWrite()
ret = _writeBufferPtr->send(_channelPtr->fd());
if (ret < 0)
{
this->handleClose();
this->close();
_mutex.unlock();
return;
}
empty = _writeBufferPtr->isEmpty();
Expand All @@ -114,26 +131,33 @@ void TcpConnection::handleWrite()
_channelPtr->enableWriting();
_taskScheduler->updateChannel(_channelPtr);
}

_mutex.unlock();
}

void TcpConnection::handleClose()
void TcpConnection::close()
{
std::lock_guard<std::mutex> lock(_mutex);
if (!_isClosed)
{
_isClosed = true;
if (!_isClosed)
{
_isClosed = true;
_taskScheduler->removeChannel(_channelPtr);

if (_closeCB)
_closeCB(shared_from_this());
if (_closeCB)
_closeCB(shared_from_this());

if (_disconnectCB)
_disconnectCB(shared_from_this());
_disconnectCB(shared_from_this());
}
}

_taskScheduler->removeChannel(_channelPtr);
}
void TcpConnection::handleClose()
{
std::lock_guard<std::mutex> lock(_mutex);
this->close();
}

void TcpConnection::handleError()
{
this->handleClose();
std::lock_guard<std::mutex> lock(_mutex);
this->close();
}
18 changes: 11 additions & 7 deletions src/net/TcpConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ class TcpConnection : public std::enable_shared_from_this<TcpConnection>

void send(std::shared_ptr<char> data, uint32_t size);
void send(const char *data, uint32_t size);
void close();

void disconnect();

bool isClosed() const
{ return _isClosed; }
Expand All @@ -47,18 +48,21 @@ class TcpConnection : public std::enable_shared_from_this<TcpConnection>
virtual void handleRead();
virtual void handleWrite();
virtual void handleClose();
virtual void handleError();
virtual void handleError();

void setDisconnectCallback(const DisconnectCallback& cb)
{ _disconnectCB = cb; }

TaskScheduler *_taskScheduler;
std::shared_ptr<xop::BufferReader> _readBufferPtr;
std::shared_ptr<xop::BufferWriter> _writeBufferPtr;
TaskScheduler *_taskScheduler;
std::shared_ptr<xop::BufferReader> _readBufferPtr;
std::shared_ptr<xop::BufferWriter> _writeBufferPtr;
std::atomic_bool _isClosed;

private:
void close();

std::shared_ptr<xop::Channel> _channelPtr;
std::mutex _mutex;
std::atomic_bool _isClosed;

DisconnectCallback _disconnectCB ;
CloseCallback _closeCB;
ReadCallback _readCB;
Expand Down
14 changes: 14 additions & 0 deletions src/xop/HttpFlvConnection.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#include "HttpFlvConnection.h"

using namespace xop;

HttpFlvConnection::HttpFlvConnection(TaskScheduler* taskScheduler, SOCKET sockfd)
: TcpConnection(taskScheduler, sockfd)
{

}

HttpFlvConnection::~HttpFlvConnection()
{

}
20 changes: 20 additions & 0 deletions src/xop/HttpFlvConnection.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#ifndef XOP_HTTP_FLV_CONNECTION_H
#define XOP_HTTP_FLV_CONNECTION_H

#include "net/EventLoop.h"
#include "net/TcpConnection.h"

namespace xop
{

class HttpFlvConnection : public TcpConnection
{
public:
HttpFlvConnection(TaskScheduler* taskScheduler, SOCKET sockfd);
~HttpFlvConnection();
};

}


#endif
36 changes: 36 additions & 0 deletions src/xop/HttpFlvServer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
#include "HttpFlvServer.h"
#include "RtmpServer.h"
#include "net/SocketUtil.h"
#include "net/Logger.h"

using namespace xop;

HttpFlvServer::HttpFlvServer(xop::EventLoop *loop, std::string ip, uint16_t port)
: TcpServer(loop, ip, port)
{
if (this->start() != 0)
{
LOG_INFO("HTTP-FLV Server listening on %u failed.", port);
}
else
{
LOG_INFO("HTTP-FLV Server listen on %u.\n", port);
}
}

HttpFlvServer::~HttpFlvServer()
{

}

void HttpFlvServer::attach(RtmpServer *rtmpServer)
{
std::lock_guard<std::mutex> locker(m_mutex);
m_rtmpServer = rtmpServer;
}

TcpConnection::Ptr HttpFlvServer::newConnection(SOCKET sockfd)
{
return std::make_shared<HttpFlvConnection>(_eventLoop->getTaskScheduler().get(), sockfd);
}

31 changes: 31 additions & 0 deletions src/xop/HttpFlvServer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#ifndef XOP_HTTP_FLV_SERVER_H
#define XOP_HTTP_FLV_SERVER_H

#include "net/TcpServer.h"
#include "HttpFlvConnection.h"
#include <mutex>

namespace xop
{
class RtmpServer;

class HttpFlvServer : public TcpServer
{
public:
HttpFlvServer(xop::EventLoop *loop, std::string ip, uint16_t port = 8080);
~HttpFlvServer();

void attach(RtmpServer *rtmpServer);

private:
TcpConnection::Ptr newConnection(SOCKET sockfd);

std::mutex m_mutex;
RtmpServer *m_rtmpServer = nullptr;
xop::EventLoop *m_eventLoop = nullptr;
};

}


#endif
33 changes: 16 additions & 17 deletions src/xop/RtmpConnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,12 @@ bool RtmpConnection::handleNotify(RtmpMessage& rtmpMsg)

bool RtmpConnection::handleVideo(RtmpMessage& rtmpMsg)
{
RtmpSession::Ptr sessionPtr = m_rtmpServer->getSession(m_streamPath);
if (sessionPtr == nullptr)
{
return false;
}

uint8_t *payload = (uint8_t *)rtmpMsg.payload.get();
uint8_t frameType = (payload[0] >> 4) & 0x0f;
uint8_t codecId = payload[0] & 0x0f;
Expand All @@ -583,6 +589,7 @@ bool RtmpConnection::handleVideo(RtmpMessage& rtmpMsg)
m_avcSequenceHeaderSize = rtmpMsg.length;
m_avcSequenceHeader.reset(new char[rtmpMsg.length]);
memcpy(m_avcSequenceHeader.get(), rtmpMsg.payload.get(), rtmpMsg.length);
sessionPtr->setAvcSequenceHeader(m_avcSequenceHeader, m_avcSequenceHeaderSize);
}
}
else if (codecId == RTMP_CODEC_ID_H264)
Expand All @@ -597,19 +604,18 @@ bool RtmpConnection::handleVideo(RtmpMessage& rtmpMsg)
}
}

if(m_streamPath != "")
{
auto sessionPtr = m_rtmpServer->getSession(m_streamPath);
if(sessionPtr)
{
sessionPtr->sendMediaData(RTMP_VIDEO, rtmpMsg._timestamp, rtmpMsg.payload, rtmpMsg.length);
}
}
sessionPtr->sendMediaData(RTMP_VIDEO, rtmpMsg._timestamp, rtmpMsg.payload, rtmpMsg.length);
return true;
}

bool RtmpConnection::handleAudio(RtmpMessage& rtmpMsg)
{
RtmpSession::Ptr sessionPtr = m_rtmpServer->getSession(m_streamPath);
if (sessionPtr == nullptr)
{
return false;
}

uint8_t *payload = (uint8_t *)rtmpMsg.payload.get();
uint8_t soundFormat = (payload[0] >> 4) & 0x0f;
uint8_t soundSize = (payload[0] >> 1) & 0x01;
Expand All @@ -620,6 +626,7 @@ bool RtmpConnection::handleAudio(RtmpMessage& rtmpMsg)
m_aacSequenceHeaderSize = rtmpMsg.length;
m_aacSequenceHeader.reset(new char[rtmpMsg.length]);
memcpy(m_aacSequenceHeader.get(), rtmpMsg.payload.get(), rtmpMsg.length);
sessionPtr->setAacSequenceHeader(m_aacSequenceHeader, m_aacSequenceHeaderSize);
}
else if (soundFormat == RTMP_CODEC_ID_AAC)
{
Expand All @@ -636,15 +643,7 @@ bool RtmpConnection::handleAudio(RtmpMessage& rtmpMsg)
}
}

if(m_streamPath != "")
{
auto sessionPtr = m_rtmpServer->getSession(m_streamPath);
if(sessionPtr)
{
sessionPtr->sendMediaData(RTMP_AUDIO, rtmpMsg._timestamp, rtmpMsg.payload, rtmpMsg.length);
}
}

sessionPtr->sendMediaData(RTMP_AUDIO, rtmpMsg._timestamp, rtmpMsg.payload, rtmpMsg.length);
return true;
}

Expand Down
4 changes: 2 additions & 2 deletions src/xop/RtmpPublisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ int RtmpPublisher::openUrl(std::string url, int msec)
std::shared_ptr<RtmpConnection> rtmpConn = m_rtmpConn;
sockfd = rtmpConn->fd();
m_eventLoop->addTriggerEvent([sockfd, rtmpConn]() {
rtmpConn->close();
rtmpConn->disconnect();
});
m_rtmpConn = nullptr;
}
Expand Down Expand Up @@ -169,7 +169,7 @@ void RtmpPublisher::close()
std::shared_ptr<RtmpConnection> rtmpConn = m_rtmpConn;
SOCKET sockfd = rtmpConn->fd();
m_eventLoop->addTriggerEvent([sockfd, rtmpConn]() {
rtmpConn->close();
rtmpConn->disconnect();
});
m_rtmpConn = nullptr;
m_videoTimestamp = 0;
Expand Down
Loading

0 comments on commit b8eb38f

Please sign in to comment.