#ifdef _WIN32 #include #include #include"wepoll.c" #else #include #include #include #include #include #include #include #endif #include #include #include #include #include #include #include"picohttpparser.h" #include"base64.h" #include"teeny-sha1.c" #ifdef _WIN32 typedef HANDLE EPoll; typedef int Socket; void *memmem(const void *haystack, size_t haystack_len, const void * const needle, const size_t needle_len) { if(haystack == NULL) return NULL; if(haystack_len == 0) return NULL; if(needle == NULL) return NULL; if(needle_len == 0) return NULL; for(const char *h = haystack; haystack_len >= needle_len; ++h, --haystack_len) { if(!memcmp(h, needle, needle_len)) { return (void*) h; } } return NULL; } #else typedef int EPoll; typedef int Socket; #define closesocket close #endif typedef enum ClientType { UNKNOWN, CLI_STREAMER, CLI_VIEWER } ClientType; typedef enum ClientState { REQUEST, ACTIVE, WEBSOCKET, } ClientState; typedef struct { Socket fd; ClientType type; ClientState state; size_t len, prevlen, cap; uint8_t *buf; // Only for streamers struct phr_chunked_decoder chudec; // Only for websockets struct { int opcode; uint8_t *incoming; size_t incomingSz; } ws; } Client; typedef enum { LOADING_HEADER, STREAMING, } StreamState; static struct Stream { StreamState state; uint8_t *mkvHeader; size_t mkvHeaderSz; int stateChangeIdx; } Stream; static size_t clientsSz; static Client **clients; static char *ValidStreamPath = NULL; static void consume(Client *cli, size_t n) { memmove(cli->buf, cli->buf + n, cli->len - n); cli->len -= n; } static void transmit(Client *cli, const char *buf, size_t sz) { while(sz) { ssize_t s = send(cli->fd, buf, sz, 0); if(s >= 0) { buf += s; sz -= s; } } } static void transmit_all(const char *buf, size_t sz) { for(size_t i = 0; i < clientsSz; i++) { if(clients[i]->state == WEBSOCKET) { transmit(clients[i], buf, sz); } } } #define WS_BIN 2 #define WS_CLOSE 8 #define WS_FIN 128 #define WS_HEADER_MAX 10 static int ws_header(size_t sz, uint8_t hdr[static WS_HEADER_MAX]) { int i; hdr[0] = WS_BIN | WS_FIN; if(sz < 126) { hdr[1] = sz; i = 2; } else if(sz < 65536) { hdr[1] = 126; hdr[2] = sz >> 8; hdr[3] = sz & 0xFF; i = 4; } else { hdr[1] = 127; hdr[2] = (sz >> 56) & 0xFF; hdr[3] = (sz >> 48) & 0xFF; hdr[4] = (sz >> 40) & 0xFF; hdr[5] = (sz >> 32) & 0xFF; hdr[6] = (sz >> 24) & 0xFF; hdr[7] = (sz >> 16) & 0xFF; hdr[8] = (sz >> 8) & 0xFF; hdr[9] = (sz >> 0) & 0xFF; i = 10; } return i; } static void ws_send(Client *cli, const uint8_t *buf, size_t sz) { if(sz == 0) return; uint8_t wshdr[WS_HEADER_MAX]; int wshdrsz = ws_header(sz, wshdr); transmit(cli, wshdr, wshdrsz); transmit(cli, buf, sz); } static void ws_broadcast(const uint8_t *buf, size_t sz) { if(sz == 0) return; uint8_t wshdr[WS_HEADER_MAX]; int wshdrsz = ws_header(sz, wshdr); transmit_all(wshdr, wshdrsz); transmit_all(buf, sz); } static void stream_step(const uint8_t *newbuf, size_t newsz) { if(Stream.state == LOADING_HEADER) { Stream.mkvHeader = realloc(Stream.mkvHeader, Stream.mkvHeaderSz + newsz); memcpy(Stream.mkvHeader + Stream.mkvHeaderSz, newbuf, newsz); Stream.mkvHeaderSz += newsz; uint8_t *clusterEl = memmem(Stream.mkvHeader, Stream.mkvHeaderSz, "\x1F\x43\xB6\x75", 4); if(clusterEl) { ws_broadcast(Stream.mkvHeader, clusterEl - Stream.mkvHeader); ws_broadcast(clusterEl, Stream.mkvHeader + Stream.mkvHeaderSz - clusterEl); Stream.mkvHeaderSz = clusterEl - Stream.mkvHeader; Stream.state = STREAMING; } } else { int i; for(i = 0; i < newsz; i++) { if(newbuf[i] == "\x1A\x45\xDF\xA3"[Stream.stateChangeIdx]) { Stream.stateChangeIdx++; if(Stream.stateChangeIdx == 4) { i++; Stream.stateChangeIdx = 0; Stream.state = LOADING_HEADER; break; } } else { Stream.stateChangeIdx = 0; } } if(Stream.state == LOADING_HEADER) { if(i > 4) { ws_broadcast(newbuf, i - 4); } Stream.mkvHeader = realloc(Stream.mkvHeader, Stream.mkvHeaderSz = 4 + (newsz - i)); memcpy(Stream.mkvHeader, "\x1A\x45\xDF\xA3", 4); memcpy(Stream.mkvHeader + 4, newbuf + i, newsz - i); } else { ws_broadcast(newbuf, newsz); } } } static void receive_ws(Client *cli) { } static int handle(Client *cli) { while(cli->len != 0) { if(cli->state == REQUEST) { int minor_version; struct phr_header headers[96]; const char *method, *path; size_t method_len, path_len, num_headers = sizeof(headers) / sizeof(headers[0]); int pret = phr_parse_request(cli->buf, cli->len, &method, &method_len, &path, &path_len, &minor_version, headers, &num_headers, cli->prevlen); if(pret == -1) { return 0; } if(pret == -2) { return 1; } bool connectionUpgrade = false; bool upgradeWebSocket = false; size_t wsAcceptLen; unsigned char *wsAccept = NULL; bool chunked = false; for(size_t i = 0; i < num_headers; i++) { if(strncmp(headers[i].name, "Upgrade", headers[i].name_len) == 0 && strncmp(headers[i].value, "websocket", headers[i].value_len) == 0) { upgradeWebSocket = true; } else if(strncmp(headers[i].name, "Connection", headers[i].name_len) == 0 && memmem(headers[i].value, headers[i].value_len, "Upgrade", 7)) { connectionUpgrade = true; } else if(strncmp(headers[i].name, "Transfer-Encoding", headers[i].name_len) == 0 && strncmp(headers[i].value, "chunked", headers[i].value_len) == 0) { chunked = true; } else if(strncmp(headers[i].name, "Sec-WebSocket-Key", headers[i].name_len) == 0) { size_t acceptbufsz = headers[i].value_len + 36; char acceptbuf[acceptbufsz]; memcpy(acceptbuf, headers[i].value, headers[i].value_len); memcpy(acceptbuf + headers[i].value_len, "258EAFA5-E914-47DA-95CA-C5AB0DC85B11", 36); char sha1bin[20]; char sha1hex[41]; sha1digest(sha1bin, sha1hex, acceptbuf, acceptbufsz); wsAcceptLen = BASE64_ENCODE_OUT_SIZE(sizeof(sha1bin)); wsAccept = malloc(wsAcceptLen); base64_encode(sha1bin, sizeof(sha1bin), wsAccept); } } if(path_len == strlen(ValidStreamPath) && strncmp(path, ValidStreamPath, path_len) == 0) { cli->type = CLI_STREAMER; cli->state = ACTIVE; if(upgradeWebSocket || connectionUpgrade || !chunked) { return 0; } printf("New streamer client\n"); } else { cli->type = CLI_VIEWER; cli->state = WEBSOCKET; if(!upgradeWebSocket || !connectionUpgrade || chunked || !wsAccept) { return 0; } char buf[1024]; int bufnum = snprintf(buf, sizeof(buf), "HTTP/1.1 101 Switching Protocols\r\nUpgrade: websocket\r\nConnection: Upgrade\r\nSec-WebSocket-Accept: %.*s\r\n\r\n", (int) wsAcceptLen, wsAccept); free(wsAccept); transmit(cli, buf, bufnum); printf("New WS client\n"); if(Stream.state == STREAMING && Stream.mkvHeader) { printf("Sending header\n"); ws_send(cli, Stream.mkvHeader, Stream.mkvHeaderSz); } } consume(cli, pret); cli->prevlen = 0; } else if(cli->state == ACTIVE) { size_t rsize = cli->len; int pret = phr_decode_chunked(&cli->chudec, cli->buf, &rsize); if(pret == -1) return 0; stream_step(cli->buf, rsize); cli->len = 0; if(pret == -2) { return 1; } } else if(cli->state == WEBSOCKET) { if(cli->len < 2) return 1; uint8_t framehdr = cli->buf[0]; bool fin = framehdr & 128; int opcode = framehdr & 15; if(cli->ws.opcode == 0 && opcode) { cli->ws.opcode = opcode; } size_t payloadSz = 0; int i; uint8_t payload0 = cli->buf[1] & 127; if(payload0 < 126) { payloadSz = payload0; i = 2; } else if(payload0 == 126) { if(cli->len < 4) return 1; payloadSz = (cli->buf[2] << 8) + cli->buf[3]; i = 4; } else if(payload0 == 127) { if(cli->len < 10) return 1; payloadSz = ((uint64_t) cli->buf[2] << 56) + ((uint64_t) cli->buf[3] << 48) + ((uint64_t) cli->buf[4] << 40) + ((uint64_t) cli->buf[5] << 32) + ((uint64_t) cli->buf[6] << 24) + ((uint64_t) cli->buf[7] << 16) + ((uint64_t) cli->buf[8] << 8) + ((uint64_t) cli->buf[9] << 0); i = 10; } if(payloadSz > 100) { // Literally just kick return 0; } if(cli->len < i + 4 + payloadSz) { return 1; } uint8_t mask[4] = {cli->buf[i], cli->buf[i + 1], cli->buf[i + 2], cli->buf[i + 3]}; for(size_t b = 0; b < payloadSz; b++) { cli->buf[i + 4 + b] ^= mask[b % 4]; } cli->ws.incoming = realloc(cli->ws.incoming, cli->ws.incomingSz + payloadSz); memcpy(cli->ws.incoming + cli->ws.incomingSz, cli->buf + i + 4, payloadSz); if(fin) { receive_ws(cli); if(cli->ws.opcode == WS_CLOSE) { return 0; } cli->ws.incomingSz = 0; cli->ws.opcode = 0; } } } return 1; } static void rem_cli(Client *cli) { for(size_t i = 0; i < clientsSz; i++) { if(clients[i] == cli) { memmove(clients + i, clients + i + 1, sizeof(*clients) * (clientsSz - i - 1)); clientsSz--; return; } } } static Socket ServSock; static EPoll EP; static int Argc; static char **Argv; static const char *get_arg(const char *key, const char *def) { int z = strlen(key); for(size_t i = 1; i < Argc; i++) { if(strlen(Argv[i]) > z && strstr(Argv[i], key) == Argv[i] && Argv[i][z] == '=') { return Argv[i] + z + 1; } } return def; } static bool get_arg_bool(const char *key) { const char *val = get_arg(key, "0"); return strtol(val, NULL, 0); } int main(int argc, char **argv) { Argc = argc, Argv = argv; const char *streamkey = get_arg("key", NULL); if(!streamkey) { puts("Missing stream key parameter key=..."); return 0; } ValidStreamPath = calloc(1, 6 + strlen(streamkey) + 1); strcat(ValidStreamPath, "/push/"); strcat(ValidStreamPath, streamkey); #ifdef _WIN32 WSAStartup(MAKEWORD(2, 2), &(WSADATA) {}); ServSock = socket(AF_INET6, SOCK_STREAM, 0); ioctlsocket(ServSock, FIONBIO, &(u_long) {1}); #else ServSock = socket(AF_INET6, SOCK_STREAM | SOCK_NONBLOCK, 0); #endif EP = epoll_create1(0); assert(ServSock != -1); if(get_arg_bool("reuseaddr")) { setsockopt(ServSock, SOL_SOCKET, SO_REUSEADDR, &(int) {1}, sizeof(int)); } setsockopt(ServSock, IPPROTO_IPV6, IPV6_V6ONLY, &(int) {0}, sizeof(int)); struct addrinfo *res = NULL; assert(getaddrinfo(NULL, get_arg("port", "25404"), &(struct addrinfo) {.ai_flags = AI_PASSIVE, .ai_family = AF_INET6}, &res) == 0); assert(bind(ServSock, res->ai_addr, res->ai_addrlen) >= 0); freeaddrinfo(res); assert(listen(ServSock, 16) >= 0); epoll_ctl(EP, EPOLL_CTL_ADD, ServSock, &(struct epoll_event) {.events = EPOLLIN | EPOLLOUT, .data = {.fd = ServSock}}); while(1) { #define BUFSZ 8192 char buf[BUFSZ]; #define EPOLL_EVS 2048 struct epoll_event events[EPOLL_EVS]; int nfds = epoll_wait(EP, events, EPOLL_EVS, -1); for(int i = 0; i < nfds; i++) { if(events[i].data.fd == ServSock) { struct sockaddr_storage addr; socklen_t addrlen; Socket clisock = accept(ServSock, (struct sockaddr*) &addr, &addrlen); #ifdef _WIN32 ioctlsocket(clisock, FIONBIO, &(u_long) {1}); #else if(fcntl(clisock, F_SETFL, fcntl(clisock, F_GETFL, 0) | O_NONBLOCK) == -1) { closesocket(clisock); continue; } #endif Client *cli = calloc(1, sizeof(*cli)); cli->fd = clisock; cli->len = 0; cli->buf = malloc(cli->cap = 8192); epoll_ctl(EP, EPOLL_CTL_ADD, clisock, &(struct epoll_event) {.events = EPOLLIN | EPOLLRDHUP | EPOLLHUP, .data = {.ptr = cli}}); clients = realloc(clients, sizeof(*clients) * (clientsSz + 1)); clients[clientsSz++] = cli; } else { Client *cli = events[i].data.ptr; bool forceclose = 0; if(events[i].events & EPOLLIN) { while(1) { ssize_t readcount = recv(cli->fd, buf, sizeof(buf), 0); if(readcount <= 0) { if(errno != EAGAIN && errno != EWOULDBLOCK) { forceclose = true; } break; } if(cli->len + readcount > cli->cap) { cli->buf = realloc(cli->buf, cli->cap = ((cli->len + readcount + 4095) & ~4095)); } memcpy(cli->buf + cli->len, buf, readcount); cli->prevlen = cli->len; cli->len += readcount; } if(handle(cli) == 0) { forceclose = true; } } if(forceclose || (events[i].events & (EPOLLRDHUP | EPOLLHUP))) { epoll_ctl(EP, EPOLL_CTL_DEL, cli->fd, NULL); closesocket(cli->fd); rem_cli(cli); free(cli->buf); free(cli->ws.incoming); free(cli); printf("Client left, now at %lu\n", clientsSz); } } } } }