The Stream library avoids multiple iterations by using a pipeline: the basic idea is to perform as many of the user-specified operations as possible in a single pass over the data.
Classification of Stream operations:
1. Intermediate operations
1.1 Stateless: unordered(), filter(), map(), mapToInt(), mapToLong(), mapToDouble()
1.2 Stateful: distinct(), sorted(), limit(), skip()
2. Terminal operations
2.1 Non-short-circuiting: forEach(), forEachOrdered(), toArray(), reduce(), collect(), max(), min(), count()
2.2 Short-circuiting: anyMatch(), allMatch(), noneMatch(), findFirst(), findAny()
All Stream operations fall into two groups: intermediate operations and terminal operations. An intermediate operation is only a marker; only a terminal operation triggers the actual computation. Intermediate operations are further divided into stateless and stateful ones: a stateless operation processes each element independently of the elements seen before it, while a stateful operation cannot produce its result until all elements have been processed -- sorting, for example, is stateful, because the order is unknown until every element has been read. Terminal operations are divided into short-circuiting and non-short-circuiting ones: a short-circuiting operation can return a result without processing all elements, e.g. finding the first element that satisfies a predicate. This fine-grained classification exists because the underlying implementation handles each kind differently.
Without the Stream machinery, running each operation as its own iteration has two drawbacks:
1. Too many iterations: the number of passes over the data equals the number of operations invoked.
2. Frequent intermediate results: every operation materializes an intermediate collection, and the storage overhead quickly becomes unacceptable.
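As an illustration (a minimal sketch, not from the original notes), the imperative version below walks the list twice and allocates an intermediate list, while the stream version expresses the same logic as one pipeline that is evaluated in a single pass when collect() runs:

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

public class SinglePassDemo {
    public static void main(String[] args) {
        List<String> words = List.of("stream", "pipeline", "sink", "stage");

        // Imperative: two passes and one intermediate list.
        List<String> longWords = new ArrayList<>();
        for (String w : words) {
            if (w.length() > 4) {
                longWords.add(w);
            }
        }
        List<String> upper = new ArrayList<>();
        for (String w : longWords) {
            upper.add(w.toUpperCase());
        }

        // Stream: filter and map are only markers; the single pass happens at collect().
        List<String> upper2 = words.stream()
                .filter(w -> w.length() > 4)
                .map(String::toUpperCase)
                .collect(Collectors.toList());

        System.out.println(upper);
        System.out.println(upper2);
    }
}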
/* Helper class for executing stream pipelines, capturing all of the information about a stream
pipeline(output shape, intermediate operations, stream flags, parallelism, etc) in one place.
A #PipelineHelper describes the initial segment of a stream pipeline, including its source,
intermediate operations, and may additionally incorporate information about the terminal
(or stateful) operation which follows the last intermediate operation described by this
#PipelineHelper. The #PipelineHelper is passed to
TerminalOp#evaluateParallel(PipelineHelper, java.util.Spliterator),
TerminalOp#evaluateSequential(PipelineHelper, java.util.Spliterator), and
AbstractPipeline#opEvaluateParallel(PipelineHelper, java.util.Spliterator, java.util.function.
IntFunction), methods, which can use the #PipelineHelper to access information about the
pipeline such as head shape, stream flags, and size, and use the helper methods such as
#wrapAndCopyInto(Sink, Spliterator), #copyInto(Sink, Spliterator), and #wrapSink(Sink) to
execute pipeline operations.
*/
abstract class PipelineHelper<P_OUT> {
...
}
Stream uses the notion of a Stage to describe one complete operation and represents each stage with an instance of some PipelineHelper subclass; chaining the stages in order forms the whole pipeline. The classes and interfaces related to Stream are:
PipelineHelper, AbstractPipeline, ReferencePipeline, Head, StatelessOp, StatefulOp, BaseStream, Stream
There are also IntPipeline, LongPipeline and DoublePipeline; these three classes are tailored to primitive types (rather than wrapper types) and sit alongside ReferencePipeline. Head represents the first stage, i.e. the stage produced by a call such as Collection.stream(); this stage obviously contains no operation. StatelessOp and StatefulOp represent stateless and stateful stages, corresponding to stateless and stateful intermediate operations.
Collection.stream() produces the Head, i.e. Stage 0; each subsequent intermediate operation then produces a new Stream. These Stream objects are organized as a doubly linked list that makes up the pipeline; because each stage records the previous stage, the operation it performs and its callback, this structure is enough to describe every operation applied to the data source. That is how a Stream records its operations.
Stream pipelines may execute either sequentially or in parallel. This execution mode is a property of the stream. Streams are created with an initial choice of sequential or parallel execution. (For example, Collection#stream() creates a sequential stream, and Collection#parallelStream() creates a parallel one.) This choice of execution mode may be modified by the #sequential() or #parallel() methods, and may be queried with the #isParallel() method.
// Factory methods for transforming streams into sorted stream.
final class SortedOps {
...
// #Sink for implementing sort on reference streams.
private static final class RefSortingSink<T> extends AbstractRefSortingSink<T> {
private ArrayList<T> list;
RefSortingSink(Sink<? super T> sink, Comparator<? super T> comparator) {
super(sink, comparator);
}
@Override
public void begin(long size) {
if (size >= Nodes.MAX_ARRAY_SIZE)
throw new IllegalArgumentException(Nodes.BAD_SIZE);
list = (size >= 0) ? new ArrayList<T>((int)size) : new ArrayList<T>();
}
@Override
public void end() {
list.sort(comparator);
downstream.begin(list.size());
if (!cancellationWasRequested) {
list.forEach(downstream::accept);
} else {
for (T t : list) {
if (downstream.cancellationRequested())
break;
downstream.accept(t);
}
}
downstream.end();
list = null;
}
@Override
public void accept(T t) {
list.add(t);
}
}
...
}
/* Abstract base class for "pipeline" classes, which are the core implementations of the
Stream interface and its primitive specializations. Manages construction and evaluation of stream
pipelines.
An #AbstractPipeline represents an initial portion of a stream pipeline, encapsulating a stream source and zero or more intermediate operations. The individual #AbstractPipeline objects are often referred to as stages, where each stage describes either the stream source or an intermediate operation.
A concrete intermediate stage is generally built from an #AbstractPipeline, a shape-specific pipeline class which extends it(e.g., #IntPipeline) which is also abstract, and an operation-specific concrete class which extends that. #AbstractPipeline contains most of the mechanics of evaluating the pipeline, and implements methods that will be used by the operation; the shape-specific classes add helper methods for dealing with collection of results into the appropriate shape-specific containers.
After chaining a new intermediate operation, or executing a terminal operation, the stream is considered to be consumed, and no more intermediate or terminal operations are permitted on this stream instance.
Note:
For sequential streams, and parallel streams without stateful intermediate operations, parallel streams, pipeline evaluation is done in a single pass that "jams" all the operations together. For parallel streams with stateful operations, execution is divided into segments, where each stateful operations marks the end of a segment, and each segment is evaluated separately and the result used as the input to the next segment. In all cases, the source data is not consumed until a terminal operation begins.
*/
abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
extends PipelineHelper<E_OUT> implements BaseStream<E_OUT, S> {
private static final String MSG_STREAM_LINKED = "stream has already been operated upon or closed";
private static final String MSG_CONSUMED = "source already consumed or closed";
//Backlink to the head of the pipeline chain(self if this is the source stage).
@SuppressWarnings("rawtypes")
private final AbstractPipeline sourceStage;
//The "upstream" pipeline, or null if this is the source stage.
@SuppressWarnings("rawtypes")
private final AbstractPipeline previousStage;
//The operation flags for the intermediate operation represented by this pipeline object.
protected final int sourceOrOpFlags;
//The next stage in the pipeline, or null if this is the last stage.
//Effectively final at the point of linking to the next pipeline.
@SuppressWarnings("rawtypes")
private AbstractPipeline nextStage;
/* The number of intermediate operations between this pipeline object
and the stream source if sequential, or the previous stateful if parallel.
Valid at the point of pipeline preparation for evaluation.
*/
private int depth;
/* The combined source and operation flags for the source and all operations up to
and including the operation represented by this pipeline object.
Valid at the point of pipeline preparation for evaluation.
*/
private int combinedFlags;
/* The source spliterator. Only valid for the head pipeline.
Before the pipeline is consumed if non-null then #sourceSupplier must be null.
After the pipeline is consumed if non-null then is set to null.
*/
private Spliterator<?> sourceSpliterator;
/* The source supplier. Only valid for the head pipeline. Before the pipeline is consumed
if non-null then #sourceSpliterator must be null. After the pipeline is consumed if
non-null then is set to null.
*/
private Supplier<? extends Spliterator<?>> sourceSupplier;
//True if this pipeline has been linked or consumed
private boolean linkedOrConsumed;
//True if there are any stateful ops in the pipeline; only valid for the source stage.
private boolean sourceAnyStateful;
private Runnable sourceCloseAction;
//True if pipeline is parallel, otherwise the pipeline is sequential; only valid for the source stage.
private boolean parallel;
...
}
For the pipeline to do its job, all operations must be stacked together, but an earlier stage does not know which operation a later stage performs or what form its callback takes. In other words, only a stage itself knows how to execute the operation it contains, so some protocol is needed to coordinate calls between adjacent stages.
That protocol is the Sink interface, which contains the following methods:
void begin(long size);            // called before traversal starts, telling the Sink to get ready;
void end();                       // called after all elements have been traversed, telling the Sink there are no more elements;
boolean cancellationRequested();  // asks whether the operation can finish early, letting short-circuiting operations stop as soon as possible;
void accept(T t);                 // called for each element; receives one element to process. A stage wraps its own operation and callback into this method, so the previous stage only needs to call the current stage's accept(T t).
With this protocol, calls between adjacent stages become straightforward: each stage wraps its operation into a Sink, and the previous stage simply calls the next stage's accept() without knowing anything about its internals. For stateful operations, begin() and end() must be implemented as well. For example, Stream.sorted() is a stateful intermediate operation: its Sink.begin() may create a container for the results, accept() adds each element to that container, and end() finally sorts the container. For short-circuiting operations, Sink.cancellationRequested() must also be implemented: Stream.findFirst(), for instance, is short-circuiting, so once an element has been found cancellationRequested() should return true so that the caller can stop as soon as possible. The four Sink methods cooperate to carry out the computation; the essence of the Stream API's internal implementation is how these four methods are overridden for each operation.
With operations wrapped in Sinks, the inter-stage calling problem is solved: at execution time the pipeline simply starts at the Head and, over the data source, invokes each stage's Sink.{begin(), accept(), cancellationRequested(), end()} in turn.
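A minimal sketch (illustrative names, not the JDK's internal classes) of how one stage wraps the next stage's Sink and forwards elements in the (process -> forward) pattern described above:

import java.util.List;
import java.util.function.Predicate;

public class SinkSketch {
    // Simplified version of the Sink protocol described above.
    interface MySink<T> {
        default void begin(long size) {}
        default void end() {}
        default boolean cancellationRequested() { return false; }
        void accept(T t);
    }

    // A "filter" stage: processes an element and forwards it downstream only if it matches.
    static <T> MySink<T> filterSink(Predicate<T> predicate, MySink<T> downstream) {
        return new MySink<T>() {
            @Override public void begin(long size) { downstream.begin(-1); }
            @Override public void end() { downstream.end(); }
            @Override public boolean cancellationRequested() { return downstream.cancellationRequested(); }
            @Override public void accept(T t) {
                if (predicate.test(t)) {
                    downstream.accept(t);
                }
            }
        };
    }

    public static void main(String[] args) {
        // Terminal sink: the exit of the call chain, it only consumes.
        MySink<String> terminal = System.out::println;
        MySink<String> chain = filterSink(s -> s.length() > 4, terminal);

        List<String> source = List.of("sink", "stage", "pipeline");
        chain.begin(source.size());
        for (String s : source) {
            if (chain.cancellationRequested()) break;
            chain.accept(s);
        }
        chain.end();
    }
}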
/* Abstract base class for an intermediate pipeline stage or pipeline source
stage implementing whose elements are of type #U.
*/
abstract class ReferencePipeline<P_IN, P_OUT>
extends AbstractPipeline<P_IN, P_OUT, Stream<P_OUT>>
implements Stream<P_OUT> {
...
@Override
@SuppressWarnings("unchecked")
public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
Objects.requireNonNull(mapper);
return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
return new Sink.ChainedReference<P_OUT, R>(sink) {
@Override
public void accept(P_OUT u) {
downstream.accept(mapper.apply(u));
}
};
}
};
}
...
}
Sink neatly encapsulates each step of a Stream, and the (process -> forward) pattern lets the operations be stacked. The gears are all meshed; what remains is the final push that sets them in motion. That push is the terminal operation: as soon as a terminal operation is invoked, the whole pipeline is executed.
Nothing can follow a terminal operation, so a terminal operation does not create a new pipeline stage -- intuitively, the pipeline's linked list stops growing. The terminal operation creates a Sink wrapping its own operation, the last Sink in the pipeline; this Sink only needs to process data, not forward results to a downstream Sink (there is none). In the (process -> forward) model, the terminal operation's Sink is the exit of the call chain.
Separately, under the READ COMMITTED isolation level there is an additional optimization: row locks acquired while a statement executes are released on the rows that "do not satisfy the condition" as soon as the statement finishes, without waiting for the transaction to commit.
In other words, under READ COMMITTED the locks cover a smaller range and are held for a shorter time, which is why many businesses use READ COMMITTED as their default isolation level.
The five load-balancing strategies of Nginx upstream:
1. Round robin (the default);
2. weight;
3. ip_hash;
4. fair (third-party): requests are assigned according to the backend servers' response time, with shorter response times served first;
5. url_hash (third-party).
Compared with the fine-grained, in-memory-update-level backup or logging mechanisms of other systems, an RDD's Lineage records coarse-grained Transformation operations on specific data (filter, map, join, etc.). When some partitions of an RDD are lost, the Lineage carries enough information to recompute and recover the lost partitions. This coarse-grained data model restricts the scenarios Spark can be applied to, so Spark is not suitable for every high-performance workload, but compared with a fine-grained model it also brings a performance gain.
Fault-tolerance principle:
If a node dies and the computation involves only narrow dependencies, recovery just recomputes the lost parent RDD partitions, with no dependence on other nodes. A wide dependency, however, needs all partitions of the parent RDD to be present, which makes recomputation expensive. The cost can be understood this way: with a narrow dependency, when a child partition is lost and the parent partition is recomputed, all data of that parent partition belongs to the lost child partition, so there is no redundant computation. With a wide dependency, the data recomputed in each parent partition is not all destined for the lost child partition -- part of it corresponds to child partitions that were not lost -- so redundant computation is incurred, which is why wide dependencies are more expensive. Therefore, when placing checkpoints with the Checkpoint operator, consider not only whether the Lineage is long enough but also whether wide dependencies are involved; checkpointing at a wide dependency gives the best value for money.
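A minimal sketch (master URL and checkpoint path are illustrative) of checkpointing an RDD right after a wide dependency such as groupByKey, so that recovery does not have to replay the shuffle from the original Lineage:

import java.util.Arrays;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;

public class CheckpointSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
            .appName("CheckpointSketch").master("local[*]").getOrCreate();
        JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
        jsc.setCheckpointDir("/tmp/spark-checkpoint");   // illustrative path

        JavaPairRDD<String, Integer> pairs = jsc.parallelize(
                Arrays.asList("a", "b", "a", "c"))
            .mapToPair(s -> new Tuple2<>(s, 1));

        // groupByKey introduces a wide dependency; checkpointing here truncates the Lineage.
        JavaPairRDD<String, Iterable<Integer>> grouped = pairs.groupByKey();
        grouped.checkpoint();
        grouped.count();   // an action materializes the RDD and writes the checkpoint

        spark.stop();
    }
}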
-----------------------------------------------------------------
-XX:CMSMaxAbortablePrecleanTime
The AbortablePreclean (abortable preclean) phase of CMS
This phase runs only if the memory used in the young generation's Eden space exceeds CMSScheduleRemarkEdenSizeThreshold (default 2M); if the young generation holds too few objects, there is no point running this phase, and CMS goes straight to the remark phase.
Why does this phase exist? What is its value?
The ultimate goal of CMS GC is to reduce pause time during collection, so this phase tries as hard as possible to process old-generation objects that were updated by application threads during the concurrent phases; the stop-the-world remark phase then has less to do, and its pause shrinks accordingly.
The phase mainly loops over two tasks:
1. processing objects in the from and to survivor spaces, marking reachable old-generation objects;
2. as in the previous phase, scanning and processing objects referenced from dirty cards.
The loop does not run forever; it is broken by any of three conditions:
1. the maximum number of iterations, CMSMaxAbortablePrecleanLoops, default 0, meaning no limit on the iteration count;
2. the elapsed time reaching the threshold CMSMaxAbortablePrecleanTime, default 5 seconds;
3. the young generation's Eden usage reaching the threshold CMSScheduleRemarkEdenPenetration, default 50%.
If a young GC happens before the loop exits, the subsequent remark phase has far less of the young generation to scan, but whether a young GC occurs is not under our control.
-----------------------------------------------------------------
HandlerMethodArgumentResolver
Uses an HttpMessageConverter to convert the request body input stream into the corresponding method argument.
/* A base class for resolving method argument values by reading from the body of a request with
HttpMessageConverter
*/
public abstract class AbstractMessageConverterMethodArgumentResolver implements HandlerMethodArgumentResolver {
private static final Set<HttpMethod> SUPPORTED_METHODS =
EnumSet.of(HttpMethod.POST, HttpMethod.PUT, HttpMethod.PATCH);
private static final Object NO_VALUE = new Object();
protected final List<HttpMessageConverter<?>> messageConverters;
protected final List<MediaType> allSupportedMediaTypes;
private final RequestResponseBodyAdviceChain advice;
...
}
AbstractMessageConverterMethodArgumentResolver
RequestPartMethodArgumentResolver
AbstractMessageConverterMethodProcessor
RequestResponseBodyMethodProcessor
HttpEntityMethodProcessor
RequestPartMethodArgumentResolver: resolves arguments annotated with @RequestPart, or arguments whose type is MultipartFile or Servlet 3.0's javax.servlet.http.Part (and that are not annotated with @RequestParam); the data is obtained from the HttpServletRequest. When a parameter is annotated with @RequestPart, it is parsed by an HttpMessageConverter according to the part's Content-Type, which is similar to how @RequestBody is handled.
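A small sketch of a handler method that exercises this resolver (class and field names are illustrative): the MultipartFile parameter is bound directly from the multipart request, and the @RequestPart parameter is converted from its part's Content-Type (e.g. JSON) by an HttpMessageConverter:

import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestPart;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.multipart.MultipartFile;

@RestController
public class UploadController {

    static class Meta {            // illustrative DTO deserialized from a JSON part
        public String owner;
    }

    @PostMapping("/upload")
    public String upload(@RequestPart("meta") Meta meta,   // converted via HttpMessageConverter
                         MultipartFile file) {             // resolved directly from the multipart request
        return meta.owner + " uploaded " + file.getOriginalFilename();
    }
}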
/* Extends #AbstractMessageConverterMethodArgumentResolver with the ability to handle method
return values by writing to the response with #HttpMessageConverter.
*/
public abstract class AbstractMessageConverterMethodProcessor
extends AbstractMessageConverterMethodArgumentResolver
implements HandlerMethodReturnValueHandler {
...
}
HttpEntityMethodProcessor: handles method arguments of type HttpEntity and RequestEntity.
/* Resolves #HttpEntity and #RequestEntity method argument values and also
handles #HttpEntity and #ResponseEntity return values.
An #HttpEntity return type has a specific purpose. Therefore this handler
should be configured ahead of handlers that support any return value type
annotated with @ModelAttribute or @ResponseBody to ensure they don't take over.
*/
public class HttpEntityMethodProcessor extends AbstractMessageConverterMethodProcessor {
...
}
/* Resolves #Errors method arguments.
An #Errors method argument is expected to appear immediately after the model attribute
in the method signature. It is resolved by expecting the last two attributes added to
the model to be the model attribute and its #BindingResult.
*/
public class ErrorsMethodArgumentResolver implements HandlerMethodArgumentResolver {
@Override
public boolean supportsParameter(MethodParameter parameter) {
Class<?> paramType = parameter.getParameterType();
return Errors.class.isAssignableFrom(paramType);
}
@Override
@Nullable
public Object resolveArgument(MethodParameter parameter,
@Nullable ModelAndViewContainer mavContainer, NativeWebRequest webRequest,
@Nullable WebDataBinderFactory binderFactory) throws Exception {
Assert.state(mavContainer != null,
"Errors/BindingResult argument only supported on regular handler methods");
ModelMap model = mavContainer.getModel();
String lastKey = CollectionUtils.lastElement(model.keySet());
if (lastKey != null && lastKey.startsWith(BindingResult.MODEL_KEY_PREFIX)) {
return model.get(lastKey);
}
throw new IllegalStateException(
"An Errors/BindingResult argument is expected to be declared immediately after " +
"the model attribute, the @RequestBody or the @RequestPart arguments " +
"to which they apply: " + parameter.getMethod());
}
}
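As the resolver above implies, an Errors/BindingResult argument must be declared immediately after the model attribute it validates; a sketch with an illustrative form class:

import javax.validation.Valid;
import org.springframework.validation.BindingResult;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class RegisterController {

    static class RegisterForm {    // illustrative model attribute
        public String name;
    }

    @PostMapping("/register")
    public String register(@Valid RegisterForm form, BindingResult errors) {
        // BindingResult directly follows the @Valid model attribute, so
        // ErrorsMethodArgumentResolver finds it as the last entry added to the model.
        return errors.hasErrors() ? "invalid" : "ok";
    }
}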
/* Resolves method parameters by delegating to a list of registered #HandlerMethodArgumentResolver.
Previously resolved method parameters are cached for faster lookups.
*/
public class HandlerMethodArgumentResolverComposite implements HandlerMethodArgumentResolver {
...
}
/**
* A plugin interface that allows you to intercept (and possibly mutate) records received by the consumer. A primary use-case is for third-party components to hook into the consumer applications for custom monitoring, logging, etc.
* This class will get consumer config properties via #configure() method, including clientId assigned by KafkaConsumer if not specified in the consumer config. The interceptor implementation needs to be aware that it will be sharing consumer config namespace with other interceptors and serializers, and ensure that there are no conflicts.
Exceptions thrown by ConsumerInterceptor methods will be caught, logged, but not propagated further. As a result, if the user configures the interceptor with the wrong key and value type parameters, the consumer will not throw an exception, just log the errors.
* ConsumerInterceptor callbacks are called from the same thread that invokes {@link org.apache.kafka.clients.consumer.KafkaConsumer#poll(long)}.
* Implement {@link org.apache.kafka.common.ClusterResourceListener} to receive cluster metadata once it's available.
*/
public interface ConsumerInterceptor<K, V> extends Configurable {
public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);
public void close();
}
public interface ClusterResourceListener {
//A callback method that a user can implement to get updates for #ClusterResource
void onUpdate(ClusterResource clusterResource);
}
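A minimal sketch of a ConsumerInterceptor implementation for custom monitoring (class name illustrative); it would be registered through the consumer's interceptor.classes property:

import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class CountingInterceptor implements ConsumerInterceptor<String, String> {

    @Override
    public void configure(Map<String, ?> configs) {
        // receives the consumer config, including the assigned client.id
    }

    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
        // called on the poll() thread; may observe or mutate records before the application sees them
        System.out.println("polled " + records.count() + " records");
        return records;
    }

    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        System.out.println("committed offsets for " + offsets.size() + " partitions");
    }

    @Override
    public void close() {
    }
}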
Key points when designing a connection pool (a minimal sketch follows the list):
1. Thread safety -- consider using the containers in java.util.concurrent;
2. A borrow timeout, so that when the pool has had no idle connection for a long time the business does not block indefinitely but fails fast;
3. Minimum number of connections;
4. Maximum number of connections;
5. Number of connections created at initialization;
6. Maximum idle time: connections idle longer than this, beyond the minimum pool size, are reclaimed;
7. validationQuery;
8. validationQueryTimeout;
9. testOnBorrow;
10. testOnReturn;
11. testWhileIdle;
12. Extension points, such as Filters and Interceptors;
13. The period of the task that reclaims idle connections.
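A minimal, illustrative sketch of the core borrow/return path of such a pool, covering points 1 and 2 (thread safety via a BlockingQueue, and a borrow timeout that fails fast); validation, idle reclamation and the other points are deliberately omitted:

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class SimplePool<T> {
    private final LinkedBlockingQueue<T> idle = new LinkedBlockingQueue<>(); // thread-safe container (point 1)
    private final AtomicInteger total = new AtomicInteger();
    private final int maxTotal;
    private final Supplier<T> factory;

    public SimplePool(int minIdle, int maxTotal, Supplier<T> factory) {
        this.maxTotal = maxTotal;
        this.factory = factory;
        for (int i = 0; i < minIdle; i++) {          // connections created at initialization (point 5)
            idle.offer(factory.get());
            total.incrementAndGet();
        }
    }

    public T borrow(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
        T conn = idle.poll();
        if (conn != null) {
            return conn;
        }
        while (true) {                               // grow the pool up to the maximum (point 4)
            int current = total.get();
            if (current >= maxTotal) {
                break;
            }
            if (total.compareAndSet(current, current + 1)) {
                return factory.get();
            }
        }
        // pool exhausted: wait for a returned connection, failing fast on timeout (point 2)
        conn = idle.poll(timeout, unit);
        if (conn == null) {
            throw new TimeoutException("no idle connection within " + timeout + " " + unit);
        }
        return conn;
    }

    public void giveBack(T conn) {
        idle.offer(conn);
    }
}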
MySQL 5.6 introduced GTIDs (global transaction IDs), which are very useful in replication scenarios. A GTID has two parts, server_uuid:transaction_id: server_uuid is randomly generated by MySQL and globally unique, and transaction_id is a transaction sequence number that, by default, increments by one for each committed transaction.
GTID provides a session-level variable, gtid_next, which controls how the next GTID is produced. Possible values:
1. AUTOMATIC: generate the next GTID automatically; in practice the smallest GTID sequence number not yet executed on the current instance is assigned;
2. ANONYMOUS: the transaction produces no GTID;
3. an explicitly specified GTID value.
Besides helping to avoid replication loops, GTIDs also prevent duplicate inserts: for a row without a primary key or unique index, a repeated insert would otherwise go unnoticed, but once a GTID has been executed, any later transaction carrying the same GTID is simply ignored.
Why does ThreadLocalMap hold its ThreadLocal keys through WeakReference?
1. With a strong reference, after the ThreadLocal object in use is no longer referenced anywhere else, ThreadLocalMap would still hold a strong reference to it, so the ThreadLocal could never be collected; with several ThreadLocal objects this easily leads to a memory leak.
2. With a weak reference, once the ThreadLocal is no longer strongly referenced it can be collected; the corresponding key in ThreadLocalMap becomes null while the value remains. However, on every set() ThreadLocal expunges entries whose key is null, so the benefit of the weak reference is that some of those stale values get reclaimed. Leaks are still possible when many ThreadLocal objects are involved.
The best practice is therefore to declare the ThreadLocal static and to call remove() once you are done with it.
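A small sketch of that best practice: a static ThreadLocal combined with remove() in a finally block, so the value cannot outlive its use on a pooled thread:

public class ThreadLocalUsage {
    // static: one ThreadLocal shared by all threads, each thread holding its own value
    private static final ThreadLocal<StringBuilder> BUFFER =
            ThreadLocal.withInitial(StringBuilder::new);

    public static String render(String name) {
        StringBuilder sb = BUFFER.get();
        try {
            sb.append("hello, ").append(name);
            return sb.toString();
        } finally {
            // always remove, otherwise the value stays attached to a pooled worker thread
            BUFFER.remove();
        }
    }

    public static void main(String[] args) {
        System.out.println(render("ThreadLocal"));
    }
}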
Uses of ThreadLocal in Spring:
1. AbstractBeanFactory, while creating prototype beans, records the prototypes currently in creation;
// Names of beans that are currently in creation
private final ThreadLocal<Object> prototypesCurrentlyInCreation =
new NamedThreadLocal<Object>("Prototype beans currently in creation");
2. TransactionSynchronizationManager makes extensive use of ThreadLocal;
private static final ThreadLocal<Map<Object, Object>> resources =
new NamedThreadLocal<Map<Object, Object>>("Transactional resources");
private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations =
new NamedThreadLocal<Set<TransactionSynchronization>>("Transaction synchronizations");
private static final ThreadLocal<String> currentTransactionName =
new NamedThreadLocal<String>("Current transaction name");
private static final ThreadLocal<Boolean> currentTransactionReadOnly =
new NamedThreadLocal<Boolean>("Current transaction read-only status");
private static final ThreadLocal<Integer> currentTransactionIsolationLevel =
new NamedThreadLocal<Integer>("Current transaction isolation level");
private static final ThreadLocal<Boolean> actualTransactionActive =
new NamedThreadLocal<Boolean>("Actual transaction active");
3. In TransactionAspectSupport:
/* Holder to support the #currentTransactionStatus() method, and to support
communication between different cooperating advices(e.g. before and after advice)
if the aspect involves more than a single method
(as will be the case for around advice).
*/
private static final ThreadLocal<TransactionInfo> transactionInfoHolder =
new NamedThreadLocal<>("Current aspect-driven transaction");
4. In AopContext:
/* ThreadLocal holder for AOP proxy associated with this thread.
Will contain #null unless the "exposeProxy" property on the controlling
proxy configuration has been set to "true".
*/
private static final ThreadLocal<Object> currentProxy =
new NamedThreadLocal<>("Current AOP proxy");
5. In AbstractAutowireCapableBeanFactory:
/* The name of the currently created bean, for implicit dependency registration
on getBean etc invocations triggered from a user-specified Supplier callback.
*/
private final NamedThreadLocal<String> currentlyCreatedBean = new NamedThreadLocal<>("Currently created bean");
6. In XmlBeanDefinitionReader:
private final ThreadLocal<Set<EncodedResource>> resourcesCurrentlyBeingLoaded =
new NamedThreadLocal<>("XML bean definition resources currently being loaded");
7. In the abstract class RequestContextHolder:
private static final ThreadLocal<RequestAttributes> requestAttributesHolder =
new NamedThreadLocal<>("Request attributes");
private static final ThreadLocal<RequestAttributes> inheritableRequestAttributesHolder =
new NamedInheritableThreadLocal<>("Request context");
8. In LocaleContextHolder:
private static final ThreadLocal<LocaleContext> localeContextHolder =
new NamedThreadLocal<>("LocaleContext");
private static final ThreadLocal<LocaleContext> inheritableLocaleContextHolder =
new NamedInheritableThreadLocal<>("LocaleContext");
9. In ProxyCreationContext:
//ThreadLocal holding the current proxied bean name during Advisor matching.
private static final ThreadLocal<String> currentProxiedBeanName =
new NamedThreadLocal<>("Name of currently proxied bean");
10. ...
/* Callback interface for initializing a Spring #ConfigurableApplicationContext prior to
being ConfigurableApplicationContext#refresh() refreshed.
*/
public interface ApplicationContextInitializer<C extends ConfigurableApplicationContext> {
// Initialize the given application context.
void initialize(C applicationContext);
}
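A short sketch of implementing this callback (class name illustrative); it can be registered programmatically with SpringApplication.addInitializers(...) or declared under the ApplicationContextInitializer key in META-INF/spring.factories:

import org.springframework.context.ApplicationContextInitializer;
import org.springframework.context.ConfigurableApplicationContext;

public class ProfileInitializer implements ApplicationContextInitializer<ConfigurableApplicationContext> {
    @Override
    public void initialize(ConfigurableApplicationContext applicationContext) {
        // runs before refresh(): the environment and bean factory can still be customized here
        applicationContext.getEnvironment().addActiveProfile("demo");
    }
}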
/* SPI interface to be implemented by most if not all application contexts.
Provides facilities to configure an application context in addition to
the application context client methods in the #ApplicationContext interface.
Configuration and lifecycle methods are encapsulated here to avoid making
them obvious to ApplicationContext client code. The present methods should
only be used by startup and shutdown code.
*/
public interface ConfigurableApplicationContext
extends ApplicationContext, Lifecycle, Closeable {
...
}
Stream: a reduction operation (also called a fold) combines all elements into one summary result by repeatedly applying a combining step. Summing elements, computing a maximum or minimum, counting elements, and gathering all elements into a list or set are all reductions. The Stream library offers two general-purpose reduction operations, reduce() and collect(), plus simplified special-purpose ones such as sum(), max(), min() and count().
reduce() produces a single value from a group of elements; sum(), max(), min(), count() and the like are all reduce operations, made into separate methods only because they are so common. reduce() has three overloads:
Optional<T> reduce(BinaryOperator<T> accumulator);
T reduce(T identity, BinaryOperator<T> accumulator);
<U> U reduce(U identity, BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner);
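A compact sketch exercising the three overloads:

import java.util.List;
import java.util.Optional;

public class ReduceDemo {
    public static void main(String[] args) {
        List<Integer> nums = List.of(1, 2, 3, 4);

        // 1) no identity: the result may be absent for an empty stream
        Optional<Integer> sum1 = nums.stream().reduce(Integer::sum);

        // 2) with identity: always yields a value of the element type
        int sum2 = nums.stream().reduce(0, Integer::sum);

        // 3) identity + accumulator + combiner: the result type differs from the element type;
        //    the combiner merges partial results, e.g. in a parallel stream
        int totalLength = List.of("a", "bb", "ccc").parallelStream()
                .reduce(0, (len, s) -> len + s.length(), Integer::sum);

        System.out.println(sum1.orElse(0) + " " + sum2 + " " + totalLength);
    }
}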
Situations in which a MySQL table can easily become corrupted:
1. a power failure damages the data files;
2. the server is forcibly shut down without stopping the MySQL service first;
3. the mysqld process is killed while writing to the table;
4. myisamchk is run while mysqld is also operating on the table;
5. disk failure;
6. the server crashes;
7. bugs in MySQL itself.
Ways to prevent corruption:
1. Check MyISAM tables regularly with myisamchk (mysqld must be stopped); CHECK TABLE is preferred because it does not require stopping mysqld;
2. After large batches of updates or deletes, run OPTIMIZE TABLE to defragment files and reduce the chance of corruption;
3. Shut down mysqld cleanly before shutting down the server (a normal service stop, not kill -9);
4. Use a UPS to avoid sudden power loss;
5. Use stable releases to reduce corruption caused by MySQL's own bugs;
6. For InnoDB, innodb_tablespace_monitor can be used to check the integrity of file-space management inside tablespace files;
7. Put the disks in a RAID array to reduce disk errors and improve performance;
8. Run only MySQL on the database server, with no other services, to reduce interference;
9. Take regular backups.
Pain points of traditional queues that the Disruptor addresses:
1. false sharing between CPU cache lines;
2. lock-free programming pushed to the limit with CAS;
3. efficient data exchange between two independent threads.
LinkedTransferQueue combines LinkedBlockingQueue, SynchronousQueue (in fair mode) and ConcurrentLinkedQueue: it offers the methods of all three and implements them more efficiently.
LinkedTransferQueue is built on a structure called a dual data structure, or dual queue.
Puts and takes share the same queue, whose nodes come in two modes: data nodes and non-data (request) nodes.
When putting an element, the head node is examined first: if it is a non-data node, the two are matched; if it is a data node, a new data node is appended at the tail.
When taking an element, the head node is likewise examined first: if it is a data node, the two are matched; if it is a non-data node, a new non-data node is appended at the tail.
Whether putting or taking, the operation is first compared with the head node: if the modes differ they are matched, and if the modes are the same the new node is enqueued.
static final class Node {
    // whether this is a data node (i.e., whether it represents a producer or a consumer)
    final boolean isData;   // false if this is a request node
    // the element's value
    volatile Object item;   // initially non-null if isData; CASed to match
    // the next node
    volatile Node next;
    // the thread waiting on this element
    volatile Thread waiter; // null until waiting
}
A typical singly linked list node: besides the element value and the pointer to the next node, it also records whether the node is a data node and which thread is waiting on it.
The overall logic of LinkedTransferQueue is roughly:
1. when an element arrives, look at the queue's head node and check whether its mode matches this element's mode;
2. if the modes differ, try to match them; if another thread has already matched the head node, try the next node, and so on, until a match is made or the end of the list is reached;
3. if the modes are the same, or the end of the list was reached, try to enqueue;
4. while enqueuing, the tail may have changed; in that case advance the tail pointer and retry, repeatedly;
5. once enqueued, spin or block; a blocked thread waits to be matched and woken by another thread;
6. after waking, the next loop iteration finds the matched element and returns it;
7. whether to enqueue and block is governed by four cases:
a) NOW: return immediately; if nothing was matched, return at once without enqueuing
   used by: poll(), tryTransfer(e)
b) ASYNC: asynchronous; the element is enqueued and the current thread does not block
   used by: add(e), offer(e), put(e), offer(e, timeout, unit)
c) SYNC: synchronous; the element is enqueued and the current thread blocks until it is matched
   used by: take(), transfer()
d) TIMED: with a timeout; after enqueuing, wait a bounded time to be matched, and if nothing is matched in time, give up and return
   used by: poll(timeout, unit), tryTransfer(e, timeout, unit)
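A small usage sketch of the NOW and SYNC paths: tryTransfer() returns immediately when no consumer is waiting, while transfer() blocks the producer until a consumer takes the element:

import java.util.concurrent.LinkedTransferQueue;

public class TransferQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        LinkedTransferQueue<String> queue = new LinkedTransferQueue<>();

        System.out.println(queue.tryTransfer("lost"));   // false: NOW mode, no consumer is waiting yet

        Thread consumer = new Thread(() -> {
            try {
                System.out.println("got: " + queue.take());  // matches the producer's transfer()
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        queue.transfer("handed-off");                    // SYNC mode: blocks until the consumer takes it
        consumer.join();
    }
}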
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;
import com.google.common.collect.Iterables;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;

public final class JavaPageRank {
    private static final Pattern SPACES = Pattern.compile("\\s+");

    private static class Sum implements Function2<Double, Double, Double> {
        @Override
        public Double call(Double a, Double b) {
            return a + b;
        }
    }

    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession
            .builder()
            .appName("JavaPageRank")
            .getOrCreate();
        // Each input line is "sourceURL neighborURL"; build the adjacency list once and cache it.
        JavaRDD<String> lines = spark.read().textFile(args[0]).javaRDD();
        JavaPairRDD<String, Iterable<String>> links = lines.mapToPair(s -> {
            String[] parts = SPACES.split(s);
            return new Tuple2<>(parts[0], parts[1]);
        }).distinct().groupByKey().cache();
        // Every page starts with rank 1.0.
        JavaPairRDD<String, Double> ranks = links.mapValues(rs -> 1.0);
        for (int current = 0; current < Integer.parseInt(args[1]); current++) {
            // Each page contributes rank/outDegree to every page it links to.
            JavaPairRDD<String, Double> contribs = links.join(ranks)
                .values()
                .flatMapToPair(s -> {
                    int urlCount = Iterables.size(s._1());
                    List<Tuple2<String, Double>> results = new ArrayList<>();
                    for (String n : s._1()) {
                        results.add(new Tuple2<>(n, s._2() / urlCount));
                    }
                    return results.iterator();
                });
            ranks = contribs.reduceByKey(new Sum()).mapValues(sum -> 0.15 + sum * 0.85);
        }
        List<Tuple2<String, Double>> output = ranks.collect();
        for (Tuple2<?, ?> tuple : output) {
            System.out.println(tuple._1() + " has rank: " + tuple._2() + ".");
        }
        spark.stop();
    }
}
Data quality can be evaluated along four dimensions: completeness, accuracy, consistency and timeliness.
import java.util.Arrays;
import java.util.List;
import org.apache.spark.SparkJobInfo;
import org.apache.spark.SparkStageInfo;
import org.apache.spark.api.java.JavaFutureAction;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.SparkSession;

public final class JavaStatusTrackerDemo {
    public static final String APP_NAME = "JavaStatusAPIDemo";

    public static final class IdentityWithDelay<T> implements Function<T, T> {
        @Override
        public T call(T x) throws Exception {
            Thread.sleep(2 * 1000);   // fake workload so the job stays observable for a while
            return x;
        }
    }

    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession
            .builder()
            .appName(APP_NAME)
            .getOrCreate();
        JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
        JavaRDD<Integer> rdd = jsc.parallelize(Arrays.asList(1, 2, 3, 4, 5), 5)
            .map(new IdentityWithDelay<>());
        // Run the job asynchronously and poll the status tracker while it is still running.
        JavaFutureAction<List<Integer>> jobFuture = rdd.collectAsync();
        while (!jobFuture.isDone()) {
            Thread.sleep(1 * 1000);
            List<Integer> jobIds = jobFuture.jobIds();
            if (jobIds.isEmpty()) {
                continue;
            }
            int currentJobId = jobIds.get(jobIds.size() - 1);
            SparkJobInfo jobInfo = jsc.statusTracker().getJobInfo(currentJobId);
            SparkStageInfo stageInfo = jsc.statusTracker().getStageInfo(jobInfo.stageIds()[0]);
            System.out.println(stageInfo.numTasks() + " tasks total: " +
                stageInfo.numActiveTasks() + " active, " + stageInfo.numCompletedTasks() +
                " complete");
        }
        System.out.println("Job results are: " + jobFuture.get());
        spark.stop();
    }
}
-XX:+CMSIncrementalMode turns on the CMS collector's incremental mode. In incremental mode the CMS phases are frequently paused so that application threads get the CPU back, and the collector therefore takes longer to complete a full collection cycle. Incremental mode should only be used if testing shows that normal CMS cycles disturb the application threads too much; since modern servers have enough processors to accommodate concurrent garbage collection, this is rarely the case.
SparkSession was designed to merge SparkContext and SQLContext, and it is recommended to use SparkSession wherever possible. If some API is missing from SparkSession, the underlying SparkContext and SQLContext can still be obtained through it.
/* A BlockingQueue in which each insert operation must wait for a
corresponding remove operation by another thread, and vice versa.
A synchronous queue does not have any internal capacity, not even
a capacity of one. You cannot #peek at a synchronous queue because an element
is only present when you try to remove it; you cannot insert an element
(using any method) unless another thread is trying to remove it; you
cannot iterate as there is nothing to iterate. The #head of the queue is
the element that the first queued inserting thread is trying to add to
the queue; if there is no such queued thread then no element is available
for removal and #poll() will return #null. For purposes of other #Collection
methods (for example #contains), a SynchronousQueue acts as an empty collection.
This queue does not permit #null elements.
Synchronous queues are similar to rendezvous channels used in CSP and Ada. They
are well suited for handoff designs, in which an object running in one thread must
sync up with an object running in another thread in order to hand it some information,
event, or task.
This class supports an optional fairness policy for ordering waiting producer and
consumer threads. By default, this ordering is not guaranteed. However, a queue
constructed with fairness set to #true grants threads access in FIFO order.
*/
public class SynchronousQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
/* This class implements extensions of the dual stack and dual queue
algorithms described in "Nonblocking Concurrent Objects with Condition
Synchronization" by ...
The (LIFO) stack is used for non-fair mode, and the (FIFO) queue for fair
mode. The performance of the two is generally similar. Fifo usually supports
higher throughput under contention but Lifo maintains higher thread locality
in common applications.
A dual queue (and similarly stack) is one that at any given time either holds
"data" -- items provided by put operations, or "requests" -- slots representing
takes operations, or is empty. A call to "fulfill" (i.e. a call requesting an item
from a queue holding data or vice versa) dequeues a complementary node. The most
interesting feature of these queues is that any operation can figure out which
mode the queue is in, and act accordingly without needing locks.
Both the queue and stack extend abstract class Transferer defining the single
method transfer that does a put or a take. These are unified into a single
method because in dual data structures, the put and take operations are
symmetrical, so nearly all code can be combined. The resulting transfer methods
are on the long side, but are easier to follow than they would be if broken up
into nearly-duplicated parts.
The queue and stack data structures share many conceptual similarities but very
few concrete details. For simplicity, they are kept distinct so that they can
later evolve separately.
The algorithms here differ from the versions in the above paper in extending
them for use in synchronous queues, as well as dealing with cancellation. The
main differences include:
1. The original algorithms used bit-marked pointers, but the ones here use
mode bits in nodes, leading to a number of further adaptations.
2. SynchronousQueue must block threads waiting to become fulfilled.
3. Support for cancellation via timeout and interrupts, including cleaning out
cancelled nodes/threads from lists to avoid garbage retention and memory
depletion.
Blocking is mainly accomplished using LockSupport park/unpark, except that nodes
that appear to be the next ones to become fulfilled first spin a bit(on
multiprocessors only). On very busy synchronous queues, spinning can dramatically
improve throughput. And on less busy ones, the amount of spinning is small enough
not to be noticeable.
Cleaning is done in different ways in queues vs stacks. For queues, we can almost
always remove a node immediately in O(1) time (modulo retries for consistency checks)
when it is cancelled. But if it may be pinned as the current tail, it must wait until
some subsequent cancellation. For stacks, we need a potentially O(n) traversal to be
sure that we can remove the node, but this can run concurrently with other threads
accessing the stack.
While garbage collection takes care of most node reclamation issues that otherwise
complicate nonblocking algorithms, care is taken to "forget" references to data, other
nodes, and threads that might be held on to long-term by blocked threads. In cases
where setting to null would otherwise conflict with main algorithms, this is done by
changing a node's link to now point to the node itself. This doesn't arise much for
Stack nodes (because blocked threads do not hang on to old head pointers), but references
in Queue nodes must be aggressively forgotten to avoid reachability of everything any
node has ever referred to since arrival.
*/
}
A (stackful) coroutine can be thought of as a user-space thread whose context switches happen entirely in user space. Unlike a traditional thread, which is preemptively scheduled by the operating system on system calls or interrupts, a coroutine yields the CPU voluntarily (via yield) to switch to another coroutine.
A SynchronousQueue has no internal storage capacity: a thread inserting an element blocks until another thread takes that element from the queue, and likewise a thread trying to take an element blocks until some thread inserts one.
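A small usage sketch of that hand-off behavior: put() blocks until a take() happens on another thread:

import java.util.concurrent.SynchronousQueue;

public class SynchronousQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        SynchronousQueue<String> queue = new SynchronousQueue<>();

        Thread consumer = new Thread(() -> {
            try {
                Thread.sleep(500);                        // make the producer visibly wait
                System.out.println("took: " + queue.take());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        long start = System.currentTimeMillis();
        queue.put("hand-off");                            // blocks until the consumer's take()
        System.out.println("put returned after " + (System.currentTimeMillis() - start) + " ms");
        consumer.join();
    }
}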
The transfer algorithm of SynchronousQueue's TransferStack<E> works as follows:
For a put the argument e is non-null, for a take e is null, and timed/nanos specify whether a timeout is used.
1. If the stack is empty or its head node has the same mode, try to push a node and wait for a match; return null if cancelled.
2. If the head node has a different mode, try to push a fulfilling node onto the stack, match it with the waiting node, pop both from the stack, and return the matched element. The matching and popping may fail because another thread may be performing step 3.
3. If the top of the stack already holds a fulfilling node, help it complete its match and pop, then retry from the start.
A coroutine is a task-scheduling mechanism that lets you write control flow in the order of the logical flow, without blocking an operating-system thread. Issuing an asynchronous request, registering a callback/notifier, saving state, suspending the control flow, receiving the callback/notification, restoring state and resuming the control flow can all be accomplished quietly by a single yield.
In terms of implementation, this cooperative, yield-based scheduling is more efficient than threads. On one hand the caller decides when to yield, which costs far less time than the operating system's preemptive scheduling; the latter saves a considerable amount of state in order to restore the context on every switch, and switches very frequently. On the other hand coroutines can live entirely in user space and each is much smaller than a thread, so a single process can host a very large number of logical flows. For example, the ngx_lua component in OpenResty manages all of its IO interfaces with coroutines, greatly increasing the server's load capacity.
Following Kimball's data-warehouse theory, metadata can be divided into three categories:
1. technical metadata, e.g. table schemas, file paths/formats;
2. business metadata, e.g. owners, the business a table belongs to, lineage;
3. process metadata, e.g. a table's daily row count, size and update time.
As much of this metadata as possible should be collected or computed automatically, then displayed on web pages or exposed through APIs.
The four notable features of the InnoDB engine:
1. insert (change) buffering;
2. doublewrite;
3. adaptive hash index;
4. read-ahead.
innodb_change_buffer_max_size:25
innodb_change_buffering:all
innodb_doublewrite:ON
innodb_adaptive_hash_index:ON
InnoDB's doublewrite consists of two parts: a 2 MB doublewrite buffer in memory, and 128 consecutive pages (2 extents, also 2 MB) in the shared tablespace on disk. When dirty pages from the buffer pool are flushed, they are not written straight to the data files; instead they are first copied with memcpy() into the in-memory doublewrite buffer, and from there written to the shared tablespace on disk in two 1 MB chunks. Because the doublewrite pages are contiguous, this is sequential IO and not very expensive. Only after the doublewrite pages have been written are the pages in the doublewrite buffer written to their individual tablespace files, and those writes are scattered. If the operating system crashes while a page is being written to disk, InnoDB can, during recovery, find a copy of the page in the doublewrite area of the shared tablespace, copy it back into the tablespace file, and then apply the redo log.
InnoDB uses two read-ahead algorithms to improve IO performance: linear read-ahead and random read-ahead.
-XX:+UseFastAccessorMethods: optimizes the performance of getter methods for primitive types.
/* An object for traversing and partitioning elements of a source. The source
of elements covered by a Spliterator could be, for example, an array, a #Collection,
an IO channel, or a generator function.
A Spliterator may traverse elements individually #tryAdvance() or sequentially in bulk
#forEachRemaining().
...
*/
public interface Spliterator<T> {
boolean tryAdvance(Consumer<? super T> action);
default void forEachRemaining(Consumer<? super T> action) {
do {} while(tryAdvance(action));
}
Spliterator<T> trySplit();
long estimateSize();
default long getExactSizeIfKnown() {
return (characteristics() & SIZED) == 0 ? -1L : estimateSize();
}
int characteristics();
default boolean hasCharacteristics(int characteristics) {
return (characteristics() & characteristics) == characteristics;
}
default Comparator<? super T> getComparator() {
throw new IllegalStateException();
}
public static final int ORDERED = 0x00000010;
public static final int DISTINCT = 0x00000001;
public static final int SORTED = 0x00000004;
public static final int SIZED = 0x00000040;
public static final int NONNULL = 0x00000100;
public static final int IMMUTABLE = 0x00000400;
public static final int CONCURRENT = 0x00001000;
public static final int SUBSIZED = 0x00004000;
public interface OfPrimitive<T, T_CONS, T_SPLITR extends Spliterator.OfPrimitive<T, T_CONS, T_SPLITR>>
extends Spliterator<T> {
@Override
T_SPLITR trySplit();
...
}
...
}
The main differences between streams and collections:
1. No storage: a stream is not a data structure that stores elements; it conveys elements from a source, such as a data structure, an array, a generator function or an IO channel, through a pipeline of computational operations.
2. Functional in nature: an operation on a stream produces a result but does not modify its source. For example, filtering a stream obtained from a collection produces a new stream without the filtered-out elements, rather than removing elements from the source collection.
3. Laziness: many stream operations, such as filtering, mapping or removing duplicates, can be implemented lazily, exposing opportunities for optimization. Stream operations are divided into intermediate (stream-producing) operations and terminal (value- or side-effect-producing) operations; intermediate operations are always lazy.
4. Possibly unbounded: while collections have a finite size, streams need not; short-circuiting operations such as limit(n) or findFirst() allow computations on infinite streams to complete in finite time.
5. Consumable: the elements of a stream are visited only once during the life of the stream. As with an iterator, a new stream must be generated to revisit the same elements of the source.
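A brief sketch of points 4 and 5: an infinite stream cut short with limit(), and the IllegalStateException thrown when a stream is reused after its terminal operation:

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class StreamPropertiesDemo {
    public static void main(String[] args) {
        // 4. possibly unbounded: iterate() is infinite, limit() short-circuits it
        List<Integer> firstFive = Stream.iterate(1, n -> n + 1)
                .limit(5)
                .collect(Collectors.toList());
        System.out.println(firstFive);

        // 5. consumable: a stream cannot be traversed twice
        Stream<String> s = Stream.of("a", "b");
        s.count();
        try {
            s.count();
        } catch (IllegalStateException e) {
            System.out.println("stream has already been operated upon or closed");
        }
    }
}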
/* Abstract base class for "pipeline" classes, which are the core implementations of the Stream
interface and its primitive specializations. Manages construction and evaluation of stream pipelines.
An #AbstractPipeline represents an initial portion of a stream pipeline, encapsulating a stream
source and zero or more intermediate operations. The individual #AbstractPipeline objects are often
referred to as stages, where each stage describes either the stream source or an intermediate operation.
A concrete intermediate stage is generally built from an #AbstractPipeline, a shape-specific
pipeline class which extends it (e.g., #IntPipeline) which is also abstract, and an
operation-specific concrete class which extends that. #AbstractPipeline contains most of the
mechanics of evaluating the pipeline, and implements methods that will be used by the operation;
the shape-specific classes add helper methods for dealing with collection of results into the
appropriate shape-specific containers.
After chaining a new intermediate operation, or executing a terminal operation, the stream is
considered to be consumed, and no more intermediate or terminal operations are permitted on this
stream instance.
For sequential streams, and parallel streams without stateful intermediate operations, parallel
streams, pipeline evaluation is done in a single pass that "jams" all the operations together. For
parallel streams with stateful operations, execution is divided into segments, where each stateful
operations marks the end of a segment, and each segment is evaluated separately and the result used
as the input to the next segment. In all cases, the source data is not consumed until a terminal
operation begins.
*/
abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
extends PipelineHelper<E_OUT> implements BaseStream<E_OUT, S> {
private static final String MSG_STREAM_LINKED = "stream has already been operated upon or closed";
private static final String MSG_CONSUMED = "source already consumed or closed";
@SuppressWarnings("rawtypes")
private final AbstractPipeline sourceStage;
@SuppressWarnings("rawtypes")
private final AbstractPipeline previousStage;
protected final int sourceOrOpFlags;
@SuppressWarnings("rawtypes")
private AbstractPipeline nextStage;
private int depth;
private int combinedFlags;
private Spliterator<?> sourceSpliterator;
private Supplier<? extends Spliterator<?>> sourceSupplier;
private boolean linkedOrConsumed;
private boolean sourceAnyStateful;
private Runnable sourceCloseAction;
private boolean parallel;
...
}
public interface Instrumentation {
void addTransformer(ClassFileTransformer transformer, boolean canRetransform);
void addTransformer(ClassFileTransformer transformer);
boolean removeTransformer(ClassFileTransformer transformer);
boolean isRetransformClassesSupported();
void retransformClasses(Class<?>... classes) throws UnmodifiableClassException;
boolean isRedefineClassesSupported();
void redefineClasses(ClassDefinition... definitions) throws ClassNotFoundException,
UnmodifiableClassException;
boolean isModifiableClass(Class<?> theClass);
@SuppressWarnings("rawtypes")
Class[] getAllLoadedClasses();
@SuppressWarnings("rawtypes")
Class[] getInitiatedClasses(ClassLoader loader);
long getObjectSize(Object objectToSize);
void appendToBootstrapClassLoaderSearch(JarFile jarfile);
void appendToSystemClassLoaderSearch(JarFile jarfile);
boolean isNativeMethodPrefixSupported();
void setNativeMethodPrefix(ClassFileTransformer transformer, String prefix);
}
How MySQL implements bidirectional, mutual master-slave replication, and where it applies: bidirectional replication is mainly used to relieve the write pressure on a single primary. For auto-increment columns the two servers must generate non-overlapping IDs, so auto_increment_increment (the step) is set to the number of masters and auto_increment_offset (the starting offset) differs on each server. A typical configuration:
Configuration on the first master:
[mysqld]
auto_increment_increment=2 # auto-increment step
auto_increment_offset=1 # auto-increment starting offset
log-slave-updates
Configuration on the second master:
[mysqld]
auto_increment_increment=2 # auto-increment step
auto_increment_offset=2 # auto-increment starting offset
log-slave-updates
The MySQL service needs to be restarted on both servers.
Expressing a stream pipeline as a sequence of functional transformations enables several useful execution strategies, such as laziness, parallelism, short-circuiting and operation fusion.
Redis cache eviction policies
Redis expired-key deletion policy
Once a key has been given an expiration time, every client access (read or write) to that key first checks whether it has expired and deletes it immediately if so. Some keys, however, are accessed only once, so expired keys must also be removed actively: by default Redis runs the check 10 times per second against the set of keys that have an expiration time, each time sampling 20 random keys from that set and deleting the expired ones; if more than 25% of the sampled keys turn out to be expired, it immediately samples another 20, and so on. This keeps the proportion of expired keys at no more than roughly 25% of all keys that have an expiration time.
Redis cache eviction policies:
1. noeviction
2. allkeys-lru
3. allkeys-random
4. volatile-lru
5. volatile-random
6. volatile-ttl: among keys with an expiration time, evict those that are about to expire soonest;
7. allkeys-lfu: among all keys, evict the least frequently used;
8. volatile-lfu
The LFU policies appeared in Redis 4.0.
How LFU is implemented in Redis 4.0: the 24-bit clock inside each key object is split into two parts: the upper 16 bits still hold a clock, and the lower 8 bits hold a counter. With only 16 bits, counting in seconds would run out too quickly, so the clock is kept at a coarser granularity (minutes). The lower 8 bits record the key's access frequency; 8 bits can only represent up to 255, so instead of incrementing linearly Redis increments the counter probabilistically according to a formula, with two configuration parameters that tune how fast the counter grows.
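A sketch of that probabilistic, logarithmic counter, modeled on the increment rule described in the Redis documentation (the counter is bumped with probability 1/(baseval*lfu_log_factor+1); the default lfu-log-factor of 10 and initial counter of 5 are assumptions taken from the documented defaults):

import java.util.concurrent.ThreadLocalRandom;

public class LfuCounterSketch {
    static final int LFU_INIT_VAL = 5;     // documented default initial counter for a new key
    static final int LFU_LOG_FACTOR = 10;  // documented default lfu-log-factor

    // Returns the new 8-bit counter after one access.
    static int logIncr(int counter) {
        if (counter == 255) {
            return 255;                    // saturates at the 8-bit maximum
        }
        double baseval = Math.max(counter - LFU_INIT_VAL, 0);
        double p = 1.0 / (baseval * LFU_LOG_FACTOR + 1);
        return ThreadLocalRandom.current().nextDouble() < p ? counter + 1 : counter;
    }

    public static void main(String[] args) {
        // The counter grows roughly logarithmically with the number of accesses.
        for (long hits : new long[]{100, 10_000, 1_000_000}) {
            int counter = LFU_INIT_VAL;
            for (long i = 0; i < hits; i++) {
                counter = logIncr(counter);
            }
            System.out.println(hits + " accesses -> counter " + counter);
        }
    }
}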
InnoDB divides data into pages and uses the page as the basic unit of transfer between disk and memory; the page size in InnoDB is normally 16 KB. In other words, in the common case at least 16 KB is read from disk into memory at a time, and at least 16 KB of memory is flushed to disk at a time.
The InnoDB storage engine defines four row (record) formats: Compact, Redundant, Dynamic and Compressed.
In the Compact and Redundant row formats, for a column that takes a very large amount of space, only part of the column's data is stored with the record itself; the remaining data is spread over several other pages, and 20 bytes in the record point to the addresses of those pages (these 20 bytes also record how many bytes of the column are stored off-page), so the rest of the data can be located.
For the Compact and Redundant formats, if a column holds a very large value, the record itself stores only the first 768 bytes of the column plus a pointer to other pages, and the remaining data is stored in those pages. This process is called row overflow, and the pages storing the data beyond the first 768 bytes are called overflow pages.
--------------------------------------------------