Skip to content

Commit

Permalink
bugfix: fix issue in which plugin registration fails to register cycl…
Browse files Browse the repository at this point in the history
…ed plugin
  • Loading branch information
edaniszewski committed Dec 15, 2020
1 parent 3246599 commit 225ae87
Show file tree
Hide file tree
Showing 6 changed files with 315 additions and 87 deletions.
40 changes: 10 additions & 30 deletions examples/deploying/kubernetes/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,58 +17,38 @@ For this example you will need:
Deploying to your cluster should be straightforward using standard `kubectl` commands:

```
kubectl apply -f deployment.yaml
kubectl apply -f .
```

You can check to see if the Pod came up and is running. Passing in the `-o wide` flag
will also give the address of the Pod in the cluster.

```console
$ kubectl get pods -o wide
NAME READY STATUS RESTARTS AGE IP NODE
synse-f6956f758-l7hz2 2/2 Running 0 28s 10.1.0.189 docker-for-desktop
NAME READY STATUS RESTARTS AGE IP NODE
synse-server-f6956f758-l7hz2 1/1 Running 0 28s 10.1.0.189 docker-for-desktop
emulator-86df9dcb9c-wpcfd 1/1 Running 0 28s 10.1.0.190 docker-for-desktop
```

You can't query the server endpoint directly without setting up a service, ingress, or some other means of
access. Instead, you can [run a debug container](https://kubernetes.io/docs/tasks/debug-application-cluster/debug-service/#running-commands-in-a-pod)
on the cluster to get on the same network.
You can query the Synse endpoints by setting up port-forwarding to the pod.

```console
$ kubectl run -it --rm --restart=Never alpine --image=alpine sh
If you don't see a command prompt, try pressing enter.
/ #
```

We'll need something like `curl`, which can be installed with
kubectl port-forward synse-server-f6956f758-l7hz2 5000:5000
Forwarding from 127.0.0.1:5000 -> 5000
Forwarding from [::1]:5000 -> 5000

```console
/ # apk add curl
fetch http://dl-cdn.alpinelinux.org/alpine/v3.9/main/x86_64/APKINDEX.tar.gz
fetch http://dl-cdn.alpinelinux.org/alpine/v3.9/community/x86_64/APKINDEX.tar.gz
(1/5) Installing ca-certificates (20190108-r0)
(2/5) Installing nghttp2-libs (1.35.1-r0)
(3/5) Installing libssh2 (1.8.2-r0)
(4/5) Installing libcurl (7.64.0-r1)
(5/5) Installing curl (7.64.0-r1)
Executing busybox-1.29.3-r10.trigger
Executing ca-certificates-20190108-r0.trigger
OK: 7 MiB in 19 packages
```

Now, using the Synse Pod IP from before, the API should be accessible:
Now, the API should be accessible from localhost:

```console
/ # curl 10.1.0.189:5000/test
/ # curl localhost:5000/test
{
"status":"ok",
"timestamp":"2019-05-17T13:13:48.412790Z"
}
```

> **Note**: In this example deployment, Synse Server and the emulator plugin are being
> run in the same Pod for simplicity's sake. This works fine as an example use case, but
> it is generally not recommended to run plugins in the same Pod as Synse Server - they
> should really be their own deployment.

[kubernetes]: https://kubernetes.io/
[kubectl]: https://kubernetes.io/docs/tasks/tools/install-kubectl/
44 changes: 0 additions & 44 deletions examples/deploying/kubernetes/deployment.yaml

This file was deleted.

96 changes: 96 additions & 0 deletions examples/deploying/kubernetes/emulator.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
#
# emulator.yaml
#
# A simple example Kubernetes deployment for Synse's emulator plugin,
# intended to be run in conjunction with a Synse Server deployment.
#

---
apiVersion: v1
kind: ConfigMap
metadata:
name: emulator-config
labels:
synse-component: plugin
app: emulator
data:
config.yml: "debug: true\nid:\n useCustom:\n - emu-1\nnetwork:\n address: :5001\n type: tcp\nsettings:\n cache:\n enabled: true\n ttl: 5m\n mode: parallel\n read:\n interval: 1s\n write:\n interval: 2s\nversion: 3\n"

---

apiVersion: v1
kind: ConfigMap
metadata:
name: emulator-devices
labels:
synse-component: plugin
app: emulator
data:
config.yml: "devices:\n- context:\n model: emul8-temp\n instances:\n - data:\n id: 1\n info: Synse Temperature Sensor 1\n - data:\n id: 2\n info: Synse Temperature Sensor 2\n - data:\n id: 3\n info: Synse Temperature Sensor 3\n - data:\n id: 4\n info: Synse Temperature Sensor 4\n type: temperature\nversion: 3\n"

---

apiVersion: v1
kind: Service
metadata:
name: emulator
labels:
synse-component: plugin
app: emulator
spec:
type: ClusterIP
clusterIP: None
ports:
- port: 5001
targetPort: http
name: http
selector:
synse-component: plugin
app: emulator

---

apiVersion: apps/v1
kind: Deployment
metadata:
name: emulator
labels:
app: emulator
synse-component: plugin
spec:
replicas: 1
selector:
matchLabels:
synse-component: plugin
app: emulator
template:
metadata:
name: emulator
labels:
synse-component: plugin
app: emulator
spec:
terminationGracePeriodSeconds: 3
volumes:
- name: config
configMap:
name: emulator-config
- name: devices
configMap:
name: emulator-devices
containers:
- name: emulator-plugin
image: vaporio/emulator-plugin
imagePullPolicy: Always
ports:
- name: http
containerPort: 5001
env:
- name: PLUGIN_METRICS_ENABLED
value: "true"
volumeMounts:
- name: config
mountPath: /etc/synse/plugin/config/config.yml
subPath: config.yml
- name: devices
mountPath: /etc/synse/plugin/config/device
69 changes: 69 additions & 0 deletions examples/deploying/kubernetes/synse-server.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
#
# deployment.yaml
#
# A simple example Kubernetes deployment for Synse Server and the
# emulator plugin.
#

apiVersion: v1
kind: ConfigMap
metadata:
name: synse-server-config
labels:
app: synse-server
data:
config.yml: "logging: debug\nplugin:\n discover:\n kubernetes:\n endpoints:\n labels:\n synse-component: plugin\n namespace: default\npretty_json: true\n"

---

apiVersion: v1
kind: Service
metadata:
name: synse-server
labels:
app: synse-server
spec:
type: ClusterIP
ports:
- port: 5000
name: http
selector:
app: synse-server

---

apiVersion: apps/v1
kind: Deployment
metadata:
name: synse-server
labels:
app: synse-server
spec:
replicas: 1
selector:
matchLabels:
app: synse-server
template:
metadata:
name: synse-server
labels:
app: synse-server
spec:
terminationGracePeriodSeconds: 3
volumes:
- name: config
configMap:
name: synse-server-config
containers:
- name: synse-server
image: vaporio/synse-server
imagePullPolicy: Always
ports:
- name: http
containerPort: 5000
env:
- name: SYNSE_METRICS_ENABLED
value: "true"
volumeMounts:
- name: config
mountPath: /etc/synse/server
51 changes: 47 additions & 4 deletions synse_server/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,13 +133,56 @@ def register(self, address: str, protocol: str) -> str:
)

if plugin.id in self.plugins:
# The plugin has already been registered. There is nothing left to
# do here, so just log and move on.
logger.debug('plugin with id already registered - skipping', id=plugin.id)
# A plugin with the given ID has already been registered. This could happen
# in a number of cases:
#
# - During routine refresh of plugins, discovery or some other mechanism will
# find plugins and attempt to re-register them, relying on this block to
# determine whether or not the plugin needs to be re-registered.
# - Plugins were mis-configured and are having ID collisions. There is nothing
# that can be done here other than logging the potential collision.
# - Plugins were rescheduled and could potentially be given a new IP address.
# In this case, the 'cached' plugin could exist, but have the wrong address.
#
# In addition to checking whether or not the plugin exists in the manager cache,
# we also need to check some general state of the plugin, including whether it is
# enabled/disabled, what its address is, etc.
cached = self.plugins[plugin.id]

if cached.disabled:
if cached.address != plugin.address:
logger.info(
'address changed for existing plugin',
id=plugin.id, old_addr=cached.address, new_addr=plugin.address,
)

# Regardless of whether the plugin address changed, it was previously disabled
# due to some error. Since we were able to connect to it, we will cancel any pending
# reconnect tasks and use the newly connect client instance.
cached.cancel_tasks()

# Update the exported metrics disabled plugins gauge: remove the old disabled plugin
Monitor.plugin_disabled.labels(plugin.id).dec()

self.plugins[plugin.id] = plugin
logger.debug('re-registered existing plugin', new=plugin, previous=cached)

else:
if cached.address != plugin.address:
# If we have matching plugin IDs, but differing addresses, and both plugins
# are considered "active", there is a chance that we are communicating with
# different plugins which have the same ID. This is indicative of a plugin ID
# collision due to misconfiguration.
logger.warning(
'potential plugin ID collision: plugins with same ID, different addresses '
'detected. this may also indicate plugin cycling.',
id=plugin.id, old_addr=cached.address, new_addr=plugin.address,
)
else:
self.plugins[plugin.id] = plugin
logger.info('successfully registered new plugin', id=plugin.id, tag=plugin.tag)

# Since we were able to communicate with the plugin, ensure it is put in the active state.
self.plugins[plugin.id].mark_active()
return plugin.id

Expand Down Expand Up @@ -387,7 +430,7 @@ def __init__(
self._reconnect_task: Optional[asyncio.Task] = None

def __str__(self) -> str:
return f'<Plugin ({self.tag}): {self.id}>'
return f'<Plugin ({self.tag} @ {self.address}): {self.id}>'

def __repr__(self) -> str:
return str(self)
Expand Down
Loading

0 comments on commit 225ae87

Please sign in to comment.