一,
一元运算,二元运算,flatMap,reduce,min
flatMap扁平流,将多个流组合,或者整合元素中的元素
reduce是把一组值变成一个值。
min是标准化的一个reduce操作。
@Test
public void test01(){
UnaryOperator<Integer> unary = t -> t + 1;
System.out.println(unary.apply(2));
// 3
BinaryOperator<Integer> binary = (x, y) -> x + 2 * y;
System.out.println(binary.apply(2,3));
// 8
List<Integer> list1 = Lists.newArrayList(1, 2, 3);
List<Integer> list2 = Lists.newArrayList(4, 5, 6);
List<Integer> list = Stream.of(list1, list2)
.flatMap(Collection::stream)
.collect(Collectors.toList());
System.out.println(list);
// [1, 2, 3, 4, 5, 6]
List<String> demoList = Lists.newArrayList("2", "3", "1", "6", "4", "5", "10");
String min = demoList.stream()
.min(Comparator.comparing(Integer::valueOf))
.get();
System.out.println(min);
// 1
String reduce = demoList.stream().reduce("0", (acc, ele) -> acc + ele);
System.out.println(reduce);
// 023164510
}
二,
Java8的日志优化
Java8的基本类型流(装箱和拆箱)
/**
* Java8基本类型流
* int字节为4字节,而Integer类型是16字节
* 将流中元素变成基本类型进行运算,节省空间
*/
@Test
public void test02(){
// mapToInt 拆箱 T -> int
List<Integer> demoList = Lists.newArrayList(3,1,2);
final IntStream intStream = demoList.stream().mapToInt(Integer::intValue);
// mapToObj 装箱 int -> T 或者使用 intStream.boxed()
final Stream<Integer> integerStream = intStream.mapToObj(Integer::new);
}
/**
* 使用lambda表达式简化日志代码
*/
@Test
public void test03(){
logger logger = new logger();
logger.debug(() -> "look at this");
}
三,
流是否有序取决于集合的类型
但是使用并行流时,forEach 方法不能保证元素是有序的,
如果需要保证按顺序处理,应该使用forEachOrdered 方法。
Java8的map新方法computeIfAbsent, 额外提供一个默认值, 缓存map
Java8的map新方法foreach, 可以接受一个BiConsumer函数
@Test
public void test04(){
List<Integer> demoList = Lists.newArrayList(3, 1, 2);
demoList.parallelStream().forEachOrdered(System.out::println);
// 3
// 1
// 2
Map<String, List<Integer>> map = Maps.newHashMap();
map.put("a", Lists.newArrayList(1,2,3));
map.put("b", Lists.newArrayList(4,5,6));
List<Integer> cValue = map.computeIfAbsent("c", value -> Lists.newArrayList(7, 8, 9));
System.out.println(cValue); //[7, 8, 9]
List<Integer> bValue = map.computeIfAbsent("b", value -> Lists.newArrayList(7, 8, 9));
System.out.println(bValue); //[4, 5, 6]
map.forEach((key, value) -> {
System.out.println("key: " + key + ", value: " + value);
});
//key: a, value: [1, 2, 3]
//key: b, value: [4, 5, 6]
//key: c, value: [7, 8, 9]
}
四,
收集器Collectors
使用收集器进行字符串拼接joining
使用收集器将流转换成值(最大值maxBy,最小值minBy,平均值averagingInt, 统计个数counting)
使用收集器将流切分(分块partitioningBy,分组groupingBy)
组合收集器,计算每组的数量,groupingBy,mapping
归一化处理 例: 累加 reducing(init, (acc, ele) -> acc + ele)
@Test
public void test05(){
List<String> list1 = Lists.newArrayList("1", "2", "3", "4");
List<String> list2 = Lists.newArrayList("1", "2");
List<String> list3 = Lists.newArrayList("1", "2", "3");
List<List<String>> list = Lists.newArrayList(list1, list2, list3);
String str = list1.stream().collect(Collectors.joining(",", "{", "}"));
System.out.println(str); // {1,2,3,4}
// 定义一个比较器 (y-x, 值越小权重越大)
Comparator<List<String>> comparator = (x, y) -> {
return y.size() - x.size();
};
List<String> resultList = list.stream().collect(Collectors.maxBy(comparator)).get();
// 可以简写成: List<String> resultList = list.stream().max(comparator).get();
System.out.println(resultList); // [1, 2]
Double avg = list1.stream().collect(Collectors.averagingInt(ele -> Integer.parseInt(ele)));
System.out.println(avg); // 2.5
Map<Boolean, List<String>> partitions = list1.stream().collect(Collectors.partitioningBy(ele -> Integer.parseInt(ele) > 1));
System.out.println(partitions); // {false=[1], true=[2, 3, 4]}
Map<String, List<String>> groups = list1.stream().collect(Collectors.groupingBy(ele -> Integer.parseInt(ele) == 2 ? "two" : "other"));
System.out.println(groups); // {other=[1, 3, 4], two=[2]}
Map<String, Long> groupsCount = list1.stream().collect(Collectors.groupingBy(ele -> Integer.parseInt(ele) == 2 ? "two" : "other", Collectors.counting()));
System.out.println(groupsCount); // {other=3, two=1}
Map<String, Set<Integer>> collect = list1.stream().collect(Collectors.groupingBy(ele -> Integer.parseInt(ele) == 2 ? "two" : "other", Collectors.mapping(ele -> Integer.parseInt(ele) * 2, Collectors.toSet())));
System.out.println(collect); // {other=[2, 6, 8], two=[4]} 上游收集器,下游收集器,组合使用
Integer sum = list1.stream().map(Integer::parseInt).collect(Collectors.reducing(0, (acc, ele) -> acc + ele));
System.out.println(sum); // 10
}
五,
并行流 parallelStream
内部使用的是Fork/Join框架,但不是所有的场景都适合并行流。
有些数据结构难于分解,比如,可能要花 O(N) 的时间复杂度来分解问题。其中包括LinkedList,对半分解太难了。
还有 Streams.iterate 和 BufferedReader.lines,它们长度未知.
如果能避开有状态,选用无状态操作,就能获得更好的并行性能。无状态操作包括 map、filter 和 flatMap,有状态操作包括 sorted、distinct 和 limit。
影响性能的五要素是:数据大小、源数据结构、值是否装箱、可用的 CPU 核数量,以及处理每个元素所花的时间
Java 8 还引入了一些针对数组的并行操作,脱离流框架也可以使用 Lambda 表达式。
(数组初始化parallelSetAll,并行排序parallelSort)
parallelPrefix 操作擅长对时间序列数据做累加,它会更新一个数组,将每一个元素替换
为当前元素和其前驱元素的和,这里的“和”是一个宽泛的概念,它不必是加法,可以是任意一个 BinaryOperator
@Test
public void test06(){
// 使用并行化数组操作初始化数
int[] arr = new int[8];
Arrays.parallelSetAll(arr, i -> new Random().nextInt(10));
System.out.println(Arrays.toString(arr));
// [2, 3, 8, 3, 6, 9, 4, 1]
// 并行排序
Arrays.parallelSort(arr);
System.out.println(Arrays.toString(arr));
// [1, 2, 3, 3, 4, 6, 8, 9]
// parallelPrefix
int[] arr2 = new int[]{1, 2, 3, 4, 5};
Arrays.parallelPrefix(arr2, Integer::sum);
System.out.println(Arrays.toString(arr2));
// [1, 3, 6, 10, 15]
}
六,
CompletableFuture功能介绍与原理分析
多线程处理 CompletableFuture
thenCompose 可以用于组合多个CompletableFuture,将前一个任务的返回结果作为下一个任务的参数,它们之间存在着先后顺序。
thenCompose生成了一个新的CompletableFuture。
thenApply还是原来的CompletableFuture,只是泛型从Dept转换成User。
thenCombine会在两个任务都执行完成后,执行不分先后,把两个任务的结果合并。
whenComplete继续使用当前线程,执行剩余任务,而thenApply从线程池获取
exceptionally运行时异常
cancel、complete和completeExceptionally会尝试对未完成的CompletableFuture进行赋值(赋传入值或异常结果)并触发后续任务,
若赋值时CompletableFuture已完成则赋值操作无效。
@Test
public void test07(){
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
int x = new Random().nextInt(10);
System.out.println("x = " + x);
return x;
}).thenCompose(x -> CompletableFuture.supplyAsync(() -> {
int y = new Random().nextInt(10);
System.out.println("y = " + y);
int sum = x + y;
// int sum = x + y/0;
System.out.println("x + y = " + sum);
return sum;
})).whenComplete((t, e)->{
if (e != null) {
System.out.println("e1: " + e.getMessage());
}
System.out.println("t = " + t);
}).thenCombine(CompletableFuture.supplyAsync(() -> {
int z = new Random().nextInt(10);
System.out.println("z = " + z);
return z;
}), (result1, result2) -> result1 * result2)
.exceptionally(e -> {
System.out.println("e2: " + e.getCause().getMessage());
return -1;
});
boolean flag = future.complete(100);
System.out.println("flag: " + flag + ", result: " + future.join());
//x = 0
//z = 8
//y = 1
//x + y = 1
//t = 1
//flag: true, result: 100
}
七,
日志和打印消息
流有一个方法让你能查看每个值,同时能继续操作流。这就是 peek 方法。
使用 peek 方法还能以同样的方式,将输出定向到现有的日志系统中,比如 log4j、java.util.logging 或者 slf4j
记录日志这是 peek 方法的用途之一。为了像调试循环那样一步一步跟踪,可在 peek 方法中加入断点,这样就能逐个调试流中的元素了。
此时,peek 方法可知包含一个空的方法体,只要能设置断点就行。有一些调试器不允许在空的方法体中设置断点,
此时,我将值简单地映射为其本身,这样就有地方设置断点了,虽然这样做不够完美,但只要能工作就行
@Test
public void test08(){
Set<String> nationalities
= album.getMusicians()
.filter(artist -> artist.getName().startsWith("The"))
.map(artist -> artist.getNationality())
.peek(nation -> System.out.println("Found nationality: " + nation))
.collect(Collectors.<String>toSet());
}
八,
Lambda表达式的SOLID原则
领域专业语言 DSL 分为两类:内部 DSL 和外部 DSL。
外部 DSL 脱离程序源码编写,然后单独解析和实现。比如级联样式表(CSS)和正则表达式,就是常用的外部 DSL。
内部 DSL嵌入编写它们的编程语言中,某种角度上来说,就是普通的类库,提供 API 方便使用,比如 JMock 和 Mockito
Lambda一定程度上改变了传统的设计模式,命令者模式,策略模式,观察者模式,模板方法模式
方法引用在一定程度上替代了继承关系
使用Lambda表达式的SOLID原则
Single responsibility、Open/closed、Liskov substitution、Interface segregation、Dependency inversion
// 单一功能原则 Single responsibility
// 程序中的类或方法只能有一个改变的理由
// 使用集合流重构质数计数程序
public long countPrimes(int upTo) {
return IntStream.range(1, upTo)
.parallel()
.filter(this::isPrime)
.count();
}
private boolean isPrime(int number) {
return IntStream.range(2, number)
.allMatch(x -> (number % x) != 0);
}
// 开闭原则 Open/closed
// 软件应该对扩展开放,对修改闭合
@Test
public void test09(){
// ThreadLocal 日期格式化器
ThreadLocal<SimpleDateFormat> localFormatter = ThreadLocal.withInitial(SimpleDateFormat::new);
SimpleDateFormat formatter = localFormatter.get();
// ThreadLocal 标识符
AtomicInteger atomicInteger = new AtomicInteger();
ThreadLocal<Integer> localId = ThreadLocal.withInitial(atomicInteger::getAndIncrement);
Integer threadId = localId.get();
System.out.println(threadId);
}
// 依赖反转原则 Dependency inversion
// 抽象不应依赖细节,细节应该依赖抽象。
// 依赖反转原则另外值得注意的一点是待依赖的抽象不必是接口,通常使用高阶函数管理资源
// 传统方式,解析文件标题头
public List<String> findHeadings(Reader input) {
try (BufferedReader reader = new BufferedReader(input)) {
return reader.lines()
.filter(line -> line.endsWith(":"))
.map(line -> line.substring(0, line.length() - 1))
.collect(toList());
} catch (IOException e) {
throw new HeadingLookupException(e);
}
}
// Lambda方式,解析文件标题头
public List<String> findHeadings(Reader input) {
return withLinesOf(input,
lines -> lines.filter(line -> line.endsWith(":"))
.map(line -> line.substring(0, line.length()-1))
.collect(toList()),
HeadingLookupException::new);
}
private <T> T withLinesOf(Reader input,
Function<Stream<String>, T> handler,
Function<IOException, RuntimeException> error) {
try (BufferedReader reader = new BufferedReader(input)) {
return handler.apply(reader.lines());
} catch (IOException e) {
throw error.apply(e);
}
}