// Relay: accepts one raw TCP "streamer" connection carrying a byte stream and
// rebroadcasts it as binary WebSocket messages to every connected WebSocket
// client. The bytes between the most recent 1A 45 DF A3 marker and the
// following 1F 43 B6 75 marker are cached as a "header" and replayed to
// clients that connect later.

// memmem() is a GNU extension; the feature macro must precede every system
// header, including the ones pulled in by mongoose.h.
#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif

#include "mongoose.h"

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

typedef enum {
    LOADING_HEADER,
    STREAMING,
} State;

// Length in bytes of every marker in STATE_CHANGE_STRING.
enum { MAGIC_LEN = 4 };

// For each state, the byte sequence whose appearance in the incoming stream
// ends that state. Stored as raw bytes (not C strings) so no code depends on
// NUL termination.
static const uint8_t STATE_CHANGE_STRING[][MAGIC_LEN] = {
    [LOADING_HEADER] = {0x1F, 0x43, 0xB6, 0x75},
    [STREAMING] = {0x1A, 0x45, 0xDF, 0xA3},
};

static State state = STREAMING;

// Number of leading marker bytes matched so far; persists across reads so a
// marker split over two reads is still detected.
static size_t stateChangeIdx;

// Cached stream header (heap-allocated, owned by this file) and its size.
static uint8_t *header;
static size_t headerSize;

// The single accepted streamer connection, or NULL when none is connected.
static struct mg_connection *streamerConnected = NULL;

static struct {
    char *wslisten;
    char *tcplisten;
    char *tlscert;
    char *tlsca;
} settings;

// Sends `len` bytes at `data` as one binary WebSocket message to every
// connection of `mgr` that has completed the WebSocket upgrade.
static void ws_broadcast(struct mg_mgr *mgr, const void *data, size_t len) {
    for (struct mg_connection *cli = mgr->conns; cli != NULL; cli = cli->next) {
        if (cli->is_websocket) {
            mg_ws_send(cli, data, len, WEBSOCKET_OP_BINARY);
        }
    }
}

// Resizes the header buffer to `newSize` bytes without losing the existing
// buffer on failure. Returns 1 on success, 0 if the allocation failed (in
// which case `header` is left untouched). Callers never pass 0.
static int header_reserve(size_t newSize) {
    uint8_t *grown = realloc(header, newSize);
    if (grown == NULL) {
        return 0;
    }
    header = grown;
    return 1;
}

// Handles bytes received from the streamer while the header is being
// collected: appends them to the cached header and, once the end-of-header
// marker shows up, broadcasts header and remainder and switches to STREAMING.
// Returns 0 on allocation failure, 1 otherwise.
static int on_header_bytes(struct mg_connection *c, const uint8_t *buf, size_t len) {
    if (!header_reserve(headerSize + len)) {
        return 0;
    }
    memcpy(header + headerSize, buf, len);
    headerSize += len;

    uint8_t *clusterEl = memmem(header, headerSize,
                                STATE_CHANGE_STRING[LOADING_HEADER], MAGIC_LEN);
    if (clusterEl != NULL) {
        size_t headLen = (size_t) (clusterEl - header);
        ws_broadcast(c->mgr, header, headLen);
        ws_broadcast(c->mgr, clusterEl, headerSize - headLen);
        // Only the part before the marker is kept for late joiners.
        headerSize = headLen;
        state = STREAMING;
    }
    return 1;
}

// Handles bytes received from the streamer while in STREAMING state: forwards
// them to the WebSocket clients, watching for the marker that announces a new
// header. Returns 0 on allocation failure, 1 otherwise.
static int on_stream_bytes(struct mg_connection *c, const uint8_t *buf, size_t len) {
    const uint8_t *magic = STATE_CHANGE_STRING[STREAMING];
    int found = 0;
    size_t i;

    for (i = 0; i < len; i++) {
        if (buf[i] == magic[stateChangeIdx]) {
            if (++stateChangeIdx == MAGIC_LEN) {
                i++;  // index of the first byte after the marker
                stateChangeIdx = 0;
                found = 1;
                break;
            }
        } else {
            // The marker's bytes are pairwise distinct, so after a mismatch
            // the only possible partial match is the current byte being the
            // first marker byte.
            stateChangeIdx = (buf[i] == magic[0]) ? 1 : 0;
        }
    }

    if (!found) {
        ws_broadcast(c->mgr, buf, len);
        return 1;
    }

    // Forward whatever preceded the marker in this read.
    if (i > MAGIC_LEN) {
        ws_broadcast(c->mgr, buf, i - MAGIC_LEN);
    }

    // Start a fresh header: the marker itself followed by the rest of the read.
    size_t tail = len - i;
    if (!header_reserve(MAGIC_LEN + tail)) {
        // Stay in STREAMING so the previously cached header remains usable.
        return 0;
    }
    headerSize = MAGIC_LEN + tail;
    memcpy(header, magic, MAGIC_LEN);
    memcpy(header + MAGIC_LEN, buf + i, tail);
    state = LOADING_HEADER;
    return 1;
}

// Event handler shared by both listeners. Each listener is registered with
// its own URL string as fn_data (see main), and accepted connections carry
// the same pointer, which is how the two kinds of connection are told apart.
static void fn(struct mg_connection *c, int ev, void *ev_data) {
    const char *listenUrl = (const char *) c->fn_data;
    int fromStreamer = (listenUrl != NULL && listenUrl == settings.tcplisten);

    if (ev == MG_EV_ACCEPT) {
        // is_websocket is not set yet at accept time, so the decision is made
        // from the URL of the listener that accepted this connection.
        if (listenUrl != NULL && mg_url_is_ssl(listenUrl)) {
            struct mg_tls_opts opts = {.ca = mg_unpacked(settings.tlsca),
                                       .cert = mg_unpacked(settings.tlscert),
                                       .key = mg_unpacked(settings.tlscert)};
            mg_tls_init(c, &opts);
        }
    } else if (ev == MG_EV_CLOSE) {
        if (c == streamerConnected) {
            streamerConnected = NULL;
        }
    } else if (ev == MG_EV_HTTP_MSG) {
        // Every HTTP request is treated as a WebSocket upgrade request.
        struct mg_http_message *hm = (struct mg_http_message *) ev_data;
        mg_ws_upgrade(c, hm, NULL);
    } else if (ev == MG_EV_WS_OPEN) {
        // Late joiners get the cached header first, but only once it is
        // complete (i.e. not while a new one is still being collected).
        if (state == STREAMING && header != NULL) {
            mg_ws_send(c, header, headerSize, WEBSOCKET_OP_BINARY);
        }
    } else if (ev == MG_EV_WS_MSG) {
        // Incoming WS messages are ignored.
    } else if (ev == MG_EV_READ) {
        if (!fromStreamer) {
            return;
        }
        // Only one streamer at a time; extra ones are dropped.
        if (streamerConnected != NULL && streamerConnected != c) {
            c->is_closing = 1;
            return;
        }
        streamerConnected = c;

        struct mg_iobuf *r = &c->recv;
        int ok = (state == LOADING_HEADER) ? on_header_bytes(c, r->buf, r->len)
                                           : on_stream_bytes(c, r->buf, r->len);
        if (!ok) {
            fputs("Out of memory while buffering stream header.\n", stderr);
            c->is_closing = 1;
        }
        r->len = 0;  // everything in the receive buffer has been consumed
    }
}

// Parses the command line, starts both listeners and runs the event loop.
// Returns 0 after printing help, 1 on a usage or listen error; otherwise it
// does not return.
int main(int argc, char **argv) {
    int help = 0, err = 0;
    int opt;

    while ((opt = getopt(argc, argv, "a:c:i:o:h")) != -1) {
        switch (opt) {
            case 'i': settings.tcplisten = optarg; break;
            case 'o': settings.wslisten = optarg; break;
            case 'a': settings.tlsca = optarg; break;
            case 'c': settings.tlscert = optarg; break;
            case 'h': help = 1; break;
            default: break;  // unknown options are ignored
        }
    }

    if (help) {
        fprintf(stderr, "Example usage: %s [-c /path/to/cert.pem] [-a /path/to/certauthority.pem] [-h] <-i tcp://[::]:1234> <-o ws://[::]:8000>\n", argv[0]);
        return 0;
    }

    if (!settings.wslisten) {
        fputs("Missing -o parameter. Try -h for help.\n", stderr);
        err = 1;
    }
    if (!settings.tcplisten) {
        fputs("Missing -i parameter. Try -h for help\n", stderr);
        err = 1;
    }
    if (err) {
        return err;
    }

    struct mg_mgr mgr;
    mg_mgr_init(&mgr);

    // The listen URL doubles as fn_data so fn() can tell which listener a
    // connection came from.
    if (mg_listen(&mgr, settings.tcplisten, fn, settings.tcplisten) == NULL) {
        fprintf(stderr, "Failed to listen on %s\n", settings.tcplisten);
        mg_mgr_free(&mgr);
        return 1;
    }
    if (mg_http_listen(&mgr, settings.wslisten, fn, settings.wslisten) == NULL) {
        fprintf(stderr, "Failed to listen on %s\n", settings.wslisten);
        mg_mgr_free(&mgr);
        return 1;
    }

    for (;;) {
        mg_mgr_poll(&mgr, 1000);
    }

    // Not reached: the poll loop above never terminates.
    mg_mgr_free(&mgr);
    return 0;
}