AWS Sagemaker 파이프라인은 머신러닝에 필요할 것으로 예상되는 표준화된 단계들이 일관된 형태로 상호작용할 수 있는지를 고민했다. TFX(from2)와 매우 유사하다. AWS Sagemaker 파이프라인은 이 모든 작업이 클라우드에서 작동하는 경우만을 처리할 수 있을 뿐이다.
import boto3
import sagemaker
region = boto3.Session().region_name
role = sagemaker.get_execution_role()
sagemaker_session = sagemaker.session.Session()
Python
복사
초기화
# 파이프라인의 입출력을 워크플로우 파라미터 객체로 정의한다.
from sagemaker.workflow.parameters import ParameterInteger, ParameterString
# 아래 정의된 변수들은 파이프라인 구성 요소들을 정의할 때 사용된다.
preprocessing_instance_count = ParameterInteger(
name="PreprocessingInstanceCount",
default_value=1)
preprocessing_instance_type = ParameterString(
name="PreprocessingInstanceType",
default_value="ml.m5.xlarge")
training_instance_type = ParameterString(
name="TrainingInstanceType",
default_value="ml.m5.xlarge")
training_instance_count = ParameterInteger(
name="TrainingInstanceCount",
default_value=1)
input_data = ParameterString(
name="InputData",
default_value=input_data_uri) # S3 경로
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep
# 파이프라인에서 실질적으로 실행될 대상 정의
# 이 경우에는 sklearn 모델 입력 전처리를 위한 'Processor'
sklearn_preprocessor = SKLearnProcessor(
framework_version="0.23-1",
instance_type=preprocessing_instance_type,
instance_count=preprocessing_instance_count,
base_job_name="sklearn-fraud-preprocess",
role=role,
)
# 파이프라인의 특정 노드, 즉 Step 정의
# 이 경우에는 전처리를 의미하는 'Processing'
step_process = ProcessingStep(
name="FraudScratchPreprocess",
processor=sklearn_preprocessor,
inputs=[
ProcessingInput(source=input_data, destination='/opt/ml/processing/input'),
],
outputs=[
ProcessingOutput(output_name="train", source='/opt/ml/processing/output/train'),
ProcessingOutput(output_name="test", source='/opt/ml/processing/output/test')
],
job_arguments=["--split_rate", "0.2"],
code="src/preprocessing.py",
)
Python
복사
파이프라인 구성요소를 만드는 과정
from sagemaker.workflow.pipeline import Pipeline
pipeline = Pipeline(
name="sagemaker-pipeline",
parameters=[
processing_instance_type,
processing_instance_count,
training_instance_type,
training_instance_count,
input_data,
],
steps=[
step_process,
step_train,
step_create_model,
step_deploy
],
)
Python
복사
파이프라인 만들기
# 파이프라인 정의를 Sagemaker 파이프라인에 제출하여 파이프라인을 생성
# 파이프라인이 이미 존재하면 파이프라인 정의를 업데이트
# Sagemaker Studio 에서도 파이프라인을 확인할 수 있음
pipeline.upsert(role_arn=role)
# 파이프라인 샐행
execution = pipeline.start()
# 파이프라인 진행상황 확인
execution.describe()
# 실행 완료 대기
execution.wait()
# 실행 결과를 담은 리스트 반환
# steps_li[-1] 와 같은 방법으로 값을 가져올 수 있어 경로 추출에 유용하게 사용 가능
steps_li = execution.list_steps()
Python
복사
파이프라인 실행
파이프라인 API 이용 비용이 따로 들지는 않지만, Sagemaker Studio를 사용하거나 파이프라인에서 사용하는 인스턴스 혹은 서버리스 비용(참고1,참고2:서버리스를 사용할 수 있음)이 청구된다.
parse me : 언젠가 이 글에 쓰이면 좋을 것 같은 재료들.
1.
None
from : 과거의 어떤 생각이 이 생각을 만들었는가?
1.
•
이미 비슷하게 데이터 파이프라인이라는 도구가 있다. 다만 이 도구는 머신러닝 작업에 특화되어 있다. @9/14/2023 기준 아직 AWS Sagemaker Studio에서 GUI를 이용해 작업할 수 있도록 만들어져 있지는 않다.
2.
•
결국 TFX가 하고자 하는 것과 정말 비슷하다.
supplementary : 어떤 새로운 생각이 이 문서에 작성된 생각을 뒷받침하는가?
1.
None
opposite : 어떤 새로운 생각이 이 문서에 작성된 생각과 대조되는가?
1.
None
to : 이 문서에 작성된 생각이 어떤 생각으로 발전되고 이어지는가?
1.
None
참고 : 레퍼런스