今天遇到了一個(gè)廣告網(wǎng)絡(luò)比較現(xiàn)實(shí)的需求,如下:
最為一個(gè)廣告服務(wù)端,可以從publisher的app接收到很多的加載廣告的請(qǐng)求。。。這個(gè)時(shí)候可以將這些請(qǐng)求的數(shù)據(jù)發(fā)給一些中間的機(jī)構(gòu)(exchange),然后由他們返回廣告的數(shù)據(jù)。。。因?yàn)檎?qǐng)求量較大,而且要保證延遲不能太高,所以這里與這些中間機(jī)構(gòu)進(jìn)行通信的時(shí)候就只能采用長(zhǎng)連接的方式了,不能每次請(qǐng)求都生成一次連接來進(jìn)行http請(qǐng)求。。。
其實(shí)以前一般用netty開發(fā)都是作為服務(wù)端來的大致思路如下:
?。?)創(chuàng)建一個(gè)eventLoopGroup,用于維護(hù)nio的io事件
?。?)創(chuàng)建一個(gè)niosocketchanel,然后將其注冊(cè)到eventLoopGroup上面去,并未channel設(shè)置初始化的handler
?。?)調(diào)用channel的connect方法發(fā)起與遠(yuǎn)端的連接請(qǐng)求
?。?)當(dāng)鏈接建立以后,剛剛提到的初始化handler將會(huì)用于響應(yīng),為channel添加http的decode與encode的handler。。
?。?)最后就可以開始進(jìn)行http通信了。。
當(dāng)然這里就不要主動(dòng)的去斷開channel了,斷開還是讓對(duì)方的服務(wù)器去做吧,或者超時(shí),或者什么的。。反正客戶端不會(huì)主動(dòng)斷開。。。
其實(shí)只要上面的步驟想出來,接下來寫代碼用netty來實(shí)現(xiàn)一個(gè)基于長(zhǎng)連接的http客戶端還算是很簡(jiǎn)單 的。。直接上代碼吧:
?。?a href="http://www.wenjunhu.com/v/tag/852/" target="_blank">java] view plain copypackage fjs;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.QueryStringEncoder;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
public class Fjs {
public static AtomicInteger number = new AtomicInteger(0);
public static AtomicLong time = new AtomicLong(0);
public static void doIt(Channel channel) {
if (number.get() 《 50) {
number.incrementAndGet();
time.set(System.currentTimeMillis());
QueryStringEncoder encoder = new QueryStringEncoder(“http://www.baidu.com/oapi/reqAd.jsp?pub=923875870&adspace=65826983&adcount=1&response=HTML&devip=22.56.22.66&user=900&format=IMG&position=top&height=&width=&device=Mozilla%2F5.0%20%28Linux%3B%20Android%204.2.1%3B%20en-us%3B%20Nexus%204%20Build%2FJOP40D%29%20AppleWebKit%2F535.19%20%28KHTML%2C%20like%20Gecko%29%20Chrome%2F18.0.1025.166%20Mobile%20Safari%2F535.19&beacon=TRUE&phpsnip=104”);
URI uriGet = null;
try {
uriGet = new URI(encoder.toString());
} catch (URISyntaxException e) {
System.out.println(“我擦,,,,”);
}
FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uriGet.toASCIIString());
channel.pipeline().write(request);
channel.flush();
} else {
System.out.println(“over”);
}
}
public static void main(String args[]) throws InterruptedException {
NioEventLoopGroup group = new NioEventLoopGroup();
NioSocketChannel channel = new NioSocketChannel(); //創(chuàng)建一個(gè)channel,待會(huì)用它來發(fā)起鏈接
channel.pipeline().addFirst(new InitHandler()); //為這個(gè)channel添加一個(gè)初始化的handler,用于響應(yīng)待會(huì)channel建立成功
group.register(channel); //注冊(cè)這個(gè)channel
channel.connect(new InetSocketAddress(“www.baidu.com”, 80)); //調(diào)用connect方法
Thread.currentThread().sleep(Long.MAX_VALUE);
}
public static class InitHandler implements ChannelInboundHandler {
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
// TODO Auto-generated method stub
}
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
// TODO Auto-generated method stub
}
public void channelRegistered(ChannelHandlerContext ctx)
throws Exception {
// TODO Auto-generated method stub
}
public void channelUnregistered(ChannelHandlerContext ctx)
throws Exception {
// TODO Auto-generated method stub
}
// 當(dāng)連接建立成功之后會(huì)調(diào)用這個(gè)方法初始化channel
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// TODO Auto-generated method stub
ctx.channel().pipeline().remove(this); //嗯,當(dāng)前這個(gè)handler對(duì)這個(gè)channel就算是沒有用了,可以移除了。。。
ctx.channel().pipeline().addFirst(new HttpClientCodec()); //添加一個(gè)http協(xié)議的encoder與decoder
ctx.channel().pipeline().addLast(new ReponseHandler()); //添加用于處理http返回信息的handler
Fjs.doIt(ctx.channel());
}
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
// TODO Auto-generated method stub
System.out.println(“disconnect ” + System.currentTimeMillis() / 1000);
}
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
// TODO Auto-generated method stub
System.out.println(“read ” + System.currentTimeMillis() / 1000);
}
public void channelReadComplete(ChannelHandlerContext ctx)
throws Exception {
// TODO Auto-generated method stub
}
public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
throws Exception {
// TODO Auto-generated method stub
}
public void channelWritabilityChanged(ChannelHandlerContext ctx)
throws Exception {
// TODO Auto-generated method stub
}
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
// TODO Auto-generated method stub
System.out.println(“error ” + System.currentTimeMillis() / 1000);
}
}
}
這部分的內(nèi)容基本上就囊括了上面提到的所有的步驟。。。而且寫了一個(gè)發(fā)起http請(qǐng)求的靜態(tài)方法。。。到這里基本上一個(gè)基于長(zhǎng)連接的http客戶端就算差不多了。。。最后再來一個(gè)響應(yīng)httpresponse的handler吧:
?。踛ava] view plain copypackage fjs;
import java.nio.charset.Charset;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.DefaultLastHttpContent;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.LastHttpContent;
public class ReponseHandler extends ChannelInboundHandlerAdapter{
ByteBuf buf = Unpooled.buffer();
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof HttpResponse) {
DefaultHttpResponse response = (DefaultHttpResponse)msg;
}
if (msg instanceof HttpContent) {
DefaultLastHttpContent chunk = (DefaultLastHttpContent)msg;
buf.writeBytes(chunk.content());
if (chunk instanceof LastHttpContent) {
long now = System.currentTimeMillis();
long before = Fjs.time.get();
System.out.println(((double) now - (double)before) / 1000);
String xml = buf.toString(Charset.forName(“UTF-8”));
buf.clear();
Fjs.doIt(ctx.channel());
}
}
}
}
評(píng)論
查看更多