1. Mono -> Flux 전환 / flatMapMany 예

/**
 * Demonstrates Mono -> Flux conversion with flatMapMany on a large list.
 * A short list such as Arrays.asList(1, ..., 10) can be used the same way.
 */
@Test
public void flatMapMany_test001() {
  // Fill the source list with 0..99999.
  List<Integer> numbers = new ArrayList<>();
  int next = 0;
  while (next < 100000) {
    numbers.add(next);
    next++;
  }

  // The publisher is a Mono, so the whole list is emitted as a single item no matter
  // how long it is; flatMapMany then expands it into a Flux and every element is
  // printed once subscribed.
  Flux<Integer> scaled = Mono.just(numbers)
      .flatMapMany(items -> Flux.fromIterable(items))
      .map(item -> item * 10);

  scaled.subscribe(item -> log.info(item + ""));
}

/**
 * Same Mono -> Flux conversion as the previous example, but the per-element
 * mapping is applied inside the flatMapMany lambda instead of on the outer chain.
 */
@Test
public void flatMapMany_test002() {
  List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

  // The publisher is a Mono, so the list is emitted as one item; after flatMapMany
  // every element of the resulting Flux is printed once subscribed.
  Flux<Integer> scaled = Mono.just(numbers)
      .flatMapMany(items -> Flux.fromIterable(items).map(item -> item * 10));

  scaled.subscribe(item -> log.info(item + ""));
}

 

2. Mono -> Flux 전환 / flatMapMany 예 (R2DBC 조회)

/**
 * Opens a single Oracle R2DBC connection, streams every row of ORDERS as an
 * {@code Orders} object, counts the rows and logs the elapsed time.
 *
 * <p>The connection is managed with {@code Flux.usingWhen}: R2DBC's
 * {@code Connection.close()} only returns a {@code Publisher<Void>}, so calling it
 * without subscribing (as a plain statement inside {@code doFinally}) never closes
 * the connection. {@code usingWhen} subscribes to the close publisher on
 * completion, error and cancellation.
 */
@Test
	public void test_sql03() {
		String COUNT_KEY = "COUNT_KEY";
		Map<String, Integer> cntMap = new HashMap<>();
		cntMap.put(COUNT_KEY, 0);
		
		String sql3 = "SELECT ORDER_ID, CUSTOMER_ID, STATUS, NVL(SALESMAN_ID, -1) AS SALESMAN_ID, ORDER_DATE FROM ORDERS ORDER BY ORDER_ID";
		
		AtomicReference<Long> startTime = new AtomicReference<>();
		
		Flux<Orders> orders = Flux.usingWhen(
				// Resource: one connection from the directly configured Oracle driver.
				Mono.from(
					ConnectionFactories.get(
						ConnectionFactoryOptions.builder()
							.option(ConnectionFactoryOptions.DRIVER, "oracle")
//							.option(OracleR2dbcOptions.DESCRIPTOR, "(DESCRIPTION = (ADDRESS = (PROTOCOL = TCP)(HOST = 192.168.0.20)(PORT = 1521)) (ADDRESS = (PROTOCOL = TCP)(HOST = 192.168.0.29)(PORT = 1521)) (CONNECT_DATA = (SERVER = DEDICATED) (SERVICE_NAME = XEPDB1) ) )")
							.option(ConnectionFactoryOptions.HOST, "127.0.0.1")
							.option(ConnectionFactoryOptions.PORT, 1521)
							.option(ConnectionFactoryOptions.DATABASE, "XEPDB1")
							.option(ConnectionFactoryOptions.USER, "ot")
							.option(ConnectionFactoryOptions.PASSWORD, "1111")
							.build()
						).create()
				)
				.doOnNext(connection -> log.debug(connection.toString())),
				// Use: execute the query and map each row to an Orders instance.
				connection ->
					Mono.from(connection.createStatement(sql3).execute())
						.flatMapMany(result ->
							result.map(row ->
								new Orders(row.get("ORDER_ID", Integer.class), row.get("CUSTOMER_ID", Integer.class), row.get("STATUS", String.class), row.get("SALESMAN_ID", Integer.class), row.get("ORDER_DATE", LocalDateTime.class))
							)
						),
				// Cleanup: actually subscribe to the close publisher, then log.
				connection ->
					Mono.from(connection.close())
						.doFinally(st -> log.debug("### connection.close()"))
			);
		
		orders.doOnSubscribe(x -> startTime.set(System.nanoTime()))
				.doOnNext(n -> cntMap.merge(COUNT_KEY, 1, Integer::sum)) // count row
				.doAfterTerminate(() -> {
					log.info("Time taken : " + TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime.get()) + " milliseconds.");
					log.debug("## count = " + cntMap.get(COUNT_KEY));
				})
				.subscribe(n -> log.debug(n.toString()));
		
		// The pipeline runs asynchronously; keep the test thread alive while it drains.
		try {
			Thread.sleep(5000L);
		} catch (InterruptedException e) {
			// Restore the interrupt flag instead of silently swallowing it.
			Thread.currentThread().interrupt();
		}
	}

 

	/**
	 * Streams every row of ORDERS through a connection obtained from
	 * {@code connectionFactory()} and logs each resulting {@code Orders}.
	 *
	 * <p>The connection is managed with {@code Flux.usingWhen} so that the
	 * {@code Publisher<Void>} returned by {@code Connection.close()} is actually
	 * subscribed; calling {@code close()} as a bare statement has no effect and the
	 * connection would never be released.
	 */
	@Test
	public void test_sql05() {
		log.info("### test_sql05() - start");
		
		String sql3 = "SELECT ORDER_ID, CUSTOMER_ID, STATUS, NVL(SALESMAN_ID, -1) AS SALESMAN_ID, ORDER_DATE FROM ORDERS ORDER BY ORDER_ID";
		
		ConnectionFactory confactory = connectionFactory();
		
		Flux<Orders> xx = Flux.usingWhen(
				Mono.from(confactory.create()),
				// Returns a Flux: use Mono.flatMapMany to get a Flux, Mono.flatMap to get a Mono.
				connection ->
					Mono.from(connection.createStatement(sql3).execute())
						.flatMapMany(result -> // returns a Flux of mapped rows
							result.map(row ->
								new Orders(row.get("ORDER_ID", Integer.class), row.get("CUSTOMER_ID", Integer.class), row.get("STATUS", String.class), row.get("SALESMAN_ID", Integer.class), row.get("ORDER_DATE", LocalDateTime.class))
							)
						),
				// Cleanup: subscribe to the close publisher, then log.
				connection ->
					Mono.from(connection.close())
						.doFinally(st -> log.debug("### connection.close()"))
			);
		
		xx.subscribe(n -> log.debug(n.toString()));
		
		// Single-row test: pull one element out synchronously with block.
//		Orders yyy = xx.blockFirst();
//		log.debug("### yyy = " + yyy.toString());

		// The pipeline runs asynchronously; keep the test thread alive while it drains.
		try {
			Thread.sleep(15000L);
		} catch (InterruptedException e) {
			// Restore the interrupt flag instead of silently swallowing it.
			Thread.currentThread().interrupt();
		}
	}
    
      /**
       * Builds a pooled Oracle R2DBC {@link ConnectionFactory}.
       *
       * <p>The underlying factory is discovered with the plain {@code oracle} driver and
       * then wrapped in exactly one {@link ConnectionPool}. Discovering it with
       * {@code DRIVER = "pool"} / {@code PROTOCOL = "oracle"} already yields a pool, so
       * wrapping that result again would stack one pool on top of another.
       *
       * @return a connection pool holding between 1 and 5 Oracle connections
       */
      public ConnectionFactory connectionFactory() {
          // Plain (non-pooled) Oracle driver factory; pooling is added explicitly below.
          ConnectionFactory oracleFactory = ConnectionFactories.get(ConnectionFactoryOptions.builder()
              .option(ConnectionFactoryOptions.DRIVER, "oracle")
              .option(OracleR2dbcOptions.DESCRIPTOR, "(DESCRIPTION = (ADDRESS_LIST = (ADDRESS = (PROTOCOL = TCP)(HOST = 192.168.0.19)(PORT = 1521)) (ADDRESS = (PROTOCOL = TCP)(HOST = 127.0.0.1)(PORT = 1521)) ) (CONNECT_DATA = (SERVER = DEDICATED) (SERVICE_NAME = XEPDB1) ) )")
              .option(ConnectionFactoryOptions.USER, "ot")
              .option(ConnectionFactoryOptions.PASSWORD, "1111")
              .build());

          int initialSize = 1;
          int maxSize = 5;
          ConnectionPoolConfiguration configuration = ConnectionPoolConfiguration.builder(oracleFactory)
              .maxIdleTime(Duration.ofMinutes(30))
              .initialSize(initialSize)
              .maxSize(maxSize)
              .maxCreateConnectionTime(Duration.ofSeconds(1))
              // Optional tuning knobs, left disabled:
//              .maxAcquireTime(Duration.ofSeconds(1))
//              .validationQuery("SELECT 1 FROM DUAL")
//              .maxValidationTime(Duration.ofSeconds(2))
              .build();

          return new ConnectionPool(configuration);
      }
728x90

'Programming Language > Reactive Programming' 카테고리의 다른 글

Reactor Hooks 관련 글 모음  (0) 2023.10.06

+ Recent posts