lscc负责管理链码的生命周期,其就是说链码容器的启动应该是lscc触发的。但是链码容器是如何从peer chaincode instantiate 触发实例化命令到peer服务完成链码的实例化,即链码容器的启动,看了一些网上的资料机会没有讲这块的。也因为又这块二次开发的需求,所以追踪了一下Fabric源码从lscc到用户链码容器启动的全过程,在此记录一下。
fabric/peer/chaincode/instantiate.go
func instantiate(cmd *cobra.Command, cf *ChaincodeCmdFactory) (*protcommon.Envelope, error) {
...
// instantiate is currently only supported for one peer
proposalResponse, err := cf.EndorserClients[0].ProcessProposal(context.Background(), signedProp
/* EndorserClients[0]就是这个peer本身(但是这是在客户端里呀,配置中设置好的地址,详细看cli的配置 CORE_PEER_ADDRESS=peer0.org1.example.com:7051) ProcessProposal是grpc定义的函数,可以在peer.pb.go中看到定义的EndorserClient和EndorserServern内容,这里是客户端调用ProcessProposal把相关内容发送给Peer Server Peer chaincode instantiate是客户端,但客户端的endorseClient在哪里处理的?InitCmdFactory中 */
...
}
ProcessProposal可以定位到grpc的定义,这是双方交互的接口
fabric/protos/peer/peer.go
message PeerID {
string name = 1;
}
message PeerEndpoint {
PeerID id = 1;
string address = 2;
}
service Endorser {
rpc ProcessProposal(SignedProposal) returns (ProposalResponse) {}
}
接下来,看下Endorser客户端是如何处理的,如何请求服务的。
先找到cf.EndorserClients是在哪里实例化的,定位到InitCmdFactory
fabric/peer/chaincode/common.go
// InitCmdFactory init the ChaincodeCmdFactory with default clients
func InitCmdFactory(cmdName string, isEndorserRequired, isOrdererRequired bool) (*ChaincodeCmdFactory, error) {
var err error
var endorserClients []pb.EndorserClient
var deliverClients []api.PeerDeliverClient
if isEndorserRequired {
if err = validatePeerConnectionParameters(cmdName); err != nil {
return nil, errors.WithMessage(err, "error validating peer connection parameters")
}
for i, address := range peerAddresses {
var tlsRootCertFile string
if tlsRootCertFiles != nil {
tlsRootCertFile = tlsRootCertFiles[i]
}
//找到EndorserClient实例化的代码
endorserClient, err := common.GetEndorserClientFnc(address, tlsRootCertFile)
//GetEndorserClientFnc的实例是在common/common.go/init()中构建
if err != nil {
return nil, errors.WithMessage(err, fmt.Sprintf("error getting endorser client for %s", cmdName))
}
endorserClients = append(endorserClients, endorserClient)
deliverClient, err := common.GetPeerDeliverClientFnc(address, tlsRootCertFile)
if err != nil {
return nil, errors.WithMessage(err, fmt.Sprintf("error getting deliver client for %s", cmdName))
}
deliverClients = append(deliverClients, deliverClient)
}
if len(endorserClients) == 0 {
return nil, errors.New("no endorser clients retrieved - this might indicate a bug")
}
}
...
return &ChaincodeCmdFactory{
EndorserClients: endorserClients,
DeliverClients: deliverClients,
Signer: signer,
BroadcastClient: broadcastClient,
Certificate: certificate,
}, nil
}
通过GetEndorserClientFnc找到func GetEndorserClient(address, tlsRootCertFile string) (pb.EndorserClient, error)
fabric/peer/common/peerclient.go
// GetEndorserClient returns a new endorser client. If the both the address and
// tlsRootCertFile are not provided, the target values for the client are taken
// from the configuration settings for "peer.address" and
// "peer.tls.rootcert.file"
func GetEndorserClient(address, tlsRootCertFile string) (pb.EndorserClient, error) {
var peerClient *PeerClient
var err error
if address != "" {
peerClient, err = NewPeerClientForAddress(address, tlsRootCertFile)
} else {
peerClient, err = NewPeerClientFromEnv()
}
if err != nil {
return nil, err
}
//又引出peerClient
return peerClient.Endorser()
}
定位到peerclient.go,发现peerClient的许多方法,对应这不同的grpc客户端,这里是fabric中grpc客户端封装处理的地方,Endorser grpc客户端生成的方法func (pc *PeerClient) Endorser() (pb.EndorserClient, error)
endorser grpc client实例,fabric/peer/common/peerclient.go
// Endorser returns a client for the Endorser service
func (pc *PeerClient) Endorser() (pb.EndorserClient, error) {
conn, err := pc.commonClient.NewConnection(pc.address, pc.sn)
if err != nil {
return nil, errors.WithMessage(err, fmt.Sprintf("endorser client failed to connect to %s", pc.address))
}
return pb.NewEndorserClient(conn), nil
}
到此,找到了endorser grpc客户端实例的创建,以及grpc的请求,调用接口ProcessProposal
我们看到实例化是启动了背书的grpc,实际上其他交易也是启动背书过程,这是fabric,区块链的特性,大多数的处理都会在背书上,所以待会我们主要看服务端的背书处理上
peer chaincode instantiate 链码实例化客户端部分已经完成
服务端首先处理的是lscc的调用处理,就是lscc Invoke的地方,但如果继续看下去,就会发现,这里只是将作为参数的用户链码,存了起来,也就是说,将链码存储的是这里实现的。
fabric/core/scc/lscc/lscc.go/Invoke
// Invoke implements lifecycle functions "deploy", "start", "stop", "upgrade".
// Deploy's arguments - {[]byte("deploy"), []byte(<chainname>), <unmarshalled pb.ChaincodeDeploymentSpec>}
//
// Invoke also implements some query-like functions
// Get chaincode arguments - {[]byte("getid"), []byte(<chainname>), []byte(<chaincodename>)}
func (lscc *LifeCycleSysCC) Invoke(stub shim.ChaincodeStubInterface) pb.Response {
args := stub.GetArgs()
if len(args) < 1 {
return shim.Error(InvalidArgsLenErr(len(args)).Error())
}
function := string(args[0])
// Handle ACL:
// 1. get the signed proposal
sp, err := stub.GetSignedProposal()
if err != nil {
return shim.Error(fmt.Sprintf("Failed retrieving signed proposal on executing %s with error %s", function, err))
}
switch function {
case INSTALL:
...
case DEPLOY, UPGRADE:
// we expect a minimum of 3 arguments, the function
// name, the chain name and deployment spec
if len(args) < 3 {
return shim.Error(InvalidArgsLenErr(len(args)).Error())
}
// channel the chaincode should be associated with. It
// should be created with a register call
channel := string(args[1])
if !lscc.isValidChannelName(channel) {
return shim.Error(InvalidChannelNameErr(channel).Error())
}
ac, exists := lscc.SCCProvider.GetApplicationConfig(channel)
if !exists {
logger.Panicf("programming error, non-existent appplication config for channel '%s'", channel)
}
// the maximum number of arguments depends on the capability of the channel
if !ac.Capabilities().PrivateChannelData() && len(args) > 6 {
return shim.Error(PrivateChannelDataNotAvailable("").Error())
}
if ac.Capabilities().PrivateChannelData() && len(args) > 7 {
return shim.Error(InvalidArgsLenErr(len(args)).Error())
}
depSpec := args[2]
cds := &pb.ChaincodeDeploymentSpec{
}
err := proto.Unmarshal(depSpec, cds)
if err != nil {
return shim.Error(fmt.Sprintf("error unmarshaling ChaincodeDeploymentSpec: %s", err))
}
// optional arguments here (they can each be nil and may or may not be present)
// args[3] is a marshalled SignaturePolicyEnvelope representing the endorsement policy
// args[4] is the name of escc
// args[5] is the name of vscc
// args[6] is a marshalled CollectionConfigPackage struct
var EP []byte
if len(args) > 3 && len(args[3]) > 0 {
EP = args[3]
} else {
p := cauthdsl.SignedByAnyMember(peer.GetMSPIDs(channel))
EP, err = utils.Marshal(p)
if err != nil {
return shim.Error(err.Error())
}
}
var escc []byte
if len(args) > 4 && len(args[4]) > 0 {
escc = args[4]
} else {
escc = []byte("escc")
}
var vscc []byte
if len(args) > 5 && len(args[5]) > 0 {
vscc = args[5]
} else {
vscc = []byte("vscc")
}
var collectionsConfig []byte
// we proceed with a non-nil collection configuration only if
// we Support the PrivateChannelData capability
if ac.Capabilities().PrivateChannelData() && len(args) > 6 {
collectionsConfig = args[6]
}
cd, err := lscc.executeDeployOrUpgrade(stub, channel, cds, EP, escc, vscc, collectionsConfig, function)
if err != nil {
return shim.Error(err.Error())
}
cdbytes, err := proto.Marshal(cd)
if err != nil {
return shim.Error(err.Error())
}
return shim.Success(cdbytes)
...
}
return shim.Error(InvalidFunctionErr(function).Error())
}
最后完成lscc的处理也是需要背书、模拟执行的,所以我们关心的链码容器启动不是在lscc的invoke上的,我们还是看背书的服务端吧。
peer服务端都是在peer node start中完成启动的,先找endorser
peer node start中启动endorser grpc server: fabric/peer/chaincode/node/start.go/serve
pluginEndorser := endorser.NewPluginEndorser(&endorser.PluginSupport{
ChannelStateRetriever: channelStateRetriever,
TransientStoreRetriever: peer.TransientStoreFactory,
PluginMapper: pluginMapper,
SigningIdentityFetcher: signingIdentityFetcher,
})
endorserSupport.PluginEndorser = pluginEndorser
serverEndorser := endorser.NewEndorserServer(privDataDist, endorserSupport, pr, metricsProvider)
...
auth := authHandler.ChainFilters(serverEndorser, authFilters...)
...
通过NewEndorserServer定位到Endorser的server处理端ProcessProposal,从这里,我们将看到链码启动的完成过程,lscc启动链码的全过程
fabric/core/endorser/endorser.go
// ProcessProposal process the Proposal
func (e *Endorser) ProcessProposal(ctx context.Context, signedProp *pb.SignedProposal) (*pb.ProposalResponse, error) {
...
// 0 -- check and validate
vr, err := e.preProcess(signedProp)
if err != nil {
resp := vr.resp
return resp, err
}
prop, hdrExt, chainID, txid := vr.prop, vr.hdrExt, vr.chainID, vr.txid
// obtaining once the tx simulator for this proposal. This will be nil
// for chainless proposals
// Also obtain a history query executor for history queries, since tx simulator does not cover history
var txsim ledger.TxSimulator
var historyQueryExecutor ledger.HistoryQueryExecutor
if acquireTxSimulator(chainID, vr.hdrExt.ChaincodeId) {
if txsim, err = e.s.GetTxSimulator(chainID, txid); err != nil {
return &pb.ProposalResponse{
Response: &pb.Response{
Status: 500, Message: err.Error()}}, nil
}
// txsim acquires a shared lock on the stateDB. As this would impact the block commits (i.e., commit
// of valid write-sets to the stateDB), we must release the lock as early as possible.
// Hence, this txsim object is closed in simulateProposal() as soon as the tx is simulated and
// rwset is collected before gossip dissemination if required for privateData. For safety, we
// add the following defer statement and is useful when an error occur. Note that calling
// txsim.Done() more than once does not cause any issue. If the txsim is already
// released, the following txsim.Done() simply returns.
defer txsim.Done()
if historyQueryExecutor, err = e.s.GetHistoryQueryExecutor(chainID); err != nil {
return &pb.ProposalResponse{
Response: &pb.Response{
Status: 500, Message: err.Error()}}, nil
}
}
txParams := &ccprovider.TransactionParams{
ChannelID: chainID,
TxID: txid,
SignedProp: signedProp,
Proposal: prop,
TXSimulator: txsim,
HistoryQueryExecutor: historyQueryExecutor,
}
// this could be a request to a chainless SysCC
// TODO: if the proposal has an extension, it will be of type ChaincodeAction;
// if it's present it means that no simulation is to be performed because
// we're trying to emulate a submitting peer. On the other hand, we need
// to validate the supplied action before endorsing it
// 1 -- simulate
//进入simulateproposal,我们可以看到,交易执行的过程,并且将遇到一个重要的处理函数callChaincode
cd, res, simulationResult, ccevent, err := e.SimulateProposal(txParams, hdrExt.ChaincodeId)
if err != nil {
return &pb.ProposalResponse{
Response: &pb.Response{
Status: 500, Message: err.Error()}}, nil
}
if res != nil {
if res.Status >= shim.ERROR {
endorserLogger.Errorf("[%s][%s] simulateProposal() resulted in chaincode %s response status %d for txid: %s", chainID, shorttxid(txid), hdrExt.ChaincodeId, res.Status, txid)
var cceventBytes []byte
if ccevent != nil {
cceventBytes, err = putils.GetBytesChaincodeEvent(ccevent)
if err != nil {
return nil, errors.Wrap(err, "failed to marshal event bytes")
}
}
pResp, err := putils.CreateProposalResponseFailure(prop.Header, prop.Payload, res, simulationResult, cceventBytes, hdrExt.ChaincodeId, hdrExt.PayloadVisibility)
if err != nil {
return &pb.ProposalResponse{
Response: &pb.Response{
Status: 500, Message: err.Error()}}, nil
}
return pResp, nil
}
}
// 2 -- endorse and get a marshalled ProposalResponse message ,这里在那时不关心
var pResp *pb.ProposalResponse
// TODO till we implement global ESCC, CSCC for system chaincodes
// chainless proposals (such as CSCC) don't have to be endorsed
if chainID == "" {
pResp = &pb.ProposalResponse{
Response: res}
} else {
// Note: To endorseProposal(), we pass the released txsim. Hence, an error would occur if we try to use this txsim
pResp, err = e.endorseProposal(ctx, chainID, txid, signedProp, prop, res, simulationResult, ccevent, hdrExt.PayloadVisibility, hdrExt.ChaincodeId, txsim, cd)
// if error, capture endorsement failure metric
meterLabels := []string{
"channel", chainID,
"chaincode", hdrExt.ChaincodeId.Name + ":" + hdrExt.ChaincodeId.Version,
}
if err != nil {
meterLabels = append(meterLabels, "chaincodeerror", strconv.FormatBool(false))
e.Metrics.EndorsementsFailed.With(meterLabels...).Add(1)
return &pb.ProposalResponse{
Response: &pb.Response{
Status: 500, Message: err.Error()}}, nil
}
if pResp.Response.Status >= shim.ERRORTHRESHOLD {
// the default ESCC treats all status codes about threshold as errors and fails endorsement
// useful to track this as a separate metric
meterLabels = append(meterLabels, "chaincodeerror", strconv.FormatBool(true))
e.Metrics.EndorsementsFailed.With(meterLabels...).Add(1)
endorserLogger.Debugf("[%s][%s] endorseProposal() resulted in chaincode %s error for txid: %s", chainID, shorttxid(txid), hdrExt.ChaincodeId, txid)
return pResp, nil
}
}
// Set the proposal response payload - it
// contains the "return value" from the
// chaincode invocation
pResp.Response = res
// total failed proposals = ProposalsReceived-SuccessfulProposals
e.Metrics.SuccessfulProposals.Add(1)
success = true
return pResp, nil
}
通过e.SimulateProposal(txParams, hdrExt.ChaincodeId),找到执行处理的代码callChaincode
fabric/core/endorser/endorser.go
// SimulateProposal simulates the proposal by calling the chaincode
func (e *Endorser) SimulateProposal(txParams *ccprovider.TransactionParams, cid *pb.ChaincodeID) (ccprovider.ChaincodeDefinition, *pb.Response, []byte, *pb.ChaincodeEvent, error) {
// we do expect the payload to be a ChaincodeInvocationSpec
// if we are supporting other payloads in future, this be glaringly point
// as something that should change
cis, err := putils.GetChaincodeInvocationSpec(txParams.Proposal)
if err != nil {
return nil, nil, nil, nil, err
}
var cdLedger ccprovider.ChaincodeDefinition
var version string
if !e.s.IsSysCC(cid.Name) {
cdLedger, err = e.s.GetChaincodeDefinition(cid.Name, txParams.TXSimulator)
if err != nil {
return nil, nil, nil, nil, errors.WithMessage(err, fmt.Sprintf("make sure the chaincode %s has been successfully instantiated and try again", cid.Name))
}
version = cdLedger.CCVersion()
err = e.s.CheckInstantiationPolicy(cid.Name, version, cdLedger)
if err != nil {
return nil, nil, nil, nil, err
}
} else {
version = util.GetSysCCVersion()
}
// ---3. execute the proposal and get simulation results
var simResult *ledger.TxSimulationResults
var pubSimResBytes []byte
var res *pb.Response
var ccevent *pb.ChaincodeEvent
//找到处理核心,调用链码的部分,lscc启动mycc也是在这里
res, ccevent, err = e.callChaincode(txParams, version, cis.ChaincodeSpec.Input, cid)
...
}
我们找到了,lscc启动用户链码的地方
fabric/core/endorser/endorser.go
// call specified chaincode (system or user)
func (e *Endorser) callChaincode(txParams *ccprovider.TransactionParams, version string, input *pb.ChaincodeInput, cid *pb.ChaincodeID) (*pb.Response, *pb.ChaincodeEvent, error) {
var err error
var res *pb.Response
var ccevent *pb.ChaincodeEvent
//暂不关心
// is this a system chaincode
res, ccevent, err = e.s.Execute(txParams, txParams.ChannelID, cid.Name, version, txParams.TxID, txParams.SignedProp, txParams.Proposal, input)
if err != nil {
return nil, nil, err
}
// per doc anything < 400 can be sent as TX.
// fabric errors will always be >= 400 (ie, unambiguous errors )
// "lscc" will respond with status 200 or 500 (ie, unambiguous OK or ERROR)
if res.Status >= shim.ERRORTHRESHOLD {
return res, nil, nil
}
// ----- BEGIN - SECTION THAT MAY NEED TO BE DONE IN LSCC ------
// if this a call to deploy a chaincode, We need a mechanism
// to pass TxSimulator into LSCC. Till that is worked out this
// special code does the actual deploy, upgrade here so as to collect
// all state under one TxSimulator
//
// NOTE that if there's an error all simulation, including the chaincode
// table changes in lscc will be thrown away
if cid.Name == "lscc" && len(input.Args) >= 3 && (string(input.Args[0]) == "deploy" || string(input.Args[0]) == "upgrade") {
//就是这里,判断lscc,并解析lscc链码的参数,因为参数中就是链码内容
//userCDS就是我们要启动的链码
userCDS, err := putils.GetChaincodeDeploymentSpec(input.Args[2], e.PlatformRegistry)
if err != nil {
return nil, nil, err
}
var cds *pb.ChaincodeDeploymentSpec
cds, err = e.SanitizeUserCDS(userCDS)
if err != nil {
return nil, nil, err
}
// this should not be a system chaincode
if e.s.IsSysCC(cds.ChaincodeSpec.ChaincodeId.Name) {
return nil, nil, errors.Errorf("attempting to deploy a system chaincode %s/%s", cds.ChaincodeSpec.ChaincodeId.Name, txParams.ChannelID)
}
//启动链码,
_, _, err = e.s.ExecuteLegacyInit(txParams, txParams.ChannelID, cds.ChaincodeSpec.ChaincodeId.Name, cds.ChaincodeSpec.ChaincodeId.Version, txParams.TxID, txParams.SignedProp, txParams.Proposal, cds)
if err != nil {
// increment the failure to indicate instantion/upgrade failures
meterLabels := []string{
"channel", txParams.ChannelID,
"chaincode", cds.ChaincodeSpec.ChaincodeId.Name + ":" + cds.ChaincodeSpec.ChaincodeId.Version,
}
e.Metrics.InitFailed.With(meterLabels...).Add(1)
return nil, nil, err
}
//用户链码启动结束
}
// ----- END -------
return res, ccevent, err
}
如果分析过fabric链码容器Start的过程,那接下来的部分就不用看了,我们看lscc启动链码容器,最关键的就是找到上边这个函数处理链码启动的地方。
再定位到ExecuteLegacyInit
fabric/core/chaincode/chaincode_support.go/ExecuteLegacyInit
// ExecuteLegacyInit is a temporary method which should be removed once the old style lifecycle
// is entirely deprecated. Ideally one release after the introduction of the new lifecycle.
// It does not attempt to start the chaincode based on the information from lifecycle, but instead
// accepts the container information directly in the form of a ChaincodeDeploymentSpec.
func (cs *ChaincodeSupport) ExecuteLegacyInit(txParams *ccprovider.TransactionParams, cccid *ccprovider.CCContext, spec *pb.ChaincodeDeploymentSpec) (*pb.Response, *pb.ChaincodeEvent, error) {
ccci := ccprovider.DeploymentSpecToChaincodeContainerInfo(spec)
ccci.Version = cccid.Version
//启动链码的地方
err := cs.LaunchInit(ccci)
if err != nil {
return nil, nil, err
}
//注册链码,就是添加个记录
cname := ccci.Name + ":" + ccci.Version
h := cs.HandlerRegistry.Handler(cname)
if h == nil {
return nil, nil, errors.Wrapf(err, "[channel %s] claimed to start chaincode container for %s but could not find handler", txParams.ChannelID, cname)
}
//这里暂不关心,其实还应该看链码的初始化的部分
resp, err := cs.execute(pb.ChaincodeMessage_INIT, txParams, cccid, spec.GetChaincodeSpec().Input, h)
return processChaincodeExecutionResult(txParams.TxID, cccid.Name, resp, err)
}
启动链码的地方LaunchInit
fabric/core/chaincode/chaincode_support.go/LaunchInit
// LaunchInit bypasses getting the chaincode spec from the LSCC table
// as in the case of v1.0-v1.2 lifecycle, the chaincode will not yet be
// defined in the LSCC table
func (cs *ChaincodeSupport) LaunchInit(ccci *ccprovider.ChaincodeContainerInfo) error {
cname := ccci.Name + ":" + ccci.Version
if cs.HandlerRegistry.Handler(cname) != nil {
return nil
}
return cs.Launcher.Launch(ccci)//启动函数
}
定位cs.Launcher.Launch(ccci)//启动函数
fabric/core/chaincode/runtime_supoort.go/Launch
func (r *RuntimeLauncher) Launch(ccci *ccprovider.ChaincodeContainerInfo) error {
var startFailCh chan error
var timeoutCh <-chan time.Time
startTime := time.Now()
cname := ccci.Name + ":" + ccci.Version
launchState, alreadyStarted := r.Registry.Launching(cname)
if !alreadyStarted {
...
go func() {
//启动的地方
if err := r.Runtime.Start(ccci, codePackage); err != nil {
startFailCh <- errors.WithMessage(err, "error starting container")
return
}
exitCode, err := r.Runtime.Wait(ccci)
if err != nil {
launchState.Notify(errors.Wrap(err, "failed to wait on container exit"))
}
launchState.Notify(errors.Errorf("container exited with %d", exitCode))
}()
}
//阻塞,等待启动完成
var err error
select {
case <-launchState.Done():
err = errors.WithMessage(launchState.Err(), "chaincode registration failed")
case err = <-startFailCh:
launchState.Notify(err)
r.Metrics.LaunchFailures.With("chaincode", cname).Add(1)
case <-timeoutCh:
err = errors.Errorf("timeout expired while starting chaincode %s for transaction", cname)
launchState.Notify(err)
r.Metrics.LaunchTimeouts.With("chaincode", cname).Add(1)
}
success := true
...
}
定位Runtime.Start
fabric/core/chaincode/container_runtime.go
// Start launches chaincode in a runtime environment.
func (c *ContainerRuntime) Start(ccci *ccprovider.ChaincodeContainerInfo, codePackage []byte) error {
cname := ccci.Name + ":" + ccci.Version
//读取peer的配置文件,关于docker engine的
lc, err := c.LaunchConfig(cname, ccci.Type)
if err != nil {
return err
}
...
//这里是设置容器的启动配置
scr := container.StartContainerReq{
Builder: &container.PlatformBuilder{
Type: ccci.Type,
Name: ccci.Name,
Version: ccci.Version,
Path: ccci.Path,
CodePackage: codePackage,
PlatformRegistry: c.PlatformRegistry,
},
Args: lc.Args,
Env: lc.Envs,
FilesToUpload: lc.Files,
CCID: ccintf.CCID{
Name: ccci.Name,
Version: ccci.Version,
},
}
//启动的地方,ccci.ContainerType又两种sys和docker,用户链码是docker启动,接下来会构造docker client向docker engine请求启动容器
if err := c.Processor.Process(ccci.ContainerType, scr); err != nil {
return errors.WithMessage(err, "error starting container")
}
return nil
}
定位到Processor,是个接口,我们通过ChaincodeSupport的实例化的时候的配置,找到Processor的实例化
fabric/core/chaincode/container_runtime.go
// Processor processes vm and container requests.
type Processor interface {
Process(vmtype string, req container.VMCReq) error
}
定位到Controller
fabric/conre/container/controller.go
func (vmc *VMController) Process(vmtype string, req VMCReq) error {
v := vmc.newVM(vmtype)
ccid := req.GetCCID()
id := ccid.GetName()
vmc.lockContainer(id)
defer vmc.unlockContainer(id)
return req.Do(v)
}
VMCReq也是接口,这里又两个实现,一个是Start,一个是Stop,都是通过Do完成相关操作的,这里我们就看Start的
fabric/conre/container/controller.go
type VMCReq interface {
Do(v VM) error
GetCCID() ccintf.CCID
}
//StartContainerReq - properties for starting a container.
type StartContainerReq struct {
ccintf.CCID
Builder Builder
Args []string
Env []string
FilesToUpload map[string][]byte
}
func (si StartContainerReq) Do(v VM) error {
//通过v启动了start,我们看VMcontroller.Proccess中v的实例
return v.Start(si.CCID, si.Args, si.Env, si.FilesToUpload, si.Builder)
}
看实例化VM的函数
fabric/conre/container/controller.go
func (vmc *VMController) newVM(typ string) VM {
//通过map得到了一个实例,我们找到最初定义chaincodeSupport的地方看实例了哪几个vm
v, ok := vmc.vmProviders[typ]
if !ok {
vmLogger.Panicf("Programming error: unsupported VM type: %s", typ)
}
return v.NewVM()
}
回看vm实例
fabric/peer/node/start.go/serve
chaincodeSupport := NewChaincodeSupport(
config,
"0.0.0.0:7052",
true,
ca.CertBytes(),
certGenerator,
&ccprovider.CCInfoFSImpl{
},
lsccImpl,
mockAclProvider,
container.NewVMController(
map[string]container.VMProvider{
//const ContainerType = "DOCKER"
//第一个dockercontroller,这是我们需要的用户链码执行环境
dockercontroller.ContainerType: dockercontroller.NewProvider("", "", &disabled.Provider{
}),
//const ContainerType = "SYSTEM"
//这就是sys的执行环境
inproccontroller.ContainerType: ipRegistry,
},
),
sccp,
pr,
peer.DefaultSupport,
&disabled.Provider{
},
)
我们看实现了VM接口的controller
type VMProvider interface {
NewVM() VM
}
//fabric/core/contaner/dockercontroller/dockercontroller.go docker
// NewVM creates a new DockerVM instance
func (p *Provider) NewVM() container.VM {
return NewDockerVM(p.PeerID, p.NetworkID, p.BuildMetrics)
}
//fabric/core/contaner/dockercontroller/inproccontroller.go sys
// NewVM creates an inproc VM instance
func (r *Registry) NewVM() container.VM {
return NewInprocVM(r)
}
我们关心用户链码的启动,我们直接看Dockercontroller,
func (vm *DockerVM) Start(ccid ccintf.CCID, args, env []string, filesToUpload map[string][]byte, builder container.Builder)其实就是docker cleint向docker server请求启动容器的具体过程了
fabric/core/contaner/dockercontroller/dockercontroller.go
// Start starts a container using a previously created docker image
func (vm *DockerVM) Start(ccid ccintf.CCID, args, env []string, filesToUpload map[string][]byte, builder container.Builder) error {
imageName, err := vm.GetVMNameForDocker(ccid)
if err != nil {
return err
}
containerName := vm.GetVMName(ccid)
logger := dockerLogger.With("imageName", imageName, "containerName", containerName)
//获取docker client
client, err := vm.getClientFnc()
...
vm.stopInternal(client, containerName, 0, false, false)
...
err = vm.createContainer(client, imageName, containerName, args, env, attachStdout)
...
// upload specified files to the container before starting it
// this can be used for configurations such as TLS key and certs
if len(filesToUpload) != 0 {
// the docker upload API takes a tar file, so we need to first
// consolidate the file entries to a tar
payload := bytes.NewBuffer(nil)
gw := gzip.NewWriter(payload)
tw := tar.NewWriter(gw)
for path, fileToUpload := range filesToUpload {
cutil.WriteBytesToPackage(path, fileToUpload, tw)
}
// Write the tar file out
if err := tw.Close(); err != nil {
return fmt.Errorf("Error writing files to upload to Docker instance into a temporary tar blob: %s", err)
}
gw.Close()
...
err := client.UploadToContainer(containerName, docker.UploadToContainerOptions{
InputStream: bytes.NewReader(payload.Bytes()),
Path: "/",
NoOverwriteDirNonDir: false,
})
if err != nil {
return fmt.Errorf("Error uploading files to the container instance %s: %s", containerName, err)
}
}
// start container with HostConfig was deprecated since v1.10 and removed in v1.2
err = client.StartContainer(containerName, nil)
if err != nil {
dockerLogger.Errorf("start-could not start container: %s", err)
return err
}
dockerLogger.Debugf("Started container %s", containerName)
return nil
}
可以再看下getDockerClient
fabric/core/chaincode/container_runtime.go
func getDockerClient() (dockerClient, error) {
return cutil.NewDockerClient()
}
读取peer关于链码容器的配置,构建docker客户端
fabric/core/container/util/dokcerutil.go
func NewDockerClient() (client *docker.Client, err error) {
endpoint := viper.GetString("vm.endpoint")
tlsenabled := viper.GetBool("vm.docker.tls.enabled")
if tlsenabled {
cert := config.GetPath("vm.docker.tls.cert.file")
key := config.GetPath("vm.docker.tls.key.file")
ca := config.GetPath("vm.docker.tls.ca.file")
client, err = docker.NewTLSClient(endpoint, cert, key, ca)
} else {
client, err = docker.NewClient(endpoint)
}
return
}
至此,Peer chaincode instantiate服务端链码启动的过程我们也分析完了。这里我们主要对lscc invoke启动链码容器的过程进行了分析,并没有对lscc完整的交易流程分析,但是交易流程分析也大体是这个过程,不再描述。