-
Notifications
You must be signed in to change notification settings - Fork 1
/
201811.txt
564 lines (514 loc) · 49 KB
/
201811.txt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
ZAB协议是为分布式协调服务Zookeeper专门设计的一种支持崩溃恢复和原子广播协议。
ZAB让整个Zookeeper集群在两个模式之间转换:消息广播和崩溃恢复,消息广播可以说是一个简化版本的2PC,通过崩溃恢复解决了2PC的单点问题,通过队列解决了2PC的同步阻塞问题。而支持崩溃恢复后数据准确性的就是数据同步了,数据同步基于事务的ZXID的唯一性来保证。通过+1操作可以辨别事务的先后顺序。
两阶段2PC两阶段提交协议并不完美,而且存在数据不一致、同步阻塞、单点等问题。
Zookeeper使用改进的两阶段提交协议来保证节点的事务一致性。
可靠提交由ZAB的事务一致性协议保证;全局有序由TCP协议保证;因果有序由follower的历史队列(history queue)保证。
当leader在commit之后但在发送commit消息之前宕机,即只有老leader自己commit了,而其他follower都没有收到commit消息,新的leader也必须保证这个proposal被提交(新的leader会重新发送该proposal的commit消息).
当leader产生某个proposal之后但在发出消息之前宕机,即只有老leader自己有这个proposal,当老的leader重启后(此时作为follower),新的leader必须保证老的leader丢弃这个proposal(新的leader会通知上线后的老leader截断其epoch对应的最后一个commit的位置)。
BeanPostProcessors operate on bean(or object) instances;that is to say, the Spring IoC container instantiates a bean instance and then BeanPostProcessor do their work.
BeanPostProcessors are scoped per-container. This is only relevant if you are using container hierarchies. If you define a BeanPostProcessor in one container, it will only post-process the beans in that container. In other words, beans that are defined in one container are not post-processed by a BeanPostProcessor defined in another container, even if both containers are part of the same hierarchy.
To change the actual bean definition, you instead need to use a BeanFactoryPostProcessor.
PropertyPlaceholderConfigurer
Kafka为什么不支持减少分区:
1. 按照Kafka现有的代码逻辑而言,此功能完全可以实现,不过也会使得代码的复杂度急剧增大。实现此功能需要考虑的因素很多,比如删除掉的分区中的消息该作何处理?如果随着分区一起消失则消息的可靠性得不到保障;如果需要保留则又需要考虑如何保留。直接存储到现有分区的尾部,消息的时间戳就不会递增,如此对于Spark、Flink这类需要消息时间戳(事件时间)的组件将会受到影响;如果分散插入到现有的分区中,那么在消息量很大的时候,内部的数据复制会占用很大的资源,而且在复制期间,此主题的可用性又如何得到保障?与此同时,顺序性问题、事务性问题、以及分区和副本的状态机切换问题都是不得不面对的。反观这个功能的收益点却是很低,如果真的需要实现此类的功能,完全可以重新创建一个分区数较小的主题,然后将现有主题中的消息按照既定的逻辑复制过去即可。
2. 虽然分区数不可用减少,但是分区对应的副本是可以减少的,这个很好理解,关闭一个副本时就相当于副本数减少了。不过正规的做法是使用kafka-reassign-partition.sh脚本来实现。
几个注意点:
1. 调用wait方法和notifyAll方法必须被同步,并且调用的对象必须是当前锁对象,否则IllegalMonitorStateException;
如果不在同步方法或代码块中执行notify和wait方法,假设这种情况下,等待方判断不满足条件后准备调用wait时,发生上下文切换,此时通知方改变条件调用notifyAll方法,但是等待方并没有调用wait方法,那么等待方就错过了这次通知唤醒。这就是lost wake up问题,所以这2个方法必须要被包在同一个对象锁住的同步方法里。
2. 通知方尽量使用notifyAll而不是notify,否则会出现假死;
notify方法是随机将等待队列中的一个等待线程唤醒;notifyAll方法是将等待队列中的所有等待线程唤醒;当有多个等待方和通知方时,通知方如果调用notify方法,有可能会通知错。
3. 判断条件是否满足,要用while循环,而不是直接一次if判断。
因为如果此时有2个等待方A和B,它们同时调用wait方法进入等待,当通知方C调用notifyAll方法后,此时A和B并不是立即恢复执行,而是要去争夺锁资源,假设A先获得锁,它从wait方法返回,执行了后续的逻辑,并将满足的条件改变成不满足。此时B获得锁资源,从wait方法返回后,虽然此时条件已经不满足,但B依然继续执行。因为我们使用了if,而不是while循环判断。
4. wait会立即释放当前锁,notifyAll不会立即释放当前对象锁。
wait被调用后会释放当前锁并进入对象锁的等待队列中,而sleep会一直占用锁资源;调用notifyAll会将当前同步代码块的剩余部分执行完才会释放锁。
使用等待/通知机制,实现生产者消费者模式
其实Thread中的join方法也用到了等待/通知机制:如果线程A调用了B.join(),那么线程A会一直等待B线程执行完毕才会继续执行。和wait()一样,它们都有wait(long millis), join(long millis)重载方法,表示等待指定的时间。join源码如下:
publi class Thread implements Runnable {
...
/* Waits at most millis millisconds for this thread to die. A timeout of 0 means to wait forever.
This implementation uses a loop of this.wait calls conditioned on this.isAlive. As a thread terminates the this.notifyAll method is invoked. It is recommended that applications not use wait, notify, or notifyAll on Thread instances.
*/
public final synchronized void join(long millis) {
long base = System.currentTimeMillis();
long now = 0;
if (millis < 0) {
throw new IllegalArgumentException("timeout value is negative");
}
if (millis == 0) {
while (isAlive()) {
wait(0);
}
} else {
while (isAlive()) {
long delay = millis - now;
if (delay <= 0) {
break;
}
wait(delay);
now = System.currentTimeMillis() - base;
}
}
}
...
}
根据上述方法:如果线程A调用了B.join(),那么线程A会获得B对象的锁,并且判断释放满足条件(isAlive()方法为本地方法),然后调用B对象的wait方法,进入到B对象的等待队列,当B线程终止时,会调用自身的notifyAll方法通知所有等待在该线程对象上的线程。此时,线程A被唤醒并继续执行。可见,join方法实现原理和等待/通知机制一样,并且因为join方法调用的也是wait方法,所以也会释放当前对象锁。
Safepoints in HotSpot JVM:
Term Stop-the-World pause is usually associated with garbage collection. Indeed GC is a major contributor to STW pauses, but not the only one.
Safepoints: In HotSpot JVM Stop-the-World pause mechanism is called safepoint. During safepoint all threads running java code are suspended. Threads running native code may continue to run as long as they do not interact with JVM(attempt to access Java objects via JNI, call Java method or return from native to java, will suspend thread unit end of safepoint).
Stopping all threads are required to ensure what safepoint initiator have exclusive access to JVM data structures and can do crazy things like moving objects in heap or replacing code of method which is currently running(On-Stack-Replacement).
When safepoints are used?
Below are few reasons for HotSpot JVM to initiate a safepoint:
- Garbage collection pauses
- Code deoptimization
- Flushing code cache
- Class redefinition(e.g. hot swap or instrumentation)
- Biased lock revocation
- Various debug operation(e.g. deadlock check or stacktrace dump)
Trouble shooting safepoints:
Normally safepoints just work. Thus, you can care less about them(most of them, except GC ones, are extremely quick). But if something can break it will break eventually, so here is useful diagnostic:
-XX:+PrintGCApplicationStoppedTime - this will actually report pause time for all safepoints(GC related or not). Unfortunately output from this option lacks timestamps, but it is still useful to narrow down problem to safepoints.
-XX:+PrintSafepointStatistics -XX:PrintSafepointStatisticesCount=1 this two options will force JVM to report reason and timings after each safepoint(it will be reported to stdout, not GC log)
/* ThreadPool used to executed HystrixCommand#run() on separate threads when configured to do so with HystrixCommandProperties#executionIsolationStrategy().
Typically each HystrixCommandGroupKey has its own thread-pool so that any one group of commands can not starve others from being able to run.
A HystrixCommand can be configured with a thread-pool explicitly by injecting a HystrixThreadPoolKey or via the HystrixCommandProperties#executionIsolationThreadPoolKeyOverride() otherwise it will derive a HystrixThreadPoolKey from the injected HystrixCommandGroupKey.
The pool should be sized large enough to handle normal healthy traffic but small enough that it will constrain concurrent execution if backend calls become latent.
*/
public interface HystrixThreadPool {
...
}
MyBatis的优点:
1. 基于SQL语句编程,相当灵活,不会对应用程序或者数据库的现有设计造成任何影响,SQL写在XML里,接触SQL与程序代码的耦合,便于统一管理;提供XML标签,支持编写动态SQL语句,并可重用。
2. 与JDBC相比,减少了50%以上的代码,消除了JDBC大量冗余的代码,不需要手动开关连接。
3. 很好的与各种数据库兼容(因为MyBatis使用JDBC来连接数据库,所以只要JDBC支持的数据库MyBatis就支持)
4. 能够与Spring很好的集成。
5. 提供映射标签,支持对象与数据库的ORM字段关系映射。提供对象关系映射标签,支持对象关系组件维护。
MyBatis的缺点:
1. SQL语句编写工作量较大,尤其字段多、关联表多时,对开发人员编写SQL的功底有要求;
2. SQL语句依赖具体数据库,导致数据库移植性差,不能随意更换数据库。
MyBatis的#{}和${}的区别:
1. #{}是预编译处理,${}是字符串替换。
2. MyBatis在处理#{}时,会将SQL中的#{}替换为?,调用PreparedStatement的set方法来赋值;
3. MyBatis在处理${}时,就是把${}替换成变量的值。
4. 使用#{}可以有效的防止SQL注入,提供系统安全性。
MyBatis如何获取自动生成的主键值:insert方法总是返回一个int,表示插入的行数。如果采用自增长策略,自动生成的键值在insert方法执行完后可以被设置到传入的参数对象中。
<insert id="xxx", usegeneratedkeys="true" keyproperty="id">
insert into names(name) values(#{name})
</insert>
Hibernate属于全自动ORM映射工具,使用Hibernate查询关联对象或者关联集合对象时,可以根据对象关系模型直接获取,所以它是全自动的。而MyBatis在查询关联对象或关联集合对象时,需要手动编写SQL来完成,所以称之为半自动ORM映射工具。
MyBatis是否支持延迟加载,实现原理:MyBatis仅支持association关联对象和collection关联集合对象的延迟加载,association指的是一对一,collection指的是一对多查询。在Mybatis配置文件中,可以配置是否启用延迟加载lazyLoadingEnabled=true|false。
它的原理是,使用CGLIB创建目标对象的代理对象,当调用目标方法时,进入拦截器方法,比如调用a.getB().getName(),拦截器invoke()方法发现a.getB()为NULL,那么就会单独发送实现保存好的查询关联B对象的SQL,把B查询出来,然后调用a.setB(b),于是a的对象b属性就有值了,接着完成a.getB().getName()方法的调用。这就是延迟加载的基本原理。
MyBatis仅可以针对ParameterHandler,ResultSetHandler,StatementHandler,Executor这4种接口的插件,MyBatis使用JDK的动态代理,为需要拦截的接口生成代理对象以实现接口方法拦截功能,每当执行这4个接口对象的方法时,就会进入拦截方法,具体就是InvocationHandler的invoke()方法,当然只会拦截你指定需要拦截的方法。
编写插件:实现MyBatis的Interceptor接口并复写intercept()方法,然后再给插件编写注解,指定要拦截哪个接口的哪些方法即可。最后,在配置文件配置编写的插件。
BeanFactory
ListableBeanFactory
HierarchicalBeanFactory
AutowireCapableBeanFactory
ConfigurableBeanFactory
ConfigurableListableBeanFactory
AbstracAutowireCapableBeanFactory
DefaultListableBeanFactory
最终的默认实现类是DefaultListableBeanFactory,它实现了所有的接口。
Spring IOC容器管理了定义的各种Bean对象以及互相之间的关系,Bean对象在Spring实现中是以BeanDefinition来描述的。BeanDefinition定义了Bean的数据结构,用来存储Bean。
AttributeAccessor
BeanMetadataElement
AttributeAccessorSupport
BeanDefinition
BeanMetadataAttributeAccessor
AbstractBeanDefinition
RootBeanDefinition
BeanDefinitionReader
EnvironmentCapable
AbstractBeanDefinitionReader
XmlBeanDefinitionReader
----------------------------------------------------
JDK8对CAS机制的优化:
普通的CAS(AtomicInteger等)在高并发下,比如大量线程同时并发修改一个AtomicInteger,可能有很多线程会不停的自旋,进入一个无限重复的循环中。这些线程不停地获取值,然后发起CAS操作,但是发现这个值已经被修改,于是再次进入下一个循环,获取值,发起CAS操作又失败了,再次进入下一个循环。
在大量线程高并发更新AtomicInteger的时候,这种问题会比较明显,导致大量线程空循环,自旋转,性能和效率大大降低。
JDK8推出一个新类:LongAdder,尝试使用分段CAS以及自动分段迁移的方式来大幅度提升多线程高并发执行CAS操作的性能。
在LongAdder的底层实现中,首先有一个base值,刚开始多线程来不停的累加数值,都是对base进行累加,。
接着如果发现并发更新的线程数量过多,就会开始实施分段CAS的机制,也就是内部搞一个Cell数值,每个数值是一个数值分段。这时,让大量的线程分别对不同Cell内部的value值进行CAS累加操作,这样就把CAS计算压力分散到不同Cell分段数值中了。
这样就可以大幅度的降低多线程并发更新一个数值出现的无限循环的问题,大幅度提升了多线程并发更新数值的性能和效率。
而且内部实现了自动分段迁移的机制,也就是如果某个Cell的value执行CAS失败了,那么就会自动去找另外一个Cell分段内的value值进行CAS操作。
这样解决了线程空旋转、自旋不停等待执行CAS操作的问题,让一个线程执行CAS尽快的完成。
最后,如果要从LongAdder获取当前累加的总值,就会把base值和所有Cell分段数值加在一起返回。
----------------------------------------------------
消费者和分区的关系:
1. 一个消费者可以消费一个到全部分区数据;
2. 分组消费,同一个分组内所有消费者消费一份完整的数据,此时一个分区数据只能被一个消费者消费,而一个消费者可以消费多个分区数据;
3. 同一个消费组内,消费者数目大于分区数目后,消费者会有空余=分区数-消费者数;
当一个group中,有consumer加入或离开时,会触发partitions均衡partition.assignment.strategy,决定了partition分配给消费者的分配策略,默认有两种分配策略:
1. RangeAssignor: 默认采用的是这种再平衡方式,这种方式分配只是针对消费者订阅的topic的单个topic所有分区再分配。
2. RoundRobinAssignor: 这种分配策略是针对消费者消费的所有topic的所有分区进行分配。当有新的消费者加入或者消费者退出时,就会触发rebalance。这种方式有两点要求:A): 在实例化每个消费者时给每个topic指定相同的流数;B): 每个消费者实例订阅的topic必须相同。
3. Sticky:这种分区策略是最新版本中新增的一种策略,主要实现了两个目的:
A:将现有的分区尽可能均衡的分配给各个consumer,存在此目的的原因在于Round Robin和Range分配策略实际上都会导致某几个consumer承载过多分区,从而导致消费压力不均衡;
B:如果发生再平衡,那么重新分配之后在前一点的基础上会尽力保证当前未宕机的consumer所消费的分区不会被分配给其他的consumer上。
Kafka引入协调器有其历史过程,原来consumer信息依赖于Zookeeper存储,当代理或者消费者发生变化时,引发消费者平衡,此时消费者之间是互不透明的,每个消费者和Zookeeper单独通信,容易造成羊群效应和脑裂问题。
为了解决这些问题,Kafka引入了协调器。服务端引入组协调器(GroupCoordinator),消费者端引入消费者协调器(ConsumerCoordinator)。每个broker启动的时候,都会创建GroupCoordinator实例,管理部分消费组(集群负载均衡)和组下每个消费者消费的偏移量(offset)。每个Consumer实例化时,同时实例化一个ConsumerCoordinator对象,负责同一个消费组下各个消费组和服务器组协调器之间的通信。
消费者协调器主要负责如下工作:
1. 更新消费者缓存的Metadata
2. 向组协调器申请加入组
3. 消费者加入组后的相应处理
4. 请求离开消费组
5. 向组协调器提交offset
6. 通过心跳,保持组协调器的连接感知
7. 被组协调器选为leader的消费者协调器,负责消费者分区分配。分配结果发给组协调器。
8. 非leader的消费者,通过消费者协调器和组协调器同步分配结果。
组协调器在broker启动的时候实例化,每个组协调器负责一部分消费组的管理。组协调器负责处理消费者协调器发过来的各种请求,它主要提供如下功能:
1. 在与之连接的消费者中选举出消费者leader;
2. 下发leader消费者返回的消费者分区分配结果给所有的消费者
3. 管理消费者的消费位移提交,保存在kafka的内部主题中
4. 和消费者心跳保持,知道哪些消费者已经宕机,组中存活的消费者是哪些。
有哪些情形会造成重复消费:
1. 在没有事务支持时,producer重试发送消息;
2. consumer先处理消息,后提交offset:处理消息后,出现异常,没有提交offset。
3. consumer自动提交offset,在max.poll.interval.ms时间内,有新的消费者加入/退出,导致consumer rebalance,新的消费者再次poll到已经消费过的数据。
4. consumer每次拉取的数据过大(max.partition.fetch.bytes),导致消费时间大于session.timeout.ms,无法和broker保持心跳,导致broker认为退出,发生rebalance。
Redis的并发竞争问题:多个系统/线程同时对一个Key进行操作,造成最终结果和期望结果不一致的问题。
如何解决:
1. 消息队列;将相关操作放在队列,利用队列的有序性,进行串行处理。即并行处理变成串行化。
2. 在客户端进行加锁:分布式锁+时间戳;
3. 利用redis自带的incr命令或者decr命令;
4. 使用乐观锁;利用redis的watch命令;如下:
watch price
get price $price
$price = $price + 10
multi
set price $price
exec
5. 利用redis的setnx实现内置的锁:要设置超时时间,防止抢占到锁的客户端因为失败、崩溃或其他原因没有办法释放锁而造成死锁。
Kafka从0.8.x版本开始引入副本机制,这样可以极大的提高集群的可靠性和稳定性。通常情况下,Kafka中的每个分区partition都会分配多个副本replica,具体的副本数量由broker级别参数default.replication.factor(默认为1)指定,也可以在创建topic的时候通过--replication-factor <num>显式指定副本的数量(副本因子)。一般情况下,将前者default.replication.factor设置为大于1的值,这样在参数auto.create.topic.enable为true的时候,自动创建的topic会根据default.replication.factor的值来创建副本;或者更加通用的做法是使用后者而指定大于1的副本数。
每个分区的多个副本称之为AR(assigned replicas),包含至多一个leader副本和多个follower副本。与AR对应的是ISR(in-sync replicas),ISR指与leader副本保持同步状态的副本集合,当然leader副本也是这个集合中的一员。而ISR之外,也就是处于同步失败或者失效状态的副本,副本对应的分区也称为同步失效分区,即under-replicated分区。
怎么样判定一个分区是否有副本是处于同步失效状态的呢?从Kafka 0.9.x版本开始通过唯一的一个参数replica.lag.time.max.ms(默认大小为10,000)来控制,当ISR中的一个follower副本滞后leader副本的时间超过参数replica.lag.time.max.ms指定的值时即判定为副本失效,需要将此follower副本剔出除ISR之外。
过多的使用索引将会造成滥用。因此索引也会有缺点: 虽然索引大大提高了查询速度,同时却降低了更新表的速度,如表进行insert, update和delete。因为更新表时,MySQL不仅要保存数据,还要保存索引文件。建立索引占用磁盘空间的索引文件。
查看MySQL慢查询日志: mysqldumpslow -s at -a /usr/local/var/mysql.Macbook-Pro-3-slow.log
MySQL索引分类:
1. Normal普通索引
2. Unique唯一索引: 表示唯一的,不允许重复的索引。Unique和Primary key为列或列组合提供了唯一性的保证,primary key是拥有自动定义的Unique约束,但是每个表中可以有多个Unique约束,但是只能有一个Primary key约束。
3. Full Text全文索引:全文索引可以在varchar, char, text类型的列上创建。MyISM支持全文索引,InnoDB在MySQL5.6之后支持了全文索引。Full Text用于搜索很长文字时候,效果最好。用在比较短的文本,比如一两行的,普通的index也可以。
4. Spatial 空间索引:空间索引是对空间数据类型的字段建立的索引,MySQL中空间数据类型有4种:geometry(几何), poin(点), lineString(线), polygon(多边形)。MySQL使用Spatial关键字进行扩展,使得能够用于创建正规索引类型的语法创建空间索引。创建空间索引的列,必须将其声明为not null,空间索引只能在存储引擎为MyISAM的表中创建。
在InnoDB中有一个特殊的功能:自适应哈希索引,当它发现某些索引值被使用的非常频繁时,它会在内存中基于B+树索引之上,再创建一个hash索引,加快数据的查找速度。
MySQL创建索引的语法:
create [unique|fulltext|spatial] index index_name [using index_type] on table_name(index_col_name, ...);
index_type:表示索引的具体实现方式,在MySQL中有两种不同形式的索引:BTREE索引和HASH索引。在存储引擎为MyISAM和InnoDB的表中只能使用BTREE,其默认值就是BTREE。在存储引擎为Memory或者Heap的表中可以使用HASH或者BTREE两种类型的索引,其默认值是HASH。
消费者提交消费位移时提交的是当前消费到的最新消息的offset还是offset+1: offset+1
The committed offset should always be the offset of the next message that your application will read. Thus, when calling commitSync(offset) you should add one of the offset of the last message processed.
对于Kafka Consumer的手动提交,支持异步提交和同步提交。建议两者进行组合使用,不同场景使用不同的方式:
一般情况下,针对偶尔出现的提交失败,不进行重试不会有太大问题,因为如果提交失败是因为临时问题导致的,那么后续的提交总会有成功的。但如果这是发生在关闭消费者或者再均衡前的最后一次提交,就要确保能够提交成功。因此,在消费者关闭前一般会组合使用commitAsync()和commitSync。使用commitAsync()方式来做每条消费消息的提交(因为这种方式速度更快),最后再使用commitSync()方式来做位移提交最后的保证。
鉴于日志留存log-retention和日志删除实际上是一个问题的两个方面,待删除的是日志段,即LogSegment,以.log结尾的一个个文件,而非整个目录。另外还有一点:当前日志段(active logsegment)是永远不会被删除的,不管配置了哪种留存机制。Kafka log retention留存机制共有3种:
1. 基于空间维度;
2. 基于时间维度;
3. 基于起始位移维度;
Kafka将消息存储在磁盘中,为了控制磁盘占用空间的不断增加就需要对消息做一定的清理操作。Kafka中每一个分区partition都对应一个日志文件,而日志文件又可以分为多个日志分段文件,这样也便于日志的清理操作。Kafka提供了两种日志清理策略:
1. 日志删除(Log Deletion): 按照一定的保留策略来直接删除不符合条件的日志分段;
2. 日志压缩(Log Compaction): 针对每个消息的key进行整合,对于有相同key的不同value值,只保留最后一个版本。
异常分类: Throwable包括Error和Exception。
Error:
1. VirtualMachineError
1.1 StackOverFlowError
1.2 OutOfMemoryError
2. AWTError
Exception:
1. IOException
1.1 EOFException
1.2 FileNotFoundException
2. RuntimeException
2.1 ArithmeticException
2.2 MissingResouceException
2.3 ClassNotFoundException
2.4 NullPointerException
2.5 IllegalArgumentException
2.6 ArrayIndexOutOfBoundsException
2.7 UnknownTypeException
-Xmx -xms
在OutOfMemoryError之前有可能系统会提前报下列关键字: java.lang.OutOfMemoryError: GC over head limit exceeded
这种情况是当系统处于高频的GC状态,而且回收的效果依然不佳的情况。
java.lang.OutOfMemoryError: PermGen space
-XX:PermSize -XX:MaxPermSize
java.lang.OutOfMemoryError: Direct buffer memory
-XX:MaxDirectMemorySize
java.lang.StackOverflowError
-Xss
java.lang.OutOfMemoryError:unable to create new native thread
java.lang.OutOfMemoryError: request {} byte for {} out of swap
-XX:+PrintFlagsFinal
Thread Local Allocation Buffer,简称TLAB,即内存本地持有的buffer。
-XX:+UseTLAB 启用
-XX:TLABSize=<size in kb>设置大小,也就是本地线程中的私有区域大小(只有这个区域放不下才会到Eden中申请)
-XX:+ResizeTLAB 是否启动动态修改
-XX:+PrintTLAB 输出TLAB的内容
TLAB这些参数在多CPU下非常有用。
-XX:+UseParNewGC
-XX:+UseParallelOldGC
-XX:+UseSerialGC
-XX:+UseConcMarkSweepGC
-XX:+UseParallelGC
-XX:ParallelGCThread=12
-Xmn
-XX:NewSize
-XX:MaxNewSize
-XX:NewRatio
-XX:SurvivorRatio
-XX:InitialSurivivorRatio
-XX:MaxTenuringThreshold=15
-XX:MinHeapFreeRatio=40
-XX:MaxHeapFreeRatio=70
-XX:+UseAdaptiveSizePolicy
-XX:+PrintAdaptiveSizePolicy
-XX:MaxDirectMemorySize
-XX:+ScavengeBeforeFullGC,默认开启状态,在FGC前先进行Minor GC
-XX:+UseLargePages
-XX:LargePageSizeInBytes
-Djava.io.tmpdir
-XX:-HandlePromotionFailure
-XX:PretenureSizeThreshold
-XX:GCTimeRatio
-XX:MaxGCPauseMillis
-XX:GCTimeLimit
-XX:+PrintGCApplicationStoppedTime
-XX:+DisableExplicitGC
-XX:+HeapDumpOnOutOfMemoryError
-XX:+UseFastAccessorMethods
-XX:+PrintHeapUsageOverTime
-XX:+UseCompressedOops
-XX:+BackgroudCompilation
-XX:-TraceClassLoading
-XX:-TraceClasssUnloading
-XX:+UseConcMarkSweepGC
-XX:+CMSIncrementalMode
-XX:+CMSParallelRemarkEnabled
-XX:ParallelCMSThreads
-XX:CMSInitiatingOccupancyFraction=70
-XX:CMSInitiatingPermOccupancyFraction
-XX:+PrintCMSInitiationStatistics
-XX:+UseCMSInitiatingOccupancyOnly
-XX:+UseCMSCompactAtFullCollection
-XX:CMSFullGCsBeforeCompaction
-XX:CMSMaxAbortablePrecleanTime
对账补送
/* One or more variables that together maintain an initially zero long sum. When updates (method #add) are contended across threads, the set of variables may grow dynamically to reduce contention. Method #sum (or equivalently, #longValue) returns the current total combined across the variables maintaining the sum.
This class is usually preferable to AtomicLong when multiple threads update a common sum that is used for purposes such as collecting statistics, not for fine-grained synchronization control. Under low update contention, expected throughput of this class is significantly higher, at the expense of higher space consumption.
LongAdders can be used with a ConcurrentHashMap to maintain a scalable frequency map (a form of histogram or multiset). For example, to add a count to a ConcurrentHashMap<String, LongAdder> freqs, initializing if not already present, you can use freqs.computeIfAbsent(k -> new LongAdder()).increment();
This class extends Number, but does not define methods such as equals, hashCode and compareTo because instances are expected to be mutated, and so are not useful as collection keys.
*/
public class LongAdder extends Striped64 implements Serializable {
private static final long serialVersionUID = 7249069246863182397L;
public LongAdder() {}
public void add(long x) {
Cell[] as;
long b, v;
int m;
Cell a;
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v+x))) {
longAccumulate(x, null, uncontended);
}
}
}
public void inrement() {
add(1L);
}
public void decrement() {
add(-1L);
}
public long sum() {
Cell[] as = cells;
Cell a;
long sum = base;
if (as != null) {
for (int i = 0;i < as.length; ++i) {
if ((a = as[i]) != null) {
sum += a.value;
}
}
}
return sum;
}
public void reset() {
Cell[] as = cells; Cell a;
base = 0L;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
a.value = 0L;
}
}
}
public long sumThenReset() {
Cell[] as = cells; Cell a;
long sum = base;
base = 0L;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null) {
sum += a.value;
a.value = 0L;
}
}
}
return sum;
}
public String toString() {
return Long.toString(sum());
}
public long longValue() {
return sum();
}
public int intValue() {
return (int)sum();
}
public float floatValue() {
return (float)sum();
}
public double doubleValue() {
return (double)sum();
}
/* Serialization proxy, used to avoid reference to the non-public Striped64 superclass in serialized forms.
*/
private static class SerializationProxy implements Serializable {
private static final long serialVersionUID = 7249069246863182397L;
private final long value;
SerializationProxy(LongAdder a) {
value = a.sum();
}
private Object readResolve() {
LongAdder a = new LongAdder();
a.base = value;
return a;
}
}
private Object writeReplace() {
return new SerializationProxy(this);
}
private void readObject(java.io.ObjectInputStream s) throws java.io.InvalidObjectException {
throw new java.io.InvalidObjectException("Proxy required");
}
}
最经典的缓存+数据库读写的模式:Cache Aside Pattern。
- 读的时候,先读缓存,缓存没有的话,就读数据库,然后取出数据后放入缓存,同时返回响应;
- 更新的时候,先更新数据库,然后删除缓存;
最初级的缓存不一致问题:先修改数据库,再删除缓存。如果缓存删除失败了,那么数据库是新数据,缓存是旧数据,出现数据不一致。解决思路:先删除缓存,再修改数据库。如果数据库修改失败了,那么数据库是旧数据,缓存是空的,不会出现数据不一致问题。
比较复杂的数据不一致问题:数据发生了变更,先删除缓存,然后要去修改数据库,此时还没修改;请求到达,读取缓存,查询到了旧的数据,放入缓存。随后数据变更的程序完成了数据库的修改;这样出现了数据不一致。
Kubernetes中service是一组提供相同功能的Pods的抽象,并为它们提供一个统一的入口。借助Service,应用可以方便的实现服务发现于负载均衡,并实现应用的零宕机升级。Service通过spce.selector来选取后端服务,一般配合ReplicationController或者Deployment来保证后端容器的正常运行。
Kubernetes的Service能够提供负载均衡的能力,但是在使用上有以下限制:
1. 只提供4层负载均衡能力,而没有7层,但有时需要更多的匹配规则来转发请求,这点上4层负载均衡是不支持的;
2. 使用NodePort类型的Service时,需要在集群外部部署外部的负载均衡器;
3. 使用LoadBalancer类型的Service时,Kubernetes必须运行在特定的云服务上。
Kubernetes的Service的类型:
1. ClusterIP: 默认类型,自动分配一个仅Cluster内部可以访问的虚拟IP;
2. NodePort: 在ClusterIP基础上为Service在每台机器上绑定一个端口,这样就可以通过<NodeIP>:<NodePort>来访问该服务;
3. LoadBalancer: 在NodePort的基础上,借助Cloud provider创建一个外部负载均衡器,并将请求转发到<NodeIP>:<NodePort>
4. ExternalName: 把集群外部的服务引入到集群内部来,在集群内部直接使用。没有任何类型代理被创建,这只有Kubernetes1.7或者更高版本的kube-dns才支持。
ClusterIP:主要在每个node节点使用iptables, 将发往ClusterIP对应端口的数据,转发到kube-proxy中。然后kueb-proxy自己内部实现有负载均衡方法,并可以查询到这个Service下对应Pod的地址和端口,进而把数据转发给对应的pod地址和端口。
NodePort: 原理在于在Node上开了一个端口,将向该端口的流量导入到kube-proxy,然后由kube-proxy进一步到给对应的pod;
LoadBalancer: 和NodePort其实是同一种方式。区别在于LoaderBalancer比NodePort多了一步,就是可以调用Cloud provider去创建LB来向节点导流。
Redis缓存穿透:是指查询一个数据库中一定不存在的数据。解决方法:
1. 缓存不存在的值,但将缓存的过期时间设置的短一些;
2. Key值合法性校验:首先前端按照约定的算法校验传入的值是否合法,不合法直接拒绝;后端同样进行值的合法性校验,校验失败直接返回,否则从缓存获取;另外为了更好的灵活性,可支持配置先从缓存获取后校验,或者先校验后缓存获取。支持两种情况:非法情况下先校验后缓存;正常情况先缓存后校验;
3. 布隆过滤器:将数据库中所有Key值放入布隆过滤器,当查询Key值时,先经过布隆过滤器进行检查,如果不存在,直接丢弃,否则继续进行;将缓存穿透控制在可容忍的范围内。
4.
Redis缓存崩溃:某段时间缓存集中过期失效。解决方法:
1. 设置随机缓存过期时间。对于热门Key值,设置时间长一些;对于冷门Key值设置时间短一些;
2. 在缓存失效后,通过加锁或者队列来控制读DB的线程数;比如漏桶/令牌桶
3. 预先缓存reload,在缓存失效前,主动进行缓存的更新和缓存时间续约;
4. 做二级缓存或双缓存策略。
Redis缓存击穿: 指一个Key非常热点,在不停的扛着大并发,当这个热点Key失效的瞬间,持续的大并发就穿破缓存,直接请求DB。解决方法:
1. 设置永久不过期的缓存时间;
2. 在失效前,进行预先构建缓存;
3. 使用互斥锁:一个线程构建缓存,其他线程等待构建完毕后,再获取数据;
4.
新服务刚上线时,记得需要进行缓存预热/预加载。
__consumer_offsets: Every consumer group maintains its offset per topic partitions. Since v0.9 the information of committed offsets for every consumer group is stored in this internal topic(prior to v0.9 this information was stored on Zookeeper). When the offset manager receives an OffsetCommitRequest, it appends the request to a special compacted Kafka topic named __consumer_offsets. Finally, the offset manager will send a successful offset commit response to the consumer, only when all the replicas of the offsets topic receive the offsets.
_schema: This is an internal topic used by the Schema Registry, which is a distributed storage layer for Avro schemas. All the information which is relevant to schema, subject (with its corresponding version), metadata and compatibility configuration is appended to this topic. The schema registry in turn, produces(e.g. when a new schema is registered under a subject) and consumer data form this topic).
__transaction_state: 内部topic,所有事务状态信息会持久化到这个topic,TransactionCoordinator在做故障恢复时,也是从这个topic中恢复数据。
-------串行/并行/并发---------------
串行: 单个线程执行垃圾回收,并且此时用户线程仍然处于等待状态;
并行:多个线程执行垃圾回收,但此时用户线程仍然处于等待状态;
并发:指用户线程与垃圾收集线程同时执行(但不一定是并行的,可能会交替执行),用户程序在继续执行,而垃圾收集程序运行于另外一个CPU上。
----------------------
新生代回收器:SerialGC, ParNewGC, ParallelScavengeGC
名称 串行/并行/并发 回收算法 适用场景 是否可与CMS配合
SerialGC 串行 复制 单CPU 是
ParNewGC 并行 复制 多CPU 是
ParallelScavengeGC 并行 复制 多CPU且关注吞吐量 否
------------------------------
Serial(串行GC)收集器:
Serial收集器是一个新生代收集器,单线程执行,使用复制算法。它在进行垃圾收集时,必须暂停其他所有的工作线程(用户线程)。是JVM Client模式下默认的新生代收集器。对于限定单个CPU的环境来说,Serial收集器由于没有线程交互的开销,专心做垃圾收集可以获得最高的单线程收集效率,适用于单CPU机器的场景。在用户桌面应用场景中,即Client模式下是一个很好的选择。
ParNew(并行GC)收集器:
ParNew收集器其实是Serial收集器的多线程版本,除了使用多线程进行垃圾收集外,其余行为与Serial收集器一样。它是许多运行在Server模式下的JVM首选的新生代收集器。其中一个与性能无关但很重要的原因:除了Serial收集器外,目前只有它能与CMS收集器配合工作。ParNew在单CPU环境下绝对不会比Serial收集器更好的效果,甚至由于存在线程切换的开销,该收集器在通过超线程技术实现的两个CPU环境中都不能百分百保证可以超越Serial收集器。当然,随着CPU的增加,它对GC时系统资源的有效利用还是很有好处的。
ParallelScavenge(并行回收GC)收集器:
ParallelScavenge收集器也是一个新生代收集器,也是使用复制算法的收集器,又是并行多线程收集器。ParallelScavenge收集器的特点是它的关注点与其他收集器不同,CMS收集器关注点是尽可能缩短垃圾收集时用户线程的停顿时间,而ParallelScavenge收集器的目标则是达到一个可控制的吞吐量。吞吐量=程序运行时间/(程序运行时间+垃圾收集时间)。因此ParallelScavenge也经常被称为“吞吐量优先”收集器。ParallelScavenge收集器有一个参数-XX:UseAdaptiveSizePolicy,当参数启用时,JVM会根据当前系统的运行状况收集性能监控信息,动态调整一些如新生代大小、Eden与Survivor区的比例等细节参数。这种自适应调节策略也是ParallelScavenge收集器与ParNew收集器的一个重要区别。
--------------------------
三种老年代收集器:SerialOldGC, ParNewOldGC, CMS
名称 串行/并行/并发 回收算法 适用场景
SerialOldGC 串行 标记整理 单CPU
ParNewOldGC 并行 标记整理 多CPU
CMS 并发,几乎不会暂停用户线程 标记清除 多CPU且与用户线程共存
------------------------
SerialOld(串行GC)收集器:
SerialOld是Serial收集器的老年代版本,同样使用一个线程执行收集,使用"标记-整理"算法。主要使用在Client模式下的JVM。如果在Server模式下,它有两个用途:一种用途是JDK1.5以及之前的版本与ParallelScavenge收集器搭配使用;另一种用途是作为CMS的后备预案,在Concurrent Mode Failure时使用。
ParallelOld(并行GC)收集器:
ParalleOld是ParallelScavenge收集器的老年代版本,使用多线程和"标记-整理"算法。在注重吞吐量以及CPU资源敏感的场合,都可以优先考虑ParallelScavenge收集器加ParallelOld收集器。
CMS(并发GC)收集器:
CMS(Concurrent Mark Sweep)收集器是一种以获取最短停顿时间为目标的收集器,适用于集中在网站/BS系统的服务端应用。CMS收集器基于"标记-清除"算法实现,整个收集过程大致分为4个步骤:
1. 初始标记-initial mark
2. 并发标记-concurrent mark
3. 重新标记-remark
4. 并发清除-concurrent sweep
其中初始标记、重新标记这两个步骤需要停顿用户线程。初始标记仅仅只是标记出GC Roots能直接关联的对象,速度很快,并发标记阶段是进行GC Roots根搜索算法阶段,会判定对象是否存活。而重新标记阶段则是为了修正并发标记期间,因用户线程继续运行而导致产生变动的那部分对象的标记记录,这个阶段的停顿时间会比初始标记阶段稍长,但比并发标记阶段要短。
由于整个过程中耗时最长的并发标记和并发清除过程中,收集器线程都可以与用户线程一起工作,所以整体来说,CMS收集器的内存回收过程是与用户线程一起并发执行的。
CMS收集器的优点:并发收集、停顿低。但是CMS有三个显著缺点:CPU敏感、浮动垃圾、空间碎片。
CMS收集器对CPU资源非常敏感。在并发阶段,虽然不会导致用户线程停顿,但是会占用CPU资源而导致用户程序变慢,总吞吐量下降。CMS默认启动的回收线程数是:(CPU数量+3)/4。
CMS收集器无法处理浮动垃圾,可能出现"Concurrent Mode Failure",失败后而导致另一次FGC的产生。由于CMS并发清理阶段用户线程还在运行,伴随程序的运行自然会有新的垃圾不断产生,这部分垃圾出现在标记过程之后,CMS无法在本次收集中处理,只好留到下次GC清理。这部分垃圾称为"浮动垃圾"。也是由于在垃圾收集阶段用户线程还需要运行,即需要预留足够的内存空间给用户线程使用,因此CMS不能像其他收集器一样等待老年代几乎完全被填满再进行收集,需要预留一部分内存空间提供给并发收集时的程序运行使用。可以通过参数-XX:CMSInitiatingOccupancyFraction的值来提供触发百分比,以降低内存回收次数提供性能。如果CMS运行期间预留的内存无法满足程序其他线程使用,就会出现"Concurrent Mode Failure"失败,这时会启动后备预案:临时使用SerialOld收集器来重新进行老年代收集,这样停顿时间就很长了。所以说参数-XX:CMSInitiatingOccupancyFraction设置的过高将会很容易导致“Concurrent Mode Failure”失败,性能反而降低。
最后一个缺点:CMS是基于"标记-清除"算法实现的收集器,使用"标记-清除"算法收集后,会产生大量碎片。空间碎片太多时,将会给对象分配带来很多麻烦,比如大对象,内存空间找不到连续的空间来分配不得不提前触发一次FGC。为了解决这个问题,CMS提供了一个-XX:UseCMSCompactAtFullCollection开关参数,用于FGC后增加一个碎片整理过程,还可以通过-XX:CMSFullGCBeforeCompaction参数设置执行多少次不压缩的FGC之后,跟着来一次碎片整理过程。
G1(Garbage First)收集器是JDK7提供的一个新收集器,G1是面向服务端应用的垃圾收集器。与其他收集器相比,G1具备如下特点:
1. 并行与并发:G1能充分利用多CPU、多核环境下的硬件优势,使用多CPU来缩短STW停顿时间,部分其他收集器原本需要停顿Java线程执行的GC动作,G1仍然通过并发的方式让Java程序继续运行。
2. 分代收集:与其他收集器一样,分代依然得以保留。虽然G1可以不需要其他收集器配合就能单独管理整个GC堆,但它能够采用不同的方式去处理新创建的对象和已经存活了一段时间、熬过多次GC的旧对象已获得更好的收集效果。
3. 空间整合:与CMS的"标记-清除"算法不同,G1整体上看是基于标记-整理算法实现的,从局部(两个Region之间)上看是基于"复制"算法实现的,但无论如何,这两种算法都意味着G1运行期间不会产生内存空间碎片,收集后能提供规整的可用内存。这种特性有利于程序的长时间运行,分配大对象时不会无法找到连续内存空间而提前触发下一次GC。
4. 可预测的停顿:降低停顿时间是G1和CMS共同的关注点,但G1除了追求低停顿外,还能建立可预测的停顿时间模型,能让使用者明确指定在一个长度M毫秒内消耗在垃圾收集上的时间不能超过N毫秒,这几乎是实时Java(RTSJ)的垃圾收集器的特征了。
region介绍:大小一致,在1M到32M之间的一个2的幂指数,JVM会尽量划分2048个左右、同等大小的region。当然这个数值既可以手动调整,也会根据堆大小自动进行调整。
在G1实现中,一部分region是作为Eden,一部分作为Surivor,除了意料之中的Old region,G1会将超过Region 50%大小的对象归类为Humongous对象,并放置在相应的region中。逻辑上,Humongous region算是老年代的一部分,因为复制这样的大对象是很昂贵的操作,并不适合新生代GC的复制算法。
G1的缺点:region大小和大对象很难保证一致,这会导致空间浪费。特别大的对象可能会占用超过一个region。并且,region太小不合适,会在分配大对象时更难找到连续空间。
G1选择的是复合算法:在新生代,G1采用的仍然是并行的复制算法,所以同样会发生STW的暂停。新生代的清理会带上Old区已标记好的region。在老年代,大部分情况下都是并发标记,而整理(Compact)则是和新生代GC时捎带进行,而且不是整体性的整理,而是增量进行的。
Humonous对象的分配和回收:Humongous region 作为老年代的一部分,通常认为它会在并发标记结束后才进行回收,但是在新版G1中,Humongous 对象回收采取了更加激进的策略。G1记录了老年代region间对象引用,Humongous 对象数量有限,所以能够快速的知道是否有老年代对象引用它。如果没有,能够阻止它被回收的唯一可能,就是新生代是否有对象引用了它,但这个信息是可以在Young GC时就知道的,所以完全可以在Young GC中就进行Humongous 对象的回收,不用像其他老年代对象那样,等待并发标记结束。
JDK11增加了两种全新的GC方式:
1. Epsilon GC: 简单说就是不做垃圾收集的GC,有些情况下,例如在进行性能测试的时候,可能需要明确判断GC本身产生了多大的开销,这就是典型的应用场景。
2. ZGC: Oracle开源的一个超级GC。具备很大的扩展能力,支持T级别的堆大小,并且保证据大部分情况下,延迟都不会超过10ms。
ConcurrentHashMap:
// Nodes for use in TreeBins
static final class TreeNode<K, V> extends Node<K, V> {
TreeNode<K, V> parent; // red-black tree links
TreeNode<K, V> left;
TreeNode<K, V> right;
TreeNode<K, V> prev; // needed to unlink next upon deletion
boolean read;
...
}
/* TreeNodes used at the heads of bins. TreeBin do not hold user keys or values, but instead point to list of TreeNodes and their root. They also maintain a parasitic read-write lock forcing writers (who hold bin lock) to wait for readers (who do not) to complete before tree restructuring operations.
*/
static final class TreeBin<K, V> extends Node<K, V> {
TreeNode<K, V> root;
volatile TreeNode<K, V> first;
volatile Thread waiter;
volatile int lockState;
// values for lockState
static final int WRITER = 1; //set while holding write lock
static final int WAITER = 2; //set when waiting for write lock
static final int READER = 4; //increment value for setting read lock
...
private static final sun.misc.Unsafe U;
private static final long LOCKSTATE;
static {
try {
U = sun.misc.Unsafe.getUnsafe();
Class<?> k = TreeBin.class;
LOCKSTATE = U.objectFieldOffset(
k.getDelaredField("lockState"));
} catch (Exception e) {
throw new Error(e);
}
}
}
JDK8 update20引入一个优化就是G1的字符串去重(String deduplication). 由于字符串(包括它们内部的char[]数组)占用了大多数的堆空间,这项新的优化旨在使得G1能识别出堆中那些重复出现的字符串并将它们指向同一个内部的char[]数组,以避免同一个字符串的多份拷贝,这样堆的使用效率会很高。可以使用-XX:+UseStringDeduplication来启用。