Module sktmls.data_status
Sub-modules
sktmls.data_status.status
Classes
class BatchStatus (job_status: str, jobs: List[dict])
-
MLS Batch 작업 상태를 나타내는 클래스입니다.
Args
- job_status: (str) 작업 상태 (
running
|success
|fail
) - jobs: (list) 작업 리스트
Returns
Methods
def get(self)
- job_status: (str) 작업 상태 (
class DynamoDBStatus (table: str, status: List[dict])
-
MLS DynamoDB 업로드 상태를 나타내는 클래스입니다.
Args
- table: (str) 테이블 명
- status: (list) 최근 5일간의 상태값 리스트
Returns
Methods
def get(self)
class S3Status (table: str, daily_status: List[dict])
-
MLS S3 업로드 상태를 나타내는 클래스입니다.
Args
- table: (str) 테이블 명
- daily_status: (list) 최근 5일간의 상태 값 리스트
Returns
Methods
def get(self)
class StatusClient (env: MLSENV = None, runtime_env: MLSRuntimeENV = None, username: str = None, password: str = None)
-
MLS Status 관련 기능들을 제공하는 클라이언트입니다.
Args
- env: (
MLSENV
) 접근할 MLS 환경 (MLSENV.DEV
|MLSENV.STG
|MLSENV.PRD
) (기본값:MLSENV.STG
) - runtime_env: (
MLSRuntimeENV
) 클라이언트가 실행되는 환경 (MLSRuntimeENV.YE
|MLSRuntimeENV.EDD
|MLSRuntimeENV.LOCAL
) (기본값:MLSRuntimeENV.LOCAL
) - username: (str) MLS 계정명 (기본값: $MLS_USERNAME)
- password: (str) MLS 계정 비밀번호 (기본값: $MLS_PASSWORD)
아래의 환경 변수가 정의된 경우 해당 파라미터를 생략 가능합니다.
- $MLS_ENV: env
- $MLS_RUNTIME_ENV: runtime_env
- $MLS_USERNAME: username
- $MLS_PASSWORD: password
Returns
Example
status_client = StatusClient(env=MLSENV.STG, runtime_env=MLSRuntimeENV.YE, username="mls_account", password="mls_password")
Ancestors
Methods
def get_batch_status(self, job_status: str) ‑> BatchStatus
-
Batch 상태를 가져옵니다.
Args
- job_status: (str) 작업 상태 (
running
|success
|fail
)
Returns
- job_status: (str) 작업 상태 (
running
|success
|fail
) - jobs: (list(dict)) 작업 리스트
Example
batch_status = status_client.get_batch_status(job_status="success")
- job_status: (str) 작업 상태 (
def get_dynamodb_status(self, table: str) ‑> DynamoDBStatus
-
DynamoDB 상태를 가져옵니다.
Args
- table: (str) 테이블 명
Returns
- table: (str) 테이블 명
- status: (list(dict)) 최근 5일간의 상태값 리스트
Example
dynamodb_status = status_client.get_dynamodb_status(table="sample_table")
def get_s3_status(self, table: str) ‑> S3Status
-
S3 상태 정보를 가져옵니다.
Args
- table: (str) 테이블 명
Returns
- table: (str) 테이블 명
- daily_status: (list(dict)) 최근 5일간의 상태 값 리스트
Example
s3_status = status_client.get_s3_status(table="sample_table")
def list_batch_status(self) ‑> List[BatchStatus]
-
Batch 상태 리스트를 가져옵니다.
Returns
list(
BatchStatus
)- job_status: (str) 작업 상태 (
running
|success
|fail
) - jobs: (list(dict)) 작업 리스트
Example
batch_status_list = status_client.list_batch_status()
- job_status: (str) 작업 상태 (
def list_dynamodb_status(self) ‑> List[DynamoDBStatus]
-
DynamoDB 상태 리스트를 가져옵니다.
Returns
list(
DynamoDBStatus
)- table: (str) 테이블 명
- status: (list(dict)) 최근 5일간의 상태값 리스트
Example
dynamodb_status_list = status_client.list_dynamodb_status()
def list_s3_status(self) ‑> List[S3Status]
-
S3 상태 리스트를 가져옵니다.
Returns
list(
S3Status
)- table: (str) 테이블 명
- daily_status: (list(dict)) 최근 5일간의 상태 값 리스트
Example
s3_status_list = status_client.list_s3_status()
- env: (