wip: pluggable transport abstraction layer

Add rist_transport_set() API to replace raw BSD socket calls with a
pluggable vtable (sendto/recvfrom/sendmsg/poll). Enables non-socket
transports (e.g. WebAssembly, custom I/O).

Also includes wasm32-emscripten Meson cross-compilation file.

Work in progress — not yet tested or integrated with tools.
This commit is contained in:
Sergio Ammirata
2026-05-27 13:50:03 -04:00
parent 205c113c05
commit 6d1133bdea
16 changed files with 391 additions and 31 deletions
+1 -1
View File
@@ -53,7 +53,7 @@
#define be64toh(x) OSSwapBigToHostInt64(x)
#define le64toh(x) OSSwapLittleToHostInt64(x)
#elif defined(__linux__) || defined(__GNU__)
#elif defined(__linux__) || defined(__GNU__) || defined(__EMSCRIPTEN__)
# include <endian.h>
# if !defined(htobe64)
# if __BYTE_ORDER == __LITTLE_ENDIAN
+1
View File
@@ -21,6 +21,7 @@
#include "headers.h"
#include "tun.h"
#include "tunnel.h"
#include "transport.h"
#ifdef __cplusplus
extern "C" {
+1
View File
@@ -34,6 +34,7 @@ if should_install
'sender.h',
'stats.h',
'tun.h',
'transport.h',
'tunnel.h',
'udpsocket.h',
'urlparam.h',
+100
View File
@@ -0,0 +1,100 @@
/*
* Copyright © 2024 SipRadius LLC
* All rights reserved.
*
* SPDX-License-Identifier: BSD-2-Clause
*/
#ifndef LIBRIST_TRANSPORT_H
#define LIBRIST_TRANSPORT_H
#include "common.h"
#include <stddef.h>
#include <stdint.h>
#ifdef _WIN32
#include <winsock2.h>
#include <ws2tcpip.h>
#else
#include <sys/types.h>
#include <sys/socket.h>
#include <poll.h>
#endif
#ifdef __cplusplus
extern "C" {
#endif
struct rist_ctx;
/**
* @brief Transport operations vtable for abstracting network I/O.
*
* All function pointers follow POSIX socket semantics. The default
* implementation (used when no custom ops are installed) delegates
* directly to the real syscalls, so existing behaviour is unchanged.
*
* For non-POSIX backends (e.g. WebAssembly/WebTransport, unit-test
* harnesses, userspace network stacks) the application provides its
* own implementations via rist_transport_set().
*
* @a opaque is passed as the first argument to every callback and
* can point to arbitrary per-context state owned by the backend.
*/
struct rist_transport_ops {
void *opaque;
/**
* Send a datagram to a specific destination.
* Semantics match POSIX sendto(2) — returns bytes sent or -1.
*/
ssize_t (*sendto)(void *opaque, int fd, const void *buf, size_t len,
int flags, const struct sockaddr *addr, socklen_t addrlen);
/**
* Receive a datagram, capturing the source address.
* Semantics match POSIX recvfrom(2) — returns bytes received or -1.
*/
ssize_t (*recvfrom)(void *opaque, int fd, void *buf, size_t len,
int flags, struct sockaddr *addr, socklen_t *addrlen);
/**
* Wait for I/O readiness on a set of descriptors.
* Semantics match POSIX poll(2) — returns the number of ready
* descriptors, 0 on timeout, or -1 on error.
*/
int (*poll)(void *opaque, struct pollfd *fds, int nfds, int timeout_ms);
/**
* Scatter-gather send (optional).
* Semantics match POSIX sendmsg(2). If NULL, the library
* linearises the iov and falls back to the sendto callback.
*/
#ifndef _WIN32
ssize_t (*sendmsg)(void *opaque, int fd, const struct msghdr *msg,
int flags);
#endif
};
/**
* @brief Install custom transport operations on a RIST context.
*
* Must be called after rist_sender_create() / rist_receiver_create()
* but BEFORE rist_start(). The library takes a shallow copy of @a ops;
* the caller must keep the pointees (callbacks, opaque) alive for the
* lifetime of @a ctx.
*
* Passing NULL restores the default (POSIX socket) transport.
*
* @param ctx RIST sender or receiver context
* @param ops Transport ops to install, or NULL for defaults
* @return 0 on success, -1 on error (e.g. context already started)
*/
RIST_API int rist_transport_set(struct rist_ctx *ctx,
const struct rist_transport_ops *ops);
#ifdef __cplusplus
}
#endif
#endif /* LIBRIST_TRANSPORT_H */
+19 -9
View File
@@ -161,16 +161,23 @@ void test(void) {
elif host_machine.system() == 'gnu'
test_args += '-D_GNU_SOURCE'
add_project_arguments('-D_GNU_SOURCE', language: 'c')
elif host_machine.system() == 'emscripten'
# No TUN, no platform_files needed
add_project_arguments('-D_GNU_SOURCE', language: 'c')
endif
librist_soversion = librist_soversion
have_clock_gettime = cc.has_function('clock_gettime', prefix : '#include <time.h>', args : test_args)
if not have_clock_gettime and host_machine.system() != 'darwin'
lib_rt = cc.find_library('rt', required: false)
have_clock_gettime = cc.has_function('clock_gettime', prefix : '#include <time.h>', args : test_args, dependencies : lib_rt)
if not have_clock_gettime
error('clock_gettime not found')
if host_machine.system() == 'emscripten'
have_clock_gettime = true
else
have_clock_gettime = cc.has_function('clock_gettime', prefix : '#include <time.h>', args : test_args)
if not have_clock_gettime and host_machine.system() != 'darwin'
lib_rt = cc.find_library('rt', required: false)
have_clock_gettime = cc.has_function('clock_gettime', prefix : '#include <time.h>', args : test_args, dependencies : lib_rt)
if not have_clock_gettime
error('clock_gettime not found')
endif
deps += [ lib_rt ]
endif
deps += [ lib_rt ]
endif
add_project_arguments(['-Wshadow', '-pedantic-errors'], language: 'c')
add_project_arguments(cc.get_supported_arguments([
@@ -184,8 +191,10 @@ void test(void) {
'-Wmaybe-uninitialized',
'-Wno-error=deprecated-declarations'
]), language : 'c')
threads = [ dependency('threads') ]
if host_machine.system() != 'freebsd'
if host_machine.system() != 'emscripten'
threads = [ dependency('threads') ]
endif
if host_machine.system() != 'freebsd' and host_machine.system() != 'emscripten'
add_project_arguments(cc.get_supported_arguments([
'-Watomic-implicit-seq-cst']), language: 'c')
endif
@@ -339,6 +348,7 @@ librist = library('librist',
'src/mpegts.c',
'src/peer.c',
'src/udp.c',
'src/transport.c',
'src/stats.c',
'src/udpsocket.c',
'src/libevsocket.c',
+7 -12
View File
@@ -11,6 +11,7 @@
#include "proto/adv.h"
#include "rist-private.h"
#include "udp-private.h"
#include "transport-private.h"
#include "log-private.h"
#include "proto/rist_time.h"
#include <string.h>
@@ -101,8 +102,7 @@ int rist_adv_send_nack_bitmask(struct rist_peer *peer,
if (total < 0)
return -1;
ssize_t ret = sendto(peer->sd, (const char *)pkt, (size_t)total, 0,
&peer->u.address, peer->address_len);
ssize_t ret = rist_transport_sendto(peer, pkt, (size_t)total, 0);
if (ret < 0)
_librist_log_send_error(peer, errno, (size_t)total, "adv-nack-bitmask sendto");
@@ -156,8 +156,7 @@ int rist_adv_send_nack_range(struct rist_peer *peer,
if (total < 0)
return -1;
ssize_t ret = sendto(peer->sd, (const char *)pkt, (size_t)total, 0,
&peer->u.address, peer->address_len);
ssize_t ret = rist_transport_sendto(peer, pkt, (size_t)total, 0);
if (ret < 0)
_librist_log_send_error(peer, errno, (size_t)total, "adv-nack-range sendto");
@@ -218,8 +217,7 @@ int rist_adv_send_rtt_echo_request(struct rist_peer *peer)
if (total < 0)
return -1;
ssize_t ret = sendto(peer->sd, (const char *)pkt, (size_t)total, 0,
&peer->u.address, peer->address_len);
ssize_t ret = rist_transport_sendto(peer, pkt, (size_t)total, 0);
if (ret < 0)
_librist_log_send_error(peer, errno, (size_t)total, "adv-rtt-echo-req sendto");
@@ -277,8 +275,7 @@ int rist_adv_send_rtt_echo_response(struct rist_peer *peer,
if (total < 0)
return -1;
ssize_t ret = sendto(peer->sd, (const char *)pkt, (size_t)total, 0,
&peer->u.address, peer->address_len);
ssize_t ret = rist_transport_sendto(peer, pkt, (size_t)total, 0);
if (ret < 0)
_librist_log_send_error(peer, errno, (size_t)total, "adv-rtt-echo-resp sendto");
@@ -342,8 +339,7 @@ int rist_adv_send_keepalive(struct rist_peer *peer)
if (total < 0)
return -1;
ssize_t ret = sendto(peer->sd, (const char *)pkt, (size_t)total, 0,
&peer->u.address, peer->address_len);
ssize_t ret = rist_transport_sendto(peer, pkt, (size_t)total, 0);
if (ret < 0)
_librist_log_send_error(peer, errno, (size_t)total, "adv-keepalive sendto");
@@ -405,8 +401,7 @@ int rist_adv_send_unsupported(struct rist_peer *peer,
if (total < 0)
return -1;
ssize_t ret = sendto(peer->sd, (const char *)pkt, (size_t)total, 0,
&peer->u.address, peer->address_len);
ssize_t ret = rist_transport_sendto(peer, pkt, (size_t)total, 0);
if (ret < 0)
_librist_log_send_error(peer, errno, (size_t)total, "adv-unsupported sendto");
+16 -1
View File
@@ -64,6 +64,8 @@ struct evsocket_ctx {
struct evsocket_event *_array;
int giveup;
struct evsocket_ctx *next;
evsocket_poll_func poll_override;
void *poll_opaque;
};
#if !defined(_WIN32) || HAVE_PTHREADS
static pthread_mutex_t ctx_list_mutex = PTHREAD_MUTEX_INITIALIZER;
@@ -306,7 +308,10 @@ int evsocket_loop_single(struct evsocket_ctx *ctx, int timeout, int max_events)
goto loop_error;
}
pollret = poll(ctx->pfd, ctx->n_events, timeout);
if (ctx->poll_override)
pollret = ctx->poll_override(ctx->poll_opaque, ctx->pfd, ctx->n_events, timeout);
else
pollret = poll(ctx->pfd, ctx->n_events, timeout);
if (pollret <= 0) {
if (pollret < 0) {
rist_log_priv3( RIST_LOG_ERROR, "libevsocket, evsocket_loop: poll returned %d, n_events = %d, error = %d\n",
@@ -359,6 +364,16 @@ void evsocket_loop_stop(struct evsocket_ctx *ctx)
ctx->giveup = 1;
}
void evsocket_set_poll_override(struct evsocket_ctx *ctx,
evsocket_poll_func func,
void *opaque)
{
if (ctx) {
ctx->poll_override = func;
ctx->poll_opaque = opaque;
}
}
int evsocket_geteventcount(struct evsocket_ctx *ctx)
{
if (ctx)
+13
View File
@@ -10,8 +10,21 @@
#include "common/attributes.h"
#ifdef _WIN32
#include <winsock2.h>
#else
#include <poll.h>
#endif
struct evsocket_event;
struct evsocket_ctx;
typedef int (*evsocket_poll_func)(void *opaque, struct pollfd *fds,
int nfds, int timeout_ms);
RIST_PRIV void evsocket_set_poll_override(struct evsocket_ctx *ctx,
evsocket_poll_func func,
void *opaque);
#endif
+10
View File
@@ -4,6 +4,15 @@
#include <stdint.h>
#include <stdlib.h>
#include <memory.h>
#ifdef __EMSCRIPTEN__
/* WASM has no network interfaces — fill with zeros */
int _librist_network_get_macaddr(uint8_t mac[]) {
memset(mac, 0, 6);
return 0;
}
#else /* !__EMSCRIPTEN__ */
#ifndef _WIN32
#include <ifaddrs.h>
#ifdef __linux__
@@ -105,3 +114,4 @@ int _librist_network_get_macaddr(uint8_t mac[]) {
#endif
return 0;
}
#endif /* !__EMSCRIPTEN__ */
+2 -1
View File
@@ -16,6 +16,7 @@
#include "librist_srp.h"
#include "rist-private.h"
#include "udp-private.h"
#include "transport-private.h"
#include "log-private.h"
#include "proto/rist_time.h"
#include "peer.h"
@@ -942,7 +943,7 @@ static void eap_periodic_impl(struct eapsrp_ctx *ctx)
{
if (ctx->last_pkt)
{
sendto(ctx->peer->sd, (const char *)ctx->last_pkt, ctx->last_pkt_size, 0, &ctx->peer->u.address, ctx->peer->address_len);
rist_transport_sendto(ctx->peer, ctx->last_pkt, ctx->last_pkt_size, 0);
//check
ctx->timeout_retries++;
ctx->last_timestamp = now;
+2 -4
View File
@@ -9,6 +9,7 @@
#include "rist-private.h"
#include "endian-shim.h"
#include "udp-private.h"
#include "transport-private.h"
#include "eap.h"
#include "peer.h"
@@ -137,9 +138,7 @@ ssize_t _librist_proto_gre_send_data(struct rist_peer *p, uint8_t payload_type,
ssize_t ret;
int errorcode = 0;
//TODO: abstract this away
#ifndef _WIN32
//TODO: this is POSIX only: add windows equivalent
struct msghdr msghdr;
struct iovec iov[2];
iov[0].iov_base = hdr_buf;
@@ -153,10 +152,9 @@ ssize_t _librist_proto_gre_send_data(struct rist_peer *p, uint8_t payload_type,
msghdr.msg_control = NULL;
msghdr.msg_controllen = 0;
msghdr.msg_flags = 0;
// retry when kernel buffer is full instead of dropping packet (EAGAIN)
int retries = 0;
do {
ret = sendmsg(p->sd, &msghdr, MSG_DONTWAIT);
ret = rist_transport_sendmsg(p, &msghdr, MSG_DONTWAIT);
if (RIST_UNLIKELY(ret < 0)) {
errorcode = errno;
retries++;
+5
View File
@@ -25,6 +25,7 @@
#include "socket-shim.h"
#include "libevsocket.h"
#include "librist.h"
#include "librist/transport.h"
#include "udpsocket.h"
#include "crypto/psk.h"
#include <errno.h>
@@ -365,6 +366,10 @@ struct rist_common_ctx {
bool debug;
uint32_t birthtime_rtp_offset;
/* Pluggable transport (default: POSIX sockets) */
struct rist_transport_ops transport;
bool transport_active;
/* Connection status callback */
connection_status_callback_t connection_status_callback;
void *connection_status_callback_argument;
+45
View File
@@ -0,0 +1,45 @@
/* librist. Copyright © 2024 SipRadius LLC. All right reserved.
*
* SPDX-License-Identifier: BSD-2-Clause
*/
#ifndef RIST_TRANSPORT_PRIVATE_H
#define RIST_TRANSPORT_PRIVATE_H
#include "common/attributes.h"
#include "librist/transport.h"
#include <stddef.h>
struct rist_peer;
struct rist_common_ctx;
/**
* Wrappers that route through the transport ops vtable when one is
* installed, falling back to the real POSIX syscalls otherwise.
* These are the ONLY functions that should touch the wire in the
* library hot-path raw sendto/recvfrom/poll/sendmsg calls must
* not appear anywhere else.
*/
RIST_PRIV ssize_t rist_transport_sendto(struct rist_peer *peer,
const void *buf, size_t len,
int flags);
RIST_PRIV ssize_t rist_transport_recvfrom(struct rist_peer *peer,
void *buf, size_t len,
int flags,
struct sockaddr *addr,
socklen_t *addrlen);
RIST_PRIV int rist_transport_poll(struct rist_common_ctx *ctx,
struct pollfd *fds, int nfds,
int timeout_ms);
#ifndef _WIN32
RIST_PRIV ssize_t rist_transport_sendmsg(struct rist_peer *peer,
const struct msghdr *msg,
int flags);
#endif
#endif /* RIST_TRANSPORT_PRIVATE_H */
+145
View File
@@ -0,0 +1,145 @@
/* librist. Copyright © 2024 SipRadius LLC. All right reserved.
*
* SPDX-License-Identifier: BSD-2-Clause
*/
#include "transport-private.h"
#include "rist-private.h"
#include "libevsocket.h"
#include "librist/transport.h"
#ifndef _WIN32
#include <sys/socket.h>
#include <poll.h>
#else
#include <winsock2.h>
#endif
#include <string.h>
#include <errno.h>
/* ------------------------------------------------------------------ */
/* Public API */
/* ------------------------------------------------------------------ */
int rist_transport_set(struct rist_ctx *ctx,
const struct rist_transport_ops *ops)
{
struct rist_common_ctx *cctx = rist_struct_get_common(ctx);
if (!cctx)
return -1;
if (atomic_load_explicit(&cctx->startup_complete, memory_order_acquire)) {
return -1;
}
if (ops) {
cctx->transport = *ops;
cctx->transport_active = true;
if (cctx->evctx && ops->poll)
evsocket_set_poll_override(cctx->evctx, ops->poll,
ops->opaque);
} else {
memset(&cctx->transport, 0, sizeof(cctx->transport));
cctx->transport_active = false;
if (cctx->evctx)
evsocket_set_poll_override(cctx->evctx, NULL, NULL);
}
return 0;
}
/* ------------------------------------------------------------------ */
/* Internal wrappers */
/* ------------------------------------------------------------------ */
ssize_t rist_transport_sendto(struct rist_peer *peer,
const void *buf, size_t len,
int flags)
{
struct rist_common_ctx *cctx = get_cctx(peer);
if (cctx->transport_active && cctx->transport.sendto) {
return cctx->transport.sendto(cctx->transport.opaque,
peer->sd, buf, len, flags,
&peer->u.address,
peer->address_len);
}
return sendto(peer->sd, (const char *)buf, len, flags,
&peer->u.address, peer->address_len);
}
ssize_t rist_transport_recvfrom(struct rist_peer *peer,
void *buf, size_t len,
int flags,
struct sockaddr *addr,
socklen_t *addrlen)
{
struct rist_common_ctx *cctx = get_cctx(peer);
if (cctx->transport_active && cctx->transport.recvfrom) {
return cctx->transport.recvfrom(cctx->transport.opaque,
peer->sd, buf, len, flags,
addr, addrlen);
}
return recvfrom(peer->sd, (char *)buf, len, flags, addr, addrlen);
}
int rist_transport_poll(struct rist_common_ctx *ctx,
struct pollfd *fds, int nfds,
int timeout_ms)
{
if (ctx->transport_active && ctx->transport.poll) {
return ctx->transport.poll(ctx->transport.opaque,
fds, nfds, timeout_ms);
}
return poll(fds, nfds, timeout_ms);
}
#ifndef _WIN32
ssize_t rist_transport_sendmsg(struct rist_peer *peer,
const struct msghdr *msg,
int flags)
{
struct rist_common_ctx *cctx = get_cctx(peer);
if (cctx->transport_active && cctx->transport.sendmsg) {
return cctx->transport.sendmsg(cctx->transport.opaque,
peer->sd, msg, flags);
}
/* If custom transport is active but sendmsg is NULL, linearise
* and fall back to the sendto callback. */
if (cctx->transport_active && cctx->transport.sendto) {
size_t total = 0;
for (size_t i = 0; i < (size_t)msg->msg_iovlen; i++)
total += msg->msg_iov[i].iov_len;
uint8_t stackbuf[RIST_MAX_PACKET_SIZE + 128];
uint8_t *flat = (total <= sizeof(stackbuf)) ? stackbuf : malloc(total);
if (!flat)
return -1;
size_t off = 0;
for (size_t i = 0; i < (size_t)msg->msg_iovlen; i++) {
memcpy(flat + off, msg->msg_iov[i].iov_base,
msg->msg_iov[i].iov_len);
off += msg->msg_iov[i].iov_len;
}
ssize_t ret = cctx->transport.sendto(
cctx->transport.opaque, peer->sd,
flat, total, flags,
msg->msg_name, msg->msg_namelen);
if (flat != stackbuf)
free(flat);
return ret;
}
return sendmsg(peer->sd, msg, flags);
}
#endif
+3 -3
View File
@@ -21,6 +21,7 @@
#endif
#include "crypto/psk.h"
#include "mpegts.h"
#include "transport-private.h"
#include <stdlib.h>
#include <stddef.h>
#include <errno.h>
@@ -119,8 +120,7 @@ size_t rist_send_seq_rtcp(struct rist_peer *p, uint32_t seq_rtp, uint8_t payload
int retries_count = 0;
int errorcode = 0;
do {
ret = sendto(p->sd, (const char *)adv_buf, (size_t)total, 0,
&(p->u.address), p->address_len);
ret = rist_transport_sendto(p, adv_buf, (size_t)total, 0);
if (RIST_UNLIKELY(ret < 0)) {
errorcode = errno;
retries_count++;
@@ -213,7 +213,7 @@ adv_out:
if (ctx->profile == RIST_PROFILE_SIMPLE) {
// retry when kernel buffer is full instead of dropping packet (EAGAIN)
do {
ret = sendto(p->sd,(const char*)data, len, 0, &(p->u.address), p->address_len);
ret = rist_transport_sendto(p, data, len, 0);
if (RIST_UNLIKELY(ret < 0))
{
errorcode = errno;
+21
View File
@@ -0,0 +1,21 @@
[binaries]
c = 'emcc'
cpp = 'em++'
ar = 'emar'
ranlib = 'emranlib'
strip = '/usr/bin/true'
pkgconfig = '/usr/bin/false'
[built-in options]
c_args = ['-pthread']
c_link_args = ['-pthread']
[properties]
needs_exe_wrapper = true
sys_root = ''
[host_machine]
system = 'emscripten'
cpu_family = 'wasm32'
cpu = 'wasm32'
endian = 'little'