Skip to content

Commit

Permalink
update example
Browse files Browse the repository at this point in the history
  • Loading branch information
PHZ76 committed Oct 19, 2019
1 parent 7f207a6 commit d11b1b4
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 17 deletions.
13 changes: 9 additions & 4 deletions example/rtmp_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
#include "xop/H264Parser.h"
#include "net/EventLoop.h"

#define TEST_RTMP_PUSHER 1
#define TEST_RTMP_PUSHER 1
#define TEST_MULTI_THREAD 1
#define PUSH_URL "rtmp://127.0.0.1:1935/live/01"
#define PUSH_FILE "./test.h264"
#define HTTP_URL "http://127.0.0.1:8080/live/01.flv"
Expand All @@ -17,12 +18,16 @@ int test_rtmp_publisher(xop::EventLoop *eventLoop);

int main(int argc, char **argv)
{
xop::EventLoop eventLoop;
int count = 1;
#if TEST_MULTI_THREAD
count = std::thread::hardware_concurrency();
#endif
xop::EventLoop eventLoop(count);

/* rtmp server example */
xop::RtmpServer rtmpServer(&eventLoop, "0.0.0.0", 1935);
rtmpServer.setChunkSize(60000);
rtmpServer.setGopCache(); /* enable gop cache */
//rtmpServer.setGopCache(); /* enable gop cache */

/* http-flv server example */
xop::HttpFlvServer httpFlvServer(&eventLoop, "0.0.0.0", 8080);
Expand Down Expand Up @@ -220,7 +225,7 @@ int test_rtmp_publisher(xop::EventLoop *eventLoop)
xop::MediaInfo mediaInfo;
xop::RtmpPublisher publisher(eventLoop);
publisher.setChunkSize(60000);
if (publisher.openUrl(PUSH_URL) < 0)
if (publisher.openUrl(PUSH_URL, 5000) < 0)
{
printf("Open url %s failed.\n", PUSH_URL);
return -1;
Expand Down
31 changes: 31 additions & 0 deletions src/net/EventLoop.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
using namespace xop;

EventLoop::EventLoop(uint32_t nThreads)
: _index(1)
{
static std::once_flag oc_init;
std::call_once(oc_init, [] {
Expand Down Expand Up @@ -65,6 +66,7 @@ std::shared_ptr<TaskScheduler> EventLoop::getTaskScheduler()
void EventLoop::loop()
{
std::lock_guard<std::mutex> locker(_mutex);

for (uint32_t n = 0; n < _nThreads; n++)
{
#if defined(__linux) || defined(__linux__)
Expand All @@ -74,8 +76,37 @@ void EventLoop::loop()
#endif
_taskSchedulers.push_back(taskSchedulerPtr);
std::shared_ptr<std::thread> t(new std::thread(&TaskScheduler::start, taskSchedulerPtr.get()));
t->native_handle();
_threads.push_back(t);
}

int priority = TASK_SCHEDULER_PRIORITY_REALTIME;

for (auto iter : _threads)
{
#if defined(__linux) || defined(__linux__)

#elif defined(WIN32) || defined(_WIN32)
switch (priority)
{
case TASK_SCHEDULER_PRIORITY_LOW:
SetThreadPriority(iter->native_handle(), THREAD_PRIORITY_BELOW_NORMAL);
break;
case TASK_SCHEDULER_PRIORITY_NORMAL:
SetThreadPriority(iter->native_handle(), THREAD_PRIORITY_NORMAL);
break;
case TASK_SCHEDULER_PRIORITYO_HIGH:
SetThreadPriority(iter->native_handle(), THREAD_PRIORITY_ABOVE_NORMAL);
break;
case TASK_SCHEDULER_PRIORITY_HIGHEST:
SetThreadPriority(iter->native_handle(), THREAD_PRIORITY_HIGHEST);
break;
case TASK_SCHEDULER_PRIORITY_REALTIME:
SetThreadPriority(iter->native_handle(), THREAD_PRIORITY_TIME_CRITICAL);
break;
}
#endif
}
}

void EventLoop::quit()
Expand Down
32 changes: 19 additions & 13 deletions src/net/EventLoop.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,34 +18,40 @@
#include "Timer.h"
#include "RingBuffer.h"

#define TASK_SCHEDULER_PRIORITY_LOW 0
#define TASK_SCHEDULER_PRIORITY_NORMAL 1
#define TASK_SCHEDULER_PRIORITYO_HIGH 2
#define TASK_SCHEDULER_PRIORITY_HIGHEST 3
#define TASK_SCHEDULER_PRIORITY_REALTIME 4

namespace xop
{

class EventLoop
{
public:
EventLoop(const EventLoop&) = delete;
EventLoop &operator = (const EventLoop&) = delete;
EventLoop(uint32_t nThreads=1); //std::thread::hardware_concurrency()
virtual ~EventLoop();
EventLoop(uint32_t nThreads=1); //std::thread::hardware_concurrency()
virtual ~EventLoop();

std::shared_ptr<TaskScheduler> getTaskScheduler();
std::shared_ptr<TaskScheduler> getTaskScheduler();

bool addTriggerEvent(TriggerEvent callback);
TimerId addTimer(TimerEvent timerEvent, uint32_t msec);
void removeTimer(TimerId timerId);
void updateChannel(ChannelPtr channel);
void removeChannel(ChannelPtr& channel);
bool addTriggerEvent(TriggerEvent callback);
TimerId addTimer(TimerEvent timerEvent, uint32_t msec);
void removeTimer(TimerId timerId);
void updateChannel(ChannelPtr channel);
void removeChannel(ChannelPtr& channel);

private:
void loop();
void quit();

std::mutex _mutex;
std::mutex _mutex;
uint32_t _nThreads = 1;
uint32_t _index = 1;
std::vector<std::shared_ptr<TaskScheduler>> _taskSchedulers;
std::vector<std::shared_ptr<std::thread>> _threads;
uint32_t _index = 1;
std::vector<std::shared_ptr<TaskScheduler>> _taskSchedulers;
std::vector<std::shared_ptr<std::thread>> _threads;
};

}
Expand Down

0 comments on commit d11b1b4

Please sign in to comment.