mwss/main.c

168 lines
3.9 KiB
C
Raw Normal View History

2024-12-21 22:43:09 +02:00
#include"mongoose.h"
#include<getopt.h>
#include<stdio.h>
#include<string.h>
#include<stdbool.h>
typedef enum {
LOADING_HEADER,
STREAMING,
} State;
static const uint8_t *STATE_CHANGE_STRING[] = {
[LOADING_HEADER] = "\x1F\x43\xB6\x75",
[STREAMING] = "\x1A\x45\xDF\xA3",
};
static State state = STREAMING;
static int stateChangeIdx;
static char *header;
static size_t headerSize;
static struct mg_connection *streamerConnected = NULL;
static struct {
char *wslisten;
char *tcplisten;
char *tlscert;
char *tlsca;
} settings;
static void ws_broadcast(struct mg_mgr *mgr, const char *data, size_t len) {
for(struct mg_connection *cli = mgr->conns; cli; cli = cli->next) {
if(cli->is_websocket) {
mg_ws_send(cli, data, len, WEBSOCKET_OP_BINARY);
}
}
}
static void fn(struct mg_connection *c, int ev, void *ev_data) {
if(ev == MG_EV_ACCEPT) {
if(mg_url_is_ssl(c->is_websocket ? settings.wslisten : settings.tcplisten)) {
struct mg_tls_opts opts = {.ca = mg_unpacked(settings.tlsca), .cert = mg_unpacked(settings.tlscert), .key = mg_unpacked(settings.tlscert)};
mg_tls_init(c, &opts);
}
} else if(ev == MG_EV_CLOSE) {
if(c == streamerConnected) {
streamerConnected = NULL;
}
} else if(ev == MG_EV_HTTP_MSG) {
struct mg_http_message *hm = (struct mg_http_message *) ev_data;
mg_ws_upgrade(c, hm, NULL);
} else if(ev == MG_EV_WS_OPEN) {
if(state == STREAMING && header) {
mg_ws_send(c, header, headerSize, WEBSOCKET_OP_BINARY);
}
} else if(ev == MG_EV_WS_MSG) {
// Incoming WS messages are ignored.
} else if(ev == MG_EV_READ) {
if(!c->is_websocket) {
if(streamerConnected && streamerConnected != c) {
c->is_closing = 1;
} else {
streamerConnected = c;
}
} else return;
struct mg_iobuf *r = &c->recv;
if(state == LOADING_HEADER) {
header = realloc(header, headerSize + r->len);
memcpy(header + headerSize, r->buf, r->len);
headerSize += r->len;
char *clusterEl = memmem(header, headerSize, "\x1F\x43\xB6\x75", 4);
if(clusterEl) {
ws_broadcast(c->mgr, header, clusterEl - header);
ws_broadcast(c->mgr, clusterEl, header + headerSize - clusterEl);
headerSize = clusterEl - header;
state = STREAMING;
}
} else {
int i;
for(i = 0; i < r->len; i++) {
if(r->buf[i] == STATE_CHANGE_STRING[state][stateChangeIdx]) {
stateChangeIdx++;
if(stateChangeIdx == strlen(STATE_CHANGE_STRING[state])) {
i++;
stateChangeIdx = 0;
state = LOADING_HEADER;
break;
}
} else {
stateChangeIdx = 0;
}
}
if(state == LOADING_HEADER) {
if(i > 4) {
ws_broadcast(c->mgr, r->buf, i - 4);
}
header = realloc(header, headerSize = 4 + (r->len - i));
memcpy(header, STATE_CHANGE_STRING[STREAMING], 4);
memcpy(header + 4, r->buf + i, r->len - i);
} else {
ws_broadcast(c->mgr, r->buf, r->len);
}
}
r->len = 0;
}
}
int main(int argc, char **argv) {
int help = 0, err = 0;
int c;
while((c = getopt(argc, argv, "a:c:i:o:h")) != -1) {
if(c == 'i') {
settings.tcplisten = optarg;
} else if(c == 'o') {
settings.wslisten = optarg;
} else if(c == 'a') {
settings.tlsca = optarg;
} else if(c == 'c') {
settings.tlscert = optarg;
} else if(c == 'h') {
help = 1;
}
}
if(help) {
fprintf(stderr, "Example usage: %s [-c /path/to/cert.pem] [-a /path/to/certauthority.pem] [-h] <-i tcp://[::]:1234> <-o ws://[::]:8000>\n", argv[0]);
return;
}
if(!settings.wslisten) {
fputs("Missing -o parameter. Try -h for help.\n", stderr);
err = 1;
}
if(!settings.tcplisten) {
fputs("Missing -i parameter. Try -h for help\n", stderr);
err = 1;
}
if(err) {
return err;
}
struct mg_mgr mgr;
mg_mgr_init(&mgr);
mg_listen(&mgr, settings.tcplisten, fn, NULL);
mg_http_listen(&mgr, settings.wslisten, fn, NULL);
for (;;) mg_mgr_poll(&mgr, 1000);
mg_mgr_free(&mgr);
return 0;
}