Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 73 additions & 1 deletion sagemaker-core/src/sagemaker/core/telemetry/telemetry_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,14 +72,81 @@
}


def _telemetry_emitter(feature: str, func_name: str):
def _classify_error(e: Exception) -> str:
"""Classify an exception into an actionable error category."""
error_type = type(e).__name__
error_msg = str(e).lower()

if "validation" in error_type.lower() or "invalid" in error_msg or "must be" in error_msg:
return "validation_error"
if "accessdenied" in error_msg or "not authorized" in error_msg or "forbidden" in error_msg:
return "auth_error"
if "capacity" in error_msg or "insufficientcapacity" in error_msg or "resourcelimitexceeded" in error_msg:
return "capacity_error"
if "timeout" in error_type.lower() or "timed out" in error_msg or "timeout" in error_msg:
return "timeout_error"
if "not found" in error_msg or "does not exist" in error_msg or "could not find" in error_msg:
return "resource_not_found"
if "eula" in error_msg or "accept_eula" in error_msg:
return "eula_error"
if "throttl" in error_msg or "rate exceeded" in error_msg or "too many requests" in error_msg:
return "throttling_error"
if "connection" in error_msg or "network" in error_msg or "unreachable" in error_msg:
return "network_error"
return "unknown"


def _attr_to_key(attr: str) -> str:
"""Convert attribute name to camelCase telemetry key.

Examples: '_model_name' -> 'modelName', 'training_type' -> 'trainingType'
"""
attr = attr.lstrip("_")
parts = attr.split("_")
return parts[0] + "".join(p.capitalize() for p in parts[1:])


def _extract_telemetry_params(instance, emit=None, emit_presence=None) -> str:
"""Extract telemetry params from an instance based on emit/emit_presence lists.

Args:
instance: The class instance (args[0]) to extract attributes from.
emit: List of attribute names whose actual values should be emitted.
emit_presence: List of attribute names where only presence (true/false) is emitted.

Returns:
str: URL query params string (e.g., "&x-modelName=llama&x-hasNetworking=true").
"""
parts = []
if emit:
for attr in emit:
value = getattr(instance, attr, None)
if value is not None:
key = _attr_to_key(attr)
parts.append(f"&x-{key}={value}")
if emit_presence:
for attr in emit_presence:
value = getattr(instance, attr, None)
key = _attr_to_key(attr)
parts.append(f"&x-has{key[0].upper()}{key[1:]}={'true' if value else 'false'}")
return "".join(parts)


def _telemetry_emitter(feature: str, func_name: str, emit=None, emit_presence=None):
"""Telemetry Emitter

Decorator to emit telemetry logs for SageMaker Python SDK functions. This class needs
sagemaker_session object as a member. Default session object is a pysdk v2 Session object
in this repo. When collecting telemetry for classes using sagemaker-core Session object,
we should be aware of its differences, such as sagemaker_session.sagemaker_config does not
exist in new Session class.

Args:
feature: The Feature enum value for this telemetry event.
func_name: Human-readable function name for tracking.
emit: Optional list of instance attribute names whose values will be sent in telemetry.
emit_presence: Optional list of instance attribute names where only presence is tracked
(emits true/false).
"""

def decorator(func):
Expand Down Expand Up @@ -175,6 +242,10 @@ def wrapper(*args, **kwargs):
if created_by:
extra += f"&x-createdBy={quote(created_by, safe='')}"

# Extract granular telemetry params from the instance
if (emit or emit_presence) and len(args) > 0:
extra += _extract_telemetry_params(args[0], emit, emit_presence)

start_timer = perf_counter()
try:
# Call the original function
Expand All @@ -200,6 +271,7 @@ def wrapper(*args, **kwargs):
stop_timer = perf_counter()
elapsed = stop_timer - start_timer
extra += f"&x-latency={round(elapsed, 2)}"
extra += f"&x-errorCategory={_classify_error(e)}"
if not telemetry_opt_out_flag:
_send_telemetry_request(
STATUS_TO_CODE[str(Status.FAILURE)],
Expand Down
6 changes: 5 additions & 1 deletion sagemaker-serve/src/sagemaker/serve/bedrock_model_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,11 @@ def _is_nova_model_for_telemetry(self) -> bool:
except Exception:
return False

@_telemetry_emitter(feature=Feature.MODEL_CUSTOMIZATION, func_name="BedrockModelBuilder.deploy")
@_telemetry_emitter(
feature=Feature.MODEL_CUSTOMIZATION,
func_name="BedrockModelBuilder.deploy",
emit_presence=["model_package"],
)
def deploy(
self,
job_name: Optional[str] = None,
Expand Down
14 changes: 12 additions & 2 deletions sagemaker-serve/src/sagemaker/serve/model_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -3325,7 +3325,12 @@ def _reset_build_state(self):
if hasattr(self, attr):
delattr(self, attr)

@_telemetry_emitter(feature=Feature.MODEL_CUSTOMIZATION, func_name="model_builder.build")
@_telemetry_emitter(
feature=Feature.MODEL_CUSTOMIZATION,
func_name="model_builder.build",
emit=["mode"],
emit_presence=["network", "source_code", "inference_spec"],
)
@runnable_by_pipeline
def build(
self,
Expand Down Expand Up @@ -4188,7 +4193,12 @@ def _model_builder_optimize_wrapper(
self.built_model = self._create_model()
return self.built_model

@_telemetry_emitter(feature=Feature.MODEL_CUSTOMIZATION, func_name="model_builder.deploy")
@_telemetry_emitter(
feature=Feature.MODEL_CUSTOMIZATION,
func_name="model_builder.deploy",
emit=["mode", "instance_type"],
emit_presence=["network", "compute"],
)
def deploy(
self,
endpoint_name: str = None,
Expand Down
7 changes: 6 additions & 1 deletion sagemaker-train/src/sagemaker/train/dpo_trainer.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,12 @@ def _process_hyperparameters(self):
delattr(self.hyperparameters, 'validation_data_path')
self.hyperparameters._specs.pop('validation_data_path', None)

@_telemetry_emitter(feature=Feature.MODEL_CUSTOMIZATION, func_name="DPOTrainer.train")
@_telemetry_emitter(
feature=Feature.MODEL_CUSTOMIZATION,
func_name="DPOTrainer.train",
emit=["_model_name", "training_type"],
emit_presence=["networking", "kms_key_id", "mlflow_resource_arn", "stopping_condition"],
)
def train(self,
training_dataset: Optional[Union[str, DataSet]] = None,
validation_dataset: Optional[Union[str, DataSet]] = None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -567,7 +567,12 @@ def _get_benchmark_template_additions(self, eval_subtask: Optional[Union[str, Li

return benchmark_context

@_telemetry_emitter(feature=Feature.MODEL_CUSTOMIZATION, func_name="BenchMarkEvaluator.evaluate")
@_telemetry_emitter(
feature=Feature.MODEL_CUSTOMIZATION,
func_name="BenchMarkEvaluator.evaluate",
emit=["benchmark"],
emit_presence=["networking", "kms_key_id", "mlflow_resource_arn"],
)
def evaluate(self, subtask: Optional[Union[str, List[str]]] = None) -> EvaluationPipelineExecution:
"""Create and start a benchmark evaluation job.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,12 @@ def _get_inference_params_from_hub(self, region: str) -> dict:
)
return fallback_params

@_telemetry_emitter(feature=Feature.MODEL_CUSTOMIZATION, func_name="CustomScorerEvaluator.evaluate")
@_telemetry_emitter(
feature=Feature.MODEL_CUSTOMIZATION,
func_name="CustomScorerEvaluator.evaluate",
emit=["evaluator"],
emit_presence=["networking", "kms_key_id", "mlflow_resource_arn"],
)
def evaluate(self) -> EvaluationPipelineExecution:
"""Create and start a custom scorer evaluation job.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,12 @@ def _get_llmaj_template_additions(self, eval_name: str) -> dict:
'evaluate_base_model': self.evaluate_base_model,
}

@_telemetry_emitter(feature=Feature.MODEL_CUSTOMIZATION, func_name="LLMAsJudgeEvaluator.evaluate")
@_telemetry_emitter(
feature=Feature.MODEL_CUSTOMIZATION,
func_name="LLMAsJudgeEvaluator.evaluate",
emit=["evaluator_model"],
emit_presence=["networking", "kms_key_id", "mlflow_resource_arn", "custom_metrics"],
)
def evaluate(self):
"""Create and start an LLM-as-judge evaluation job.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,8 @@ def _build_job_config_doc(include_mpc: bool, mlflow_run_name: str) -> str:
@_telemetry_emitter(
feature=Feature.MODEL_CUSTOMIZATION,
func_name="MultiTurnRLEvaluator.evaluate",
emit=["agent_qualifier"],
emit_presence=["networking", "kms_key_id", "mlflow_resource_arn", "agent_config", "stopping_condition"],
)
def evaluate(self) -> 'MTRLEvaluationExecution':
"""Render the MTRL pipeline and start a non-blocking execution.
Expand Down
7 changes: 6 additions & 1 deletion sagemaker-train/src/sagemaker/train/model_trainer.py
Original file line number Diff line number Diff line change
Expand Up @@ -767,7 +767,12 @@ def _create_training_job_args(
return training_request


@_telemetry_emitter(feature=Feature.MODEL_TRAINER, func_name="model_trainer.train")
@_telemetry_emitter(
feature=Feature.MODEL_TRAINER,
func_name="model_trainer.train",
emit=["training_mode", "training_input_mode"],
emit_presence=["networking", "stopping_condition", "distributed", "source_code", "checkpoint_config"],
)
@runnable_by_pipeline
@validate_call
def train(
Expand Down
5 changes: 4 additions & 1 deletion sagemaker-train/src/sagemaker/train/multi_turn_rl_trainer.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,10 @@ def __init__(
self._latest_job: AgentRFTJob | None = None

@_telemetry_emitter(
feature=Feature.MODEL_CUSTOMIZATION, func_name="MultiTurnRLTrainer.train"
feature=Feature.MODEL_CUSTOMIZATION,
func_name="MultiTurnRLTrainer.train",
emit=["_model_name", "bedrock_agentcore_qualifier"],
emit_presence=["networking", "kms_key_arn", "mlflow_app_arn", "agent_env"],
)
def train(
self,
Expand Down
7 changes: 6 additions & 1 deletion sagemaker-train/src/sagemaker/train/rlaif_trainer.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,12 @@ def _validate_reward_model_id(self, reward_model_id):
return reward_model_id


@_telemetry_emitter(feature=Feature.MODEL_CUSTOMIZATION, func_name="RLAIFTrainer.train")
@_telemetry_emitter(
feature=Feature.MODEL_CUSTOMIZATION,
func_name="RLAIFTrainer.train",
emit=["_model_name", "training_type"],
emit_presence=["networking", "kms_key_id", "mlflow_resource_arn", "stopping_condition", "custom_reward_function"],
)
def train(self, training_dataset: Optional[Union[str, DataSet]] = None, validation_dataset: Optional[Union[str, DataSet]] = None, wait: bool = True, wait_timeout: Optional[int] = None, poll: int = 5):
"""Execute the RLAIF training job.

Expand Down
7 changes: 6 additions & 1 deletion sagemaker-train/src/sagemaker/train/rlvr_trainer.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,12 @@ def _process_hyperparameters(self):
delattr(self.hyperparameters, 'output_path')
self.hyperparameters._specs.pop('output_path', None)

@_telemetry_emitter(feature=Feature.MODEL_CUSTOMIZATION, func_name="RLVRTrainer.train")
@_telemetry_emitter(
feature=Feature.MODEL_CUSTOMIZATION,
func_name="RLVRTrainer.train",
emit=["_model_name", "training_type"],
emit_presence=["networking", "kms_key_id", "mlflow_resource_arn", "stopping_condition", "custom_reward_function"],
)
def train(self, training_dataset: Optional[Union[str, DataSet]] = None,
validation_dataset: Optional[Union[str, DataSet]] = None, wait: bool = True, wait_timeout: Optional[int] = None, poll: int = 5):
"""Execute the RLVR training job.
Expand Down
7 changes: 6 additions & 1 deletion sagemaker-train/src/sagemaker/train/sft_trainer.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,12 @@ def _process_hyperparameters(self):
delattr(self.hyperparameters, 'validation_data_path')
self.hyperparameters._specs.pop('validation_data_path', None)

@_telemetry_emitter(feature=Feature.MODEL_CUSTOMIZATION, func_name="SFTTrainer.train")
@_telemetry_emitter(
feature=Feature.MODEL_CUSTOMIZATION,
func_name="SFTTrainer.train",
emit=["_model_name", "training_type"],
emit_presence=["networking", "kms_key_id", "mlflow_resource_arn", "stopping_condition"],
)
def train(self, training_dataset: Optional[Union[str, DataSet]] = None, validation_dataset: Optional[Union[str, DataSet]] = None, wait: bool = True, wait_timeout: Optional[int] = None, poll: int = 5):
"""Execute the SFT training job.

Expand Down
Loading