RxJava + OkHttp 封装请求和回调的流程

处理异步请求后的回到主线程,再也不用写Handler了,只是关键代码,第一个subscribe和map里面依次放需要异步请求的代码,最后subscribe是主线程


public void  request(final String url, final String method, final AbOkRequestParams params, final AbOkHttpResponseListener responseListener){

        Observable.create(new ObservableOnSubscribe() {

            @Override
            public void subscribe(@NonNull ObservableEmitter emitter){
                try{
                    LogUtil.i(TAG, "[subscribe] 线程:" + Thread.currentThread().getName() + "\n");
                    if (!emitter.isDisposed()){
                        {
                            Request.Builder builder = new Request.Builder();
                            final String requestUrl = setUrlParams(url, method, params);
                            if (params != null && params.getFileParams().size() > 0) {
                                builder.post(getMultipartBody(params, responseListener));
                            } else {
                                //设置参数
                                switch (method) {
                                    case HTTP_GET:
                                        break;
                                    case HTTP_POST:
                                        builder.post(getRequestBody(params));
                                        break;
                                    case HTTP_PUT:
                                        builder.put(getRequestBody(params));
                                        break;
                                    case HTTP_DELETE:
                                        builder.delete(getRequestBody(params));
                                        break;
                                }
                            }

                            builder.url(requestUrl);
                            //设置User-Agent
                            builder.removeHeader("User-Agent").addHeader("User-Agent", getUserAgent());
                            //请求头
                            Iterator iterator = headerMap.entrySet().iterator();
                            while (iterator.hasNext()) {
                                Map.Entry entry = (Map.Entry) iterator.next();
                                String key = (String) entry.getKey();
                                String val = (String) entry.getValue();
                                builder.addHeader(key, val);
                            }

                            final Request request = builder.build();
                            Call call = httpClient.newCall(request);

                            //请求开始
                            Response response = call.execute();
                            if(response.isSuccessful()){
                                emitter.onNext(response);
                            }else{
                                emitter.onError(new IllegalAccessError());
                            }
                        }

                        emitter.onComplete();
                    }
                }catch (Exception e){
                    emitter.onError(e);
                }
            }
        }).map(new Function() {

            @Override
            public T apply(@NonNull Response response) {

                LogUtil.i(TAG, "[Map Apply] 线程:" + Thread.currentThread().getName() + "\n");
                try{
           String result = response.body().string();
                    return (T)result;
                }catch (Exception e){
                    e.printStackTrace();
                }
                return null;
            }
        }).subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer() {

            @Override
            public void onSubscribe(Disposable d) {
               LogUtil.i(TAG, "[onSubscribe] ========================");
            }

            @Override
            public void onNext(T result) {
                LogUtil.i(TAG, "[onNext] 线程:" + Thread.currentThread().getName() + "\n");
                try {
                    responseListener.onSuccess(result);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }

            @Override
            public void onError(Throwable e) {
                LogUtil.i(TAG, "[onError]" + e.getMessage());
                if(StrUtil.isEmpty(e.getMessage())){
                    responseListener.onError(500,"服务连接失败!",null);
                }else{
                    responseListener.onError(500,e.getMessage(),null);
                }

            }

            @Override
            public void onComplete() {
                LogUtil.i(TAG, "[onComplete] ===========================");
                responseListener.onComplete();
            }
        });

}


本站内容来源于作者发布和网络转载,如有版权相关问题请及时与我们取得联系,我们将立即删除。

 关于作者