StepFunction 调用 Training Jobs
stepfunction_helper.py
import io
import logging
import os
import random
import time
import uuid
import json
import boto3
import stepfunctions
from stepfunctions import steps
from stepfunctions.inputs import ExecutionInput
from stepfunctions.steps import (
Chain,
ChoiceRule,
ModelStep,
ProcessingStep,
TrainingStep,
TransformStep,
)
from stepfunctions.template import TrainingPipeline
from stepfunctions.template.utils import replace_parameters_with_jsonpath
from stepfunctions.workflow import Workflow
import sagemaker
from sagemaker import get_execution_role
from sagemaker.amazon.amazon_estimator import image_uris
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.s3 import S3Uploader
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.estimator import Estimator
import os
import tarfile
def local_path_to_s3(path, bucket=None, prefix=None):
    """Package a local code directory into sourcedir.tar.gz and upload it to S3.

    Parameters:
        path: local directory containing the training code.
        bucket: target S3 bucket; defaults to the SageMaker session's default bucket.
        prefix: S3 key prefix; the archive is stored under s3://<bucket>/<prefix>/code/.

    Returns:
        The S3 URI of the uploaded sourcedir.tar.gz.
    """
    def get_file_path(root_path):
        # Recursively collect every file under root_path.
        file_list = []
        print('Packaging:')
        for root, _, filenames in os.walk(root_path):
            for filename in filenames:
                print(filename)
                file_list.append(os.path.join(root, filename))
        return file_list

    def create_tar_file(source_files, target=None):
        # Bundle source_files into a gzipped tar; entries are flattened to
        # their basenames inside the archive.
        if target:
            filename = target
        else:
            # BUGFIX: tempfile was never imported at module level, so this
            # branch raised NameError; import it locally where it is needed.
            import tempfile
            _, filename = tempfile.mkstemp()
        with tarfile.open(filename, mode="w:gz") as t:
            for sf in source_files:
                t.add(sf, arcname=os.path.basename(sf))
        return filename

    # BUGFIX: the session is needed for the upload itself, so create it
    # unconditionally. The original only built it when bucket was None,
    # which made an explicit `bucket=` argument raise NameError at upload.
    sagemaker_session = sagemaker.session.Session()
    if bucket is None:
        bucket = sagemaker_session.default_bucket()
    if prefix is None:
        prefix = 'test-sagemaker-'
    print('[INFO] Uploading local code to S3 bucket: s3://{}'.format(os.path.join(bucket, prefix, 'code')))
    # Collect the file list to ship.
    code_files = get_file_path(path)
    # Pack the code files into one archive.
    create_tar_file(code_files, "sourcedir.tar.gz")
    # Upload to s3://<bucket>/<prefix>/code/sourcedir.tar.gz.
    sources = sagemaker_session.upload_data("sourcedir.tar.gz", bucket, prefix + "/code")
    return sources
def make_ExecutionInput(name_list):
    """Build an ExecutionInput whose schema maps each name in name_list to str."""
    return ExecutionInput(schema={name: str for name in name_list})
def make_script_estimator(image_uri, role, hyperparameters, sources, base_job_name, script='train.py', instance_count=1, instance_type='ml.c5.xlarge'):
    """Create an Estimator for a custom image that runs a script stored on S3.

    Parameters:
        image_uri: ECR URI of the training image.
        role: IAM role ARN for the training job.
        hyperparameters: extra hyperparameters to pass through to the job (may be None).
        sources: S3 URI of the sourcedir.tar.gz code archive.
        base_job_name: prefix for auto-generated training job names.
        script: entry-point script inside the archive.
        instance_count / instance_type: training cluster configuration.

    Returns:
        A configured sagemaker.estimator.Estimator.
    """
    def json_encode_hyperparameters(hp):
        # SageMaker expects every hyperparameter value as a JSON string.
        return {str(k): json.dumps(v) for (k, v) in hp.items()}

    # BUGFIX: the original discarded the caller's `hyperparameters` and the
    # `script` argument, always hard-coding "train.py" and only the two
    # sagemaker_* keys. Merge the caller's values, then set the script
    # entry point and code location from the actual arguments.
    merged = dict(hyperparameters or {})
    merged["sagemaker_program"] = script
    merged["sagemaker_submit_directory"] = sources
    hyperparameters = json_encode_hyperparameters(merged)
    est = Estimator(image_uri=image_uri,
                    role=role,
                    instance_count=instance_count,
                    instance_type=instance_type,
                    hyperparameters=hyperparameters,
                    base_job_name=base_job_name)
    return est
def make_TrainingStep(state_id, estimator, job_name,
                      data_mapper=None,
                      hyperparameters=None,
                      mini_batch_size=None,
                      experiment_config=None,
                      wait_for_completion=True,
                      tags=None,
                      output_data_config_path=None
                      ):
    """Wrap an estimator in a Step Functions TrainingStep.

    Parameters:
        state_id: display name of the state in the workflow graph.
        estimator: a configured sagemaker Estimator.
        job_name: training job name (may be an ExecutionInput placeholder).
        data_mapper: {channel_name: s3_uri} of CSV training inputs, or None.
        Remaining arguments are forwarded to steps.TrainingStep unchanged.

    Returns:
        A stepfunctions.steps.TrainingStep.
    """
    # BUGFIX: build a fresh dict instead of mutating the caller's, and
    # tolerate the documented default data_mapper=None (the original raised
    # TypeError iterating None).
    data = None
    if data_mapper is not None:
        data = {channel: sagemaker.TrainingInput(s3_uri, content_type="text/csv")
                for channel, s3_uri in data_mapper.items()}
    # BUGFIX: forward `hyperparameters`, which the original accepted but dropped.
    training_step = steps.TrainingStep(
        state_id,
        estimator=estimator,
        data=data,
        job_name=job_name,
        hyperparameters=hyperparameters,
        mini_batch_size=mini_batch_size,
        experiment_config=experiment_config,
        wait_for_completion=wait_for_completion,
        tags=tags,
        output_data_config_path=output_data_config_path
    )
    return training_step
def make_TrainingStep_estimator(state_id, image, role, sources, job_name,
                                data_mapper=None,
                                hyperparameters=None,
                                mini_batch_size=None,
                                experiment_config=None,
                                wait_for_completion=True,
                                tags=None,
                                output_data_config_path=None
                                ):
    """Build an Estimator for a custom image and wrap it in a TrainingStep.

    Parameters:
        state_id: display name of the state in the workflow graph.
        image: ECR URI of the training image.
        role: IAM role ARN for the training job.
        sources: S3 URI of the sourcedir.tar.gz code archive.
        job_name: training job name (may be an ExecutionInput placeholder).
        data_mapper: {channel_name: s3_uri} of CSV training inputs, or None.
        Remaining arguments are forwarded to steps.TrainingStep unchanged.

    Returns:
        A stepfunctions.steps.TrainingStep.
    """
    # The base_job_name is a throwaway: the Step Functions execution supplies
    # the real job name via `job_name`.
    estimator = make_script_estimator(image, role, hyperparameters, sources, 'dummy-prefix')
    # BUGFIX: build a fresh dict instead of mutating the caller's, and
    # tolerate the documented default data_mapper=None (the original raised
    # TypeError iterating None).
    data = None
    if data_mapper is not None:
        data = {channel: sagemaker.TrainingInput(s3_uri, content_type="text/csv")
                for channel, s3_uri in data_mapper.items()}
    training_step = steps.TrainingStep(
        state_id,
        estimator=estimator,
        data=data,
        job_name=job_name,
        mini_batch_size=mini_batch_size,
        experiment_config=experiment_config,
        wait_for_completion=wait_for_completion,
        tags=tags,
        output_data_config_path=output_data_config_path
    )
    return training_step
def make_Workflow_Chain(name, step_list, workflow_execution_role, create=True):
    """Chain the given steps into a Workflow; optionally create it in AWS.

    Returns the Workflow object (created remotely only when create=True).
    """
    workflow = Workflow(
        name=name,
        definition=Chain(step_list),
        role=workflow_execution_role,
    )
    if create:
        workflow.create()
    return workflow
def get_unique_name(name_mapper):
    """Append a uuid1 hex suffix to every value of name_mapper, in place.

    Returns the same dict, so callers can use either the argument or the
    return value interchangeably.
    """
    for step_key in list(name_mapper):
        name_mapper[step_key] = "{}-{}".format(name_mapper[step_key], uuid.uuid1().hex)
    return name_mapper
def get_full_model_output(output_model_path, training_job_name):
    """Return the S3 URI of the model.tar.gz produced by a training job.

    Parameters:
        output_model_path: S3 prefix the job writes its output under.
        training_job_name: the (unique) training job name.
    """
    # BUGFIX: os.path.join uses os.sep, which would insert backslashes into
    # the S3 key on Windows; posixpath always joins with '/'.
    import posixpath
    return posixpath.join(output_model_path, training_job_name, 'output/model.tar.gz')
def workflow_execution(branching_workflow, name_mapper, wait=True):
    """Start the workflow with the given inputs and fetch its output.

    Returns a (execution, execution_output) tuple; when wait is True the
    call blocks until the execution finishes.
    """
    execution = branching_workflow.execute(inputs=name_mapper)
    result = execution.get_output(wait=wait)
    execution.render_progress()
    return execution, result
调用
导入
from stepfunction_helper import *
设置各种参数
code_path:本地代码路径
prefix:代码在s3上的prefix,代码会被存到s3://&lt;bucket-name&gt;/&lt;prefix&gt;/code/
sources:代码在s3上的实际位置,即s3://&lt;bucket-name&gt;/&lt;prefix&gt;/code/sourcedir.tar.gz
image:训练镜像
role:SageMaker Training Jobs的Execution Role
train:训练集在s3上的位置
hyperparameters:超参数
workflow_execution_role:Step Functions的执行Role
output_model_path:模型的输出路径s3://&lt;bucket-name&gt;/&lt;prefix&gt;/
# Local directory that holds the training code to be packaged.
code_path = 'code'
# S3 key prefix under which the code archive is stored.
prefix = 'test-stp'
# Upload the local code; `sources` is the s3:// URI of sourcedir.tar.gz.
sources = local_path_to_s3(code_path, prefix=prefix)
# ECR URI of the custom training image.
image = '337058716437.dkr.ecr.ca-central-1.amazonaws.com/xxxtrn'
# IAM execution role assumed by the SageMaker training job.
role = 'arn:aws:iam::337058716437:role/test-mlops-template-sagemaker-role'
# S3 location of the training dataset.
train = 's3://ca-central-sagemaker-test/iris-data/'
# Hyperparameters for the training job; the two sagemaker_* keys tell the
# container which script to run and where the code archive lives.
hyperparameters = {
    "sagemaker_program": "train.py",  # entry-point script inside the archive
    "sagemaker_submit_directory": sources,  # S3 code archive mounted into the job
}
# IAM role that Step Functions assumes to launch the workflow's steps.
workflow_execution_role = "arn:aws:iam::337058716437:role/AmazonSageMaker-StepFunctionsWorkflowExecutionRole"
# Model artifacts land under s3://<bucket>/<prefix>/ (strip the code suffix).
output_model_path = sources.replace('code/sourcedir.tar.gz', '')
由于Training Jobs需要独立的名字,所以建立ExecutionInput动态指定placeholder
# Placeholder inputs supplied per execution; TrainingJobName is filled in at
# run time so each SageMaker training job gets a unique name.
execution_input = make_ExecutionInput(['TrainingJobName'])
创建自定义镜像的estimator,和对应的Training Jobs Step
# Build the estimator for the custom image and wrap it in a TrainingStep;
# the job name is the ExecutionInput placeholder resolved at run time.
training_step = make_TrainingStep_estimator("SageMaker Training Step", image=image, role=role, sources=sources, job_name=execution_input["TrainingJobName"],
                                            data_mapper={"train": train},
                                            hyperparameters=hyperparameters,
                                            output_data_config_path=output_model_path
                                            )
创建Step Functions Workflow
# Chain the single training step into a workflow and create it in AWS.
branching_workflow = make_Workflow_Chain('LM-WF005', [training_step], workflow_execution_role)
动态输入training jobs的名字,并获取对应的模型输出路径
# Generate a unique training job name for this run.
name_mapper = get_unique_name({"TrainingJobName" : 'training'})
# S3 URI where this run's model.tar.gz will appear once training finishes.
output_model_targz = get_full_model_output(output_model_path, name_mapper['TrainingJobName'])
执行
# Start the workflow with the per-run job name and block until it finishes.
execution, execution_output = workflow_execution(branching_workflow, name_mapper)