摘要:微信已經(jīng)開源了,但是市面上相關(guān)的文章較少,即使有也是多在于使用等這些,那么這次我希望能夠從這個直接用于底層通訊的部分進(jìn)行個分析。首先明確下,微信用了的開源協(xié)議庫,來代替和。核心的部分我們先放下,下一篇再深入分析。
微信已經(jīng)開源了mars,但是市面上相關(guān)的文章較少,即使有也是多在于使用xlog等這些,那么這次我希望能夠從stn這個直接用于im底層通訊的部分進(jìn)行個分析。
為了能分析的全面些,我們從samples開始。
首先明確下,微信用了google的開源協(xié)議protobuf庫,來代替json和xml。至于為何使用這個,原因還在于效率和傳輸量上,效率上他能夠比json提升將近10倍,而且基于二進(jìn)制而非文本,傳輸?shù)拇笮「佑袃?yōu)勢,具體的不再累述,有興趣的可以自己查查。
我們從samples開始看看通過http是怎么獲得列表數(shù)據(jù)的,直接看/mars-master/samples/android/marsSampleChat/app/src/main/java/com/tencent/mars/sample/ConversationActivity.java,這個是個初始的列表界面,需要看的就是這個:
/**
* pull conversation list from server
*/
private void updateConversationTopics() {
if (taskGetConvList != null) {
MarsServiceProxy.cancel(taskGetConvList);
}
mTextView.setVisibility(View.INVISIBLE);
progressBar.setVisibility(View.VISIBLE);
swipeRefreshLayout.setRefreshing(true);
taskGetConvList = new NanoMarsTaskWrapper(
new Main.ConversationListRequest(),
new Main.ConversationListResponse()
) {
private List dataList = new LinkedList<>();
@Override
public void onPreEncode(Main.ConversationListRequest req) {
req.type = conversationFilterType;
req.accessToken = ""; // TODO:
Log.d("xxx", "onPreEncode: " + req.toString());
}
@Override
public void onPostDecode(Main.ConversationListResponse response) {
Log.d("xxx", "onPostDecode: " + response.toString());
}
@Override
public void onTaskEnd(int errType, int errCode) {
Log.d("xxx", "onTaskEnd: " + errType + " " + errCode);
runOnUiThread(new Runnable() {
@Override
public void run() {
if (response != null) {
for (Main.Conversation conv : response.list) {
dataList.add(new Conversation(conv.name, conv.topic, conv.notice));
Log.d("xxx", conv.toString());
}
}
if (!dataList.isEmpty()) {
progressBar.setVisibility(View.INVISIBLE);
conversationListAdapter.list.clear();
conversationListAdapter.list.addAll(dataList);
conversationListAdapter.notifyDataSetChanged();
swipeRefreshLayout.setRefreshing(false);
}
else {
Log.i(TAG, "getconvlist: empty response list");
progressBar.setVisibility(View.INVISIBLE);
mTextView.setVisibility(View.VISIBLE);
}
}
});
}
};
MarsServiceProxy.send(taskGetConvList.setHttpRequest(CONVERSATION_HOST, "/mars/getconvlist"));
}
new了一個NanoMarsTaskWrapper對象,并Override了幾個方法:onPreEncode、onPostDecode、onTaskEnd。分別是編碼傳輸前回調(diào),收到結(jié)果解碼后回調(diào),任務(wù)結(jié)束后回調(diào);
設(shè)置NanoMarsTaskWrapper對象的http url地址;
通過MarsServiceProxy的send方法,執(zhí)行發(fā)送;
通過這些,我們可以大體了解到,通過一個內(nèi)置的任務(wù)體系,來進(jìn)行傳輸?shù)呐砂l(fā)調(diào)用的;通過服務(wù)來驅(qū)使整個體系運轉(zhuǎn),并保證獨立性;
其實在目錄中已經(jīng)可以看到了,samples分為2個部分,一個是app,另一個是wrapper,wrapper是jar。
好吧,我們從wrapper入手看下基本結(jié)構(gòu)。
首先是manifest:
可以看到,獨立進(jìn)程的服務(wù)在這里約定了。廣播接受者在這里約定了,與服務(wù)在同一進(jìn)程中。
上面app中使用的MarsServiceProxy是個什么東西呢?
public class MarsServiceProxy implements ServiceConnection {
......
private MarsServiceProxy() {
worker = new Worker();
worker.start();
}
public static void init(Context context, Looper looper, String packageName) {
if (inst != null) {
// TODO: Already initialized
return;
}
gContext = context.getApplicationContext();
gPackageName = (packageName == null ? context.getPackageName() : packageName);
gClassName = SERVICE_DEFUALT_CLASSNAME;
inst = new MarsServiceProxy();
}
......
}
其實是從ServiceConnection繼承下來的服務(wù)連接對象,但是他不僅僅是個連接對象。我們看到,他是個單例,在app的SampleApplicaton的onCreate中進(jìn)行的初始化:
// NOTE: MarsServiceProxy is for client/caller
// Initialize MarsServiceProxy for local client, can be moved to other place
MarsServiceProxy.init(this, getMainLooper(), null);
app中調(diào)用的是send這個靜態(tài)方法:
public static void send(MarsTaskWrapper marsTaskWrapper) {
inst.queue.offer(marsTaskWrapper);
}
其實這個方法在操作的是隊列LinkedBlockingQueue
暫時放一下,我們關(guān)注下他的服務(wù)功能。在構(gòu)造的時候,new了一個Worker,并start了。這個worker就是一個線程:
private static class Worker extends Thread {
@Override
public void run() {
while (true) {
inst.continueProcessTaskWrappers();
try {
Thread.sleep(50);
} catch (InterruptedException e) {
//
}
}
}
}
也就是說,在這個類創(chuàng)建的時候,同時創(chuàng)建了一個工作線程,不斷的以間隔50ms循環(huán)調(diào)用continueProcessTaskWrappers。再看continueProcessTaskWrappers:
private void continueProcessTaskWrappers() {
try {
if (service == null) {
Log.d(TAG, "try to bind remote mars service, packageName: %s, className: %s", gPackageName, gClassName);
Intent i = new Intent().setClassName(gPackageName, gClassName);
gContext.startService(i);
if (!gContext.bindService(i, inst, Service.BIND_AUTO_CREATE)) {
Log.e(TAG, "remote mars service bind failed");
}
// Waiting for service connected
return;
}
MarsTaskWrapper taskWrapper = queue.take();
if (taskWrapper == null) {
// Stop, no more task
return;
}
try {
Log.d(TAG, "sending task = %s", taskWrapper);
final String cgiPath = taskWrapper.getProperties().getString(MarsTaskProperty.OPTIONS_CGI_PATH);
final Integer globalCmdID = GLOBAL_CMD_ID_MAP.get(cgiPath);
if (globalCmdID != null) {
taskWrapper.getProperties().putInt(MarsTaskProperty.OPTIONS_CMD_ID, globalCmdID);
Log.i(TAG, "overwrite cmdID with global cmdID Map: %s -> %d", cgiPath, globalCmdID);
}
service.send(taskWrapper, taskWrapper.getProperties());
} catch (Exception e) { // RemoteExceptionHandler
e.printStackTrace();
}
} catch (Exception e) {
}
}
1.檢查服務(wù)是否啟動,沒有則啟動并返回等待下一個50ms再繼續(xù);
2.從隊列中獲取一個任務(wù),并給他分配一個cmdID,然后調(diào)用MarsService的send方法執(zhí)行真正的發(fā)送事件。
其實從上面看,這個服務(wù)代理就是做了這些事情,更深入的事情其實是交給了具體的服務(wù)進(jìn)程來做的。這里就是個代理api。
好的,我們往下看具體的服務(wù)。
首先MarsService是個aidl的定義,不過我們從上面的這個線程循環(huán)里就可以看到,啟動的服務(wù)是根據(jù)Intent i = new Intent().setClassName(gPackageName, gClassName);啟動的,這個gClassName = SERVICE_DEFUALT_CLASSNAME;就是public static final String SERVICE_DEFUALT_CLASSNAME = "com.tencent.mars.sample.wrapper.service.MarsServiceNative";看到了吧,就是MarsServiceNative。
現(xiàn)在起進(jìn)入到服務(wù)里面。
public class MarsServiceNative extends Service implements MarsService {
private static final String TAG = "Mars.Sample.MarsServiceNative";
private MarsServiceStub stub;
......
}
這里保存了一個MarsServiceStub,后面的send都是調(diào)用他來實現(xiàn)的,現(xiàn)在暫時先放下send,看下onCreate:
@Override
public void onCreate() {
super.onCreate();
final MarsServiceProfile profile = gFactory.createMarsServiceProfile();
stub = new MarsServiceStub(this, profile);
// set callback
AppLogic.setCallBack(stub);
StnLogic.setCallBack(stub);
SdtLogic.setCallBack(stub);
// Initialize the Mars PlatformComm
Mars.init(getApplicationContext(), new Handler(Looper.getMainLooper()));
// Initialize the Mars
StnLogic.setLonglinkSvrAddr(profile.longLinkHost(), profile.longLinkPorts());
StnLogic.setShortlinkSvrAddr(profile.shortLinkPort());
StnLogic.setClientVersion(profile.productID());
Mars.onCreate(true);
StnLogic.makesureLongLinkConnected();
//
Log.d(TAG, "mars service native created");
}
1.創(chuàng)建配置信息類MarsServiceProfile;
2.new出MarsServiceStub來;
3.設(shè)置各種回調(diào);
4.初始化Mars;
5.Mars.onCreate(true);
6.StnLogic.makesureLongLinkConnected();確認(rèn)長連接。
這里開始用到了Mars了,這個才是核心,并且不在這個工程中。核心的部分我們先放下,下一篇再深入分析。
回到MarsServiceStub,看他的send方法:
@Override
public void send(final MarsTaskWrapper taskWrapper, Bundle taskProperties) throws RemoteException {
final StnLogic.Task _task = new StnLogic.Task(StnLogic.Task.EShort, 0, "", null);
// Set host & cgi path
final String host = taskProperties.getString(MarsTaskProperty.OPTIONS_HOST);
final String cgiPath = taskProperties.getString(MarsTaskProperty.OPTIONS_CGI_PATH);
_task.shortLinkHostList = new ArrayList<>();
_task.shortLinkHostList.add(host);
_task.cgi = cgiPath;
final boolean shortSupport = taskProperties.getBoolean(MarsTaskProperty.OPTIONS_CHANNEL_SHORT_SUPPORT, true);
final boolean longSupport = taskProperties.getBoolean(MarsTaskProperty.OPTIONS_CHANNEL_LONG_SUPPORT, false);
if (shortSupport && longSupport) {
_task.channelSelect = StnLogic.Task.EBoth;
} else if (shortSupport) {
_task.channelSelect = StnLogic.Task.EShort;
} else if (longSupport) {
_task.channelSelect = StnLogic.Task.ELong;
} else {
Log.e(TAG, "invalid channel strategy");
throw new RemoteException("Invalid Channel Strategy");
}
// Set cmdID if necessary
int cmdID = taskProperties.getInt(MarsTaskProperty.OPTIONS_CMD_ID, -1);
if (cmdID != -1) {
_task.cmdID = cmdID;
}
TASK_ID_TO_WRAPPER.put(_task.taskID, taskWrapper);
WRAPPER_TO_TASK_ID.put(taskWrapper, _task.taskID);
// Send
Log.i(TAG, "now start task with id %d", _task.taskID);
StnLogic.startTask(_task);
if (StnLogic.hasTask(_task.taskID)) {
Log.i(TAG, "stn task started with id %d", _task.taskID);
} else {
Log.e(TAG, "stn task start failed with id %d", _task.taskID);
}
}
1.new一個StnLogic.Task;
2.設(shè)置task的參數(shù),根據(jù)入口的Bundle;
3.2個map保存taskID與task的關(guān)系;
4.StnLogic.startTask(_task);啟動任務(wù)執(zhí)行;
這里的內(nèi)容又深入到了Mars核心里,可以看到,關(guān)鍵的處理都是在Mars核心部分完成的,這里的內(nèi)容甭管是服務(wù)還是什么都是在做參數(shù)的傳遞及關(guān)系的維護(hù)等工作。
好吧,我們倒帶回來,回到MarsServiceStub,他實現(xiàn)了StnLogic.ICallBack這個interface。定義在mars里:
public interface ICallBack {
/**
* SDK要求上層做認(rèn)證操作(可能新發(fā)起一個AUTH CGI)
* @return
*/
boolean makesureAuthed();
/**
* SDK要求上層做域名解析.上層可以實現(xiàn)傳統(tǒng)DNS解析,或者自己實現(xiàn)的域名/IP映射
* @param host
* @return
*/
String[] onNewDns(final String host);
/**
* 收到SVR PUSH下來的消息
* @param cmdid
* @param data
*/
void onPush(final int cmdid, final byte[] data);
/**
* SDK要求上層對TASK組包
* @param taskID 任務(wù)標(biāo)識
* @param userContext
* @param reqBuffer 組包的BUFFER
* @param errCode 組包的錯誤碼
* @return
*/
boolean req2Buf(final int taskID, Object userContext, ByteArrayOutputStream reqBuffer, int[] errCode, int channelSelect);
/**
* SDK要求上層對TASK解包
* @param taskID 任務(wù)標(biāo)識
* @param userContext
* @param respBuffer 要解包的BUFFER
* @param errCode 解包的錯誤碼
* @return int
*/
int buf2Resp(final int taskID, Object userContext, final byte[] respBuffer, int[] errCode, int channelSelect);
/**
* 任務(wù)結(jié)束回調(diào)
* @param taskID 任務(wù)標(biāo)識
* @param userContext
* @param errType 錯誤類型
* @param errCode 錯誤碼
* @return
*/
int onTaskEnd(final int taskID, Object userContext, final int errType, final int errCode);
/**
* 流量統(tǒng)計
* @param send
* @param recv
*/
void trafficData(final int send, final int recv);
/**
* 連接狀態(tài)通知
* @param status 綜合狀態(tài),即長連+短連的狀態(tài)
* @param longlinkstatus 僅長連的狀態(tài)
*/
void reportConnectInfo(int status, int longlinkstatus);
/**
* SDK要求上層生成長鏈接數(shù)據(jù)校驗包,在長鏈接連接上之后使用,用于驗證SVR身份
* @param identifyReqBuf 校驗包數(shù)據(jù)內(nèi)容
* @param hashCodeBuffer 校驗包的HASH
* @param reqRespCmdID 數(shù)據(jù)校驗的CMD ID
* @return ECHECK_NOW(需要校驗), ECHECK_NEVER(不校驗), ECHECK_NEXT(下一次再詢問)
*/
int getLongLinkIdentifyCheckBuffer(ByteArrayOutputStream identifyReqBuf, ByteArrayOutputStream hashCodeBuffer, int[] reqRespCmdID);
/**
* SDK要求上層解連接校驗回包.
* @param buffer SVR回復(fù)的連接校驗包
* @param hashCodeBuffer CLIENT請求的連接校驗包的HASH值
* @return
*/
boolean onLongLinkIdentifyResp(final byte[] buffer, final byte[] hashCodeBuffer);
/**
* 請求做sync
*/
void requestDoSync();
String[] requestNetCheckShortLinkHosts();
/**
* 是否登錄
* @return true 登錄 false 未登錄
*/
boolean isLogoned();
void reportTaskProfile(String taskString);
}
可以看到都是回調(diào),通過mars的回調(diào),MarsServiceStub接收到了taskend,并執(zhí)行了:
@Override
public int onTaskEnd(int taskID, Object userContext, int errType, int errCode) {
final MarsTaskWrapper wrapper = TASK_ID_TO_WRAPPER.remove(taskID);
if (wrapper == null) {
Log.w(TAG, "stn task onTaskEnd callback may fail, null wrapper, taskID=%d", taskID);
return 0; // TODO: ???
}
try {
wrapper.onTaskEnd(errType, errCode);
} catch (RemoteException e) {
e.printStackTrace();
} finally {
WRAPPER_TO_TASK_ID.remove(wrapper); // onTaskEnd will be called only once for each task
}
return 0;
}
從map中移除task,然后執(zhí)行了task自己的onTaskEnd。這樣我們正最初的updateConversationTopics里就可以看到后續(xù)的更新ui的代碼。
下面我們要回到updateConversationTopics附近,看看NanoMarsTaskWrapper:
public abstract class NanoMarsTaskWrapperextends AbstractTaskWrapper { private static final String TAG = "Mars.Sample.NanoMarsTaskWrapper"; protected T request; protected R response; public NanoMarsTaskWrapper(T req, R resp) { super(); this.request = req; this.response = resp; } @Override public byte[] req2buf() { try { onPreEncode(request); final byte[] flatArray = new byte[request.getSerializedSize()]; final CodedOutputByteBufferNano output = CodedOutputByteBufferNano.newInstance(flatArray); request.writeTo(output); Log.d(TAG, "encoded request to buffer, [%s]", MemoryDump.dumpHex(flatArray)); return flatArray; } catch (Exception e) { e.printStackTrace(); } return new byte[0]; } @Override public int buf2resp(byte[] buf) { try { Log.d(TAG, "decode response buffer, [%s]", MemoryDump.dumpHex(buf)); response = MessageNano.mergeFrom(response, buf); onPostDecode(response); return StnLogic.RESP_FAIL_HANDLE_NORMAL; } catch (Exception e) { Log.e(TAG, "%s", e); } return StnLogic.RESP_FAIL_HANDLE_TASK_END; } public abstract void onPreEncode(T request); public abstract void onPostDecode(R response);
}
1.從AbstractTaskWrapper繼承下來;
2.保存了request和response,都是MessageNano類型的(google的protobuf內(nèi)的message數(shù)據(jù)類);
3.實現(xiàn)了2個接口,分別用來作為request轉(zhuǎn)換為buf何buf轉(zhuǎn)換成為response。其實就是對象轉(zhuǎn)成byte[],byte轉(zhuǎn)成對象;
3.在req2buf轉(zhuǎn)換的過程中,調(diào)用了request的writeTo方法;
4.在buf2resp中,調(diào)用了MessageNano.mergeFrom,實際上最終也是調(diào)用了response的mergeFrom,見下:
/**
* Parse {@code data} as a message of this type and merge it with the
* message being built.
*/
public static final T mergeFrom(T msg, final byte[] data)
throws InvalidProtocolBufferNanoException {
return mergeFrom(msg, data, 0, data.length);
}
根據(jù)上面的4點可以看到這是個實現(xiàn)序列化及反序列化的過程。google的開源protobuf我們不去關(guān)注,但是需要了解的是他是通過以proto為后綴名的配置文件來達(dá)到編譯時即可生成類的相關(guān)代碼的程度。
那么這個AbstractTaskWrapper的基類的作用又是什么呢?
public abstract class AbstractTaskWrapper extends MarsTaskWrapper.Stub {
private Bundle properties = new Bundle();
public AbstractTaskWrapper() {
// Reflects task properties
final TaskProperty taskProperty = this.getClass().getAnnotation(TaskProperty.class);
if (taskProperty != null) {
setHttpRequest(taskProperty.host(), taskProperty.path());
setShortChannelSupport(taskProperty.shortChannelSupport());
setLongChannelSupport(taskProperty.longChannelSupport());
setCmdID(taskProperty.cmdID());
}
}
@Override
public Bundle getProperties() {
return properties;
}
@Override
public abstract void onTaskEnd(int errType, int errCode);
public AbstractTaskWrapper setHttpRequest(String host, String path) {
properties.putString(MarsTaskProperty.OPTIONS_HOST, ("".equals(host) ? null : host));
properties.putString(MarsTaskProperty.OPTIONS_CGI_PATH, path);
return this;
}
public AbstractTaskWrapper setShortChannelSupport(boolean support) {
properties.putBoolean(MarsTaskProperty.OPTIONS_CHANNEL_SHORT_SUPPORT, support);
return this;
}
public AbstractTaskWrapper setLongChannelSupport(boolean support) {
properties.putBoolean(MarsTaskProperty.OPTIONS_CHANNEL_LONG_SUPPORT, support);
return this;
}
public AbstractTaskWrapper setCmdID(int cmdID) {
properties.putInt(MarsTaskProperty.OPTIONS_CMD_ID, cmdID);
return this;
}
@Override
public String toString() {
return "AbsMarsTask: " + BundleFormat.toString(properties);
}
}
很簡單,就是提供了一些接口來設(shè)置傳輸協(xié)議類型,長短連接、http等。
綜合來說,這個demo使用了獨立的服務(wù)框架來進(jìn)行傳輸?shù)谋WC;使用了任務(wù)體系來承載每次傳輸及響應(yīng);大量的回調(diào)來監(jiān)控運轉(zhuǎn)過程中的各項關(guān)鍵點;封裝了獨立的jar wrapper,便于上層的更改及使用;獨立的配置類引入支持http和tcp長短連接的使用;protobuf的引入極大提升序列化及反序列化的效率,并降低傳輸?shù)臄?shù)據(jù)大??;
這篇暫時就到這里吧,后面我們會深入分析下mars的核心部分。
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://hztianpu.com/yun/66978.html
摘要:本來是想直接深入到的核心層去看的,但是發(fā)現(xiàn)其實上面的部分還有好些沒有分析到,因此回來繼續(xù)分析。另外一個,是專用于統(tǒng)計的,我們暫時不去關(guān)注。具體的內(nèi)容我會在后面的核心層分析的時候指出。準(zhǔn)備下一篇進(jìn)行的核心層分析吧。 本來是想直接深入到mars的核心層去看的,但是發(fā)現(xiàn)其實上面的samples部分還有好些沒有分析到,因此回來繼續(xù)分析。ConversationActivity這個類中實際上還做...
摘要:執(zhí)行并根據(jù)每個連接的狀態(tài)決定后續(xù)處理,上篇已經(jīng)講過,不再累述。上面的三段處理完畢后,應(yīng)該是數(shù)組中不再有連接才對,這里的保險處理是對數(shù)組再進(jìn)行檢查。至此跳出,算是整個連接過程完畢了。這里需要逐句分析,首先是。 最近回顧之前的文章,發(fā)現(xiàn)最后一篇有些著急了,很多地方?jīng)]有敘述清楚。這里先做個銜接吧。我們還是以長連接為例,從longlink.cc看起。首先是那個線程函數(shù)__Run:/mars-m...
摘要:作為微信的終端數(shù)據(jù)庫,從開源至今,共迭代了個版本。微信也轉(zhuǎn)向開發(fā)了嗎相信這會是大家非常關(guān)心的問題。不僅微信,國內(nèi)外大部分都還沒有完全轉(zhuǎn)向,但顯然這是個趨勢。另一方面,沒有微信的上線機制的保護(hù)和龐大的用戶量的驗證,我們需要確保的穩(wěn)定性。 WCDB 作為微信的終端數(shù)據(jù)庫,從 2017.6 開源至今,共迭代了 5 個版本。我們一直關(guān)注開發(fā)者們的需求,并不斷優(yōu)化性能,新增如全文搜索等常用的功能...
閱讀 2470·2023-04-25 14:29
閱讀 1643·2021-11-22 09:34
閱讀 2847·2021-11-22 09:34
閱讀 3531·2021-11-11 10:59
閱讀 1997·2021-09-26 09:46
閱讀 2387·2021-09-22 16:03
閱讀 2080·2019-08-30 12:56
閱讀 600·2019-08-30 11:12