-
Notifications
You must be signed in to change notification settings - Fork 1
/
201707.txt
411 lines (383 loc) · 27.6 KB
/
201707.txt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
public class ReentrantLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = *L;
//Synchronizer providing all implementation mechanics
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = *L;
abstract void lock();
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
} else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) { //overflow
throw new Error("Maximum lock count exceeded");
}
setState(nextc);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread()) {
throw new IllegalMonitorStateException();
}
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
protected final boolean isHeldExclusively() {
return getExclusiveOwnerThread() == Thread.currentThread();
}
final ConditionObject newCondition() {
return new ConditionObject();
}
final Thread getOwner() {
return isHeldExclusively() ? null : getExclusiveOwnerThread();
}
final int getHoldCount() {
return isHeldExclusively() ? getState() : 0;
}
final boolean isLocked() {
return getState() != 0;
}
private void readObject(java.io.ObjectInputStream s) throws IOException, ClassNotFoundException{
s.defaultReadObject();
setState(0);
}
}
//Sync object for non-fair locks
static final class NonfairSync extends Sync {
private static final long serialVersionUID = *L;
final void lock() {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
} else {
acquire(1);
}
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
//Sync object for fair locks
static final class FairSync extends Sync {
private static final long serialVersionUID = *L;
final void lock() {
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
} else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) {
throw new Error("Maximum lock count exceeded");
}
setState(nextc);
return true;
}
return false;
}
}
}
GC调优:(前提使用Parallel Scavenge GC)如果发现每次晋升到老年代的对象太多(通过GC日志可以计算出),导致YoungGC和FullGC都很频繁,可以考虑如下调优:
-Xmn1350m -XX:-UseAdaptiveSizePolicy -XX:SurvivorRatio=6
YoungGC每次晋升到old gen的内容较多,很可能是因为JVM动态的调整eden和survivor区,导致空间过小,部分本该在new gen呆着的对象跳到了old gen(此现象在survivor区较为明显,因为其本来就很小)
调优后发现Full GC次数减少,但每次时间还是很长,考虑启用CMS,并且启用碎片整理功能,降低FullGC的耗时。
但是发现oldgen GC开销还是较大,但比Pallel Scavenge 略好,通过GC日志发现,主要耗时都是在remark的rescan阶段。
所以需要降低remark的时间开销,加入参数:-XX:+CMSScavengBeforeRemark。
调优思路:通常情况下进行remark会先堆new gen进行一次扫描,而且这个开销占比很大,所以加上这个参数,在remark之前强制进行一次youngGC
最终的JVM参数如下:
-Xms4096m -Xmx4096m -XX:PermSize=256M -XX:MaxPermSize=256M -XX:ReservedCodeCacheSize=1024M -XX:+UseCodeCacheFlushing -Xmn1350m -XX:-UseAdaptiveSizePolicy -XX:SurvivorRatio=6, -XX:UseConcMarkSweepGC -XX:UseCMSCompactAtFullCollection -XX:CMSFullGCsBeforeCompaction=0 -XX:UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=70 -XX:+CMSScavengeBeforeRemark
AQS的唤醒原理:
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer
implements java.io.Serializable {
...
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0) {
unparkSuccessor(h);
}
return true;
}
return false;
}
//Wakes up node's successor, if one exists.
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0) {
compareAndSetWaitStatus(node, ws, 0);
}
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node;t = t.prev) {
if (t.waitStatus <= 0) {
s = t;
}
}
}
if (s != null) {
LockSupport.unpark(s.thread);
}
}
}
架构设计产出文档:
1. 项目简介
2. 架构设计目标:关键功能,指标
3. 架构设计原则
4. 用例图
5. 逻辑视图:层次结构,和外部系统的关系
6. 进程视图
7. 数据视图
8. 非功能性指标
9. 部署图
如何在MySQL中开启慢查询日志:
方法一:修改my.ini,long_query_time=2, slow-query-log=On, slow_query_log_file="mysql_slow_query.log", //记录下没有使用索引的query
log-query-not-using-indexes
方法二:通过数据库开启慢查询:set global slow_query_log=ON;
set global long_query_time=3600;
set log_queries_not_using_indexes=ON;
如何优化慢查询:
1. 利用explain关键字
2. 查看是否使用索引
2.1 使用like关键字进行查询时,如果匹配字符串的第一个字符"%",索引不会起作用。只有%不在第一个位置索引才会起作用
2.2 MySQL索引最多可以包括16个字段的索引,只有查询条件中使用了这些字段中的第一个字段时,索引才会被使用。
3. 如果某表包含的字段很多,里面有些字段不使用或很少使用,可以分解成多个表,来提高效率
4. 对于经常联合查询的表,可以建立中间表提高查询效率。
5. 分解关联查询,很多高性能的应用都会对关联查询进行分解,对每一个表进行单表查询,然后将查询结果在程序中进行关联。
6. 优化limit分页:limit m,n;当m,n中的偏移量m很大时,导致每次查询都要从整个结果集中找。可以通过考虑在筛选字段加索引或者先查询出主键然后根据主键然后limit;
7. 建立where字段和order by字段的复合索引,这样就不会有Using filesort了
8. 避免对索引字段进行计算或类型转化,不然不会使用索引
public class ConcurrentLinkedDeque<E> extends AbstractCollection<E>
implements Deque<E>, java.io.Serializable {
}
Kafka的连接器中使用了死信队列,当配置的时候会使用。
Kafka重复消费的原因:根本原因已经消费了数据,但是offset没有提交
1. 强行kill线程,导致消费后的数据,offset没有提交
2. 设置offset为自动提交,关闭kafka时,如果在close之前调用consumer.unsubscribe(0则有可能部分offset没提交,下次重启会重复消费
3. (重复消费最常见原因):消费后的数据,当offset还没有提交时,partition就断开连接。比如消费数据耗时太长,超过了kafka的session timeout时间(0.10默认30S),那么就会re-balance重平衡,此时有一定几率offset没提交,导致重平衡后重复消费
4. 当消费者重新分配partition的时候,可能出现从头开始消费的情况,导致重发问题。
5. 当消费者消费的速度很慢的时候,可能在一个session周期内还没完成,导致心跳机制检测问题。
Kafka高性能/高吞吐量的原因:
1. 数据磁盘持久化:消息不在内存中,直接写入磁盘,充分利用磁盘的顺序读写性能(.index文件和log文件,称为segment),并且是顺序写磁盘。
2. zero-copy:减少IO操作步骤
3. 数据批量发送
4. 数据压缩
5. Topic划分为多个partition,提高parallelism
6. Page Cache:为了优化读写性能,kafka利用了操作系统本身的Page Cache,而不是JVM空间内存。通过操作系统的Page Cache,Kafka的读写操作基本上是基于内存的,读写速度得到了极大的提升。
ThreadLocal的Thread.ThreadLocalMap<ThreadLocal, Object>这种存储的优点:
1. 线程消亡时,线程共享变量ThreadLocalMap一同销毁;
2. ThreadLocalMap<ThreadLocal, Object>键值对为ThreadLocal的数量,一般来说ThreaLocal数量很少,相比在ThreadLocal中用Map<Thread, Object>键值对存储线程共享变量(Thread数量一般来说比ThreadLocal数量多),性能提高很多。
关于ThreadLocalMap<ThreadLocal, Object>弱引用问题:
当线程没有结束,但是ThreadLocal已经被回收,则可能导致线程中存在ThreadLocalMap<null, Object>的键值对,造成内存泄露。(ThreadLocal被回收,ThreadLocal关联的线程共享变量还存在)。虽然ThreadLocal的get,set方法可以清除ThreadLocalMap中key为null的value,但是get,set方法在内存泄露后并不会必然调用,所以为了防止此类情况的出现,有两种手段:
1. 使用完线程共享变量后,显示调用ThreadLocalMap.remove方法清除线程共享变量。
2. JDK建议ThreadLocal定义为private static,这样ThreadLocal的弱引用问题则不存在了。
使用堆外内存的2种方式:
1. Unsafe.allocateMemory(size);
2. NIO包下的ByteBuffer.allocateDirect(int capacity);
设置堆外内存的大小:-XX:MaxDirectMemorySize=40M
ByteBuffer.allocateDirect分配的堆外内存不需要手动释放,而且ByteBuffer中没有提高手动释放的API。也就是说使用ByteBuffer不用担心堆外内存的释放问题,除非堆内存中的ByteBuffer对象由于错误编码而出现内存泄露。
但使用Unsafe必须要手动释放内存
在Cleaner内部中通过一个列表,维护了针对每一个directBuffer的一个回收堆外内存的线程对象(Runnable),回收操作是发生在Cleaner的clean()方法中。
根据不同的实现技术AOP织入有三种:
1. 编译器织入,这要求有特殊的Java编译器;
2. 类装载期织入,这需要有特殊的类装载器;
3. 动态代理织入,在运行期为目标类添加增强Advice生成子类的方法。
AspectJ是静态代理在编译期就生成了代理。
Spring AOP是动态代理使用JDK或CGLIB动态代理。
最左匹配原则:mysql查询优化器会判断纠正SQL语句该以什么样的顺序执行效率最高,最后才生成真正的执行计划。B+树的数据项是复合的数据结构index(a, b, c),B+树是按照从左到右的顺序来建立搜索树的, 如果不包含a列,则B+树不知道怎么搜索于是用不到这个索引。
最左匹配原则:最左优先,以最左边的为起点任何连续的索引都能匹配上。同时遇到范围查询(>、<、between、like)就会停止匹配。
例如:b = 2 如果建立(a,b)顺序的索引,是匹配不到(a,b)索引的;但是如果查询条件是a = 1 and b = 2或者a=1(又或者是b = 2 and b = 1)就可以,因为优化器会自动调整a,b的顺序。再比如a = 1 and b = 2 and c > 3 and d = 4 如果建立(a,b,c,d)顺序的索引,d是用不到索引的,因为c字段是一个范围查询,它之后的字段会停止匹配。
IdentityHashMap:
1. 使用==判断key的相等性;
2. IdentityHashMap不是Map的通用实现,违反了Map的常规协议,允许key和value都为Null
3. hash冲撞使用线性再探测的方式解决。private static int nextKeyIndex(int i, int len) {
return (i + 2 < len ? i + 2 : 0);
}
有关年轻代的JVM参数:
1. -XX:NewSize -XX:MaxNewSize
2. -XX:SurvivorRatio
3. -XX:+PrintTenuringDistribution
4. -XX:InitialTenuringThreshold -XX:MaxTenuringThreshold
JVM为什么有1个Eden区和2个Survivor区:
如果没有Survivor区,此时每触发一次Minor GC,就会把Eden区的对象复制到老年代,这样当老年代满了之后就会触发MajorGC,比较耗时。如果只有一个Survivor区,那当Eden区满了之后,就会复制对象到Survivor区,容易产生内存碎片化,严重影响性能。所以使用2个Survivor,始终保持一个空的Survivor,可以避免内存碎片化。
分代收集算法:根据对象存活的周期的不同将内存划分为几块,然后再选主合适的收集算法。
一般把堆分成新生代和老年代,这样就可以根据各个年代的特点采用最合适的收集算法。在新生代中,每次垃圾收集都会有大量的对象销毁,只有少量存活,所以选用复制算法。老年代因为对象存活率高,没有额外空间进行分配担保,所以一般采用标记整理或标记清除算法进行回收。
Kafka事务消息:从0.11.0开始支持事务消息,
Kafka使用事务的两种方式:
1. 配置Kafka事务管理器(KafkaTransactionManager)并使用@Transactional注解
2. 使用KafkaTemplate的executeInTransaction方法
建索引的几大原则:
1.最左前缀匹配原则,非常重要的原则,mysql会一直向右匹配直到遇到范围查询(>、<、between、like)就停止匹配,比如a = 1 and b = 2 and c > 3 and d = 4 如果建立(a,b,c,d)顺序的索引,d是用不到索引的,如果建立(a,b,d,c)的索引则都可以用到,a,b,d的顺序可以任意调整。
2.=和in可以乱序,比如a=1 and b=2 and c=3 建立(a,b,c)索引可以任意顺序,mysql的查询优化器会帮你优化成索引可以识别的形式。 3.尽量选择区分度高的列作为索引,区分度的公式是count(distinct col)/count(*),表示字段不重复的比例,比例越大我们扫描的记录数越少,唯一键的区分度是1,而一些状态、性别字段可能在大数据面前区分度就是0,那可能有人会问,这个比例有什么经验值吗?使用场景不同,这个值也很难确定,一般需要join的字段我们都要求是0.1以上,即平均1条扫描10条记录。
4.索引列不能参与计算,保持列“干净”,比如from_unixtime(create_time) = ’2014-05-29’就不能使用到索引,原因很简单,b+树中存的都是数据表中的字段值,但进行检索时,需要把所有元素都应用函数才能比较,显然成本太大。所以语句应该写成create_time = unix_timestamp(’2014-05-29’)。
5.尽量的扩展索引,不要新建索引。比如表中已经有a的索引,现在要加(a,b)的索引,那么只需要修改原来的索引即可。
Kafka Consumer的负载均衡算法:
1. A = partition数量/同组内消费者的总数
2. M = 对A取小数点第一位向上取整
3. 计算出该消费者拉取数据的partition集合:Ci = [P(M * i) ~ P((i + 1) * M - 1)]
MySQL锁:
1. 按锁使用方式:乐观锁和悲观锁;
2. 按锁级别:共享锁,排他锁,意向锁,间隙锁
3. 按锁粒度:行级锁,表级锁,页级锁
4. 按操作方式:DDL锁,DML锁
5. 按加锁方式:自动锁,显示锁;
在InnoDB下,间隙锁的产生需要满足三个条件:
1. 隔离级别为RR
2. 当前读
3. 查询条件能够走到索引
间隙锁的作用:在RR模式的InnoDB中,间隙锁起到2个作用:1. 保障数据的恢复和复制;2. 防止幻读
分库分表的情况下,如何做排序:
1. 部分字段冗余存储/部分表冗余存储,或者广播表;
2. 通过数仓来做非实时数据,使用Kafka Stream做实时数据聚合
3. 在应用系统中进行处理。
如何排查死锁:
1. 使用JConsole,在“线程”里面有一个“检测死锁”的选项
2. 使用jstack, 将堆文件dump,如果有waiting to lock
3. 使用Arthas的jvm命令: DEADLOCK-COUNT:JVM当前死锁的线程数
//If false(default), core threads stay alive even when idle.
//If true, core threads use keepAliveTime to time out waiting for work.
private volatile boolean allowCoreThreadTimeout;
WebClient是Spring 5最新引入的,可以理解为Reactive版的RestTemplate。
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
对于Spring Cloud老手来说,就算更换了Nacos作为服务注册中心,其实对于应用层面的代码是没有影响的。为什么Spring Cloud可以带给我们这样的完美编码体验呢?实际上,这完全归功于Spring Cloud Common的封装,由于在服务注册与发现、客户端负载均衡等都做了抽象,而上层应用方面依赖的都是这些抽象接口,而非针对某个具体中间件的实现。所以,在Spring Cloud中,可以很方便的去切换服务治理方面的中间件。
为了实现开始时间不确定的定时任务触发,引入延迟消息。RabbitMQ中提供了延迟消息的插件,可以利用Spring Cloud Stream以及RabbitMQ实现延迟消息。
Tomcat是如何完成多个Web应用之间相互隔离,又如何保证多个Web应用都能加载到基础类库的
Tomcat的ClassLoader:Tomcat本身也是java项目,因此也需要被JDK的类加载机制加载,也就必然存在引导类加载器,扩展类加载器和应用(系统)类加载器。Tomcat自身定义的类加载器主要有下面组成,CommonClassLoader作为CatalinaClassLoader和SharedClassLoader的parent,而SharedClassLoader又可能存在多个clildren类加载器WebAppClassLoader,一个WebAppClassLoader实际上就对应一个Web应用,一个Web应用就有可能存在Jsp页面,这些Jsp页面最终会转化成clall类被加载,因此也需要一个jsp类加载器:JasperLoader
需要注意的是,在代码层面CatalinaClass, SharedClassLoader, CommonClassLoader对应的实体类实际上都是URLClassLoader或者SecureClasssLoader,一般我们只是根据加载内容的不同和加载父子顺序的关系,在逻辑上划分为这三个类加载器;而WebAppClassLoader和JasperLoader都是存在对应的类加载器类的。
综上所述,Tomcat的类加载机制不能算完全"正统"的双亲委派,WebappClassLoader内部重写了loadClass和findClass方法,实现了绕过"双亲委派"直接加载web应用内部的资源,当然可以通过在Context.xml文件中加上<Loader delegate="true">开启正统的"双亲委派"加载机制。
CAS使用场景:
1. 在线程冲突严重时,会大幅降低程序性能;CAS只适合线程冲突较少的情况使用;
2. Synchronized在JDK6之后已经改进优化,底层使用Lock-Free的队列,基本思路是自旋后阻塞,竞争切换后继续竞争锁,稍微牺牲了公平性,但获得了高吞吐量。在线程冲突较少的情况下,可以获得和CAS类似的性能。而线程冲突严重的情况下,性能远高于CAS。
public class AtomicInteger extends Number implements java.io.Serializable {
private static final long serialVersionUID = *L;
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;
static {
try {
valueOffset = unsafe.objectFieldOffset(AtomictInteger.class.getDeclaredField("value"));
} catch (Exception ex) {throw new Error(ex);}
}
private volatile int value;
...
}
volatile变量两个特性:1. 可见性;2. 禁止指令重排
在硬件指令集的发展驱动下,使得 "操作和冲突检测" 这种看起来需要多次操作的行为只需要一条处理器指令便可以完成,这些指令中就包括非常著名的CAS指令(Compare-And-Swap比较并交换)。
由于Unsafe类不是提供给用户程序调用的类(Unsafe.getUnsafe()的代码中限制了只有启动类加载器(BootstrapClassLoader)加载的Class才能访问它),因此,如果不采用反射手段,只能通过其他JavaAPI来间接使用它,如JUC包里面的AtomicInteger,其中compareAndSet()和getAndIncrement()等方法使用了Unsafe类的CAS操作。
Kafka的replica:
存储系统高效,充分利用磁盘顺序读写,kafka的replica
复制协议 Broker,如何自动调优kafka副本的工作方式
如何避免follower进入和退出同步副本列表ISR
topics处于under replicated 状态,这些副本处于同步失败或者失效状态,更意味着
数据没有被复制到足够数量Broker从而增加数据丢失的概率。
kafka集群中处于under replicated中partition数要密切监控。
offset,用来确定消息在分区日志中的唯一位置。
kafka通过多副本机制实现故障自动转移。
partition的N个replica中一个replica是leader,其他是follower。leader处理partition的所有读写请求。
于此同时,follower会被动定期去复制leader上的数据。
leader负责维护和跟踪ISR中所有follower滞后状态。消息复制延迟受最慢的follower限制,重要的是快速检测慢replica,如果follower落后太多或者失效,leader将会把它从ISR中移除。
AR = ISR + OSR
AR,ISR,OSR,LEO,HW这些信息都被保存在Zookeeper中。
假设replica.lag.max.messages=4,表明只要follower落后leader不超过3,就不会标记为死亡,也不会从同步副本列表中移除。
log end offset
一个副本和leader不同步的几个原因:
1. 慢副本:在一定周期时间内follower不能追赶上leader。最常见的原因之一是I/O瓶颈导致follower追加复制消息速度慢于从leader拉取速度。
2. 卡住副本:在一定周期时间内follower停止从leader拉取请求。follower replica卡住了是由于GC暂停或follower失效或死亡。
3. 新启动副本:当用户给主题增加副本因子时,新的follower不在同步副本列表中,直到完全赶上了leader日志。
在Kafka-0.8.2.x中,副本滞后判断依据是副本落后于leader最大消息数量(replica.lag.max.messages)或replicas响应partition leader的最长等待时间(replica.lag.time.max.ms)。前者是用来检测缓慢的副本,而后者是用来检测失效或死亡的副本。
避免写消息延迟增加。
Consul是一个支持多数据中心分布式高可用的服务发现和配置共享的服务软件,采用Go语言开发。
Consul支持健康检查,并允许http和dns协议调用api存储键值对。Consul采用Raft一致性协议算法,来保证服务的高可用;使用goosip协议管理成员和广播消息,并且支持ACL访问控制。
漏桶算法:水-请求先进入到漏桶里,漏桶以一定的速度出水,当水流速度过大会直接溢出,漏桶算法能强行限制数据的传输速率。
令牌桶算法:对应很多应用除了要求能够限制数据的平均传输速率外,还要求允许某种程度的突发传输。这个适合漏桶算法就不适合了,令牌桶算法更为合适。令牌桶算法的原理是系统以一个恒定的速度往桶里放入令牌,而如果请求需要被处理,则需要先从桶里获取一个令牌,当桶里没有令牌可取时,则拒绝服务。
Guava提供了限流工具类RateLimiter,基于令牌桶算法实现流量限制。
RateLimiter通过限制后面请求的等待时间,来支持一定程度的突发请求(预消费)。
Guava有两种限流模式,一种为稳定模式(SmoothBursty:令牌生成速度恒定),一种为渐进模式(SmoothWarmingUp:令牌生成速度缓慢提升直到维持在一个稳定值),主要区别在等待时间的计算上。
//Google实现的uninterruptibly的sleep
public static void sleepUninterruptibly(long sleepFor, TimeUnit unit) {
boolean interrupted = false;
try {
long remainingNanos = unit.toNanos(sleepFor);
long end = System.nanoTime() + remainingNanos;
while (true) {
try {
NANOSECONDS.sleep(remainingNanos);
return;
} catch (InterruptedException e) {
interrupted = true;
remainingNanos = end - System.nanoTime();
}
}
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
}
0.10kafka的rebalance条件:
条件1. 有新的consumer加入
条件2. 旧的consumer挂了
条件3. coordinator挂了,集群选举出新的coordinator(0.10特有的)
条件4. topic的partition新增
条件5. consumer调用unsubscrible(),取消topic订阅
当consumer启动时,触发下面操作:
1. 首先进行"Consumer Id注册"
2. 然后在"Consumer id"节点下注册一个watch用来监听当前group中其他consumer的"退出"和"加入";只要此znode path下节点列表变更,都会触发此group下consumer的负载均衡.(比如一个consumer失效,那么其他consumer接管partitions).
3. 在"Broker id"节点下,注册一个watch用来监听broker的存活情况;如果broker列表变更,将会触发所有的groups下的consumer重新balance.
Consumer分配Partition算法:
1) 假如topic1,具有如下partitions: P0,P1,P2,P3
2) 加入group中,有如下consumer: C0,C1
3) 首先根据partition索引号对partitions排序: P0,P1,P2,P3
4) 根据(consumer.id + '-'+ thread序号)排序: C0,C1
5) 计算倍数: M = [P0,P1,P2,P3].size / [C0,C1].size,本例值M=2(向上取整)
6) 然后依次分配partitions: C0 = [P0,P1],C1=[P2,P3],即Ci = [P(i * M),P((i + 1) * M -1)]
根据Kafka社区wiki,Kafka作者正在考虑在还未发布的0.9.x版本中使用中心协调器(Coordinator)。大体思想是为所有Consumer Group的子集选举出一个Broker作为Coordinator,由它来管理Consumer的增减,然后生成Rebalance命令,并检查是否这些Rebalance。
consumer rebalance失败是0.8版本的bug,在0.9以后,这个模块由组件Coordinator负责,能够保证rebalance成功。
从0.10.0-src来看,ZookeeperConsumerConnector已经重构了,新增了ConsumerCoordinator。
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
AdaptiveSizePolicy(自适应大小策略)是JVM GC Ergonomics(人类工程学)的一部分。如果开启AdaptiveSizePolicy,则每次GC后会重新计算Eden, From和To的大小,计算依据是GC过程中统计的GC时间/吞吐量/内存占用量。
-XX:+UseAdaptiveSizePolicy
JDK 1.8 默认使用 UseParallelGC 垃圾回收器,该垃圾回收器默认启动了 AdaptiveSizePolicy。
AdaptiveSizePolicy有三个目标:
Pause goal:应用达到预期的 GC 暂停时间。
Throughput goal:应用达到预期的吞吐量,即应用正常运行时间 / (正常运行时间 + GC 耗时)。
Minimum footprint:尽可能小的内存占用量。
AdaptiveSizePolicy 为了达到三个预期目标,涉及以下操作:
如果 GC 停顿时间超过了预期值,会减小内存大小。理论上,减小内存,可以减少垃圾标记等操作的耗时,以此达到预期停顿时间。
如果应用吞吐量小于预期,会增加内存大小。理论上,增大内存,可以降低 GC 的频率,以此达到预期吞吐量。
如果应用达到了前两个目标,则尝试减小内存,以减少内存消耗。
jinfo -flags <pid>查看当前jvm的参数
jmap -histo <pid> 查看当前jvm中每个类的实例数和占用子节数
jstat -gcutil <pid>查看自启动到现在GC情况
jstat -gcutil <pid> 2s 每隔2秒查看GC情况(每个区域大小,YGC,FGC次数等)
CMS默认关闭AdaptiveSizePolicy
jmap -heap <pid>查看所有区域的大小和使用大小,和配置的参数
kafka重复消费优化措施:
1. 提供partition的数量,从而提高consumer的并行能力,从而提高数据的消费能力
2. 对于单partition的消费线程,增加一个固定长度的阻塞队列和工作线程池进一步提高并行消费的能力
3. 由于使用了spring-kafka,则把kafka-client的enable.auto.commit设置成了false,表示禁止kafka-client自动提交offset,因为就是之前的自动提交失败,导致offset永远没有更新,从而转向使用spring-kafka的offset提交机制。并且spring-kafka提高了多种提交策略:这些策略保证了一批消息没有完成消费的情况下,也能提交offset,从而避免完全提交不上导致永远重复消费的问题。
4. 可以根据消费者的消费速度堆session.timeout.ms的时间进行设置,适当延长
5. 减少每次从partition里面捞取的数据分片的大小,提高消费者的消费速度。