webflux가 netty 기반이라서, 아직 (Spring5) 기반에서는 request-timeout 설정이 없음.

각 request에서 mono.timout 이 설정이 가능하고,

아니면, 아래와 같이 WebFilter 설정을 통해서 전체 request-timeout 설정이 가능함.

 

그런데, 만약 비동기식으로 처리되고, 서버에서 on-line 배치 방식으로 처리된다면,

아래 WebFilter는 의미가 없을 테고, 해당 Mono에서 timeout 설정을 해야함.

@Slf4j
@Component
public class RequestTimeoutWebFilter implements WebFilter {
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
    	// Request Timeout 설정. WebFlux는 아직 application.yml에서 request-timeout 설정이 없음.
    	// 최근 Tracking 이력 max값 199초. 최대 300초 기준으로 5분으로 산정함.
    	log.info("#################### WebFilter - start");
    	Mono<Void> mono = chain
    			.filter(exchange)
//    			.timeout(Duration.ofMinutes(5)); // 5 분
   		        .timeout(Duration.ofSeconds(10)); // test 
    	log.info("#################### WebFilter - end");
    	return mono;
    }
}

 

end.

 

1. Mono -> Flux 전환 / flatMapMany 예

@Test
public void flatMapMany_test001() {
// List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
  List<Integer> list = new ArrayList<>();
  for(int i = 0; i < 100000; i++)
    list.add(i);

  // Publisher가 Mono 라서 List가 길어도, 1번으로 emit 되서, subscribe하면 mono -> Flux 전환된 데이터 모두 출력된다.
  Mono.just(list)
      .flatMapMany(n -> Flux.fromIterable(n))
      .map(n -> n * 10)
      .subscribe(n -> log.info(n + ""));
}

@Test
public void flatMapMany_test002() {
  List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

  // Publisher가 Mono 라서 List가 길어도 1번으로 emit 되서, subscribe하면 mono -> Flux 전환된 데이터 모두 출력된다.
  Mono.just(list)
      .flatMapMany(n -> Flux.fromIterable(n).map(x -> x * 10))
//   .map(n -> n * 10)
      .subscribe(n -> log.info(n + ""));
}

 

2. Mono -> Flux 전환 / flatMapMany 예

@Test
	public void test_sql03() {
		String COUNT_KEY = "COUNT_KEY";
		Map<String, Integer> cntMap = new HashMap<>();
		cntMap.put(COUNT_KEY, 0);
		
		String sql3 = "SELECT ORDER_ID, CUSTOMER_ID, STATUS, NVL(SALESMAN_ID, -1) AS SALESMAN_ID, ORDER_DATE FROM ORDERS ORDER BY ORDER_ID";
		
		AtomicReference<Long> startTime = new AtomicReference<>();
		
		Mono.from(
					ConnectionFactories.get(
						ConnectionFactoryOptions.builder()
							.option(ConnectionFactoryOptions.DRIVER, "oracle")
//							.option(OracleR2dbcOptions.DESCRIPTOR, "(DESCRIPTION = (ADDRESS = (PROTOCOL = TCP)(HOST = 192.168.0.20)(PORT = 1521)) (ADDRESS = (PROTOCOL = TCP)(HOST = 192.168.0.29)(PORT = 1521)) (CONNECT_DATA = (SERVER = DEDICATED) (SERVICE_NAME = XEPDB1) ) )")
							.option(ConnectionFactoryOptions.HOST, "127.0.0.1")
							.option(ConnectionFactoryOptions.PORT, 1521)
							.option(ConnectionFactoryOptions.DATABASE, "XEPDB1")
							.option(ConnectionFactoryOptions.USER, "ot")
							.option(ConnectionFactoryOptions.PASSWORD, "1111")
							.build()
						).create()
				)
				.doOnNext(connection -> log.debug(connection.toString()))
				.flatMapMany(connection -> 
					Mono.from(connection.createStatement(sql3).execute())
						.flatMapMany(result -> 
								result.map(row -> {
									Orders orders = new Orders(row.get("ORDER_ID", Integer.class), row.get("CUSTOMER_ID", Integer.class), row.get("STATUS", String.class), row.get("SALESMAN_ID", Integer.class), row.get("ORDER_DATE", LocalDateTime.class));
									return orders;
								}
							)
						)
					    .doFinally((st) -> {
					    	connection.close();
					    	log.debug("### connection.close()");
					    })
				).doOnSubscribe(x -> startTime.set(System.nanoTime()))
				.doOnNext(n -> {
					cntMap.put(COUNT_KEY, cntMap.get(COUNT_KEY) + 1 ); // count row
				})
				.doAfterTerminate(() -> {
					log.info("Time taken : " + TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime.get()) + " milliseconds.");
					log.debug("## count = " + cntMap.get(COUNT_KEY));					
				})
				.subscribe(n -> log.debug(n.toString()));
		
		try {
			Thread.sleep(5000L);
		} catch (InterruptedException e) {
		}
	}

 

	@Test
	public void test_sql05() {
		log.info("### test_sql04() - start");
		
		String COUNT_KEY = "COUNT_KEY";
		Map<String, Integer> cntMap = new HashMap<>();
		cntMap.put(COUNT_KEY, 0);
		
		String sql3 = "SELECT ORDER_ID, CUSTOMER_ID, STATUS, NVL(SALESMAN_ID, -1) AS SALESMAN_ID, ORDER_DATE FROM ORDERS ORDER BY ORDER_ID";
		
		AtomicReference<Long> startTime = new AtomicReference<>();
		
		ConnectionFactory confactory = connectionFactory();
		
			
		Flux<Orders> xx = Mono.from(confactory.create())
		    .flatMapMany(connection ->     // return Flux. Flux로 받으려면 Mono의 flatMapMany로 Mono로 받으면 Mono의 FlatMap 으로.
				Mono.from(connection.createStatement(sql3).execute())		
				    .flatMapMany(result -> // return Flux
							result.map(row -> {
								Orders orders = new Orders(row.get("ORDER_ID", Integer.class), row.get("CUSTOMER_ID", Integer.class), row.get("STATUS", String.class), row.get("SALESMAN_ID", Integer.class), row.get("ORDER_DATE", LocalDateTime.class));
								return orders;				    	
							})
				    )
				    .doFinally((st) -> {
				    	connection.close();
				    	log.debug("### connection.close()");
				    })
		    );
				
		xx.subscribe(n -> log.debug(n.toString()));
				
		// 한건 테스트. block 으로 뽑음.
//		Orders yyy = xx.blockFirst();
//		log.debug("### yyy = " + yyy.toString());

		try {
			Thread.sleep(15000L);
		} catch (InterruptedException e) {
		}
	}
    
      public ConnectionFactory connectionFactory() {
	    ConnectionFactory connectionFactory = ConnectionFactories.get(ConnectionFactoryOptions.builder()
	        .option(ConnectionFactoryOptions.DRIVER, "pool")
	        .option(ConnectionFactoryOptions.PROTOCOL, "oracle")
			.option(OracleR2dbcOptions.DESCRIPTOR, "(DESCRIPTION = (ADDRESS_LIST = (ADDRESS = (PROTOCOL = TCP)(HOST = 192.168.0.19)(PORT = 1521)) (ADDRESS = (PROTOCOL = TCP)(HOST = 127.0.0.1)(PORT = 1521)) ) (CONNECT_DATA = (SERVER = DEDICATED) (SERVICE_NAME = XEPDB1) ) )")
			.option(ConnectionFactoryOptions.USER, "ot")
			.option(ConnectionFactoryOptions.PASSWORD, "1111")
	        .build());
	    
//	    Mono<Connection> con1 = (Mono<Connection>) connectionFactory.create();
//	    Flux<Connection> con2 = (Flux<Connection>) connectionFactory.create();
	    
	    int initialSize = 1;
	    int maxSize = 5;
	    ConnectionPoolConfiguration configuration = ConnectionPoolConfiguration.builder(connectionFactory)
	        .maxIdleTime(Duration.ofMinutes(30))
	        .initialSize(initialSize)
	        .maxSize(maxSize)
	        .maxCreateConnectionTime(Duration.ofSeconds(1))
//	        .maxAcquireTime(Duration.ofSeconds(1))
//	        .validationQuery("SELECT 1 FROM DUAL")
//	        .maxValidationTime(Duration.ofSeconds(2))
	        .build();
	    		    
	    return new ConnectionPool(configuration);
	  }

'Programming Language > Reactive Programing' 카테고리의 다른 글

Reactor Hooks 관련 글 모음  (0) 2023.10.06

 

JSON (JavaScript Object Notation)은 가벼운 데이터 교환 형식입니다. 
JavaScript 프로그래밍 언어 표준 ECMA-262 3판 - 1999년 12월의 하위 집합을 기반으로 합니다. 
JSON은 완전히 언어 독립적인 텍스트 형식이지만 C를 비롯한 C 계열 언어의 프로그래머에게 친숙한 규칙을 사용합니다. 
이러한 속성은 JSON을 이상적인 데이터 교환 언어로 만듭니다.

JSON은 두 가지 구조를 기반으로 합니다.

1. 이름/값 쌍의 모음. 다양한 언어에서 이것은 객체 , 레코드, 구조체, 사전, 해시 테이블, 키 목록 또는 연관 배열 로 실현됩니다 .
 - {} : 객체로 이름/값 의 정렬되지 않은 집합

 

2. 정렬된 값 목록. 대부분의 언어에서 이는 배열 , 벡터, 목록 또는 시퀀스로 실현됩니다.
 - [] : 배열로 정렬 된 값 모음. 배열에 객체를 다양하게 담을 수 있기 때문에, 처음 의도와 달리 순수한 정렬된 값 목록으로는 사용되지 않음.

 

9.양식

1) whitespace 는 공백("", " "), 캐리지리턴, 폼피드, 탭가 있음.

2) 제어문자

\b 백스페이스

\f 폼 피드

\n 개행

\r 캐리지 리턴

\t 탭

\" 따옴표

\/ 슬래시

\\ 역슬래시

\uHHHH  16진수 네자리로되어 있는 유니코드 문자

 

관련사이트

https://www.json.org/

https://ko.wikipedia.org/wiki/JSON

 {
    "키1": "가나다",
    "키2": 999,
    "배열1": ["A", "B"],
    "객체1": {"키1": 0, "키2": "문자1", "키3": "문자2"},
 }

end.

 

 

출력 시 기본 터미널 크기(80byte)에 맞추기 위해서 데이터 프레임이 잘려서 나눠 출력되는데,

터미널 사이즈를 조정해서 출력한다.

from shutil import get_terminal_size

# 터미널 크기에 맞게 출력함
pd.set_option('display.width', get_terminal_size()[0])
print(df)

# 최대 1000 사이즈로 출력함
pd.set_option('display.width', 1000)
print(df)

'Programming Language > Python' 카테고리의 다른 글

python 문제 해결 목록  (0) 2024.04.29
python DB 연결  (0) 2024.04.29
Python 모듈 실행시 ModuleNotFoundError 처리  (0) 2022.03.25
[Python] pandas - DataFrame Text 출력  (0) 2022.03.19
[Python] 클래스  (0) 2022.03.14

+ Recent posts