small-package/luci-app-nginx-pingos/modules/nginx-rtmp-module/ngx_live_relay_httpflv.c

638 lines
16 KiB
C

/*
* Copyright (C) AlexWoo(Wu Jie) wj19840501@gmail.com
*/
#include "ngx_live_relay.h"
#include "ngx_http_client.h"
#include "ngx_toolkit_misc.h"
#include "ngx_dynamic_resolver.h"
#include "ngx_rbuf.h"
/*
 * One row of the HTTP-status -> status-notification mapping used when the
 * upstream HTTP-FLV request fails.
 */
typedef struct {
    ngx_uint_t   status;    /* HTTP status code; 0 marks the final row      */
    char        *code;      /* status code string sent to players           */
    char        *level;     /* status level string sent to players          */
    char        *desc;      /* human readable description sent to players   */
} ngx_http_status_code_t;


/*
 * Lookup table scanned by ngx_live_relay_httpflv_error().  The scan stops at
 * the row whose status is 0, so that last row doubles as the fallback for
 * every status that is not listed explicitly.
 */
static ngx_http_status_code_t ngx_http_relay_status_code[] = {
    {
        .status = 400,
        .code   = "NetStream.Play.BadName",
        .level  = "error",
        .desc   = "Bad Request"
    },
    {
        .status = 403,
        .code   = "NetStream.Play.Forbidden",
        .level  = "error",
        .desc   = "Forbidden"
    },
    {
        .status = 404,
        .code   = "NetStream.Play.StreamNotFound",
        .level  = "error",
        .desc   = "No such stream"
    },
    {
        .status = 503,
        .code   = "NetStream.Play.ServiceUnavailable",
        .level  = "error",
        .desc   = "Service Unavailable"
    },
    {
        .status = 0,
        .code   = "NetStream.Play.StreamError",
        .level  = "error",
        .desc   = "Stream Error"
    }
};
/*
 * Incremental, byte-driven FLV parser for the HTTP-FLV relay.
 *
 * Consumes bytes from b (starting at b->pos) and resumes from the state saved
 * in s->flv_state, so it can be called repeatedly as body data arrives.
 * The tag currently being assembled lives in s->in_streams[0]: its header
 * fields go to st->hdr and its payload is appended to the st->in chain.
 *
 * On return b->pos points at the first unconsumed byte and s->flv_state holds
 * the state to resume from.
 *
 * Returns:
 *   NGX_OK     a complete tag (header + payload) has been assembled;
 *   NGX_AGAIN  the buffer was exhausted before the current tag completed;
 *   NGX_ERROR  the input is malformed or a payload buffer could not be
 *              obtained.
 *
 * NOTE(review): multi-byte fields are rebuilt by storing the incoming bytes
 * at descending offsets of the destination integer.  That yields the
 * most-significant-byte-first value only on a little-endian host with 32-bit
 * destination fields -- confirm before porting to other targets.
 */
static ngx_int_t
ngx_live_relay_httpflv_parse(ngx_rtmp_session_t *s, ngx_buf_t *b)
{
    u_char                     ch, *p, *pc;
    ngx_rtmp_stream_t         *st;
    ngx_rtmp_header_t         *h;
    ngx_chain_t              **ll;
    size_t                     len;
    ngx_rtmp_core_srv_conf_t  *cscf;
    ngx_int_t                  rc = NGX_AGAIN;
    enum {
        flv_header_F = 0,
        flv_header_FL,
        flv_header_FLV,
        flv_header_Version,
        flv_header_Flags,
        flv_header_DataOffset0,
        flv_header_DataOffset1,
        flv_header_DataOffset2,
        flv_header_DataOffset3,
        flv_tagsize0,
        flv_tagsize1,
        flv_tagsize2,
        flv_tagsize3,
        flv_tagtype,
        flv_datasize0,
        flv_datasize1,
        flv_datasize2,
        flv_timestamp0,
        flv_timestamp1,
        flv_timestamp2,
        flv_timestamp_extended,
        flv_streamid0,
        flv_streamid1,
        flv_streamid2,
        flv_data
    } state;

    state = s->flv_state;
    cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);

    /* every tag is assembled in the first input stream slot */
    st = &s->in_streams[0];
    h = &st->hdr;

    for (p = b->pos; p < b->last; ++p) {
        ch = *p;

        switch (state) {

        /* file header: signature "FLV" */
        case flv_header_F:
            if (ch != 'F') {
                rc = NGX_ERROR;
                goto done;
            }
            state = flv_header_FL;
            break;

        case flv_header_FL:
            if (ch != 'L') {
                rc = NGX_ERROR;
                goto done;
            }
            state = flv_header_FLV;
            break;

        case flv_header_FLV:
            if (ch != 'V') {
                rc = NGX_ERROR;
                goto done;
            }
            state = flv_header_Version;
            break;

        /* file header: only version 1 is accepted */
        case flv_header_Version:
            s->flv_version = ch;
            if (s->flv_version != 1) {
                rc = NGX_ERROR;
                goto done;
            }
            state = flv_header_Flags;
            break;

        case flv_header_Flags:
            s->flv_flags = ch;
            state = flv_header_DataOffset0;
            break;

        /* file header: 4-byte data offset */
        case flv_header_DataOffset0:
            pc = (u_char *) &s->flv_data_offset;
            pc[3] = ch;
            state = flv_header_DataOffset1;
            break;

        case flv_header_DataOffset1:
            pc = (u_char *) &s->flv_data_offset;
            pc[2] = ch;
            state = flv_header_DataOffset2;
            break;

        case flv_header_DataOffset2:
            pc = (u_char *) &s->flv_data_offset;
            pc[1] = ch;
            state = flv_header_DataOffset3;
            break;

        case flv_header_DataOffset3:
            pc = (u_char *) &s->flv_data_offset;
            pc[0] = ch;
            state = flv_tagsize0;
            break;

        /* 4-byte size of the previous tag */
        case flv_tagsize0:
            s->flv_tagsize = 0;
            pc = (u_char *) &s->flv_tagsize;
            pc[3] = ch;
            state = flv_tagsize1;
            break;

        case flv_tagsize1:
            pc = (u_char *) &s->flv_tagsize;
            pc[2] = ch;
            state = flv_tagsize2;
            break;

        case flv_tagsize2:
            pc = (u_char *) &s->flv_tagsize;
            pc[1] = ch;
            state = flv_tagsize3;
            break;

        case flv_tagsize3:
            pc = (u_char *) &s->flv_tagsize;
            pc[0] = ch;

            if (h->mlen == 0 && s->flv_first_pts == 0) {
                /* very first size field: there is no previous tag yet */
                s->flv_first_pts = 1;

                if (s->flv_tagsize != 0) {
                    rc = NGX_ERROR;
                    goto done;
                }

            } else {
                /*
                 * must match the previous tag: payload length plus the
                 * 11 tag header bytes (type 1 + size 3 + timestamp 4 +
                 * stream id 3) parsed by the states below
                 */
                if (h->mlen + 11 != s->flv_tagsize) {
                    rc = NGX_ERROR;
                    goto done;
                }
            }

            state = flv_tagtype;
            break;

        /* tag header: type, only metadata/audio/video are accepted */
        case flv_tagtype:
            if (ch != NGX_RTMP_MSG_AMF_META && ch != NGX_RTMP_MSG_AUDIO
                && ch != NGX_RTMP_MSG_VIDEO)
            {
                rc = NGX_ERROR;
                goto done;
            }

            h->type = ch;
            state = flv_datasize0;
            break;

        /* tag header: 3-byte payload size */
        case flv_datasize0:
            h->mlen = 0;
            pc = (u_char *) &h->mlen;
            pc[2] = ch;
            state = flv_datasize1;
            break;

        case flv_datasize1:
            pc = (u_char *) &h->mlen;
            pc[1] = ch;
            state = flv_datasize2;
            break;

        case flv_datasize2:
            pc = (u_char *) &h->mlen;
            pc[0] = ch;
            state = flv_timestamp0;
            /* st->len counts the payload bytes still to be collected */
            st->len = h->mlen;
            break;

        /* tag header: 3-byte timestamp followed by its extension byte */
        case flv_timestamp0:
            pc = (u_char *) &h->timestamp;
            pc[2] = ch;
            state = flv_timestamp1;
            break;

        case flv_timestamp1:
            pc = (u_char *) &h->timestamp;
            pc[1] = ch;
            state = flv_timestamp2;
            break;

        case flv_timestamp2:
            pc = (u_char *) &h->timestamp;
            pc[0] = ch;
            state = flv_timestamp_extended;
            break;

        case flv_timestamp_extended:
            /* the extension byte becomes the top byte of the timestamp */
            pc = (u_char *) &h->timestamp;
            pc[3] = ch;
            state = flv_streamid0;
            break;

        /* tag header: 3-byte stream id */
        case flv_streamid0:
            h->msid = 0;
            pc = (u_char *) &h->msid;
            pc[2] = ch;
            state = flv_streamid1;
            break;

        case flv_streamid1:
            pc = (u_char *) &h->msid;
            pc[1] = ch;
            state = flv_streamid2;
            break;

        case flv_streamid2:
            pc = (u_char *) &h->msid;
            pc[0] = ch;
            state = flv_data;
            break;

        /* tag payload: append to the st->in chain */
        case flv_data:
            /* skip chain links whose buffers are already full */
            for (ll = &st->in; (*ll) && (*ll)->buf->last == (*ll)->buf->end;
                 ll = &(*ll)->next);

            for (;;) {
                if (*ll == NULL) {
                    *ll = ngx_get_chainbuf(cscf->chunk_size, 1);

                    /*
                     * ngx_get_chainbuf() is defined elsewhere; guard
                     * against a NULL result instead of dereferencing it
                     */
                    if (*ll == NULL) {
                        rc = NGX_ERROR;
                        goto done;
                    }
                }

                /* as much as is both wanted and available */
                len = ngx_min(st->len, b->last - p);

                if ((*ll)->buf->end - (*ll)->buf->last >= (long) len) {
                    /* fits into the current chain buffer */
                    (*ll)->buf->last = ngx_cpymem((*ll)->buf->last, p, len);
                    p += len;
                    st->len -= len;
                    break;
                }

                /* fill the current chain buffer and move to the next one */
                len = (*ll)->buf->end - (*ll)->buf->last;
                (*ll)->buf->last = ngx_cpymem((*ll)->buf->last, p, len);
                p += len;
                st->len -= len;
                ll = &(*ll)->next;
            }

            if (st->len != 0) {
                /* input exhausted in the middle of the payload */
                rc = NGX_AGAIN;
                goto done;
            }

            /* tag complete; the next byte starts a previous-tag-size field */
            state = flv_tagsize0;
            rc = NGX_OK;
            goto done;
        }
    }

done:
    b->pos = p;
    s->flv_state = state;

    return rc;
}
/*
 * Read handler for the HTTP response body of the relay request.
 *
 * Pulls the available body chain from the HTTP client, feeds every buffer
 * to the FLV parser and hands each completed tag to
 * ngx_rtmp_receive_message().  The session is finalized when the body read
 * reports 0 or NGX_ERROR or when message processing fails; the HTTP request
 * is finalized when the FLV data cannot be parsed.
 *
 * request: the relay session (ngx_rtmp_session_t *)
 * hcr:     the HTTP client request that delivers the body
 */
static void
ngx_live_relay_httpflv_recv_body(void *request, ngx_http_request_t *hcr)
{
    ngx_rtmp_session_t  *s = request;
    ngx_rtmp_stream_t   *stream;
    ngx_chain_t         *body, *cur;
    ngx_int_t            rc;

    rc = ngx_http_client_read_body(hcr, &body);

    if (rc == 0 || rc == NGX_ERROR) {
        if (rc == 0) {
            s->finalize_reason = NGX_LIVE_NORMAL_CLOSE;

        } else {
            s->finalize_reason = NGX_LIVE_FLV_RECV_ERR;
        }

        ngx_log_error(NGX_LOG_INFO, s->log, ngx_errno,
                "http relay, recv body error");
        ngx_rtmp_finalize_session(s);

        return;
    }

    cur = body;

    for (;;) {
        /* step over a buffer that has been consumed completely */
        if (cur != NULL && cur->buf->pos == cur->buf->last) {
            cur = cur->next;
        }

        if (cur == NULL) {
            return;
        }

        rc = ngx_live_relay_httpflv_parse(s, cur->buf);

        if (rc == NGX_ERROR) {
            ngx_log_error(NGX_LOG_ERR, s->log, 0,
                    "http relay, parse flv frame failed in state %d",
                    s->flv_state);
            ngx_http_client_finalize_request(hcr, 1);

            return;
        }

        if (rc == NGX_AGAIN) {
            /* current tag still incomplete, keep feeding data */
            continue;
        }

        /* NGX_OK: a complete tag sits in the first input stream */
        stream = &s->in_streams[0];

        if (ngx_rtmp_receive_message(s, &stream->hdr, stream->in) != NGX_OK) {
            ngx_rtmp_finalize_session(s);
            return;
        }

        /* give the payload buffers back and start a fresh chain */
        ngx_put_chainbufs(stream->in);
        stream->in = NULL;
    }
}
static void
ngx_live_relay_httpflv_cleanup(void *data)
{
ngx_rtmp_session_t *s;
s = data;
ngx_log_error(NGX_LOG_INFO, s->log, 0, "http flv client, cleanup");
if (s) {
if (s->close.posted) {
ngx_delete_posted_event(&s->close);
}
if (s->finalize_reason == 0) {
s->finalize_reason = NGX_LIVE_FLV_RECV_ERR;
}
ngx_rtmp_finalize_fake_session(s);
}
}
/*
 * Report a relay failure to every player of the relayed stream.
 *
 * The status is mapped to a code/level/description triple through
 * ngx_http_relay_status_code (unlisted statuses use the table's last row),
 * logged, stored in each player session and sent with
 * ngx_rtmp_send_status().  When the level is "error", players that are not
 * static pulls are finalized with reason NGX_LIVE_RELAY_TRANSIT.
 *
 * s:      the relay session, supplies the log, server id and stream name
 * status: HTTP status code (or other numeric reason) to report
 */
static void
ngx_live_relay_httpflv_error(ngx_rtmp_session_t *s, ngx_uint_t status)
{
    ngx_live_stream_t       *st;
    ngx_rtmp_core_ctx_t     *cctx;
    ngx_http_status_code_t  *entry;
    ngx_uint_t               is_error;

    /* stop on a match or on the terminating row (status == 0) */
    for (entry = ngx_http_relay_status_code; entry->status; ++entry) {
        if (entry->status == status) {
            break;
        }
    }

    /* %ui is the nginx conversion for ngx_uint_t; %d expects an int */
    ngx_log_error(NGX_LOG_ERR, s->log, 0,
            "http relay transit, %ui: level='%s' code='%s' description='%s'",
            status, entry->level, entry->code, entry->desc);

    st = ngx_live_create_stream(&s->serverid, &s->stream);
    if (st == NULL) {
        /* defensive: nobody to notify if the stream cannot be obtained */
        return;
    }

    is_error = (ngx_strcmp(entry->level, "error") == 0);

    for (cctx = st->play_ctx; cctx; cctx = cctx->next) {
        cctx->session->status = status;
        ngx_rtmp_send_status(cctx->session, entry->code, entry->level,
                             entry->desc);

        if (is_error && !cctx->session->static_pull) {
            cctx->session->finalize_reason = NGX_LIVE_RELAY_TRANSIT;
            ngx_rtmp_finalize_session(cctx->session);
        }
    }
}
/*
 * First read handler of the relay HTTP request, invoked once the response
 * status is available.
 *
 * Binds the HTTP connection and its log to the session, then either aborts
 * the request (process is exiting with fast_reload enabled, or the status
 * is not 200) or publishes the stream locally and switches to
 * ngx_live_relay_httpflv_recv_body() for the body.
 *
 * request: the relay session (ngx_rtmp_session_t *)
 * hcr:     the HTTP client request
 */
static void
ngx_live_relay_httpflv_recv(void *request, ngx_http_request_t *hcr)
{
    ngx_rtmp_session_t  *s;
    ngx_uint_t           http_status;

    s = request;
    http_status = ngx_http_client_status_code(hcr);

    s->stage = NGX_LIVE_PLAY;
    s->ptime = ngx_current_msec;

    /* attach the upstream connection and route its log through the session */
    s->connection = hcr->connection;
    ngx_rtmp_set_combined_log(s, hcr->connection->log->data,
            hcr->connection->log->handler);
    s->log->connection = s->connection->number;

    if (ngx_rtmp_core_main_conf->fast_reload
        && (ngx_exiting || ngx_terminate))
    {
        ngx_live_relay_httpflv_error(s, NGX_LIVE_PROCESS_EXIT);
        s->finalize_reason = NGX_LIVE_PROCESS_EXIT;
        goto abort;
    }

    if (http_status != NGX_HTTP_OK) {
        ngx_live_relay_httpflv_error(s, http_status);
        s->finalize_reason = NGX_LIVE_FLV_RECV_ERR;
        goto abort;
    }

    ngx_live_relay_publish_local(s);

    /* from now on incoming data is FLV body; process what is already here */
    ngx_http_client_set_read_handler(hcr, ngx_live_relay_httpflv_recv_body);
    ngx_live_relay_httpflv_recv_body(request, hcr);

    return;

abort:

    ngx_http_client_finalize_request(hcr, 1);
}
/*
 * Build the upstream URL and issue the HTTP GET that pulls the FLV stream.
 *
 * The URL has the form http://host:port/app/name[?pargs] and is allocated
 * from the session pool.  Host, Referer and User-Agent headers come from
 * the relay context.  On success the request is stored in s->request, a
 * cleanup handler bound to the session is registered on it and
 * s->live_type is set to NGX_HTTP_FLV_LIVE.
 *
 * s:   the relay session
 * url: upstream host and port
 *
 * Returns NGX_OK on success, NGX_ERROR when the URL cannot be allocated,
 * the HTTP request cannot be created or the cleanup handler cannot be added.
 */
static ngx_int_t
ngx_live_relay_httpflv_send_request(ngx_rtmp_session_t *s,
    ngx_live_relay_url_t *url)
{
    ngx_http_request_t    *hcr;
    ngx_str_t              request_url;
    size_t                 len;
    ngx_live_relay_ctx_t  *ctx;
    ngx_http_cleanup_t    *cln;
    u_char                *p;

    ctx = ngx_rtmp_get_module_ctx(s, ngx_live_relay_module);

    // request url
    // http:// + url + ':' + port + '/' + app + '/' + name + ['?' + pargs]
    // len is an upper bound: the port is budgeted at five digits
    len = 7 + url->url.host.len + 1 + sizeof("65535") - 1
        + 1 + ctx->app.len + 1 + ctx->name.len;
    if (ctx->pargs.len) {
        len = len + 1 + ctx->pargs.len;
    }

    request_url.data = ngx_pcalloc(s->pool, len);
    if (request_url.data == NULL) {
        ngx_log_error(NGX_LOG_ERR, s->log, 0,
                "alloc mem for request url failed");
        return NGX_ERROR;
    }

    if (ctx->pargs.len) {
        p = ngx_snprintf(request_url.data, len, "http://%V:%d/%V/%V?%V",
                &url->url.host, url->port, &ctx->app, &ctx->name, &ctx->pargs);
    } else {
        p = ngx_snprintf(request_url.data, len, "http://%V:%d/%V/%V",
                &url->url.host, url->port, &ctx->app, &ctx->name);
    }
    request_url.len = p - request_url.data;

    // request headers
    ngx_keyval_t headers[] = {
        { ngx_string("Host"),       ctx->domain },
        { ngx_string("Referer"),    ctx->referer },
        { ngx_string("User-Agent"), ctx->user_agent },
        { ngx_null_string,          ngx_null_string }
    };

    s->stage = NGX_LIVE_CONNECT;
    s->connect_time = ngx_current_msec;

    hcr = ngx_http_client_get(s->log, &request_url, headers, s);
    if (hcr == NULL) {
        // the request is used right below; do not dereference a failed one
        ngx_log_error(NGX_LOG_ERR, s->log, 0,
                "http client create request failed");
        return NGX_ERROR;
    }

    ngx_http_client_set_read_handler(hcr, ngx_live_relay_httpflv_recv);

    cln = ngx_http_client_cleanup_add(hcr, 0);
    if (cln == NULL) {
        ngx_log_error(NGX_LOG_ERR, s->log, 0, "http client add cleanup failed");
        ngx_live_relay_httpflv_error(s, NGX_HTTP_INTERNAL_SERVER_ERROR);
        ngx_http_client_finalize_request(hcr, 1);
        return NGX_ERROR;
    }

    // tie the session teardown to the lifetime of the http request
    cln->handler = ngx_live_relay_httpflv_cleanup;
    cln->data = s;

    s->request = hcr;
    s->live_type = NGX_HTTP_FLV_LIVE;

    return NGX_OK;
}
/*
 * Start an HTTP-FLV relay pull for session s.
 *
 * Copies the relay parameters (domain, app, name, pargs, referer,
 * user_agent and tag) into the session's relay context, checks that the
 * upstream host resolves and sends the HTTP request.
 *
 * s:     the relay session
 * relay: relay description to copy the parameters from
 * url:   upstream host and port
 *
 * Returns NGX_OK on success.  Returns NGX_ERROR when the session has no
 * relay context, or - after finalizing the session - when copying,
 * resolving or sending fails.
 */
ngx_int_t
ngx_live_relay_create_httpflv(ngx_rtmp_session_t *s, ngx_live_relay_t *relay,
    ngx_live_relay_url_t *url)
{
    ngx_live_relay_ctx_t  *rctx;
    // must be ngx_sockaddr_t: struct sockaddr is not large enough, so the
    // resolved address would overwrite neighbouring variables
    ngx_sockaddr_t         addr;
    socklen_t              addr_len;

    rctx = ngx_rtmp_get_module_ctx(s, ngx_live_relay_module);
    if (rctx == NULL) {
        return NGX_ERROR;
    }

    // copy the relay parameters into the session pool, stop at first failure
    if (ngx_copy_str(s->pool, &rctx->domain, &relay->domain) != NGX_OK
        || ngx_copy_str(s->pool, &rctx->app, &relay->app) != NGX_OK
        || ngx_copy_str(s->pool, &rctx->name, &relay->name) != NGX_OK
        || ngx_copy_str(s->pool, &rctx->pargs, &relay->pargs) != NGX_OK
        || ngx_copy_str(s->pool, &rctx->referer, &relay->referer) != NGX_OK
        || ngx_copy_str(s->pool, &rctx->user_agent, &relay->user_agent)
           != NGX_OK)
    {
        goto destroy;
    }

    rctx->tag = relay->tag;

    // get address, the host in url must be resolved synchronously
    addr_len = ngx_dynamic_resolver_gethostbyname(&url->url.host,
                                                  (struct sockaddr *) &addr);
    if (addr_len == 0) {
        ngx_log_error(NGX_LOG_ERR, s->log, 0,
                "relay httpflv: gethostbyname failed %V", &url->url.host);
        goto destroy;
    }

    // send http request
    if (ngx_live_relay_httpflv_send_request(s, url) != NGX_OK) {
        goto destroy;
    }

    return NGX_OK;

destroy:

    ngx_rtmp_finalize_session(s);

    return NGX_ERROR;
}