AWS DB 운영, ChatOps로 초극한 자동화 플랫폼 엔지니어링

2024. 6. 1. 23:49데이터베이스

반응형

 


 

 

안녕하세요? DBA로 일하고 있는 일반인입니다.

 

 

[AWS 멀티 어카운트 환경]에서의 RDS 운영 - Aurora MySQL 을 Slack ChatOps로 극한까지 자동화한 개인 프로젝트를 소개합니다.

플랫폼 엔지니어링의 관점에서 어떻게 하면 데이터베이스 운영을 최소한의 리스크로 고품질로 유지시킬지에 대해 많은 고민을 담았습니다.

이 글에서 설명하는 모든 내용의 코드는 https://github.com/leezzangmin/db_automation 에서 보실 수 있습니다. 

 

개략적인 아키텍처

 


 

프로젝트의 목적과 탄생 배경

DBA로서 매일 반복적인 작업을 하다 보면 지치기 마련입니다.

모든 개발팀의 요청 작업 및 데이터베이스 배포는 수문장처럼 지키고있는 DBA의 검토를 받습니다.

아무리 간단하고 변경내용이 작은 작업이라도 업무 협의는 이루어져야 합니다. 그 때 마다 개발팀과 인프라팀의 (때때로 기획, 실무 및 C레벨) 커뮤니케이션 리소스는 소모되기 마련입니다. 적게는 하루이틀에서, 길면 일이주동안 검토와 업무 스케쥴링을 해야합니다. 굉장히 노동집약적인 워크로드입니다.

 

그래서, 테이블을 생성하거나 컬럼을 추가하는 등의 단순 작업을 자동화할 수 있다면 얼마나 좋을까요?

AWS RDS 운영을 최대한 자동화하여 저 뿐만 아닌 서로의 시간과 노력을 절약하고, 잠재적인 휴먼 에러를 줄이는 것이 주된 목적입니다.

 

플랫폼으로 아래의 일들을 수행하려 합니다.

 

1. 반복적인 DB 요청 및 운영 작업을 자동화하여 DB 엔지니어가 더 중요한 일에 집중할 수 있도록 돕습니다.

-> 비즈니스 밸류가 더 높은 일에 시간을 쓸 수 있습니다.

 

 

2. 플랫폼 엔지니어링으로 일반 개발자들도 DB에 대한 전문지식 없이 어느정도의 인프라 운영이 가능하도록 돕습니다.

-> 개발자가 데이터베이스를 어디까지 공부해야 할까요? 내부 동작을 학습하는 것이 회사 전체 입장에서 과연 효율이 좋을까요? 복잡한 부분은 컴퓨터에게 시키고 서비스로직 코딩하는 게 더 부가가치가 높을 것 같습니다.

 

 

3. 리스크가 높은 작업들을 수동으로 작업했을 때의 장애를 예방합니다.

-> 수작업으로 진행했을 경우 기민한 대처가 힘든 경우가 있습니다.

 

 

4. [수 많은 템플릿의 각종 shell, python 등등]  SCRIPT-BASED의 전통적인 노동집약적 DBA 노동형식에서 벗어나게 돕습니다.

-> 이 상황에는 3번 스크립트.. 앗 메타락이다 14번 스크립트.. 등의 복잡함에서 탈출

 

 

5. 이 모든 작업을 복잡한 선행 작업(jira 요청 티켓 생성 및 작성, api 호출, 배포 스크립트 작성 등) 없이 Slack에서 버튼 클릭 몇 번으로 수행하게 돕습니다. 모든 증적은 슬랙 메세지와 인터널 DB에 로깅됩니다.
-> ChatOps의 장점은 후술합니다

 

 

6. 개발팀의 요청 처리 뿐 아니라 손쉬운 운영을 위해 다음과 같은 검사도 수행합니다. 문제를 조기탐지하여 장애를 예방합니다.

  • DB 파라미터 값 표준 정의 및 준수 여부 검사 (ex. performance_schema 활성화 여부, collation, charset 등)
  • PROD - DEV 간 객체 차이점 검사 (Table, View, Function, Procedure, Trigger, Account 등)
  • 계정 과도한 권한 검사
  • RDS Resource Tag 여부
  • 실시간 메트릭 검사, 롱 쿼리와 메타데이터 락, HLL 모니터링 등

 


 

 

자동화 다 좋은데, 어느 경우에 적용할 것인가?

자동화라는 말은 듣기만 해도 설렙니다. 엔지니어로서, 프로젝트에 투입하는 비용과 시간을 줄이는 것은 숙명과 같습니다. 

하지만 좋은 자동화를 하는 것은 어렵습니다. 

 

모든 비용을 계산해보았을 때

- 절약하는 시간이 미미해서 구축하는 데 시간이 오히려 더 길다거나

- 구축했지만 자동화된 작업이 실패해서 수습하는 것에 오히려 더 많은 리소스를 투입한다거나

- 구축하는 사람은 편해졌지만 오히려 사용하는 사람의 시간을 뺏는다거나

의 엔지니어링적인 저울질을 잘 해야합니다. 인프라팀의 시간 뿐 아니라 회사 전체의 리소스를 고려하여 넣어 계산하는 것은 언제나 어렵습니다.

 

DB 요청은 하루에도 적게는 세네건, 많으면 열건 이상 발생하고비즈니스가 진행된다면 절대 줄어들지 않고 우상향하기 때문에 어느 정도 위임이 필요하다고 판단됩니다. 

 

 

 

 

 

자동화 구축은 결정되었고 이제는 사용성이 관건입니다. DBA의 1차 고객은 어쨌거나 개발조직입니다.

개발조직이 사용하거나 요청하기 불편하다면(요청 난이도가 논문작성급) 플랫폼으로서의 기능은 하지 못하는 것입니다.

 


 

그래서, 가장 먼저 접근성을 고려합니다.

 

DB 운영 플랫폼을 어떻게 구축하는게 가장 진입장벽이 낮을까요?

 

새로운 플랫폼을 구상하기 전에, 기존의 전통적인 작업 환경을 생각해봅시다.

jira ticket-based 입니다.이걸 토대로 워크플로우를 나열하자면

 

개발팀이 바라보는 전통적인 요청 ticket-based의 워크플로우 (간소화됨)

 

1. PC로 jira에 접속해서

2. 프로젝트를 선택하고

3. 티켓을 생성하기 위해 요청 템플릿을 찾거나 기존의 요청 내역을 살펴봅니다.

4. 티켓에 필요한 필드를 모두 채우고 (작업자, 작업기한, 관련자 태그, 에픽 링크)

5. 드디어 작업 컨텐츠를 본문에 작성합니다. (컬럼 추가 사유 작성, 컬럼 추가 커맨드, 관련 PR 링크, 슬랙 스레드 삽입 등)

6. 티켓을 작성하고 작업에 대한 상세한 논의를 슬랙 스레드 혹은 직접 소통합니다.

7. 커뮤니케이션, 커뮤니케이션, 커뮤니케이션

 

간단한 작업이라도 이 순환의 노동집약적 플로우는 계속 가동되어야 합니다.

 

 

 

 

 

Jira 티켓 기반 요청의 태생적인 단점으로, 업무 흐름을 관심있게 직접 들어가서 찾아서 눈여겨보지 않으면, 이 모든 프로세스가 어떻게 흘러가고 있는지 알 수 없다는 것입니다. 한마디로 정보의 투명성이 탁해져서 구성원간 이해도의 정보 격차가 발생하게 됩니다. "나는 저기다가 잘 써놨는데 안 찾아본 네가 잘못이지" 같은 상황이 발생할 수도 있다는 것입니다.

 

 

 


 

 

(ChatOps) 모두가 볼 수 있는 Slack 채널에서 플랫폼을 구축한다면? 

더보기

ChatOps란 실시간 커뮤니케이션을 기반으로 비즈니스 요구사항이 발생했을 때 문제를 빠르게 해결하기 위한 도구입니다. 봇(bot)을 활용해 대화에 빠르게 대응하며 상태를 확인하고 보고해 줍니다.

ChatOps를 이용하면 사람, 도구, 프로세스, 자동화가 연결됩니다.

개발자나 운영자는 더 이상 터미널에 접속하거나 도구에 직접 접근할 필요 없이 봇에게 명령하면 됩니다.

봇이 작업을 수행한 뒤 결과를 공유해 주며, 모든 유관 부서가 같은 채팅 앱에서 실시간으로 공유 받기 때문에 소통이 빨라지고 정보의 투명성이 올라갑니다.

(https://techblog.lycorp.co.jp/ko/how-to-use-chatops-to-automate-devops-tasks-feat-slack-hubot)

+ 덧붙여 최근 AWS Seoul Summit 2024에 가서 카카오페이증권과 당근마켓의 플랫폼 엔지니어링 세션을 참가했습니다.

늘어나는 요구사항과 비즈니스를 만족시키기 위해서는 정보의 개방성과 병목지점을 개발팀 스스로 해결할 수 있는 플랫폼이 있어야 하겠다는 것을 절실히 깨달았습니다. 작년 서밋 세션에서 쿠팡 DB 팀의 사례도 마찬가지입니다.

그래서 Slack 에서 봇 기반의 플랫폼을 구축하기로 결정합니다. (사실 이미 결정했는데 이 계기로 확신이 생김)

 

 


 

 

취지는 좋은데 Slack에서 플랫폼 구축을 어떻게 해 ?

이 글을 읽고 계신 분들도 Slack을 통해 업무 자동화를 어느 정도 구현하고 계실 것입니다.

예를 들면,

1. AWS 리소스 메트릭 모니터링을 하는 Datadog과 같은 Saas 를 integration 설정해서 알람을 쏘거나

2. JIRA integration으로 티켓 생성을 자동화 한다거나

3. 구글 캘린더를 연동해서 회의 알람을 받는다거나..

4. github 와 연동해서 배포 현황을 본다거나

 

등등을 사용하실 것 같습니다. 

이런 자동화의 한계는, integration을 제공하는 vendor에서 제공하는 기능에 종속적일 수 밖에 없다는 것입니다. 

쉽게 말하자면 '우리가 만들어놓은 것만 써' 입니다. 그래서 뭔가 더 확장하려면 금방 한계에 부딪힙니다.

 

 

< 그래서, 우리가 직접 만들고 유지보수할 수 있는 커스텀한 도구가 필요합니다. >

 

저는 AWS Multi-Account, MSA 환경의 RDS 운영을 위한 ChatOps 플랫폼을 구현하기 위한 도구로 아래 도구를 사용했습니다.

  • Java, Springboot, JPA
    • Java SpringBoot 생태계의 강력하고 풍부한 기능을 활용 (Junit 테스팅 프레임워크, AOP, Scheduler, DI와 싱글톤개념 등)
  • AWS 공식 SDK
    • AWS에서 제공하는 양질의 문서에 기반한 리소스 컨트롤을 자유자재로 도와주는 SDK
  • Slack 공식 SDK
    • api를 통해 슬랙의 메세지와 블록들을 컨트롤 할 수 있게하는 SDK 및 api
  • Slack Interactivity + Block kit + slash command
  • JSQL (SQL 파싱 java 오픈소스 라이브러리)

감이 오지 않으실테니 허접한 실제 플로우 데모 영상을 준비했습니다.

실제 개발팀이 DB 변경 요청을 하는 내용과,

미리 Admin으로 지정된 사람이 작업을 승인한 후,

실제 자동으로 수행되는 것 까지의 프로세스를 담았습니다.

2배속으로 보시는 것을 추천합니다

요청에 필요한 실제 SQL 도 생성해주고,

미리 정의된 표준에 맞지 않는 요청은 validation 단계에서 걸러집니다.

모든 작업은 사전에 정의된 안전 기준을 충족해야 합니다. 롱쿼리 탐지, 메타데이터 락 모니터링, 실시간 메트릭 검사 등의 기능을 통해 작업 중 발생할 수 있는 문제를 사전에 차단합니다. 예를 들어, 특정 쿼리가 예상보다 오래 실행될 경우, 자동으로 작업을 중단하고 관리자에게 실패 알림을 보냅니다.

 

아래는 데모의 사례에 해당하는 요청 종류에서, 테이블 컨벤션을 검사하는 짤막한 코드 예제입니다. (https://github.com/leezzangmin/db_automation/blob/main/src/main/java/zzangmin/db_automation/convention/TableConvention.java)

public static void validateTableConvention(Table table) {
    checkDuplicateColumnAndConstraintConvention(table.getColumns(), table.getConstraints());
    checkNamingConvention(table.getColumns(), table.getConstraints(), table.getTableName());
    checkTableOptionConvention(table.getTableEngine(), table.getTableCharset(), table.getTableCollate(), table.getTableComment());
    for (Column column : table.getColumns()) {
        ColumnConvention.validateColumnConvention(column);
    }
    for (Constraint constraint : table.getConstraints()) {
        IndexConvention.validateIndexConvention(constraint);
    }
}

 

 

이것 외에도 각종 표준과 제약사항이 코드로 정의되어있습니다.(파라미터, 스키마, 컬럼, 인코딩, 콜레이션, 계정 권한, AWS TAG 등)

 

각 요청마다 이 표준과 제약사항을 요청에 맞는 validation 로직에 태워 검사합니다.

수행 사전에는 롱쿼리와 하드웨어 metric 등을 미리 체크하고,

심지어 수행 도중에는 메타데이터 락과 인스턴스의 Metric을 모니터링하다가 이상이 발생하면 즉시 작업을 중단합니다.

 

이처럼 Slack을 통해 모든 작업이 실시간으로 모니터링되고 피드백 받을 수 있습니다. 예를 들어, DBA가 특정 작업을 승인하면, 해당 작업의 진행 상황과 결과가 실시간으로 Slack 채널에 공유됩니다. 이렇게 하면 모든 관련 부서가 동일한 정보를 실시간으로 확인할 수 있어 협업이 원활해집니다.

 

 

 

 

 

 

백오피스인데 Slack 서버와 인터넷 통신하면 보안은 어떡하고 ?

기본적으로는 public하게 엔드포인트가 열려있어서 누구나 네트워크 요청을 찔러볼 수 있습니다.

그래서 슬랙에서 이같은 슬랙 앱을 private하게 운영할 수 있도록 2020년, Socket mode라는 것을 제공합니다. 소켓 단방향 연결로 서버를 띄울 수 있어서 이젠 슬랙 서버만 네트워크 요청을 찌를 수 있습니다. (https://api.slack.com/apis/socket-mode)

 

소켓 모드를 적용하면 이제 private 서버입니다. 하지만 슬랙 사용자의 클라이언트만 탈취되면 누구나 요청을 보낼 수 있습니다.

Slack 을 통해 작업을 수행할 때는 사용자 권한을 철저히 관리합니다. 슬랙 내부의 인증 서비스를 활용해 백엔드 서버와의 모든 HTTP 요청을 인증/검증하고, 내부적으로 signing secret을 통한 검증 단계도 존재합니다.

 

이 단계들을 뚫고 들어왔다고 해도 실제 작업 승인은 미리 설정해둔 ADMIN 권한을 가진 사용자만 실행할 수 있도록 제한되어 있습니다. 정상적인 프로세스라면 DBA에게만 부여되어있겠죠.

또한, 모든 작업 내역은 슬랙 스레드 채팅과 내부 DB 테이블에 로그로 남아 추후 감사가 가능합니다. 

 

 

 

 

기존 Jira를 통한 업무 플로우와 대비되는 간략히 작성한 워크플로우입니다.

 

 

1. Slack 에 로그인된 사용자가 DB 요청 채널에서 슬래시 커맨드를 유발 (/db_request)

2. 커맨드로 슬랙 모달 창이 띄워짐

3. 어떤 종류의 요청을 할 것인지 DropDown 항목에서 템플릿 선택 후 요청 내용 작성

4. 전송 버튼 클릭으로 모달 닫음

5. 전송됨과 동시에 DB 슬랙 채널에 요청 메세지가 도착

6. DBA는 해당 요청 메세지 스레드에서 작업을 논의하고, 작업이 결정되면 수락 버튼 클릭 (혹은 거절 버튼 클릭)

7. 요청 종류에 따라 안전하게 자동화된 프로세스로 요청 실행

8. 요청 실행 결과가 다시 Slack 채널에 전송됨

 

 

 

개발팀이 직접 소통하고 신경써야 할 부분이 줄었습니다. 자동화된 프로세스가 많은 일을 도맡아서 합니다.

 

 

 

 


 

 

이제 기술 세부사항에 대해서 살펴봅시다. 드디어!

 

하소연 시작

Slack api 를 통해 Block Action을 컨트롤하는 작업은 정말 힘든 과정이었습니다. 문서가 꽤 자세하긴 하지만 너무 방대했고 버전 별 차이도 많았습니다.

또한 인터넷에는 코드 예제가 거의 없다시피 했습니다. 찾았다!! 싶더라도 Bolt SDK를 통한 webhookURL 을 호출하는 형식의 간단한 토이프로젝트였습니다. 제가 원하는 것은 고수준의 동적 block 컨트롤이었고, 구현해야하는 기능도 많았기 때문에 참 고민이 많았습니다.

 

머리가 가장 많이 빠졌던 부분은 Slack에서 표준으로 보내주는 http 메세지의 컨텐츠가가 모두 requestbody 에 JSON 형식으로 자유분방하게 담겨있어서 파라미터 컨트롤하는 것이 참 어려웠던 것입니다. 수백개의 Block에 먼저 unique한 id를 상수로 정의하고, 매 요청마다 java의 stream 을 사용해서 수십개의 블럭을 파싱하고 id를 식별해서 갱신하는 작업이 복잡도가 참 높았습니다. ㅎㅎ

슬랙에서 콜백 URL은 '단 하나'만 지원하기 때문에 사용자의 액션에서 오는 모든 콜백과 요청을 하나의 컨트롤러-하나의 메서드에서 모두  OOP스럽게 처리해야하는 것도 참 고민이었습니다. 네.. 뭐 그랬습니다 ㅠㅠ

하소연 끝

 

 

아까 봤던 유저 요청 플로우 다이어그램을 더 자세하게 그려보겠습니다.

사용자가 특정 테이블에 Add Column을 요청하는 예제입니다.

단일 요청으로 발생하는 프로세스

 

복잡해 보입니다. 하나씩 코드를 인용해 설명해보겠습니다

 

 


 

 

AWS Multi Account 을 지원하는 방법

  • 백엔드 서버가 실행되는 환경에 aws configure 명령으로 모든 계정의 profile을 미리 등록합니다.
  • DB 커넥션에 필요한 정보를 AWS SDK 를 통해 당겨옵니다.
  • Database 식별하는 방식은 태그 방식입니다.
  • env, service 태그로 구분합니다
  • Cluster 와 Stand-alone 을 구분해서 모두 저장합니다.
  • DB 접속에 사용되는 계정은 미리 생성되어있고, 계정 정보는 컨벤션에 맞게 secret-manager에 등록되어있습니다.

간략한 코드로 보시죠.

@Component
public class AwsClient {

    private static final Region defaultRegion = Region.AP_NORTHEAST_2;
    private final ProfileFile profiles = ProfileFile.builder()
        .content(Paths.get(System.getProperty("user.home"), ".aws", "credentials"))
        .type(ProfileFile.Type.CREDENTIALS)
        .build();

    private final Map<String, RdsClient> accountIdRdsClients = new HashMap<>();
    private final Map<String, SecretsManagerClient> accountIdSecretsManagerClients = new HashMap<>();
    private final Map<String, CloudWatchClient> accountIdCloudWatchClients = new HashMap<>();
    private final Map<String, PiClient> accountIdPiClients = new HashMap<>();

    @PostConstruct
    public void initAwsClients() {
        Map<String, Profile> profileMap = profiles.profiles();

        for (String profileName : profileMap.keySet()) {
            AwsCredentialsProvider credentialsProvider = ProfileCredentialsProvider.builder()
                    .profileFile(profiles)
                    .profileName(profileName)
                    .build();

            String accountId = getAwsAccountId(credentialsProvider);

            accountIdRdsClients.put(accountId, RdsClient.builder()
                    .credentialsProvider(credentialsProvider)
                    .region(defaultRegion)
                    .build());

            accountIdSecretsManagerClients.put(accountId, SecretsManagerClient.builder()
                    .credentialsProvider(credentialsProvider)
                    .region(defaultRegion)
                    .build());

            accountIdCloudWatchClients.put(accountId, CloudWatchClient.builder()
                    .credentialsProvider(credentialsProvider)
                    .region(defaultRegion)
                    .build());

            accountIdPiClients.put(accountId, PiClient.builder()
                    .credentialsProvider(credentialsProvider)
                    .region(defaultRegion)
                    .build());
        }
    }
}
@RequiredArgsConstructor
@Service
public class AwsService {

    private final AwsClient awsClient;
    public Map<String, List<DBCluster>> findAllClusterInfo() {
        Map<String, List<DBCluster>> dbClusters = new HashMap<>();

        Map<String, RdsClient> rdsClients = awsClient.findAllRdsClients();
        for (String accountId : rdsClients.keySet()) {
            RdsClient rdsClient = rdsClients.get(accountId);
            DescribeDbClustersResponse describeDbClustersResponse = rdsClient.describeDBClusters();
            DescribeDbClustersResponse clustersResponse = findValidClusters(describeDbClustersResponse);
            List<DBCluster> accountClusters = clustersResponse.dbClusters();
            dbClusters.put(accountId, accountClusters);
        }

        log.info("clusters: {}", dbClusters);
        return dbClusters;
    }
}
@RequiredArgsConstructor
@Component
@Profile("!test")
@DependsOn("profileUtil")
public class DynamicDataSourceLoader {

    private final DynamicDataSourceProperties dynamicDataSourceProperties;
    private final AwsService awsService;

    @PostConstruct
    public void loadDynamicDataSources() {
        Map<String, List<DBCluster>> clusters = awsService.findAllClusterInfo();
        Map<String, List<DBInstance>> instances = awsService.findAllInstanceInfo();

		// 클러스터 저장 로직
        for (String accountId : clusters.keySet()) {
            List<DBCluster> accountClusters = clusters.get(accountId);
            for (DBCluster accountCluster : accountClusters) {
                String dbName = accountCluster.dbClusterIdentifier();

                List<Tag> tags = accountCluster.tagList();
                if (!isValidTags(dbName, tags)) {
                    continue;
                }
                Tag serviceNameTag = tags.stream()
                        .filter(tag -> tag.key().equals(TagStandard.SERVICE_TAG_KEY_NAME))
                        .findFirst()
                        .orElseThrow(IllegalStateException::new);
                Tag environmentTag = tags.stream()
                        .filter(tag -> tag.key().equals(TagStandard.ENVIRONMENT_TAG_KEY_NAME))
                        .findFirst()
                        .orElseThrow(IllegalStateException::new);

                String rdsUsername = awsService.findRdsUsername(accountId, serviceNameTag.value(), environmentTag.value());
                String password = awsService.findRdsPassword(accountId, serviceNameTag.value(), environmentTag.value());

                DatabaseConnectionInfo databaseConnectionInfo = DatabaseConnectionInfo.builder()
                        .environment(environmentTag.value())
                        .accountId(accountId)
                        .serviceName(serviceNameTag.value())
                        .databaseType(DatabaseConnectionInfo.DatabaseType.CLUSTER)
                        .databaseName(dbName)
                        .driverClassName("com.mysql.cj.jdbc.Driver")
                        .url("jdbc:mysql://" + accountCluster.endpoint())
                        .username(rdsUsername)
                        .password(password)
                        .build();

                dynamicDataSourceProperties.addDatabase(dbName, databaseConnectionInfo);
            }
        }

        // 인스턴스 저장 로직 생략...
        
        dynamicDataSourceProperties.validateDatabases();
        dynamicDataSourceProperties.logDatabases();
    }

    private boolean isValidTags(String dbName, List<Tag> tags) {
        if (tags == null || tags.isEmpty()) {
            log.info("{} DB에 태그가 존재하지 않습니다.", dbName);
            return false;
        }
        return true;
    }
}

 

AwsClient에서는 aws configure 된 profile 기반으로 각 client를 init 합니다.

초기화된 client list 를 순회하며 AWS SDK로 각 계정의 DB 리소스를 가져옵니다.

클러스터와 인스턴스를 나눠서 DatabaseConnectionInfo 객체로 생성하고, `DynamicDataSourceProperties`라는 데이터소스 저장소 클래스에 add 합니다.

이제 백엔드 서버는 캐싱된 멀티어카운트 데이터소스를 필요에 따라 가져와서 자유자재로 활용할 수 있습니다.

혹시 온프레미스 DB가 존재한다면, 따로 onpremise-config.yaml 파일을 만들어 static 초기화하면 됩니다.

 

데이터소스를 메모리에 캐싱한 이유는 매 요청마다 SDK 를 호출하는 것은 레이턴시도 증가하고, 특히나 호출 비용이 증가하기 때문입니다.

 

 

Slack Slash Command 발동

slash 자체를 만드는 방법은 쉽습니다. 튜토리얼만 블로그 글만 따라봐도 어렵지 않게 구축할 수 있습니다.

다만, 삽질을 좀 한 내용으로는.. URL을 지정할 때 CA 인증서로 통신하는 https 도메인만 등록할 수 있어서 당황했던 내용입니다.

가비아에서 유료 도메인 구매하고.. aws certificate manager 에 등록하고.. lb에 등록하고 등 한번 해봤으면 쉬웠을 것 같은데 좀 헤맸습니다. 인증서 때문에 프로젝트를 로컬에서 돌릴 수가 없어 수정할 때 마다 커밋 푸시 후aws 인프라를 통해 배포하는 기행을 토했죠.

다 해놓고 나니 ngrok이나 localtunner 같은 서비스가 있는걸 우연히 발견해서 지난날의 저를 비웃었다는 이야기..

 

 

슬래시 커맨드 콜백 코드를 보시죠

@Slf4j
@RequiredArgsConstructor
@RestController
public class SlackController {

    private final MethodsClient slackClient;
    private final SlackRequestHandler slackRequestHandler;
    private final SlackService slackService;
    private static final JsonPayloadTypeDetector payloadTypeDetector = new JsonPayloadTypeDetector();

    @PostMapping(value = "/slack/command/dbselect", consumes = MediaType.APPLICATION_FORM_URLENCODED_VALUE)
    public void databaseRequestCommand(@RequestParam("token") String token,
                                  //생략
                                  @RequestBody String requestBody,
                                  @RequestHeader("X-Slack-Signature") String slackSignature,
                                  @RequestHeader("X-Slack-Request-Timestamp") String timestamp) throws SlackApiException, IOException {

		validateRequestAuth(slackSignature, timestamp, requestBody);

        List<LayoutBlock> initialBlocks = new ArrayList<>();
        initialBlocks.addAll(SelectCommandBlocks.selectCommandGroupAndCommandTypeBlocks());
        ViewsOpenResponse viewsOpenResponse = slackClient.viewsOpen(r -> r.triggerId(triggerId)
                .view(BasicBlockFactory.findGlobalRequestModalView(initialBlocks)));
        log.info("viewsOpenResponse: {}", viewsOpenResponse);
    }
}

 

요약하면, LayoutBlock들을 리스트로 만들어서 Slack에 View Open이라는 api call을 하는 내용입니다.

최초 요청에 대한 triggerId를 담는 것을 확인할 수 있습니다.

해당 호출이 성공적으로 반환되면 개발팀은 렌더링된 view modal이 열리는 것을 볼 수 있습니다.

 

초기 모달.
요청 타입을 선택하면 모달의 블럭들이 동적으로 재구성된다.

 

 

 

 

개발팀이 모달에서 각종 옵션 선택 및 모달 업데이트 (요청 타입 선택, 계정, 환경, 클러스터, 스키마, 테이블 선택 등)

슬랙 메세지는 Block으로 이루어집니다. Block에 특정한 Action(클릭, 선택, 입력 등)을 할 때마다 Slack은 등록된 URL로 Block Action 콜백을 호출합니다. 

수백 종류의 모든 액션들을 구분하고 처리해야합니다.

단일 책임 원칙을 잃지 않기 위해 전략 패턴을 적용했습니다만 글의 주제와 벗어나니 생략하겠습니다.

 

@RequiredArgsConstructor
@RestController
public class SlackController {

    private final MethodsClient slackClient;
    private final SlackRequestHandler slackRequestHandler;
    private final SlackService slackService;
    private static final JsonPayloadTypeDetector payloadTypeDetector = new JsonPayloadTypeDetector();

    @PostMapping(value = "/slack/callback", consumes = MediaType.APPLICATION_FORM_URLENCODED_VALUE)
    public ResponseEntity<?> slackCallBack(@RequestParam String payload,
                                           @RequestBody String requestBody,
                                           @RequestHeader("X-Slack-Signature") String slackSignature,
                                           @RequestHeader("X-Slack-Request-Timestamp") String timestamp) throws IOException, SlackApiException {

        String decodedPayload = HtmlUtils.htmlUnescape(payload);
        validateRequestAuth(slackSignature, timestamp, requestBody);
        String payloadType = payloadTypeDetector.detectType(decodedPayload);

        List<LayoutBlock> layoutBlocks;
        View view;
        if (payloadType.equals("block_actions")) {
            BlockActionPayload blockActionPayload = GsonFactory.createSnakeCase()
                    .fromJson(payload, BlockActionPayload.class);
            log.info("BlockActionPayload: {}", blockActionPayload);
            view = blockActionPayload.getView();

            layoutBlocks = handleBlockAction(blockActionPayload);
        } else if (payloadType.equals("view_submission")) {
           // 생략
            return handleViewSubmission(viewSubmissionPayload);
        } else {
            throw new IllegalArgumentException("미지원 payload");
        }
        updateView(layoutBlocks, view);

        return ResponseEntity.ok("ok");
    }
    
    private List<LayoutBlock> handleBlockAction(BlockActionPayload blockActionPayload) throws JsonProcessingException {
        List<Action> actions = blockActionPayload.getActions();

        // message action
        if (blockActionPayload.getView() == null) {
            User user = blockActionPayload.getUser();
            Message message = blockActionPayload.getMessage();

            for (Action action : actions) {
                log.info("action: {}", action);
                validateRequestAcceptDoerAdmin(user.getId());
                if (action.getActionId().equals(SlackConstants.CommunicationBlockIds.commandRequestAcceptButtonBlockId)) {
                    slackRequestHandler.handleAccept(message, user.getId());
                } else if (action.getActionId().equals(SlackConstants.CommunicationBlockIds.commandRequestDenyButtonBlockId)) {
                    slackRequestHandler.handleDeny(message, user.getId());
                }
            }
            List<LayoutBlock> blocks = blockActionPayload.getMessage().getBlocks();
            return blocks;
        }

        // view modal action
        View view = blockActionPayload.getView();
        ViewState state = view.getState();
        List<LayoutBlock> blocks = view.getBlocks();
        for (Action action : actions) {
            log.info("action: {}", action);
            blocks = slackRequestHandler.handleAction(action, blocks, state.getValues());
        }
        return blocks;
    }
}

 

콜백 URL 이 단 하나만 지원되기 때문에 RequestHandler를 구현해서 사용했습니다.

결론만 설명하자면 action 에 따라 Block들을 갱신하고, updateView() 메서드의 view update api를 호출해서 모달이 업데이트 되는 흐름입니다.

사용자는 인식하지 못하지만, 클러스터 목록에서 SHOP 이라는 DB를 선택하면 'selectClusterName' 이라는 id를 가진 action이 callback 되고, backend 서버는 선택된 클러스터명을 토대로 MySQL에서 Database 목록을 가져와서 다음 옵션 dropdown 목록을 갱신합니다.

 

 

제출 요청 검증 - 실패

개발팀에서 모달의 모든 필드를 선택하고 내용을 입력했습니다. 이제 요청을 전송해야겠죠?

전송 버튼을 누르면 요청에 대한 validation이 발생합니다.

예를 들어, 테이블 명 컨벤션이 코드 상으로는 snake_case로 정의되어있는데 개발팀이 입력한 테이블명이 `AuthorityGroup` 이라면?

컨벤션 검사 코드에서 실패하게 되고, Exception에 Message가 담깁니다.

private ResponseEntity<String> handleViewSubmission(ViewSubmissionPayload viewSubmissionPayload) {
    List<LayoutBlock> blocks = viewSubmissionPayload.getView().getBlocks();
    ViewState state = viewSubmissionPayload.getView().getState();
    ViewSubmissionPayload.User slackUser = viewSubmissionPayload.getUser();
    try {
        CommandType findCommandType = findCommandType(state);
        String selectedDBMSName = SlackService.findCurrentValueFromState(state.getValues(), SlackConstants.CommandBlockIds.ClusterSchemaTable.findClusterSelectsElementActionId);
        DatabaseConnectionInfo selectedDatabaseConnectionInfo = DynamicDataSourceProperties.findByDbIdentifier(selectedDBMSName);
        RequestDTO requestDTO = slackRequestHandler.handleSubmission(findCommandType,
                state.getValues());
        List<LayoutBlock> requestMessageBlocks = slackRequestHandler.sendSubmissionRequestMessage(selectedDatabaseConnectionInfo, findCommandType, slackUser.getId(), requestDTO);
        slackService.sendBlockMessageWithMetadata(selectedDatabaseConnectionInfo, findCommandType, requestMessageBlocks, requestDTO);
    } catch (Exception e) {
        log.info("Exception: {}", e.getMessage());
        log.info("Exception trace: {}", e.getStackTrace());
        e.printStackTrace();
        return ResponseEntity.ok(displayErrorViewJsonString(e, blocks));
    }

    return ResponseEntity.ok(closeViewJsonString());
}

private String displayErrorViewJsonString(Exception e, List<LayoutBlock> blocks) {
    int errorBlockIndex = SlackService.findBlockIndex(blocks, "input", SlackConstants.ErrorBlockIds.errorMessageBlockId);
    String errorBlockId = blocks.get(errorBlockIndex).getBlockId();
    String errorMessage = e.getMessage().replace("\"", "\'");
    String errorViewResponseJson = "{\"response_action\":\"errors\",\"errors\": {\"" + errorBlockId + "\":\"" + errorMessage + "\"}}";
    log.info("errorViewResponseJson: {}", errorViewResponseJson);
    return errorViewResponseJson;
}

 


하위에서 발생하는 컨벤션 검사 로직을 수행할 때 exception이 발생하면, 상위 컨트롤러에서는 try catch로 이 Exception을 받아 view modal에 에러메세지를 업데이트해줍니다.

(BOLT를 사용하시면 이것보단 쉽게 구현할 수 있습니다. 저는 자유도를 위해 slack api만 사용했습니다.)

 

 

 

제출 요청 검증 - 성공

validation이 성공하면.

슬랙 채널에 해당 요청에 대한 요약 정보를 메세지로 전송합니다. 

실제 메세지. 좀 더 꾸밀 필요는 있어보입니다.

 

사용자는 인식하지 못하지만, 메세지 내부에는 요청 정보가 담긴 Metadata가 존재합니다.

Modal과 Message는 완전히 개별로 존재하는 엔티티이기 때문에. 이미 닫힌 모달에서 정보를 가져오지 못합니다. 데이터가 휘발되는 것입니다.

그래서 모달의 요청 정보를 메세지에 인젝션해야합니다.

public void sendBlockMessageWithMetadata(DatabaseConnectionInfo databaseConnectionInfo,
                                         DatabaseRequestCommandGroup.CommandType commandType,
                                         List<LayoutBlock> blocks,
                                         RequestDTO requestDTO) throws JsonProcessingException {
    if (blocks.size() < 1) {
        return;
    }
    log.info("block slack message: {}", blocks);
    Map<String, Object> metadataMap = new HashMap<>();
    metadataMap.put(SlackConstants.MetadataKeys.messageMetadataDatabaseConnectionInfo, JsonUtil.toJson(databaseConnectionInfo));
    metadataMap.put(SlackConstants.MetadataKeys.messageMetadataClass, JsonUtil.toJson(requestDTO.getClass()));
    metadataMap.put(SlackConstants.MetadataKeys.messageMetadataRequestDTO, JsonUtil.toJson(requestDTO));
    metadataMap.put(SlackConstants.MetadataKeys.messageMetadataCommandType, JsonUtil.toJson(commandType));

    // https://api.slack.com/metadata/using
    Message.Metadata metadata = Message.Metadata.builder()
            .eventType(SlackConstants.MetadataKeys.messageMetadataMapTypeName)
            .eventPayload(metadataMap)
            .build();

    ChatPostMessageRequest request = ChatPostMessageRequest.builder()
            .channel(DEFAULT_CHANNEL_ID)
            .blocks(blocks)
            .metadata(metadata)
            .build();
    ChatPostMessageResponse chatPostMessageResponse = null;
    try {
        chatPostMessageResponse = slackClient.chatPostMessage(request);
    } catch (SSLHandshakeException sslHandshakeException) {
        log.info(sslHandshakeException.getMessage());
    } catch (Exception e) {
        log.info(e.getMessage());
    }
    if (chatPostMessageResponse != null && chatPostMessageResponse.getWarning() == null && chatPostMessageResponse.isOk()) {
        log.info("chatPostMessageResponse: {}", chatPostMessageResponse);
    } else {
        log.error("chatPostMessageResponse: {}", chatPostMessageResponse);
    }
}

 

DatabaseConnectionInfo, RequestDTO, CommandType 등을 JsonUtil로 Json으로 만들어서 metadata 라는 클래스에 담아 메세지를 전송합니다.

(참고로 메타데이터의 크기는 슬랙 사에서 리밋을 걸어두었습니다. 요청 사이즈가 너무 커지면 압축 등의 전처리가 필요할 수 있습니다)

 

 

 

 

요청 승인

전송된 슬랙 메세지 스레드에서 논의가 모두 끝나면, DBA처럼 미리 권한을 부여받은 사람이 '승인' 버튼을 누르게 됩니다.

block action은 누구나 발생시킬 수 있으나, 백엔드 서버에서 검증이 이뤄지기 때문에 요청 메세지가 승인되거나 파기되지 않습니다.

// 승인자 검증 코드
private void validateRequestAcceptDoerAdmin(String slackUserId) {
    if (!SlackConfig.slackAdminUserIds.contains(slackUserId)) {
        throw new IllegalArgumentException("해당 user 가 처리할 수 없는 action 입니다.");
    }
}

 

 

정상적인 프로세스로 승인/거부 버튼 액션이 전송되면, 메세지는 파기됩니다. (승인/거부 버튼 사라짐)

private void resetAcceptDenyButtonBlock(List<LayoutBlock> requestBlocks, String requestAckMessage) {
    SectionBlock requestAckBlock = BasicBlockFactory.getMarkdownTextSection("request " + requestAckMessage, SlackConstants.CommunicationBlockIds.commandRequestAcceptDenyButtonBlockId);
    int acceptDenyBlockIndex = SlackService.findBlockIndex(requestBlocks, "actions", SlackConstants.CommunicationBlockIds.commandRequestAcceptDenyButtonBlockId);
    requestBlocks.set(acceptDenyBlockIndex, requestAckBlock);
}

 

그 후 Accepted 혹은 Denied 메세지가 전송됩니다.

 

백엔드 서버는 메세지 전송 후 실제 요청을 DB에 수행한 후 작업 시작, 종료 메세지를 차례대로 보냅니다.

(좀 예쁘게 꾸미긴 해야겠네요. 지금은 서비스로직을 감싼 AOP에서 전송되게 구현되어있는데, 복잡도가 높아서 리팩토링 예정입니다.)

 

 

 

 


 

 

여기까지 입니다.

 

위 코드들은 회사 dev,stage 환경에서 개인 용도로 스크립트성으로 제한적이게 활용중입니다. 회사 내부 사정상 보안 규정이 있어 prod에 적용하려면 보안팀과의 많은 난관이 있을 것 같습니다. 그래도 도전은 해봐야죠!

 

아직 기능이 활발하게 개발되고 있습니다. 모든 요청이 자동화될 필요는 없지만, 의미없이 리소스가 쓰이는 것을 방지하기 위해 정의된 아래 기능은 우선 모두 개발하고 있습니다.

public enum DatabaseRequestCommandGroup {
    DDL("ddl",
            List.of(CommandType.CREATE_INDEX,
                    CommandType.CREATE_TABLE,
                    CommandType.ADD_COLUMN,
                    CommandType.ALTER_COLUMN,
                    CommandType.DELETE_COLUMN,
                    CommandType.EXTEND_VARCHAR_COLUMN,
                    CommandType.RENAME_COLUMN,
                    CommandType.RENAME_TABLE)),
    DML("dml",
            List.of(CommandType.INSERT,
                    CommandType.UPDATE,
                    CommandType.DELETE)),
    SELECT("select",
            List.of(CommandType.SELECT)),
    MIGRATION("migration",
            List.of(CommandType.TABLE_MIGRATION,
                    CommandType.DATABASE_MIGRATION)),
    PARAMETER("parameter",
            List.of(CommandType.PARAMETER_STANDARD,
                    CommandType.SCHEMA_STANDARD)),
    METRIC("metric",
            List.of(CommandType.CPU_METRIC,
                    CommandType.MEMORY_METRIC,
                    CommandType.HLL_METRIC)),
    SCHEMA_OBJECT("schema_object",
            List.of(CommandType.PROCEDURE,
                    CommandType.FUNCTION,
                    CommandType.VIEW,
                    CommandType.TRIGGER)),
    ACCOUNT("account",
            List.of(CommandType.CREATE_USER,
                    CommandType.GRANT_PRIVILEGE,
                    CommandType.REVOKE_PRIVILEGE,
                    CommandType.SHOW_GRANTS)),



리소스가 풍부해서 UI/UX가 모두 좋으면 좋겠지만, 안정성이 우선이니 열심히 개발해보려 합니다~

 

관련해서 질문이 있으시면 댓글로 알려주세요

 

 

 

 

 

반응형