-
Notifications
You must be signed in to change notification settings - Fork 0
/
sensors.py
executable file
·122 lines (96 loc) · 2.85 KB
/
sensors.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
#!/usr/bin/env python
import math
import time
import signal
import sqlite3
import RPi.GPIO as GPIO
# Refer to pins by their position on the board header (BOARD numbering).
GPIO.setmode(GPIO.BOARD)

# Anemometer: pin 16 is an input with the internal pull-up enabled.
GPIO.setup(16, GPIO.IN, pull_up_down=GPIO.PUD_UP)

# Temperature / vane: read_volt() drives pins 24, 23 and 19 and samples
# pin 21, so the first three are outputs and the last is an input.
for _out_pin in (24, 23, 19):
    GPIO.setup(_out_pin, GPIO.OUT)
GPIO.setup(21, GPIO.IN)
class Speed:
    """Pulse counter that turns anemometer ticks into a wind speed.

    ``increment`` is called once per pulse; ``read`` converts the pulses
    counted since the previous read into mph and restarts the count.

    Args:
        mph_per_count: speed in mph represented by one counted pulse per
            read interval. Defaults to 2.5, the factor that used to be
            hard-coded in ``read``.
    """

    def __init__(self, mph_per_count=2.5):
        self.mph_per_count = mph_per_count
        # Per-instance state; previously this relied on a class attribute
        # that was shadowed by the first ``+=``.
        self.count = 0

    def read(self):
        """Return the speed for the pulses counted so far and reset the count."""
        mph = self.count * self.mph_per_count
        self.count = 0
        return mph

    def increment(self):
        """Record one pulse."""
        self.count += 1
# Shared pulse counter: incremented by my_callback() on each anemometer pulse,
# read and reset by alarm_handler().
anemometer = Speed()
class Accumulator:
    """Buffer sensor readings and store their averages in fixed-size batches.

    Args:
        many: number of readings to collect before they are averaged and
            handed to ``store``.
    """

    def __init__(self, many):
        self.many = many
        self.reset()

    def reset(self):
        """Discard all buffered readings."""
        self.speeds = []
        self.temps = []
        self.vanes = []

    def append(self, speed, temp, vane):
        """Buffer one reading; once ``many`` are buffered, store their means.

        Args:
            speed: wind speed reading.
            temp: temperature reading.
            vane: wind direction reading.
        """
        self.speeds.append(speed)
        self.temps.append(temp)
        self.vanes.append(vane)
        # Use the configured batch size. This used to compare against a
        # literal 10, which silently ignored the ``many`` argument.
        if len(self.speeds) >= self.many:
            avSpeed = sum(self.speeds) / len(self.speeds)
            avTemp = sum(self.temps) / len(self.temps)
            # NOTE(review): this is a plain arithmetic mean, so directions
            # either side of the 359/0 wrap average to the opposite heading.
            avVane = sum(self.vanes) / len(self.vanes)
            # Clear the buffers before storing so a failing store() does not
            # leave a full batch behind.
            self.reset()
            self.store(avSpeed, avTemp, avVane)

    def store(self, mph, temp, vane):
        """Write one averaged sample via write_temp().

        The sample is stamped with the current time and its rain value is
        passed as None.
        """
        write_temp(time.time(), temp, mph, vane, None)
# Shared accumulator fed by alarm_handler(); constructed with many=10.
accumulator = Accumulator(10)
def alarm_handler(signum, frame):
    """SIGALRM handler: take one sample from every sensor and buffer it.

    Reads (and resets) the anemometer count, then the temperature, then the
    vane, and appends the three values to the shared accumulator. ``signum``
    and ``frame`` are required by the signal-handler signature and are not
    used.
    """
    accumulator.append(anemometer.read(), read_temp(), read_vane())
# Run alarm_handler on SIGALRM, and have the real-time interval timer raise
# SIGALRM after 1 second and then every 1 second.
signal.signal(signal.SIGALRM, alarm_handler)
signal.setitimer(signal.ITIMER_REAL, 1, 1)
def my_callback(channel):
    """Edge-detect callback: count one anemometer pulse.

    ``channel`` is supplied by the caller and is not used.
    """
    anemometer.increment()
# Call my_callback on every falling edge seen on pin 16 (the anemometer
# input); bouncetime=10 asks RPi.GPIO to debounce the input.
GPIO.add_event_detect(16, GPIO.FALLING, callback=my_callback, bouncetime=10)
def read_volt(channel):
    """Bit-bang one conversion from the ADC and return it as a voltage.

    Pin 24 is held low for the duration of the transfer and pin 23 is pulsed
    once per bit. Five command bits are written to pin 19, then twelve result
    bits are sampled from pin 21, most significant bit first.

    Args:
        channel: value sent as the third command bit (the callers in this
            file pass 0 or 1).

    Returns:
        The 12-bit reading scaled by 3.3 / 4096.
    """
    # Idle levels before the transfer starts.
    GPIO.output(24, True)
    GPIO.output(23, False)
    GPIO.output(19, True)

    command_bits = [1, 1, channel, 1, 1]
    GPIO.output(24, False)

    # Shift the command out, one pulse on pin 23 per bit.
    for level in command_bits:
        GPIO.output(19, level)
        time.sleep(0.001)
        GPIO.output(23, True)
        time.sleep(0.001)
        GPIO.output(23, False)

    # Clock the result in; the first sampled bit carries weight 2**11.
    reading = 0
    for weight in range(11, -1, -1):
        GPIO.output(23, True)
        time.sleep(0.001)
        sampled = GPIO.input(21)
        time.sleep(0.001)
        GPIO.output(23, False)
        reading += sampled * 2 ** weight

    # End of transfer.
    GPIO.output(24, True)
    return reading * 3.3 / 4096
def read_temp():
    """Read ADC channel 1 and convert the voltage to a temperature.

    Returns:
        ``55.5 * volts + 255.37 - 273.15``. The final 273.15 offset suggests
        a Kelvin-to-Celsius conversion (presumably degrees Celsius; confirm
        against the sensor's datasheet).
    """
    kelvin = (55.5 * read_volt(1)) + 255.37
    return kelvin - 273.15
def read_vane():
    """Read ADC channel 0 and convert the voltage to a wind-vane direction.

    The voltage is mapped linearly so that 0.17 gives 0 and 0.17 + 2.91 gives
    359, then rounded down with ``math.floor``. Voltages outside that span
    are not clamped.
    """
    above_floor = read_volt(0) - 0.17
    return math.floor(above_floor * 359 / 2.91)
def write_temp(when, temp, speed, dir, rain, db_path='weather.db'):
    """Insert one sample into the ``weather`` table.

    Args:
        when: timestamp in seconds; stored as integer milliseconds.
        temp: temperature; rounded to one decimal place by the INSERT.
        speed: wind speed, stored as given.
        dir: wind direction, stored as given.
        rain: rain value, stored as given (may be None).
        db_path: SQLite database file. Defaults to 'weather.db', the path
            that used to be hard-coded.

    Raises:
        sqlite3.Error: if the database cannot be opened or the insert fails,
            for example when the ``weather`` table does not exist.
    """
    conn = sqlite3.connect(db_path)
    try:
        # ``with conn`` only commits (or rolls back on error); it does not
        # close the connection, hence the explicit close() below.
        with conn:
            conn.execute(
                'INSERT into weather (time, temp, speed, dir, rain) '
                'values (cast(?*1000 as integer),round(?,1),?,?,?)',
                [when, temp, speed, dir, rain]
            )
    finally:
        conn.close()
# Idle in the foreground; sampling happens in alarm_handler() and in the GPIO
# edge callback.
try:
    while 1:
        time.sleep(1)
except KeyboardInterrupt:
    # Ctrl-C is the normal way to stop the script; exit quietly.
    pass
finally:
    # Release the GPIO channels on every exit path, not only on Ctrl-C, so
    # an exception raised from the alarm handler does not leave the pins
    # configured.
    GPIO.cleanup()