1. Mono -> Flux 전환 / flatMapMany 예
/**
 * Shows a Mono-to-Flux conversion with {@code flatMapMany}.
 *
 * <p>The whole list is wrapped in a single {@code Mono}, so the source emits exactly once no
 * matter how long the list is. {@code flatMapMany} then expands that one list into a Flux of its
 * elements, each element is multiplied by 10, and every result is logged on subscribe.
 */
@Test
public void flatMapMany_test001() {
    // A short fixed list such as Arrays.asList(1, ..., 10) works here as well.
    List<Integer> numbers = new ArrayList<>();
    int next = 0;
    while (next < 100000) {
        numbers.add(next);
        next++;
    }

    // One emission (the list) from the Mono becomes many emissions (the elements) in the Flux.
    Flux<Integer> scaled = Mono.just(numbers)
            .flatMapMany(Flux::fromIterable)
            .map(value -> value * 10);

    scaled.subscribe(value -> log.info(String.valueOf(value)));
}
/**
 * Variant of the Mono-to-Flux conversion where the per-element mapping is applied inside the
 * {@code flatMapMany} lambda (on the inner Flux) instead of on the outer pipeline.
 *
 * <p>The Mono emits the list once; subscribing logs every element multiplied by 10.
 */
@Test
public void flatMapMany_test002() {
    List<Integer> source = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

    Mono<List<Integer>> wholeList = Mono.just(source);

    // The multiplication happens on the inner Flux; an outer .map(n -> n * 10) would be equivalent.
    Flux<Integer> timesTen =
            wholeList.flatMapMany(items -> Flux.fromIterable(items).map(item -> item * 10));

    timesTen.subscribe(item -> log.info(String.valueOf(item)));
}
2. Mono -> Flux 전환 / flatMapMany 예 (R2DBC 쿼리 결과를 Flux로 받기)
/**
 * Queries the ORDERS table over a directly configured Oracle R2DBC connection and streams the
 * rows as {@code Orders} objects.
 *
 * <p>Each row is logged at debug level and counted; after the stream terminates the elapsed time
 * and the row count are logged. The pipeline is subscribed asynchronously, so the test thread
 * sleeps for 5 seconds to give it time to run.
 *
 * <p>The connection is managed with {@code Flux.usingWhen}: {@code Connection.close()} only
 * returns a publisher, so it has to be subscribed for the connection to actually be closed.
 * {@code usingWhen} subscribes to it on completion, error and cancellation.
 */
@Test
public void test_sql03() {
    String COUNT_KEY = "COUNT_KEY";
    Map<String, Integer> cntMap = new HashMap<>();
    cntMap.put(COUNT_KEY, 0);
    String sql3 = "SELECT ORDER_ID, CUSTOMER_ID, STATUS, NVL(SALESMAN_ID, -1) AS SALESMAN_ID, ORDER_DATE FROM ORDERS ORDER BY ORDER_ID";
    AtomicReference<Long> startTime = new AtomicReference<>();

    Flux.usingWhen(
            // Resource: a single connection. HOST/PORT/DATABASE are used here; an Oracle
            // connect descriptor could be supplied instead.
            Mono.from(
                    ConnectionFactories.get(
                            ConnectionFactoryOptions.builder()
                                    .option(ConnectionFactoryOptions.DRIVER, "oracle")
                                    .option(ConnectionFactoryOptions.HOST, "127.0.0.1")
                                    .option(ConnectionFactoryOptions.PORT, 1521)
                                    .option(ConnectionFactoryOptions.DATABASE, "XEPDB1")
                                    .option(ConnectionFactoryOptions.USER, "ot")
                                    .option(ConnectionFactoryOptions.PASSWORD, "1111")
                                    .build()
                    ).create()
            )
            .doOnNext(connection -> log.debug(connection.toString())),
            // Use: Mono (one Result) -> Flux (many rows) via flatMapMany.
            connection ->
                    Mono.from(connection.createStatement(sql3).execute())
                            .flatMapMany(result ->
                                    result.map(row -> {
                                        Orders orders = new Orders(row.get("ORDER_ID", Integer.class), row.get("CUSTOMER_ID", Integer.class), row.get("STATUS", String.class), row.get("SALESMAN_ID", Integer.class), row.get("ORDER_DATE", LocalDateTime.class));
                                        return orders;
                                    })
                            ),
            // Cleanup: the close publisher is subscribed by usingWhen, so the close really runs.
            connection ->
                    Mono.from(connection.close())
                            .doFinally(st -> log.debug("### connection.close()"))
    )
    .doOnSubscribe(x -> startTime.set(System.nanoTime()))
    .doOnNext(n -> cntMap.merge(COUNT_KEY, 1, Integer::sum)) // count row
    .doAfterTerminate(() -> {
        log.info("Time taken : " + TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime.get()) + " milliseconds.");
        log.debug("## count = " + cntMap.get(COUNT_KEY));
    })
    .subscribe(n -> log.debug(n.toString()));

    // subscribe() does not block; keep the test thread alive while the query runs.
    try {
        Thread.sleep(5000L);
    } catch (InterruptedException e) {
        // Restore the interrupt flag instead of silently swallowing the interruption.
        Thread.currentThread().interrupt();
    }
}
/**
 * Queries the ORDERS table through the factory returned by {@code connectionFactory()} and
 * logs every row as an {@code Orders} object at debug level.
 *
 * <p>The connection is managed with {@code Flux.usingWhen}: {@code Connection.close()} only
 * returns a publisher, so it has to be subscribed for the close to take effect.
 * {@code usingWhen} subscribes to it on completion, error and cancellation.
 * NOTE(review): with the pooled factory, close presumably returns the connection to the pool
 * rather than closing the physical connection; confirm against the r2dbc-pool docs.
 *
 * <p>The pipeline is subscribed asynchronously, so the test thread sleeps for 15 seconds to give
 * it time to run.
 */
@Test
public void test_sql05() {
    log.info("### test_sql05() - start");
    String sql3 = "SELECT ORDER_ID, CUSTOMER_ID, STATUS, NVL(SALESMAN_ID, -1) AS SALESMAN_ID, ORDER_DATE FROM ORDERS ORDER BY ORDER_ID";
    ConnectionFactory confactory = connectionFactory();

    Flux<Orders> ordersFlux = Flux.usingWhen(
            Mono.from(confactory.create()),
            // Mono (one Result) -> Flux (many rows): flatMapMany yields a Flux, whereas
            // flatMap would keep the result as a Mono.
            connection ->
                    Mono.from(connection.createStatement(sql3).execute())
                            .flatMapMany(result ->
                                    result.map(row -> {
                                        Orders orders = new Orders(row.get("ORDER_ID", Integer.class), row.get("CUSTOMER_ID", Integer.class), row.get("STATUS", String.class), row.get("SALESMAN_ID", Integer.class), row.get("ORDER_DATE", LocalDateTime.class));
                                        return orders;
                                    })
                            ),
            // Cleanup: the close publisher is subscribed by usingWhen, so the close really runs.
            connection ->
                    Mono.from(connection.close())
                            .doFinally(st -> log.debug("### connection.close()"))
    );

    ordersFlux.subscribe(n -> log.debug(n.toString()));
    // For a single-row check, ordersFlux.blockFirst() can be used instead of subscribe().

    // subscribe() does not block; keep the test thread alive while the query runs.
    try {
        Thread.sleep(15000L);
    } catch (InterruptedException e) {
        // Restore the interrupt flag instead of silently swallowing the interruption.
        Thread.currentThread().interrupt();
    }
}
/**
 * Builds a pooled R2DBC {@code ConnectionFactory} for the Oracle test database.
 *
 * <p>The underlying factory is resolved with driver {@code "pool"} and protocol
 * {@code "oracle"}, using an Oracle connect descriptor that lists two addresses and the
 * {@code XEPDB1} service. It is then wrapped in a {@code ConnectionPool} configured with an
 * initial size of 1, a maximum size of 5, a 30-minute max idle time and a 1-second limit on
 * connection creation.
 *
 * @return a new {@code ConnectionPool} wrapping the resolved factory
 */
public ConnectionFactory connectionFactory() {
    ConnectionFactoryOptions options = ConnectionFactoryOptions.builder()
            .option(ConnectionFactoryOptions.DRIVER, "pool")
            .option(ConnectionFactoryOptions.PROTOCOL, "oracle")
            .option(OracleR2dbcOptions.DESCRIPTOR, "(DESCRIPTION = (ADDRESS_LIST = (ADDRESS = (PROTOCOL = TCP)(HOST = 192.168.0.19)(PORT = 1521)) (ADDRESS = (PROTOCOL = TCP)(HOST = 127.0.0.1)(PORT = 1521)) ) (CONNECT_DATA = (SERVER = DEDICATED) (SERVICE_NAME = XEPDB1) ) )")
            .option(ConnectionFactoryOptions.USER, "ot")
            .option(ConnectionFactoryOptions.PASSWORD, "1111")
            .build();
    ConnectionFactory delegate = ConnectionFactories.get(options);

    // Not enabled here: maxAcquireTime, validationQuery ("SELECT 1 FROM DUAL"), maxValidationTime.
    ConnectionPoolConfiguration poolSettings = ConnectionPoolConfiguration.builder(delegate)
            .maxIdleTime(Duration.ofMinutes(30))
            .initialSize(1) // connections created up front
            .maxSize(5)     // upper bound on pooled connections
            .maxCreateConnectionTime(Duration.ofSeconds(1))
            .build();

    return new ConnectionPool(poolSettings);
}