Skip to content

Commit

Permalink
add rtmp client
Browse files Browse the repository at this point in the history
  • Loading branch information
PHZ76 committed Oct 21, 2019
1 parent c3f8a57 commit 548d927
Show file tree
Hide file tree
Showing 7 changed files with 145 additions and 62 deletions.
27 changes: 22 additions & 5 deletions example/rtmp_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@
#include "xop/RtmpServer.h"
#include "xop/HttpFlvServer.h"
#include "xop/RtmpPublisher.h"
#include "xop/RtmpClient.h"
#include "xop/H264Parser.h"
#include "net/EventLoop.h"

#define TEST_RTMP_PUSHER 1
#define TEST_RTMP_CLIENT 0
#define TEST_MULTI_THREAD 0
#define PUSH_URL "rtmp://127.0.0.1:1935/live/01"
#define RTMP_URL "rtmp://127.0.0.1:1935/live/01"
#define PUSH_FILE "./test.h264"
#define HTTP_URL "http://127.0.0.1:8080/live/01.flv"

Expand All @@ -27,7 +29,7 @@ int main(int argc, char **argv)
/* rtmp server example */
xop::RtmpServer rtmpServer(&eventLoop, "0.0.0.0", 1935);
rtmpServer.setChunkSize(60000);
rtmpServer.setGopCache(); /* enable gop cache */
//rtmpServer.setGopCache(); /* enable gop cache */

/* http-flv server example */
xop::HttpFlvServer httpFlvServer(&eventLoop, "0.0.0.0", 8080);
Expand All @@ -41,6 +43,19 @@ int main(int argc, char **argv)
t.detach();
#endif

#if TEST_RTMP_CLIENT
xop::RtmpClient rtmpClient(&eventLoop);
rtmpClient.setFrameCB([](uint8_t* payload, uint32_t length, uint8_t codecId, uint32_t timestamp) {
// handle frame ...
});

std::string status;
if (rtmpClient.openUrl(RTMP_URL, 3000, status) != 0)
{
printf("Open url %s failed, status: %s\n", RTMP_URL, status.c_str());
}
#endif

while (1)
{
std::this_thread::sleep_for(std::chrono::milliseconds(100));
Expand Down Expand Up @@ -225,9 +240,11 @@ int test_rtmp_publisher(xop::EventLoop *eventLoop)
xop::MediaInfo mediaInfo;
xop::RtmpPublisher publisher(eventLoop);
publisher.setChunkSize(60000);
if (publisher.openUrl(PUSH_URL, 5000) < 0)

std::string status;
if (publisher.openUrl(RTMP_URL, 3000, status) < 0)
{
printf("Open url %s failed.\n", PUSH_URL);
printf("Open url %s failed, status: %s\n", RTMP_URL, status.c_str());
return -1;
}

Expand Down Expand Up @@ -261,7 +278,7 @@ int test_rtmp_publisher(xop::EventLoop *eventLoop)

hasSpsPps = true;
publisher.setMediaInfo(mediaInfo); /* set sps pps */
printf("Start rtmp pusher, rtmp url: %s , http-flv url: %s \n\n", PUSH_URL, HTTP_URL);
printf("Start rtmp pusher, rtmp url: %s , http-flv url: %s \n\n", RTMP_URL, HTTP_URL);
}
}
}
Expand Down
11 changes: 8 additions & 3 deletions src/xop/RtmpClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ void RtmpClient::setFrameCB(const FrameCallback& cb)
m_frameCB = cb;
}

int RtmpClient::openUrl(std::string url, int msec)
int RtmpClient::openUrl(std::string url, int msec, std::string& status)
{
std::lock_guard<std::mutex> lock(m_mutex);

Expand Down Expand Up @@ -60,8 +60,12 @@ int RtmpClient::openUrl(std::string url, int msec)
}

m_taskScheduler = m_eventLoop->getTaskScheduler().get();
m_rtmpConn.reset(new RtmpConnection((RtmpPublisher*)this, m_taskScheduler, tcpSocket.fd()));
m_rtmpConn.reset(new RtmpConnection((RtmpClient*)this, m_taskScheduler, tcpSocket.fd()));
m_taskScheduler->addTriggerEvent([this]() {
if (m_frameCB)
{
m_rtmpConn->setPlayCB(m_frameCB);
}
m_rtmpConn->handshake();
});

Expand All @@ -75,8 +79,9 @@ int RtmpClient::openUrl(std::string url, int msec)
{
xop::Timer::sleep(100);
timeout -= 100;
} while (!m_rtmpConn->isPlaying() && timeout > 0);
} while (!m_rtmpConn->isClosed() && !m_rtmpConn->isPlaying() && timeout > 0);

status = m_rtmpConn->getStatus();
if (!m_rtmpConn->isPlaying())
{
std::shared_ptr<RtmpConnection> rtmpConn = m_rtmpConn;
Expand Down
4 changes: 2 additions & 2 deletions src/xop/RtmpClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@ namespace xop
class RtmpClient : public Rtmp
{
public:
using FrameCallback = std::function<void()>;
using FrameCallback = std::function<void(uint8_t* payload, uint32_t length, uint8_t codecId, uint32_t timestamp)>;

RtmpClient & operator=(const RtmpClient &) = delete;
RtmpClient(const RtmpClient &) = delete;
RtmpClient(xop::EventLoop *loop);
~RtmpClient();

void setFrameCB(const FrameCallback& cb);
int openUrl(std::string url, int msec = 0);
int openUrl(std::string url, int msec, std::string& status);
void close();
bool isConnected();

Expand Down
140 changes: 93 additions & 47 deletions src/xop/RtmpConnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ bool RtmpConnection::onRead(BufferReader& buffer)
ret = handleChunk(buffer);
}

if (m_connState == HANDSHAKE_COMPLETE && m_connMode == RTMP_PUBLISHER)
if (m_connState == HANDSHAKE_COMPLETE && (m_connMode == RTMP_PUBLISHER || m_connMode == RTMP_CLIENT))
{
this->setChunkSize();
this->connect();
Expand Down Expand Up @@ -458,7 +458,7 @@ bool RtmpConnection::handleMessage(RtmpMessage& rtmpMsg)
LOG_INFO("unkonw message type : %d\n", rtmpMsg.typeId);
break;
}

if (!ret) printf("tmpMsg.typeId:%x\n", rtmpMsg.typeId);
return ret;
}

Expand Down Expand Up @@ -540,10 +540,10 @@ bool RtmpConnection::handleInvoke(RtmpMessage& rtmpMsg)

bool RtmpConnection::handleNotify(RtmpMessage& rtmpMsg)
{
if(m_streamId != rtmpMsg.streamId)
{
return false;
}
//if(m_streamId != rtmpMsg.streamId)
//{
// return false;
//}

m_amfDec.reset();
int bytesUsed = m_amfDec.decode((const char *)rtmpMsg.payload.get(), rtmpMsg.length, 1);
Expand Down Expand Up @@ -580,73 +580,109 @@ bool RtmpConnection::handleNotify(RtmpMessage& rtmpMsg)

bool RtmpConnection::handleVideo(RtmpMessage& rtmpMsg)
{
RtmpSession::Ptr sessionPtr = m_rtmpServer->getSession(m_streamPath);
if (sessionPtr == nullptr)
{
return false;
}

uint8_t type = RTMP_VIDEO;
uint8_t *payload = (uint8_t *)rtmpMsg.payload.get();
uint32_t length = rtmpMsg.length;
uint8_t frameType = (payload[0] >> 4) & 0x0f;
uint8_t codecId = payload[0] & 0x0f;

if (frameType == 1 && codecId == RTMP_CODEC_ID_H264)
if (m_connMode == RTMP_CLIENT && m_connState == START_PLAY && m_isPlaying)
{
if (payload[1] == 0)
if (m_playCB)
{
m_playCB(payload, length, codecId, (uint32_t)rtmpMsg._timestamp);
}
}
else
{
RtmpSession::Ptr sessionPtr = m_rtmpServer->getSession(m_streamPath);
if (sessionPtr == nullptr)
{
return false;
}

if (frameType == 1 && codecId == RTMP_CODEC_ID_H264)
{
m_avcSequenceHeaderSize = rtmpMsg.length;
m_avcSequenceHeader.reset(new char[rtmpMsg.length]);
memcpy(m_avcSequenceHeader.get(), rtmpMsg.payload.get(), rtmpMsg.length);
sessionPtr->setAvcSequenceHeader(m_avcSequenceHeader, m_avcSequenceHeaderSize);
type = RTMP_AVC_SEQUENCE_HEADER;
}
if (payload[1] == 0)
{
m_avcSequenceHeaderSize = length;
m_avcSequenceHeader.reset(new char[length]);
memcpy(m_avcSequenceHeader.get(), rtmpMsg.payload.get(), length);
sessionPtr->setAvcSequenceHeader(m_avcSequenceHeader, m_avcSequenceHeaderSize);
type = RTMP_AVC_SEQUENCE_HEADER;
}
}

sessionPtr->sendMediaData(type, rtmpMsg._timestamp, rtmpMsg.payload, rtmpMsg.length);
}

sessionPtr->sendMediaData(type, rtmpMsg._timestamp, rtmpMsg.payload, rtmpMsg.length);
return true;
}

bool RtmpConnection::handleAudio(RtmpMessage& rtmpMsg)
{
RtmpSession::Ptr sessionPtr = m_rtmpServer->getSession(m_streamPath);
if (sessionPtr == nullptr)
{
return false;
}

uint8_t type = RTMP_AUDIO;
uint8_t *payload = (uint8_t *)rtmpMsg.payload.get();
uint32_t length = rtmpMsg.length;
uint8_t soundFormat = (payload[0] >> 4) & 0x0f;
uint8_t soundSize = (payload[0] >> 1) & 0x01;
uint8_t soundRate = (payload[0] >> 2) & 0x03;
uint8_t codecId = payload[0] & 0x0f;

if (soundFormat == RTMP_CODEC_ID_AAC && payload[1] == 0)
if (m_connMode == RTMP_CLIENT && m_connState == START_PLAY && m_isPlaying)
{
if (m_playCB)
{
m_playCB(payload, length, codecId, (uint32_t)rtmpMsg._timestamp);
}
}
else
{
m_aacSequenceHeaderSize = rtmpMsg.length;
m_aacSequenceHeader.reset(new char[rtmpMsg.length]);
memcpy(m_aacSequenceHeader.get(), rtmpMsg.payload.get(), rtmpMsg.length);
sessionPtr->setAacSequenceHeader(m_aacSequenceHeader, m_aacSequenceHeaderSize);
type = RTMP_AAC_SEQUENCE_HEADER;
RtmpSession::Ptr sessionPtr = m_rtmpServer->getSession(m_streamPath);
if (sessionPtr == nullptr)
{
return false;
}

if (soundFormat == RTMP_CODEC_ID_AAC && payload[1] == 0)
{
m_aacSequenceHeaderSize = rtmpMsg.length;
m_aacSequenceHeader.reset(new char[rtmpMsg.length]);
memcpy(m_aacSequenceHeader.get(), rtmpMsg.payload.get(), rtmpMsg.length);
sessionPtr->setAacSequenceHeader(m_aacSequenceHeader, m_aacSequenceHeaderSize);
type = RTMP_AAC_SEQUENCE_HEADER;
}

sessionPtr->sendMediaData(type, rtmpMsg._timestamp, rtmpMsg.payload, rtmpMsg.length);
}

sessionPtr->sendMediaData(type, rtmpMsg._timestamp, rtmpMsg.payload, rtmpMsg.length);
return true;
}

bool RtmpConnection::connect()
{


AmfObjects objects;
m_amfEnc.reset();

m_amfEnc.encodeString("connect", 7);
m_amfEnc.encodeNumber((double)(++m_number));
objects["app"] = AmfObject(m_app);
objects["type"] = AmfObject(std::string("nonprivate"));
objects["swfUrl"] = AmfObject(m_rtmpPublisher->getSwfUrl());
objects["tcUrl"] = AmfObject(m_rtmpPublisher->getTcUrl());

if (m_connMode == RTMP_PUBLISHER)
{
objects["swfUrl"] = AmfObject(m_rtmpPublisher->getSwfUrl());
objects["tcUrl"] = AmfObject(m_rtmpPublisher->getTcUrl());
}
else if (m_connMode == RTMP_CLIENT)
{
objects["swfUrl"] = AmfObject(m_rtmpClient->getSwfUrl());
objects["tcUrl"] = AmfObject(m_rtmpClient->getTcUrl());
}

m_amfEnc.encodeObjects(objects);

m_connState = START_CONNECT;
sendInvokeMessage(RTMP_CHUNK_INVOKE_ID, m_amfEnc.data(), m_amfEnc.size());
return true;
Expand Down Expand Up @@ -945,30 +981,35 @@ bool RtmpConnection::handleOnStatus(RtmpMessage& rtmpMsg)
bool ret = true;

if (m_connState == START_PUBLISH || m_connState == START_PLAY)
{
{
if (m_amfDec.hasObject("code"))
{
AmfObject amfObj = m_amfDec.getObject("code");
m_status = amfObj.amf_string;
if (m_connMode == RTMP_PUBLISHER)
{
if (amfObj.amf_string != "NetStream.Publish.Start")
if (m_status == "NetStream.Publish.Start")
{
ret = false;
m_isPublishing = true;
}
else
else if(m_status == "NetStream.publish.Unauthorized"
|| m_status == "NetStream.Publish.BadConnection" /*"Connection already publishing"*/
|| m_status == "NetStream.Publish.BadName") /*Stream already publishing*/
{
m_isPublishing = true;
ret = false;
}
}
else if (m_connMode == RTMP_CLIENT)
{
if (amfObj.amf_string != "NetStream.Play.Reset" && amfObj.amf_string != "NetStream.Play.Start")
{
if (/*amfObj.amf_string == "NetStream.Play.Reset" || */m_status == "NetStream.Play.Start")
{
ret = false;
m_isPlaying = true;
}
else
else if(m_status == "NetStream.play.Unauthorized"
|| m_status == "NetStream.Play.UnpublishNotify" /*"stream is now unpublished."*/
|| m_status == "NetStream.Play.BadConnection") /*"Connection already playing"*/
{
m_isPlaying = true;
ret = false;
}
}
}
Expand Down Expand Up @@ -1049,6 +1090,11 @@ void RtmpConnection::setChunkSize()
sendRtmpChunks(RTMP_CHUNK_CONTROL_ID, rtmpMsg);
}

void RtmpConnection::setPlayCB(const PlayCallback& cb)
{
m_playCB = cb;
}

bool RtmpConnection::sendInvokeMessage(uint32_t csid, std::shared_ptr<char> payload, uint32_t payloadSize)
{
if(this->isClosed())
Expand Down
Loading

0 comments on commit 548d927

Please sign in to comment.