Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Secure connection option for GRPC and HTTP #134

Merged
merged 3 commits into from
Jul 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ SkyWalking Python SDK requires SkyWalking 8.0+ and Python 3.5+.
```python
from skywalking import agent, config

config.init(collector='127.0.0.1:11800', service='your awesome service')
config.init(collector_address='127.0.0.1:11800', service_name='your awesome service')
agent.start()
```

Expand Down
5 changes: 3 additions & 2 deletions docs/EnvVars.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ Environment Variable | Description | Default
| `SW_AGENT_INSTANCE` | The name of the Python service instance | Randomly generated |
| `SW_AGENT_NAMESPACE` | The agent namespace of the Python service | unset |
| `SW_AGENT_COLLECTOR_BACKEND_SERVICES` | The backend OAP server address | `127.0.0.1:11800` |
| `SW_AGENT_FORCE_TLS` | Use TLS for communication with server (no cert required) | `False` |
| `SW_AGENT_PROTOCOL` | The protocol to communicate with the backend OAP, `http`, `grpc` or `kafka`, **we highly suggest using `grpc` in production as it's well optimized than `http`**. The `kafka` protocol provides an alternative way to submit data to the backend. | `grpc` |
| `SW_AGENT_AUTHENTICATION` | The authentication token to verify that the agent is trusted by the backend OAP, as for how to configure the backend, refer to [the yaml](https://github.com/apache/skywalking/blob/4f0f39ffccdc9b41049903cc540b8904f7c9728e/oap-server/server-bootstrap/src/main/resources/application.yml#L155-L158). | unset |
| `SW_AGENT_LOGGING_LEVEL` | The logging level, could be one of `CRITICAL`, `FATAL`, `ERROR`, `WARN`(`WARNING`), `INFO`, `DEBUG` | `INFO` |
Expand All @@ -27,5 +28,5 @@ Environment Variable | Description | Default
| `SW_KAFKA_REPORTER_TOPIC_SEGMENT` | Specifying Kafka topic name for Tracing data. | `skywalking-segments` |
| `SW_KAFKA_REPORTER_CONFIG_key` | The configs to init KafkaProducer. it support the basic arguments (whose type is either `str`, `bool`, or `int`) listed [here](https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html#kafka.KafkaProducer) | unset |
| `SW_CELERY_PARAMETERS_LENGTH`| The maximum length of `celery` functions parameters, longer than this will be truncated, 0 turns off | `512` |
| `SW_AGENT_PROFILE_ACTIVE` | If `True`, Python agent will enable profile when user create a new profile task. Otherwise disable profile. | `False` |
| `SW_PROFILE_TASK_QUERY_INTERVAL` | The number of seconds between two profile task query. | `20` |
| `SW_AGENT_PROFILE_ACTIVE` | If `True`, Python agent will enable profile when user create a new profile task. Otherwise disable profile. | `False` |
| `SW_PROFILE_TASK_QUERY_INTERVAL` | The number of seconds between two profile task query. | `20` |
11 changes: 9 additions & 2 deletions skywalking/agent/protocol/grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,15 @@
class GrpcProtocol(Protocol):
def __init__(self):
self.state = None
self.channel = grpc.insecure_channel(config.collector_address, options=(('grpc.max_connection_age_grace_ms',
1000 * config.GRPC_TIMEOUT),))

if config.force_tls:
self.channel = grpc.secure_channel(config.collector_address, grpc.ssl_channel_credentials(),
options=(('grpc.max_connection_age_grace_ms',
1000 * config.GRPC_TIMEOUT),))
else:
self.channel = grpc.insecure_channel(config.collector_address, options=(('grpc.max_connection_age_grace_ms',
1000 * config.GRPC_TIMEOUT),))

if config.authentication:
self.channel = grpc.intercept_channel(
self.channel, header_adder_interceptor('authentication', config.authentication)
Expand Down
14 changes: 8 additions & 6 deletions skywalking/client/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,17 @@

class HttpServiceManagementClient(ServiceManagementClient):
def __init__(self):
proto = 'https://' if config.force_tls else 'http://'
self.url_instance_props = proto + config.collector_address.rstrip('/') + '/v3/management/reportProperties'
self.url_heart_beat = proto + config.collector_address.rstrip('/') + '/v3/management/keepAlive'
self.session = requests.Session()

def fork_after_in_child(self):
self.session.close()
self.session = requests.Session()

def send_instance_props(self):
url = 'http://' + config.collector_address.rstrip('/') + '/v3/management/reportProperties'
res = self.session.post(url, json={
res = self.session.post(self.url_instance_props, json={
'service': config.service_name,
'serviceInstance': config.service_instance,
'properties': [{
Expand All @@ -48,8 +50,7 @@ def send_heart_beat(self):
config.service_name,
config.service_instance,
)
url = 'http://' + config.collector_address.rstrip('/') + '/v3/management/keepAlive'
res = self.session.post(url, json={
res = self.session.post(self.url_heart_beat, json={
'service': config.service_name,
'serviceInstance': config.service_instance,
})
Expand All @@ -58,16 +59,17 @@ def send_heart_beat(self):

class HttpTraceSegmentReportService(TraceSegmentReportService):
def __init__(self):
proto = 'https://' if config.force_tls else 'http://'
self.url_report = proto + config.collector_address.rstrip('/') + '/v3/segment'
self.session = requests.Session()

def fork_after_in_child(self):
self.session.close()
self.session = requests.Session()

def report(self, generator):
url = 'http://' + config.collector_address.rstrip('/') + '/v3/segment'
for segment in generator:
res = self.session.post(url, json={
res = self.session.post(self.url_report, json={
'traceId': str(segment.related_traces[0]),
'traceSegmentId': str(segment.segment_id),
'service': config.service_name,
Expand Down
48 changes: 15 additions & 33 deletions skywalking/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
import inspect
import os
import re
import uuid
Expand All @@ -30,10 +29,14 @@

RE_IGNORE_PATH = re.compile('^$') # type: re.Pattern

options = None # here to include 'options' in globals
options = globals().copy() # THIS MUST PRECEDE DIRECTLY BEFORE LIST OF CONFIG OPTIONS!

service_name = os.getenv('SW_AGENT_NAME') or 'Python Service Name' # type: str
service_instance = os.getenv('SW_AGENT_INSTANCE') or str(uuid.uuid1()).replace('-', '') # type: str
agent_namespace = os.getenv('SW_AGENT_NAMESPACE') # type: str
collector_address = os.getenv('SW_AGENT_COLLECTOR_BACKEND_SERVICES') or '127.0.0.1:11800' # type: str
force_tls = os.getenv('SW_AGENT_FORCE_TLS', '').lower() == 'true' # type: bool
protocol = (os.getenv('SW_AGENT_PROTOCOL') or 'grpc').lower() # type: str
authentication = os.getenv('SW_AGENT_AUTHENTICATION') # type: str
logging_level = os.getenv('SW_AGENT_LOGGING_LEVEL') or 'INFO' # type: str
Expand Down Expand Up @@ -64,28 +67,17 @@
os.getenv('SW_AGENT_PROFILE_ACTIVE') == 'True' else False # type: bool
profile_task_query_interval = int(os.getenv('SW_PROFILE_TASK_QUERY_INTERVAL') or '20')

options = {key for key in globals() if key not in options} # THIS MUST FOLLOW DIRECTLY AFTER LIST OF CONFIG OPTIONS!

def init(
service: str = None,
instance: str = None,
collector: str = None,
protocol_type: str = None,
token: str = None,
):
global service_name
service_name = service or service_name

global service_instance
service_instance = instance or service_instance

global collector_address
collector_address = collector or collector_address
def init(**kwargs):
glob = globals()

global protocol
protocol = protocol_type or protocol
for key, val in kwargs.items():
if key not in options:
raise KeyError('invalid config option %s' % key)

global authentication
authentication = token or authentication
glob[key] = val


def finalize():
Expand All @@ -107,21 +99,11 @@ def finalize():


def serialize():
from skywalking import config
return {
key: value for key, value in config.__dict__.items() if not (
key.startswith('_') or key == 'TYPE_CHECKING' or key == 'RE_IGNORE_PATH'
or inspect.isfunction(value)
or inspect.ismodule(value)
or inspect.isbuiltin(value)
or inspect.isclass(value)
)
}
glob = globals()

return {key: glob[key] for key in options}


def deserialize(data):
from skywalking import config
for key, value in data.items():
if key in config.__dict__:
config.__dict__[key] = value
init(**data)
finalize()
4 changes: 2 additions & 2 deletions skywalking/trace/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,12 +232,12 @@ def __init__(self):
def new_local_span(self, op: str) -> Span:
return self._noop_span

def new_entry_span(self, op: str, carrier: 'Carrier' = None) -> Span:
def new_entry_span(self, op: str, carrier: 'Carrier' = None, inherit: Component = None) -> Span:
if carrier is not None:
self._noop_span.extract(carrier)
return self._noop_span

def new_exit_span(self, op: str, peer: str) -> Span:
def new_exit_span(self, op: str, peer: str, component: Component = None, inherit: Component = None) -> Span:
return self._noop_span

def start(self, span: Span):
Expand Down