forked from yurist/mqlambdatm
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmqlambdatm.go
155 lines (110 loc) · 4.01 KB
/
mqlambdatm.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
package main
/*
A simple MQ trigger monitor for AWS Lambda.
Follows pretty closely a usual trigger monitor logic, like one found in
/opt/mqm/samp/amqstrg0.c
Can be invoked quite similarly to runmqtrm, either from command line or
using MQSC DEFINE SERVICE:
mqlambdatm -q <initiation-queue> [-m <queue-manager>] [--log-level <log-level>]
-q parameter is mandatory (nobody I know of uses SYSTEM.DEFAULT.INITIATION.QUEUE anyway)
--log-level defaults to INFO (case is immaterial); set it to DEBUG to get trace information
about every trigger message
The name of Lambda function must be encoded in ApplicId of the triggered process, case-sensitive.
The function is invoked asynchronously (Event invocation) and passed a JSON-encoded trigger
message. The JSON encoding uses the MQTM field names from
cmqc.h *with the first letter in lower case* to permit Java bean deserialization
on the Lambda side.
AWS credential chain and AWS region must be externally configured. See AWS documentation for possible
ways of configuring the credential chain. As for the region, the simplest way to make it work
is to set AWS_REGION environment variable. (Warning - AWS_DEFAULT_REGION may require additional
tweaking, see AWS Go SDK documentation for details if you want to use it.)
Invalid trigger messages are skipped with an error message (no dead letter queue etc.)
Any MQI error on MQGET will terminate the program. No retries.
*/
import (
"flag"
log "github.com/Sirupsen/logrus"
"github.com/ibm-messaging/mq-golang/ibmmq"
"strings"
)
// Command-line settings, filled in from the flags registered in init.
var (
	// initQ is the name of the initiation queue this monitor serves (-q).
	initQ string

	// qMgrName is the queue manager to connect to (-m).
	qMgrName string
)
// init registers the command-line flags, parses them and configures the
// logging level before main runs.
//
// Flags:
//
//	-q          initiation queue to serve (stored in initQ)
//	-m          queue manager to connect to (stored in qMgrName)
//	-log-level  log level name; if it cannot be parsed an error is logged
//	            and INFO is used instead
func init() {
	flag.StringVar(&initQ, "q", "", "initiation queue to serve")
	flag.StringVar(&qMgrName, "m", "", "queue manager to connect, default queue manager if not given")
	sLogLevel := flag.String("log-level", "info", "log level (DEBUG, INFO, WARN, ERROR, FATAL, PANIC)")

	flag.Parse()

	logLevel, err := log.ParseLevel(*sLogLevel)
	if err != nil {
		// sLogLevel is the *string returned by flag.String; dereference it so
		// the log shows the offending value rather than a pointer address.
		log.WithField("LOG-LEVEL", *sLogLevel).Error("invalid log level, INFO assumed")
		logLevel = log.InfoLevel
	}
	log.SetLevel(logLevel)
}
// main connects to the queue manager, opens the initiation queue for input
// and then loops forever: every message is validated as a trigger message,
// decoded with TMfromC and passed to lambdaCall together with its
// whitespace-trimmed ApplId.
//
// Malformed messages (wrong length, including oversized messages that were
// truncated on MQGET, or wrong StrucId) are logged and skipped. A missing -q
// parameter or any other MQI error terminates the program via log.Fatal.
func main() {
	log.Infoln("MQ trigger monitor for AWS Lambda")

	if initQ == "" {
		log.Fatalln("-q parameter missing")
	}

	log.WithFields(log.Fields{
		"INITQ": initQ,
		"QMGR":  qMgrName,
	}).Info("parameters")

	qMgr, mqreturn, err := ibmmq.Conn(qMgrName)
	if err != nil {
		log.WithFields(log.Fields{
			"MQRC": mqreturn.MQRC,
		}).Fatal("error connecting to queue manager")
	}
	// NOTE(review): every exit path below goes through log.Fatal, which ends
	// the process without running deferred calls; these defers only take
	// effect if main is ever changed to return normally.
	defer qMgr.Disc()

	mqod := ibmmq.NewMQOD()
	mqod.ObjectType = ibmmq.MQOT_Q
	mqod.ObjectName = initQ

	var openOpts int32 = ibmmq.MQOO_INPUT_AS_Q_DEF + ibmmq.MQOO_FAIL_IF_QUIESCING

	qObj, mqreturn, err := qMgr.Open(mqod, openOpts)
	if err != nil {
		log.WithFields(log.Fields{
			"INITQ": initQ,
			"MQRC":  mqreturn.MQRC,
		}).Fatal("error opening initiation queue")
	}
	defer qObj.Close(ibmmq.MQCO_NONE)

	md := ibmmq.NewMQMD()
	gmo := ibmmq.NewMQGMO()

	// md is reused for every MQGET; version 2 of MQGMO with MQGMO_NONE match
	// options keeps the identifiers left in md by the previous message from
	// being used as selection criteria for the next one.
	gmo.Version = ibmmq.MQGMO_VERSION_2
	gmo.MatchOptions = ibmmq.MQGMO_NONE
	gmo.Options = ibmmq.MQGMO_WAIT +
		ibmmq.MQGMO_FAIL_IF_QUIESCING +
		ibmmq.MQGMO_ACCEPT_TRUNCATED_MSG +
		ibmmq.MQGMO_NO_SYNCPOINT
	gmo.WaitInterval = ibmmq.MQWI_UNLIMITED

	// ibmmq package currently doesn't provide correct MQTM structure length,
	// so we are using a constant
	const mqtmLength = 684 // should be ibmmq.MQTM_CURRENT_LENGTH

	msg := make([]byte, mqtmLength)

	for {
		datalen, mqreturn, err := qObj.Get(md, gmo, msg)
		if err != nil {
			// MQGMO_ACCEPT_TRUNCATED_MSG lets MQGET complete for a message
			// larger than the buffer, with reason MQRC_TRUNCATED_MSG_ACCEPTED.
			// If the binding reports that warning as an error, the message is
			// simply not a trigger message: skip it like any other
			// wrong-length message instead of terminating the monitor.
			if mqreturn.MQRC == ibmmq.MQRC_TRUNCATED_MSG_ACCEPTED {
				log.Error("invalid message received, skipping (wrong length)")
				continue
			}
			log.WithFields(log.Fields{
				"INITQ": initQ,
				"MQRC":  mqreturn.MQRC,
			}).Fatal("error getting a message")
		}

		if datalen != mqtmLength {
			log.Error("invalid message received, skipping (wrong length)")
			continue
		}

		tm := TMfromC(msg)

		// The MQTM structure identifier is "TM" blank-padded to four
		// characters. Compare without the trailing blanks so the check does
		// not depend on how TMfromC pads or trims the field.
		if strings.TrimRight(tm.StrucId, " ") != "TM" {
			log.Error("invalid message received, skipping (wrong StrucId)")
			continue
		}

		log.WithFields(log.Fields{
			"TM": tm,
		}).Debug("trigger message received")

		// Ignoring any error, lambdaCall is logging its errors
		lambdaCall(strings.TrimSpace(tm.ApplId), tm)
	}
}