Invoking SageMaker Training Jobs from Step Functions

stepfunction_helper.py

import json
import os
import tarfile
import tempfile
import uuid

from stepfunctions import steps
from stepfunctions.inputs import ExecutionInput
from stepfunctions.steps import Chain
from stepfunctions.workflow import Workflow

import sagemaker
from sagemaker.estimator import Estimator


def local_path_to_s3(path, bucket=None, prefix=None):
    """Package a local code directory as sourcedir.tar.gz and upload it to S3."""

    def get_file_path(root_path):
        # Collect every file under root_path.
        file_list = []
        print('Packaging:')
        for root, _, filenames in os.walk(root_path):
            for filename in filenames:
                print(filename)
                file_list.append(os.path.join(root, filename))
        return file_list

    def create_tar_file(source_files, target=None):
        # Write the source files into a gzipped tarball.
        if target:
            filename = target
        else:
            _, filename = tempfile.mkstemp()
        with tarfile.open(filename, mode="w:gz") as t:
            for sf in source_files:
                t.add(sf, arcname=os.path.basename(sf))
        return filename

    # The session is needed for the upload even when a bucket is given explicitly.
    sagemaker_session = sagemaker.session.Session()
    if bucket is None:
        bucket = sagemaker_session.default_bucket()
    if prefix is None:
        prefix = 'test-sagemaker-'

    print('[INFO] Uploading local code to S3 bucket: s3://{}'.format(os.path.join(bucket, prefix, 'code')))

    # Collect the file list.
    code_files = get_file_path(path)
    # Pack the code files into a tarball.
    create_tar_file(code_files, "sourcedir.tar.gz")
    # Upload the tarball to S3.
    sources = sagemaker_session.upload_data("sourcedir.tar.gz", bucket, prefix + "/code")
    return sources

def make_ExecutionInput(name_list):
    # Build a placeholder schema mapping each name to a string value.
    schema = {name: str for name in name_list}
    return ExecutionInput(schema=schema)

def make_script_estimator(image_uri, role, hyperparameters, sources, base_job_name, script='train.py', instance_count=1, instance_type='ml.c5.xlarge'):
    def json_encode_hyperparameters(hyperparameters):
        return {str(k): json.dumps(v) for (k, v) in hyperparameters.items()}

    # Merge the caller's hyperparameters with the script-mode entries that
    # tell the container which script to run and where the code lives in S3.
    hyperparameters = dict(hyperparameters or {})
    hyperparameters.update({
        "sagemaker_program": script,
        "sagemaker_submit_directory": sources,
    })
    hyperparameters = json_encode_hyperparameters(hyperparameters)

    est = Estimator(image_uri=image_uri,
                    role=role,
                    instance_count=instance_count,
                    instance_type=instance_type,
                    hyperparameters=hyperparameters,
                    base_job_name=base_job_name)
    return est

def make_TrainingStep(state_id, estimator, job_name,
                      data_mapper=None,
                      mini_batch_size=None,
                      experiment_config=None,
                      wait_for_completion=True,
                      tags=None,
                      output_data_config_path=None
                     ):
    # Wrap each channel's S3 URI in a TrainingInput (CSV content type).
    data_mapper = data_mapper or {}
    for key in data_mapper:
        data_mapper[key] = sagemaker.TrainingInput(data_mapper[key], content_type="text/csv")

    training_step = steps.TrainingStep(
        state_id,
        estimator=estimator,
        data=data_mapper,
        job_name=job_name,
        mini_batch_size=mini_batch_size,
        experiment_config=experiment_config,
        wait_for_completion=wait_for_completion,
        tags=tags,
        output_data_config_path=output_data_config_path
    )
    return training_step

def make_TrainingStep_estimator(state_id, image, role, sources, job_name,
                      data_mapper=None,
                      hyperparameters=None,
                      mini_batch_size=None,
                      experiment_config=None,
                      wait_for_completion=True,
                      tags=None,
                      output_data_config_path=None
                     ):
    # base_job_name is only a fallback here: the step passes an explicit
    # job_name at execution time, which takes precedence.
    estimator = make_script_estimator(image, role, hyperparameters, sources, 'dummy-prefix')
    return make_TrainingStep(state_id, estimator, job_name,
                             data_mapper=data_mapper,
                             mini_batch_size=mini_batch_size,
                             experiment_config=experiment_config,
                             wait_for_completion=wait_for_completion,
                             tags=tags,
                             output_data_config_path=output_data_config_path)

def make_Workflow_Chain(name, step_list, workflow_execution_role, create=True):
    workflow_graph = Chain(step_list)
    branching_workflow = Workflow(
        name=name,
        definition=workflow_graph,
        role=workflow_execution_role,
    )
    if create:
        branching_workflow.create()
    return branching_workflow

def get_unique_name(name_mapper):
    for key in name_mapper:
        name_mapper[key] = "{}-{}".format(name_mapper[key], uuid.uuid1().hex)
    return name_mapper

def get_full_model_output(output_model_path, training_job_name):
    return os.path.join(output_model_path, training_job_name, 'output/model.tar.gz')

def workflow_execution(branching_workflow, name_mapper, wait=True):
    execution = branching_workflow.execute(
        inputs=name_mapper
    )
    execution_output = execution.get_output(wait=wait)
    execution.render_progress()
    return execution, execution_output

Usage

Imports

from stepfunction_helper import *

Set the parameters

  • code_path: local path to the code
  • prefix: S3 prefix for the code; it is uploaded under s3://<bucket-name>/<prefix>/code/
  • sources: the actual S3 location of the packaged code, s3://<bucket-name>/<prefix>/code/sourcedir.tar.gz
  • image: training image
  • role: execution role for the SageMaker Training Job
  • train: S3 location of the training dataset
  • hyperparameters: hyperparameters for the job
  • workflow_execution_role: execution role for Step Functions
  • output_model_path: output path for the model, s3://<bucket-name>/<prefix>/
# local code path
code_path = 'code'
# prefix
prefix = 'test-stp'

# upload local code to s3
sources = local_path_to_s3(code_path, prefix=prefix)

# training image
image = '337058716437.dkr.ecr.ca-central-1.amazonaws.com/xxxtrn'
# execution role
role = 'arn:aws:iam::337058716437:role/test-mlops-template-sagemaker-role'
# training dataset
train = 's3://ca-central-sagemaker-test/iris-data/'

# hyperparameters for the training job
hyperparameters = {
    "sagemaker_program": "train.py", # entry point script
    "sagemaker_submit_directory": sources, # S3 code package mounted into the training container
}

# step functions workflow execution role
workflow_execution_role = "arn:aws:iam::337058716437:role/AmazonSageMaker-StepFunctionsWorkflowExecutionRole"

# model output path
output_model_path = sources.replace('code/sourcedir.tar.gz', '')

Because each Training Job needs a unique name, create an ExecutionInput so the name can be supplied dynamically through a placeholder.

# placeholder for every step
execution_input = make_ExecutionInput(['TrainingJobName'])
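
At execution time the placeholder is filled from the workflow's input JSON. In the generated Amazon States Language definition it becomes a JsonPath reference into the execution input, roughly:

    "TrainingJobName.$": "$$.Execution.Input['TrainingJobName']"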

Create an estimator for the custom image, along with the corresponding Training Job step

# Make estimator and training step
training_step = make_TrainingStep_estimator("SageMaker Training Step", image=image, role=role, sources=sources,
                                            job_name=execution_input["TrainingJobName"],
                                            data_mapper={"train": train},
                                            hyperparameters=hyperparameters,
                                            output_data_config_path=output_model_path)
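
If an estimator has already been built, make_TrainingStep can be used directly instead of the wrapper above; a minimal sketch with the same parameters:

# Equivalent sketch: build the estimator first, then the step.
estimator = make_script_estimator(image, role, hyperparameters, sources, 'training')
training_step = make_TrainingStep("SageMaker Training Step", estimator,
                                  job_name=execution_input["TrainingJobName"],
                                  data_mapper={"train": train},
                                  output_data_config_path=output_model_path)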

Create the Step Functions workflow

# make workflow with chain step
branching_workflow = make_Workflow_Chain('LM-WF005', [training_step], workflow_execution_role)
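
Before executing, the generated state machine can be inspected; both calls below are part of the Step Functions Data Science SDK:

# Print the generated Amazon States Language definition.
print(branching_workflow.definition.to_json(pretty=True))
# In a notebook, render the workflow graph inline.
branching_workflow.render_graph()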

Dynamically pass in the Training Job's name, and derive the corresponding model output location

# dynamically set the name of steps
name_mapper = get_unique_name({"TrainingJobName" : 'training'})
# training jobs output model file in s3 
output_model_targz = get_full_model_output(output_model_path, name_mapper['TrainingJobName'])

Execute

# start workflow
execution, execution_output = workflow_execution(branching_workflow, name_mapper)
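
Once the execution finishes, the model artifact can be pulled down locally, for example with S3Downloader (a sketch; the local directory name 'model' is arbitrary):

from sagemaker.s3 import S3Downloader

# Download the model.tar.gz produced by the training job.
S3Downloader.download(output_model_targz, 'model')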