feat(stats): expose RIST profile and Advanced framing

The stats output gave no way to tell which RIST profile a flow used or
whether Advanced framing was active on the wire. Add `profile` and
`advanced_active` to the sender peer stats, and `profile`, `seq_bits`, and
`advanced_active` to the receiver flow stats, with matching fields in the
JSON payloads. Bump RIST_STATS_VERSION to 3 and the JSON schema version to 4.

The Prometheus exporter emits new `rist_client_flow_info` and
`rist_sender_peer_info` series that carry these as labels, so a scrape can
identify an Advanced flow (and 16-bit vs 32-bit framing) without changing the
label set of the existing series. On the sender, advanced_active reflects
whether Advanced framing is in use toward the peer; on the receiver flow it
is true only when the context is Advanced and the flow is on 32-bit framing,
so it reads false when an Advanced context stays on Main framing because the
peer has not advertised Advanced support (TR-06-3 Section 9) and false for
Main and Simple contexts.
This commit is contained in:
Sergio Ammirata
2026-06-21 12:27:27 -04:00
parent 3bcf28caf7
commit 4d55974da5
3 changed files with 125 additions and 2 deletions
+16 -1
View File
@@ -36,6 +36,12 @@ struct rist_stats_sender_peer
uint64_t sent_bytes;
/* total bytes retransmitted */
uint64_t retransmitted_bytes;
/* context profile (enum rist_profile: 0 simple, 1 main, 2 advanced) */
uint8_t profile;
/* Non-zero when Advanced framing is active on the wire. An Advanced
* context uses Main framing until the peer advertises Advanced support
* (TR-06-3 Section 9), so profile alone does not imply Advanced framing. */
uint8_t advanced_active;
};
struct rist_stats_receiver_peer
@@ -102,6 +108,15 @@ struct rist_stats_receiver_flow
uint64_t avg_buffer_time;
/* total data bytes received (payload only, excluding headers) */
uint64_t received_bytes;
/* context profile (enum rist_profile: 0 simple, 1 main, 2 advanced) */
uint8_t profile;
/* on-wire sequence-number width: 16 (Simple/Main framing) or 32
* (Advanced framing). A flow reads 16 until the source upgrades framing. */
uint8_t seq_bits;
/* non-zero when Advanced framing is active on this flow (Advanced context
* on 32-bit framing); reads 0 when an Advanced context stays on Main
* framing and for Main/Simple contexts. */
uint8_t advanced_active;
};
enum rist_stats_type
@@ -110,7 +125,7 @@ enum rist_stats_type
RIST_STATS_RECEIVER_FLOW
};
#define RIST_STATS_VERSION (2)
#define RIST_STATS_VERSION (3)
#define RIST_SENDER_STATS_VERSION (0)
struct rist_stats
+25 -1
View File
@@ -15,7 +15,7 @@
#include "cjson/cJSON.h"
/* Bump on any incompatible shape change to the stats JSON payloads. */
#define RIST_STATS_JSON_SCHEMA_VERSION 3
#define RIST_STATS_JSON_SCHEMA_VERSION 4
static double round_two_digits(double number)
{
@@ -23,6 +23,16 @@ static double round_two_digits(double number)
return (double)(new_number) / 100;
}
static const char *rist_profile_name(int profile)
{
switch (profile) {
case RIST_PROFILE_SIMPLE: return "simple";
case RIST_PROFILE_MAIN: return "main";
case RIST_PROFILE_ADVANCED: return "advanced";
default: return "unknown";
}
}
void rist_sender_flow_statistics(struct rist_sender *ctx)
{
cJSON *stats = cJSON_CreateObject();
@@ -112,6 +122,9 @@ cJSON *rist_sender_peer_statistics(struct rist_peer *peer)
cJSON_AddNumberToObject(peer_obj, "id", peer->adv_peer_id);
cJSON_AddStringToObject(peer_obj, "cname", peer->receiver_name);
cJSON_AddStringToObject(peer_obj, "type", peer->is_data ? "data" : "rtcp");
cJSON_AddNumberToObject(peer_obj, "profile", cctx->profile);
cJSON_AddStringToObject(peer_obj, "profile_name", rist_profile_name(cctx->profile));
cJSON_AddBoolToObject(peer_obj, "advanced_active", peer->is_advanced ? 1 : 0);
if (peer->miface[0])
cJSON_AddStringToObject(peer_obj, "miface", peer->miface);
cJSON *json_stats = cJSON_AddObjectToObject(peer_obj, "stats");
@@ -157,6 +170,8 @@ cJSON *rist_sender_peer_statistics(struct rist_peer *peer)
stats_container->stats.sender_peer.rtt = avg_rtt / RIST_CLOCK;
stats_container->stats.sender_peer.sent_bytes = peer->stats_sender_instant.sent_bytes;
stats_container->stats.sender_peer.retransmitted_bytes = peer->stats_sender_instant.retransmitted_bytes;
stats_container->stats.sender_peer.profile = (uint8_t)cctx->profile;
stats_container->stats.sender_peer.advanced_active = peer->is_advanced ? 1 : 0;
if (cctx->stats_callback != NULL)
cctx->stats_callback(cctx->stats_callback_argument, stats_container);
@@ -199,6 +214,11 @@ void rist_receiver_flow_statistics(struct rist_receiver *ctx, struct rist_flow *
cJSON *flow_obj = cJSON_AddObjectToObject(stats_obj, "flowinstant");
cJSON_AddNumberToObject(flow_obj, "flow_id", flow->flow_id);
cJSON_AddNumberToObject(flow_obj, "dead", flow->dead);
cJSON_AddNumberToObject(flow_obj, "profile", ctx->common.profile);
cJSON_AddStringToObject(flow_obj, "profile_name", rist_profile_name(ctx->common.profile));
cJSON_AddNumberToObject(flow_obj, "seq_bits", flow->short_seq ? 16 : 32);
cJSON_AddBoolToObject(flow_obj, "advanced_active",
(ctx->common.profile >= RIST_PROFILE_ADVANCED && !flow->short_seq) ? 1 : 0);
cJSON *json_stats = cJSON_AddObjectToObject(flow_obj, "stats");
cJSON *peers = cJSON_AddArrayToObject(flow_obj, "peers");
uint32_t flow_rtt = 0;
@@ -372,6 +392,10 @@ void rist_receiver_flow_statistics(struct rist_receiver *ctx, struct rist_flow *
stats_container->stats.receiver_flow.max_inter_packet_spacing = flow->stats_instant.max_ips;
stats_container->stats.receiver_flow.rtt = flow->peer_lst_len ? (flow_rtt / flow->peer_lst_len)/RIST_CLOCK : 0;
stats_container->stats.receiver_flow.avg_buffer_time = avg_buffer_duration;
stats_container->stats.receiver_flow.profile = (uint8_t)ctx->common.profile;
stats_container->stats.receiver_flow.seq_bits = flow->short_seq ? 16 : 32;
stats_container->stats.receiver_flow.advanced_active =
(ctx->common.profile >= RIST_PROFILE_ADVANCED && !flow->short_seq) ? 1 : 0;
/* CALLBACK CALL */
if (ctx->common.stats_callback != NULL)
+84
View File
@@ -81,6 +81,11 @@ struct rist_prometheus_client_flow_stats {
int container_count;
int container_offset;
uint32_t flowid;
/* context profile (enum rist_profile), on-wire seq width (16/32), and
* whether Advanced framing is active, for the rist_client_flow_info series. */
uint8_t profile;
uint8_t seq_bits;
uint8_t advanced_active;
char cname[RIST_MAX_STRING_LONG];
};
@@ -113,6 +118,11 @@ struct rist_prometheus_sender_peer_stats {
int container_count;
int container_offset;
/* context profile (enum rist_profile) and whether Advanced framing is
* currently active toward this peer, for the rist_sender_peer_info series. */
uint8_t profile;
uint8_t advanced_active;
char cname[RIST_MAX_STRING_SHORT];
char *url;
char *local_url;
@@ -177,6 +187,17 @@ uint64_t get_timestamp(void) {
return tv.tv_sec;
}
/* Maps enum rist_profile to a label string for the *_info series. Kept local
* to the exporter so it does not depend on a library-internal symbol. */
static const char *prom_profile_name(uint8_t profile) {
switch (profile) {
case 0: return "simple";
case 1: return "main";
case 2: return "advanced";
default: return "unknown";
}
}
#define str(s) #s
#define PROMETHEUS_METRIC(name, help, unit, type) \
"# HELP "str(name)" "help"\n" \
@@ -304,6 +325,55 @@ static int rist_prometheus_format_receiver_peer_stats(struct rist_prometheus_sta
return offset;
}
/* Info series: constant value 1 carrying the flow profile, seq width, and
* advanced_active as labels, so a scrape can identify Advanced flows without
* changing the label set of the existing series. s->tags ends in '}', so
* reprint it without the trailing brace and append the extra labels. */
static int rist_prometheus_format_client_flow_info(struct rist_prometheus_stats *ctx, char *out, int out_size) {
if (ctx->client_cnt == 0)
return 0;
int offset = 0;
int remaining = out_size;
offset += snprintf(out + offset * (out != NULL), remaining,
PROMETHEUS_GAUGE(rist_client_flow_info, "Flow metadata; value is always 1, see profile, seq_bits and advanced_active labels", "info"));
remaining = MAX((out_size - offset), 0);
for (size_t c = 0; c < ctx->client_cnt; c++) {
struct rist_prometheus_client_flow_stats *s = ctx->clients[c];
if (s->container_count == 0)
continue;
size_t tlen = strlen(s->tags);
offset += snprintf(out + offset * (out != NULL), remaining,
"rist_client_flow_info%.*s,profile=\"%s\",seq_bits=\"%u\",advanced_active=\"%u\"} 1\n",
(int)(tlen > 0 ? tlen - 1 : 0), s->tags,
prom_profile_name(s->profile), (unsigned)s->seq_bits,
(unsigned)(s->advanced_active ? 1 : 0));
remaining = MAX((out_size - offset), 0);
}
return offset;
}
static int rist_prometheus_format_sender_peer_info(struct rist_prometheus_stats *ctx, char *out, int out_size) {
if (ctx->sender_peer_cnt == 0)
return 0;
int offset = 0;
int remaining = out_size;
offset += snprintf(out + offset * (out != NULL), remaining,
PROMETHEUS_GAUGE(rist_sender_peer_info, "Sender peer metadata; value is always 1, see profile and advanced_active labels", "info"));
remaining = MAX((out_size - offset), 0);
for (size_t c = 0; c < ctx->sender_peer_cnt; c++) {
struct rist_prometheus_sender_peer_stats *s = ctx->sender_peers[c];
if (s->container_count == 0 || s->tags == NULL)
continue;
size_t tlen = strlen(s->tags);
offset += snprintf(out + offset * (out != NULL), remaining,
"rist_sender_peer_info%.*s,profile=\"%s\",advanced_active=\"%u\"} 1\n",
(int)(tlen > 0 ? tlen - 1 : 0), s->tags,
prom_profile_name(s->profile), (unsigned)(s->advanced_active ? 1 : 0));
remaining = MAX((out_size - offset), 0);
}
return offset;
}
static void rist_prometheus_cleanup_stale_locked(struct rist_prometheus_stats *ctx, uint64_t now);
void rist_prometheus_handle_client_stats(struct rist_prometheus_stats *ctx, const struct rist_stats *stats_container, uint64_t now, uint64_t receiver_id) {
@@ -361,6 +431,10 @@ void rist_prometheus_handle_client_stats(struct rist_prometheus_stats *ctx, cons
}
cJSON_Delete(receiverstats);
s->profile = stats->profile;
s->seq_bits = stats->seq_bits;
s->advanced_active = stats->advanced_active;
s->container[s->container_offset].rist_client_flow_peers = stats->peer_count;
s->container[s->container_offset].rist_client_flow_bandwidth_bps = stats->bandwidth;
s->container[s->container_offset].rist_client_flow_retry_bandwidth_bps = stats->retry_bandwidth;
@@ -543,6 +617,12 @@ void rist_prometheus_handle_sender_peer_stats(struct rist_prometheus_stats *ctx,
ts_nulls_bandwidth = ts_nulls_bandwidth_item->valuedouble;
}
}
cJSON *profile_item = cJSON_GetObjectItem(peer, "profile");
if (cJSON_IsNumber(profile_item))
s->profile = (uint8_t)profile_item->valueint;
cJSON *adv_item = cJSON_GetObjectItem(peer, "advanced_active");
if (cJSON_IsBool(adv_item))
s->advanced_active = cJSON_IsTrue(adv_item) ? 1 : 0;
break;
}
}
@@ -758,6 +838,8 @@ static int rist_prometheus_stats_format(struct rist_prometheus_stats *ctx) {
}
req_size += rist_prometheus_format_sender_peer_stats(ctx, NULL, 0);
req_size += rist_prometheus_format_receiver_peer_stats(ctx, NULL, 0);
req_size += rist_prometheus_format_client_flow_info(ctx, NULL, 0);
req_size += rist_prometheus_format_sender_peer_info(ctx, NULL, 0);
req_size += sizeof(PROMETHEUS_EOF) - 1;
if ((size_t)(req_size+1) > ctx->format_buf_len) {
@@ -768,6 +850,8 @@ static int rist_prometheus_stats_format(struct rist_prometheus_stats *ctx) {
size += rist_prometheus_format_sender_peer_stats(ctx, &ctx->format_buf[size], (int)ctx->format_buf_len - size);
size += rist_prometheus_format_receiver_peer_stats(ctx, &ctx->format_buf[size], (int)ctx->format_buf_len - size);
size += rist_prometheus_format_client_flow_info(ctx, &ctx->format_buf[size], (int)ctx->format_buf_len - size);
size += rist_prometheus_format_sender_peer_info(ctx, &ctx->format_buf[size], (int)ctx->format_buf_len - size);
size += snprintf(&ctx->format_buf[size], ctx->format_buf_len - size, "%s", PROMETHEUS_EOF);
/* Multi-point mode accumulates samples between scrapes and resets
* after each one. Single-stat-point mode keeps the latest sample