湖畔镇

DLNA开源库——Cling

DLNA开源库——Cling

目前主流的智能网络设备解决方案有DLNAAirPlayMiracast

DLNA的全称是Digital Living Network Alliance(数字生活网络联盟),旨在解决个人PC,消费电器,移动设备在内的无线网络和有线网络的互联互通

DLNA将其整个应用规定成5个功能组件,从下到上依次为:

  1. 网络互连:802.3以太网,802.11WiFi,802.15蓝牙
  2. 网络协议:IPV4
  3. 设备的发现控制和管理:UPnP
  4. 媒体传输:Http
  5. 媒体格式

DLNA不是一种协议,但包括了实现相关标准所需要的一系列协议栈,UPnP是其中的关键协议

DLNA原理

参考基于DLNA的移动端网络视频投屏技术初探

要实现从移动端将网络视频投放至智能电视或机顶盒,首先要保证这些设备在同一个局域网的相同网段下,即共享同一个网关,这样所有设备都能够拥有独立的IP,从而具备相互通信的基础了

设备发现

当一个新的Control Point)加入一个局域网时,为了获取当前网段里都有哪些智能设备,需要遵循SSDP向默认多播IP和端口发送获取信息的请求,这是一个UDP消息,所以建议在设备搜索过程多做几次发现请求,以免丢包带来的遗漏

控制点可以获得的信息是:有一台设备,它的IP和端口号,设备描述文档(DDD)在什么位置、UUID是什么

请求设备描述文档

DDD以及后面要说到的服务描述文档 (SDD, Service Description Document)都是以XML格式返回给请求端的,这一步的通信则是基于TCP /HTTP进行可靠传输的。我们请求Location字段的内容即

1
http://10.2.9.152:49152/TxMediaRenderer_desc.xml

响应

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
<root xmlns="urn:schemas-upnp-org:device-1-0" xmlns:dlna="urn:schemas-dlna-org:device-1-0" configId="499354">
<specVersion>
<major>1</major>
<minor>1</minor>
</specVersion>
<device>
<deviceType>urn:schemas-upnp-org:device:MediaRenderer:1</deviceType>
<friendlyName>我的设备</friendlyName>
<manufacturer>Plutinosoft LLC</manufacturer>
<manufacturerURL>http://www.plutinosoft.com</manufacturerURL>
<modelDescription>Plutinosoft AV Media Renderer Device</modelDescription>
<modelName>AV Renderer Device</modelName>
<modelURL>http://www.plutinosoft.com/platinum</modelURL>
<UDN>uuid:9c443d47158b-dmr</UDN>
<dlna:X_DLNADOC xmlns:dlna="urn:schemas-dlna-org:device-1-0">DMR-1.50</dlna:X_DLNADOC>
<serviceList>
<service>
<serviceType>urn:schemas-upnp-org:service:AVTransport:1</serviceType>
<serviceId>urn:upnp-org:serviceId:AVTransport</serviceId>
<SCPDURL>/AVTransport/9c443d47158b-dmr/scpd.xml</SCPDURL>
<controlURL>/AVTransport/9c443d47158b-dmr/control.xml</controlURL>
<eventSubURL>/AVTransport/9c443d47158b-dmr/event.xml</eventSubURL>
</service>
...
</serviceList>
</device>
</root>

返回设备的详细信息:设备名称、设备类型、UUID、服务列表,每个服务都有serviceTypeSCPDURLserviceIdcontrolURLeventSubURL

请求服务描述文档

如何使用这个服务需要参考该服务的SDDSCPDURL这个字段的内容就是请求SDD的路径地址,我们将其与之前在发现设备阶段获取到的响应消息中的Location字段内容中设备的IP和端口号拿过来,拼接成完整URL字符串

1
http://10.2.9.152:49152/AVTransport/9c443d47158b-dmr/scpd.xml

返回一个动作列表,一个服务会包含一个或多个功能请求动作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
<scpd xmlns="urn:schemas-upnp-org:service-1-0">
<specVersion>
<major>1</major>
<minor>0</minor>
</specVersion>
<actionList>
<action>
<name>SetAVTransportURI</name>
<argumentList>
<argument>
<name>InstanceID</name>
<direction>in</direction>
<relatedStateVariable>A_ARG_TYPE_InstanceID</relatedStateVariable>
</argument>
<argument>
<name>CurrentURI</name>
<direction>in</direction>
<relatedStateVariable>AVTransportURI</relatedStateVariable>
</argument>
<argument>
<name>CurrentURIMetaData</name>
<direction>in</direction>
<relatedStateVariable>AVTransportURIMetaData</relatedStateVariable>
</argument>
</argumentList>
</action>
...
</actionList>
<serviceStateTable>
<stateVariable sendEvents="no">
<name>AVTransportURI</name>
<dataType>string</dataType>
</stateVariable>
...
</serviceStateTable>
</scpd>

SetAVTransportURI这个请求的功能是将一个音视频资源的URI发送给渲染端。一个Action就好比一个API请求,你还需要传递一些要求的参数,这时就会用到该Action后面参数列表里规定的参数

服务动作请求

有了动作所需要的全部信息,就可以按照DLNA规定的方式发给设备请求服务

1
2
3
4
5
6
7
8
9
10
11
12
13
POST /AVTransport/9c443d47158b-dmr/control.xml HTTP/1.1
HOST: 10.2.9.152
Content-Type: text/xml; charset="utf-8"
SOAPAction: "urn:schemas-upnp-org:service:AVTransport:1#SetAVTransportURI"
<?xml version="1.0"?>
<s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/" s:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">
<s:Body>
<u:SetAVTransportURI xmlns:u="urn:schemas-upnp-org:service:AVTransport:1">
<InstanceID>0</InstanceID>
<CurrentURI>yourAVURI</CurrentURI>
</u:SetAVTransportURI>
</s:Body>
</s:Envelope>

响应

1
2
3
4
5
6
7
8
9
10
11
12
13
14
HTTP/1.1 200 OK
Content-Type: text/xml; charset="utf-8"
Date: Thu, 16 Feb 2017 09:09:45 GMT
Server: OS/version UPnP/1.1 product/version
<?xml version="1.0"?>
<s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/" s:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">
<s:Body>
<u:SetAVTransportURI xmlns:u="urn:schemas-upnp-org:service:AVTransport:1">
<u:SetAVTransportURIResponse>
<_xmlns:u>"urn:schemas-upnp-org:service:AVTransport:1"</_xmlns:u>
</u:SetAVTransportURIResponse>
</u:SetAVTransportURI>
</s:Body>
</s:Envelope>

UPnP工作原理

UPnP的工作分为6个步骤……

寻址

当设备首次与网络建立连接后,利用DHCP服务,使设备得到一个IP地址。这个IP地址可以是DHCP系统指定的,也可以是由设备选择的。设备还可以使用Friendly Name,这就需要DNS来转换得到IP

发现

当一个设备被添加到网络后,UPnP的发现协议允许该设备向网络上的控制点(Control Point, CP)通知自己拥有的服务。同样,当一个CP被添加到网络后,UPnP发现协议允许该CP搜索网络上可用的设备。这两种情况下的组播消息一般是设备和服务的基本信息,如设备类型,唯一标识符,状态参数等等。要注意设备信息和服务信息都是要组播出去的

发现过程使用的协议是SSDP(Simple Service Discovery Protocol,简单服务发现协议),采用UDP传输

描述

描述分为两部分:一个是设备描述,另一个是服务描述

控制

在设备描述部分,还有关于如何控制设备的描述,会给出一个Control URL,CP可以向这个URL发送不同的控制信息就可以控制了,然后设备也可以返回一个信息反馈。

这种CP和设备之间沟通信息按照SOAP(Simple Object Access Protocol)的格式来写。消息体里面就可以写想调用的动作了,叫做Action Invocation,可能还要传参数,比如想播放一个视频,要把视频的URL传过去,设备收到后会响应,表示能不能执行调用,出错的话会返回一个错误代码

事件

在服务进行的整个时间内,只要变量值发生了变化或者模式的状态发生了改变,就产生了一个事件,该事件服务提供者(某设备的某个服务)会把该事件向整个网络进行多播。而且,CP也可以事先向事件服务器订阅事件信息,保证将该CP感兴趣的事件及时准确地单播传送过来

事件的订阅和推送这块用的通信协议是General Event Notification Architecture(GENA),通过HTTP/TCP/IP传送。订阅过程如下:

  1. 订阅。Subscriber发送订阅消息主要包含事件URL,服务ID号,这两个可以在设备服务描述信息中找到,以及寄送URL,还会包含一个订阅期限Duration
  2. 成功订阅。Publisher收到订阅信息,如果同意订阅的话就会为每个新Subscriber生成一个唯一的ID并记录Subscriber的Duration和Delivery URL。还会记录一个顺序增长EventKey用来保证事件确实推送到Subscriber那里
  3. 首次推送。订阅同意订阅之后还会向Subscriber发送一组初始变量或状态值,进行首次同步
  4. 续订。Subscriber必须在订阅到期前发送Renewal续订
  5. 订阅到期。订阅到期后Publisher会把Subscriber的信息删除,Subscriber又回到订阅前的状态。
  6. 退订。Subscriber发送Cancel信息将会取消订阅。Subscriber因非正常退出网络的话,则不会退订直到订阅到期
  7. 订阅操作失败信息。当订阅、续订和退订不能被Publisher接收或者出现错误时,Publisher会发送一个错误代码

表达

只要得到了设备的URL,就可以取得该设备表达的URL,取得该设备表达的HTML,然后可以将此HTML纳入CP的本地浏览器上。这部分还包括与用户对话的界面,以及与用户进行会话的处理。因此设备表达可以理解成“遥控器”。这部分定义描述界面,规范界面以及传输界面内容。远程界面是供CP用户使用的,CP用户通过远程界面完成设备描述的获取,控制设备,订阅收取设备事件等等

Cling

Cling类库是由JAVA实现的DLNA/UPnP协议栈

重要组件

UpnpService

通过UpnpService接口获得ControlPoint,基于ControlPoint进行操作

UpnpService.java

1
2
3
4
5
6
7
8
public interface UpnpService {
UpnpServiceConfiguration getConfiguration();
ControlPoint getControlPoint();
ProtocolFactory getProtocolFactory();
Registry getRegistry();
Router getRouter();
void shutdown();
}

主要提供控制点、协议工厂、注册表、路由和配置信息,UpnpServiceImplUpnpService的实现类

ControlPoint

ControlPoint.java

1
2
3
4
5
6
7
8
9
10
11
public interface ControlPoint {
UpnpServiceConfiguration getConfiguration();
ProtocolFactory getProtocolFactory();
Registry getRegistry();
void search();
void search(UpnpHeader searchType);
void search(int mxSeconds);
void search(UpnpHeader searchType, int mxSeconds);
void execute(ActionCallback callback);
void execute(SubscriptionCallback callback);
}

主要包含搜索和执行命令两个功能

UpnpServiceConfiguration

UPnP协议栈的配置信息,AndroidUpnpServiceConfiguration是Android平台上的实现类,继承了DefaultUpnpServiceConfiguration

ProtocoFactory

UPnP协议的工厂类,用于根据收到的协议或是本地设备的信息,创建一个可执行的协议

Registry

设备资源管理器,用于设备、资源、订阅消息的管理,包括添加、更新、移除、查询

情景分析

搜索设备

ControlPoint()进行search(),需要两个东西:

  1. 通过UpnpServiceConfiguration.getAsyncProtocolExecutor()获得的UPnP协议栈的异步线程池
  2. 通过ProtocoFactory.createSendingSearch()获得的执行内容即搜索动作

得到的搜索命令就是SendingSearch

SendingSearch.java

1
2
3
4
5
6
7
8
9
10
11
12
@Override
protected void execute() {
OutgoingSearchRequest msg = new OutgoingSearchRequest(searchTarget, getMxSeconds());
for (int i = 0; i < getBulkRepeat(); i++) {
try {
getUpnpService().getRouter().send(msg);
Thread.sleep(getBulkIntervalMilliseconds());
} catch (InterruptedException ex) {

}
}
}

可见搜索就是是构造了一个OutgoingSearchRequest,它是一个OutgoingDatagramMessage,可见是一个UDP数据报消息,然后循环发送了若干次

Router.send() 开始了网络传输过程

RouterImpl.java

1
2
3
4
5
6
@Override
public void send(OutgoingDatagramMessage msg){
for (DatagramIO datagramIO : getDatagramIOs().values()) {
datagramIO.send(msg);
}
}

在所有的UDP端口上发送消息

DatagramIOImpl.java

1
2
3
4
synchronized public void send(OutgoingDatagramMessage message){
DatagramPacket packet = datagramProcessor.write(message);
send(packet);
}

先由DatagramProcessor 把传入的OutgoingDatagramMessage转化成了一个DatagramPacket,然后发送出去

DatagramIOImpl.java

1
2
3
4
5
6
7
8
9
10
11
synchronized public void send(DatagramPacket datagram) {
try {
socket.send(datagram);
} catch (IOException ex) {

} catch (RuntimeException ex) {
throw ex;
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}

调用了DatagramSocket.send()发送消息

DatagramIOImpl 是一个Runnable,在运行时一直接收数据报

DatagramIOImpl.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public void run() {
while (true) {
try {
byte[] buf = new byte[getConfiguration().getMaxDatagramBytes()];
DatagramPacket datagram = new DatagramPacket(buf, buf.length);
socket.receive(datagram);
router.received(datagramProcessor.read(localAddress.getAddress(), datagram));
} catch (SocketException ex) {
break;
} catch (UnsupportedDataException ex) {

} catch (Exception ex) {
break;
}
}
try {
if (!socket.isClosed()) {
socket.close();
}
} catch (Exception ex) {

}
}

DatagramSocket.receive()接收数据报,Router.received()处理接收的数据报

RouterImpl.java

1
2
3
4
5
6
7
8
9
10
11
12
@Override
public void received(IncomingDatagramMessage msg){
try {
ReceivingAsync protocol = getProtocolFactory().createReceivingAsync(msg);
if (protocol == null) {
return;
}
getConfiguration().getAsyncProtocolExecutor().execute(protocol);
} catch (ProtocolCreationException ex) {

}
}

通过ProtocolFactory.createReceivingAsync()创建一个异步消息,并放入异步线程池中

ProtocolFactory.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Override
public ReceivingAsync createReceivingAsync(IncomingDatagramMessage message) throws ProtocolCreationException {
if (message.getOperation() instanceof UpnpRequest) {
IncomingDatagramMessage<UpnpRequest> incomingRequest = message;
switch (incomingRequest.getOperation().getMethod()) {
case NOTIFY:
return isByeBye(incomingRequest) || isSupportedServiceAdvertisement(incomingRequest) ? new ReceivingNotification(getUpnpService(), incomingRequest) : null;
case MSEARCH:
return new ReceivingSearch(getUpnpService(), incomingRequest);
}
} else if (message.getOperation() instanceof UpnpResponse) {
IncomingDatagramMessage<UpnpResponse> incomingResponse = message;
return new ReceivingSearchResponse(getUpnpService(), incomingResponse);
}
throw new ProtocolCreationException("Protocol for incoming datagram message not found: " + message);
}

根据消息是UPnP请求还是UPnP响应,创建不同的接受消息ReceivingNotification/ReceivingSearch/ReceivingSearchResponse,这里是接收的ReceivingSearchResponse(搜索响应,不是搜索命令)

ReceivingSearchResponse

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
protected void execute() {
if (getInputMessage().getHeaders().containsKey("Easylink")) {
matchEasylink(getInputMessage());
}

if (!getInputMessage().isSearchResponseMessage()) {
return;
}

UDN udn = getInputMessage().getRootDeviceUDN();
if (udn == null) {
return;
}

RemoteDeviceIdentity rdIdentity = new RemoteDeviceIdentity(getInputMessage());

if (getUpnpService().getRegistry().update(rdIdentity)) {
return;
}

RemoteDevice rd;
try {
rd = new RemoteDevice(rdIdentity);
} catch (ValidationException ex) {
return;
}

if (rdIdentity.getDescriptorURL() == null) {
return;
}

if (rdIdentity.getMaxAgeSeconds() == null) {
return;
}
getUpnpService().getConfiguration().getAsyncProtocolExecutor().execute(new RetrieveRemoteDescriptors(getUpnpService(), rd));
}

先处理EasyLink消息,然后更新设备列表,最后发送消息请求设备描述文件

RegistryImpl.java

1
2
3
4
@Override
synchronized public boolean update(RemoteDeviceIdentity rdIdentity) {
return remoteItems.update(rdIdentity);
}
发送命令

ControlPoint()通过execute()发送命令,同样的,也需要两个东西:

  1. 通过UpnpServiceConfiguration.getSyncProtocolExecutor()获得命令的同步线程池
  2. 获得命令,通常传入的ActionCallback就是,它是一个Runnable

ActionCallback.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Override
public void run() {
Service service = actionInvocation.getAction().getService();
if (service instanceof LocalService) {
LocalService localService = (LocalService) service;
localService.getExecutor(actionInvocation.getAction()).execute(actionInvocation);

if (actionInvocation.getFailure() != null) {
failure(actionInvocation, null);
} else {
success(actionInvocation);
}
} else if (service instanceof RemoteService) {
if (getControlPoint() == null) {
throw new IllegalStateException("Callback must be executed through ControlPoint");
}
RemoteService remoteService = (RemoteService) service;
URL controLURL = remoteService.getDevice().normalizeURI(remoteService.getControlURI());
SendingAction prot = getControlPoint().getProtocolFactory().createSendingAction(actionInvocation, controLURL);
prot.run();

IncomingActionResponseMessage response = prot.getOutputMessage();

if (response == null) {
failure(actionInvocation, null);
} else if (response.getOperation().isFailed()) {
failure(actionInvocation, response.getOperation());
} else {
success(actionInvocation);
}
}
}

首先获得服务,如果是本地服务,就用本地的线程池执行,等待响应;如果是远程服务,拿到控制URL,构造命令发送并等待响应

ActionInvocation.getAction().getService()如何获得服务?

是执行命令的时候由外部传入的,以播放命令为例:

1
2
3
4
5
6
7
8
9
10
11
getControlPoint().execute(new PlayActionCallback(device.getAVservice()) {
@Override
public boolean failure(ActionInvocation invocation, UpnpResponse response, String arg2) {
return false;
}

@Override
public void success(ActionInvocation invocation) {

}
}

可见是从设备获得了所需的服务,通过Device.findDevice() 找到对应描述的服务

然后通过ProtocolFactory.createSendingAction()创建发送命令,并执行

SendingAction.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
@Override
protected IncomingActionResponseMessage executeSync() {
return invokeRemote(getInputMessage());
}

protected IncomingActionResponseMessage invokeRemote(OutgoingActionRequestMessage requestMessage) {
Device device = actionInvocation.getAction().getService().getDevice();

IncomingActionResponseMessage responseMessage = null;
try {
StreamResponseMessage streamResponse = sendRemoteRequest(requestMessage);
if (streamResponse == null) {
actionInvocation.setFailure(new ActionException(ErrorCode.ACTION_FAILED, "Connection error or no response received"));
return null;
}

responseMessage = new IncomingActionResponseMessage(streamResponse);
if (responseMessage.isFailedNonRecoverable()) {
throw new ActionException(ErrorCode.ACTION_FAILED, "Non-recoverable remote execution failure: " + responseMessage.getOperation().getResponseDetails());
} else if (responseMessage.isFailedRecoverable()) {
handleResponseFailure(responseMessage);
} else {
handleResponse(responseMessage);
}
return responseMessage;
} catch (ActionException ex) {
actionInvocation.setFailure(ex);
if (responseMessage == null || !responseMessage.getOperation().isFailed()) {
return new IncomingActionResponseMessage(new UpnpResponse(UpnpResponse.Status.INTERNAL_SERVER_ERROR));
} else {
return responseMessage;
}
}
}

protected StreamResponseMessage sendRemoteRequest(OutgoingActionRequestMessage requestMessage) throws ActionException {
try {
getUpnpService().getConfiguration().getSoapActionProcessor().writeBody(requestMessage, actionInvocation);
return getUpnpService().getRouter().send(requestMessage);
} catch (UnsupportedDataException ex) {
throw new ActionException(ErrorCode.ACTION_FAILED, "Error writing request message. " + ex.getMessage());
}
}

通过sendRemoteRequest() 发送请求,处理结果

用SOAP处理器转换得到SOAP消息,然后发送

RouteImpl.java

1
2
3
4
5
6
7
@Override
public StreamResponseMessage send(StreamRequestMessage msg) {
if (getStreamClient() == null) {
return null;
}
return getStreamClient().sendRequest(msg);
}

StreamClientImpl.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Override
public StreamResponseMessage sendRequest(StreamRequestMessage requestMessage) {
final UpnpRequest requestOperation = requestMessage.getOperation();
try {
HttpUriRequest httpRequest = createHttpRequest(requestMessage, requestOperation);
HttpParams requestParams = getRequestParams(requestMessage);
HttpConnectionParams.setConnectionTimeout(requestParams, 20000);
HttpConnectionParams.setSoTimeout(requestParams, 20000);
httpRequest.setParams(requestParams);
HeaderUtil.add(httpRequest, requestMessage.getHeaders());
return httpClient.execute(httpRequest, createResponseHandler());
} catch (MethodNotSupportedException ex) {
return null;
} catch (ClientProtocolException ex) {
return null;
} catch (IOException ex) {
return null;
}
}

创建请求,添加参数并调用HttpClient.execute() 处理,createResponseHandler()处理响应并返回

然后回到SendingAction处理返回值

SendingAction.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
protected void handleResponse(IncomingActionResponseMessage responseMsg) throws ActionException {
try {
getUpnpService().getConfiguration().getSoapActionProcessor().readBody(responseMsg, actionInvocation);
} catch (UnsupportedDataException ex) {
throw new ActionException(ErrorCode.ACTION_FAILED, "Error reading response message. " + ex.getMessage());
}
}

protected void handleResponseFailure(IncomingActionResponseMessage responseMsg) throws ActionException {
try {
getUpnpService().getConfiguration().getSoapActionProcessor().readBody(responseMsg, actionInvocation);
} catch (UnsupportedDataException ex) {
throw new ActionException(ErrorCode.ACTION_FAILED, "Error reading response failure message. " + ex.getMessage());
}
}

用SOAP处理器读取响应体,最终通过ActionInvocation.setOutput()将解析得到的响应字段设置到ActionInvocation

总结:

  1. 获得信息,构造命令
  2. 通过SOAP处理器转换为SOAP消息
  3. 通过Http发送消息
  4. 接受Http响应消息
  5. 通过SOAP处理器解析SOAP消息
  6. 将解析的字段回传
订阅事件

ControlPoint()通过execute()订阅事件

  1. 通过UpnpServiceConfiguration.getSyncProtocolExecutor()获得同步线程池
  2. 传入SubscriptionCallback

SubscriptionCallback.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
@Override
public void run() {
if (getControlPoint() == null) {
throw new IllegalStateException("Callback must be executed through ControlPoint");
}

if (getService() instanceof LocalService) {
establishLocalSubscription((LocalService) service);
} else if (getService() instanceof RemoteService) {
establishRemoteSubscription((RemoteService) service);
}
}

private void establishRemoteSubscription(RemoteService service) {
RemoteGENASubscription remoteSubscription = new RemoteGENASubscription(service, requestedDurationSeconds) {
@Override
public void failed(UpnpResponse responseStatus) {
synchronized (SubscriptionCallback.this) {
SubscriptionCallback.this.setSubscription(null);
SubscriptionCallback.this.failed(this, responseStatus, null);
}
}

@Override
public void established() {
synchronized (SubscriptionCallback.this) {
SubscriptionCallback.this.setSubscription(this);
SubscriptionCallback.this.established(this);
}
}

@Override
public void ended(CancelReason reason, UpnpResponse responseStatus) {
synchronized (SubscriptionCallback.this) {
SubscriptionCallback.this.setSubscription(null);
SubscriptionCallback.this.ended(this, reason, responseStatus);
}
}

@Override
public void eventReceived() {
synchronized (SubscriptionCallback.this) {
SubscriptionCallback.this.eventReceived(this);
}
}

@Override
public void eventsMissed(int numberOfMissedEvents) {
synchronized (SubscriptionCallback.this) {
SubscriptionCallback.this.eventsMissed(this, numberOfMissedEvents);
}
}
};

getControlPoint().getProtocolFactory().createSendingSubscribe(remoteSubscription).run();
}

构造一个GENA订阅回调,通过ProtocolFactory.createSendingSubscribe() 创建订阅消息并发送

SendingSubscribe.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
 @Override
protected IncomingSubscribeResponseMessage executeSync() {
if (!getInputMessage().hasCallbackURLs()) {
getUpnpService().getConfiguration().getRegistryListenerExecutor().execute(new Runnable() {
@Override
public void run() {
subscription.fail(null);
}
});
return null;
}

try {
getUpnpService().getRegistry().lockRemoteSubscriptions();
StreamResponseMessage response = getUpnpService().getRouter().send(getInputMessage());
if (response == null) {
getUpnpService().getConfiguration().getRegistryListenerExecutor().execute(new Runnable() {
@Override
public void run() {
subscription.fail(null);
}
});
return null;
}

final IncomingSubscribeResponseMessage responseMessage = new IncomingSubscribeResponseMessage(response);

if (response.getOperation().isFailed()) {
getUpnpService().getConfiguration().getRegistryListenerExecutor().execute(new Runnable() {
@Override
public void run() {
subscription.fail(responseMessage.getOperation());
}
});
} else if (!responseMessage.isVaildHeaders()) {
getUpnpService().getConfiguration().getRegistryListenerExecutor().execute(new Runnable() {
@Override
public void run() {
subscription.fail(responseMessage.getOperation());
}
});
} else {
subscription.setSubscriptionId(responseMessage.getSubscriptionId());
subscription.setActualSubscriptionDurationSeconds(responseMessage.getSubscriptionDurationSeconds());
getUpnpService().getRegistry().addRemoteSubscription(subscription);
getUpnpService().getConfiguration().getRegistryListenerExecutor().execute(new Runnable() {
@Override
public void run() {
subscription.establish();
}
});
}
return responseMessage;
} finally {
getUpnpService().getRegistry().unlockRemoteSubscriptions();
}
}

调用Router.send()发送订阅消息,并接收响应,通过Registry.addRemoteSubscription()添加订阅

接收消息

UDP数据报的接收上面说过了,这里看Http数据流

SteamServer接口描述了接收Http数据流的能力,SteamServerImpl是其实现类

创建监听Socket,通过accept()接收连接,并创建新的通信Socket

StreamServerImpl.java

1
2
UpnpStream connectionStream = new HttpServerConnectionUpnpStream(router.getProtocolFactory(), httpServerConnection, globalParams);
router.received(connectionStream);

从连接中得到一个UpnpStream,交给Router,放到同步线程池中执行

Router.java

1
2
3
4
@Override
public void received(UpnpStream stream){
getConfiguration().getSyncProtocolExecutor().execute(stream);
}

HttpServerConnectionUpnpStream.java

1
2
3
4
5
@Override
public void run() {
...
httpService.handleRequest(connection, context);
}

UpnpHttpService.java

1
2
3
4
5
6
protected void doService(HttpRequest httpRequest, HttpResponse httpResponse, HttpContext ctx) throws HttpException, IOException {
...
responseMsg = process(requestMessage);
...
responseSent(responseMsg);
}

UpnpStream.java

1
2
3
4
5
6
7
public StreamResponseMessage process(StreamRequestMessage requestMsg) {
...
syncProtocol = getProtocolFactory().createReceivingSync(requestMsg);
syncProtocol.run();
StreamResponseMessage responseMsg = syncProtocol.getOutputMessage();
...
}

ProtocolFactoryImpl.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Override
public ReceivingSync createReceivingSync(StreamRequestMessage message) throws ProtocolCreationException {
if (message.getOperation().getMethod().equals(UpnpRequest.Method.GET)) {
return new ReceivingRetrieval(getUpnpService(), message);
} else if (getUpnpService().getConfiguration().getNamespace().isControlPath(message.getUri())) {
if (message.getOperation().getMethod().equals(UpnpRequest.Method.POST))
return new ReceivingAction(getUpnpService(), message);
} else if (getUpnpService().getConfiguration().getNamespace().isEventSubscriptionPath(message.getUri())) {
if (message.getOperation().getMethod().equals(UpnpRequest.Method.SUBSCRIBE)) {
return new ReceivingSubscribe(getUpnpService(), message);
} else if (message.getOperation().getMethod().equals(UpnpRequest.Method.UNSUBSCRIBE)) {
return new ReceivingUnsubscribe(getUpnpService(), message);
} else if (getUpnpService().getConfiguration().getNamespace().isEventCallbackPath(message.getUri())) {
if (message.getOperation().getMethod().equals(UpnpRequest.Method.NOTIFY))
return new ReceivingEvent(getUpnpService(), message);
}
throw new ProtocolCreationException("Protocol for message type not found: " + message);
}

根据UPNP消息头构造不同的接收消息

以收到事件为例

ReceivingEvent.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Override
protected OutgoingEventResponseMessage executeSync() {
...
getUpnpService().getConfiguration().getGenaEventProcessor().readBody(requestMessage);
...
final RemoteGENASubscription subscription = getUpnpService().getRegistry().getRemoteSubscription(requestMessage.getSubscrptionId());
getUpnpService().getConfiguration().getRegistryListenerExecutor().execute(new Runnable() {
@Override
public void run() {
subscription.receive(requestMessage.getSequence(), requestMessage.getStateVariableValues());
}
}
}
}

最终通知到注册的监听器上

重要的类

UpnpOperation

UPnP操作,有UpnpRequestUpnpResponse两个子类

UpnpMessage

UPnP消息,类结构如下:

  • StreamRequestMessage 请求数据流
  • StreamResponseMessage 响应数据流
  • IncomingDatagramMessage 接收数据报
  • OutgoingDatagramMessage 发送数据报

有很多具体的消息类如
OutgoingSubscribeRequestMessage/OutgoingSearchRequest/OutgoingActionRequestMessage

Device

设备类,有LocalDeviceRemoteDevice 两个子类,表示本地和网络设备

SendingAsync

异步命令,类结构如下:

  • SendingSync 同步命令
    • SendingSubscribe
    • SendingRenewal
    • SendingEvent
    • SendingAction
    • SendingUnsubscribe
  • SendingNotification 通知
    • SendingNotificationByebye
    • SendingNotificationAlive
  • SendingSearch 搜索
ReceivingAsync

接收异步消息,类结构如下

  • ReceivingSearch 接收搜索请求
  • ReceivingNotification 接收通知
  • ReceivingSearchResponse 接收搜索响应
  • ReceivingSync 接收同步消息
    • ReceivingEvent
    • ReceivingAction
    • ReceivingSubscribe
    • ReceivingRetrieval
    • ReceivingUnsubscribe
协议相关

org.teleal.cling.transport包里包含大量涉及到的协议的接口和实现类,如SOAP/GENA/TCP/UDP等

应用相关

org.teleal.cling.support包含应用层方面的代码,可以看到一些基本的服务AVTransport/MediaManager/PlayQueue/RenderingControl

callback子目录里面都是相关命令,是ActionCallback的子类

还有个lastchange子目录,里面是XML的解析器,是LastChangeParser的子类,收到订阅事件时,GENASubscriptioncurrentValues保存了一张表,里面的LastChange则描述了事件变化,需要用LastChangeParser解析,得到Event

分享