Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
PHZ76 committed Sep 6, 2019
1 parent 4e6af77 commit dc64974
Show file tree
Hide file tree
Showing 7 changed files with 104 additions and 85 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@

后续计划
-
* 多线程支持
* HLS支持
* 性能优化

联系方式
-
Expand Down
65 changes: 35 additions & 30 deletions src/xop/HttpFlvConnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ using namespace xop;
HttpFlvConnection::HttpFlvConnection(RtmpServer *rtmpServer, TaskScheduler* taskScheduler, SOCKET sockfd)
: TcpConnection(taskScheduler, sockfd)
, m_rtmpServer(rtmpServer)
, m_taskScheduler(taskScheduler)
{
this->setReadCallback([this](std::shared_ptr<TcpConnection> conn, xop::BufferReader& buffer) {
return this->onRead(buffer);
Expand Down Expand Up @@ -105,47 +106,51 @@ bool HttpFlvConnection::sendMediaData(uint8_t type, uint64_t timestamp, std::sha
m_aacSequenceHeaderSize = payloadSize;
return true;
}
else if (type == RTMP_VIDEO)
{
if (!m_hasKeyFrame)

auto conn = std::dynamic_pointer_cast<HttpFlvConnection>(shared_from_this());
m_taskScheduler->addTriggerEvent([conn, type, timestamp, payload, payloadSize] {
if (type == RTMP_VIDEO)
{
uint8_t frameType = (payload.get()[0] >> 4) & 0x0f;
uint8_t codecId = payload.get()[0] & 0x0f;
if (frameType == 1 && codecId == RTMP_CODEC_ID_H264)
if (!conn->m_hasKeyFrame)
{
m_hasKeyFrame = true;
uint8_t frameType = (payload.get()[0] >> 4) & 0x0f;
uint8_t codecId = payload.get()[0] & 0x0f;
if (frameType == 1 && codecId == RTMP_CODEC_ID_H264)
{
conn->m_hasKeyFrame = true;
}
else
{
return ;
}
}
else

if (!conn->m_hasFlvHeader)
{
return true;
conn->sendFlvHeader();
conn->sendFlvTag(conn->FLV_TAG_TYPE_VIDEO, 0, conn->m_avcSequenceHeader, conn->m_avcSequenceHeaderSize);
conn->sendFlvTag(conn->FLV_TAG_TYPE_AUDIO, 0, conn->m_aacSequenceHeader, conn->m_aacSequenceHeaderSize);
}
}

if (!m_hasFlvHeader)
{
this->sendFlvHeader();
this->sendFlvTag(FLV_TAG_TYPE_VIDEO, 0, m_avcSequenceHeader, m_avcSequenceHeaderSize);
this->sendFlvTag(FLV_TAG_TYPE_AUDIO, 0, m_aacSequenceHeader, m_aacSequenceHeaderSize);
conn->sendFlvTag(conn->FLV_TAG_TYPE_VIDEO, timestamp, payload, payloadSize);
}

this->sendFlvTag(FLV_TAG_TYPE_VIDEO, timestamp, payload, payloadSize);
}
else if (type == RTMP_AUDIO)
{
if (!m_hasKeyFrame && m_avcSequenceHeaderSize>0)
else if (type == RTMP_AUDIO)
{
return true;
}
if (!conn->m_hasKeyFrame && conn->m_avcSequenceHeaderSize>0)
{
return ;
}

if (!m_hasFlvHeader)
{
this->sendFlvHeader();
this->sendFlvTag(FLV_TAG_TYPE_AUDIO, 0, m_aacSequenceHeader, m_aacSequenceHeaderSize);
if (!conn->m_hasFlvHeader)
{
conn->sendFlvHeader();
conn->sendFlvTag(conn->FLV_TAG_TYPE_AUDIO, 0, conn->m_aacSequenceHeader, conn->m_aacSequenceHeaderSize);
}

conn->sendFlvTag(conn->FLV_TAG_TYPE_AUDIO, timestamp, payload, payloadSize);
}
});

this->sendFlvTag(FLV_TAG_TYPE_AUDIO, timestamp, payload, payloadSize);
}

return true;
}

Expand Down
1 change: 1 addition & 0 deletions src/xop/HttpFlvConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class HttpFlvConnection : public TcpConnection
int sendFlvTag(uint8_t type, uint64_t timestamp, std::shared_ptr<char> payload, uint32_t payloadSize);

RtmpServer *m_rtmpServer = nullptr;
TaskScheduler* m_taskScheduler = nullptr;
std::string m_streamPath;

std::shared_ptr<char> m_avcSequenceHeader;
Expand Down
94 changes: 52 additions & 42 deletions src/xop/RtmpConnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -925,7 +925,7 @@ bool RtmpConnection::handleOnStatus(RtmpMessage& rtmpMsg)
return true;
}

bool RtmpConnection::sendMetaData(AmfObjects& metaData)
bool RtmpConnection::sendMetaData(AmfObjects metaData)
{
if(this->isClosed())
{
Expand Down Expand Up @@ -1052,37 +1052,40 @@ bool RtmpConnection::sendMediaData(uint8_t type, uint64_t timestamp, std::shared
m_aacSequenceHeaderSize = payloadSize;
}

if(!m_hasKeyFrame && m_avcSequenceHeaderSize>0
&& (type != RTMP_AVC_SEQUENCE_HEADER)
&& (type != RTMP_AAC_SEQUENCE_HEADER))
{
if (this->isKeyFrame(payload, payloadSize))
{
m_hasKeyFrame = true;
}
else
auto conn = std::dynamic_pointer_cast<RtmpConnection>(shared_from_this());
m_taskScheduler->addTriggerEvent([conn, type, timestamp, payload, payloadSize] {
if (!conn->m_hasKeyFrame && conn->m_avcSequenceHeaderSize > 0
&& (type != RTMP_AVC_SEQUENCE_HEADER)
&& (type != RTMP_AAC_SEQUENCE_HEADER))
{
return true;
if (conn->isKeyFrame(payload, payloadSize))
{
conn->m_hasKeyFrame = true;
}
else
{
return ;
}
}
}

RtmpMessage rtmpMsg;
rtmpMsg._timestamp = timestamp;
rtmpMsg.streamId = m_streamId;
rtmpMsg.payload = payload;
rtmpMsg.length = payloadSize;
RtmpMessage rtmpMsg;
rtmpMsg._timestamp = timestamp;
rtmpMsg.streamId = conn->m_streamId;
rtmpMsg.payload = payload;
rtmpMsg.length = payloadSize;

if(type == RTMP_VIDEO || type == RTMP_AVC_SEQUENCE_HEADER)
{
rtmpMsg.typeId = RTMP_VIDEO;
sendRtmpChunks(RTMP_CHUNK_VIDEO_ID, rtmpMsg);
}
else if(type == RTMP_AUDIO || type == RTMP_AAC_SEQUENCE_HEADER)
{
rtmpMsg.typeId = RTMP_AUDIO;
sendRtmpChunks(RTMP_CHUNK_AUDIO_ID, rtmpMsg);
}

if (type == RTMP_VIDEO || type == RTMP_AVC_SEQUENCE_HEADER)
{
rtmpMsg.typeId = RTMP_VIDEO;
conn->sendRtmpChunks(RTMP_CHUNK_VIDEO_ID, rtmpMsg);
}
else if (type == RTMP_AUDIO || type == RTMP_AAC_SEQUENCE_HEADER)
{
rtmpMsg.typeId = RTMP_AUDIO;
conn->sendRtmpChunks(RTMP_CHUNK_AUDIO_ID, rtmpMsg);
}
});

return true;
}

Expand All @@ -1093,13 +1096,17 @@ bool RtmpConnection::sendVideoData(uint64_t timestamp, std::shared_ptr<char> pay
return false;
}

RtmpMessage rtmpMsg;
rtmpMsg.typeId = RTMP_VIDEO;
rtmpMsg._timestamp = timestamp;
rtmpMsg.streamId = m_streamId;
rtmpMsg.payload = payload;
rtmpMsg.length = payloadSize;
sendRtmpChunks(RTMP_CHUNK_VIDEO_ID, rtmpMsg);
auto conn = std::dynamic_pointer_cast<RtmpConnection>(shared_from_this());
m_taskScheduler->addTriggerEvent([conn, timestamp, payload, payloadSize] {
RtmpMessage rtmpMsg;
rtmpMsg.typeId = RTMP_VIDEO;
rtmpMsg._timestamp = timestamp;
rtmpMsg.streamId = conn->m_streamId;
rtmpMsg.payload = payload;
rtmpMsg.length = payloadSize;
conn->sendRtmpChunks(RTMP_CHUNK_VIDEO_ID, rtmpMsg);
});

return true;
}

Expand All @@ -1110,13 +1117,16 @@ bool RtmpConnection::sendAudioData(uint64_t timestamp, std::shared_ptr<char> pay
return false;
}

RtmpMessage rtmpMsg;
rtmpMsg.typeId = RTMP_AUDIO;
rtmpMsg._timestamp = timestamp;
rtmpMsg.streamId = m_streamId;
rtmpMsg.payload = payload;
rtmpMsg.length = payloadSize;
sendRtmpChunks(RTMP_CHUNK_AUDIO_ID, rtmpMsg);
auto conn = std::dynamic_pointer_cast<RtmpConnection>(shared_from_this());
m_taskScheduler->addTriggerEvent([conn, timestamp, payload, payloadSize] {
RtmpMessage rtmpMsg;
rtmpMsg.typeId = RTMP_AUDIO;
rtmpMsg._timestamp = timestamp;
rtmpMsg.streamId = conn->m_streamId;
rtmpMsg.payload = payload;
rtmpMsg.length = payloadSize;
conn->sendRtmpChunks(RTMP_CHUNK_AUDIO_ID, rtmpMsg);
});
return true;
}

Expand Down
2 changes: 1 addition & 1 deletion src/xop/RtmpConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ class RtmpConnection : public TcpConnection

bool sendInvokeMessage(uint32_t csid, std::shared_ptr<char> payload, uint32_t payloadSize);
bool sendNotifyMessage(uint32_t csid, std::shared_ptr<char> payload, uint32_t payloadSize);
bool sendMetaData(AmfObjects& metaData);
bool sendMetaData(AmfObjects metaData);
bool isKeyFrame(std::shared_ptr<char> payload, uint32_t payloadSize);
bool sendMediaData(uint8_t type, uint64_t timestamp, std::shared_ptr<char> payload, uint32_t payloadSize);
bool sendVideoData(uint64_t timestamp, std::shared_ptr<char> payload, uint32_t payloadSize);
Expand Down
21 changes: 11 additions & 10 deletions src/xop/RtmpPublisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ int RtmpPublisher::openUrl(std::string url, int msec)
{
std::shared_ptr<RtmpConnection> rtmpConn = m_rtmpConn;
sockfd = rtmpConn->fd();
m_eventLoop->addTriggerEvent([sockfd, rtmpConn]() {
m_taskScheduler->addTriggerEvent([sockfd, rtmpConn]() {
rtmpConn->disconnect();
});
m_rtmpConn = nullptr;
Expand All @@ -142,8 +142,9 @@ int RtmpPublisher::openUrl(std::string url, int msec)
return -1;
}

m_rtmpConn.reset(new RtmpConnection((RtmpPublisher*)this, m_eventLoop->getTaskScheduler().get(), tcpSocket.fd()));
m_eventLoop->addTriggerEvent([sockfd, this]() {
m_taskScheduler = m_eventLoop->getTaskScheduler().get();
m_rtmpConn.reset(new RtmpConnection((RtmpPublisher*)this, m_taskScheduler, tcpSocket.fd()));
m_taskScheduler->addTriggerEvent([sockfd, this]() {
m_rtmpConn->handshake();
});

Expand All @@ -168,7 +169,7 @@ void RtmpPublisher::close()
{
std::shared_ptr<RtmpConnection> rtmpConn = m_rtmpConn;
SOCKET sockfd = rtmpConn->fd();
m_eventLoop->addTriggerEvent([sockfd, rtmpConn]() {
m_taskScheduler->addTriggerEvent([sockfd, rtmpConn]() {
rtmpConn->disconnect();
});
m_rtmpConn = nullptr;
Expand Down Expand Up @@ -229,10 +230,10 @@ int RtmpPublisher::pushVideoFrame(uint8_t *data, uint32_t size)
{
m_hasKeyFrame = true;
m_timestamp.reset();
m_eventLoop->addTriggerEvent([=]() {
//m_taskScheduler->addTriggerEvent([=]() {
m_rtmpConn->sendVideoData(0, m_avcSequenceHeader, m_avcSequenceHeaderSize);
m_rtmpConn->sendAudioData(0, m_aacSequenceHeader, m_aacSequenceHeaderSize);
});
//});
}
else
{
Expand Down Expand Up @@ -269,9 +270,9 @@ int RtmpPublisher::pushVideoFrame(uint8_t *data, uint32_t size)
memcpy(buffer + index, data, size);
index += size;
payloadSize = index;
m_eventLoop->addTriggerEvent([=]() {
//m_taskScheduler->addTriggerEvent([=]() {
m_rtmpConn->sendVideoData(timestamp, payload, payloadSize);
});
//});
}

return 0;
Expand Down Expand Up @@ -302,9 +303,9 @@ int RtmpPublisher::pushAudioFrame(uint8_t *data, uint32_t size)
payload.get()[0] = m_audioTag;
payload.get()[1] = 1; // 0: aac sequence header, 1: aac raw data
memcpy(payload.get() + 2, data, size);
m_eventLoop->addTriggerEvent([=]() {
//m_taskScheduler->addTriggerEvent([=]() {
m_rtmpConn->sendAudioData(timestamp, payload, payloadSize);
});
//});
}

return 0;
Expand Down
3 changes: 2 additions & 1 deletion src/xop/RtmpPublisher.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ class RtmpPublisher : public Rtmp
RtmpPublisher() {}
bool isKeyFrame(uint8_t *data, uint32_t size);

xop::EventLoop *m_eventLoop;
xop::EventLoop *m_eventLoop = nullptr;
TaskScheduler *m_taskScheduler = nullptr;
std::mutex m_mutex;
std::shared_ptr<RtmpConnection> m_rtmpConn;

Expand Down

0 comments on commit dc64974

Please sign in to comment.