diff --git a/sagemaker-core/src/sagemaker/core/telemetry/telemetry_logging.py b/sagemaker-core/src/sagemaker/core/telemetry/telemetry_logging.py index 1fb0f80f00..a1b07bfeb8 100644 --- a/sagemaker-core/src/sagemaker/core/telemetry/telemetry_logging.py +++ b/sagemaker-core/src/sagemaker/core/telemetry/telemetry_logging.py @@ -72,7 +72,67 @@ } -def _telemetry_emitter(feature: str, func_name: str): +def _classify_error(e: Exception) -> str: + """Classify an exception into an actionable error category.""" + error_type = type(e).__name__ + error_msg = str(e).lower() + + if "validation" in error_type.lower() or "invalid" in error_msg or "must be" in error_msg: + return "validation_error" + if "accessdenied" in error_msg or "not authorized" in error_msg or "forbidden" in error_msg: + return "auth_error" + if "capacity" in error_msg or "insufficientcapacity" in error_msg or "resourcelimitexceeded" in error_msg: + return "capacity_error" + if "timeout" in error_type.lower() or "timed out" in error_msg or "timeout" in error_msg: + return "timeout_error" + if "not found" in error_msg or "does not exist" in error_msg or "could not find" in error_msg: + return "resource_not_found" + if "eula" in error_msg or "accept_eula" in error_msg: + return "eula_error" + if "throttl" in error_msg or "rate exceeded" in error_msg or "too many requests" in error_msg: + return "throttling_error" + if "connection" in error_msg or "network" in error_msg or "unreachable" in error_msg: + return "network_error" + return "unknown" + + +def _attr_to_key(attr: str) -> str: + """Convert attribute name to camelCase telemetry key. + + Examples: '_model_name' -> 'modelName', 'training_type' -> 'trainingType' + """ + attr = attr.lstrip("_") + parts = attr.split("_") + return parts[0] + "".join(p.capitalize() for p in parts[1:]) + + +def _extract_telemetry_params(instance, emit=None, emit_presence=None) -> str: + """Extract telemetry params from an instance based on emit/emit_presence lists. + + Args: + instance: The class instance (args[0]) to extract attributes from. + emit: List of attribute names whose actual values should be emitted. + emit_presence: List of attribute names where only presence (true/false) is emitted. + + Returns: + str: URL query params string (e.g., "&x-modelName=llama&x-hasNetworking=true"). + """ + parts = [] + if emit: + for attr in emit: + value = getattr(instance, attr, None) + if value is not None: + key = _attr_to_key(attr) + parts.append(f"&x-{key}={value}") + if emit_presence: + for attr in emit_presence: + value = getattr(instance, attr, None) + key = _attr_to_key(attr) + parts.append(f"&x-has{key[0].upper()}{key[1:]}={'true' if value else 'false'}") + return "".join(parts) + + +def _telemetry_emitter(feature: str, func_name: str, emit=None, emit_presence=None): """Telemetry Emitter Decorator to emit telemetry logs for SageMaker Python SDK functions. This class needs @@ -80,6 +140,13 @@ def _telemetry_emitter(feature: str, func_name: str): in this repo. When collecting telemetry for classes using sagemaker-core Session object, we should be aware of its differences, such as sagemaker_session.sagemaker_config does not exist in new Session class. + + Args: + feature: The Feature enum value for this telemetry event. + func_name: Human-readable function name for tracking. + emit: Optional list of instance attribute names whose values will be sent in telemetry. + emit_presence: Optional list of instance attribute names where only presence is tracked + (emits true/false). """ def decorator(func): @@ -175,6 +242,10 @@ def wrapper(*args, **kwargs): if created_by: extra += f"&x-createdBy={quote(created_by, safe='')}" + # Extract granular telemetry params from the instance + if (emit or emit_presence) and len(args) > 0: + extra += _extract_telemetry_params(args[0], emit, emit_presence) + start_timer = perf_counter() try: # Call the original function @@ -200,6 +271,7 @@ def wrapper(*args, **kwargs): stop_timer = perf_counter() elapsed = stop_timer - start_timer extra += f"&x-latency={round(elapsed, 2)}" + extra += f"&x-errorCategory={_classify_error(e)}" if not telemetry_opt_out_flag: _send_telemetry_request( STATUS_TO_CODE[str(Status.FAILURE)], diff --git a/sagemaker-serve/src/sagemaker/serve/bedrock_model_builder.py b/sagemaker-serve/src/sagemaker/serve/bedrock_model_builder.py index 4d627059a6..98657c40e0 100644 --- a/sagemaker-serve/src/sagemaker/serve/bedrock_model_builder.py +++ b/sagemaker-serve/src/sagemaker/serve/bedrock_model_builder.py @@ -137,7 +137,11 @@ def _is_nova_model_for_telemetry(self) -> bool: except Exception: return False - @_telemetry_emitter(feature=Feature.MODEL_CUSTOMIZATION, func_name="BedrockModelBuilder.deploy") + @_telemetry_emitter( + feature=Feature.MODEL_CUSTOMIZATION, + func_name="BedrockModelBuilder.deploy", + emit_presence=["model_package"], + ) def deploy( self, job_name: Optional[str] = None, diff --git a/sagemaker-serve/src/sagemaker/serve/model_builder.py b/sagemaker-serve/src/sagemaker/serve/model_builder.py index 27eaaa8fa3..a04bacce5c 100644 --- a/sagemaker-serve/src/sagemaker/serve/model_builder.py +++ b/sagemaker-serve/src/sagemaker/serve/model_builder.py @@ -3325,7 +3325,12 @@ def _reset_build_state(self): if hasattr(self, attr): delattr(self, attr) - @_telemetry_emitter(feature=Feature.MODEL_CUSTOMIZATION, func_name="model_builder.build") + @_telemetry_emitter( + feature=Feature.MODEL_CUSTOMIZATION, + func_name="model_builder.build", + emit=["mode"], + emit_presence=["network", "source_code", "inference_spec"], + ) @runnable_by_pipeline def build( self, @@ -4188,7 +4193,12 @@ def _model_builder_optimize_wrapper( self.built_model = self._create_model() return self.built_model - @_telemetry_emitter(feature=Feature.MODEL_CUSTOMIZATION, func_name="model_builder.deploy") + @_telemetry_emitter( + feature=Feature.MODEL_CUSTOMIZATION, + func_name="model_builder.deploy", + emit=["mode", "instance_type"], + emit_presence=["network", "compute"], + ) def deploy( self, endpoint_name: str = None, diff --git a/sagemaker-train/src/sagemaker/train/dpo_trainer.py b/sagemaker-train/src/sagemaker/train/dpo_trainer.py index bd5d9a11bd..ffdde9c0ee 100644 --- a/sagemaker-train/src/sagemaker/train/dpo_trainer.py +++ b/sagemaker-train/src/sagemaker/train/dpo_trainer.py @@ -176,7 +176,12 @@ def _process_hyperparameters(self): delattr(self.hyperparameters, 'validation_data_path') self.hyperparameters._specs.pop('validation_data_path', None) - @_telemetry_emitter(feature=Feature.MODEL_CUSTOMIZATION, func_name="DPOTrainer.train") + @_telemetry_emitter( + feature=Feature.MODEL_CUSTOMIZATION, + func_name="DPOTrainer.train", + emit=["_model_name", "training_type"], + emit_presence=["networking", "kms_key_id", "mlflow_resource_arn", "stopping_condition"], + ) def train(self, training_dataset: Optional[Union[str, DataSet]] = None, validation_dataset: Optional[Union[str, DataSet]] = None, diff --git a/sagemaker-train/src/sagemaker/train/evaluate/benchmark_evaluator.py b/sagemaker-train/src/sagemaker/train/evaluate/benchmark_evaluator.py index 4e4e522b3a..e73694a839 100644 --- a/sagemaker-train/src/sagemaker/train/evaluate/benchmark_evaluator.py +++ b/sagemaker-train/src/sagemaker/train/evaluate/benchmark_evaluator.py @@ -567,7 +567,12 @@ def _get_benchmark_template_additions(self, eval_subtask: Optional[Union[str, Li return benchmark_context - @_telemetry_emitter(feature=Feature.MODEL_CUSTOMIZATION, func_name="BenchMarkEvaluator.evaluate") + @_telemetry_emitter( + feature=Feature.MODEL_CUSTOMIZATION, + func_name="BenchMarkEvaluator.evaluate", + emit=["benchmark"], + emit_presence=["networking", "kms_key_id", "mlflow_resource_arn"], + ) def evaluate(self, subtask: Optional[Union[str, List[str]]] = None) -> EvaluationPipelineExecution: """Create and start a benchmark evaluation job. diff --git a/sagemaker-train/src/sagemaker/train/evaluate/custom_scorer_evaluator.py b/sagemaker-train/src/sagemaker/train/evaluate/custom_scorer_evaluator.py index 9c768c3891..38fc7518cd 100644 --- a/sagemaker-train/src/sagemaker/train/evaluate/custom_scorer_evaluator.py +++ b/sagemaker-train/src/sagemaker/train/evaluate/custom_scorer_evaluator.py @@ -385,7 +385,12 @@ def _get_inference_params_from_hub(self, region: str) -> dict: ) return fallback_params - @_telemetry_emitter(feature=Feature.MODEL_CUSTOMIZATION, func_name="CustomScorerEvaluator.evaluate") + @_telemetry_emitter( + feature=Feature.MODEL_CUSTOMIZATION, + func_name="CustomScorerEvaluator.evaluate", + emit=["evaluator"], + emit_presence=["networking", "kms_key_id", "mlflow_resource_arn"], + ) def evaluate(self) -> EvaluationPipelineExecution: """Create and start a custom scorer evaluation job. diff --git a/sagemaker-train/src/sagemaker/train/evaluate/llm_as_judge_evaluator.py b/sagemaker-train/src/sagemaker/train/evaluate/llm_as_judge_evaluator.py index 8438b65688..0e808d017d 100644 --- a/sagemaker-train/src/sagemaker/train/evaluate/llm_as_judge_evaluator.py +++ b/sagemaker-train/src/sagemaker/train/evaluate/llm_as_judge_evaluator.py @@ -294,7 +294,12 @@ def _get_llmaj_template_additions(self, eval_name: str) -> dict: 'evaluate_base_model': self.evaluate_base_model, } - @_telemetry_emitter(feature=Feature.MODEL_CUSTOMIZATION, func_name="LLMAsJudgeEvaluator.evaluate") + @_telemetry_emitter( + feature=Feature.MODEL_CUSTOMIZATION, + func_name="LLMAsJudgeEvaluator.evaluate", + emit=["evaluator_model"], + emit_presence=["networking", "kms_key_id", "mlflow_resource_arn", "custom_metrics"], + ) def evaluate(self): """Create and start an LLM-as-judge evaluation job. diff --git a/sagemaker-train/src/sagemaker/train/evaluate/multi_turn_rl_evaluator.py b/sagemaker-train/src/sagemaker/train/evaluate/multi_turn_rl_evaluator.py index ebdb8de434..734480ae30 100644 --- a/sagemaker-train/src/sagemaker/train/evaluate/multi_turn_rl_evaluator.py +++ b/sagemaker-train/src/sagemaker/train/evaluate/multi_turn_rl_evaluator.py @@ -536,6 +536,8 @@ def _build_job_config_doc(include_mpc: bool, mlflow_run_name: str) -> str: @_telemetry_emitter( feature=Feature.MODEL_CUSTOMIZATION, func_name="MultiTurnRLEvaluator.evaluate", + emit=["agent_qualifier"], + emit_presence=["networking", "kms_key_id", "mlflow_resource_arn", "agent_config", "stopping_condition"], ) def evaluate(self) -> 'MTRLEvaluationExecution': """Render the MTRL pipeline and start a non-blocking execution. diff --git a/sagemaker-train/src/sagemaker/train/model_trainer.py b/sagemaker-train/src/sagemaker/train/model_trainer.py index 9bd2e34442..210681ae8c 100644 --- a/sagemaker-train/src/sagemaker/train/model_trainer.py +++ b/sagemaker-train/src/sagemaker/train/model_trainer.py @@ -767,7 +767,12 @@ def _create_training_job_args( return training_request - @_telemetry_emitter(feature=Feature.MODEL_TRAINER, func_name="model_trainer.train") + @_telemetry_emitter( + feature=Feature.MODEL_TRAINER, + func_name="model_trainer.train", + emit=["training_mode", "training_input_mode"], + emit_presence=["networking", "stopping_condition", "distributed", "source_code", "checkpoint_config"], + ) @runnable_by_pipeline @validate_call def train( diff --git a/sagemaker-train/src/sagemaker/train/multi_turn_rl_trainer.py b/sagemaker-train/src/sagemaker/train/multi_turn_rl_trainer.py index d1f456c371..06c198320e 100644 --- a/sagemaker-train/src/sagemaker/train/multi_turn_rl_trainer.py +++ b/sagemaker-train/src/sagemaker/train/multi_turn_rl_trainer.py @@ -246,7 +246,10 @@ def __init__( self._latest_job: AgentRFTJob | None = None @_telemetry_emitter( - feature=Feature.MODEL_CUSTOMIZATION, func_name="MultiTurnRLTrainer.train" + feature=Feature.MODEL_CUSTOMIZATION, + func_name="MultiTurnRLTrainer.train", + emit=["_model_name", "bedrock_agentcore_qualifier"], + emit_presence=["networking", "kms_key_arn", "mlflow_app_arn", "agent_env"], ) def train( self, diff --git a/sagemaker-train/src/sagemaker/train/rlaif_trainer.py b/sagemaker-train/src/sagemaker/train/rlaif_trainer.py index f2d8460989..f0ff92c2e4 100644 --- a/sagemaker-train/src/sagemaker/train/rlaif_trainer.py +++ b/sagemaker-train/src/sagemaker/train/rlaif_trainer.py @@ -196,7 +196,12 @@ def _validate_reward_model_id(self, reward_model_id): return reward_model_id - @_telemetry_emitter(feature=Feature.MODEL_CUSTOMIZATION, func_name="RLAIFTrainer.train") + @_telemetry_emitter( + feature=Feature.MODEL_CUSTOMIZATION, + func_name="RLAIFTrainer.train", + emit=["_model_name", "training_type"], + emit_presence=["networking", "kms_key_id", "mlflow_resource_arn", "stopping_condition", "custom_reward_function"], + ) def train(self, training_dataset: Optional[Union[str, DataSet]] = None, validation_dataset: Optional[Union[str, DataSet]] = None, wait: bool = True, wait_timeout: Optional[int] = None, poll: int = 5): """Execute the RLAIF training job. diff --git a/sagemaker-train/src/sagemaker/train/rlvr_trainer.py b/sagemaker-train/src/sagemaker/train/rlvr_trainer.py index 333a93fc55..b3e8d42f07 100644 --- a/sagemaker-train/src/sagemaker/train/rlvr_trainer.py +++ b/sagemaker-train/src/sagemaker/train/rlvr_trainer.py @@ -184,7 +184,12 @@ def _process_hyperparameters(self): delattr(self.hyperparameters, 'output_path') self.hyperparameters._specs.pop('output_path', None) - @_telemetry_emitter(feature=Feature.MODEL_CUSTOMIZATION, func_name="RLVRTrainer.train") + @_telemetry_emitter( + feature=Feature.MODEL_CUSTOMIZATION, + func_name="RLVRTrainer.train", + emit=["_model_name", "training_type"], + emit_presence=["networking", "kms_key_id", "mlflow_resource_arn", "stopping_condition", "custom_reward_function"], + ) def train(self, training_dataset: Optional[Union[str, DataSet]] = None, validation_dataset: Optional[Union[str, DataSet]] = None, wait: bool = True, wait_timeout: Optional[int] = None, poll: int = 5): """Execute the RLVR training job. diff --git a/sagemaker-train/src/sagemaker/train/sft_trainer.py b/sagemaker-train/src/sagemaker/train/sft_trainer.py index 233f169d0f..cf07d4f75b 100644 --- a/sagemaker-train/src/sagemaker/train/sft_trainer.py +++ b/sagemaker-train/src/sagemaker/train/sft_trainer.py @@ -179,7 +179,12 @@ def _process_hyperparameters(self): delattr(self.hyperparameters, 'validation_data_path') self.hyperparameters._specs.pop('validation_data_path', None) - @_telemetry_emitter(feature=Feature.MODEL_CUSTOMIZATION, func_name="SFTTrainer.train") + @_telemetry_emitter( + feature=Feature.MODEL_CUSTOMIZATION, + func_name="SFTTrainer.train", + emit=["_model_name", "training_type"], + emit_presence=["networking", "kms_key_id", "mlflow_resource_arn", "stopping_condition"], + ) def train(self, training_dataset: Optional[Union[str, DataSet]] = None, validation_dataset: Optional[Union[str, DataSet]] = None, wait: bool = True, wait_timeout: Optional[int] = None, poll: int = 5): """Execute the SFT training job.