Skip to content

Commit

Permalink
Fix cron distributor and add debugging to listDevices pagination (#882)
Browse files Browse the repository at this point in the history
  • Loading branch information
grafnu authored May 9, 2024
1 parent 26d37f5 commit 5fbeebd
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -346,10 +346,31 @@ private HashMap<String, CloudModel> fetchDevices(String deviceRegistryId, String
.collect(Collectors.toMap(Entry::getKey, Entry::getValue));
collect.putAll(responseMap);
pageToken = response.getNextPageToken();
debug(format("fetchDevices %s found %d total %d more %s", deviceRegistryId,
responseMap.size(), collect.size(), pageToken != null));
} while (pageToken != null);
return collect;
}

private List<String> findGateways(String registryId, Device proxyDevice) {
CloudModel cloudModel = listDevices(registryId);
List<String> gateways =
cloudModel.device_ids.entrySet().stream().filter(entry ->
entry.getValue().resource_type == GATEWAY).map(Entry::getKey).toList();
return gateways;
}

private CloudModel findUnbindAndDelete(String registryId, Device device) {
List<String> allGateways = findGateways(registryId, device);
if (allGateways.isEmpty()) {
throw new RuntimeException("Was expecting at least one bound gateway!");
}
String deviceId = device.toBuilder().getId();
info("Unbinding %s/%s from gateways: " + registryId, deviceId, CSV_JOINER.join(allGateways));
allGateways.forEach(gatewayId -> unbindDevice(registryId, gatewayId, deviceId));
return unbindAndDeleteCore(registryId, device);
}

private String getDeviceName(String registryId, String deviceId) {
return DeviceName.of(projectId, getRegistryLocation(registryId), registryId, deviceId)
.toString();
Expand Down Expand Up @@ -436,6 +457,17 @@ private CloudModel modelRegistry(String registryId, String deviceId, CloudModel
}
}

private CloudModel modifyDevice(String registryId, Device device) {
Device.Builder builder = device.toBuilder();
String deviceId = builder.getId();
CloudModel model = fetchDevice(registryId, deviceId);
model.metadata.putAll(builder.getMetadata());
builder.setMetadata(model.metadata);
CloudModel cloudModel = updateDevice(registryId, builder.build(), METADATA_FIELD_MASK);
cloudModel.operation = Operation.MODIFY;
return cloudModel;
}

private StateNotificationConfig stateNotificationConfig() {
String topicName = getScopedTopic(UDMI_STATE_TOPIC);
return StateNotificationConfig.newBuilder().setPubsubTopicName(topicName).build();
Expand Down Expand Up @@ -485,25 +517,6 @@ private void unbindDevice(String registryId, String gatewayId, String proxyId) {
}
}

private CloudModel findUnbindAndDelete(String registryId, Device device) {
List<String> allGateways = findGateways(registryId, device);
if (allGateways.isEmpty()) {
throw new RuntimeException("Was expecting at least one bound gateway!");
}
String deviceId = device.toBuilder().getId();
info("Unbinding %s/%s from gateways: " + registryId, deviceId, CSV_JOINER.join(allGateways));
allGateways.forEach(gatewayId -> unbindDevice(registryId, gatewayId, deviceId));
return unbindAndDeleteCore(registryId, device);
}

private List<String> findGateways(String registryId, Device proxyDevice) {
CloudModel cloudModel = listDevices(registryId);
List<String> gateways =
cloudModel.device_ids.entrySet().stream().filter(entry ->
entry.getValue().resource_type == GATEWAY).map(Entry::getKey).toList();
return gateways;
}

private void unbindGatewayDevices(String registryId, Device gatewayDevice) {
String gatewayId = gatewayDevice.toBuilder().getId();
CloudModel cloudModel = listRegistryDevices(registryId, gatewayId);
Expand All @@ -515,17 +528,6 @@ private void unbindGatewayDevices(String registryId, Device gatewayDevice) {
}
}

private CloudModel modifyDevice(String registryId, Device device) {
Device.Builder builder = device.toBuilder();
String deviceId = builder.getId();
CloudModel model = fetchDevice(registryId, deviceId);
model.metadata.putAll(builder.getMetadata());
builder.setMetadata(model.metadata);
CloudModel cloudModel = updateDevice(registryId, builder.build(), METADATA_FIELD_MASK);
cloudModel.operation = Operation.MODIFY;
return cloudModel;
}

private CloudModel updateDevice(String registryId, Device device) {
CloudModel cloudModel = updateDevice(registryId, device, UPDATE_FIELD_MASK);
cloudModel.operation = Operation.UPDATE;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.google.bos.udmi.service.core;

import static com.google.common.base.Preconditions.checkState;
import static com.google.udmi.util.GeneralUtils.CSV_JOINER;
import static com.google.udmi.util.GeneralUtils.ifNotNullThen;
import static com.google.udmi.util.GeneralUtils.ifTrueGet;
Expand Down Expand Up @@ -72,7 +73,9 @@ public CronProcessor(EndpointConfiguration config) {
}

private static String getContainerId(Envelope envelope) {
return envelope.gatewayId.split(PATH_SEPARATOR, 2)[0];
String[] split = envelope.gatewayId.split(ID_SEPARATOR, 2);
checkState(split.length == 2, "malformed container id");
return split[0];
}

@Override
Expand Down

0 comments on commit 5fbeebd

Please sign in to comment.