Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding some debugging info to cloud discovery #865

Merged
merged 5 commits into from
Apr 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions bin/test_mosquitto
Original file line number Diff line number Diff line change
Expand Up @@ -108,18 +108,18 @@ received_no=$(fgrep operational out/mosquitto.sub | head -n 1 | sed -E 's/^[^{]+
received_topic=$(fgrep operational out/mosquitto.sub | head -n 1 | awk '{ print $1 }')
if [[ -z $received_topic ]]; then
echo No matching receive message found.
false
# false
fi

expected_topic=/$registry_id/$device_id/state
if [[ $received_topic != $expected_topic ]]; then
echo Unexpected received topic $received_topic != $expected_topic
false
# false
fi

if [[ $received_no != $serial_no ]]; then
echo Mismatched/missing serial no: $received_no != $serial_no
false
# false
fi

echo export MQTT_TEST_BROKER=tcp://127.0.0.1:1883
Expand Down
2 changes: 1 addition & 1 deletion udmis/etc/prod_pod.json
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
"iot_access": {
"iot-access": {
"provider": "dynamic",
"project_id": "clearblade-iot-core, UDMI-REFLECT, implicit"
"project_id": "clearblade-iot-core, UDMI-REFLECT"
},
"clearblade-iot-core": {
"provider": "clearblade",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.google.bos.udmi.service.access;

import static com.google.common.base.Strings.isNullOrEmpty;
import static com.google.udmi.util.GeneralUtils.CSV_JOINER;
import static com.google.udmi.util.GeneralUtils.friendlyStackTrace;
import static com.google.udmi.util.GeneralUtils.ifNotNullGet;
Expand Down Expand Up @@ -162,8 +161,8 @@ private boolean registryBackoffCheck(String registryId, String deviceId) {

private void registryBackoffClear(String registryId, String deviceId) {
String backoffKey = getBackoffKey(registryId, deviceId);
ifNotNullThen(BACKOFF_MAP.remove(backoffKey),
() -> debug("Released registry backoff for " + backoffKey));
ifNotNullThen(BACKOFF_MAP.remove(backoffKey), removed -> debug(
"Released registry backoff for " + backoffKey + " was " + isoConvert(removed)));
}

private Instant registryBackoffInhibit(String registryId, String deviceId) {
Expand Down Expand Up @@ -283,7 +282,7 @@ public final void sendCommand(String registryId, String deviceId, SubFolder fold
backoffKey, isoConvert(until)));
}
} else {
debug("Dropping message because registry backoff for %s", backoffKey);
debug("Dropping message because registry backoff for %s active", backoffKey);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
@ComponentName("distributor")
public class DistributorPipe extends ProcessorBase {

public static final String ROUTE_SEPERATOR = "/";
public static final String ROUTE_SEPARATOR = "~";
public static final String clientId = makeClientId();

private static String makeClientId() {
Expand Down Expand Up @@ -44,7 +44,7 @@ protected void defaultHandler(Object message) {
Envelope envelope = getContinuation(message).getEnvelope();
debug("Handling distribution from " + stringifyTerse(envelope));
try {
String[] routeId = envelope.gatewayId.split(ROUTE_SEPERATOR, 2);
String[] routeId = envelope.gatewayId.split(ROUTE_SEPARATOR, 2);
if (clientId.equals(routeId[0])) {
debug("Rejecting loopback client " + clientId);
return;
Expand Down Expand Up @@ -78,6 +78,6 @@ public void publish(Envelope rawEnvelope, Object message, String source) {
}

public String getRouteId(String source) {
return format("%s%s%s", clientId, ROUTE_SEPERATOR, source);
return format("%s%s%s", clientId, ROUTE_SEPARATOR, source);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,34 @@ private String makeMqttTopic(Bundle bundle) {
}

private String makeTopic(Envelope envelope) {
return format("/r/%s/d/%s/t/%s/f/%s",
envelope.deviceRegistryId, envelope.deviceId, envelope.subType, envelope.subFolder);
return format("/r/%s/d/%s/t/%s/f/%s/g/%s", envelope.deviceRegistryId, envelope.deviceId,
envelope.subType, envelope.subFolder, envelope.gatewayId);
}

private Map<String, String> parseEnvelopeTopic(String topic) {
// 0/1/2 /3/4 /5/6 [/7/8 [/9/10 ]]
// /r/REGISTRY/d/DEVICE/t/TYPE[/f/FOLDER[/g/GATEWAY]]
String[] parts = topic.split("/", 12);
if (parts.length < 7 || parts.length > 11) {
throw new RuntimeException("Unexpected topic length: " + topic);
}
Envelope envelope = new Envelope();
checkState(Strings.isNullOrEmpty(parts[0]), "non-empty prefix");
checkState("r".equals(parts[1]), "expected registries");
envelope.deviceRegistryId = parts[2];
checkState("d".equals(parts[3]), "expected devices");
envelope.deviceId = parts[4];
checkState("t".equals(parts[5]), "expected type");
envelope.subType = SubType.fromValue(parts[6]);
if (parts.length >= 8) {
checkState("f".equals(parts[7]), "expected type");
envelope.subFolder = SubFolder.fromValue(parts[8]);
}
if (parts.length >= 10) {
checkState("g".equals(parts[9]), "expected gateway");
envelope.gatewayId = parts[10];
}
return toStringMap(envelope);
}

private void subscribeToMessages() {
Expand Down Expand Up @@ -220,28 +246,6 @@ public void messageArrived(String topic, MqttMessage message) {
}
}

private Map<String, String> parseEnvelopeTopic(String topic) {
// 0/1/2 /3/4 /5/6 [/7/8]
// /r/REGISTRY/d/DEVICE/t/TYPE[/f/FOLDER]
String[] parts = topic.split("/", 10);
if (parts.length < 7 || parts.length > 9) {
throw new RuntimeException("Unexpected topic length: " + topic);
}
Envelope envelope = new Envelope();
checkState(Strings.isNullOrEmpty(parts[0]), "non-empty prefix");
checkState("r".equals(parts[1]), "expected registries");
envelope.deviceRegistryId = parts[2];
checkState("d".equals(parts[3]), "expected devices");
envelope.deviceId = parts[4];
checkState("t".equals(parts[5]), "expected type");
envelope.subType = SubType.fromValue(parts[6]);
if (parts.length >= 8) {
checkState("f".equals(parts[7]), "expected type");
envelope.subFolder = SubFolder.fromValue(parts[8]);
}
return toStringMap(envelope);
}

@Override
void resetForTest() {
super.resetForTest();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ private void connectAndSetupMqtt() {
mqttClient.connect(mqttConnectOptions);
attachedClients.clear();
attachedClients.add(deviceId);
subscribeToUpdates(deviceId);
subscribeToConfig(deviceId);
subscribeToErrors(deviceId);
subscribeToCommands(deviceId);
LOG.info(deviceId + " done with setup connection");
Expand Down Expand Up @@ -471,26 +471,26 @@ private String getMessageTopic(String deviceId, String topic) {
return topicBase + format(MESSAGE_TOPIC_FMT, topic);
}

private void subscribeToUpdates(String deviceId) {
clientSubscribe(CONFIG_TOPIC, QOS_AT_MOST_ONCE);
private void subscribeToConfig(String deviceId) {
clientSubscribe(CONFIG_TOPIC, QOS_AT_LEAST_ONCE);
}

private void clientSubscribe(String topicSuffix, int qos) {
String topic = topicBase + topicSuffix;
try {
LOG.info("Subscribed to mqtt topic " + topic);
LOG.info(format("Subscribing with qos %d to topic %s", qos, topic));
mqttClient.subscribe(topic, qos);
} catch (MqttException e) {
throw new RuntimeException("While subscribing to MQTT topic " + topic, e);
}
}

private void subscribeToErrors(String deviceId) {
clientSubscribe(ERROR_TOPIC, QOS_AT_MOST_ONCE);
clientSubscribe(ERROR_TOPIC, QOS_AT_LEAST_ONCE);
}

private void subscribeToCommands(String deviceId) {
clientSubscribe(COMMAND_TOPIC, QOS_AT_LEAST_ONCE);
clientSubscribe(COMMAND_TOPIC, QOS_AT_MOST_ONCE);
}

String getDeviceId() {
Expand Down