#include"node.h"

// NOTE(review): the names of the nine system headers that used to sit here
// were lost (only bare `#include` tokens survived). The list below is
// reconstructed from the identifiers this file uses -- confirm it against the
// build before merging.
#include<stddef.h>
#include<stdint.h>
#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<sys/select.h>
#include<sys/time.h>
#include<librtmp/rtmp.h>
#include<librtmp/log.h>

#include"img.h"

// Size in bytes of the big-endian length prefix written before every NAL unit.
#define NALLENSZ 4

// Sink slots of this node, in the order they are read below.
enum {
	STREAMRTMP_SINK_VIDEO = 0,
	STREAMRTMP_SINK_AUDIO = 1,
	STREAMRTMP_SINK_URL = 2,
	STREAMRTMP_SINK_COUNT = 3
};

// Stores the low 16 bits of v at dst, most significant byte first.
static void put_be16(uint8_t *dst, uint32_t v) {
	dst[0] = (uint8_t) ((v >> 8) & 0xFF);
	dst[1] = (uint8_t) (v & 0xFF);
}

// Stores the low 24 bits of v at dst, most significant byte first.
static void put_be24(uint8_t *dst, uint32_t v) {
	dst[0] = (uint8_t) ((v >> 16) & 0xFF);
	dst[1] = (uint8_t) ((v >> 8) & 0xFF);
	dst[2] = (uint8_t) (v & 0xFF);
}

// Stores v at dst, most significant byte first. Writing byte by byte avoids
// the unaligned uint32_t store that a pointer cast would perform.
static void put_be32(uint8_t *dst, uint32_t v) {
	dst[0] = (uint8_t) ((v >> 24) & 0xFF);
	dst[1] = (uint8_t) ((v >> 16) & 0xFF);
	dst[2] = (uint8_t) ((v >> 8) & 0xFF);
	dst[3] = (uint8_t) (v & 0xFF);
}

// Returns the number of bytes from src up to (not including) the zero bytes
// of the next Annex B start code, or up to srcEnd when no further start code
// exists. A start code is two or more 0x00 bytes followed by 0x01; accepting
// any run of at least two zeros also covers start codes preceded by extra
// zero padding.
static size_t annexb_parse(const uint8_t *src, const uint8_t *srcEnd) {
	size_t zeros = 0;
	const uint8_t *p;

	for(p = src; p != srcEnd; p++) {
		if(*p == 0) {
			zeros++;
		} else if(zeros >= 2 && *p == 1) {
			// Step back over the zeros so they are not counted as payload.
			return (size_t) (p - zeros - src);
		} else {
			zeros = 0;
		}
	}

	return (size_t) (p - src);
}

// Skips the leading zero bytes and the terminating 0x01 of a start code.
// Returns a pointer to the first byte after the start code, or NULL when the
// buffer ends first or the first non-zero byte is not 0x01. Never reads at or
// beyond srcEnd.
static const uint8_t *annexb_skip_start_code(const uint8_t *src, const uint8_t *srcEnd) {
	while(src != srcEnd && *src == 0) {
		src++;
	}
	if(src == srcEnd || *src != 1) {
		return NULL;
	}
	return src + 1;
}

// Builds the configuration record sent in the video setup packet from an
// Annex B buffer that holds two consecutive NAL units (treated as SPS, then
// PPS).
//
// On success returns a malloc'd buffer the caller must free, stores its size
// in *szRet and the number of source bytes consumed in *srcSzEnd.
// On malformed input or allocation failure returns NULL and sets both outputs
// to 0.
static uint8_t *annexb_to_extradata(const uint8_t *src, const uint8_t *srcEnd, size_t *szRet, size_t *srcSzEnd) {
	*szRet = 0;
	*srcSzEnd = 0;

	if(!src || !srcEnd) {
		return NULL;
	}

	const uint8_t *sps = annexb_skip_start_code(src, srcEnd);
	if(!sps) {
		return NULL;
	}
	size_t szSps = annexb_parse(sps, srcEnd);

	const uint8_t *pps = annexb_skip_start_code(sps + szSps, srcEnd);
	if(!pps) {
		return NULL;
	}
	size_t szPps = annexb_parse(pps, srcEnd);

	// sps[1..3] are copied into the record header below, and both lengths are
	// stored in 16-bit fields.
	if(szSps < 4 || szSps > 0xFFFF || szPps < 1 || szPps > 0xFFFF) {
		return NULL;
	}

	size_t total = 6 + 2 + szSps + 1 + 2 + szPps;
	uint8_t *ret = malloc(total);
	if(!ret) {
		return NULL;
	}

	ret[0] = 1;
	ret[1] = sps[1];
	ret[2] = sps[2];
	ret[3] = sps[3];
	ret[4] = 0xFC | (NALLENSZ - 1);
	ret[5] = 0xE0 | 1; // one SPS follows
	put_be16(&ret[6], (uint32_t) szSps);
	memcpy(&ret[8], sps, szSps);

	uint8_t *ppsOut = &ret[8 + szSps];
	ppsOut[0] = 1; // one PPS follows
	put_be16(&ppsOut[1], (uint32_t) szPps);
	memcpy(&ppsOut[3], pps, szPps);

	*szRet = total;
	*srcSzEnd = (size_t) (pps + szPps - src);
	return ret;
}

// Converts an Annex B buffer (start-code separated NAL units) into a sequence
// of NAL units each prefixed by a NALLENSZ-byte big-endian length.
//
// On success returns a malloc'd buffer the caller must free and stores the
// number of bytes used in *szRet (0 for empty input).
// On malformed input or allocation failure returns NULL and sets *szRet to 0.
static uint8_t *annexb_to_avcc(const uint8_t *src, size_t szSrc, size_t *szRet) {
	*szRet = 0;

	if(!src && szSrc) {
		return NULL;
	}

	size_t cap = 4096, sz = 0;
	uint8_t *ret = malloc(cap);
	if(!ret) {
		return NULL;
	}

	const uint8_t *srcEnd = szSrc ? src + szSrc : src;

	while(src != srcEnd) {
		const uint8_t *nal = annexb_skip_start_code(src, srcEnd);
		if(!nal) {
			free(ret);
			return NULL;
		}

		size_t nalSize = annexb_parse(nal, srcEnd);
		if(nalSize > UINT32_MAX || nalSize > SIZE_MAX - NALLENSZ - sz) {
			free(ret);
			return NULL;
		}

		size_t needed = sz + NALLENSZ + nalSize;
		if(needed > cap) {
			size_t newCap = cap;
			while(newCap < needed) {
				newCap = (newCap > SIZE_MAX / 2) ? needed : newCap * 2;
			}
			// Keep the old pointer until realloc succeeds so it is not leaked.
			uint8_t *grown = realloc(ret, newCap);
			if(!grown) {
				free(ret);
				return NULL;
			}
			ret = grown;
			cap = newCap;
		}

		put_be32(&ret[sz], (uint32_t) nalSize);
		if(nalSize) {
			memcpy(&ret[sz + NALLENSZ], nal, nalSize);
		}

		sz = needed;
		src = nal + nalSize;
	}

	*szRet = sz;
	return ret;
}

// Node state: the public node header must stay first because the callbacks
// cast CHiPubNode* to Internal*.
typedef struct {
	CHiPubNode pub;
	RTMP *rtmp;         // NULL while the node is not started
	RTMPPacket rtmppkt; // scratch packet for incoming server messages
} Internal;

// Closes and releases the RTMP session, if any, and clears the scratch packet.
// The scratch packet body is not freed here: completed packets are freed
// right after they are handled in streamrtmp_perform.
// NOTE(review): assumes RTMP_Close releases the body of a partially read
// packet -- confirm against the librtmp version in use.
static void streamrtmp_teardown(Internal *n) {
	if(n->rtmp) {
		RTMP_Close(n->rtmp);
		RTMP_Free(n->rtmp);
		n->rtmp = NULL;
	}
	memset(&n->rtmppkt, 0, sizeof(n->rtmppkt));
}

// Start callback: connects to the URL found on the URL sink and opens the
// stream for writing. Returns 1 on success, 0 on failure (with no session
// left allocated).
static int streamrtmp_start(CHiPubNode *pub) {
	Internal *n = (Internal*) pub;

	// Drop any session left over from a previous start.
	streamrtmp_teardown(n);

	if(!CHi_Crawl(&pub->sinks[STREAMRTMP_SINK_URL]) || !CHi_Crawl(&pub->sinks[STREAMRTMP_SINK_URL])->data.text) {
		return 0;
	}

	n->rtmp = RTMP_Alloc();
	if(!n->rtmp) return 0;

	RTMP_Init(n->rtmp);

	RTMP_LogSetLevel(RTMP_LOGINFO);
	RTMP_LogSetOutput(stderr);

	if(!RTMP_SetupURL(n->rtmp, CHi_Crawl(&pub->sinks[STREAMRTMP_SINK_URL])->data.text)) {
		goto fail;
	}

	RTMP_EnableWrite(n->rtmp);

	if(!RTMP_Connect(n->rtmp, NULL)) {
		goto fail;
	}

	if(!RTMP_ConnectStream(n->rtmp, 0)) {
		goto fail;
	}

	// The scratch packet starts zeroed (done by streamrtmp_teardown above);
	// RTMP_ReadPacket allocates a body of the right size when one is needed.
	return 1;

fail:
	streamrtmp_teardown(n);
	return 0;
}

// Stop callback: shuts the session down. Always returns 1.
static int streamrtmp_stop(CHiPubNode *pub) {
	Internal *n = (Internal*) pub;

	streamrtmp_teardown(n);

	return 1;
}

#define FLV_TAG_HEADER_SIZE 11
#define FLV_VIDEO_HDR_SIZE 5
#define FLV_AUDIO_HDR_SIZE 2
#define FLV_PREV_TAG_SIZE_SIZE 4

// The tag's DataSize field is 24 bits wide.
#define FLV_MAX_DATA_SIZE 0xFFFFFF

#define FLV_TAG_AUDIO 8
#define FLV_TAG_VIDEO 9

// Assembles one complete FLV tag: the 11-byte tag header, the codec header
// (hdr), the payload and the trailing 4-byte tag size.
//
// Returns a malloc'd buffer the caller must free and stores its size in
// *szRet, or returns NULL (and *szRet = 0) when the data does not fit the
// 24-bit size field or allocation fails.
static uint8_t *flv_build_tag(uint8_t tagType, uint32_t timestamp, const uint8_t *hdr, size_t hdrSz, const uint8_t *payload, size_t payloadSz, size_t *szRet) {
	*szRet = 0;

	if(payloadSz > FLV_MAX_DATA_SIZE || hdrSz > FLV_MAX_DATA_SIZE - payloadSz) {
		return NULL;
	}

	size_t dataSize = hdrSz + payloadSz;
	size_t tagSize = FLV_TAG_HEADER_SIZE + dataSize;
	size_t total = tagSize + FLV_PREV_TAG_SIZE_SIZE;

	uint8_t *packet = malloc(total);
	if(!packet) {
		return NULL;
	}

	// Tag
	packet[0] = tagType;
	put_be24(&packet[1], (uint32_t) dataSize);
	put_be24(&packet[4], timestamp & 0xFFFFFF);
	packet[7] = (uint8_t) ((timestamp >> 24) & 0xFF); // upper timestamp byte
	packet[8] = 0;
	packet[9] = 0;
	packet[10] = 0;

	// Codec header followed by the payload
	uint8_t *body = &packet[FLV_TAG_HEADER_SIZE];
	if(hdrSz) {
		memcpy(body, hdr, hdrSz);
	}
	if(payloadSz) {
		memcpy(body + hdrSz, payload, payloadSz);
	}

	// Size of the tag that was just written
	put_be32(body + dataSize, (uint32_t) tagSize);

	*szRet = total;
	return packet;
}

// Builds one FLV tag and hands it to RTMP_Write. Returns 1 on success, 0 when
// the tag could not be built or the write failed.
static int flv_send_tag(Internal *n, uint8_t tagType, uint32_t timestamp, const uint8_t *hdr, size_t hdrSz, const uint8_t *payload, size_t payloadSz) {
	size_t packetSize;
	uint8_t *packet = flv_build_tag(tagType, timestamp, hdr, hdrSz, payload, payloadSz, &packetSize);
	if(!packet) {
		return 0;
	}

	// packetSize is bounded by FLV_MAX_DATA_SIZE plus a few bytes, so it fits
	// in an int.
	int written = RTMP_Write(n->rtmp, (const char*) packet, (int) packetSize);

	free(packet);

	return written > 0;
}

// Returns the bitstream frames currently available on the given sink, or
// NULL when the sink has nothing to offer.
static CHiBSFrames *streamrtmp_sink_frames(Internal *n, size_t sink) {
	if(!CHi_Crawl(&n->pub.sinks[sink])) {
		return NULL;
	}
	return CHi_Crawl(&n->pub.sinks[sink])->data.bitstream;
}

// Sends every pending video frame as an FLV video tag. Setup packets are
// converted to a configuration record, all other frames to length-prefixed
// NAL units. Returns 1 on success (including "nothing to send"), 0 on
// malformed input, allocation failure or write failure.
static int do_video(Internal *n) {
	CHiBSFrames *frames = streamrtmp_sink_frames(n, STREAMRTMP_SINK_VIDEO);
	if(!frames) {
		return 1;
	}

	for(size_t fi = 0; fi < frames->count; fi++) {
		CHiBSFrame *f = &frames->data[fi];
		int isSetup = (f->flags & CUTIHI_BS_SETUP_PACKET) != 0;

		size_t avccSize = 0;
		uint8_t *avcc;

		if(isSetup) {
			size_t consumed;
			avcc = annexb_to_extradata(f->ptr, f->ptr + f->sz, &avccSize, &consumed);
		} else {
			avcc = annexb_to_avcc(f->ptr, f->sz, &avccSize);
		}

		if(!avcc) {
			return 0;
		}

		// Video Header
		const uint8_t header[FLV_VIDEO_HDR_SIZE] = {
			(f->flags & CUTIHI_BS_FLAG_KEY) ? 0x17 : 0x27,
			isSetup ? 0 : 1,
			0, 0, 0
		};

		int ok = flv_send_tag(n, FLV_TAG_VIDEO, (uint32_t) f->timestamp, header, sizeof(header), avcc, avccSize);

		free(avcc);

		if(!ok) {
			return 0;
		}
	}

	return 1;
}

// Sends every pending audio frame as an FLV audio tag; the frame bytes are
// forwarded unchanged. Returns 1 on success (including "nothing to send"),
// 0 on allocation or write failure.
static int do_audio(Internal *n) {
	CHiBSFrames *frames = streamrtmp_sink_frames(n, STREAMRTMP_SINK_AUDIO);
	if(!frames) {
		return 1;
	}

	for(size_t fi = 0; fi < frames->count; fi++) {
		CHiBSFrame *f = &frames->data[fi];

		// Audio Header
		const uint8_t header[FLV_AUDIO_HDR_SIZE] = {
			(1 << 0) | (1 << 1) | (3 << 2) | (10 << 4),
			(f->flags & CUTIHI_BS_SETUP_PACKET) ? 0 : 1
		};

		if(!flv_send_tag(n, FLV_TAG_AUDIO, (uint32_t) f->timestamp, header, sizeof(header), f->ptr, f->sz)) {
			return 0;
		}
	}

	return 1;
}

// Perform callback: pushes pending video and audio, then polls the socket
// without blocking and lets librtmp handle one incoming packet if one is
// complete. Returns 0 when the node is not started or sending failed,
// otherwise 1.
static int streamrtmp_perform(CHiPubNode *pub) {
	Internal *n = (Internal*) pub;

	if(!n->rtmp) return 0;

	if(!do_video(n)) return 0;
	if(!do_audio(n)) return 0;

	int fd = RTMP_Socket(n->rtmp);

	// FD_SET is undefined for descriptors outside [0, FD_SETSIZE).
	if(fd < 0 || fd >= FD_SETSIZE) {
		return 1;
	}

	fd_set sockset;
	struct timeval timeout = {0}; // zero timeout: poll only

	FD_ZERO(&sockset);
	FD_SET(fd, &sockset);

	int result = select(fd + 1, &sockset, NULL, NULL, &timeout);

	if(result == 1 && FD_ISSET(fd, &sockset)) {
		// Only a fully received packet may be dispatched; a partial one is
		// completed by later RTMP_ReadPacket calls.
		if(RTMP_ReadPacket(n->rtmp, &n->rtmppkt) && RTMPPacket_IsReady(&n->rtmppkt)) {
			if(n->rtmppkt.m_nBodySize) {
				RTMP_ClientPacket(n->rtmp, &n->rtmppkt);
			}
			RTMPPacket_Free(&n->rtmppkt);
		}
	}

	return 1;
}

// Creates an RTMP streaming node with three sinks (video, audio, URL) and no
// sources. Returns NULL when allocation fails.
CUTIVIS CHiPubNode *CHi_StreamRTMP() {
	Internal *ret = calloc(1, sizeof(*ret));
	if(!ret) {
		return NULL;
	}

	ret->pub.type = CUTIHI_T('CStr', 'RTMP');
	ret->pub.Start = streamrtmp_start;
	ret->pub.Perform = streamrtmp_perform;
	ret->pub.Stop = streamrtmp_stop;

	ret->pub.sinkCount = STREAMRTMP_SINK_COUNT;
	ret->pub.sinks = calloc(ret->pub.sinkCount, sizeof(*ret->pub.sinks));
	if(!ret->pub.sinks) {
		free(ret);
		return NULL;
	}

	// Zero-sized allocation: the result may legitimately be NULL, so it is
	// not treated as an error.
	ret->pub.sourceCount = 0;
	ret->pub.sources = calloc(ret->pub.sourceCount, sizeof(*ret->pub.sources));

	return &ret->pub;
}