Skip to content

Commit

Permalink
support rtmp event callback
Browse files Browse the repository at this point in the history
  • Loading branch information
PHZ76 committed May 24, 2020
1 parent 23655b7 commit 08a857f
Show file tree
Hide file tree
Showing 20 changed files with 594 additions and 294 deletions.
65 changes: 65 additions & 0 deletions src/xop/HttpConnection.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
#include "HttpConnection.h"

using namespace xop;

HttpConnection::HttpConnection(mg_connection* mg_conn)
: mg_conn_(mg_conn)
, max_queue_length_(kMaxQueueLength)
, buffer_(new std::queue<Packet>)
{

}

HttpConnection::~HttpConnection()
{

}

void HttpConnection::Poll()
{
mutex_.lock();
if (!buffer_->empty()) {
OnSend();
}
mutex_.unlock();
}

SOCKET HttpConnection::GetSocket()
{
return (SOCKET)mg_conn_->sock;
}

void HttpConnection::Send(const char* data, uint32_t data_size)
{
std::lock_guard<std::mutex> locker(mutex_);

if (!mg_conn_) {
return;
}

if (buffer_->size() < max_queue_length_) {
Packet pkt;
pkt.data.reset(new char[data_size + 512]);
memcpy(pkt.data.get(), data, data_size);
pkt.size = data_size;
pkt.write_index = 0;
buffer_->emplace(std::move(pkt));
}
}

void HttpConnection::OnSend()
{
if (!buffer_->empty()) {
//printf("send_queue_len: %llu, send_buf_len: %llu, send_buf_size: %llu \n", \
// buffer_->size(), mg_conn_->send_mbuf.len, mg_conn_->send_mbuf.size);

if (mg_conn_->send_mbuf.len > kMaxBufferLen) {
//buffer_->pop();
return;
}

Packet &pkt = buffer_->front();
mg_send(mg_conn_, pkt.data.get(), pkt.size);
buffer_->pop();
}
}
49 changes: 49 additions & 0 deletions src/xop/HttpConnection.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
#ifndef HTTP_CONNECTION_H
#define HTTP_CONNECTION_H

extern "C" {
#include "mongoose/mongoose.h"
}
#include "net/Socket.h"
#include <cstdint>
#include <memory>
#include <mutex>
#include <queue>

namespace xop {

class HttpConnection
{
public:
HttpConnection(mg_connection* mg_conn);
virtual ~HttpConnection();

virtual void Poll();

virtual void Send(const char* data, uint32_t data_size);
virtual SOCKET GetSocket();

protected:
void OnSend();

mg_connection* mg_conn_;

std::mutex mutex_;

typedef struct {
std::shared_ptr<char> data;
uint32_t size;
uint32_t write_index;
} Packet;

std::unique_ptr<std::queue<Packet>> buffer_;
size_t max_queue_length_ = 0;

static const size_t kMaxQueueLength = 1024;
static const size_t kMaxBufferLen = 1024 * 1024;
};

}

#endif

142 changes: 40 additions & 102 deletions src/xop/HttpFlvConnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,85 +4,17 @@

using namespace xop;

HttpFlvConnection::HttpFlvConnection(std::shared_ptr<RtmpServer> rtmp_server, TaskScheduler* scheduler, SOCKET sockfd)
: TcpConnection(scheduler, sockfd)
, rtmp_server_(rtmp_server)
, task_scheduler_(scheduler)
HttpFlvConnection::HttpFlvConnection(mg_connection* mg_conn)
: HttpConnection(mg_conn)
{
this->SetReadCallback([this](std::shared_ptr<TcpConnection> conn, xop::BufferReader& buffer) {
return this->OnRead(buffer);
});

this->SetCloseCallback([this](std::shared_ptr<TcpConnection> conn) {
this->OnClose();
});
}

HttpFlvConnection::~HttpFlvConnection()
{

}

bool HttpFlvConnection::OnRead(BufferReader& buffer)
{
if (buffer.FindLastCrlfCrlf() == nullptr) {
return (buffer.ReadableBytes() >= 4096) ? false : true;
}

const char* last_crlf_crlf = buffer.FindLastCrlfCrlf();
if (last_crlf_crlf == nullptr) {
return true;
}

std::string buf(buffer.Peek(), last_crlf_crlf - buffer.Peek());
buffer.RetrieveUntil(last_crlf_crlf + 4);

auto pos1 = buf.find("GET");
auto pos2 = buf.find(".flv");
if (pos1 == std::string::npos || pos2 == std::string::npos) {
return false;
}

stream_path_ = buf.substr(pos1 + 3, pos2 - 3).c_str();

pos1 = stream_path_.find_first_not_of(" ");
pos2 = stream_path_.find_last_not_of(" ");
stream_path_ = stream_path_.substr(pos1, pos2);
//printf("%s\n", stream_path_.c_str());

auto rtmp_server = rtmp_server_.lock();
if (rtmp_server) {
std::string http_header = "HTTP/1.1 200 OK\r\nContent-Type: video/x-flv\r\n\r\n";
this->Send(http_header.c_str(), (uint32_t)http_header.size());

auto session = rtmp_server->GetSession(stream_path_);
if (session != nullptr) {
session->AddHttpClient(std::dynamic_pointer_cast<HttpFlvConnection>(shared_from_this()));
}
}
else {
return false;
}

return true;
}

void HttpFlvConnection::OnClose()
{
auto rtmp_server = rtmp_server_.lock();
if (rtmp_server != nullptr) {
auto session = rtmp_server->GetSession(stream_path_);
if (session != nullptr) {
auto conn = std::dynamic_pointer_cast<HttpFlvConnection>(shared_from_this());
task_scheduler_->AddTimer([session, conn] {
session->RemoveHttpClient(conn);
return false;
}, 1);
}
}
}


bool HttpFlvConnection::SendMediaData(uint8_t type, uint64_t timestamp, std::shared_ptr<char> payload, uint32_t payload_size)
{
if (payload_size == 0) {
Expand All @@ -102,42 +34,49 @@ bool HttpFlvConnection::SendMediaData(uint8_t type, uint64_t timestamp, std::sha
return true;
}

auto conn = std::dynamic_pointer_cast<HttpFlvConnection>(shared_from_this());
task_scheduler_->AddTriggerEvent([conn, type, timestamp, payload, payload_size] {
if (type == RTMP_VIDEO) {
if (!conn->has_key_frame_) {
uint8_t frame_type = (payload.get()[0] >> 4) & 0x0f;
uint8_t codec_id = payload.get()[0] & 0x0f;
if (frame_type == 1 && codec_id == RTMP_CODEC_ID_H264) {
conn->has_key_frame_ = true;
}
else {
return ;
}
if (type == RTMP_VIDEO) {
if (!has_key_frame_) {
uint8_t frame_type = (payload.get()[0] >> 4) & 0x0f;
uint8_t codec_id = payload.get()[0] & 0x0f;
if (frame_type == 1 && codec_id == RTMP_CODEC_ID_H264) {
has_key_frame_ = true;
}

if (!conn->has_flv_header_) {
conn->SendFlvHeader();
conn->SendFlvTag(conn->FLV_TAG_TYPE_VIDEO, 0, conn->avc_sequence_header_, conn->avc_sequence_header_size_);
conn->SendFlvTag(conn->FLV_TAG_TYPE_AUDIO, 0, conn->aac_sequence_header_, conn->aac_sequence_header_size_);
else {
return true;
}

conn->SendFlvTag(conn->FLV_TAG_TYPE_VIDEO, timestamp, payload, payload_size);
}
else if (type == RTMP_AUDIO) {
if (!conn->has_key_frame_ && conn->avc_sequence_header_size_>0) {
return ;
}
SendVideoData(timestamp, payload, payload_size);
}
else if (type == RTMP_AUDIO) {
if (!has_key_frame_ && avc_sequence_header_size_ > 0) {
return true;
}
SendAudioData(timestamp, payload, payload_size);
}

if (!conn->has_flv_header_) {
conn->SendFlvHeader();
conn->SendFlvTag(conn->FLV_TAG_TYPE_AUDIO, 0, conn->aac_sequence_header_, conn->aac_sequence_header_size_);
}
return true;
}

conn->SendFlvTag(conn->FLV_TAG_TYPE_AUDIO, timestamp, payload, payload_size);
}
});
bool HttpFlvConnection::SendVideoData(uint64_t timestamp, std::shared_ptr<char> payload, uint32_t payload_size)
{
if (!has_flv_header_) {
SendFlvHeader();
SendFlvTag(FLV_TAG_TYPE_VIDEO, 0, avc_sequence_header_, avc_sequence_header_size_);
SendFlvTag(FLV_TAG_TYPE_AUDIO, 0, aac_sequence_header_, aac_sequence_header_size_);
}

SendFlvTag(FLV_TAG_TYPE_VIDEO, timestamp, payload, payload_size);
return true;
}

bool HttpFlvConnection::SendAudioData(uint64_t timestamp, std::shared_ptr<char> payload, uint32_t payload_size)
{
if (!has_flv_header_) {
SendFlvHeader();
SendFlvTag(FLV_TAG_TYPE_AUDIO, 0, aac_sequence_header_, aac_sequence_header_size_);
}

SendFlvTag(FLV_TAG_TYPE_AUDIO, timestamp, payload, payload_size);
return true;
}

Expand All @@ -149,7 +88,7 @@ void HttpFlvConnection::SendFlvHeader()
flv_header[4] |= 0x1;
}

if (aac_sequence_header_size_ > 0) {
if (aac_sequence_header_size_ > 0) {
flv_header[4] |= 0x4;
}

Expand Down Expand Up @@ -181,6 +120,5 @@ int HttpFlvConnection::SendFlvTag(uint8_t type, uint64_t timestamp, std::shared_
this->Send(tag_header, 11);
this->Send(payload.get(), payload_size);
this->Send(previous_tag_size, 4);

return 0;
}
31 changes: 12 additions & 19 deletions src/xop/HttpFlvConnection.h
Original file line number Diff line number Diff line change
@@ -1,42 +1,35 @@
#ifndef XOP_HTTP_FLV_CONNECTION_H
#define XOP_HTTP_FLV_CONNECTION_H

#include "net/EventLoop.h"
#include "net/TcpConnection.h"
#include "HttpConnection.h"
#include "RtmpSink.h"

namespace xop
{

class RtmpServer;

class HttpFlvConnection : public TcpConnection
class HttpFlvConnection : public HttpConnection, public RtmpSink
{
public:
HttpFlvConnection(std::shared_ptr<RtmpServer> rtmp_server, TaskScheduler* taskScheduler, SOCKET sockfd);
HttpFlvConnection(mg_connection* mg_conn);
virtual ~HttpFlvConnection();

bool HasFlvHeader() const
{ return has_flv_header_; }

bool IsPlaying() const
{ return is_playing_; }
virtual bool IsPlaying() override { return is_playing_; }
virtual bool IsPlayer() override { return true; }

bool SendMediaData(uint8_t type, uint64_t timestamp, std::shared_ptr<char> payload, uint32_t payload_size);
virtual bool SendMediaData(uint8_t type, uint64_t timestamp, std::shared_ptr<char> payload, uint32_t payload_size) override;
virtual bool SendVideoData(uint64_t timestamp, std::shared_ptr<char> payload, uint32_t payload_size) override;
virtual bool SendAudioData(uint64_t timestamp, std::shared_ptr<char> payload, uint32_t payload_size) override;

void ResetKeyFrame()
{ has_key_frame_ = false; }
virtual uint32_t GetId() override
{ return (uint32_t)this->GetSocket(); }

private:
friend class RtmpSession;

bool OnRead(BufferReader& buffer);
void OnClose();

bool HasFlvHeader() const { return has_flv_header_; }
void SendFlvHeader();
int SendFlvTag(uint8_t type, uint64_t timestamp, std::shared_ptr<char> payload, uint32_t payload_size);

std::weak_ptr<RtmpServer> rtmp_server_;
TaskScheduler* task_scheduler_ = nullptr;
std::string stream_path_;

std::shared_ptr<char> avc_sequence_header_;
Expand Down
Loading

0 comments on commit 08a857f

Please sign in to comment.