feat(trace-utils)!: add encoder v1 to v04 + refactor#2145
feat(trace-utils)!: add encoder v1 to v04 + refactor#2145anais-raison wants to merge 10 commits into
Conversation
📚 Documentation Check Results📦
|
Clippy Allow Annotation ReportComparing clippy allow annotations between branches:
Summary by Rule
Annotation Counts by File
Annotation Stats by Crate
About This ReportThis report tracks Clippy allow annotations for specific rules, showing how they've changed in this PR. Decreasing the number of these annotations generally improves code quality. |
🔒 Cargo Deny Results📦
|
|
Artifact Size Benchmark Reportaarch64-alpine-linux-musl
aarch64-unknown-linux-gnu
libdatadog-x64-windows
libdatadog-x86-windows
x86_64-alpine-linux-musl
x86_64-unknown-linux-gnu
|
yannham
left a comment
There was a problem hiding this comment.
LGTM overall 👍 I think the naming convention brings more clarity indeed.
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 5fbf20172c
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
| let ctx = ChunkContext { | ||
| trace_id: &chunk.trace_id, | ||
| priority: chunk.priority, | ||
| origin: &chunk.origin, | ||
| sampling_mechanism: chunk.sampling_mechanism, | ||
| attributes: &chunk.attributes, | ||
| }; |
There was a problem hiding this comment.
Preserve payload-level tags when downgrading to v0.4
When a native V1 TracerPayload carries tags at the payload level (for example payload.env, payload.app_version, or payload.attributes) and the individual spans do not duplicate them, this downgrade path never reads those fields: it builds the per-span context only from each chunk and then encodes spans from that context. Agents that lack /v1.0/traces will therefore receive /v0.4/traces payloads missing env/version/top-level attributes that the native V1 encoder would have sent, causing traces to be indexed without those tags during fallback.
Useful? React with 👍 / 👎.
| for &(k, v) in &merged_attrs { | ||
| match v { | ||
| AttributeValue::String(s) => { | ||
| write_str(writer, k.borrow())?; | ||
| write_str(writer, s.borrow())?; | ||
| } | ||
| AttributeValue::Bool(b) => { | ||
| write_str(writer, k.borrow())?; | ||
| write_bool_as_str(writer, *b)?; |
There was a problem hiding this comment.
Filter promoted keys before writing downgraded attrs
When a V1 span or chunk attribute uses a key that this downgrade also writes from a dedicated field (for example env, _dd.p.dm, _dd.origin, or _dd.p.tid), the attribute is still emitted here after the promoted entry, so the v0.4 meta map contains duplicate keys. Msgpack maps do not define duplicate-key precedence, and different consumers may keep different values, so fallback can send the wrong sampling/origin/trace-id tags for payloads that carry both the typed field and an attribute with the same Datadog key.
Useful? React with 👍 / 👎.
| write_array_len(writer, payload.chunks.len() as u32)?; | ||
| for chunk in &payload.chunks { |
There was a problem hiding this comment.
Do not send dropped V1 chunks as normal v0.4 traces
For a native V1 chunk with dropped_trace == true, the native V1 encoder preserves that sampler decision, but the v0.4 fallback has no equivalent marker and this loop still serializes every chunk as an ordinary /v0.4/traces trace. If such a dropped chunk contains spans, falling back to an agent without /v1.0/traces will ingest traces that the tracer marked as dropped instead of omitting them or otherwise preserving the drop semantics.
Useful? React with 👍 / 👎.
ajgajg1134
left a comment
There was a problem hiding this comment.
Some of the function names are confusing me a bit / don't match the comments?
Also I think some of the v1 attribute types need to be "un-done" differently, specifically key values and lists shouldn't just get put in meta_struct and should be flattened into individual meta/metrics entries.
Once you have that changed, we should validate this with some integration tests against the real agent / intake to make sure the payloads match the agent decode and appear correctly in the UI
| //! | `AttributeValue::String` / `Bool` | `meta[k]` (`"true"` / `"false"` for bool) | | ||
| //! | `AttributeValue::Float` / `Int` | `metrics[k]` (Int cast to `f64`) | | ||
| //! | `AttributeValue::Bytes` / `KeyValue` | `meta_struct[k]` (re-encoded as msgpack) | | ||
| //! | `AttributeValue::List` | `meta_struct[k]` (re-encoded as msgpack) | |
There was a problem hiding this comment.
I think that for downgrading lists and keyvalue types we have to explode the keys out to the way the UI expects them (This is what intake will also need to do). e.g. for a list key.0, key.1,... and for KeyValue it ends up being separate flat keys key.keyB.keyC:value. Let me know if you need some more examples here or if the v1 docs need to be updated!
Aaalibaba42
left a comment
There was a problem hiding this comment.
Mostly nits and/or styling, most of these can be disregarded
| write_map_len(writer, span_len)?; | ||
|
|
||
| write_const_msg_pack_str!(writer, "service")?; | ||
| write_str(writer, span.service.borrow())?; | ||
|
|
||
| write_const_msg_pack_str!(writer, "name")?; | ||
| write_str(writer, span.name.borrow())?; | ||
|
|
||
| write_const_msg_pack_str!(writer, "resource")?; | ||
| write_str(writer, span.resource.borrow())?; | ||
|
|
||
| write_const_msg_pack_str!(writer, "trace_id")?; | ||
| write_u64(writer, trace_id_low)?; | ||
|
|
||
| write_const_msg_pack_str!(writer, "span_id")?; | ||
| write_u64(writer, span.span_id)?; | ||
|
|
||
| if span.parent_id != 0 { | ||
| write_const_msg_pack_str!(writer, "parent_id")?; | ||
| write_u64(writer, span.parent_id)?; | ||
| } | ||
|
|
||
| write_const_msg_pack_str!(writer, "start")?; | ||
| write_i64(writer, span.start)?; | ||
|
|
||
| write_const_msg_pack_str!(writer, "duration")?; | ||
| write_sint(writer, span.duration)?; | ||
|
|
||
| if span.error { | ||
| write_const_msg_pack_str!(writer, "error")?; | ||
| write_sint(writer, 1)?; | ||
| } | ||
|
|
||
| if counts.meta > 0 { | ||
| write_const_msg_pack_str!(writer, "meta")?; | ||
| write_map_len(writer, counts.meta)?; | ||
|
|
||
| if !span.env.borrow().is_empty() { | ||
| write_const_msg_pack_str!(writer, "env")?; | ||
| write_str(writer, span.env.borrow())?; | ||
| } | ||
| if !span.version.borrow().is_empty() { | ||
| write_const_msg_pack_str!(writer, "version")?; | ||
| write_str(writer, span.version.borrow())?; | ||
| } | ||
| if !span.component.borrow().is_empty() { | ||
| write_const_msg_pack_str!(writer, "component")?; | ||
| write_str(writer, span.component.borrow())?; | ||
| } | ||
| if let Some(kind_str) = kind_meta { | ||
| write_const_msg_pack_str!(writer, "span.kind")?; | ||
| write_str(writer, kind_str)?; | ||
| } | ||
| if trace_id_high != 0 { | ||
| // Lower-case hex without `0x` prefix — the agent expects this format. | ||
| write_const_msg_pack_str!(writer, "_dd.p.tid")?; | ||
| write_str(writer, &format!("{trace_id_high:016x}"))?; | ||
| } | ||
| if !chunk.origin.borrow().is_empty() { | ||
| write_const_msg_pack_str!(writer, "_dd.origin")?; | ||
| write_str(writer, chunk.origin.borrow())?; | ||
| } | ||
| if let Some(mechanism) = chunk.sampling_mechanism { | ||
| write_const_msg_pack_str!(writer, "_dd.p.dm")?; | ||
| write_str(writer, &format!("-{mechanism}"))?; | ||
| } | ||
| for &(k, v) in &merged_attrs { | ||
| match v { | ||
| AttributeValue::String(s) => { | ||
| write_str(writer, k.borrow())?; | ||
| write_str(writer, s.borrow())?; | ||
| } | ||
| AttributeValue::Bool(b) => { | ||
| write_str(writer, k.borrow())?; | ||
| write_bool_as_str(writer, *b)?; | ||
| } | ||
| _ => {} | ||
| } | ||
| } | ||
| } | ||
|
|
||
| if counts.metrics > 0 { | ||
| write_const_msg_pack_str!(writer, "metrics")?; | ||
| write_map_len(writer, counts.metrics)?; | ||
|
|
||
| if let Some(priority) = chunk.priority { | ||
| write_const_msg_pack_str!(writer, "_sampling_priority_v1")?; | ||
| write_f64(writer, priority as f64)?; | ||
| } | ||
| for &(k, v) in &merged_attrs { | ||
| match v { | ||
| AttributeValue::Float(f) => { | ||
| write_str(writer, k.borrow())?; | ||
| write_f64(writer, *f)?; | ||
| } | ||
| AttributeValue::Int(i) => { | ||
| write_str(writer, k.borrow())?; | ||
| write_f64(writer, *i as f64)?; | ||
| } | ||
| _ => {} | ||
| } | ||
| } | ||
| } | ||
|
|
||
| if !span.r#type.borrow().is_empty() { | ||
| write_const_msg_pack_str!(writer, "type")?; | ||
| write_str(writer, span.r#type.borrow())?; | ||
| } | ||
|
|
||
| if counts.meta_struct > 0 { | ||
| write_const_msg_pack_str!(writer, "meta_struct")?; | ||
| write_map_len(writer, counts.meta_struct)?; | ||
|
|
||
| for &(k, v) in &merged_attrs { | ||
| write_meta_struct_entry(writer, k, v)?; | ||
| } | ||
| } | ||
|
|
||
| if !span.span_links.is_empty() { | ||
| encode_span_links(writer, &span.span_links)?; | ||
| } | ||
| if !span.span_events.is_empty() { | ||
| encode_span_events(writer, &span.span_events)?; | ||
| } | ||
|
|
||
| Ok(()) | ||
| } |
There was a problem hiding this comment.
Maybe this needs to be iterative like this, at a glance there are variations in some segments, but it "feels" a bit dirty, idk if macro magic can help but it's clearly just a styling nit
| match v { | ||
| AttributeValue::String(s) => { | ||
| write_map_len(writer, 2)?; | ||
| write_const_msg_pack_str!(writer, "type")?; | ||
| write_u8(writer, 0)?; | ||
| write_const_msg_pack_str!(writer, "string_value")?; | ||
| write_str(writer, s.borrow())?; | ||
| } | ||
| AttributeValue::Bool(b) => { | ||
| write_map_len(writer, 2)?; | ||
| write_const_msg_pack_str!(writer, "type")?; | ||
| write_u8(writer, 1)?; | ||
| write_const_msg_pack_str!(writer, "bool_value")?; | ||
| write_bool(writer, *b).map_err(ValueWriteError::InvalidDataWrite)?; | ||
| } | ||
| AttributeValue::Int(i) => { | ||
| write_map_len(writer, 2)?; | ||
| write_const_msg_pack_str!(writer, "type")?; | ||
| write_u8(writer, 2)?; | ||
| write_const_msg_pack_str!(writer, "int_value")?; | ||
| write_sint(writer, *i)?; | ||
| } | ||
| AttributeValue::Float(f) => { | ||
| write_map_len(writer, 2)?; | ||
| write_const_msg_pack_str!(writer, "type")?; | ||
| write_u8(writer, 3)?; | ||
| write_const_msg_pack_str!(writer, "double_value")?; | ||
| write_f64(writer, *f)?; | ||
| } |
There was a problem hiding this comment.
Same with these, maybe repetition can be captured in a macro but idk if it's worth it, at least here it's spelled out clearly. Feel free to disregard
| macro_rules! write_const_msg_pack_str { | ||
| ($writer:expr, $str:expr) => {{ | ||
| use rmp::encode::ValueWriteError; | ||
| const STRING_ENCODING_LEN: usize = super::msp_string_encoding_len($str); | ||
| const STRING_ENCODING: [u8; STRING_ENCODING_LEN] = super::msp_const_string_encoding($str); | ||
|
|
||
| $writer | ||
| .write_bytes(&STRING_ENCODING) | ||
| .map_err(ValueWriteError::InvalidDataWrite) | ||
| }}; | ||
| } |
There was a problem hiding this comment.
Out of curiosity why does this have to be a macro ?
There was a problem hiding this comment.
This was already existing, I just moved it from span_v04.rs to mod.rs to be able to use it in span_v1.rs.
| .map_err(super::super::flatten_value_write_infallible) | ||
| .unwrap_infallible(); |
There was a problem hiding this comment.
Why do you need to unwrap_infallible here can't you propagate the Result up and only unwrap in the top level function ?
What does this PR do?
Adds a V1 → v0.4 msgpack downgrade encoder so a tracer that emits V1 spans natively can still send traces to an agent that only advertises
/v0.4/traces.Also renames the existing encoder modules/functions so the convention is uniform: parent module = output wire format, file/function suffix = input span type. The four combinations now coexist:
v04/span_v04.rs— v0.4 → v0.4 (native)v04/span_v1.rs— v1 → v0.4 (downgrade, new)v1/span_v04.rs— v0.4 → V1 (upgrade)v1/span_v1.rs— v1 → V1 (native)Motivation
APMSP-2811