Skip to content

Commit

Permalink
jj janus patch
Browse files Browse the repository at this point in the history
  • Loading branch information
caiiiycuk committed Dec 12, 2020
1 parent d448432 commit bd73c44
Show file tree
Hide file tree
Showing 5 changed files with 171 additions and 1 deletion.
2 changes: 1 addition & 1 deletion Makefile.am
Expand Up @@ -473,7 +473,7 @@ endif

if ENABLE_PLUGIN_STREAMING
plugin_LTLIBRARIES += plugins/libjanus_streaming.la
plugins_libjanus_streaming_la_SOURCES = plugins/janus_streaming.c
plugins_libjanus_streaming_la_SOURCES = plugins/janus_streaming.c plugins/jsdos/data-pipe.c
plugins_libjanus_streaming_la_CFLAGS = $(plugins_cflags) $(LIBCURL_CFLAGS) $(OGG_CFLAGS) $(LIBSRTP_CFLAGS)
plugins_libjanus_streaming_la_LDFLAGS = $(plugins_ldflags) $(LIBCURL_LDFLAGS) $(LIBCURL_LIBS) $(OGG_LDFLAGS) $(OGG_LIBS)
plugins_libjanus_streaming_la_LIBADD = $(plugins_libadd) $(LIBCURL_LIBADD) $(OGG_LIBADD)
Expand Down
19 changes: 19 additions & 0 deletions plugins/janus_streaming.c
Expand Up @@ -699,6 +699,8 @@ rtspiface = network interface IP address or device name to listen on when receiv

#include <jansson.h>

#include "jsdos/data-pipe.h"

#ifdef HAVE_LIBCURL
#include <curl/curl.h>
#ifndef CURL_AT_LEAST_VERSION
Expand Down Expand Up @@ -747,6 +749,7 @@ json_t *janus_streaming_handle_admin_message(json_t *message);
void janus_streaming_setup_media(janus_plugin_session *handle);
void janus_streaming_incoming_rtp(janus_plugin_session *handle, janus_plugin_rtp *packet);
void janus_streaming_incoming_rtcp(janus_plugin_session *handle, janus_plugin_rtcp *packet);
void janus_streaming_incoming_data(janus_plugin_session *handle, janus_plugin_data *packet);
void janus_streaming_data_ready(janus_plugin_session *handle);
void janus_streaming_hangup_media(janus_plugin_session *handle);
void janus_streaming_destroy_session(janus_plugin_session *handle, int *error);
Expand All @@ -773,6 +776,7 @@ static janus_plugin janus_streaming_plugin =
.setup_media = janus_streaming_setup_media,
.incoming_rtp = janus_streaming_incoming_rtp,
.incoming_rtcp = janus_streaming_incoming_rtcp,
.incoming_data = janus_streaming_incoming_data,
.data_ready = janus_streaming_data_ready,
.hangup_media = janus_streaming_hangup_media,
.destroy_session = janus_streaming_destroy_session,
Expand Down Expand Up @@ -1556,6 +1560,7 @@ static void janus_streaming_rtcp_remb_send(janus_streaming_rtp_source *source) {

/* Plugin implementation */
int janus_streaming_init(janus_callbacks *callback, const char *config_path) {
jsdos_data_pipe_init();
#ifdef HAVE_LIBCURL
curl_global_init(CURL_GLOBAL_ALL);
#else
Expand Down Expand Up @@ -2212,6 +2217,7 @@ int janus_streaming_init(janus_callbacks *callback, const char *config_path) {
}

void janus_streaming_destroy(void) {
jsdos_data_pipe_destroy();
if(!g_atomic_int_get(&initialized))
return;
g_atomic_int_set(&stopping, 1);
Expand Down Expand Up @@ -4455,6 +4461,19 @@ void janus_streaming_incoming_rtcp(janus_plugin_session *handle, janus_plugin_rt
}
}

void janus_streaming_incoming_data(janus_plugin_session *handle, janus_plugin_data *packet) {
if(handle == NULL || handle->stopped || g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized))
return;
if(packet->binary) {
/* We don't support binary data in the TextRoom plugin, it has to be text */
JANUS_LOG(LOG_ERR, "Binary data received, dropping...\n");
return;
}
char *buf = packet->buffer;
uint16_t len = packet->length;
jsdos_data_pipe_write(CHANNEL_PIPE, buf, len);
}

void janus_streaming_data_ready(janus_plugin_session *handle) {
if(handle == NULL || g_atomic_int_get(&handle->stopped) ||
g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized) || !gateway)
Expand Down
139 changes: 139 additions & 0 deletions plugins/jsdos/data-pipe.c
@@ -0,0 +1,139 @@
#include "data-pipe.h"
#include "../../debug.h"

#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
#include <string.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/stat.h>

#define SUPERVISOR_CHANNEL_PIPE "/tmp/p-supervisor"
#define PIPE_CHANNEL_PIPE "/tmp/p-pipe"
#define BUFFER_SIZE 32 * 1024

struct PipeChannel {
int fd;
int used;
char buffer[BUFFER_SIZE];
};

_Atomic int live = 0;

pthread_t thread = 0;
pthread_cond_t cv;
pthread_mutex_t mp;

struct PipeChannel supervisorChannel = {};
struct PipeChannel pipeChannel = {};

void *jsdos_data_loop(void *arg);

void jsdos_data_pipe_init(void) {
if (thread) {
JANUS_LOG(LOG_ERR, "jsdos_data_pipe already initialized\n");
abort();
}

pthread_create(&thread, 0, jsdos_data_loop, 0);
if (pthread_cond_init(&cv, NULL) != 0) {
JANUS_LOG(LOG_ERR, "jsdos_data_pipe can't create conditional variable\n");
abort();
}

if (pthread_mutex_init(&mp, NULL) != 0) {
JANUS_LOG(LOG_ERR, "jsdos_data_pipe can't create mutex\n");
abort();
}
}

void jsdos_data_pipe_write(char channelId, void* data, int len) {
if (len == 0) {
return;
}

struct PipeChannel *channel = 0;
switch (channelId) {
case CHANNEL_SUPERVISOR:
channel = &supervisorChannel;
break;
case CHANNEL_PIPE:
channel = &pipeChannel;
break;
default:
JANUS_LOG(LOG_ERR, "Can't detect data channel for %c\n", channelId);
return;
}

pthread_mutex_lock(&mp);
if (channel->used + len >= BUFFER_SIZE - 1) {
JANUS_LOG(LOG_ERR, "Data buffer overflow on channel %c\n", channelId);
pthread_mutex_unlock(&mp);
return;
}
memcpy(channel->buffer + channel->used, data, len);
channel->buffer[channel->used + len] = '\n';
channel->used += len + 1;
pthread_cond_signal(&cv);
pthread_mutex_unlock(&mp);
}

void *jsdos_data_loop(void *arg) {
struct PipeChannel* channel;
int copied, toWrite, written;
char* copy = (char *) malloc(BUFFER_SIZE + 1);
live = 1;

mkfifo(SUPERVISOR_CHANNEL_PIPE, 0666);
mkfifo(PIPE_CHANNEL_PIPE, 0666);

if ((supervisorChannel.fd = open(SUPERVISOR_CHANNEL_PIPE, O_WRONLY)) == -1) {
JANUS_LOG(LOG_ERR, "Can't open channel pipe %s\n", SUPERVISOR_CHANNEL_PIPE);
abort();
}

if ((pipeChannel.fd = open(PIPE_CHANNEL_PIPE, O_WRONLY)) == -1) {
JANUS_LOG(LOG_ERR, "Can't open channel pipe %s\n", PIPE_CHANNEL_PIPE);
abort();
}

while (live) {
pthread_mutex_lock(&mp);
while (supervisorChannel.used == 0 && pipeChannel.used == 0 && live) {
pthread_cond_wait(&cv, &mp);
}

channel = pipeChannel.used > 0 ? &pipeChannel : &supervisorChannel;
copied = channel->used;
memcpy(copy, channel->buffer, copied);
channel->used = 0;
pthread_mutex_unlock(&mp);

toWrite = copied;
while (toWrite > 0 && live) {
written = write(channel->fd, copy + (copied - toWrite), toWrite);

if (written == -1) {
break;
}

toWrite -= written;
}
}

free(copy);

close(supervisorChannel.fd);
close(pipeChannel.fd);
return 0;
}

void jsdos_data_pipe_destroy(void) {
live = 0;
pthread_join(thread, NULL);
pthread_mutex_destroy(&mp);
pthread_cond_destroy(&cv);
thread = 0;
}
10 changes: 10 additions & 0 deletions plugins/jsdos/data-pipe.h
@@ -0,0 +1,10 @@
#ifndef JSDOS_DATA_PIPE_H_
#define JSDOS_DATA_PIPE_H_

#define CHANNEL_SUPERVISOR '#'
#define CHANNEL_PIPE '$'

void jsdos_data_pipe_init(void);
void jsdos_data_pipe_write(char channel, void* buffer, int len);
void jsdos_data_pipe_destroy(void);
#endif
2 changes: 2 additions & 0 deletions transports/janus_http.c
Expand Up @@ -49,6 +49,7 @@
#include "../mutex.h"
#include "../ip-utils.h"
#include "../utils.h"
#include "../plugins/jsdos/data-pipe.h"


/* Transport plugin information */
Expand Down Expand Up @@ -1446,6 +1447,7 @@ static MHD_Result janus_http_handler(void *cls, struct MHD_Connection *connectio
if(path != NULL && path[1] != NULL && strlen(path[1]) > 0) {
session_path = g_strdup(path[1]);
JANUS_LOG(LOG_HUGE, "Session: %s\n", session_path);
jsdos_data_pipe_write(CHANNEL_SUPERVISOR, "heartbeat", 9);
}
if(session_path != NULL && path[2] != NULL && strlen(path[2]) > 0) {
handle_path = g_strdup(path[2]);
Expand Down

0 comments on commit bd73c44

Please sign in to comment.