RxJava + OkHttp 封装请求和回调的流程
处理异步请求完成后切换回主线程,再也不用写 Handler 了。下面只是关键代码:create 的 subscribe 和 map 里面依次放需要异步执行的代码,最后 subscribe 传入的 Observer 的回调运行在主线程。
/**
 * Executes an HTTP request with OkHttp on a background thread and reports the
 * outcome to {@code responseListener} on the Android main thread.
 *
 * <p>The request is built and executed inside {@code Observable.create}, the
 * response body is read as a string inside {@code map} (both run on
 * {@code Schedulers.newThread()}), and the listener callbacks are invoked after
 * {@code observeOn(AndroidSchedulers.mainThread())}.
 *
 * @param url              target URL; combined with {@code method} and {@code params}
 *                         by {@code setUrlParams}
 * @param method           one of {@code HTTP_GET}, {@code HTTP_POST}, {@code HTTP_PUT},
 *                         {@code HTTP_DELETE}; ignored for the body when {@code params}
 *                         carries file parameters (a multipart POST is sent instead)
 * @param params           request parameters, may be {@code null}
 * @param responseListener receives {@code onSuccess(result)} followed by
 *                         {@code onComplete()} on success, or
 *                         {@code onError(500, message, null)} on any failure
 */
public void request(final String url, final String method, final AbOkRequestParams params, final AbOkHttpResponseListener responseListener){
    Observable.create(new ObservableOnSubscribe<Response>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<Response> emitter) {
            try {
                LogUtil.i(TAG, "[subscribe] 线程:" + Thread.currentThread().getName() + "\n");
                if (emitter.isDisposed()) {
                    return;
                }

                Request.Builder builder = new Request.Builder();
                final String requestUrl = setUrlParams(url, method, params);
                if (params != null && params.getFileParams().size() > 0) {
                    // File uploads are always sent as a multipart POST.
                    builder.post(getMultipartBody(params, responseListener));
                } else {
                    // Attach the request body according to the HTTP method.
                    switch (method) {
                        case HTTP_GET:
                            break;
                        case HTTP_POST:
                            builder.post(getRequestBody(params));
                            break;
                        case HTTP_PUT:
                            builder.put(getRequestBody(params));
                            break;
                        case HTTP_DELETE:
                            builder.delete(getRequestBody(params));
                            break;
                    }
                }
                builder.url(requestUrl);

                // Set the User-Agent.
                builder.removeHeader("User-Agent").addHeader("User-Agent", getUserAgent());

                // Add the configured request headers.
                for (Object item : headerMap.entrySet()) {
                    Map.Entry<?, ?> header = (Map.Entry<?, ?>) item;
                    builder.addHeader((String) header.getKey(), (String) header.getValue());
                }

                final Request request = builder.build();
                Call call = httpClient.newCall(request);

                // Start the (blocking) request.
                Response response = call.execute();
                if (!response.isSuccessful()) {
                    LogUtil.i(TAG, "[subscribe] HTTP status: " + response.code());
                    // The body is never consumed on this path, so release it here.
                    if (response.body() != null) {
                        response.body().close();
                    }
                    if (!emitter.isDisposed()) {
                        // Deliberately message-less so the observer below falls back to
                        // its default error text, as it did before. An IOException is
                        // used instead of IllegalAccessError, which is a LinkageError
                        // and does not describe an HTTP failure.
                        emitter.onError(new java.io.IOException());
                    }
                    return;
                }

                emitter.onNext(response);
                emitter.onComplete();
            } catch (Exception e) {
                if (emitter.isDisposed()) {
                    // Nobody is listening any more; calling onError here would only
                    // hand the exception to the global RxJava error handler.
                    LogUtil.i(TAG, "[subscribe] dropped after dispose: " + e.getMessage());
                } else {
                    emitter.onError(e);
                }
            }
        }
    }).map(new Function<Response, T>() {
        @Override
        public T apply(@NonNull Response response) throws Exception {
            LogUtil.i(TAG, "[Map Apply] 线程:" + Thread.currentThread().getName() + "\n");
            // string() reads the whole body and closes it. A read failure is allowed
            // to propagate so it reaches onError with its real cause; returning null
            // from a mapper is rejected by RxJava 2 with an unrelated NPE.
            @SuppressWarnings("unchecked")
            T result = (T) response.body().string();
            return result;
        }
    }).subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Observer<T>() {
                @Override
                public void onSubscribe(Disposable d) {
                    LogUtil.i(TAG, "[onSubscribe] ========================");
                }

                @Override
                public void onNext(T result) {
                    LogUtil.i(TAG, "[onNext] 线程:" + Thread.currentThread().getName() + "\n");
                    try {
                        responseListener.onSuccess(result);
                    } catch (Exception e) {
                        // A failing listener must not break the stream; onComplete still follows.
                        e.printStackTrace();
                    }
                }

                @Override
                public void onError(Throwable e) {
                    LogUtil.i(TAG, "[onError]" + e.getMessage());
                    if (StrUtil.isEmpty(e.getMessage())) {
                        responseListener.onError(500, "服务连接失败!", null);
                    } else {
                        responseListener.onError(500, e.getMessage(), null);
                    }
                }

                @Override
                public void onComplete() {
                    LogUtil.i(TAG, "[onComplete] ===========================");
                    responseListener.onComplete();
                }
            });
}
本站内容来源于作者发布和网络转载,如有版权相关问题请及时与我们取得联系,我们将立即删除。