`
xpenxpen
  • 浏览: 703774 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

jgroups初步

阅读更多
1.官方的tutorial已经很好了,得首先阅读。
http://www.jgroups.org/ug.html
本文其实是官方的tutorial的笔记,其中大部分文字转载自http://whitesock.iteye.com/blog/199229

2.基本概况
在JGroups中JChannel类提供了主要的API ,用于连接到集群(cluster)、发送和接收消息(Message)和注册listeners等。
Message包含消息头(保存地址等信息)和一个字节数组(保存希望传输的数据)。org.jgroups.Address接口及其实现类封装了地址信息,它通常包含IP地址和端口号。
连接到集群中的所有实例(instance)被称为一个视图(org.jgroups.View)。通过View.getMembers()可以得到所有实例的地址。
实例只有在连接到集群后才能够发送和接收消息。
以相同name调用JChannel.connect(String name)方法的所有实例会连接到同一个集群。
当实例希望离开集群时,可以调用JChannel.disconnect()方法。当希望释放占有的资源时,可以调用JChannel.close()方法。JChannel.close()方法内部会调用JChannel.disconnect()方法。

通过调用JChannel.setReceiver()方法可以接收消息和得到View改变的通知。每当有实例加入或者离开集群的时候,viewAccepted(View view)方法会被调用。
View.toString()方法会打印出View中所有实例的地址,以及View ID。
需要注意的是,每次viewAccepted(View view)方法被调用时,view参数都不同,其View ID也会增长。
如果没有名字,名字是机器名+随机数,后面跟|,以及自增长的View ID。
View内的第一个实例被称为coordinator。
Receiver接口上的getState(),setState()方法用于在实例间传递状态。
新的实例通过setState()方法获得通过状态,而这个状态是通过调用集群中其它某个实例上的getState()获得的。

3.Chat例子
3.1 实现功能
我们来写一个聊天程序,只支持文本的。我们要实现如下功能
  • 所有的SimpleChat实例可以相互找到并组成一个集群。
  • 没必要创建一个中心的ChatServer,这样就不会有单点故障。
  • 聊天消息将被发到集群中的所有实例。
  • 当一个实例加入或退出(或崩溃)时,其他实例都将得到通知。
  • 我们维护一个集群内的聊天记录state。新加入的实例可以查询聊天记录。


3.2 代码
代码就是官方的例子,我加入了详细的注释。
import java.io.BufferedReader;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.util.LinkedList;
import java.util.List;

import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.ReceiverAdapter;
import org.jgroups.View;
import org.jgroups.util.Util;

public class SimpleChat extends ReceiverAdapter {
	JChannel channel;
	String user_name = "ABC";
	private List<String> state = new LinkedList<String>();

	private void start() throws Exception {
		channel = new JChannel(); //使用默认配置udp.xml
		channel.setReceiver(this); //指定Receiver用来收消息和得到View改变的通知
		channel.connect("ChatCluster"); //连接到集群
		
		//刚加入集群时,我们通过getState()获取聊天历史记录
		//getState()的第一个参数代表目的地地址,这里传null代表第一个实例(coordinator)
		//第二个参数代表等待超时时间,我们等待10秒。如果时间到了,State传递不过来,会抛例外。也可以传0代表永远等下去
		channel.getState(null, 10000);
		eventLoop();
		channel.close();
	}

	private void eventLoop() {
		BufferedReader in = new BufferedReader(new InputStreamReader(System.in));

		while (true) {
			try {
				System.out.print("> ");
				System.out.flush();
				String line = in.readLine().toLowerCase();
				if (line.startsWith("quit") || line.startsWith("exit")) {
					break;
				}
				line = "[" + user_name + "] " + line;

				//Message构造函数的第一个参数代表目的地地址,这里传null代表要发消息给集群内的所有地址
				//第二个参数表示源地址,传null即可,框架会自动赋值
				//第三个参数line会被序列化成byte[]然后发送,推荐自己序列化而不是用java自带的序列化
				Message msg = new Message(null, null, line);
				channel.send(msg); //发消息到集群

			} catch (Exception e) {
			}
		}
	}
	
	@Override
	//每当有实例加入或者离开集群(或崩溃)的时候,viewAccepted方法会被调用
	public void viewAccepted(View new_view) {
	     System.out.println("** view: " + new_view); 
	}

	@Override
	//有消息时,byte[]会被反序列化成Message对象,也可以用Message.getBuffer得到byte[]然后自己反序列化。
	public void receive(Message msg) {
		String line = msg.getSrc() + ": " + msg.getObject();
	    System.out.println(line);
	    //加入到历史记录
	    synchronized (state) {
	    	state.add(line);
	    }
	}
	
	@Override
	public void getState(OutputStream output) throws Exception {
		//当JChannel.getState()被调用时,某个原来就在集群中的实例的getState会被调用用来得到集群的共享state
		//Util.objectToStream方法将state序列化为output二进制流 
	    synchronized (state) {
	        Util.objectToStream(state, new DataOutputStream(output)); 
	    }
	}
	
	@Override
	public void setState(InputStream input) throws Exception {
		//当以上集群的共享state被得到后,新加入集群的实例的setState方法就会被调用了
		List<String> list = (List<String>) Util.objectFromStream(new DataInputStream(input));
		synchronized (state) {
			state.clear();
			state.addAll(list);
		}

		System.out.println(list.size() + " messages in chat history):");
		for (String str : list) {
			System.out.println(str);
		}
	}
	
	public static void main(String[] args) throws Exception {
		new SimpleChat().start();
	}

}


3.3 功能测试

1.运行该代码3次,开启了3个实例,观察控制台,可以看到每有一个实例加入集群,其他客户端都会得到通知(viewAccepted被调用)。
2.随便哪个客户端发一条消息,其他客户端都能收到这条消息。
3.其中一个客户端输入exit,其他客户端都会得到通知。
4.模拟崩溃,可以杀死某个客户端进程,可以观察到其他客户端可以得到通知。
5.新加入的客户端可以看到聊天历史记录。

3.4 监控测试

为了探索jgroups的内在机理,我们用Process Explorer做另一个测试。
先开启第1个SimpleChat
看到第1台机器的53242开始监听


开启第2个SimpleChat
第1台机器的53244和第2台机器的53243建立连接
第2台机器的53245和第1台机器的53242建立连接


开启第3个SimpleChat
第1台机器的53244和第2台机器的53243建立连接
第2台机器的53247和第3台机器的53246建立连接
第3台机器的53248和第1台机器的53242建立连接


开启第4个SimpleChat
第1台机器的53244和第2台机器的53243建立连接
第2台机器的53247和第3台机器的53246建立连接
第3台机器的53250和第4台机器的53249建立连接
第4台机器的53251和第1台机器的53242建立连接


杀死第3个SimpleChat
第1台机器的53244和第2台机器的53243建立连接
第2台机器的53252和第4台机器的53249建立连接
第4台机器的53251和第1台机器的53242建立连接


我们管中窥豹,略微嗅到了jgroups是怎样实现可靠多播的,就是采用一个环将各个节点连接起来(TCP连接)。


当有一个节点崩溃(Client3),这个环会重新连接成一个新的环。图中的蓝线便是为了修补这个环所建立的新的连接。


图中红色的端口是UDP的意思,这个端口负责多播通讯,图中看可出是45588端口,jgroups.jar包默认的udp.xml印证了这一点。
    <UDP
         mcast_port="${jgroups.udp.mcast_port:45588}" />


4.jgroups的应用
上面例子程序我们已经可以看到,jgroups可以用来做state replication
以下项目场景都使用了jgroups
JBoss Application Server Clustering
OSCache Clustering
Jetty HTTP session replication
Tomcat HTTP session replication

5.参考资料
官方文档部分中文翻译
https://community.jboss.org/wiki/BelaBansJGroupsManualTranslationSerialI-  共4篇
  • 大小: 32.5 KB
  • 大小: 55.8 KB
  • 大小: 86.4 KB
  • 大小: 96.2 KB
  • 大小: 88.9 KB
  • 大小: 9.1 KB
  • 大小: 8 KB
分享到:
评论

相关推荐

    JavaEE源代码 jgroups-2.2.8

    JavaEE源代码 jgroups-2.2.8JavaEE源代码 jgroups-2.2.8JavaEE源代码 jgroups-2.2.8JavaEE源代码 jgroups-2.2.8JavaEE源代码 jgroups-2.2.8JavaEE源代码 jgroups-2.2.8JavaEE源代码 jgroups-2.2.8JavaEE源代码 ...

    jgroups-2.2.7.jar

    jgroups-2.2.7.jar jgroups-2.2.7.jar

    JGroups的Raft实现jgroups-raft.zip

    jgroups-raft 项目是 JGroups 框架对 Raft 的实现。Maven:&lt;groupId&gt;org.jgroups &lt;artifactId&gt;jgroups-raft &lt;version&gt;0.2&lt;/version&gt;Raft 是一个容易理解的共识算法。在容错和性能方面它相当于 Paxos(Google 的一致...

    jgroups.part1

    jgroups.part1

    JGroups_集群.pdf

    JGroups_集群.pdf

    jgroups-3.0.2

    JGroups是一个开源的纯java编写的可靠的群组通讯工具。其是一个可靠的组播通讯工具集(需要说明的是,这并不是说必须要使用IP Multicast,JGroups也可以使用TCP来实现)。其工作模式基于IP多播,但可以在可靠性和群组...

    jgroups-3.2

    JGroups是一个开源的纯java编写的可靠的群组通讯工具。其是一个可靠的组播通讯工具集(需要说明的是,这并不是说必须要使用IP Multicast,JGroups也可以使用TCP来实现)。其工作模式基于IP多播,但可以在可靠性和群组...

    jgroups.part3

    jgroups.part3

    Ehcache通过Jgroups做集群

    Ehcache通过使用Jgroups做集群配置,更改每一个不同的jgroups.xml文件的端口号和IP,如果一台机器就使用127.0.0.1即可。配置好之后,把每台机器起来,就可以测试了。

    jgroups源代码

    jgroups源代码,想要学习jgroups开源框架的童鞋可以看看

    Android代码-jgroups-android

    JGroups - A Framework for Group Communication in Java ======================================================== March 3, 1998 Bela Ban 4114 Upson Hall Cornell University Ithaca, NY 14853 bba@...

    Jgroups 教程

    JGROUPs 的重要用法全部都在里面了

    Jgroups中的UNICAST3协议中文翻译

    Jgroups是一款组播工具,基于IP多播的可靠的组播中间件

    jgroups-2.6.8.GA.jar

    jgroups-2.6.8.GA.jar jgroups-2.6.8.GA.jar

    基于JGroups的共享电子白板系统的研究与实现

    基于JGroups的共享电子白板系统的研究与实现

    Java多播通讯框架 JGroups

    Java多播通讯框架 JGroups

    JGroups(Java多播通讯框架) v4.0.0.CR1.zip

    JGroups(Java多播通讯框架)简介 JGroups是一个可靠的群组通讯Java工具包。它基于IP组播(IP multicast),但在可靠性,组成员管理上对它作了扩展。 JGroups的可靠性体现在: 1,对所有接收者的消息的无丢失传输...

    jgroups的jar

    JGroups是一个开源的纯java编写的可靠的群组通讯工具。其是一个可靠的组播通讯工具集

    Jgroups-all.jar

    JGroup是当前被广泛使用的可靠组间通信的工具之一。例如OSCache以及JBossTreeCache都是用的是JGroup。 JGroup功能十分强大,通过配置各种参数就可以充分利用它所提供的各项功能。JGroup最大的特点就是支持协议栈的...

    jgroups-2.2.8.jar

    jgroups-2.2.8.jar.........

Global site tag (gtag.js) - Google Analytics