Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,5 @@ The Cloud VM runs inside a Firecracker microVM. Docker requires:
11. **`/process/exec` API schema**: The `command` field is a single string (the binary name), not an array. Arguments go in the separate `args` array field. Response `stdout_b64` / `stderr_b64` are base64-encoded.

12. **All telemetry producers must publish through `TelemetrySession`, never directly to the raw `EventStream`.** Producers take a `func(events.Event) (events.Envelope, bool)` callback wired to `telemetrySession.Publish` in `cmd/api/main.go`; this is what enforces category gating from `PUT /telemetry`. Publishing straight to `EventStream` bypasses the customer's telemetry config. The only legitimate `EventStream.Publish` callers are `TelemetrySession` itself and tests.

13. **`events.Event.Ts` must be wall-clock (`time.Now()`) captured at the moment the event is emitted or observed — never a monotonic or source-derived clock.** On scale-to-zero VMs, `CLOCK_MONOTONIC` freezes during suspend, so any timestamp derived from it (notably the kmsg envelope timestamp behind OOM events) skews backward by the suspended duration. HTTP-published events get stamped by the API handler at ingest; in-process producers (sysmon kmsg reader, supervisord shim, etc.) must stamp `time.Now()` themselves.
7 changes: 6 additions & 1 deletion server/lib/events/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,12 @@ func HasCDPCategory(cats []oapi.TelemetryEventCategory) bool {
// Event is the portable event schema. It contains only producer-emitted content;
// pipeline metadata (seq) lives on the Envelope.
type Event struct {
Ts int64 `json:"ts"` // Unix microseconds (µs since epoch)
// Ts is the event time in Unix microseconds. It must be wall-clock
// (time.Now()) captured at emit/observe, never a monotonic or other
// source-derived clock (e.g. a kmsg envelope timestamp), which skews
// on VM suspend. HTTP-published events are stamped by the API handler;
// in-process producers must set it themselves.
Ts int64 `json:"ts"`
Type string `json:"type"`
Category oapi.TelemetryEventCategory `json:"category"`
Source oapi.BrowserEventSource `json:"source"`
Expand Down
7 changes: 5 additions & 2 deletions server/lib/sysmon/kmsg.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ type OomInstance struct {
// the standard "CPU: N PID: N Comm: ..." header line. Zero if the
// kernel did not emit that header.
TriggerPid int
// TimeOfDeath is the timestamp of the closing "Killed process" line
// as reported by the kmsg envelope.
// TimeOfDeath is the wall-clock time the closing "Killed process"
// line was observed, taken from the message's Timestamp.
TimeOfDeath time.Time
}

Expand All @@ -102,6 +102,9 @@ type TaskMemSnapshot struct {
// lets the parser run portably under unit tests; the production wiring
// lives in kmsg_linux.go.
type KmsgMessage struct {
	// Timestamp records when the message was observed, as wall-clock
	// time. The production source in kmsg_linux.go fills it with the
	// time the record was read rather than the timestamp carried by the
	// kmsg envelope.
	Timestamp time.Time

	// Body is the text of the kernel log record.
	Body string
}
Expand Down
8 changes: 7 additions & 1 deletion server/lib/sysmon/kmsg_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package sysmon
import (
"fmt"
"log/slog"
"time"

"github.com/euank/go-kmsg-parser/v2/kmsgparser"
)
Expand Down Expand Up @@ -39,7 +40,12 @@ func (s *kmsgparserSource) Messages() <-chan KmsgMessage {
go func() {
defer close(out)
for m := range in {
out <- KmsgMessage{Timestamp: m.Timestamp, Body: m.Message}
// Stamp wall-clock read time, not m.Timestamp: the kmsg
// envelope timestamp is CLOCK_MONOTONIC-derived, which freezes
// while the VM is suspended (scale-to-zero) and so skews
// backward by the suspended duration. We only read live records
// (openKmsgSource seeks to end), so read time is accurate.
out <- KmsgMessage{Timestamp: time.Now(), Body: m.Message}
}
}()
return out
Expand Down
53 changes: 53 additions & 0 deletions server/lib/sysmon/kmsg_linux_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
//go:build linux

package sysmon

import (
"testing"
"time"

"github.com/euank/go-kmsg-parser/v2/kmsgparser"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// fakeKmsgParser is a kmsgparser.Parser test double that replays a
// canned list of messages, so the mapping from library records to
// KmsgMessage can be checked without opening /dev/kmsg.
type fakeKmsgParser struct {
	msgs []kmsgparser.Message
}

// SeekEnd does nothing and always succeeds.
func (f *fakeKmsgParser) SeekEnd() error {
	return nil
}

// SetLogger discards the supplied logger.
func (f *fakeKmsgParser) SetLogger(kmsgparser.Logger) {
}

// Close does nothing and always succeeds.
func (f *fakeKmsgParser) Close() error {
	return nil
}

// Parse returns a channel pre-loaded with every canned message, in
// order, and already closed. The buffer is sized to hold all of them so
// filling it never blocks.
func (f *fakeKmsgParser) Parse() <-chan kmsgparser.Message {
	replay := make(chan kmsgparser.Message, len(f.msgs))
	defer close(replay)
	for i := range f.msgs {
		replay <- f.msgs[i]
	}
	return replay
}

// TestKmsgparserSourceStampsObservationTime verifies the production
// source ignores the kmsg envelope's (monotonic-derived) timestamp and
// stamps the wall-clock observation time instead. This is the fix for
// OOM events landing minutes in the past on scale-to-zero VMs, where
// CLOCK_MONOTONIC freezes during suspend.
func TestKmsgparserSourceStampsObservationTime(t *testing.T) {
// A timestamp the kmsg envelope might carry on a long-suspended VM:
// far in the past relative to real wall-clock.
envelopeTime := time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC)
src := &kmsgparserSource{p: &fakeKmsgParser{msgs: []kmsgparser.Message{
{Timestamp: envelopeTime, Message: "chromium invoked oom-killer: order=0"},
}}}

before := time.Now()
msg, ok := <-src.Messages()
after := time.Now()

require.True(t, ok, "expected one forwarded message")
assert.Equal(t, "chromium invoked oom-killer: order=0", msg.Body)
assert.False(t, msg.Timestamp.Before(before), "stamp must be >= observation start, got %s", msg.Timestamp)
assert.False(t, msg.Timestamp.After(after), "stamp must be <= observation end, got %s", msg.Timestamp)
assert.False(t, msg.Timestamp.Equal(envelopeTime), "stamp must not be the kmsg envelope (monotonic) time")
}
Loading