diff --git a/chain_capabilities/evm/go.mod b/chain_capabilities/evm/go.mod index 49b4e60cb..8611e492a 100644 --- a/chain_capabilities/evm/go.mod +++ b/chain_capabilities/evm/go.mod @@ -12,7 +12,8 @@ require ( github.com/smartcontractkit/chainlink-evm v0.3.4-0.20260410162948-2dca02f24e98 github.com/smartcontractkit/chainlink-evm/gethwrappers v0.0.0-20251022073203-7d8ae8cf67c1 github.com/smartcontractkit/chainlink-framework/multinode v0.0.0-20260410144512-ca02ad6ed16a - github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260526195338-adcf8013a1b7 + github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260611123141-db97012a6c32 + github.com/smartcontractkit/chainlink-protos/metering/go v0.0.0-00010101000000-000000000000 github.com/stretchr/testify v1.11.1 go.opentelemetry.io/otel v1.43.0 go.uber.org/zap v1.27.1 @@ -217,3 +218,9 @@ require ( ) replace github.com/fbsobreira/gotron-sdk => github.com/smartcontractkit/chainlink-tron/relayer/gotron-sdk v0.0.5-0.20250528121202-292529af39df + +// Local replaces for the unpublished resourcemanager/metering stack; drop once +// chainlink-common and chainlink-protos/metering/go are tagged. +replace github.com/smartcontractkit/chainlink-common => ../../../chainlink-common + +replace github.com/smartcontractkit/chainlink-protos/metering/go => ../../../chainlink-protos/metering/go diff --git a/chain_capabilities/evm/go.sum b/chain_capabilities/evm/go.sum index c52e3b89b..f9351bb2d 100644 --- a/chain_capabilities/evm/go.sum +++ b/chain_capabilities/evm/go.sum @@ -473,8 +473,6 @@ github.com/smartcontractkit/capabilities/libs v0.0.0-20260609124022-2749e4a32bfb github.com/smartcontractkit/capabilities/libs v0.0.0-20260609124022-2749e4a32bfb/go.mod h1:LS7F8U2YZNc0Vt8f6SVWUUigGLxdxZMpyC7VCcUTagg= github.com/smartcontractkit/chain-selectors v1.0.101 h1:TF4ma9h3QeyIZ8XoEmgI5lrUvZfzHAz8tfR0pV0+GCA= github.com/smartcontractkit/chain-selectors v1.0.101/go.mod h1:qy7whtgG5g+7z0jt0nRyii9bLND9m15NZTzuQPkMZ5w= -github.com/smartcontractkit/chainlink-common v0.11.2-0.20260601211238-9f526774fef0 h1:ekpMT6wV+caBWnaBGUD/j1eoal+DhNLq7jv1hFf/nyU= -github.com/smartcontractkit/chainlink-common v0.11.2-0.20260601211238-9f526774fef0/go.mod h1:6jgqiFXFJHqjkvFFmuf8gvoUFa6Ygx/D1tKnIL+CCF8= github.com/smartcontractkit/chainlink-common/keystore v1.1.1-0.20260529092756-a94bc8ce96d6 h1:fWsYxxj35fp1/6YZngoTsOTMLqDie4N5X0osAOdhUTE= github.com/smartcontractkit/chainlink-common/keystore v1.1.1-0.20260529092756-a94bc8ce96d6/go.mod h1:6JexOOhPhknQ0QMuppFIlOpm6wCp54yZMxai+tWugwY= github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.11-0.20260601211238-9f526774fef0 h1:NExKM/D0HneOq/N5LGTbkV4VOa0UHCvfTNEb4GqYpto= @@ -491,8 +489,8 @@ github.com/smartcontractkit/chainlink-framework/multinode v0.0.0-20260410144512- github.com/smartcontractkit/chainlink-framework/multinode v0.0.0-20260410144512-ca02ad6ed16a/go.mod h1:7ketk4ischPQW/JQgmyHz6zdzLUJv1VC29SiSgosydQ= github.com/smartcontractkit/chainlink-protos/billing/go v0.0.0-20251024234028-0988426d98f4 h1:GCzrxDWn3b7jFfEA+WiYRi8CKoegsayiDoJBCjYkneE= github.com/smartcontractkit/chainlink-protos/billing/go v0.0.0-20251024234028-0988426d98f4/go.mod h1:HHGeDUpAsPa0pmOx7wrByCitjQ0mbUxf0R9v+g67uCA= -github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260526195338-adcf8013a1b7 h1:iljEJss3WOwcsMkWy72Yn2zvjw7Gyxc+RXL7r8YKM6g= -github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260526195338-adcf8013a1b7/go.mod h1:vTFHTCbLui4Vn8fTmAadfE3rdnvfrDwOmMujmW857D0= +github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260611123141-db97012a6c32 h1:GNl+lLK0QCakqA1J1i7FoOai2JrOGOzNzSniMijaCjA= +github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260611123141-db97012a6c32/go.mod h1:vTFHTCbLui4Vn8fTmAadfE3rdnvfrDwOmMujmW857D0= github.com/smartcontractkit/chainlink-protos/linking-service/go v0.0.0-20251002192024-d2ad9222409b h1:QuI6SmQFK/zyUlVWEf0GMkiUYBPY4lssn26nKSd/bOM= github.com/smartcontractkit/chainlink-protos/linking-service/go v0.0.0-20251002192024-d2ad9222409b/go.mod h1:qSTSwX3cBP3FKQwQacdjArqv0g6QnukjV4XuzO6UyoY= github.com/smartcontractkit/chainlink-protos/node-platform v0.0.0-20260205130626-db2a2aab956b h1:36knUpKHHAZ86K4FGWXtx8i/EQftGdk2bqCoEu/Cha8= diff --git a/chain_capabilities/evm/main.go b/chain_capabilities/evm/main.go index 063f4d335..d514a7834 100644 --- a/chain_capabilities/evm/main.go +++ b/chain_capabilities/evm/main.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "fmt" + "os" "strconv" "time" @@ -28,11 +29,13 @@ import ( "github.com/smartcontractkit/capabilities/chain_capabilities/evm/trigger" "github.com/smartcontractkit/capabilities/libs/loopserver" + "github.com/smartcontractkit/chainlink-common/pkg/beholder" "github.com/smartcontractkit/chainlink-common/pkg/capabilities" evmcappb "github.com/smartcontractkit/chainlink-common/pkg/capabilities/v2/chain-capabilities/evm" evmcapserver "github.com/smartcontractkit/chainlink-common/pkg/capabilities/v2/chain-capabilities/evm/server" "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/loop" + "github.com/smartcontractkit/chainlink-common/pkg/resourcemanager" "github.com/smartcontractkit/chainlink-common/pkg/settings/limits" "github.com/smartcontractkit/chainlink-common/pkg/types" "github.com/smartcontractkit/chainlink-common/pkg/types/core" @@ -142,9 +145,19 @@ func (c *capabilityGRPCService) Initialise(ctx context.Context, dependencies cor // TODO: add org resolver capabilityID := fmt.Sprintf("%s (%d)", c.id, cfg.ChainID) + // The ResourceManager owns the snapshot tick; the LogTriggerService starts it + // as a sub-service and registers itself, so it must be configured with a + // snapshot interval here. Identity/snapshots are gated by the same metering + // env flag as MeterRecords. + resourceManager := resourcemanager.NewResourceManager(c.lggr, resourcemanager.ResourceManagerConfig{ + Enabled: meterRecordsEnabled(c.lggr), + Emitter: beholder.GetEmitter(), + SnapshotInterval: resourcemanager.DefaultSnapshotInterval, + }) + baseIdentity := newBaseMeteringIdentity(dependencies) c.triggerService, err = trigger.NewLogTriggerService(evmRelayer, trigger.NewLogTriggerStore(), c.lggr, capabilityID, processor, messageBuilder, cfg.LogTriggerPollInterval, cfg.LogTriggerSendChannelBufferSize, cfg.LogTriggerLimitQueryLogSize, c.limitsFactory, - dependencies.OrgResolver, dependencies.TriggerEventStore) + dependencies.OrgResolver, dependencies.TriggerEventStore, resourceManager, baseIdentity, c.chainSelector) if err != nil { return fmt.Errorf("error when creating trigger: %w", err) } @@ -179,6 +192,62 @@ func (c *capabilityGRPCService) Initialise(ctx context.Context, dependencies cor return nil } +// defaultMeteringProduct is the fallback metering product dimension used when +// the host did not inject one via the standardized Initialise channel (a legacy +// node or a boot path not yet updated). The other deployment dimensions +// (environment, zone, node_id) have no meaningful constant and are left empty +// in that case, as documented on StandardCapabilitiesDependencies. +const defaultMeteringProduct = "cre" + +// newBaseMeteringIdentity builds the EVM log trigger's base metering identity +// from the host-injected dependencies. It carries the six coarse dimensions +// plus the service-level resource/resource_type; the per-resource ResourceID is +// set per emit/snapshot. DONID is the capability DON when the host injected one +// (deps.CapabilityDonID); when 0, it is left empty here and resolved per emit +// from the consumer's WorkflowDonID (see LogTriggerService.resolveDONID). This +// reads deps.CapabilityDonID at the Initialise layer so the change is orthogonal +// to capabilities#619's NewLogTriggerService signature edit. +func newBaseMeteringIdentity(deps core.StandardCapabilitiesDependencies) resourcemanager.ResourceIdentity { + product := deps.Product + if product == "" { + product = defaultMeteringProduct + } + var donID string + if deps.CapabilityDonID != 0 { + donID = strconv.FormatUint(uint64(deps.CapabilityDonID), 10) + } + return resourcemanager.ResourceIdentity{ + Product: product, + Environment: deps.Environment, + Zone: deps.Zone, + DONID: donID, + NodeID: deps.NodeID, + Service: trigger.MeteringService, + Resource: trigger.MeteringResource, + ResourceType: trigger.MeteringResourceType, + } +} + +// meterRecordsEnabledEnvVar gates MeterRecord emission; the name is the +// cross-producer convention for the metering rollout (SHARED-2718). +const meterRecordsEnabledEnvVar = "CL_METER_RECORDS_ENABLED" + +// meterRecordsEnabled reads the metering gate from the environment. Unset or +// unparseable values disable emission; metering config must never prevent the +// capability from starting. +func meterRecordsEnabled(lggr logger.Logger) bool { + v := os.Getenv(meterRecordsEnabledEnvVar) + if v == "" { + return false + } + enabled, err := strconv.ParseBool(v) + if err != nil { + lggr.Warnw("Invalid value for "+meterRecordsEnabledEnvVar+", meter record emission disabled", "value", v, "error", err) + return false + } + return enabled +} + func (c *capabilityGRPCService) unmarshalConfig(configStr string) (*config.Config, error) { var cfg config.Config if err := json.Unmarshal([]byte(configStr), &cfg); err != nil { diff --git a/chain_capabilities/evm/trigger/physical_filter_id.go b/chain_capabilities/evm/trigger/physical_filter_id.go new file mode 100644 index 000000000..0d587739d --- /dev/null +++ b/chain_capabilities/evm/trigger/physical_filter_id.go @@ -0,0 +1,63 @@ +package trigger + +import ( + "crypto/sha256" + "encoding/hex" + "sort" + "strings" + + evmtypes "github.com/smartcontractkit/chainlink-common/pkg/types/chains/evm" +) + +// physicalFilterID returns the workflow-independent content identity of an EVM +// log filter: the lowercase hex SHA-256 over a canonical encoding of the +// filter's physical matching criteria (chain selector, addresses, event +// signatures, and positional topic slots). Two filters that match exactly the +// same on-chain logs hash to the same ID regardless of which workflow or +// trigger registered them, or of the order their addresses/sigs/topics were +// supplied. It is used as ResourceIdentity.ResourceID and as the +// RESERVE/RELEASE event identity so identical filters share one billable +// physical resource (R4). +// +// Canonicalization rules (each rule defeats a source of non-determinism): +// - addresses and event sigs are lowercased 0x-prefixed hex and sorted +// ascending: the matching set is order-independent; +// - topic2/topic3/topic4 are POSITIONAL — a value in topic2 is a different +// filter than the same value in topic3 — so each slot is encoded under its +// own positional tag, and within a slot the values are sorted ascending; +// - the chain selector scopes the hash so identical filters on different +// chains stay distinct. +// +// The preimage uses "|" as a top-level separator and "," within a set; the +// per-element hex encodings are fixed-width and contain neither, so the +// encoding is unambiguous. +func physicalFilterID(chainSelector string, addresses []evmtypes.Address, eventSigs, topic2, topic3, topic4 []evmtypes.Hash) string { + sortedAddrs := make([]string, len(addresses)) + for i, a := range addresses { + sortedAddrs[i] = "0x" + hex.EncodeToString(a[:]) + } + sort.Strings(sortedAddrs) + + canonHashes := func(hs []evmtypes.Hash) string { + out := make([]string, len(hs)) + for i, h := range hs { + out[i] = "0x" + hex.EncodeToString(h[:]) + } + sort.Strings(out) + return strings.Join(out, ",") + } + + // Topic slots are encoded positionally so the same value in different slots + // produces a different identity. + preimage := strings.Join([]string{ + "cs=" + chainSelector, + "addrs=" + strings.Join(sortedAddrs, ","), + "sigs=" + canonHashes(eventSigs), + "t2=" + canonHashes(topic2), + "t3=" + canonHashes(topic3), + "t4=" + canonHashes(topic4), + }, "|") + + sum := sha256.Sum256([]byte(preimage)) + return hex.EncodeToString(sum[:]) +} diff --git a/chain_capabilities/evm/trigger/store.go b/chain_capabilities/evm/trigger/store.go index ee781ea63..5658110b2 100644 --- a/chain_capabilities/evm/trigger/store.go +++ b/chain_capabilities/evm/trigger/store.go @@ -12,7 +12,25 @@ import ( ) type filter struct { - filterID string + filterID string + // physicalFilterID is the workflow-independent content hash of the filter's + // physical matching criteria (chain selector + canonicalized addresses, + // event sigs, and positional topics). It is the metering ResourceID and the + // RESERVE/RELEASE event identity, so the unregister, cleanup, snapshot, and + // graceful-close paths all reuse it from here without the request input. + physicalFilterID string + // reservedAddressCount is the number of filter addresses metered in the + // RESERVE record when the filter was registered. The matching RELEASE + // must carry the same value, and UnregisterLogTrigger ignores its request + // input, so the count is stashed here at registration. + reservedAddressCount int64 + // donID is stashed from the registration RequestMetadata so the + // unregister/cleanup/snapshot/close paths can emit a metering record with + // the same identity as the RESERVE, without the original request. It is the + // resolved metering DON ID string (capability DON, or the consumer + // WorkflowDonID fallback when the host did not inject a capability DON); + // empty when neither is known. + donID string expressions []query.Expression confidence primitives.ConfidenceLevel } diff --git a/chain_capabilities/evm/trigger/trigger.go b/chain_capabilities/evm/trigger/trigger.go index 02d3fe447..d085335f6 100644 --- a/chain_capabilities/evm/trigger/trigger.go +++ b/chain_capabilities/evm/trigger/trigger.go @@ -21,6 +21,7 @@ import ( evmservice "github.com/smartcontractkit/chainlink-common/pkg/chains/evm" "github.com/smartcontractkit/chainlink-common/pkg/custmsg" "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/resourcemanager" "github.com/smartcontractkit/chainlink-common/pkg/services" "github.com/smartcontractkit/chainlink-common/pkg/services/orgresolver" "github.com/smartcontractkit/chainlink-common/pkg/settings/cresettings" @@ -32,6 +33,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/types/query/primitives/evm" "github.com/smartcontractkit/chainlink-common/pkg/workflows" "github.com/smartcontractkit/chainlink-common/pkg/workflows/events" + meteringpb "github.com/smartcontractkit/chainlink-protos/metering/go" "github.com/smartcontractkit/capabilities/chain_capabilities/evm/monitoring" ) @@ -42,6 +44,22 @@ const ( defaultLimitQueryLogSize = 1000 ) +// Metering identity constants for the EVM log trigger (SHARED-2711). These are +// the service-level dimensions of the base ResourceIdentity: Service is the +// stable service constant (it must not encode deployment environment or zone, +// which ride on the structured identity's coarse dimensions), Resource is the +// resource pool, and ResourceType is the billing unit converted to credits. +const ( + MeteringService = "evm-log-trigger" + MeteringResource = "log_filters" + MeteringResourceType = "log_filter_addresses" +) + +// LogTriggerService is a resourcemanager.Meterable: it is registered with the +// ResourceManager at start so the manager's snapshot tick polls its active +// filters. +var _ resourcemanager.Meterable = (*LogTriggerService)(nil) + type LogTriggerService struct { services.Service @@ -53,6 +71,17 @@ type LogTriggerService struct { lggr logger.Logger beholderProcessor beholder.ProtoProcessor messageBuilder *monitoring.MessageBuilder + resourceManager *resourcemanager.ResourceManager + // baseIdentity is the producer's base metering identity: the six coarse + // dimensions plus service/resource/resource_type, built once at Initialise. + // The per-resource ResourceID is set per emit/snapshot via WithResourceID. + // When the host did not inject a capability DON ID, baseIdentity.DONID is + // empty and is filled per-emit from the consumer's WorkflowDonID. + baseIdentity resourcemanager.ResourceIdentity + chainSelector string // decimal chain selector, the chain label on meter records + // rmUnregister removes this service from the ResourceManager's snapshot + // registry; it is set when the RM is started in start and invoked in close. + rmUnregister func() triggers LogTriggerStore logTriggerPollInterval time.Duration @@ -73,7 +102,10 @@ func NewLogTriggerService(evmService types.EVMService, store LogTriggerStore, lg logTriggerSendChannelBufferSize uint64, logTriggerLimitQueryLogSize uint64, limitsFactory limits.Factory, orgResolver orgresolver.OrgResolver, - triggerEventStore capabilities.EventStore) (*LogTriggerService, error) { + triggerEventStore capabilities.EventStore, + resourceManager *resourcemanager.ResourceManager, + baseIdentity resourcemanager.ResourceIdentity, + chainSelector uint64) (*LogTriggerService, error) { if capabilityID == "" { return nil, fmt.Errorf("capabilityID must be non-empty") } @@ -103,6 +135,9 @@ func NewLogTriggerService(evmService types.EVMService, store LogTriggerStore, lg lggr: lggr, beholderProcessor: beholderProcessor, messageBuilder: messageBuilder, + resourceManager: resourceManager, + baseIdentity: baseIdentity, + chainSelector: strconv.FormatUint(chainSelector, 10), triggers: store, logTriggerPollInterval: logTriggerPollInterval, logTriggerSendChannelBufferSize: currentSendChannelBufferSize, @@ -112,6 +147,9 @@ func NewLogTriggerService(evmService types.EVMService, store LogTriggerStore, lg if lts.orgResolver == nil { lts.lggr.Warn("OrgResolver is nil, EVM log trigger capability will not be able to fetch organization ID") } + if lts.resourceManager == nil { + lts.lggr.Warn("ResourceManager is nil, EVM log trigger capability will not emit meter records") + } if err := lts.initLimiters(limitsFactory); err != nil { return nil, err } @@ -160,14 +198,49 @@ func (lts *LogTriggerService) start(ctx context.Context) error { ticker := services.NewTicker(duration) lts.lggr.Infof("Starting clean up of failed log poller filters every %s seconds", duration) lts.srvcEng.GoTick(ticker, lts.cleanUpStaleFilters) + + // The ResourceManager owns the snapshot tick: start it as a sub-service of + // this service and Register ourselves so its tick polls GetUtilization. We + // never run our own snapshot loop. The RM is fail-open and starting it must + // not gate the trigger service, so a start error is logged, not returned. + if lts.resourceManager != nil { + if err := lts.resourceManager.Start(ctx); err != nil { + lts.lggr.Errorw("failed to start metering ResourceManager; snapshots disabled", "err", err) + } else { + lts.rmUnregister = lts.resourceManager.Register(lts) + } + } return nil } func (lts *LogTriggerService) close() error { lts.baseTrigger.Stop() + // On graceful shutdown, release every still-active filter so a clean stop is + // not seen downstream as a leaked reservation. These releases reuse each + // filter's stashed physicalFilterID + reserved count, so they dedup against + // any user-facing RELEASE on the identical idempotency key. + lts.releaseActiveFiltersOnClose(context.Background()) + if lts.rmUnregister != nil { + lts.rmUnregister() + } + if lts.resourceManager != nil { + return lts.resourceManager.Close() + } return nil } +// releaseActiveFiltersOnClose emits a RELEASE MeterRecord for every filter still +// present in the store at shutdown, carrying the reserved address count. It runs +// before the ResourceManager is unregistered/closed. +func (lts *LogTriggerService) releaseActiveFiltersOnClose(ctx context.Context) { + if lts.resourceManager == nil { + return + } + for _, state := range lts.triggers.ReadAll() { + lts.emitMeterRecord(ctx, meteringpb.MeterAction_METER_ACTION_RELEASE, state.filter, state.reservedAddressCount) + } +} + func (lts *LogTriggerService) cleanUpStaleFilters(ctx context.Context) { lts.lggr.Debugf("Starting cleanUpStaleFilters") telemetryContext := monitoring.TelemetryContext{TsStart: time.Now().UnixMilli(), RequestMetadata: capabilities.RequestMetadata{ @@ -202,7 +275,14 @@ func (lts *LogTriggerService) cleanUpStaleFilters(ctx context.Context) { if err := lts.EVMService.UnregisterLogTracking(ctx, filterID); err != nil { summary := fmt.Sprintf("failed to unregister log-tracking from the clean up thread: '%v' source triggerID: %s", err, filterID) monitoring.LogAndEmitError(ctx, lts.lggr, lts.beholderProcessor, lts.messageBuilder.BuildLogTriggerCleanUpError(telemetryContext, summary, err.Error())) + continue } + // This is log-poller filter hygiene only; it emits no MeterRecord. An + // orphaned filter has no trigger state, so it is already absent from + // GetUtilization and therefore from subsequent Snapshots. Billing + // reconciles the lost reservation by that absence (the snapshot liveness + // mechanism), not by a synthetic cleanup RELEASE. We never expect, nor + // design around, orphan cleanup as a metering event. } } @@ -282,6 +362,21 @@ func (lts *LogTriggerService) RegisterLogTrigger(ctx context.Context, triggerID Topic4: t4, } + expressions, confidence := lts.createLogRequest(ctx, addresses, sigs, t2, t3, t4, input.GetConfidence()) + + // Build the filter's metering identity once from the already-converted + // inputs: a workflow-independent content hash and the resolved DON ID. It is + // stashed on the trigger state so every later path (unregister, cleanup, + // snapshot, close) reproduces the same identity without the request input. + loggedFilter := filter{ + filterID: filterID, + physicalFilterID: physicalFilterID(lts.chainSelector, addresses, sigs, t2, t3, t4), + reservedAddressCount: int64(len(addresses)), + donID: lts.resolveDONID(meta.WorkflowDonID), + expressions: expressions, + confidence: confidence, + } + if err = lts.EVMService.RegisterLogTracking(ctx, filterQuery); err != nil { registerError := fmt.Errorf("failed to register log-tracking: '%w' for triggerID: %s, addresses: %v, eventSig: %v, topic2: %v, topic3: %v, topic4: %v", err, triggerID, filterQuery.Addresses, filterQuery.EventSigs, filterQuery.Topic2, filterQuery.Topic3, filterQuery.Topic4) @@ -296,7 +391,9 @@ func (lts *LogTriggerService) RegisterLogTrigger(ctx context.Context, triggerID monitoring.LogAndEmitError(ctx, lts.lggr, lts.beholderProcessor, lts.messageBuilder.BuildLogTriggerError(telemetryContext, triggerID, summary, err.Error())) return nil, caperrors.NewPublicSystemError(registerError, caperrors.Unavailable) } - expressions, confidence := lts.createLogRequest(ctx, addresses, sigs, t2, t3, t4, input.GetConfidence()) + // RESERVE only after RegisterLogTracking succeeds: a failed registration + // holds no addresses, so it must not be billed (see the no-reserve test). + lts.emitMeterRecord(ctx, meteringpb.MeterAction_METER_ACTION_RESERVE, loggedFilter, loggedFilter.reservedAddressCount) monitoring.EmitInitiated(ctx, lts.lggr, lts.beholderProcessor, lts.messageBuilder.BuildLogTriggerInitiated(telemetryContext, input)) @@ -310,11 +407,7 @@ func (lts *LogTriggerService) RegisterLogTrigger(ctx context.Context, triggerID cancelFunc: cancel, lastBlock: fromBlock, unfinalizedSentEventIDs: make(map[string]*big.Int), - filter: filter{ - filterID: filterID, - expressions: expressions, - confidence: confidence, - }, + filter: loggedFilter, }) ctx = meta.ContextWithCRE(ctx) lts.startPolling(ctx, telemetryContext, triggerID, input, logCh) @@ -365,6 +458,75 @@ func (lts *LogTriggerService) generateFilterID(triggerID string) string { return triggerID + SuffixLogTriggerFilterID } +// resolveDONID returns the metering DON ID for an emit, applying the +// capabilities#619 0->WorkflowDonID rule: when the host injected a capability +// DON ID, baseIdentity.DONID is non-empty and used as-is; otherwise the +// consumer workflow's DON ID (from the request metadata) is the documented +// fallback. The result is stashed on the filter at registration so the +// unregister/cleanup/snapshot/close paths reproduce the same identity without +// the request. Empty when neither source is known. +func (lts *LogTriggerService) resolveDONID(workflowDonID uint32) string { + if lts.baseIdentity.DONID != "" { + return lts.baseIdentity.DONID + } + if workflowDonID != 0 { + return strconv.FormatUint(uint64(workflowDonID), 10) + } + return "" +} + +// identity returns the base metering identity with its DON ID and ResourceID +// set for one resource. donID is the value stashed on the filter at +// registration (see resolveDONID); resourceID is the physical filter content +// hash (empty when unrecoverable, e.g. an orphaned filter). +func (lts *LogTriggerService) identity(donID, resourceID string) resourcemanager.ResourceIdentity { + id := lts.baseIdentity + id.DONID = donID + id.ResourceID = resourceID + return id +} + +// emitMeterRecord emits a MeterRecord for a filter whose full state is known +// (register/unregister/close). The physical filter content hash is both the +// ResourceID and the RESERVE/RELEASE event identity, so a register retry and a +// later unregister/cleanup of the same physical filter dedup on an identical +// idempotency key. The resource is fully identified by its ResourceIdentity; +// there is no separate label metadata. Emission is fail-open and must never +// gate the path that calls it. +func (lts *LogTriggerService) emitMeterRecord(ctx context.Context, action meteringpb.MeterAction, f filter, value int64) { + if lts.resourceManager == nil { + return + } + identity := lts.identity(f.donID, f.physicalFilterID) + lts.resourceManager.EmitMeterRecord(ctx, identity, action, + resourcemanager.NewUtilization(identity, action, value, f.physicalFilterID)) +} + +// ResourceIdentity implements resourcemanager.Meterable: it returns the +// producer's base identity (the six coarse dimensions plus +// service/resource/resource_type). The per-resource DON ID and ResourceID are +// populated per active filter by GetUtilization. +func (lts *LogTriggerService) ResourceIdentity() resourcemanager.ResourceIdentity { + return lts.baseIdentity +} + +// GetUtilization implements resourcemanager.Meterable: it returns one snapshot +// entry per currently active log filter for the snapshot tick. It is a cheap +// in-memory read — triggers.ReadAll already returns a copy — with no I/O and no +// lock held across the loop, as the snapshot contract requires (R6). +func (lts *LogTriggerService) GetUtilization(_ context.Context) []resourcemanager.SnapshotEntry { + triggers := lts.triggers.ReadAll() + entries := make([]resourcemanager.SnapshotEntry, 0, len(triggers)) + for _, state := range triggers { + f := state.filter + entries = append(entries, resourcemanager.SnapshotEntry{ + Identity: lts.identity(f.donID, f.physicalFilterID), + Value: f.reservedAddressCount, + }) + } + return entries +} + func (lts *LogTriggerService) startPolling(ctx context.Context, telemetryContext monitoring.TelemetryContext, triggerID string, input *evmcappb.FilterLogTriggerRequest, logCh chan capabilities.TriggerAndId[*evmcappb.Log]) { lts.lggr.Infof("Starting polling for triggerID: %s, interval: %d", triggerID, lts.logTriggerPollInterval) ticker := defaultTickerFactory.NewTicker(lts.logTriggerPollInterval) @@ -712,6 +874,13 @@ func (lts *LogTriggerService) UnregisterLogTrigger(ctx context.Context, triggerI trigger.cancelFunc() lts.triggers.Delete(triggerID) lts.baseTrigger.UnregisterTrigger(triggerID) + // The reservation ends with the trigger state, which is the only holder of + // the reserved address count and the physical filter identity. Both are + // reused from the stashed filter so this RELEASE carries the same value and + // identity as its RESERVE. If UnregisterLogTracking fails below, the filter + // is orphaned at the log poller and the stale-filter cleanup unregisters it + // later (silently — metering already emitted this RELEASE here). + lts.emitMeterRecord(ctx, meteringpb.MeterAction_METER_ACTION_RELEASE, trigger.filter, trigger.reservedAddressCount) err := lts.EVMService.UnregisterLogTracking(ctx, lts.generateFilterID(triggerID)) if err != nil { diff --git a/chain_capabilities/evm/trigger/trigger_metering_test.go b/chain_capabilities/evm/trigger/trigger_metering_test.go new file mode 100644 index 000000000..1a02523d1 --- /dev/null +++ b/chain_capabilities/evm/trigger/trigger_metering_test.go @@ -0,0 +1,490 @@ +package trigger + +import ( + "bytes" + "context" + "errors" + "testing" + "time" + + "github.com/jonboulle/clockwork" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/proto" + + "github.com/smartcontractkit/chainlink-common/pkg/beholder" + "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + evmcappb "github.com/smartcontractkit/chainlink-common/pkg/capabilities/v2/chain-capabilities/evm" + evmservice "github.com/smartcontractkit/chainlink-common/pkg/chains/evm" + "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/resourcemanager" + "github.com/smartcontractkit/chainlink-common/pkg/services/servicetest" + evmtypes "github.com/smartcontractkit/chainlink-common/pkg/types/chains/evm" + evmmock "github.com/smartcontractkit/chainlink-common/pkg/types/mocks" + meteringpb "github.com/smartcontractkit/chainlink-protos/metering/go" +) + +const testChainSelector = "5009297550715157269" + +// testBaseIdentity is the producer base identity the metering tests build their +// LogTriggerService with. It carries every coarse dimension so the tests can +// assert each one is populated on the emitted records (the host-injected +// identity contract). DONID is the capability DON; an empty DONID exercises the +// WorkflowDonID fallback. +func testBaseIdentity() resourcemanager.ResourceIdentity { + return resourcemanager.ResourceIdentity{ + Product: "cre", + Environment: "staging", + Zone: "wf-zone-a", + DONID: "42", + NodeID: "csa-pubkey-hex", + Service: MeteringService, + Resource: MeteringResource, + ResourceType: MeteringResourceType, + } +} + +// fakeMeterEmitter captures MeterRecords and MeterSnapshots emitted through the +// ResourceManager. The two message types are distinguished by the entity +// attribute the emitter is called with. The manager now emits one MeterSnapshot +// per active resource (no single Snapshot envelope), so snapshots accumulates +// one message per resource. +type fakeMeterEmitter struct { + err error + emitCalls int + records []*meteringpb.MeterRecord + snapshots []*meteringpb.MeterSnapshot +} + +func (f *fakeMeterEmitter) Emit(_ context.Context, body []byte, attrKVs ...any) error { + f.emitCalls++ + if f.err != nil { + return f.err + } + if isSnapshotEmit(attrKVs) { + var snapshot meteringpb.MeterSnapshot + if err := proto.Unmarshal(body, &snapshot); err != nil { + return err + } + f.snapshots = append(f.snapshots, &snapshot) + return nil + } + var record meteringpb.MeterRecord + if err := proto.Unmarshal(body, &record); err != nil { + return err + } + f.records = append(f.records, &record) + return nil +} + +// isSnapshotEmit reports whether the emitter attributes name the MeterSnapshot +// entity, so the fake can demux the two message types off the same Emit method. +// The key is beholder.AttrKeyEntity ("beholder_entity") and the value is the +// snapshot entity constant the ResourceManager emits with. +func isSnapshotEmit(attrKVs []any) bool { + for i := 0; i+1 < len(attrKVs); i += 2 { + if attrKVs[i] == beholder.AttrKeyEntity && attrKVs[i+1] == "metering.v1.MeterSnapshot" { + return true + } + } + return false +} + +// newMeteredTriggerObject builds a LogTriggerService whose ResourceManager is +// enabled and wired to a fake emitter. The poll interval is stretched so the +// polling goroutine stays quiet; metering happens on the register, unregister, +// cleanup, snapshot, and close paths only. +func newMeteredTriggerObject(t *testing.T, mockEVM *evmmock.EVMService, store LogTriggerStore) (*LogTriggerService, *fakeMeterEmitter, *clockwork.FakeClock) { + t.Helper() + lts := createTriggerObject(t, mockEVM, store) + lts.logTriggerPollInterval = time.Hour + emitter := &fakeMeterEmitter{} + clock := clockwork.NewFakeClockAt(time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)) + lts.resourceManager = resourcemanager.NewResourceManager(logger.Test(t), + resourcemanager.ResourceManagerConfig{ + Enabled: true, + Emitter: emitter, + SnapshotInterval: time.Minute, + Clock: clock, + }) + lts.baseIdentity = testBaseIdentity() + lts.chainSelector = testChainSelector + return lts, emitter, clock +} + +// meteringTestInput is a registration request with two filter addresses, so +// tests can tell an address count apart from a hardcoded 1. +func meteringTestInput() *evmcappb.FilterLogTriggerRequest { + return &evmcappb.FilterLogTriggerRequest{ + Addresses: [][]byte{expectedAddress, bytes.Repeat([]byte{0x42}, evmtypes.AddressLength)}, + Topics: topicsWithEventSig0, + } +} + +// assertBaseIdentity checks the six coarse dimensions + service/resource on the +// emitted record identity, proving the host-injected identity is carried. +func assertBaseIdentity(t *testing.T, id *meteringpb.ResourceIdentity) { + t.Helper() + require.NotNil(t, id) + require.Equal(t, "cre", id.GetProduct()) + require.Equal(t, "staging", id.GetEnvironment()) + require.Equal(t, "wf-zone-a", id.GetZone()) + require.Equal(t, "42", id.GetDonId()) + require.Equal(t, "csa-pubkey-hex", id.GetNodeId()) + require.Equal(t, MeteringService, id.GetService()) + require.Equal(t, MeteringResource, id.GetResource()) + require.Equal(t, MeteringResourceType, id.GetResourceType()) +} + +// expectedPhysicalFilterID recomputes the physical filter id for the metering +// test input via the production helper, so the tests assert against the real +// canonicalization rather than a frozen literal. +func expectedPhysicalFilterID(t *testing.T, input *evmcappb.FilterLogTriggerRequest) string { + t.Helper() + svc := &LogTriggerService{} + eventSigs, t2, t3, t4 := svc.getTopics(input) + addrs, err := evmservice.ConvertAddressesFromProto(input.GetAddresses()) + require.NoError(t, err) + sigs, err := evmservice.ConvertHashesFromProto(eventSigs) + require.NoError(t, err) + h2, err := evmservice.ConvertHashesFromProto(t2) + require.NoError(t, err) + h3, err := evmservice.ConvertHashesFromProto(t3) + require.NoError(t, err) + h4, err := evmservice.ConvertHashesFromProto(t4) + require.NoError(t, err) + return physicalFilterID(testChainSelector, addrs, sigs, h2, h3, h4) +} + +func TestLogTriggerMetering_ReserveOnRegister(t *testing.T) { + evmService := initMocks(t) + evmService.EXPECT().GetLatestLPBlock(mock.Anything).Return(&finalizedExpBlock, nil).Once() + evmService.On("RegisterLogTracking", mock.Anything, mock.Anything).Return(nil).Once() + service, emitter := newMeteredTriggerObject(t, evmService, NewLogTriggerStore()) + + meta := capabilities.RequestMetadata{WorkflowID: "wf-id", WorkflowOwner: "0xOwner"} + _, err := service.RegisterLogTrigger(t.Context(), triggerID, meta, meteringTestInput()) + require.NoError(t, err) + + require.Len(t, emitter.records, 1) + record := emitter.records[0] + assertBaseIdentity(t, record.GetIdentity()) + physID := expectedPhysicalFilterID(t, meteringTestInput()) + require.Equal(t, physID, record.GetIdentity().GetResourceId(), "resource_id must be the physical filter content hash") + require.Equal(t, meteringpb.MeterAction_METER_ACTION_RESERVE, record.GetAction()) + require.NotNil(t, record.GetUtilization()) + require.Equal(t, int64(2), record.GetUtilization().GetValue(), "RESERVE value must equal the filter address count") + expectedID := service.identity("42", physID) + require.Equal(t, + resourcemanager.IdempotencyKey(expectedID, meteringpb.MeterAction_METER_ACTION_RESERVE, physID), + record.GetUtilization().GetIdempotencyKey()) +} + +func TestLogTriggerMetering_DonIDFallbackToWorkflowDon(t *testing.T) { + evmService := initMocks(t) + evmService.EXPECT().GetLatestLPBlock(mock.Anything).Return(&finalizedExpBlock, nil).Once() + evmService.On("RegisterLogTracking", mock.Anything, mock.Anything).Return(nil).Once() + service, emitter := newMeteredTriggerObject(t, evmService, NewLogTriggerStore()) + // Host did not inject a capability DON; the consumer's WorkflowDonID is the + // documented fallback resolved at emit time. + service.baseIdentity.DONID = "" + + meta := capabilities.RequestMetadata{WorkflowID: "wf-id", WorkflowOwner: "0xOwner", WorkflowDonID: 7} + _, err := service.RegisterLogTrigger(t.Context(), triggerID, meta, meteringTestInput()) + require.NoError(t, err) + + require.Len(t, emitter.records, 1) + require.Equal(t, "7", emitter.records[0].GetIdentity().GetDonId(), "empty capability DON must fall back to WorkflowDonID") +} + +func TestLogTriggerMetering_NoReserveOnRegisterFailure(t *testing.T) { + evmService := initMocks(t) + evmService.EXPECT().GetLatestLPBlock(mock.Anything).Return(&finalizedExpBlock, nil).Once() + evmService.On("RegisterLogTracking", mock.Anything, mock.Anything).Return(errors.New("mocked register failure")).Once() + service, emitter := newMeteredTriggerObject(t, evmService, NewLogTriggerStore()) + + _, err := service.RegisterLogTrigger(t.Context(), triggerID, capabilities.RequestMetadata{WorkflowID: "wf-id"}, meteringTestInput()) + require.Error(t, err) + require.Zero(t, emitter.emitCalls, "no RESERVE may be emitted for a failed registration") +} + +func TestLogTriggerMetering_ReleaseOnUnregister(t *testing.T) { + meta := capabilities.RequestMetadata{WorkflowID: "wf-id", WorkflowOwner: "0xOwner"} + + registerTrigger := func(t *testing.T, service *LogTriggerService) { + t.Helper() + _, err := service.RegisterLogTrigger(t.Context(), triggerID, meta, meteringTestInput()) + require.NoError(t, err) + // The trigger state (holding the reserved address count) is written by + // the polling goroutine; wait for it before unregistering. + require.Eventually(t, func() bool { + _, ok := service.triggers.Read(triggerID) + return ok + }, time.Second, time.Millisecond) + } + + assertRelease := func(t *testing.T, service *LogTriggerService, record *meteringpb.MeterRecord) { + t.Helper() + assertBaseIdentity(t, record.GetIdentity()) + require.Equal(t, meteringpb.MeterAction_METER_ACTION_RELEASE, record.GetAction()) + require.NotNil(t, record.GetUtilization()) + require.Equal(t, int64(2), record.GetUtilization().GetValue(), "RELEASE must carry the same value that was reserved") + physID := expectedPhysicalFilterID(t, meteringTestInput()) + require.Equal(t, physID, record.GetIdentity().GetResourceId()) + expectedID := service.identity("42", physID) + require.Equal(t, + resourcemanager.IdempotencyKey(expectedID, meteringpb.MeterAction_METER_ACTION_RELEASE, physID), + record.GetUtilization().GetIdempotencyKey()) + } + + t.Run("release pairs the reserve", func(t *testing.T) { + evmService := initMocks(t) + evmService.EXPECT().GetLatestLPBlock(mock.Anything).Return(&finalizedExpBlock, nil).Once() + evmService.On("RegisterLogTracking", mock.Anything, mock.Anything).Return(nil).Once() + evmService.On("UnregisterLogTracking", mock.Anything, mock.Anything).Return(nil).Once() + service, emitter := newMeteredTriggerObject(t, evmService, NewLogTriggerStore()) + + registerTrigger(t, service) + require.NoError(t, service.UnregisterLogTrigger(t.Context(), triggerID, meta, &evmcappb.FilterLogTriggerRequest{})) + + require.Len(t, emitter.records, 2) + require.Equal(t, meteringpb.MeterAction_METER_ACTION_RESERVE, emitter.records[0].GetAction()) + assertRelease(t, service, emitter.records[1]) + require.Equal(t, emitter.records[0].GetUtilization().GetValue(), emitter.records[1].GetUtilization().GetValue()) + require.Equal(t, emitter.records[0].GetIdentity().GetResourceId(), emitter.records[1].GetIdentity().GetResourceId(), + "RESERVE and RELEASE must share one physical resource_id") + }) + + t.Run("release emitted even when UnregisterLogTracking fails", func(t *testing.T) { + evmService := initMocks(t) + evmService.EXPECT().GetLatestLPBlock(mock.Anything).Return(&finalizedExpBlock, nil).Once() + evmService.On("RegisterLogTracking", mock.Anything, mock.Anything).Return(nil).Once() + evmService.On("UnregisterLogTracking", mock.Anything, mock.Anything).Return(errors.New("mocked unregister failure")).Once() + service, emitter := newMeteredTriggerObject(t, evmService, NewLogTriggerStore()) + + registerTrigger(t, service) + // The reservation is released here (from the stashed count) before the + // UnregisterLogTracking RPC. If the RPC fails the filter is orphaned at + // the log poller; the cleanup thread unregisters it silently, emitting + // no further metering record. + require.Error(t, service.UnregisterLogTrigger(t.Context(), triggerID, meta, &evmcappb.FilterLogTriggerRequest{})) + + require.Len(t, emitter.records, 2) + assertRelease(t, service, emitter.records[1]) + }) +} + +func TestLogTriggerMetering_OrphanCleanupEmitsNothing(t *testing.T) { + // Orphan cleanup is log-poller filter hygiene, never a metering event. A + // lost reservation is reconciled by the resource's absence from subsequent + // Snapshots (the liveness mechanism), not by a synthetic cleanup RELEASE. + t.Run("stale filter cleanup emits no meter record", func(t *testing.T) { + mockEVM := evmmock.NewEVMService(t) + store := NewLogTriggerStore() + service, emitter := newMeteredTriggerObject(t, mockEVM, store) + + liveFilterID := service.generateFilterID("live-trigger") + staleFilterID := service.generateFilterID("stale-trigger") + mockEVM.On("GetFiltersNames", mock.Anything).Return([]string{liveFilterID, staleFilterID}, nil).Once() + mockEVM.On("UnregisterLogTracking", mock.Anything, staleFilterID).Return(nil).Once() + // mimicking there's a live trigger with the filter registered to log poller + store.Write("live-trigger", logTriggerState{filter: filter{filterID: liveFilterID}}) + + service.cleanUpStaleFilters(t.Context()) + + require.Zero(t, emitter.emitCalls, "orphan cleanup must not emit any MeterRecord") + }) + + t.Run("emits nothing when cleanup unregister fails", func(t *testing.T) { + mockEVM := evmmock.NewEVMService(t) + service, emitter := newMeteredTriggerObject(t, mockEVM, NewLogTriggerStore()) + + staleFilterID := service.generateFilterID("stale-trigger") + mockEVM.On("GetFiltersNames", mock.Anything).Return([]string{staleFilterID}, nil).Once() + mockEVM.On("UnregisterLogTracking", mock.Anything, staleFilterID).Return(errors.New("mocked cleanup failure")).Once() + + service.cleanUpStaleFilters(t.Context()) + require.Zero(t, emitter.emitCalls, "orphan cleanup never emits a meter record") + }) +} + +func TestLogTriggerMetering_FailOpen(t *testing.T) { + evmService := initMocks(t) + evmService.EXPECT().GetLatestLPBlock(mock.Anything).Return(&finalizedExpBlock, nil).Once() + evmService.On("RegisterLogTracking", mock.Anything, mock.Anything).Return(nil).Once() + service, emitter := newMeteredTriggerObject(t, evmService, NewLogTriggerStore()) + emitter.err = errors.New("mocked emitter failure") + + _, err := service.RegisterLogTrigger(t.Context(), triggerID, capabilities.RequestMetadata{WorkflowID: "wf-id"}, meteringTestInput()) + require.NoError(t, err, "a metering emit failure must never fail registration") + require.Equal(t, 1, emitter.emitCalls, "the emit was attempted and its failure swallowed") +} + +// TestPhysicalFilterID_Canonicalization proves the content hash is independent +// of the order addresses / event sigs / per-slot topic values are supplied, and +// independent of which workflow or trigger registered the filter, while staying +// sensitive to the positional topic slot. +func TestPhysicalFilterID_Canonicalization(t *testing.T) { + addrA := evmtypes.Address(expectedAddress) + addrB := evmtypes.Address(bytes.Repeat([]byte{0x42}, evmtypes.AddressLength)) + sig1 := evmtypes.Hash(eventSig0Example) + sig2 := evmtypes.Hash(bytes.Repeat([]byte{0x11}, evmtypes.HashLength)) + none := []evmtypes.Hash{} + + t.Run("address order does not change the id", func(t *testing.T) { + id1 := physicalFilterID(testChainSelector, []evmtypes.Address{addrA, addrB}, []evmtypes.Hash{sig1}, none, none, none) + id2 := physicalFilterID(testChainSelector, []evmtypes.Address{addrB, addrA}, []evmtypes.Hash{sig1}, none, none, none) + require.Equal(t, id1, id2) + }) + + t.Run("event sig order does not change the id", func(t *testing.T) { + id1 := physicalFilterID(testChainSelector, []evmtypes.Address{addrA}, []evmtypes.Hash{sig1, sig2}, none, none, none) + id2 := physicalFilterID(testChainSelector, []evmtypes.Address{addrA}, []evmtypes.Hash{sig2, sig1}, none, none, none) + require.Equal(t, id1, id2) + }) + + t.Run("topic values within a slot are order-independent", func(t *testing.T) { + id1 := physicalFilterID(testChainSelector, []evmtypes.Address{addrA}, []evmtypes.Hash{sig1}, []evmtypes.Hash{sig1, sig2}, none, none) + id2 := physicalFilterID(testChainSelector, []evmtypes.Address{addrA}, []evmtypes.Hash{sig1}, []evmtypes.Hash{sig2, sig1}, none, none) + require.Equal(t, id1, id2) + }) + + t.Run("topic slots are positional", func(t *testing.T) { + inSlot2 := physicalFilterID(testChainSelector, []evmtypes.Address{addrA}, []evmtypes.Hash{sig1}, []evmtypes.Hash{sig2}, none, none) + inSlot3 := physicalFilterID(testChainSelector, []evmtypes.Address{addrA}, []evmtypes.Hash{sig1}, none, []evmtypes.Hash{sig2}, none) + require.NotEqual(t, inSlot2, inSlot3, "the same value in topic2 vs topic3 is a different filter") + }) + + t.Run("different chain selector changes the id", func(t *testing.T) { + id1 := physicalFilterID(testChainSelector, []evmtypes.Address{addrA}, []evmtypes.Hash{sig1}, none, none, none) + id2 := physicalFilterID("999", []evmtypes.Address{addrA}, []evmtypes.Hash{sig1}, none, none, none) + require.NotEqual(t, id1, id2) + }) + + t.Run("identical filters from different workflows/triggers share one id", func(t *testing.T) { + // physicalFilterID takes only physical criteria; workflow/trigger are not + // inputs. Two registrations with identical criteria therefore collide by + // construction, which this asserts end to end through the register path. + evmService := initMocks(t) + evmService.EXPECT().GetLatestLPBlock(mock.Anything).Return(&finalizedExpBlock, nil).Twice() + evmService.On("RegisterLogTracking", mock.Anything, mock.Anything).Return(nil).Twice() + service, emitter := newMeteredTriggerObject(t, evmService, NewLogTriggerStore()) + + _, err := service.RegisterLogTrigger(t.Context(), "trigger-A", + capabilities.RequestMetadata{WorkflowID: "wf-1", WorkflowOwner: "0xOwner"}, meteringTestInput()) + require.NoError(t, err) + _, err = service.RegisterLogTrigger(t.Context(), "trigger-B", + capabilities.RequestMetadata{WorkflowID: "wf-2", WorkflowOwner: "0xOther"}, meteringTestInput()) + require.NoError(t, err) + + require.Len(t, emitter.records, 2) + require.Equal(t, + emitter.records[0].GetIdentity().GetResourceId(), + emitter.records[1].GetIdentity().GetResourceId(), + "identical physical filters must share one resource_id across workflows/triggers") + }) +} + +// TestLogTriggerMetering_Snapshot drives one snapshot tick and asserts one +// MeterSnapshot per active filter, each fully identified by its +// ResourceIdentity (physical resource_id) with the right value. The manager +// emits one MeterSnapshot message per resource; there is no label metadata, so +// snapshots are keyed by their physical resource_id. +func TestLogTriggerMetering_Snapshot(t *testing.T) { + mockEVM := evmmock.NewEVMService(t) + store := NewLogTriggerStore() + service, emitter, clock := newMeteredTriggerObject(t, mockEVM, store) + + physA := expectedPhysicalFilterID(t, meteringTestInput()) + store.Write("trigger-A", logTriggerState{filter: filter{ + filterID: service.generateFilterID("trigger-A"), + physicalFilterID: physA, + reservedAddressCount: 2, + donID: "42", + }}) + store.Write("trigger-B", logTriggerState{filter: filter{ + filterID: service.generateFilterID("trigger-B"), + physicalFilterID: "physB", + reservedAddressCount: 5, + donID: "42", + }}) + + unregister := service.resourceManager.Register(service) + t.Cleanup(unregister) + servicetest.Run(t, service.resourceManager) + require.NoError(t, clock.BlockUntilContext(t.Context(), 1)) + clock.Advance(time.Minute) + + require.Eventually(t, func() bool { + return len(emitter.snapshots) == 2 + }, time.Second, time.Millisecond) + + require.Len(t, emitter.snapshots, 2, "one MeterSnapshot per active filter") + + byResourceID := map[string]*meteringpb.MeterSnapshot{} + for _, s := range emitter.snapshots { + assertBaseIdentity(t, s.GetIdentity()) + byResourceID[s.GetIdentity().GetResourceId()] = s + } + + a := byResourceID[physA] + require.NotNil(t, a) + require.Equal(t, int64(2), a.GetUtilization().GetValue()) + + b := byResourceID["physB"] + require.NotNil(t, b) + require.Equal(t, int64(5), b.GetUtilization().GetValue()) +} + +// TestLogTriggerMetering_Snapshot_NothingActive asserts an empty store emits no +// snapshots: billing zeroes a resource out by its absence from later snapshots. +func TestLogTriggerMetering_Snapshot_NothingActive(t *testing.T) { + mockEVM := evmmock.NewEVMService(t) + service, emitter, clock := newMeteredTriggerObject(t, mockEVM, NewLogTriggerStore()) + + unregister := service.resourceManager.Register(service) + t.Cleanup(unregister) + servicetest.Run(t, service.resourceManager) + require.NoError(t, clock.BlockUntilContext(t.Context(), 1)) + clock.Advance(time.Minute) + + require.Empty(t, emitter.snapshots, "an empty store emits no MeterSnapshot") +} + +// TestLogTriggerMetering_ReleaseOnGracefulClose asserts that closing the service +// releases every still-active filter so a clean shutdown is not seen as a leak. +func TestLogTriggerMetering_ReleaseOnGracefulClose(t *testing.T) { + mockEVM := evmmock.NewEVMService(t) + store := NewLogTriggerStore() + service, emitter := newMeteredTriggerObject(t, mockEVM, store) + + physA := expectedPhysicalFilterID(t, meteringTestInput()) + store.Write("trigger-A", logTriggerState{filter: filter{ + filterID: service.generateFilterID("trigger-A"), + physicalFilterID: physA, + reservedAddressCount: 2, + donID: "42", + }}) + store.Write("trigger-B", logTriggerState{filter: filter{ + filterID: service.generateFilterID("trigger-B"), + physicalFilterID: "physB", + reservedAddressCount: 5, + donID: "42", + }}) + + service.releaseActiveFiltersOnClose(t.Context()) + + require.Len(t, emitter.records, 2, "one RELEASE per active filter on graceful close") + for _, record := range emitter.records { + require.Equal(t, meteringpb.MeterAction_METER_ACTION_RELEASE, record.GetAction()) + assertBaseIdentity(t, record.GetIdentity()) + } + // The records carry no label metadata, so pair them by their physical + // resource_id (the only per-filter discriminator on the record). + byResourceID := map[string]*meteringpb.MeterRecord{} + for _, record := range emitter.records { + byResourceID[record.GetIdentity().GetResourceId()] = record + } + require.Equal(t, int64(2), byResourceID[physA].GetUtilization().GetValue()) + require.Equal(t, int64(5), byResourceID["physB"].GetUtilization().GetValue()) +} diff --git a/chain_capabilities/evm/trigger/trigger_test.go b/chain_capabilities/evm/trigger/trigger_test.go index 23841d764..f91b28d79 100644 --- a/chain_capabilities/evm/trigger/trigger_test.go +++ b/chain_capabilities/evm/trigger/trigger_test.go @@ -30,6 +30,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/capabilities" evmcappb "github.com/smartcontractkit/chainlink-common/pkg/capabilities/v2/chain-capabilities/evm" "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/resourcemanager" "github.com/smartcontractkit/chainlink-common/pkg/services" evmtypes "github.com/smartcontractkit/chainlink-common/pkg/types/chains/evm" evmmock "github.com/smartcontractkit/chainlink-common/pkg/types/mocks" @@ -1261,33 +1262,33 @@ func TestNewLogTriggerService(t *testing.T) { t.Run("empty capability id", func(t *testing.T) { lggr := logger.Test(t) - _, err := NewLogTriggerService(evmService, store, lggr, "", beholderProcessor, messageBuilder, time.Second, 0, 0, limits.Factory{Logger: lggr}, nil, capabilities.NewMemEventStore()) + _, err := NewLogTriggerService(evmService, store, lggr, "", beholderProcessor, messageBuilder, time.Second, 0, 0, limits.Factory{Logger: lggr}, nil, capabilities.NewMemEventStore(), nil, resourcemanager.ResourceIdentity{}, 0) require.Error(t, err) require.Contains(t, err.Error(), "capabilityID must be non-empty") }) t.Run("ok initialize interval", func(t *testing.T) { - trigger, err := NewLogTriggerService(evmService, store, logger.Test(t), testLogTriggerCapabilityID, beholderProcessor, messageBuilder, 10*time.Second, 0, 0, testLimitsFactory(t), nil, capabilities.NewMemEventStore()) + trigger, err := NewLogTriggerService(evmService, store, logger.Test(t), testLogTriggerCapabilityID, beholderProcessor, messageBuilder, 10*time.Second, 0, 0, testLimitsFactory(t), nil, capabilities.NewMemEventStore(), nil, resourcemanager.ResourceIdentity{}, 0) require.NoError(t, err) require.Equal(t, 10*time.Second, trigger.logTriggerPollInterval) require.Equal(t, uint64(1000), trigger.logTriggerSendChannelBufferSize) require.Equal(t, uint64(1000), trigger.limitAndSort.Limit.Count) }) t.Run("ok initialize all params", func(t *testing.T) { - trigger, err := NewLogTriggerService(evmService, store, logger.Test(t), testLogTriggerCapabilityID, beholderProcessor, messageBuilder, 10*time.Second, 100, 50, testLimitsFactory(t), nil, capabilities.NewMemEventStore()) + trigger, err := NewLogTriggerService(evmService, store, logger.Test(t), testLogTriggerCapabilityID, beholderProcessor, messageBuilder, 10*time.Second, 100, 50, testLimitsFactory(t), nil, capabilities.NewMemEventStore(), nil, resourcemanager.ResourceIdentity{}, 0) require.NoError(t, err) require.Equal(t, 10*time.Second, trigger.logTriggerPollInterval) require.Equal(t, uint64(100), trigger.logTriggerSendChannelBufferSize) require.Equal(t, uint64(50), trigger.limitAndSort.Limit.Count) }) t.Run("ok initialize buffer only", func(t *testing.T) { - trigger, err := NewLogTriggerService(evmService, store, logger.Test(t), testLogTriggerCapabilityID, beholderProcessor, messageBuilder, 10*time.Second, 10000, 0, testLimitsFactory(t), nil, capabilities.NewMemEventStore()) + trigger, err := NewLogTriggerService(evmService, store, logger.Test(t), testLogTriggerCapabilityID, beholderProcessor, messageBuilder, 10*time.Second, 10000, 0, testLimitsFactory(t), nil, capabilities.NewMemEventStore(), nil, resourcemanager.ResourceIdentity{}, 0) require.NoError(t, err) require.Equal(t, 10*time.Second, trigger.logTriggerPollInterval) require.Equal(t, uint64(10000), trigger.logTriggerSendChannelBufferSize) require.Equal(t, uint64(defaultLimitQueryLogSize), trigger.limitAndSort.Limit.Count) //default value for limit as 0 was provided }) t.Run("ok initialize query limit only", func(t *testing.T) { - trigger, err := NewLogTriggerService(evmService, store, logger.Test(t), testLogTriggerCapabilityID, beholderProcessor, messageBuilder, 10*time.Second, 0, 100, testLimitsFactory(t), nil, capabilities.NewMemEventStore()) + trigger, err := NewLogTriggerService(evmService, store, logger.Test(t), testLogTriggerCapabilityID, beholderProcessor, messageBuilder, 10*time.Second, 0, 100, testLimitsFactory(t), nil, capabilities.NewMemEventStore(), nil, resourcemanager.ResourceIdentity{}, 0) require.NoError(t, err) require.Equal(t, 10*time.Second, trigger.logTriggerPollInterval) require.Equal(t, uint64(defaultSendChannelBufferSize), trigger.logTriggerSendChannelBufferSize) //default value for buffer size as 0 was provided @@ -1296,25 +1297,25 @@ func TestNewLogTriggerService(t *testing.T) { // negative tests t.Run("negative poll interval", func(t *testing.T) { lggr := logger.Test(t) - _, err := NewLogTriggerService(evmService, store, lggr, testLogTriggerCapabilityID, beholderProcessor, messageBuilder, -1*time.Second, 0, 0, limits.Factory{Logger: lggr}, nil, nil) + _, err := NewLogTriggerService(evmService, store, lggr, testLogTriggerCapabilityID, beholderProcessor, messageBuilder, -1*time.Second, 0, 0, limits.Factory{Logger: lggr}, nil, nil, nil, resourcemanager.ResourceIdentity{}, 0) require.Error(t, err) require.Contains(t, err.Error(), "logTriggerPollInterval must be positive, got: -1s") }) t.Run("limit query log size >= send channel buffer size", func(t *testing.T) { lggr := logger.Test(t) - _, err := NewLogTriggerService(evmService, store, lggr, testLogTriggerCapabilityID, beholderProcessor, messageBuilder, time.Second, 5, 10, limits.Factory{Logger: lggr}, nil, nil) + _, err := NewLogTriggerService(evmService, store, lggr, testLogTriggerCapabilityID, beholderProcessor, messageBuilder, time.Second, 5, 10, limits.Factory{Logger: lggr}, nil, nil, nil, resourcemanager.ResourceIdentity{}, 0) require.Error(t, err) require.Contains(t, err.Error(), "logTriggerLimitQueryLogSize (10) must be less than logTriggerSendChannelBufferSize (5)") }) t.Run("limit query log size >= default send channel buffer size", func(t *testing.T) { lggr := logger.Test(t) - _, err := NewLogTriggerService(evmService, store, lggr, testLogTriggerCapabilityID, beholderProcessor, messageBuilder, time.Second, 0, defaultSendChannelBufferSize+1, limits.Factory{Logger: lggr}, nil, nil) + _, err := NewLogTriggerService(evmService, store, lggr, testLogTriggerCapabilityID, beholderProcessor, messageBuilder, time.Second, 0, defaultSendChannelBufferSize+1, limits.Factory{Logger: lggr}, nil, nil, nil, resourcemanager.ResourceIdentity{}, 0) require.Error(t, err) require.Contains(t, err.Error(), "logTriggerLimitQueryLogSize (1001) must be less than logTriggerSendChannelBufferSize (1000)") }) t.Run("nil trigger event store", func(t *testing.T) { lggr := logger.Test(t) - _, err := NewLogTriggerService(evmService, store, lggr, testLogTriggerCapabilityID, beholderProcessor, messageBuilder, time.Second, 0, 0, limits.Factory{Logger: lggr}, nil, nil) + _, err := NewLogTriggerService(evmService, store, lggr, testLogTriggerCapabilityID, beholderProcessor, messageBuilder, time.Second, 0, 0, limits.Factory{Logger: lggr}, nil, nil, nil, resourcemanager.ResourceIdentity{}, 0) require.Error(t, err) require.Contains(t, err.Error(), "no trigger event store provided") }) diff --git a/cron/go.mod b/cron/go.mod index d76d4aae6..51aaac6a0 100644 --- a/cron/go.mod +++ b/cron/go.mod @@ -2,13 +2,21 @@ module github.com/smartcontractkit/capabilities/cron go 1.26.2 +// Unpublished local stack for SHARED-2709; drop once chainlink-common and +// chainlink-protos/metering/go are tagged. +replace ( + github.com/smartcontractkit/chainlink-common => ../../chainlink-common + github.com/smartcontractkit/chainlink-protos/metering/go => ../../chainlink-protos/metering/go +) + require ( github.com/go-co-op/gocron/v2 v2.18.0 github.com/google/uuid v1.6.0 github.com/jonboulle/clockwork v0.5.0 github.com/smartcontractkit/capabilities/libs v0.0.0-20260210010829-97eb42ca2924 github.com/smartcontractkit/chainlink-common v0.11.2-0.20260529092756-a94bc8ce96d6 - github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260526195338-adcf8013a1b7 + github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260611123141-db97012a6c32 + github.com/smartcontractkit/chainlink-protos/metering/go v0.0.0-00010101000000-000000000000 github.com/stretchr/testify v1.11.1 go.opentelemetry.io/otel v1.43.0 go.opentelemetry.io/otel/metric v1.43.0 @@ -85,7 +93,7 @@ require ( github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.11-0.20260529092756-a94bc8ce96d6 // indirect github.com/smartcontractkit/chainlink-protos/linking-service/go v0.0.0-20251002192024-d2ad9222409b // indirect github.com/smartcontractkit/chainlink-protos/node-platform v0.0.0-20260205130626-db2a2aab956b // indirect - github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20260323124644-faea187e6997 // indirect + github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20260528173149-f5b8336b19d9 // indirect github.com/smartcontractkit/freeport v0.1.3-0.20250716200817-cb5dfd0e369e // indirect github.com/smartcontractkit/grpc-proxy v0.0.0-20240830132753-a7e17fec5ab7 // indirect github.com/smartcontractkit/libocr v0.0.0-20250912173940-f3ab0246e23d // indirect diff --git a/cron/go.sum b/cron/go.sum index f6f3aad2a..d07775fc9 100644 --- a/cron/go.sum +++ b/cron/go.sum @@ -213,18 +213,16 @@ github.com/smartcontractkit/capabilities/libs v0.0.0-20260210010829-97eb42ca2924 github.com/smartcontractkit/capabilities/libs v0.0.0-20260210010829-97eb42ca2924/go.mod h1:v0O0Au8RE00Z89QxBE6I2q9bR9r3+RO1gLD3oaO2WB0= github.com/smartcontractkit/chain-selectors v1.0.100 h1:wpiSpmI/eFjY+wx/nPr5VuNF4hki0prIBMKEaQWn3g4= github.com/smartcontractkit/chain-selectors v1.0.100/go.mod h1:qy7whtgG5g+7z0jt0nRyii9bLND9m15NZTzuQPkMZ5w= -github.com/smartcontractkit/chainlink-common v0.11.2-0.20260529092756-a94bc8ce96d6 h1:hms02zQQ0BPcp9CBwh/xda5KwJWdU0IIA/yjtwyRoA4= -github.com/smartcontractkit/chainlink-common v0.11.2-0.20260529092756-a94bc8ce96d6/go.mod h1:jueIfDkkRexwGgLbVB7vGCZlNtd383zuwi4uHHwcbqc= github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.11-0.20260529092756-a94bc8ce96d6 h1:ucHu2bPDT/58AzSgnPDyp4IjnjVbrVWYD3bG5jCbXMY= github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.11-0.20260529092756-a94bc8ce96d6/go.mod h1:HmUyH2oD9m+GRpKq7q3vuRnm1F2Uczf/Nd1v3ipMSK8= -github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260526195338-adcf8013a1b7 h1:iljEJss3WOwcsMkWy72Yn2zvjw7Gyxc+RXL7r8YKM6g= -github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260526195338-adcf8013a1b7/go.mod h1:vTFHTCbLui4Vn8fTmAadfE3rdnvfrDwOmMujmW857D0= +github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260611123141-db97012a6c32 h1:GNl+lLK0QCakqA1J1i7FoOai2JrOGOzNzSniMijaCjA= +github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260611123141-db97012a6c32/go.mod h1:vTFHTCbLui4Vn8fTmAadfE3rdnvfrDwOmMujmW857D0= github.com/smartcontractkit/chainlink-protos/linking-service/go v0.0.0-20251002192024-d2ad9222409b h1:QuI6SmQFK/zyUlVWEf0GMkiUYBPY4lssn26nKSd/bOM= github.com/smartcontractkit/chainlink-protos/linking-service/go v0.0.0-20251002192024-d2ad9222409b/go.mod h1:qSTSwX3cBP3FKQwQacdjArqv0g6QnukjV4XuzO6UyoY= github.com/smartcontractkit/chainlink-protos/node-platform v0.0.0-20260205130626-db2a2aab956b h1:36knUpKHHAZ86K4FGWXtx8i/EQftGdk2bqCoEu/Cha8= github.com/smartcontractkit/chainlink-protos/node-platform v0.0.0-20260205130626-db2a2aab956b/go.mod h1:dkR2uYg9XYJuT1JASkPzWE51jjFkVb86P7a/yXe5/GM= -github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20260323124644-faea187e6997 h1:W0HKHO8eE8BckTRnhSdqjHKbJcnk068nEWYnWRu6tJY= -github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20260323124644-faea187e6997/go.mod h1:GTpDgyK0OObf7jpch6p8N281KxN92wbB8serZhU9yRc= +github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20260528173149-f5b8336b19d9 h1:LQy2j2+TdKLSWsUTUYuqmQPn8kjqCLjGI3ZJYGtDc08= +github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20260528173149-f5b8336b19d9/go.mod h1:GTpDgyK0OObf7jpch6p8N281KxN92wbB8serZhU9yRc= github.com/smartcontractkit/freeport v0.1.3-0.20250716200817-cb5dfd0e369e h1:Hv9Mww35LrufCdM9wtS9yVi/rEWGI1UnjHbcKKU0nVY= github.com/smartcontractkit/freeport v0.1.3-0.20250716200817-cb5dfd0e369e/go.mod h1:T4zH9R8R8lVWKfU7tUvYz2o2jMv1OpGCdpY2j2QZXzU= github.com/smartcontractkit/grpc-proxy v0.0.0-20240830132753-a7e17fec5ab7 h1:12ijqMM9tvYVEm+nR826WsrNi6zCKpwBhuApq127wHs= diff --git a/cron/main.go b/cron/main.go index a085c0391..c2f40b391 100644 --- a/cron/main.go +++ b/cron/main.go @@ -1,16 +1,48 @@ package main import ( + "os" + "strconv" + "github.com/smartcontractkit/capabilities/cron/trigger" "github.com/smartcontractkit/capabilities/libs/loopserver" + "github.com/smartcontractkit/chainlink-common/pkg/beholder" "github.com/smartcontractkit/chainlink-common/pkg/capabilities/v2/triggers/cron/server" + "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/loop" + "github.com/smartcontractkit/chainlink-common/pkg/resourcemanager" ) +// meterRecordsEnabledEnvVar gates MeterRecord emission; the name is the +// cross-producer convention for the metering rollout (SHARED-2718). +const meterRecordsEnabledEnvVar = "CL_METER_RECORDS_ENABLED" + +// meterRecordsEnabled reads the metering gate from the environment. Unset or +// unparseable values disable emission; metering config must never prevent the +// capability from starting. +func meterRecordsEnabled(lggr logger.Logger) bool { + v := os.Getenv(meterRecordsEnabledEnvVar) + if v == "" { + return false + } + enabled, err := strconv.ParseBool(v) + if err != nil { + lggr.Warnw("Invalid value for "+meterRecordsEnabledEnvVar+", meter record emission disabled", "value", v, "error", err) + return false + } + return enabled +} + func main() { loopserver.ServeNew(trigger.ServiceName, func(s *loop.Server) loop.StandardCapabilities { - triggerService, err := trigger.NewTriggerService(s.Logger, nil, s.LimitsFactory) + meters := resourcemanager.NewResourceManager(s.Logger, resourcemanager.ResourceManagerConfig{ + Enabled: meterRecordsEnabled(s.Logger), + Emitter: beholder.GetEmitter(), + SnapshotInterval: resourcemanager.DefaultSnapshotInterval, + }) + + triggerService, err := trigger.NewTriggerService(s.Logger, nil, s.LimitsFactory, meters) if err != nil { s.Logger.Fatalw("Failed to create cron trigger service", "error", err) } diff --git a/cron/trigger/metering_test.go b/cron/trigger/metering_test.go new file mode 100644 index 000000000..44efd13f6 --- /dev/null +++ b/cron/trigger/metering_test.go @@ -0,0 +1,375 @@ +package trigger + +import ( + "context" + "encoding/json" + "errors" + "sync" + "testing" + "time" + + "github.com/jonboulle/clockwork" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/proto" + + "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + crontypedapi "github.com/smartcontractkit/chainlink-common/pkg/capabilities/v2/triggers/cron" + "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/resourcemanager" + "github.com/smartcontractkit/chainlink-common/pkg/settings/limits" + "github.com/smartcontractkit/chainlink-common/pkg/types/core" + meteringpb "github.com/smartcontractkit/chainlink-protos/metering/go" +) + +// fakeMeterEmitter captures metering records delivered through a real +// ResourceManager, so tests assert on exactly the bytes production would emit. +// It demultiplexes MeterRecord and MeterSnapshot messages by their beholder +// entity attribute. A non-nil err simulates delivery failure: nothing is +// recorded. +type fakeMeterEmitter struct { + mu sync.Mutex + err error + records []*meteringpb.MeterRecord + snapshots []*meteringpb.MeterSnapshot +} + +func (f *fakeMeterEmitter) Emit(_ context.Context, body []byte, attrKVs ...any) error { + f.mu.Lock() + defer f.mu.Unlock() + if f.err != nil { + return f.err + } + if attrEntity(attrKVs) == "metering.v1.MeterSnapshot" { + snapshot := &meteringpb.MeterSnapshot{} + if err := proto.Unmarshal(body, snapshot); err != nil { + return err + } + f.snapshots = append(f.snapshots, snapshot) + return nil + } + record := &meteringpb.MeterRecord{} + if err := proto.Unmarshal(body, record); err != nil { + return err + } + f.records = append(f.records, record) + return nil +} + +// attrEntity extracts the beholder entity attribute value from the variadic +// key/value attrs the ResourceManager passes to Emit. +func attrEntity(attrKVs []any) string { + for i := 0; i+1 < len(attrKVs); i += 2 { + if k, ok := attrKVs[i].(string); ok && k == "beholder_entity" { + if v, ok := attrKVs[i+1].(string); ok { + return v + } + } + } + return "" +} + +func (f *fakeMeterEmitter) Records() []*meteringpb.MeterRecord { + f.mu.Lock() + defer f.mu.Unlock() + return append([]*meteringpb.MeterRecord(nil), f.records...) +} + +func (f *fakeMeterEmitter) Snapshots() []*meteringpb.MeterSnapshot { + f.mu.Lock() + defer f.mu.Unlock() + return append([]*meteringpb.MeterSnapshot(nil), f.snapshots...) +} + +// meteredTestDeps are the host-injected identity dimensions used by metering +// tests. They mirror what the host populates via the Initialise channel. +var meteredTestDeps = core.StandardCapabilitiesDependencies{ + Product: "cre-mainline", + Environment: "staging", + Zone: "wf-zone-a", + NodeID: "csa-pubkey-1", + CapabilityDonID: 7, +} + +// expectedBaseIdentity is the base identity the Service builds from +// meteredTestDeps (resource_id left empty; set per trigger). +var expectedBaseIdentity = resourcemanager.ResourceIdentity{ + Product: "cre-mainline", + Environment: "staging", + Zone: "wf-zone-a", + DONID: "7", + NodeID: "csa-pubkey-1", + Service: "cron-trigger", + Resource: "trigger_registrations", + ResourceType: "operations", +} + +// newMeteredTriggerService builds an initialised trigger service whose +// ResourceManager is enabled and wired to emitter, with identity sourced from +// meteredTestDeps. Snapshots use a fake clock so tests advance the tick +// deterministically. +func newMeteredTriggerService(t *testing.T, clock clockwork.Clock, emitter resourcemanager.Emitter) (*Service, *resourcemanager.ResourceManager, *clockwork.FakeClock) { + t.Helper() + + fakeClock, ok := clock.(*clockwork.FakeClock) + if !ok { + fakeClock = clockwork.NewFakeClockAt(clock.Now()) + clock = fakeClock + } + + meters := resourcemanager.NewResourceManager(logger.Nop(), resourcemanager.ResourceManagerConfig{ + Enabled: true, + Emitter: emitter, + SnapshotInterval: time.Minute, + Clock: clock, + }) + ts, err := NewTriggerService(logger.Nop(), clock, limits.Factory{}, meters) + require.NoError(t, err) + + config, err := json.Marshal(Config{FastestScheduleIntervalSeconds: 1}) + require.NoError(t, err) + + deps := meteredTestDeps + deps.Config = string(config) + require.NoError(t, ts.Initialise(t.Context(), deps)) + + return ts, meters, fakeClock +} + +func TestCronTrigger_Metering_ReserveAndRelease(t *testing.T) { + t.Parallel() + + fakeClock := clockwork.NewFakeClockAt(time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)) + emitter := &fakeMeterEmitter{} + ts, _, _ := newMeteredTriggerService(t, fakeClock, emitter) + + metadata := capabilities.RequestMetadata{ + WorkflowID: workflowID1, + WorkflowOwner: "0xOwner-1", + } + ch, capErr := ts.RegisterTrigger(t.Context(), triggerID1, metadata, &crontypedapi.Config{Schedule: everySecond}) + require.Nil(t, capErr) + + records := emitter.Records() + require.Len(t, records, 1, "expected exactly one RESERVE on successful registration") + reserve := records[0] + assert.Equal(t, meteringpb.MeterAction_METER_ACTION_RESERVE, reserve.GetAction()) + + // Identity is populated from the host-injected deps, with resource_id set + // to the trigger_id (cron is workflow-scoped). + id := reserve.GetIdentity() + require.NotNil(t, id) + assert.Equal(t, "cre-mainline", id.GetProduct()) + assert.Equal(t, "staging", id.GetEnvironment()) + assert.Equal(t, "wf-zone-a", id.GetZone()) + assert.Equal(t, "7", id.GetDonId()) + assert.Equal(t, "csa-pubkey-1", id.GetNodeId()) + assert.Equal(t, "cron-trigger", id.GetService()) + assert.Equal(t, "trigger_registrations", id.GetResource()) + assert.Equal(t, "operations", id.GetResourceType()) + assert.Equal(t, triggerID1, id.GetResourceId()) + + require.NotNil(t, reserve.GetUtilization()) + assert.Equal(t, int64(1), reserve.GetUtilization().GetValue()) + assert.Equal(t, + resourcemanager.IdempotencyKey(expectedBaseIdentity.WithResourceID(triggerID1), meteringpb.MeterAction_METER_ACTION_RESERVE, triggerID1), + reserve.GetUtilization().GetIdempotencyKey()) + + // Each cron tick re-Writes the trigger to reschedule it; the Write + // happens before the channel send, so after receiving the event the + // callback path has fully run. It must not emit. + for range 3 { + fakeClock.Advance(time.Second) + <-ch + } + require.Len(t, emitter.Records(), 1, "cron tick callbacks must not emit meter records") + + require.Nil(t, ts.UnregisterTrigger(t.Context(), triggerID1, metadata, &crontypedapi.Config{Schedule: everySecond})) + records = emitter.Records() + require.Len(t, records, 2, "expected exactly one RELEASE on unregistration") + release := records[1] + assert.Equal(t, meteringpb.MeterAction_METER_ACTION_RELEASE, release.GetAction()) + assert.Equal(t, reserve.GetIdentity().GetResourceId(), release.GetIdentity().GetResourceId()) + require.NotNil(t, release.GetUtilization()) + assert.Equal(t, int64(1), release.GetUtilization().GetValue()) + assert.NotEqual(t, reserve.GetUtilization().GetIdempotencyKey(), release.GetUtilization().GetIdempotencyKey(), + "RESERVE and RELEASE must not share an idempotency key") + + require.NoError(t, ts.Close()) +} + +func TestCronTrigger_Metering_NoEmitOnFailedPaths(t *testing.T) { + t.Parallel() + + fakeClock := clockwork.NewFakeClock() + emitter := &fakeMeterEmitter{} + ts, _, _ := newMeteredTriggerService(t, fakeClock, emitter) + + metadata := capabilities.RequestMetadata{WorkflowID: workflowID1, WorkflowOwner: "owner-1"} + + // Invalid schedule: registration fails before allocation, nothing emitted. + _, capErr := ts.RegisterTrigger(t.Context(), triggerID1, metadata, &crontypedapi.Config{Schedule: "not-a-schedule"}) + require.NotNil(t, capErr) + require.Empty(t, emitter.Records()) + + // Unregistering a trigger that was never registered releases nothing. + require.Nil(t, ts.UnregisterTrigger(t.Context(), "missing", metadata, &crontypedapi.Config{Schedule: everySecond})) + require.Empty(t, emitter.Records()) + + // Duplicate registration fails and must not double-RESERVE. + _, capErr = ts.RegisterTrigger(t.Context(), triggerID1, metadata, &crontypedapi.Config{Schedule: everySecond}) + require.Nil(t, capErr) + _, capErr = ts.RegisterTrigger(t.Context(), triggerID1, metadata, &crontypedapi.Config{Schedule: everySecond}) + require.NotNil(t, capErr) + require.Len(t, emitter.Records(), 1) + + // Unregister to avoid a graceful-close RELEASE from interfering with the + // single-RESERVE assertion intent. + require.Nil(t, ts.UnregisterTrigger(t.Context(), triggerID1, metadata, &crontypedapi.Config{Schedule: everySecond})) + require.NoError(t, ts.Close()) +} + +func TestCronTrigger_Metering_FailOpen(t *testing.T) { + t.Parallel() + + fakeClock := clockwork.NewFakeClock() + emitter := &fakeMeterEmitter{err: errors.New("collector unavailable")} + ts, _, _ := newMeteredTriggerService(t, fakeClock, emitter) + + metadata := capabilities.RequestMetadata{WorkflowID: workflowID1, WorkflowOwner: "owner-1"} + + // Registration and unregistration succeed even though every emission fails. + ch, capErr := ts.RegisterTrigger(t.Context(), triggerID1, metadata, &crontypedapi.Config{Schedule: everySecond}) + require.Nil(t, capErr) + + fakeClock.Advance(time.Second) + <-ch // trigger still fires + + require.Nil(t, ts.UnregisterTrigger(t.Context(), triggerID1, metadata, &crontypedapi.Config{Schedule: everySecond})) + require.Empty(t, emitter.Records()) + + require.NoError(t, ts.Close()) +} + +// TestCronTrigger_Metering_Snapshot asserts the Service implements Meterable +// such that a forced snapshot emits one MeterSnapshot per active trigger, each +// carrying the full per-resource identity (resource_id set to the trigger_id). +func TestCronTrigger_Metering_Snapshot(t *testing.T) { + t.Parallel() + + fakeClock := clockwork.NewFakeClockAt(time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)) + emitter := &fakeMeterEmitter{} + ts, rm, clock := newMeteredTriggerService(t, fakeClock, emitter) + _ = rm + + metadata1 := capabilities.RequestMetadata{WorkflowID: workflowID1, WorkflowOwner: "0xOwner-1"} + _, capErr := ts.RegisterTrigger(t.Context(), triggerID1, metadata1, &crontypedapi.Config{Schedule: everySecond}) + require.Nil(t, capErr) + + metadata2 := capabilities.RequestMetadata{WorkflowID: "workflow-id-2", WorkflowOwner: "owner-2"} + const triggerID2 = "test-id-2" + _, capErr = ts.RegisterTrigger(t.Context(), triggerID2, metadata2, &crontypedapi.Config{Schedule: everySecond}) + require.Nil(t, capErr) + + require.NoError(t, clock.BlockUntilContext(t.Context(), 1)) + clock.Advance(time.Minute) + + // One MeterSnapshot per active trigger, value 1, full per-resource identity. + require.Eventually(t, func() bool { + return len(emitter.Snapshots()) == 2 + }, time.Second, time.Millisecond) + snapshots := emitter.Snapshots() + require.Len(t, snapshots, 2, "one MeterSnapshot per active trigger per tick") + + byTrigger := map[string]*meteringpb.MeterSnapshot{} + for _, s := range snapshots { + byTrigger[s.GetIdentity().GetResourceId()] = s + } + + s1 := byTrigger[triggerID1] + require.NotNil(t, s1) + assert.Equal(t, int64(1), s1.GetUtilization().GetValue()) + assert.Equal(t, "cron-trigger", s1.GetIdentity().GetService()) + assert.Equal(t, "trigger_registrations", s1.GetIdentity().GetResource()) + assert.Equal(t, "operations", s1.GetIdentity().GetResourceType()) + + s2 := byTrigger[triggerID2] + require.NotNil(t, s2) + assert.Equal(t, int64(1), s2.GetUtilization().GetValue()) + assert.Equal(t, triggerID2, s2.GetIdentity().GetResourceId()) + + require.NoError(t, ts.Close()) +} + +// TestCronTrigger_Metering_GracefulCloseReleases asserts that Close drains a +// RELEASE for every still-active registration, so a graceful shutdown does not +// leak reservations in billing. +func TestCronTrigger_Metering_GracefulCloseReleases(t *testing.T) { + t.Parallel() + + fakeClock := clockwork.NewFakeClockAt(time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)) + emitter := &fakeMeterEmitter{} + ts, _, _ := newMeteredTriggerService(t, fakeClock, emitter) + + metadata1 := capabilities.RequestMetadata{WorkflowID: workflowID1, WorkflowOwner: "0xOwner-1"} + _, capErr := ts.RegisterTrigger(t.Context(), triggerID1, metadata1, &crontypedapi.Config{Schedule: everySecond}) + require.Nil(t, capErr) + + metadata2 := capabilities.RequestMetadata{WorkflowID: "workflow-id-2", WorkflowOwner: "owner-2"} + const triggerID2 = "test-id-2" + _, capErr = ts.RegisterTrigger(t.Context(), triggerID2, metadata2, &crontypedapi.Config{Schedule: everySecond}) + require.Nil(t, capErr) + + // Two RESERVEs so far. + require.Len(t, emitter.Records(), 2) + + require.NoError(t, ts.Close()) + + // Close drained a RELEASE for each active trigger. + records := emitter.Records() + require.Len(t, records, 4, "two RESERVEs + one RELEASE per active trigger on graceful close") + + releases := map[string]*meteringpb.MeterRecord{} + for _, r := range records[2:] { + require.Equal(t, meteringpb.MeterAction_METER_ACTION_RELEASE, r.GetAction()) + releases[r.GetIdentity().GetResourceId()] = r + } + require.Contains(t, releases, triggerID1) + require.Contains(t, releases, triggerID2) + assert.Equal(t, int64(1), releases[triggerID1].GetUtilization().GetValue()) +} + +// TestCronTrigger_Metering_DonIDFallback asserts the DON ID falls back to the +// consumer workflow's DON when the host has not injected a capability DON ID. +func TestCronTrigger_Metering_DonIDFallback(t *testing.T) { + t.Parallel() + + fakeClock := clockwork.NewFakeClockAt(time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)) + emitter := &fakeMeterEmitter{} + + meters := resourcemanager.NewResourceManager(logger.Nop(), resourcemanager.ResourceManagerConfig{ + Enabled: true, + Emitter: emitter, + }) + ts, err := NewTriggerService(logger.Nop(), fakeClock, limits.Factory{}, meters) + require.NoError(t, err) + + config, err := json.Marshal(Config{FastestScheduleIntervalSeconds: 1}) + require.NoError(t, err) + + // No CapabilityDonID injected (zero) → fall back to WorkflowDonID at emit. + require.NoError(t, ts.Initialise(t.Context(), core.StandardCapabilitiesDependencies{Config: string(config)})) + + metadata := capabilities.RequestMetadata{WorkflowID: workflowID1, WorkflowOwner: "owner-1", WorkflowDonID: 42} + _, capErr := ts.RegisterTrigger(t.Context(), triggerID1, metadata, &crontypedapi.Config{Schedule: everySecond}) + require.Nil(t, capErr) + + records := emitter.Records() + require.Len(t, records, 1) + assert.Equal(t, "42", records[0].GetIdentity().GetDonId(), "DON ID falls back to WorkflowDonID") + // Product falls back to the cron constant when the host injects none. + assert.Equal(t, "cre", records[0].GetIdentity().GetProduct()) + + require.Nil(t, ts.UnregisterTrigger(t.Context(), triggerID1, metadata, &crontypedapi.Config{Schedule: everySecond})) + require.NoError(t, ts.Close()) +} diff --git a/cron/trigger/trigger.go b/cron/trigger/trigger.go index d78adf404..8b78ddfcf 100644 --- a/cron/trigger/trigger.go +++ b/cron/trigger/trigger.go @@ -22,6 +22,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/config" "github.com/smartcontractkit/chainlink-common/pkg/custmsg" "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/resourcemanager" "github.com/smartcontractkit/chainlink-common/pkg/services" "github.com/smartcontractkit/chainlink-common/pkg/services/orgresolver" "github.com/smartcontractkit/chainlink-common/pkg/settings/cresettings" @@ -29,6 +30,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/types/core" "github.com/smartcontractkit/chainlink-common/pkg/workflows" "github.com/smartcontractkit/chainlink-common/pkg/workflows/events" + meteringpb "github.com/smartcontractkit/chainlink-protos/metering/go" ) const ServiceName = "CronCapabilities" @@ -41,6 +43,21 @@ var cronTriggerInfo = capabilities.MustNewCapabilityInfo( "A trigger that uses a cron schedule to run periodically at fixed times, dates, or intervals.", ) +const ( + // meteringService is the stable service constant for cron trigger + // registrations on emitted MeterRecords and Snapshots. It must not encode + // deployment environment or zone: those are discrete identity dimensions + // sourced from deps at Initialise. + meteringService = "cron-trigger" + // meteringResource is the resource pool cron records apply to. + meteringResource = "trigger_registrations" + // meteringResourceType is the billing unit for cron registrations. + meteringResourceType = "operations" + // meteringProductFallback is used when the host has not injected a Product + // (a legacy node or a boot path not yet updated to populate deps.Product). + meteringProductFallback = "cre" +) + type Config struct { FastestScheduleIntervalSeconds int `json:"fastestScheduleIntervalSeconds"` } @@ -58,6 +75,9 @@ type cronTrigger struct { } type Service struct { + services.Service + srvcEng *services.Engine + capabilities.CapabilityInfo limitsFactory limits.Factory fastestScheduleInterval limits.TimeLimiter @@ -68,7 +88,15 @@ type Service struct { triggers *cronStore labeler custmsg.MessageEmitter metrics *Metrics - orgResolver orgresolver.OrgResolver + meters *resourcemanager.ResourceManager + // unregisterMeterable removes this Service from the ResourceManager's + // snapshot registry; set at start, called at close. Nil until started. + unregisterMeterable func() + // base is the resourcemanager identity for cron registrations, built from + // the host-injected deployment/node/DON dimensions at Initialise. ResourceID + // is left empty here and set per trigger via base.WithResourceID(triggerID). + base resourcemanager.ResourceIdentity + orgResolver orgresolver.OrgResolver } func (s *Service) RegisterLegacyTrigger(ctx context.Context, triggerID string, metadata capabilities.RequestMetadata, input *crontypedapi.Config) (<-chan capabilities.TriggerAndId[*crontypedapi.LegacyPayload], caperrors.Error) { //nolint:staticcheck @@ -103,13 +131,21 @@ func (s *Service) UnregisterLegacyTrigger(ctx context.Context, triggerID string, return s.UnregisterTrigger(ctx, triggerID, metadata, input) } -var _ services.Service = &Service{} +var ( + _ services.Service = &Service{} + _ resourcemanager.Meterable = &Service{} +) // NewTriggerService creates a new trigger service. Optionally, a clock can be passed in for testing, if nil // the system clock will be used. The orgResolver is optional and can be nil, but should be set in live environments. -func NewTriggerService(parentLggr logger.Logger, clock clockwork.Clock, limitsFactory limits.Factory) (*Service, error) { +// meters reports trigger registrations for billing; if nil, a disabled no-op manager is used. +func NewTriggerService(parentLggr logger.Logger, clock clockwork.Clock, limitsFactory limits.Factory, meters *resourcemanager.ResourceManager) (*Service, error) { lggr := logger.Named(parentLggr, "CRONTrigger") + if meters == nil { + meters = resourcemanager.NewResourceManager(lggr, resourcemanager.ResourceManagerConfig{}) + } + metrics, err := NewMetrics() if err != nil { return nil, fmt.Errorf("error creating metrics: %w", err) @@ -133,7 +169,7 @@ func NewTriggerService(parentLggr logger.Logger, clock clockwork.Clock, limitsFa return nil, fmt.Errorf("error creating scheduler: %w", err) } - return &Service{ + s := &Service{ lggr: lggr, CapabilityInfo: cronTriggerInfo, limitsFactory: limitsFactory, @@ -146,7 +182,44 @@ func NewTriggerService(parentLggr logger.Logger, clock clockwork.Clock, limitsFa "capabilityName", cronTriggerInfo.ID, ), metrics: metrics, - }, nil + meters: meters, + } + + // Adopt services.Engine so the trigger can host the ResourceManager as a + // sub-service (the RM owns the snapshot tick) and shut down cleanly. The + // scheduler is started/stopped in s.start / s.close. + s.Service, s.srvcEng = services.Config{ + Name: "CronTrigger", + NewSubServices: func(logger.Logger) []services.Service { return []services.Service{meters} }, + Start: s.start, + Close: s.close, + }.NewServiceEngine(lggr) + + return s, nil +} + +// identityFor returns the per-trigger metering identity: the base identity +// with ResourceID set to triggerID. resource_id is workflow-scoped (the +// trigger_id) for cron, which has no shared physical resource. The DON ID +// falls back to the consumer workflow's DON when the host has not injected a +// capability DON ID (deps.CapabilityDonID == 0). +func (s *Service) identityFor(triggerID string, workflowDonID uint32) resourcemanager.ResourceIdentity { + id := s.base.WithResourceID(triggerID) + if id.DONID == "" { + id.DONID = strconv.FormatUint(uint64(workflowDonID), 10) + } + return id +} + +// emitMeterRecord reports a change to this trigger's registration reservation +// for billing. The triggerID doubles as the idempotency event identity: a +// triggerID is registered at most once at a time, so retried emissions for the +// same registration dedup downstream. Emission is fail-open and never affects +// the registration itself. +func (s *Service) emitMeterRecord(ctx context.Context, action meteringpb.MeterAction, metadata capabilities.RequestMetadata, triggerID string) { + id := s.identityFor(triggerID, metadata.WorkflowDonID) + s.meters.EmitMeterRecord(ctx, id, action, + resourcemanager.NewUtilization(id, action, 1, triggerID)) } func (s *Service) Initialise(ctx context.Context, dependencies core.StandardCapabilitiesDependencies) error { @@ -180,6 +253,30 @@ func (s *Service) Initialise(ctx context.Context, dependencies core.StandardCapa s.lggr.Warn("OrgResolver is nil, cron capability will not be able to fetch organization ID") } + // Build the base metering identity from the host-injected deployment, node, + // and DON dimensions. These arrive via the standardized Initialise channel + // (mirroring how capabilities#619 injects CapabilityDonID). Any may be + // empty/zero until the host is updated to populate them; DONID falls back to + // the consumer workflow DON at emit time (see identityFor). + product := dependencies.Product + if product == "" { + product = meteringProductFallback + } + var donID string + if dependencies.CapabilityDonID != 0 { + donID = strconv.FormatUint(uint64(dependencies.CapabilityDonID), 10) + } + s.base = resourcemanager.ResourceIdentity{ + Product: product, + Environment: dependencies.Environment, + Zone: dependencies.Zone, + DONID: donID, + NodeID: dependencies.NodeID, + Service: meteringService, + Resource: meteringResource, + ResourceType: meteringResourceType, + } + err = s.Start(ctx) if err != nil { return fmt.Errorf("error when starting trigger service: %w", err) @@ -371,6 +468,8 @@ func (s *Service) RegisterTrigger(ctx context.Context, triggerID string, metadat close: closeCh, }) + s.emitMeterRecord(ctx, meteringpb.MeterAction_METER_ACTION_RESERVE, metadata, triggerID) + s.lggr.Debugw("Trigger registered", "workflowId", metadata.WorkflowID, "triggerId", triggerID, "jobId", job.ID()) s.metrics.IncActiveTriggersGauge(ctx) return callbackCh, nil @@ -422,17 +521,26 @@ func (s *Service) UnregisterTrigger(ctx context.Context, triggerID string, metad // Remove from triggers context s.triggers.Delete(triggerID) + s.emitMeterRecord(ctx, meteringpb.MeterAction_METER_ACTION_RELEASE, metadata, triggerID) + s.lggr.Debugw("UnregisterTrigger", "triggerId", triggerID, "jobId", jobID) s.metrics.DecActiveTriggersGauge(ctx) return nil } -// Start the service. -func (s *Service) Start(ctx context.Context) error { +// start is the services.Engine start hook. The ResourceManager sub-service has +// already been started by the engine, so start registers this Service as a +// Meterable (the RM polls it once per snapshot tick) and starts the scheduler, +// refreshing next-run times for any registrations that survived a restart. +func (s *Service) start(_ context.Context) error { if s.scheduler == nil { return errors.New("service has shutdown, it must be built again to restart") } + // Register for snapshots. The RM owns the tick; we only supply state via + // the Meterable interface. unregisterMeterable is called in close. + s.unregisterMeterable = s.meters.Register(s) + s.scheduler.Start() for triggerID, trigger := range s.triggers.ReadAll() { @@ -448,19 +556,35 @@ func (s *Service) Start(ctx context.Context) error { } } - s.lggr.Info(s.Name() + " started") - return nil } -// Close stops the Service. -// After this call the Service cannot be started again, -// The service will need to be re-built to start scheduling again. -func (s *Service) Close() error { +// close is the services.Engine close hook. After this the Service cannot be +// started again; it must be re-built to schedule again. close drains a RELEASE +// for every still-active registration (so a graceful shutdown does not leak +// reservations in billing), unregisters from the snapshot registry, then shuts +// the scheduler down. The ResourceManager sub-service is closed by the engine +// afterwards. +func (s *Service) close() error { if s.scheduler == nil { return errors.New("service has shutdown, it must be built again to restart") } + // Graceful-close RELEASEs. Use a background context: the engine's start + // context is already cancelled by the time close runs. Emission is + // fail-open, so a metering failure never blocks shutdown. + ctx := context.Background() + for triggerID := range s.triggers.ReadAll() { + id := s.base.WithResourceID(triggerID) + s.meters.EmitMeterRecord(ctx, id, meteringpb.MeterAction_METER_ACTION_RELEASE, + resourcemanager.NewUtilization(id, meteringpb.MeterAction_METER_ACTION_RELEASE, 1, triggerID)) + } + + if s.unregisterMeterable != nil { + s.unregisterMeterable() + s.unregisterMeterable = nil + } + err := s.scheduler.Shutdown() if err != nil { return fmt.Errorf("scheduler shutdown encountered a problem: %s", err) @@ -470,23 +594,35 @@ func (s *Service) Close() error { // but calling .Start() on it will not error. Set to nil to mark closed. s.scheduler = nil - s.lggr.Info(s.Name() + " closed") - - return nil -} - -func (s *Service) Ready() error { return nil } -func (s *Service) HealthReport() map[string]error { - return map[string]error{s.Name(): nil} +func (s *Service) Description() string { + return "Cron Trigger Capability" } -func (s *Service) Name() string { - return s.lggr.Name() +// ResourceIdentity implements resourcemanager.Meterable: it returns the base +// six-dimension identity (resource_id left empty; set per active trigger in +// GetUtilization). +func (s *Service) ResourceIdentity() resourcemanager.ResourceIdentity { + return s.base } -func (s *Service) Description() string { - return "Cron Trigger Capability" +// GetUtilization implements resourcemanager.Meterable: it returns the absolute +// state of every currently active cron registration, one SnapshotEntry per +// trigger, each at value 1 (a registration is a single reserved unit). It is a +// cheap in-memory read of the store snapshot and tolerates ctx cancellation. +func (s *Service) GetUtilization(ctx context.Context) []resourcemanager.SnapshotEntry { + if ctx.Err() != nil { + return nil + } + triggers := s.triggers.ReadAll() + entries := make([]resourcemanager.SnapshotEntry, 0, len(triggers)) + for triggerID := range triggers { + entries = append(entries, resourcemanager.SnapshotEntry{ + Identity: s.base.WithResourceID(triggerID), + Value: 1, + }) + } + return entries } diff --git a/cron/trigger/trigger_test.go b/cron/trigger/trigger_test.go index 49aca17d9..bd8c1abf3 100644 --- a/cron/trigger/trigger_test.go +++ b/cron/trigger/trigger_test.go @@ -255,7 +255,7 @@ func successWithStandardCronIntervals(t *testing.T, useTypedAPI bool) { config, err := json.Marshal(Config{FastestScheduleIntervalSeconds: 1}) require.NoError(t, err) - ts, err := NewTriggerService(logger.Nop(), fakeClock, limits.Factory{}) + ts, err := NewTriggerService(logger.Nop(), fakeClock, limits.Factory{}, nil) require.NoError(t, err) err = ts.Initialise(t.Context(), core.StandardCapabilitiesDependencies{ Config: string(config), @@ -336,7 +336,7 @@ func TestCronTrigger_Load(t *testing.T) { config, err := json.Marshal(Config{FastestScheduleIntervalSeconds: 1}) require.NoError(t, err) - ts, err := NewTriggerService(logger.Nop(), fakeClock, limits.Factory{}) + ts, err := NewTriggerService(logger.Nop(), fakeClock, limits.Factory{}, nil) require.NoError(t, err) err = ts.Initialise(t.Context(), core.StandardCapabilitiesDependencies{ Config: string(config), @@ -483,7 +483,7 @@ func testCronTriggerRegisterTriggerBeforeStart(t *testing.T, useTypedAPI bool) { fakeClock := clockwork.NewRealClock() config, err := json.Marshal(Config{FastestScheduleIntervalSeconds: 1}) require.NoError(t, err) - ts, err := NewTriggerService(logger.Nop(), fakeClock, limits.Factory{}) + ts, err := NewTriggerService(logger.Nop(), fakeClock, limits.Factory{}, nil) require.NoError(t, err) err = ts.Initialise(t.Context(), core.StandardCapabilitiesDependencies{ Config: string(config), @@ -556,7 +556,7 @@ func testCronTriggerTimeWindows(t *testing.T, useTypedAPI bool) { config, err := json.Marshal(Config{FastestScheduleIntervalSeconds: 1}) require.NoError(t, err) - ts, err := NewTriggerService(logger.Nop(), fakeClock, limits.Factory{}) + ts, err := NewTriggerService(logger.Nop(), fakeClock, limits.Factory{}, nil) require.NoError(t, err) err = ts.Initialise(t.Context(), core.StandardCapabilitiesDependencies{ Config: string(config), @@ -632,7 +632,7 @@ func testCronTriggerMultipleDifferentSchedules(t *testing.T, useTypedAPI bool) { } config, err := json.Marshal(Config{FastestScheduleIntervalSeconds: 1}) require.NoError(t, err) - ts, err := NewTriggerService(logger.Nop(), fakeClock, limits.Factory{}) + ts, err := NewTriggerService(logger.Nop(), fakeClock, limits.Factory{}, nil) require.NoError(t, err) err = ts.Initialise(t.Context(), core.StandardCapabilitiesDependencies{ Config: string(config), @@ -755,7 +755,7 @@ func testCronTriggerTimeZone(t *testing.T, useTypedAPI bool) { config, err := json.Marshal(Config{FastestScheduleIntervalSeconds: 1}) require.NoError(t, err) - ts, err := NewTriggerService(logger.Nop(), fakeClock, limits.Factory{}) + ts, err := NewTriggerService(logger.Nop(), fakeClock, limits.Factory{}, nil) require.NoError(t, err) err = ts.Initialise(t.Context(), core.StandardCapabilitiesDependencies{ Config: string(config), @@ -868,7 +868,7 @@ func testCronTriggerRegisterTrigger(t *testing.T, useTypedAPI bool) { for _, tt := range cases { t.Run(tt.name, func(t *testing.T) { fakeClock := clockwork.NewRealClock() - ts, err := NewTriggerService(logger.Nop(), fakeClock, limits.Factory{}) + ts, err := NewTriggerService(logger.Nop(), fakeClock, limits.Factory{}, nil) require.NoError(t, err) err = ts.Initialise(t.Context(), core.StandardCapabilitiesDependencies{}) require.NoError(t, err) @@ -907,7 +907,7 @@ func TestCronTrigger_RegisterTriggerDuplicateError(t *testing.T) { triggerConfig, err := json.Marshal(Config{FastestScheduleIntervalSeconds: 1}) require.NoError(t, err) fakeClock := clockwork.NewRealClock() - ts, err := NewTriggerService(logger.Nop(), fakeClock, limits.Factory{}) + ts, err := NewTriggerService(logger.Nop(), fakeClock, limits.Factory{}, nil) require.NoError(t, err) err = ts.Initialise(t.Context(), core.StandardCapabilitiesDependencies{ Config: string(triggerConfig), @@ -942,7 +942,7 @@ func TestCronTrigger_UnregisterTriggerError(t *testing.T) { triggerConfig, err := json.Marshal(Config{FastestScheduleIntervalSeconds: 1}) require.NoError(t, err) fakeClock := clockwork.NewRealClock() - ts, err := NewTriggerService(logger.Nop(), fakeClock, limits.Factory{}) + ts, err := NewTriggerService(logger.Nop(), fakeClock, limits.Factory{}, nil) require.NoError(t, err) err = ts.Initialise(t.Context(), core.StandardCapabilitiesDependencies{ Config: string(triggerConfig), @@ -1021,7 +1021,7 @@ func TestCronTrigger_UnregisterTriggerError(t *testing.T) { }) t.Run("NOK fails to unregister if closed", func(t *testing.T) { - ts, err := NewTriggerService(logger.Nop(), fakeClock, limits.Factory{}) + ts, err := NewTriggerService(logger.Nop(), fakeClock, limits.Factory{}, nil) require.NoError(t, err) err = ts.Initialise(t.Context(), core.StandardCapabilitiesDependencies{ Config: string(triggerConfig), @@ -1059,7 +1059,7 @@ func TestCronTrigger_UnregisterTriggerError(t *testing.T) { func TestCronTrigger_CloseStartErrors(t *testing.T) { fakeClock := clockwork.NewRealClock() - ts, err := NewTriggerService(logger.Nop(), fakeClock, limits.Factory{}) + ts, err := NewTriggerService(logger.Nop(), fakeClock, limits.Factory{}, nil) require.NoError(t, err) ctx := t.Context() @@ -1085,7 +1085,7 @@ func TestGocronNewTaskPanic(t *testing.T) { config, err := json.Marshal(Config{FastestScheduleIntervalSeconds: 1}) require.NoError(t, err) logger, observedLogs := logger.TestObserved(t, zap.ErrorLevel) - ts, err := NewTriggerService(logger, fakeClock, limits.Factory{}) + ts, err := NewTriggerService(logger, fakeClock, limits.Factory{}, nil) require.NoError(t, err) err = ts.Initialise(t.Context(), core.StandardCapabilitiesDependencies{ Config: string(config), @@ -1184,7 +1184,7 @@ func TestCronTrigger_MultiTriggerFlag_ExecutionIDPaths(t *testing.T) { triggerConfig, err := json.Marshal(Config{FastestScheduleIntervalSeconds: 1}) require.NoError(t, err) - ts, err := NewTriggerService(lggr, fakeClock, limits.Factory{}) + ts, err := NewTriggerService(lggr, fakeClock, limits.Factory{}, nil) require.NoError(t, err) err = ts.Initialise(t.Context(), core.StandardCapabilitiesDependencies{Config: string(triggerConfig)}) require.NoError(t, err) diff --git a/http_trigger/go.mod b/http_trigger/go.mod index 96f9aae90..4829ed2b9 100644 --- a/http_trigger/go.mod +++ b/http_trigger/go.mod @@ -76,10 +76,11 @@ require ( github.com/shopspring/decimal v1.4.0 // indirect github.com/smartcontractkit/chain-selectors v1.0.100 // indirect github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.11-0.20260529092756-a94bc8ce96d6 // indirect - github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260526195338-adcf8013a1b7 // indirect + github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260611123141-db97012a6c32 // indirect github.com/smartcontractkit/chainlink-protos/linking-service/go v0.0.0-20251002192024-d2ad9222409b // indirect + github.com/smartcontractkit/chainlink-protos/metering/go v0.0.0-00010101000000-000000000000 github.com/smartcontractkit/chainlink-protos/node-platform v0.0.0-20260205130626-db2a2aab956b // indirect - github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20260323124644-faea187e6997 // indirect + github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20260528173149-f5b8336b19d9 // indirect github.com/smartcontractkit/freeport v0.1.3-0.20250716200817-cb5dfd0e369e // indirect github.com/smartcontractkit/grpc-proxy v0.0.0-20240830132753-a7e17fec5ab7 // indirect github.com/smartcontractkit/libocr v0.0.0-20250912173940-f3ab0246e23d // indirect @@ -119,7 +120,13 @@ require ( google.golang.org/genproto/googleapis/api v0.0.0-20260401024825-9d38bb4040a9 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20260401024825-9d38bb4040a9 // indirect google.golang.org/grpc v1.80.0 // indirect - google.golang.org/protobuf v1.36.11 // indirect + google.golang.org/protobuf v1.36.11 gopkg.in/yaml.v3 v3.0.1 // indirect sigs.k8s.io/yaml v1.4.0 // indirect ) + +// Local replaces for the unpublished resourcemanager/metering stack; drop once +// chainlink-common and chainlink-protos/metering/go are tagged. +replace github.com/smartcontractkit/chainlink-common => ../../chainlink-common + +replace github.com/smartcontractkit/chainlink-protos/metering/go => ../../chainlink-protos/metering/go diff --git a/http_trigger/go.sum b/http_trigger/go.sum index 248c87f80..b55922d78 100644 --- a/http_trigger/go.sum +++ b/http_trigger/go.sum @@ -207,18 +207,16 @@ github.com/smartcontractkit/capabilities/libs v0.0.0-20260210010829-97eb42ca2924 github.com/smartcontractkit/capabilities/libs v0.0.0-20260210010829-97eb42ca2924/go.mod h1:v0O0Au8RE00Z89QxBE6I2q9bR9r3+RO1gLD3oaO2WB0= github.com/smartcontractkit/chain-selectors v1.0.100 h1:wpiSpmI/eFjY+wx/nPr5VuNF4hki0prIBMKEaQWn3g4= github.com/smartcontractkit/chain-selectors v1.0.100/go.mod h1:qy7whtgG5g+7z0jt0nRyii9bLND9m15NZTzuQPkMZ5w= -github.com/smartcontractkit/chainlink-common v0.11.2-0.20260529092756-a94bc8ce96d6 h1:hms02zQQ0BPcp9CBwh/xda5KwJWdU0IIA/yjtwyRoA4= -github.com/smartcontractkit/chainlink-common v0.11.2-0.20260529092756-a94bc8ce96d6/go.mod h1:jueIfDkkRexwGgLbVB7vGCZlNtd383zuwi4uHHwcbqc= github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.11-0.20260529092756-a94bc8ce96d6 h1:ucHu2bPDT/58AzSgnPDyp4IjnjVbrVWYD3bG5jCbXMY= github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.11-0.20260529092756-a94bc8ce96d6/go.mod h1:HmUyH2oD9m+GRpKq7q3vuRnm1F2Uczf/Nd1v3ipMSK8= -github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260526195338-adcf8013a1b7 h1:iljEJss3WOwcsMkWy72Yn2zvjw7Gyxc+RXL7r8YKM6g= -github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260526195338-adcf8013a1b7/go.mod h1:vTFHTCbLui4Vn8fTmAadfE3rdnvfrDwOmMujmW857D0= +github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260611123141-db97012a6c32 h1:GNl+lLK0QCakqA1J1i7FoOai2JrOGOzNzSniMijaCjA= +github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260611123141-db97012a6c32/go.mod h1:vTFHTCbLui4Vn8fTmAadfE3rdnvfrDwOmMujmW857D0= github.com/smartcontractkit/chainlink-protos/linking-service/go v0.0.0-20251002192024-d2ad9222409b h1:QuI6SmQFK/zyUlVWEf0GMkiUYBPY4lssn26nKSd/bOM= github.com/smartcontractkit/chainlink-protos/linking-service/go v0.0.0-20251002192024-d2ad9222409b/go.mod h1:qSTSwX3cBP3FKQwQacdjArqv0g6QnukjV4XuzO6UyoY= github.com/smartcontractkit/chainlink-protos/node-platform v0.0.0-20260205130626-db2a2aab956b h1:36knUpKHHAZ86K4FGWXtx8i/EQftGdk2bqCoEu/Cha8= github.com/smartcontractkit/chainlink-protos/node-platform v0.0.0-20260205130626-db2a2aab956b/go.mod h1:dkR2uYg9XYJuT1JASkPzWE51jjFkVb86P7a/yXe5/GM= -github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20260323124644-faea187e6997 h1:W0HKHO8eE8BckTRnhSdqjHKbJcnk068nEWYnWRu6tJY= -github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20260323124644-faea187e6997/go.mod h1:GTpDgyK0OObf7jpch6p8N281KxN92wbB8serZhU9yRc= +github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20260528173149-f5b8336b19d9 h1:LQy2j2+TdKLSWsUTUYuqmQPn8kjqCLjGI3ZJYGtDc08= +github.com/smartcontractkit/chainlink-protos/workflows/go v0.0.0-20260528173149-f5b8336b19d9/go.mod h1:GTpDgyK0OObf7jpch6p8N281KxN92wbB8serZhU9yRc= github.com/smartcontractkit/freeport v0.1.3-0.20250716200817-cb5dfd0e369e h1:Hv9Mww35LrufCdM9wtS9yVi/rEWGI1UnjHbcKKU0nVY= github.com/smartcontractkit/freeport v0.1.3-0.20250716200817-cb5dfd0e369e/go.mod h1:T4zH9R8R8lVWKfU7tUvYz2o2jMv1OpGCdpY2j2QZXzU= github.com/smartcontractkit/grpc-proxy v0.0.0-20240830132753-a7e17fec5ab7 h1:12ijqMM9tvYVEm+nR826WsrNi6zCKpwBhuApq127wHs= diff --git a/http_trigger/trigger/connector_handler.go b/http_trigger/trigger/connector_handler.go index 82a60020b..3610a8746 100644 --- a/http_trigger/trigger/connector_handler.go +++ b/http_trigger/trigger/connector_handler.go @@ -16,12 +16,14 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/custmsg" jsonrpc "github.com/smartcontractkit/chainlink-common/pkg/jsonrpc2" "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/resourcemanager" "github.com/smartcontractkit/chainlink-common/pkg/services" "github.com/smartcontractkit/chainlink-common/pkg/services/orgresolver" "github.com/smartcontractkit/chainlink-common/pkg/types/core" gateway_common "github.com/smartcontractkit/chainlink-common/pkg/types/gateway" "github.com/smartcontractkit/chainlink-common/pkg/workflows" "github.com/smartcontractkit/chainlink-common/pkg/workflows/events" + meteringpb "github.com/smartcontractkit/chainlink-protos/metering/go" ) const ( @@ -31,6 +33,12 @@ const ( var _ core.GatewayConnectorHandler = &connectorHandler{} +// connectorHandler implements resourcemanager.Meterable: it owns the +// workflowStore and the base metering identity, so it both emits lifecycle +// edges inline and reports the absolute state of active registrations on the +// ResourceManager's snapshot tick. +var _ resourcemanager.Meterable = &connectorHandler{} + type connectorHandler struct { services.StateMachine lggr logger.Logger @@ -43,10 +51,22 @@ type connectorHandler struct { wg sync.WaitGroup stopChan services.StopChan orgResolver orgresolver.OrgResolver // Optional org resolver for fetching organization IDs + resourceManager *resourcemanager.ResourceManager + // baseIdentity is the six-dimension + resource/resource_type metering + // identity for this trigger LOOP, built once at Initialise. The + // per-workflow resource_id is derived per emission via WithResourceID. + baseIdentity resourcemanager.ResourceIdentity + // unregisterMeterable removes this handler from the ResourceManager's + // snapshot registry; set on Start, called on Close. + unregisterMeterable func() } func NewConnectorHandler(lggr logger.Logger, gc core.GatewayConnector, config ServiceConfig, - workflowStore *workflowStore, gatewayMetadataPublisher GatewayMetadataPublisher, requestCache *requestCache, metrics *Metrics, orgResolver orgresolver.OrgResolver) (*connectorHandler, error) { + workflowStore *workflowStore, gatewayMetadataPublisher GatewayMetadataPublisher, requestCache *requestCache, metrics *Metrics, orgResolver orgresolver.OrgResolver, + resourceManager *resourcemanager.ResourceManager, baseIdentity resourcemanager.ResourceIdentity) (*connectorHandler, error) { + if resourceManager == nil { + resourceManager = resourcemanager.NewResourceManager(lggr, resourcemanager.ResourceManagerConfig{}) + } return &connectorHandler{ lggr: logger.Named(lggr, HandlerName), gatewayConnector: gc, @@ -57,6 +77,8 @@ func NewConnectorHandler(lggr logger.Logger, gc core.GatewayConnector, config Se metrics: metrics, stopChan: make(chan struct{}), orgResolver: orgResolver, + resourceManager: resourceManager, + baseIdentity: baseIdentity, }, nil } @@ -65,6 +87,13 @@ func (h *connectorHandler) Start(ctx context.Context) error { h.wg.Add(1) go h.startRequestCacheCleanup(ctx) return h.StartOnce(HandlerName, func() error { + // Start the ResourceManager as a sub-service (it owns the snapshot + // tick) and register this handler as the snapshotted Meterable. The RM + // is fail-open and disabled by default, so this never gates startup. + if err := h.resourceManager.Start(ctx); err != nil { + return err + } + h.unregisterMeterable = h.resourceManager.Register(h) return h.gatewayConnector.AddHandler(ctx, []string{ gateway_common.MethodWorkflowExecute, gateway_common.MethodPullWorkflowMetadata, @@ -99,10 +128,27 @@ func (h *connectorHandler) Close() error { return h.StopOnce(HandlerName, func() error { close(h.stopChan) h.wg.Wait() - return nil + // Drain RELEASEs for every still-active workflow so reservations do not + // leak past shutdown, then unregister from the snapshot tick and stop + // the ResourceManager. Order matters: release before unregister/close so + // the final edges are emitted while emission is still wired up. + h.releaseActiveWorkflows(context.Background()) + if h.unregisterMeterable != nil { + h.unregisterMeterable() + } + return h.resourceManager.Close() }) } +// releaseActiveWorkflows emits a RELEASE for each workflow still active in the +// store at shutdown. It is fail-open: emission never blocks or fails close. +func (h *connectorHandler) releaseActiveWorkflows(ctx context.Context) { + for _, w := range h.workflowStore.getWorkflows() { + h.emitMeterRecord(ctx, meteringpb.MeterAction_METER_ACTION_RELEASE, + w.workflowSelector.WorkflowID, w.metadata.WorkflowDONID) + } +} + func (h *connectorHandler) HealthReport() map[string]error { return map[string]error{h.Name(): h.Healthy()} } @@ -142,14 +188,90 @@ func (h *connectorHandler) RegisterWorkflow(ctx context.Context, input WorkflowR h.metrics.RecordBroadcastMetadataLatency(ctx, latencyMs, h.lggr) workflow := newWorkflowWithMetadata(input.WorkflowSelector, authorizedKeys, sendCh, input.Metadata) - if err := h.workflowStore.upsertWorkflow(workflow); err != nil { + prevWorkflowID, replaced, err := h.workflowStore.upsertWorkflow(workflow) + if err != nil { return fmt.Errorf("failed to register workflow (ID: %s, Owner: %s, Name: %s): %w", input.WorkflowSelector.WorkflowID, input.WorkflowSelector.WorkflowOwner, input.WorkflowSelector.WorkflowName, err) } + newWorkflowID := input.WorkflowSelector.WorkflowID + workflowDONID := input.Metadata.WorkflowDONID + switch { + case !replaced: + h.emitMeterRecord(ctx, meteringpb.MeterAction_METER_ACTION_RESERVE, newWorkflowID, workflowDONID) + case prevWorkflowID == newWorkflowID: + h.emitMeterRecord(ctx, meteringpb.MeterAction_METER_ACTION_UPDATE, newWorkflowID, workflowDONID) + default: + // Version update: the same owner/name/tag reference now resolves to a + // new workflow ID. Release the previous workflow's reservation before + // reserving the new one so the old reservation does not leak. + h.emitMeterRecord(ctx, meteringpb.MeterAction_METER_ACTION_RELEASE, prevWorkflowID, workflowDONID) + h.emitMeterRecord(ctx, meteringpb.MeterAction_METER_ACTION_RESERVE, newWorkflowID, workflowDONID) + } h.lggr.Debugw("Registered workflow", "workflowID", input.WorkflowSelector.WorkflowID, "workflowOwner", input.WorkflowSelector.WorkflowOwner, "workflowName", input.WorkflowSelector.WorkflowName, "workflowTag", input.WorkflowSelector.WorkflowTag) return nil } +// emitMeterRecord emits a meter record for one workflow registration +// operation. resource_id is the workflow ID (HTTP registrations are +// workflow-scoped, so there is no shared physical resource); the workflow ID +// also doubles as the event identity, so a repeated emission for the same +// workflow and action dedups downstream. The workflow_id is recoverable from +// resource_id and the owner is resolved downstream, so no label metadata is +// attached. Emission is fail-open and never affects the registration outcome. +func (h *connectorHandler) emitMeterRecord(ctx context.Context, action meteringpb.MeterAction, workflowID string, workflowDONID uint32) { + identity := h.identityForWorkflow(workflowID, workflowDONID) + h.resourceManager.EmitMeterRecord(ctx, identity, action, + resourcemanager.NewUtilization(identity, action, 1, workflowID)) +} + +// identityForWorkflow derives the per-workflow metering identity: the base +// identity with resource_id set to the workflow ID, and DONID resolved per +// registration when the host did not inject a capability DON. +func (h *connectorHandler) identityForWorkflow(workflowID string, workflowDONID uint32) resourcemanager.ResourceIdentity { + identity := h.baseIdentity.WithResourceID(workflowID) + identity.DONID = h.donID(workflowDONID) + return identity +} + +// donID returns the DON identifier for an emission. It prefers the +// host-injected capability DON captured in the base identity (capabilities#619) +// and falls back to the per-registration workflow DON when the host did not +// populate one (CapabilityDonID == 0 at Initialise). +func (h *connectorHandler) donID(workflowDONID uint32) string { + if h.baseIdentity.DONID != "" { + return h.baseIdentity.DONID + } + if workflowDONID != 0 { + return strconv.FormatUint(uint64(workflowDONID), 10) + } + return "" +} + +// ResourceIdentity returns the HTTP trigger's base metering identity (six +// dimensions + resource / resource_type). The per-workflow resource_id is +// populated by GetUtilization. It implements resourcemanager.Meterable. +func (h *connectorHandler) ResourceIdentity() resourcemanager.ResourceIdentity { + return h.baseIdentity +} + +// GetUtilization returns the absolute state of currently active HTTP workflow +// registrations, one SnapshotEntry per workflow, for the ResourceManager's +// snapshot tick. It is a cheap read-snapshot of in-memory state (a read-locked +// copy from the workflow store) and holds no lock across I/O. It implements +// resourcemanager.Meterable. +func (h *connectorHandler) GetUtilization(ctx context.Context) []resourcemanager.SnapshotEntry { + workflows := h.workflowStore.getWorkflows() + entries := make([]resourcemanager.SnapshotEntry, 0, len(workflows)) + for _, w := range workflows { + workflowID := w.workflowSelector.WorkflowID + entries = append(entries, resourcemanager.SnapshotEntry{ + Identity: h.identityForWorkflow(workflowID, w.metadata.WorkflowDONID), + Value: 1, + }) + } + return entries +} + func (h *connectorHandler) validateAuthorizedKeys(inputKeys []*http.AuthorizedKey) ([]gateway_common.AuthorizedKey, error) { if len(inputKeys) == 0 { return nil, fmt.Errorf("HTTP trigger requires at least one authorized key to sign JSON-RPC requests. Add AuthorizedKeys to your http.Trigger configuration with ECDSA EVM public keys (0x-prefixed hex strings)") @@ -177,10 +299,17 @@ func (h *connectorHandler) validateAuthorizedKeys(inputKeys []*http.AuthorizedKe } func (h *connectorHandler) UnregisterWorkflow(ctx context.Context, workflowID string) error { + // Snapshot the workflow DON before removal; it is needed for the meter + // record's identity DON fallback. + var workflowDONID uint32 + if w, ok := h.workflowStore.getWorkflowByID(workflowID); ok { + workflowDONID = w.metadata.WorkflowDONID + } err := h.workflowStore.removeWorkflow(workflowID) if err != nil { return fmt.Errorf("failed to unregister workflow %s: %w", workflowID, err) } + h.emitMeterRecord(ctx, meteringpb.MeterAction_METER_ACTION_RELEASE, workflowID, workflowDONID) h.lggr.Debugw("Unregistered workflow", "workflowID", workflowID) return nil } diff --git a/http_trigger/trigger/connector_handler_test.go b/http_trigger/trigger/connector_handler_test.go index f80e49d83..f168736a7 100644 --- a/http_trigger/trigger/connector_handler_test.go +++ b/http_trigger/trigger/connector_handler_test.go @@ -4,20 +4,27 @@ import ( "context" "database/sql" "encoding/json" + "errors" "strings" "sync" "testing" "time" + "github.com/jonboulle/clockwork" "github.com/stretchr/testify/require" + "google.golang.org/protobuf/proto" + "github.com/smartcontractkit/chainlink-common/pkg/beholder" "github.com/smartcontractkit/chainlink-common/pkg/capabilities" "github.com/smartcontractkit/chainlink-common/pkg/capabilities/v2/triggers/http" jsonrpc "github.com/smartcontractkit/chainlink-common/pkg/jsonrpc2" "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/resourcemanager" + "github.com/smartcontractkit/chainlink-common/pkg/services/servicetest" "github.com/smartcontractkit/chainlink-common/pkg/types/core" gateway_common "github.com/smartcontractkit/chainlink-common/pkg/types/gateway" "github.com/smartcontractkit/chainlink-common/pkg/workflows" + meteringpb "github.com/smartcontractkit/chainlink-protos/metering/go" ) const ( @@ -194,6 +201,8 @@ func setupWithTriggerChannelBuffer(t *testing.T, lggr logger.Logger, triggerChBu requestCache, newMetrics(t), nil, + nil, + resourcemanager.ResourceIdentity{}, ) require.NoError(t, err) sdkCfg := &http.Config{ @@ -598,6 +607,8 @@ func TestRegisterWorkflow_TooManyAuthorizedKeys(t *testing.T) { requestCache, newMetrics(t), nil, + nil, + resourcemanager.ResourceIdentity{}, ) require.NoError(t, err) @@ -715,6 +726,8 @@ func TestConnectorHandler_Start_HealthReport_Ready_Name_Close(t *testing.T) { requestCache, newMetrics(t), nil, + nil, + resourcemanager.ResourceIdentity{}, ) require.NoError(t, err) @@ -874,6 +887,8 @@ func TestHandleGatewayMessage_PullAuthMetadata_EmptyWorkflows(t *testing.T) { requestCache, newMetrics(t), nil, + nil, + resourcemanager.ResourceIdentity{}, ) require.NoError(t, err) @@ -1050,6 +1065,8 @@ func TestConnectorHandler_StartRequestCacheCleanup(t *testing.T) { requestCache, newMetrics(t), nil, + nil, + resourcemanager.ResourceIdentity{}, ) require.NoError(t, err) @@ -1111,6 +1128,376 @@ func TestHandleGatewayMessage_NilRequest(t *testing.T) { require.Contains(t, err.Error(), "request cannot be nil") } +// fakeMeterEmitter decodes and records the MeterRecords and MeterSnapshots +// passed to Emit and can be configured to fail, for asserting fail-open +// behavior. It dispatches on the beholder entity attribute so MeterRecord and +// MeterSnapshot bodies are decoded with the correct type. Each MeterSnapshot +// covers exactly one resource. +type fakeMeterEmitter struct { + err error + records []*meteringpb.MeterRecord + snapshots []*meteringpb.MeterSnapshot +} + +func (f *fakeMeterEmitter) Emit(ctx context.Context, body []byte, attrKVs ...any) error { + if f.entity(attrKVs) == "metering.v1.MeterSnapshot" { + var snapshot meteringpb.MeterSnapshot + if err := proto.Unmarshal(body, &snapshot); err != nil { + return err + } + f.snapshots = append(f.snapshots, &snapshot) + return f.err + } + var record meteringpb.MeterRecord + if err := proto.Unmarshal(body, &record); err != nil { + return err + } + f.records = append(f.records, &record) + return f.err +} + +// entity returns the value of the beholder entity attribute from the +// alternating key/value attrKVs slice, or "" if absent. +func (f *fakeMeterEmitter) entity(attrKVs []any) string { + for i := 0; i+1 < len(attrKVs); i += 2 { + if k, ok := attrKVs[i].(string); ok && k == beholder.AttrKeyEntity { + if v, ok := attrKVs[i+1].(string); ok { + return v + } + } + } + return "" +} + +func (f *fakeMeterEmitter) actions() []meteringpb.MeterAction { + actions := make([]meteringpb.MeterAction, len(f.records)) + for i, r := range f.records { + actions[i] = r.GetAction() + } + return actions +} + +// testBaseIdentity is the base metering identity used by metering tests. It +// carries the six coarse dimensions plus the service-level resource / +// resource_type; per-workflow resource_id is derived per emission. +var testBaseIdentity = resourcemanager.ResourceIdentity{ + Product: "cre-test", + Environment: "staging", + Zone: "wf-zone-a", + DONID: "7", + NodeID: "node-csa-pubkey", + Service: meterService, + Resource: meterResource, + ResourceType: meterResourceType, +} + +// setupWithMeterEmitter builds a handler with metering enabled and a fake +// emitter capturing emitted MeterRecords. The ResourceManager is started (so +// the snapshot tick is wired) and the handler is registered as the snapshotted +// Meterable; both are torn down on test cleanup. No workflows are registered. +func setupWithMeterEmitter(t *testing.T, lggr logger.Logger, emitErr error) (*connectorHandler, *fakeMeterEmitter) { + t.Helper() + emitter := &fakeMeterEmitter{err: emitErr} + cfg := ServiceConfig{ + MetadataBatchSize: 10, + MaxAuthorizedKeysPerWorkflow: 3, + } + store := newWorkflowStore(lggr) + metadataPublisher := NewGatewayMetadataPublisher(lggr, &mockGatewayConnector{}, store, cfg, newMetrics(t)) + requestCache := newRequestCache(logger.Sugared(lggr), newTestKVStore(), time.Hour) + resourceManager := resourcemanager.NewResourceManager(lggr, resourcemanager.ResourceManagerConfig{ + Enabled: true, + Emitter: emitter, + SnapshotInterval: resourcemanager.DefaultSnapshotInterval, + }) + handler, err := NewConnectorHandler( + lggr, + &mockGatewayConnector{}, + cfg, + store, + metadataPublisher, + requestCache, + newMetrics(t), + nil, + resourceManager, + testBaseIdentity, + ) + require.NoError(t, err) + require.NoError(t, handler.Start(t.Context())) + t.Cleanup(func() { require.NoError(t, handler.Close()) }) + return handler, emitter +} + +func meterTestRegistrationInput() WorkflowRegistrationInput { + return WorkflowRegistrationInput{ + WorkflowSelector: gateway_common.WorkflowSelector{ + WorkflowID: testWorkflowID, + WorkflowOwner: testWorkflowOwner, + WorkflowName: testWorkflowName, + WorkflowTag: testWorkflowTag, + }, + Config: &http.Config{ + AuthorizedKeys: []*http.AuthorizedKey{ + { + PublicKey: publicKey, + Type: http.KeyType_KEY_TYPE_ECDSA_EVM, + }, + }, + }, + Metadata: WorkflowRegistrationMetadata{}, + } +} + +func TestRegisterWorkflow_MetersReserveThenUpdate(t *testing.T) { + lggr := logger.Test(t) + handler, emitter := setupWithMeterEmitter(t, lggr, nil) + input := meterTestRegistrationInput() + + sendCh := make(chan capabilities.TriggerAndId[*http.Payload], 1) + err := handler.RegisterWorkflow(t.Context(), input, sendCh) + require.NoError(t, err) + + // First registration reserves exactly once, with the full structured + // identity populated on the record. + require.Equal(t, []meteringpb.MeterAction{meteringpb.MeterAction_METER_ACTION_RESERVE}, emitter.actions()) + record := emitter.records[0] + id := record.GetIdentity() + require.Equal(t, testBaseIdentity.Product, id.GetProduct()) + require.Equal(t, testBaseIdentity.Environment, id.GetEnvironment()) + require.Equal(t, testBaseIdentity.Zone, id.GetZone()) + require.Equal(t, testBaseIdentity.DONID, id.GetDonId()) + require.Equal(t, testBaseIdentity.NodeID, id.GetNodeId()) + require.Equal(t, meterService, id.GetService()) + require.Equal(t, meterResource, id.GetResource()) + require.Equal(t, meterResourceType, id.GetResourceType()) + // resource_id is the workflow ID (HTTP registrations are workflow-scoped). + require.Equal(t, testWorkflowID, id.GetResourceId()) + require.Equal(t, int64(1), record.GetUtilization().GetValue()) + require.Equal(t, + resourcemanager.IdempotencyKey(testBaseIdentity.WithResourceID(testWorkflowID), meteringpb.MeterAction_METER_ACTION_RESERVE, testWorkflowID), + record.GetUtilization().GetIdempotencyKey()) + + // Re-registering the same workflow emits UPDATE, not a second RESERVE. + sendCh2 := make(chan capabilities.TriggerAndId[*http.Payload], 1) + err = handler.RegisterWorkflow(t.Context(), input, sendCh2) + require.NoError(t, err) + require.Equal(t, []meteringpb.MeterAction{ + meteringpb.MeterAction_METER_ACTION_RESERVE, + meteringpb.MeterAction_METER_ACTION_UPDATE, + }, emitter.actions()) +} + +func TestRegisterWorkflow_VersionUpdate_MetersReleaseThenReserve(t *testing.T) { + lggr := logger.Test(t) + handler, emitter := setupWithMeterEmitter(t, lggr, nil) + + inputA := meterTestRegistrationInput() + inputA.WorkflowSelector.WorkflowID = testWorkflowID1 + sendChA := make(chan capabilities.TriggerAndId[*http.Payload], 1) + require.NoError(t, handler.RegisterWorkflow(t.Context(), inputA, sendChA)) + + // Re-registering the same owner/name/tag reference with a NEW workflow ID + // is a version update: the previous workflow's reservation is released + // before the new one is reserved, so the old reservation cannot leak. + inputB := meterTestRegistrationInput() + inputB.WorkflowSelector.WorkflowID = testWorkflowID2 + sendChB := make(chan capabilities.TriggerAndId[*http.Payload], 1) + require.NoError(t, handler.RegisterWorkflow(t.Context(), inputB, sendChB)) + + require.Equal(t, []meteringpb.MeterAction{ + meteringpb.MeterAction_METER_ACTION_RESERVE, + meteringpb.MeterAction_METER_ACTION_RELEASE, + meteringpb.MeterAction_METER_ACTION_RESERVE, + }, emitter.actions()) + + // RESERVE(A) anchors the old workflow ID via its resource_id. + reserveA := emitter.records[0] + require.Equal(t, testWorkflowID1, reserveA.GetIdentity().GetResourceId()) + + // RELEASE targets the PREVIOUS workflow ID under the same owner; its + // resource_id is that previous workflow ID. + release := emitter.records[1] + require.Equal(t, testWorkflowID1, release.GetIdentity().GetResourceId()) + require.Equal(t, + resourcemanager.IdempotencyKey(testBaseIdentity.WithResourceID(testWorkflowID1), meteringpb.MeterAction_METER_ACTION_RELEASE, testWorkflowID1), + release.GetUtilization().GetIdempotencyKey()) + + // The trailing RESERVE anchors the new workflow ID. + reserveB := emitter.records[2] + require.Equal(t, testWorkflowID2, reserveB.GetIdentity().GetResourceId()) + require.Equal(t, + resourcemanager.IdempotencyKey(testBaseIdentity.WithResourceID(testWorkflowID2), meteringpb.MeterAction_METER_ACTION_RESERVE, testWorkflowID2), + reserveB.GetUtilization().GetIdempotencyKey()) +} + +func TestUnregisterWorkflow_MetersRelease(t *testing.T) { + lggr := logger.Test(t) + handler, emitter := setupWithMeterEmitter(t, lggr, nil) + + sendCh := make(chan capabilities.TriggerAndId[*http.Payload], 1) + err := handler.RegisterWorkflow(t.Context(), meterTestRegistrationInput(), sendCh) + require.NoError(t, err) + + err = handler.UnregisterWorkflow(t.Context(), testWorkflowID) + require.NoError(t, err) + require.Equal(t, []meteringpb.MeterAction{ + meteringpb.MeterAction_METER_ACTION_RESERVE, + meteringpb.MeterAction_METER_ACTION_RELEASE, + }, emitter.actions()) + release := emitter.records[1] + require.Equal(t, testWorkflowID, release.GetIdentity().GetResourceId()) + + // Unregistering an absent workflow fails and must not emit RELEASE. + err = handler.UnregisterWorkflow(t.Context(), testWorkflowID) + require.Error(t, err) + require.Len(t, emitter.records, 2) +} + +func TestRegisterWorkflow_MeteringFailOpen(t *testing.T) { + lggr := logger.Test(t) + handler, emitter := setupWithMeterEmitter(t, lggr, errors.New("emit failed")) + + // Registration and unregistration succeed even though every emit fails. + sendCh := make(chan capabilities.TriggerAndId[*http.Payload], 1) + err := handler.RegisterWorkflow(t.Context(), meterTestRegistrationInput(), sendCh) + require.NoError(t, err) + err = handler.UnregisterWorkflow(t.Context(), testWorkflowID) + require.NoError(t, err) + require.Len(t, emitter.records, 2) +} + +// registerMeterWorkflow registers a workflow with the given ID/owner under the +// metering test config (each registration uses a distinct reference so they +// coexist). +func registerMeterWorkflow(t *testing.T, handler *connectorHandler, workflowID, owner string) { + t.Helper() + input := meterTestRegistrationInput() + input.WorkflowSelector.WorkflowID = workflowID + input.WorkflowSelector.WorkflowOwner = owner + sendCh := make(chan capabilities.TriggerAndId[*http.Payload], 1) + require.NoError(t, handler.RegisterWorkflow(t.Context(), input, sendCh)) +} + +// TestSnapshot_EmitsOneEntryPerActiveWorkflow starts the ResourceManager tick +// and asserts one MeterSnapshot per active workflow, each carrying the full +// per-workflow identity. +func TestSnapshot_EmitsOneEntryPerActiveWorkflow(t *testing.T) { + lggr := logger.Test(t) + emitter := &fakeMeterEmitter{} + clock := clockwork.NewFakeClockAt(time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)) + cfg := ServiceConfig{MetadataBatchSize: 10, MaxAuthorizedKeysPerWorkflow: 3} + store := newWorkflowStore(lggr) + metadataPublisher := NewGatewayMetadataPublisher(lggr, &mockGatewayConnector{}, store, cfg, newMetrics(t)) + requestCache := newRequestCache(logger.Sugared(lggr), newTestKVStore(), time.Hour) + rm := resourcemanager.NewResourceManager(lggr, resourcemanager.ResourceManagerConfig{ + Enabled: true, + Emitter: emitter, + SnapshotInterval: time.Minute, + Clock: clock, + }) + handler, err := NewConnectorHandler(lggr, &mockGatewayConnector{}, cfg, store, metadataPublisher, requestCache, newMetrics(t), nil, rm, testBaseIdentity) + require.NoError(t, err) + unregister := rm.Register(handler) + t.Cleanup(unregister) + + registerMeterWorkflow(t, handler, testWorkflowID1, testWorkflowOwner1) + registerMeterWorkflow(t, handler, testWorkflowID2, testWorkflowOwner2) + + // Drop the lifecycle RESERVE records; we assert only on the snapshot tick. + emitter.records = nil + servicetest.Run(t, rm) + require.NoError(t, clock.BlockUntilContext(t.Context(), 1)) + clock.Advance(time.Minute) + + require.Eventually(t, func() bool { + return len(emitter.snapshots) == 2 + }, time.Second, time.Millisecond) + + // One MeterSnapshot per active workflow, each fully identified by its own + // per-workflow identity (resource_id is the workflow ID). + require.Len(t, emitter.snapshots, 2) + byWorkflowID := map[string]*meteringpb.MeterSnapshot{} + for _, s := range emitter.snapshots { + byWorkflowID[s.GetIdentity().GetResourceId()] = s + } + require.Len(t, byWorkflowID, 2) + + r1 := byWorkflowID[testWorkflowID1] + require.NotNil(t, r1) + require.Equal(t, testBaseIdentity.Product, r1.GetIdentity().GetProduct()) + require.Equal(t, meterResource, r1.GetIdentity().GetResource()) + require.Equal(t, meterResourceType, r1.GetIdentity().GetResourceType()) + require.Equal(t, int64(1), r1.GetUtilization().GetValue()) + + r2 := byWorkflowID[testWorkflowID2] + require.NotNil(t, r2) + require.Equal(t, int64(1), r2.GetUtilization().GetValue()) +} + +// TestClose_EmitsReleasePerActiveWorkflow asserts graceful close drains a +// RELEASE for every still-active workflow so reservations do not leak. +func TestClose_EmitsReleasePerActiveWorkflow(t *testing.T) { + lggr := logger.Test(t) + emitter := &fakeMeterEmitter{} + cfg := ServiceConfig{MetadataBatchSize: 10, MaxAuthorizedKeysPerWorkflow: 3} + store := newWorkflowStore(lggr) + metadataPublisher := NewGatewayMetadataPublisher(lggr, &mockGatewayConnector{}, store, cfg, newMetrics(t)) + requestCache := newRequestCache(logger.Sugared(lggr), newTestKVStore(), time.Hour) + rm := resourcemanager.NewResourceManager(lggr, resourcemanager.ResourceManagerConfig{ + Enabled: true, + Emitter: emitter, + SnapshotInterval: resourcemanager.DefaultSnapshotInterval, + }) + handler, err := NewConnectorHandler(lggr, &mockGatewayConnector{}, cfg, store, metadataPublisher, requestCache, newMetrics(t), nil, rm, testBaseIdentity) + require.NoError(t, err) + require.NoError(t, handler.Start(t.Context())) + + registerMeterWorkflow(t, handler, testWorkflowID1, testWorkflowOwner1) + registerMeterWorkflow(t, handler, testWorkflowID2, testWorkflowOwner2) + + // Drop the lifecycle RESERVE records; assert only on the close drain. + emitter.records = nil + require.NoError(t, handler.Close()) + + require.Equal(t, []meteringpb.MeterAction{ + meteringpb.MeterAction_METER_ACTION_RELEASE, + meteringpb.MeterAction_METER_ACTION_RELEASE, + }, emitter.actions()) + + released := map[string]bool{} + for _, r := range emitter.records { + released[r.GetIdentity().GetResourceId()] = true + } + require.True(t, released[testWorkflowID1]) + require.True(t, released[testWorkflowID2]) +} + +// TestDONIDFallback_UsesWorkflowDON asserts that when the host did not inject a +// capability DON (base DONID empty), records fall back to the per-registration +// workflow DON. +func TestDONIDFallback_UsesWorkflowDON(t *testing.T) { + lggr := logger.Test(t) + emitter := &fakeMeterEmitter{} + cfg := ServiceConfig{MetadataBatchSize: 10, MaxAuthorizedKeysPerWorkflow: 3} + store := newWorkflowStore(lggr) + metadataPublisher := NewGatewayMetadataPublisher(lggr, &mockGatewayConnector{}, store, cfg, newMetrics(t)) + requestCache := newRequestCache(logger.Sugared(lggr), newTestKVStore(), time.Hour) + rm := resourcemanager.NewResourceManager(lggr, resourcemanager.ResourceManagerConfig{Enabled: true, Emitter: emitter}) + // Base identity WITHOUT a capability DON (host did not inject one). + base := testBaseIdentity + base.DONID = "" + handler, err := NewConnectorHandler(lggr, &mockGatewayConnector{}, cfg, store, metadataPublisher, requestCache, newMetrics(t), nil, rm, base) + require.NoError(t, err) + + input := meterTestRegistrationInput() + input.Metadata.WorkflowDONID = 99 + sendCh := make(chan capabilities.TriggerAndId[*http.Payload], 1) + require.NoError(t, handler.RegisterWorkflow(t.Context(), input, sendCh)) + + require.Len(t, emitter.records, 1) + require.Equal(t, "99", emitter.records[0].GetIdentity().GetDonId()) +} + // TestResolveWorkflowMetadata_PreservesStoredWorkflowOwner tests that the workflowOwner // from the stored workflow is used, even if the incoming request has zeros or missing values. // This is a regression test for the bug where workflowOwner was being set to zeros. diff --git a/http_trigger/trigger/gateway_metadata_publisher_test.go b/http_trigger/trigger/gateway_metadata_publisher_test.go index bedf22bdf..2b0e0a435 100644 --- a/http_trigger/trigger/gateway_metadata_publisher_test.go +++ b/http_trigger/trigger/gateway_metadata_publisher_test.go @@ -221,9 +221,9 @@ func TestSendWorkflows_Success(t *testing.T) { wf1 := newWorkflow(selector1, authorizedKeys1, sendCh1) wf2 := newWorkflow(selector2, authorizedKeys2, sendCh2) - err := workflowStore.upsertWorkflow(wf1) + _, _, err := workflowStore.upsertWorkflow(wf1) require.NoError(t, err) - err = workflowStore.upsertWorkflow(wf2) + _, _, err = workflowStore.upsertWorkflow(wf2) require.NoError(t, err) gatewayID := "gateway1" diff --git a/http_trigger/trigger/trigger.go b/http_trigger/trigger/trigger.go index d1d048a70..6e9ede4f6 100644 --- a/http_trigger/trigger/trigger.go +++ b/http_trigger/trigger/trigger.go @@ -4,14 +4,18 @@ import ( "context" "encoding/json" "fmt" + "os" + "strconv" "strings" "time" + "github.com/smartcontractkit/chainlink-common/pkg/beholder" "github.com/smartcontractkit/chainlink-common/pkg/capabilities" caperrors "github.com/smartcontractkit/chainlink-common/pkg/capabilities/errors" "github.com/smartcontractkit/chainlink-common/pkg/capabilities/v2/triggers/http" "github.com/smartcontractkit/chainlink-common/pkg/capabilities/v2/triggers/http/server" "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink-common/pkg/resourcemanager" "github.com/smartcontractkit/chainlink-common/pkg/services" "github.com/smartcontractkit/chainlink-common/pkg/services/orgresolver" "github.com/smartcontractkit/chainlink-common/pkg/settings/limits" @@ -21,6 +25,39 @@ import ( const ServiceName = "HTTPTriggerCapability" +// Metering identity constants for the HTTP trigger. Service is the stable +// service constant (it must not encode environment or zone); Resource and +// ResourceType identify the HTTP workflow-registration pool and its billing +// unit. +const ( + meterService = "http-trigger" + meterResource = "http_workflows" + meterResourceType = "operations" + // meterProductFallback is used when the host did not inject a Product + // dimension (legacy node or a boot path not yet updated). + meterProductFallback = "cre" +) + +// meterRecordsEnabledEnvVar gates MeterRecord emission; the name is the +// cross-producer convention for the metering rollout (SHARED-2718). +const meterRecordsEnabledEnvVar = "CL_METER_RECORDS_ENABLED" + +// meterRecordsEnabled reads the metering gate from the environment. Unset or +// unparseable values disable emission; metering config must never prevent the +// capability from starting. +func meterRecordsEnabled(lggr logger.Logger) bool { + v := os.Getenv(meterRecordsEnabledEnvVar) + if v == "" { + return false + } + enabled, err := strconv.ParseBool(v) + if err != nil { + lggr.Warnw("Invalid value for "+meterRecordsEnabledEnvVar+", meter record emission disabled", "value", v, "error", err) + return false + } + return enabled +} + var _ server.HTTPCapability = &service{} type WorkflowRegistrationInput struct { @@ -84,13 +121,50 @@ func (s *service) Initialise(ctx context.Context, dependencies core.StandardCapa } metadataPublisher := NewGatewayMetadataPublisher(s.lggr, dependencies.GatewayConnector, workflowStore, s.cfg, s.metrics) requestCache := newRequestCache(s.lggr, dependencies.Store, time.Duration(s.cfg.RequestCacheTTL)*time.Second) - s.connectorHandler, err = NewConnectorHandler(s.lggr, dependencies.GatewayConnector, s.cfg, workflowStore, metadataPublisher, requestCache, s.metrics, s.orgResolver) + resourceManager := resourcemanager.NewResourceManager(s.lggr, resourcemanager.ResourceManagerConfig{ + Enabled: meterRecordsEnabled(s.lggr), + Emitter: beholder.GetEmitter(), + SnapshotInterval: resourcemanager.DefaultSnapshotInterval, + }) + baseIdentity := baseMeterIdentity(dependencies) + s.connectorHandler, err = NewConnectorHandler(s.lggr, dependencies.GatewayConnector, s.cfg, workflowStore, metadataPublisher, requestCache, s.metrics, s.orgResolver, resourceManager, baseIdentity) if err != nil { return err } return s.Start(ctx) } +// baseMeterIdentity builds the HTTP trigger's base metering identity from the +// host-injected dependencies. The six coarse dimensions plus the service-level +// resource / resource_type are fixed here; the per-workflow resource_id is set +// per emission via ResourceIdentity.WithResourceID. +// +// DONID is the capability DON the trigger LOOP was spawned for +// (deps.CapabilityDonID, host-injected via capabilities#619). When the host has +// not populated it (0), DONID is left empty here and resolved per registration +// from the workflow DON at emit time (see connectorHandler.donID). Product +// falls back to a constant when the host did not inject one. +func baseMeterIdentity(deps core.StandardCapabilitiesDependencies) resourcemanager.ResourceIdentity { + product := deps.Product + if product == "" { + product = meterProductFallback + } + var donID string + if deps.CapabilityDonID != 0 { + donID = strconv.FormatUint(uint64(deps.CapabilityDonID), 10) + } + return resourcemanager.ResourceIdentity{ + Product: product, + Environment: deps.Environment, + Zone: deps.Zone, + DONID: donID, + NodeID: deps.NodeID, + Service: meterService, + Resource: meterResource, + ResourceType: meterResourceType, + } +} + func (s *service) Start(ctx context.Context) error { s.lggr.Debug("Service starting...") return s.StartOnce(ServiceName, func() error { diff --git a/http_trigger/trigger/workflow.go b/http_trigger/trigger/workflow.go index 67daefa28..8b7be4945 100644 --- a/http_trigger/trigger/workflow.go +++ b/http_trigger/trigger/workflow.go @@ -52,26 +52,30 @@ func newWorkflowStore(lggr logger.Logger) *workflowStore { // workflow reference (owner/name/tag combination) with new workflow instance. // upsertWorkflow should be invoked in the order of workflow registration, so that // the latest workflow instance is always used for the given reference. -func (s *workflowStore) upsertWorkflow(w *workflow) error { +// The returned replaced flag reports whether an existing registration was +// replaced (true) rather than a new one inserted (false); when replaced, +// prevWorkflowID is the workflow ID the reference pointed to before the +// upsert (it may equal the new workflow ID, or differ on a version update). +func (s *workflowStore) upsertWorkflow(w *workflow) (prevWorkflowID string, replaced bool, err error) { // Validate workflow fields if err := validateWorkflowSelector(w.workflowSelector); err != nil { - return fmt.Errorf("invalid workflow selector: %w", err) + return "", false, fmt.Errorf("invalid workflow selector: %w", err) } s.mu.Lock() defer s.mu.Unlock() - workflowID, exists := s.workflowReferenceToID[workflowReference{ + prevWorkflowID, replaced = s.workflowReferenceToID[workflowReference{ workflowOwner: w.workflowSelector.WorkflowOwner, workflowName: w.workflowSelector.WorkflowName, workflowTag: w.workflowSelector.WorkflowTag, }] - if exists { + if replaced { reference := fmt.Sprintf("%s/%s/%s", w.workflowSelector.WorkflowOwner, w.workflowSelector.WorkflowName, w.workflowSelector.WorkflowTag) - s.lggr.Debugw("Updating existing workflow reference and removing previous workflow", "reference", reference, "prevWorkflowID", workflowID) - if oldW, ok := s.workflows[workflowID]; ok { + s.lggr.Debugw("Updating existing workflow reference and removing previous workflow", "reference", reference, "prevWorkflowID", prevWorkflowID) + if oldW, ok := s.workflows[prevWorkflowID]; ok { oldW.close() } - delete(s.workflows, workflowID) + delete(s.workflows, prevWorkflowID) } s.workflows[w.workflowSelector.WorkflowID] = w s.workflowReferenceToID[workflowReference{ @@ -79,7 +83,7 @@ func (s *workflowStore) upsertWorkflow(w *workflow) error { workflowName: w.workflowSelector.WorkflowName, workflowTag: w.workflowSelector.WorkflowTag, }] = w.workflowSelector.WorkflowID - return nil + return prevWorkflowID, replaced, nil } // validateWorkflowSelector validates the workflow selector fields diff --git a/http_trigger/trigger/workflow_test.go b/http_trigger/trigger/workflow_test.go index 189ed0d39..62edf5ef5 100644 --- a/http_trigger/trigger/workflow_test.go +++ b/http_trigger/trigger/workflow_test.go @@ -134,8 +134,10 @@ func TestWorkflowStore_upsertWorkflow(t *testing.T) { lggr := logger.Test(t) store := newWorkflowStore(lggr) wf, _ := testWorkflow() - err := store.upsertWorkflow(wf) + prevWorkflowID, replaced, err := store.upsertWorkflow(wf) require.NoError(t, err) + require.False(t, replaced) + require.Empty(t, prevWorkflowID) w, exists := store.getWorkflowByID(wf.workflowSelector.WorkflowID) require.True(t, exists) @@ -257,7 +259,7 @@ func TestWorkflowStore_upsertWorkflow_ValidationErrors(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { wf := newWorkflow(tt.selector, authorizedKeys, sendCh) - err := store.upsertWorkflow(wf) + _, _, err := store.upsertWorkflow(wf) require.Error(t, err) require.Contains(t, err.Error(), tt.wantErr) }) @@ -273,12 +275,16 @@ func TestWorkflowStore_upsertWorkflow_Duplicate(t *testing.T) { w2, _ := testWorkflow() // Add first workflow - err := store.upsertWorkflow(w1) + prevWorkflowID, replaced, err := store.upsertWorkflow(w1) require.NoError(t, err) + require.False(t, replaced) + require.Empty(t, prevWorkflowID) // Add second workflow with same ID (this should replace the first) - err = store.upsertWorkflow(w2) + prevWorkflowID, replaced, err = store.upsertWorkflow(w2) require.NoError(t, err) + require.True(t, replaced) + require.Equal(t, w1.workflowSelector.WorkflowID, prevWorkflowID) // Verify the workflow was replaced - since both have same ID/reference, // the second one should be present @@ -294,7 +300,7 @@ func TestWorkflowStore_removeWorkflow_Success(t *testing.T) { lggr := logger.Test(t) store := newWorkflowStore(lggr) w, _ := testWorkflow() - err := store.upsertWorkflow(w) + _, _, err := store.upsertWorkflow(w) require.NoError(t, err) wf, exists := store.getWorkflowByID(w.workflowSelector.WorkflowID) @@ -393,11 +399,11 @@ func TestWorkflowStore_GetWorkflows_Multiple(t *testing.T) { wf2 := newWorkflow(wfSelector2, authorizedKeys, sendCh2) wf3 := newWorkflow(wfSelector3, authorizedKeys, sendCh3) - err := store.upsertWorkflow(wf1) + _, _, err := store.upsertWorkflow(wf1) require.NoError(t, err) - err = store.upsertWorkflow(wf2) + _, _, err = store.upsertWorkflow(wf2) require.NoError(t, err) - err = store.upsertWorkflow(wf3) + _, _, err = store.upsertWorkflow(wf3) require.NoError(t, err) // Get all workflows @@ -421,7 +427,7 @@ func TestWorkflowStore_getWorkflowIDByReference_Success(t *testing.T) { lggr := logger.Test(t) store := newWorkflowStore(lggr) wf, _ := testWorkflow() - err := store.upsertWorkflow(wf) + _, _, err := store.upsertWorkflow(wf) require.NoError(t, err) workflowID, exists := store.getWorkflowIDByReference( @@ -538,8 +544,10 @@ func TestWorkflowStore_upsertWorkflow_ReplaceWithSameReference(t *testing.T) { wf2 := newWorkflow(selector2, authorizedKeys, sendCh2) // Add first workflow - err := store.upsertWorkflow(wf1) + prevWorkflowID, replaced, err := store.upsertWorkflow(wf1) require.NoError(t, err) + require.False(t, replaced) + require.Empty(t, prevWorkflowID) // Verify first workflow is there workflow, exists := store.getWorkflowByID(testWorkflowID1) @@ -551,9 +559,12 @@ func TestWorkflowStore_upsertWorkflow_ReplaceWithSameReference(t *testing.T) { require.True(t, exists) require.Equal(t, testWorkflowID1, workflowID) - // Add second workflow with same reference - err = store.upsertWorkflow(wf2) + // Add second workflow with same reference; the previous workflow ID is + // surfaced so callers can release its reservation. + prevWorkflowID, replaced, err = store.upsertWorkflow(wf2) require.NoError(t, err) + require.True(t, replaced) + require.Equal(t, testWorkflowID1, prevWorkflowID) // First workflow should be removed _, exists = store.getWorkflowByID(testWorkflowID1) @@ -576,7 +587,7 @@ func TestWorkflowStore_removeWorkflow_RemovesReference(t *testing.T) { lggr := logger.Test(t) store := newWorkflowStore(lggr) wf, _ := testWorkflow() - err := store.upsertWorkflow(wf) + _, _, err := store.upsertWorkflow(wf) require.NoError(t, err) // Verify workflow and reference exist @@ -735,7 +746,7 @@ func TestWorkflowStore_getWorkflowIDByReference_PartialMatch(t *testing.T) { lggr := logger.Test(t) store := newWorkflowStore(lggr) wf, _ := testWorkflow() - err := store.upsertWorkflow(wf) + _, _, err := store.upsertWorkflow(wf) require.NoError(t, err) // Test with wrong owner